Compare commits

...

4 commits

View file

@ -1,13 +1,15 @@
import csv
import json
import sys
from dataclasses import dataclass
from datetime import datetime
from datetime import date, datetime
from pathlib import Path
from time import sleep
from typing import Annotated, Any
import requests
import typer
from rich import print
BASE_URL = "https://www.nightjet.com"
BASE_DIR = "out"
@ -26,7 +28,7 @@ def dprint(txt) -> None:
print(f"{datetime.now()}: {txt}")
def request_init_token(endpoint: str = "/nj-booking-ocp/init/start") -> str:
def request_start(endpoint: str = "/nj-booking-ocp/init/start") -> dict:
headers = {
"Referer": "https://www.nightjet.com",
"Content-Type": "application/json",
@ -35,8 +37,11 @@ def request_init_token(endpoint: str = "/nj-booking-ocp/init/start") -> str:
resp_json = requests.post(
f"{BASE_URL}{endpoint}", data=json.dumps(body), headers=headers
).json()
token = resp_json["token"]
return resp_json
def get_init_token(endpoint: str = "/nj-booking-ocp/init/start") -> str:
token = request_start(endpoint)["token"]
dprint(f"Received init token: {token}")
return token
@ -266,21 +271,35 @@ def notify_user(previous: Price, new: Price, channel: str) -> None:
)
def query(start_station: int, end_station: int, travel_date: datetime, traveller_birthdate: datetime) -> list[Price]:
token = request_init_token()
def query(
start_station: int,
end_station: int,
travel_date: datetime,
traveller_birthdate: datetime,
) -> list[Price]:
token = get_init_token()
connections = request_connections(token, start_station, end_station, travel_date)
booking_requests = connection_data_to_booking_requests(connections, traveller_birthdate=traveller_birthdate)
booking_requests = connection_data_to_booking_requests(
connections, traveller_birthdate=traveller_birthdate
)
bookings = [request_bookings(token, req) for req in booking_requests]
prices = extract_prices(bookings)
return prices
def query_max_booking_date(endpoint: str = "/nj-booking-ocp/init/start") -> date | None:
latest_date = request_start().get("maxBookableDate")
if not latest_date:
return None
return date.fromisoformat(latest_date)
## CLI
app = typer.Typer()
@app.command()
@app.command("query")
def main(
travel_date: Annotated[
str, typer.Argument(help="Travel day to search from. (YYYY-MM-DD)")
@ -291,7 +310,10 @@ def main(
end_station: int = typer.Option(
END_STATION, help="Destination station number. (default: Paris Est)"
),
birthdate: str = typer.Option(TRAVELLER_BIRTHDATE.strftime("%Y-%m-%d"), help="Traveller birthdate, may be important for discounts. (YYYY-MM-DD)"),
birthdate: str = typer.Option(
TRAVELLER_BIRTHDATE.strftime("%Y-%m-%d"),
help="Traveller birthdate, may be important for discounts. (YYYY-MM-DD)",
),
notification_channel: str = typer.Option(
NOTIFICATION_CHANNEL, help="ntfy channel to inform user on."
),
@ -316,7 +338,23 @@ def main(
dump_price_snapshot: bool = typer.Option(
True, help="Dump _all_ queried prices into a timestamped csv file."
),
latest_bookable_date: bool = typer.Option(
False, help="Check for latest currently possible booking date only."
),
):
"""Check the (lowest) prices for a single connection.
Will run repeatedly or one-shot and query the prices for a specific connection.
Can be set to output all available prices or just output the lowest found.
Outputs will (curently) always be given as csv files.
"""
if latest_bookable_date:
latest_date = query_max_booking_date()
if not latest_date:
dprint("Could not determine max bookable date.")
dprint(f"Latest currently bookable date: {latest_date}")
sys.exit(0)
base_output_directory.mkdir(exist_ok=True, parents=True)
lowest_prices_path = base_output_directory.joinpath(lowest_prices_filename)
price_snapshot_path = base_output_directory.joinpath(price_snapshot_pattern)
@ -330,7 +368,10 @@ def main(
while True:
prices = query(
start_station=start_station, end_station=end_station, travel_date=travel_date_obj, traveller_birthdate=birth_date_obj,
start_station=start_station,
end_station=end_station,
travel_date=travel_date_obj,
traveller_birthdate=birth_date_obj,
)
# create a snapshot of all current prices
@ -362,7 +403,41 @@ def main(
if not monitor_mode:
break
dprint(
f"Query complete. Monitoring mode active, sleeping for {monitor_frequency} seconds..."
f"Checked for connection prices. Monitoring mode active, sleeping for {monitor_frequency} seconds..."
)
sleep(monitor_frequency)
@app.command("lastdate")
def latestbookable_command(
notification_channel: str = typer.Option(
NOTIFICATION_CHANNEL, help="ntfy channel to inform user on."
),
monitor_mode: bool = typer.Option(
True,
help="Run queries repeatedly over time. If False only runs a single query (oneshot mode).",
),
monitor_frequency: int = typer.Option(
MONITOR_FREQUENCY,
help="How often to run price queries if in monitoring mode, in seconds.",
),
):
"""Check for the currently latest possible booking date.
Will run repeatedly or one-shot and simply query for the date that is
currently the _last_ day which can be selected for any booking query.
"""
while True:
latest_date = query_max_booking_date()
if not latest_date:
dprint("Could not determine max bookable date.")
dprint(f"Latest currently bookable date: {latest_date}")
if not monitor_mode:
break
dprint(
f"Checked for latest bookable date. Monitoring mode active, sleeping for {monitor_frequency} seconds..."
)
sleep(monitor_frequency)