1
0
Fork 0
mirror of https://github.com/marty-oehme/scripts.git synced 2025-12-10 22:12:45 +00:00

Version 0.2

This commit is contained in:
Maboroshy 2017-09-13 22:10:38 +04:00 committed by GitHub
parent 031666520f
commit 8a2a34c114
5 changed files with 249 additions and 129 deletions

View file

@ -12,11 +12,39 @@ import md_link
import md_convert
import safe_path
try:
import watchdog.events
import watchdog.observers
except ImportError:
pass
def text_to_md(file_attrs):
File_attrs = collections.namedtuple('File_attrs', 'file_path folder_dir_path output_dir_path output_file')
"""
A named tuple which functions use to pass input data - data of files to be processed
:param file_path: full absolute path to the file to process
:param folder_dir_path: full absolute path to directory where 'media' and 'attachment' directories are
:param output_dir_path: full absolute path to directory where resulting text file will be stored
:param output_file: empty for new standalone text file with mtime in the name,
'*no mtime*' for or new standalone text file without mtime in the name
or full absolute path to the text file which will be appended with a new entry
"""
Note_attrs = collections.namedtuple('Note_attrs', 'input_file_path output_file_path text mtime title')
'''A named tuple which functions use to pass output data - data of notes to be written.
:param input_file_path: full absolute path to the file which was processed to this tuple
:param output_file_path: full absolute path to the output text file which should be written
:param text: content of the text file which should be written
:param mtime: modification time of input file as markdown headline to optionally prepend a text
:param title: title of a input file as markdown headline to optionally prepend a text'''
def text_to_md(file_attrs, topic_marker):
"""
This will process specified text file getting its topics and replacing urls with favicons and titles where possible
:param file_attrs: File_attrs named tuple
:param topic_marker: symbol(s) which start the 'topic' word, if such word present in text, it will go to 'topic.md'
:return: list of Note_attrs named tuple
"""
filename = os.path.splitext(os.path.basename(file_attrs.file_path))[0]
@ -28,8 +56,8 @@ def text_to_md(file_attrs):
except(UnicodeDecodeError):
return
topics = re.findall(file_attrs.topic_marker + '(\w*)', text)
text = re.sub(file_attrs.topic_marker + '\w*[ ]?', '', text).strip()
topics = re.findall(topic_marker + '(\w*)', text)
text = re.sub(topic_marker + '\w*[ ]?', '', text).strip()
if re.match('^http[s]?://[^\s]*$', text):
is_bookmark = True
@ -70,6 +98,7 @@ def text_to_md(file_attrs):
title=headline_title))
return output
def html_to_md(file_attrs, pandoc_bin='pandoc', pandoc_ver=''):
"""
This will move specified convert specified html file to markdown and move all in-line images to sub-folder at media directory
@ -89,6 +118,7 @@ def html_to_md(file_attrs, pandoc_bin='pandoc', pandoc_ver=''):
mtime='**{}** \n'.format(time.strftime('%x %a %X', mtime)),
title='')
def file_to_md(file_attrs, media_dir_name):
"""
This will move specified file to media_dir_name and put note with a reference to that file instead
@ -121,45 +151,100 @@ def file_to_md(file_attrs, media_dir_name):
title='# {}\n'.format(file.title))
if __name__ == '__main__':
File_attrs = collections.namedtuple('File_attrs', 'file_path folder_dir_path output_dir_path topic_marker output_file')
def make_flat_list(mixed_list, target_item_type=tuple):
"""
A named tuple which functions use to pass input data - data of files to be processed
:param file_path: full absolute path to the file to process
:param folder_dir_path: full absolute path to directory where 'media' and 'attachment' directories are
:param output_dir_path: full absolute path to directory where resulting text file will be stored
:param topic_marker: symbol(s) which start the 'topic' word (for text files)
:param output_file: empty for new standalone text file with mtime in the name,
'*no mtime*' for or new standalone text file without mtime in the name
or full absolute path to the text file which will be appended with a new entry
Make a list that has lists and 'target_item_type' as items flat, not recursive.
:param mixed_list: list to make flat
:param target_item_type: type of items in the flat list
:return: flat list of 'target_item_type'
"""
flat_list = []
for object in mixed_list:
if type(object) == list:
for item in object:
if type(item) == target_item_type:
flat_list.append(item)
elif type(object) == target_item_type:
flat_list.append(object)
return flat_list
Note_attrs = collections.namedtuple('Note_attrs', 'input_file_path output_file_path text mtime title')
'''A named tuple which functions use to pass output data - data of notes to be written.
:param input_file_path: full absolute path to the file which was processed to this tuple
:param output_file_path: full absolute path to the output text file which should be written
:param text: content of the text file which should be written
:param mtime: modification time of input file as markdown headline to optionally prepend a text
:param title: title of a input file as markdown headline to optionally prepend a text'''
def process_by_ext(file_attrs):
"""
This will run different functions to process specified File_attrs tuple based on file extension
:param file_attrs: File_attrs named tuple
:return: Note_attrs named tuple
"""
# print(file_attrs.file_path)
if file_attrs.file_path.endswith('.txt') or not os.path.splitext(file_attrs.file_path)[1]:
return text_to_md(file_attrs)
elif args.pandoc_bin and args.pandoc_ver and file_attrs.file_path.endswith(('.htm', '.html')):
return html_to_md(file_attrs, args.pandoc_bin, args.pandoc_ver)
elif file_attrs.file_path.endswith(('.jpg', '.png', '.gif')):
return file_to_md(file_attrs, 'media')
def process_by_path(file_path):
"""
Checks if the file is valid for processing and returns File_attrs tuple depending on its path
:param file_path: Absolute file path
:return: File_attrs named tuple
"""
if file_path.endswith(('.md', 'notes.sqlite')) \
or file_path.startswith((folder_dir + os.sep + 'media', folder_dir + os.sep + 'attachments')) \
or os.sep + '.' in file_path[len(folder_dir):] \
or '_files' + os.sep in file_path[len(folder_dir):]:
return
if file_path[:len(inbox_dir)] == inbox_dir:
if os.path.dirname(file_path) == inbox_dir:
return File_attrs(file_path=file_path, folder_dir_path=folder_dir,
output_dir_path=inbox_dir, output_file='')
else:
return file_to_md(file_attrs, 'attachments')
return File_attrs(file_path=file_path, folder_dir_path=folder_dir,
output_dir_path=inbox_dir,
output_file=os.path.dirname(file_path)[len(inbox_dir)+1:].replace(os.sep, ' - ') + '.md')
else:
return File_attrs(file_path=file_path, folder_dir_path=folder_dir,
output_dir_path=os.path.dirname(file_path), output_file='*no mtime*')
def process_by_ext(file_attrs):
"""
This will run different functions to process specified File_attrs tuple based on file extension
:param file_attrs: File_attrs named tuple
:return: Note_attrs named tuple
"""
if file_attrs.file_path.endswith('.txt') or not os.path.splitext(file_attrs.file_path)[1]:
return text_to_md(file_attrs, args.topic_marker)
elif args.pandoc_bin and args.pandoc_ver and file_attrs.file_path.endswith(('.htm', '.html')):
return html_to_md(file_attrs, args.pandoc_bin, args.pandoc_ver)
elif file_attrs.file_path.endswith(('.jpg', '.png', '.gif')):
return file_to_md(file_attrs, 'media')
else:
return file_to_md(file_attrs, 'attachments')
def write_note_and_delete(note_attrs): # TODO Test
"""
Create or append existing note files based on Note_attrs tuples data, then delete the source file
:param note_attrs: Note_attrs named tuple
"""
if os.path.isfile(note_attrs.output_file_path):
if os.path.dirname(note_attrs.output_file_path) == inbox_dir:
note_file_path = note_attrs.output_file_path
with open(note_file_path, 'r') as source:
content = note_attrs.mtime + note_attrs.text + '\n\n' + source.read()
else:
i = 1
while os.path.isfile(os.path.splitext(note_attrs.output_file_path)[0] + '_' + str(i) + '.md'):
i += 1
note_file_path = os.path.splitext(note_attrs.output_file_path)[0] + '_' + str(i) + '.md'
content = note_attrs.mtime + note_attrs.text
else:
note_file_path = note_attrs.output_file_path
if note_attrs.title:
content = note_attrs.title + note_attrs.text
else:
content = note_attrs.mtime + note_attrs.text
with open(note_file_path, 'w') as output:
output.write(content)
if os.path.isfile(note_file_path):
try:
os.remove(note_attrs.input_file_path)
except OSError:
pass
if __name__ == '__main__':
arg_parser = argparse.ArgumentParser(description='A script to turn everything in the inbox directory to markdown notes.')
arg_parser.add_argument('-i', '--inbox', action='store', dest='inbox_dir', required=True,
help="Full absolute path to the inbox directory to organize")
@ -173,77 +258,88 @@ if __name__ == '__main__':
help="Command/path to run pandoc")
arg_parser.add_argument('-pv', '--pandoc-ver', action='store', dest='pandoc_ver', required=False,
help="Installed pandoc version")
arg_parser.add_argument('-w', '--watch', action='store_true', dest='watch_fs', required=False,
help="Watch and process new files as they appear after initial scan")
args = arg_parser.parse_args()
inbox_dir = args.inbox_dir
folder_dir = args.folder_dir
topic_marker = args.topic_marker
os.makedirs(inbox_dir, exist_ok=True)
os.makedirs(folder_dir + os.sep + 'media', exist_ok=True)
os.makedirs(folder_dir + os.sep + 'attachments', exist_ok=True)
# Prepare a list of File_attrs tuples for process_by_ext function, based on file location, older files first
file_list = []
if args.scan_folder:
for subfolder, dirs, files in os.walk(folder_dir):
for file_path in sorted([subfolder + os.sep + file for file in files], key=os.path.getmtime):
if os.path.isfile(file_path) \
and not file_path.endswith(('.md', 'notes.sqlite')) \
and not file_path.startswith((inbox_dir, folder_dir + os.sep + 'media', folder_dir + os.sep + 'attachments')) \
and os.sep + '.' not in file_path.replace(folder_dir, '') \
and '_files' + os.sep not in file_path.replace(folder_dir, ''):
file_list.append([File_attrs(file_path=file_path, folder_dir_path=folder_dir, output_dir_path=os.path.dirname(file_path),
topic_marker=topic_marker, output_file='*no mtime*')])
scan_path = folder_dir
else:
scan_path = inbox_dir
for file_path in sorted([inbox_dir + os.sep + path for path in os.listdir(inbox_dir)], key=os.path.getmtime):
if os.path.isdir(file_path) \
and not os.path.basename(file_path).startswith('.') \
and not file_path.endswith('_files'):
for sub_file in sorted([file_path + os.sep + path for path in os.listdir(file_path)], key=os.path.getmtime):
if not sub_file.endswith('.md') \
and not os.path.basename(sub_file).startswith('.'):
file_list.append([File_attrs(file_path=sub_file, folder_dir_path=folder_dir, output_dir_path=inbox_dir,
topic_marker=topic_marker, output_file=os.path.basename(file_path) + '.md')])
else:
if os.path.isfile(file_path) \
and not file_path.endswith('.md') \
and not os.path.basename(file_path).startswith('.'):
file_list.append([File_attrs(file_path=file_path, folder_dir_path=folder_dir, output_dir_path=inbox_dir,
topic_marker=topic_marker, output_file='')])
file_list = []
for dir, subdirs, files in os.walk(scan_path):
for file_path in sorted([dir + os.sep + file for file in files], key=os.path.getmtime):
file_attrs = process_by_path(file_path)
if file_attrs:
file_list.append([file_attrs])
# Run process_by_ext for each File_attrs tuple putting resulting Note_attrs tuples to write_list
write_list = multiprocessing.dummy.Pool(100).starmap(process_by_ext, file_list)
# Due to text_to_md outputs list of Note_attrs tuples, this should turn write_list to a flat list
flat_write_list = []
for object in write_list:
if type(object) == list:
for item in object:
if type(item) == Note_attrs:
flat_write_list.append(item)
elif type(object) == Note_attrs:
flat_write_list.append(object)
flat_write_list = make_flat_list(write_list, Note_attrs)
# Create or append existing text files based on Note_attrs tuples data
for note_attrs in flat_write_list:
write_note_and_delete(note_attrs)
if args.watch_fs:
try:
with open(note_attrs.output_file_path, 'r') as source:
content = note_attrs.mtime + note_attrs.text + '\n\n' + source.read()
except OSError:
if note_attrs.title:
content = note_attrs.title + note_attrs.text
else:
content = note_attrs.mtime + note_attrs.text
import watchdog.events
import watchdog.observers
except ImportError:
print("Can't find Watchdog module. Watching for changes won't work.")
exit(1)
with open(note_attrs.output_file_path, 'w') as output:
output.write(content)
if os.path.isfile(note_attrs.output_file_path):
try:
os.remove(note_attrs.input_file_path)
except OSError:
pass
class FsEventHandler(watchdog.events.FileSystemEventHandler):
def on_any_event(self, event):
if event.is_directory:
return
elif event.event_type == 'created':
file_path = event.src_path
elif event.event_type == 'moved':
file_path = event.dest_path
else:
return
if platform.system() == 'Linux':
os.system('notify-send "-a" "Inbox script" "Your inbox is organized"') # TODO maybe change to gi.repository: Notify
file_attrs = process_by_path(file_path)
if file_attrs:
# Wait for all the web page resources saved/synced
if file_path.endswith(('.htm', '.html')): time.sleep(2)
obj_to_write = process_by_ext(file_attrs)
else:
return
if type(obj_to_write) == list:
for note_attrs in obj_to_write:
write_note_and_delete(note_attrs)
else:
write_note_and_delete(obj_to_write)
event_handler = FsEventHandler()
observer = watchdog.observers.Observer()
observer.schedule(event_handler, scan_path, recursive=True)
observer.start()
try:
while True:
time.sleep(5)
except:
observer.stop()
observer.join()
# if platform.system() == 'Linux':
# os.system('notify-send "-a" "Inbox script" "Your inbox is organized"') # TODO maybe change to gi.repository: Notify