Compare commits

..

3 commits

Author SHA1 Message Date
ef5d2dcfcf feat: wire Nostr subscription sync into extension lifecycle
Some checks failed
lint.yml / feat: wire Nostr subscription sync into extension lifecycle (pull_request) Failing after 0s
lint.yml / feat: wire Nostr subscription sync into extension lifecycle (push) Failing after 0s
Add background task that subscribes to kind 31922/31923 events
from relays and processes them into the local database. Starts
15s after NostrClient connects (sequenced after publish client).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 18:30:00 +02:00
e937883564 feat: add NIP-52 event sync from Nostr relays
Subscribe to kind 31922/31923 events and upsert into local DB:
- New events discovered from relays are auto-approved
- Existing events are updated if incoming version is newer
- Deduplication via event ID and d-tag correlation
- Events from Nostr have empty wallet (not ticketed locally)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 18:29:14 +02:00
1bddb99132 feat: upgrade NostrClient to bidirectional (publish + subscribe)
Add receive queue, subscription management, and event deduplication
to support incoming NIP-52 calendar events from relays.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 18:28:21 +02:00
3 changed files with 240 additions and 8 deletions

View file

@ -50,14 +50,32 @@ def events_start():
from .nostr.nostr_client import NostrClient from .nostr.nostr_client import NostrClient
nostr_client = NostrClient() nostr_client = NostrClient()
logger.info("[EVENTS] Starting NostrClient for NIP-52 publishing") logger.info("[EVENTS] Starting NostrClient for NIP-52 sync")
await nostr_client.run_forever() await nostr_client.run_forever()
except Exception as e: except Exception as e:
logger.warning(f"[EVENTS] NostrClient failed to start: {e}") logger.warning(f"[EVENTS] NostrClient failed to start: {e}")
logger.info("[EVENTS] Events will work without Nostr publishing") logger.info("[EVENTS] Events will work without Nostr sync")
task2 = create_permanent_unique_task("ext_events_nostr", _start_nostr_client) task2 = create_permanent_unique_task("ext_events_nostr", _start_nostr_client)
scheduled_tasks.append(task2) scheduled_tasks.append(task2)
async def _sync_nostr_events():
global nostr_client
await asyncio.sleep(15) # Wait for NostrClient to connect
if not nostr_client:
logger.info("[EVENTS] No NostrClient, skipping Nostr sync")
return
try:
from .nostr_sync import wait_for_nostr_events
await wait_for_nostr_events(nostr_client)
except Exception as e:
logger.error(f"[EVENTS] Nostr sync task failed: {e}")
task3 = create_permanent_unique_task(
"ext_events_nostr_sync", _sync_nostr_events
)
scheduled_tasks.append(task3)
__all__ = ["db", "events_ext", "events_start", "events_static_files", "events_stop"] __all__ = ["db", "events_ext", "events_start", "events_static_files", "events_stop"]

View file

