diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml new file mode 100644 index 0000000..72c76e3 --- /dev/null +++ b/.github/workflows/run-tests.yml @@ -0,0 +1,30 @@ +name: Run Tests + +on: + pull_request: + branches: [ main ] + workflow_dispatch: + +permissions: + contents: read + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v3 + with: + python-version: '3.x' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -e . + + - name: Run tests + run: pytest tests/ diff --git a/setup.py b/setup.py index a6632a4..a1c960f 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='python-pgp-check', - version='0.3.1', + version='0.3.2', packages=find_packages(where='src'), package_dir={'': 'src'}, install_requires=[], diff --git a/src/pgp_check/cli.py b/src/pgp_check/cli.py index 073b44e..5ec0797 100644 --- a/src/pgp_check/cli.py +++ b/src/pgp_check/cli.py @@ -5,9 +5,6 @@ import time import threading import itertools -import os -import multiprocessing -from concurrent.futures import ThreadPoolExecutor from importlib.metadata import version from pathlib import Path from typing import Optional, Tuple @@ -26,83 +23,48 @@ __version__ = version("python-pgp-check") + class HashCalculator: - """Class to handle hash calculation with improved performance and feedback.""" - + """Class to handle hash calculation with consistent hashing method.""" + CHUNK_SIZE = 1024 * 1024 # 1MB chunks for better memory management - - def __init__(self, file_path: str, algorithm: str = 'sha256'): + + def __init__(self, file_path: str, algorithm: str = "sha256"): self.file_path = Path(file_path) self.algorithm = algorithm self.hash_func = getattr(hashlib, algorithm) self._stop_event = threading.Event() self.progress = 0 - - def _calculate_chunk_hash(self, chunk: bytes) -> bytes: - """Calculate hash for a single chunk.""" - return self.hash_func(chunk).digest() - + def calculate_hash(self) -> Tuple[str, float]: """Calculate file hash with progress tracking.""" start_time = time.time() - file_size = self.file_path.stat().st_size - + hasher = self.hash_func() + try: - # For small files, use single-threaded processing - if file_size < 10 * 1024 * 1024: # 10MB threshold - return self._calculate_single_threaded(), time.time() - start_time - - # For large files, use parallel processing - return self._calculate_parallel(), time.time() - start_time - + with open(self.file_path, "rb") as f: + while not self._stop_event.is_set(): + chunk = f.read(self.CHUNK_SIZE) + if not chunk: + break + hasher.update(chunk) + + return hasher.hexdigest(), time.time() - start_time + except IOError as e: raise IOError(f"Error reading file: {e}") - - def _calculate_single_threaded(self) -> str: - """Calculate hash using single thread for small files.""" - hasher = self.hash_func() - - with open(self.file_path, 'rb') as f: - mm = memoryview(bytearray(self.CHUNK_SIZE)) - while not self._stop_event.is_set(): - n = f.readinto(mm) - if not n: - break - hasher.update(mm[:n]) - - return hasher.hexdigest() - - def _calculate_parallel(self) -> str: - """Calculate hash using parallel processing for large files.""" - chunks = [] - final_hasher = self.hash_func() - - with open(self.file_path, 'rb') as f: - while True: - chunk = f.read(self.CHUNK_SIZE) - if not chunk: - break - chunks.append(chunk) - - # Use ThreadPoolExecutor for I/O-bound operations - with ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor: - chunk_hashes = list(executor.map(self._calculate_chunk_hash, chunks)) - - # Combine chunk hashes - for chunk_hash in chunk_hashes: - final_hasher.update(chunk_hash) - - return final_hasher.hexdigest() + def spinner_animation(stop_event: threading.Event): """Display an enhanced spinner animation.""" - spinners = itertools.cycle(['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']) + spinners = itertools.cycle(["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]) while not stop_event.is_set(): sys.stdout.write(next(spinners)) sys.stdout.flush() - sys.stdout.write('\b') + sys.stdout.write("\b") time.sleep(0.1) + def format_time(seconds: float) -> str: """Format time duration in a human-readable format.""" if seconds < 60: @@ -111,59 +73,59 @@ def format_time(seconds: float) -> str: seconds = seconds % 60 return f"{minutes} minutes {seconds:.2f} seconds" + def verify_hash(calculated_hash: str, expected_hash: str) -> bool: """Compare hashes using constant-time comparison.""" if not calculated_hash or not expected_hash: return False - + # Convert to bytes for constant-time comparison return hmac.compare_digest( - calculated_hash.lower().encode(), - expected_hash.lower().encode() + calculated_hash.lower().encode(), expected_hash.lower().encode() ) def main(): parser = argparse.ArgumentParser( - description='Fast and secure file hash calculator and verifier', - formatter_class=argparse.RawDescriptionHelpFormatter + description="Fast and secure file hash calculator and verifier", + formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument('file_location', help='Path to the file to check') - parser.add_argument('expected_hash', nargs='?', help='Expected hash value for verification') + parser.add_argument("file_location", help="Path to the file to check") + parser.add_argument("expected_hash", nargs="?", help="Expected hash value for verification") parser.add_argument( - '--algorithm', - default='sha256', - choices=['md5', 'sha1', 'sha256', 'sha384', 'sha512'], - help='Hash algorithm to use (default: sha256)' + "--algorithm", + default="sha256", + choices=["md5", "sha1", "sha256", "sha384", "sha512"], + help="Hash algorithm to use (default: sha256)", ) parser.add_argument( - '-v', '--version', - action='version', - version=f'%(prog)s {__version__}', - help='Show the version of the package' + "-v", + "--version", + action="version", + version=f"%(prog)s {__version__}", + help="Show the version of the package", ) - + args = parser.parse_args() try: file_path = Path(args.file_location).expanduser().resolve() - + if not file_path.exists(): print(f"\n{RED_X} {RED}Error: File not found - {file_path}{RESET}") sys.exit(2) - + # Show file info with better formatting file_size = file_path.stat().st_size - file_size_mb = file_size / (1024*1024) - - print(f"{'─'*80}") + file_size_mb = file_size / (1024 * 1024) + print(f"{'─'*80}") print(f"{INFO} File Details:") print(f" • Path: {file_path}") print(f" • Size: {file_size_mb:.2f} MB") print(f" • Algorithm: {args.algorithm.upper()}") print(f"{'─'*80}") - print(f"Calculating hash... ", end='', flush=True) + print(f"Calculating hash... ", end="", flush=True) stop_event = threading.Event() spinner_thread = threading.Thread(target=spinner_animation, args=(stop_event,)) @@ -176,17 +138,16 @@ def main(): finally: stop_event.set() time.sleep(0.1) - sys.stdout.write('\r' + ' ' * 50 + '\r') + sys.stdout.write("\r" + " " * 50 + "\r") sys.stdout.flush() - - print(f"Completed in {format_time(duration)}") + print(f"Completed in {format_time(duration)}") if args.expected_hash: print("Hash Verification:") print(f" Calculated: {YELLOW}{calculated_hash}{RESET}") print(f" Expected: {YELLOW}{args.expected_hash}{RESET}") - + if verify_hash(calculated_hash, args.expected_hash): print(f"{CHECK_MARK} {GREEN}Success: Hashes match!{RESET}") sys.exit(0) @@ -195,7 +156,6 @@ def main(): sys.exit(1) else: print(f"Generated Hash: {YELLOW}{calculated_hash}{RESET}") - except KeyboardInterrupt: print(f"\n{WARNING} {YELLOW}Operation cancelled by user{RESET}") @@ -204,5 +164,6 @@ def main(): print(f"\n{RED_X} {RED}An error occurred: {str(e)}{RESET}") sys.exit(3) -if __name__ == '__main__': - main() \ No newline at end of file + +if __name__ == "__main__": + main()