From 1bddb99132b1bfec9d858a30f7d1dd86227c0171 Mon Sep 17 00:00:00 2001 From: Padreug Date: Mon, 27 Apr 2026 18:28:21 +0200 Subject: [PATCH] feat: upgrade NostrClient to bidirectional (publish + subscribe) Add receive queue, subscription management, and event deduplication to support incoming NIP-52 calendar events from relays. Co-Authored-By: Claude Opus 4.6 (1M context) --- nostr/nostr_client.py | 56 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 6 deletions(-) diff --git a/nostr/nostr_client.py b/nostr/nostr_client.py index 09f57d4..f8f63de 100644 --- a/nostr/nostr_client.py +++ b/nostr/nostr_client.py @@ -1,30 +1,36 @@ """ -Publish-only Nostr client for the events extension. +Bidirectional Nostr client for the events extension. Connects to the nostrclient extension's internal WebSocket to publish -NIP-52 calendar events. No subscription/receive capabilities — this -is a stripped-down version of nostrmarket's NostrClient. +and subscribe to NIP-52 calendar events. Based on nostrmarket's +NostrClient pattern. """ import asyncio import json from asyncio import Queue +from collections import OrderedDict from typing import Optional from loguru import logger from websocket import WebSocketApp -from lnbits.helpers import encrypt_internal_message +from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash from lnbits.settings import settings from .event import NostrEvent +MAX_SEEN_EVENTS = 500 + class NostrClient: def __init__(self): + self.receive_event_queue: Queue = Queue() self.send_req_queue: Queue = Queue() self.ws: Optional[WebSocketApp] = None + self.subscription_id = "events-" + urlsafe_short_hash()[:32] self.running = False + self._seen_events: OrderedDict[str, None] = OrderedDict() @property def is_websocket_connected(self): @@ -45,8 +51,10 @@ class NostrClient: logger.info("[EVENTS] Connected to nostrclient WebSocket") def on_message(_, message): - # Log relay responses (OK, NOTICE) but don't process - logger.debug(f"[EVENTS] Relay response: {message[:200]}") + try: + self.receive_event_queue.put_nowait(message) + except Exception as e: + logger.error(f"[EVENTS] Failed to queue message: {e}") def on_error(_, error): logger.warning(f"[EVENTS] WebSocket error: {error}") @@ -55,6 +63,9 @@ class NostrClient: logger.warning( f"[EVENTS] WebSocket closed: {status_code} {message}" ) + self.receive_event_queue.put_nowait( + ValueError("WebSocket closed") + ) ws = WebSocketApp( ws_url, @@ -87,11 +98,44 @@ class NostrClient: logger.warning(f"[EVENTS] NostrClient error: {ex}") await asyncio.sleep(60) + def is_duplicate_event(self, event_id: str) -> bool: + """Check if an event has been seen recently.""" + if event_id in self._seen_events: + return True + self._seen_events[event_id] = None + if len(self._seen_events) > MAX_SEEN_EVENTS: + self._seen_events.popitem(last=False) + return False + + async def get_event(self): + """Get next event from the receive queue.""" + value = await self.receive_event_queue.get() + if isinstance(value, ValueError): + raise value + return value + async def publish_nostr_event(self, e: NostrEvent): await self.send_req_queue.put(["EVENT", e.dict()]) + async def subscribe(self, filters: list[dict]): + """Subscribe to events matching the given filters.""" + self.subscription_id = "events-" + urlsafe_short_hash()[:32] + await self.send_req_queue.put( + ["REQ", self.subscription_id] + filters + ) + logger.info( + f"[EVENTS] Subscribed to NIP-52 events " + f"(sub: {self.subscription_id[:20]}...)" + ) + + async def unsubscribe(self): + """Unsubscribe from current subscription.""" + await self.send_req_queue.put(["CLOSE", self.subscription_id]) + async def stop(self): + await self.unsubscribe() self.running = False + await asyncio.sleep(2) if self.ws: try: self.ws.close()