@ -1,30 +1,36 @@
""" """
Publish-only Nostr client for the events extension. Bidirectional Nostr client for the events extension.
Connects to the nostrclient extension's internal WebSocket to publish Connects to the nostrclient extension's internal WebSocket to publish
NIP-52 calendar events. No subscription/receive capabilities this and subscribe to NIP-52 calendar events. Based on nostrmarket's
is a stripped-down version of nostrmarket's NostrClient. NostrClient pattern.
""" """
import asyncio import asyncio
import json import json
from asyncio import Queue from asyncio import Queue
from collections import OrderedDict
from typing import Optional from typing import Optional
from loguru import logger from loguru import logger
from websocket import WebSocketApp from websocket import WebSocketApp
from lnbits.helpers import encrypt_internal_message from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash
from lnbits.settings import settings from lnbits.settings import settings
from .event import NostrEvent from .event import NostrEvent
MAX_SEEN_EVENTS = 500
class NostrClient: class NostrClient:
def __init__(self): def __init__(self):
self.receive_event_queue: Queue = Queue()
self.send_req_queue: Queue = Queue() self.send_req_queue: Queue = Queue()
self.ws: Optional[WebSocketApp] = None self.ws: Optional[WebSocketApp] = None
self.subscription_id = "events-" + urlsafe_short_hash()[:32]
self.running = False self.running = False
self._seen_events: OrderedDict[str, None] = OrderedDict()
@property @property
def is_websocket_connected(self): def is_websocket_connected(self):
@ -45,8 +51,10 @@ class NostrClient:
logger.info("[EVENTS] Connected to nostrclient WebSocket") logger.info("[EVENTS] Connected to nostrclient WebSocket")
def on_message(_, message): def on_message(_, message):
# Log relay responses (OK, NOTICE) but don't process try:
logger.debug(f"[EVENTS] Relay response: {message[:200]}") self.receive_event_queue.put_nowait(message)
except Exception as e:
logger.error(f"[EVENTS] Failed to queue message: {e}")
def on_error(_, error): def on_error(_, error):
logger.warning(f"[EVENTS] WebSocket error: {error}") logger.warning(f"[EVENTS] WebSocket error: {error}")
@ -55,6 +63,9 @@ class NostrClient:
logger.warning( logger.warning(
f"[EVENTS] WebSocket closed: {status_code} {message}" f"[EVENTS] WebSocket closed: {status_code} {message}"
) )
self.receive_event_queue.put_nowait(
ValueError("WebSocket closed")
)
ws = WebSocketApp( ws = WebSocketApp(
ws_url, ws_url,
@ -87,11 +98,44 @@ class NostrClient:
logger.warning(f"[EVENTS] NostrClient error: {ex}") logger.warning(f"[EVENTS] NostrClient error: {ex}")
await asyncio.sleep(60) await asyncio.sleep(60)
def is_duplicate_event(self, event_id: str) -> bool:
"""Check if an event has been seen recently."""
if event_id in self._seen_events:
return True
self._seen_events[event_id] = None
if len(self._seen_events) > MAX_SEEN_EVENTS:
self._seen_events.popitem(last=False)
return False
async def get_event(self):
"""Get next event from the receive queue."""
value = await self.receive_event_queue.get()
if isinstance(value, ValueError):
raise value
return value
async def publish_nostr_event(self, e: NostrEvent): async def publish_nostr_event(self, e: NostrEvent):
await self.send_req_queue.put(["EVENT", e.dict()]) await self.send_req_queue.put(["EVENT", e.dict()])
async def subscribe(self, filters: list[dict]):
"""Subscribe to events matching the given filters."""
self.subscription_id = "events-" + urlsafe_short_hash()[:32]
await self.send_req_queue.put(
["REQ", self.subscription_id] + filters
)
logger.info(
f"[EVENTS] Subscribed to NIP-52 events "
f"(sub: {self.subscription_id[:20]}...)"
)
async def unsubscribe(self):
"""Unsubscribe from current subscription."""
await self.send_req_queue.put(["CLOSE", self.subscription_id])
async def stop(self): async def stop(self):
await self.unsubscribe()
self.running = False self.running = False
await asyncio.sleep(2)
if self.ws: if self.ws:
try: try:
self.ws.close() self.ws.close()

170
nostr_sync.py Normal file
View file

@ -0,0 +1,170 @@
"""
Bidirectional Nostr sync for the events extension.
Subscribes to NIP-52 calendar events (kind 31922/31923) from relays
and upserts them into the local database. Enables federated event
discovery events published by other LNbits instances or Nostr
clients appear in the local events listing.
"""
import json
from datetime import datetime, timezone
from loguru import logger
from .crud import create_event, db, get_event, update_event
from .models import CreateEvent, Event
from .nostr.nostr_client import NostrClient
async def process_nostr_message(nostr_client: NostrClient, message: str):
"""Process an incoming Nostr relay message."""
try:
data = json.loads(message)
except json.JSONDecodeError:
return
if not isinstance(data, list) or len(data) < 2:
return
msg_type = data[0]
if msg_type == "EVENT" and len(data) >= 3:
event_data = data[2]
await _handle_calendar_event(nostr_client, event_data)
elif msg_type == "EOSE":
logger.debug("[EVENTS] End of stored events from relay")
elif msg_type == "NOTICE":
logger.info(f"[EVENTS] Relay notice: {data[1]}")
async def _handle_calendar_event(nostr_client: NostrClient, event_data: dict):
"""Handle an incoming NIP-52 calendar event (kind 31922 or 31923)."""
kind = event_data.get("kind")
if kind not in (31922, 31923):
return
event_id = event_data.get("id", "")
if nostr_client.is_duplicate_event(event_id):
return
tags = {t[0]: t[1] for t in event_data.get("tags", []) if len(t) >= 2}
tag_lists = {}
for t in event_data.get("tags", []):
if len(t) >= 2:
tag_lists.setdefault(t[0], []).append(t[1])
d_tag = tags.get("d")
if not d_tag:
return
title = tags.get("title", "Untitled Event")
start = tags.get("start")
if not start:
return
end = tags.get("end")
description = event_data.get("content", "")
image = tags.get("image")
location = tags.get("location")
categories = tag_lists.get("t", [])
# Check if we already have this event (by d-tag as our event ID
# or by nostr_event_id)
existing = await get_event(d_tag)
if not existing:
# Check by nostr_event_id
existing = await db.fetchone(
"SELECT * FROM events.events WHERE nostr_event_id = :nid",
{"nid": event_id},
Event,
)
if existing:
# Update if the incoming event is newer
incoming_created_at = event_data.get("created_at", 0)
if (
existing.nostr_event_created_at
and incoming_created_at <= existing.nostr_event_created_at
):
return # We already have a newer version
existing.name = title
existing.info = description
existing.event_start_date = start
existing.event_end_date = end
existing.banner = image
existing.location = location
existing.categories = categories
existing.nostr_event_id = event_id
existing.nostr_event_created_at = incoming_created_at
await update_event(existing)
logger.info(f"[EVENTS] Updated event from Nostr: {title}")
else:
# Create new event from Nostr
# Events discovered from Nostr are auto-approved (they're already public)
event = CreateEvent(
wallet="", # No wallet — discovered from Nostr, not ticketed locally
name=title,
info=description,
event_start_date=start,
event_end_date=end,
banner=image,
location=location,
categories=categories,
status="approved",
)
# Use the d-tag as the event ID for correlation
from lnbits.db import Database
new_event = Event(
id=d_tag,
wallet="",
name=title,
info=description,
event_start_date=start,
event_end_date=end,
banner=image,
location=location,
categories=categories,
status="approved",
time=datetime.now(timezone.utc),
nostr_event_id=event_id,
nostr_event_created_at=event_data.get("created_at", 0),
)
try:
await db.insert("events.events", new_event)
logger.info(f"[EVENTS] Discovered event from Nostr: {title}")
except Exception as e:
# Likely duplicate key — skip
logger.debug(f"[EVENTS] Skipped duplicate event: {e}")
async def wait_for_nostr_events(nostr_client: NostrClient):
"""
Background task: subscribe to NIP-52 events and process them.
"""
logger.info("[EVENTS] Starting Nostr event sync...")
while True:
try:
# Subscribe to NIP-52 calendar events
await nostr_client.subscribe([
{"kinds": [31922, 31923]},
])
# Process incoming events
while True:
message = await nostr_client.get_event()
await process_nostr_message(nostr_client, message)
except ValueError:
# WebSocket closed — will reconnect
logger.warning("[EVENTS] Nostr connection lost, resubscribing...")
await asyncio.sleep(10)
except Exception as e:
logger.error(f"[EVENTS] Nostr sync error: {e}")
await asyncio.sleep(30)
import asyncio # noqa: E402