feat: event proposal and approval workflow #9
1 changed files with 50 additions and 6 deletions
feat: upgrade NostrClient to bidirectional (publish + subscribe)
Add receive queue, subscription management, and event deduplication to support incoming NIP-52 calendar events from relays. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
commit
1bddb99132
|
|
@ -1,30 +1,36 @@
|
||||||
"""
|
"""
|
||||||
Publish-only Nostr client for the events extension.
|
Bidirectional Nostr client for the events extension.
|
||||||
|
|
||||||
Connects to the nostrclient extension's internal WebSocket to publish
|
Connects to the nostrclient extension's internal WebSocket to publish
|
||||||
NIP-52 calendar events. No subscription/receive capabilities — this
|
and subscribe to NIP-52 calendar events. Based on nostrmarket's
|
||||||
is a stripped-down version of nostrmarket's NostrClient.
|
NostrClient pattern.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
from asyncio import Queue
|
from asyncio import Queue
|
||||||
|
from collections import OrderedDict
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from websocket import WebSocketApp
|
from websocket import WebSocketApp
|
||||||
|
|
||||||
from lnbits.helpers import encrypt_internal_message
|
from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash
|
||||||
from lnbits.settings import settings
|
from lnbits.settings import settings
|
||||||
|
|
||||||
from .event import NostrEvent
|
from .event import NostrEvent
|
||||||
|
|
||||||
|
MAX_SEEN_EVENTS = 500
|
||||||
|
|
||||||
|
|
||||||
class NostrClient:
|
class NostrClient:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
self.receive_event_queue: Queue = Queue()
|
||||||
self.send_req_queue: Queue = Queue()
|
self.send_req_queue: Queue = Queue()
|
||||||
self.ws: Optional[WebSocketApp] = None
|
self.ws: Optional[WebSocketApp] = None
|
||||||
|
self.subscription_id = "events-" + urlsafe_short_hash()[:32]
|
||||||
self.running = False
|
self.running = False
|
||||||
|
self._seen_events: OrderedDict[str, None] = OrderedDict()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_websocket_connected(self):
|
def is_websocket_connected(self):
|
||||||
|
|
@ -45,8 +51,10 @@ class NostrClient:
|
||||||
logger.info("[EVENTS] Connected to nostrclient WebSocket")
|
logger.info("[EVENTS] Connected to nostrclient WebSocket")
|
||||||
|
|
||||||
def on_message(_, message):
|
def on_message(_, message):
|
||||||
# Log relay responses (OK, NOTICE) but don't process
|
try:
|
||||||
logger.debug(f"[EVENTS] Relay response: {message[:200]}")
|
self.receive_event_queue.put_nowait(message)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[EVENTS] Failed to queue message: {e}")
|
||||||
|
|
||||||
def on_error(_, error):
|
def on_error(_, error):
|
||||||
logger.warning(f"[EVENTS] WebSocket error: {error}")
|
logger.warning(f"[EVENTS] WebSocket error: {error}")
|
||||||
|
|
@ -55,6 +63,9 @@ class NostrClient:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"[EVENTS] WebSocket closed: {status_code} {message}"
|
f"[EVENTS] WebSocket closed: {status_code} {message}"
|
||||||
)
|
)
|
||||||
|
self.receive_event_queue.put_nowait(
|
||||||
|
ValueError("WebSocket closed")
|
||||||
|
)
|
||||||
|
|
||||||
ws = WebSocketApp(
|
ws = WebSocketApp(
|
||||||
ws_url,
|
ws_url,
|
||||||
|
|
@ -87,11 +98,44 @@ class NostrClient:
|
||||||
logger.warning(f"[EVENTS] NostrClient error: {ex}")
|
logger.warning(f"[EVENTS] NostrClient error: {ex}")
|
||||||
await asyncio.sleep(60)
|
await asyncio.sleep(60)
|
||||||
|
|
||||||
|
def is_duplicate_event(self, event_id: str) -> bool:
|
||||||
|
"""Check if an event has been seen recently."""
|
||||||
|
if event_id in self._seen_events:
|
||||||
|
return True
|
||||||
|
self._seen_events[event_id] = None
|
||||||
|
if len(self._seen_events) > MAX_SEEN_EVENTS:
|
||||||
|
self._seen_events.popitem(last=False)
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def get_event(self):
|
||||||
|
"""Get next event from the receive queue."""
|
||||||
|
value = await self.receive_event_queue.get()
|
||||||
|
if isinstance(value, ValueError):
|
||||||
|
raise value
|
||||||
|
return value
|
||||||
|
|
||||||
async def publish_nostr_event(self, e: NostrEvent):
|
async def publish_nostr_event(self, e: NostrEvent):
|
||||||
await self.send_req_queue.put(["EVENT", e.dict()])
|
await self.send_req_queue.put(["EVENT", e.dict()])
|
||||||
|
|
||||||
|
async def subscribe(self, filters: list[dict]):
|
||||||
|
"""Subscribe to events matching the given filters."""
|
||||||
|
self.subscription_id = "events-" + urlsafe_short_hash()[:32]
|
||||||
|
await self.send_req_queue.put(
|
||||||
|
["REQ", self.subscription_id] + filters
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
f"[EVENTS] Subscribed to NIP-52 events "
|
||||||
|
f"(sub: {self.subscription_id[:20]}...)"
|
||||||
|
)
|
||||||
|
|
||||||
|
async def unsubscribe(self):
|
||||||
|
"""Unsubscribe from current subscription."""
|
||||||
|
await self.send_req_queue.put(["CLOSE", self.subscription_id])
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
|
await self.unsubscribe()
|
||||||
self.running = False
|
self.running = False
|
||||||
|
await asyncio.sleep(2)
|
||||||
if self.ws:
|
if self.ws:
|
||||||
try:
|
try:
|
||||||
self.ws.close()
|
self.ws.close()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue