import json
import logging
import re
from dataclasses import dataclass
from pathlib import Path

from pyannote.audio import Pipeline
from pydub import AudioSegment
from whisper import Whisper

MILLISECONDS_TO_SPACE = 2000

# hh:mm:ss.mmm timestamps as they appear in pyannote's diarization output
TIMESTAMP_PATTERN = re.compile(r"[0-9]+:[0-9]+:[0-9]+\.[0-9]+")


@dataclass
class TxtTranscription:
    text: str
    file: Path


def diarize(audiofile: Path, pipeline: Pipeline, output_path: Path) -> Path:
    """Run speaker diarization on audiofile and write the result to a text file."""
    audiofile_prepended = _add_audio_silence(audiofile)
    dz = pipeline({"uri": "not-important", "audio": audiofile_prepended})
    out_file = output_path / "diarization.txt"
    with open(out_file, "w") as text_file:
        text_file.write(str(dz))
    # print the first ten diarization tracks as a quick sanity check
    print("Diarized:")
    print(*list(dz.itertracks(yield_label=True))[:10], sep="\n")
    return out_file


def transcribe(
    model: Whisper,
    diarized_groups: list[list[str]],
    files_path: Path,
    lang: str = "en",
    word_timestamps: bool = True,
) -> None:
    """Transcribe each per-speaker audio snippet and dump the result as JSON."""
    for i in range(len(diarized_groups)):
        audio_f = files_path / f"{i}.wav"
        json_f = files_path / f"{i}.json"
        logging.info(f"Starting transcription of {audio_f}...")
        result = model.transcribe(
            audio=str(audio_f), language=lang, word_timestamps=word_timestamps
        )
        with open(json_f, "w") as outfile:
            json.dump(result, outfile, indent=4)
        logging.info(f"Transcription written to {json_f}.")


# TODO clean up this mess
def output_txt(
    diarized_groups: list[list[str]], transcription_path: Path
) -> TxtTranscription:
    """Merge the per-speaker JSON transcriptions into one speaker-labelled text file."""
    txt: list[str] = []
    for gidx, g in enumerate(diarized_groups):
        # the start time in the original video (currently unused)
        shift = _millisec(TIMESTAMP_PATTERN.findall(g[0])[0]) - MILLISECONDS_TO_SPACE
        shift = max(shift, 0)
        fname = transcription_path / f"{gidx}.json"
        with open(fname) as f:
            captions = json.load(f)["segments"]
        logging.info(f"Loaded {fname} for transcription...")
        if captions:
            speaker = g[0].split()[-1]
            for c in captions:
                txt.append(f"[{speaker}] {c['text']}\n")
            txt.append("\n")
    output = "".join(txt)
    fname = transcription_path / "transcription_result.txt"
    with open(fname, "w", encoding="utf-8") as file:
        file.write(output)
    logging.info(f"Wrote transcription to output file {fname}.")
    return TxtTranscription(text=output, file=fname)


def save_diarized_audio_files(
    diarization: Path, audiofile: Path, output_path: Path
) -> list[list[str]]:
    """Split the diarization into speaker groups and export one wav file per group."""
    groups = _group_speakers(diarization)
    _save_individual_audio_files(
        audiofile=audiofile, groups=groups, output_path=output_path
    )
    return groups


def _add_audio_silence(audiofile: Path) -> Path:
    """Prepend MILLISECONDS_TO_SPACE ms of silence to audiofile and export the result."""
    spacer = AudioSegment.silent(duration=MILLISECONDS_TO_SPACE)
    audio = AudioSegment.from_wav(audiofile)
    audio = spacer.append(audio, crossfade=0)
    fname = audiofile.parent / "interview_prepend.wav"
    audio.export(fname, format="wav")
    logging.info(f"Exported audiofile with silence prepended to {fname}.")
    return fname


def _save_individual_audio_files(
    audiofile: Path, groups: list[list[str]], output_path: Path
) -> None:
    """Slice audiofile into one wav file per speaker group."""
    audio = AudioSegment.from_wav(audiofile)
    for gidx, g in enumerate(groups):
        # first start and last end of the group; the timestamps are relative to
        # the prepended audio (the spacer offset is deliberately not subtracted)
        start = _millisec(TIMESTAMP_PATTERN.findall(g[0])[0])
        end = _millisec(TIMESTAMP_PATTERN.findall(g[-1])[1])
        fname = output_path / f"{gidx}.wav"
        audio[start:end].export(fname, format="wav")
        logging.info(f"Exported audiopart {gidx} of {len(groups)} to {fname}.")


def _group_speakers(diarization_file: Path) -> list[list[str]]:
    """Group consecutive diarization lines that belong to the same speaker."""
    with open(diarization_file) as f:
        dzs = f.read().splitlines()
    groups: list[list[str]] = []
    g: list[str] = []
    lastend = 0
    for d in dzs:
        if g and (g[0].split()[-1] != d.split()[-1]):  # speaker changed
            groups.append(g)
            g = []
        g.append(d)
        end = _millisec(TIMESTAMP_PATTERN.findall(d)[1])
        if lastend > end:  # segment engulfed by a previous segment
            groups.append(g)
            g = []
        else:
            lastend = end
    if g:
        groups.append(g)
    return groups


def _millisec(time_str: str) -> int:
    """Convert an hh:mm:ss.mmm timestamp to milliseconds."""
    hours, minutes, seconds = time_str.split(":")
    return int((int(hours) * 3600 + int(minutes) * 60 + float(seconds)) * 1000)
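

# --- Example usage ----------------------------------------------------------
# A minimal sketch of how the functions above fit together, not part of the
# module proper. The file names ("interview.wav", the "out" directory), the
# whisper model size, and the Hugging Face token are assumptions; adjust them
# to your setup.
if __name__ == "__main__":
    import whisper

    logging.basicConfig(level=logging.INFO)

    audio = Path("interview.wav")  # hypothetical input recording
    out_dir = Path("out")  # hypothetical working directory
    out_dir.mkdir(exist_ok=True)

    # pyannote's diarization models are gated; a Hugging Face token is required
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization", use_auth_token="YOUR_HF_TOKEN"
    )

    diarization_file = diarize(audio, pipeline, out_dir)
    # the diarization timestamps include the spacer, so slice the prepended file
    prepended = audio.parent / "interview_prepend.wav"
    groups = save_diarized_audio_files(diarization_file, prepended, out_dir)
    transcribe(
        model=whisper.load_model("base"),
        diarized_groups=groups,
        files_path=out_dir,
    )
    print(output_txt(groups, out_dir).text)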