Source code for mdapy.pigz

# Copyright (c) 2022-2024, mushroomfire in Beijing Institute of Technology
# This file is from the mdapy project, released under the BSD 3-Clause License.
"""
Functions and classes to speed up compression of files by utilizing
multiple cores on a system.
"""

"""
The original file comes from https://github.com/bguise987/pigz-python/tree/master.
mdapy changes some details to compress the dump to dump.gz.
"""
import os
import sys
import time
import zlib
from multiprocessing.dummy import Pool
from pathlib import Path
from queue import PriorityQueue
from threading import Lock, Thread

CPU_COUNT = os.cpu_count()
DEFAULT_BLOCK_SIZE_KB = 128

# 1 is fastest but worst, 9 is slowest but best
GZIP_COMPRESS_OPTIONS = list(range(1, 9 + 1))
_COMPRESS_LEVEL_BEST = max(GZIP_COMPRESS_OPTIONS)

# FLG bits
FTEXT = 0x1
FHCRC = 0x2
FEXTRA = 0x4
FNAME = 0x8
FCOMMENT = 0x10


[docs] class PigzFile: # pylint: disable=too-many-instance-attributes """Class to implement Pigz functionality in Python""" def __init__( self, compression_target, output_filename=None, compresslevel=_COMPRESS_LEVEL_BEST, blocksize=DEFAULT_BLOCK_SIZE_KB, workers=CPU_COUNT, ): """ Take in a file or directory and gzip using multiple system cores. """ assert isinstance(compression_target, str) assert ( compression_target.split(".")[-1] != "gz" ), "Can not compress the .gz file." assert os.path.exists( compression_target ), f"No such file or directory: {compression_target}" self.compression_target = compression_target self.compression_level = compresslevel self.blocksize = blocksize * 1000 self.workers = workers self.output_file = None if output_filename is None: output_filename = self.compression_target + ".gz" self.output_filename = output_filename # This is how we know if we're done reading, compressing, & writing the file self._last_chunk = -1 self._last_chunk_lock = Lock() # This is calculated as data is written out self.checksum = 0 # This is calculated as data is read in self.input_size = 0 self.chunk_queue = PriorityQueue() # Setup the system threads for compression self.pool = Pool(processes=self.workers) # Setup read thread self.read_thread = Thread(target=self._read_file) # Setup write thread self.write_thread = Thread(target=self._write_file)
[docs] def process_compression_target(self): """ Setup output file. Start read and write threads. Join to write thread. """ self._setup_output_file() # Start the write thread first so it's ready to accept data self.write_thread.start() # Start the read thread self.read_thread.start() # Block until writing is complete # This prevents us from returning prior to the work being done self.write_thread.join()
def _write_output_header(self): """ Write gzip header to file See RFC documentation: http://www.zlib.org/rfc-gzip.html#header-trailer """ self._write_header_id() self._write_header_cm() # We must first figure out if we can write out the filename before writing FLG fname = self._determine_fname(self.compression_target) flags = 0x0 if fname: flags = flags | FNAME self._write_header_flg(flags) self._write_header_mtime() self._write_header_xfl() self._write_header_os() # After this point, content of flags (FLG) determines what (if anything) # we write to header if flags & FNAME: # Write the FNAME self.output_file.write(fname) def _write_header_id(self): """ Write ID (IDentification) ID1, then ID2 to file header These denote the file as being gzip format """ self.output_file.write((0x1F).to_bytes(1, sys.byteorder)) self.output_file.write((0x8B).to_bytes(1, sys.byteorder)) def _write_header_cm(self): """Write the CM (compression method) to file header""" self.output_file.write((8).to_bytes(1, sys.byteorder)) def _write_header_flg(self, flags): """Write FLG (FLaGs)""" self.output_file.write((flags).to_bytes(1, sys.byteorder)) def _write_header_mtime(self): """Write MTIME (Modification time)""" mtime = self._determine_mtime() self.output_file.write((mtime).to_bytes(4, sys.byteorder)) def _write_header_xfl(self): """Write XFL (eXtra FLags)""" extra_flags = self._determine_extra_flags(self.compression_level) self.output_file.write((extra_flags).to_bytes(1, sys.byteorder)) def _write_header_os(self): """Write OS""" os_number = self._determine_operating_system() self.output_file.write((os_number).to_bytes(1, sys.byteorder)) def _setup_output_file(self): """ Setup the output file """ self.output_file = open(self.output_filename, "wb") self._write_output_header() def _determine_mtime(self): """ Determine MTIME to write out in Unix format (seconds since Unix epoch). From http://www.zlib.org/rfc-gzip.html#header-trailer: If the compressed data did not come from a file, MTIME is set to the time at which compression started. MTIME = 0 means no time stamp is available. """ try: return int(os.stat(self.compression_target).st_mtime) except Exception: # pylint: disable=broad-except return int(time.time()) @staticmethod def _determine_extra_flags(compression_level): """ Determine the XFL or eXtra FLags value based on compression level. Note this is copied from the pigz implementation. """ return 2 if compression_level >= 9 else 4 if compression_level == 1 else 0 @staticmethod def _determine_operating_system(): """ Return appropriate number based on OS format. 0 - FAT filesystem (MS-DOS, OS/2, NT/Win32) 1 - Amiga 2 - VMS (or OpenVMS) 3 - Unix 4 - VM/CMS 5 - Atari TOS 6 - HPFS filesystem (OS/2, NT) 7 - Macintosh 8 - Z-System 9 - CP/M 10 - TOPS-20 11 - NTFS filesystem (NT) 12 - QDOS 13 - Acorn RISCOS 255 - unknown """ if sys.platform.startswith(("freebsd", "linux", "aix", "darwin")): return 3 if sys.platform.startswith(("win32")): return 0 return 255 @staticmethod def _determine_fname(input_filename): """ Determine the FNAME (filename) of the source file to the output """ try: # RFC 1952 requires the FNAME field to be Latin-1. Do not # include filenames that cannot be represented that way. fname = Path(input_filename).name if not isinstance(fname, bytes): fname = fname.encode("latin-1") if fname.endswith(b".gz"): fname = fname[:-3] # Terminate with zero byte fname += b"\0" except UnicodeEncodeError: fname = b"" return fname def _read_file(self): """ Read {filename} in {blocksize} chunks. This method is run on the read thread. """ # Initialize this to 0 so our increment sets first chunk to 1 chunk_num = 0 with open(self.compression_target, "rb") as input_file: while True: chunk = input_file.read(self.blocksize) # Break out of the loop if we didn't read anything if not chunk: with self._last_chunk_lock: self._last_chunk = chunk_num break self.input_size += len(chunk) chunk_num += 1 # Apply this chunk to the pool self.pool.apply_async(self._process_chunk, (chunk_num, chunk)) def _process_chunk(self, chunk_num: int, chunk: bytes): """ Overall method to handle the chunk and pass it back to the write thread. This method is run on the pool. """ with self._last_chunk_lock: last_chunk = chunk_num == self._last_chunk compressed_chunk = self._compress_chunk(chunk, last_chunk) self.chunk_queue.put((chunk_num, chunk, compressed_chunk)) def _compress_chunk(self, chunk: bytes, is_last_chunk: bool): """ Compress the chunk. """ compressor = zlib.compressobj( level=self.compression_level, method=zlib.DEFLATED, wbits=-zlib.MAX_WBITS, memLevel=zlib.DEF_MEM_LEVEL, strategy=zlib.Z_DEFAULT_STRATEGY, ) compressed_data = compressor.compress(chunk) if is_last_chunk: compressed_data += compressor.flush(zlib.Z_FINISH) else: compressed_data += compressor.flush(zlib.Z_SYNC_FLUSH) return compressed_data def _write_file(self): """ Write compressed data to disk. Read chunks off of the priority queue. Priority is the chunk number, so we can keep track of which chunk to get next. This is run from the write thread. """ next_chunk_num = 1 while True: if not self.chunk_queue.empty(): chunk_num, chunk, compressed_chunk = self.chunk_queue.get() if chunk_num != next_chunk_num: # If this isn't the next chunk we're looking for, # place it back on the queue and sleep self.chunk_queue.put((chunk_num, chunk, compressed_chunk)) time.sleep(0.5) else: # Calculate running checksum self.calculate_chunk_check(chunk) # Write chunk to file, advance next chunk we're looking for self.output_file.write(compressed_chunk) # If this was the last chunk, # we can break the loop and close the file if chunk_num == self._last_chunk: break next_chunk_num += 1 else: # If the queue is empty, we're likely waiting for data. time.sleep(0.5) # Loop breaks out if we've received the final chunk self.clean_up()
[docs] def calculate_chunk_check(self, chunk: bytes): """ Calculate the check value for the chunk. """ self.checksum = zlib.crc32(chunk, self.checksum)
[docs] def clean_up(self): """ Close the output file. Clean up the processing pool. """ self.write_file_trailer() # Flush internal buffers self.output_file.flush() self.output_file.close() self._close_workers()
[docs] def write_file_trailer(self): """ Write the trailer for the compressed data. """ # Write CRC32 self.output_file.write((self.checksum).to_bytes(4, sys.byteorder)) # Write ISIZE (Input SIZE) # This contains the size of the original (uncompressed) input data modulo 2^32. self.output_file.write( (self.input_size & 0xFFFFFFFF).to_bytes(4, sys.byteorder) )
def _close_workers(self): """ Close compression thread pool. """ self.pool.close() self.pool.join()
[docs] def compress_file( source_file, output_file=None, inplace=False, compresslevel=_COMPRESS_LEVEL_BEST, blocksize=DEFAULT_BLOCK_SIZE_KB, workers=CPU_COUNT, ): """This function provides a interface to compress a file to .gz format parallelly. Args: source_file (str): filename you want to compress. output_file (str, optional): output compressed filename. If not give, mdapy will automatically append a .gz postfix. inplace (bool, optional): whether inplace the original file. Defaults to False. compresslevel (int, optional): 1 is fastest but worst, 9 is slowest but best. Defaults to 9. blocksize (int, optional): blocksize, generally do not need change. Defaults to 128 KB. workers (int, optional): number of threads to be used to do compression. Defaults to all your CPU cores. """ assert isinstance(source_file, str) file_size = os.path.getsize(source_file) / 1024**2 # M if file_size < 4: workers = 1 pigz_file = PigzFile(source_file, output_file, compresslevel, blocksize, workers) pigz_file.process_compression_target() if inplace: try: os.remove(source_file) except Exception: pass
if __name__ == "__main__": start = time.time() compress_file(r"E:/MyPackage/mdapy/test.dump") end = time.time() print(f"Time cost {end-start} s.")