add models, migrations, and CRUD

Schema mirrors the webapp tasks module's ScheduledEvent + EventCompletion
shape:

- tasks.tasks (NIP-52 kind 31922 cache): (pubkey, d_tag) is the
  parameterized-replaceable key. JSON-encoded participants / categories
  / recurrence columns are decoded on read via _parse_task_row so each
  model can keep clean field validators.
- tasks.completions (kind 31925 cache): unique on
  (task_address, pubkey, occurrence). occurrence is NULL for one-shot
  tasks; create_completion deletes any prior claim for the same triple
  so the latest event wins.
- tasks.settings: singleton row with public_listing toggle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Padreug 2026-05-13 11:36:13 +02:00
commit 6fbb6d4a42
3 changed files with 521 additions and 3 deletions

290
crud.py
View file

@ -1,5 +1,293 @@
import json
from datetime import datetime, timezone
from lnbits.db import Database from lnbits.db import Database
from lnbits.helpers import urlsafe_short_hash
from .models import (
CreateTask,
CreateTaskCompletion,
Task,
TaskCompletion,
TasksSettings,
)
db = Database("ext_tasks") db = Database("ext_tasks")
# CRUD functions are filled in by the next commit.
def _serialize_task_jsonb(task_dict: dict) -> dict:
"""JSON-encode the columns we store as TEXT (participants, categories,
recurrence) so db.insert/update sees strings, not Python objects."""
out = dict(task_dict)
if isinstance(out.get("participants"), list):
out["participants"] = json.dumps(
[p if isinstance(p, dict) else p.dict() for p in out["participants"]]
)
if isinstance(out.get("categories"), list):
out["categories"] = json.dumps(out["categories"])
rec = out.get("recurrence")
if rec is not None and not isinstance(rec, str):
out["recurrence"] = json.dumps(rec if isinstance(rec, dict) else rec.dict())
return out
def _parse_task_row(row) -> dict:
"""Decode TEXT-JSON columns back into Python structures so Pydantic can
rebuild the Task without each model needing a string-parser validator on
every read path."""
data = dict(row)
if isinstance(data.get("participants"), str):
data["participants"] = json.loads(data["participants"]) if data["participants"] else []
if isinstance(data.get("categories"), str):
data["categories"] = json.loads(data["categories"]) if data["categories"] else []
rec = data.get("recurrence")
if isinstance(rec, str):
data["recurrence"] = json.loads(rec) if rec else None
return data
################################ Tasks ################################
async def create_task(pubkey: str, data: CreateTask) -> Task:
"""Insert a task. The d_tag is generated locally; subsequent edits keep
the same d_tag so the published kind 31922 stays addressable."""
task_id = urlsafe_short_hash()
d_tag = task_id
task = Task(
id=task_id,
wallet=data.wallet or "",
pubkey=pubkey,
d_tag=d_tag,
title=data.title,
start_date=data.start_date,
end_date=data.end_date,
description=data.description,
location=data.location,
status=data.status,
event_type=data.event_type,
participants=data.participants,
categories=data.categories,
recurrence=data.recurrence,
time=datetime.now(timezone.utc),
)
await db.execute(
"""
INSERT INTO tasks.tasks (
id, wallet, pubkey, d_tag, title, start_date, end_date,
description, location, status, event_type, participants,
categories, recurrence, nostr_event_id, nostr_event_created_at,
time
) VALUES (
:id, :wallet, :pubkey, :d_tag, :title, :start_date, :end_date,
:description, :location, :status, :event_type, :participants,
:categories, :recurrence, :nostr_event_id, :nostr_event_created_at,
:time
)
""",
_serialize_task_jsonb(task.dict()),
)
return task
async def update_task(task: Task) -> Task:
await db.execute(
"""
UPDATE tasks.tasks SET
wallet = :wallet,
title = :title,
start_date = :start_date,
end_date = :end_date,
description = :description,
location = :location,
status = :status,
event_type = :event_type,
participants = :participants,
categories = :categories,
recurrence = :recurrence,
nostr_event_id = :nostr_event_id,
nostr_event_created_at = :nostr_event_created_at
WHERE id = :id
""",
_serialize_task_jsonb(task.dict()),
)
return task
async def get_task(task_id: str) -> Task | None:
row: dict | None = await db.fetchone(
"SELECT * FROM tasks.tasks WHERE id = :id",
{"id": task_id},
)
return Task(**_parse_task_row(row)) if row else None
async def get_task_by_address(pubkey: str, d_tag: str) -> Task | None:
row: dict | None = await db.fetchone(
"SELECT * FROM tasks.tasks WHERE pubkey = :pubkey AND d_tag = :d_tag",
{"pubkey": pubkey, "d_tag": d_tag},
)
return Task(**_parse_task_row(row)) if row else None
async def get_tasks(wallet_ids: str | list[str]) -> list[Task]:
if isinstance(wallet_ids, str):
wallet_ids = [wallet_ids]
q = ",".join([f"'{wid}'" for wid in wallet_ids])
rows: list[dict] = await db.fetchall(
f"SELECT * FROM tasks.tasks WHERE wallet IN ({q}) ORDER BY time DESC"
)
return [Task(**_parse_task_row(row)) for row in rows]
async def get_all_tasks() -> list[Task]:
"""Admin-only. All tasks regardless of wallet, including synced-from-Nostr
rows (wallet=''). Order by recency."""
rows: list[dict] = await db.fetchall(
"SELECT * FROM tasks.tasks ORDER BY time DESC"
)
return [Task(**_parse_task_row(row)) for row in rows]
async def get_public_tasks() -> list[Task]:
"""Public listing: all tasks. Gated upstream by the public_listing
settings flag."""
rows: list[dict] = await db.fetchall(
"SELECT * FROM tasks.tasks ORDER BY start_date ASC"
)
return [Task(**_parse_task_row(row)) for row in rows]
async def get_tasks_by_pubkey(pubkey: str) -> list[Task]:
rows: list[dict] = await db.fetchall(
"SELECT * FROM tasks.tasks WHERE pubkey = :pubkey ORDER BY time DESC",
{"pubkey": pubkey},
)
return [Task(**_parse_task_row(row)) for row in rows]
async def delete_task(task_id: str) -> None:
await db.execute("DELETE FROM tasks.tasks WHERE id = :id", {"id": task_id})
async def delete_task_completions(task_address: str) -> None:
await db.execute(
"DELETE FROM tasks.completions WHERE task_address = :addr",
{"addr": task_address},
)
############################# Completions #############################
async def create_completion(
pubkey: str, data: CreateTaskCompletion, *, nostr_event_id: str | None = None,
nostr_created_at: int | None = None,
) -> TaskCompletion:
"""Insert (or replace on conflict) a completion for (task_address, pubkey,
occurrence). The newest event_id always wins we let the caller pass an
already-published Nostr id, otherwise generate a local hash for an unsent
completion that the Nostr publisher will replace on the next round-trip."""
cid = nostr_event_id or urlsafe_short_hash()
completion = TaskCompletion(
id=cid,
task_address=data.task_address,
pubkey=pubkey,
occurrence=data.occurrence,
task_status=data.task_status,
completed_at=data.completed_at,
notes=data.notes,
nostr_created_at=nostr_created_at or int(datetime.now(timezone.utc).timestamp()),
time=datetime.now(timezone.utc),
)
# Replace any existing claim by this pubkey on this (address, occurrence).
existing = await get_completion(
data.task_address, pubkey, data.occurrence
)
if existing:
await db.execute(
"DELETE FROM tasks.completions WHERE id = :id", {"id": existing.id}
)
await db.execute(
"""
INSERT INTO tasks.completions (
id, task_address, pubkey, occurrence, task_status,
completed_at, notes, nostr_created_at, time
) VALUES (
:id, :task_address, :pubkey, :occurrence, :task_status,
:completed_at, :notes, :nostr_created_at, :time
)
""",
completion.dict(),
)
return completion
async def get_completion(
task_address: str, pubkey: str, occurrence: str | None
) -> TaskCompletion | None:
if occurrence is None:
row: dict | None = await db.fetchone(
"""
SELECT * FROM tasks.completions
WHERE task_address = :addr AND pubkey = :pubkey
AND occurrence IS NULL
""",
{"addr": task_address, "pubkey": pubkey},
)
else:
row = await db.fetchone(
"""
SELECT * FROM tasks.completions
WHERE task_address = :addr AND pubkey = :pubkey
AND occurrence = :occurrence
""",
{"addr": task_address, "pubkey": pubkey, "occurrence": occurrence},
)
return TaskCompletion(**dict(row)) if row else None
async def get_completions_for_task(task_address: str) -> list[TaskCompletion]:
rows: list[dict] = await db.fetchall(
"SELECT * FROM tasks.completions WHERE task_address = :addr "
"ORDER BY nostr_created_at DESC",
{"addr": task_address},
)
return [TaskCompletion(**dict(row)) for row in rows]
async def get_completion_by_id(completion_id: str) -> TaskCompletion | None:
row: dict | None = await db.fetchone(
"SELECT * FROM tasks.completions WHERE id = :id",
{"id": completion_id},
)
return TaskCompletion(**dict(row)) if row else None
async def delete_completion(completion_id: str) -> None:
await db.execute(
"DELETE FROM tasks.completions WHERE id = :id",
{"id": completion_id},
)
############################## Settings ##############################
async def get_settings() -> TasksSettings:
row: dict | None = await db.fetchone(
"SELECT * FROM tasks.settings WHERE id = 1"
)
if row:
return TasksSettings(**dict(row))
return TasksSettings()
async def update_settings(settings: TasksSettings) -> TasksSettings:
await db.execute(
"UPDATE tasks.settings SET public_listing = :public_listing WHERE id = 1",
{"public_listing": settings.public_listing},
)
return settings

View file

@ -1 +1,76 @@
# Migrations are filled in by the next commit. async def m001_initial(db):
"""
Tasks table (NIP-52 kind 31922 cache) and completions table (kind 31925
cache). The (pubkey, d_tag) pair is the parameterized-replaceable key.
"""
await db.execute(
f"""
CREATE TABLE tasks.tasks (
id TEXT PRIMARY KEY,
wallet TEXT NOT NULL,
pubkey TEXT NOT NULL,
d_tag TEXT NOT NULL,
title TEXT NOT NULL,
start_date TEXT NOT NULL,
end_date TEXT,
description TEXT NOT NULL DEFAULT '',
location TEXT,
status TEXT NOT NULL DEFAULT 'pending',
event_type TEXT NOT NULL DEFAULT 'task',
participants TEXT,
categories TEXT,
recurrence TEXT,
nostr_event_id TEXT,
nostr_event_created_at INTEGER,
time TIMESTAMP NOT NULL DEFAULT {db.timestamp_now}
);
"""
)
await db.execute(
"CREATE UNIQUE INDEX tasks_pubkey_dtag_idx "
"ON tasks.tasks (pubkey, d_tag)"
)
await db.execute(
f"""
CREATE TABLE tasks.completions (
id TEXT PRIMARY KEY,
task_address TEXT NOT NULL,
pubkey TEXT NOT NULL,
occurrence TEXT,
task_status TEXT NOT NULL DEFAULT 'claimed',
completed_at INTEGER,
notes TEXT NOT NULL DEFAULT '',
nostr_created_at INTEGER NOT NULL,
time TIMESTAMP NOT NULL DEFAULT {db.timestamp_now}
);
"""
)
# occurrence is NULL for one-shot tasks; the (task_address, pubkey,
# occurrence) tuple identifies "the latest claim by this user for this
# specific occurrence".
await db.execute(
"CREATE INDEX tasks_completions_address_idx "
"ON tasks.completions (task_address)"
)
await db.execute(
"CREATE INDEX tasks_completions_pubkey_idx "
"ON tasks.completions (pubkey)"
)
async def m002_settings(db):
"""Singleton settings row for the admin UI."""
await db.execute(
"""
CREATE TABLE IF NOT EXISTS tasks.settings (
id INTEGER PRIMARY KEY DEFAULT 1,
public_listing BOOLEAN NOT NULL DEFAULT FALSE
)
"""
)
await db.execute(
"INSERT INTO tasks.settings (id, public_listing) "
"SELECT 1, FALSE WHERE NOT EXISTS "
"(SELECT 1 FROM tasks.settings WHERE id = 1)"
)

157
models.py
View file

@ -1 +1,156 @@
# Pydantic models are filled in by the next commit. import json
from datetime import datetime
from typing import Literal
from pydantic import BaseModel, Field, validator
TaskStatus = Literal["claimed", "in-progress", "completed", "blocked", "cancelled"]
class Participant(BaseModel):
pubkey: str
type: str | None = None # "required" | "optional" | "organizer"
class Recurrence(BaseModel):
frequency: Literal["daily", "weekly"]
day_of_week: str | None = None # for weekly: monday..sunday
end_date: str | None = None # YYYY-MM-DD, optional cutoff
class CreateTask(BaseModel):
wallet: str | None = None # filled from caller's wallet if absent
title: str
start_date: str # YYYY-MM-DD or ISO datetime
end_date: str | None = None
description: str = ""
location: str | None = None
status: str = "pending"
event_type: str = "task" # 'task' | 'announcement'
participants: list[Participant] = Field(default_factory=list)
categories: list[str] = Field(default_factory=list)
recurrence: Recurrence | None = None
@validator("participants", pre=True)
def parse_participants(cls, v):
if isinstance(v, str):
return json.loads(v) if v else []
return v or []
@validator("categories", pre=True)
def parse_categories(cls, v):
if isinstance(v, str):
return json.loads(v) if v else []
return v or []
@validator("recurrence", pre=True)
def parse_recurrence(cls, v):
if isinstance(v, str):
return json.loads(v) if v else None
return v
class Task(BaseModel):
id: str
wallet: str
pubkey: str
d_tag: str
title: str
start_date: str
end_date: str | None = None
description: str = ""
location: str | None = None
status: str = "pending"
event_type: str = "task"
participants: list[Participant] = Field(default_factory=list)
categories: list[str] = Field(default_factory=list)
recurrence: Recurrence | None = None
nostr_event_id: str | None = None
nostr_event_created_at: int | None = None
time: datetime
@validator("participants", pre=True)
def parse_participants(cls, v):
if isinstance(v, str):
return json.loads(v) if v else []
return v or []
@validator("categories", pre=True)
def parse_categories(cls, v):
if isinstance(v, str):
return json.loads(v) if v else []
return v or []
@validator("recurrence", pre=True)
def parse_recurrence(cls, v):
if isinstance(v, str):
return json.loads(v) if v else None
return v
@property
def address(self) -> str:
"""NIP-52 replaceable-event address (31922:pubkey:d-tag)."""
return f"31922:{self.pubkey}:{self.d_tag}"
class PublicTask(BaseModel):
"""Trimmed task payload for the public/anonymous endpoint."""
id: str
pubkey: str
d_tag: str
title: str
start_date: str
end_date: str | None = None
description: str = ""
location: str | None = None
status: str
event_type: str
participants: list[Participant] = Field(default_factory=list)
categories: list[str] = Field(default_factory=list)
recurrence: Recurrence | None = None
@validator("participants", pre=True)
def parse_participants(cls, v):
if isinstance(v, str):
return json.loads(v) if v else []
return v or []
@validator("categories", pre=True)
def parse_categories(cls, v):
if isinstance(v, str):
return json.loads(v) if v else []
return v or []
@validator("recurrence", pre=True)
def parse_recurrence(cls, v):
if isinstance(v, str):
return json.loads(v) if v else None
return v
class CreateTaskCompletion(BaseModel):
task_address: str # "31922:pubkey:d_tag"
task_status: TaskStatus = "claimed"
occurrence: str | None = None # YYYY-MM-DD for recurring tasks
completed_at: int | None = None # unix ts; set when task_status == 'completed'
notes: str = ""
class TaskCompletion(BaseModel):
id: str # Nostr event id (or local hash for not-yet-published)
task_address: str
pubkey: str # claimer
occurrence: str | None = None
task_status: TaskStatus = "claimed"
completed_at: int | None = None
notes: str = ""
nostr_created_at: int
time: datetime
class TasksSettings(BaseModel):
"""Extension-level settings singleton."""
public_listing: bool = False # expose /api/v1/tasks/public if enabled