NIP-59 randomizes gift wrap created_at up to 2 days into the past so
metadata observers can't correlate publish moments. The lenient
`since = last_dm_time - 5min` window from commit e0fdada was designed
for NIP-04 messages where created_at is the real send time; with
gift wraps it locks out any wrap whose randomized timestamp falls
before the latest stored DM.
aio-demo symptom: an established merchant (last_dm_time = today 14:40)
subscribes with `since = today 14:35`. A customer then publishes a new
gift wrap whose randomized created_at is May 1 23:11. NostrFilter.matches
sees `event.created_at < self.since` and returns False — the relay logs
"❌ Filter didn't match" and the order never reaches the merchant.
Fix: don't apply `since` at all on the kind 1059 filter. Replay risk
is bounded by server-side dedup and our existing
NostrClient.is_duplicate_event() guard. Other filters (stalls,
products, profiles) keep their `since` because those events use
real timestamps.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
242 lines
9 KiB
Python
242 lines
9 KiB
Python
import asyncio
|
|
import json
|
|
import time
|
|
from asyncio import Queue
|
|
from collections import OrderedDict
|
|
from threading import Thread
|
|
from typing import Callable, List, Optional
|
|
|
|
from loguru import logger
|
|
from websocket import WebSocketApp
|
|
|
|
from lnbits.settings import settings
|
|
from lnbits.helpers import encrypt_internal_message, urlsafe_short_hash
|
|
|
|
from .event import NostrEvent
|
|
|
|
# Upper bound on how many event ids NostrClient.is_duplicate_event() remembers;
# once exceeded, the oldest remembered id is dropped.
MAX_SEEN_EVENTS = 1000
|
|
|
|
|
|
class NostrClient:
    """Websocket client for the local 'nostrclient' extension.

    Outgoing requests (REQ / EVENT / CLOSE) are placed on `send_req_queue`
    and written to the websocket by `run_forever`. Incoming websocket
    messages are placed on `recieve_event_queue` and consumed through
    `get_event`.
    """

    def __init__(self):
        # Raw websocket messages, or ValueError markers signalling that the
        # connection closed / the client is restarting.
        self.recieve_event_queue: Queue = Queue()
        # Requests waiting to be serialised to JSON and sent over the websocket.
        self.send_req_queue: Queue = Queue()
        self.ws: Optional[WebSocketApp] = None
        # Id of the current long-lived merchants subscription.
        self.subscription_id = "nostrmarket-" + urlsafe_short_hash()[:32]
        self.running = False
        # Insertion-ordered record of recently seen event ids.
        self._seen_events: OrderedDict[str, None] = OrderedDict()
        # time.time() of the last message successfully returned by get_event().
        self.last_event_at: float = 0
        # Strong references to fire-and-forget tasks. The event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks: set = set()

    @property
    def is_websocket_connected(self):
        """Return False when no websocket exists, else its `keep_running` flag."""
        if not self.ws:
            return False
        return self.ws.keep_running

    async def connect_to_nostrclient_ws(self) -> "WebSocketApp":
        """Open a websocket to the nostrclient relay endpoint.

        The websocket's `run_forever` is started on a daemon thread; the
        `WebSocketApp` is returned immediately, without waiting for the
        connection to be established.
        """
        logger.debug("Connecting to websockets for 'nostrclient' extension...")

        relay_endpoint = encrypt_internal_message("relay", urlsafe=True)
        ws_url = f"ws://localhost:{settings.port}/nostrclient/api/v1/{relay_endpoint}"

        on_open, on_message, on_error, on_close = self._ws_handlers()
        ws = WebSocketApp(
            ws_url,
            on_message=on_message,
            on_open=on_open,
            on_close=on_close,
            on_error=on_error,
        )

        wst = Thread(target=ws.run_forever)
        wst.daemon = True
        wst.start()

        return ws

    async def run_forever(self):
        """Forward queued requests to the websocket until `running` is cleared.

        (Re)connects whenever the websocket is not connected. Any exception is
        logged as a warning and followed by a 60 second pause before retrying.
        """
        self.running = True
        while self.running:
            try:
                if not self.is_websocket_connected:
                    self.ws = await self.connect_to_nostrclient_ws()
                    # be sure the connection is open
                    await asyncio.sleep(5)

                req = await self.send_req_queue.get()
                # Explicit check instead of `assert`, which is stripped under -O.
                if self.ws is None:
                    raise RuntimeError("Websocket is not connected")
                self.ws.send(json.dumps(req))
            except Exception as ex:
                logger.warning(ex)
                await asyncio.sleep(60)

    def is_duplicate_event(self, event_id: str) -> bool:
        """Check if an event has been seen recently. Returns True if duplicate.

        An unseen id is recorded; once more than MAX_SEEN_EVENTS ids are
        stored, the oldest inserted one is evicted.
        """
        if event_id in self._seen_events:
            return True
        self._seen_events[event_id] = None
        if len(self._seen_events) > MAX_SEEN_EVENTS:
            self._seen_events.popitem(last=False)
        return False

    async def get_event(self):
        """Wait for and return the next item from the receive queue.

        Raises:
            ValueError: when the queued item is itself a ValueError (queued by
                the close handler or by `restart`).
        """
        value = await self.recieve_event_queue.get()
        if isinstance(value, ValueError):
            logger.error(f"[NOSTRMARKET] ❌ Queue returned error: {value}")
            raise value
        self.last_event_at = time.time()
        return value

    async def publish_nostr_event(self, e: "NostrEvent"):
        """Queue an EVENT request carrying `e.dict()`."""
        await self.send_req_queue.put(["EVENT", e.dict()])

    async def subscribe_merchants(
        self,
        public_keys: List[str],
        dm_time=0,
        stall_time=0,
        product_time=0,
        profile_time=0,
    ):
        """Queue one REQ with DM, stall, product and profile filters.

        A fresh `subscription_id` is generated and stored on the client. The
        `*_time` values are handed to the matching filter builder as `since`;
        the direct-message builder ignores its value.
        """
        dm_filters = self._filters_for_direct_messages(public_keys, dm_time)
        stall_filters = self._filters_for_stall_events(public_keys, stall_time)
        product_filters = self._filters_for_product_events(public_keys, product_time)
        profile_filters = self._filters_for_user_profile(public_keys, profile_time)

        merchant_filters = (
            dm_filters + stall_filters + product_filters + profile_filters
        )

        self.subscription_id = "nostrmarket-" + urlsafe_short_hash()[:32]
        await self.send_req_queue.put(["REQ", self.subscription_id] + merchant_filters)

    async def merchant_temp_subscription(self, pk, duration=10):
        """Queue a REQ for one merchant and close it after `duration` seconds.

        All filters are built with `since=0`, i.e. without a lower time bound.
        """
        dm_filters = self._filters_for_direct_messages([pk], 0)
        stall_filters = self._filters_for_stall_events([pk], 0)
        product_filters = self._filters_for_product_events([pk], 0)
        profile_filters = self._filters_for_user_profile([pk], 0)

        merchant_filters = (
            dm_filters + stall_filters + product_filters + profile_filters
        )

        subscription_id = "merchant-" + urlsafe_short_hash()[:32]
        logger.debug(
            f"New merchant temp subscription ({duration} sec). Subscription id: {subscription_id}"
        )
        await self.send_req_queue.put(["REQ", subscription_id] + merchant_filters)

        self._schedule_unsubscribe(subscription_id, duration)

    async def user_profile_temp_subscribe(self, public_key: str, duration=5):
        """Queue a kind-0 REQ for one author and close it after `duration` seconds.

        Best-effort: any exception is logged at debug level and swallowed.
        """
        try:
            profile_filter = [{"kinds": [0], "authors": [public_key]}]
            subscription_id = "profile-" + urlsafe_short_hash()[:32]
            logger.debug(
                f"New user temp subscription ({duration} sec). Subscription id: {subscription_id}"
            )
            await self.send_req_queue.put(["REQ", subscription_id] + profile_filter)

            self._schedule_unsubscribe(subscription_id, duration)
        except Exception as ex:
            logger.debug(ex)

    def _schedule_unsubscribe(self, subscription_id, delay) -> None:
        """Start a background task that unsubscribes after `delay` seconds.

        Must be called while an event loop is running. The task is kept in
        `_background_tasks` until it completes so it cannot be
        garbage-collected mid-flight.
        """

        async def unsubscribe_with_delay():
            await asyncio.sleep(delay)
            await self.unsubscribe(subscription_id)

        task = asyncio.create_task(unsubscribe_with_delay())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    @staticmethod
    def _authored_events_filter(kinds: List[int], public_keys: List[str], since) -> dict:
        """Build a `kinds` + `authors` filter, adding `since` only when truthy."""
        event_filter: dict = {"kinds": kinds, "authors": public_keys}
        if since:
            event_filter["since"] = since
        return event_filter

    def _filters_for_direct_messages(self, public_keys: List[str], since: int) -> List:
        """Return the kind 1059 gift-wrap filter p-tagged to `public_keys`.

        `since` is accepted for signature parity with the other filter
        builders but is deliberately ignored (see below).
        """
        # NIP-17/NIP-59: subscribe to kind 1059 gift wraps addressed to our merchants.
        # With gift wrapping, outgoing messages are self-wrapped (same p-tag filter).
        #
        # Do NOT apply `since` here. Per NIP-59, gift wraps use randomized past
        # timestamps (up to 2 days back) to defeat metadata correlation, so a
        # `since` derived from the latest DM in our DB will reject fresh wraps
        # whose randomized created_at is older than that window. Server-side
        # dedup + the client's is_duplicate_event() guard handle replays.
        gift_wrap_filter: dict = {"kinds": [1059], "#p": public_keys}
        return [gift_wrap_filter]

    def _filters_for_stall_events(self, public_keys: List[str], since: int) -> List:
        """Return the kind 30017 filter for `public_keys`, bounded by `since` if set."""
        return [self._authored_events_filter([30017], public_keys, since)]

    def _filters_for_product_events(self, public_keys: List[str], since: int) -> List:
        """Return the kind 30018 filter for `public_keys`, bounded by `since` if set."""
        return [self._authored_events_filter([30018], public_keys, since)]

    def _filters_for_user_profile(self, public_keys: List[str], since: int) -> List:
        """Return the kind 0 filter for `public_keys`, bounded by `since` if set."""
        return [self._authored_events_filter([0], public_keys, since)]

    def _safe_ws_stop(self):
        """Close the websocket, if any, and forget it. Close errors are not raised."""
        if not self.ws:
            return
        try:
            self.ws.close()
        except Exception as ex:
            # Best-effort close; `except Exception` (unlike a bare `except`)
            # lets KeyboardInterrupt / SystemExit propagate.
            logger.debug(ex)
        self.ws = None

    def _ws_handlers(self):
        """Build the (on_open, on_message, on_error, on_close) websocket callbacks."""
        # NOTE(review): the WebSocketApp is driven from a separate Thread (see
        # connect_to_nostrclient_ws), so these callbacks presumably run on that
        # thread while `recieve_event_queue` is an asyncio.Queue, which is
        # documented as not thread-safe — confirm whether the puts should go
        # through loop.call_soon_threadsafe().

        def on_open(_):
            logger.debug("[NOSTRMARKET DEBUG] ✅ Connected to 'nostrclient' websocket successfully")

        def on_message(_, message):
            logger.debug(f"[NOSTRMARKET DEBUG] 📨 Received websocket message: {message[:200]}...")
            try:
                self.recieve_event_queue.put_nowait(message)
                logger.debug("[NOSTRMARKET DEBUG] 📤 Message queued successfully")
            except Exception as e:
                logger.error(f"[NOSTRMARKET] ❌ Failed to queue message: {e}")

        def on_error(_, error):
            logger.warning(f"[NOSTRMARKET] ❌ Websocket error: {error}")

        def on_close(x, status_code, message):
            logger.warning(f"[NOSTRMARKET] 🔌 Websocket closed: {x}: '{status_code}' '{message}'")
            # force re-subscribe
            self.recieve_event_queue.put_nowait(ValueError("Websocket close."))

        return on_open, on_message, on_error, on_close

    async def restart(self):
        """Close the merchants subscription, wait 10 s, then drop the websocket.

        A ValueError is queued on the receive queue before the websocket is
        stopped, so a pending `get_event` call raises.
        """
        await self.unsubscribe_merchants()
        # Give some time for the CLOSE events to propagate before restarting
        await asyncio.sleep(10)

        logger.info("Restarting NostrClient...")
        await self.recieve_event_queue.put(ValueError("Restarting NostrClient..."))

        self._safe_ws_stop()

    async def stop(self):
        """Close the merchants subscription, clear `running`, wait 10 s and close."""
        await self.unsubscribe_merchants()
        self.running = False

        # Give some time for the CLOSE events to propagate before closing the connection
        await asyncio.sleep(10)
        self._safe_ws_stop()

    async def unsubscribe_merchants(self):
        """Queue a CLOSE request for the current merchants subscription."""
        await self.send_req_queue.put(["CLOSE", self.subscription_id])
        logger.debug(
            f"Unsubscribed from all merchants events. Subscription id: {self.subscription_id}"
        )

    async def unsubscribe(self, subscription_id):
        """Queue a CLOSE request for `subscription_id`."""
        await self.send_req_queue.put(["CLOSE", subscription_id])
        logger.debug(f"Unsubscribed from subscription id: {subscription_id}")
|