From 24acbe66747c649eb52a983859b0d647e2fe4273 Mon Sep 17 00:00:00 2001 From: Padreug Date: Wed, 13 May 2026 11:38:42 +0200 Subject: [PATCH] add Nostr publishing and bidirectional sync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - nostr/ vendors NostrEvent + the nostrclient WebSocket bridge from the events extension, retagged [TASKS] / subscription-id "tasks-*". - nostr_publisher builds kind 31922 with the `event-type: task` tag (per aiolabs/webapp#25 — disambiguates from kind-31922 activities on shared relays), kind 31925 with task-status / occurrence / completed_at, and kind 5 deletions for both. - nostr_hooks bridges task/completion mutations to the publisher and persists the resulting nostr_event_id back onto the local row. - nostr_sync subscribes to {31922, 31925, 5/#k} and filters 31922 client-side on `event-type: task` because most relays don't index custom single-letter tags. Co-Authored-By: Claude Opus 4.7 (1M context) --- nostr/__init__.py | 0 nostr/event.py | 26 ++++ nostr/nostr_client.py | 131 +++++++++++++++++++ nostr_hooks.py | 97 ++++++++++++++ nostr_publisher.py | 210 ++++++++++++++++++++++++++++++ nostr_sync.py | 291 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 755 insertions(+) create mode 100644 nostr/__init__.py create mode 100644 nostr/event.py create mode 100644 nostr/nostr_client.py create mode 100644 nostr_hooks.py create mode 100644 nostr_publisher.py create mode 100644 nostr_sync.py diff --git a/nostr/__init__.py b/nostr/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/nostr/event.py b/nostr/event.py new file mode 100644 index 0000000..b6832b1 --- /dev/null +++ b/nostr/event.py @@ -0,0 +1,26 @@ +import hashlib +import json + +from pydantic import BaseModel + + +class NostrEvent(BaseModel): + id: str = "" + pubkey: str + created_at: int + kind: int + tags: list[list[str]] = [] + content: str = "" + sig: str | None = None + + def serialize(self) -> list: + return [0, self.pubkey, self.created_at, self.kind, self.tags, self.content] + + def serialize_json(self) -> str: + e = self.serialize() + return json.dumps(e, separators=(",", ":"), ensure_ascii=False) + + @property + def event_id(self) -> str: + data = self.serialize_json() + return hashlib.sha256(data.encode()).hexdigest() diff --git a/nostr/nostr_client.py b/nostr/nostr_client.py new file mode 100644 index 0000000..800e9f9 --- /dev/null +++ b/nostr/nostr_client.py @@ -0,0 +1,131 @@ +""" +Bidirectional Nostr client for the tasks extension. + +Connects to the nostrclient extension's internal WebSocket to publish +and subscribe to NIP-52 calendar events (kind 31922) and the +task-status RSVP variant (kind 31925). Mirrors events/nostr/nostr_client.py +with a TASKS log prefix and subscription-id namespace. +""" + +import asyncio +import json +from asyncio import Queue +from collections import OrderedDict + +from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash +from lnbits.settings import settings +from loguru import logger +from websocket import WebSocketApp + +from .event import NostrEvent + +MAX_SEEN_EVENTS = 500 + + +class NostrClient: + def __init__(self): + self.receive_event_queue: Queue = Queue() + self.send_req_queue: Queue = Queue() + self.ws: WebSocketApp | None = None + self.subscription_id = "tasks-" + urlsafe_short_hash()[:32] + self.running = False + self._seen_events: OrderedDict[str, None] = OrderedDict() + + @property + def is_websocket_connected(self): + if not self.ws: + return False + return self.ws.keep_running + + async def connect(self) -> WebSocketApp: + relay_endpoint = encrypt_internal_message("relay", urlsafe=True) + ws_url = ( + f"ws://localhost:{settings.port}" f"/nostrclient/api/v1/{relay_endpoint}" + ) + + logger.info("[TASKS] Connecting to nostrclient WebSocket...") + + def on_open(_): + logger.info("[TASKS] Connected to nostrclient WebSocket") + + def on_message(_, message): + try: + self.receive_event_queue.put_nowait(message) + except Exception as e: + logger.error(f"[TASKS] Failed to queue message: {e}") + + def on_error(_, error): + logger.warning(f"[TASKS] WebSocket error: {error}") + + def on_close(_, status_code, message): + logger.warning(f"[TASKS] WebSocket closed: {status_code} {message}") + self.receive_event_queue.put_nowait(ValueError("WebSocket closed")) + + ws = WebSocketApp( + ws_url, + on_message=on_message, + on_open=on_open, + on_close=on_close, + on_error=on_error, + ) + + from threading import Thread + + wst = Thread(target=ws.run_forever) + wst.daemon = True + wst.start() + + return ws + + async def run_forever(self): + self.running = True + while self.running: + try: + if not self.is_websocket_connected: + self.ws = await self.connect() + await asyncio.sleep(5) + + req = await self.send_req_queue.get() + assert self.ws + self.ws.send(json.dumps(req)) + except Exception as ex: + logger.warning(f"[TASKS] NostrClient error: {ex}") + await asyncio.sleep(60) + + def is_duplicate_event(self, event_id: str) -> bool: + if event_id in self._seen_events: + return True + self._seen_events[event_id] = None + if len(self._seen_events) > MAX_SEEN_EVENTS: + self._seen_events.popitem(last=False) + return False + + async def get_event(self): + value = await self.receive_event_queue.get() + if isinstance(value, ValueError): + raise value + return value + + async def publish_nostr_event(self, e: NostrEvent): + await self.send_req_queue.put(["EVENT", e.dict()]) + + async def subscribe(self, filters: list[dict]): + self.subscription_id = "tasks-" + urlsafe_short_hash()[:32] + await self.send_req_queue.put(["REQ", self.subscription_id, *filters]) + logger.info( + f"[TASKS] Subscribed (sub: {self.subscription_id[:20]}...)" + ) + + async def unsubscribe(self): + await self.send_req_queue.put(["CLOSE", self.subscription_id]) + + async def stop(self): + await self.unsubscribe() + self.running = False + await asyncio.sleep(2) + if self.ws: + try: + self.ws.close() + except Exception: + pass + self.ws = None diff --git a/nostr_hooks.py b/nostr_hooks.py new file mode 100644 index 0000000..295d473 --- /dev/null +++ b/nostr_hooks.py @@ -0,0 +1,97 @@ +"""Helpers that bridge task-mutation handlers to the Nostr publisher. + +Sits between views_api and nostr_publisher so we don't pull the publisher +through the views module (which would create an import cycle via models).""" + +from loguru import logger + +from .crud import update_task +from .models import Task, TaskCompletion +from .nostr_publisher import ( + publish_completion_delete_to_nostr, + publish_completion_to_nostr, + publish_task_to_nostr, +) + + +async def _account_keys(wallet_id: str) -> tuple[str, str] | None: + """Fetch (pubkey, prvkey) for the wallet's owning account. Returns None + when the account is missing keys, so callers can skip cleanly.""" + from lnbits.core.crud.users import get_account + from lnbits.core.crud.wallets import get_wallet + + wallet_obj = await get_wallet(wallet_id) + if not wallet_obj: + return None + account = await get_account(wallet_obj.user) + if not account or not account.pubkey or not account.prvkey: # type: ignore[attr-defined] + return None + return account.pubkey, account.prvkey # type: ignore[attr-defined] + + +async def publish_or_delete_task_event( + task: Task, *, delete: bool = False +) -> None: + """Publish (or delete-publish) the NIP-52 kind 31922 for `task`. + + Errors are logged and swallowed so a Nostr outage doesn't break the + HTTP flow that triggered the publish.""" + try: + from . import nostr_client + + keys = await _account_keys(task.wallet) + if not keys: + return + pubkey, prvkey = keys + + nostr_event = await publish_task_to_nostr( + nostr_client, task, pubkey, prvkey, delete=delete + ) + if nostr_event and not delete: + task.nostr_event_id = nostr_event.id + task.nostr_event_created_at = nostr_event.created_at + await update_task(task) + except Exception as exc: + logger.warning(f"[TASKS] Nostr task publish failed: {exc}") + + +async def publish_task_completion( + task: Task, completion: TaskCompletion +) -> str | None: + """Publish a kind 31925 completion. Returns the Nostr event id so the + caller can persist it as the completion's primary key, replacing the + locally-generated hash from the optimistic insert.""" + try: + from . import nostr_client + + keys = await _account_keys(task.wallet) + if not keys: + return None + pubkey, prvkey = keys + + nostr_event = await publish_completion_to_nostr( + nostr_client, task.address, completion, pubkey, prvkey + ) + return nostr_event.id if nostr_event else None + except Exception as exc: + logger.warning(f"[TASKS] Nostr completion publish failed: {exc}") + return None + + +async def publish_completion_delete( + wallet_id: str, completion_id: str +) -> None: + """Publish a NIP-09 delete for a previously-published completion.""" + try: + from . import nostr_client + + keys = await _account_keys(wallet_id) + if not keys: + return + pubkey, prvkey = keys + + await publish_completion_delete_to_nostr( + nostr_client, completion_id, pubkey, prvkey + ) + except Exception as exc: + logger.warning(f"[TASKS] Nostr completion delete failed: {exc}") diff --git a/nostr_publisher.py b/nostr_publisher.py new file mode 100644 index 0000000..4198906 --- /dev/null +++ b/nostr_publisher.py @@ -0,0 +1,210 @@ +""" +NIP-52 calendar-event publishing for the tasks extension. + +Builds: +- kind 31922 task events (with `event-type: task` so the activities + feed can filter us out — see aiolabs/webapp#25 for context), +- kind 31925 RSVP-shaped completion events (carrying `task-status`, + optional `occurrence`, and `completed_at`), +- kind 5 deletes referencing either an `a` (for tasks) or `e` tag + (for completions). + +Signs with the wallet owner's Account keypair and publishes via the +NostrClient wrapping nostrclient's WebSocket. +""" + +import time + +import coincurve +from loguru import logger + +from .models import Task, TaskCompletion +from .nostr.event import NostrEvent + + +def build_task_event(task: Task, pubkey: str) -> NostrEvent: + """Convert a Task to a NIP-52 kind 31922 calendar event tagged + `event-type: task` so it is recognizable as a task on shared relays.""" + tags: list[list[str]] = [ + ["d", task.d_tag], + ["title", task.title], + ["start", task.start_date], + ["event-type", "task"], + ] + + if task.end_date: + tags.append(["end", task.end_date]) + if task.location: + tags.append(["location", task.location]) + if task.status: + tags.append(["status", task.status]) + + for cat in task.categories or []: + tags.append(["t", cat]) + + for participant in task.participants or []: + p_tag = ["p", participant.pubkey] + if participant.type: + # NIP-52 'p' tag accepts a 'relay' slot before role; we leave + # it empty (just ["p", pk, "", role]) to keep parity with the + # webapp emitter. + p_tag.extend(["", participant.type]) + tags.append(p_tag) + + if task.recurrence: + tags.append(["recurrence", task.recurrence.frequency]) + if task.recurrence.day_of_week: + tags.append(["recurrence-day", task.recurrence.day_of_week]) + if task.recurrence.end_date: + tags.append(["recurrence-end", task.recurrence.end_date]) + + nostr_event = NostrEvent( + pubkey=pubkey, + created_at=int(time.time()), + kind=31922, + tags=tags, + content=task.description or "", + ) + nostr_event.id = nostr_event.event_id + return nostr_event + + +def build_completion_event( + task_address: str, completion: TaskCompletion, pubkey: str +) -> NostrEvent: + """Build a kind 31925 RSVP carrying our extension's `task-status` tag.""" + tags: list[list[str]] = [ + ["a", task_address], + ["task-status", completion.task_status], + ] + if completion.occurrence: + tags.append(["occurrence", completion.occurrence]) + if completion.completed_at: + tags.append(["completed_at", str(completion.completed_at)]) + + nostr_event = NostrEvent( + pubkey=pubkey, + created_at=int(time.time()), + kind=31925, + tags=tags, + content=completion.notes or "", + ) + nostr_event.id = nostr_event.event_id + return nostr_event + + +def build_delete_task_event(task: Task, pubkey: str) -> NostrEvent: + """Kind 5 deletion of a (replaceable) kind 31922 task via its `a` tag.""" + nostr_event = NostrEvent( + pubkey=pubkey, + created_at=int(time.time()), + kind=5, + tags=[ + ["a", f"31922:{pubkey}:{task.d_tag}"], + ["k", "31922"], + ], + content="Task deleted", + ) + nostr_event.id = nostr_event.event_id + return nostr_event + + +def build_delete_completion_event( + completion_id: str, pubkey: str +) -> NostrEvent: + """Kind 5 deletion of a kind 31925 completion via its `e` tag (the + completion is non-replaceable so we reference by event id, not address).""" + nostr_event = NostrEvent( + pubkey=pubkey, + created_at=int(time.time()), + kind=5, + tags=[ + ["e", completion_id], + ["k", "31925"], + ], + content="Task unclaimed", + ) + nostr_event.id = nostr_event.event_id + return nostr_event + + +def sign_nostr_event(nostr_event: NostrEvent, private_key_hex: str) -> None: + privkey = coincurve.PrivateKey(bytes.fromhex(private_key_hex)) + sig = privkey.sign_schnorr(bytes.fromhex(nostr_event.id)) + nostr_event.sig = sig.hex() + + +async def publish_task_to_nostr( + nostr_client, + task: Task, + account_pubkey: str, + account_prvkey: str, + *, + delete: bool = False, +) -> NostrEvent | None: + if not nostr_client: + logger.debug("[TASKS] No NostrClient available, skipping publish") + return None + try: + nostr_event = ( + build_delete_task_event(task, account_pubkey) + if delete + else build_task_event(task, account_pubkey) + ) + sign_nostr_event(nostr_event, account_prvkey) + await nostr_client.publish_nostr_event(nostr_event) + logger.info( + f"[TASKS] Published {'delete' if delete else '31922 task'} " + f"{nostr_event.id[:16]}... (kind {nostr_event.kind})" + ) + return nostr_event + except Exception as e: + logger.warning(f"[TASKS] Failed to publish task: {e}") + return None + + +async def publish_completion_to_nostr( + nostr_client, + task_address: str, + completion: TaskCompletion, + account_pubkey: str, + account_prvkey: str, +) -> NostrEvent | None: + if not nostr_client: + logger.debug("[TASKS] No NostrClient available, skipping publish") + return None + try: + nostr_event = build_completion_event( + task_address, completion, account_pubkey + ) + sign_nostr_event(nostr_event, account_prvkey) + await nostr_client.publish_nostr_event(nostr_event) + logger.info( + f"[TASKS] Published 31925 completion {nostr_event.id[:16]}... " + f"(status: {completion.task_status})" + ) + return nostr_event + except Exception as e: + logger.warning(f"[TASKS] Failed to publish completion: {e}") + return None + + +async def publish_completion_delete_to_nostr( + nostr_client, + completion_id: str, + account_pubkey: str, + account_prvkey: str, +) -> NostrEvent | None: + if not nostr_client: + return None + try: + nostr_event = build_delete_completion_event(completion_id, account_pubkey) + sign_nostr_event(nostr_event, account_prvkey) + await nostr_client.publish_nostr_event(nostr_event) + logger.info( + f"[TASKS] Published completion delete {nostr_event.id[:16]}..." + ) + return nostr_event + except Exception as e: + logger.warning(f"[TASKS] Failed to publish completion delete: {e}") + return None diff --git a/nostr_sync.py b/nostr_sync.py new file mode 100644 index 0000000..36a4a90 --- /dev/null +++ b/nostr_sync.py @@ -0,0 +1,291 @@ +""" +Bidirectional Nostr sync for the tasks extension. + +Subscribes to NIP-52 kind 31922 (filtered to `event-type: task` client-side +because not all relays index our custom tag), kind 31925 completions, and +kind 5 deletions targeting either. Upserts everything into the local DB +so federated tasks published by other LNbits instances or webapp clients +show up in `tasks.tasks` / `tasks.completions`. +""" + +import asyncio +import json +from datetime import datetime, timezone + +from loguru import logger + +from .crud import ( + create_completion, + db, + delete_completion, + delete_task, + delete_task_completions, + get_completion_by_id, + get_task_by_address, + update_task, +) +from .models import ( + CreateTaskCompletion, + Participant, + Recurrence, + Task, +) +from .nostr.nostr_client import NostrClient + + +def _is_task_event(event_data: dict) -> bool: + """Filter 31922 events to ones that opt in to the task convention. + + See aiolabs/webapp#25: kind 31922 is shared with activities so we + must whitelist `event-type: task` to avoid swallowing community + activity events.""" + for tag in event_data.get("tags", []): + if len(tag) >= 2 and tag[0] == "event-type" and tag[1] == "task": + return True + return False + + +async def _handle_task_event(nostr_client: NostrClient, event_data: dict) -> None: + event_id = event_data.get("id", "") + if nostr_client.is_duplicate_event(event_id): + return + if not _is_task_event(event_data): + return + + raw_tags = event_data.get("tags", []) + single_tags = {t[0]: t[1] for t in raw_tags if len(t) >= 2} + tag_lists: dict[str, list[list[str]]] = {} + for t in raw_tags: + if len(t) >= 1: + tag_lists.setdefault(t[0], []).append(t) + + d_tag = single_tags.get("d") + if not d_tag: + return + start = single_tags.get("start") + if not start: + return + + title = single_tags.get("title", "Untitled Task") + end = single_tags.get("end") + description = event_data.get("content", "") + location = single_tags.get("location") + status = single_tags.get("status", "pending") + event_type = single_tags.get("event-type", "task") + + categories = [t[1] for t in tag_lists.get("t", []) if len(t) >= 2] + participants: list[Participant] = [] + for t in tag_lists.get("p", []): + if len(t) < 2: + continue + ptype = t[3] if len(t) >= 4 else None + participants.append(Participant(pubkey=t[1], type=ptype)) + + recurrence = None + rec_freq = single_tags.get("recurrence") + if rec_freq in ("daily", "weekly"): + recurrence = Recurrence( + frequency=rec_freq, + day_of_week=single_tags.get("recurrence-day"), + end_date=single_tags.get("recurrence-end"), + ) + + incoming_created_at = event_data.get("created_at", 0) + pubkey = event_data.get("pubkey", "") + + existing = await get_task_by_address(pubkey, d_tag) + if existing: + if ( + existing.nostr_event_created_at + and incoming_created_at <= existing.nostr_event_created_at + ): + return + existing.title = title + existing.start_date = start + existing.end_date = end + existing.description = description + existing.location = location + existing.status = status + existing.event_type = event_type + existing.participants = participants + existing.categories = categories + existing.recurrence = recurrence + existing.nostr_event_id = event_id + existing.nostr_event_created_at = incoming_created_at + await update_task(existing) + logger.info(f"[TASKS] Updated task from Nostr: {title}") + return + + # New task discovered from Nostr — wallet stays empty (we don't own it). + new_task = Task( + id=d_tag, + wallet="", + pubkey=pubkey, + d_tag=d_tag, + title=title, + start_date=start, + end_date=end, + description=description, + location=location, + status=status, + event_type=event_type, + participants=participants, + categories=categories, + recurrence=recurrence, + nostr_event_id=event_id, + nostr_event_created_at=incoming_created_at, + time=datetime.now(timezone.utc), + ) + try: + # Bypass db.insert so we can route the JSON-encoded payload through + # the same serializer crud.create_task uses. + from .crud import _serialize_task_jsonb + + await db.execute( + """ + INSERT INTO tasks.tasks ( + id, wallet, pubkey, d_tag, title, start_date, end_date, + description, location, status, event_type, participants, + categories, recurrence, nostr_event_id, + nostr_event_created_at, time + ) VALUES ( + :id, :wallet, :pubkey, :d_tag, :title, :start_date, + :end_date, :description, :location, :status, :event_type, + :participants, :categories, :recurrence, :nostr_event_id, + :nostr_event_created_at, :time + ) + """, + _serialize_task_jsonb(new_task.dict()), + ) + logger.info(f"[TASKS] Discovered task from Nostr: {title}") + except Exception as e: + logger.debug(f"[TASKS] Skipped duplicate task insert: {e}") + + +async def _handle_completion_event( + nostr_client: NostrClient, event_data: dict +) -> None: + event_id = event_data.get("id", "") + if nostr_client.is_duplicate_event(event_id): + return + + raw_tags = event_data.get("tags", []) + single_tags = {t[0]: t[1] for t in raw_tags if len(t) >= 2} + + task_address = single_tags.get("a") + if not task_address or not task_address.startswith("31922:"): + return + + task_status = single_tags.get("task-status") + if task_status not in ( + "claimed", "in-progress", "completed", "blocked", "cancelled" + ): + # Fall back to NIP-52 `status` semantics if a plain RSVP arrives + # claiming/accepting the task. Treat anything else as a claim. + task_status = "claimed" + + completed_at_str = single_tags.get("completed_at") + completed_at = int(completed_at_str) if completed_at_str else None + occurrence = single_tags.get("occurrence") + + pubkey = event_data.get("pubkey", "") + incoming_created_at = event_data.get("created_at", 0) + + # If we already have a newer completion for this triple, skip. + existing_dup = await get_completion_by_id(event_id) + if existing_dup: + return + + await create_completion( + pubkey, + CreateTaskCompletion( + task_address=task_address, + task_status=task_status, + occurrence=occurrence, + completed_at=completed_at, + notes=event_data.get("content", ""), + ), + nostr_event_id=event_id, + nostr_created_at=incoming_created_at, + ) + + +async def _handle_deletion_event(event_data: dict) -> None: + """Route NIP-09 deletions by their `k` tag to the right resource.""" + raw_tags = event_data.get("tags", []) + k_tag = next( + (t[1] for t in raw_tags if len(t) >= 2 and t[0] == "k"), + None, + ) + author = event_data.get("pubkey", "") + + if k_tag == "31922": + # Delete by address: only honor when the deleter is the task's author. + for t in raw_tags: + if len(t) >= 2 and t[0] == "a" and t[1].startswith("31922:"): + _, addr_pubkey, d_tag = t[1].split(":", 2) + if addr_pubkey != author: + continue + existing = await get_task_by_address(addr_pubkey, d_tag) + if existing: + await delete_task(existing.id) + await delete_task_completions(existing.address) + logger.info(f"[TASKS] Deleted task from Nostr: {d_tag}") + + elif k_tag == "31925": + for t in raw_tags: + if len(t) >= 2 and t[0] == "e": + comp = await get_completion_by_id(t[1]) + if comp and comp.pubkey == author: + await delete_completion(comp.id) + + +async def process_nostr_message(nostr_client: NostrClient, message: str) -> None: + try: + data = json.loads(message) + except json.JSONDecodeError: + return + if not isinstance(data, list) or len(data) < 2: + return + + msg_type = data[0] + if msg_type == "EVENT" and len(data) >= 3: + event_data = data[2] + kind = event_data.get("kind") + if kind == 31922: + await _handle_task_event(nostr_client, event_data) + elif kind == 31925: + await _handle_completion_event(nostr_client, event_data) + elif kind == 5: + await _handle_deletion_event(event_data) + elif msg_type == "EOSE": + logger.debug("[TASKS] End of stored events from relay") + elif msg_type == "NOTICE": + logger.info(f"[TASKS] Relay notice: {data[1]}") + + +async def wait_for_nostr_events(nostr_client: NostrClient) -> None: + """Background task: subscribe and feed inbound events through the + handlers above. We can't use the `#event-type=task` filter at the + relay level because most relays don't index custom single-letter + tags, so we accept all 31922 and filter in _handle_task_event.""" + logger.info("[TASKS] Starting Nostr event sync...") + + while True: + try: + await nostr_client.subscribe( + [ + {"kinds": [31922]}, + {"kinds": [31925]}, + {"kinds": [5], "#k": ["31922", "31925"]}, + ] + ) + while True: + message = await nostr_client.get_event() + await process_nostr_message(nostr_client, message) + except ValueError: + logger.warning("[TASKS] Nostr connection lost, resubscribing...") + await asyncio.sleep(10) + except Exception as e: + logger.error(f"[TASKS] Nostr sync error: {e}") + await asyncio.sleep(30)