Compare commits

..

No commits in common. "fix/queue-outgoing-events" and "main" have entirely different histories.

3 changed files with 2 additions and 128 deletions

View file

@ -1,59 +0,0 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Nostrclient is an LNbits extension that acts as an always-on Nostr relay multiplexer. Multiple Nostr clients connect to a single WebSocket endpoint, which fans out requests to multiple configured Nostr relays and aggregates/deduplicates responses. It rewrites subscription IDs per-client to prevent conflicts.
## Build & Development Commands
All commands use `uv` as the Python package manager:
```bash
make format # Format: prettier + black + ruff --fix
make check # All checks: mypy, pyright, black --check, ruff check, prettier --check
make test # Run pytest (DEBUG=true PYTHONUNBUFFERED=1 uv run pytest)
make mypy # Type check (excludes nostr/ directory)
make pre-commit # Run pre-commit hooks on all files
```
Individual checks: `make checkblack`, `make checkruff`, `make checkprettier`.
CI runs lint then pytest with `LNBITS_BACKEND_WALLET_CLASS=FakeWallet`.
## Architecture
**Request flow:** Nostr Clients → WebSocket → NostrRouter → RelayManager → Nostr Relays
Key components:
- **NostrRouter** (`router.py`) — One per client WebSocket connection. Rewrites subscription IDs (original → hashed → original) to isolate clients. Two async tasks: `_client_to_nostr` (forward requests) and `_nostr_to_client` (deliver aggregated responses).
- **NostrClient** (`nostr/client/client.py`) — Singleton orchestrator. Owns the RelayManager. Polls MessagePool and dispatches events via callbacks to routers.
- **RelayManager** (`nostr/relay_manager.py`) — Manages connections to multiple relays. Caches subscriptions so new relays receive existing subscriptions. Runs health checks via `check_and_restart_relays()`.
- **Relay** (`nostr/relay.py`) — Individual relay WebSocket connection with retry/backoff, ping latency tracking, and error counting.
- **MessagePool** (`nostr/message_pool.py`) — Thread-safe event aggregation with deduplication by event ID across all relays.
**Hybrid threading model:** Relay connections use threads (via `RelayManager.open_connections()`); client communication uses asyncio. The bridge is in `tasks.py` where `subscribe_events()` runs in a thread executor.
**Lifecycle** (`__init__.py`): `nostrclient_start()` spawns three background tasks (init relays, subscribe events, check relays). `nostrclient_stop()` cancels tasks, stops routers, closes the client.
## API Endpoints (views_api.py)
- REST endpoints under `/api/v1/` for relay CRUD and config (admin-authenticated)
- WebSocket endpoints: `/api/v1/{ws_id}` (private, encrypted ID) and `/api/v1/relay` (public, if enabled)
## Database
Three migrations in `migrations.py`: relays table, config table (JSON `extra` field), config owner scoping. CRUD in `crud.py` uses LNbits database abstraction.
## Code Quality Notes
- **mypy excludes `nostr/*`** — this is a custom Nostr protocol implementation, not a third-party package
- **Ruff rules:** F, E, W, I, A, C, N, UP, RUF, B
- **Frontend:** Vue.js + Quasar via LNbits base templates (`templates/nostrclient/index.html`)
- Pub key helpers in `helpers.py` normalize between hex and bech32 (npub1) formats

View file

@ -1,9 +1,6 @@
from __future__ import annotations
import asyncio import asyncio
import json import json
import time import time
from collections.abc import Callable
from queue import Queue from queue import Queue
from loguru import logger from loguru import logger
@ -31,7 +28,6 @@ class Relay:
self.num_subscriptions: int = 0 self.num_subscriptions: int = 0
self.queue: Queue = Queue() self.queue: Queue = Queue()
self.on_connect: Callable[[Relay], None] | None = None
def connect(self): def connect(self):
self.ws = WebSocketApp( self.ws = WebSocketApp(
@ -101,11 +97,6 @@ class Relay:
logger.info(f"[Relay: {self.url}] Connected.") logger.info(f"[Relay: {self.url}] Connected.")
self.connected = True self.connected = True
self.shutdown = False self.shutdown = False
if self.on_connect:
try:
self.on_connect(self)
except Exception as e:
logger.warning(f"[Relay: {self.url}] on_connect callback error: {e}")
def _on_close(self, _, status_code, message): def _on_close(self, _, status_code, message):
logger.warning( logger.warning(

View file

@ -1,8 +1,6 @@
import asyncio import asyncio
import json
import threading import threading
import time import time
from collections import deque
from typing import List from typing import List
from loguru import logger from loguru import logger
@ -11,8 +9,6 @@ from .message_pool import MessagePool, NoticeMessage
from .relay import Relay from .relay import Relay
from .subscription import Subscription from .subscription import Subscription
PENDING_EVENTS_MAX = 100
class RelayManager: class RelayManager:
def __init__(self) -> None: def __init__(self) -> None:
@ -22,8 +18,6 @@ class RelayManager:
self.message_pool = MessagePool() self.message_pool = MessagePool()
self._cached_subscriptions: dict[str, Subscription] = {} self._cached_subscriptions: dict[str, Subscription] = {}
self._subscriptions_lock = threading.Lock() self._subscriptions_lock = threading.Lock()
self._pending_events: deque[str] = deque(maxlen=PENDING_EVENTS_MAX)
self._pending_events_lock = threading.Lock()
def add_relay(self, url: str) -> Relay: def add_relay(self, url: str) -> Relay:
if url in list(self.relays.keys()): if url in list(self.relays.keys()):
@ -31,7 +25,6 @@ class RelayManager:
return self.relays[url] return self.relays[url]
relay = Relay(url, self.message_pool) relay = Relay(url, self.message_pool)
relay.on_connect = self._on_relay_connect
self.relays[url] = relay self.relays[url] = relay
self._open_connection(relay) self._open_connection(relay)
@ -107,19 +100,8 @@ class RelayManager:
relay.close() relay.close()
def publish_message(self, message: str): def publish_message(self, message: str):
connected_relays = [ for relay in self.relays.values():
r for r in self.relays.values() if r.connected and not r.shutdown relay.publish(message)
]
if connected_relays:
for relay in self.relays.values():
relay.publish(message)
else:
with self._pending_events_lock:
self._pending_events.append(message)
logger.warning(
f"No connected relays. Queued outgoing event "
f"({len(self._pending_events)}/{PENDING_EVENTS_MAX})."
)
def handle_notice(self, notice: NoticeMessage): def handle_notice(self, notice: NoticeMessage):
relay = next((r for r in self.relays.values() if r.url == notice.url)) relay = next((r for r in self.relays.values() if r.url == notice.url))
@ -155,47 +137,7 @@ class RelayManager:
logger.info(f"Restarting connection to relay '{relay.url}'") logger.info(f"Restarting connection to relay '{relay.url}'")
self._drain_relay_queue(relay)
self.remove_relay(relay.url) self.remove_relay(relay.url)
new_relay = self.add_relay(relay.url) new_relay = self.add_relay(relay.url)
new_relay.error_counter = relay.error_counter new_relay.error_counter = relay.error_counter
new_relay.error_list = relay.error_list new_relay.error_list = relay.error_list
def _drain_relay_queue(self, relay: Relay):
"""Move pending EVENT messages from a dead relay's queue to the
manager's pending queue so they can be resent on reconnection."""
drained = 0
while not relay.queue.empty():
try:
message = relay.queue.get_nowait()
data = json.loads(message)
if data[0] == "EVENT":
with self._pending_events_lock:
self._pending_events.append(message)
drained += 1
except Exception:
break
if drained:
logger.info(f"Drained {drained} pending event(s) from relay '{relay.url}'.")
def _on_relay_connect(self, _relay: Relay):
self._flush_pending_events()
def _flush_pending_events(self):
with self._pending_events_lock:
if not self._pending_events:
return
connected_relays = [
r for r in self.relays.values() if r.connected and not r.shutdown
]
if not connected_relays:
return
count = len(self._pending_events)
while self._pending_events:
message = self._pending_events.popleft()
for relay in connected_relays:
relay.publish(message)
logger.info(
f"Flushed {count} pending event(s) to "
f"{len(connected_relays)} relay(s)."
)