fix(tasks): Refactor send_zap to use async websockets and prevent crashes (#90)
* fix(tasks): Make send_zap non-blocking to prevent freezes The send_zap async function contained blocking calls (thread.join()) which halted the main asyncio event loop. This caused the application to become unresponsive or "freeze" until all zap receipt threads completed. Refactored the function to be fully non-blocking by removing the join() calls and the arbitrary sleep(). Zap receipts are now dispatched in background threads on a fire-and-forget basis, allowing the main application to remain responsive. * fix(tasks): Use async websockets in send_zap to prevent crashes Refactor to replace the threading and websocket-client logic with the native asyncio websockets library. Create a non-blocking asyncio task for each relay,
This commit is contained in:
parent
1455afa219
commit
7eea2330c9
1 changed files with 24 additions and 41 deletions
61
tasks.py
61
tasks.py
|
|
@ -1,15 +1,13 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import time
|
|
||||||
from threading import Thread
|
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
import websockets
|
||||||
from lnbits.core.crud import get_payment, update_payment
|
from lnbits.core.crud import get_payment, update_payment
|
||||||
from lnbits.core.models import Payment
|
from lnbits.core.models import Payment
|
||||||
from lnbits.tasks import register_invoice_listener
|
from lnbits.tasks import register_invoice_listener
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from websocket import WebSocketApp
|
|
||||||
|
|
||||||
from .crud import get_or_create_lnurlp_settings, get_pay_link
|
from .crud import get_or_create_lnurlp_settings, get_pay_link
|
||||||
from .models import PayLink
|
from .models import PayLink
|
||||||
|
|
@ -26,7 +24,6 @@ async def wait_for_paid_invoices():
|
||||||
|
|
||||||
|
|
||||||
async def on_invoice_paid(payment: Payment):
|
async def on_invoice_paid(payment: Payment):
|
||||||
|
|
||||||
if not payment.extra or payment.extra.get("tag") != "lnurlp":
|
if not payment.extra or payment.extra.get("tag") != "lnurlp":
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -142,43 +139,29 @@ async def send_zap(payment: Payment):
|
||||||
settings = await get_or_create_lnurlp_settings()
|
settings = await get_or_create_lnurlp_settings()
|
||||||
settings.private_key.sign_event(zap_receipt)
|
settings.private_key.sign_event(zap_receipt)
|
||||||
|
|
||||||
def send(relay):
|
async def send_to_relay(relay_url: str, event_message: str):
|
||||||
def send_event(_):
|
"""Helper function to send an event to a single relay."""
|
||||||
logger.debug(f"Sending zap to {ws.url}")
|
try:
|
||||||
ws.send(zap_receipt.to_message())
|
async with websockets.connect(relay_url, open_timeout=5) as websocket:
|
||||||
time.sleep(2)
|
logger.debug(f"Sending zap to {relay_url}")
|
||||||
ws.close()
|
await websocket.send(event_message)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to send zap to {relay_url}: {e}")
|
||||||
|
|
||||||
ws = WebSocketApp(relay, on_open=send_event)
|
# Get relays from the zap request, with a reasonable limit
|
||||||
wst = Thread(target=ws.run_forever, name=f"LNURL zap {relay}")
|
|
||||||
wst.daemon = True
|
|
||||||
wst.start()
|
|
||||||
return ws, wst
|
|
||||||
|
|
||||||
# list of all websockets
|
|
||||||
wss: List[WebSocketApp] = []
|
|
||||||
# list of all threads for these websockets
|
|
||||||
wsts: List[Thread] = []
|
|
||||||
|
|
||||||
# # send zap via nostrclient
|
|
||||||
# ws, wst = send(f"wss://localhost:{settings.port}/nostrclient/api/v1/relay")
|
|
||||||
# wss += [ws]
|
|
||||||
# wsts += [wst]
|
|
||||||
|
|
||||||
# send zap receipt to relays in zap request
|
|
||||||
relays = get_tag(event_json, "relays")
|
relays = get_tag(event_json, "relays")
|
||||||
if relays:
|
if not relays:
|
||||||
if len(relays) > 50:
|
return zap_receipt
|
||||||
relays = relays[:50]
|
|
||||||
for r in relays:
|
|
||||||
ws, wst = send(r)
|
|
||||||
wss += [ws]
|
|
||||||
wsts += [wst]
|
|
||||||
|
|
||||||
await asyncio.sleep(10)
|
if len(relays) > 50:
|
||||||
for ws, wst in zip(wss, wsts):
|
relays = relays[:50]
|
||||||
logger.debug(f"Closing websocket {ws.url}")
|
|
||||||
ws.close()
|
# Create a list of tasks to run concurrently
|
||||||
wst.join()
|
tasks = [send_to_relay(relay, zap_receipt.to_message()) for relay in relays]
|
||||||
|
|
||||||
|
# Run all tasks concurrently. This is a "fire-and-forget" approach.
|
||||||
|
# We don't need to wait for all of them to complete here.
|
||||||
|
for task in tasks:
|
||||||
|
asyncio.create_task(task)
|
||||||
|
|
||||||
return zap_receipt
|
return zap_receipt
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue