Move code to code dir

Follow yoda
Marty Oehme 2025-10-09 15:20:32 +02:00
parent e18669065e
commit 1084d78c3e
Signed by: Marty
GPG key ID: 4E535BC19C61886E
15 changed files with 12 additions and 5 deletions

1
code/.gitattributes vendored Normal file

@@ -0,0 +1 @@
* annex.largefiles=nothing

12
code/README.md Normal file

@@ -0,0 +1,12 @@
# Popcorn dataset code

Each script can be run stand-alone, e.g. `python src/files.py <input-dir> <output-dir>`,
substituting the script file for the one intended.
It is recommended, however, to run the scripts through the `just` command runner from the
dataset root; `just files` has the same effect as the command above.
This will automatically populate the correct input and output directories.
To create new `datalad`-versioned output data, run `just versioned` or `just` without any arguments.
A new commit containing the updated data will be created,
and an entry will be added to the CHANGELOG automatically.
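
A minimal sketch of the stand-alone usage described in the README, calling the entry point of code/files.py directly from Python instead of the shell; the `raw` and `output` directory names are placeholders, not paths fixed by the dataset:

# Hypothetical direct invocation of one extraction step (code/files.py below);
# "raw" and "output" are example directories, not part of the dataset layout.
from files import main as files_main

files_main("raw", "output")  # globs raw/*.json and writes output/files.csv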

41
code/files.py Normal file

@@ -0,0 +1,41 @@
import csv
from pathlib import Path


def filesize_csv(input_dir: Path, output_dir: Path) -> None:
    output_file = output_dir / "files.csv"
    with output_file.open("w") as fw:
        writer = csv.writer(fw)
        writer.writerow(["date", "filename", "mtime", "filesize"])
        for j in input_dir.glob("*.json"):
            p_date = j.stem
            p_fname = j.name
            stat = j.stat()
            p_mtime = stat.st_mtime
            p_size = stat.st_size
            writer.writerow([p_date, p_fname, p_mtime, p_size])


def ensure_dirs(input_dir: Path, output_dir: Path):
    if not input_dir.is_dir():
        raise ValueError(f"Input directory does not exist: {input_dir}")
    output_dir.mkdir(exist_ok=True, parents=True)


def main(input: str, output: str) -> None:
    input_dir = Path(input)
    output_dir = Path(output)
    ensure_dirs(input_dir, output_dir)
    filesize_csv(input_dir, output_dir)


if __name__ == "__main__":
    import sys

    if len(sys.argv) != 3:
        print("Please provide exactly one input directory and one output directory.")
        sys.exit(1)
    inp = sys.argv[1]
    out = sys.argv[2]
    main(inp, out)

266
code/get_raw.py Normal file

@@ -0,0 +1,266 @@
#!/usr/bin/env python3
"""
Download daily JSON files from a web server, skipping existing files.
"""

import asyncio
import logging
import sys
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import cast

import aiofiles
import aiohttp
from aiohttp import ClientSession, ClientTimeout
from yarl import URL

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)


class FileDownloader:
    def __init__(
        self,
        base_url: str,
        local_dir: Path,
        timeout: int = 30,
        max_concurrent: int = 5,
        overwrite: bool = False,
        delay_between: float = 0.0,
    ):
        self.base_url: URL = URL(base_url)
        self.local_dir: Path = Path(local_dir)
        self.local_dir.mkdir(parents=True, exist_ok=True)
        self.timeout: ClientTimeout = ClientTimeout(total=timeout)
        self.max_concurrent: int = max_concurrent
        self.overwrite: bool = overwrite
        self.delay_between: float = delay_between
        # Semaphore to limit concurrent downloads
        self.semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrent)

    def _get_remote_filename(self, date: date) -> str:
        """Generate remote filename based on date."""
        return f"popcorn_{date.strftime('%Y-%m-%d')}.json"

    def _get_local_filename(self, date: date) -> str:
        """Generate local filename based on date."""
        return f"{date.strftime('%Y-%m-%d')}.json"

    def _file_exists(self, date: date) -> bool:
        """Check if file already exists locally."""
        local_path = self.local_dir / self._get_local_filename(date)
        return local_path.exists() and local_path.stat().st_size > 0

    def _get_date_range(self, start_date: date, end_date: date) -> list[date]:
        """Generate list of dates to download."""
        dates: list[date] = []
        current_date = start_date
        while current_date <= end_date:
            dates.append(current_date)
            current_date += timedelta(days=1)
        return dates

    async def _download_file(
        self,
        session: ClientSession,
        date: date,
    ) -> Exception | None:
        """Download a single file with error handling."""
        async with self.semaphore:
            remote_filename = self._get_remote_filename(date)
            local_filename = self._get_local_filename(date)
            remote_url = self.base_url / remote_filename
            local_path = self.local_dir / local_filename
            # Skip if file exists and overwrite is False
            if self._file_exists(date) and not self.overwrite:
                logger.info(f"Skipping {local_filename} (already exists)")
                return None
            try:
                logger.info(f"Downloading {remote_filename}...")
                async with session.get(remote_url, timeout=self.timeout) as response:
                    response.raise_for_status()
                    content = await response.read()
                # Write file atomically
                temp_path = local_path.with_suffix(".tmp")
                async with aiofiles.open(temp_path, "wb") as f:
                    _ = await f.write(content)
                # Atomic move
                _ = temp_path.replace(local_path)
                logger.info(f"Downloaded {local_filename} ({len(content):,} bytes)")
                if self.delay_between > 0:
                    await asyncio.sleep(self.delay_between)
                return None
            except asyncio.TimeoutError:
                error = Exception(f"Timeout downloading {remote_filename}")
                logger.error(str(error))
                return error
            except aiohttp.ClientResponseError as e:
                # we skip 404 errors since some files are simply missing
                if e.status == 404:
                    logger.warning(f"File not found: {remote_filename}")
                    return None
                else:
                    logger.error(
                        f"HTTP {e.status} downloading {remote_filename}: {e.message}"
                    )
                    return e
            except Exception as e:
                logger.error(f"Error downloading {remote_filename}: {e}")
                return e

    async def download_files(
        self,
        start_date: date,
        end_date: date,
    ) -> list[Exception]:
        """Download files for date range, returning any errors that occurred."""
        dates = self._get_date_range(start_date, end_date)
        logger.info(f"Processing {len(dates)} files from {start_date} to {end_date}")
        errors: list[Exception] = []
        async with ClientSession() as session:
            tasks: list[asyncio.Task[Exception | None]] = []
            for date in dates:
                task = asyncio.create_task(self._download_file(session, date))
                tasks.append(task)
            # Wait for all downloads to complete
            results = await asyncio.gather(*tasks, return_exceptions=True)
        # Collect errors
        for result in results:
            if isinstance(result, Exception):
                errors.append(result)
        if errors:
            logger.warning(f"Completed with {len(errors)} errors")
        else:
            logger.info("All downloads completed successfully")
        return errors


def parse_cli_args() -> tuple[str, Path, date, date, bool, int, float]:
    """Parse command line arguments."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Download daily JSON files from web server",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    _ = parser.add_argument(
        "start_date",
        help="Start date [YYYY-MM-DD]",
        type=lambda d: date.fromisoformat(d),
    )
    _ = parser.add_argument(
        "end_date",
        nargs="?",
        help="End date [YYYY-MM-DD]",
        type=lambda d: date.fromisoformat(d),
        default=date.today(),
    )
    _ = parser.add_argument(
        "--url",
        default="https://popcorn.voidlinux.org/",
        help="Base URL for files",
    )
    _ = parser.add_argument(
        "--dir",
        default="./data",
        help="Local directory for files",
        type=Path,
    )
    _ = parser.add_argument(
        "--overwrite",
        action="store_true",
        help="Overwrite existing files",
    )
    _ = parser.add_argument(
        "--concurrent",
        type=int,
        default=5,
        help="Maximum concurrent downloads",
    )
    _ = parser.add_argument(
        "--delay",
        type=float,
        default=0.0,
        help="Delay in seconds between concurrent batches of downloads",
    )
    args = parser.parse_args()
    if not isinstance(args.url, str):
        parser.error("Url must be a valid string")
    url: str = args.url.rstrip("/") + "/"
    args.start_date = cast(date, args.start_date)
    args.end_date = cast(date, args.end_date)
    if args.start_date > args.end_date:
        parser.error("Start date must be before or equal to end date")
    return (
        url,
        Path(args.dir),
        args.start_date,
        args.end_date,
        args.overwrite,
        args.concurrent,
        args.delay,
    )


async def main():
    """Main entry point."""
    (
        base_url,
        local_dir,
        start_date,
        end_date,
        overwrite,
        max_concurrent,
        delay_between,
    ) = parse_cli_args()
    downloader = FileDownloader(
        base_url=base_url,
        local_dir=local_dir,
        overwrite=overwrite,
        max_concurrent=max_concurrent,
        delay_between=delay_between,
    )
    errors = await downloader.download_files(start_date, end_date)
    if errors:
        logger.error(f"Download completed with {len(errors)} errors")
        return 1
    return 0


if __name__ == "__main__":
    try:
        exit_code = asyncio.run(main())
        sys.exit(exit_code)
    except KeyboardInterrupt:
        logger.info("Download interrupted by user")
        sys.exit(130)
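
For reference, a minimal sketch of driving the downloader defined above without its CLI wrapper; the date range and the local directory are illustrative placeholders only:

# Hypothetical programmatic use of FileDownloader from code/get_raw.py;
# the dates and the "raw" directory are examples, not values from the dataset.
import asyncio
from datetime import date
from pathlib import Path

from get_raw import FileDownloader

async def fetch_week() -> int:
    downloader = FileDownloader(
        base_url="https://popcorn.voidlinux.org/",  # default base URL of the CLI above
        local_dir=Path("raw"),
        max_concurrent=2,
        delay_between=0.5,
    )
    errors = await downloader.download_files(date(2025, 10, 1), date(2025, 10, 7))
    return 1 if errors else 0

if __name__ == "__main__":
    raise SystemExit(asyncio.run(fetch_week()))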

57
code/kernels.py Normal file

@@ -0,0 +1,57 @@
import csv
import json
from pathlib import Path
from typing import Any, cast


def package_kernel_csv(input_dir: Path, output_dir: Path) -> None:
    output_file = output_dir / "kernels.csv"
    with output_file.open("w") as fw:
        writer = csv.writer(fw)
        writer.writerow(["date", "kernel", "downloads"])
        for j in input_dir.glob("*.json"):
            with open(j) as fr:
                date = j.stem
                data: dict[str, Any] = {}
                try:
                    data = cast(dict[str, object], json.load(fr))
                except json.JSONDecodeError:
                    print(f"WARN: Could not decode JSON data for file {j}")
                    continue
                if "XuKernel" not in data or not isinstance(data["XuKernel"], dict):
                    print(
                        f"WARN: No correct json structure containing 'XuKernel' field in file {j}"
                    )
                    continue
                for entry in data["XuKernel"]:
                    p_name = cast(str, entry)
                    p_count = cast(int, data["XuKernel"][entry])
                    p_date = date
                    writer.writerow([p_date, p_name, p_count])


def ensure_dirs(input_dir: Path, output_dir: Path):
    if not input_dir.is_dir():
        raise ValueError(f"Input directory does not exist: {input_dir}")
    output_dir.mkdir(exist_ok=True, parents=True)


def main(input: str, output: str) -> None:
    input_dir = Path(input)
    output_dir = Path(output)
    ensure_dirs(input_dir, output_dir)
    package_kernel_csv(input_dir, output_dir)


if __name__ == "__main__":
    import sys

    if len(sys.argv) != 3:
        print("Please provide exactly one input directory and one output directory.")
        sys.exit(1)
    inp = sys.argv[1]
    out = sys.argv[2]
    main(inp, out)

65
code/packages.py Normal file

@@ -0,0 +1,65 @@
import csv
import json
from pathlib import Path
from typing import cast


def packages_csv(input_dir: Path, output_dir: Path) -> None:
    output_file = output_dir / "packages.csv"
    with output_file.open("w") as fw:
        writer = csv.writer(fw)
        writer.writerow(["date", "package", "version", "count"])
        for j in input_dir.glob("*.json"):
            with open(j) as fr:
                date = j.stem
                data: dict[str, object] = {}
                try:
                    data = json.load(fr)
                except json.JSONDecodeError:
                    print(f"WARN: Could not decode JSON data for file {j}")
                    continue
                if "Versions" not in data or not isinstance(data["Versions"], dict):
                    print(
                        f"WARN: No correct json structure containing 'Versions' field in file {j}"
                    )
                    continue
                data_versions = cast(dict[str, dict[str, int]], data["Versions"])
                for package_name, package_versions in data_versions.items():
                    if not isinstance(package_versions, dict):
                        print(
                            f"WARN: No correct json version structure containing versions in the Version field in file {j}"
                        )
                        continue
                    for version, count in package_versions.items():
                        p_name = package_name
                        p_version = version
                        v_count = count
                        p_date = date
                        writer.writerow([p_date, p_name, p_version, v_count])


def ensure_dirs(input_dir: Path, output_dir: Path):
    if not input_dir.is_dir():
        raise ValueError(f"Input directory does not exist: {input_dir}")
    output_dir.mkdir(exist_ok=True, parents=True)


def main(input: str, output: str) -> None:
    input_dir = Path(input)
    output_dir = Path(output)
    ensure_dirs(input_dir, output_dir)
    packages_csv(input_dir, output_dir)


if __name__ == "__main__":
    import sys

    if len(sys.argv) != 3:
        print("Please provide exactly one input directory and one output directory.")
        sys.exit(1)
    inp = sys.argv[1]
    out = sys.argv[2]
    main(inp, out)

0
code/tests/__init__.py Normal file

10
code/tests/test_validate_date_col.py Normal file

@@ -0,0 +1,10 @@
import dataframely as dy
import polars as pl


class DateSchema(dy.Schema):
    date: dy.Date = dy.Date(nullable=False)

    @dy.rule()
    def minimum_starting_date() -> pl.Expr:
        return pl.col("date") > pl.date(2018, 5, 8)


@@ -0,0 +1,24 @@
import dataframely as dy
import polars as pl

from tests.test_validate_date_col import DateSchema


class FilesSchema(DateSchema):
    filename: dy.String = dy.String(nullable=False)
    mtime: dy.Float = dy.Float(nullable=False)
    filesize: dy.Integer = dy.Integer(nullable=False)


def test_files_schema():
    _ = FilesSchema.validate(
        pl.scan_csv(
            "output/files.csv",
            schema={
                "date": pl.Date,
                "filename": pl.String,
                "mtime": pl.Float32,
                "filesize": pl.UInt32,
            },
        ).collect(engine="streaming")
    )


@@ -0,0 +1,22 @@
import dataframely as dy
import polars as pl

from tests.test_validate_date_col import DateSchema


class KernelsSchema(DateSchema):
    kernel: dy.String = dy.String(nullable=False)
    downloads: dy.Integer = dy.Integer(nullable=False)


def test_kernels_schema():
    _ = KernelsSchema.validate(
        pl.scan_csv(
            "output/kernels.csv",
            schema={
                "date": pl.Date,
                "kernel": pl.String,
                "downloads": pl.UInt32,
            },
        ).collect(engine="streaming")
    )


@@ -0,0 +1,24 @@
import dataframely as dy
import polars as pl

from tests.test_validate_date_col import DateSchema


class PackagesSchema(DateSchema):
    package: dy.String = dy.String(nullable=False)
    version: dy.String = dy.String(nullable=False)
    count: dy.Integer = dy.Integer(nullable=False)


def test_packages_schema():
    _ = PackagesSchema.validate(
        pl.scan_csv(
            "output/packages.csv",
            schema={
                "date": pl.Date,
                "package": pl.String,
                "version": pl.String,
                "count": pl.UInt16,
            },
        ).collect(engine="streaming")
    )


@@ -0,0 +1,24 @@
import dataframely as dy
import polars as pl

from tests.test_validate_date_col import DateSchema


class UniquesSchema(DateSchema):
    unique: dy.Integer = dy.Integer(nullable=False)

    @dy.rule()
    def cannot_be_zero() -> pl.Expr:
        return pl.col("unique") > 0


def test_uniques_schema():
    _ = UniquesSchema.validate(
        pl.scan_csv(
            "output/unique_installs.csv",
            schema={
                "date": pl.Date,
                "unique": pl.UInt16,
            },
        ).collect(engine="streaming")
    )

56
code/unique.py Normal file

@@ -0,0 +1,56 @@
import csv
import json
from pathlib import Path


def unique_install_csv(input_dir: Path, output_dir: Path) -> None:
    output_file = output_dir / "unique_installs.csv"
    with open(output_file, "w") as fw:
        writer = csv.writer(fw)
        writer.writerow(["date", "unique"])
        for j in input_dir.glob("*.json"):
            with open(j) as fr:
                date = j.stem
                data: dict[str, object] = {}
                try:
                    data = json.load(fr)
                except json.JSONDecodeError:
                    print(f"WARN: Could not decode JSON data for file {j}")
                    continue
                if "UniqueInstalls" not in data or not isinstance(
                    data["UniqueInstalls"], int
                ):
                    print(
                        f"WARN: No correct json structure containing 'UniqueInstalls' field in file {j}"
                    )
                    continue
                p_date = date
                p_count = data["UniqueInstalls"]
                writer.writerow([p_date, p_count])


def ensure_dirs(input_dir: Path, output_dir: Path):
    if not input_dir.is_dir():
        raise ValueError(f"Input directory does not exist: {input_dir}")
    output_dir.mkdir(exist_ok=True, parents=True)


def main(input: str, output: str) -> None:
    input_dir = Path(input)
    output_dir = Path(output)
    ensure_dirs(input_dir, output_dir)
    unique_install_csv(input_dir, output_dir)


if __name__ == "__main__":
    import sys

    if len(sys.argv) != 3:
        print("Please provide exactly one input directory and one output directory.")
        sys.exit(1)
    inp = sys.argv[1]
    out = sys.argv[2]
    main(inp, out)