nightjetter/main.py

343 lines
11 KiB
Python

import csv
import json
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
import requests
import typer
BASE_URL = "https://www.nightjet.com"
BASE_DIR = "out"
CSV_LOWEST_FILE = "lowest.csv"
CSV_ALL_PRICES_PATTERN = "all_prices_%%DATE%%.csv"
NOTIFICATION_CHANNEL = "nightjet-price-notifier"
START_STATION = "8096003" # BerlinHBF
END_STATION = "8796001" # Paris Est
TRAVEL_DATE = "2025-10-14"
def dprint(txt) -> None:
print(f"{datetime.now()}: {txt}")
def request_init_token(endpoint: str = "/nj-booking-ocp/init/start") -> str:
headers = {
"Referer": "https://www.nightjet.com",
"Content-Type": "application/json",
}
body = {"lang": "en"}
resp_json = requests.post(
f"{BASE_URL}{endpoint}", data=json.dumps(body), headers=headers
).json()
token = resp_json["token"]
dprint(f"Received init token: {token}")
return token
def request_connections(
token: str,
start_station: int,
end_station: int,
travel_date: datetime,
endpoint: str = "/nj-booking-ocp/connection",
) -> list[Any]:
uri = f"{BASE_URL}{endpoint}/{start_station}/{end_station}/{travel_date.strftime('%Y-%m-%d')}"
headers = {
"Accept": "application/json",
"Accept-Language": "en-US,en;q=0.5",
"Referer": "https://www.nightjet.com/en/ticket-buchen/",
"x-token": token,
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:139.0) Gecko/20100101 Firefox/139.0",
}
resp_json = requests.get(
uri,
headers=headers,
).json()
return resp_json["connections"]
TRAVELLER_BIRTHDATE = "2000-07-15" # TODO: randomize a little
def connection_data_to_booking_requests(connections) -> list[dict[str, Any]]:
b_requests = []
for c in connections:
train = c["trains"][0]
dep = train["departure"]["utc"]
req = {
"njFrom": c["from"]["number"], # from station,
"njTo": c["to"]["number"], # to station
"njDep": dep, # departure time,
"maxChanges": 0,
"connections": 1,
"filter": {
"njTrain": train["train"], # train number
"njDeparture": dep, # departure time again
},
"objects": [ # traveller
{"type": "person", "birthDate": TRAVELLER_BIRTHDATE, "cards": []}
],
"relations": [],
"lang": "en",
}
b_requests.append(req)
dprint(
f"Crafted booking request {c['from']['name']} -> {c['to']['name']}: {train['departure']['local']}-{train['arrival']['local']}."
)
return b_requests
def request_bookings(
token: str, booking_req: dict[str, Any], endpoint: str = "/nj-booking-ocp/offer/get"
) -> dict[Any, Any]:
headers = {
"Accept": "application/json",
"Accept-Language": "en-US,en;q=0.5",
"Content-Type": "application/json",
"Referer": "https://www.nightjet.com/en/ticket-buchen/",
"Origin": "https://www.nightjet.com",
"x-token": token,
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:139.0) Gecko/20100101 Firefox/139.0",
}
resp_json = requests.post(
f"{BASE_URL}{endpoint}", headers=headers, data=json.dumps(booking_req)
).json()
dprint(
f"Requested prices ({booking_req['njFrom']} -> {booking_req['njTo']} at {booking_req['njDep']})."
)
return resp_json
def json_extract(obj, key):
"""Recursively fetch values from nested JSON."""
arr = []
def extract(obj, arr, key):
"""Recursively search for values of key in JSON tree."""
if isinstance(obj, dict):
for k, v in obj.items():
if isinstance(v, (dict, list)):
extract(v, arr, key)
elif k == key:
arr.append(v)
elif isinstance(obj, list):
for item in obj:
extract(item, arr, key)
return arr
values = extract(obj, arr, key)
return values
@dataclass
class Price:
id: str
name: str
price: float
dt_from: datetime
dt_to: datetime
def extract_prices(bookings_dict: list[dict[Any, Any]]) -> list[Price]:
prices = []
# .result[].connections[].offers[].reservation.reservationSegments[].compartments[].objects
for booking in bookings_dict:
for reservation in booking["result"]:
for connection in reservation["connections"]:
for offer in connection["offers"]:
for reservation in offer["reservation"]["reservationSegments"]:
for compartment in reservation["compartments"]:
id = compartment["externalIdentifier"]
name = compartment["name"]["en"]
# filter undesired compartments
if id in ["sideCorridorCoach_2"]:
continue
# print all compartment identifiers w/ full name
# dprint(f"{id}: {name}")
# only keep those with a price (i.e. bookable?)
if "objects" not in compartment:
continue
price = compartment["objects"][0]["price"]
prices.append(
Price(
id,
name,
price,
dt_from=datetime.strptime(
offer["validityPeriodFrom"],
"%Y-%m-%dT%H:%M:%S.%f%z",
),
dt_to=datetime.strptime(
offer["validityPeriodTo"],
"%Y-%m-%dT%H:%M:%S.%f%z",
),
)
)
return prices
def_time = datetime.fromtimestamp(0.0)
def get_lowest_price(prices: list[Price]) -> Price:
lowest = Price("", "", 10000000.0, def_time, def_time)
for p in prices:
if p.price < lowest.price:
lowest = p
return lowest
def dump_all_prices_to_csv(prices: list[Price], fpath: Path) -> None:
fstr = str(fpath)
fpath_replaced = Path(
fstr.replace("%%DATE%%", str(int(datetime.now().timestamp())))
)
with open(fpath_replaced, "w") as f:
writer = csv.writer(f)
writer.writerow(["id", "price", "ts_from", "ts_to", "name"])
writer.writerows(
[
[
price.id,
price.price,
price.dt_from.timestamp(),
price.dt_to.timestamp(),
price.name,
]
for price in prices
]
)
dprint(f"Dumped current query snapshot into: {fpath_replaced}.")
def add_to_csv(price: Price, file: Path) -> None:
if not file.is_file():
with open(file, "w") as f:
csv.writer(f).writerow(["id", "price", "ts_from", "ts_to", "name"])
with open(file, "a") as f:
csv.writer(f).writerow(
[
price.id,
price.price,
price.dt_from.timestamp(),
price.dt_to.timestamp(),
price.name,
]
)
def get_last_price_from_csv(file: Path) -> Price | None:
if not file.is_file():
return
with open(file) as f:
last = next(reversed(list(csv.reader(f))))
return Price(
id=last[0],
price=float(last[1]),
dt_from=datetime.fromtimestamp(float(last[2])),
dt_to=datetime.fromtimestamp(float(last[3])),
name=last[4],
)
def notify_user(previous: Price, new: Price, channel: str) -> None:
requests.post(
f"https://ntfy.sh/{channel}",
data=f"from {previous.price} -> {new.price} ({new.name}: {new.dt_from.strftime('%Y-%m-%d %H:%M')} - {new.dt_to.strftime('%Y-%m-%d %H:%M')})",
headers={
"Title": f"Nightjet train price went {'down' if new.price < previous.price else 'up'}",
"Priority": "urgent" if new.price < previous.price else "default",
"Tags": "green_circle" if new.price < previous.price else "orange_circle",
},
)
def query(start_station: int, end_station: int, travel_date: datetime) -> list[Price]:
token = request_init_token()
connections = request_connections(token, start_station, end_station, travel_date)
booking_requests = connection_data_to_booking_requests(connections)
bookings = [request_bookings(token, req) for req in booking_requests]
prices = extract_prices(bookings)
return prices
## CLI
app = typer.Typer()
@app.command()
def main(
start_station: int = typer.Option(
START_STATION, help="Departure station number. (default: Berlin Hbf)"
),
end_station: int = typer.Option(
END_STATION, help="Destination station number. (default: Paris Est)"
),
travel_date: str = typer.Option(help="Travel day to search from. (YYYY-MM-DD)"),
notification_channel: str = typer.Option(
NOTIFICATION_CHANNEL, help="ntfy channel to inform user on."
),
base_output_directory: Path = typer.Option(
Path(BASE_DIR), help="Directory in which to output all result files."
),
lowest_prices_filename: str = typer.Option(
CSV_LOWEST_FILE, help="Filename for collecting lowest found prices."
),
price_snapshot_pattern: str = typer.Option(
CSV_ALL_PRICES_PATTERN,
help="Filename pattern for saving all prices of each query. Takes %%DATE%% as pattern to replace with current unix timestamp.",
),
dump_price_snapshot: bool = typer.Option(
True, help="Dump _all_ queried prices into a timestamped csv file."
),
):
base_output_directory.mkdir(exist_ok=True, parents=True)
lowest_prices_path = base_output_directory.joinpath(lowest_prices_filename)
price_snapshot_path = base_output_directory.joinpath(price_snapshot_pattern)
try:
date_obj = datetime.strptime(travel_date, "%Y-%m-%d")
except ValueError:
typer.echo(f"Invalid date format: {travel_date}. Use YYYY-MM-DD", err=True)
raise typer.Exit(1)
prices = query(
start_station=start_station, end_station=end_station, travel_date=date_obj
)
# create a snapshot of all current prices
if dump_price_snapshot:
dump_all_prices_to_csv(prices, price_snapshot_path)
# extract the lowest and the last lowest price
new = get_lowest_price(prices)
previous = get_last_price_from_csv(lowest_prices_path)
# if the price changed, add it to lowest prices
if not previous or new.price != previous.price:
dprint(f"PRICE CHANGE. {previous} -> {new}")
add_to_csv(new, lowest_prices_path)
notify_user(
previous
or Price(
"",
"No previous price",
0.0,
datetime.fromtimestamp(0),
datetime.fromtimestamp(0),
),
new,
notification_channel,
)
if __name__ == "__main__":
app()