pubs-extract/extract/extract.py

223 lines
7.9 KiB
Python
Raw Normal View History

2022-12-22 16:43:06 +00:00
import os
import argparse
import fitz
from pubs.plugins import PapersPlugin
2022-12-22 19:28:27 +00:00
from pubs.events import DocAddEvent, NoteEvent
2022-12-22 16:43:06 +00:00
from pubs import repo
from pubs.utils import resolve_citekey_list
2022-12-22 19:52:10 +00:00
from pubs.content import check_file, read_text_file, write_file
2022-12-22 16:43:06 +00:00
class ExtractPlugin(PapersPlugin):
    """Extract annotations from any pdf document.

    The extract plugin allows manual or automatic extraction of all annotations
    contained in the pdf documents belonging to entries of the pubs library.

    It can write those changes to stdout or directly create and update notes
    for the pubs entries.

    It adds a `pubs extract` subcommand through which it is invoked, but can
    optionally run whenever a new document is imported for a pubs entry.
    """

    name = "extract"
    description = "Extract annotations from pubs documents"

    def __init__(self, conf, ui):
        """Set up the plugin from the pubs configuration.

        Reads `main.note_extension` and `main.pubsdir` from `conf`, opens the
        repository described by `conf`, and reads the optional
        `plugins.extract.onimport` switch (defaults to False).
        """
        self.ui = ui
        self.note_extension = conf["main"]["note_extension"]
        self.repository = repo.Repository(conf)
        self.pubsdir = os.path.expanduser(conf["main"]["pubsdir"])
        self.broker = self.repository.databroker

        # TODO implement custom annotation formatting, akin to main config citekey format
        # e.g. `> [{page}] {annotation}`
        # or `:: {annotation} :: {page} ::`
        # and so on
        self.onimport = conf["plugins"].get("extract", {}).get("onimport", False)

    def update_parser(self, subparsers, conf):
        """Allow the usage of the pubs extract subcommand"""
        # TODO option for ignoring missing documents or erroring.
        extract_parser = subparsers.add_parser(self.name, help=self.description)
        extract_parser.add_argument(
            "citekeys",
            nargs=argparse.REMAINDER,
            help="citekey(s) of the documents to extract from",
        )
        extract_parser.add_argument(
            "-w",
            "--write",
            help="Write to individual notes instead of standard out. Appends to existing notes.",
            action="store_true",
            # Plain boolean flag, same as --edit below.
            default=False,
        )
        extract_parser.add_argument(
            "-e",
            "--edit",
            help="Open each note in editor for manual editing after extracting annotations to it.",
            action="store_true",
            default=False,
        )
        extract_parser.set_defaults(func=self.command)

    def command(self, conf, args):
        """Run the annotation extraction command.

        Resolves the citekeys given on the command line, extracts their
        annotations and either writes them to notes (`--write`) or prints
        them. The repository is closed on every exit path, including the
        early return for an empty citekey list and any raised exception.
        """
        try:
            citekeys = resolve_citekey_list(
                self.repository, conf, args.citekeys, ui=self.ui, exit_on_fail=True
            )
            if not citekeys:
                return
            all_annotations = self.extract(citekeys)
            if args.write:
                self._to_notes(all_annotations, self.note_extension, args.edit)
            else:
                self._to_stdout(all_annotations)
        finally:
            self.repository.close()

    def extract(self, citekeys):
        """Extracts annotations from citekeys.

        Returns a list of `(paper, annotations)` tuples for the papers that
        are described by the citekeys passed in. Papers without a document
        get an empty annotation list; papers whose document cannot be parsed
        are reported through the ui and left out of the result.
        """
        papers_annotated = []
        for paper in self._gather_papers(citekeys):
            file = self._get_file(paper)
            if not file:
                # _get_file already warned; do not hand a missing path to fitz.
                papers_annotated.append((paper, []))
                continue
            try:
                annotations = self._get_annotations(file)
            except fitz.FileDataError as e:
                self.ui.error(f"Document {file} is broken: {e}")
            else:
                papers_annotated.append((paper, annotations))
        return papers_annotated

    def _gather_papers(self, citekeys):
        """Get all papers for citekeys.

        Returns all Paper objects described by the citekeys
        passed in, in the same order.
        """
        return [self.repository.pull_paper(key) for key in citekeys]

    def _get_file(self, paper):
        """Get path of document belonging to paper.

        Returns the real path to the document which belongs
        to the paper passed in. Emits a warning if no
        document belongs to paper; the (falsy) path is still returned.
        """
        path = self.broker.real_docpath(paper.docpath)
        if not path:
            self.ui.warning(f"{paper.citekey} has no valid document.")
        return path

    def _get_annotations(self, filename):
        """Extract annotations from a file.

        Returns a list of strings of the form `[<page>] <content>`, one per
        annotation in the file for which some content could be retrieved.
        No filtering by annotation type happens here.
        """
        annotations = []
        with fitz.Document(filename) as doc:
            for page in doc:
                for annot in page.annots():
                    content = self._retrieve_annotation_content(page, annot)
                    if content:
                        # page.number is shifted by one for display.
                        annotations.append(f"[{(page.number or 0) + 1}] {content}")
        return annotations

    def _retrieve_annotation_content(self, page, annotation):
        """Build the text for a single annotation.

        Combines the annotation's own `content` entry with the page text
        found inside the annotation rectangle:

        - if the page text is contained in the content, only the content
          is returned;
        - otherwise, if there is content, the page text is returned with the
          content appended on a `Note:` line;
        - otherwise only the page text is returned (possibly empty).
        """
        content = annotation.info["content"].replace("\n", " ")
        written = page.get_textbox(annotation.rect).replace("\n", " ")
        if written in content:
            return content
        if content:
            return f"{written}\nNote: {content}"
        return written

    def _to_stdout(self, annotated_papers):
        """Write annotations to stdout.

        Simply outputs the gathered annotations over stdout
        ready to be passed on through pipelines etc. Papers
        without annotations are skipped.
        """
        chunks = []
        for paper, annotations in annotated_papers:
            if not annotations:
                continue
            chunks.append(f"{paper.citekey}\n")
            chunks.extend(f'> "{annot}"\n' for annot in annotations)
            chunks.append("\n")
        print("".join(chunks))

    def _to_notes(self, annotated_papers, note_extension="txt", edit=False):
        """Write annotations into pubs notes.

        Permanently writes the given annotations into notes
        in the pubs notes directory. Creates new notes for
        citekeys missing a note or appends to existing.
        If `edit` is set, each touched note is opened in the editor
        afterwards. A NoteEvent is sent for every paper with annotations.
        """
        for paper, annotations in annotated_papers:
            if not annotations:
                continue
            notepath = self.broker.real_notepath(paper.citekey, note_extension)
            if check_file(notepath, fail=False):
                self._append_to_note(notepath, annotations)
            else:
                self._write_new_note(notepath, annotations)
            self.ui.info(f"Wrote annotations to {paper.citekey} note {notepath}.")

            if edit:
                self.ui.edit_file(notepath, temporary=False)
            NoteEvent(paper.citekey).send()

    def _write_new_note(self, notepath, annotations):
        """Create a new note containing the annotations.

        Will create a new note in the notes folder of pubs
        and fill it with the annotations extracted from pdf.
        """
        quoted = "".join(f"> {annotation}\n\n" for annotation in annotations)
        write_file(notepath, "# Annotations\n\n" + quoted, "w")

    def _append_to_note(self, notepath, annotations):
        """Append new annotations to the end of a note.

        Looks through note to determine any new annotations which should be
        added and adds them to the end of the note file. An annotation counts
        as already present when its text occurs anywhere in the note.
        """
        existing = read_text_file(notepath)
        # keep only annotations not already found in the note
        new_annotations = [x for x in annotations if x not in existing]
        if not new_annotations:
            return

        output = "".join(f"> {annotation}\n\n" for annotation in new_annotations)
        if existing and not existing.endswith("\n"):
            # Keep the first new quote from being glued onto the last line
            # of a note that lacks a trailing newline.
            output = "\n\n" + output
        write_file(notepath, output, "a")
2022-12-22 16:43:06 +00:00
@DocAddEvent.listen()
def modify_event(event):
    """Extract annotations into a note when a document is added.

    Runs on every DocAddEvent. Does nothing unless the extract plugin is
    loaded and its `onimport` option is enabled. Annotations found in the
    document of `event.citekey` are written to that entry's note.
    """
    if not ExtractPlugin.is_loaded():
        return
    plg = ExtractPlugin.get_instance()
    if not plg.onimport:
        return

    all_annotations = plg.extract([event.citekey])
    # extract() leaves out documents it could not parse, so the list may be
    # empty; guard before indexing into it.
    if all_annotations and all_annotations[0][1]:
        plg._to_notes(all_annotations, plg.note_extension)
        plg.ui.info(f"Imported {event.citekey} annotations.")