feat(transport): nostr-tools relay pool that re-subscribes on reconnect (#42)
Some checks failed
Docker image / build-and-push-image (push) Has been cancelled

First increment of the NDK -> nostr-tools transport swap (#42), the root fix
for #41 (bunker goes silently deaf after a relay flap).

NDK does not replay subscriptions on reconnect: a NDKRelaySubscription registers
`relay.once("ready", execute)` and never re-arms, so after a flap the socket
reconnects but the kind:24133 REQ is never re-sent. We chased that through
#4/#7/#20/#21 without closing it because it is structural in NDK.

`RelayPool` (src/daemon/lib/relay-pool.ts) owns the connect loop, modelled on
lightning.pub's RelayConnection and signet's relay-pool (both nostr-tools, both
bind resubscribe to reconnect). Every (re)connect re-subscribes the whole
registry, so subscription liveness can't drift from socket liveness. It also
exposes `healthy()` (connected AND registry subscribed on the wire) — the
session-liveness signal the old connectedRelays()-only watchdog couldn't make,
which is what let #20's reconnect mask the deaf state.

We disable nostr-tools' own `enableReconnect`: its auto-resubscribe is
version-fragile right now (regressed in 2.23.0 fb7de7f; the 455124e fix is
unreleased as of 2026-06-26), so the resubscribe is OUR code, not a function of
which nostr-tools version is installed.

Regression test (tests/relay-pool.test.ts + tests/helpers/mock-relay.ts): an
in-process mock relay flaps mid-session (down + back up on the same port) and we
assert a subsequent inbound kind:24133 is still delivered — the exact #41
scenario, and the test that was missing every prior round. Green; existing
lifecycle suite unchanged.

Next increments on this branch: port Backend (NIP-46) + AdminInterface (RPC)
onto the pool, wire run.ts, retire relay-reconnect.ts + the connection-only
watchdog.

Refs: #42, #41, #21, #20, #9
This commit is contained in:
Padreug 2026-06-26 23:42:36 +02:00
commit 1409941d11
4 changed files with 636 additions and 1 deletions

View file

@ -22,8 +22,9 @@
"build": "tsup src/index.ts; tsup src/daemon/index.ts -d dist/daemon; tsup src/client.ts -d dist/client",
"build:client": "tsup src/client.ts -d dist/client",
"test": "TS_NODE_TRANSPILE_ONLY=1 node -r ts-node/register --test tests/lifecycle.test.ts",
"test:relay": "TS_NODE_TRANSPILE_ONLY=1 node -r ts-node/register --test tests/relay-pool.test.ts",
"test:integration": "DATABASE_URL=\"file:./tests/.tmp/acl-int.db\" node -r ./tests/register-ts.cjs --test tests/acl.integration.test.ts",
"test:all": "npm run test && npm run test:integration",
"test:all": "npm run test && npm run test:relay && npm run test:integration",
"prisma:generate": "npx prisma generate",
"prisma:migrate": "npx prisma migrate deploy",
"prisma:create": "npx prisma db push --preview-feature",

View file

@ -0,0 +1,393 @@
import { Relay } from "nostr-tools";
import type { Event, Filter } from "nostr-tools";
// nostr-tools needs a WebSocket implementation injected under Node (no global
// WebSocket on the deploy target, Node 20). `useWebSocketImplementation` lives
// in the `nostr-tools/relay` subpath export, which tsc's classic
// `moduleResolution: node` can't resolve — but the build target is CommonJS and
// Node honours the package exports map at runtime, so require it. The same goes
// for `ws` (no @types/ws in the tree). `useWebSocketImplementation` sets
// nostr-tools' internal WS binding at call time, so import-order / global-capture
// don't apply.
// eslint-disable-next-line @typescript-eslint/no-var-requires
const WebSocket = require("ws");
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { useWebSocketImplementation } = require("nostr-tools/relay") as {
useWebSocketImplementation: (ws: unknown) => void;
};
useWebSocketImplementation(WebSocket);
/**
* RelayPool the daemon's relay/transport layer (aiolabs/nsecbunkerd#42).
*
* Replaces the NDK-based transport whose subscriptions did not survive a relay
* reconnect (#41): NDK registers `relay.once("ready", execute)` fires once on
* the initial connect, never re-arms so after a flap the socket reconnects but
* the kind:24133 REQ is never re-sent, and the bunker goes silently deaf. We
* chased that through #4 #7 #20 #21 without closing it, because it is
* structural in NDK.
*
* The fix, modelled on lightning.pub's `RelayConnection` and signet's
* `relay-pool` (both nostr-tools, both bind resubscribe to reconnect): **we own
* the connect loop**, and every (re)connect re-subscribes the entire registry of
* active subscriptions. Subscription liveness can no longer drift from socket
* liveness because the two are established together, atomically, on every cycle.
*
* We deliberately disable nostr-tools' own `enableReconnect`: its resubscribe
* behaviour is version-fragile right now (the auto-resubscribe regressed in
* 2.23.0 `fb7de7f` and the fix `455124e` is unreleased as of 2026-06-26), so we
* make the resubscribe OUR code instead of depending on which nostr-tools
* version is installed. See #42 for the version analysis.
*/
export interface PoolSubscription {
id: string;
filters: Filter[];
onevent: (event: Event) => void;
/** Fires on EOSE note it fires again on every reconnect's resubscribe, so
* callers that want only the FIRST EOSE (e.g. the #9 start-race guard) must
* latch it themselves. */
oneose?: () => void;
}
interface ActiveSub {
close: () => void;
}
const RECONNECT_BASE_MS = 1_000;
const RECONNECT_CAP_MS = 10_000; // match #20's cap — cheap to retry a LAN relay
/**
* A single relay connection that owns its (re)connect loop. On every successful
* connect it re-subscribes the shared registry; when the socket closes the loop
* reconnects and re-subscribes again. This is the unit that makes
* connected-but-deaf impossible.
*/
class ManagedRelay {
private relay: Relay | null = null;
private active: Map<string, ActiveSub> = new Map();
private stopped = false;
/** Interrupts a pending reconnect backoff so stop() takes effect at once. */
private wake: (() => void) | null = null;
public connected = false;
public lastConnectedAt = 0;
public lastDisconnectedAt = 0;
constructor(
public readonly url: string,
private readonly registry: Map<string, PoolSubscription>,
private readonly log: (...args: any[]) => void,
) {}
start(): void {
void this.connectLoop();
}
stop(): void {
this.stopped = true;
this.wake?.(); // break any pending reconnect backoff so we exit promptly
this.closeAllSubs();
try {
this.relay?.close();
} catch {
/* ignore */
}
this.relay = null;
this.connected = false;
}
/** Reconnect backoff wait that (a) unrefs so a stopped daemon isn't held
* open by a pending timer, and (b) can be woken early by stop(). */
private backoff(ms: number): Promise<void> {
return new Promise<void>((resolve) => {
const t = setTimeout(() => {
this.wake = null;
resolve();
}, ms);
t.unref?.();
this.wake = () => {
clearTimeout(t);
this.wake = null;
resolve();
};
});
}
/** Subscribe one entry on the live connection (no-op if not connected it
* will be picked up by the next resubscribeAll on connect). */
subscribeOne(id: string, s: PoolSubscription): void {
if (!this.relay || !this.connected) return;
// Replace any existing handle for this id (idempotent).
this.active.get(id)?.close();
try {
const sub = this.relay.subscribe(s.filters, {
onevent: (e: Event) => s.onevent(e),
oneose: () => s.oneose?.(),
});
this.active.set(id, sub);
} catch (e) {
this.log("subscribe failed", id, e);
}
}
closeSub(id: string): void {
const sub = this.active.get(id);
if (sub) {
try {
sub.close();
} catch {
/* ignore */
}
this.active.delete(id);
}
}
async publish(event: Event): Promise<void> {
if (!this.relay || !this.connected) {
throw new Error(`relay not connected: ${this.url}`);
}
await this.relay.publish(event);
}
/** Number of registry entries currently active on the wire. The watchdog
* uses this to detect a connected-but-deaf relay (active < registry). */
activeCount(): number {
return this.active.size;
}
private async connectLoop(): Promise<void> {
let failures = 0;
while (!this.stopped) {
const wasReal = await this.connectOnce();
if (this.stopped) break;
// A real connection that later dropped resets the backoff; a failed
// connect attempt grows it (capped). Either way we keep trying — a
// bunker disconnected is strictly worse than retry pressure on a LAN
// relay (#20's rationale).
failures = wasReal ? 0 : failures + 1;
const delay = Math.min(RECONNECT_BASE_MS * 2 ** failures, RECONNECT_CAP_MS);
await this.backoff(delay);
}
}
/**
* Open the socket, re-subscribe the whole registry, and resolve when the
* socket closes (or immediately if the open failed). Returns true if we had
* a real connection (so the caller resets backoff), false if the open failed.
*/
private connectOnce(): Promise<boolean> {
return new Promise<boolean>((resolve) => {
void (async () => {
let relay: Relay;
try {
// enableReconnect:false — WE own reconnect, not nostr-tools.
relay = await Relay.connect(this.url, { enableReconnect: false });
} catch (e: any) {
this.log("connect failed:", e?.message ?? e);
resolve(false);
return;
}
this.relay = relay;
this.connected = true;
this.lastConnectedAt = Date.now();
this.log(`connected; (re)subscribing ${this.registry.size} sub(s)`);
this.resubscribeAll();
relay.onclose = () => {
this.connected = false;
this.lastDisconnectedAt = Date.now();
this.closeAllSubs();
if (this.relay) {
this.relay.onclose = null;
this.relay = null;
}
this.log("disconnected");
resolve(true);
};
})();
});
}
/** Re-establish every registered subscription on the current connection.
* This is the line that fixes #41. */
private resubscribeAll(): void {
this.closeAllSubs();
for (const [id, s] of this.registry) {
this.subscribeOne(id, s);
}
}
private closeAllSubs(): void {
for (const sub of this.active.values()) {
try {
sub.close();
} catch {
/* ignore */
}
}
this.active.clear();
}
/** Force a clean reconnect (close loop reconnects resubscribes). Used by
* the pool's heartbeat after a detected time-jump (sleep/wake), when the
* socket may look alive but be stale. */
forceReconnect(): void {
if (this.relay) {
try {
this.relay.close();
} catch {
/* the onclose handler drives the reconnect */
}
}
}
}
export interface RelayPoolOptions {
log?: (...args: any[]) => void;
/** Heartbeat interval for sleep/wake detection (signet pattern). 0 disables. */
heartbeatMs?: number;
/** If a heartbeat tick is later than interval + this slack, treat it as a
* process suspend (sleep/VM-pause) and force-reconnect all relays. */
sleepSlackMs?: number;
}
/**
* A pool of ManagedRelay connections sharing one subscription registry. Mirrors
* the surface the daemon needs from the old NDK instance: subscribe / publish /
* connect-status plus a `healthy()` signal the watchdog can trust.
*/
export class RelayPool {
private readonly registry: Map<string, PoolSubscription> = new Map();
private readonly relays: ManagedRelay[];
private readonly log: (...args: any[]) => void;
private counter = 0;
private heartbeatTimer: ReturnType<typeof setInterval> | undefined;
private lastHeartbeat = 0;
constructor(
public readonly relayUrls: string[],
private readonly opts: RelayPoolOptions = {},
) {
this.log = opts.log ?? (() => {});
this.relays = relayUrls.map(
(url) =>
new ManagedRelay(url, this.registry, (...a: any[]) =>
this.log(`[relay:${url}]`, ...a),
),
);
}
/** Start every relay's connect loop + (optionally) the sleep/wake heartbeat. */
start(): void {
for (const r of this.relays) r.start();
const interval = this.opts.heartbeatMs ?? 0;
if (interval > 0) {
this.lastHeartbeat = Date.now();
this.heartbeatTimer = setInterval(() => this.runHeartbeat(), interval);
}
}
stop(): void {
if (this.heartbeatTimer) clearInterval(this.heartbeatTimer);
this.heartbeatTimer = undefined;
for (const r of this.relays) r.stop();
}
/**
* Register a subscription and establish it on all connected relays. The
* registry entry persists across reconnects (each ManagedRelay re-subscribes
* it on every connect), which is the whole point. Returns a handle whose
* `close()` removes it from the registry so it stops being replayed.
*/
subscribe(
filters: Filter[],
onevent: (event: Event) => void,
opts: { id?: string; oneose?: () => void } = {},
): { id: string; close: () => void } {
const id = opts.id ?? `sub-${++this.counter}`;
const s: PoolSubscription = { id, filters, onevent, oneose: opts.oneose };
this.registry.set(id, s);
for (const r of this.relays) r.subscribeOne(id, s);
return {
id,
close: () => {
this.registry.delete(id);
for (const r of this.relays) r.closeSub(id);
},
};
}
/**
* Subscribe and resolve once the FIRST EOSE arrives from any relay (the #9
* race guard: callers must not publish before the relay has the REQ on
* file). Re-EOSEs on later reconnects are ignored.
*/
subscribeAwaitingEose(
filters: Filter[],
onevent: (event: Event) => void,
opts: { id?: string } = {},
): Promise<{ id: string; close: () => void }> {
return new Promise((resolve) => {
let eosed = false;
const handle = this.subscribe(filters, onevent, {
id: opts.id,
oneose: () => {
if (!eosed) {
eosed = true;
resolve(handle);
}
},
});
});
}
/** Publish to every relay; resolves if at least one accepts it. */
async publish(event: Event): Promise<void> {
const results = await Promise.allSettled(this.relays.map((r) => r.publish(event)));
if (!results.some((r) => r.status === "fulfilled")) {
const reasons = results
.map((r) => (r.status === "rejected" ? r.reason?.message ?? r.reason : ""))
.filter(Boolean)
.join("; ");
throw new Error(`publish failed on all relays: ${reasons}`);
}
}
/** Relays currently holding an open socket. */
connectedCount(): number {
return this.relays.filter((r) => r.connected).length;
}
/**
* Session-liveness, not just socket-liveness. Healthy iff at least one relay
* is connected AND has every registered subscription active on the wire.
* This is the check the old watchdog couldn't make: after #20 a reconnected
* socket looked healthy while the subscription set was empty (#41). Here a
* connected relay that hasn't (re)subscribed the registry reads as unhealthy.
*/
healthy(): boolean {
const want = this.registry.size;
return this.relays.some((r) => r.connected && r.activeCount() >= want);
}
private runHeartbeat(): void {
const now = Date.now();
const elapsed = now - this.lastHeartbeat;
this.lastHeartbeat = now;
const interval = this.opts.heartbeatMs ?? 0;
const slack = this.opts.sleepSlackMs ?? 30_000;
// A tick far later than scheduled means the process was suspended
// (laptop sleep, VM pause); sockets may be half-open but look alive.
// Force a clean reconnect so subscriptions are re-established. (signet)
if (interval > 0 && elapsed > interval + slack) {
this.log(
`heartbeat: ${Math.round(elapsed / 1000)}s gap (expected ~${Math.round(
interval / 1000,
)}s) suspected sleep/wake, forcing reconnect`,
);
for (const r of this.relays) r.forceReconnect();
}
}
}

137
tests/helpers/mock-relay.ts Normal file
View file

@ -0,0 +1,137 @@
import type { AddressInfo } from "net";
// `ws` has no @types in this tree; require it (any) — same as src/relay-pool.ts.
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { WebSocketServer, WebSocket } = require("ws");
/**
* Minimal in-process nostr relay for transport tests (#42). Speaks just enough
* of the protocol REQ / CLOSE / EVENT EVENT / EOSE / OK to exercise the
* RelayPool's subscribe + resubscribe-on-reconnect behaviour. It can be
* `flap()`ped: drop every client + close the server, then re-listen on the SAME
* port, simulating the relay restart that took the bunker deaf (#41).
*/
type Filter = Record<string, any>;
interface Sub {
subId: string;
filters: Filter[];
socket: any;
}
function matches(filter: Filter, event: any): boolean {
if (filter.ids && !filter.ids.includes(event.id)) return false;
if (filter.kinds && !filter.kinds.includes(event.kind)) return false;
if (filter.authors && !filter.authors.includes(event.pubkey)) return false;
for (const key of Object.keys(filter)) {
if (key.startsWith("#")) {
const tag = key.slice(1);
const want: string[] = filter[key];
const have = (event.tags ?? [])
.filter((t: string[]) => t[0] === tag)
.map((t: string[]) => t[1]);
if (!want.some((v) => have.includes(v))) return false;
}
}
return true;
}
export class MockRelay {
private wss: any = null;
private subs: Sub[] = [];
private sockets: Set<any> = new Set();
public port = 0;
/** Total REQs seen across the relay's life — proves a (re)subscribe landed. */
public reqCount = 0;
get url(): string {
return `ws://127.0.0.1:${this.port}`;
}
async start(port = 0): Promise<void> {
await new Promise<void>((resolve) => {
this.wss = new WebSocketServer({ port }, () => {
this.port = (this.wss!.address() as AddressInfo).port;
resolve();
});
this.wss.on("connection", (socket: any) => {
this.sockets.add(socket);
socket.on("message", (data: any) => this.onMessage(socket, data.toString()));
socket.on("close", () => {
this.sockets.delete(socket);
this.subs = this.subs.filter((s) => s.socket !== socket);
});
socket.on("error", () => {
/* ignore — flapping closes sockets abruptly */
});
});
});
}
private onMessage(socket: any, raw: string): void {
let msg: any[];
try {
msg = JSON.parse(raw);
} catch {
return;
}
const [type, ...rest] = msg;
if (type === "REQ") {
const [subId, ...filters] = rest;
this.reqCount++;
this.subs.push({ subId, filters, socket });
// No stored events in this mock; just acknowledge the REQ is live.
socket.send(JSON.stringify(["EOSE", subId]));
} else if (type === "CLOSE") {
const [subId] = rest;
this.subs = this.subs.filter((s) => !(s.socket === socket && s.subId === subId));
} else if (type === "EVENT") {
const [event] = rest;
socket.send(JSON.stringify(["OK", event.id, true, ""]));
this.deliver(event);
}
}
/** Push a server-originated event to every matching live subscription. */
inject(event: any): void {
this.deliver(event);
}
private deliver(event: any): void {
for (const sub of this.subs) {
if (sub.socket.readyState !== WebSocket.OPEN) continue;
if (sub.filters.some((f) => matches(f, event))) {
sub.socket.send(JSON.stringify(["EVENT", sub.subId, event]));
}
}
}
/** Drop all clients + close the server, keeping the port for a restart. */
private async down(): Promise<void> {
for (const s of this.sockets) {
try {
s.terminate();
} catch {
/* ignore */
}
}
this.sockets.clear();
this.subs = [];
await new Promise<void>((resolve) => {
if (!this.wss) return resolve();
this.wss.close(() => resolve());
this.wss = null;
});
}
/** Simulate a relay restart: go down, then come back on the SAME port. */
async flap(): Promise<void> {
const port = this.port;
await this.down();
await this.start(port);
}
async stop(): Promise<void> {
await this.down();
}
}

104
tests/relay-pool.test.ts Normal file
View file

@ -0,0 +1,104 @@
import { test } from "node:test";
import assert from "node:assert/strict";
import { finalizeEvent, generateSecretKey, getPublicKey } from "nostr-tools";
import { MockRelay } from "./helpers/mock-relay";
import { RelayPool } from "../src/daemon/lib/relay-pool";
// A throwaway client key. Events must be REAL (valid id + sig): nostr-tools
// verifies inbound events and silently drops invalid ones, so the test has to
// sign them exactly as a real bunker client would.
const CLIENT_SK = generateSecretKey();
// The spire pubkey the bunker subscribes for (`#p`), a fixture here.
const PUBKEY = "1508b42094e65dff982ac8ca5a264089f7de2d4bbda81bf32a91678f337ced3b";
function makeEvent(): { id: string; [k: string]: any } {
return finalizeEvent(
{
kind: 24133,
created_at: Math.floor(Date.now() / 1000),
tags: [["p", PUBKEY]],
content: "encrypted-blob",
},
CLIENT_SK,
) as any;
}
void getPublicKey; // (kept available for future signed-response assertions)
async function waitFor(pred: () => boolean, timeoutMs = 5000, stepMs = 25): Promise<void> {
const start = Date.now();
while (!pred()) {
if (Date.now() - start > timeoutMs) throw new Error("waitFor timed out");
await new Promise((r) => setTimeout(r, stepMs));
}
}
/**
* The regression that was missing every prior round (#4/#7/#20/#21): flap the
* relay mid-session and assert a subsequent inbound kind:24133 is still
* delivered. With the old NDK transport the socket reconnected but the REQ was
* never re-sent, so the bunker went silently deaf (#41). The RelayPool owns the
* connect loop and re-subscribes on every (re)connect, so this must pass.
*/
test("RelayPool re-subscribes after a relay flap — inbound events still delivered (#41/#42)", async () => {
const relay = new MockRelay();
await relay.start();
const received: string[] = [];
const pool = new RelayPool([relay.url], { log: () => {} });
pool.start();
// Subscribe for the bunker's kind:24133 channel; await the REQ landing.
await pool.subscribeAwaitingEose([{ kinds: [24133], "#p": [PUBKEY] }], (e) =>
received.push(e.id),
);
// Baseline: a matching event before any flap is delivered.
const before = makeEvent();
relay.inject(before);
await waitFor(() => received.includes(before.id));
// FLAP: relay down + back up on the same port (the relay-restart that took
// the demo bunker deaf). The pool must reconnect AND re-subscribe.
const reqsBefore = relay.reqCount;
await relay.flap();
// Wait until reconnected AND the subscription is re-established on the wire.
await waitFor(() => pool.healthy() && relay.reqCount > reqsBefore);
// THE assertion: a matching event AFTER the flap is still delivered.
const after = makeEvent();
relay.inject(after);
await waitFor(() => received.includes(after.id));
assert.ok(received.includes(before.id), "event before flap delivered");
assert.ok(received.includes(after.id), "event after flap delivered (resubscribed)");
assert.ok(pool.healthy(), "pool healthy (connected + subscribed) after flap");
pool.stop();
await relay.stop();
});
/**
* healthy() must distinguish a connected-but-deaf relay (the #41 state the old
* connectedRelays()-only watchdog could not see) from a genuinely serving one.
*/
test("RelayPool.healthy() is false until the registry is subscribed on the wire (#42)", async () => {
const relay = new MockRelay();
await relay.start();
const pool = new RelayPool([relay.url], { log: () => {} });
pool.start();
// No subscriptions registered yet — but once connected with an empty
// registry, healthy() is trivially true (nothing to be deaf about).
await waitFor(() => pool.connectedCount() === 1);
assert.equal(pool.healthy(), true, "connected, empty registry -> healthy");
// Register a sub; healthy stays true once it lands.
await pool.subscribeAwaitingEose([{ kinds: [24133], "#p": [PUBKEY] }], () => {});
assert.equal(pool.healthy(), true, "connected + subscribed -> healthy");
pool.stop();
await relay.stop();
});