Compare commits
No commits in common. "ef5d2dcfcfb321fd32d1b276e61a543bfc1a2b91" and "4d91426e8283edf7ef76b4e1c36958c9dedbfea4" have entirely different histories.
ef5d2dcfcf
...
4d91426e82
3 changed files with 8 additions and 240 deletions
22
__init__.py
22
__init__.py
|
|
@ -50,32 +50,14 @@ def events_start():
|
||||||
from .nostr.nostr_client import NostrClient
|
from .nostr.nostr_client import NostrClient
|
||||||
|
|
||||||
nostr_client = NostrClient()
|
nostr_client = NostrClient()
|
||||||
logger.info("[EVENTS] Starting NostrClient for NIP-52 sync")
|
logger.info("[EVENTS] Starting NostrClient for NIP-52 publishing")
|
||||||
await nostr_client.run_forever()
|
await nostr_client.run_forever()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"[EVENTS] NostrClient failed to start: {e}")
|
logger.warning(f"[EVENTS] NostrClient failed to start: {e}")
|
||||||
logger.info("[EVENTS] Events will work without Nostr sync")
|
logger.info("[EVENTS] Events will work without Nostr publishing")
|
||||||
|
|
||||||
task2 = create_permanent_unique_task("ext_events_nostr", _start_nostr_client)
|
task2 = create_permanent_unique_task("ext_events_nostr", _start_nostr_client)
|
||||||
scheduled_tasks.append(task2)
|
scheduled_tasks.append(task2)
|
||||||
|
|
||||||
async def _sync_nostr_events():
|
|
||||||
global nostr_client
|
|
||||||
await asyncio.sleep(15) # Wait for NostrClient to connect
|
|
||||||
if not nostr_client:
|
|
||||||
logger.info("[EVENTS] No NostrClient, skipping Nostr sync")
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
from .nostr_sync import wait_for_nostr_events
|
|
||||||
|
|
||||||
await wait_for_nostr_events(nostr_client)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"[EVENTS] Nostr sync task failed: {e}")
|
|
||||||
|
|
||||||
task3 = create_permanent_unique_task(
|
|
||||||
"ext_events_nostr_sync", _sync_nostr_events
|
|
||||||
)
|
|
||||||
scheduled_tasks.append(task3)
|
|
||||||
|
|
||||||
|
|
||||||
__all__ = ["db", "events_ext", "events_start", "events_static_files", "events_stop"]
|
__all__ = ["db", "events_ext", "events_start", "events_static_files", "events_stop"]
|
||||||
|
|
|
||||||
|
|
@ -1,36 +1,30 @@
|
||||||
"""
|
"""
|
||||||
Bidirectional Nostr client for the events extension.
|
Publish-only Nostr client for the events extension.
|
||||||
|
|
||||||
Connects to the nostrclient extension's internal WebSocket to publish
|
Connects to the nostrclient extension's internal WebSocket to publish
|
||||||
and subscribe to NIP-52 calendar events. Based on nostrmarket's
|
NIP-52 calendar events. No subscription/receive capabilities — this
|
||||||
NostrClient pattern.
|
is a stripped-down version of nostrmarket's NostrClient.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
from asyncio import Queue
|
from asyncio import Queue
|
||||||
from collections import OrderedDict
|
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from websocket import WebSocketApp
|
from websocket import WebSocketApp
|
||||||
|
|
||||||
from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash
|
from lnbits.helpers import encrypt_internal_message
|
||||||
from lnbits.settings import settings
|
from lnbits.settings import settings
|
||||||
|
|
||||||
from .event import NostrEvent
|
from .event import NostrEvent
|
||||||
|
|
||||||
MAX_SEEN_EVENTS = 500
|
|
||||||
|
|
||||||
|
|
||||||
class NostrClient:
|
class NostrClient:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.receive_event_queue: Queue = Queue()
|
|
||||||
self.send_req_queue: Queue = Queue()
|
self.send_req_queue: Queue = Queue()
|
||||||
self.ws: Optional[WebSocketApp] = None
|
self.ws: Optional[WebSocketApp] = None
|
||||||
self.subscription_id = "events-" + urlsafe_short_hash()[:32]
|
|
||||||
self.running = False
|
self.running = False
|
||||||
self._seen_events: OrderedDict[str, None] = OrderedDict()
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_websocket_connected(self):
|
def is_websocket_connected(self):
|
||||||
|
|
@ -51,10 +45,8 @@ class NostrClient:
|
||||||
logger.info("[EVENTS] Connected to nostrclient WebSocket")
|
logger.info("[EVENTS] Connected to nostrclient WebSocket")
|
||||||
|
|
||||||
def on_message(_, message):
|
def on_message(_, message):
|
||||||
try:
|
# Log relay responses (OK, NOTICE) but don't process
|
||||||
self.receive_event_queue.put_nowait(message)
|
logger.debug(f"[EVENTS] Relay response: {message[:200]}")
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"[EVENTS] Failed to queue message: {e}")
|
|
||||||
|
|
||||||
def on_error(_, error):
|
def on_error(_, error):
|
||||||
logger.warning(f"[EVENTS] WebSocket error: {error}")
|
logger.warning(f"[EVENTS] WebSocket error: {error}")
|
||||||
|
|
@ -63,9 +55,6 @@ class NostrClient:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"[EVENTS] WebSocket closed: {status_code} {message}"
|
f"[EVENTS] WebSocket closed: {status_code} {message}"
|
||||||
)
|
)
|
||||||
self.receive_event_queue.put_nowait(
|
|
||||||
ValueError("WebSocket closed")
|
|
||||||
)
|
|
||||||
|
|
||||||
ws = WebSocketApp(
|
ws = WebSocketApp(
|
||||||
ws_url,
|
ws_url,
|
||||||
|
|
@ -98,44 +87,11 @@ class NostrClient:
|
||||||
logger.warning(f"[EVENTS] NostrClient error: {ex}")
|
logger.warning(f"[EVENTS] NostrClient error: {ex}")
|
||||||
await asyncio.sleep(60)
|
await asyncio.sleep(60)
|
||||||
|
|
||||||
def is_duplicate_event(self, event_id: str) -> bool:
|
|
||||||
"""Check if an event has been seen recently."""
|
|
||||||
if event_id in self._seen_events:
|
|
||||||
return True
|
|
||||||
self._seen_events[event_id] = None
|
|
||||||
if len(self._seen_events) > MAX_SEEN_EVENTS:
|
|
||||||
self._seen_events.popitem(last=False)
|
|
||||||
return False
|
|
||||||
|
|
||||||
async def get_event(self):
|
|
||||||
"""Get next event from the receive queue."""
|
|
||||||
value = await self.receive_event_queue.get()
|
|
||||||
if isinstance(value, ValueError):
|
|
||||||
raise value
|
|
||||||
return value
|
|
||||||
|
|
||||||
async def publish_nostr_event(self, e: NostrEvent):
|
async def publish_nostr_event(self, e: NostrEvent):
|
||||||
await self.send_req_queue.put(["EVENT", e.dict()])
|
await self.send_req_queue.put(["EVENT", e.dict()])
|
||||||
|
|
||||||
async def subscribe(self, filters: list[dict]):
|
|
||||||
"""Subscribe to events matching the given filters."""
|
|
||||||
self.subscription_id = "events-" + urlsafe_short_hash()[:32]
|
|
||||||
await self.send_req_queue.put(
|
|
||||||
["REQ", self.subscription_id] + filters
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
f"[EVENTS] Subscribed to NIP-52 events "
|
|
||||||
f"(sub: {self.subscription_id[:20]}...)"
|
|
||||||
)
|
|
||||||
|
|
||||||
async def unsubscribe(self):
|
|
||||||
"""Unsubscribe from current subscription."""
|
|
||||||
await self.send_req_queue.put(["CLOSE", self.subscription_id])
|
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
await self.unsubscribe()
|
|
||||||
self.running = False
|
self.running = False
|
||||||
await asyncio.sleep(2)
|
|
||||||
if self.ws:
|
if self.ws:
|
||||||
try:
|
try:
|
||||||
self.ws.close()
|
self.ws.close()
|
||||||
|
|
|
||||||
170
nostr_sync.py
170
nostr_sync.py
|
|
@ -1,170 +0,0 @@
|
||||||
"""
|
|
||||||
Bidirectional Nostr sync for the events extension.
|
|
||||||
|
|
||||||
Subscribes to NIP-52 calendar events (kind 31922/31923) from relays
|
|
||||||
and upserts them into the local database. Enables federated event
|
|
||||||
discovery — events published by other LNbits instances or Nostr
|
|
||||||
clients appear in the local events listing.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import json
|
|
||||||
from datetime import datetime, timezone
|
|
||||||
|
|
||||||
from loguru import logger
|
|
||||||
|
|
||||||
from .crud import create_event, db, get_event, update_event
|
|
||||||
from .models import CreateEvent, Event
|
|
||||||
from .nostr.nostr_client import NostrClient
|
|
||||||
|
|
||||||
|
|
||||||
async def process_nostr_message(nostr_client: NostrClient, message: str):
|
|
||||||
"""Process an incoming Nostr relay message."""
|
|
||||||
try:
|
|
||||||
data = json.loads(message)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
return
|
|
||||||
|
|
||||||
if not isinstance(data, list) or len(data) < 2:
|
|
||||||
return
|
|
||||||
|
|
||||||
msg_type = data[0]
|
|
||||||
|
|
||||||
if msg_type == "EVENT" and len(data) >= 3:
|
|
||||||
event_data = data[2]
|
|
||||||
await _handle_calendar_event(nostr_client, event_data)
|
|
||||||
elif msg_type == "EOSE":
|
|
||||||
logger.debug("[EVENTS] End of stored events from relay")
|
|
||||||
elif msg_type == "NOTICE":
|
|
||||||
logger.info(f"[EVENTS] Relay notice: {data[1]}")
|
|
||||||
|
|
||||||
|
|
||||||
async def _handle_calendar_event(nostr_client: NostrClient, event_data: dict):
|
|
||||||
"""Handle an incoming NIP-52 calendar event (kind 31922 or 31923)."""
|
|
||||||
kind = event_data.get("kind")
|
|
||||||
if kind not in (31922, 31923):
|
|
||||||
return
|
|
||||||
|
|
||||||
event_id = event_data.get("id", "")
|
|
||||||
if nostr_client.is_duplicate_event(event_id):
|
|
||||||
return
|
|
||||||
|
|
||||||
tags = {t[0]: t[1] for t in event_data.get("tags", []) if len(t) >= 2}
|
|
||||||
tag_lists = {}
|
|
||||||
for t in event_data.get("tags", []):
|
|
||||||
if len(t) >= 2:
|
|
||||||
tag_lists.setdefault(t[0], []).append(t[1])
|
|
||||||
|
|
||||||
d_tag = tags.get("d")
|
|
||||||
if not d_tag:
|
|
||||||
return
|
|
||||||
|
|
||||||
title = tags.get("title", "Untitled Event")
|
|
||||||
start = tags.get("start")
|
|
||||||
if not start:
|
|
||||||
return
|
|
||||||
|
|
||||||
end = tags.get("end")
|
|
||||||
description = event_data.get("content", "")
|
|
||||||
image = tags.get("image")
|
|
||||||
location = tags.get("location")
|
|
||||||
categories = tag_lists.get("t", [])
|
|
||||||
|
|
||||||
# Check if we already have this event (by d-tag as our event ID
|
|
||||||
# or by nostr_event_id)
|
|
||||||
existing = await get_event(d_tag)
|
|
||||||
if not existing:
|
|
||||||
# Check by nostr_event_id
|
|
||||||
existing = await db.fetchone(
|
|
||||||
"SELECT * FROM events.events WHERE nostr_event_id = :nid",
|
|
||||||
{"nid": event_id},
|
|
||||||
Event,
|
|
||||||
)
|
|
||||||
|
|
||||||
if existing:
|
|
||||||
# Update if the incoming event is newer
|
|
||||||
incoming_created_at = event_data.get("created_at", 0)
|
|
||||||
if (
|
|
||||||
existing.nostr_event_created_at
|
|
||||||
and incoming_created_at <= existing.nostr_event_created_at
|
|
||||||
):
|
|
||||||
return # We already have a newer version
|
|
||||||
|
|
||||||
existing.name = title
|
|
||||||
existing.info = description
|
|
||||||
existing.event_start_date = start
|
|
||||||
existing.event_end_date = end
|
|
||||||
existing.banner = image
|
|
||||||
existing.location = location
|
|
||||||
existing.categories = categories
|
|
||||||
existing.nostr_event_id = event_id
|
|
||||||
existing.nostr_event_created_at = incoming_created_at
|
|
||||||
await update_event(existing)
|
|
||||||
logger.info(f"[EVENTS] Updated event from Nostr: {title}")
|
|
||||||
else:
|
|
||||||
# Create new event from Nostr
|
|
||||||
# Events discovered from Nostr are auto-approved (they're already public)
|
|
||||||
event = CreateEvent(
|
|
||||||
wallet="", # No wallet — discovered from Nostr, not ticketed locally
|
|
||||||
name=title,
|
|
||||||
info=description,
|
|
||||||
event_start_date=start,
|
|
||||||
event_end_date=end,
|
|
||||||
banner=image,
|
|
||||||
location=location,
|
|
||||||
categories=categories,
|
|
||||||
status="approved",
|
|
||||||
)
|
|
||||||
# Use the d-tag as the event ID for correlation
|
|
||||||
from lnbits.db import Database
|
|
||||||
|
|
||||||
new_event = Event(
|
|
||||||
id=d_tag,
|
|
||||||
wallet="",
|
|
||||||
name=title,
|
|
||||||
info=description,
|
|
||||||
event_start_date=start,
|
|
||||||
event_end_date=end,
|
|
||||||
banner=image,
|
|
||||||
location=location,
|
|
||||||
categories=categories,
|
|
||||||
status="approved",
|
|
||||||
time=datetime.now(timezone.utc),
|
|
||||||
nostr_event_id=event_id,
|
|
||||||
nostr_event_created_at=event_data.get("created_at", 0),
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
await db.insert("events.events", new_event)
|
|
||||||
logger.info(f"[EVENTS] Discovered event from Nostr: {title}")
|
|
||||||
except Exception as e:
|
|
||||||
# Likely duplicate key — skip
|
|
||||||
logger.debug(f"[EVENTS] Skipped duplicate event: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
async def wait_for_nostr_events(nostr_client: NostrClient):
|
|
||||||
"""
|
|
||||||
Background task: subscribe to NIP-52 events and process them.
|
|
||||||
"""
|
|
||||||
logger.info("[EVENTS] Starting Nostr event sync...")
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
# Subscribe to NIP-52 calendar events
|
|
||||||
await nostr_client.subscribe([
|
|
||||||
{"kinds": [31922, 31923]},
|
|
||||||
])
|
|
||||||
|
|
||||||
# Process incoming events
|
|
||||||
while True:
|
|
||||||
message = await nostr_client.get_event()
|
|
||||||
await process_nostr_message(nostr_client, message)
|
|
||||||
|
|
||||||
except ValueError:
|
|
||||||
# WebSocket closed — will reconnect
|
|
||||||
logger.warning("[EVENTS] Nostr connection lost, resubscribing...")
|
|
||||||
await asyncio.sleep(10)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"[EVENTS] Nostr sync error: {e}")
|
|
||||||
await asyncio.sleep(30)
|
|
||||||
|
|
||||||
|
|
||||||
import asyncio # noqa: E402
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue