diff --git a/package.json b/package.json index 07fa987..12a64b6 100644 --- a/package.json +++ b/package.json @@ -22,8 +22,9 @@ "build": "tsup src/index.ts; tsup src/daemon/index.ts -d dist/daemon; tsup src/client.ts -d dist/client", "build:client": "tsup src/client.ts -d dist/client", "test": "TS_NODE_TRANSPILE_ONLY=1 node -r ts-node/register --test tests/lifecycle.test.ts", + "test:relay": "TS_NODE_TRANSPILE_ONLY=1 node -r ts-node/register --test tests/relay-pool.test.ts", "test:integration": "DATABASE_URL=\"file:./tests/.tmp/acl-int.db\" node -r ./tests/register-ts.cjs --test tests/acl.integration.test.ts", - "test:all": "npm run test && npm run test:integration", + "test:all": "npm run test && npm run test:relay && npm run test:integration", "prisma:generate": "npx prisma generate", "prisma:migrate": "npx prisma migrate deploy", "prisma:create": "npx prisma db push --preview-feature", diff --git a/src/daemon/lib/relay-pool.ts b/src/daemon/lib/relay-pool.ts new file mode 100644 index 0000000..c3b476f --- /dev/null +++ b/src/daemon/lib/relay-pool.ts @@ -0,0 +1,393 @@ +import { Relay } from "nostr-tools"; +import type { Event, Filter } from "nostr-tools"; + +// nostr-tools needs a WebSocket implementation injected under Node (no global +// WebSocket on the deploy target, Node 20). `useWebSocketImplementation` lives +// in the `nostr-tools/relay` subpath export, which tsc's classic +// `moduleResolution: node` can't resolve — but the build target is CommonJS and +// Node honours the package exports map at runtime, so require it. The same goes +// for `ws` (no @types/ws in the tree). `useWebSocketImplementation` sets +// nostr-tools' internal WS binding at call time, so import-order / global-capture +// don't apply. +// eslint-disable-next-line @typescript-eslint/no-var-requires +const WebSocket = require("ws"); +// eslint-disable-next-line @typescript-eslint/no-var-requires +const { useWebSocketImplementation } = require("nostr-tools/relay") as { + useWebSocketImplementation: (ws: unknown) => void; +}; +useWebSocketImplementation(WebSocket); + +/** + * RelayPool — the daemon's relay/transport layer (aiolabs/nsecbunkerd#42). + * + * Replaces the NDK-based transport whose subscriptions did not survive a relay + * reconnect (#41): NDK registers `relay.once("ready", execute)` — fires once on + * the initial connect, never re-arms — so after a flap the socket reconnects but + * the kind:24133 REQ is never re-sent, and the bunker goes silently deaf. We + * chased that through #4 → #7 → #20 → #21 without closing it, because it is + * structural in NDK. + * + * The fix, modelled on lightning.pub's `RelayConnection` and signet's + * `relay-pool` (both nostr-tools, both bind resubscribe to reconnect): **we own + * the connect loop**, and every (re)connect re-subscribes the entire registry of + * active subscriptions. Subscription liveness can no longer drift from socket + * liveness because the two are established together, atomically, on every cycle. + * + * We deliberately disable nostr-tools' own `enableReconnect`: its resubscribe + * behaviour is version-fragile right now (the auto-resubscribe regressed in + * 2.23.0 `fb7de7f` and the fix `455124e` is unreleased as of 2026-06-26), so we + * make the resubscribe OUR code instead of depending on which nostr-tools + * version is installed. See #42 for the version analysis. + */ + +export interface PoolSubscription { + id: string; + filters: Filter[]; + onevent: (event: Event) => void; + /** Fires on EOSE — note it fires again on every reconnect's resubscribe, so + * callers that want only the FIRST EOSE (e.g. the #9 start-race guard) must + * latch it themselves. */ + oneose?: () => void; +} + +interface ActiveSub { + close: () => void; +} + +const RECONNECT_BASE_MS = 1_000; +const RECONNECT_CAP_MS = 10_000; // match #20's cap — cheap to retry a LAN relay + +/** + * A single relay connection that owns its (re)connect loop. On every successful + * connect it re-subscribes the shared registry; when the socket closes the loop + * reconnects and re-subscribes again. This is the unit that makes + * connected-but-deaf impossible. + */ +class ManagedRelay { + private relay: Relay | null = null; + private active: Map = new Map(); + private stopped = false; + /** Interrupts a pending reconnect backoff so stop() takes effect at once. */ + private wake: (() => void) | null = null; + + public connected = false; + public lastConnectedAt = 0; + public lastDisconnectedAt = 0; + + constructor( + public readonly url: string, + private readonly registry: Map, + private readonly log: (...args: any[]) => void, + ) {} + + start(): void { + void this.connectLoop(); + } + + stop(): void { + this.stopped = true; + this.wake?.(); // break any pending reconnect backoff so we exit promptly + this.closeAllSubs(); + try { + this.relay?.close(); + } catch { + /* ignore */ + } + this.relay = null; + this.connected = false; + } + + /** Reconnect backoff wait that (a) unrefs so a stopped daemon isn't held + * open by a pending timer, and (b) can be woken early by stop(). */ + private backoff(ms: number): Promise { + return new Promise((resolve) => { + const t = setTimeout(() => { + this.wake = null; + resolve(); + }, ms); + t.unref?.(); + this.wake = () => { + clearTimeout(t); + this.wake = null; + resolve(); + }; + }); + } + + /** Subscribe one entry on the live connection (no-op if not connected — it + * will be picked up by the next resubscribeAll on connect). */ + subscribeOne(id: string, s: PoolSubscription): void { + if (!this.relay || !this.connected) return; + // Replace any existing handle for this id (idempotent). + this.active.get(id)?.close(); + try { + const sub = this.relay.subscribe(s.filters, { + onevent: (e: Event) => s.onevent(e), + oneose: () => s.oneose?.(), + }); + this.active.set(id, sub); + } catch (e) { + this.log("subscribe failed", id, e); + } + } + + closeSub(id: string): void { + const sub = this.active.get(id); + if (sub) { + try { + sub.close(); + } catch { + /* ignore */ + } + this.active.delete(id); + } + } + + async publish(event: Event): Promise { + if (!this.relay || !this.connected) { + throw new Error(`relay not connected: ${this.url}`); + } + await this.relay.publish(event); + } + + /** Number of registry entries currently active on the wire. The watchdog + * uses this to detect a connected-but-deaf relay (active < registry). */ + activeCount(): number { + return this.active.size; + } + + private async connectLoop(): Promise { + let failures = 0; + while (!this.stopped) { + const wasReal = await this.connectOnce(); + if (this.stopped) break; + // A real connection that later dropped resets the backoff; a failed + // connect attempt grows it (capped). Either way we keep trying — a + // bunker disconnected is strictly worse than retry pressure on a LAN + // relay (#20's rationale). + failures = wasReal ? 0 : failures + 1; + const delay = Math.min(RECONNECT_BASE_MS * 2 ** failures, RECONNECT_CAP_MS); + await this.backoff(delay); + } + } + + /** + * Open the socket, re-subscribe the whole registry, and resolve when the + * socket closes (or immediately if the open failed). Returns true if we had + * a real connection (so the caller resets backoff), false if the open failed. + */ + private connectOnce(): Promise { + return new Promise((resolve) => { + void (async () => { + let relay: Relay; + try { + // enableReconnect:false — WE own reconnect, not nostr-tools. + relay = await Relay.connect(this.url, { enableReconnect: false }); + } catch (e: any) { + this.log("connect failed:", e?.message ?? e); + resolve(false); + return; + } + + this.relay = relay; + this.connected = true; + this.lastConnectedAt = Date.now(); + this.log(`connected; (re)subscribing ${this.registry.size} sub(s)`); + this.resubscribeAll(); + + relay.onclose = () => { + this.connected = false; + this.lastDisconnectedAt = Date.now(); + this.closeAllSubs(); + if (this.relay) { + this.relay.onclose = null; + this.relay = null; + } + this.log("disconnected"); + resolve(true); + }; + })(); + }); + } + + /** Re-establish every registered subscription on the current connection. + * This is the line that fixes #41. */ + private resubscribeAll(): void { + this.closeAllSubs(); + for (const [id, s] of this.registry) { + this.subscribeOne(id, s); + } + } + + private closeAllSubs(): void { + for (const sub of this.active.values()) { + try { + sub.close(); + } catch { + /* ignore */ + } + } + this.active.clear(); + } + + /** Force a clean reconnect (close → loop reconnects → resubscribes). Used by + * the pool's heartbeat after a detected time-jump (sleep/wake), when the + * socket may look alive but be stale. */ + forceReconnect(): void { + if (this.relay) { + try { + this.relay.close(); + } catch { + /* the onclose handler drives the reconnect */ + } + } + } +} + +export interface RelayPoolOptions { + log?: (...args: any[]) => void; + /** Heartbeat interval for sleep/wake detection (signet pattern). 0 disables. */ + heartbeatMs?: number; + /** If a heartbeat tick is later than interval + this slack, treat it as a + * process suspend (sleep/VM-pause) and force-reconnect all relays. */ + sleepSlackMs?: number; +} + +/** + * A pool of ManagedRelay connections sharing one subscription registry. Mirrors + * the surface the daemon needs from the old NDK instance: subscribe / publish / + * connect-status — plus a `healthy()` signal the watchdog can trust. + */ +export class RelayPool { + private readonly registry: Map = new Map(); + private readonly relays: ManagedRelay[]; + private readonly log: (...args: any[]) => void; + private counter = 0; + private heartbeatTimer: ReturnType | undefined; + private lastHeartbeat = 0; + + constructor( + public readonly relayUrls: string[], + private readonly opts: RelayPoolOptions = {}, + ) { + this.log = opts.log ?? (() => {}); + this.relays = relayUrls.map( + (url) => + new ManagedRelay(url, this.registry, (...a: any[]) => + this.log(`[relay:${url}]`, ...a), + ), + ); + } + + /** Start every relay's connect loop + (optionally) the sleep/wake heartbeat. */ + start(): void { + for (const r of this.relays) r.start(); + const interval = this.opts.heartbeatMs ?? 0; + if (interval > 0) { + this.lastHeartbeat = Date.now(); + this.heartbeatTimer = setInterval(() => this.runHeartbeat(), interval); + } + } + + stop(): void { + if (this.heartbeatTimer) clearInterval(this.heartbeatTimer); + this.heartbeatTimer = undefined; + for (const r of this.relays) r.stop(); + } + + /** + * Register a subscription and establish it on all connected relays. The + * registry entry persists across reconnects (each ManagedRelay re-subscribes + * it on every connect), which is the whole point. Returns a handle whose + * `close()` removes it from the registry so it stops being replayed. + */ + subscribe( + filters: Filter[], + onevent: (event: Event) => void, + opts: { id?: string; oneose?: () => void } = {}, + ): { id: string; close: () => void } { + const id = opts.id ?? `sub-${++this.counter}`; + const s: PoolSubscription = { id, filters, onevent, oneose: opts.oneose }; + this.registry.set(id, s); + for (const r of this.relays) r.subscribeOne(id, s); + return { + id, + close: () => { + this.registry.delete(id); + for (const r of this.relays) r.closeSub(id); + }, + }; + } + + /** + * Subscribe and resolve once the FIRST EOSE arrives from any relay (the #9 + * race guard: callers must not publish before the relay has the REQ on + * file). Re-EOSEs on later reconnects are ignored. + */ + subscribeAwaitingEose( + filters: Filter[], + onevent: (event: Event) => void, + opts: { id?: string } = {}, + ): Promise<{ id: string; close: () => void }> { + return new Promise((resolve) => { + let eosed = false; + const handle = this.subscribe(filters, onevent, { + id: opts.id, + oneose: () => { + if (!eosed) { + eosed = true; + resolve(handle); + } + }, + }); + }); + } + + /** Publish to every relay; resolves if at least one accepts it. */ + async publish(event: Event): Promise { + const results = await Promise.allSettled(this.relays.map((r) => r.publish(event))); + if (!results.some((r) => r.status === "fulfilled")) { + const reasons = results + .map((r) => (r.status === "rejected" ? r.reason?.message ?? r.reason : "")) + .filter(Boolean) + .join("; "); + throw new Error(`publish failed on all relays: ${reasons}`); + } + } + + /** Relays currently holding an open socket. */ + connectedCount(): number { + return this.relays.filter((r) => r.connected).length; + } + + /** + * Session-liveness, not just socket-liveness. Healthy iff at least one relay + * is connected AND has every registered subscription active on the wire. + * This is the check the old watchdog couldn't make: after #20 a reconnected + * socket looked healthy while the subscription set was empty (#41). Here a + * connected relay that hasn't (re)subscribed the registry reads as unhealthy. + */ + healthy(): boolean { + const want = this.registry.size; + return this.relays.some((r) => r.connected && r.activeCount() >= want); + } + + private runHeartbeat(): void { + const now = Date.now(); + const elapsed = now - this.lastHeartbeat; + this.lastHeartbeat = now; + const interval = this.opts.heartbeatMs ?? 0; + const slack = this.opts.sleepSlackMs ?? 30_000; + // A tick far later than scheduled means the process was suspended + // (laptop sleep, VM pause); sockets may be half-open but look alive. + // Force a clean reconnect so subscriptions are re-established. (signet) + if (interval > 0 && elapsed > interval + slack) { + this.log( + `heartbeat: ${Math.round(elapsed / 1000)}s gap (expected ~${Math.round( + interval / 1000, + )}s) — suspected sleep/wake, forcing reconnect`, + ); + for (const r of this.relays) r.forceReconnect(); + } + } +} diff --git a/tests/helpers/mock-relay.ts b/tests/helpers/mock-relay.ts new file mode 100644 index 0000000..6689c68 --- /dev/null +++ b/tests/helpers/mock-relay.ts @@ -0,0 +1,137 @@ +import type { AddressInfo } from "net"; + +// `ws` has no @types in this tree; require it (any) — same as src/relay-pool.ts. +// eslint-disable-next-line @typescript-eslint/no-var-requires +const { WebSocketServer, WebSocket } = require("ws"); + +/** + * Minimal in-process nostr relay for transport tests (#42). Speaks just enough + * of the protocol — REQ / CLOSE / EVENT → EVENT / EOSE / OK — to exercise the + * RelayPool's subscribe + resubscribe-on-reconnect behaviour. It can be + * `flap()`ped: drop every client + close the server, then re-listen on the SAME + * port, simulating the relay restart that took the bunker deaf (#41). + */ + +type Filter = Record; +interface Sub { + subId: string; + filters: Filter[]; + socket: any; +} + +function matches(filter: Filter, event: any): boolean { + if (filter.ids && !filter.ids.includes(event.id)) return false; + if (filter.kinds && !filter.kinds.includes(event.kind)) return false; + if (filter.authors && !filter.authors.includes(event.pubkey)) return false; + for (const key of Object.keys(filter)) { + if (key.startsWith("#")) { + const tag = key.slice(1); + const want: string[] = filter[key]; + const have = (event.tags ?? []) + .filter((t: string[]) => t[0] === tag) + .map((t: string[]) => t[1]); + if (!want.some((v) => have.includes(v))) return false; + } + } + return true; +} + +export class MockRelay { + private wss: any = null; + private subs: Sub[] = []; + private sockets: Set = new Set(); + public port = 0; + /** Total REQs seen across the relay's life — proves a (re)subscribe landed. */ + public reqCount = 0; + + get url(): string { + return `ws://127.0.0.1:${this.port}`; + } + + async start(port = 0): Promise { + await new Promise((resolve) => { + this.wss = new WebSocketServer({ port }, () => { + this.port = (this.wss!.address() as AddressInfo).port; + resolve(); + }); + this.wss.on("connection", (socket: any) => { + this.sockets.add(socket); + socket.on("message", (data: any) => this.onMessage(socket, data.toString())); + socket.on("close", () => { + this.sockets.delete(socket); + this.subs = this.subs.filter((s) => s.socket !== socket); + }); + socket.on("error", () => { + /* ignore — flapping closes sockets abruptly */ + }); + }); + }); + } + + private onMessage(socket: any, raw: string): void { + let msg: any[]; + try { + msg = JSON.parse(raw); + } catch { + return; + } + const [type, ...rest] = msg; + if (type === "REQ") { + const [subId, ...filters] = rest; + this.reqCount++; + this.subs.push({ subId, filters, socket }); + // No stored events in this mock; just acknowledge the REQ is live. + socket.send(JSON.stringify(["EOSE", subId])); + } else if (type === "CLOSE") { + const [subId] = rest; + this.subs = this.subs.filter((s) => !(s.socket === socket && s.subId === subId)); + } else if (type === "EVENT") { + const [event] = rest; + socket.send(JSON.stringify(["OK", event.id, true, ""])); + this.deliver(event); + } + } + + /** Push a server-originated event to every matching live subscription. */ + inject(event: any): void { + this.deliver(event); + } + + private deliver(event: any): void { + for (const sub of this.subs) { + if (sub.socket.readyState !== WebSocket.OPEN) continue; + if (sub.filters.some((f) => matches(f, event))) { + sub.socket.send(JSON.stringify(["EVENT", sub.subId, event])); + } + } + } + + /** Drop all clients + close the server, keeping the port for a restart. */ + private async down(): Promise { + for (const s of this.sockets) { + try { + s.terminate(); + } catch { + /* ignore */ + } + } + this.sockets.clear(); + this.subs = []; + await new Promise((resolve) => { + if (!this.wss) return resolve(); + this.wss.close(() => resolve()); + this.wss = null; + }); + } + + /** Simulate a relay restart: go down, then come back on the SAME port. */ + async flap(): Promise { + const port = this.port; + await this.down(); + await this.start(port); + } + + async stop(): Promise { + await this.down(); + } +} diff --git a/tests/relay-pool.test.ts b/tests/relay-pool.test.ts new file mode 100644 index 0000000..148f144 --- /dev/null +++ b/tests/relay-pool.test.ts @@ -0,0 +1,104 @@ +import { test } from "node:test"; +import assert from "node:assert/strict"; +import { finalizeEvent, generateSecretKey, getPublicKey } from "nostr-tools"; +import { MockRelay } from "./helpers/mock-relay"; +import { RelayPool } from "../src/daemon/lib/relay-pool"; + +// A throwaway client key. Events must be REAL (valid id + sig): nostr-tools +// verifies inbound events and silently drops invalid ones, so the test has to +// sign them exactly as a real bunker client would. +const CLIENT_SK = generateSecretKey(); +// The spire pubkey the bunker subscribes for (`#p`), a fixture here. +const PUBKEY = "1508b42094e65dff982ac8ca5a264089f7de2d4bbda81bf32a91678f337ced3b"; + +function makeEvent(): { id: string; [k: string]: any } { + return finalizeEvent( + { + kind: 24133, + created_at: Math.floor(Date.now() / 1000), + tags: [["p", PUBKEY]], + content: "encrypted-blob", + }, + CLIENT_SK, + ) as any; +} + +void getPublicKey; // (kept available for future signed-response assertions) + +async function waitFor(pred: () => boolean, timeoutMs = 5000, stepMs = 25): Promise { + const start = Date.now(); + while (!pred()) { + if (Date.now() - start > timeoutMs) throw new Error("waitFor timed out"); + await new Promise((r) => setTimeout(r, stepMs)); + } +} + +/** + * The regression that was missing every prior round (#4/#7/#20/#21): flap the + * relay mid-session and assert a subsequent inbound kind:24133 is still + * delivered. With the old NDK transport the socket reconnected but the REQ was + * never re-sent, so the bunker went silently deaf (#41). The RelayPool owns the + * connect loop and re-subscribes on every (re)connect, so this must pass. + */ +test("RelayPool re-subscribes after a relay flap — inbound events still delivered (#41/#42)", async () => { + const relay = new MockRelay(); + await relay.start(); + + const received: string[] = []; + const pool = new RelayPool([relay.url], { log: () => {} }); + pool.start(); + + // Subscribe for the bunker's kind:24133 channel; await the REQ landing. + await pool.subscribeAwaitingEose([{ kinds: [24133], "#p": [PUBKEY] }], (e) => + received.push(e.id), + ); + + // Baseline: a matching event before any flap is delivered. + const before = makeEvent(); + relay.inject(before); + await waitFor(() => received.includes(before.id)); + + // FLAP: relay down + back up on the same port (the relay-restart that took + // the demo bunker deaf). The pool must reconnect AND re-subscribe. + const reqsBefore = relay.reqCount; + await relay.flap(); + + // Wait until reconnected AND the subscription is re-established on the wire. + await waitFor(() => pool.healthy() && relay.reqCount > reqsBefore); + + // THE assertion: a matching event AFTER the flap is still delivered. + const after = makeEvent(); + relay.inject(after); + await waitFor(() => received.includes(after.id)); + + assert.ok(received.includes(before.id), "event before flap delivered"); + assert.ok(received.includes(after.id), "event after flap delivered (resubscribed)"); + assert.ok(pool.healthy(), "pool healthy (connected + subscribed) after flap"); + + pool.stop(); + await relay.stop(); +}); + +/** + * healthy() must distinguish a connected-but-deaf relay (the #41 state the old + * connectedRelays()-only watchdog could not see) from a genuinely serving one. + */ +test("RelayPool.healthy() is false until the registry is subscribed on the wire (#42)", async () => { + const relay = new MockRelay(); + await relay.start(); + + const pool = new RelayPool([relay.url], { log: () => {} }); + pool.start(); + + // No subscriptions registered yet — but once connected with an empty + // registry, healthy() is trivially true (nothing to be deaf about). + await waitFor(() => pool.connectedCount() === 1); + assert.equal(pool.healthy(), true, "connected, empty registry -> healthy"); + + // Register a sub; healthy stays true once it lands. + await pool.subscribeAwaitingEose([{ kinds: [24133], "#p": [PUBKEY] }], () => {}); + assert.equal(pool.healthy(), true, "connected + subscribed -> healthy"); + + pool.stop(); + await relay.stop(); +});