Schema mirrors the webapp tasks module's ScheduledEvent + EventCompletion shape: - tasks.tasks (NIP-52 kind 31922 cache): (pubkey, d_tag) is the parameterized-replaceable key. JSON-encoded participants / categories / recurrence columns are decoded on read via _parse_task_row so each model can keep clean field validators. - tasks.completions (kind 31925 cache): unique on (task_address, pubkey, occurrence). occurrence is NULL for one-shot tasks; create_completion deletes any prior claim for the same triple so the latest event wins. - tasks.settings: singleton row with public_listing toggle. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
293 lines
9.6 KiB
Python
293 lines
9.6 KiB
Python
import json
|
|
from datetime import datetime, timezone
|
|
|
|
from lnbits.db import Database
|
|
from lnbits.helpers import urlsafe_short_hash
|
|
|
|
from .models import (
|
|
CreateTask,
|
|
CreateTaskCompletion,
|
|
Task,
|
|
TaskCompletion,
|
|
TasksSettings,
|
|
)
|
|
|
|
# Module-level database handle, opened under the name "ext_tasks". Every
# query helper in this module issues its SQL through this object.
db = Database("ext_tasks")
|
|
|
|
|
|
def _serialize_task_jsonb(task_dict: dict) -> dict:
    """Return a shallow copy of ``task_dict`` with its JSON columns encoded.

    ``participants``, ``categories`` and ``recurrence`` are stored as TEXT,
    so they are turned into JSON strings here before being bound to a query.
    The input mapping itself is left untouched.

    * ``participants`` / ``categories`` are encoded only when they are lists;
      participant entries that are not dicts are converted via ``.dict()``.
    * ``recurrence`` is encoded unless it is ``None`` or already a string;
      a non-dict value is converted via ``.dict()`` first.
    """
    encoded = dict(task_dict)

    participants = encoded.get("participants")
    if isinstance(participants, list):
        plain_participants = []
        for entry in participants:
            plain_participants.append(
                entry if isinstance(entry, dict) else entry.dict()
            )
        encoded["participants"] = json.dumps(plain_participants)

    categories = encoded.get("categories")
    if isinstance(categories, list):
        encoded["categories"] = json.dumps(categories)

    recurrence = encoded.get("recurrence")
    if recurrence is None or isinstance(recurrence, str):
        # Nothing to encode: either absent or already serialized.
        return encoded
    if not isinstance(recurrence, dict):
        recurrence = recurrence.dict()
    encoded["recurrence"] = json.dumps(recurrence)
    return encoded
|
|
|
|
|
|
def _parse_task_row(row) -> dict:
    """Turn a ``tasks.tasks`` row into a dict with JSON columns decoded.

    ``row`` is copied with ``dict(row)``. String values in the
    ``participants``, ``categories`` and ``recurrence`` columns are parsed
    with ``json.loads``; an empty string becomes ``[]`` for the two list
    columns and ``None`` for ``recurrence``. Non-string values are passed
    through unchanged.
    """
    parsed = dict(row)

    # The two list-valued columns share the same decoding rule.
    for column in ("participants", "categories"):
        raw = parsed.get(column)
        if isinstance(raw, str):
            parsed[column] = json.loads(raw) if raw else []

    raw_recurrence = parsed.get("recurrence")
    if isinstance(raw_recurrence, str):
        parsed["recurrence"] = (
            json.loads(raw_recurrence) if raw_recurrence else None
        )

    return parsed
|
|
|
|
|
|
################################ Tasks ################################
|
|
|
|
|
|
async def create_task(pubkey: str, data: CreateTask) -> Task:
    """Build a new ``Task`` from ``data`` and insert it into ``tasks.tasks``.

    A fresh short hash is generated and used as both the row ``id`` and the
    ``d_tag``. A missing wallet is stored as the empty string, and ``time``
    is set to the current UTC time.

    The INSERT also binds ``nostr_event_id`` and ``nostr_event_created_at``,
    which are not set here and are taken from ``task.dict()`` (presumably
    model defaults -- confirm against ``Task``).

    Returns the ``Task`` that was inserted.
    """
    new_id = urlsafe_short_hash()
    task = Task(
        id=new_id,
        wallet=data.wallet or "",
        pubkey=pubkey,
        # d_tag is generated locally and equals the row id.
        d_tag=new_id,
        title=data.title,
        start_date=data.start_date,
        end_date=data.end_date,
        description=data.description,
        location=data.location,
        status=data.status,
        event_type=data.event_type,
        participants=data.participants,
        categories=data.categories,
        recurrence=data.recurrence,
        time=datetime.now(timezone.utc),
    )

    insert_sql = """
        INSERT INTO tasks.tasks (
            id, wallet, pubkey, d_tag, title, start_date, end_date,
            description, location, status, event_type, participants,
            categories, recurrence, nostr_event_id, nostr_event_created_at,
            time
        ) VALUES (
            :id, :wallet, :pubkey, :d_tag, :title, :start_date, :end_date,
            :description, :location, :status, :event_type, :participants,
            :categories, :recurrence, :nostr_event_id, :nostr_event_created_at,
            :time
        )
        """
    # JSON-valued fields must be strings before they are bound.
    bound_values = _serialize_task_jsonb(task.dict())
    await db.execute(insert_sql, bound_values)
    return task
|
|
|
|
|
|
async def update_task(task: Task) -> Task:
    """Write the mutable columns of ``task`` back to its ``tasks.tasks`` row.

    The row is selected by ``id``. The statement does not touch ``id``,
    ``pubkey``, ``d_tag`` or ``time``; every other column listed in the SET
    clause is overwritten with the value from ``task``.

    Returns the same ``task`` object that was passed in.
    """
    update_sql = """
        UPDATE tasks.tasks SET
            wallet = :wallet,
            title = :title,
            start_date = :start_date,
            end_date = :end_date,
            description = :description,
            location = :location,
            status = :status,
            event_type = :event_type,
            participants = :participants,
            categories = :categories,
            recurrence = :recurrence,
            nostr_event_id = :nostr_event_id,
            nostr_event_created_at = :nostr_event_created_at
        WHERE id = :id
        """
    # JSON-valued fields must be strings before they are bound.
    bound_values = _serialize_task_jsonb(task.dict())
    await db.execute(update_sql, bound_values)
    return task
|
|
|
|
|
|
async def get_task(task_id: str) -> Task | None:
    """Fetch one task by its row ``id``; return ``None`` when no row matches."""
    found: dict | None = await db.fetchone(
        "SELECT * FROM tasks.tasks WHERE id = :id",
        {"id": task_id},
    )
    if not found:
        return None
    return Task(**_parse_task_row(found))
|
|
|
|
|
|
async def get_task_by_address(pubkey: str, d_tag: str) -> Task | None:
    """Fetch one task by its ``(pubkey, d_tag)`` pair.

    Returns ``None`` when no row matches.
    """
    lookup = {"pubkey": pubkey, "d_tag": d_tag}
    found: dict | None = await db.fetchone(
        "SELECT * FROM tasks.tasks WHERE pubkey = :pubkey AND d_tag = :d_tag",
        lookup,
    )
    if not found:
        return None
    return Task(**_parse_task_row(found))
|
|
|
|
|
|
async def get_tasks(wallet_ids: str | list[str]) -> list[Task]:
    """Return the tasks belonging to one or more wallets, newest first.

    Args:
        wallet_ids: A single wallet id, or a list of wallet ids.

    Returns:
        Tasks whose ``wallet`` column matches any of the ids, ordered by
        ``time`` descending. An empty list of ids yields an empty result
        without querying the database.
    """
    if isinstance(wallet_ids, str):
        wallet_ids = [wallet_ids]
    if not wallet_ids:
        # "IN ()" is not valid SQL on every backend, and nothing could
        # match anyway.
        return []

    # Bind each id as a named parameter instead of interpolating it into the
    # SQL text, so ids containing quotes cannot break or alter the query.
    params = {f"wallet_{i}": wid for i, wid in enumerate(wallet_ids)}
    placeholders = ", ".join(f":{name}" for name in params)
    rows: list[dict] = await db.fetchall(
        f"SELECT * FROM tasks.tasks WHERE wallet IN ({placeholders}) "
        "ORDER BY time DESC",
        params,
    )
    return [Task(**_parse_task_row(row)) for row in rows]
|
|
|
|
|
|
async def get_all_tasks() -> list[Task]:
    """Return every row of ``tasks.tasks``, newest ``time`` first.

    No wallet filter is applied, so rows with ``wallet=''`` are included
    (noted as synced-from-Nostr rows). Intended for admin use; this function
    performs no access check itself.
    """
    rows: list[dict] = await db.fetchall(
        "SELECT * FROM tasks.tasks ORDER BY time DESC"
    )
    tasks: list[Task] = []
    for record in rows:
        tasks.append(Task(**_parse_task_row(record)))
    return tasks
|
|
|
|
|
|
async def get_public_tasks() -> list[Task]:
    """Return every task for the public listing, earliest ``start_date`` first.

    This function does not consult the ``public_listing`` setting; callers
    are expected to gate access on that flag before calling it.
    """
    rows: list[dict] = await db.fetchall(
        "SELECT * FROM tasks.tasks ORDER BY start_date ASC"
    )
    tasks: list[Task] = []
    for record in rows:
        tasks.append(Task(**_parse_task_row(record)))
    return tasks
|
|
|
|
|
|
async def get_tasks_by_pubkey(pubkey: str) -> list[Task]:
    """Return all tasks whose ``pubkey`` column matches, newest ``time`` first."""
    rows: list[dict] = await db.fetchall(
        "SELECT * FROM tasks.tasks WHERE pubkey = :pubkey ORDER BY time DESC",
        {"pubkey": pubkey},
    )
    tasks: list[Task] = []
    for record in rows:
        tasks.append(Task(**_parse_task_row(record)))
    return tasks
|
|
|
|
|
|
async def delete_task(task_id: str) -> None:
    """Delete the ``tasks.tasks`` row with the given ``id``.

    Only the task row is removed by this statement; rows in
    ``tasks.completions`` are not touched here.
    """
    params = {"id": task_id}
    await db.execute("DELETE FROM tasks.tasks WHERE id = :id", params)
|
|
|
|
|
|
async def delete_task_completions(task_address: str) -> None:
    """Delete every ``tasks.completions`` row recorded for ``task_address``."""
    params = {"addr": task_address}
    await db.execute(
        "DELETE FROM tasks.completions WHERE task_address = :addr",
        params,
    )
|
|
|
|
|
|
############################# Completions #############################
|
|
|
|
|
|
async def create_completion(
    pubkey: str,
    data: CreateTaskCompletion,
    *,
    nostr_event_id: str | None = None,
    nostr_created_at: int | None = None,
) -> TaskCompletion:
    """Insert a completion, replacing any prior one for the same claim.

    A claim is identified by ``(task_address, pubkey, occurrence)``. If a row
    already exists for that triple it is deleted before the new row is
    inserted, so the most recently stored completion is the one kept.

    Args:
        pubkey: Public key of the party recording the completion.
        data: Completion payload (task address, occurrence, status, ...).
        nostr_event_id: Id of an already-published Nostr event to use as the
            row id. When omitted, a local short hash is generated instead.
        nostr_created_at: Unix timestamp of the Nostr event. When ``None``,
            the current UTC time is used. An explicit ``0`` is kept as given.

    Returns:
        The ``TaskCompletion`` that was inserted.
    """
    # Read the clock once so ``time`` and the fallback ``nostr_created_at``
    # describe the same instant.
    now = datetime.now(timezone.utc)
    if nostr_created_at is None:
        nostr_created_at = int(now.timestamp())

    completion = TaskCompletion(
        id=nostr_event_id or urlsafe_short_hash(),
        task_address=data.task_address,
        pubkey=pubkey,
        occurrence=data.occurrence,
        task_status=data.task_status,
        completed_at=data.completed_at,
        notes=data.notes,
        nostr_created_at=nostr_created_at,
        time=now,
    )

    # Replace any existing claim by this pubkey on this (address, occurrence).
    existing = await get_completion(data.task_address, pubkey, data.occurrence)
    if existing:
        await db.execute(
            "DELETE FROM tasks.completions WHERE id = :id", {"id": existing.id}
        )

    await db.execute(
        """
        INSERT INTO tasks.completions (
            id, task_address, pubkey, occurrence, task_status,
            completed_at, notes, nostr_created_at, time
        ) VALUES (
            :id, :task_address, :pubkey, :occurrence, :task_status,
            :completed_at, :notes, :nostr_created_at, :time
        )
        """,
        completion.dict(),
    )
    return completion
|
|
|
|
|
|
async def get_completion(
    task_address: str, pubkey: str, occurrence: str | None
) -> TaskCompletion | None:
    """Look up the completion for ``(task_address, pubkey, occurrence)``.

    ``occurrence=None`` matches rows whose ``occurrence`` column IS NULL;
    any other value is matched by equality. Returns ``None`` when no row
    is found.
    """
    params = {"addr": task_address, "pubkey": pubkey}
    if occurrence is not None:
        query = """
            SELECT * FROM tasks.completions
            WHERE task_address = :addr AND pubkey = :pubkey
              AND occurrence = :occurrence
            """
        params["occurrence"] = occurrence
    else:
        # NULL never compares equal in SQL, so it needs its own predicate.
        query = """
            SELECT * FROM tasks.completions
            WHERE task_address = :addr AND pubkey = :pubkey
              AND occurrence IS NULL
            """

    found: dict | None = await db.fetchone(query, params)
    if not found:
        return None
    return TaskCompletion(**dict(found))
|
|
|
|
|
|
async def get_completions_for_task(task_address: str) -> list[TaskCompletion]:
    """Return all completions for ``task_address``, newest
    ``nostr_created_at`` first."""
    query = (
        "SELECT * FROM tasks.completions WHERE task_address = :addr "
        "ORDER BY nostr_created_at DESC"
    )
    rows: list[dict] = await db.fetchall(query, {"addr": task_address})
    completions: list[TaskCompletion] = []
    for record in rows:
        completions.append(TaskCompletion(**dict(record)))
    return completions
|
|
|
|
|
|
async def get_completion_by_id(completion_id: str) -> TaskCompletion | None:
    """Fetch one completion by its row ``id``; ``None`` when no row matches."""
    found: dict | None = await db.fetchone(
        "SELECT * FROM tasks.completions WHERE id = :id",
        {"id": completion_id},
    )
    if not found:
        return None
    return TaskCompletion(**dict(found))
|
|
|
|
|
|
async def delete_completion(completion_id: str) -> None:
    """Delete the ``tasks.completions`` row with the given ``id``."""
    params = {"id": completion_id}
    await db.execute(
        "DELETE FROM tasks.completions WHERE id = :id",
        params,
    )
|
|
|
|
|
|
############################## Settings ##############################
|
|
|
|
|
|
async def get_settings() -> TasksSettings:
    """Load the settings row with ``id = 1``.

    When that row does not exist, a default-constructed ``TasksSettings``
    is returned instead.
    """
    stored: dict | None = await db.fetchone(
        "SELECT * FROM tasks.settings WHERE id = 1"
    )
    return TasksSettings(**dict(stored)) if stored else TasksSettings()
|
|
|
|
|
|
async def update_settings(settings: TasksSettings) -> TasksSettings:
    """Store ``settings.public_listing`` on the settings row with ``id = 1``.

    This issues an UPDATE only; no row is inserted here, so the statement
    has no effect if the ``id = 1`` row is absent. Returns the ``settings``
    object that was passed in.
    """
    params = {"public_listing": settings.public_listing}
    await db.execute(
        "UPDATE tasks.settings SET public_listing = :public_listing WHERE id = 1",
        params,
    )
    return settings
|