feat: add NIP-52 Nostr publish + sync of calendar events

Approved events are mirrored to Nostr as NIP-52 calendar events (kind
31922) signed by the wallet owner's pubkey, and incoming kind 31922/31923
events from subscribed relays are synced into the local DB so events
created on other LNbits instances or Nostr clients show up locally.

- m009 stores nostr_event_id + nostr_event_created_at on each event
  (used for replaceable updates and NIP-09 deletes); m011 adds location
  + JSON-encoded categories list (NIP-52 location/`t` tags).
- models: Event/PublicEvent/CreateEvent gain location, categories,
  nostr_event_id, nostr_event_created_at; parse_categories validator
  decodes the JSON column on read.
- nostr/{event,nostr_client}.py: Schnorr signing, websocket relay client,
  and a NostrEvent model (publish-only and subscribe variants).
- nostr_publisher.py: build/sign NIP-52 kind 31922 events and NIP-09
  delete events; publish via the relay client.
- nostr_sync.py: subscribe to kinds 31922/31923, dedupe by nostr_event_id
  / d-tag, upsert Events; auto-approves discovered Nostr events since
  they're already public.
- nostr_hooks.py: thin bridge that views_api handlers call to publish
  or delete a NIP-52 event for a given local event. Lives in its own
  module to keep `from . import nostr_client` out of the view layer
  and avoid the views_api -> publisher import cycle.
- views_api: hooks publish_or_delete_nostr_event into create-on-approved,
  update-when-already-published, cancel (delete), delete (delete), and
  approve (publish).
- __init__.py: 3-task lifespan — wait_for_paid_invoices (upstream),
  NostrClient bootstrap, and the NIP-52 sync loop. Module-level
  nostr_client global is set by the bootstrap and read dynamically by
  publish_or_delete_nostr_event so the import order works regardless of
  whether nostrclient is up at startup.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Padreug 2026-05-05 18:51:43 +02:00
commit 6aa280680e
9 changed files with 575 additions and 5 deletions

View file

