nostrmarket/nostr/nostr_client.py
Padreug 50f87c9970
Some checks failed
ci.yml / fix(nip17): drop `since` filter on kind 1059 subscription (pull_request) Failing after 0s
ci.yml / fix(nip17): drop `since` filter on kind 1059 subscription (push) Failing after 0s
fix(nip17): drop since filter on kind 1059 subscription
NIP-59 randomizes gift wrap created_at up to 2 days into the past so
metadata observers can't correlate publish moments. The lenient
`since = last_dm_time - 5min` window from commit e0fdada was designed
for NIP-04 messages where created_at is the real send time; with
gift wraps it locks out any wrap whose randomized timestamp falls
before the latest stored DM.

aio-demo symptom: established merchant (last_dm_time = today 14:40)
subscribes with `since = today 14:35`. Customer publishes a new gift
wrap whose randomized created_at is May 1 23:11. NostrFilter.matches
sees `event.created_at < self.since` and returns False — relay logs
" Filter didn't match" and the order never reaches the merchant.

Fix: don't apply `since` at all on the kind 1059 filter. Replay risk
is bounded by server-side dedup and our existing
NostrClient.is_duplicate_event() guard. Other filters (stalls,
products, profiles) keep their `since` because those events use
real timestamps.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-03 17:41:30 +02:00

242 lines
9 KiB
Python

import asyncio
import json
import time
from asyncio import Queue
from collections import OrderedDict
from threading import Thread
from typing import Callable, List, Optional
from loguru import logger
from websocket import WebSocketApp
from lnbits.settings import settings
from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash
from .event import NostrEvent
# Upper bound on the number of event ids remembered by
# NostrClient.is_duplicate_event(); once exceeded, the oldest id is evicted.
MAX_SEEN_EVENTS = 1000
class NostrClient:
    """Client for the websocket exposed by the local 'nostrclient' extension.

    Outgoing relay requests (``EVENT`` / ``REQ`` / ``CLOSE``) are queued on
    ``send_req_queue`` and written to the websocket by :meth:`run_forever`.
    Raw messages received from the websocket are pushed onto
    ``recieve_event_queue`` and consumed through :meth:`get_event`.
    """

    def __init__(self):
        self.recieve_event_queue: Queue = Queue()
        self.send_req_queue: Queue = Queue()
        self.ws: Optional[WebSocketApp] = None
        self.subscription_id = "nostrmarket-" + urlsafe_short_hash()[:32]
        self.running = False
        # Insertion-ordered record of recently seen event ids (values unused).
        self._seen_events: OrderedDict[str, None] = OrderedDict()
        # Wall-clock time of the last value successfully returned by get_event().
        self.last_event_at: float = 0
        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references, so an unreferenced task may be collected
        # before it completes.
        self._background_tasks: set = set()

    @property
    def is_websocket_connected(self):
        """Return the websocket's ``keep_running`` flag, or False if there is no websocket."""
        if not self.ws:
            return False
        return self.ws.keep_running

    async def connect_to_nostrclient_ws(self) -> "WebSocketApp":
        """Create the websocket to 'nostrclient' and run it in a daemon thread.

        Returns:
            The ``WebSocketApp`` whose ``run_forever`` was started in the
            background thread. The connection may not be open yet on return.
        """
        logger.debug("Connecting to websockets for 'nostrclient' extension...")

        relay_endpoint = encrypt_internal_message("relay", urlsafe=True)
        ws_url = f"ws://localhost:{settings.port}/nostrclient/api/v1/{relay_endpoint}"
        on_open, on_message, on_error, on_close = self._ws_handlers()
        ws = WebSocketApp(
            ws_url,
            on_message=on_message,
            on_open=on_open,
            on_close=on_close,
            on_error=on_error,
        )

        wst = Thread(target=ws.run_forever)
        wst.daemon = True
        wst.start()

        return ws

    async def run_forever(self):
        """Forward queued requests to the websocket until ``running`` is cleared.

        (Re)connects whenever the websocket is not connected. Any exception is
        logged and followed by a 60 second pause before the next attempt.
        """
        self.running = True
        while self.running:
            try:
                if not self.is_websocket_connected:
                    self.ws = await self.connect_to_nostrclient_ws()
                    # be sure the connection is open
                    await asyncio.sleep(5)

                req = await self.send_req_queue.get()
                # self.ws can be reset by _safe_ws_stop() while we were waiting
                # on the queue. Checked explicitly (not with `assert`, which is
                # stripped under -O); the error is handled by the except below.
                if not self.ws:
                    raise RuntimeError("Websocket is not available, cannot send request.")
                self.ws.send(json.dumps(req))
            except Exception as ex:
                logger.warning(ex)
                await asyncio.sleep(60)

    def is_duplicate_event(self, event_id: str) -> bool:
        """Check if an event has been seen recently. Returns True if duplicate.

        An unseen id is recorded; when more than ``MAX_SEEN_EVENTS`` ids are
        stored, the oldest one is evicted.
        """
        if event_id in self._seen_events:
            return True
        self._seen_events[event_id] = None
        if len(self._seen_events) > MAX_SEEN_EVENTS:
            self._seen_events.popitem(last=False)
        return False

    async def get_event(self):
        """Wait for and return the next item from ``recieve_event_queue``.

        Raises:
            ValueError: when the dequeued item is itself a ``ValueError``
                (queued by the close handler and by :meth:`restart`).
        """
        value = await self.recieve_event_queue.get()
        if isinstance(value, ValueError):
            logger.error(f"[NOSTRMARKET] ❌ Queue returned error: {value}")
            raise value
        self.last_event_at = time.time()
        return value

    async def publish_nostr_event(self, e: "NostrEvent"):
        """Queue an ``EVENT`` request carrying ``e.dict()``."""
        await self.send_req_queue.put(["EVENT", e.dict()])

    async def subscribe_merchants(
        self,
        public_keys: List[str],
        dm_time=0,
        stall_time=0,
        product_time=0,
        profile_time=0,
    ):
        """Queue a ``REQ`` for DMs, stalls, products and profiles of ``public_keys``.

        A fresh ``subscription_id`` is generated and stored on the instance.
        Each ``*_time`` value is passed as ``since`` to the matching filter
        builder.
        """
        dm_filters = self._filters_for_direct_messages(public_keys, dm_time)
        stall_filters = self._filters_for_stall_events(public_keys, stall_time)
        product_filters = self._filters_for_product_events(public_keys, product_time)
        profile_filters = self._filters_for_user_profile(public_keys, profile_time)

        merchant_filters = (
            dm_filters + stall_filters + product_filters + profile_filters
        )

        self.subscription_id = "nostrmarket-" + urlsafe_short_hash()[:32]
        await self.send_req_queue.put(["REQ", self.subscription_id] + merchant_filters)

    async def merchant_temp_subscription(self, pk, duration=10):
        """Queue a short-lived ``REQ`` for one merchant, closed after ``duration`` seconds."""
        dm_filters = self._filters_for_direct_messages([pk], 0)
        stall_filters = self._filters_for_stall_events([pk], 0)
        product_filters = self._filters_for_product_events([pk], 0)
        profile_filters = self._filters_for_user_profile([pk], 0)

        merchant_filters = (
            dm_filters + stall_filters + product_filters + profile_filters
        )

        subscription_id = "merchant-" + urlsafe_short_hash()[:32]
        logger.debug(
            f"New merchant temp subscription ({duration} sec). Subscription id: {subscription_id}"
        )
        await self.send_req_queue.put(["REQ", subscription_id] + merchant_filters)

        self._unsubscribe_later(subscription_id, duration)

    async def user_profile_temp_subscribe(self, public_key: str, duration=5):
        """Queue a short-lived kind 0 ``REQ`` for ``public_key``.

        The subscription is closed after ``duration`` seconds. Failures are
        logged at debug level and not propagated.
        """
        try:
            profile_filter = [{"kinds": [0], "authors": [public_key]}]
            subscription_id = "profile-" + urlsafe_short_hash()[:32]
            logger.debug(
                f"New user temp subscription ({duration} sec). Subscription id: {subscription_id}"
            )
            await self.send_req_queue.put(["REQ", subscription_id] + profile_filter)

            self._unsubscribe_later(subscription_id, duration)
        except Exception as ex:
            logger.debug(ex)

    def _unsubscribe_later(self, subscription_id: str, delay: float) -> "asyncio.Task":
        """Schedule a ``CLOSE`` for ``subscription_id`` after ``delay`` seconds.

        The task is kept in ``_background_tasks`` until it finishes so that it
        cannot be garbage-collected while it is still sleeping.
        """

        async def unsubscribe_with_delay():
            await asyncio.sleep(delay)
            await self.unsubscribe(subscription_id)

        task = asyncio.create_task(unsubscribe_with_delay())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    def _filters_for_direct_messages(self, public_keys: List[str], since: int) -> List:
        """Build the kind 1059 filter for ``public_keys``; ``since`` is deliberately unused."""
        # NIP-17/NIP-59: subscribe to kind 1059 gift wraps addressed to our merchants.
        # With gift wrapping, outgoing messages are self-wrapped (same p-tag filter).
        #
        # Do NOT apply `since` here. Per NIP-59, gift wraps use randomized past
        # timestamps (up to 2 days back) to defeat metadata correlation, so a
        # `since` derived from the latest DM in our DB will reject fresh wraps
        # whose randomized created_at is older than that window. Server-side
        # dedup + the client's is_duplicate_event() guard handle replays.
        gift_wrap_filter: dict = {"kinds": [1059], "#p": public_keys}
        return [gift_wrap_filter]

    @staticmethod
    def _authors_filter(kind: int, public_keys: List[str], since: int) -> dict:
        """Build an ``authors`` filter for one kind, adding ``since`` only when truthy."""
        nostr_filter: dict = {"kinds": [kind], "authors": public_keys}
        if since:
            nostr_filter["since"] = since
        return nostr_filter

    def _filters_for_stall_events(self, public_keys: List[str], since: int) -> List:
        """Build the kind 30017 filter authored by ``public_keys``."""
        return [self._authors_filter(30017, public_keys, since)]

    def _filters_for_product_events(self, public_keys: List[str], since: int) -> List:
        """Build the kind 30018 filter authored by ``public_keys``."""
        return [self._authors_filter(30018, public_keys, since)]

    def _filters_for_user_profile(self, public_keys: List[str], since: int) -> List:
        """Build the kind 0 filter authored by ``public_keys``."""
        return [self._authors_filter(0, public_keys, since)]

    def _safe_ws_stop(self):
        """Close the websocket (best effort) and drop the reference to it."""
        if not self.ws:
            return
        try:
            self.ws.close()
        except Exception as ex:
            # Best effort: a failed close must not prevent the reset below, but
            # it should not vanish silently either.
            logger.debug(f"Failed to close websocket: {ex}")
        self.ws = None

    def _ws_handlers(self):
        """Return the ``(on_open, on_message, on_error, on_close)`` websocket callbacks."""
        # NOTE(review): these callbacks are handed to a WebSocketApp whose
        # run_forever() is started in a separate Thread (see
        # connect_to_nostrclient_ws) and they call put_nowait() on an
        # asyncio.Queue, which is documented as not thread-safe. Confirm
        # whether this should go through loop.call_soon_threadsafe().

        def on_open(_):
            logger.debug("[NOSTRMARKET DEBUG] ✅ Connected to 'nostrclient' websocket successfully")

        def on_message(_, message):
            logger.debug(f"[NOSTRMARKET DEBUG] 📨 Received websocket message: {message[:200]}...")
            try:
                self.recieve_event_queue.put_nowait(message)
                logger.debug("[NOSTRMARKET DEBUG] 📤 Message queued successfully")
            except Exception as e:
                logger.error(f"[NOSTRMARKET] ❌ Failed to queue message: {e}")

        def on_error(_, error):
            logger.warning(f"[NOSTRMARKET] ❌ Websocket error: {error}")

        def on_close(x, status_code, message):
            logger.warning(f"[NOSTRMARKET] 🔌 Websocket closed: {x}: '{status_code}' '{message}'")
            # force re-subscribe
            self.recieve_event_queue.put_nowait(ValueError("Websocket close."))

        return on_open, on_message, on_error, on_close

    async def restart(self):
        """Unsubscribe, signal consumers through the receive queue, and close the websocket."""
        await self.unsubscribe_merchants()
        # Give some time for the CLOSE events to propagate before restarting
        await asyncio.sleep(10)

        logger.info("Restarting NostrClient...")
        await self.recieve_event_queue.put(ValueError("Restarting NostrClient..."))

        self._safe_ws_stop()

    async def stop(self):
        """Unsubscribe, clear ``running`` and close the websocket after a grace period."""
        await self.unsubscribe_merchants()
        self.running = False

        # Give some time for the CLOSE events to propagate before closing the connection
        await asyncio.sleep(10)
        self._safe_ws_stop()

    async def unsubscribe_merchants(self):
        """Queue a ``CLOSE`` for the current merchants subscription."""
        await self.send_req_queue.put(["CLOSE", self.subscription_id])
        logger.debug(
            f"Unsubscribed from all merchants events. Subscription id: {self.subscription_id}"
        )

    async def unsubscribe(self, subscription_id):
        """Queue a ``CLOSE`` for ``subscription_id``."""
        await self.send_req_queue.put(["CLOSE", subscription_id])
        logger.debug(f"Unsubscribed from subscription id: {subscription_id}")