refactor: Extract PDF extractor into class

Extractor is a general protocol with the PDF extraction routine now being
one implementation of the protocol. Preparation for adding multiple
extractors (epub, djvu, or specific programmes) in the future.
This commit is contained in:
Marty Oehme 2024-01-20 18:02:18 +01:00
parent 765de505bb
commit 3b4db7b6b8
Signed by: Marty
GPG key ID: EDBF2ED917B2EF6A
2 changed files with 147 additions and 123 deletions

View file

@ -1,20 +1,27 @@
import re import re
from pathlib import Path from pathlib import Path
from typing import Any, Optional from typing import Protocol
import Levenshtein
import magic
import fitz import fitz
import papis.logging
import papis.config import papis.config
import papis.document import papis.document
import papis.logging
from papis.document import Document from papis.document import Document
from papis_extract.annotation import Annotation from papis_extract.annotation import Annotation
from papis_extract.extractors.pdf import PdfExtractor
logger = papis.logging.get_logger(__name__) logger = papis.logging.get_logger(__name__)
class Extractor(Protocol):
    """Structural interface for annotation extractors.

    Any object offering these two methods can be used wherever an
    ``Extractor`` is expected.
    """

    def can_process(self, filename: Path) -> bool:
        """Tell whether this extractor can handle ``filename``."""

    def run(self, filename: Path) -> list[Annotation]:
        """Return the annotations extracted from ``filename``."""
