Compare commits
No commits in common. "fix/queue-outgoing-events" and "main" have entirely different histories.
fix/queue-
...
main
3 changed files with 2 additions and 128 deletions
59
CLAUDE.md
59
CLAUDE.md
|
|
@ -1,59 +0,0 @@
|
||||||
# CLAUDE.md
|
|
||||||
|
|
||||||
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
|
|
||||||
|
|
||||||
## Project Overview
|
|
||||||
|
|
||||||
Nostrclient is an LNbits extension that acts as an always-on Nostr relay multiplexer. Multiple Nostr clients connect to a single WebSocket endpoint, which fans out requests to multiple configured Nostr relays and aggregates/deduplicates responses. It rewrites subscription IDs per-client to prevent conflicts.
|
|
||||||
|
|
||||||
## Build & Development Commands
|
|
||||||
|
|
||||||
All commands use `uv` as the Python package manager:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
make format # Format: prettier + black + ruff --fix
|
|
||||||
make check # All checks: mypy, pyright, black --check, ruff check, prettier --check
|
|
||||||
make test # Run pytest (DEBUG=true PYTHONUNBUFFERED=1 uv run pytest)
|
|
||||||
make mypy # Type check (excludes nostr/ directory)
|
|
||||||
make pre-commit # Run pre-commit hooks on all files
|
|
||||||
```
|
|
||||||
|
|
||||||
Individual checks: `make checkblack`, `make checkruff`, `make checkprettier`.
|
|
||||||
|
|
||||||
CI runs lint then pytest with `LNBITS_BACKEND_WALLET_CLASS=FakeWallet`.
|
|
||||||
|
|
||||||
## Architecture
|
|
||||||
|
|
||||||
**Request flow:** Nostr Clients → WebSocket → NostrRouter → RelayManager → Nostr Relays
|
|
||||||
|
|
||||||
Key components:
|
|
||||||
|
|
||||||
- **NostrRouter** (`router.py`) — One per client WebSocket connection. Rewrites subscription IDs (original → hashed → original) to isolate clients. Two async tasks: `_client_to_nostr` (forward requests) and `_nostr_to_client` (deliver aggregated responses).
|
|
||||||
|
|
||||||
- **NostrClient** (`nostr/client/client.py`) — Singleton orchestrator. Owns the RelayManager. Polls MessagePool and dispatches events via callbacks to routers.
|
|
||||||
|
|
||||||
- **RelayManager** (`nostr/relay_manager.py`) — Manages connections to multiple relays. Caches subscriptions so new relays receive existing subscriptions. Runs health checks via `check_and_restart_relays()`.
|
|
||||||
|
|
||||||
- **Relay** (`nostr/relay.py`) — Individual relay WebSocket connection with retry/backoff, ping latency tracking, and error counting.
|
|
||||||
|
|
||||||
- **MessagePool** (`nostr/message_pool.py`) — Thread-safe event aggregation with deduplication by event ID across all relays.
|
|
||||||
|
|
||||||
**Hybrid threading model:** Relay connections use threads (via `RelayManager.open_connections()`); client communication uses asyncio. The bridge is in `tasks.py` where `subscribe_events()` runs in a thread executor.
|
|
||||||
|
|
||||||
**Lifecycle** (`__init__.py`): `nostrclient_start()` spawns three background tasks (init relays, subscribe events, check relays). `nostrclient_stop()` cancels tasks, stops routers, closes the client.
|
|
||||||
|
|
||||||
## API Endpoints (views_api.py)
|
|
||||||
|
|
||||||
- REST endpoints under `/api/v1/` for relay CRUD and config (admin-authenticated)
|
|
||||||
- WebSocket endpoints: `/api/v1/{ws_id}` (private, encrypted ID) and `/api/v1/relay` (public, if enabled)
|
|
||||||
|
|
||||||
## Database
|
|
||||||
|
|
||||||
Three migrations in `migrations.py`: relays table, config table (JSON `extra` field), config owner scoping. CRUD in `crud.py` uses LNbits database abstraction.
|
|
||||||
|
|
||||||
## Code Quality Notes
|
|
||||||
|
|
||||||
- **mypy excludes `nostr/*`** — this is a custom Nostr protocol implementation, not a third-party package
|
|
||||||
- **Ruff rules:** F, E, W, I, A, C, N, UP, RUF, B
|
|
||||||
- **Frontend:** Vue.js + Quasar via LNbits base templates (`templates/nostrclient/index.html`)
|
|
||||||
- Pub key helpers in `helpers.py` normalize between hex and bech32 (npub1) formats
|
|
||||||
|
|
@ -1,9 +1,6 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
from collections.abc import Callable
|
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
@ -31,7 +28,6 @@ class Relay:
|
||||||
self.num_subscriptions: int = 0
|
self.num_subscriptions: int = 0
|
||||||
|
|
||||||
self.queue: Queue = Queue()
|
self.queue: Queue = Queue()
|
||||||
self.on_connect: Callable[[Relay], None] | None = None
|
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
self.ws = WebSocketApp(
|
self.ws = WebSocketApp(
|
||||||
|
|
@ -101,11 +97,6 @@ class Relay:
|
||||||
logger.info(f"[Relay: {self.url}] Connected.")
|
logger.info(f"[Relay: {self.url}] Connected.")
|
||||||
self.connected = True
|
self.connected = True
|
||||||
self.shutdown = False
|
self.shutdown = False
|
||||||
if self.on_connect:
|
|
||||||
try:
|
|
||||||
self.on_connect(self)
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"[Relay: {self.url}] on_connect callback error: {e}")
|
|
||||||
|
|
||||||
def _on_close(self, _, status_code, message):
|
def _on_close(self, _, status_code, message):
|
||||||
logger.warning(
|
logger.warning(
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,6 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from collections import deque
|
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
@ -11,8 +9,6 @@ from .message_pool import MessagePool, NoticeMessage
|
||||||
from .relay import Relay
|
from .relay import Relay
|
||||||
from .subscription import Subscription
|
from .subscription import Subscription
|
||||||
|
|
||||||
PENDING_EVENTS_MAX = 100
|
|
||||||
|
|
||||||
|
|
||||||
class RelayManager:
|
class RelayManager:
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
|
|
@ -22,8 +18,6 @@ class RelayManager:
|
||||||
self.message_pool = MessagePool()
|
self.message_pool = MessagePool()
|
||||||
self._cached_subscriptions: dict[str, Subscription] = {}
|
self._cached_subscriptions: dict[str, Subscription] = {}
|
||||||
self._subscriptions_lock = threading.Lock()
|
self._subscriptions_lock = threading.Lock()
|
||||||
self._pending_events: deque[str] = deque(maxlen=PENDING_EVENTS_MAX)
|
|
||||||
self._pending_events_lock = threading.Lock()
|
|
||||||
|
|
||||||
def add_relay(self, url: str) -> Relay:
|
def add_relay(self, url: str) -> Relay:
|
||||||
if url in list(self.relays.keys()):
|
if url in list(self.relays.keys()):
|
||||||
|
|
@ -31,7 +25,6 @@ class RelayManager:
|
||||||
return self.relays[url]
|
return self.relays[url]
|
||||||
|
|
||||||
relay = Relay(url, self.message_pool)
|
relay = Relay(url, self.message_pool)
|
||||||
relay.on_connect = self._on_relay_connect
|
|
||||||
self.relays[url] = relay
|
self.relays[url] = relay
|
||||||
|
|
||||||
self._open_connection(relay)
|
self._open_connection(relay)
|
||||||
|
|
@ -107,19 +100,8 @@ class RelayManager:
|
||||||
relay.close()
|
relay.close()
|
||||||
|
|
||||||
def publish_message(self, message: str):
|
def publish_message(self, message: str):
|
||||||
connected_relays = [
|
for relay in self.relays.values():
|
||||||
r for r in self.relays.values() if r.connected and not r.shutdown
|
relay.publish(message)
|
||||||
]
|
|
||||||
if connected_relays:
|
|
||||||
for relay in self.relays.values():
|
|
||||||
relay.publish(message)
|
|
||||||
else:
|
|
||||||
with self._pending_events_lock:
|
|
||||||
self._pending_events.append(message)
|
|
||||||
logger.warning(
|
|
||||||
f"No connected relays. Queued outgoing event "
|
|
||||||
f"({len(self._pending_events)}/{PENDING_EVENTS_MAX})."
|
|
||||||
)
|
|
||||||
|
|
||||||
def handle_notice(self, notice: NoticeMessage):
|
def handle_notice(self, notice: NoticeMessage):
|
||||||
relay = next((r for r in self.relays.values() if r.url == notice.url))
|
relay = next((r for r in self.relays.values() if r.url == notice.url))
|
||||||
|
|
@ -155,47 +137,7 @@ class RelayManager:
|
||||||
|
|
||||||
logger.info(f"Restarting connection to relay '{relay.url}'")
|
logger.info(f"Restarting connection to relay '{relay.url}'")
|
||||||
|
|
||||||
self._drain_relay_queue(relay)
|
|
||||||
self.remove_relay(relay.url)
|
self.remove_relay(relay.url)
|
||||||
new_relay = self.add_relay(relay.url)
|
new_relay = self.add_relay(relay.url)
|
||||||
new_relay.error_counter = relay.error_counter
|
new_relay.error_counter = relay.error_counter
|
||||||
new_relay.error_list = relay.error_list
|
new_relay.error_list = relay.error_list
|
||||||
|
|
||||||
def _drain_relay_queue(self, relay: Relay):
|
|
||||||
"""Move pending EVENT messages from a dead relay's queue to the
|
|
||||||
manager's pending queue so they can be resent on reconnection."""
|
|
||||||
drained = 0
|
|
||||||
while not relay.queue.empty():
|
|
||||||
try:
|
|
||||||
message = relay.queue.get_nowait()
|
|
||||||
data = json.loads(message)
|
|
||||||
if data[0] == "EVENT":
|
|
||||||
with self._pending_events_lock:
|
|
||||||
self._pending_events.append(message)
|
|
||||||
drained += 1
|
|
||||||
except Exception:
|
|
||||||
break
|
|
||||||
if drained:
|
|
||||||
logger.info(f"Drained {drained} pending event(s) from relay '{relay.url}'.")
|
|
||||||
|
|
||||||
def _on_relay_connect(self, _relay: Relay):
|
|
||||||
self._flush_pending_events()
|
|
||||||
|
|
||||||
def _flush_pending_events(self):
|
|
||||||
with self._pending_events_lock:
|
|
||||||
if not self._pending_events:
|
|
||||||
return
|
|
||||||
connected_relays = [
|
|
||||||
r for r in self.relays.values() if r.connected and not r.shutdown
|
|
||||||
]
|
|
||||||
if not connected_relays:
|
|
||||||
return
|
|
||||||
count = len(self._pending_events)
|
|
||||||
while self._pending_events:
|
|
||||||
message = self._pending_events.popleft()
|
|
||||||
for relay in connected_relays:
|
|
||||||
relay.publish(message)
|
|
||||||
logger.info(
|
|
||||||
f"Flushed {count} pending event(s) to "
|
|
||||||
f"{len(connected_relays)} relay(s)."
|
|
||||||
)
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue