verbanote-server/verbanote/process.py

import re
import json
from dataclasses import dataclass
from pathlib import Path
from pyannote.audio import Pipeline
from pydub import AudioSegment
from whisper import Whisper
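
# Duration of silence (in milliseconds) prepended to the audio before
# diarization; see _add_audio_silence and the timestamp shift in output_txt.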
MILLISECONDS_TO_SPACE = 2000


@dataclass
class TxtTranscription:
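    """A rendered transcript: the full text plus the file it was written to."""
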
    text: str
    file: Path


def diarize(audiofile: Path, pipeline: Pipeline, output_path: Path) -> Path:
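    """Run speaker diarization on ``audiofile`` and write the raw annotation
    to ``output_path/diarization.txt``; returns the path to that file."""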
    audiofile_prepended = _add_audio_silence(audiofile)
    diarize_file = {"uri": "not-important", "audio": audiofile_prepended}
    dz = pipeline(diarize_file)
    out_file = Path.joinpath(output_path, "diarization.txt")
    with open(out_file, "w") as text_file:
        text_file.write(str(dz))
    print("Diarized:")
    print(*list(dz.itertracks(yield_label=True))[:10], sep="\n")
    return out_file


def transcribe(
    model: Whisper,
    diarized_groups: list[list[str]],
    files_path: Path,
    lang: str = "en",
    word_timestamps: bool = True,
) -> None:
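    """Run Whisper over each speaker-group wav in ``files_path`` (0.wav,
    1.wav, ...) and dump each result as pretty-printed JSON alongside it."""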
    for i in range(len(diarized_groups)):
        f = Path.joinpath(files_path, str(i))
        audio_f = f"{f}.wav"
        json_f = f"{f}.json"
        result = model.transcribe(
            audio=audio_f, language=lang, word_timestamps=word_timestamps
        )
        with open(json_f, "w") as outfile:
            json.dump(result, outfile, indent=4)


# TODO clean up this mess
def output_txt(diarized_groups: list[list[str]], transcription_path: Path) -> TxtTranscription:
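    """Merge the per-group Whisper JSON files into one speaker-labelled
    transcript, write it to ``transcription_result.txt``, and return it."""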
    txt: list[str] = []
    gidx = -1
    for g in diarized_groups:
        shift = re.findall(r"[0-9]+:[0-9]+:[0-9]+\.[0-9]+", string=g[0])[0]
        shift = _millisec(shift) - MILLISECONDS_TO_SPACE  # start time in the original video
        shift = max(shift, 0)
        gidx += 1
        with open(f"{Path.joinpath(transcription_path, str(gidx))}.json") as f:
            captions = json.load(f)["segments"]
        if captions:
            speaker = g[0].split()[-1]
            for c in captions:
                txt.append(f"[{speaker}] {c['text']}\n")
            txt.append("\n")
    output = "".join(txt)
    fname = Path.joinpath(transcription_path, "transcription_result.txt")
    with open(fname, "w", encoding="utf-8") as file:
        file.write(output)
    return TxtTranscription(text=output, file=fname)


def save_diarized_audio_files(
    diarization: Path, audiofile: Path, output_path: Path
) -> list[list[str]]:
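    """Split ``audiofile`` into one wav per speaker group described by the
    diarization file; returns the list of groups."""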
    groups = _group_speakers(diarization)
    _save_individual_audio_files(audiofile, groups, output_path)
    return groups


def _add_audio_silence(audiofile: Path) -> Path:
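    """Prepend MILLISECONDS_TO_SPACE of silence to ``audiofile`` and write the
    result next to it as ``interview_prepend.wav``."""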
    spacer = AudioSegment.silent(duration=MILLISECONDS_TO_SPACE)
    audio = AudioSegment.from_wav(audiofile)
    audio = spacer.append(audio, crossfade=0)
    out_file = Path.joinpath(audiofile.parent, "interview_prepend.wav")
    audio.export(out_file, format="wav")
    return out_file


def _save_individual_audio_files(
    audiofile: Path, groups: list[list[str]], output_path: Path
) -> None:
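    """Export each speaker group as ``<output_path>/<group index>.wav``, using
    the start of the group's first segment and the end of its last."""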
    audio = AudioSegment.from_wav(audiofile)
    gidx = -1
    for g in groups:
        start = re.findall(r"[0-9]+:[0-9]+:[0-9]+\.[0-9]+", string=g[0])[0]
        end = re.findall(r"[0-9]+:[0-9]+:[0-9]+\.[0-9]+", string=g[-1])[1]
        start = _millisec(start)  # - MILLISECONDS_TO_SPACE
        end = _millisec(end)  # - MILLISECONDS_TO_SPACE
        gidx += 1
        audio[start:end].export(
            f"{Path.joinpath(output_path, str(gidx))}.wav", format="wav"
        )


def _group_speakers(diarization_file: Path) -> list[list[str]]:
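    """Group consecutive diarization lines by speaker: a new group starts when
    the speaker label changes, and a segment fully contained in a previous
    segment closes its group immediately."""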
    with open(diarization_file) as f:
        dzs = f.read().splitlines()
    groups: list[list[str]] = []
    g: list[str] = []
    lastend = 0
    for d in dzs:
        if g and (g[0].split()[-1] != d.split()[-1]):  # speaker changed
            groups.append(g)
            g = []
        g.append(d)
        end = re.findall(r"[0-9]+:[0-9]+:[0-9]+\.[0-9]+", string=d)[1]
        end = _millisec(end)
        if lastend > end:  # segment engulfed by a previous segment
            groups.append(g)
            g = []
        else:
            lastend = end
    if g:
        groups.append(g)
    return groups


def _millisec(time_str: str) -> int:
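    """Convert an ``HH:MM:SS.fff`` timestamp to integer milliseconds."""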
    hours, minutes, seconds = time_str.split(":")
    return int((int(hours) * 3600 + int(minutes) * 60 + float(seconds)) * 1000)
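

# A minimal sketch of how these functions compose end to end. Everything in
# this block is an assumption for illustration, not part of this module's
# API: the file names, the working directory, the Whisper model size, and
# the Hugging Face token are placeholders.
if __name__ == "__main__":
    import whisper

    audio = Path("recording.wav")  # hypothetical input recording
    workdir = Path("output")  # hypothetical working directory
    workdir.mkdir(exist_ok=True)

    # Recent pyannote pipelines require an access token; "hf_..." is a placeholder.
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization", use_auth_token="hf_..."
    )
    model = whisper.load_model("base")

    diarization_file = diarize(audio, pipeline, workdir)
    # The diarization timestamps refer to the silence-prepended copy that
    # diarize() wrote next to the input, so slice that copy.
    prepended = audio.parent / "interview_prepend.wav"
    groups = save_diarized_audio_files(diarization_file, prepended, workdir)
    transcribe(model, groups, workdir)
    result = output_txt(groups, workdir)
    print(result.text)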