diff --git a/__init__.py b/__init__.py index 974c20c..8d42ecf 100644 --- a/__init__.py +++ b/__init__.py @@ -50,14 +50,32 @@ def events_start(): from .nostr.nostr_client import NostrClient nostr_client = NostrClient() - logger.info("[EVENTS] Starting NostrClient for NIP-52 publishing") + logger.info("[EVENTS] Starting NostrClient for NIP-52 sync") await nostr_client.run_forever() except Exception as e: logger.warning(f"[EVENTS] NostrClient failed to start: {e}") - logger.info("[EVENTS] Events will work without Nostr publishing") + logger.info("[EVENTS] Events will work without Nostr sync") task2 = create_permanent_unique_task("ext_events_nostr", _start_nostr_client) scheduled_tasks.append(task2) + async def _sync_nostr_events(): + global nostr_client + await asyncio.sleep(15) # Wait for NostrClient to connect + if not nostr_client: + logger.info("[EVENTS] No NostrClient, skipping Nostr sync") + return + try: + from .nostr_sync import wait_for_nostr_events + + await wait_for_nostr_events(nostr_client) + except Exception as e: + logger.error(f"[EVENTS] Nostr sync task failed: {e}") + + task3 = create_permanent_unique_task( + "ext_events_nostr_sync", _sync_nostr_events + ) + scheduled_tasks.append(task3) + __all__ = ["db", "events_ext", "events_start", "events_static_files", "events_stop"] diff --git a/nostr/nostr_client.py b/nostr/nostr_client.py index 09f57d4..f8f63de 100644 --- a/nostr/nostr_client.py +++ b/nostr/nostr_client.py @@ -1,30 +1,36 @@ """ -Publish-only Nostr client for the events extension. +Bidirectional Nostr client for the events extension. Connects to the nostrclient extension's internal WebSocket to publish -NIP-52 calendar events. No subscription/receive capabilities — this -is a stripped-down version of nostrmarket's NostrClient. +and subscribe to NIP-52 calendar events. Based on nostrmarket's +NostrClient pattern. """ import asyncio import json from asyncio import Queue +from collections import OrderedDict from typing import Optional from loguru import logger from websocket import WebSocketApp -from lnbits.helpers import encrypt_internal_message +from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash from lnbits.settings import settings from .event import NostrEvent +MAX_SEEN_EVENTS = 500 + class NostrClient: def __init__(self): + self.receive_event_queue: Queue = Queue() self.send_req_queue: Queue = Queue() self.ws: Optional[WebSocketApp] = None + self.subscription_id = "events-" + urlsafe_short_hash()[:32] self.running = False + self._seen_events: OrderedDict[str, None] = OrderedDict() @property def is_websocket_connected(self): @@ -45,8 +51,10 @@ class NostrClient: logger.info("[EVENTS] Connected to nostrclient WebSocket") def on_message(_, message): - # Log relay responses (OK, NOTICE) but don't process - logger.debug(f"[EVENTS] Relay response: {message[:200]}") + try: + self.receive_event_queue.put_nowait(message) + except Exception as e: + logger.error(f"[EVENTS] Failed to queue message: {e}") def on_error(_, error): logger.warning(f"[EVENTS] WebSocket error: {error}") @@ -55,6 +63,9 @@ class NostrClient: logger.warning( f"[EVENTS] WebSocket closed: {status_code} {message}" ) + self.receive_event_queue.put_nowait( + ValueError("WebSocket closed") + ) ws = WebSocketApp( ws_url, @@ -87,11 +98,44 @@ class NostrClient: logger.warning(f"[EVENTS] NostrClient error: {ex}") await asyncio.sleep(60) + def is_duplicate_event(self, event_id: str) -> bool: + """Check if an event has been seen recently.""" + if event_id in self._seen_events: + return True + self._seen_events[event_id] = None + if len(self._seen_events) > MAX_SEEN_EVENTS: + self._seen_events.popitem(last=False) + return False + + async def get_event(self): + """Get next event from the receive queue.""" + value = await self.receive_event_queue.get() + if isinstance(value, ValueError): + raise value + return value + async def publish_nostr_event(self, e: NostrEvent): await self.send_req_queue.put(["EVENT", e.dict()]) + async def subscribe(self, filters: list[dict]): + """Subscribe to events matching the given filters.""" + self.subscription_id = "events-" + urlsafe_short_hash()[:32] + await self.send_req_queue.put( + ["REQ", self.subscription_id] + filters + ) + logger.info( + f"[EVENTS] Subscribed to NIP-52 events " + f"(sub: {self.subscription_id[:20]}...)" + ) + + async def unsubscribe(self): + """Unsubscribe from current subscription.""" + await self.send_req_queue.put(["CLOSE", self.subscription_id]) + async def stop(self): + await self.unsubscribe() self.running = False + await asyncio.sleep(2) if self.ws: try: self.ws.close() diff --git a/nostr_sync.py b/nostr_sync.py new file mode 100644 index 0000000..1c508dc --- /dev/null +++ b/nostr_sync.py @@ -0,0 +1,170 @@ +""" +Bidirectional Nostr sync for the events extension. + +Subscribes to NIP-52 calendar events (kind 31922/31923) from relays +and upserts them into the local database. Enables federated event +discovery — events published by other LNbits instances or Nostr +clients appear in the local events listing. +""" + +import json +from datetime import datetime, timezone + +from loguru import logger + +from .crud import create_event, db, get_event, update_event +from .models import CreateEvent, Event +from .nostr.nostr_client import NostrClient + + +async def process_nostr_message(nostr_client: NostrClient, message: str): + """Process an incoming Nostr relay message.""" + try: + data = json.loads(message) + except json.JSONDecodeError: + return + + if not isinstance(data, list) or len(data) < 2: + return + + msg_type = data[0] + + if msg_type == "EVENT" and len(data) >= 3: + event_data = data[2] + await _handle_calendar_event(nostr_client, event_data) + elif msg_type == "EOSE": + logger.debug("[EVENTS] End of stored events from relay") + elif msg_type == "NOTICE": + logger.info(f"[EVENTS] Relay notice: {data[1]}") + + +async def _handle_calendar_event(nostr_client: NostrClient, event_data: dict): + """Handle an incoming NIP-52 calendar event (kind 31922 or 31923).""" + kind = event_data.get("kind") + if kind not in (31922, 31923): + return + + event_id = event_data.get("id", "") + if nostr_client.is_duplicate_event(event_id): + return + + tags = {t[0]: t[1] for t in event_data.get("tags", []) if len(t) >= 2} + tag_lists = {} + for t in event_data.get("tags", []): + if len(t) >= 2: + tag_lists.setdefault(t[0], []).append(t[1]) + + d_tag = tags.get("d") + if not d_tag: + return + + title = tags.get("title", "Untitled Event") + start = tags.get("start") + if not start: + return + + end = tags.get("end") + description = event_data.get("content", "") + image = tags.get("image") + location = tags.get("location") + categories = tag_lists.get("t", []) + + # Check if we already have this event (by d-tag as our event ID + # or by nostr_event_id) + existing = await get_event(d_tag) + if not existing: + # Check by nostr_event_id + existing = await db.fetchone( + "SELECT * FROM events.events WHERE nostr_event_id = :nid", + {"nid": event_id}, + Event, + ) + + if existing: + # Update if the incoming event is newer + incoming_created_at = event_data.get("created_at", 0) + if ( + existing.nostr_event_created_at + and incoming_created_at <= existing.nostr_event_created_at + ): + return # We already have a newer version + + existing.name = title + existing.info = description + existing.event_start_date = start + existing.event_end_date = end + existing.banner = image + existing.location = location + existing.categories = categories + existing.nostr_event_id = event_id + existing.nostr_event_created_at = incoming_created_at + await update_event(existing) + logger.info(f"[EVENTS] Updated event from Nostr: {title}") + else: + # Create new event from Nostr + # Events discovered from Nostr are auto-approved (they're already public) + event = CreateEvent( + wallet="", # No wallet — discovered from Nostr, not ticketed locally + name=title, + info=description, + event_start_date=start, + event_end_date=end, + banner=image, + location=location, + categories=categories, + status="approved", + ) + # Use the d-tag as the event ID for correlation + from lnbits.db import Database + + new_event = Event( + id=d_tag, + wallet="", + name=title, + info=description, + event_start_date=start, + event_end_date=end, + banner=image, + location=location, + categories=categories, + status="approved", + time=datetime.now(timezone.utc), + nostr_event_id=event_id, + nostr_event_created_at=event_data.get("created_at", 0), + ) + try: + await db.insert("events.events", new_event) + logger.info(f"[EVENTS] Discovered event from Nostr: {title}") + except Exception as e: + # Likely duplicate key — skip + logger.debug(f"[EVENTS] Skipped duplicate event: {e}") + + +async def wait_for_nostr_events(nostr_client: NostrClient): + """ + Background task: subscribe to NIP-52 events and process them. + """ + logger.info("[EVENTS] Starting Nostr event sync...") + + while True: + try: + # Subscribe to NIP-52 calendar events + await nostr_client.subscribe([ + {"kinds": [31922, 31923]}, + ]) + + # Process incoming events + while True: + message = await nostr_client.get_event() + await process_nostr_message(nostr_client, message) + + except ValueError: + # WebSocket closed — will reconnect + logger.warning("[EVENTS] Nostr connection lost, resubscribing...") + await asyncio.sleep(10) + except Exception as e: + logger.error(f"[EVENTS] Nostr sync error: {e}") + await asyncio.sleep(30) + + +import asyncio # noqa: E402