diff --git a/crud.py b/crud.py index b542278..1e5ac73 100644 --- a/crud.py +++ b/crud.py @@ -1,5 +1,293 @@ +import json +from datetime import datetime, timezone + from lnbits.db import Database +from lnbits.helpers import urlsafe_short_hash + +from .models import ( + CreateTask, + CreateTaskCompletion, + Task, + TaskCompletion, + TasksSettings, +) db = Database("ext_tasks") -# CRUD functions are filled in by the next commit. + +def _serialize_task_jsonb(task_dict: dict) -> dict: + """JSON-encode the columns we store as TEXT (participants, categories, + recurrence) so db.insert/update sees strings, not Python objects.""" + out = dict(task_dict) + if isinstance(out.get("participants"), list): + out["participants"] = json.dumps( + [p if isinstance(p, dict) else p.dict() for p in out["participants"]] + ) + if isinstance(out.get("categories"), list): + out["categories"] = json.dumps(out["categories"]) + rec = out.get("recurrence") + if rec is not None and not isinstance(rec, str): + out["recurrence"] = json.dumps(rec if isinstance(rec, dict) else rec.dict()) + return out + + +def _parse_task_row(row) -> dict: + """Decode TEXT-JSON columns back into Python structures so Pydantic can + rebuild the Task without each model needing a string-parser validator on + every read path.""" + data = dict(row) + if isinstance(data.get("participants"), str): + data["participants"] = json.loads(data["participants"]) if data["participants"] else [] + if isinstance(data.get("categories"), str): + data["categories"] = json.loads(data["categories"]) if data["categories"] else [] + rec = data.get("recurrence") + if isinstance(rec, str): + data["recurrence"] = json.loads(rec) if rec else None + return data + + +################################ Tasks ################################ + + +async def create_task(pubkey: str, data: CreateTask) -> Task: + """Insert a task. The d_tag is generated locally; subsequent edits keep + the same d_tag so the published kind 31922 stays addressable.""" + task_id = urlsafe_short_hash() + d_tag = task_id + task = Task( + id=task_id, + wallet=data.wallet or "", + pubkey=pubkey, + d_tag=d_tag, + title=data.title, + start_date=data.start_date, + end_date=data.end_date, + description=data.description, + location=data.location, + status=data.status, + event_type=data.event_type, + participants=data.participants, + categories=data.categories, + recurrence=data.recurrence, + time=datetime.now(timezone.utc), + ) + await db.execute( + """ + INSERT INTO tasks.tasks ( + id, wallet, pubkey, d_tag, title, start_date, end_date, + description, location, status, event_type, participants, + categories, recurrence, nostr_event_id, nostr_event_created_at, + time + ) VALUES ( + :id, :wallet, :pubkey, :d_tag, :title, :start_date, :end_date, + :description, :location, :status, :event_type, :participants, + :categories, :recurrence, :nostr_event_id, :nostr_event_created_at, + :time + ) + """, + _serialize_task_jsonb(task.dict()), + ) + return task + + +async def update_task(task: Task) -> Task: + await db.execute( + """ + UPDATE tasks.tasks SET + wallet = :wallet, + title = :title, + start_date = :start_date, + end_date = :end_date, + description = :description, + location = :location, + status = :status, + event_type = :event_type, + participants = :participants, + categories = :categories, + recurrence = :recurrence, + nostr_event_id = :nostr_event_id, + nostr_event_created_at = :nostr_event_created_at + WHERE id = :id + """, + _serialize_task_jsonb(task.dict()), + ) + return task + + +async def get_task(task_id: str) -> Task | None: + row: dict | None = await db.fetchone( + "SELECT * FROM tasks.tasks WHERE id = :id", + {"id": task_id}, + ) + return Task(**_parse_task_row(row)) if row else None + + +async def get_task_by_address(pubkey: str, d_tag: str) -> Task | None: + row: dict | None = await db.fetchone( + "SELECT * FROM tasks.tasks WHERE pubkey = :pubkey AND d_tag = :d_tag", + {"pubkey": pubkey, "d_tag": d_tag}, + ) + return Task(**_parse_task_row(row)) if row else None + + +async def get_tasks(wallet_ids: str | list[str]) -> list[Task]: + if isinstance(wallet_ids, str): + wallet_ids = [wallet_ids] + q = ",".join([f"'{wid}'" for wid in wallet_ids]) + rows: list[dict] = await db.fetchall( + f"SELECT * FROM tasks.tasks WHERE wallet IN ({q}) ORDER BY time DESC" + ) + return [Task(**_parse_task_row(row)) for row in rows] + + +async def get_all_tasks() -> list[Task]: + """Admin-only. All tasks regardless of wallet, including synced-from-Nostr + rows (wallet=''). Order by recency.""" + rows: list[dict] = await db.fetchall( + "SELECT * FROM tasks.tasks ORDER BY time DESC" + ) + return [Task(**_parse_task_row(row)) for row in rows] + + +async def get_public_tasks() -> list[Task]: + """Public listing: all tasks. Gated upstream by the public_listing + settings flag.""" + rows: list[dict] = await db.fetchall( + "SELECT * FROM tasks.tasks ORDER BY start_date ASC" + ) + return [Task(**_parse_task_row(row)) for row in rows] + + +async def get_tasks_by_pubkey(pubkey: str) -> list[Task]: + rows: list[dict] = await db.fetchall( + "SELECT * FROM tasks.tasks WHERE pubkey = :pubkey ORDER BY time DESC", + {"pubkey": pubkey}, + ) + return [Task(**_parse_task_row(row)) for row in rows] + + +async def delete_task(task_id: str) -> None: + await db.execute("DELETE FROM tasks.tasks WHERE id = :id", {"id": task_id}) + + +async def delete_task_completions(task_address: str) -> None: + await db.execute( + "DELETE FROM tasks.completions WHERE task_address = :addr", + {"addr": task_address}, + ) + + +############################# Completions ############################# + + +async def create_completion( + pubkey: str, data: CreateTaskCompletion, *, nostr_event_id: str | None = None, + nostr_created_at: int | None = None, +) -> TaskCompletion: + """Insert (or replace on conflict) a completion for (task_address, pubkey, + occurrence). The newest event_id always wins — we let the caller pass an + already-published Nostr id, otherwise generate a local hash for an unsent + completion that the Nostr publisher will replace on the next round-trip.""" + cid = nostr_event_id or urlsafe_short_hash() + completion = TaskCompletion( + id=cid, + task_address=data.task_address, + pubkey=pubkey, + occurrence=data.occurrence, + task_status=data.task_status, + completed_at=data.completed_at, + notes=data.notes, + nostr_created_at=nostr_created_at or int(datetime.now(timezone.utc).timestamp()), + time=datetime.now(timezone.utc), + ) + + # Replace any existing claim by this pubkey on this (address, occurrence). + existing = await get_completion( + data.task_address, pubkey, data.occurrence + ) + if existing: + await db.execute( + "DELETE FROM tasks.completions WHERE id = :id", {"id": existing.id} + ) + + await db.execute( + """ + INSERT INTO tasks.completions ( + id, task_address, pubkey, occurrence, task_status, + completed_at, notes, nostr_created_at, time + ) VALUES ( + :id, :task_address, :pubkey, :occurrence, :task_status, + :completed_at, :notes, :nostr_created_at, :time + ) + """, + completion.dict(), + ) + return completion + + +async def get_completion( + task_address: str, pubkey: str, occurrence: str | None +) -> TaskCompletion | None: + if occurrence is None: + row: dict | None = await db.fetchone( + """ + SELECT * FROM tasks.completions + WHERE task_address = :addr AND pubkey = :pubkey + AND occurrence IS NULL + """, + {"addr": task_address, "pubkey": pubkey}, + ) + else: + row = await db.fetchone( + """ + SELECT * FROM tasks.completions + WHERE task_address = :addr AND pubkey = :pubkey + AND occurrence = :occurrence + """, + {"addr": task_address, "pubkey": pubkey, "occurrence": occurrence}, + ) + return TaskCompletion(**dict(row)) if row else None + + +async def get_completions_for_task(task_address: str) -> list[TaskCompletion]: + rows: list[dict] = await db.fetchall( + "SELECT * FROM tasks.completions WHERE task_address = :addr " + "ORDER BY nostr_created_at DESC", + {"addr": task_address}, + ) + return [TaskCompletion(**dict(row)) for row in rows] + + +async def get_completion_by_id(completion_id: str) -> TaskCompletion | None: + row: dict | None = await db.fetchone( + "SELECT * FROM tasks.completions WHERE id = :id", + {"id": completion_id}, + ) + return TaskCompletion(**dict(row)) if row else None + + +async def delete_completion(completion_id: str) -> None: + await db.execute( + "DELETE FROM tasks.completions WHERE id = :id", + {"id": completion_id}, + ) + + +############################## Settings ############################## + + +async def get_settings() -> TasksSettings: + row: dict | None = await db.fetchone( + "SELECT * FROM tasks.settings WHERE id = 1" + ) + if row: + return TasksSettings(**dict(row)) + return TasksSettings() + + +async def update_settings(settings: TasksSettings) -> TasksSettings: + await db.execute( + "UPDATE tasks.settings SET public_listing = :public_listing WHERE id = 1", + {"public_listing": settings.public_listing}, + ) + return settings diff --git a/migrations.py b/migrations.py index be88854..93c27a9 100644 --- a/migrations.py +++ b/migrations.py @@ -1 +1,76 @@ -# Migrations are filled in by the next commit. +async def m001_initial(db): + """ + Tasks table (NIP-52 kind 31922 cache) and completions table (kind 31925 + cache). The (pubkey, d_tag) pair is the parameterized-replaceable key. + """ + await db.execute( + f""" + CREATE TABLE tasks.tasks ( + id TEXT PRIMARY KEY, + wallet TEXT NOT NULL, + pubkey TEXT NOT NULL, + d_tag TEXT NOT NULL, + title TEXT NOT NULL, + start_date TEXT NOT NULL, + end_date TEXT, + description TEXT NOT NULL DEFAULT '', + location TEXT, + status TEXT NOT NULL DEFAULT 'pending', + event_type TEXT NOT NULL DEFAULT 'task', + participants TEXT, + categories TEXT, + recurrence TEXT, + nostr_event_id TEXT, + nostr_event_created_at INTEGER, + time TIMESTAMP NOT NULL DEFAULT {db.timestamp_now} + ); + """ + ) + await db.execute( + "CREATE UNIQUE INDEX tasks_pubkey_dtag_idx " + "ON tasks.tasks (pubkey, d_tag)" + ) + + await db.execute( + f""" + CREATE TABLE tasks.completions ( + id TEXT PRIMARY KEY, + task_address TEXT NOT NULL, + pubkey TEXT NOT NULL, + occurrence TEXT, + task_status TEXT NOT NULL DEFAULT 'claimed', + completed_at INTEGER, + notes TEXT NOT NULL DEFAULT '', + nostr_created_at INTEGER NOT NULL, + time TIMESTAMP NOT NULL DEFAULT {db.timestamp_now} + ); + """ + ) + # occurrence is NULL for one-shot tasks; the (task_address, pubkey, + # occurrence) tuple identifies "the latest claim by this user for this + # specific occurrence". + await db.execute( + "CREATE INDEX tasks_completions_address_idx " + "ON tasks.completions (task_address)" + ) + await db.execute( + "CREATE INDEX tasks_completions_pubkey_idx " + "ON tasks.completions (pubkey)" + ) + + +async def m002_settings(db): + """Singleton settings row for the admin UI.""" + await db.execute( + """ + CREATE TABLE IF NOT EXISTS tasks.settings ( + id INTEGER PRIMARY KEY DEFAULT 1, + public_listing BOOLEAN NOT NULL DEFAULT FALSE + ) + """ + ) + await db.execute( + "INSERT INTO tasks.settings (id, public_listing) " + "SELECT 1, FALSE WHERE NOT EXISTS " + "(SELECT 1 FROM tasks.settings WHERE id = 1)" + ) diff --git a/models.py b/models.py index 3270f77..1874b34 100644 --- a/models.py +++ b/models.py @@ -1 +1,156 @@ -# Pydantic models are filled in by the next commit. +import json +from datetime import datetime +from typing import Literal + +from pydantic import BaseModel, Field, validator + + +TaskStatus = Literal["claimed", "in-progress", "completed", "blocked", "cancelled"] + + +class Participant(BaseModel): + pubkey: str + type: str | None = None # "required" | "optional" | "organizer" + + +class Recurrence(BaseModel): + frequency: Literal["daily", "weekly"] + day_of_week: str | None = None # for weekly: monday..sunday + end_date: str | None = None # YYYY-MM-DD, optional cutoff + + +class CreateTask(BaseModel): + wallet: str | None = None # filled from caller's wallet if absent + title: str + start_date: str # YYYY-MM-DD or ISO datetime + end_date: str | None = None + description: str = "" + location: str | None = None + status: str = "pending" + event_type: str = "task" # 'task' | 'announcement' + participants: list[Participant] = Field(default_factory=list) + categories: list[str] = Field(default_factory=list) + recurrence: Recurrence | None = None + + @validator("participants", pre=True) + def parse_participants(cls, v): + if isinstance(v, str): + return json.loads(v) if v else [] + return v or [] + + @validator("categories", pre=True) + def parse_categories(cls, v): + if isinstance(v, str): + return json.loads(v) if v else [] + return v or [] + + @validator("recurrence", pre=True) + def parse_recurrence(cls, v): + if isinstance(v, str): + return json.loads(v) if v else None + return v + + +class Task(BaseModel): + id: str + wallet: str + pubkey: str + d_tag: str + title: str + start_date: str + end_date: str | None = None + description: str = "" + location: str | None = None + status: str = "pending" + event_type: str = "task" + participants: list[Participant] = Field(default_factory=list) + categories: list[str] = Field(default_factory=list) + recurrence: Recurrence | None = None + nostr_event_id: str | None = None + nostr_event_created_at: int | None = None + time: datetime + + @validator("participants", pre=True) + def parse_participants(cls, v): + if isinstance(v, str): + return json.loads(v) if v else [] + return v or [] + + @validator("categories", pre=True) + def parse_categories(cls, v): + if isinstance(v, str): + return json.loads(v) if v else [] + return v or [] + + @validator("recurrence", pre=True) + def parse_recurrence(cls, v): + if isinstance(v, str): + return json.loads(v) if v else None + return v + + @property + def address(self) -> str: + """NIP-52 replaceable-event address (31922:pubkey:d-tag).""" + return f"31922:{self.pubkey}:{self.d_tag}" + + +class PublicTask(BaseModel): + """Trimmed task payload for the public/anonymous endpoint.""" + + id: str + pubkey: str + d_tag: str + title: str + start_date: str + end_date: str | None = None + description: str = "" + location: str | None = None + status: str + event_type: str + participants: list[Participant] = Field(default_factory=list) + categories: list[str] = Field(default_factory=list) + recurrence: Recurrence | None = None + + @validator("participants", pre=True) + def parse_participants(cls, v): + if isinstance(v, str): + return json.loads(v) if v else [] + return v or [] + + @validator("categories", pre=True) + def parse_categories(cls, v): + if isinstance(v, str): + return json.loads(v) if v else [] + return v or [] + + @validator("recurrence", pre=True) + def parse_recurrence(cls, v): + if isinstance(v, str): + return json.loads(v) if v else None + return v + + +class CreateTaskCompletion(BaseModel): + task_address: str # "31922:pubkey:d_tag" + task_status: TaskStatus = "claimed" + occurrence: str | None = None # YYYY-MM-DD for recurring tasks + completed_at: int | None = None # unix ts; set when task_status == 'completed' + notes: str = "" + + +class TaskCompletion(BaseModel): + id: str # Nostr event id (or local hash for not-yet-published) + task_address: str + pubkey: str # claimer + occurrence: str | None = None + task_status: TaskStatus = "claimed" + completed_at: int | None = None + notes: str = "" + nostr_created_at: int + time: datetime + + +class TasksSettings(BaseModel): + """Extension-level settings singleton.""" + + public_listing: bool = False # expose /api/v1/tasks/public if enabled