diff --git a/__init__.py b/__init__.py index ef37528..c2f2300 100644 --- a/__init__.py +++ b/__init__.py @@ -22,6 +22,11 @@ events_static_files = [ scheduled_tasks: list[asyncio.Task] = [] +# Module-level NostrClient — None when nostrclient is unavailable. Set by the +# bootstrap task in events_start() and read via dynamic attribute lookup +# from nostr_hooks.publish_or_delete_nostr_event. +nostr_client = None + def events_stop(): for task in scheduled_tasks: @@ -30,12 +35,50 @@ def events_stop(): except Exception as ex: logger.warning(ex) + global nostr_client + if nostr_client: + asyncio.get_event_loop().create_task(nostr_client.stop()) + def events_start(): from lnbits.tasks import create_permanent_unique_task - task = create_permanent_unique_task("ext_events", wait_for_paid_invoices) - scheduled_tasks.append(task) + task1 = create_permanent_unique_task("ext_events", wait_for_paid_invoices) + scheduled_tasks.append(task1) + + async def _start_nostr_client(): + global nostr_client + await asyncio.sleep(10) # Wait for nostrclient to be ready + try: + from .nostr.nostr_client import NostrClient + + nostr_client = NostrClient() + logger.info("[EVENTS] Starting NostrClient for NIP-52 sync") + await nostr_client.run_forever() + except Exception as exc: + logger.warning(f"[EVENTS] NostrClient failed to start: {exc}") + logger.info("[EVENTS] Events will work without Nostr sync") + + task2 = create_permanent_unique_task("ext_events_nostr", _start_nostr_client) + scheduled_tasks.append(task2) + + async def _sync_nostr_events(): + global nostr_client + await asyncio.sleep(15) # Wait for NostrClient to connect + if not nostr_client: + logger.info("[EVENTS] No NostrClient, skipping Nostr sync") + return + try: + from .nostr_sync import wait_for_nostr_events + + await wait_for_nostr_events(nostr_client) + except Exception as exc: + logger.error(f"[EVENTS] Nostr sync task failed: {exc}") + + task3 = create_permanent_unique_task( + "ext_events_nostr_sync", _sync_nostr_events + ) + scheduled_tasks.append(task3) __all__ = ["db", "events_ext", "events_start", "events_static_files", "events_stop"] diff --git a/models.py b/models.py index a9cb31b..e759d66 100644 --- a/models.py +++ b/models.py @@ -1,3 +1,4 @@ +import json from datetime import datetime from pydantic import BaseModel, EmailStr, Field, root_validator, validator @@ -42,6 +43,8 @@ class CreateEvent(BaseModel): amount_tickets: int = 0 # 0 = unlimited / not ticketed price_per_ticket: float = 0 # 0 = free banner: str | None = None + location: str | None = None # venue/address (NIP-52 'location' tag) + categories: list[str] = Field(default_factory=list) # NIP-52 't' tags extra: EventExtra = Field(default_factory=EventExtra) status: str = "approved" # proposed, approved, rejected @@ -63,8 +66,18 @@ class Event(BaseModel): time: datetime sold: int = 0 banner: str | None = None + location: str | None = None + categories: list[str] = Field(default_factory=list) extra: EventExtra = Field(default_factory=EventExtra) status: str = "approved" + nostr_event_id: str | None = None + nostr_event_created_at: int | None = None + + @validator("categories", pre=True) + def parse_categories(cls, v): + if isinstance(v, str): + return json.loads(v) if v else [] + return v or [] class PublicEvent(BaseModel): @@ -80,9 +93,17 @@ class PublicEvent(BaseModel): fiat_currency: str = "GBP" price_per_ticket: float banner: str | None + location: str | None = None + categories: list[str] = Field(default_factory=list) extra: EventExtra = Field(default_factory=EventExtra) status: str = "approved" # surfaces "proposed"/"rejected" so SFC can render banner + @validator("categories", pre=True) + def parse_categories(cls, v): + if isinstance(v, str): + return json.loads(v) if v else [] + return v or [] + class EventsSettings(BaseModel): """Extension-level settings for the events extension.""" diff --git a/nostr/__init__.py b/nostr/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/nostr/event.py b/nostr/event.py new file mode 100644 index 0000000..b6832b1 --- /dev/null +++ b/nostr/event.py @@ -0,0 +1,26 @@ +import hashlib +import json + +from pydantic import BaseModel + + +class NostrEvent(BaseModel): + id: str = "" + pubkey: str + created_at: int + kind: int + tags: list[list[str]] = [] + content: str = "" + sig: str | None = None + + def serialize(self) -> list: + return [0, self.pubkey, self.created_at, self.kind, self.tags, self.content] + + def serialize_json(self) -> str: + e = self.serialize() + return json.dumps(e, separators=(",", ":"), ensure_ascii=False) + + @property + def event_id(self) -> str: + data = self.serialize_json() + return hashlib.sha256(data.encode()).hexdigest() diff --git a/nostr/nostr_client.py b/nostr/nostr_client.py new file mode 100644 index 0000000..8e0afc5 --- /dev/null +++ b/nostr/nostr_client.py @@ -0,0 +1,142 @@ +""" +Bidirectional Nostr client for the events extension. + +Connects to the nostrclient extension's internal WebSocket to publish +and subscribe to NIP-52 calendar events. Based on nostrmarket's +NostrClient pattern. +""" + +import asyncio +import json +from asyncio import Queue +from collections import OrderedDict + +from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash +from lnbits.settings import settings +from loguru import logger +from websocket import WebSocketApp + +from .event import NostrEvent + +MAX_SEEN_EVENTS = 500 + + +class NostrClient: + def __init__(self): + self.receive_event_queue: Queue = Queue() + self.send_req_queue: Queue = Queue() + self.ws: WebSocketApp | None = None + self.subscription_id = "events-" + urlsafe_short_hash()[:32] + self.running = False + self._seen_events: OrderedDict[str, None] = OrderedDict() + + @property + def is_websocket_connected(self): + if not self.ws: + return False + return self.ws.keep_running + + async def connect(self) -> WebSocketApp: + relay_endpoint = encrypt_internal_message("relay", urlsafe=True) + ws_url = ( + f"ws://localhost:{settings.port}" + f"/nostrclient/api/v1/{relay_endpoint}" + ) + + logger.info("[EVENTS] Connecting to nostrclient WebSocket...") + + def on_open(_): + logger.info("[EVENTS] Connected to nostrclient WebSocket") + + def on_message(_, message): + try: + self.receive_event_queue.put_nowait(message) + except Exception as e: + logger.error(f"[EVENTS] Failed to queue message: {e}") + + def on_error(_, error): + logger.warning(f"[EVENTS] WebSocket error: {error}") + + def on_close(_, status_code, message): + logger.warning( + f"[EVENTS] WebSocket closed: {status_code} {message}" + ) + self.receive_event_queue.put_nowait( + ValueError("WebSocket closed") + ) + + ws = WebSocketApp( + ws_url, + on_message=on_message, + on_open=on_open, + on_close=on_close, + on_error=on_error, + ) + + from threading import Thread + + wst = Thread(target=ws.run_forever) + wst.daemon = True + wst.start() + + return ws + + async def run_forever(self): + self.running = True + while self.running: + try: + if not self.is_websocket_connected: + self.ws = await self.connect() + await asyncio.sleep(5) + + req = await self.send_req_queue.get() + assert self.ws + self.ws.send(json.dumps(req)) + except Exception as ex: + logger.warning(f"[EVENTS] NostrClient error: {ex}") + await asyncio.sleep(60) + + def is_duplicate_event(self, event_id: str) -> bool: + """Check if an event has been seen recently.""" + if event_id in self._seen_events: + return True + self._seen_events[event_id] = None + if len(self._seen_events) > MAX_SEEN_EVENTS: + self._seen_events.popitem(last=False) + return False + + async def get_event(self): + """Get next event from the receive queue.""" + value = await self.receive_event_queue.get() + if isinstance(value, ValueError): + raise value + return value + + async def publish_nostr_event(self, e: NostrEvent): + await self.send_req_queue.put(["EVENT", e.dict()]) + + async def subscribe(self, filters: list[dict]): + """Subscribe to events matching the given filters.""" + self.subscription_id = "events-" + urlsafe_short_hash()[:32] + await self.send_req_queue.put( + ["REQ", self.subscription_id, *filters] + ) + logger.info( + f"[EVENTS] Subscribed to NIP-52 events " + f"(sub: {self.subscription_id[:20]}...)" + ) + + async def unsubscribe(self): + """Unsubscribe from current subscription.""" + await self.send_req_queue.put(["CLOSE", self.subscription_id]) + + async def stop(self): + await self.unsubscribe() + self.running = False + await asyncio.sleep(2) + if self.ws: + try: + self.ws.close() + except Exception: + pass + self.ws = None diff --git a/nostr_hooks.py b/nostr_hooks.py new file mode 100644 index 0000000..3211b24 --- /dev/null +++ b/nostr_hooks.py @@ -0,0 +1,43 @@ +"""Helpers that bridge event-mutation handlers to the Nostr publisher. + +Lives in its own module so both `events_api_router` and any future router +can call it without importing through `views_api`, which would create an +import cycle (views_api -> nostr_hooks -> nostr_publisher -> models). +""" + +from loguru import logger + +from .crud import update_event +from .models import Event +from .nostr_publisher import publish_event_to_nostr + + +async def publish_or_delete_nostr_event(event: Event, *, delete: bool = False) -> None: + """Publish or delete the NIP-52 calendar event for `event`. + + Pulls the wallet owner's pubkey/prvkey to sign with the user's identity. + Failures are logged and swallowed so a Nostr outage doesn't break the + HTTP flow that triggered the publish. + """ + try: + from lnbits.core.crud.users import get_account + from lnbits.core.crud.wallets import get_wallet + + from . import nostr_client + + wallet_obj = await get_wallet(event.wallet) + if not wallet_obj: + return + account = await get_account(wallet_obj.user) + if not account or not account.pubkey or not account.prvkey: + return + + nostr_event = await publish_event_to_nostr( + nostr_client, event, account.pubkey, account.prvkey, delete=delete + ) + if nostr_event and not delete: + event.nostr_event_id = nostr_event.id + event.nostr_event_created_at = nostr_event.created_at + await update_event(event) + except Exception as exc: + logger.warning(f"[EVENTS] Nostr publish failed: {exc}") diff --git a/nostr_publisher.py b/nostr_publisher.py new file mode 100644 index 0000000..bc109b2 --- /dev/null +++ b/nostr_publisher.py @@ -0,0 +1,118 @@ +""" +NIP-52 calendar event publishing for the events extension. + +Builds kind 31922 (date-based) calendar events from the Event model, +signs them with the event creator's Account keypair, and publishes +via the NostrClient to nostrclient relays. + +Reference: https://github.com/nostr-protocol/nips/blob/master/52.md +""" + +import time + +import coincurve +from loguru import logger + +from .models import Event +from .nostr.event import NostrEvent + + +def build_nip52_event(event: Event, pubkey: str) -> NostrEvent: + """ + Convert an Event model to a NIP-52 kind 31922 (date-based) calendar event. + + Tags: + d - event.id (addressable identifier) + title - event.name + start - event.event_start_date (ISO date string) + end - event.event_end_date (optional) + image - event.banner (optional) + Content: event.info (description) + """ + tags = [ + ["d", event.id], + ["title", event.name], + ["start", event.event_start_date], + ] + + if event.event_end_date: + tags.append(["end", event.event_end_date]) + if event.banner: + tags.append(["image", event.banner]) + if event.location: + tags.append(["location", event.location]) + for cat in (event.categories or []): + tags.append(["t", cat]) + + nostr_event = NostrEvent( + pubkey=pubkey, + created_at=int(time.time()), + kind=31922, + tags=tags, + content=event.info or "", + ) + nostr_event.id = nostr_event.event_id + return nostr_event + + +def build_nip52_delete_event(event: Event, pubkey: str) -> NostrEvent: + """ + Build a kind 5 delete event for a published NIP-52 calendar event. + + Uses an 'a' tag to reference the parameterized replaceable event + (kind 31922) per NIP-09. + """ + nostr_event = NostrEvent( + pubkey=pubkey, + created_at=int(time.time()), + kind=5, + tags=[ + ["a", f"31922:{pubkey}:{event.id}"], + ], + content="Event canceled", + ) + nostr_event.id = nostr_event.event_id + return nostr_event + + +def sign_nostr_event(nostr_event: NostrEvent, private_key_hex: str) -> None: + """Sign a NostrEvent in-place using Schnorr signature.""" + privkey = coincurve.PrivateKey(bytes.fromhex(private_key_hex)) + sig = privkey.sign_schnorr(bytes.fromhex(nostr_event.id)) + nostr_event.sig = sig.hex() + + +async def publish_event_to_nostr( + nostr_client, + event: Event, + account_pubkey: str, + account_prvkey: str, + delete: bool = False, +) -> NostrEvent | None: + """ + Build, sign, and publish a NIP-52 calendar event (or delete event). + + Returns the published NostrEvent for metadata storage, or None on failure. + """ + if not nostr_client: + logger.debug("[EVENTS] No NostrClient available, skipping publish") + return None + + try: + if delete: + nostr_event = build_nip52_delete_event(event, account_pubkey) + else: + nostr_event = build_nip52_event(event, account_pubkey) + + sign_nostr_event(nostr_event, account_prvkey) + await nostr_client.publish_nostr_event(nostr_event) + + logger.info( + f"[EVENTS] Published NIP-52 {'delete' if delete else 'calendar'} " + f"event: {nostr_event.id[:16]}... (kind {nostr_event.kind})" + ) + return nostr_event + + except Exception as e: + logger.warning(f"[EVENTS] Failed to publish to Nostr: {e}") + return None diff --git a/nostr_sync.py b/nostr_sync.py new file mode 100644 index 0000000..380da3c --- /dev/null +++ b/nostr_sync.py @@ -0,0 +1,155 @@ +""" +Bidirectional Nostr sync for the events extension. + +Subscribes to NIP-52 calendar events (kind 31922/31923) from relays +and upserts them into the local database. Enables federated event +discovery — events published by other LNbits instances or Nostr +clients appear in the local events listing. +""" + +import asyncio +import json +from datetime import datetime, timezone + +from loguru import logger + +from .crud import db, get_event, update_event +from .models import Event +from .nostr.nostr_client import NostrClient + + +async def process_nostr_message(nostr_client: NostrClient, message: str): + """Process an incoming Nostr relay message.""" + try: + data = json.loads(message) + except json.JSONDecodeError: + return + + if not isinstance(data, list) or len(data) < 2: + return + + msg_type = data[0] + + if msg_type == "EVENT" and len(data) >= 3: + event_data = data[2] + await _handle_calendar_event(nostr_client, event_data) + elif msg_type == "EOSE": + logger.debug("[EVENTS] End of stored events from relay") + elif msg_type == "NOTICE": + logger.info(f"[EVENTS] Relay notice: {data[1]}") + + +async def _handle_calendar_event(nostr_client: NostrClient, event_data: dict): + """Handle an incoming NIP-52 calendar event (kind 31922 or 31923).""" + kind = event_data.get("kind") + if kind not in (31922, 31923): + return + + event_id = event_data.get("id", "") + if nostr_client.is_duplicate_event(event_id): + return + + tags = {t[0]: t[1] for t in event_data.get("tags", []) if len(t) >= 2} + tag_lists = {} + for t in event_data.get("tags", []): + if len(t) >= 2: + tag_lists.setdefault(t[0], []).append(t[1]) + + d_tag = tags.get("d") + if not d_tag: + return + + title = tags.get("title", "Untitled Event") + start = tags.get("start") + if not start: + return + + end = tags.get("end") + description = event_data.get("content", "") + image = tags.get("image") + location = tags.get("location") + categories = tag_lists.get("t", []) + + # Check if we already have this event (by d-tag as our event ID + # or by nostr_event_id) + existing = await get_event(d_tag) + if not existing: + # Check by nostr_event_id + existing = await db.fetchone( + "SELECT * FROM events.events WHERE nostr_event_id = :nid", + {"nid": event_id}, + Event, + ) + + if existing: + # Update if the incoming event is newer + incoming_created_at = event_data.get("created_at", 0) + if ( + existing.nostr_event_created_at + and incoming_created_at <= existing.nostr_event_created_at + ): + return # We already have a newer version + + existing.name = title + existing.info = description + existing.event_start_date = start + existing.event_end_date = end + existing.banner = image + existing.location = location + existing.categories = categories + existing.nostr_event_id = event_id + existing.nostr_event_created_at = incoming_created_at + await update_event(existing) + logger.info(f"[EVENTS] Updated event from Nostr: {title}") + else: + # Create new event from Nostr — discovered events are auto-approved + # (they're already public on relays). Use the d-tag as the event ID + # for replaceable-event correlation. + new_event = Event( + id=d_tag, + wallet="", + name=title, + info=description, + event_start_date=start, + event_end_date=end, + banner=image, + location=location, + categories=categories, + status="approved", + time=datetime.now(timezone.utc), + nostr_event_id=event_id, + nostr_event_created_at=event_data.get("created_at", 0), + ) + try: + await db.insert("events.events", new_event) + logger.info(f"[EVENTS] Discovered event from Nostr: {title}") + except Exception as e: + # Likely duplicate key — skip + logger.debug(f"[EVENTS] Skipped duplicate event: {e}") + + +async def wait_for_nostr_events(nostr_client: NostrClient): + """ + Background task: subscribe to NIP-52 events and process them. + """ + logger.info("[EVENTS] Starting Nostr event sync...") + + while True: + try: + # Subscribe to NIP-52 calendar events + await nostr_client.subscribe([ + {"kinds": [31922, 31923]}, + ]) + + # Process incoming events + while True: + message = await nostr_client.get_event() + await process_nostr_message(nostr_client, message) + + except ValueError: + # WebSocket closed — will reconnect + logger.warning("[EVENTS] Nostr connection lost, resubscribing...") + await asyncio.sleep(10) + except Exception as e: + logger.error(f"[EVENTS] Nostr sync error: {e}") + await asyncio.sleep(30) diff --git a/views_api.py b/views_api.py index a81967f..dc39f7b 100644 --- a/views_api.py +++ b/views_api.py @@ -60,6 +60,7 @@ from .models import ( Ticket, TicketPaymentRequest, ) +from .nostr_hooks import publish_or_delete_nostr_event from .services import refund_tickets, resend_ticket_email_notification from .tasks import deregister_payment_listener, register_payment_listener @@ -196,7 +197,12 @@ async def api_event_create( if not is_admin and not ext_settings.auto_approve: data.status = "proposed" - return await create_event(data) + event = await create_event(data) + + if event.status == "approved": + await publish_or_delete_nostr_event(event) + + return event @events_api_router.put("/{event_id}") @@ -216,7 +222,13 @@ async def api_event_update( ) for k, v in data.dict().items(): setattr(event, k, v) - return await update_event(event) + event = await update_event(event) + + # Re-publish the replaceable NIP-52 event if we already announced it. + if event.status == "approved" and event.nostr_event_id: + await publish_or_delete_nostr_event(event) + + return event @events_api_router.put("/{event_id}/cancel") @@ -234,6 +246,10 @@ async def api_event_cancel( event.canceled = True event = await update_event(event) await refund_tickets(event.id) + + if event.nostr_event_id: + await publish_or_delete_nostr_event(event, delete=True) + return event @@ -248,6 +264,10 @@ async def api_form_delete( ) if event.wallet != wallet.wallet.id: raise HTTPException(status_code=HTTPStatus.FORBIDDEN, detail="Not your event.") + + if event.nostr_event_id: + await publish_or_delete_nostr_event(event, delete=True) + await delete_event(event_id) await delete_event_tickets(event_id) @@ -268,7 +288,9 @@ async def api_event_approve( detail=f"Event is already {event.status}.", ) event.status = "approved" - return await update_event(event) + event = await update_event(event) + await publish_or_delete_nostr_event(event) + return event @events_api_router.put("/{event_id}/reject")