91 lines
2.6 KiB
Python
91 lines
2.6 KiB
Python
import quick
|
|
import click
|
|
import requests
|
|
import logging
|
|
from pathlib import Path
|
|
from rich.console import Console
|
|
import runpod
|
|
from runpod.endpoint import Job
|
|
from network_functions import _upload_to_oxo
|
|
from job_functions import print_job_status, start_job
|
|
from configuration import Config
|
|
|
|
# TODO turn all this into config style options or @click-style flags/options
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
RUNPOD_API_URL = "https://api.runpod.ai/v2/"
|
|
|
|
|
|
@quick.gui_option()
|
|
@click.group()
|
|
@click.pass_context
|
|
@click.option("--pod-id", "-i", help="ID of runpod serverless endpoint to use.")
|
|
@click.option("--token", "-t", help="Access token for runpod instance.")
|
|
# TODO @click.version_option()
|
|
def cli(ctx, pod_id, token):
|
|
"""Verbanote
|
|
|
|
Transcribes any audio file given using OpenAI's whisper AI
|
|
and pyannote for speaker detection.
|
|
"""
|
|
runpod.api_key = token
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
"Authorization": f"Bearer {token}",
|
|
}
|
|
options: Config = Config(
|
|
endpoint=f"{RUNPOD_API_URL}{pod_id}",
|
|
pod_id=pod_id,
|
|
token=token,
|
|
headers=headers,
|
|
console=Console(),
|
|
)
|
|
ctx.obj = options
|
|
|
|
|
|
@cli.command()
|
|
@click.pass_obj
|
|
@click.option("--language", "-l", help="Language to use for transcription in 2-letter ISO code (`de` or `fr`). Defaults to `en`.")
|
|
@click.argument(
|
|
"audiofile",
|
|
type=click.Path(exists=True, dir_okay=False, readable=True, path_type=Path),
|
|
)
|
|
def start(config: Config, language: str, audiofile: Path) -> None:
|
|
"""Start processing the given audiofile.
|
|
|
|
Queues a job in the processing queue of the AI api.
|
|
"""
|
|
with config.console.status("[bold green]Uploading data to 0x0.st...\n"):
|
|
url = _upload_to_oxo(audiofile)
|
|
|
|
input_data = {"url": url, "lang": language}
|
|
config.console.print(
|
|
f"[green]Requesting new job for[/green] {audiofile} over {url}..."
|
|
)
|
|
job = start_job(config, input_data)
|
|
config.console.print(
|
|
f"[green]Job[/green] {job.job_id} [green]has been queued.[/green]"
|
|
)
|
|
print_job_status(config, job)
|
|
|
|
|
|
@cli.command()
|
|
@click.pass_obj
|
|
def health(config: Config) -> None:
|
|
endpoint_health = f"{config.endpoint}/health"
|
|
with config.console.status("[bold green]Requesting health status..."):
|
|
resp = requests.get(endpoint_health, headers=config.headers)
|
|
json = resp.json()
|
|
config.console.print_json(data=json)
|
|
|
|
|
|
@cli.command()
|
|
@click.pass_obj
|
|
@click.argument("job_id")
|
|
def job(config: Config, job_id: str) -> None:
|
|
job = Job(config.pod_id, job_id)
|
|
print_job_status(config, job, once=True)
|
|
|
|
|
|
def cancel(config: Config, job_id: str) -> None:
|
|
...
|