feat(v2): wire bitSpire invoice listener + settlement landing (P1a)
Replaces the no-op tasks.py stub with a real invoice listener that lands
bitSpire settlements idempotently into dca_settlements.
Architecture: satmachineadmin runs *inside* the LNbits process, so it
plugs into LNbits' canonical extension hook (register_invoice_listener
from lnbits.tasks) instead of going through the Nostr transport layer.
External clients like bitSpire use Nostr; internal extensions consume
the resulting Payment objects directly. One invoice_listener queue per
extension, dispatched by invoice_callback_dispatcher.
Flow:
bitSpire ATM (Nostr kind-21000)
→ LNbits nostr_transport handler
→ core Payment system (create_invoice + status=SUCCESS on settle)
→ invoice_callback_dispatcher
→ satmachineadmin's invoice_queue
→ _handle_payment filters by wallet_id → active machine
→ bitspire.parse_settlement reads Payment.extra (or back-derives)
→ create_settlement_idempotent (keyed on payment_hash UNIQUE)
The parser (new bitspire.py module) is bitSpire-specific:
- Happy path (post-aiolabs/lamassu-next#44): Payment.extra carries
{source:"bitspire", net_sats, fee_sats, fee_pct, exchange_rate,
currency, txid, machine_npub, bills, cassettes}. Read directly,
zero back-derivation.
- Fallback path (pre-#44): extra is absent. Back-derive the split
using machine.fallback_commission_pct with the Lamassu-style
formula (calculations.calculate_commission), mark
used_fallback_split=true, log a WARNING that namechecks the
upstream issue so it's findable in logs.
Two-stage commission split (super first, operator remainder) is
computed at land time so the audit row is complete:
platform_fee_sats = round(commission_sats * super_fee_pct)
operator_fee_sats = commission_sats - platform_fee_sats
The actual payout (LP DCA legs + super-fee leg + operator-split legs)
happens in a separate settlement-processor task in P2. P1 only LANDS
the settlement with status='pending'.
Smoke-tested both paths against real LNbits 1.4 (nostr-transport venv):
happy: 266800 gross → 258835 net + 7965 commission
(2390 super @ 30%, 5575 operator)
fallback: 266800 gross → 254095 net + 12705 commission @ 5% default
Also adds crud.get_active_machine_by_wallet_id, the lookup that gates
inbound payments to known machine wallets.
Refs: aiolabs/satmachineadmin#9, aiolabs/lamassu-next#44
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
cba327d0f0
commit
b91e49b642
3 changed files with 258 additions and 13 deletions
176
bitspire.py
Normal file
176
bitspire.py
Normal file
|
|
@ -0,0 +1,176 @@
|
||||||
|
# Satoshi Machine v2 — bitSpire payment parser.
|
||||||
|
#
|
||||||
|
# Translates an inbound LNbits Payment (cash-out customer paid the ATM's
|
||||||
|
# invoice) into the principal/commission split needed by satmachineadmin.
|
||||||
|
#
|
||||||
|
# Happy path: bitSpire populates Payment.extra with the canonical split
|
||||||
|
# fields per aiolabs/lamassu-next#44 — we read them directly.
|
||||||
|
#
|
||||||
|
# Fallback path: extra is missing (older bitSpire, edge case). We back-derive
|
||||||
|
# the split from the machine's fallback_commission_pct using the Lamassu-era
|
||||||
|
# formula (base = total / (1 + commission)) and mark used_fallback_split=true
|
||||||
|
# so the audit trail shows we estimated.
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from typing import Any, Optional, Tuple
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
from .calculations import calculate_commission
|
||||||
|
from .models import CreateDcaSettlementData, Machine
|
||||||
|
|
||||||
|
# Sentinel value bitSpire sets in Payment.extra.source so we know an inbound
|
||||||
|
# payment originated from an ATM cash-out and not some other extension or
|
||||||
|
# customer-initiated transfer.
|
||||||
|
BITSPIRE_SOURCE = "bitspire"
|
||||||
|
|
||||||
|
|
||||||
|
def _coerce_int(v: Any) -> Optional[int]:
|
||||||
|
if v is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return int(v)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _coerce_float(v: Any) -> Optional[float]:
|
||||||
|
if v is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return float(v)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _coerce_str(v: Any) -> Optional[str]:
|
||||||
|
if v is None:
|
||||||
|
return None
|
||||||
|
return str(v) if not isinstance(v, str) else v
|
||||||
|
|
||||||
|
|
||||||
|
def _json_dumps(v: Any) -> Optional[str]:
|
||||||
|
if v is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return json.dumps(v)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def is_bitspire_payment(extra: dict) -> bool:
|
||||||
|
"""True if Payment.extra carries the bitSpire source marker (post-#44)."""
|
||||||
|
return isinstance(extra, dict) and extra.get("source") == BITSPIRE_SOURCE
|
||||||
|
|
||||||
|
|
||||||
|
def parse_settlement(
|
||||||
|
machine: Machine,
|
||||||
|
payment_hash: str,
|
||||||
|
gross_sats: int,
|
||||||
|
extra: dict,
|
||||||
|
super_fee_pct: float,
|
||||||
|
) -> Tuple[CreateDcaSettlementData, bool]:
|
||||||
|
"""Build a CreateDcaSettlementData for an inbound payment landing on
|
||||||
|
`machine`'s wallet.
|
||||||
|
|
||||||
|
Returns (data, used_fallback): when `used_fallback` is True, bitSpire
|
||||||
|
didn't populate Payment.extra so we back-derived the split. Caller
|
||||||
|
should log this for visibility — once aiolabs/lamassu-next#44 ships,
|
||||||
|
fallback usage should drop to zero.
|
||||||
|
"""
|
||||||
|
if is_bitspire_payment(extra):
|
||||||
|
data = _parse_extra(machine, payment_hash, gross_sats, extra, super_fee_pct)
|
||||||
|
return data, False
|
||||||
|
logger.warning(
|
||||||
|
f"satmachineadmin: settlement on machine {machine.machine_npub[:12]}... "
|
||||||
|
f"missing bitSpire extra metadata; back-deriving via "
|
||||||
|
f"fallback_commission_pct={machine.fallback_commission_pct}. "
|
||||||
|
f"See aiolabs/lamassu-next#44."
|
||||||
|
)
|
||||||
|
return _parse_fallback(machine, payment_hash, gross_sats, super_fee_pct), True
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_extra(
|
||||||
|
machine: Machine,
|
||||||
|
payment_hash: str,
|
||||||
|
gross_sats: int,
|
||||||
|
extra: dict,
|
||||||
|
super_fee_pct: float,
|
||||||
|
) -> CreateDcaSettlementData:
|
||||||
|
"""Happy path: bitSpire populated Payment.extra per lamassu-next#44."""
|
||||||
|
net_sats = _coerce_int(extra.get("net_sats"))
|
||||||
|
fee_sats = _coerce_int(extra.get("fee_sats"))
|
||||||
|
if net_sats is None or fee_sats is None:
|
||||||
|
# Missing key fields — shouldn't happen post-#44 but defensive.
|
||||||
|
return _parse_fallback(machine, payment_hash, gross_sats, super_fee_pct)
|
||||||
|
commission_sats = fee_sats
|
||||||
|
platform_fee_sats = round(commission_sats * super_fee_pct)
|
||||||
|
operator_fee_sats = commission_sats - platform_fee_sats
|
||||||
|
exchange_rate = _coerce_float(extra.get("exchange_rate"))
|
||||||
|
if exchange_rate is None or exchange_rate <= 0:
|
||||||
|
# Without exchange rate we can't compute fiat. Use 1.0 as a stand-in
|
||||||
|
# and let the operator correct via manual reconciliation.
|
||||||
|
exchange_rate = 1.0
|
||||||
|
fiat_amount = round(gross_sats / exchange_rate, 2) if exchange_rate > 0 else 0.0
|
||||||
|
fiat_code = _coerce_str(extra.get("currency")) or machine.fiat_code
|
||||||
|
return CreateDcaSettlementData(
|
||||||
|
machine_id=machine.id,
|
||||||
|
payment_hash=payment_hash,
|
||||||
|
bitspire_event_id=None,
|
||||||
|
bitspire_txid=_coerce_str(extra.get("txid")),
|
||||||
|
gross_sats=gross_sats,
|
||||||
|
fiat_amount=fiat_amount,
|
||||||
|
fiat_code=fiat_code,
|
||||||
|
exchange_rate=exchange_rate,
|
||||||
|
net_sats=net_sats,
|
||||||
|
commission_sats=commission_sats,
|
||||||
|
platform_fee_sats=platform_fee_sats,
|
||||||
|
operator_fee_sats=operator_fee_sats,
|
||||||
|
used_fallback_split=False,
|
||||||
|
tx_type=_coerce_str(extra.get("type")) or "cash_out",
|
||||||
|
bills_json=_json_dumps(extra.get("bills")),
|
||||||
|
cassettes_json=_json_dumps(extra.get("cassettes")),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_fallback(
|
||||||
|
machine: Machine,
|
||||||
|
payment_hash: str,
|
||||||
|
gross_sats: int,
|
||||||
|
super_fee_pct: float,
|
||||||
|
) -> CreateDcaSettlementData:
|
||||||
|
"""Back-derive the split using the machine's fallback_commission_pct.
|
||||||
|
|
||||||
|
Same formula as the Lamassu integration used:
|
||||||
|
base_amount = round(gross / (1 + commission_pct))
|
||||||
|
commission = gross - base_amount
|
||||||
|
"""
|
||||||
|
net_sats, commission_sats, _effective = calculate_commission(
|
||||||
|
crypto_atoms=gross_sats,
|
||||||
|
commission_percentage=machine.fallback_commission_pct,
|
||||||
|
discount=0.0,
|
||||||
|
)
|
||||||
|
platform_fee_sats = round(commission_sats * super_fee_pct)
|
||||||
|
operator_fee_sats = commission_sats - platform_fee_sats
|
||||||
|
# No exchange rate from the wire; leave fiat_amount=0 so it's visibly
|
||||||
|
# incomplete on the operator's reconciliation screen.
|
||||||
|
return CreateDcaSettlementData(
|
||||||
|
machine_id=machine.id,
|
||||||
|
payment_hash=payment_hash,
|
||||||
|
bitspire_event_id=None,
|
||||||
|
bitspire_txid=None,
|
||||||
|
gross_sats=gross_sats,
|
||||||
|
fiat_amount=0.0,
|
||||||
|
fiat_code=machine.fiat_code,
|
||||||
|
exchange_rate=0.0,
|
||||||
|
net_sats=net_sats,
|
||||||
|
commission_sats=commission_sats,
|
||||||
|
platform_fee_sats=platform_fee_sats,
|
||||||
|
operator_fee_sats=operator_fee_sats,
|
||||||
|
used_fallback_split=True,
|
||||||
|
tx_type="cash_out",
|
||||||
|
bills_json=None,
|
||||||
|
cassettes_json=None,
|
||||||
|
)
|
||||||
13
crud.py
13
crud.py
|
|
@ -118,6 +118,19 @@ async def get_machine_by_npub(machine_npub: str) -> Optional[Machine]:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_active_machine_by_wallet_id(wallet_id: str) -> Optional[Machine]:
|
||||||
|
"""Used by the invoice listener to route an incoming payment to a machine."""
|
||||||
|
return await db.fetchone(
|
||||||
|
"""
|
||||||
|
SELECT * FROM satoshimachine.dca_machines
|
||||||
|
WHERE wallet_id = :wid AND is_active = true
|
||||||
|
LIMIT 1
|
||||||
|
""",
|
||||||
|
{"wid": wallet_id},
|
||||||
|
Machine,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def get_machines_for_operator(operator_user_id: str) -> List[Machine]:
|
async def get_machines_for_operator(operator_user_id: str) -> List[Machine]:
|
||||||
return await db.fetchall(
|
return await db.fetchall(
|
||||||
"""
|
"""
|
||||||
|
|
|
||||||
82
tasks.py
82
tasks.py
|
|
@ -1,27 +1,83 @@
|
||||||
# Satoshi Machine v2 — task placeholders.
|
# Satoshi Machine v2 — invoice listener (P1).
|
||||||
#
|
#
|
||||||
# The v1 SSH/PostgreSQL polling + invoice listener are intentionally absent.
|
# Subscribes to LNbits' invoice dispatcher (register_invoice_listener), then
|
||||||
# They will be replaced in P1 (Nostr subscription manager: subscribes via
|
# for each successful inbound payment:
|
||||||
# lnbits.core.services.nostr_transport to kind-21000 settlements + kind-30078
|
# 1. Checks if wallet_id belongs to an active dca_machines row. If not, skip.
|
||||||
# beacons + kind-30079 telemetry per registered machine, with auto-reconnect).
|
# 2. Parses Payment.extra for bitSpire split metadata (post-lamassu-next#44).
|
||||||
|
# Falls back to machine.fallback_commission_pct if extra is absent.
|
||||||
|
# 3. Computes the two-stage split (super_fee first, operator remainder).
|
||||||
|
# 4. Inserts a dca_settlements row idempotently (keyed by payment_hash).
|
||||||
#
|
#
|
||||||
# These no-op stubs keep __init__.py importable in the interim so the
|
# The actual distribution of sats — paying out the LP DCA legs, the super-fee
|
||||||
# extension can be activated even before P1 lands.
|
# leg, and the operator's commission-split legs — happens in a separate
|
||||||
|
# settlement-processor task (P2). This listener only LANDS the settlement
|
||||||
|
# row; status='pending' tells the processor it still needs to move the money.
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
|
from lnbits.core.models import Payment
|
||||||
|
from lnbits.tasks import register_invoice_listener
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
|
from .bitspire import parse_settlement
|
||||||
|
from .crud import (
|
||||||
|
create_settlement_idempotent,
|
||||||
|
get_active_machine_by_wallet_id,
|
||||||
|
get_super_config,
|
||||||
|
)
|
||||||
|
|
||||||
|
LISTENER_NAME = "ext_satmachineadmin"
|
||||||
|
|
||||||
|
|
||||||
async def wait_for_paid_invoices() -> None:
|
async def wait_for_paid_invoices() -> None:
|
||||||
"""No-op placeholder pending P1 Nostr subscription manager."""
|
invoice_queue: asyncio.Queue = asyncio.Queue()
|
||||||
logger.debug(
|
register_invoice_listener(invoice_queue, LISTENER_NAME)
|
||||||
"satmachineadmin v2: invoice listener stub running. "
|
logger.info(
|
||||||
"Real Nostr-transport subscription pending P1."
|
"satmachineadmin v2: invoice listener registered as "
|
||||||
|
f"`{LISTENER_NAME}` — waiting for bitSpire settlements."
|
||||||
)
|
)
|
||||||
# Sleep forever; the task system expects a long-lived coroutine.
|
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(3600)
|
payment: Payment = await invoice_queue.get()
|
||||||
|
try:
|
||||||
|
await _handle_payment(payment)
|
||||||
|
except Exception as exc: # listener must never die
|
||||||
|
logger.error(
|
||||||
|
f"satmachineadmin: error handling payment "
|
||||||
|
f"{payment.payment_hash[:12]}...: {exc}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _handle_payment(payment: Payment) -> None:
|
||||||
|
if not payment.is_in or not payment.success:
|
||||||
|
return
|
||||||
|
machine = await get_active_machine_by_wallet_id(payment.wallet_id)
|
||||||
|
if machine is None:
|
||||||
|
return
|
||||||
|
super_config = await get_super_config()
|
||||||
|
super_fee_pct = float(super_config.super_fee_pct) if super_config else 0.0
|
||||||
|
data, used_fallback = parse_settlement(
|
||||||
|
machine=machine,
|
||||||
|
payment_hash=payment.payment_hash,
|
||||||
|
gross_sats=payment.sat,
|
||||||
|
extra=payment.extra or {},
|
||||||
|
super_fee_pct=super_fee_pct,
|
||||||
|
)
|
||||||
|
settlement = await create_settlement_idempotent(data)
|
||||||
|
if settlement is None:
|
||||||
|
logger.error(
|
||||||
|
f"satmachineadmin: failed to insert settlement for "
|
||||||
|
f"payment_hash={payment.payment_hash[:12]}..."
|
||||||
|
)
|
||||||
|
return
|
||||||
|
fb = " (fallback split)" if used_fallback else ""
|
||||||
|
logger.info(
|
||||||
|
f"satmachineadmin: landed settlement {settlement.id} for "
|
||||||
|
f"machine={machine.machine_npub[:12]}... "
|
||||||
|
f"gross={data.gross_sats}sats net={data.net_sats}sats "
|
||||||
|
f"commission={data.commission_sats}sats "
|
||||||
|
f"(super_fee={data.platform_fee_sats} "
|
||||||
|
f"operator_fee={data.operator_fee_sats}){fb}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def hourly_transaction_polling() -> None:
|
async def hourly_transaction_polling() -> None:
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue