import requests import logging import time import sys import click from pathlib import Path args = sys.argv # TODO turn all this into config style options or @click-style flags/options logging.basicConfig(level=logging.INFO) pod_id = "7x0b7u16s6vyrc" bearer_token = "EIWX9RO18PRXCD0RUSY26MSD062GUF6REQGGV6QB" api = f"https://api.runpod.ai/v2/{pod_id}" run_endpoint = f"{api}/run" status_endpoint = f"{api}/status" health_endpoint = f"{api}/health" purge_endpoint = f"{api}/purge-queue" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {bearer_token}", } @click.group() @click.option("--endpoint", "-e", help="URL of runpod serverless endpoint.") @click.option("--token", "-t", help="Access token for runpod instance.") # TODO @click.version_option() def cli(token): """Verbanote Transcribes any audio file given using OpenAI's whisper AI and pyannote for speaker detection. """ print(f"Token: {token}") @cli.command() @click.argument( "audiofile", type=click.Path(exists=True, dir_okay=False, readable=True, path_type=Path), ) def start(audiofile): """Start processing the given audiofile. Queues a job in the processing queue of the AI api. """ url = _upload_to_oxo(audiofile) input_data = {"input": {"url": url}} logging.info(f"Requesting new job for {audiofile}...") response = requests.post(run_endpoint, json=input_data, headers=headers) click.echo(f"Job {response} has been queued.") @cli.command() def health(): logging.info("requesting health status...") resp = requests.get(health_endpoint, headers=headers) click.echo(resp) def _upload_to_oxo(file: Path, url: str = "https://0x0.st", expires: int = 2) -> str: resp = requests.post( url=url, files={"file": open(file, "rb"), "expires": str(expires)}, ) if not resp.ok: raise requests.exceptions.HTTPError() logging.info(f"Uploaded file {file} to {str(resp.content)}") return str(resp.content) def main(args: list[str]) -> None: if args[1] == "status": if len(args) <= 2: logging.error("No job id to get status from supplied.") sys.exit(1) logging.info(f"requesting job {args[2]} status...") response = requests.get(f"{status_endpoint}/{args[2]}", headers=headers) elif args[1] == "cancel": if len(args) <= 2: logging.error("No job id to cancel supplied.") sys.exit(1) logging.info(f"requesting job {args[2]} cancellation...") response = requests.get(f"{status_endpoint}/{args[2]}", headers=headers) elif args[1] == "purge": logging.info("purging all jobs in queue...") response = requests.post(purge_endpoint, headers=headers) json = response.json() # the json will be similar to # {'id': 'e3d2e250-ea81-4074-9838-1c52d006ddcf', 'status': 'IN_QUEUE'} while "status" in json and ( json["status"] == "IN_QUEUE" or json["status"] == "IN_PROGRESS" ): logging.info(f"{json['status']} for job {json['id']}, waiting...") time.sleep(3) response = requests.get(f"{status_endpoint}/{json['id']}", headers=headers) json = response.json() logging.info(json) if __name__ == "__main__": cli(auto_envvar_prefix='VERBANOTE')