From 7eea2330c98e2b28728f3251a9249caaacc85cdb Mon Sep 17 00:00:00 2001 From: Sat <792024+santyr@users.noreply.github.com> Date: Wed, 13 Aug 2025 06:07:18 -0600 Subject: [PATCH] fix(tasks): Refactor send_zap to use async websockets and prevent crashes (#90) * fix(tasks): Make send_zap non-blocking to prevent freezes The send_zap async function contained blocking calls (thread.join()) which halted the main asyncio event loop. This caused the application to become unresponsive or "freeze" until all zap receipt threads completed. Refactored the function to be fully non-blocking by removing the join() calls and the arbitrary sleep(). Zap receipts are now dispatched in background threads on a fire-and-forget basis, allowing the main application to remain responsive. * fix(tasks): Use async websockets in send_zap to prevent crashes Refactor to replace the threading and websocket-client logic with the native asyncio websockets library. Create a non-blocking asyncio task for each relay, --- tasks.py | 61 ++++++++++++++++++++------------------------------------ 1 file changed, 22 insertions(+), 39 deletions(-) diff --git a/tasks.py b/tasks.py index 9d840d2..956a051 100644 --- a/tasks.py +++ b/tasks.py @@ -1,15 +1,13 @@ import asyncio import json -import time -from threading import Thread from typing import List import httpx +import websockets from lnbits.core.crud import get_payment, update_payment from lnbits.core.models import Payment from lnbits.tasks import register_invoice_listener from loguru import logger -from websocket import WebSocketApp from .crud import get_or_create_lnurlp_settings, get_pay_link from .models import PayLink @@ -26,7 +24,6 @@ async def wait_for_paid_invoices(): async def on_invoice_paid(payment: Payment): - if not payment.extra or payment.extra.get("tag") != "lnurlp": return @@ -142,43 +139,29 @@ async def send_zap(payment: Payment): settings = await get_or_create_lnurlp_settings() settings.private_key.sign_event(zap_receipt) - def send(relay): - def send_event(_): - logger.debug(f"Sending zap to {ws.url}") - ws.send(zap_receipt.to_message()) - time.sleep(2) - ws.close() + async def send_to_relay(relay_url: str, event_message: str): + """Helper function to send an event to a single relay.""" + try: + async with websockets.connect(relay_url, open_timeout=5) as websocket: + logger.debug(f"Sending zap to {relay_url}") + await websocket.send(event_message) + except Exception as e: + logger.warning(f"Failed to send zap to {relay_url}: {e}") - ws = WebSocketApp(relay, on_open=send_event) - wst = Thread(target=ws.run_forever, name=f"LNURL zap {relay}") - wst.daemon = True - wst.start() - return ws, wst - - # list of all websockets - wss: List[WebSocketApp] = [] - # list of all threads for these websockets - wsts: List[Thread] = [] - - # # send zap via nostrclient - # ws, wst = send(f"wss://localhost:{settings.port}/nostrclient/api/v1/relay") - # wss += [ws] - # wsts += [wst] - - # send zap receipt to relays in zap request + # Get relays from the zap request, with a reasonable limit relays = get_tag(event_json, "relays") - if relays: - if len(relays) > 50: - relays = relays[:50] - for r in relays: - ws, wst = send(r) - wss += [ws] - wsts += [wst] + if not relays: + return zap_receipt - await asyncio.sleep(10) - for ws, wst in zip(wss, wsts): - logger.debug(f"Closing websocket {ws.url}") - ws.close() - wst.join() + if len(relays) > 50: + relays = relays[:50] + + # Create a list of tasks to run concurrently + tasks = [send_to_relay(relay, zap_receipt.to_message()) for relay in relays] + + # Run all tasks concurrently. This is a "fire-and-forget" approach. + # We don't need to wait for all of them to complete here. + for task in tasks: + asyncio.create_task(task) return zap_receipt