diff --git a/verbanote_client/cli.py b/verbanote_client/cli.py
new file mode 100644
index 0000000..9cb3435
--- /dev/null
+++ b/verbanote_client/cli.py
@@ -0,0 +1,86 @@
+import quick
+import click
+import requests
+import logging
+from pathlib import Path
+from rich.console import Console
+from network_functions import _upload_to_oxo
+from job_functions import print_job_status
+from configuration import Config
+
+# TODO turn all this into config style options or @click-style flags/options
+logging.basicConfig(level=logging.INFO)
+
+
+@quick.gui_option()
+@click.group()
+@click.pass_context
+@click.option("--endpoint", "-e", help="URL of runpod serverless endpoint to use.")
+@click.option("--token", "-t", help="Access token for runpod instance.")
+# TODO @click.version_option()
+def cli(ctx, endpoint, token):
+    """Verbanote
+
+    Transcribes any audio file given using OpenAI's whisper AI
+    and pyannote for speaker detection.
+    """
+    # The access token is a credential: it is only placed in the request
+    # headers and deliberately never echoed to the terminal or the logs.
+    headers = {
+        "Content-Type": "application/json",
+        "Authorization": f"Bearer {token}",
+    }
+    # Everything the subcommands need travels on the click context object.
+    ctx.obj = Config(
+        endpoint=endpoint, token=token, headers=headers, console=Console()
+    )
+
+
+@cli.command()
+@click.pass_obj
+@click.argument(
+    "audiofile",
+    type=click.Path(exists=True, dir_okay=False, readable=True, path_type=Path),
+)
+def start(config: Config, audiofile: Path) -> None:
+    """Start processing the given audiofile.
+
+    Queues a job in the processing queue of the AI api.
+    """
+    endpoint_new_job = f"{config.endpoint}/run"
+    with config.console.status("[bold green]Uploading data..."):
+        url = _upload_to_oxo(audiofile)
+
+    input_data = {"input": {"url": url}}
+    config.console.log(f"[green]Requesting new job for[/green] {audiofile}...")
+    response = requests.post(endpoint_new_job, json=input_data, headers=config.headers)
+    # Fail with the HTTP error itself instead of a KeyError/JSON error on the
+    # body of a rejected request (bad token, wrong endpoint, ...).
+    response.raise_for_status()
+    job_id = response.json()["id"]
+    config.console.log(f"[green]Job[/green] {job_id} [green]has been queued.[/green]")
+    print_job_status(config, job_id)
+
+
+@cli.command()
+@click.pass_obj
+def health(config: Config) -> None:
+    """Print the health report of the endpoint as JSON."""
+    endpoint_health = f"{config.endpoint}/health"
+    with config.console.status("[bold green]Requesting health status..."):
+        resp = requests.get(endpoint_health, headers=config.headers)
+        payload = resp.json()
+    config.console.print_json(data=payload)
+
+
+@cli.command()
+@click.pass_obj
+@click.argument("job_id")
+def job(config: Config, job_id: str) -> None:
+    """Show the status of the job with the given JOB_ID."""
+    print_job_status(config, job_id)
+
+
+def cancel(config: Config, job_id: str) -> None:
+    """Placeholder for cancelling a job; not implemented, not a command yet."""
+    ...
diff --git a/verbanote_client/configuration.py b/verbanote_client/configuration.py
new file mode 100644
index 0000000..e57113c
--- /dev/null
+++ b/verbanote_client/configuration.py
@@ -0,0 +1,18 @@
+from dataclasses import dataclass
+
+from rich.console import Console
+
+
+@dataclass
+class Config:
+    """Settings shared by all CLI subcommands."""
+
+    # Base URL of the endpoint; request paths are appended to it.
+    endpoint: str
+    # Access token for the endpoint.
+    token: str
+    # HTTP headers sent along with the API requests.
+    headers: dict[str, str]
+
+    # Console used for status and log output.
+    console: Console
diff --git a/verbanote_client/job_functions.py b/verbanote_client/job_functions.py
new file mode 100644
index 0000000..42b032a
--- /dev/null
+++ b/verbanote_client/job_functions.py
@@ -0,0 +1,109 @@
+import time
+import requests
+from datetime import timedelta
+from math import floor
+from rich.table import Table
+from rich.live import Live
+from configuration import Config
+
+STATUS_MAPPING = {
+    "IN_QUEUE": "[yellow]queued[/yellow]",
+    "IN_PROGRESS": "[blue]running[/blue]",
+    "CANCELLED": "[orange1]cancelled[/orange1]",
+    "COMPLETED": "[green]complete[/green]",
+    "FAILED": "[red]failed[/red]",
+}
+
+# States in which the job can still change, i.e. polling is worthwhile.
+_ACTIVE_STATES = ("IN_QUEUE", "IN_PROGRESS")
+
+
+def print_job_status(config: Config, job_id: str, once: bool = False) -> None:
+    """Show a live table with the state of a job until it stops running.
+
+    The endpoint is asked for the job state roughly once per second for as
+    long as the job is queued or in progress. With ``once`` set, only the
+    state fetched up front is shown. Nothing is shown if the endpoint does
+    not know the job.
+    """
+    result = _request_job_state(config, job_id)
+    if not result:
+        return
+
+    def result_to_values(result: dict) -> dict[str, str]:
+        status = result.get("status", "")
+        return {
+            # A status this client has no markup for is shown verbatim
+            # instead of aborting the display with a KeyError.
+            "status": STATUS_MAPPING.get(status, status or "unknown"),
+            "transcription": result.get("transcription_url", "..."),
+            "diarization": result.get("diarization_url", "..."),
+        }
+
+    values: dict[str, str] = result_to_values(result)
+
+    def rebuild_table():
+        # Invoked by Live on each refresh; picks up whatever values and
+        # sw_current currently hold in the enclosing function.
+        table = Table()
+        table.add_column("Status")
+        table.add_column("Time running")
+        table.add_column("Job ID")
+        table.add_column("Diarization")
+        table.add_column("Transcription")
+        table.add_row(
+            values.get("status", "unknown"),
+            str(sw_current),
+            job_id,
+            values.get("diarization", "..."),
+            values.get("transcription", "..."),
+        )
+        return table
+
+    sw_start: float = time.time()
+    sw_current: timedelta = timedelta()
+    with Live(get_renderable=rebuild_table, refresh_per_second=2):
+        while not once:
+            result = _request_job_state(config, job_id, silent=True)
+            if not result:
+                # The endpoint no longer knows the job (404); stop polling
+                # rather than crash on the empty answer.
+                break
+            values = result_to_values(result)
+            sw_current = timedelta(seconds=floor(time.time() - sw_start))
+
+            if result.get("status") not in _ACTIVE_STATES:
+                break
+            time.sleep(1)
+
+
+def _request_job_state(config: Config, id: str, silent: bool = False) -> dict:
+    """Fetch the current state of a job from the endpoint.
+
+    Args:
+        config: Supplies endpoint, request headers and console.
+        id: Identifier of the job to look up.
+        silent: Skip the console spinner while the request is running.
+
+    Returns:
+        The decoded JSON answer, or an empty dict if the endpoint answers
+        404 for this job.
+
+    Raises:
+        requests.exceptions.HTTPError: On any other unsuccessful answer.
+    """
+    endpoint_status = f"{config.endpoint}/status/{id}"
+    if silent:
+        response = requests.get(endpoint_status, headers=config.headers)
+    else:
+        with config.console.status(
+            f"[bold green]Requesting job[/bold green] {id}"
+            " [bold green]status...[/bold green]"
+        ):
+            response = requests.get(endpoint_status, headers=config.headers)
+    if response.status_code == 404:
+        config.console.log(f"[red]Job[/red] {id} [red]not found on endpoint.[/red]")
+        return {}
+    # Same exception type as before, but carrying status code and URL.
+    response.raise_for_status()
+    return response.json()
diff --git a/verbanote_client/main.py b/verbanote_client/main.py
index 6054203..082dcbc 100644
--- a/verbanote_client/main.py
+++ b/verbanote_client/main.py
@@ -1,170 +1,4 @@
-import time
-import requests
-import logging
-from datetime import timedelta
-from math import floor
-from pathlib import Path
-from dataclasses import dataclass
-import click
-from rich.console import Console
-from rich.table import Table
-from rich.live import Live
-import quick
-
-# TODO turn all this into config style options or @click-style flags/options
-logging.basicConfig(level=logging.INFO)
-console = Console()
-
-
-@dataclass
-class Config:
-    endpoint: str
-    token: str
-    headers: dict[str, str]
-
-
-@quick.gui_option()
-@click.group()
-@click.pass_context
-@click.option("--endpoint", "-e", help="URL of runpod serverless endpoint to use.")
-@click.option("--token", "-t", help="Access token for runpod instance.")
-# TODO @click.version_option()
-def cli(ctx, endpoint, token):
-    """Verbanote
-
-    Transcribes any audio file given using OpenAI's whisper AI
-    and pyannote for speaker detection.
- """ - print(f"Token: {token}") - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {token}", - } - options: Config = Config(endpoint=endpoint, token=token, headers=headers) - ctx.obj = options - - -@cli.command() -@click.pass_obj -@click.argument( - "audiofile", - type=click.Path(exists=True, dir_okay=False, readable=True, path_type=Path), -) -def start(config: Config, audiofile: Path) -> None: - """Start processing the given audiofile. - - Queues a job in the processing queue of the AI api. - """ - endpoint_new_job = f"{config.endpoint}/run" - with console.status("[bold green]Uploading data..."): - url = _upload_to_oxo(audiofile) - - input_data = {"input": {"url": url}} - console.log(f"[green]Requesting new job for[/green] {audiofile}...") - response = requests.post(endpoint_new_job, json=input_data, headers=config.headers) - job_id = response.json()["id"] - console.log(f"[green]Job[/green] {job_id} [green]has been queued.[/green]") - print_job_status(config, job_id) - - -@cli.command() -@click.pass_obj -def health(config: Config) -> None: - endpoint_health = f"{config.endpoint}/health" - with console.status("[bold green]Requesting health status..."): - resp = requests.get(endpoint_health, headers=config.headers) - json = resp.json() - console.print_json(data=json) - - -@cli.command() -@click.pass_obj -@click.argument("job_id") -def job(config: Config, job_id: str) -> None: - print_job_status(config, job_id) - - -def cancel(config: Config, job_id: str) -> None: - ... 
- - -STATUS_MAPPING = { - "IN_QUEUE": "[yellow]queued[/yellow]", - "IN_PROGRESS": "[blue]running[/blue]", - "CANCELLED": "[orange1]cancelled[/orange1]", - "COMPLETED": "[green]complete[/green]", - "FAILED": "[red]failed[/red]", -} - - -def print_job_status(config: Config, job_id: str) -> None: - result = _request_job_state(config, job_id) - if not result: - return - - values: dict[str, str] = {} - sw_start: float = time.time() - sw_current: timedelta = timedelta() - def rebuild_table(): - table = Table() - table.add_column("Status") - table.add_column("Time running") - table.add_column("Job ID") - table.add_column("Diarization") - table.add_column("Transcription") - table.add_row( - values.get("status", "unknown"), - str(sw_current), - job_id, - values.get("diarization", "..."), - values.get("transcription", "..."), - ) - return table - - with Live(get_renderable=rebuild_table, refresh_per_second=1): - while True: - result = _request_job_state(config, job_id, silent=True) - sw_current = timedelta(seconds=floor(time.time() - sw_start)) - values: dict[str, str] = { - "status": STATUS_MAPPING[result["status"]], - "transcription": result.get("transcription_url", "..."), - "diarization": result.get("diarization_url", "..."), - } - - if result["status"] != "IN_QUEUE" and result["status"] != "IN_PROGRESS": - break - time.sleep(1) - - -def _request_job_state(config: Config, id: str, silent: bool = False) -> dict: - endpoint_health = f"{config.endpoint}/status/{id}" - if silent: - response = requests.get(endpoint_health, headers=config.headers) - else: - with console.status( - f"[bold green]Requesting job[/bold green] {id}" - " [bold green]status...[/bold green]" - ): - response = requests.get(endpoint_health, headers=config.headers) - if response.status_code == 404: - console.log(f"[red]Job[/red] {id} [red]not found on endpoint.[/red]") - return {} - if not response.ok: - raise requests.exceptions.HTTPError() - return response.json() - - -# TODO switch server component to 
be able to use S3 storage options
-def _upload_to_oxo(file: Path, url: str = "https://0x0.st", expires: int = 2) -> str:
-    resp = requests.post(
-        url=url,
-        files={"file": open(file, "rb"), "expires": str(expires)},
-    )
-    if not resp.ok:
-        raise requests.exceptions.HTTPError()
-    console.log(f"Uploaded file {file} to {str(resp.content)}")
-    return str(resp.content)
-
+from cli import cli
 
 # def main(args: list[str]) -> None:
 #     if args[1] == "status":
diff --git a/verbanote_client/network_functions.py b/verbanote_client/network_functions.py
new file mode 100644
index 0000000..29b56d3
--- /dev/null
+++ b/verbanote_client/network_functions.py
@@ -0,0 +1,37 @@
+import requests
+from pathlib import Path
+from rich.console import Console
+
+console = Console()
+
+
+# TODO switch server component to be able to use S3 storage options
+def _upload_to_oxo(file: Path, url: str = "https://0x0.st", expires: int = 2) -> str:
+    """Upload ``file`` to a file host and return the URL it answers with.
+
+    Args:
+        file: Path of the file to upload.
+        url: Upload address of the file host.
+        expires: Expiry setting handed to the host.
+            NOTE(review): this still travels as a multipart file part, as
+            before; presumably a plain form field was intended -- confirm
+            against the host's API before changing it.
+
+    Returns:
+        The response body decoded to text, without surrounding whitespace.
+
+    Raises:
+        requests.exceptions.HTTPError: If the host rejects the upload.
+    """
+    # Close the file handle once the upload is done instead of leaking it.
+    with open(file, "rb") as upload:
+        resp = requests.post(
+            url=url,
+            files={"file": upload, "expires": str(expires)},
+        )
+    resp.raise_for_status()
+    # str() on the raw bytes yields the "b'...'" repr, which is no usable URL
+    # and which strip() cannot clean up; decode the body instead.
+    uploaded_url = resp.text.strip()
+    console.log(f"Uploaded file {file} to {uploaded_url}")
+    return uploaded_url