pubs-extract/extract/extract.py

368 lines
13 KiB
Python
Raw Normal View History

2022-12-22 16:43:06 +00:00
import os
2022-12-22 23:27:19 +00:00
import re
2022-12-22 16:43:06 +00:00
import argparse
import math
2022-12-24 17:01:55 +00:00
from dataclasses import dataclass
from typing import Tuple
2022-12-22 16:43:06 +00:00
import fitz
import Levenshtein
2022-12-22 16:43:06 +00:00
from pubs.plugins import PapersPlugin
2022-12-24 17:01:55 +00:00
from pubs.paper import Paper
2022-12-22 19:28:27 +00:00
from pubs.events import DocAddEvent, NoteEvent
2022-12-22 16:43:06 +00:00
from pubs import repo, pretty
from pubs.utils import resolve_citekey_list
2022-12-22 19:52:10 +00:00
from pubs.content import check_file, read_text_file, write_file
from pubs.query import get_paper_filter
2022-12-24 17:01:55 +00:00
# Number of selected papers above which the user is shown the selection and
# asked for confirmation before extraction.
CONFIRMATION_PAPER_THRESHOLD = 5

# Named reference colours as RGB triples with components between 0 and 1.
# An annotation's colour is matched to the nearest entry of this table.
COLORS = {
    "red": (1, 0, 0),
    "green": (0, 1, 0),
    "blue": (0, 0, 1),
    "yellow": (1, 1, 0),
    "purple": (0.5, 0, 0.5),
    "orange": (1, 0.65, 0),
}
2022-12-24 17:01:55 +00:00
@dataclass
class Annotation:
    """A single annotation extracted from a PDF document.

    ``text`` holds the quoted passage and ``content`` the attached note;
    either may be empty. ``page`` is the page number used in the output.
    """

    paper: Paper
    file: str
    type: str = "Highlight"
    text: str = ""
    content: str = ""
    page: int = 1
    # Either a mapping with "stroke"/"fill" entries (the only keys that
    # `colorname` reads) or a bare sequence of colour components.
    colors: "dict | Tuple[float, ...]" = (0.0, 0.0, 0.0)

    def formatted(self, formatting):
        """Render the annotation according to a format string.

        Recognised placeholders are ``{quote}``, ``{note}``, ``{page}`` and
        ``{newline}``. Text enclosed in ``{quote_begin}``/``{quote_end}`` or
        ``{note_begin}``/``{note_end}`` is dropped entirely when the
        annotation has no quote or no note respectively; otherwise only the
        markers themselves are removed.

        Args:
            formatting: The format string containing the placeholders.

        Returns:
            The formatted annotation as a string.
        """
        output = formatting
        sections = (("quote", self.text), ("note", self.content))

        # Drop optional sections whose value is empty. The match is
        # non-greedy and spans newlines so that multi-line sections are
        # removed and text between two sections survives.
        for section, value in sections:
            if value == "":
                begin = re.escape("{%s_begin}" % section)
                end = re.escape("{%s_end}" % section)
                output = re.sub(f"{begin}.*?{end}", "", output, flags=re.DOTALL)

        # Strip whatever section markers are left over.
        for section, _ in sections:
            for marker in ("{%s_begin}" % section, "{%s_end}" % section):
                output = output.replace(marker, "")

        replacements = {
            "{quote}": self.text,
            "{note}": self.content,
            "{page}": str(self.page),
            "{newline}": "\n",
        }
        # A single pass over the string keeps placeholders that occur inside
        # the quote or note text from being substituted a second time.
        pattern = re.compile(
            "|".join(
                re.escape(key)
                for key in sorted(replacements, key=len, reverse=True)
            )
        )
        return pattern.sub(lambda match: replacements[match.group(0)], output)

    @property
    def colorname(self):
        """Name of the entry in ``COLORS`` closest to the annotation colour.

        The stroke colour is preferred, falling back to the fill colour.

        Returns:
            The colour name, or ``None`` when the annotation carries no
            colour or its colour cannot be compared to the reference table.
        """
        colors = self.colors
        if isinstance(colors, dict):
            annot_colors = colors.get("stroke") or colors.get("fill")
        else:
            annot_colors = colors
        if not annot_colors:
            return None

        nearest = None
        smallest_dist = 2.0
        for name, values in COLORS.items():
            # math.dist raises for points of different dimension, e.g. when
            # the annotation colour is not a 3-component value.
            if len(values) != len(annot_colors):
                continue
            dist = math.dist(values, annot_colors)
            if dist < smallest_dist:
                smallest_dist = dist
                nearest = name
        return nearest
2022-12-22 16:43:06 +00:00
class ExtractPlugin(PapersPlugin):
    """Extract annotations from any pdf document.

    The extract plugin allows manual or automatic extraction of all
    annotations contained in the pdf documents belonging to entries of the
    pubs library.

    It can write those changes to stdout or directly create and update notes
    for the pubs entries.

    It adds a `pubs extract` subcommand through which it is invoked, but can
    optionally run whenever a new document is imported for a pubs entry.
    """

    name = "extract"
    description = "Extract annotations from pubs documents"

    def __init__(self, conf, ui):
        """Read the plugin settings and open the pubs repository.

        Args:
            conf: The pubs configuration; the `main` section and the
                optional `plugins.extract` section are read.
            ui: The pubs ui object used for all user interaction.
        """
        self.ui = ui
        self.note_extension = conf["main"]["note_extension"]
        self.repository = repo.Repository(conf)
        self.pubsdir = os.path.expanduser(conf["main"]["pubsdir"])
        self.broker = self.repository.databroker

        settings = conf["plugins"].get("extract", {})
        self.on_import = settings.get("on_import", False)
        self.minimum_similarity = float(settings.get("minimum_similarity", 0.75))
        self.formatting = settings.get(
            "formatting",
            "{newline}{quote_begin}> {quote} {quote_end}[{page}]{note_begin}{newline}Note: {note}{note_end}",
        )
        self.color_mapping = settings.get("color_mapping", {})

    def update_parser(self, subparsers, _):
        """Allow the usage of the pubs extract subcommand"""
        # TODO option for ignoring missing documents or erroring.
        extract_parser = subparsers.add_parser(self.name, help=self.description)
        extract_parser.add_argument(
            "-w",
            "--write",
            help="Write to individual notes instead of standard out. Appends to existing notes.",
            action="store_true",
            default=None,
        )
        extract_parser.add_argument(
            "-e",
            "--edit",
            help="Open each note in editor for manual editing after extracting annotations to it.",
            action="store_true",
            default=False,
        )
        extract_parser.add_argument(
            "-q",
            "--query",
            help="Query library instead of providing individual citekeys. For query help see pubs list command.",
            action="store_true",
            default=None,
            dest="is_query",
        )
        # -i and -I share one destination; it stays None when neither is given.
        extract_parser.add_argument(
            "-i",
            "--ignore-case",
            action="store_false",
            default=None,
            dest="case_sensitive",
            help="When using query mode, perform case insensitive search.",
        )
        extract_parser.add_argument(
            "-I",
            "--force-case",
            action="store_true",
            dest="case_sensitive",
            help="When using query mode, perform case sensitive search.",
        )
        extract_parser.add_argument(
            "--strict",
            action="store_true",
            default=False,
            help="Force strict unicode comparison of query.",
        )
        extract_parser.add_argument(
            "query",
            nargs=argparse.REMAINDER,
            help="Citekey(s)/query for the documents to extract from.",
        )
        extract_parser.set_defaults(func=self.command)

    def command(self, conf, args):
        """Run the annotation extraction command.

        Writes the annotations into notes when `--write` was given and
        prints them to stdout otherwise.
        """
        try:
            papers = self._gather_papers(conf, args)
            all_annotations = self.extract(papers)
            if args.write:
                self._to_notes(all_annotations, self.note_extension, args.edit)
            else:
                self._to_stdout(all_annotations)
        finally:
            # Close the repository even when extraction fails half-way.
            self.repository.close()

    def extract(self, papers):
        """Extract the annotations of the given papers.

        Args:
            papers: Iterable of Paper objects.

        Returns:
            A dict mapping each paper's citekey to the list of its
            annotations. Papers without a document map to an empty list;
            papers whose document cannot be read are reported as an error
            and left out.
        """
        papers_annotated = {}
        for paper in papers:
            file = self._get_file(paper)
            if not file:
                # _get_file already warned; do not hand a missing path to fitz.
                papers_annotated[paper.citekey] = []
                continue
            try:
                papers_annotated[paper.citekey] = self._get_annotations(file, paper)
            except fitz.FileDataError as e:
                self.ui.error(f"Document {file} is broken: {e}")
        return papers_annotated

    def mapped_tag(self, colorname):
        """Return the tag configured for a color name, or None if unmapped."""
        return self.color_mapping.get(colorname)

    def _gather_papers(self, conf, args):
        """Collect the papers selected on the command line.

        Interprets the positional arguments as a library query if query mode
        is active and as a list of citekeys otherwise. If more than
        CONFIRMATION_PAPER_THRESHOLD papers are selected they are listed and
        the user is asked for confirmation.

        Returns:
            A list of Paper objects; empty if nothing was selected or the
            user declined the confirmation.
        """
        if args.is_query:
            papers = list(
                filter(
                    get_paper_filter(
                        args.query,
                        case_sensitive=args.case_sensitive,
                        strict=args.strict,
                    ),
                    self.repository.all_papers(),
                )
            )
        else:
            keys = resolve_citekey_list(
                self.repository, conf, args.query, ui=self.ui, exit_on_fail=True
            )
            if not keys:
                return []
            papers = [self.repository.pull_paper(key) for key in keys]

        if len(papers) > CONFIRMATION_PAPER_THRESHOLD:
            self.ui.message(
                "\n".join(
                    pretty.paper_oneliner(
                        p, citekey_only=False, max_authors=conf["main"]["max_authors"]
                    )
                    for p in papers
                )
            )
            confirmed = self.ui.input_yn(
                question="Extract annotations for these papers?", default="y"
            )
            # Respect the answer: declining means nothing gets extracted.
            if not confirmed:
                return []
        return papers

    def _get_file(self, paper):
        """Get path of document belonging to paper.

        Returns the real path to the document which belongs to the paper
        passed in. Emits a warning if no document belongs to paper.
        """
        path = self.broker.real_docpath(paper.docpath)
        if not path:
            self.ui.warning(f"{paper.citekey} has no valid document.")
        return path

    def _get_annotations(self, filename, paper):
        """Extract annotations from a file.

        Args:
            filename: Path of the document to open.
            paper: The Paper the document belongs to.

        Returns:
            A list with one Annotation per annotation found in the document,
            in page order.
        """
        annotations = []
        with fitz.Document(filename) as doc:
            for page in doc:
                # Annotation page numbers are 1-based.
                page_number = (page.number or 0) + 1
                for annot in page.annots():
                    quote, note = self._retrieve_annotation_content(page, annot)
                    annotations.append(
                        Annotation(
                            file=filename,
                            paper=paper,
                            text=quote,
                            content=note,
                            colors=annot.colors,
                            # PyMuPDF reports the type as a sequence of a
                            # number and the type name; keep the name.
                            type=annot.type[1],
                            page=page_number,
                        )
                    )
        return annotations

    def _retrieve_annotation_content(self, page, annotation):
        """Gets the text content of an annotation.

        Compares the annotation's own content with the page text inside the
        annotation rectangle to decide whether they should both be included
        or are doubling up.

        Returns:
            A `(quote, note)` tuple of strings, either of which may be empty.
        """
        content = (annotation.info.get("content") or "").replace("\n", " ")
        written = page.get_textbox(annotation.rect).replace("\n", " ")

        # highlight with selection in note
        if Levenshtein.ratio(content, written) > self.minimum_similarity:
            return (content, "")
        # an independent note, not a highlight
        if content and not written:
            return ("", content)
        # both a highlight and a note
        if content:
            return (written, content)
        # highlight with selection not in note
        return (written, "")

    def _to_stdout(self, annotated_papers):
        """Write annotations to stdout.

        Simply outputs the gathered annotations over stdout ready to be
        passed on through pipelines etc.
        """
        parts = []
        for citekey, annotations in annotated_papers.items():
            parts.append(f"------ {citekey} ------\n")
            parts.extend(
                f"{annotation.formatted(self.formatting)}\n"
                for annotation in annotations
            )
            parts.append("\n")
        print("".join(parts))

    def _to_notes(self, annotated_papers, note_extension="txt", edit=False):
        """Write annotations into pubs notes.

        Permanently writes the given annotations into notes in the pubs
        notes directory. Creates new notes for citekeys missing a note or
        appends to existing. Papers without annotations are skipped.

        Args:
            annotated_papers: Dict mapping citekeys to lists of annotations.
            note_extension: File extension of the note files.
            edit: Open each written note in the editor afterwards.
        """
        for citekey, annotations in annotated_papers.items():
            if not annotations:
                continue
            notepath = self.broker.real_notepath(citekey, note_extension)
            if check_file(notepath, fail=False):
                self._append_to_note(notepath, annotations)
            else:
                self._write_new_note(notepath, annotations)
            self.ui.info(f"Wrote annotations to {citekey} note {notepath}.")

            if edit:
                self.ui.edit_file(notepath, temporary=False)
            NoteEvent(citekey).send()

    def _write_new_note(self, notepath, annotations):
        """Create a new note containing the annotations.

        Will create a new note in the notes folder of pubs and fill it with
        the annotations extracted from pdf.
        """
        body = "".join(
            f"{annotation.formatted(self.formatting)}\n\n"
            for annotation in annotations
        )
        write_file(notepath, "# Annotations\n\n" + body, "w")

    def _append_to_note(self, notepath, annotations):
        """Append new annotations to the end of a note.

        Looks through note to determine any new annotations which should be
        added and adds them to the end of the note file. Nothing is written
        if every annotation is already contained in the note.
        """
        existing = read_text_file(notepath)
        # Format each annotation once and keep those not yet in the note.
        formatted = (a.formatted(self.formatting) for a in annotations)
        new_entries = [entry for entry in formatted if entry not in existing]
        if not new_entries:
            return
        output = "".join(f"{entry}\n\n" for entry in new_entries)
        write_file(notepath, output, "a")
2022-12-22 16:43:06 +00:00
@DocAddEvent.listen()
def modify_event(event):
    """Extract annotations into a note whenever a document is added.

    Does nothing unless the extract plugin is loaded and its `on_import`
    setting is enabled.

    Args:
        event: The DocAddEvent; its `citekey` names the affected entry.
    """
    if not ExtractPlugin.is_loaded():
        return
    plg = ExtractPlugin.get_instance()
    if not plg.on_import:
        return
    # extract() works on Paper objects and returns a dict keyed by citekey,
    # so resolve the citekey first and check the dict's values afterwards.
    paper = plg.repository.pull_paper(event.citekey)
    all_annotations = plg.extract([paper])
    if any(all_annotations.values()):
        plg._to_notes(all_annotations, plg.note_extension)