tasks/crud.py
Padreug 6fbb6d4a42 add models, migrations, and CRUD
Schema mirrors the webapp tasks module's ScheduledEvent + EventCompletion
shape:

- tasks.tasks (NIP-52 kind 31922 cache): (pubkey, d_tag) is the
  parameterized-replaceable key. JSON-encoded participants / categories
  / recurrence columns are decoded on read via _parse_task_row so each
  model can keep clean field validators.
- tasks.completions (kind 31925 cache): unique on
  (task_address, pubkey, occurrence). occurrence is NULL for one-shot
  tasks; create_completion deletes any prior claim for the same triple
  so the latest event wins.
- tasks.settings: singleton row with public_listing toggle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 11:36:13 +02:00

293 lines
9.6 KiB
Python

import json
from datetime import datetime, timezone
from lnbits.db import Database
from lnbits.helpers import urlsafe_short_hash
from .models import (
CreateTask,
CreateTaskCompletion,
Task,
TaskCompletion,
TasksSettings,
)
# Module-level database handle, opened under the name "ext_tasks"; every
# query helper in this module issues its SQL through it.
db = Database("ext_tasks")
def _serialize_task_jsonb(task_dict: dict) -> dict:
"""JSON-encode the columns we store as TEXT (participants, categories,
recurrence) so db.insert/update sees strings, not Python objects."""
out = dict(task_dict)
if isinstance(out.get("participants"), list):
out["participants"] = json.dumps(
[p if isinstance(p, dict) else p.dict() for p in out["participants"]]
)
if isinstance(out.get("categories"), list):
out["categories"] = json.dumps(out["categories"])
rec = out.get("recurrence")
if rec is not None and not isinstance(rec, str):
out["recurrence"] = json.dumps(rec if isinstance(rec, dict) else rec.dict())
return out
def _parse_task_row(row) -> dict:
"""Decode TEXT-JSON columns back into Python structures so Pydantic can
rebuild the Task without each model needing a string-parser validator on
every read path."""
data = dict(row)
if isinstance(data.get("participants"), str):
data["participants"] = json.loads(data["participants"]) if data["participants"] else []
if isinstance(data.get("categories"), str):
data["categories"] = json.loads(data["categories"]) if data["categories"] else []
rec = data.get("recurrence")
if isinstance(rec, str):
data["recurrence"] = json.loads(rec) if rec else None
return data
################################ Tasks ################################
async def create_task(pubkey: str, data: CreateTask) -> Task:
    """Build a new Task for ``pubkey``, insert it, and return it.

    A freshly generated short hash is used as both the row ``id`` and the
    ``d_tag``. ``time`` is set to the current UTC time.

    Args:
        pubkey: Value stored in the row's ``pubkey`` column.
        data: Field values for the task; a falsy ``wallet`` is stored as "".

    Returns:
        The Task instance whose fields were written to ``tasks.tasks``.
    """
    new_id = urlsafe_short_hash()
    fields = {
        "id": new_id,
        "wallet": data.wallet or "",
        "pubkey": pubkey,
        "d_tag": new_id,
        "title": data.title,
        "start_date": data.start_date,
        "end_date": data.end_date,
        "description": data.description,
        "location": data.location,
        "status": data.status,
        "event_type": data.event_type,
        "participants": data.participants,
        "categories": data.categories,
        "recurrence": data.recurrence,
        "time": datetime.now(timezone.utc),
    }
    task = Task(**fields)

    # nostr_event_id / nostr_event_created_at are not supplied above, so the
    # INSERT binds whatever task.dict() yields for them -- presumably the
    # Task model's defaults (confirm in models).
    statement = """
INSERT INTO tasks.tasks (
id, wallet, pubkey, d_tag, title, start_date, end_date,
description, location, status, event_type, participants,
categories, recurrence, nostr_event_id, nostr_event_created_at,
time
) VALUES (
:id, :wallet, :pubkey, :d_tag, :title, :start_date, :end_date,
:description, :location, :status, :event_type, :participants,
:categories, :recurrence, :nostr_event_id, :nostr_event_created_at,
:time
)
"""
    bound_values = _serialize_task_jsonb(task.dict())
    await db.execute(statement, bound_values)
    return task
async def update_task(task: Task) -> Task:
    """Write the editable columns of ``task`` back to its row and return it.

    The row is selected by ``task.id``. The statement does not assign
    ``id``, ``pubkey``, ``d_tag`` or ``time``, so those columns keep their
    stored values. Structured fields are JSON-encoded before binding.
    """
    statement = """
UPDATE tasks.tasks SET
wallet = :wallet,
title = :title,
start_date = :start_date,
end_date = :end_date,
description = :description,
location = :location,
status = :status,
event_type = :event_type,
participants = :participants,
categories = :categories,
recurrence = :recurrence,
nostr_event_id = :nostr_event_id,
nostr_event_created_at = :nostr_event_created_at
WHERE id = :id
"""
    bound_values = _serialize_task_jsonb(task.dict())
    await db.execute(statement, bound_values)
    return task
async def get_task(task_id: str) -> Task | None:
    """Look up a single task by its ``id`` column.

    Returns the decoded Task, or ``None`` when no row matches.
    """
    record: dict | None = await db.fetchone(
        "SELECT * FROM tasks.tasks WHERE id = :id",
        {"id": task_id},
    )
    if not record:
        return None
    return Task(**_parse_task_row(record))
async def get_task_by_address(pubkey: str, d_tag: str) -> Task | None:
    """Look up a single task by its (``pubkey``, ``d_tag``) pair.

    Returns the decoded Task, or ``None`` when no row matches.
    """
    lookup = {"pubkey": pubkey, "d_tag": d_tag}
    record: dict | None = await db.fetchone(
        "SELECT * FROM tasks.tasks WHERE pubkey = :pubkey AND d_tag = :d_tag",
        lookup,
    )
    if not record:
        return None
    return Task(**_parse_task_row(record))
async def get_tasks(wallet_ids: str | list[str]) -> list[Task]:
    """Return the tasks belonging to one or more wallets, newest first.

    Args:
        wallet_ids: A single wallet id or a list of wallet ids.

    Returns:
        Decoded Task models whose ``wallet`` column matches any of the given
        ids, ordered by ``time`` descending. An empty list of ids yields an
        empty result without querying the database.
    """
    ids = [wallet_ids] if isinstance(wallet_ids, str) else list(wallet_ids)
    if not ids:
        # "IN ()" is not valid SQL; nothing can match anyway.
        return []

    # Bind every id as a named parameter instead of splicing it into the SQL
    # text, so quotes or other special characters in an id cannot alter the
    # statement.
    params = {f"wallet_{index}": wid for index, wid in enumerate(ids)}
    placeholders = ", ".join(f":{name}" for name in params)
    rows: list[dict] = await db.fetchall(
        f"SELECT * FROM tasks.tasks WHERE wallet IN ({placeholders}) "
        "ORDER BY time DESC",
        params,
    )
    return [Task(**_parse_task_row(row)) for row in rows]
async def get_all_tasks() -> list[Task]:
    """Return every row of ``tasks.tasks`` as Task models, newest first.

    No wallet filter is applied, so rows with an empty ``wallet`` are
    included. Intended for admin use; this function performs no access check
    of its own.
    """
    records: list[dict] = await db.fetchall(
        "SELECT * FROM tasks.tasks ORDER BY time DESC"
    )
    return [Task(**fields) for fields in map(_parse_task_row, records)]
async def get_public_tasks() -> list[Task]:
    """Return all tasks for the public listing, earliest ``start_date`` first.

    This function applies no filter and does not read the settings itself;
    callers are expected to check the ``public_listing`` flag before
    exposing the result.
    """
    query = "SELECT * FROM tasks.tasks ORDER BY start_date ASC"
    records: list[dict] = await db.fetchall(query)
    return [Task(**fields) for fields in map(_parse_task_row, records)]
async def get_tasks_by_pubkey(pubkey: str) -> list[Task]:
    """Return the tasks whose ``pubkey`` column equals ``pubkey``.

    Results are ordered by ``time`` descending and decoded into Task models.
    """
    query = "SELECT * FROM tasks.tasks WHERE pubkey = :pubkey ORDER BY time DESC"
    records: list[dict] = await db.fetchall(query, {"pubkey": pubkey})
    return [Task(**fields) for fields in map(_parse_task_row, records)]
async def delete_task(task_id: str) -> None:
    """Delete the task row whose ``id`` equals ``task_id``.

    The statement targets ``tasks.tasks`` only; completion rows are removed
    separately.
    """
    statement = "DELETE FROM tasks.tasks WHERE id = :id"
    await db.execute(statement, {"id": task_id})
async def delete_task_completions(task_address: str) -> None:
    """Delete every completion row recorded for ``task_address``."""
    statement = "DELETE FROM tasks.completions WHERE task_address = :addr"
    await db.execute(statement, {"addr": task_address})
############################# Completions #############################
async def create_completion(
    pubkey: str,
    data: CreateTaskCompletion,
    *,
    nostr_event_id: str | None = None,
    nostr_created_at: int | None = None,
) -> TaskCompletion:
    """Store a completion, replacing any earlier one for the same key.

    The key is (``task_address``, ``pubkey``, ``occurrence``). Any existing
    row for that key is deleted before the new row is inserted, so the call
    that runs last wins; ``nostr_created_at`` values are not compared.

    Args:
        pubkey: Value stored in the row's ``pubkey`` column.
        data: Completion fields (task address, occurrence, status, ...).
        nostr_event_id: Used as the row id when truthy; otherwise a local
            short hash is generated.
        nostr_created_at: Stored as given whenever it is not ``None``
            (including 0); otherwise the current UTC time in whole seconds.

    Returns:
        The TaskCompletion that was inserted.
    """
    # Read the clock once so `time` and the fallback `nostr_created_at`
    # describe the same instant.
    now = datetime.now(timezone.utc)
    if nostr_created_at is None:
        nostr_created_at = int(now.timestamp())

    completion = TaskCompletion(
        id=nostr_event_id or urlsafe_short_hash(),
        task_address=data.task_address,
        pubkey=pubkey,
        occurrence=data.occurrence,
        task_status=data.task_status,
        completed_at=data.completed_at,
        notes=data.notes,
        nostr_created_at=nostr_created_at,
        time=now,
    )

    # Replace any existing claim by this pubkey on this (address, occurrence).
    # NOTE(review): the lookup, delete and insert are issued as separate
    # statements here; nothing in this function groups them into a single
    # transaction -- confirm whether concurrent writers need that.
    existing = await get_completion(data.task_address, pubkey, data.occurrence)
    if existing:
        await db.execute(
            "DELETE FROM tasks.completions WHERE id = :id", {"id": existing.id}
        )

    await db.execute(
        """
INSERT INTO tasks.completions (
id, task_address, pubkey, occurrence, task_status,
completed_at, notes, nostr_created_at, time
) VALUES (
:id, :task_address, :pubkey, :occurrence, :task_status,
:completed_at, :notes, :nostr_created_at, :time
)
""",
        completion.dict(),
    )
    return completion
async def get_completion(
    task_address: str, pubkey: str, occurrence: str | None
) -> TaskCompletion | None:
    """Fetch the completion for (``task_address``, ``pubkey``, ``occurrence``).

    When ``occurrence`` is ``None`` the row is matched with ``IS NULL``
    (an ``= NULL`` comparison would never match); otherwise it is matched by
    equality. Returns ``None`` when no row is found.
    """
    params = {"addr": task_address, "pubkey": pubkey}
    if occurrence is None:
        query = """
SELECT * FROM tasks.completions
WHERE task_address = :addr AND pubkey = :pubkey
AND occurrence IS NULL
"""
    else:
        query = """
SELECT * FROM tasks.completions
WHERE task_address = :addr AND pubkey = :pubkey
AND occurrence = :occurrence
"""
        params["occurrence"] = occurrence

    found: dict | None = await db.fetchone(query, params)
    if not found:
        return None
    return TaskCompletion(**dict(found))
async def get_completions_for_task(task_address: str) -> list[TaskCompletion]:
    """Return all completions recorded for ``task_address``.

    Rows are ordered by ``nostr_created_at`` descending.
    """
    query = (
        "SELECT * FROM tasks.completions "
        "WHERE task_address = :addr ORDER BY nostr_created_at DESC"
    )
    records: list[dict] = await db.fetchall(query, {"addr": task_address})
    completions: list[TaskCompletion] = []
    for record in records:
        completions.append(TaskCompletion(**dict(record)))
    return completions
async def get_completion_by_id(completion_id: str) -> TaskCompletion | None:
    """Fetch one completion by its ``id`` column; ``None`` when absent."""
    query = "SELECT * FROM tasks.completions WHERE id = :id"
    found: dict | None = await db.fetchone(query, {"id": completion_id})
    if not found:
        return None
    return TaskCompletion(**dict(found))
async def delete_completion(completion_id: str) -> None:
    """Delete the completion row whose ``id`` equals ``completion_id``."""
    statement = "DELETE FROM tasks.completions WHERE id = :id"
    await db.execute(statement, {"id": completion_id})
############################## Settings ##############################
async def get_settings() -> TasksSettings:
    """Load the settings row with ``id = 1``.

    When that row is missing, a TasksSettings built with no arguments (that
    is, the model's defaults) is returned instead.
    """
    stored: dict | None = await db.fetchone(
        "SELECT * FROM tasks.settings WHERE id = 1"
    )
    overrides = dict(stored) if stored else {}
    return TasksSettings(**overrides)
async def update_settings(settings: TasksSettings) -> TasksSettings:
    """Persist ``settings.public_listing`` to the row with ``id = 1``.

    Only the ``public_listing`` column is written. The same ``settings``
    object is returned unchanged.

    NOTE(review): this is a plain UPDATE, so it has no effect if the
    ``id = 1`` row does not exist -- confirm the migration seeds that row.
    """
    statement = "UPDATE tasks.settings SET public_listing = :public_listing WHERE id = 1"
    bound_values = {"public_listing": settings.public_listing}
    await db.execute(statement, bound_values)
    return settings