fix(v2): decouple listener + skipped-leg audit (fix bundle 2)

Closes H4, H5, M8 from the v2-bitspire review (omnibus follow-up #11).

H4 — Decouple invoice listener from distribution.
  tasks._handle_payment now spawns process_settlement on a background
  task instead of awaiting it. The LNbits invoice queue is shared
  across every extension on the node; under load (a machine with 50
  LPs, a stalled internal payment, etc.) the previous synchronous path
  could freeze the queue for everyone. Concurrency is safe because
  fix bundle 1's claim_settlement_for_processing already prevents
  double-processing on listener re-fires.

  RUF006 fix: hold strong refs to in-flight tasks via a module-level
  set so the GC doesn't collect them mid-flight (asyncio.create_task
  only weakly references its task). Tasks self-clean via
  add_done_callback(set.discard).

H5 + M8 — Skipped-leg audit rows for stranded sats.
  Previously, four paths in distribution.py logged a warning and left
  sats in the machine wallet, marking the settlement 'processed' with
  no row-level visibility into where the un-paid sats sit:
    1. _pay_super_fee: super_fee_pct > 0 but super_fee_wallet_id unset
    2. _pay_operator_splits: no commission ruleset (default + override)
    3. _pay_dca_distributions: exchange_rate = 0 (fallback path)
    4. _pay_dca_distributions: no eligible LPs with positive balance
  Plus a fifth case the review didn't enumerate but is the same shape:
    5. _pay_dca_distributions: no flow-mode LPs at the machine at all

  Each now writes a dca_payments row with status='skipped', the
  intended leg_type (super_fee / operator_split / dca), the stranded
  amount in amount_sats, and a human-readable error_message explaining
  why. New _record_skipped_leg helper consolidates the pattern.

  This makes stranded sats visible in:
    - The machine detail dialog's settlements rows (the legs are
      filtered into the audit blob alongside completed/failed legs)
    - The payments CSV export
    - GET /api/v1/dca/payments?leg_type=...

  'skipped' is a documented leg-status value now (alongside pending /
  completed / failed / voided / refunded) — no schema change since
  status is TEXT.

Knock-on fix: void_open_legs_for_settlement (used by partial-dispense
recompute) now also includes status='skipped' in its WHERE clause so a
re-run doesn't double-count the audit rows from a prior attempt.

72/72 tests still pass. Lint clean.

Refs: aiolabs/satmachineadmin#11 — fix bundle 2 
Remaining in #11: H6 (partial-dispense split ratio) + fix bundle 3
(dead-code purge) + the M and N items.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Padreug 2026-05-14 18:49:16 +02:00
commit ecef916dda
4 changed files with 118 additions and 28 deletions

11
crud.py
View file

@ -786,14 +786,17 @@ async def append_settlement_note(
async def void_open_legs_for_settlement(settlement_id: str) -> None: async def void_open_legs_for_settlement(settlement_id: str) -> None:
"""Marks pending/failed legs as 'voided' before re-running distribution """Marks open legs as 'voided' before re-running distribution on a
on a partial-dispense recompute. Preserves the rows for audit but stops partial-dispense recompute. Preserves the rows for audit but stops
them from being interpreted as live.""" them from being interpreted as live. Includes 'skipped' so that audit
rows from a prior attempt don't double-count once the new attempt
writes its own (possibly different) skipped reasons."""
await db.execute( await db.execute(
""" """
UPDATE satoshimachine.dca_payments UPDATE satoshimachine.dca_payments
SET status = 'voided' SET status = 'voided'
WHERE settlement_id = :sid AND status IN ('pending', 'failed') WHERE settlement_id = :sid
AND status IN ('pending', 'failed', 'skipped')
""", """,
{"sid": settlement_id}, {"sid": settlement_id},
) )

View file

@ -69,6 +69,48 @@ def _payment_tag(machine: Machine) -> str:
return f"{PAYMENT_TAG_PREFIX}:{machine.machine_npub}" return f"{PAYMENT_TAG_PREFIX}:{machine.machine_npub}"
async def _record_skipped_leg(
settlement: DcaSettlement,
machine: Machine,
leg_type: str,
amount_sats: int,
reason: str,
client_id: str | None = None,
) -> None:
"""Audit row for sats intentionally left in the machine wallet.
Distinct from 'failed' (which means pay_invoice errored). 'skipped' means
we never attempted the pay by design, because some prerequisite was
missing (super wallet not configured, no operator ruleset, no exchange
rate, no eligible LPs). Operator sees these in payment history and on
the settlement detail blob; the audit trail explains where un-paid
sats are sitting.
"""
if amount_sats <= 0:
return
leg = await create_dca_payment(
CreateDcaPaymentData(
settlement_id=settlement.id,
client_id=client_id,
machine_id=machine.id,
operator_user_id=machine.operator_user_id,
leg_type=leg_type,
destination_wallet_id=None,
destination_ln_address=None,
amount_sats=amount_sats,
amount_fiat=None,
exchange_rate=None,
transaction_time=datetime.now(timezone.utc),
external_payment_hash=None,
)
)
await update_payment_status(leg.id, "skipped", None, reason[:512])
logger.info(
f"distribution: skipped {leg_type} leg "
f"({amount_sats} sats) — {reason}"
)
def _resolve_partial_dispense_gross( def _resolve_partial_dispense_gross(
settlement: DcaSettlement, data: PartialDispenseData settlement: DcaSettlement, data: PartialDispenseData
) -> int: ) -> int:
@ -361,11 +403,13 @@ async def _pay_super_fee(
return return
if super_config is None or not super_config.super_fee_wallet_id: if super_config is None or not super_config.super_fee_wallet_id:
# Super has configured a fee but not a destination wallet — leave # Super has configured a fee but not a destination wallet — leave
# the sats in the machine wallet and warn. The super needs to # the sats in the machine wallet and record a skipped audit row.
# configure their wallet before they can collect. # The super needs to configure their wallet before they can collect.
logger.warning( await _record_skipped_leg(
f"distribution: super_fee_sats={settlement.platform_fee_sats} " settlement, machine,
f"left in machine wallet (super_fee_wallet_id not set)" leg_type="super_fee",
amount_sats=settlement.platform_fee_sats,
reason="super_fee_wallet_id not configured by LNbits super",
) )
return return
await _pay_internal( await _pay_internal(
@ -396,10 +440,14 @@ async def _pay_operator_splits(
machine.operator_user_id, machine.id machine.operator_user_id, machine.id
) )
if not splits: if not splits:
logger.warning( await _record_skipped_leg(
f"distribution: operator_fee_sats={settlement.operator_fee_sats} " settlement, machine,
f"left in machine wallet (operator has no commission_splits ruleset " leg_type="operator_split",
f"for machine {machine.id})" amount_sats=settlement.operator_fee_sats,
reason=(
"operator has no commission_splits ruleset for this machine "
"(neither per-machine override nor operator default)"
),
) )
return return
# Pure allocator handles the rounding rule (last leg absorbs remainder). # Pure allocator handles the rounding rule (last leg absorbs remainder).
@ -443,14 +491,25 @@ async def _pay_dca_distributions(
# Fallback path with no exchange rate (bitSpire Payment.extra absent). # Fallback path with no exchange rate (bitSpire Payment.extra absent).
# Without a rate we can't compute fiat balances → can't compute # Without a rate we can't compute fiat balances → can't compute
# proportional shares → leave net_sats in the machine wallet for # proportional shares → leave net_sats in the machine wallet for
# the operator to manually reconcile. # manual reconciliation. Audit row makes the strand visible.
logger.warning( await _record_skipped_leg(
f"distribution: net_sats={settlement.net_sats} left in machine " settlement, machine,
f"wallet (no exchange_rate; fallback path; see lamassu-next#44)" leg_type="dca",
amount_sats=settlement.net_sats,
reason=(
"no exchange_rate on settlement (bitSpire fallback path; "
"see aiolabs/lamassu-next#44)"
),
) )
return return
clients = await get_flow_mode_clients_for_machine(machine.id) clients = await get_flow_mode_clients_for_machine(machine.id)
if not clients: if not clients:
await _record_skipped_leg(
settlement, machine,
leg_type="dca",
amount_sats=settlement.net_sats,
reason="no active flow-mode LPs registered at this machine",
)
return return
# Build {client_id: remaining_fiat_balance} for proportional allocation. # Build {client_id: remaining_fiat_balance} for proportional allocation.
client_balances: dict[str, float] = {} client_balances: dict[str, float] = {}
@ -460,6 +519,15 @@ async def _pay_dca_distributions(
continue continue
client_balances[client.id] = summary.remaining_balance client_balances[client.id] = summary.remaining_balance
if not client_balances: if not client_balances:
await _record_skipped_leg(
settlement, machine,
leg_type="dca",
amount_sats=settlement.net_sats,
reason=(
"no LP has remaining-fiat-balance > 0 — all confirmed deposits "
"already paid out"
),
)
return return
# Compute proportional sat allocations, then cap each at the client's # Compute proportional sat allocations, then cap each at the client's
# remaining-fiat-balance-in-sats (the v1 sync-mismatch safeguard). # remaining-fiat-balance-in-sats (the v1 sync-mismatch safeguard).

View file

@ -323,7 +323,16 @@ class DcaPayment(BaseModel):
exchange_rate: Optional[float] exchange_rate: Optional[float]
transaction_time: datetime transaction_time: datetime
external_payment_hash: Optional[str] external_payment_hash: Optional[str]
status: str # 'pending' | 'completed' | 'failed' | 'refunded' status: str
# Leg status enum:
# 'pending' — row written, payment not yet attempted
# 'completed' — pay_invoice succeeded; sats moved
# 'failed' — pay_invoice errored; sats stayed at source
# 'voided' — superseded (e.g. partial-dispense recompute voided
# the previous pending/failed leg)
# 'skipped' — intentionally not paid (no super wallet configured,
# no commission ruleset, no exchange rate, no LPs)
# 'refunded' — reserved for future refund flows
error_message: Optional[str] error_message: Optional[str]
created_at: datetime created_at: datetime

View file

@ -1,4 +1,4 @@
# Satoshi Machine v2 — invoice listener (P1). # Satoshi Machine v2 — invoice listener (P1 + fix bundle 2).
# #
# Subscribes to LNbits' invoice dispatcher (register_invoice_listener), then # Subscribes to LNbits' invoice dispatcher (register_invoice_listener), then
# for each successful inbound payment: # for each successful inbound payment:
@ -7,11 +7,10 @@
# Falls back to machine.fallback_commission_pct if extra is absent. # Falls back to machine.fallback_commission_pct if extra is absent.
# 3. Computes the two-stage split (super_fee first, operator remainder). # 3. Computes the two-stage split (super_fee first, operator remainder).
# 4. Inserts a dca_settlements row idempotently (keyed by payment_hash). # 4. Inserts a dca_settlements row idempotently (keyed by payment_hash).
# # 5. Spawns the distribution processor on a background task so the
# The actual distribution of sats — paying out the LP DCA legs, the super-fee # LNbits invoice queue (which serves ALL extensions on the node)
# leg, and the operator's commission-split legs — happens in a separate # keeps draining while we move sats. Concurrency is safe because
# settlement-processor task (P2). This listener only LANDS the settlement # process_settlement now uses an optimistic-lock claim (fix bundle 1).
# row; status='pending' tells the processor it still needs to move the money.
import asyncio import asyncio
@ -29,6 +28,12 @@ from .distribution import process_settlement
LISTENER_NAME = "ext_satmachineadmin" LISTENER_NAME = "ext_satmachineadmin"
# Holds strong refs to in-flight distribution tasks so Python's GC doesn't
# collect them mid-flight (asyncio.create_task only weakly references its
# task once awaiters drop). Tasks self-clean by removing themselves on
# completion via the done_callback below.
_inflight_distributions: set = set()
async def wait_for_paid_invoices() -> None: async def wait_for_paid_invoices() -> None:
invoice_queue: asyncio.Queue = asyncio.Queue() invoice_queue: asyncio.Queue = asyncio.Queue()
@ -79,10 +84,15 @@ async def _handle_payment(payment: Payment) -> None:
f"(super_fee={data.platform_fee_sats} " f"(super_fee={data.platform_fee_sats} "
f"operator_fee={data.operator_fee_sats}){fb}" f"operator_fee={data.operator_fee_sats}){fb}"
) )
# Trigger distribution synchronously so latency is one bitSpire-tx wide. # Spawn distribution on a background task so the LNbits invoice queue
# process_settlement is idempotent (status='processed' guard); if this # (shared across all extensions) keeps draining while we move sats.
# task crashes mid-process, the next manual or scheduled retry resumes. # Concurrency-safe: process_settlement uses claim_settlement_for_processing
await process_settlement(settlement.id) # so a listener re-fire can't double-process. Listener latency is now
# bounded by the create_settlement_idempotent insert, not by the N+M
# internal pay_invoice round-trips of a full distribution.
task = asyncio.create_task(process_settlement(settlement.id))
_inflight_distributions.add(task)
task.add_done_callback(_inflight_distributions.discard)
async def hourly_transaction_polling() -> None: async def hourly_transaction_polling() -> None: