diff --git a/tasks.py b/tasks.py index 9d840d2..956a051 100644 --- a/tasks.py +++ b/tasks.py @@ -1,15 +1,13 @@ import asyncio import json -import time -from threading import Thread from typing import List import httpx +import websockets from lnbits.core.crud import get_payment, update_payment from lnbits.core.models import Payment from lnbits.tasks import register_invoice_listener from loguru import logger -from websocket import WebSocketApp from .crud import get_or_create_lnurlp_settings, get_pay_link from .models import PayLink @@ -26,7 +24,6 @@ async def wait_for_paid_invoices(): async def on_invoice_paid(payment: Payment): - if not payment.extra or payment.extra.get("tag") != "lnurlp": return @@ -142,43 +139,29 @@ async def send_zap(payment: Payment): settings = await get_or_create_lnurlp_settings() settings.private_key.sign_event(zap_receipt) - def send(relay): - def send_event(_): - logger.debug(f"Sending zap to {ws.url}") - ws.send(zap_receipt.to_message()) - time.sleep(2) - ws.close() + async def send_to_relay(relay_url: str, event_message: str): + """Helper function to send an event to a single relay.""" + try: + async with websockets.connect(relay_url, open_timeout=5) as websocket: + logger.debug(f"Sending zap to {relay_url}") + await websocket.send(event_message) + except Exception as e: + logger.warning(f"Failed to send zap to {relay_url}: {e}") - ws = WebSocketApp(relay, on_open=send_event) - wst = Thread(target=ws.run_forever, name=f"LNURL zap {relay}") - wst.daemon = True - wst.start() - return ws, wst - - # list of all websockets - wss: List[WebSocketApp] = [] - # list of all threads for these websockets - wsts: List[Thread] = [] - - # # send zap via nostrclient - # ws, wst = send(f"wss://localhost:{settings.port}/nostrclient/api/v1/relay") - # wss += [ws] - # wsts += [wst] - - # send zap receipt to relays in zap request + # Get relays from the zap request, with a reasonable limit relays = get_tag(event_json, "relays") - if relays: - if len(relays) > 50: - relays = relays[:50] - for r in relays: - ws, wst = send(r) - wss += [ws] - wsts += [wst] + if not relays: + return zap_receipt - await asyncio.sleep(10) - for ws, wst in zip(wss, wsts): - logger.debug(f"Closing websocket {ws.url}") - ws.close() - wst.join() + if len(relays) > 50: + relays = relays[:50] + + # Create a list of tasks to run concurrently + tasks = [send_to_relay(relay, zap_receipt.to_message()) for relay in relays] + + # Run all tasks concurrently. This is a "fire-and-forget" approach. + # We don't need to wait for all of them to complete here. + for task in tasks: + asyncio.create_task(task) return zap_receipt