def start( def start(
document: Document, document: Document,
) -> list[Annotation]: ) -> list[Annotation]:
@ -24,16 +31,18 @@ def start(
documents passed in. documents passed in.
""" """
pdf_extractor: Extractor = PdfExtractor()
annotations: list[Annotation] = [] annotations: list[Annotation] = []
found_pdf: bool = False found_pdf: bool = False
for file in document.get_files(): for file in document.get_files():
fname = Path(file) fname = Path(file)
if not _is_file_processable(fname): if not pdf_extractor.can_process(fname):
break break
found_pdf = True found_pdf = True
try: try:
annotations.extend(extract(fname)) annotations.extend(pdf_extractor.run(fname))
except fitz.FileDataError as e: except fitz.FileDataError as e:
print(f"File structure errors for {file}.\n{e}") print(f"File structure errors for {file}.\n{e}")
@ -45,120 +54,3 @@ def start(
return annotations return annotations
def extract(filename: Path) -> list[Annotation]:
    """Collect the annotations stored in a single file.

    The file is opened with ``fitz.Document`` and every annotation on
    every page is inspected. Annotations for which neither quoted text
    nor a note can be retrieved are skipped.
    """
    found: list[Annotation] = []
    with fitz.Document(filename) as pdf:
        for page in pdf:
            for annot in page.annots():
                quote, note = _retrieve_annotation_content(page, annot)
                if not (quote or note):
                    continue

                # Prefer the fill colour, then the stroke colour, then black.
                palette = annot.colors
                col = palette.get("fill") or palette.get("stroke") or (0.0, 0.0, 0.0)

                entry = Annotation(
                    file=str(filename),
                    text=quote or "",
                    content=note or "",
                    colors=col,
                    type=annot.type[1],
                    page=(page.number or 0) + 1,
                )
                entry.tag = _tag_from_colorname(entry.colorname or "")
                found.append(entry)

    total = len(found)
    logger.debug(
        f"Found {total} "
        f"{'annotation' if total == 1 else 'annotations'} for {filename}."
    )
    return found
def is_pdf(fname: Path) -> bool:
    """Tell whether ``fname`` is a PDF, judged by its detected mime type."""
    detected_mime = magic.from_file(fname, mime=True)
    return detected_mime == "application/pdf"
def _is_file_processable(fname: Path) -> bool:
    """Tell whether ``fname`` is an existing file with a PDF mime type.

    An error is logged when the path is not a regular file.
    """
    if fname.is_file():
        return is_pdf(fname)

    logger.error(f"File {str(fname)} not readable.")
    return False
def _tag_from_colorname(colorname: str) -> str:
    """Look up the tag configured for ``colorname``.

    The mapping is read from the ``tags`` option of the
    ``plugins.extract`` section. An empty string is returned when the
    mapping is empty or has no entry for the colour.
    """
    tags_by_color: dict[str, str] = getdict("tags", "plugins.extract")
    return tags_by_color.get(colorname, "") if tags_by_color else ""
def _retrieve_annotation_content(
    page: fitz.Page, annotation: fitz.Annot
) -> tuple[str | None, str | None]:
    """Get the text content of an annotation.

    Two strings are read: the annotation's own ``content`` entry and the
    page text inside the annotation's rectangle. Newlines in both are
    replaced by spaces. A Levenshtein similarity ratio decides whether
    the two count as the same text or should both be kept.

    :param page: The page the annotation lives on.
    :param annotation: The annotation to read.
    :returns: A ``(quote, note)`` tuple; either element may be ``None``
        and both are ``None`` when no text could be retrieved.
    """
    content = annotation.info["content"].replace("\n", " ")
    written = page.get_textbox(annotation.rect).replace("\n", " ")

    # Only fall back to 1.0 when the option is unset. A plain ``or 1.0``
    # would also discard an explicitly configured threshold of 0.0.
    minimum_similarity = papis.config.getfloat(
        "minimum_similarity_content", "plugins.extract"
    )
    if minimum_similarity is None:
        minimum_similarity = 1.0

    # highlight with selection in note
    if Levenshtein.ratio(content, written) > minimum_similarity:
        return (content, None)
    # both a highlight and a note
    elif content and written:
        return (written, content)
    # an independent note, not a highlight
    elif content:
        return (None, content)
    # highlight with selection not in note
    elif written:
        return (written, None)
    # just a highlight without any text
    return (None, None)
# mimics the functions in papis.config.{getlist,getint,getfloat} etc.
def getdict(key: str, section: Optional[str] = None) -> dict[str, str]:
    """Dict getter

    :param key: Name of the configuration option to read.
    :param section: Configuration section the option is read from.
    :returns: A python dict
    :raises SyntaxError: Whenever the parsed syntax is either not a valid
        python object or a valid python dict.
    """
    rawvalue: Any = papis.config.general_get(key, section=section)
    if isinstance(rawvalue, dict):
        return rawvalue

    try:
        # NOTE(review): eval() executes whatever expression the configuration
        # value contains. Kept as-is to preserve behaviour; consider
        # ast.literal_eval if the configuration may hold untrusted text.
        parsed = eval(rawvalue)
    except Exception as err:
        # Chain the original error so the real parse failure stays visible.
        raise SyntaxError(
            "The key '{}' must be a valid Python object: {}".format(key, rawvalue)
        ) from err

    if not isinstance(parsed, dict):
        raise SyntaxError(
            "The key '{}' must be a valid Python dict. Got: {} (type {!r})".format(
                key, parsed, type(parsed).__name__
            )
        )
    return parsed

View file

@ -0,0 +1,132 @@
from pathlib import Path
from typing import Any, Optional
import fitz
import Levenshtein
import magic
import papis.config
import papis.logging
from papis_extract.annotation import Annotation
logger = papis.logging.get_logger(__name__)
class PdfExtractor:
    """Extractor that reads annotations from PDF files through ``fitz``."""

    def can_process(self, filename: Path) -> bool:
        """Tell whether ``filename`` is an existing file with a PDF mime type.

        An error is logged when the path is not a regular file.
        """
        if not filename.is_file():
            logger.error(f"File {str(filename)} not readable.")
            return False
        if not self._is_pdf(filename):
            return False
        return True

    def run(self, filename: Path) -> list[Annotation]:
        """Extract annotations from a file.

        Every annotation on every page is inspected. Annotations for
        which neither quoted text nor a note can be retrieved are
        skipped.

        :param filename: Path of the PDF file to read.
        :returns: The annotations found, in page order.
        """
        annotations: list[Annotation] = []
        with fitz.Document(filename) as doc:
            for page in doc:
                for annot in page.annots():
                    quote, note = self._retrieve_annotation_content(page, annot)
                    if not quote and not note:
                        continue
                    # Prefer the fill colour, then the stroke colour, then black.
                    col = (
                        annot.colors.get("fill")
                        or annot.colors.get("stroke")
                        or (0.0, 0.0, 0.0)
                    )
                    a = Annotation(
                        file=str(filename),
                        text=quote or "",
                        content=note or "",
                        colors=col,
                        type=annot.type[1],
                        # Stored page is offset by one from ``page.number``.
                        page=(page.number or 0) + 1,
                    )
                    a.tag = self._tag_from_colorname(a.colorname or "")
                    annotations.append(a)
        logger.debug(
            f"Found {len(annotations)} "
            f"{'annotation' if len(annotations) == 1 else 'annotations'} for {filename}."
        )
        return annotations

    def _is_pdf(self, fname: Path) -> bool:
        """Check if file is a pdf, using mime type."""
        return magic.from_file(fname, mime=True) == "application/pdf"

    def _tag_from_colorname(self, colorname: str) -> str:
        """Return the tag configured for ``colorname``, or an empty string.

        The mapping is read from the ``tags`` option of the
        ``plugins.extract`` section.
        """
        color_mapping: dict[str, str] = self._getdict("tags", "plugins.extract")
        if not color_mapping:
            return ""
        return color_mapping.get(colorname, "")

    def _retrieve_annotation_content(
        self, page: fitz.Page, annotation: fitz.Annot
    ) -> tuple[str | None, str | None]:
        """Get the text content of an annotation.

        Two strings are read: the annotation's own ``content`` entry and
        the page text inside the annotation's rectangle. Newlines in both
        are replaced by spaces. A Levenshtein similarity ratio decides
        whether the two count as the same text or should both be kept.

        :param page: The page the annotation lives on.
        :param annotation: The annotation to read.
        :returns: A ``(quote, note)`` tuple; either element may be
            ``None`` and both are ``None`` when no text could be
            retrieved.
        """
        content = annotation.info["content"].replace("\n", " ")
        written = page.get_textbox(annotation.rect).replace("\n", " ")

        # Only fall back to 1.0 when the option is unset. A plain
        # ``or 1.0`` would also discard an explicit threshold of 0.0.
        minimum_similarity = papis.config.getfloat(
            "minimum_similarity_content", "plugins.extract"
        )
        if minimum_similarity is None:
            minimum_similarity = 1.0

        # highlight with selection in note
        if Levenshtein.ratio(content, written) > minimum_similarity:
            return (content, None)
        # both a highlight and a note
        elif content and written:
            return (written, content)
        # an independent note, not a highlight
        elif content:
            return (None, content)
        # highlight with selection not in note
        elif written:
            return (written, None)
        # just a highlight without any text
        return (None, None)

    # mimics the functions in papis.config.{getlist,getint,getfloat} etc.
    def _getdict(self, key: str, section: Optional[str] = None) -> dict[str, str]:
        """Dict getter

        :param key: Name of the configuration option to read.
        :param section: Configuration section the option is read from.
        :returns: A python dict
        :raises SyntaxError: Whenever the parsed syntax is either not a valid
            python object or a valid python dict.
        """
        rawvalue: Any = papis.config.general_get(key, section=section)
        if isinstance(rawvalue, dict):
            return rawvalue

        try:
            # NOTE(review): eval() executes whatever expression the
            # configuration value contains. Kept as-is to preserve behaviour;
            # consider ast.literal_eval if the configuration may hold
            # untrusted text.
            parsed = eval(rawvalue)
        except Exception as err:
            # Chain the original error so the real parse failure stays visible.
            raise SyntaxError(
                "The key '{}' must be a valid Python object: {}".format(key, rawvalue)
            ) from err

        if not isinstance(parsed, dict):
            raise SyntaxError(
                "The key '{}' must be a valid Python dict. Got: {} (type {!r})".format(
                    key, parsed, type(parsed).__name__
                )
            )
        return parsed