# verbanote-server/verbanote/process.py
#
# 156 lines
# 4.7 KiB
# Python

import logging
import os
import re
import json
from dataclasses import dataclass
from pathlib import Path
from pyannote.audio import Pipeline
from pydub import AudioSegment
from whisper import Whisper
# Length, in milliseconds, of the silence that _add_audio_silence prepends to
# the audio before it is diarized.
MILLISECONDS_TO_SPACE = 2000
@dataclass
class TxtTranscription:
    """Plain-text transcription together with the file it was written to."""

    # Full transcription text.
    text: str
    # Location of the text file holding the same content.
    file: Path
def diarize(audiofile: Path, pipeline: Pipeline, output_path: Path) -> Path:
    """Run speaker diarization on an audio file and save the result as text.

    A copy of ``audiofile`` with leading silence is produced first (via
    ``_add_audio_silence``); that copy is what ``pipeline`` receives.

    Args:
        audiofile: WAV file to diarize.
        pipeline: Diarization pipeline, called with a mapping that holds the
            keys ``"uri"`` and ``"audio"``.
        output_path: Directory that receives ``diarization.txt``.

    Returns:
        Path of the written ``diarization.txt``, which contains ``str()`` of
        the pipeline result.
    """
    padded_audio = _add_audio_silence(audiofile)
    logging.info(f"Beginning diarization of {audiofile}...")

    annotation = pipeline({"uri": "not-important", "audio": padded_audio})

    result_file = output_path.joinpath("diarization.txt")
    result_file.write_text(str(annotation))
    logging.info(f"Created diarization in {result_file}.")
    return result_file
def transcribe(
    model: Whisper,
    diarized_groups: list,
    files_path: Path,
    lang: str = "en",
    word_timestamps: bool = True,
) -> None:
    """Transcribe every per-group audio clip and store each result as JSON.

    For each index ``i`` of ``diarized_groups`` the clip ``<i>.wav`` inside
    ``files_path`` is handed to ``model.transcribe`` and the returned object
    is dumped to ``<i>.json`` in the same directory.

    Args:
        model: Object whose ``transcribe`` method accepts the keyword
            arguments ``audio``, ``language`` and ``word_timestamps``.
        diarized_groups: Speaker groups; only their count is used here.
        files_path: Directory holding the ``<i>.wav`` clips; also receives
            the ``<i>.json`` results.
        lang: Language code forwarded to the model.
        word_timestamps: Forwarded to the model unchanged.
    """
    # Only the position of each group matters: it names the clip on disk.
    for index, _group in enumerate(diarized_groups):
        audio_file = files_path / f"{index}.wav"
        json_file = files_path / f"{index}.json"
        logging.info(f"Starting transcription of {audio_file}...")
        result = model.transcribe(
            audio=str(audio_file), language=lang, word_timestamps=word_timestamps
        )
        with open(json_file, "w", encoding="utf-8") as outfile:
            json.dump(result, outfile, indent=4)
        logging.info(f"Transcription written to {json_file}.")
# TODO clean up this mess
def output_txt(diarized_groups: list, transcription_path: Path) -> TxtTranscription:
    """Merge the per-group JSON transcriptions into one speaker-labelled text.

    For each index ``i`` of ``diarized_groups`` the file ``<i>.json`` in
    ``transcription_path`` is loaded and its ``"segments"`` list is read.
    A group with at least one segment contributes ``[<speaker>] `` followed
    by the concatenated segment texts and a blank line; the speaker label is
    the last whitespace-separated token of the group's first line. Groups
    without segments contribute nothing.

    Args:
        diarized_groups: Speaker groups, each a list of diarization lines.
        transcription_path: Directory holding the ``<i>.json`` files; also
            receives ``transcription_result.txt``.

    Returns:
        The combined text and the path of the file it was written to.
    """
    parts: list[str] = []
    for index, group in enumerate(diarized_groups):
        json_file = transcription_path / f"{index}.json"
        with open(json_file, encoding="utf-8") as handle:
            segments = json.load(handle)["segments"]
        logging.info(f"Loaded {json_file} for transcription...")

        if not segments:
            continue

        speaker = group[0].split()[-1]
        parts.append(f"[{speaker}] ")
        parts.extend(str(segment["text"]) for segment in segments)
        parts.append("\n\n")

    output = "".join(parts)
    result_file = transcription_path / "transcription_result.txt"
    with open(result_file, "w", encoding="utf-8") as file:
        file.write(output)
    logging.info(f"Wrote transcription to output file {result_file}.")
    return TxtTranscription(text=output, file=result_file)
def save_diarized_audio_files(
    diarization: Path, audiofile: Path, output_path: Path
) -> list:
    """Group the diarization by speaker and export one audio clip per group.

    Args:
        diarization: Diarization text file, parsed by ``_group_speakers``.
        audiofile: Audio file the clips are cut from.
        output_path: Directory that receives the exported clips.

    Returns:
        The groups produced by ``_group_speakers``.
    """
    speaker_groups = _group_speakers(diarization)
    _save_individual_audio_files(
        audiofile=audiofile,
        groups=speaker_groups,
        output_path=output_path,
    )
    return speaker_groups
def _add_audio_silence(audiofile) -> Path:
    """Write a copy of ``audiofile`` with leading silence and return its path.

    The silence lasts ``MILLISECONDS_TO_SPACE`` milliseconds. The copy is
    named ``interview_prepend.wav`` and is placed in the directory of
    ``audiofile``.
    """
    silence = AudioSegment.silent(duration=MILLISECONDS_TO_SPACE)
    recording = AudioSegment.from_wav(audiofile)
    padded = silence.append(recording, crossfade=0)

    target = Path(os.path.dirname(audiofile)).joinpath("interview_prepend.wav")
    padded.export(target, format="wav")
    logging.info(f"Exported audiofile with silence prepended to {target}.")
    return target
def _save_individual_audio_files(
    audiofile: Path, groups: list[str], output_path: Path
) -> None:
    """Cut ``audiofile`` into one WAV clip per speaker group.

    Each clip runs from the first timestamp on the group's first line to the
    second timestamp on the group's last line and is written to
    ``<index>.wav`` in ``output_path``.

    Args:
        audiofile: WAV file to slice.
        groups: Speaker groups; each group is indexed for its first and last
            diarization line.
        output_path: Directory that receives the clips.
    """
    timestamp = r"[0-9]+:[0-9]+:[0-9]+\.[0-9]+"
    recording = AudioSegment.from_wav(audiofile)
    total = len(groups)

    for index, group in enumerate(groups):
        first_stamp = re.findall(timestamp, string=group[0])[0]
        last_stamp = re.findall(timestamp, string=group[-1])[1]
        start_ms = _millisec(first_stamp)
        end_ms = _millisec(last_stamp)

        clip_file = output_path.joinpath(f"{index}.wav")
        recording[start_ms:end_ms].export(clip_file, format="wav")
        logging.info(f"Exported audiopart {index} of {total} to {clip_file}.")
def _group_speakers(diarization_file: Path) -> list:
    """Split the diarization lines into groups of consecutive same-speaker lines.

    The speaker of a line is its last whitespace-separated token, and the end
    time is the second timestamp found on the line. A group is closed when
    the speaker changes, and also right after a line whose end time lies
    before the latest end time seen so far.

    Args:
        diarization_file: Text file with one diarization segment per line.

    Returns:
        A list of groups, each a list of the original lines.
    """
    timestamp = r"[0-9]+:[0-9]+:[0-9]+\.[0-9]+"

    # Context manager so the file handle is closed deterministically.
    with open(diarization_file) as handle:
        lines = handle.read().splitlines()

    groups: list = []
    current: list = []
    last_end = 0
    for line in lines:
        if not line.strip():
            # A blank line has neither a speaker token nor timestamps.
            continue

        if current and current[0].split()[-1] != line.split()[-1]:
            # Speaker changed: close the running group.
            groups.append(current)
            current = []

        current.append(line)

        end = _millisec(re.findall(timestamp, string=line)[1])
        if last_end > end:
            # Segment engulfed by a previous segment: close the group here.
            groups.append(current)
            current = []
        else:
            last_end = end

    if current:
        groups.append(current)
    return groups
def _millisec(timeStr):
spl = timeStr.split(":")
s = (int)((int(spl[0]) * 60 * 60 + int(spl[1]) * 60 + float(spl[2])) * 1000)
return s