Validate CSV output schemas

Also moved code dir to src.
There are reasons to do standard things in standard ways. While it is
possible to get the `code/` directory to work and have it recognized as a
package path, this requires wrangling the pyproject.toml file.
Additionally, any import from the `code.something` path automatically
shadows the Python stdlib `code` module. While it may not strictly be
necessary, it is still good practice not to shadow standard library modules.
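A minimal sketch of the shadowing problem (illustrative only; the local
`code/` package and the check below are not taken from this repository):

```python
# With a local `code/` package on sys.path (e.g. when running from the old
# repository layout), `import code` resolves to that package instead of the
# stdlib module, so stdlib attributes such as `code.interact` go missing.
import code

try:
    code.interact  # provided by the stdlib `code` module
except AttributeError:
    print("a local code/ package is shadowing the stdlib `code` module")
```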
Marty Oehme 2025-09-30 22:14:30 +02:00
parent de96b67fac
commit 2faeda87c3
Signed by: Marty
GPG key ID: 4E535BC19C61886E
14 changed files with 111 additions and 7 deletions

1
src/.gitattributes vendored Normal file

@@ -0,0 +1 @@
* annex.largefiles=nothing

12
src/README.md Normal file

@@ -0,0 +1,12 @@
# Popcorn dataset code

Each script can be run stand-alone, e.g. `python src/files.py <input-dir> <output-dir>`,
swapping in whichever script you intend to run.
It is suggested, however, to run the scripts through the `just` command runner from the
dataset root, e.g. `just files` for the same effect as above.
This automatically populates the correct input and output directories.

To create new `datalad`-versioned output data, run `just versioned` or `just` without any arguments.
A new commit containing the updated data will be created,
and an entry added to the CHANGELOG automatically.
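Each script also exposes a small `main(input, output)` entry point, so the same
step can be driven from Python directly if needed (a sketch; the `input/`
directory name is an assumption, while `output/` matches the test files):

```python
# Roughly equivalent to `python src/files.py input output` or `just files`,
# assuming the interpreter is started with src/ on the import path.
from files import main

main("input", "output")  # writes output/files.csv
```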

41
src/files.py Normal file

@@ -0,0 +1,41 @@
import csv
from pathlib import Path


def filesize_csv(input_dir: Path, output_dir: Path) -> None:
    # Write one row per JSON snapshot with its modification time and size.
    output_file = output_dir / "files.csv"
    with output_file.open("w") as fw:
        writer = csv.writer(fw)
        writer.writerow(["date", "filename", "mtime", "filesize"])
        for j in input_dir.glob("*.json"):
            # The file stem encodes the snapshot date.
            p_date = j.stem
            p_fname = j.name
            stat = j.stat()
            p_mtime = stat.st_mtime
            p_size = stat.st_size
            writer.writerow([p_date, p_fname, p_mtime, p_size])


def ensure_dirs(input_dir: Path, output_dir: Path):
    if not input_dir.is_dir():
        raise ValueError(f"Input directory does not exist: {input_dir}")
    output_dir.mkdir(exist_ok=True, parents=True)


def main(input: str, output: str) -> None:
    input_dir = Path(input)
    output_dir = Path(output)
    ensure_dirs(input_dir, output_dir)
    filesize_csv(input_dir, output_dir)


if __name__ == "__main__":
    import sys

    if len(sys.argv) != 3:
        print("Please provide exactly one input directory and one output directory.")
        sys.exit(1)
    inp = sys.argv[1]
    out = sys.argv[2]
    main(inp, out)

57
src/kernels.py Normal file

@@ -0,0 +1,57 @@
import csv
import json
from pathlib import Path
from typing import Any, cast


def package_kernel_csv(input_dir: Path, output_dir: Path) -> None:
    # Write one row per kernel and snapshot day with its download count.
    output_file = output_dir / "kernels.csv"
    with output_file.open("w") as fw:
        writer = csv.writer(fw)
        writer.writerow(["date", "kernel", "downloads"])
        for j in input_dir.glob("*.json"):
            with open(j) as fr:
                # The file stem encodes the snapshot date.
                date = j.stem
                data: dict[str, Any] = {}
                try:
                    data = cast(dict[str, object], json.load(fr))
                except json.JSONDecodeError:
                    print(f"WARN: Could not decode JSON data for file {j}")
                    continue
                if "XuKernel" not in data or not isinstance(data["XuKernel"], dict):
                    print(
                        f"WARN: No correct json structure containing 'XuKernel' field in file {j}"
                    )
                    continue
                for entry in data["XuKernel"]:
                    p_name = cast(str, entry)
                    p_count = cast(int, data["XuKernel"][entry])
                    p_date = date
                    writer.writerow([p_date, p_name, p_count])


def ensure_dirs(input_dir: Path, output_dir: Path):
    if not input_dir.is_dir():
        raise ValueError(f"Input directory does not exist: {input_dir}")
    output_dir.mkdir(exist_ok=True, parents=True)


def main(input: str, output: str) -> None:
    input_dir = Path(input)
    output_dir = Path(output)
    ensure_dirs(input_dir, output_dir)
    package_kernel_csv(input_dir, output_dir)


if __name__ == "__main__":
    import sys

    if len(sys.argv) != 3:
        print("Please provide exactly one input directory and one output directory.")
        sys.exit(1)
    inp = sys.argv[1]
    out = sys.argv[2]
    main(inp, out)

65
src/packages.py Normal file

@@ -0,0 +1,65 @@
import csv
import json
from pathlib import Path
from typing import cast


def packages_csv(input_dir: Path, output_dir: Path) -> None:
    # Write one row per package version and snapshot day with its install count.
    output_file = output_dir / "packages.csv"
    with output_file.open("w") as fw:
        writer = csv.writer(fw)
        writer.writerow(["date", "package", "version", "count"])
        for j in input_dir.glob("*.json"):
            with open(j) as fr:
                # The file stem encodes the snapshot date.
                date = j.stem
                data: dict[str, object] = {}
                try:
                    data = json.load(fr)
                except json.JSONDecodeError:
                    print(f"WARN: Could not decode JSON data for file {j}")
                    continue
                if "Versions" not in data or not isinstance(data["Versions"], dict):
                    print(
                        f"WARN: No correct json structure containing 'Versions' field in file {j}"
                    )
                    continue
                data_versions = cast(dict[str, dict[str, int]], data["Versions"])
                for package_name, package_versions in data_versions.items():
                    if not isinstance(package_versions, dict):
                        print(
                            f"WARN: No correct json version structure containing versions in the Version field in file {j}"
                        )
                        continue
                    for version, count in package_versions.items():
                        p_name = package_name
                        p_version = version
                        v_count = count
                        p_date = date
                        writer.writerow([p_date, p_name, p_version, v_count])


def ensure_dirs(input_dir: Path, output_dir: Path):
    if not input_dir.is_dir():
        raise ValueError(f"Input directory does not exist: {input_dir}")
    output_dir.mkdir(exist_ok=True, parents=True)


def main(input: str, output: str) -> None:
    input_dir = Path(input)
    output_dir = Path(output)
    ensure_dirs(input_dir, output_dir)
    packages_csv(input_dir, output_dir)


if __name__ == "__main__":
    import sys

    if len(sys.argv) != 3:
        print("Please provide exactly one input directory and one output directory.")
        sys.exit(1)
    inp = sys.argv[1]
    out = sys.argv[2]
    main(inp, out)

0
src/tests/__init__.py Normal file

10
src/tests/test_validate_date_col.py Normal file

@@ -0,0 +1,10 @@
import dataframely as dy
import polars as pl


class DateSchema(dy.Schema):
    # Base schema shared by all output CSVs: a non-null date column.
    date: dy.Date = dy.Date(nullable=False)

    @dy.rule()
    def minimum_starting_date() -> pl.Expr:
        return pl.col("date") > pl.date(2018, 5, 8)

@@ -0,0 +1,24 @@
import dataframely as dy
import polars as pl

from tests.test_validate_date_col import DateSchema


class FilesSchema(DateSchema):
    filename: dy.String = dy.String(nullable=False)
    mtime: dy.Float = dy.Float(nullable=False)
    filesize: dy.Integer = dy.Integer(nullable=False)


def test_files_schema():
    # Validate the generated CSV against the schema, including the inherited date rule.
    _ = FilesSchema.validate(
        pl.scan_csv(
            "output/files.csv",
            schema={
                "date": pl.Date,
                "filename": pl.String,
                "mtime": pl.Float32,
                "filesize": pl.UInt32,
            },
        ).collect(engine="streaming")
    )

@@ -0,0 +1,22 @@
import dataframely as dy
import polars as pl

from tests.test_validate_date_col import DateSchema


class KernelsSchema(DateSchema):
    kernel: dy.String = dy.String(nullable=False)
    downloads: dy.Integer = dy.Integer(nullable=False)


def test_kernels_schema():
    _ = KernelsSchema.validate(
        pl.scan_csv(
            "output/kernels.csv",
            schema={
                "date": pl.Date,
                "kernel": pl.String,
                "downloads": pl.UInt32,
            },
        ).collect(engine="streaming")
    )

@@ -0,0 +1,24 @@
import dataframely as dy
import polars as pl

from tests.test_validate_date_col import DateSchema


class PackagesSchema(DateSchema):
    package: dy.String = dy.String(nullable=False)
    version: dy.String = dy.String(nullable=False)
    count: dy.Integer = dy.Integer(nullable=False)


def test_packages_schema():
    _ = PackagesSchema.validate(
        pl.scan_csv(
            "output/packages.csv",
            schema={
                "date": pl.Date,
                "package": pl.String,
                "version": pl.String,
                "count": pl.UInt16,
            },
        ).collect(engine="streaming")
    )

@@ -0,0 +1,24 @@
import dataframely as dy
import polars as pl

from tests.test_validate_date_col import DateSchema


class UniquesSchema(DateSchema):
    unique: dy.Integer = dy.Integer(nullable=False)

    @dy.rule()
    def cannot_be_zero() -> pl.Expr:
        return pl.col("unique") > 0


def test_uniques_schema():
    _ = UniquesSchema.validate(
        pl.scan_csv(
            "output/unique_installs.csv",
            schema={
                "date": pl.Date,
                "unique": pl.UInt16,
            },
        ).collect(engine="streaming")
    )

56
src/unique.py Normal file

@@ -0,0 +1,56 @@
import csv
import json
from pathlib import Path


def unique_install_csv(input_dir: Path, output_dir: Path) -> None:
    # Write one row per snapshot day with its unique install count.
    output_file = output_dir / "unique_installs.csv"
    with open(output_file, "w") as fw:
        writer = csv.writer(fw)
        writer.writerow(["date", "unique"])
        for j in input_dir.glob("*.json"):
            with open(j) as fr:
                # The file stem encodes the snapshot date.
                date = j.stem
                data: dict[str, object] = {}
                try:
                    data = json.load(fr)
                except json.JSONDecodeError:
                    print(f"WARN: Could not decode JSON data for file {j}")
                    continue
                if "UniqueInstalls" not in data or not isinstance(
                    data["UniqueInstalls"], int
                ):
                    print(
                        f"WARN: No correct json structure containing 'UniqueInstalls' field in file {j}"
                    )
                    continue
                p_date = date
                p_count = data["UniqueInstalls"]
                writer.writerow([p_date, p_count])


def ensure_dirs(input_dir: Path, output_dir: Path):
    if not input_dir.is_dir():
        raise ValueError(f"Input directory does not exist: {input_dir}")
    output_dir.mkdir(exist_ok=True, parents=True)


def main(input: str, output: str) -> None:
    input_dir = Path(input)
    output_dir = Path(output)
    ensure_dirs(input_dir, output_dir)
    unique_install_csv(input_dir, output_dir)


if __name__ == "__main__":
    import sys

    if len(sys.argv) != 3:
        print("Please provide exactly one input directory and one output directory.")
        sys.exit(1)
    inp = sys.argv[1]
    out = sys.argv[2]
    main(inp, out)
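Pieced together from the four parsers above, a single day's input file appears
to look roughly like the following (the field names come from the code; the
file name, keys inside the nested dicts, and all values are made-up examples):

```python
# Illustrative shape of one snapshot, e.g. input/2024-01-01.json; the date is
# taken from the file stem, and the three fields feed unique.py, kernels.py
# and packages.py respectively.
example_snapshot = {
    "UniqueInstalls": 123,                  # -> unique_installs.csv
    "XuKernel": {"6.8.0-40-generic": 42},   # kernel name -> download count
    "Versions": {"popcorn": {"0.5": 7}},    # package -> {version: count}
}
```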