diff --git a/crud.py b/crud.py index 1e6780e..1fa5360 100644 --- a/crud.py +++ b/crud.py @@ -786,14 +786,17 @@ async def append_settlement_note( async def void_open_legs_for_settlement(settlement_id: str) -> None: - """Marks pending/failed legs as 'voided' before re-running distribution - on a partial-dispense recompute. Preserves the rows for audit but stops - them from being interpreted as live.""" + """Marks open legs as 'voided' before re-running distribution on a + partial-dispense recompute. Preserves the rows for audit but stops + them from being interpreted as live. Includes 'skipped' so that audit + rows from a prior attempt don't double-count once the new attempt + writes its own (possibly different) skipped reasons.""" await db.execute( """ UPDATE satoshimachine.dca_payments SET status = 'voided' - WHERE settlement_id = :sid AND status IN ('pending', 'failed') + WHERE settlement_id = :sid + AND status IN ('pending', 'failed', 'skipped') """, {"sid": settlement_id}, ) diff --git a/distribution.py b/distribution.py index 1941838..94e181d 100644 --- a/distribution.py +++ b/distribution.py @@ -69,6 +69,48 @@ def _payment_tag(machine: Machine) -> str: return f"{PAYMENT_TAG_PREFIX}:{machine.machine_npub}" +async def _record_skipped_leg( + settlement: DcaSettlement, + machine: Machine, + leg_type: str, + amount_sats: int, + reason: str, + client_id: str | None = None, +) -> None: + """Audit row for sats intentionally left in the machine wallet. + + Distinct from 'failed' (which means pay_invoice errored). 'skipped' means + we never attempted the pay — by design, because some prerequisite was + missing (super wallet not configured, no operator ruleset, no exchange + rate, no eligible LPs). Operator sees these in payment history and on + the settlement detail blob; the audit trail explains where un-paid + sats are sitting. + """ + if amount_sats <= 0: + return + leg = await create_dca_payment( + CreateDcaPaymentData( + settlement_id=settlement.id, + client_id=client_id, + machine_id=machine.id, + operator_user_id=machine.operator_user_id, + leg_type=leg_type, + destination_wallet_id=None, + destination_ln_address=None, + amount_sats=amount_sats, + amount_fiat=None, + exchange_rate=None, + transaction_time=datetime.now(timezone.utc), + external_payment_hash=None, + ) + ) + await update_payment_status(leg.id, "skipped", None, reason[:512]) + logger.info( + f"distribution: skipped {leg_type} leg " + f"({amount_sats} sats) — {reason}" + ) + + def _resolve_partial_dispense_gross( settlement: DcaSettlement, data: PartialDispenseData ) -> int: @@ -361,11 +403,13 @@ async def _pay_super_fee( return if super_config is None or not super_config.super_fee_wallet_id: # Super has configured a fee but not a destination wallet — leave - # the sats in the machine wallet and warn. The super needs to - # configure their wallet before they can collect. - logger.warning( - f"distribution: super_fee_sats={settlement.platform_fee_sats} " - f"left in machine wallet (super_fee_wallet_id not set)" + # the sats in the machine wallet and record a skipped audit row. + # The super needs to configure their wallet before they can collect. + await _record_skipped_leg( + settlement, machine, + leg_type="super_fee", + amount_sats=settlement.platform_fee_sats, + reason="super_fee_wallet_id not configured by LNbits super", ) return await _pay_internal( @@ -396,10 +440,14 @@ async def _pay_operator_splits( machine.operator_user_id, machine.id ) if not splits: - logger.warning( - f"distribution: operator_fee_sats={settlement.operator_fee_sats} " - f"left in machine wallet (operator has no commission_splits ruleset " - f"for machine {machine.id})" + await _record_skipped_leg( + settlement, machine, + leg_type="operator_split", + amount_sats=settlement.operator_fee_sats, + reason=( + "operator has no commission_splits ruleset for this machine " + "(neither per-machine override nor operator default)" + ), ) return # Pure allocator handles the rounding rule (last leg absorbs remainder). @@ -443,14 +491,25 @@ async def _pay_dca_distributions( # Fallback path with no exchange rate (bitSpire Payment.extra absent). # Without a rate we can't compute fiat balances → can't compute # proportional shares → leave net_sats in the machine wallet for - # the operator to manually reconcile. - logger.warning( - f"distribution: net_sats={settlement.net_sats} left in machine " - f"wallet (no exchange_rate; fallback path; see lamassu-next#44)" + # manual reconciliation. Audit row makes the strand visible. + await _record_skipped_leg( + settlement, machine, + leg_type="dca", + amount_sats=settlement.net_sats, + reason=( + "no exchange_rate on settlement (bitSpire fallback path; " + "see aiolabs/lamassu-next#44)" + ), ) return clients = await get_flow_mode_clients_for_machine(machine.id) if not clients: + await _record_skipped_leg( + settlement, machine, + leg_type="dca", + amount_sats=settlement.net_sats, + reason="no active flow-mode LPs registered at this machine", + ) return # Build {client_id: remaining_fiat_balance} for proportional allocation. client_balances: dict[str, float] = {} @@ -460,6 +519,15 @@ async def _pay_dca_distributions( continue client_balances[client.id] = summary.remaining_balance if not client_balances: + await _record_skipped_leg( + settlement, machine, + leg_type="dca", + amount_sats=settlement.net_sats, + reason=( + "no LP has remaining-fiat-balance > 0 — all confirmed deposits " + "already paid out" + ), + ) return # Compute proportional sat allocations, then cap each at the client's # remaining-fiat-balance-in-sats (the v1 sync-mismatch safeguard). diff --git a/models.py b/models.py index 1d23a77..cfae3a2 100644 --- a/models.py +++ b/models.py @@ -323,7 +323,16 @@ class DcaPayment(BaseModel): exchange_rate: Optional[float] transaction_time: datetime external_payment_hash: Optional[str] - status: str # 'pending' | 'completed' | 'failed' | 'refunded' + status: str + # Leg status enum: + # 'pending' — row written, payment not yet attempted + # 'completed' — pay_invoice succeeded; sats moved + # 'failed' — pay_invoice errored; sats stayed at source + # 'voided' — superseded (e.g. partial-dispense recompute voided + # the previous pending/failed leg) + # 'skipped' — intentionally not paid (no super wallet configured, + # no commission ruleset, no exchange rate, no LPs) + # 'refunded' — reserved for future refund flows error_message: Optional[str] created_at: datetime diff --git a/tasks.py b/tasks.py index ba5050e..68bbd30 100644 --- a/tasks.py +++ b/tasks.py @@ -1,4 +1,4 @@ -# Satoshi Machine v2 — invoice listener (P1). +# Satoshi Machine v2 — invoice listener (P1 + fix bundle 2). # # Subscribes to LNbits' invoice dispatcher (register_invoice_listener), then # for each successful inbound payment: @@ -7,11 +7,10 @@ # Falls back to machine.fallback_commission_pct if extra is absent. # 3. Computes the two-stage split (super_fee first, operator remainder). # 4. Inserts a dca_settlements row idempotently (keyed by payment_hash). -# -# The actual distribution of sats — paying out the LP DCA legs, the super-fee -# leg, and the operator's commission-split legs — happens in a separate -# settlement-processor task (P2). This listener only LANDS the settlement -# row; status='pending' tells the processor it still needs to move the money. +# 5. Spawns the distribution processor on a background task so the +# LNbits invoice queue (which serves ALL extensions on the node) +# keeps draining while we move sats. Concurrency is safe because +# process_settlement now uses an optimistic-lock claim (fix bundle 1). import asyncio @@ -29,6 +28,12 @@ from .distribution import process_settlement LISTENER_NAME = "ext_satmachineadmin" +# Holds strong refs to in-flight distribution tasks so Python's GC doesn't +# collect them mid-flight (asyncio.create_task only weakly references its +# task once awaiters drop). Tasks self-clean by removing themselves on +# completion via the done_callback below. +_inflight_distributions: set = set() + async def wait_for_paid_invoices() -> None: invoice_queue: asyncio.Queue = asyncio.Queue() @@ -79,10 +84,15 @@ async def _handle_payment(payment: Payment) -> None: f"(super_fee={data.platform_fee_sats} " f"operator_fee={data.operator_fee_sats}){fb}" ) - # Trigger distribution synchronously so latency is one bitSpire-tx wide. - # process_settlement is idempotent (status='processed' guard); if this - # task crashes mid-process, the next manual or scheduled retry resumes. - await process_settlement(settlement.id) + # Spawn distribution on a background task so the LNbits invoice queue + # (shared across all extensions) keeps draining while we move sats. + # Concurrency-safe: process_settlement uses claim_settlement_for_processing + # so a listener re-fire can't double-process. Listener latency is now + # bounded by the create_settlement_idempotent insert, not by the N+M + # internal pay_invoice round-trips of a full distribution. + task = asyncio.create_task(process_settlement(settlement.id)) + _inflight_distributions.add(task) + task.add_done_callback(_inflight_distributions.discard) async def hourly_transaction_polling() -> None: