Source code for mdapy.pigz

# Copyright (c) 2022-2026, Yongchao Wu in Aalto University
# This file is from the mdapy project, released under the BSD 3-Clause License.
"""
Parallel gzip compression using multiprocessing.

Simple, fast, and reliable:
- Uses multiprocessing (true parallelism, no GIL)
- Always uses maximum compression (level 9)
- Focuses on: speed for large files, correctness for all files
- Clean code, proper resource management
"""

import os
import sys
import time
import zlib
from multiprocessing import Pool
from pathlib import Path
from queue import PriorityQueue
from threading import Lock, Thread

from mdapy.parallel import get_num_threads


__all__ = ["compress_file"]  # Export only this function (this is what Sphinx documents)


def compress_file(input_file, output_file=None):
    """
    Gzip-compress a file using a pool of worker processes.

    The input is split into chunks that are deflated in parallel and then
    stitched back together, in order, into a single ``.gz`` file. The worker
    count is obtained from ``mdapy.parallel.get_num_threads()`` (described by
    the project as being controlled by the ``MDAPY_NUM_THREADS`` environment
    variable, defaulting to ``os.cpu_count()``).

    Parameters
    ----------
    input_file : str
        Path of the file to compress.
    output_file : str, optional
        Destination path. When omitted, ``.gz`` is appended to
        ``input_file``.

    Returns
    -------
    str
        Path of the compressed file that was written.

    Raises
    ------
    FileNotFoundError
        If ``input_file`` does not exist.
    ValueError
        If ``input_file`` already ends with ``.gz``.
    Exception
        If the compression itself fails for any reason.

    Notes
    -----
    - Files smaller than 5 MB are handled by a single worker process.
    - The input is processed in 512 KB chunks.

    Examples
    --------
    >>> compress_file("data.txt")
    'data.txt.gz'

    >>> compress_file("input.txt", "output.gz")
    'output.gz'
    """
    return _ParallelGzip(input_file, output_file).compress()


# ============================================================================
# Internal implementation (not exposed to Sphinx)
# ============================================================================


class _ParallelGzip:
    """
    Internal parallel gzip compressor implementation.

    The input file is cut into fixed-size chunks. A reader thread hands each
    chunk to a process pool, the workers deflate the chunks independently
    (raw deflate, sync-flushed so the pieces can simply be concatenated), and
    a writer thread appends the results to the output file in chunk order,
    accumulating the CRC32 needed for the gzip trailer.

    It is not meant to be used directly - use ``compress_file()`` instead.
    """

    # Constants
    DEFAULT_BLOCKSIZE_KB = 512
    SMALL_FILE_THRESHOLD_MB = 5
    GZIP_MAGIC = b"\x1f\x8b"
    GZIP_METHOD_DEFLATE = b"\x08"
    GZIP_FLAG_FNAME = b"\x08"
    GZIP_EXTRA_MAX_COMPRESSION = b"\x02"

    def __init__(self, input_file, output_file=None, blocksize_kb=None):
        """
        Initialize compressor with validated parameters.

        Parameters
        ----------
        input_file : str or os.PathLike
            Path of the file to compress.
        output_file : str or os.PathLike, optional
            Destination path; defaults to ``input_file`` + ``".gz"``.
        blocksize_kb : int, optional
            Chunk size in KiB; defaults to ``DEFAULT_BLOCKSIZE_KB``.

        Raises
        ------
        FileNotFoundError
            If ``input_file`` does not exist.
        ValueError
            If ``input_file`` already ends with ``.gz``.
        """
        # Normalise path-like objects up front so the string operations below
        # (``endswith`` / ``+``) work for ``pathlib.Path`` arguments too.
        input_file = os.fspath(input_file)
        if output_file:
            output_file = os.fspath(output_file)

        # Validate input
        if not os.path.exists(input_file):
            raise FileNotFoundError(f"Input file not found: {input_file}")
        if input_file.endswith(".gz"):
            raise ValueError("Input file is already .gz")

        self.input_file = input_file
        self.output_file = output_file or (input_file + ".gz")
        self.blocksize = (blocksize_kb or self.DEFAULT_BLOCKSIZE_KB) * 1024
        self.workers = get_num_threads()

        # Optimize for small files: one worker is enough
        file_size_mb = os.path.getsize(input_file) / (1024 * 1024)
        if file_size_mb < self.SMALL_FILE_THRESHOLD_MB:
            self.workers = 1

        # Internal state
        self._chunk_queue = PriorityQueue()  # (chunk_num, raw, compressed)
        self._last_chunk = -1  # number of the final chunk, -1 while unknown
        self._last_chunk_lock = Lock()
        self._input_size = 0
        self._crc32 = 0
        self._error = None  # first exception seen by any thread/worker

        # Resources to cleanup
        self._output_file = None
        self._pool = None
        self._reader_thread = None
        self._writer_thread = None

    def compress(self):
        """
        Execute the compression process.

        Returns
        -------
        str
            Path to the compressed file.

        Raises
        ------
        Exception
            If compression fails; the original error is chained as the cause
            and any partially written output file is removed.
        """
        try:
            self._output_file = open(self.output_file, "wb")
            self._write_header()

            # Create process pool
            self._pool = Pool(processes=self.workers)

            # Start worker threads
            self._reader_thread = Thread(target=self._read_and_dispatch, daemon=True)
            self._writer_thread = Thread(target=self._collect_and_write, daemon=True)
            self._writer_thread.start()
            self._reader_thread.start()

            # Wait for completion. The writer always terminates: either it
            # has written the final chunk or it has observed ``self._error``.
            self._writer_thread.join()
            self._reader_thread.join(timeout=5)

            # Check for errors from worker threads / processes
            if self._error is not None:
                raise self._error

            # Close explicitly on the success path so that a failing final
            # flush (e.g. disk full) is reported instead of silently ignored.
            self._output_file.close()
            return self.output_file

        except Exception as e:
            # Only delete the output if we actually opened (and therefore
            # truncated) it; never remove a file we failed to open.
            created_output = self._output_file is not None
            self._cleanup()
            if created_output:
                try:
                    os.remove(self.output_file)
                except OSError:
                    pass  # Best effort: ignore errors during cleanup
            raise Exception(f"Compression failed: {e}") from e

        finally:
            # Idempotent, so running it again after the except branch is safe.
            self._cleanup()

    def _header_filename(self):
        """Return the input's base name as latin-1 bytes, or ``b""`` if it cannot be encoded."""
        try:
            return Path(self.input_file).name.encode("latin-1")
        except UnicodeEncodeError:
            return b""

    def _write_header(self):
        """Write gzip header according to RFC 1952."""
        # Decide on the filename first: the FNAME flag must only be set when
        # a NUL-terminated name really follows, otherwise decoders would
        # interpret the start of the compressed data as the file name.
        fname = self._header_filename()
        flags = self.GZIP_FLAG_FNAME if fname else b"\x00"

        # Modification time (stored in 32 bits, hence the mask)
        try:
            mtime = int(os.path.getmtime(self.input_file))
        except OSError:
            mtime = int(time.time())
        mtime &= 0xFFFFFFFF

        # OS field: 3 = Unix, 0 = FAT filesystem (used here for Windows)
        os_flag = 3 if sys.platform != "win32" else 0

        header = b"".join(
            (
                self.GZIP_MAGIC,
                self.GZIP_METHOD_DEFLATE,
                flags,
                mtime.to_bytes(4, "little"),
                self.GZIP_EXTRA_MAX_COMPRESSION,
                bytes([os_flag]),
            )
        )
        if fname:
            header += fname + b"\x00"
        self._output_file.write(header)

    def _write_trailer(self):
        """Write gzip trailer (CRC32 and original size)."""
        self._output_file.write(self._crc32.to_bytes(4, "little"))
        # Original size modulo 2^32
        self._output_file.write((self._input_size & 0xFFFFFFFF).to_bytes(4, "little"))

    def _record_error(self, error):
        """Remember the first error reported by any thread or worker."""
        with self._last_chunk_lock:
            if self._error is None:
                self._error = error

    def _read_and_dispatch(self):
        """
        Read input file in chunks and dispatch them to the process pool.

        Chunks are dispatched while reading (one chunk of look-ahead is used
        to recognise the final chunk). An empty input still produces a single
        empty final chunk so that the output is a valid gzip stream.
        """
        try:
            with open(self.input_file, "rb") as f:
                chunk_num = 0
                current = f.read(self.blocksize)
                # Stop reading as soon as somebody reported a failure.
                while self._error is None:
                    upcoming = f.read(self.blocksize) if current else b""
                    is_last = not upcoming
                    chunk_num += 1
                    self._input_size += len(current)

                    if is_last:
                        # Publish the final chunk number *before* dispatching
                        # it, so the writer knows it when the result arrives.
                        with self._last_chunk_lock:
                            self._last_chunk = chunk_num

                    self._pool.apply_async(
                        _compress_chunk,
                        args=(chunk_num, current, is_last),
                        callback=self._chunk_queue.put,
                        error_callback=self._handle_compression_error,
                    )

                    if is_last:
                        break
                    current = upcoming
        except Exception as e:
            # Recording the error is what lets the writer thread exit.
            self._record_error(e)

    def _handle_compression_error(self, error):
        """Callback for handling errors from worker processes."""
        self._record_error(error)

    def _collect_and_write(self):
        """
        Collect compressed chunks in order and write them to the output.

        Returns once the final chunk and the trailer have been written, or
        as soon as an error has been recorded by any participant.
        """
        next_chunk = 1

        try:
            while True:
                # Abort instead of waiting forever for a chunk that will
                # never arrive.
                if self._error is not None:
                    return

                # Wait for data
                if self._chunk_queue.empty():
                    time.sleep(0.01)
                    continue

                # The priority queue yields the lowest chunk number first
                chunk_num, original_data, compressed_data = self._chunk_queue.get()

                # If not the next chunk in sequence, put it back and wait
                if chunk_num != next_chunk:
                    self._chunk_queue.put((chunk_num, original_data, compressed_data))
                    time.sleep(0.01)
                    continue

                # Write compressed data
                self._output_file.write(compressed_data)

                # Update CRC32 with original data
                self._crc32 = zlib.crc32(original_data, self._crc32)

                # Check if this was the last chunk
                with self._last_chunk_lock:
                    finished = chunk_num == self._last_chunk
                if finished:
                    break

                next_chunk += 1

            # Write gzip trailer
            self._write_trailer()

        except Exception as e:
            self._record_error(e)

    def _cleanup(self):
        """Release the output file and the process pool (safe to call repeatedly)."""
        # Close output file
        out, self._output_file = self._output_file, None
        if out is not None:
            try:
                out.close()
            except OSError:
                pass  # Best effort: the caller already knows about real errors

        # Shut down the process pool. ``Pool.join()`` accepts no timeout, so
        # terminate first: on success every task has already completed, and
        # on failure outstanding work is to be discarded anyway.
        pool, self._pool = self._pool, None
        if pool is not None:
            try:
                pool.terminate()
                pool.join()
            except Exception:
                # Deliberately best effort: cleanup must never mask the
                # primary error.
                pass


def _compress_chunk(chunk_num, data, is_last=False):
    """
    Compress a single chunk of data.

    This function runs in a separate process for parallel compression.

    Args:
        chunk_num: Chunk number for ordering
        data: Raw data bytes to compress
        is_last: Whether this is the last chunk

    Returns:
        tuple: (chunk_num, original_data, compressed_data)
    """
    compressor = zlib.compressobj(
        level=9,  # Maximum compression
        method=zlib.DEFLATED,
        wbits=-zlib.MAX_WBITS,  # Raw deflate (no zlib/gzip framing)
        memLevel=zlib.DEF_MEM_LEVEL,
        strategy=zlib.Z_DEFAULT_STRATEGY,
    )

    compressed = compressor.compress(data)

    # Z_FINISH terminates the deflate stream; Z_SYNC_FLUSH byte-aligns the
    # output without ending the stream, so that the next chunk's output can
    # be appended directly.
    if is_last:
        compressed += compressor.flush(zlib.Z_FINISH)
    else:
        compressed += compressor.flush(zlib.Z_SYNC_FLUSH)

    return (chunk_num, data, compressed)


if __name__ == "__main__":
    # Simple test
    start = time.time()
    result = compress_file("cu.xyz")
    end = time.time()
    print(f"Compressed to: {result}")
    print(f"Time: {end - start:.2f} seconds")