80 lines
2.3 KiB
Python
80 lines
2.3 KiB
Python
|
import quick
|
||
|
import click
|
||
|
import requests
|
||
|
import logging
|
||
|
from pathlib import Path
|
||
|
from rich.console import Console
|
||
|
from network_functions import _upload_to_oxo
|
||
|
from job_functions import print_job_status
|
||
|
from configuration import Config
|
||
|
|
||
|
# TODO turn all this into config style options or @click-style flags/options
|
||
|
# Configure the root logger when this module is loaded; messages at INFO
# level and above are emitted.
logging.basicConfig(level=logging.INFO)
|
||
|
|
||
|
|
||
|
# Root command group. It receives the click context plus the values of the
# --endpoint and --token options and stores a Config object on ``ctx.obj``
# so that sub-commands can obtain it via ``@click.pass_obj``.
@quick.gui_option()
@click.group()
@click.pass_context
@click.option("--endpoint", "-e", help="URL of runpod serverless endpoint to use.")
@click.option("--token", "-t", help="Access token for runpod instance.")
# TODO @click.version_option()
def cli(ctx, endpoint, token):
    """Verbanote

    Transcribes any audio file given using OpenAI's whisper AI
    and pyannote for speaker detection.
    """
    # The access token is a credential: it is deliberately NOT echoed to the
    # terminal, where it would end up in scrollback, logs or screen shares.

    # Headers sent with every request made by the sub-commands.
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    options: Config = Config(
        endpoint=endpoint, token=token, headers=headers, console=Console()
    )
    # Make the configuration available to sub-commands.
    ctx.obj = options
|
||
|
|
||
|
|
||
|
# Sub-command: ``config`` is the object stored on the click context by the
# group callback; ``audiofile`` is validated by click to be an existing,
# readable, non-directory path and is delivered as a ``pathlib.Path``.
@cli.command()
@click.pass_obj
@click.argument(
    "audiofile",
    type=click.Path(exists=True, dir_okay=False, readable=True, path_type=Path),
)
def start(config: Config, audiofile: Path) -> None:
    """Start processing the given audiofile.

    Queues a job in the processing queue of the AI api.
    """
    endpoint_new_job = f"{config.endpoint}/run"

    # Upload the file first; the job request only carries the resulting URL.
    with config.console.status("[bold green]Uploading data..."):
        url = _upload_to_oxo(audiofile)

    input_data = {"input": {"url": url}}
    config.console.log(f"[green]Requesting new job for[/green] {audiofile}...")
    response = requests.post(endpoint_new_job, json=input_data, headers=config.headers)
    # Surface HTTP failures (bad token, wrong endpoint, server error) as an
    # explicit HTTPError instead of a confusing KeyError on "id" below.
    response.raise_for_status()
    job_id = response.json()["id"]
    config.console.log(f"[green]Job[/green] {job_id} [green]has been queued.[/green]")
    print_job_status(config, job_id)
|
||
|
|
||
|
|
||
|
@cli.command()
@click.pass_obj
def health(config: Config) -> None:
    """Show the health status reported by the endpoint.

    Requests the /health route of the configured endpoint and prints
    the JSON document it returns.
    """
    endpoint_health = f"{config.endpoint}/health"
    with config.console.status("[bold green]Requesting health status..."):
        resp = requests.get(endpoint_health, headers=config.headers)
        # Named ``health_report`` rather than ``json`` to avoid shadowing the
        # standard-library module name.
        health_report = resp.json()
    config.console.print_json(data=health_report)
|
||
|
|
||
|
|
||
|
@cli.command()
@click.pass_obj
@click.argument("job_id")
def job(config: Config, job_id: str) -> None:
    """Print the status of the job identified by JOB_ID."""
    # All work is delegated to the shared helper that ``start`` also uses
    # after queueing a job.
    print_job_status(config, job_id)
|
||
|
|
||
|
|
||
|
def cancel(config: Config, job_id: str) -> None:
    """Placeholder for cancelling a job; currently does nothing.

    NOTE(review): this function has no implementation and is not registered
    as a CLI command in the visible code - presumably still to be written.
    """
|