feat: add batch invoice polling and persistence for StrikeWallet (#3300)
This commit is contained in:
parent
bf06def9b7
commit
4cf9fae3e3
1 changed files with 54 additions and 15 deletions
|
|
@ -1,4 +1,6 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import json
|
||||||
|
import os
|
||||||
import time
|
import time
|
||||||
from collections.abc import AsyncGenerator
|
from collections.abc import AsyncGenerator
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
|
|
@ -110,8 +112,17 @@ class StrikeWallet(Wallet):
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
# runtime state
|
|
||||||
self.pending_invoices: list[str] = [] # Keep it as a list
|
self.pending_invoices: list[str] = [] # Keep it as a list
|
||||||
|
# path for persisting pending invoices
|
||||||
|
self.state_path = os.path.join(
|
||||||
|
settings.lnbits_data_folder, "strike_pending_invoices.json"
|
||||||
|
)
|
||||||
|
# load persisted pending invoices
|
||||||
|
try:
|
||||||
|
with open(self.state_path) as f:
|
||||||
|
self.pending_invoices = json.load(f)
|
||||||
|
except Exception:
|
||||||
|
self.pending_invoices = []
|
||||||
self.pending_payments: dict[str, str] = {}
|
self.pending_payments: dict[str, str] = {}
|
||||||
self.failed_payments: dict[str, str] = {}
|
self.failed_payments: dict[str, str] = {}
|
||||||
|
|
||||||
|
|
@ -120,6 +131,13 @@ class StrikeWallet(Wallet):
|
||||||
self._cached_balance_ts: float = 0.0
|
self._cached_balance_ts: float = 0.0
|
||||||
self._cache_ttl = 30 # seconds
|
self._cache_ttl = 30 # seconds
|
||||||
|
|
||||||
|
def _persist_pending(self):
|
||||||
|
try:
|
||||||
|
with open(self.state_path, "w") as f:
|
||||||
|
json.dump(self.pending_invoices, f)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Could not persist pending invoices: {e}")
|
||||||
|
|
||||||
async def cleanup(self) -> None:
|
async def cleanup(self) -> None:
|
||||||
try:
|
try:
|
||||||
await self.client.aclose()
|
await self.client.aclose()
|
||||||
|
|
@ -235,6 +253,7 @@ class StrikeWallet(Wallet):
|
||||||
)
|
)
|
||||||
|
|
||||||
self.pending_invoices.append(invoice_id)
|
self.pending_invoices.append(invoice_id)
|
||||||
|
self._persist_pending()
|
||||||
return InvoiceResponse(
|
return InvoiceResponse(
|
||||||
ok=True, checking_id=invoice_id, payment_request=bolt11
|
ok=True, checking_id=invoice_id, payment_request=bolt11
|
||||||
)
|
)
|
||||||
|
|
@ -371,15 +390,36 @@ class StrikeWallet(Wallet):
|
||||||
logger.debug(f"Error while fetching payment {checking_id}.")
|
logger.debug(f"Error while fetching payment {checking_id}.")
|
||||||
return PaymentPendingStatus()
|
return PaymentPendingStatus()
|
||||||
|
|
||||||
|
async def _get_invoices_status_batch(
|
||||||
|
self, invoice_ids: list[str]
|
||||||
|
) -> dict[str, PaymentStatus]:
|
||||||
|
out: dict[str, PaymentStatus] = {}
|
||||||
|
if not invoice_ids:
|
||||||
|
return out
|
||||||
|
ids_list = ",".join(f"'{i}'" for i in invoice_ids)
|
||||||
|
filter_expr = f"receiveRequestId in ({ids_list})"
|
||||||
|
params = {"$filter": filter_expr, "$top": len(invoice_ids)}
|
||||||
|
r = await self._get("/receive-requests/receives", params=params)
|
||||||
|
r.raise_for_status()
|
||||||
|
items = r.json().get("items") or r.json().get("value") or []
|
||||||
|
completed = {item.get("receiveRequestId") for item in items}
|
||||||
|
for inv in invoice_ids:
|
||||||
|
out[inv] = (
|
||||||
|
PaymentSuccessStatus(fee_msat=0)
|
||||||
|
if inv in completed
|
||||||
|
else PaymentPendingStatus()
|
||||||
|
)
|
||||||
|
return out
|
||||||
|
|
||||||
async def paid_invoices_stream(self) -> AsyncGenerator[str, None]:
|
async def paid_invoices_stream(self) -> AsyncGenerator[str, None]:
|
||||||
"""
|
"""
|
||||||
Poll Strike for invoice settlement while respecting the documented API limits.
|
Poll Strike for invoice settlement while respecting the documented API limits.
|
||||||
|
|
||||||
Uses dynamic adjustment of polling frequency based on activity.
|
Uses dynamic adjustment of polling frequency based on activity.
|
||||||
"""
|
"""
|
||||||
min_poll, max_poll = 1, 15
|
min_poll, max_poll = 0.2, 3 # Increase polling frequency (was 1, 15)
|
||||||
# 1,000 requests / 10 minutes = ~100 requests/minute.
|
# 1,000 requests / 10 minutes = ~100 requests/minute.
|
||||||
rate_limit = 100
|
rate_limit = 250
|
||||||
sleep_s = min_poll
|
sleep_s = min_poll
|
||||||
# Main loop for polling invoices.
|
# Main loop for polling invoices.
|
||||||
self._running = True
|
self._running = True
|
||||||
|
|
@ -391,19 +431,18 @@ class StrikeWallet(Wallet):
|
||||||
req_budget = max(
|
req_budget = max(
|
||||||
1, rate_limit * sleep_s // 60
|
1, rate_limit * sleep_s // 60
|
||||||
) # Calculate request budget based on sleep time.
|
) # Calculate request budget based on sleep time.
|
||||||
|
batch = list(self.pending_invoices)[: int(req_budget)]
|
||||||
processed = 0
|
processed = 0
|
||||||
|
if batch:
|
||||||
for inv in list(self.pending_invoices):
|
statuses = await self._get_invoices_status_batch(batch)
|
||||||
if processed >= req_budget: # If request budget is exhausted.
|
processed = 1
|
||||||
break
|
for inv, status in statuses.items():
|
||||||
status = await self.get_invoice_status(inv)
|
if status.success or status.failed:
|
||||||
processed += 1
|
self.pending_invoices.remove(inv)
|
||||||
|
self._persist_pending()
|
||||||
if status.success or status.failed:
|
if status.success:
|
||||||
self.pending_invoices.remove(inv)
|
had_activity = True
|
||||||
if status.success:
|
yield inv
|
||||||
had_activity = True
|
|
||||||
yield inv
|
|
||||||
|
|
||||||
# Dynamic adjustment of polling frequency based on activity.
|
# Dynamic adjustment of polling frequency based on activity.
|
||||||
sleep_s = (
|
sleep_s = (
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue