Compare commits

...

10 commits

8 changed files with 3792 additions and 3598 deletions

5
.dockerignore Normal file
View file

@ -0,0 +1,5 @@
out/
.git/
.jj/
.gitignore

31
Dockerfile Normal file
View file

@ -0,0 +1,31 @@
FROM python:3.13-slim AS base
# FROM ghcr.io/astral-sh/uv:0.8-debian-slim AS builder
#
# WORKDIR /app
#
# COPY pyproject.toml README.md uv.lock ./
#
# RUN uv sync --frozen
#
# COPY . /app
FROM base AS builder
COPY --from=ghcr.io/astral-sh/uv:0.8 /uv /bin/uv
ENV UV_COMPILE_BYTECODE=1 UV_LINK_MODE=copy
WORKDIR /app
COPY uv.lock pyproject.toml /app/
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --no-install-project --no-dev
COPY . /app
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --no-dev
# TODO: Run as non-root user (but careful with output directory ownership)
FROM base AS runtime
WORKDIR /app
ENV PATH="/app/.venv/bin:$PATH"
COPY --from=builder /app /app
ENTRYPOINT ["nightjet"]

3602
README.md

File diff suppressed because it is too large Load diff

3549
docs/API.md Normal file

File diff suppressed because it is too large Load diff

View file

@ -1,13 +1,21 @@
[project]
name = "nj-api"
name = "nightjetter"
version = "0.1.0"
description = "Add your description here"
description = "Continuously monitors nightjet travel dates for price fluctuations"
readme = "README.md"
authors = [{ name = "Marty Oehme", email = "contact@martyoeh.me" }]
requires-python = ">=3.13"
dependencies = [
"requests>=2.32.4",
"typer>=0.16.0",
]
dependencies = ["requests>=2.32.4", "typer>=0.16.0"]
[project.scripts]
nightjet = "nj:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.pyright]
typeCheckingMode = "basic"
[tool.hatch.build.targets.wheel]
packages = ["src/nj"]

5
src/nj/__init__.py Normal file
View file

@ -0,0 +1,5 @@
from nj import main as cli
def main() -> None:
cli.app()

View file

@ -3,20 +3,23 @@ import json
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
from time import sleep
from typing import Annotated, Any
import requests
import typer
BASE_URL = "https://www.nightjet.com"
BASE_DIR = "out"
CSV_LOWEST_FILE = f"{BASE_DIR}/lowest.csv"
CSV_ALL_PRICES_PATTERN = f"{BASE_DIR}/%%DATE%%_all_prices.csv"
CSV_LOWEST_FILE = "lowest.csv"
CSV_ALL_PRICES_PATTERN = "all_prices_%%DATE%%.csv"
NOTIFICATION_CHANNEL = "nightjet-price-notifier"
START_STATION = "8096003" # BerlinHBF
END_STATION = "8796001" # Paris Est
TRAVEL_DATE = "2025-10-14"
TRAVELLER_BIRTHDATE = datetime(1990, 1, 1) # TODO: could randomize a little
MONITOR_FREQUENCY = 3600
def dprint(txt) -> None:
@ -61,10 +64,9 @@ def request_connections(
return resp_json["connections"]
TRAVELLER_BIRTHDATE = "2000-07-15" # TODO: randomize a little
def connection_data_to_booking_requests(connections) -> list[dict[str, Any]]:
def connection_data_to_booking_requests(
connections, traveller_birthdate: datetime = TRAVELLER_BIRTHDATE
) -> list[dict[str, Any]]:
b_requests = []
for c in connections:
train = c["trains"][0]
@ -80,7 +82,11 @@ def connection_data_to_booking_requests(connections) -> list[dict[str, Any]]:
"njDeparture": dep, # departure time again
},
"objects": [ # traveller
{"type": "person", "birthDate": TRAVELLER_BIRTHDATE, "cards": []}
{
"type": "person",
"birthDate": traveller_birthdate.strftime("%Y-%m-%d"),
"cards": [],
}
],
"relations": [],
"lang": "en",
@ -193,22 +199,35 @@ def get_lowest_price(prices: list[Price]) -> Price:
return lowest
def dump_all_prices_to_csv(prices: list[Price]) -> None:
fname = CSV_ALL_PRICES_PATTERN.replace(
"%%DATE%%", str(int(datetime.now().timestamp()))
def dump_all_prices_to_csv(prices: list[Price], fpath: Path) -> None:
fstr = str(fpath)
fpath_replaced = Path(
fstr.replace("%%DATE%%", str(int(datetime.now().timestamp())))
)
with open(fname, "w") as f:
with open(fpath_replaced, "w") as f:
writer = csv.writer(f)
writer.writerow(["id", "price", "name"])
writer.writerows([[price.id, price.price, price.name] for price in prices])
writer.writerow(["id", "price", "ts_from", "ts_to", "name"])
writer.writerows(
[
[
price.id,
price.price,
price.dt_from.timestamp(),
price.dt_to.timestamp(),
price.name,
]
for price in prices
]
)
dprint(f"Dumped current query snapshot into: {fpath_replaced}.")
def add_to_csv(price: Price) -> None:
if not Path(CSV_LOWEST_FILE).is_file():
with open(CSV_LOWEST_FILE, "w") as f:
def add_to_csv(price: Price, file: Path) -> None:
if not file.is_file():
with open(file, "w") as f:
csv.writer(f).writerow(["id", "price", "ts_from", "ts_to", "name"])
with open(CSV_LOWEST_FILE, "a") as f:
with open(file, "a") as f:
csv.writer(f).writerow(
[
price.id,
@ -220,11 +239,11 @@ def add_to_csv(price: Price) -> None:
)
def get_last_price_from_csv() -> Price | None:
if not Path(CSV_LOWEST_FILE).is_file():
def get_last_price_from_csv(file: Path) -> Price | None:
if not file.is_file():
return
with open(CSV_LOWEST_FILE) as f:
with open(file) as f:
last = next(reversed(list(csv.reader(f))))
return Price(
id=last[0],
@ -247,40 +266,14 @@ def notify_user(previous: Price, new: Price, channel: str) -> None:
)
def main(start_station: int, end_station: int, travel_date: datetime):
Path(BASE_DIR).mkdir(exist_ok=True, parents=True)
print(start_station, end_station, travel_date)
# return
def query(start_station: int, end_station: int, travel_date: datetime, traveller_birthdate: datetime) -> list[Price]:
token = request_init_token()
connections = request_connections(token, start_station, end_station, travel_date)
booking_requests = connection_data_to_booking_requests(connections)
booking_requests = connection_data_to_booking_requests(connections, traveller_birthdate=traveller_birthdate)
bookings = [request_bookings(token, req) for req in booking_requests]
prices = extract_prices(bookings)
# create a snapshot of all current prices
dump_all_prices_to_csv(prices)
# extract the lowest and the last lowest price
new = get_lowest_price(prices)
previous = get_last_price_from_csv()
# if the price changed, add it to lowest prices
if not previous or new.price != previous.price:
dprint(f"PRICE CHANGE. {previous} -> {new}")
notify_user(
previous
or Price(
"",
"No previous price",
0.0,
datetime.fromtimestamp(0),
datetime.fromtimestamp(0),
),
new,
NOTIFICATION_CHANNEL,
)
add_to_csv(new)
return prices
## CLI
@ -288,21 +281,90 @@ app = typer.Typer()
@app.command()
def search(
def main(
travel_date: Annotated[
str, typer.Argument(help="Travel day to search from. (YYYY-MM-DD)")
],
start_station: int = typer.Option(
START_STATION, help="Departure station number. (default: Berlin Hbf)"
),
end_station: int = typer.Option(
END_STATION, help="Destination station number. (default: Paris Est)"
),
travel_date: str = typer.Option(help="Travel day to search from. (YYYY-MM-DD)"),
birthdate: str = typer.Option(TRAVELLER_BIRTHDATE.strftime("%Y-%m-%d"), help="Traveller birthdate, may be important for discounts. (YYYY-MM-DD)"),
notification_channel: str = typer.Option(
NOTIFICATION_CHANNEL, help="ntfy channel to inform user on."
),
monitor_mode: bool = typer.Option(
True,
help="Run queries repeatedly over time. If False only runs a single query (oneshot mode).",
),
monitor_frequency: int = typer.Option(
MONITOR_FREQUENCY,
help="How often to run price queries if in monitoring mode, in seconds.",
),
base_output_directory: Path = typer.Option(
Path(BASE_DIR), help="Directory in which to output all result files."
),
lowest_prices_filename: str = typer.Option(
CSV_LOWEST_FILE, help="Filename for collecting lowest found prices."
),
price_snapshot_pattern: str = typer.Option(
CSV_ALL_PRICES_PATTERN,
help="Filename pattern for saving all prices of each query. Takes %%DATE%% as pattern to replace with current unix timestamp.",
),
dump_price_snapshot: bool = typer.Option(
True, help="Dump _all_ queried prices into a timestamped csv file."
),
):
base_output_directory.mkdir(exist_ok=True, parents=True)
lowest_prices_path = base_output_directory.joinpath(lowest_prices_filename)
price_snapshot_path = base_output_directory.joinpath(price_snapshot_pattern)
try:
date_obj = datetime.strptime(travel_date, "%Y-%m-%d")
travel_date_obj = datetime.strptime(travel_date, "%Y-%m-%d")
birth_date_obj = datetime.strptime(birthdate, "%Y-%m-%d")
except ValueError:
typer.echo(f"Invalid date format: {travel_date}. Use YYYY-MM-DD", err=True)
typer.echo("Invalid date format. Use YYYY-MM-DD", err=True)
raise typer.Exit(1)
main(start_station=start_station, end_station=end_station, travel_date=date_obj)
while True:
prices = query(
start_station=start_station, end_station=end_station, travel_date=travel_date_obj, traveller_birthdate=birth_date_obj,
)
# create a snapshot of all current prices
if dump_price_snapshot:
dump_all_prices_to_csv(prices, price_snapshot_path)
# extract the lowest and the last lowest price
new = get_lowest_price(prices)
previous = get_last_price_from_csv(lowest_prices_path)
# if the price changed, add it to lowest prices
if not previous or new.price != previous.price:
dprint(f"PRICE CHANGE. {previous} -> {new}")
add_to_csv(new, lowest_prices_path)
notify_user(
previous
or Price(
"",
"No previous price",
0.0,
datetime.fromtimestamp(0),
datetime.fromtimestamp(0),
),
new,
notification_channel,
)
# oneshot exit
if not monitor_mode:
break
dprint(
f"Query complete. Monitoring mode active, sleeping for {monitor_frequency} seconds..."
)
sleep(monitor_frequency)
if __name__ == "__main__":

4
uv.lock generated
View file

@ -94,9 +94,9 @@ wheels = [
]
[[package]]
name = "nj-api"
name = "nightjetter"
version = "0.1.0"
source = { virtual = "." }
source = { editable = "." }
dependencies = [
{ name = "requests" },
{ name = "typer" },