fix: queue outgoing events when relay connection is down
Some checks failed
ci.yml / fix: queue outgoing events when relay connection is down (pull_request) Failing after 0s
Some checks failed
ci.yml / fix: queue outgoing events when relay connection is down (pull_request) Failing after 0s
When all relay connections are temporarily lost, EVENT messages published by extensions (nostrmarket, events) are now queued in a bounded deque (max 100) instead of being silently dropped. On reconnection, queued events are flushed to all connected relays. Dead relay queues are also drained before restart to preserve in-flight events. Closes aiolabs/nostrclient#1 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
801ce44561
commit
115e869225
3 changed files with 128 additions and 2 deletions
|
|
@ -1,6 +1,9 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from queue import Queue
|
||||
|
||||
from loguru import logger
|
||||
|
|
@ -28,6 +31,7 @@ class Relay:
|
|||
self.num_subscriptions: int = 0
|
||||
|
||||
self.queue: Queue = Queue()
|
||||
self.on_connect: Callable[[Relay], None] | None = None
|
||||
|
||||
def connect(self):
|
||||
self.ws = WebSocketApp(
|
||||
|
|
@ -97,6 +101,11 @@ class Relay:
|
|||
logger.info(f"[Relay: {self.url}] Connected.")
|
||||
self.connected = True
|
||||
self.shutdown = False
|
||||
if self.on_connect:
|
||||
try:
|
||||
self.on_connect(self)
|
||||
except Exception as e:
|
||||
logger.warning(f"[Relay: {self.url}] on_connect callback error: {e}")
|
||||
|
||||
def _on_close(self, _, status_code, message):
|
||||
logger.warning(
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
import asyncio
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
from collections import deque
|
||||
from typing import List
|
||||
|
||||
from loguru import logger
|
||||
|
|
@ -9,6 +11,8 @@ from .message_pool import MessagePool, NoticeMessage
|
|||
from .relay import Relay
|
||||
from .subscription import Subscription
|
||||
|
||||
PENDING_EVENTS_MAX = 100
|
||||
|
||||
|
||||
class RelayManager:
|
||||
def __init__(self) -> None:
|
||||
|
|
@ -18,6 +22,8 @@ class RelayManager:
|
|||
self.message_pool = MessagePool()
|
||||
self._cached_subscriptions: dict[str, Subscription] = {}
|
||||
self._subscriptions_lock = threading.Lock()
|
||||
self._pending_events: deque[str] = deque(maxlen=PENDING_EVENTS_MAX)
|
||||
self._pending_events_lock = threading.Lock()
|
||||
|
||||
def add_relay(self, url: str) -> Relay:
|
||||
if url in list(self.relays.keys()):
|
||||
|
|
@ -25,6 +31,7 @@ class RelayManager:
|
|||
return self.relays[url]
|
||||
|
||||
relay = Relay(url, self.message_pool)
|
||||
relay.on_connect = self._on_relay_connect
|
||||
self.relays[url] = relay
|
||||
|
||||
self._open_connection(relay)
|
||||
|
|
@ -100,8 +107,19 @@ class RelayManager:
|
|||
relay.close()
|
||||
|
||||
def publish_message(self, message: str):
|
||||
for relay in self.relays.values():
|
||||
relay.publish(message)
|
||||
connected_relays = [
|
||||
r for r in self.relays.values() if r.connected and not r.shutdown
|
||||
]
|
||||
if connected_relays:
|
||||
for relay in self.relays.values():
|
||||
relay.publish(message)
|
||||
else:
|
||||
with self._pending_events_lock:
|
||||
self._pending_events.append(message)
|
||||
logger.warning(
|
||||
f"No connected relays. Queued outgoing event "
|
||||
f"({len(self._pending_events)}/{PENDING_EVENTS_MAX})."
|
||||
)
|
||||
|
||||
def handle_notice(self, notice: NoticeMessage):
|
||||
relay = next((r for r in self.relays.values() if r.url == notice.url))
|
||||
|
|
@ -137,7 +155,47 @@ class RelayManager:
|
|||
|
||||
logger.info(f"Restarting connection to relay '{relay.url}'")
|
||||
|
||||
self._drain_relay_queue(relay)
|
||||
self.remove_relay(relay.url)
|
||||
new_relay = self.add_relay(relay.url)
|
||||
new_relay.error_counter = relay.error_counter
|
||||
new_relay.error_list = relay.error_list
|
||||
|
||||
def _drain_relay_queue(self, relay: Relay):
|
||||
"""Move pending EVENT messages from a dead relay's queue to the
|
||||
manager's pending queue so they can be resent on reconnection."""
|
||||
drained = 0
|
||||
while not relay.queue.empty():
|
||||
try:
|
||||
message = relay.queue.get_nowait()
|
||||
data = json.loads(message)
|
||||
if data[0] == "EVENT":
|
||||
with self._pending_events_lock:
|
||||
self._pending_events.append(message)
|
||||
drained += 1
|
||||
except Exception:
|
||||
break
|
||||
if drained:
|
||||
logger.info(f"Drained {drained} pending event(s) from relay '{relay.url}'.")
|
||||
|
||||
def _on_relay_connect(self, _relay: Relay):
|
||||
self._flush_pending_events()
|
||||
|
||||
def _flush_pending_events(self):
|
||||
with self._pending_events_lock:
|
||||
if not self._pending_events:
|
||||
return
|
||||
connected_relays = [
|
||||
r for r in self.relays.values() if r.connected and not r.shutdown
|
||||
]
|
||||
if not connected_relays:
|
||||
return
|
||||
count = len(self._pending_events)
|
||||
while self._pending_events:
|
||||
message = self._pending_events.popleft()
|
||||
for relay in connected_relays:
|
||||
relay.publish(message)
|
||||
logger.info(
|
||||
f"Flushed {count} pending event(s) to "
|
||||
f"{len(connected_relays)} relay(s)."
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue