Compare commits

..

No commits in common. "ef5d2dcfcfb321fd32d1b276e61a543bfc1a2b91" and "4d91426e8283edf7ef76b4e1c36958c9dedbfea4" have entirely different histories.

3 changed files with 8 additions and 240 deletions

View file

@ -50,32 +50,14 @@ def events_start():
from .nostr.nostr_client import NostrClient from .nostr.nostr_client import NostrClient
nostr_client = NostrClient() nostr_client = NostrClient()
logger.info("[EVENTS] Starting NostrClient for NIP-52 sync") logger.info("[EVENTS] Starting NostrClient for NIP-52 publishing")
await nostr_client.run_forever() await nostr_client.run_forever()
except Exception as e: except Exception as e:
logger.warning(f"[EVENTS] NostrClient failed to start: {e}") logger.warning(f"[EVENTS] NostrClient failed to start: {e}")
logger.info("[EVENTS] Events will work without Nostr sync") logger.info("[EVENTS] Events will work without Nostr publishing")
task2 = create_permanent_unique_task("ext_events_nostr", _start_nostr_client) task2 = create_permanent_unique_task("ext_events_nostr", _start_nostr_client)
scheduled_tasks.append(task2) scheduled_tasks.append(task2)
async def _sync_nostr_events():
global nostr_client
await asyncio.sleep(15) # Wait for NostrClient to connect
if not nostr_client:
logger.info("[EVENTS] No NostrClient, skipping Nostr sync")
return
try:
from .nostr_sync import wait_for_nostr_events
await wait_for_nostr_events(nostr_client)
except Exception as e:
logger.error(f"[EVENTS] Nostr sync task failed: {e}")
task3 = create_permanent_unique_task(
"ext_events_nostr_sync", _sync_nostr_events
)
scheduled_tasks.append(task3)
__all__ = ["db", "events_ext", "events_start", "events_static_files", "events_stop"] __all__ = ["db", "events_ext", "events_start", "events_static_files", "events_stop"]

View file

@ -1,36 +1,30 @@
""" """
Bidirectional Nostr client for the events extension. Publish-only Nostr client for the events extension.
Connects to the nostrclient extension's internal WebSocket to publish Connects to the nostrclient extension's internal WebSocket to publish
and subscribe to NIP-52 calendar events. Based on nostrmarket's NIP-52 calendar events. No subscription/receive capabilities this
NostrClient pattern. is a stripped-down version of nostrmarket's NostrClient.
""" """
import asyncio import asyncio
import json import json
from asyncio import Queue from asyncio import Queue
from collections import OrderedDict
from typing import Optional from typing import Optional
from loguru import logger from loguru import logger
from websocket import WebSocketApp from websocket import WebSocketApp
from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash from lnbits.helpers import encrypt_internal_message
from lnbits.settings import settings from lnbits.settings import settings
from .event import NostrEvent from .event import NostrEvent
MAX_SEEN_EVENTS = 500
class NostrClient: class NostrClient:
def __init__(self): def __init__(self):
self.receive_event_queue: Queue = Queue()
self.send_req_queue: Queue = Queue() self.send_req_queue: Queue = Queue()
self.ws: Optional[WebSocketApp] = None self.ws: Optional[WebSocketApp] = None
self.subscription_id = "events-" + urlsafe_short_hash()[:32]
self.running = False self.running = False
self._seen_events: OrderedDict[str, None] = OrderedDict()
@property @property
def is_websocket_connected(self): def is_websocket_connected(self):
@ -51,10 +45,8 @@ class NostrClient:
logger.info("[EVENTS] Connected to nostrclient WebSocket") logger.info("[EVENTS] Connected to nostrclient WebSocket")
def on_message(_, message): def on_message(_, message):
try: # Log relay responses (OK, NOTICE) but don't process
self.receive_event_queue.put_nowait(message) logger.debug(f"[EVENTS] Relay response: {message[:200]}")
except Exception as e:
logger.error(f"[EVENTS] Failed to queue message: {e}")
def on_error(_, error): def on_error(_, error):
logger.warning(f"[EVENTS] WebSocket error: {error}") logger.warning(f"[EVENTS] WebSocket error: {error}")
@ -63,9 +55,6 @@ class NostrClient:
logger.warning( logger.warning(
f"[EVENTS] WebSocket closed: {status_code} {message}" f"[EVENTS] WebSocket closed: {status_code} {message}"
) )
self.receive_event_queue.put_nowait(
ValueError("WebSocket closed")
)
ws = WebSocketApp( ws = WebSocketApp(
ws_url, ws_url,
@ -98,44 +87,11 @@ class NostrClient:
logger.warning(f"[EVENTS] NostrClient error: {ex}") logger.warning(f"[EVENTS] NostrClient error: {ex}")
await asyncio.sleep(60) await asyncio.sleep(60)
def is_duplicate_event(self, event_id: str) -> bool:
"""Check if an event has been seen recently."""
if event_id in self._seen_events:
return True
self._seen_events[event_id] = None
if len(self._seen_events) > MAX_SEEN_EVENTS:
self._seen_events.popitem(last=False)
return False
async def get_event(self):
"""Get next event from the receive queue."""
value = await self.receive_event_queue.get()
if isinstance(value, ValueError):
raise value
return value
async def publish_nostr_event(self, e: NostrEvent): async def publish_nostr_event(self, e: NostrEvent):
await self.send_req_queue.put(["EVENT", e.dict()]) await self.send_req_queue.put(["EVENT", e.dict()])
async def subscribe(self, filters: list[dict]):
"""Subscribe to events matching the given filters."""
self.subscription_id = "events-" + urlsafe_short_hash()[:32]
await self.send_req_queue.put(
["REQ", self.subscription_id] + filters
)
logger.info(
f"[EVENTS] Subscribed to NIP-52 events "
f"(sub: {self.subscription_id[:20]}...)"
)
async def unsubscribe(self):
"""Unsubscribe from current subscription."""
await self.send_req_queue.put(["CLOSE", self.subscription_id])
async def stop(self): async def stop(self):
await self.unsubscribe()
self.running = False self.running = False
await asyncio.sleep(2)
if self.ws: if self.ws:
try: try:
self.ws.close() self.ws.close()

View file

@ -1,170 +0,0 @@
"""
Bidirectional Nostr sync for the events extension.
Subscribes to NIP-52 calendar events (kind 31922/31923) from relays
and upserts them into the local database. Enables federated event
discovery events published by other LNbits instances or Nostr
clients appear in the local events listing.
"""
import json
from datetime import datetime, timezone
from loguru import logger
from .crud import create_event, db, get_event, update_event
from .models import CreateEvent, Event
from .nostr.nostr_client import NostrClient
async def process_nostr_message(nostr_client: NostrClient, message: str):
"""Process an incoming Nostr relay message."""
try:
data = json.loads(message)
except json.JSONDecodeError:
return
if not isinstance(data, list) or len(data) < 2:
return
msg_type = data[0]
if msg_type == "EVENT" and len(data) >= 3:
event_data = data[2]
await _handle_calendar_event(nostr_client, event_data)
elif msg_type == "EOSE":
logger.debug("[EVENTS] End of stored events from relay")
elif msg_type == "NOTICE":
logger.info(f"[EVENTS] Relay notice: {data[1]}")
async def _handle_calendar_event(nostr_client: NostrClient, event_data: dict):
"""Handle an incoming NIP-52 calendar event (kind 31922 or 31923)."""
kind = event_data.get("kind")
if kind not in (31922, 31923):
return
event_id = event_data.get("id", "")
if nostr_client.is_duplicate_event(event_id):
return
tags = {t[0]: t[1] for t in event_data.get("tags", []) if len(t) >= 2}
tag_lists = {}
for t in event_data.get("tags", []):
if len(t) >= 2:
tag_lists.setdefault(t[0], []).append(t[1])
d_tag = tags.get("d")
if not d_tag:
return
title = tags.get("title", "Untitled Event")
start = tags.get("start")
if not start:
return
end = tags.get("end")
description = event_data.get("content", "")
image = tags.get("image")
location = tags.get("location")
categories = tag_lists.get("t", [])
# Check if we already have this event (by d-tag as our event ID
# or by nostr_event_id)
existing = await get_event(d_tag)
if not existing:
# Check by nostr_event_id
existing = await db.fetchone(
"SELECT * FROM events.events WHERE nostr_event_id = :nid",
{"nid": event_id},
Event,
)
if existing:
# Update if the incoming event is newer
incoming_created_at = event_data.get("created_at", 0)
if (
existing.nostr_event_created_at
and incoming_created_at <= existing.nostr_event_created_at
):
return # We already have a newer version
existing.name = title
existing.info = description
existing.event_start_date = start
existing.event_end_date = end
existing.banner = image
existing.location = location
existing.categories = categories
existing.nostr_event_id = event_id
existing.nostr_event_created_at = incoming_created_at
await update_event(existing)
logger.info(f"[EVENTS] Updated event from Nostr: {title}")
else:
# Create new event from Nostr
# Events discovered from Nostr are auto-approved (they're already public)
event = CreateEvent(
wallet="", # No wallet — discovered from Nostr, not ticketed locally
name=title,
info=description,
event_start_date=start,
event_end_date=end,
banner=image,
location=location,
categories=categories,
status="approved",
)
# Use the d-tag as the event ID for correlation
from lnbits.db import Database
new_event = Event(
id=d_tag,
wallet="",
name=title,
info=description,
event_start_date=start,
event_end_date=end,
banner=image,
location=location,
categories=categories,
status="approved",
time=datetime.now(timezone.utc),
nostr_event_id=event_id,
nostr_event_created_at=event_data.get("created_at", 0),
)
try:
await db.insert("events.events", new_event)
logger.info(f"[EVENTS] Discovered event from Nostr: {title}")
except Exception as e:
# Likely duplicate key — skip
logger.debug(f"[EVENTS] Skipped duplicate event: {e}")
async def wait_for_nostr_events(nostr_client: NostrClient):
"""
Background task: subscribe to NIP-52 events and process them.
"""
logger.info("[EVENTS] Starting Nostr event sync...")
while True:
try:
# Subscribe to NIP-52 calendar events
await nostr_client.subscribe([
{"kinds": [31922, 31923]},
])
# Process incoming events
while True:
message = await nostr_client.get_event()
await process_nostr_message(nostr_client, message)
except ValueError:
# WebSocket closed — will reconnect
logger.warning("[EVENTS] Nostr connection lost, resubscribing...")
await asyncio.sleep(10)
except Exception as e:
logger.error(f"[EVENTS] Nostr sync error: {e}")
await asyncio.sleep(30)
import asyncio # noqa: E402