diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..981d3f0 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,59 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +Nostrclient is an LNbits extension that acts as an always-on Nostr relay multiplexer. Multiple Nostr clients connect to a single WebSocket endpoint, which fans out requests to multiple configured Nostr relays and aggregates/deduplicates responses. It rewrites subscription IDs per-client to prevent conflicts. + +## Build & Development Commands + +All commands use `uv` as the Python package manager: + +```bash +make format # Format: prettier + black + ruff --fix +make check # All checks: mypy, pyright, black --check, ruff check, prettier --check +make test # Run pytest (DEBUG=true PYTHONUNBUFFERED=1 uv run pytest) +make mypy # Type check (excludes nostr/ directory) +make pre-commit # Run pre-commit hooks on all files +``` + +Individual checks: `make checkblack`, `make checkruff`, `make checkprettier`. + +CI runs lint then pytest with `LNBITS_BACKEND_WALLET_CLASS=FakeWallet`. + +## Architecture + +**Request flow:** Nostr Clients → WebSocket → NostrRouter → RelayManager → Nostr Relays + +Key components: + +- **NostrRouter** (`router.py`) — One per client WebSocket connection. Rewrites subscription IDs (original → hashed → original) to isolate clients. Two async tasks: `_client_to_nostr` (forward requests) and `_nostr_to_client` (deliver aggregated responses). + +- **NostrClient** (`nostr/client/client.py`) — Singleton orchestrator. Owns the RelayManager. Polls MessagePool and dispatches events via callbacks to routers. + +- **RelayManager** (`nostr/relay_manager.py`) — Manages connections to multiple relays. Caches subscriptions so new relays receive existing subscriptions. Runs health checks via `check_and_restart_relays()`. + +- **Relay** (`nostr/relay.py`) — Individual relay WebSocket connection with retry/backoff, ping latency tracking, and error counting. + +- **MessagePool** (`nostr/message_pool.py`) — Thread-safe event aggregation with deduplication by event ID across all relays. + +**Hybrid threading model:** Relay connections use threads (via `RelayManager.open_connections()`); client communication uses asyncio. The bridge is in `tasks.py` where `subscribe_events()` runs in a thread executor. + +**Lifecycle** (`__init__.py`): `nostrclient_start()` spawns three background tasks (init relays, subscribe events, check relays). `nostrclient_stop()` cancels tasks, stops routers, closes the client. + +## API Endpoints (views_api.py) + +- REST endpoints under `/api/v1/` for relay CRUD and config (admin-authenticated) +- WebSocket endpoints: `/api/v1/{ws_id}` (private, encrypted ID) and `/api/v1/relay` (public, if enabled) + +## Database + +Three migrations in `migrations.py`: relays table, config table (JSON `extra` field), config owner scoping. CRUD in `crud.py` uses LNbits database abstraction. + +## Code Quality Notes + +- **mypy excludes `nostr/*`** — this is a custom Nostr protocol implementation, not a third-party package +- **Ruff rules:** F, E, W, I, A, C, N, UP, RUF, B +- **Frontend:** Vue.js + Quasar via LNbits base templates (`templates/nostrclient/index.html`) +- Pub key helpers in `helpers.py` normalize between hex and bech32 (npub1) formats diff --git a/nostr/relay.py b/nostr/relay.py index d762963..bac1be7 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -1,6 +1,9 @@ +from __future__ import annotations + import asyncio import json import time +from collections.abc import Callable from queue import Queue from loguru import logger @@ -28,6 +31,7 @@ class Relay: self.num_subscriptions: int = 0 self.queue: Queue = Queue() + self.on_connect: Callable[[Relay], None] | None = None def connect(self): self.ws = WebSocketApp( @@ -97,6 +101,11 @@ class Relay: logger.info(f"[Relay: {self.url}] Connected.") self.connected = True self.shutdown = False + if self.on_connect: + try: + self.on_connect(self) + except Exception as e: + logger.warning(f"[Relay: {self.url}] on_connect callback error: {e}") def _on_close(self, _, status_code, message): logger.warning( diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index 2aa27c5..2b90216 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -1,6 +1,8 @@ import asyncio +import json import threading import time +from collections import deque from typing import List from loguru import logger @@ -9,6 +11,8 @@ from .message_pool import MessagePool, NoticeMessage from .relay import Relay from .subscription import Subscription +PENDING_EVENTS_MAX = 100 + class RelayManager: def __init__(self) -> None: @@ -18,6 +22,8 @@ class RelayManager: self.message_pool = MessagePool() self._cached_subscriptions: dict[str, Subscription] = {} self._subscriptions_lock = threading.Lock() + self._pending_events: deque[str] = deque(maxlen=PENDING_EVENTS_MAX) + self._pending_events_lock = threading.Lock() def add_relay(self, url: str) -> Relay: if url in list(self.relays.keys()): @@ -25,6 +31,7 @@ class RelayManager: return self.relays[url] relay = Relay(url, self.message_pool) + relay.on_connect = self._on_relay_connect self.relays[url] = relay self._open_connection(relay) @@ -100,8 +107,19 @@ class RelayManager: relay.close() def publish_message(self, message: str): - for relay in self.relays.values(): - relay.publish(message) + connected_relays = [ + r for r in self.relays.values() if r.connected and not r.shutdown + ] + if connected_relays: + for relay in self.relays.values(): + relay.publish(message) + else: + with self._pending_events_lock: + self._pending_events.append(message) + logger.warning( + f"No connected relays. Queued outgoing event " + f"({len(self._pending_events)}/{PENDING_EVENTS_MAX})." + ) def handle_notice(self, notice: NoticeMessage): relay = next((r for r in self.relays.values() if r.url == notice.url)) @@ -137,7 +155,47 @@ class RelayManager: logger.info(f"Restarting connection to relay '{relay.url}'") + self._drain_relay_queue(relay) self.remove_relay(relay.url) new_relay = self.add_relay(relay.url) new_relay.error_counter = relay.error_counter new_relay.error_list = relay.error_list + + def _drain_relay_queue(self, relay: Relay): + """Move pending EVENT messages from a dead relay's queue to the + manager's pending queue so they can be resent on reconnection.""" + drained = 0 + while not relay.queue.empty(): + try: + message = relay.queue.get_nowait() + data = json.loads(message) + if data[0] == "EVENT": + with self._pending_events_lock: + self._pending_events.append(message) + drained += 1 + except Exception: + break + if drained: + logger.info(f"Drained {drained} pending event(s) from relay '{relay.url}'.") + + def _on_relay_connect(self, _relay: Relay): + self._flush_pending_events() + + def _flush_pending_events(self): + with self._pending_events_lock: + if not self._pending_events: + return + connected_relays = [ + r for r in self.relays.values() if r.connected and not r.shutdown + ] + if not connected_relays: + return + count = len(self._pending_events) + while self._pending_events: + message = self._pending_events.popleft() + for relay in connected_relays: + relay.publish(message) + logger.info( + f"Flushed {count} pending event(s) to " + f"{len(connected_relays)} relay(s)." + )