From b91e49b64250f20ddf4b5a7272cd5bbb481b9a32 Mon Sep 17 00:00:00 2001 From: Padreug Date: Thu, 14 May 2026 14:48:44 +0200 Subject: [PATCH] feat(v2): wire bitSpire invoice listener + settlement landing (P1a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the no-op tasks.py stub with a real invoice listener that lands bitSpire settlements idempotently into dca_settlements. Architecture: satmachineadmin runs *inside* the LNbits process, so it plugs into LNbits' canonical extension hook (register_invoice_listener from lnbits.tasks) instead of going through the Nostr transport layer. External clients like bitSpire use Nostr; internal extensions consume the resulting Payment objects directly. One invoice_listener queue per extension, dispatched by invoice_callback_dispatcher. Flow: bitSpire ATM (Nostr kind-21000) → LNbits nostr_transport handler → core Payment system (create_invoice + status=SUCCESS on settle) → invoice_callback_dispatcher → satmachineadmin's invoice_queue → _handle_payment filters by wallet_id → active machine → bitspire.parse_settlement reads Payment.extra (or back-derives) → create_settlement_idempotent (keyed on payment_hash UNIQUE) The parser (new bitspire.py module) is bitSpire-specific: - Happy path (post-aiolabs/lamassu-next#44): Payment.extra carries {source:"bitspire", net_sats, fee_sats, fee_pct, exchange_rate, currency, txid, machine_npub, bills, cassettes}. Read directly, zero back-derivation. - Fallback path (pre-#44): extra is absent. Back-derive the split using machine.fallback_commission_pct with the Lamassu-style formula (calculations.calculate_commission), mark used_fallback_split=true, log a WARNING that namechecks the upstream issue so it's findable in logs. Two-stage commission split (super first, operator remainder) is computed at land time so the audit row is complete: platform_fee_sats = round(commission_sats * super_fee_pct) operator_fee_sats = commission_sats - platform_fee_sats The actual payout (LP DCA legs + super-fee leg + operator-split legs) happens in a separate settlement-processor task in P2. P1 only LANDS the settlement with status='pending'. Smoke-tested both paths against real LNbits 1.4 (nostr-transport venv): happy: 266800 gross → 258835 net + 7965 commission (2390 super @ 30%, 5575 operator) fallback: 266800 gross → 254095 net + 12705 commission @ 5% default Also adds crud.get_active_machine_by_wallet_id, the lookup that gates inbound payments to known machine wallets. Refs: aiolabs/satmachineadmin#9, aiolabs/lamassu-next#44 Co-Authored-By: Claude Opus 4.7 (1M context) --- bitspire.py | 176 ++++++++++++++++++++++++++++++++++++++++++++++++++++ crud.py | 13 ++++ tasks.py | 82 ++++++++++++++++++++---- 3 files changed, 258 insertions(+), 13 deletions(-) create mode 100644 bitspire.py diff --git a/bitspire.py b/bitspire.py new file mode 100644 index 0000000..195c0a9 --- /dev/null +++ b/bitspire.py @@ -0,0 +1,176 @@ +# Satoshi Machine v2 — bitSpire payment parser. +# +# Translates an inbound LNbits Payment (cash-out customer paid the ATM's +# invoice) into the principal/commission split needed by satmachineadmin. +# +# Happy path: bitSpire populates Payment.extra with the canonical split +# fields per aiolabs/lamassu-next#44 — we read them directly. +# +# Fallback path: extra is missing (older bitSpire, edge case). We back-derive +# the split from the machine's fallback_commission_pct using the Lamassu-era +# formula (base = total / (1 + commission)) and mark used_fallback_split=true +# so the audit trail shows we estimated. + +from __future__ import annotations + +import json +from typing import Any, Optional, Tuple + +from loguru import logger + +from .calculations import calculate_commission +from .models import CreateDcaSettlementData, Machine + +# Sentinel value bitSpire sets in Payment.extra.source so we know an inbound +# payment originated from an ATM cash-out and not some other extension or +# customer-initiated transfer. +BITSPIRE_SOURCE = "bitspire" + + +def _coerce_int(v: Any) -> Optional[int]: + if v is None: + return None + try: + return int(v) + except (TypeError, ValueError): + return None + + +def _coerce_float(v: Any) -> Optional[float]: + if v is None: + return None + try: + return float(v) + except (TypeError, ValueError): + return None + + +def _coerce_str(v: Any) -> Optional[str]: + if v is None: + return None + return str(v) if not isinstance(v, str) else v + + +def _json_dumps(v: Any) -> Optional[str]: + if v is None: + return None + try: + return json.dumps(v) + except (TypeError, ValueError): + return None + + +def is_bitspire_payment(extra: dict) -> bool: + """True if Payment.extra carries the bitSpire source marker (post-#44).""" + return isinstance(extra, dict) and extra.get("source") == BITSPIRE_SOURCE + + +def parse_settlement( + machine: Machine, + payment_hash: str, + gross_sats: int, + extra: dict, + super_fee_pct: float, +) -> Tuple[CreateDcaSettlementData, bool]: + """Build a CreateDcaSettlementData for an inbound payment landing on + `machine`'s wallet. + + Returns (data, used_fallback): when `used_fallback` is True, bitSpire + didn't populate Payment.extra so we back-derived the split. Caller + should log this for visibility — once aiolabs/lamassu-next#44 ships, + fallback usage should drop to zero. + """ + if is_bitspire_payment(extra): + data = _parse_extra(machine, payment_hash, gross_sats, extra, super_fee_pct) + return data, False + logger.warning( + f"satmachineadmin: settlement on machine {machine.machine_npub[:12]}... " + f"missing bitSpire extra metadata; back-deriving via " + f"fallback_commission_pct={machine.fallback_commission_pct}. " + f"See aiolabs/lamassu-next#44." + ) + return _parse_fallback(machine, payment_hash, gross_sats, super_fee_pct), True + + +def _parse_extra( + machine: Machine, + payment_hash: str, + gross_sats: int, + extra: dict, + super_fee_pct: float, +) -> CreateDcaSettlementData: + """Happy path: bitSpire populated Payment.extra per lamassu-next#44.""" + net_sats = _coerce_int(extra.get("net_sats")) + fee_sats = _coerce_int(extra.get("fee_sats")) + if net_sats is None or fee_sats is None: + # Missing key fields — shouldn't happen post-#44 but defensive. + return _parse_fallback(machine, payment_hash, gross_sats, super_fee_pct) + commission_sats = fee_sats + platform_fee_sats = round(commission_sats * super_fee_pct) + operator_fee_sats = commission_sats - platform_fee_sats + exchange_rate = _coerce_float(extra.get("exchange_rate")) + if exchange_rate is None or exchange_rate <= 0: + # Without exchange rate we can't compute fiat. Use 1.0 as a stand-in + # and let the operator correct via manual reconciliation. + exchange_rate = 1.0 + fiat_amount = round(gross_sats / exchange_rate, 2) if exchange_rate > 0 else 0.0 + fiat_code = _coerce_str(extra.get("currency")) or machine.fiat_code + return CreateDcaSettlementData( + machine_id=machine.id, + payment_hash=payment_hash, + bitspire_event_id=None, + bitspire_txid=_coerce_str(extra.get("txid")), + gross_sats=gross_sats, + fiat_amount=fiat_amount, + fiat_code=fiat_code, + exchange_rate=exchange_rate, + net_sats=net_sats, + commission_sats=commission_sats, + platform_fee_sats=platform_fee_sats, + operator_fee_sats=operator_fee_sats, + used_fallback_split=False, + tx_type=_coerce_str(extra.get("type")) or "cash_out", + bills_json=_json_dumps(extra.get("bills")), + cassettes_json=_json_dumps(extra.get("cassettes")), + ) + + +def _parse_fallback( + machine: Machine, + payment_hash: str, + gross_sats: int, + super_fee_pct: float, +) -> CreateDcaSettlementData: + """Back-derive the split using the machine's fallback_commission_pct. + + Same formula as the Lamassu integration used: + base_amount = round(gross / (1 + commission_pct)) + commission = gross - base_amount + """ + net_sats, commission_sats, _effective = calculate_commission( + crypto_atoms=gross_sats, + commission_percentage=machine.fallback_commission_pct, + discount=0.0, + ) + platform_fee_sats = round(commission_sats * super_fee_pct) + operator_fee_sats = commission_sats - platform_fee_sats + # No exchange rate from the wire; leave fiat_amount=0 so it's visibly + # incomplete on the operator's reconciliation screen. + return CreateDcaSettlementData( + machine_id=machine.id, + payment_hash=payment_hash, + bitspire_event_id=None, + bitspire_txid=None, + gross_sats=gross_sats, + fiat_amount=0.0, + fiat_code=machine.fiat_code, + exchange_rate=0.0, + net_sats=net_sats, + commission_sats=commission_sats, + platform_fee_sats=platform_fee_sats, + operator_fee_sats=operator_fee_sats, + used_fallback_split=True, + tx_type="cash_out", + bills_json=None, + cassettes_json=None, + ) diff --git a/crud.py b/crud.py index 33f2459..614a422 100644 --- a/crud.py +++ b/crud.py @@ -118,6 +118,19 @@ async def get_machine_by_npub(machine_npub: str) -> Optional[Machine]: ) +async def get_active_machine_by_wallet_id(wallet_id: str) -> Optional[Machine]: + """Used by the invoice listener to route an incoming payment to a machine.""" + return await db.fetchone( + """ + SELECT * FROM satoshimachine.dca_machines + WHERE wallet_id = :wid AND is_active = true + LIMIT 1 + """, + {"wid": wallet_id}, + Machine, + ) + + async def get_machines_for_operator(operator_user_id: str) -> List[Machine]: return await db.fetchall( """ diff --git a/tasks.py b/tasks.py index 2efa8b0..a67372d 100644 --- a/tasks.py +++ b/tasks.py @@ -1,27 +1,83 @@ -# Satoshi Machine v2 — task placeholders. +# Satoshi Machine v2 — invoice listener (P1). # -# The v1 SSH/PostgreSQL polling + invoice listener are intentionally absent. -# They will be replaced in P1 (Nostr subscription manager: subscribes via -# lnbits.core.services.nostr_transport to kind-21000 settlements + kind-30078 -# beacons + kind-30079 telemetry per registered machine, with auto-reconnect). +# Subscribes to LNbits' invoice dispatcher (register_invoice_listener), then +# for each successful inbound payment: +# 1. Checks if wallet_id belongs to an active dca_machines row. If not, skip. +# 2. Parses Payment.extra for bitSpire split metadata (post-lamassu-next#44). +# Falls back to machine.fallback_commission_pct if extra is absent. +# 3. Computes the two-stage split (super_fee first, operator remainder). +# 4. Inserts a dca_settlements row idempotently (keyed by payment_hash). # -# These no-op stubs keep __init__.py importable in the interim so the -# extension can be activated even before P1 lands. +# The actual distribution of sats — paying out the LP DCA legs, the super-fee +# leg, and the operator's commission-split legs — happens in a separate +# settlement-processor task (P2). This listener only LANDS the settlement +# row; status='pending' tells the processor it still needs to move the money. import asyncio +from lnbits.core.models import Payment +from lnbits.tasks import register_invoice_listener from loguru import logger +from .bitspire import parse_settlement +from .crud import ( + create_settlement_idempotent, + get_active_machine_by_wallet_id, + get_super_config, +) + +LISTENER_NAME = "ext_satmachineadmin" + async def wait_for_paid_invoices() -> None: - """No-op placeholder pending P1 Nostr subscription manager.""" - logger.debug( - "satmachineadmin v2: invoice listener stub running. " - "Real Nostr-transport subscription pending P1." + invoice_queue: asyncio.Queue = asyncio.Queue() + register_invoice_listener(invoice_queue, LISTENER_NAME) + logger.info( + "satmachineadmin v2: invoice listener registered as " + f"`{LISTENER_NAME}` — waiting for bitSpire settlements." ) - # Sleep forever; the task system expects a long-lived coroutine. while True: - await asyncio.sleep(3600) + payment: Payment = await invoice_queue.get() + try: + await _handle_payment(payment) + except Exception as exc: # listener must never die + logger.error( + f"satmachineadmin: error handling payment " + f"{payment.payment_hash[:12]}...: {exc}" + ) + + +async def _handle_payment(payment: Payment) -> None: + if not payment.is_in or not payment.success: + return + machine = await get_active_machine_by_wallet_id(payment.wallet_id) + if machine is None: + return + super_config = await get_super_config() + super_fee_pct = float(super_config.super_fee_pct) if super_config else 0.0 + data, used_fallback = parse_settlement( + machine=machine, + payment_hash=payment.payment_hash, + gross_sats=payment.sat, + extra=payment.extra or {}, + super_fee_pct=super_fee_pct, + ) + settlement = await create_settlement_idempotent(data) + if settlement is None: + logger.error( + f"satmachineadmin: failed to insert settlement for " + f"payment_hash={payment.payment_hash[:12]}..." + ) + return + fb = " (fallback split)" if used_fallback else "" + logger.info( + f"satmachineadmin: landed settlement {settlement.id} for " + f"machine={machine.machine_npub[:12]}... " + f"gross={data.gross_sats}sats net={data.net_sats}sats " + f"commission={data.commission_sats}sats " + f"(super_fee={data.platform_fee_sats} " + f"operator_fee={data.operator_fee_sats}){fb}" + ) async def hourly_transaction_polling() -> None: