diff --git a/__init__.py b/__init__.py index 8d42ecf..974c20c 100644 --- a/__init__.py +++ b/__init__.py @@ -50,32 +50,14 @@ def events_start(): from .nostr.nostr_client import NostrClient nostr_client = NostrClient() - logger.info("[EVENTS] Starting NostrClient for NIP-52 sync") + logger.info("[EVENTS] Starting NostrClient for NIP-52 publishing") await nostr_client.run_forever() except Exception as e: logger.warning(f"[EVENTS] NostrClient failed to start: {e}") - logger.info("[EVENTS] Events will work without Nostr sync") + logger.info("[EVENTS] Events will work without Nostr publishing") task2 = create_permanent_unique_task("ext_events_nostr", _start_nostr_client) scheduled_tasks.append(task2) - async def _sync_nostr_events(): - global nostr_client - await asyncio.sleep(15) # Wait for NostrClient to connect - if not nostr_client: - logger.info("[EVENTS] No NostrClient, skipping Nostr sync") - return - try: - from .nostr_sync import wait_for_nostr_events - - await wait_for_nostr_events(nostr_client) - except Exception as e: - logger.error(f"[EVENTS] Nostr sync task failed: {e}") - - task3 = create_permanent_unique_task( - "ext_events_nostr_sync", _sync_nostr_events - ) - scheduled_tasks.append(task3) - __all__ = ["db", "events_ext", "events_start", "events_static_files", "events_stop"] diff --git a/nostr/nostr_client.py b/nostr/nostr_client.py index f8f63de..09f57d4 100644 --- a/nostr/nostr_client.py +++ b/nostr/nostr_client.py @@ -1,36 +1,30 @@ """ -Bidirectional Nostr client for the events extension. +Publish-only Nostr client for the events extension. Connects to the nostrclient extension's internal WebSocket to publish -and subscribe to NIP-52 calendar events. Based on nostrmarket's -NostrClient pattern. +NIP-52 calendar events. No subscription/receive capabilities — this +is a stripped-down version of nostrmarket's NostrClient. """ import asyncio import json from asyncio import Queue -from collections import OrderedDict from typing import Optional from loguru import logger from websocket import WebSocketApp -from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash +from lnbits.helpers import encrypt_internal_message from lnbits.settings import settings from .event import NostrEvent -MAX_SEEN_EVENTS = 500 - class NostrClient: def __init__(self): - self.receive_event_queue: Queue = Queue() self.send_req_queue: Queue = Queue() self.ws: Optional[WebSocketApp] = None - self.subscription_id = "events-" + urlsafe_short_hash()[:32] self.running = False - self._seen_events: OrderedDict[str, None] = OrderedDict() @property def is_websocket_connected(self): @@ -51,10 +45,8 @@ class NostrClient: logger.info("[EVENTS] Connected to nostrclient WebSocket") def on_message(_, message): - try: - self.receive_event_queue.put_nowait(message) - except Exception as e: - logger.error(f"[EVENTS] Failed to queue message: {e}") + # Log relay responses (OK, NOTICE) but don't process + logger.debug(f"[EVENTS] Relay response: {message[:200]}") def on_error(_, error): logger.warning(f"[EVENTS] WebSocket error: {error}") @@ -63,9 +55,6 @@ class NostrClient: logger.warning( f"[EVENTS] WebSocket closed: {status_code} {message}" ) - self.receive_event_queue.put_nowait( - ValueError("WebSocket closed") - ) ws = WebSocketApp( ws_url, @@ -98,44 +87,11 @@ class NostrClient: logger.warning(f"[EVENTS] NostrClient error: {ex}") await asyncio.sleep(60) - def is_duplicate_event(self, event_id: str) -> bool: - """Check if an event has been seen recently.""" - if event_id in self._seen_events: - return True - self._seen_events[event_id] = None - if len(self._seen_events) > MAX_SEEN_EVENTS: - self._seen_events.popitem(last=False) - return False - - async def get_event(self): - """Get next event from the receive queue.""" - value = await self.receive_event_queue.get() - if isinstance(value, ValueError): - raise value - return value - async def publish_nostr_event(self, e: NostrEvent): await self.send_req_queue.put(["EVENT", e.dict()]) - async def subscribe(self, filters: list[dict]): - """Subscribe to events matching the given filters.""" - self.subscription_id = "events-" + urlsafe_short_hash()[:32] - await self.send_req_queue.put( - ["REQ", self.subscription_id] + filters - ) - logger.info( - f"[EVENTS] Subscribed to NIP-52 events " - f"(sub: {self.subscription_id[:20]}...)" - ) - - async def unsubscribe(self): - """Unsubscribe from current subscription.""" - await self.send_req_queue.put(["CLOSE", self.subscription_id]) - async def stop(self): - await self.unsubscribe() self.running = False - await asyncio.sleep(2) if self.ws: try: self.ws.close() diff --git a/nostr_sync.py b/nostr_sync.py deleted file mode 100644 index 1c508dc..0000000 --- a/nostr_sync.py +++ /dev/null @@ -1,170 +0,0 @@ -""" -Bidirectional Nostr sync for the events extension. - -Subscribes to NIP-52 calendar events (kind 31922/31923) from relays -and upserts them into the local database. Enables federated event -discovery — events published by other LNbits instances or Nostr -clients appear in the local events listing. -""" - -import json -from datetime import datetime, timezone - -from loguru import logger - -from .crud import create_event, db, get_event, update_event -from .models import CreateEvent, Event -from .nostr.nostr_client import NostrClient - - -async def process_nostr_message(nostr_client: NostrClient, message: str): - """Process an incoming Nostr relay message.""" - try: - data = json.loads(message) - except json.JSONDecodeError: - return - - if not isinstance(data, list) or len(data) < 2: - return - - msg_type = data[0] - - if msg_type == "EVENT" and len(data) >= 3: - event_data = data[2] - await _handle_calendar_event(nostr_client, event_data) - elif msg_type == "EOSE": - logger.debug("[EVENTS] End of stored events from relay") - elif msg_type == "NOTICE": - logger.info(f"[EVENTS] Relay notice: {data[1]}") - - -async def _handle_calendar_event(nostr_client: NostrClient, event_data: dict): - """Handle an incoming NIP-52 calendar event (kind 31922 or 31923).""" - kind = event_data.get("kind") - if kind not in (31922, 31923): - return - - event_id = event_data.get("id", "") - if nostr_client.is_duplicate_event(event_id): - return - - tags = {t[0]: t[1] for t in event_data.get("tags", []) if len(t) >= 2} - tag_lists = {} - for t in event_data.get("tags", []): - if len(t) >= 2: - tag_lists.setdefault(t[0], []).append(t[1]) - - d_tag = tags.get("d") - if not d_tag: - return - - title = tags.get("title", "Untitled Event") - start = tags.get("start") - if not start: - return - - end = tags.get("end") - description = event_data.get("content", "") - image = tags.get("image") - location = tags.get("location") - categories = tag_lists.get("t", []) - - # Check if we already have this event (by d-tag as our event ID - # or by nostr_event_id) - existing = await get_event(d_tag) - if not existing: - # Check by nostr_event_id - existing = await db.fetchone( - "SELECT * FROM events.events WHERE nostr_event_id = :nid", - {"nid": event_id}, - Event, - ) - - if existing: - # Update if the incoming event is newer - incoming_created_at = event_data.get("created_at", 0) - if ( - existing.nostr_event_created_at - and incoming_created_at <= existing.nostr_event_created_at - ): - return # We already have a newer version - - existing.name = title - existing.info = description - existing.event_start_date = start - existing.event_end_date = end - existing.banner = image - existing.location = location - existing.categories = categories - existing.nostr_event_id = event_id - existing.nostr_event_created_at = incoming_created_at - await update_event(existing) - logger.info(f"[EVENTS] Updated event from Nostr: {title}") - else: - # Create new event from Nostr - # Events discovered from Nostr are auto-approved (they're already public) - event = CreateEvent( - wallet="", # No wallet — discovered from Nostr, not ticketed locally - name=title, - info=description, - event_start_date=start, - event_end_date=end, - banner=image, - location=location, - categories=categories, - status="approved", - ) - # Use the d-tag as the event ID for correlation - from lnbits.db import Database - - new_event = Event( - id=d_tag, - wallet="", - name=title, - info=description, - event_start_date=start, - event_end_date=end, - banner=image, - location=location, - categories=categories, - status="approved", - time=datetime.now(timezone.utc), - nostr_event_id=event_id, - nostr_event_created_at=event_data.get("created_at", 0), - ) - try: - await db.insert("events.events", new_event) - logger.info(f"[EVENTS] Discovered event from Nostr: {title}") - except Exception as e: - # Likely duplicate key — skip - logger.debug(f"[EVENTS] Skipped duplicate event: {e}") - - -async def wait_for_nostr_events(nostr_client: NostrClient): - """ - Background task: subscribe to NIP-52 events and process them. - """ - logger.info("[EVENTS] Starting Nostr event sync...") - - while True: - try: - # Subscribe to NIP-52 calendar events - await nostr_client.subscribe([ - {"kinds": [31922, 31923]}, - ]) - - # Process incoming events - while True: - message = await nostr_client.get_event() - await process_nostr_message(nostr_client, message) - - except ValueError: - # WebSocket closed — will reconnect - logger.warning("[EVENTS] Nostr connection lost, resubscribing...") - await asyncio.sleep(10) - except Exception as e: - logger.error(f"[EVENTS] Nostr sync error: {e}") - await asyncio.sleep(30) - - -import asyncio # noqa: E402