Switch layout to src folder layout
This commit is contained in:
parent
0b8bddb588
commit
97035d8e4c
12 changed files with 196 additions and 17 deletions
9
src/habitmove/__init__.py
Normal file
9
src/habitmove/__init__.py
Normal file
|
|
@ -0,0 +1,9 @@
|
|||
# init.py
|
||||
import sys
|
||||
|
||||
if sys.version_info >= (3, 8):
|
||||
from importlib.metadata import version as metadata_version
|
||||
else:
|
||||
from importlib_metadata import version as metadata_version
|
||||
|
||||
__version__ = str(metadata_version(__name__))
|
||||
42
src/habitmove/cli.py
Executable file
42
src/habitmove/cli.py
Executable file
|
|
@ -0,0 +1,42 @@
|
|||
#!/usr/bin/env python
|
||||
import habitmove.schema as schema
|
||||
import habitmove.habits as habits
|
||||
import habitmove.repetitions as rep
|
||||
import habitmove.nomie as nomie
|
||||
from habitmove.nomiedata import NomieImport
|
||||
|
||||
import click
|
||||
from . import __version__
|
||||
|
||||
|
||||
import sys
|
||||
|
||||
|
||||
def migrate(data: NomieImport):
|
||||
db = schema.migrate("output.db")
|
||||
if not db:
|
||||
raise ConnectionError
|
||||
|
||||
if data.trackers is not None:
|
||||
|
||||
habitlist = habits.migrate(db, data.trackers)
|
||||
|
||||
if data.events is not None:
|
||||
rep.migrate(db, habitlist, data.events)
|
||||
|
||||
db.commit()
|
||||
db.close()
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.version_option(version=__version__)
|
||||
@click.argument("inputfile")
|
||||
def main(inputfile):
|
||||
# TODO test and error gracefully for no input given
|
||||
# file = sys.argv[1]
|
||||
data = nomie.get_data(inputfile)
|
||||
migrate(data)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
62
src/habitmove/habits.py
Normal file
62
src/habitmove/habits.py
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
from habitmove.loopdata import Habit
|
||||
|
||||
|
||||
def migrate(db, trackers):
|
||||
c = db.cursor()
|
||||
habits = trackers_to_habits(trackers)
|
||||
for habit in habits:
|
||||
existing_habit = check_habit_duplicate(c, habit)
|
||||
if not existing_habit:
|
||||
add_to_database(c, habit)
|
||||
else:
|
||||
print(f"Found duplicate Habit: {existing_habit} - skipping.")
|
||||
return habits
|
||||
|
||||
|
||||
# By default set goal to half of max value
|
||||
NOMIE_MAX_TO_TARGET_VALUE_RATIO = 2
|
||||
|
||||
|
||||
def trackers_to_habits(trackers):
|
||||
habits = []
|
||||
for t in trackers:
|
||||
habits.append(
|
||||
Habit(
|
||||
archived=t.hidden,
|
||||
color=11 if t.score != "-1" else 0,
|
||||
description=t.tag,
|
||||
name=f"{t.emoji} {t.label}",
|
||||
unit="" if t.uom == "num" else t.uom,
|
||||
uuid=t.id,
|
||||
)
|
||||
)
|
||||
if t.type == "range" and len(habits) > 0:
|
||||
habits[-1].type = 1
|
||||
# nomie only has concept of max value,
|
||||
# use a percentage of it for Loop range target
|
||||
habits[-1].target_value = int(t.max) // NOMIE_MAX_TO_TARGET_VALUE_RATIO
|
||||
return habits
|
||||
|
||||
|
||||
def check_habit_duplicate(cursor, habit):
|
||||
cursor.execute("select name from Habits where uuid = ?", [habit.uuid])
|
||||
name = cursor.fetchone()
|
||||
if name:
|
||||
return name[0]
|
||||
return False
|
||||
|
||||
|
||||
def add_to_database(cursor, habit):
|
||||
"""Takes a habit in the form of a dictionary and inserts it into the Habits table.
|
||||
Parameters:
|
||||
cursor (db.cursor): SQL executing cursor
|
||||
habit (Habit): A Loop habit to be added to the database.
|
||||
"""
|
||||
habit_data = habit.__dict__
|
||||
placeholder = ", ".join("?" * len(habit_data))
|
||||
columns = ", ".join(habit_data.keys())
|
||||
sql = "insert into `{table}` ({columns}) values ({values});".format(
|
||||
table="Habits", columns=columns, values=placeholder
|
||||
)
|
||||
values = list(habit_data.values())
|
||||
cursor.execute(sql, values)
|
||||
35
src/habitmove/loopdata.py
Normal file
35
src/habitmove/loopdata.py
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
from typing import Optional
|
||||
from dataclasses import dataclass
|
||||
from uuid import uuid4
|
||||
|
||||
# A loop Habit representation.
|
||||
# Tracks anything whose value can be encapsulated in a numerical value.
|
||||
@dataclass
|
||||
class Habit:
|
||||
name: str
|
||||
type: int = 0 # 1 is range counter
|
||||
archived: int = 0
|
||||
color: int = 0
|
||||
highlight: int = 0
|
||||
freq_den: int = 1
|
||||
freq_num: int = 1
|
||||
target_value: int = 0
|
||||
description: str = ""
|
||||
question: str = ""
|
||||
unit: str = ""
|
||||
position: int = 0
|
||||
uuid: str = ""
|
||||
|
||||
# TODO test post init uuid setting
|
||||
def __post_init__(self):
|
||||
if not self.uuid or self.uuid == "":
|
||||
self.uuid = uuid4().hex
|
||||
|
||||
|
||||
# A Loop repetition representation, containing only the bare minimum
|
||||
# for its successful entry into the Loop Habit Tracker database.
|
||||
@dataclass
|
||||
class Repetition:
|
||||
habit_uuid: str
|
||||
timestamp: int
|
||||
value: Optional[int]
|
||||
108
src/habitmove/nomie.py
Normal file
108
src/habitmove/nomie.py
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import json
|
||||
import re
|
||||
from click import secho, echo
|
||||
from habitmove.nomiedata import Tracker, Event, Activity, NomieImport
|
||||
|
||||
|
||||
def load_file(filename):
|
||||
with open(filename) as f:
|
||||
nomie_data = json.load(f)
|
||||
return nomie_data
|
||||
|
||||
|
||||
# generate a yes/no cli question with a default answer
|
||||
def confirmation_question(question, default_no=True):
|
||||
choices = " [y/N]: " if default_no else " [Y/n]: "
|
||||
default_answer = "n" if default_no else "y"
|
||||
reply = str(input(question + choices)).lower().strip() or default_answer
|
||||
if reply[0] == "y":
|
||||
return True
|
||||
if reply[0] == "n":
|
||||
return False
|
||||
else:
|
||||
return False if default_no else True
|
||||
|
||||
|
||||
# display stats and ask user to confirm if they seem okay
|
||||
def verify_continue(data: NomieImport):
|
||||
trackers = ""
|
||||
for t in data.trackers:
|
||||
trackers += t.label + ", "
|
||||
trackers = trackers[:-2]
|
||||
|
||||
activity_count = 0
|
||||
for e in data.events:
|
||||
activity_count += len(e.activities) if e.activities else 0
|
||||
|
||||
secho(f"Exporting from nomie {data.version}:", fg="green")
|
||||
echo(f"Found trackers: {trackers}")
|
||||
echo(
|
||||
f"Found events: {len(data.events)} entries, containing {activity_count} individual activities."
|
||||
)
|
||||
if not confirmation_question("Do you want to continue?", default_no=False):
|
||||
echo("Aborted.")
|
||||
exit(0)
|
||||
|
||||
|
||||
def get_trackers(raw_trackers):
|
||||
tracker_list = list[Tracker]()
|
||||
for tracker_tuple in raw_trackers.items():
|
||||
tracker_list.append(Tracker(**tracker_tuple[1]))
|
||||
return tracker_list
|
||||
|
||||
|
||||
def get_events(raw_events, tracker_list):
|
||||
events = list[Event]()
|
||||
for event in raw_events:
|
||||
event["id"] = event["_id"]
|
||||
event.pop("_id")
|
||||
event["text"] = event["note"]
|
||||
event.pop("note")
|
||||
|
||||
activities = get_activities_for_event(event["text"], tracker_list)
|
||||
|
||||
events.append(Event(**event, activities=activities))
|
||||
|
||||
return events
|
||||
|
||||
|
||||
def extract_tags_from_text(text, tagmarker="#"):
|
||||
"""Return lists of tuples of all event tags found in text.
|
||||
Parameters:
|
||||
text (str): The text to search through.
|
||||
tagmarker (str): Optional character marking beginning of tag, defaults to '#'.
|
||||
Returns:
|
||||
tags (list): List of tuples in the form [('tag', '3'), ('anothertag', '')].
|
||||
"""
|
||||
string_tags = re.findall(rf"{tagmarker}(\w+)(?:\((\d+)\))?", text)
|
||||
tags_with_int_counters = []
|
||||
for tag in string_tags:
|
||||
tags_with_int_counters.append((tag[0], None if tag[1] == "" else int(tag[1])))
|
||||
return tags_with_int_counters
|
||||
|
||||
|
||||
def get_activities_for_event(event_text, tracker_list):
|
||||
activities = []
|
||||
tag_list = extract_tags_from_text(event_text)
|
||||
for tracker in tracker_list:
|
||||
for tag in tag_list:
|
||||
if tracker.tag in tag[0]:
|
||||
activities.append(Activity(tracker=tracker, value=tag[1]))
|
||||
return activities
|
||||
|
||||
|
||||
# return the data belonging to nomie
|
||||
def get_data(file, interactive=True):
|
||||
raw_data = load_file(file)
|
||||
nomie_version = raw_data["nomie"]["number"]
|
||||
|
||||
tracker_list = get_trackers(raw_data["trackers"])
|
||||
event_list = get_events(raw_data["events"], tracker_list)
|
||||
|
||||
data = NomieImport(nomie_version, tracker_list, event_list)
|
||||
if interactive:
|
||||
verify_continue(data)
|
||||
|
||||
return data
|
||||
58
src/habitmove/nomiedata.py
Normal file
58
src/habitmove/nomiedata.py
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
from typing import Optional, Any
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
# A nomie habit tracker. Tracks anything whose value can be encapsulated in a numerical value.
|
||||
@dataclass(frozen=True)
|
||||
class Tracker:
|
||||
color: str
|
||||
emoji: str
|
||||
hidden: bool
|
||||
id: str
|
||||
ignore_zeros: bool
|
||||
label: str
|
||||
math: str
|
||||
one_tap: bool
|
||||
tag: str
|
||||
type: str
|
||||
uom: str
|
||||
# TODO no idea what include does
|
||||
include: str
|
||||
min: int = 0
|
||||
max: int = 0
|
||||
goal: int = 0
|
||||
default: int = 0
|
||||
# TODO score can be string (if custom) or int (if simple good/bad)
|
||||
score: str = ""
|
||||
score_calc: list[dict[str, Any]] = field(default_factory=lambda: [])
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Activity:
|
||||
tracker: Tracker
|
||||
value: int = 1
|
||||
|
||||
|
||||
# A nomie note. Records any circumstance of 'something happened' through prose.
|
||||
# These are undigested events, whose changed trackers are still encapsulated
|
||||
# in the 'note' field as continuous text.
|
||||
@dataclass(frozen=True)
|
||||
class Event:
|
||||
id: str
|
||||
start: int
|
||||
end: int
|
||||
text: str
|
||||
activities: list[Activity] = field(default_factory=lambda: [])
|
||||
score: int = 0
|
||||
lat: float = 0.0
|
||||
lng: float = 0.0
|
||||
location: str = ""
|
||||
modified: bool = False
|
||||
offset: str = "" # local timezone offset?
|
||||
source: str = "n5" # nomie version
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class NomieImport:
|
||||
version: str
|
||||
trackers: list[Tracker]
|
||||
events: list[Event]
|
||||
107
src/habitmove/repetitions.py
Normal file
107
src/habitmove/repetitions.py
Normal file
|
|
@ -0,0 +1,107 @@
|
|||
import sqlite3
|
||||
from typing import Optional
|
||||
from datetime import datetime
|
||||
from habitmove.loopdata import Habit, Repetition
|
||||
from habitmove.nomiedata import Event
|
||||
|
||||
|
||||
def migrate(db: sqlite3.Connection, habits: list[Habit], events: list[Event]) -> None:
|
||||
"""Move Loop Activities contained in all Events matching Habits passed in into database.
|
||||
:param db: Database to populate.
|
||||
:param habits: List of Habits to find matching repetitions for.
|
||||
:param events: List of events to find repetitions in.
|
||||
"""
|
||||
c = db.cursor()
|
||||
habits_with_sql_id = habit_list_add_ids(c, habits)
|
||||
repetitions = get_all_repetitions(habits_with_sql_id, events)
|
||||
for rep in repetitions:
|
||||
add_to_database(c, habits_with_sql_id, rep)
|
||||
|
||||
|
||||
LOOP_RANGE_VALUE_MULTIPLIER = 1000
|
||||
|
||||
|
||||
def get_all_repetitions(
|
||||
habits: dict[int, Habit], events: list[Event]
|
||||
) -> list[Repetition]:
|
||||
"""Return list of all repetitions found in events passed in.
|
||||
:param habits: Dict of habits with sql_ids that repetitions can be for.
|
||||
:param events: List of events to search through for repetitions.
|
||||
:return repetitions: List of Loop repetitions.
|
||||
"""
|
||||
repetitions = []
|
||||
for event in events:
|
||||
for activity in event.activities:
|
||||
for habit in habits.values():
|
||||
# TODO Fix reaching a layer too far into activity -> tracker
|
||||
if habit.uuid == activity.tracker.id:
|
||||
rep = Repetition(
|
||||
habit_uuid=habit.uuid, timestamp=event.end, value=2
|
||||
)
|
||||
if habit.type == 1 and activity.value:
|
||||
rep.value = activity.value * LOOP_RANGE_VALUE_MULTIPLIER
|
||||
repetitions.append(rep)
|
||||
|
||||
return repetitions
|
||||
|
||||
|
||||
# TODO possibly just get rid of this entirely
|
||||
def habit_list_add_ids(c: sqlite3.Cursor, habitlist: list[Habit]) -> dict[int, Habit]:
|
||||
"""Return the collection of habits with their sqlite id added.
|
||||
:param c: SQL cursor of database to query.
|
||||
:param habitlist: Habits to get sql IDs for.
|
||||
:return habit_id_dict: The habit collection as a dict with the keys
|
||||
consisting of the habit's sqlite database ID.
|
||||
"""
|
||||
with_id = {}
|
||||
for h in habitlist:
|
||||
sql_id = fetch_habit_id(c, h.uuid or "")
|
||||
with_id[sql_id] = h
|
||||
|
||||
return with_id
|
||||
|
||||
|
||||
def fetch_habit_id(cursor: sqlite3.Cursor, uuid: str) -> Optional[int]:
|
||||
"""Return sqlite internal id for habit with uuid.
|
||||
:param c: SQL cursor of database to query.
|
||||
:param uuid: Unique id of habit to query for.
|
||||
:return id: SQLite internal id for habit queried for.
|
||||
"""
|
||||
cursor.execute("select id from Habits where uuid = ?", ([uuid]))
|
||||
id = cursor.fetchone()
|
||||
if id is not None:
|
||||
return id[0]
|
||||
|
||||
|
||||
def add_to_database(
|
||||
cursor: sqlite3.Cursor, habits: dict[int, Habit], repetition: Repetition
|
||||
) -> None:
|
||||
"""Insert the repetition into a sqlite3 table suitable for Loop.
|
||||
:param c: SQL cursor of database to query.
|
||||
:sql_id: Internal sqlite database id of the habit the repetition belongs to.
|
||||
"""
|
||||
for sql_id, habit in habits.items():
|
||||
if repetition.habit_uuid == habit.uuid:
|
||||
try:
|
||||
cursor.execute(
|
||||
"""
|
||||
INSERT INTO
|
||||
Repetitions(id, habit, timestamp, value)
|
||||
VALUES (NULL, ?, ?, ?)
|
||||
""",
|
||||
(sql_id, repetition.timestamp, repetition.value),
|
||||
)
|
||||
except sqlite3.IntegrityError:
|
||||
# TODO better error handling
|
||||
print(
|
||||
f"{sql_id}, {habit.name}: timestamp {datetime.fromtimestamp(repetition.timestamp/1000)} not unique, moving timestamp slightly."
|
||||
)
|
||||
add_to_database(
|
||||
cursor,
|
||||
habits,
|
||||
Repetition(
|
||||
repetition.habit_uuid,
|
||||
repetition.timestamp + 1,
|
||||
repetition.value,
|
||||
),
|
||||
)
|
||||
74
src/habitmove/schema.py
Normal file
74
src/habitmove/schema.py
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
import sqlite3
|
||||
|
||||
|
||||
def create_database(name):
|
||||
"""create a database connection to the SQLite database
|
||||
specified by db_file
|
||||
:param db_file: database file
|
||||
:return: Connection object or None
|
||||
"""
|
||||
conn = None
|
||||
try:
|
||||
conn = sqlite3.connect(name)
|
||||
return conn
|
||||
except sqlite3.Error as e:
|
||||
print(e)
|
||||
|
||||
return conn
|
||||
|
||||
|
||||
def create_tables(db):
|
||||
c = db.cursor()
|
||||
c.execute(
|
||||
""" CREATE TABLE IF NOT EXISTS Habits (
|
||||
id integer PRIMARY KEY AUTOINCREMENT,
|
||||
archived integer,
|
||||
color integer,
|
||||
description text,
|
||||
freq_den integer,
|
||||
freq_num integer,
|
||||
highlight integer,
|
||||
name text,
|
||||
position integer,
|
||||
reminder_hour integer,
|
||||
reminder_min integer,
|
||||
reminder_days integer NOT NULL DEFAULT 127,
|
||||
type integer NOT NULL DEFAULT 0,
|
||||
target_type integer NOT NULL DEFAULT 0,
|
||||
target_value real NOT NULL DEFAULT 0,
|
||||
unit text NOT NULL DEFAULT "",
|
||||
question text,
|
||||
uuid text
|
||||
); """
|
||||
)
|
||||
c.execute(
|
||||
""" CREATE TABLE IF NOT EXISTS Repetitions (
|
||||
id integer PRIMARY KEY AUTOINCREMENT,
|
||||
habit integer NOT NULL REFERENCES Habits(id),
|
||||
timestamp integer NOT NULL,
|
||||
value integer NOT NULL
|
||||
); """
|
||||
)
|
||||
|
||||
|
||||
def create_constraints(db):
|
||||
c = db.cursor()
|
||||
c.execute(
|
||||
""" CREATE UNIQUE INDEX IF NOT EXISTS idx_repetitions_habit_timestamp
|
||||
on Repetitions( habit, timestamp);
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def create_pragma(db):
|
||||
c = db.cursor()
|
||||
c.execute(""" PRAGMA user_version = 24; """)
|
||||
c.execute(""" PRAGMA schema_version = 30; """)
|
||||
|
||||
|
||||
def migrate(name):
|
||||
db = create_database(name)
|
||||
create_tables(db)
|
||||
create_constraints(db)
|
||||
create_pragma(db)
|
||||
return db
|
||||
Loading…
Add table
Add a link
Reference in a new issue