@ -22,6 +22,11 @@ events_static_files = [
scheduled_tasks: list[asyncio.Task] = []
# Module-level NostrClient — None when nostrclient is unavailable. Set by the
# bootstrap task in events_start() and read via dynamic attribute lookup
# from nostr_hooks.publish_or_delete_nostr_event.
nostr_client = None
def events_stop():
for task in scheduled_tasks:
@ -30,12 +35,50 @@ def events_stop():
except Exception as ex:
logger.warning(ex)
global nostr_client
if nostr_client:
asyncio.get_event_loop().create_task(nostr_client.stop())
def events_start():
from lnbits.tasks import create_permanent_unique_task
task = create_permanent_unique_task("ext_events", wait_for_paid_invoices)
scheduled_tasks.append(task)
task1 = create_permanent_unique_task("ext_events", wait_for_paid_invoices)
scheduled_tasks.append(task1)
async def _start_nostr_client():
global nostr_client
await asyncio.sleep(10) # Wait for nostrclient to be ready
try:
from .nostr.nostr_client import NostrClient
nostr_client = NostrClient()
logger.info("[EVENTS] Starting NostrClient for NIP-52 sync")
await nostr_client.run_forever()
except Exception as exc:
logger.warning(f"[EVENTS] NostrClient failed to start: {exc}")
logger.info("[EVENTS] Events will work without Nostr sync")
task2 = create_permanent_unique_task("ext_events_nostr", _start_nostr_client)
scheduled_tasks.append(task2)
async def _sync_nostr_events():
global nostr_client
await asyncio.sleep(15) # Wait for NostrClient to connect
if not nostr_client:
logger.info("[EVENTS] No NostrClient, skipping Nostr sync")
return
try:
from .nostr_sync import wait_for_nostr_events
await wait_for_nostr_events(nostr_client)
except Exception as exc:
logger.error(f"[EVENTS] Nostr sync task failed: {exc}")
task3 = create_permanent_unique_task(
"ext_events_nostr_sync", _sync_nostr_events
)
scheduled_tasks.append(task3)
__all__ = ["db", "events_ext", "events_start", "events_static_files", "events_stop"]

View file

@ -1,3 +1,4 @@
import json
from datetime import datetime
from pydantic import BaseModel, EmailStr, Field, root_validator, validator
@ -42,6 +43,8 @@ class CreateEvent(BaseModel):
amount_tickets: int = 0 # 0 = unlimited / not ticketed
price_per_ticket: float = 0 # 0 = free
banner: str | None = None
location: str | None = None # venue/address (NIP-52 'location' tag)
categories: list[str] = Field(default_factory=list) # NIP-52 't' tags
extra: EventExtra = Field(default_factory=EventExtra)
status: str = "approved" # proposed, approved, rejected
@ -63,8 +66,18 @@ class Event(BaseModel):
time: datetime
sold: int = 0
banner: str | None = None
location: str | None = None
categories: list[str] = Field(default_factory=list)
extra: EventExtra = Field(default_factory=EventExtra)
status: str = "approved"
nostr_event_id: str | None = None
nostr_event_created_at: int | None = None
@validator("categories", pre=True)
def parse_categories(cls, v):
if isinstance(v, str):
return json.loads(v) if v else []
return v or []
class PublicEvent(BaseModel):
@ -80,9 +93,17 @@ class PublicEvent(BaseModel):
fiat_currency: str = "GBP"
price_per_ticket: float
banner: str | None
location: str | None = None
categories: list[str] = Field(default_factory=list)
extra: EventExtra = Field(default_factory=EventExtra)
status: str = "approved" # surfaces "proposed"/"rejected" so SFC can render banner
@validator("categories", pre=True)
def parse_categories(cls, v):
if isinstance(v, str):
return json.loads(v) if v else []
return v or []
class EventsSettings(BaseModel):
"""Extension-level settings for the events extension."""

0
nostr/__init__.py Normal file
View file

26
nostr/event.py Normal file
View file

@ -0,0 +1,26 @@
import hashlib
import json
from pydantic import BaseModel
class NostrEvent(BaseModel):
id: str = ""
pubkey: str
created_at: int
kind: int
tags: list[list[str]] = []
content: str = ""
sig: str | None = None
def serialize(self) -> list:
return [0, self.pubkey, self.created_at, self.kind, self.tags, self.content]
def serialize_json(self) -> str:
e = self.serialize()
return json.dumps(e, separators=(",", ":"), ensure_ascii=False)
@property
def event_id(self) -> str:
data = self.serialize_json()
return hashlib.sha256(data.encode()).hexdigest()

142
nostr/nostr_client.py Normal file
View file

@ -0,0 +1,142 @@
"""
Bidirectional Nostr client for the events extension.
Connects to the nostrclient extension's internal WebSocket to publish
and subscribe to NIP-52 calendar events. Based on nostrmarket's
NostrClient pattern.
"""
import asyncio
import json
from asyncio import Queue
from collections import OrderedDict
from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash
from lnbits.settings import settings
from loguru import logger
from websocket import WebSocketApp
from .event import NostrEvent
MAX_SEEN_EVENTS = 500
class NostrClient:
def __init__(self):
self.receive_event_queue: Queue = Queue()
self.send_req_queue: Queue = Queue()
self.ws: WebSocketApp | None = None
self.subscription_id = "events-" + urlsafe_short_hash()[:32]
self.running = False
self._seen_events: OrderedDict[str, None] = OrderedDict()
@property
def is_websocket_connected(self):
if not self.ws:
return False
return self.ws.keep_running
async def connect(self) -> WebSocketApp:
relay_endpoint = encrypt_internal_message("relay", urlsafe=True)
ws_url = (
f"ws://localhost:{settings.port}"
f"/nostrclient/api/v1/{relay_endpoint}"
)
logger.info("[EVENTS] Connecting to nostrclient WebSocket...")
def on_open(_):
logger.info("[EVENTS] Connected to nostrclient WebSocket")
def on_message(_, message):
try:
self.receive_event_queue.put_nowait(message)
except Exception as e:
logger.error(f"[EVENTS] Failed to queue message: {e}")
def on_error(_, error):
logger.warning(f"[EVENTS] WebSocket error: {error}")
def on_close(_, status_code, message):
logger.warning(
f"[EVENTS] WebSocket closed: {status_code} {message}"
)
self.receive_event_queue.put_nowait(
ValueError("WebSocket closed")
)
ws = WebSocketApp(
ws_url,
on_message=on_message,
on_open=on_open,
on_close=on_close,
on_error=on_error,
)
from threading import Thread
wst = Thread(target=ws.run_forever)
wst.daemon = True
wst.start()
return ws
async def run_forever(self):
self.running = True
while self.running:
try:
if not self.is_websocket_connected:
self.ws = await self.connect()
await asyncio.sleep(5)
req = await self.send_req_queue.get()
assert self.ws
self.ws.send(json.dumps(req))
except Exception as ex:
logger.warning(f"[EVENTS] NostrClient error: {ex}")
await asyncio.sleep(60)
def is_duplicate_event(self, event_id: str) -> bool:
"""Check if an event has been seen recently."""
if event_id in self._seen_events:
return True
self._seen_events[event_id] = None
if len(self._seen_events) > MAX_SEEN_EVENTS:
self._seen_events.popitem(last=False)
return False
async def get_event(self):
"""Get next event from the receive queue."""
value = await self.receive_event_queue.get()
if isinstance(value, ValueError):
raise value
return value
async def publish_nostr_event(self, e: NostrEvent):
await self.send_req_queue.put(["EVENT", e.dict()])
async def subscribe(self, filters: list[dict]):
"""Subscribe to events matching the given filters."""
self.subscription_id = "events-" + urlsafe_short_hash()[:32]
await self.send_req_queue.put(
["REQ", self.subscription_id, *filters]
)
logger.info(
f"[EVENTS] Subscribed to NIP-52 events "
f"(sub: {self.subscription_id[:20]}...)"
)
async def unsubscribe(self):
"""Unsubscribe from current subscription."""
await self.send_req_queue.put(["CLOSE", self.subscription_id])
async def stop(self):
await self.unsubscribe()
self.running = False
await asyncio.sleep(2)
if self.ws:
try:
self.ws.close()
except Exception:
pass
self.ws = None

43
nostr_hooks.py Normal file
View file

@ -0,0 +1,43 @@
"""Helpers that bridge event-mutation handlers to the Nostr publisher.
Lives in its own module so both `events_api_router` and any future router
can call it without importing through `views_api`, which would create an
import cycle (views_api -> nostr_hooks -> nostr_publisher -> models).
"""
from loguru import logger
from .crud import update_event
from .models import Event
from .nostr_publisher import publish_event_to_nostr
async def publish_or_delete_nostr_event(event: Event, *, delete: bool = False) -> None:
"""Publish or delete the NIP-52 calendar event for `event`.
Pulls the wallet owner's pubkey/prvkey to sign with the user's identity.
Failures are logged and swallowed so a Nostr outage doesn't break the
HTTP flow that triggered the publish.
"""
try:
from lnbits.core.crud.users import get_account
from lnbits.core.crud.wallets import get_wallet
from . import nostr_client
wallet_obj = await get_wallet(event.wallet)
if not wallet_obj:
return
account = await get_account(wallet_obj.user)
if not account or not account.pubkey or not account.prvkey:
return
nostr_event = await publish_event_to_nostr(
nostr_client, event, account.pubkey, account.prvkey, delete=delete
)
if nostr_event and not delete:
event.nostr_event_id = nostr_event.id
event.nostr_event_created_at = nostr_event.created_at
await update_event(event)
except Exception as exc:
logger.warning(f"[EVENTS] Nostr publish failed: {exc}")

118
nostr_publisher.py Normal file
View file

@ -0,0 +1,118 @@
"""
NIP-52 calendar event publishing for the events extension.
Builds kind 31922 (date-based) calendar events from the Event model,
signs them with the event creator's Account keypair, and publishes
via the NostrClient to nostrclient relays.
Reference: https://github.com/nostr-protocol/nips/blob/master/52.md
"""
import time
import coincurve
from loguru import logger
from .models import Event
from .nostr.event import NostrEvent
def build_nip52_event(event: Event, pubkey: str) -> NostrEvent:
"""
Convert an Event model to a NIP-52 kind 31922 (date-based) calendar event.
Tags:
d - event.id (addressable identifier)
title - event.name
start - event.event_start_date (ISO date string)
end - event.event_end_date (optional)
image - event.banner (optional)
Content: event.info (description)
"""
tags = [
["d", event.id],
["title", event.name],
["start", event.event_start_date],
]
if event.event_end_date:
tags.append(["end", event.event_end_date])
if event.banner:
tags.append(["image", event.banner])
if event.location:
tags.append(["location", event.location])
for cat in (event.categories or []):
tags.append(["t", cat])
nostr_event = NostrEvent(
pubkey=pubkey,
created_at=int(time.time()),
kind=31922,
tags=tags,
content=event.info or "",
)
nostr_event.id = nostr_event.event_id
return nostr_event
def build_nip52_delete_event(event: Event, pubkey: str) -> NostrEvent:
"""
Build a kind 5 delete event for a published NIP-52 calendar event.
Uses an 'a' tag to reference the parameterized replaceable event
(kind 31922) per NIP-09.
"""
nostr_event = NostrEvent(
pubkey=pubkey,
created_at=int(time.time()),
kind=5,
tags=[
["a", f"31922:{pubkey}:{event.id}"],
],
content="Event canceled",
)
nostr_event.id = nostr_event.event_id
return nostr_event
def sign_nostr_event(nostr_event: NostrEvent, private_key_hex: str) -> None:
"""Sign a NostrEvent in-place using Schnorr signature."""
privkey = coincurve.PrivateKey(bytes.fromhex(private_key_hex))
sig = privkey.sign_schnorr(bytes.fromhex(nostr_event.id))
nostr_event.sig = sig.hex()
async def publish_event_to_nostr(
nostr_client,
event: Event,
account_pubkey: str,
account_prvkey: str,
delete: bool = False,
) -> NostrEvent | None:
"""
Build, sign, and publish a NIP-52 calendar event (or delete event).
Returns the published NostrEvent for metadata storage, or None on failure.
"""
if not nostr_client:
logger.debug("[EVENTS] No NostrClient available, skipping publish")
return None
try:
if delete:
nostr_event = build_nip52_delete_event(event, account_pubkey)
else:
nostr_event = build_nip52_event(event, account_pubkey)
sign_nostr_event(nostr_event, account_prvkey)
await nostr_client.publish_nostr_event(nostr_event)
logger.info(
f"[EVENTS] Published NIP-52 {'delete' if delete else 'calendar'} "
f"event: {nostr_event.id[:16]}... (kind {nostr_event.kind})"
)
return nostr_event
except Exception as e:
logger.warning(f"[EVENTS] Failed to publish to Nostr: {e}")
return None

155
nostr_sync.py Normal file
View file

@ -0,0 +1,155 @@
"""
Bidirectional Nostr sync for the events extension.
Subscribes to NIP-52 calendar events (kind 31922/31923) from relays
and upserts them into the local database. Enables federated event
discovery events published by other LNbits instances or Nostr
clients appear in the local events listing.
"""
import asyncio
import json
from datetime import datetime, timezone
from loguru import logger
from .crud import db, get_event, update_event
from .models import Event
from .nostr.nostr_client import NostrClient
async def process_nostr_message(nostr_client: NostrClient, message: str):
"""Process an incoming Nostr relay message."""
try:
data = json.loads(message)
except json.JSONDecodeError:
return
if not isinstance(data, list) or len(data) < 2:
return
msg_type = data[0]
if msg_type == "EVENT" and len(data) >= 3:
event_data = data[2]
await _handle_calendar_event(nostr_client, event_data)
elif msg_type == "EOSE":
logger.debug("[EVENTS] End of stored events from relay")
elif msg_type == "NOTICE":
logger.info(f"[EVENTS] Relay notice: {data[1]}")
async def _handle_calendar_event(nostr_client: NostrClient, event_data: dict):
"""Handle an incoming NIP-52 calendar event (kind 31922 or 31923)."""
kind = event_data.get("kind")
if kind not in (31922, 31923):
return
event_id = event_data.get("id", "")
if nostr_client.is_duplicate_event(event_id):
return
tags = {t[0]: t[1] for t in event_data.get("tags", []) if len(t) >= 2}
tag_lists = {}
for t in event_data.get("tags", []):
if len(t) >= 2:
tag_lists.setdefault(t[0], []).append(t[1])
d_tag = tags.get("d")
if not d_tag:
return
title = tags.get("title", "Untitled Event")
start = tags.get("start")
if not start:
return
end = tags.get("end")
description = event_data.get("content", "")
image = tags.get("image")
location = tags.get("location")
categories = tag_lists.get("t", [])
# Check if we already have this event (by d-tag as our event ID
# or by nostr_event_id)
existing = await get_event(d_tag)
if not existing:
# Check by nostr_event_id
existing = await db.fetchone(
"SELECT * FROM events.events WHERE nostr_event_id = :nid",
{"nid": event_id},
Event,
)
if existing:
# Update if the incoming event is newer
incoming_created_at = event_data.get("created_at", 0)
if (
existing.nostr_event_created_at
and incoming_created_at <= existing.nostr_event_created_at
):
return # We already have a newer version
existing.name = title
existing.info = description
existing.event_start_date = start
existing.event_end_date = end
existing.banner = image
existing.location = location
existing.categories = categories
existing.nostr_event_id = event_id
existing.nostr_event_created_at = incoming_created_at
await update_event(existing)
logger.info(f"[EVENTS] Updated event from Nostr: {title}")
else:
# Create new event from Nostr — discovered events are auto-approved
# (they're already public on relays). Use the d-tag as the event ID
# for replaceable-event correlation.
new_event = Event(
id=d_tag,
wallet="",
name=title,
info=description,
event_start_date=start,
event_end_date=end,
banner=image,
location=location,
categories=categories,
status="approved",
time=datetime.now(timezone.utc),
nostr_event_id=event_id,
nostr_event_created_at=event_data.get("created_at", 0),
)
try:
await db.insert("events.events", new_event)
logger.info(f"[EVENTS] Discovered event from Nostr: {title}")
except Exception as e:
# Likely duplicate key — skip
logger.debug(f"[EVENTS] Skipped duplicate event: {e}")
async def wait_for_nostr_events(nostr_client: NostrClient):
"""
Background task: subscribe to NIP-52 events and process them.
"""
logger.info("[EVENTS] Starting Nostr event sync...")
while True:
try:
# Subscribe to NIP-52 calendar events
await nostr_client.subscribe([
{"kinds": [31922, 31923]},
])
# Process incoming events
while True:
message = await nostr_client.get_event()
await process_nostr_message(nostr_client, message)
except ValueError:
# WebSocket closed — will reconnect
logger.warning("[EVENTS] Nostr connection lost, resubscribing...")
await asyncio.sleep(10)
except Exception as e:
logger.error(f"[EVENTS] Nostr sync error: {e}")
await asyncio.sleep(30)

View file

@ -60,6 +60,7 @@ from .models import (
Ticket,
TicketPaymentRequest,
)
from .nostr_hooks import publish_or_delete_nostr_event
from .services import refund_tickets, resend_ticket_email_notification
from .tasks import deregister_payment_listener, register_payment_listener
@ -196,7 +197,12 @@ async def api_event_create(
if not is_admin and not ext_settings.auto_approve:
data.status = "proposed"
return await create_event(data)
event = await create_event(data)
if event.status == "approved":
await publish_or_delete_nostr_event(event)
return event
@events_api_router.put("/{event_id}")
@ -216,7 +222,13 @@ async def api_event_update(
)
for k, v in data.dict().items():
setattr(event, k, v)
return await update_event(event)
event = await update_event(event)
# Re-publish the replaceable NIP-52 event if we already announced it.
if event.status == "approved" and event.nostr_event_id:
await publish_or_delete_nostr_event(event)
return event
@events_api_router.put("/{event_id}/cancel")
@ -234,6 +246,10 @@ async def api_event_cancel(
event.canceled = True
event = await update_event(event)
await refund_tickets(event.id)
if event.nostr_event_id:
await publish_or_delete_nostr_event(event, delete=True)
return event
@ -248,6 +264,10 @@ async def api_form_delete(
)
if event.wallet != wallet.wallet.id:
raise HTTPException(status_code=HTTPStatus.FORBIDDEN, detail="Not your event.")
if event.nostr_event_id:
await publish_or_delete_nostr_event(event, delete=True)
await delete_event(event_id)
await delete_event_tickets(event_id)
@ -268,7 +288,9 @@ async def api_event_approve(
detail=f"Event is already {event.status}.",
)
event.status = "approved"
return await update_event(event)
event = await update_event(event)
await publish_or_delete_nostr_event(event)
return event
@events_api_router.put("/{event_id}/reject")