#!/usr/bin/env python3
"""Download daily JSON files from a web server, skipping existing files."""

import asyncio
import logging
import sys
from datetime import date, timedelta
from pathlib import Path
from typing import cast

import aiofiles
import aiohttp
from aiohttp import ClientSession, ClientTimeout
from yarl import URL

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)


class FileDownloader:
    def __init__(
        self,
        base_url: str,
        local_dir: Path,
        timeout: int = 30,
        max_concurrent: int = 5,
        overwrite: bool = False,
        delay_between: float = 0.0,
    ):
        self.base_url: URL = URL(base_url)
        self.local_dir: Path = Path(local_dir)
        self.local_dir.mkdir(parents=True, exist_ok=True)
        self.timeout: ClientTimeout = ClientTimeout(total=timeout)
        self.max_concurrent: int = max_concurrent
        self.overwrite: bool = overwrite
        self.delay_between: float = delay_between
        # Semaphore to limit concurrent downloads
        self.semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrent)

    def _get_remote_filename(self, day: date) -> str:
        """Generate the remote filename for a given date."""
        return f"popcorn_{day.strftime('%Y-%m-%d')}.json"

    def _get_local_filename(self, day: date) -> str:
        """Generate the local filename for a given date."""
        return f"{day.strftime('%Y-%m-%d')}.json"

    def _file_exists(self, day: date) -> bool:
        """Check whether a non-empty copy of the file already exists locally."""
        local_path = self.local_dir / self._get_local_filename(day)
        return local_path.exists() and local_path.stat().st_size > 0

    def _get_date_range(self, start_date: date, end_date: date) -> list[date]:
        """Generate the inclusive list of dates to download."""
        dates: list[date] = []
        current_date = start_date
        while current_date <= end_date:
            dates.append(current_date)
            current_date += timedelta(days=1)
        return dates

    async def _download_file(
        self,
        session: ClientSession,
        day: date,
    ) -> Exception | None:
        """Download a single file, returning the error (if any) instead of raising."""
        async with self.semaphore:
            remote_filename = self._get_remote_filename(day)
            local_filename = self._get_local_filename(day)
            remote_url = self.base_url / remote_filename
            local_path = self.local_dir / local_filename

            # Skip if the file exists and overwrite is False
            if self._file_exists(day) and not self.overwrite:
                logger.info(f"Skipping {local_filename} (already exists)")
                return None

            try:
                logger.info(f"Downloading {remote_filename}...")
                async with session.get(remote_url, timeout=self.timeout) as response:
                    response.raise_for_status()
                    content = await response.read()

                # Write to a temporary file, then move it into place atomically,
                # so a crash mid-write never leaves a truncated .json behind
                temp_path = local_path.with_suffix(".tmp")
                async with aiofiles.open(temp_path, "wb") as f:
                    _ = await f.write(content)
                _ = temp_path.replace(local_path)

                logger.info(f"Downloaded {local_filename} ({len(content):,} bytes)")

                if self.delay_between > 0:
                    await asyncio.sleep(self.delay_between)
                return None

            except asyncio.TimeoutError:
                error = Exception(f"Timeout downloading {remote_filename}")
                logger.error(str(error))
                return error
            except aiohttp.ClientResponseError as e:
                # Treat 404 as non-fatal, since some daily files are simply missing
                if e.status == 404:
                    logger.warning(f"File not found: {remote_filename}")
                    return None
                logger.error(
                    f"HTTP {e.status} downloading {remote_filename}: {e.message}"
                )
                return e
            except Exception as e:
                logger.error(f"Error downloading {remote_filename}: {e}")
                return e

    async def download_files(
        self,
        start_date: date,
        end_date: date,
    ) -> list[Exception]:
        """Download files for the date range, returning any errors that occurred."""
        dates = self._get_date_range(start_date, end_date)
        logger.info(f"Processing {len(dates)} files from {start_date} to {end_date}")

        errors: list[Exception] = []
        async with ClientSession() as session:
            tasks: list[asyncio.Task[Exception | None]] = []
            for day in dates:
                task = asyncio.create_task(self._download_file(session, day))
                tasks.append(task)

            # Wait for all downloads to complete; return_exceptions=True keeps
            # one raised exception from cancelling the remaining tasks
            results = await asyncio.gather(*tasks, return_exceptions=True)

        # Collect errors, whether returned by _download_file or raised
        for result in results:
            if isinstance(result, Exception):
                errors.append(result)

        if errors:
            logger.warning(f"Completed with {len(errors)} errors")
        else:
            logger.info("All downloads completed successfully")
        return errors


def parse_cli_args() -> tuple[str, Path, date, date, bool, int, float]:
    """Parse command line arguments."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Download daily JSON files from a web server",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    _ = parser.add_argument(
        "start_date",
        help="Start date [YYYY-MM-DD]",
        type=date.fromisoformat,
    )
    _ = parser.add_argument(
        "end_date",
        nargs="?",
        help="End date [YYYY-MM-DD]",
        type=date.fromisoformat,
        default=date.today(),
    )
    _ = parser.add_argument(
        "--url",
        default="https://popcorn.voidlinux.org/",
        help="Base URL for files",
    )
    _ = parser.add_argument(
        "--dir",
        default="./data",
        help="Local directory for files",
        type=Path,
    )
    _ = parser.add_argument(
        "--overwrite",
        action="store_true",
        help="Overwrite existing files",
    )
    _ = parser.add_argument(
        "--concurrent",
        type=int,
        default=5,
        help="Maximum concurrent downloads",
    )
    _ = parser.add_argument(
        "--delay",
        type=float,
        default=0.0,
        help="Delay in seconds after each successful download",
    )
    args = parser.parse_args()

    if not isinstance(args.url, str):
        parser.error("URL must be a valid string")
    # Normalize to exactly one trailing slash so yarl joins paths correctly
    url: str = args.url.rstrip("/") + "/"

    args.start_date = cast(date, args.start_date)
    args.end_date = cast(date, args.end_date)
    if args.start_date > args.end_date:
        parser.error("Start date must be before or equal to end date")

    return (
        url,
        Path(args.dir),
        args.start_date,
        args.end_date,
        args.overwrite,
        args.concurrent,
        args.delay,
    )


async def main() -> int:
    """Main entry point."""
    (
        base_url,
        local_dir,
        start_date,
        end_date,
        overwrite,
        max_concurrent,
        delay_between,
    ) = parse_cli_args()

    downloader = FileDownloader(
        base_url=base_url,
        local_dir=local_dir,
        overwrite=overwrite,
        max_concurrent=max_concurrent,
        delay_between=delay_between,
    )
    errors = await downloader.download_files(start_date, end_date)

    if errors:
        logger.error(f"Download completed with {len(errors)} errors")
        return 1
    return 0


if __name__ == "__main__":
    try:
        exit_code = asyncio.run(main())
        sys.exit(exit_code)
    except KeyboardInterrupt:
        logger.info("Download interrupted by user")
        sys.exit(130)
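
# Example invocations (a sketch: "fetch_popcorn.py" is a hypothetical filename
# for this script, and the dates are illustrative; the flags and their defaults
# come from parse_cli_args above):
#
#   # Download every daily file from 2024-01-01 through today into ./data:
#   python fetch_popcorn.py 2024-01-01
#
#   # Download a closed range with 10 parallel requests, pausing half a second
#   # after each file and overwriting any existing local copies:
#   python fetch_popcorn.py 2024-01-01 2024-01-31 \
#       --dir ./data --concurrent 10 --delay 0.5 --overwrite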