ds-voidlinux-popcorn/code/get_raw.py
#!/usr/bin/env python3
"""
Download daily JSON files from a web server, skipping existing files.
"""
import asyncio
import logging
import sys
from datetime import date, timedelta
from pathlib import Path
from typing import cast

import aiofiles
import aiohttp
from aiohttp import ClientSession, ClientTimeout
from yarl import URL

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)


class FileDownloader:
    def __init__(
        self,
        base_url: str,
        local_dir: Path,
        timeout: int = 30,
        max_concurrent: int = 5,
        overwrite: bool = False,
        delay_between: float = 0.0,
    ):
        self.base_url: URL = URL(base_url)
        self.local_dir: Path = Path(local_dir)
        self.local_dir.mkdir(parents=True, exist_ok=True)
        self.timeout: ClientTimeout = ClientTimeout(total=timeout)
        self.max_concurrent: int = max_concurrent
        self.overwrite: bool = overwrite
        self.delay_between: float = delay_between
        # Semaphore to limit concurrent downloads
        self.semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrent)

    def _get_remote_filename(self, day: date) -> str:
        """Generate remote filename based on date."""
        return f"popcorn_{day.strftime('%Y-%m-%d')}.json"

    def _get_local_filename(self, day: date) -> str:
        """Generate local filename based on date."""
        return f"{day.strftime('%Y-%m-%d')}.json"

    def _file_exists(self, day: date) -> bool:
        """Check if the file already exists locally and is non-empty."""
        local_path = self.local_dir / self._get_local_filename(day)
        return local_path.exists() and local_path.stat().st_size > 0

    def _get_date_range(self, start_date: date, end_date: date) -> list[date]:
        """Generate the inclusive list of dates to download."""
        dates: list[date] = []
        current_date = start_date
        while current_date <= end_date:
            dates.append(current_date)
            current_date += timedelta(days=1)
        return dates
    async def _download_file(
        self,
        session: ClientSession,
        day: date,
    ) -> Exception | None:
        """Download a single file with error handling."""
        async with self.semaphore:
            remote_filename = self._get_remote_filename(day)
            local_filename = self._get_local_filename(day)
            remote_url = self.base_url / remote_filename
            local_path = self.local_dir / local_filename

            # Skip if the file exists and overwrite is False
            if self._file_exists(day) and not self.overwrite:
                logger.info(f"Skipping {local_filename} (already exists)")
                return None

            try:
                logger.info(f"Downloading {remote_filename}...")
                async with session.get(remote_url, timeout=self.timeout) as response:
                    response.raise_for_status()
                    content = await response.read()

                # Write to a temporary file, then move it into place atomically
                temp_path = local_path.with_suffix(".tmp")
                async with aiofiles.open(temp_path, "wb") as f:
                    _ = await f.write(content)
                _ = temp_path.replace(local_path)

                logger.info(f"Downloaded {local_filename} ({len(content):,} bytes)")
                if self.delay_between > 0:
                    await asyncio.sleep(self.delay_between)
                return None
            except asyncio.TimeoutError:
                error = Exception(f"Timeout downloading {remote_filename}")
                logger.error(str(error))
                return error
            except aiohttp.ClientResponseError as e:
                # Skip 404s: some daily files are simply missing upstream
                if e.status == 404:
                    logger.warning(f"File not found: {remote_filename}")
                    return None
                logger.error(
                    f"HTTP {e.status} downloading {remote_filename}: {e.message}"
                )
                return e
            except Exception as e:
                logger.error(f"Error downloading {remote_filename}: {e}")
                return e
    async def download_files(
        self,
        start_date: date,
        end_date: date,
    ) -> list[Exception]:
        """Download files for the date range, returning any errors that occurred."""
        dates = self._get_date_range(start_date, end_date)
        logger.info(f"Processing {len(dates)} files from {start_date} to {end_date}")

        errors: list[Exception] = []
        async with ClientSession() as session:
            tasks: list[asyncio.Task[Exception | None]] = [
                asyncio.create_task(self._download_file(session, day))
                for day in dates
            ]
            # Wait for all downloads to complete; gather also captures raised exceptions
            results = await asyncio.gather(*tasks, return_exceptions=True)

        # Collect errors, whether returned by _download_file or raised
        for result in results:
            if isinstance(result, Exception):
                errors.append(result)

        if errors:
            logger.warning(f"Completed with {len(errors)} errors")
        else:
            logger.info("All downloads completed successfully")
        return errors


def parse_cli_args() -> tuple[str, Path, date, date, bool, int, float]:
    """Parse command line arguments."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Download daily JSON files from a web server",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    _ = parser.add_argument(
        "start_date",
        help="Start date [YYYY-MM-DD]",
        type=date.fromisoformat,
    )
    _ = parser.add_argument(
        "end_date",
        nargs="?",
        help="End date [YYYY-MM-DD]",
        type=date.fromisoformat,
        default=date.today(),
    )
    _ = parser.add_argument(
        "--url",
        default="https://popcorn.voidlinux.org/",
        help="Base URL for files",
    )
    _ = parser.add_argument(
        "--dir",
        default="./data",
        help="Local directory for files",
        type=Path,
    )
    _ = parser.add_argument(
        "--overwrite",
        action="store_true",
        help="Overwrite existing files",
    )
    _ = parser.add_argument(
        "--concurrent",
        type=int,
        default=5,
        help="Maximum concurrent downloads",
    )
    _ = parser.add_argument(
        "--delay",
        type=float,
        default=0.0,
        help="Delay in seconds after each completed download",
    )
    args = parser.parse_args()

    if not isinstance(args.url, str):
        parser.error("URL must be a string")
    # Ensure a trailing slash so yarl joins filenames onto the base URL correctly
    url: str = args.url.rstrip("/") + "/"

    args.start_date = cast(date, args.start_date)
    args.end_date = cast(date, args.end_date)
    if args.start_date > args.end_date:
        parser.error("Start date must be before or equal to end date")

    return (
        url,
        Path(args.dir),
        args.start_date,
        args.end_date,
        args.overwrite,
        args.concurrent,
        args.delay,
    )


async def main() -> int:
    """Main entry point."""
    (
        base_url,
        local_dir,
        start_date,
        end_date,
        overwrite,
        max_concurrent,
        delay_between,
    ) = parse_cli_args()

    downloader = FileDownloader(
        base_url=base_url,
        local_dir=local_dir,
        overwrite=overwrite,
        max_concurrent=max_concurrent,
        delay_between=delay_between,
    )
    errors = await downloader.download_files(start_date, end_date)
    if errors:
        logger.error(f"Download completed with {len(errors)} errors")
        return 1
    return 0


if __name__ == "__main__":
    try:
        exit_code = asyncio.run(main())
        sys.exit(exit_code)
    except KeyboardInterrupt:
        logger.info("Download interrupted by user")
        sys.exit(130)
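
# Programmatic use (a sketch; the dates and directory below are illustrative,
# the base URL is the script's own default):
#
#   from datetime import date
#   from pathlib import Path
#
#   downloader = FileDownloader(
#       base_url="https://popcorn.voidlinux.org/",
#       local_dir=Path("./data"),
#   )
#   errors = asyncio.run(
#       downloader.download_files(date(2025, 1, 1), date(2025, 1, 7))
#   )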