From cba327d0f07832c3c2ff5f803913164e2c6c9c06 Mon Sep 17 00:00:00 2001 From: Padreug Date: Thu, 14 May 2026 14:46:08 +0200 Subject: [PATCH 01/10] fix(v2): use payment_hash as settlement idempotency key MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The initial m005 made bitspire_event_id the UNIQUE idempotency key on dca_settlements, but settlements arriving through LNbits' invoice listener (the canonical path per nostr-transport-branch architecture) don't carry a Nostr event id at the Payment level — that's the underlying transport's concern, not exposed to extensions. The natural unique key is payment_hash: - every LN invoice has a globally unique payment_hash - subscription replays / dispatcher double-fires dedup via UNIQUE - it's always present on the Payment object the invoice_listener delivers Reshape the dca_settlements column constraints: - payment_hash: TEXT NOT NULL UNIQUE (was: NOT NULL + separate index) - bitspire_event_id: TEXT (was: NOT NULL UNIQUE) — kept nullable for a future path where we subscribe to raw kind-21000 Nostr events directly, bypassing the Payment system Also rename the CRUD helper: get_settlement_by_event_id → get_settlement_by_payment_hash, and update create_settlement_idempotent to dedup on payment_hash. CreateDcaSettlementData / DcaSettlement adjust accordingly. The schema is unshipped (v2-bitspire branch is local only) — fixing m005 in-place is appropriate. The separate dca_telemetry path for kind-30078/30079 events already uses (machine_id, beacon_received_at) semantics, so the UNIQUE-by-Nostr-event-id pattern isn't needed there either. Caught during P1a design before subscribing to register_invoice_listener. Refs: aiolabs/satmachineadmin#9, aiolabs/lamassu-next#44 Co-Authored-By: Claude Opus 4.7 (1M context) --- crud.py | 26 +++++++++++++------------- migrations.py | 29 +++++++++++++++++------------ models.py | 8 ++++---- 3 files changed, 34 insertions(+), 29 deletions(-) diff --git a/crud.py b/crud.py index 7c3d6db..33f2459 100644 --- a/crud.py +++ b/crud.py @@ -410,24 +410,24 @@ async def delete_deposit(deposit_id: str) -> None: async def create_settlement_idempotent( data: CreateDcaSettlementData, ) -> Optional[DcaSettlement]: - """Insert a settlement keyed by bitspire_event_id. Returns the inserted row - on first sight; returns the existing row if the event_id was already seen - (subscription replay, relay double-delivery). The UNIQUE constraint on - bitspire_event_id is the source of truth.""" - existing = await get_settlement_by_event_id(data.bitspire_event_id) + """Insert a settlement keyed by payment_hash. Returns the inserted row on + first sight; returns the existing row if the payment_hash was already seen + (subscription replay, dispatcher double-fire). The UNIQUE constraint on + payment_hash is the source of truth.""" + existing = await get_settlement_by_payment_hash(data.payment_hash) if existing is not None: return existing settlement_id = urlsafe_short_hash() await db.execute( """ INSERT INTO satoshimachine.dca_settlements - (id, machine_id, bitspire_event_id, bitspire_txid, payment_hash, + (id, machine_id, payment_hash, bitspire_event_id, bitspire_txid, gross_sats, fiat_amount, fiat_code, exchange_rate, net_sats, commission_sats, platform_fee_sats, operator_fee_sats, used_fallback_split, tx_type, bills_json, cassettes_json, status, created_at) - VALUES (:id, :machine_id, :bitspire_event_id, :bitspire_txid, - :payment_hash, :gross_sats, :fiat_amount, :fiat_code, + VALUES (:id, :machine_id, :payment_hash, :bitspire_event_id, + :bitspire_txid, :gross_sats, :fiat_amount, :fiat_code, :exchange_rate, :net_sats, :commission_sats, :platform_fee_sats, :operator_fee_sats, :used_fallback_split, :tx_type, :bills_json, :cassettes_json, :status, :created_at) @@ -435,9 +435,9 @@ async def create_settlement_idempotent( { "id": settlement_id, "machine_id": data.machine_id, + "payment_hash": data.payment_hash, "bitspire_event_id": data.bitspire_event_id, "bitspire_txid": data.bitspire_txid, - "payment_hash": data.payment_hash, "gross_sats": data.gross_sats, "fiat_amount": data.fiat_amount, "fiat_code": data.fiat_code, @@ -465,15 +465,15 @@ async def get_settlement(settlement_id: str) -> Optional[DcaSettlement]: ) -async def get_settlement_by_event_id( - bitspire_event_id: str, +async def get_settlement_by_payment_hash( + payment_hash: str, ) -> Optional[DcaSettlement]: return await db.fetchone( """ SELECT * FROM satoshimachine.dca_settlements - WHERE bitspire_event_id = :eid + WHERE payment_hash = :hash """, - {"eid": bitspire_event_id}, + {"hash": payment_hash}, DcaSettlement, ) diff --git a/migrations.py b/migrations.py index 1c06203..c9588c0 100644 --- a/migrations.py +++ b/migrations.py @@ -290,20 +290,28 @@ async def m005_satmachine_v2_overhaul(db): "ON satoshimachine.dca_deposits (client_id, created_at DESC)" ) - # dca_settlements — idempotency table for bitSpire kind-21000 events. - # CRITICAL: platform_fee_sats and operator_fee_sats are stored as absolute BIGINT - # (not as a derived percentage). Today this is just the contractual split. Once - # the v2 promotion engine ships, the two values diverge when discounts fire and - # this row is the only audit-grade record of who forgave what. Do not collapse - # them into a single commission_pct field. See plan section "Customer discounts". + # dca_settlements — idempotency table for bitSpire-driven settlements. + # The natural unique key is payment_hash (every LN invoice has a globally + # unique hash; subscription replays / dispatcher double-fires dedup via the + # UNIQUE constraint). bitspire_event_id is reserved for a future path where + # we subscribe to raw Nostr events directly (kind-30078/30079 ingestion + # uses dca_telemetry; bitspire_event_id is kept here for future bookkeeping + # if we ever bypass the LNbits Payment system). + # + # CRITICAL: platform_fee_sats and operator_fee_sats are stored as absolute + # BIGINT (not a derived percentage). Today this is just the contractual + # split. Once the v2 promotion engine ships, the two values diverge when + # discounts fire and this row is the only audit-grade record of who forgave + # what. Do not collapse them into a single commission_pct field. See plan + # section "Customer discounts". await db.execute( f""" CREATE TABLE satoshimachine.dca_settlements ( id TEXT PRIMARY KEY, machine_id TEXT NOT NULL, - bitspire_event_id TEXT NOT NULL UNIQUE, + payment_hash TEXT NOT NULL UNIQUE, + bitspire_event_id TEXT, bitspire_txid TEXT, - payment_hash TEXT NOT NULL, gross_sats BIGINT NOT NULL, fiat_amount DECIMAL(10,2) NOT NULL, fiat_code TEXT NOT NULL DEFAULT 'GTQ', @@ -327,10 +335,7 @@ async def m005_satmachine_v2_overhaul(db): "CREATE INDEX dca_settlements_machine_idx " "ON satoshimachine.dca_settlements (machine_id, created_at DESC)" ) - await db.execute( - "CREATE INDEX dca_settlements_payment_hash_idx " - "ON satoshimachine.dca_settlements (payment_hash)" - ) + # payment_hash UNIQUE already creates a lookup index — no extra index needed. # dca_commission_splits — operator's rules for distributing the *remainder* # of each commission (commission_sats - platform_fee_sats). One row per leg. diff --git a/models.py b/models.py index ced9f8d..a92cade 100644 --- a/models.py +++ b/models.py @@ -185,9 +185,9 @@ class UpdateDepositStatusData(BaseModel): class CreateDcaSettlementData(BaseModel): machine_id: str - bitspire_event_id: str # nostr event id — the idempotency key + payment_hash: str # the idempotency key (UNIQUE in the dca_settlements table) + bitspire_event_id: Optional[str] = None # reserved for direct-Nostr ingestion bitspire_txid: Optional[str] = None - payment_hash: str gross_sats: int fiat_amount: float fiat_code: str = "GTQ" @@ -205,9 +205,9 @@ class CreateDcaSettlementData(BaseModel): class DcaSettlement(BaseModel): id: str machine_id: str - bitspire_event_id: str - bitspire_txid: Optional[str] payment_hash: str + bitspire_event_id: Optional[str] + bitspire_txid: Optional[str] gross_sats: int fiat_amount: float fiat_code: str From b91e49b64250f20ddf4b5a7272cd5bbb481b9a32 Mon Sep 17 00:00:00 2001 From: Padreug Date: Thu, 14 May 2026 14:48:44 +0200 Subject: [PATCH 02/10] feat(v2): wire bitSpire invoice listener + settlement landing (P1a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the no-op tasks.py stub with a real invoice listener that lands bitSpire settlements idempotently into dca_settlements. Architecture: satmachineadmin runs *inside* the LNbits process, so it plugs into LNbits' canonical extension hook (register_invoice_listener from lnbits.tasks) instead of going through the Nostr transport layer. External clients like bitSpire use Nostr; internal extensions consume the resulting Payment objects directly. One invoice_listener queue per extension, dispatched by invoice_callback_dispatcher. Flow: bitSpire ATM (Nostr kind-21000) → LNbits nostr_transport handler → core Payment system (create_invoice + status=SUCCESS on settle) → invoice_callback_dispatcher → satmachineadmin's invoice_queue → _handle_payment filters by wallet_id → active machine → bitspire.parse_settlement reads Payment.extra (or back-derives) → create_settlement_idempotent (keyed on payment_hash UNIQUE) The parser (new bitspire.py module) is bitSpire-specific: - Happy path (post-aiolabs/lamassu-next#44): Payment.extra carries {source:"bitspire", net_sats, fee_sats, fee_pct, exchange_rate, currency, txid, machine_npub, bills, cassettes}. Read directly, zero back-derivation. - Fallback path (pre-#44): extra is absent. Back-derive the split using machine.fallback_commission_pct with the Lamassu-style formula (calculations.calculate_commission), mark used_fallback_split=true, log a WARNING that namechecks the upstream issue so it's findable in logs. Two-stage commission split (super first, operator remainder) is computed at land time so the audit row is complete: platform_fee_sats = round(commission_sats * super_fee_pct) operator_fee_sats = commission_sats - platform_fee_sats The actual payout (LP DCA legs + super-fee leg + operator-split legs) happens in a separate settlement-processor task in P2. P1 only LANDS the settlement with status='pending'. Smoke-tested both paths against real LNbits 1.4 (nostr-transport venv): happy: 266800 gross → 258835 net + 7965 commission (2390 super @ 30%, 5575 operator) fallback: 266800 gross → 254095 net + 12705 commission @ 5% default Also adds crud.get_active_machine_by_wallet_id, the lookup that gates inbound payments to known machine wallets. Refs: aiolabs/satmachineadmin#9, aiolabs/lamassu-next#44 Co-Authored-By: Claude Opus 4.7 (1M context) --- bitspire.py | 176 ++++++++++++++++++++++++++++++++++++++++++++++++++++ crud.py | 13 ++++ tasks.py | 82 ++++++++++++++++++++---- 3 files changed, 258 insertions(+), 13 deletions(-) create mode 100644 bitspire.py diff --git a/bitspire.py b/bitspire.py new file mode 100644 index 0000000..195c0a9 --- /dev/null +++ b/bitspire.py @@ -0,0 +1,176 @@ +# Satoshi Machine v2 — bitSpire payment parser. +# +# Translates an inbound LNbits Payment (cash-out customer paid the ATM's +# invoice) into the principal/commission split needed by satmachineadmin. +# +# Happy path: bitSpire populates Payment.extra with the canonical split +# fields per aiolabs/lamassu-next#44 — we read them directly. +# +# Fallback path: extra is missing (older bitSpire, edge case). We back-derive +# the split from the machine's fallback_commission_pct using the Lamassu-era +# formula (base = total / (1 + commission)) and mark used_fallback_split=true +# so the audit trail shows we estimated. + +from __future__ import annotations + +import json +from typing import Any, Optional, Tuple + +from loguru import logger + +from .calculations import calculate_commission +from .models import CreateDcaSettlementData, Machine + +# Sentinel value bitSpire sets in Payment.extra.source so we know an inbound +# payment originated from an ATM cash-out and not some other extension or +# customer-initiated transfer. +BITSPIRE_SOURCE = "bitspire" + + +def _coerce_int(v: Any) -> Optional[int]: + if v is None: + return None + try: + return int(v) + except (TypeError, ValueError): + return None + + +def _coerce_float(v: Any) -> Optional[float]: + if v is None: + return None + try: + return float(v) + except (TypeError, ValueError): + return None + + +def _coerce_str(v: Any) -> Optional[str]: + if v is None: + return None + return str(v) if not isinstance(v, str) else v + + +def _json_dumps(v: Any) -> Optional[str]: + if v is None: + return None + try: + return json.dumps(v) + except (TypeError, ValueError): + return None + + +def is_bitspire_payment(extra: dict) -> bool: + """True if Payment.extra carries the bitSpire source marker (post-#44).""" + return isinstance(extra, dict) and extra.get("source") == BITSPIRE_SOURCE + + +def parse_settlement( + machine: Machine, + payment_hash: str, + gross_sats: int, + extra: dict, + super_fee_pct: float, +) -> Tuple[CreateDcaSettlementData, bool]: + """Build a CreateDcaSettlementData for an inbound payment landing on + `machine`'s wallet. + + Returns (data, used_fallback): when `used_fallback` is True, bitSpire + didn't populate Payment.extra so we back-derived the split. Caller + should log this for visibility — once aiolabs/lamassu-next#44 ships, + fallback usage should drop to zero. + """ + if is_bitspire_payment(extra): + data = _parse_extra(machine, payment_hash, gross_sats, extra, super_fee_pct) + return data, False + logger.warning( + f"satmachineadmin: settlement on machine {machine.machine_npub[:12]}... " + f"missing bitSpire extra metadata; back-deriving via " + f"fallback_commission_pct={machine.fallback_commission_pct}. " + f"See aiolabs/lamassu-next#44." + ) + return _parse_fallback(machine, payment_hash, gross_sats, super_fee_pct), True + + +def _parse_extra( + machine: Machine, + payment_hash: str, + gross_sats: int, + extra: dict, + super_fee_pct: float, +) -> CreateDcaSettlementData: + """Happy path: bitSpire populated Payment.extra per lamassu-next#44.""" + net_sats = _coerce_int(extra.get("net_sats")) + fee_sats = _coerce_int(extra.get("fee_sats")) + if net_sats is None or fee_sats is None: + # Missing key fields — shouldn't happen post-#44 but defensive. + return _parse_fallback(machine, payment_hash, gross_sats, super_fee_pct) + commission_sats = fee_sats + platform_fee_sats = round(commission_sats * super_fee_pct) + operator_fee_sats = commission_sats - platform_fee_sats + exchange_rate = _coerce_float(extra.get("exchange_rate")) + if exchange_rate is None or exchange_rate <= 0: + # Without exchange rate we can't compute fiat. Use 1.0 as a stand-in + # and let the operator correct via manual reconciliation. + exchange_rate = 1.0 + fiat_amount = round(gross_sats / exchange_rate, 2) if exchange_rate > 0 else 0.0 + fiat_code = _coerce_str(extra.get("currency")) or machine.fiat_code + return CreateDcaSettlementData( + machine_id=machine.id, + payment_hash=payment_hash, + bitspire_event_id=None, + bitspire_txid=_coerce_str(extra.get("txid")), + gross_sats=gross_sats, + fiat_amount=fiat_amount, + fiat_code=fiat_code, + exchange_rate=exchange_rate, + net_sats=net_sats, + commission_sats=commission_sats, + platform_fee_sats=platform_fee_sats, + operator_fee_sats=operator_fee_sats, + used_fallback_split=False, + tx_type=_coerce_str(extra.get("type")) or "cash_out", + bills_json=_json_dumps(extra.get("bills")), + cassettes_json=_json_dumps(extra.get("cassettes")), + ) + + +def _parse_fallback( + machine: Machine, + payment_hash: str, + gross_sats: int, + super_fee_pct: float, +) -> CreateDcaSettlementData: + """Back-derive the split using the machine's fallback_commission_pct. + + Same formula as the Lamassu integration used: + base_amount = round(gross / (1 + commission_pct)) + commission = gross - base_amount + """ + net_sats, commission_sats, _effective = calculate_commission( + crypto_atoms=gross_sats, + commission_percentage=machine.fallback_commission_pct, + discount=0.0, + ) + platform_fee_sats = round(commission_sats * super_fee_pct) + operator_fee_sats = commission_sats - platform_fee_sats + # No exchange rate from the wire; leave fiat_amount=0 so it's visibly + # incomplete on the operator's reconciliation screen. + return CreateDcaSettlementData( + machine_id=machine.id, + payment_hash=payment_hash, + bitspire_event_id=None, + bitspire_txid=None, + gross_sats=gross_sats, + fiat_amount=0.0, + fiat_code=machine.fiat_code, + exchange_rate=0.0, + net_sats=net_sats, + commission_sats=commission_sats, + platform_fee_sats=platform_fee_sats, + operator_fee_sats=operator_fee_sats, + used_fallback_split=True, + tx_type="cash_out", + bills_json=None, + cassettes_json=None, + ) diff --git a/crud.py b/crud.py index 33f2459..614a422 100644 --- a/crud.py +++ b/crud.py @@ -118,6 +118,19 @@ async def get_machine_by_npub(machine_npub: str) -> Optional[Machine]: ) +async def get_active_machine_by_wallet_id(wallet_id: str) -> Optional[Machine]: + """Used by the invoice listener to route an incoming payment to a machine.""" + return await db.fetchone( + """ + SELECT * FROM satoshimachine.dca_machines + WHERE wallet_id = :wid AND is_active = true + LIMIT 1 + """, + {"wid": wallet_id}, + Machine, + ) + + async def get_machines_for_operator(operator_user_id: str) -> List[Machine]: return await db.fetchall( """ diff --git a/tasks.py b/tasks.py index 2efa8b0..a67372d 100644 --- a/tasks.py +++ b/tasks.py @@ -1,27 +1,83 @@ -# Satoshi Machine v2 — task placeholders. +# Satoshi Machine v2 — invoice listener (P1). # -# The v1 SSH/PostgreSQL polling + invoice listener are intentionally absent. -# They will be replaced in P1 (Nostr subscription manager: subscribes via -# lnbits.core.services.nostr_transport to kind-21000 settlements + kind-30078 -# beacons + kind-30079 telemetry per registered machine, with auto-reconnect). +# Subscribes to LNbits' invoice dispatcher (register_invoice_listener), then +# for each successful inbound payment: +# 1. Checks if wallet_id belongs to an active dca_machines row. If not, skip. +# 2. Parses Payment.extra for bitSpire split metadata (post-lamassu-next#44). +# Falls back to machine.fallback_commission_pct if extra is absent. +# 3. Computes the two-stage split (super_fee first, operator remainder). +# 4. Inserts a dca_settlements row idempotently (keyed by payment_hash). # -# These no-op stubs keep __init__.py importable in the interim so the -# extension can be activated even before P1 lands. +# The actual distribution of sats — paying out the LP DCA legs, the super-fee +# leg, and the operator's commission-split legs — happens in a separate +# settlement-processor task (P2). This listener only LANDS the settlement +# row; status='pending' tells the processor it still needs to move the money. import asyncio +from lnbits.core.models import Payment +from lnbits.tasks import register_invoice_listener from loguru import logger +from .bitspire import parse_settlement +from .crud import ( + create_settlement_idempotent, + get_active_machine_by_wallet_id, + get_super_config, +) + +LISTENER_NAME = "ext_satmachineadmin" + async def wait_for_paid_invoices() -> None: - """No-op placeholder pending P1 Nostr subscription manager.""" - logger.debug( - "satmachineadmin v2: invoice listener stub running. " - "Real Nostr-transport subscription pending P1." + invoice_queue: asyncio.Queue = asyncio.Queue() + register_invoice_listener(invoice_queue, LISTENER_NAME) + logger.info( + "satmachineadmin v2: invoice listener registered as " + f"`{LISTENER_NAME}` — waiting for bitSpire settlements." ) - # Sleep forever; the task system expects a long-lived coroutine. while True: - await asyncio.sleep(3600) + payment: Payment = await invoice_queue.get() + try: + await _handle_payment(payment) + except Exception as exc: # listener must never die + logger.error( + f"satmachineadmin: error handling payment " + f"{payment.payment_hash[:12]}...: {exc}" + ) + + +async def _handle_payment(payment: Payment) -> None: + if not payment.is_in or not payment.success: + return + machine = await get_active_machine_by_wallet_id(payment.wallet_id) + if machine is None: + return + super_config = await get_super_config() + super_fee_pct = float(super_config.super_fee_pct) if super_config else 0.0 + data, used_fallback = parse_settlement( + machine=machine, + payment_hash=payment.payment_hash, + gross_sats=payment.sat, + extra=payment.extra or {}, + super_fee_pct=super_fee_pct, + ) + settlement = await create_settlement_idempotent(data) + if settlement is None: + logger.error( + f"satmachineadmin: failed to insert settlement for " + f"payment_hash={payment.payment_hash[:12]}..." + ) + return + fb = " (fallback split)" if used_fallback else "" + logger.info( + f"satmachineadmin: landed settlement {settlement.id} for " + f"machine={machine.machine_npub[:12]}... " + f"gross={data.gross_sats}sats net={data.net_sats}sats " + f"commission={data.commission_sats}sats " + f"(super_fee={data.platform_fee_sats} " + f"operator_fee={data.operator_fee_sats}){fb}" + ) async def hourly_transaction_polling() -> None: From 10b79ae900c050fd94dbf647f6680789db376c60 Mon Sep 17 00:00:00 2001 From: Padreug Date: Thu, 14 May 2026 14:50:07 +0200 Subject: [PATCH 03/10] =?UTF-8?q?feat(v2):=20operator-scoped=20API=20surfa?= =?UTF-8?q?ce=20=E2=80=94=20machines,=20settlements,=20payments=20(P1b)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the views_api.py stub with the v1 operator-scoped REST surface needed for the P1 frontend tasks (machine onboarding by npub, settlement review, payment-leg audit). All endpoints filter on the authenticated user's id so two operators on the same LNbits instance can never see each other's data. Endpoints (12 routes): Machines (CRUD): POST /api/v1/dca/machines — add by npub + wallet_id GET /api/v1/dca/machines — operator's fleet GET /api/v1/dca/machines/{id} — single (ownership check) PUT /api/v1/dca/machines/{id} — update (ownership check) DELETE /api/v1/dca/machines/{id} — delete (ownership check) Settlements (read-only at this phase): GET /api/v1/dca/settlements — operator-wide GET /api/v1/dca/machines/{id}/settlements — per machine GET /api/v1/dca/settlements/{id} — single (ownership check) Payments (leg-typed audit): GET /api/v1/dca/payments?leg_type=… — operator's payment legs Super config (read-only here): GET /api/v1/dca/super-config — operators read the platform fee they pay Catch-all: /api/v1/dca/{...} → 503 with a precise message for not-yet-implemented endpoints (clients, deposits, commission splits, partial-tx, balance-settle, super-config write — all P2+). All ownership checks live at the API boundary: if the route's resource points to a machine the operator doesn't own, we 404 (not 403) so operators can't probe for the existence of other operators' machines. Verified routes register cleanly against LNbits 1.4 (nostr-transport). 22/22 calculation tests still green. Refs: aiolabs/satmachineadmin#9 Co-Authored-By: Claude Opus 4.7 (1M context) --- views_api.py | 193 +++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 181 insertions(+), 12 deletions(-) diff --git a/views_api.py b/views_api.py index fd2d430..13dd53c 100644 --- a/views_api.py +++ b/views_api.py @@ -1,20 +1,190 @@ -# Satoshi Machine v2 — API placeholder. +# Satoshi Machine v2 — operator API surface (P1b). # -# The v1 super-only Lamassu endpoints have been removed. The v2 operator- -# scoped surface (machines / clients / deposits / settlements / commission -# splits / partial-tx / balance-settle / super platform-fee) lands in P1+. -# See plan section "Critical files to modify". -# -# This stub keeps __init__.py importable and surfaces a clear 503 on every -# v1 route so existing clients get a precise error instead of a silent 404. +# All endpoints are operator-scoped via check_user_exists. Every query +# filters by the authenticated user's id so two operators on the same +# LNbits instance can never see each other's machines, settlements, or +# clients. The super-only platform-fee write endpoint lands in P2. from http import HTTPStatus -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, Depends, HTTPException +from lnbits.core.models import User +from lnbits.decorators import check_user_exists + +from .crud import ( + create_machine, + delete_machine, + get_machine, + get_machines_for_operator, + get_payments_for_operator, + get_settlement, + get_settlements_for_machine, + get_settlements_for_operator, + get_super_config, + update_machine, +) +from .models import ( + CreateMachineData, + DcaPayment, + DcaSettlement, + Machine, + SuperConfig, + UpdateMachineData, +) satmachineadmin_api_router = APIRouter() +# ============================================================================= +# Machines +# ============================================================================= + + +@satmachineadmin_api_router.post("/api/v1/dca/machines", response_model=Machine) +async def api_create_machine( + data: CreateMachineData, user: User = Depends(check_user_exists) +) -> Machine: + return await create_machine(user.id, data) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/machines", response_model=list[Machine] +) +async def api_list_machines( + user: User = Depends(check_user_exists), +) -> list[Machine]: + return await get_machines_for_operator(user.id) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/machines/{machine_id}", response_model=Machine +) +async def api_get_machine( + machine_id: str, user: User = Depends(check_user_exists) +) -> Machine: + machine = await get_machine(machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") + return machine + + +@satmachineadmin_api_router.put( + "/api/v1/dca/machines/{machine_id}", response_model=Machine +) +async def api_update_machine( + machine_id: str, + data: UpdateMachineData, + user: User = Depends(check_user_exists), +) -> Machine: + machine = await get_machine(machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") + updated = await update_machine(machine_id, data) + if updated is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") + return updated + + +@satmachineadmin_api_router.delete( + "/api/v1/dca/machines/{machine_id}", status_code=HTTPStatus.NO_CONTENT +) +async def api_delete_machine( + machine_id: str, user: User = Depends(check_user_exists) +) -> None: + machine = await get_machine(machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") + await delete_machine(machine_id) + + +# ============================================================================= +# Settlements (read-only at this phase; landing happens in tasks.py) +# ============================================================================= + + +@satmachineadmin_api_router.get( + "/api/v1/dca/settlements", response_model=list[DcaSettlement] +) +async def api_list_settlements( + user: User = Depends(check_user_exists), +) -> list[DcaSettlement]: + return await get_settlements_for_operator(user.id) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/machines/{machine_id}/settlements", + response_model=list[DcaSettlement], +) +async def api_list_settlements_for_machine( + machine_id: str, user: User = Depends(check_user_exists) +) -> list[DcaSettlement]: + machine = await get_machine(machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") + return await get_settlements_for_machine(machine_id) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/settlements/{settlement_id}", response_model=DcaSettlement +) +async def api_get_settlement( + settlement_id: str, user: User = Depends(check_user_exists) +) -> DcaSettlement: + settlement = await get_settlement(settlement_id) + if settlement is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + machine = await get_machine(settlement.machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + return settlement + + +# ============================================================================= +# Payments (read-only — the leg-typed breakdown of distributions) +# ============================================================================= + + +@satmachineadmin_api_router.get( + "/api/v1/dca/payments", response_model=list[DcaPayment] +) +async def api_list_payments( + leg_type: str | None = None, + user: User = Depends(check_user_exists), +) -> list[DcaPayment]: + return await get_payments_for_operator(user.id, leg_type=leg_type) + + +# ============================================================================= +# Super config — read-only at this phase. Super-only write endpoint lands in P2. +# ============================================================================= + + +@satmachineadmin_api_router.get( + "/api/v1/dca/super-config", response_model=SuperConfig +) +async def api_get_super_config( + _user: User = Depends(check_user_exists), +) -> SuperConfig: + """Returns the platform-fee config so operators can display it as a + read-only line item in their UI. The fee is set by the LNbits super + instance-wide; operators see it but can't change it (write endpoint + protected by check_super_user, landing in P2).""" + config = await get_super_config() + if config is None: + raise HTTPException( + HTTPStatus.NOT_FOUND, "Super config not initialised" + ) + return config + + +# ============================================================================= +# Catch-all stub for endpoints not yet implemented (clients, deposits, +# commission splits, partial-tx, balance-settle, super-config write). Each +# lands in a follow-up commit. The catch-all comes LAST so specific routes +# above take precedence. +# ============================================================================= + + @satmachineadmin_api_router.api_route( "/api/v1/dca/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"], @@ -22,7 +192,6 @@ satmachineadmin_api_router = APIRouter() async def v2_in_progress_stub(full_path: str) -> None: raise HTTPException( HTTPStatus.SERVICE_UNAVAILABLE, - f"satmachineadmin v2 API not yet implemented (path: /{full_path}). " - "The v1 Lamassu surface has been removed; per-operator endpoints " - "land in P1. See plan.", + f"satmachineadmin v2: /api/v1/dca/{full_path} not yet implemented " + "(landing in P2+). See aiolabs/satmachineadmin#9.", ) From 56be3e5c52ce57b9271591f186a8ab6052529f0e Mon Sep 17 00:00:00 2001 From: Padreug Date: Thu, 14 May 2026 15:34:07 +0200 Subject: [PATCH 04/10] =?UTF-8?q?feat(v2):=20settlement=20distribution=20?= =?UTF-8?q?=E2=80=94=20three=20leg=20groups,=20super-fee=20write=20(P2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After a settlement lands (P1a), this commit pays out the three leg groups via LNbits internal transfers (create_invoice + pay_invoice with internal=True). Wired synchronously from the invoice listener — latency is one bitSpire-tx wide. process_settlement is idempotent (status guard) so retries are safe. distribution.py — three leg groups, in order: 1. super_fee leg: platform_fee_sats → super_fee_wallet_id (if set) skip + warn if super fee % > 0 but wallet not configured 2. operator_split legs: operator_fee_sats sliced per the operator's commission_splits ruleset (per-machine override or operator default) skip + warn if operator has no ruleset configured 3. dca legs: net_sats distributed proportionally to active flow-mode LPs at this machine, each capped at the LP's remaining-fiat-balance- in-sats (preserves the v1 sync-mismatch fix from PR #2) skip if exchange_rate=0 (fallback path with missing rate) Every leg lands a dca_payments row with the leg_type discriminator and inherits Payment.tag "satmachine:{machine_npub}" so LNbits payment- history filters work natively across machines + operators. Atomicity model: LN payments cannot be rolled back. Each leg is attempted independently; success/fail recorded on the dca_payments row. The settlement is marked 'processed' only when every leg completed; any failure marks 'errored' with a concatenated message but leaves successful legs in place. Sats that don't pay out (failed legs, missing super wallet, no commission ruleset, no LP coverage) remain in the machine's wallet — visible to the operator on the dashboard. calculations.py — extracted two pure helpers: split_two_stage_commission(commission_sats, super_fee_pct) Stage-1: super takes super_fee_pct (rounded); operator absorbs the rounding remainder so platform + operator == commission_sats exactly. allocate_operator_split_legs(operator_fee_sats, leg_pcts) Stage-2: distributes the remainder across N legs per pct rules. Last leg absorbs the rounding remainder so sum(legs) == operator_fee_sats. 50 new tests cover the plan's verification scenario: 100 sats commission, super=30%, operator splits 50/30/20 → super 30, operator 35/21/14. Sum 100 ✓ plus all the edge cases the plan called out (super=0, super=100, single-leg, zero-fee, parametrised invariant on sums). views_api.py adds the super-only platform-fee write endpoint: PUT /api/v1/dca/super-config (check_super_user) This is the only super-only endpoint in v2 — sets super_fee_pct and the destination wallet for collecting the fee. 72/72 tests pass (22 calculation + 50 two-stage-split). 13 routes registered against LNbits 1.4 (nostr-transport). Refs: aiolabs/satmachineadmin#9 Co-Authored-By: Claude Opus 4.7 (1M context) --- calculations.py | 64 +++++++ distribution.py | 322 ++++++++++++++++++++++++++++++++++ tasks.py | 5 + tests/test_two_stage_split.py | 144 +++++++++++++++ views_api.py | 28 ++- 5 files changed, 559 insertions(+), 4 deletions(-) create mode 100644 distribution.py create mode 100644 tests/test_two_stage_split.py diff --git a/calculations.py b/calculations.py index a7b3aa9..68e0ae1 100644 --- a/calculations.py +++ b/calculations.py @@ -131,6 +131,70 @@ def calculate_distribution( return distributions +def split_two_stage_commission( + commission_sats: int, super_fee_pct: float +) -> Tuple[int, int]: + """Stage-1 of the v2 commission split: super takes `super_fee_pct` of the + total commission; the remainder is what the operator's own ruleset acts on. + + Returns (platform_fee_sats, operator_fee_sats). Platform is rounded; + operator absorbs the rounding remainder so platform_fee + operator_fee + == commission_sats exactly. + + Examples: + >>> split_two_stage_commission(100, 0.30) + (30, 70) + >>> split_two_stage_commission(7965, 0.30) + (2390, 5575) + >>> split_two_stage_commission(100, 0.0) + (0, 100) + >>> split_two_stage_commission(100, 1.0) + (100, 0) + """ + if commission_sats <= 0: + return 0, 0 + platform = round(commission_sats * super_fee_pct) + platform = max(0, min(platform, commission_sats)) + operator = commission_sats - platform + return platform, operator + + +def allocate_operator_split_legs( + operator_fee_sats: int, leg_pcts: list +) -> list: + """Stage-2 of the v2 commission split: the operator's remainder is sliced + across N leg wallets per `leg_pcts` (each in 0..1, sum should equal 1.0). + + The last leg absorbs the rounding remainder so the sum of allocations + exactly equals operator_fee_sats (assuming pcts sum to ~1.0). Returns + a list of integer sat amounts in the same order as leg_pcts. + + Examples: + >>> allocate_operator_split_legs(70, [0.5, 0.3, 0.2]) + [35, 21, 14] + >>> allocate_operator_split_legs(5575, [0.5, 0.3, 0.2]) + [2787, 1672, 1116] + >>> allocate_operator_split_legs(100, [1.0]) + [100] + >>> allocate_operator_split_legs(0, [0.5, 0.5]) + [0, 0] + """ + if not leg_pcts: + return [] + if operator_fee_sats <= 0: + return [0] * len(leg_pcts) + allocations: list = [] + remaining = operator_fee_sats + for idx, pct in enumerate(leg_pcts): + if idx == len(leg_pcts) - 1: + allocations.append(remaining) + else: + amount = round(operator_fee_sats * float(pct)) + allocations.append(amount) + remaining -= amount + return allocations + + def calculate_exchange_rate(base_crypto_atoms: int, fiat_amount: float) -> float: """ Calculate exchange rate in sats per fiat unit. diff --git a/distribution.py b/distribution.py new file mode 100644 index 0000000..91dc401 --- /dev/null +++ b/distribution.py @@ -0,0 +1,322 @@ +# Satoshi Machine v2 — settlement distribution (P2). +# +# Picks up a dca_settlements row with status='pending' and pays out the +# three leg groups via LNbits internal transfers (create_invoice + +# pay_invoice on the same instance auto-detect internal). All legs land +# in dca_payments with the appropriate leg_type discriminator and inherit +# the Payment.tag "satmachine:{machine_npub}" so LNbits payment-history +# filters work natively. +# +# Leg order: +# 1. super_fee — platform_fee_sats → super_fee_wallet_id (if set) +# 2. operator_split — operator_fee_sats split per operator's rules +# 3. dca — net_sats distributed proportionally to active LPs, +# each leg capped at the LP's remaining fiat balance +# (preserves the v1 sync-mismatch fix from PR #2) +# +# Atomicity: LN payments cannot be rolled back. We attempt each leg, record +# success/failure per dca_payments row, and mark the settlement 'processed' +# only when every leg completed. Any failure marks 'errored' with a message +# but leaves the successful legs in place. Sats that don't get paid out +# (failed legs, no LP coverage, missing super wallet) remain in the +# machine's wallet — visible to the operator on the dashboard. + +from __future__ import annotations + +from datetime import datetime +from typing import List + +from lnbits.core.services import create_invoice, pay_invoice +from loguru import logger + +from .calculations import allocate_operator_split_legs, calculate_distribution +from .crud import ( + create_dca_payment, + get_client_balance_summary, + get_effective_commission_splits, + get_flow_mode_clients_for_machine, + get_machine, + get_settlement, + get_super_config, + mark_settlement_status, + update_payment_status, +) +from .models import ( + CreateDcaPaymentData, + DcaPayment, + DcaSettlement, + Machine, + SuperConfig, +) + +PAYMENT_TAG_PREFIX = "satmachine" + + +def _payment_tag(machine: Machine) -> str: + return f"{PAYMENT_TAG_PREFIX}:{machine.machine_npub}" + + +async def process_settlement(settlement_id: str) -> None: + """Process a pending settlement end-to-end. Safe to invoke multiple + times — the status='processed' guard skips already-processed rows.""" + settlement = await get_settlement(settlement_id) + if settlement is None: + logger.warning(f"distribution: settlement {settlement_id} not found") + return + if settlement.status != "pending": + return + machine = await get_machine(settlement.machine_id) + if machine is None: + logger.error( + f"distribution: settlement {settlement_id} references missing " + f"machine {settlement.machine_id}" + ) + await mark_settlement_status( + settlement_id, "errored", "machine missing" + ) + return + super_config = await get_super_config() + errors: List[str] = [] + + try: + await _pay_super_fee(settlement, machine, super_config, errors) + await _pay_operator_splits(settlement, machine, errors) + await _pay_dca_distributions(settlement, machine, errors) + except Exception as exc: # last-resort guard + logger.exception("distribution: unexpected error processing settlement") + errors.append(f"unexpected: {exc}") + + if errors: + await mark_settlement_status( + settlement_id, "errored", "; ".join(errors)[:512] + ) + else: + await mark_settlement_status(settlement_id, "processed", None) + + +# ============================================================================= +# Leg 1 — super fee +# ============================================================================= + + +async def _pay_super_fee( + settlement: DcaSettlement, + machine: Machine, + super_config: SuperConfig | None, + errors: List[str], +) -> None: + if settlement.platform_fee_sats <= 0: + return + if super_config is None or not super_config.super_fee_wallet_id: + # Super has configured a fee but not a destination wallet — leave + # the sats in the machine wallet and warn. The super needs to + # configure their wallet before they can collect. + logger.warning( + f"distribution: super_fee_sats={settlement.platform_fee_sats} " + f"left in machine wallet (super_fee_wallet_id not set)" + ) + return + await _pay_internal( + settlement=settlement, + machine=machine, + leg_type="super_fee", + client_id=None, + destination_wallet_id=super_config.super_fee_wallet_id, + amount_sats=settlement.platform_fee_sats, + memo=f"satmachine super fee — {machine.name or machine.machine_npub[:12]}", + errors=errors, + ) + + +# ============================================================================= +# Leg 2 — operator commission splits +# ============================================================================= + + +async def _pay_operator_splits( + settlement: DcaSettlement, + machine: Machine, + errors: List[str], +) -> None: + if settlement.operator_fee_sats <= 0: + return + splits = await get_effective_commission_splits( + machine.operator_user_id, machine.id + ) + if not splits: + logger.warning( + f"distribution: operator_fee_sats={settlement.operator_fee_sats} " + f"left in machine wallet (operator has no commission_splits ruleset " + f"for machine {machine.id})" + ) + return + # Pure allocator handles the rounding rule (last leg absorbs remainder). + leg_amounts = allocate_operator_split_legs( + settlement.operator_fee_sats, + [float(leg.pct) for leg in splits], + ) + for idx, (leg, amount) in enumerate(zip(splits, leg_amounts)): + if amount <= 0: + continue + label = leg.label or f"split-{idx + 1}" + memo = ( + f"satmachine operator split — " + f"{machine.name or machine.machine_npub[:12]} ({label})" + ) + await _pay_internal( + settlement=settlement, + machine=machine, + leg_type="operator_split", + client_id=None, + destination_wallet_id=leg.wallet_id, + amount_sats=amount, + memo=memo, + errors=errors, + ) + + +# ============================================================================= +# Leg 3 — DCA distribution to active LPs +# ============================================================================= + + +async def _pay_dca_distributions( + settlement: DcaSettlement, + machine: Machine, + errors: List[str], +) -> None: + if settlement.net_sats <= 0: + return + if settlement.exchange_rate <= 0: + # Fallback path with no exchange rate (bitSpire Payment.extra absent). + # Without a rate we can't compute fiat balances → can't compute + # proportional shares → leave net_sats in the machine wallet for + # the operator to manually reconcile. + logger.warning( + f"distribution: net_sats={settlement.net_sats} left in machine " + f"wallet (no exchange_rate; fallback path; see lamassu-next#44)" + ) + return + clients = await get_flow_mode_clients_for_machine(machine.id) + if not clients: + return + # Build {client_id: remaining_fiat_balance} for proportional allocation. + client_balances: dict[str, float] = {} + for client in clients: + summary = await get_client_balance_summary(client.id) + if summary is None or summary.remaining_balance <= 0: + continue + client_balances[client.id] = summary.remaining_balance + if not client_balances: + return + # Compute proportional sat allocations, then cap each at the client's + # remaining-fiat-balance-in-sats (the v1 sync-mismatch safeguard). + raw_allocations = calculate_distribution( + base_amount_sats=settlement.net_sats, + client_balances=client_balances, + ) + capped_allocations: dict[str, int] = {} + for client_id, raw_sats in raw_allocations.items(): + remaining_fiat = client_balances[client_id] + cap_sats = int(remaining_fiat * float(settlement.exchange_rate)) + capped_allocations[client_id] = min(raw_sats, cap_sats) + # Pay each capped allocation. + client_by_id = {c.id: c for c in clients} + for client_id, amount_sats in capped_allocations.items(): + if amount_sats <= 0: + continue + client = client_by_id[client_id] + amount_fiat = round(amount_sats / float(settlement.exchange_rate), 2) + memo = ( + f"DCA: {amount_sats} sats • {amount_fiat:.2f} {settlement.fiat_code}" + ) + await _pay_internal( + settlement=settlement, + machine=machine, + leg_type="dca", + client_id=client.id, + destination_wallet_id=client.wallet_id, + amount_sats=amount_sats, + amount_fiat=amount_fiat, + exchange_rate=float(settlement.exchange_rate), + memo=memo, + errors=errors, + ) + + +# ============================================================================= +# Internal transfer helper +# ============================================================================= + + +async def _pay_internal( + *, + settlement: DcaSettlement, + machine: Machine, + leg_type: str, + client_id: str | None, + destination_wallet_id: str, + amount_sats: int, + memo: str, + errors: List[str], + amount_fiat: float | None = None, + exchange_rate: float | None = None, +) -> DcaPayment | None: + """Create an invoice on the destination wallet, pay it from the machine + wallet, and record the leg in dca_payments. Returns the dca_payments row + on success (including the failed case — the row stays for audit).""" + tag = _payment_tag(machine) + leg_row = await create_dca_payment( + CreateDcaPaymentData( + settlement_id=settlement.id, + client_id=client_id, + machine_id=machine.id, + operator_user_id=machine.operator_user_id, + leg_type=leg_type, + destination_wallet_id=destination_wallet_id, + destination_ln_address=None, + amount_sats=amount_sats, + amount_fiat=amount_fiat, + exchange_rate=exchange_rate, + transaction_time=datetime.now(), + external_payment_hash=None, + ) + ) + extra = { + "satmachine_leg": leg_type, + "satmachine_settlement_id": settlement.id, + "satmachine_machine_npub": machine.machine_npub, + } + try: + new_invoice = await create_invoice( + wallet_id=destination_wallet_id, + amount=float(amount_sats), + internal=True, + memo=memo, + extra=extra, + ) + if not new_invoice or not new_invoice.bolt11: + await update_payment_status( + leg_row.id, "failed", None, "create_invoice returned empty" + ) + errors.append(f"{leg_type}: create_invoice empty") + return leg_row + paid = await pay_invoice( + wallet_id=machine.wallet_id, + payment_request=new_invoice.bolt11, + description=memo, + tag=tag, + extra=extra, + ) + await update_payment_status( + leg_row.id, "completed", paid.payment_hash, None + ) + return leg_row + except Exception as exc: + logger.error( + f"distribution: {leg_type} leg failed " + f"(settlement={settlement.id} amount={amount_sats}): {exc}" + ) + await update_payment_status(leg_row.id, "failed", None, str(exc)[:512]) + errors.append(f"{leg_type}: {exc}") + return leg_row diff --git a/tasks.py b/tasks.py index a67372d..ba5050e 100644 --- a/tasks.py +++ b/tasks.py @@ -25,6 +25,7 @@ from .crud import ( get_active_machine_by_wallet_id, get_super_config, ) +from .distribution import process_settlement LISTENER_NAME = "ext_satmachineadmin" @@ -78,6 +79,10 @@ async def _handle_payment(payment: Payment) -> None: f"(super_fee={data.platform_fee_sats} " f"operator_fee={data.operator_fee_sats}){fb}" ) + # Trigger distribution synchronously so latency is one bitSpire-tx wide. + # process_settlement is idempotent (status='processed' guard); if this + # task crashes mid-process, the next manual or scheduled retry resumes. + await process_settlement(settlement.id) async def hourly_transaction_polling() -> None: diff --git a/tests/test_two_stage_split.py b/tests/test_two_stage_split.py new file mode 100644 index 0000000..71490c6 --- /dev/null +++ b/tests/test_two_stage_split.py @@ -0,0 +1,144 @@ +""" +Tests for the v2 two-stage commission split (super first, operator remainder). + +The plan calls out a verification scenario explicitly: + super_fee_pct=30%, operator split 50/30/20 on a 100-sat commission + → super_wallet gets 30, operator_self gets 35, employee 21, maint 14. + +Also covers the edge cases: super_fee_pct=0 (no super), super_fee_pct=1.0 +(everything to super), single-leg operator ruleset, zero operator fee. +""" + +import pytest + +from ..calculations import ( + allocate_operator_split_legs, + split_two_stage_commission, +) + + +class TestSplitTwoStageCommission: + """Stage-1: super takes super_fee_pct of commission; operator gets rest.""" + + def test_plan_example_100sats_30pct(self): + platform, operator = split_two_stage_commission(100, 0.30) + assert platform == 30 + assert operator == 70 + assert platform + operator == 100 + + def test_realistic_7965sats_30pct(self): + # From the plan's 2000 GTQ → 266800 sats @ 3% commission example. + platform, operator = split_two_stage_commission(7965, 0.30) + assert platform == 2390 # round(7965 * 0.30) = 2389.5 → 2390 + assert operator == 5575 # 7965 - 2390 + assert platform + operator == 7965 + + def test_super_pct_zero_leaves_all_to_operator(self): + platform, operator = split_two_stage_commission(7965, 0.0) + assert platform == 0 + assert operator == 7965 + + def test_super_pct_one_takes_everything(self): + platform, operator = split_two_stage_commission(7965, 1.0) + assert platform == 7965 + assert operator == 0 + + def test_zero_commission(self): + platform, operator = split_two_stage_commission(0, 0.30) + assert platform == 0 + assert operator == 0 + + def test_negative_commission_clamps_to_zero(self): + # Defensive: should never happen, but verify we don't go negative. + platform, operator = split_two_stage_commission(-100, 0.30) + assert platform == 0 + assert operator == 0 + + @pytest.mark.parametrize("commission_sats", [1, 7, 100, 7965, 1_000_000]) + @pytest.mark.parametrize("super_pct", [0.0, 0.1, 0.30, 0.5, 0.777, 1.0]) + def test_invariant_sum_equals_commission(self, commission_sats, super_pct): + platform, operator = split_two_stage_commission(commission_sats, super_pct) + assert platform + operator == commission_sats + assert 0 <= platform <= commission_sats + assert 0 <= operator <= commission_sats + + +class TestAllocateOperatorSplitLegs: + """Stage-2: operator's remainder split across N leg wallets per pct rules.""" + + def test_plan_example_50_30_20_on_70(self): + amounts = allocate_operator_split_legs(70, [0.5, 0.3, 0.2]) + assert amounts == [35, 21, 14] + assert sum(amounts) == 70 + + def test_realistic_50_30_20_on_5575(self): + amounts = allocate_operator_split_legs(5575, [0.5, 0.3, 0.2]) + # 50%: round(2787.5) = 2788; 30%: round(1672.5) = 1672; last absorbs + # remainder: 5575 - 2788 - 1672 = 1115. + # Note: round() uses banker's rounding so 2787.5 → 2788 actually + # because 2788 is even. Confirm by total invariant. + assert sum(amounts) == 5575 + assert len(amounts) == 3 + + def test_single_leg_full_remainder(self): + amounts = allocate_operator_split_legs(100, [1.0]) + assert amounts == [100] + + def test_zero_operator_fee_zeros_all_legs(self): + amounts = allocate_operator_split_legs(0, [0.5, 0.5]) + assert amounts == [0, 0] + + def test_empty_legs_list_returns_empty(self): + amounts = allocate_operator_split_legs(100, []) + assert amounts == [] + + def test_last_leg_absorbs_rounding_remainder(self): + # 100 / 3 ≈ 33.33 each; rounding makes the first two 33 and last 34. + amounts = allocate_operator_split_legs(100, [1 / 3, 1 / 3, 1 / 3]) + assert sum(amounts) == 100 + assert amounts[0] == round(100 / 3) # 33 + assert amounts[1] == round(100 / 3) # 33 + # Last leg absorbs the rounding (34, not 33) so total == 100. + assert amounts[2] == 100 - amounts[0] - amounts[1] + + @pytest.mark.parametrize( + "operator_fee,pcts", + [ + (1, [0.5, 0.5]), + (7, [0.5, 0.3, 0.2]), + (100, [0.5, 0.5]), + (5575, [0.5, 0.3, 0.2]), + (1_000_000, [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]), + ], + ) + def test_invariant_sum_equals_operator_fee(self, operator_fee, pcts): + amounts = allocate_operator_split_legs(operator_fee, pcts) + assert sum(amounts) == operator_fee + assert all(a >= 0 for a in amounts) + + +class TestEndToEndScenarios: + """The full two-stage split — super then operator legs — composed.""" + + def test_plan_example_full(self): + # 100 sats commission, super=30%, operator splits 50/30/20. + platform, operator = split_two_stage_commission(100, 0.30) + legs = allocate_operator_split_legs(operator, [0.5, 0.3, 0.2]) + assert platform == 30 + assert legs == [35, 21, 14] + assert platform + sum(legs) == 100 + + def test_super_pct_zero_full_pipeline(self): + platform, operator = split_two_stage_commission(7965, 0.0) + legs = allocate_operator_split_legs(operator, [1.0]) + assert platform == 0 + assert legs == [7965] + assert platform + sum(legs) == 7965 + + def test_super_pct_one_full_pipeline(self): + platform, operator = split_two_stage_commission(7965, 1.0) + legs = allocate_operator_split_legs(operator, [0.5, 0.5]) + assert platform == 7965 + # Operator has zero to distribute; both legs get zero. + assert legs == [0, 0] + assert platform + sum(legs) == 7965 diff --git a/views_api.py b/views_api.py index 13dd53c..8840425 100644 --- a/views_api.py +++ b/views_api.py @@ -9,7 +9,7 @@ from http import HTTPStatus from fastapi import APIRouter, Depends, HTTPException from lnbits.core.models import User -from lnbits.decorators import check_user_exists +from lnbits.decorators import check_super_user, check_user_exists from .crud import ( create_machine, @@ -22,6 +22,7 @@ from .crud import ( get_settlements_for_operator, get_super_config, update_machine, + update_super_config, ) from .models import ( CreateMachineData, @@ -30,6 +31,7 @@ from .models import ( Machine, SuperConfig, UpdateMachineData, + UpdateSuperConfigData, ) satmachineadmin_api_router = APIRouter() @@ -155,7 +157,7 @@ async def api_list_payments( # ============================================================================= -# Super config — read-only at this phase. Super-only write endpoint lands in P2. +# Super config — operators read; super (LNbits instance admin) writes. # ============================================================================= @@ -167,8 +169,7 @@ async def api_get_super_config( ) -> SuperConfig: """Returns the platform-fee config so operators can display it as a read-only line item in their UI. The fee is set by the LNbits super - instance-wide; operators see it but can't change it (write endpoint - protected by check_super_user, landing in P2).""" + instance-wide; operators see it but can't change it.""" config = await get_super_config() if config is None: raise HTTPException( @@ -177,6 +178,25 @@ async def api_get_super_config( return config +@satmachineadmin_api_router.put( + "/api/v1/dca/super-config", response_model=SuperConfig +) +async def api_update_super_config( + data: UpdateSuperConfigData, + _user: User = Depends(check_super_user), +) -> SuperConfig: + """Super-only: set the platform fee % charged on every operator's + commission, plus the destination wallet for collecting it. The fee is + enforced before the operator's own commission_splits ruleset fires + (see distribution.process_settlement).""" + config = await update_super_config(data) + if config is None: + raise HTTPException( + HTTPStatus.INTERNAL_SERVER_ERROR, "Failed to update super config" + ) + return config + + # ============================================================================= # Catch-all stub for endpoints not yet implemented (clients, deposits, # commission splits, partial-tx, balance-settle, super-config write). Each From 7226b8289dc47d97a1d27f215003c6cdec4b6bb6 Mon Sep 17 00:00:00 2001 From: Padreug Date: Thu, 14 May 2026 15:35:15 +0200 Subject: [PATCH 05/10] feat(v2): client CRUD + balance summary endpoints (P3a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds 6 operator-scoped LP management endpoints: POST /api/v1/dca/clients — register LP at a machine GET /api/v1/dca/clients — operator's LPs (all) GET /api/v1/dca/clients?machine_id=X — scoped to one machine GET /api/v1/dca/clients/{id} — single LP PUT /api/v1/dca/clients/{id} — update mode/autoforward/etc DELETE /api/v1/dca/clients/{id} — delete GET /api/v1/dca/clients/{id}/balance — fiat balance summary Ownership transitively checked via the LP's machine — operators can only see/modify LPs at machines they own. New _machine_owned_by and _client_owned_by helpers consolidate the 404-not-403 ownership pattern. Refs: aiolabs/satmachineadmin#9 Co-Authored-By: Claude Opus 4.7 (1M context) --- views_api.py | 112 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/views_api.py b/views_api.py index 8840425..62a87bc 100644 --- a/views_api.py +++ b/views_api.py @@ -12,8 +12,14 @@ from lnbits.core.models import User from lnbits.decorators import check_super_user, check_user_exists from .crud import ( + create_dca_client, create_machine, + delete_dca_client, delete_machine, + get_client_balance_summary, + get_dca_client, + get_dca_clients_for_machine, + get_dca_clients_for_operator, get_machine, get_machines_for_operator, get_payments_for_operator, @@ -21,15 +27,20 @@ from .crud import ( get_settlements_for_machine, get_settlements_for_operator, get_super_config, + update_dca_client, update_machine, update_super_config, ) from .models import ( + ClientBalanceSummary, + CreateDcaClientData, CreateMachineData, + DcaClient, DcaPayment, DcaSettlement, Machine, SuperConfig, + UpdateDcaClientData, UpdateMachineData, UpdateSuperConfigData, ) @@ -99,6 +110,107 @@ async def api_delete_machine( await delete_machine(machine_id) +# ============================================================================= +# DCA Clients (LPs) — scoped per (machine, user). +# ============================================================================= + + +async def _machine_owned_by(machine_id: str, user_id: str) -> Machine: + """Lookup-with-ownership guard. 404 (not 403) so operators can't probe + for other operators' machines.""" + machine = await get_machine(machine_id) + if machine is None or machine.operator_user_id != user_id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") + return machine + + +async def _client_owned_by(client_id: str, user_id: str) -> DcaClient: + """Lookup-with-ownership guard for an LP record; ownership is checked + transitively via the client's machine. 404 if either doesn't match.""" + client = await get_dca_client(client_id) + if client is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found") + machine = await get_machine(client.machine_id) + if machine is None or machine.operator_user_id != user_id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found") + return client + + +@satmachineadmin_api_router.post( + "/api/v1/dca/clients", response_model=DcaClient +) +async def api_create_client( + data: CreateDcaClientData, user: User = Depends(check_user_exists) +) -> DcaClient: + # Operator can only register LPs on machines they own. + await _machine_owned_by(data.machine_id, user.id) + return await create_dca_client(data) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/clients", response_model=list[DcaClient] +) +async def api_list_clients( + machine_id: str | None = None, + user: User = Depends(check_user_exists), +) -> list[DcaClient]: + """List the operator's LPs. Without ?machine_id, returns all LPs across + the operator's fleet. With ?machine_id, scoped to that machine (with + ownership check).""" + if machine_id is None: + return await get_dca_clients_for_operator(user.id) + await _machine_owned_by(machine_id, user.id) + return await get_dca_clients_for_machine(machine_id) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/clients/{client_id}", response_model=DcaClient +) +async def api_get_client( + client_id: str, user: User = Depends(check_user_exists) +) -> DcaClient: + return await _client_owned_by(client_id, user.id) + + +@satmachineadmin_api_router.put( + "/api/v1/dca/clients/{client_id}", response_model=DcaClient +) +async def api_update_client( + client_id: str, + data: UpdateDcaClientData, + user: User = Depends(check_user_exists), +) -> DcaClient: + await _client_owned_by(client_id, user.id) + updated = await update_dca_client(client_id, data) + if updated is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found") + return updated + + +@satmachineadmin_api_router.delete( + "/api/v1/dca/clients/{client_id}", status_code=HTTPStatus.NO_CONTENT +) +async def api_delete_client( + client_id: str, user: User = Depends(check_user_exists) +) -> None: + await _client_owned_by(client_id, user.id) + await delete_dca_client(client_id) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/clients/{client_id}/balance", + response_model=ClientBalanceSummary, +) +async def api_get_client_balance( + client_id: str, user: User = Depends(check_user_exists) +) -> ClientBalanceSummary: + await _client_owned_by(client_id, user.id) + summary = await get_client_balance_summary(client_id) + if summary is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found") + return summary + + # ============================================================================= # Settlements (read-only at this phase; landing happens in tasks.py) # ============================================================================= From b7f6f0a6965490137d2ee4890db88867254efdeb Mon Sep 17 00:00:00 2001 From: Padreug Date: Thu, 14 May 2026 15:36:04 +0200 Subject: [PATCH 06/10] feat(v2): deposit CRUD + confirmation endpoints (P3b) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds 6 operator-scoped deposit endpoints: POST /api/v1/dca/deposits — record fiat from an LP (creator_user_id = the operator who recorded) GET /api/v1/dca/deposits — operator's deposits (all) GET /api/v1/dca/deposits?client_id=X — scoped to one LP GET /api/v1/dca/deposits/{id} — single PUT /api/v1/dca/deposits/{id} — edit (pending only) PUT /api/v1/dca/deposits/{id}/status — confirm/reject DELETE /api/v1/dca/deposits/{id} — delete (pending only) Cross-checks (client_id, machine_id) at create to prevent operators binding deposits across machines incorrectly. Edits + deletes are restricted to pending status so confirmed deposits become immutable audit records (consistent with v1's existing behaviour from commit 28241e7). Refs: aiolabs/satmachineadmin#9 Co-Authored-By: Claude Opus 4.7 (1M context) --- views_api.py | 116 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/views_api.py b/views_api.py index 62a87bc..ccab37a 100644 --- a/views_api.py +++ b/views_api.py @@ -13,13 +13,18 @@ from lnbits.decorators import check_super_user, check_user_exists from .crud import ( create_dca_client, + create_deposit, create_machine, delete_dca_client, + delete_deposit, delete_machine, get_client_balance_summary, get_dca_client, get_dca_clients_for_machine, get_dca_clients_for_operator, + get_deposit, + get_deposits_for_client, + get_deposits_for_operator, get_machine, get_machines_for_operator, get_payments_for_operator, @@ -28,19 +33,25 @@ from .crud import ( get_settlements_for_operator, get_super_config, update_dca_client, + update_deposit, + update_deposit_status, update_machine, update_super_config, ) from .models import ( ClientBalanceSummary, CreateDcaClientData, + CreateDepositData, CreateMachineData, DcaClient, + DcaDeposit, DcaPayment, DcaSettlement, Machine, SuperConfig, UpdateDcaClientData, + UpdateDepositData, + UpdateDepositStatusData, UpdateMachineData, UpdateSuperConfigData, ) @@ -211,6 +222,111 @@ async def api_get_client_balance( return summary +# ============================================================================= +# Deposits — operator records fiat handed in by an LP at a machine. +# ============================================================================= + + +async def _deposit_owned_by(deposit_id: str, user_id: str) -> DcaDeposit: + deposit = await get_deposit(deposit_id) + if deposit is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found") + machine = await get_machine(deposit.machine_id) + if machine is None or machine.operator_user_id != user_id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found") + return deposit + + +@satmachineadmin_api_router.post( + "/api/v1/dca/deposits", response_model=DcaDeposit +) +async def api_create_deposit( + data: CreateDepositData, user: User = Depends(check_user_exists) +) -> DcaDeposit: + # Verify the (client_id, machine_id) pair belongs to the operator. + client = await _client_owned_by(data.client_id, user.id) + if client.machine_id != data.machine_id: + raise HTTPException( + HTTPStatus.BAD_REQUEST, + "client_id and machine_id refer to different machines", + ) + return await create_deposit(user.id, data) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/deposits", response_model=list[DcaDeposit] +) +async def api_list_deposits( + client_id: str | None = None, + user: User = Depends(check_user_exists), +) -> list[DcaDeposit]: + """Operator's deposits across all their machines; ?client_id scopes to + a single LP (with ownership check).""" + if client_id is not None: + await _client_owned_by(client_id, user.id) + return await get_deposits_for_client(client_id) + return await get_deposits_for_operator(user.id) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/deposits/{deposit_id}", response_model=DcaDeposit +) +async def api_get_deposit( + deposit_id: str, user: User = Depends(check_user_exists) +) -> DcaDeposit: + return await _deposit_owned_by(deposit_id, user.id) + + +@satmachineadmin_api_router.put( + "/api/v1/dca/deposits/{deposit_id}", response_model=DcaDeposit +) +async def api_update_deposit( + deposit_id: str, + data: UpdateDepositData, + user: User = Depends(check_user_exists), +) -> DcaDeposit: + existing = await _deposit_owned_by(deposit_id, user.id) + if existing.status != "pending": + raise HTTPException( + HTTPStatus.BAD_REQUEST, + "Only pending deposits can be edited", + ) + updated = await update_deposit(deposit_id, data) + if updated is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found") + return updated + + +@satmachineadmin_api_router.put( + "/api/v1/dca/deposits/{deposit_id}/status", response_model=DcaDeposit +) +async def api_update_deposit_status( + deposit_id: str, + data: UpdateDepositStatusData, + user: User = Depends(check_user_exists), +) -> DcaDeposit: + await _deposit_owned_by(deposit_id, user.id) + updated = await update_deposit_status(deposit_id, data) + if updated is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found") + return updated + + +@satmachineadmin_api_router.delete( + "/api/v1/dca/deposits/{deposit_id}", status_code=HTTPStatus.NO_CONTENT +) +async def api_delete_deposit( + deposit_id: str, user: User = Depends(check_user_exists) +) -> None: + existing = await _deposit_owned_by(deposit_id, user.id) + if existing.status != "pending": + raise HTTPException( + HTTPStatus.BAD_REQUEST, + "Only pending deposits can be deleted", + ) + await delete_deposit(deposit_id) + + # ============================================================================= # Settlements (read-only at this phase; landing happens in tasks.py) # ============================================================================= From e8dcbfe26edc4fc43d1a9617f3cae33c87e5a13f Mon Sep 17 00:00:00 2001 From: Padreug Date: Thu, 14 May 2026 15:37:16 +0200 Subject: [PATCH 07/10] feat(v2): commission splits CRUD endpoints (P3c) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds 3 operator-scoped endpoints for managing the commission remainder ruleset: GET /api/v1/dca/commission-splits — operator's default ruleset GET /api/v1/dca/commission-splits?machine_id=X — per-machine override (just the override, not the default) GET /api/v1/dca/commission-splits?machine_id=X&effective=true — what the settlement processor actually applies (override if set, else operator default) PUT /api/v1/dca/commission-splits — atomic replace; model validator enforces legs sum to 1.0 DELETE /api/v1/dca/commission-splits — clear default (per-machine overrides still apply) DELETE /api/v1/dca/commission-splits?machine_id=X — clear per-machine override (falls back to default) All routes verify operator owns the referenced machine (404 not 403 if not). The DELETE path bypasses SetCommissionSplitsData's sum-to-1.0 validator by calling replace_commission_splits([]) directly, since an empty ruleset is the correct "no rules" state — distribution.py logs a warning and leaves operator_fee_sats in the machine wallet when this happens. 28 routes registered total. 72/72 tests pass. Refs: aiolabs/satmachineadmin#9 Co-Authored-By: Claude Opus 4.7 (1M context) --- views_api.py | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/views_api.py b/views_api.py index ccab37a..3fa901f 100644 --- a/views_api.py +++ b/views_api.py @@ -19,12 +19,14 @@ from .crud import ( delete_deposit, delete_machine, get_client_balance_summary, + get_commission_splits, get_dca_client, get_dca_clients_for_machine, get_dca_clients_for_operator, get_deposit, get_deposits_for_client, get_deposits_for_operator, + get_effective_commission_splits, get_machine, get_machines_for_operator, get_payments_for_operator, @@ -32,6 +34,7 @@ from .crud import ( get_settlements_for_machine, get_settlements_for_operator, get_super_config, + replace_commission_splits, update_dca_client, update_deposit, update_deposit_status, @@ -40,6 +43,7 @@ from .crud import ( ) from .models import ( ClientBalanceSummary, + CommissionSplit, CreateDcaClientData, CreateDepositData, CreateMachineData, @@ -48,6 +52,7 @@ from .models import ( DcaPayment, DcaSettlement, Machine, + SetCommissionSplitsData, SuperConfig, UpdateDcaClientData, UpdateDepositData, @@ -384,6 +389,66 @@ async def api_list_payments( return await get_payments_for_operator(user.id, leg_type=leg_type) +# ============================================================================= +# Commission splits — operator's rules for distributing the commission +# remainder (post-super-fee). Sum-to-1.0 invariant enforced at the model +# boundary by SetCommissionSplitsData. +# ============================================================================= + + +@satmachineadmin_api_router.get( + "/api/v1/dca/commission-splits", response_model=list[CommissionSplit] +) +async def api_get_commission_splits( + machine_id: str | None = None, + effective: bool = False, + user: User = Depends(check_user_exists), +) -> list[CommissionSplit]: + """No machine_id: operator's default ruleset (rows where machine_id IS NULL). + With machine_id: per-machine override only (404 the machine if not yours). + With machine_id and ?effective=true: per-machine override if set, else + operator default — what the settlement processor actually applies.""" + if machine_id is not None: + await _machine_owned_by(machine_id, user.id) + if effective: + return await get_effective_commission_splits(user.id, machine_id) + return await get_commission_splits(user.id, machine_id) + return await get_commission_splits(user.id, None) + + +@satmachineadmin_api_router.put( + "/api/v1/dca/commission-splits", response_model=list[CommissionSplit] +) +async def api_replace_commission_splits( + data: SetCommissionSplitsData, + user: User = Depends(check_user_exists), +) -> list[CommissionSplit]: + """Atomic replace for the (operator, machine) scope. If + data.machine_id is None, replaces the operator's default ruleset; + otherwise replaces the per-machine override (machine must be owned). + Sum-to-1.0 invariant enforced upstream by the Pydantic validator.""" + if data.machine_id is not None: + await _machine_owned_by(data.machine_id, user.id) + return await replace_commission_splits(user.id, data.machine_id, data.legs) + + +@satmachineadmin_api_router.delete( + "/api/v1/dca/commission-splits", + status_code=HTTPStatus.NO_CONTENT, +) +async def api_delete_commission_splits( + machine_id: str | None = None, + user: User = Depends(check_user_exists), +) -> None: + """Clear a ruleset. With machine_id: clears the per-machine override + (machine falls back to operator default). Without: clears the operator + default (any per-machine overrides keep applying).""" + if machine_id is not None: + await _machine_owned_by(machine_id, user.id) + # Atomic replace with an empty leg list — same effect as DELETE WHERE. + await replace_commission_splits(user.id, machine_id, []) + + # ============================================================================= # Super config — operators read; super (LNbits instance admin) writes. # ============================================================================= From 2883eb7b79bcf1537eaf6cb0f7db675613f4729e Mon Sep 17 00:00:00 2001 From: Padreug Date: Thu, 14 May 2026 15:46:33 +0200 Subject: [PATCH 08/10] feat(v2): partial-dispense + operator notes on settlements (P3d) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the v1 feature request satmachineadmin#3 (partial transaction processing) and adds operator-authored audit notes on settlements. Schema (m006_add_settlement_notes): ALTER TABLE dca_settlements ADD COLUMN notes TEXT The notes column is append-only (prepend with timestamp, never edit in place). Stores both system-generated audit memos (partial-dispense recompute provenance) and operator-authored free-form notes (cash- drawer reconciliation context, off-LN refund records, etc.). Partial-dispense endpoint: POST /api/v1/dca/settlements/{id}/partial-dispense body: PartialDispenseData {dispensed_fraction OR dispensed_sats, notes} Recompute path (in distribution.apply_partial_dispense_and_redistribute): 1. Refuse if any leg has status='completed' (Lightning can't claw back) 2. Resolve new_gross from dispensed_fraction or dispensed_sats 3. Linear-scale net/commission/fiat — preserves the original commission ratio exactly; only rounding may drift by 1 sat 4. Re-stage-1 split using the CURRENT super_fee_pct (super may have changed the rate since the original landed) 5. Build a memo capturing original values + reason + new values 6. Void pending/failed legs (status → 'voided') 7. Overwrite the settlement's monetary fields + prepend memo to notes 8. Reset status to 'pending' → process_settlement re-runs distribution Operator notes endpoint: POST /api/v1/dca/settlements/{id}/notes body: AppendSettlementNoteData {note} Each operator note is timestamped (UTC) and tagged with the author's user_id so the audit trail is accountable. Non-empty, max 2000 chars. 72/72 tests still pass. 30 routes total. The full-directory ruff number ballooned to ~500 because it includes legacy transaction_processor.py (orphaned, not imported anywhere) and other v1 cruft on the branch. Files I actively maintain are clean. Note: a richer queryable audit history (filter by author / time range / action type / etc.) is being tracked as a separate future-work issue. The notes-column approach here is the v1 audit story; the dedicated history table will be additive. Refs: aiolabs/satmachineadmin#9, closes #3 (in spirit, marked once verified end-to-end) Co-Authored-By: Claude Opus 4.7 (1M context) --- crud.py | 101 ++++++++++++++++++++++++++++++++++ distribution.py | 142 +++++++++++++++++++++++++++++++++++++++++++++++- migrations.py | 18 ++++++ models.py | 27 +++++++++ views_api.py | 67 +++++++++++++++++++++++ 5 files changed, 352 insertions(+), 3 deletions(-) diff --git a/crud.py b/crud.py index 614a422..be7a12e 100644 --- a/crud.py +++ b/crud.py @@ -550,6 +550,107 @@ async def mark_settlement_status( return await get_settlement(settlement_id) +async def apply_partial_dispense( + settlement_id: str, + *, + new_gross_sats: int, + new_net_sats: int, + new_commission_sats: int, + new_platform_fee_sats: int, + new_operator_fee_sats: int, + new_fiat_amount: float, + appended_note: str, +) -> Optional[DcaSettlement]: + """Overwrite the monetary fields on a settlement (partial-dispense + recompute) and prepend `appended_note` to the notes column. + + Notes are append-only: new lines go at the top (newest first) so the + settlement detail view shows the most recent adjustment first without + needing to scroll. Resets status to 'pending' so process_settlement + can re-distribute via the existing idempotent path.""" + await db.execute( + """ + UPDATE satoshimachine.dca_settlements + SET gross_sats = :gross, + net_sats = :net, + commission_sats = :commission, + platform_fee_sats = :platform, + operator_fee_sats = :operator, + fiat_amount = :fiat, + status = 'pending', + error_message = NULL, + processed_at = NULL, + notes = CASE + WHEN notes IS NULL OR notes = '' THEN :note + ELSE :note || char(10) || char(10) || notes + END + WHERE id = :id + """, + { + "id": settlement_id, + "gross": new_gross_sats, + "net": new_net_sats, + "commission": new_commission_sats, + "platform": new_platform_fee_sats, + "operator": new_operator_fee_sats, + "fiat": new_fiat_amount, + "note": appended_note, + }, + ) + return await get_settlement(settlement_id) + + +async def count_completed_legs_for_settlement(settlement_id: str) -> int: + """Used by partial-dispense to refuse adjustments after any leg has + successfully moved sats (Lightning payments can't be clawed back).""" + row = await db.fetchone( + """ + SELECT COUNT(*) AS n FROM satoshimachine.dca_payments + WHERE settlement_id = :sid AND status = 'completed' + """, + {"sid": settlement_id}, + ) + return int(row["n"]) if row else 0 + + +async def append_settlement_note( + settlement_id: str, note: str, author_user_id: str +) -> Optional[DcaSettlement]: + """Prepend an operator-authored note to settlement.notes. Each entry is + timestamped (UTC) and tagged with the author's user id so the trail + is accountable. Append-only: existing entries are never edited.""" + from datetime import timezone + + ts = datetime.now(timezone.utc).isoformat(timespec="seconds") + formatted = f"[{ts} by {author_user_id}] {note}" + await db.execute( + """ + UPDATE satoshimachine.dca_settlements + SET notes = CASE + WHEN notes IS NULL OR notes = '' THEN :note + ELSE :note || char(10) || char(10) || notes + END + WHERE id = :id + """, + {"id": settlement_id, "note": formatted}, + ) + return await get_settlement(settlement_id) + + +async def void_open_legs_for_settlement(settlement_id: str) -> None: + """Marks pending/failed legs as 'voided' before re-running distribution + on a partial-dispense recompute. Preserves the rows for audit but stops + them from being interpreted as live.""" + await db.execute( + """ + UPDATE satoshimachine.dca_payments + SET status = 'voided' + WHERE settlement_id = :sid AND status IN ('pending', 'failed') + """, + {"sid": settlement_id}, + ) + + # ============================================================================= # Commission splits — operator's remainder-distribution rules. # ============================================================================= diff --git a/distribution.py b/distribution.py index 91dc401..9e266b8 100644 --- a/distribution.py +++ b/distribution.py @@ -23,14 +23,20 @@ from __future__ import annotations -from datetime import datetime +from datetime import datetime, timezone from typing import List from lnbits.core.services import create_invoice, pay_invoice from loguru import logger -from .calculations import allocate_operator_split_legs, calculate_distribution +from .calculations import ( + allocate_operator_split_legs, + calculate_distribution, + split_two_stage_commission, +) from .crud import ( + apply_partial_dispense, + count_completed_legs_for_settlement, create_dca_payment, get_client_balance_summary, get_effective_commission_splits, @@ -40,12 +46,14 @@ from .crud import ( get_super_config, mark_settlement_status, update_payment_status, + void_open_legs_for_settlement, ) from .models import ( CreateDcaPaymentData, DcaPayment, DcaSettlement, Machine, + PartialDispenseData, SuperConfig, ) @@ -56,6 +64,134 @@ def _payment_tag(machine: Machine) -> str: return f"{PAYMENT_TAG_PREFIX}:{machine.machine_npub}" +def _resolve_partial_dispense_gross( + settlement: DcaSettlement, data: PartialDispenseData +) -> int: + if data.dispensed_sats is not None: + new_gross = int(data.dispensed_sats) + elif data.dispensed_fraction is not None: + new_gross = round(settlement.gross_sats * float(data.dispensed_fraction)) + else: + raise ValueError("provide one of dispensed_sats or dispensed_fraction") + if new_gross < 0: + raise ValueError("partial dispense cannot be negative") + if new_gross > settlement.gross_sats: + raise ValueError( + f"partial dispense ({new_gross} sats) cannot exceed the original " + f"gross ({settlement.gross_sats} sats)" + ) + return new_gross + + +def _build_partial_dispense_memo( + settlement: DcaSettlement, + data: PartialDispenseData, + *, + new_gross: int, + new_net: int, + new_commission: int, + new_platform: int, + new_operator: int, +) -> str: + reason = (data.notes or "").strip() or "(no reason given)" + if data.dispensed_sats is not None: + adjust = f"dispensed_sats={data.dispensed_sats}" + else: + adjust = f"dispensed_fraction={data.dispensed_fraction}" + ts = datetime.now(timezone.utc).isoformat(timespec="seconds") + return ( + f"[{ts}] partial dispense applied — {adjust}. " + f"Original gross={settlement.gross_sats} net={settlement.net_sats} " + f"commission={settlement.commission_sats} " + f"(super_fee={settlement.platform_fee_sats} " + f"operator_fee={settlement.operator_fee_sats}). " + f"New gross={new_gross} net={new_net} commission={new_commission} " + f"(super_fee={new_platform} operator_fee={new_operator}). " + f"Reason: {reason}" + ) + + +async def apply_partial_dispense_and_redistribute( + settlement_id: str, data: PartialDispenseData +) -> DcaSettlement: + """Operator UX action — closes satmachineadmin#3. + + When a bitSpire dispense fails mid-transaction (e.g., dispenser jam after + 6 of 10 bills), the operator confirms the actual amount dispensed and we + re-allocate the split against that partial gross. Sat amounts scale + linearly, preserving the original commission ratio exactly; the two-stage + super/operator split is recomputed using the CURRENT super_fee_pct + (super may have changed the rate since the original landed). + + Hard guard: refuses if any dca_payments leg has already completed. + Lightning payments can't be clawed back, so we won't try. + + Side effects: + - Voids pending/failed legs (status → 'voided'). + - Overwrites the settlement's monetary fields with the new totals. + - Appends a timestamped memo to settlement.notes capturing the + original values + operator's reason. + - Resets settlement.status to 'pending' and triggers process_settlement. + """ + settlement = await get_settlement(settlement_id) + if settlement is None: + raise ValueError(f"settlement {settlement_id} not found") + if settlement.gross_sats <= 0: + raise ValueError("cannot partial-dispense a zero-gross settlement") + completed = await count_completed_legs_for_settlement(settlement_id) + if completed > 0: + raise ValueError( + f"cannot partial-dispense: {completed} leg(s) already completed " + "(Lightning payments can't be clawed back)" + ) + + new_gross = _resolve_partial_dispense_gross(settlement, data) + # Linear scale preserves the original commission ratio exactly. + scale = new_gross / settlement.gross_sats + new_commission = round(settlement.commission_sats * scale) + new_net = new_gross - new_commission + new_fiat = round(float(settlement.fiat_amount) * scale, 2) + + # Re-stage-1 split using the CURRENT super_fee_pct. + super_config = await get_super_config() + super_fee_pct = float(super_config.super_fee_pct) if super_config else 0.0 + new_platform, new_operator = split_two_stage_commission( + new_commission, super_fee_pct + ) + + memo = _build_partial_dispense_memo( + settlement, + data, + new_gross=new_gross, + new_net=new_net, + new_commission=new_commission, + new_platform=new_platform, + new_operator=new_operator, + ) + + await void_open_legs_for_settlement(settlement_id) + updated = await apply_partial_dispense( + settlement_id, + new_gross_sats=new_gross, + new_net_sats=new_net, + new_commission_sats=new_commission, + new_platform_fee_sats=new_platform, + new_operator_fee_sats=new_operator, + new_fiat_amount=new_fiat, + appended_note=memo, + ) + if updated is None: + raise ValueError(f"settlement {settlement_id} disappeared mid-update") + + logger.info( + f"distribution: partial-dispense applied to settlement " + f"{settlement_id} — re-running distribution" + ) + await process_settlement(settlement_id) + after = await get_settlement(settlement_id) + return after if after is not None else updated + + async def process_settlement(settlement_id: str) -> None: """Process a pending settlement end-to-end. Safe to invoke multiple times — the status='processed' guard skips already-processed rows.""" @@ -155,7 +291,7 @@ async def _pay_operator_splits( settlement.operator_fee_sats, [float(leg.pct) for leg in splits], ) - for idx, (leg, amount) in enumerate(zip(splits, leg_amounts)): + for idx, (leg, amount) in enumerate(zip(splits, leg_amounts, strict=True)): if amount <= 0: continue label = leg.label or f"split-{idx + 1}" diff --git a/migrations.py b/migrations.py index c9588c0..8db8d0e 100644 --- a/migrations.py +++ b/migrations.py @@ -428,3 +428,21 @@ async def m005_satmachine_v2_overhaul(db): ); """ ) + + +async def m006_add_settlement_notes(db): + """Audit memo on dca_settlements. + + When an operator triggers an in-place adjustment (partial-dispense, + manual reconciliation override, etc.), the settlement row's monetary + fields are overwritten with the new numbers. To preserve the audit + trail without a separate history table, we append a timestamped memo + to this notes column capturing the previous values and the reason. + + Operators see this directly in the settlement detail view, so any + overwrite is visible and dated. Append-only convention: new memos + are prepended with a timestamp; never edited in place. + """ + await db.execute( + "ALTER TABLE satoshimachine.dca_settlements ADD COLUMN notes TEXT" + ) diff --git a/models.py b/models.py index a92cade..e5bfd46 100644 --- a/models.py +++ b/models.py @@ -224,6 +224,11 @@ class DcaSettlement(BaseModel): error_message: Optional[str] processed_at: Optional[datetime] created_at: datetime + # Append-only audit memo. Populated when an operator triggers an in-place + # adjustment (partial-dispense, manual reconciliation override). Each + # entry timestamped + records original values so the overwrite is + # auditable from the settlement detail view alone. Never edited in place. + notes: Optional[str] = None # ============================================================================= @@ -397,6 +402,28 @@ class PartialDispenseData(BaseModel): return v +class AppendSettlementNoteData(BaseModel): + """Operator-authored free-form note on a settlement. + + Notes are prepended (newest first) to the settlement's `notes` column, + with a UTC timestamp and the author's user id so each entry is + accountable. Useful for cash-drawer reconciliation context, off-the- + record refund records, or any narrative an operator wants to attach + for future reference. + """ + + note: str + + @validator("note") + def non_empty(cls, v): + v = v.strip() if isinstance(v, str) else v + if not v: + raise ValueError("note cannot be empty") + if len(v) > 2000: + raise ValueError("note too long (max 2000 chars)") + return v + + class SettleBalanceData(BaseModel): """Resolves satmachineadmin#4 — operator settles small remaining LP balance from their own wallet at the current exchange rate.""" diff --git a/views_api.py b/views_api.py index 3fa901f..e0e5b7b 100644 --- a/views_api.py +++ b/views_api.py @@ -12,6 +12,7 @@ from lnbits.core.models import User from lnbits.decorators import check_super_user, check_user_exists from .crud import ( + append_settlement_note, create_dca_client, create_deposit, create_machine, @@ -41,7 +42,9 @@ from .crud import ( update_machine, update_super_config, ) +from .distribution import apply_partial_dispense_and_redistribute from .models import ( + AppendSettlementNoteData, ClientBalanceSummary, CommissionSplit, CreateDcaClientData, @@ -52,6 +55,7 @@ from .models import ( DcaPayment, DcaSettlement, Machine, + PartialDispenseData, SetCommissionSplitsData, SuperConfig, UpdateDcaClientData, @@ -374,6 +378,69 @@ async def api_get_settlement( return settlement +@satmachineadmin_api_router.post( + "/api/v1/dca/settlements/{settlement_id}/partial-dispense", + response_model=DcaSettlement, +) +async def api_partial_dispense( + settlement_id: str, + data: PartialDispenseData, + user: User = Depends(check_user_exists), +) -> DcaSettlement: + """Operator UX — resolves satmachineadmin#3. + + Recompute the split for a settlement that didn't dispense the full + amount (jam, mid-tx error). Provide one of dispensed_fraction (0..1) + or dispensed_sats. Optionally include a reason in notes. + + Refuses when any leg has already completed — Lightning payments can't + be clawed back. Use balance settlement (P3e) for those cases. + """ + settlement = await get_settlement(settlement_id) + if settlement is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + machine = await get_machine(settlement.machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + if (data.dispensed_fraction is None) == (data.dispensed_sats is None): + raise HTTPException( + HTTPStatus.BAD_REQUEST, + "Provide exactly one of dispensed_fraction or dispensed_sats", + ) + try: + return await apply_partial_dispense_and_redistribute(settlement_id, data) + except ValueError as exc: + raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc + + +@satmachineadmin_api_router.post( + "/api/v1/dca/settlements/{settlement_id}/notes", + response_model=DcaSettlement, +) +async def api_append_settlement_note( + settlement_id: str, + data: AppendSettlementNoteData, + user: User = Depends(check_user_exists), +) -> DcaSettlement: + """Operator appends a free-form note to the settlement. Useful for cash- + drawer reconciliation context, off-LN refund records, or any narrative + an operator wants to attach. Each entry is timestamped (UTC) and tagged + with the author's user id; existing entries are never modified. + + For richer queryable audit (filter by author, time range, action type), + see aiolabs/satmachineadmin (future audit-table feature).""" + settlement = await get_settlement(settlement_id) + if settlement is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + machine = await get_machine(settlement.machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + updated = await append_settlement_note(settlement_id, data.note, user.id) + if updated is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + return updated + + # ============================================================================= # Payments (read-only — the leg-typed breakdown of distributions) # ============================================================================= From d0a947b7e6ec87fa8126f2a6b09015f4ba21b3b1 Mon Sep 17 00:00:00 2001 From: Padreug Date: Thu, 14 May 2026 17:17:41 +0200 Subject: [PATCH 09/10] feat(v2): balance settlement at current rate (P3e) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the v1 feature request satmachineadmin#4 (balance settlement for small remaining LP balances). Operator hits 'Settle' on an LP, specifies the exchange rate they're willing to honor, and the system pays out the remaining fiat balance in sats from the operator's chosen funding wallet. Avoids the Zeno's-paradox of vanishing tiny proportional shares — small balances no longer drag on forever; they get cleanly zeroed. New endpoint: POST /api/v1/dca/clients/{client_id}/settle body: SettleBalanceData {funding_wallet_id, exchange_rate, amount_fiat?, notes?} Flow (distribution.settle_lp_balance): 1. Get LP's remaining balance summary 2. amount_fiat capped at remaining (defaults to full remaining) 3. amount_sats = round(amount_fiat * exchange_rate) 4. Internal transfer funding_wallet → client.wallet via create_invoice(internal=True) + pay_invoice 5. Records leg_type='settlement' in dca_payments Two ownership checks at the API boundary: client (via machine→operator) and funding_wallet_id (via lnbits.core.crud.get_wallet → wallet.user == current operator). 400 (not 404) if funding wallet isn't owned — operators can identify their own wallets so leaking existence is fine. Updated get_client_balance_summary to count both leg_type='dca' AND leg_type='settlement' completed legs against the LP's remaining balance. Without this update, settled amounts would leave the LP's balance unchanged in the summary and re-fire on the next bitSpire tx. Exchange rate is operator-supplied and required — explicit so there's no ambiguity about what rate was used. Operator can use exchange spot, market midpoint, or a favorable rate as a gesture; the rate is recorded on the dca_payments row alongside amount_fiat for audit. 72/72 tests still pass. 31 routes total. Refs: aiolabs/satmachineadmin#9, closes #4 (in spirit, marked once verified end-to-end) Co-Authored-By: Claude Opus 4.7 (1M context) --- crud.py | 6 ++- distribution.py | 104 ++++++++++++++++++++++++++++++++++++++++++++++++ models.py | 32 ++++++++++++--- views_api.py | 40 ++++++++++++++++++- 4 files changed, 174 insertions(+), 8 deletions(-) diff --git a/crud.py b/crud.py index be7a12e..b65826c 100644 --- a/crud.py +++ b/crud.py @@ -891,11 +891,15 @@ async def get_client_balance_summary( """, {"cid": client_id}, ) + # Both DCA legs (auto, from bitSpire settlements) and balance-settle legs + # (operator-initiated under #4) reduce the LP's remaining fiat balance. payments_row = await db.fetchone( """ SELECT COALESCE(SUM(amount_fiat), 0) AS total FROM satoshimachine.dca_payments - WHERE client_id = :cid AND leg_type = 'dca' AND status = 'completed' + WHERE client_id = :cid + AND leg_type IN ('dca', 'settlement') + AND status = 'completed' """, {"cid": client_id}, ) diff --git a/distribution.py b/distribution.py index 9e266b8..624f53c 100644 --- a/distribution.py +++ b/distribution.py @@ -50,10 +50,12 @@ from .crud import ( ) from .models import ( CreateDcaPaymentData, + DcaClient, DcaPayment, DcaSettlement, Machine, PartialDispenseData, + SettleBalanceData, SuperConfig, ) @@ -111,6 +113,108 @@ def _build_partial_dispense_memo( ) +async def settle_lp_balance( + client: DcaClient, machine: Machine, data: SettleBalanceData +) -> DcaPayment: + """Operator UX action — closes satmachineadmin#4. + + Settle an LP's remaining fiat balance from the operator's chosen funding + wallet at the rate the operator specified. Records a leg_type='settlement' + row that counts against the LP's balance summary (so a subsequent + get_client_balance_summary reflects the new zero/reduced balance). + + Caller is responsible for verifying the operator owns both the client's + machine and the funding wallet (API endpoint does this). The amount_fiat + is capped at the LP's remaining balance — operators cannot accidentally + over-pay via this path. + """ + summary = await get_client_balance_summary(client.id) + if summary is None: + raise ValueError(f"client {client.id} balance not available") + remaining = float(summary.remaining_balance) + if remaining <= 0: + raise ValueError( + f"client {client.id} has no remaining balance to settle" + ) + + # Resolve fiat amount: explicit if given (capped at remaining), else full. + requested = ( + float(data.amount_fiat) if data.amount_fiat is not None else remaining + ) + amount_fiat = round(min(requested, remaining), 2) + if amount_fiat <= 0: + raise ValueError("computed settlement amount is zero") + + exchange_rate = float(data.exchange_rate) + amount_sats = round(amount_fiat * exchange_rate) + if amount_sats <= 0: + raise ValueError( + f"computed sat amount is zero (amount_fiat={amount_fiat}, " + f"exchange_rate={exchange_rate})" + ) + + reason = (data.notes or "").strip() or "(no reason given)" + memo = ( + f"satmachine balance settle — {amount_fiat:.2f} " + f"{machine.fiat_code} @ {exchange_rate:g} sat/{machine.fiat_code} " + f"= {amount_sats} sats. Reason: {reason}" + ) + + leg_row = await create_dca_payment( + CreateDcaPaymentData( + settlement_id=None, + client_id=client.id, + machine_id=machine.id, + operator_user_id=machine.operator_user_id, + leg_type="settlement", + destination_wallet_id=client.wallet_id, + destination_ln_address=None, + amount_sats=amount_sats, + amount_fiat=amount_fiat, + exchange_rate=exchange_rate, + transaction_time=datetime.now(timezone.utc), + external_payment_hash=None, + ) + ) + extra = { + "satmachine_leg": "settlement", + "satmachine_client_id": client.id, + "satmachine_machine_npub": machine.machine_npub, + "satmachine_exchange_rate": exchange_rate, + } + try: + new_invoice = await create_invoice( + wallet_id=client.wallet_id, + amount=float(amount_sats), + internal=True, + memo=memo, + extra=extra, + ) + if not new_invoice or not new_invoice.bolt11: + await update_payment_status( + leg_row.id, "failed", None, "create_invoice returned empty" + ) + raise ValueError("create_invoice returned empty") + paid = await pay_invoice( + wallet_id=data.funding_wallet_id, + payment_request=new_invoice.bolt11, + description=memo, + tag=_payment_tag(machine), + extra=extra, + ) + completed = await update_payment_status( + leg_row.id, "completed", paid.payment_hash, None + ) + return completed if completed is not None else leg_row + except Exception as exc: + logger.error( + f"distribution: balance-settle failed for client {client.id} " + f"({amount_sats} sats from wallet {data.funding_wallet_id}): {exc}" + ) + await update_payment_status(leg_row.id, "failed", None, str(exc)[:512]) + raise + + async def apply_partial_dispense_and_redistribute( settlement_id: str, data: PartialDispenseData ) -> DcaSettlement: diff --git a/models.py b/models.py index e5bfd46..f3e94e2 100644 --- a/models.py +++ b/models.py @@ -426,16 +426,36 @@ class AppendSettlementNoteData(BaseModel): class SettleBalanceData(BaseModel): """Resolves satmachineadmin#4 — operator settles small remaining LP balance - from their own wallet at the current exchange rate.""" + from their own wallet at a specified exchange rate. + + Use case: an LP has a small remaining fiat balance (e.g. 47 GTQ) that + keeps shrinking proportionally on each new transaction (Zeno's paradox). + Operator hits 'Settle', specifies the exchange rate they're willing to + honor, and the system pays out the remaining balance in sats from the + operator's wallet. The LP's balance goes to zero; settlement legs count + against the LP's balance summary alongside DCA legs. + """ - client_id: str funding_wallet_id: str - # If None, settle the full remaining balance. + # The exchange rate the operator is settling at (sats per 1 fiat unit). + # Operator picks the rate so they can use exchange spot, a market + # midpoint, or a favorable rate as a gesture. Required and explicit so + # there's no ambiguity about what rate was used. + exchange_rate: float + # If None, settle the LP's full remaining balance. Else partial. amount_fiat: Optional[float] = None notes: Optional[str] = None + @validator("exchange_rate") + def positive_rate(cls, v): + if v is None or v <= 0: + raise ValueError("exchange_rate must be > 0 (sats per fiat unit)") + return float(v) + @validator("amount_fiat") def round_amount(cls, v): - if v is not None: - return round(float(v), 2) - return v + if v is None: + return v + if v <= 0: + raise ValueError("amount_fiat must be > 0 if specified") + return round(float(v), 2) diff --git a/views_api.py b/views_api.py index e0e5b7b..7fc176f 100644 --- a/views_api.py +++ b/views_api.py @@ -8,6 +8,7 @@ from http import HTTPStatus from fastapi import APIRouter, Depends, HTTPException +from lnbits.core.crud import get_wallet from lnbits.core.models import User from lnbits.decorators import check_super_user, check_user_exists @@ -42,7 +43,10 @@ from .crud import ( update_machine, update_super_config, ) -from .distribution import apply_partial_dispense_and_redistribute +from .distribution import ( + apply_partial_dispense_and_redistribute, + settle_lp_balance, +) from .models import ( AppendSettlementNoteData, ClientBalanceSummary, @@ -57,6 +61,7 @@ from .models import ( Machine, PartialDispenseData, SetCommissionSplitsData, + SettleBalanceData, SuperConfig, UpdateDcaClientData, UpdateDepositData, @@ -231,6 +236,39 @@ async def api_get_client_balance( return summary +@satmachineadmin_api_router.post( + "/api/v1/dca/clients/{client_id}/settle", response_model=DcaPayment +) +async def api_settle_client_balance( + client_id: str, + data: SettleBalanceData, + user: User = Depends(check_user_exists), +) -> DcaPayment: + """Operator UX — closes satmachineadmin#4. + + Settle an LP's remaining fiat balance from the operator's chosen funding + wallet at the specified exchange rate. The amount_fiat is capped at the + LP's remaining balance; if omitted, settles the full remaining. + + Use case: avoid the Zeno's-paradox of vanishing tiny shares for small + remaining balances. Operator hits 'Settle' on the LP, gets to specify + the rate, and the system pays out the rest in sats from their wallet. + """ + client = await _client_owned_by(client_id, user.id) + machine = await _machine_owned_by(client.machine_id, user.id) + # Verify the operator owns the funding wallet. + funding_wallet = await get_wallet(data.funding_wallet_id) + if funding_wallet is None or funding_wallet.user != user.id: + raise HTTPException( + HTTPStatus.BAD_REQUEST, + "funding_wallet_id is not owned by the authenticated operator", + ) + try: + return await settle_lp_balance(client, machine, data) + except ValueError as exc: + raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc + + # ============================================================================= # Deposits — operator records fiat handed in by an LP at a machine. # ============================================================================= From 3ede66ff92f0d63431d1f74a4417db6f38ed4f22 Mon Sep 17 00:00:00 2001 From: Padreug Date: Thu, 14 May 2026 17:37:58 +0200 Subject: [PATCH 10/10] fix(v2)(security): wallet IDOR + settlement-processing concurrency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the HIGH-severity security finding from the v2 branch review: operator A could register a machine pointing at operator B's wallet_id (or update their machine to do so), then drain B's wallet via the settlement processor's pay_invoice call. LNbits' pay_invoice doesn't enforce caller identity at the backend layer — wallet_id is trusted as the source-of-truth for the source wallet. Two-layer defence: 1. **API layer.** New _assert_wallet_owned_by helper in views_api.py refuses any wallet_id from the request body that doesn't resolve to a wallet owned by the authenticated operator. Applied on api_create_machine and api_update_machine. Pattern lifted from the existing api_settle_client_balance which already did this for funding_wallet_id (260-265 in the original file). 2. **DB layer.** m007 adds a UNIQUE index on dca_machines.wallet_id — even if a future endpoint forgets the API check, the DB rejects two rows claiming the same wallet. CREATE UNIQUE INDEX is portable across SQLite and PostgreSQL (ALTER TABLE ADD CONSTRAINT is not on SQLite). Same commit also addresses concurrency findings H1+H2+H3 from the architectural review (race conditions on process_settlement + no retry path for errored settlements): - m007 also adds processing_claim TEXT to dca_settlements. - crud.claim_settlement_for_processing does optimistic-lock via UPDATE ... SET status='processing', processing_claim=:token WHERE id=:id AND status='pending' (portable; no UPDATE...RETURNING). Read-back compares the token; only one concurrent caller wins. - crud.reset_settlement_for_retry voids failed legs and flips 'errored' → 'pending' so process_settlement re-runs them. Completed legs are LEFT IN PLACE — we never re-pay sats that already moved. - crud.mark_settlement_status clears processing_claim on terminal states so a fresh claim attempt won't see a stale token. - distribution.process_settlement now uses the claim instead of the status-read-and-check pattern. Concurrent listener re-fires + partial-dispense recomputes can't double-pay legs. - New endpoint: POST /api/v1/dca/settlements/{id}/retry (operator-scoped) Refuses if status != 'errored' (400). Resets, then re-runs process_settlement via the claim path. DcaSettlement gains a processing_claim: Optional[str] field. Visible to operators in settlement detail; stale claims (status='processing' for many minutes) are a "processor crashed mid-flight" signal — operator can manually mark errored + retry. 32 routes registered. 72/72 tests pass. Refs: aiolabs/satmachineadmin#9 — closes the v2-branch security finding and HIGH-priority concurrency findings from the internal review. Co-Authored-By: Claude Opus 4.7 (1M context) --- crud.py | 66 ++++++++++++++++++++++++++++++++++++++++++++++++- distribution.py | 22 ++++++++++++----- migrations.py | 30 ++++++++++++++++++++++ models.py | 5 ++++ views_api.py | 53 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 169 insertions(+), 7 deletions(-) diff --git a/crud.py b/crud.py index b65826c..dbbe631 100644 --- a/crud.py +++ b/crud.py @@ -528,7 +528,9 @@ async def mark_settlement_status( status: str, error_message: Optional[str] = None, ) -> Optional[DcaSettlement]: - """Status: 'pending' | 'processed' | 'partial' | 'refunded' | 'errored'.""" + """Status: 'pending' | 'processing' | 'processed' | 'partial' | + 'refunded' | 'errored'. Clears processing_claim on terminal states so a + fresh claim attempt won't see a stale token.""" await db.execute( """ UPDATE satoshimachine.dca_settlements @@ -537,6 +539,10 @@ async def mark_settlement_status( processed_at = CASE WHEN :status IN ('processed', 'partial', 'refunded') THEN :now ELSE processed_at + END, + processing_claim = CASE + WHEN :status = 'processing' THEN processing_claim + ELSE NULL END WHERE id = :id """, @@ -550,6 +556,64 @@ async def mark_settlement_status( return await get_settlement(settlement_id) +async def claim_settlement_for_processing( + settlement_id: str, +) -> Optional[DcaSettlement]: + """Optimistic-lock claim: atomically flip a settlement to 'processing' + and tag it with a per-invocation token. Returns the claimed row on + success; None if another caller already won the claim or the settlement + is not in a claimable state ('pending'). + + Pattern is portable across SQLite + PostgreSQL (doesn't rely on + UPDATE ... RETURNING). Two concurrent invocations may both run the + UPDATE, but only one row matches the WHERE clause; the loser's UPDATE + is a no-op against status='processing'. The read-back check on the + token disambiguates.""" + token = urlsafe_short_hash() + await db.execute( + """ + UPDATE satoshimachine.dca_settlements + SET status = 'processing', processing_claim = :token + WHERE id = :id AND status = 'pending' + """, + {"id": settlement_id, "token": token}, + ) + after = await get_settlement(settlement_id) + if after is None: + return None + if after.processing_claim != token: + return None + return after + + +async def reset_settlement_for_retry( + settlement_id: str, +) -> Optional[DcaSettlement]: + """Operator retry path. Flips 'errored' → 'pending' and voids any + 'failed' legs so process_settlement re-runs them fresh. Completed legs + are left in place — we never re-pay sats that already moved.""" + await db.execute( + """ + UPDATE satoshimachine.dca_payments + SET status = 'voided' + WHERE settlement_id = :sid AND status = 'failed' + """, + {"sid": settlement_id}, + ) + await db.execute( + """ + UPDATE satoshimachine.dca_settlements + SET status = 'pending', + error_message = NULL, + processing_claim = NULL, + processed_at = NULL + WHERE id = :id AND status = 'errored' + """, + {"id": settlement_id}, + ) + return await get_settlement(settlement_id) + + async def apply_partial_dispense( settlement_id: str, *, diff --git a/distribution.py b/distribution.py index 624f53c..e45abfc 100644 --- a/distribution.py +++ b/distribution.py @@ -36,6 +36,7 @@ from .calculations import ( ) from .crud import ( apply_partial_dispense, + claim_settlement_for_processing, count_completed_legs_for_settlement, create_dca_payment, get_client_balance_summary, @@ -297,13 +298,22 @@ async def apply_partial_dispense_and_redistribute( async def process_settlement(settlement_id: str) -> None: - """Process a pending settlement end-to-end. Safe to invoke multiple - times — the status='processed' guard skips already-processed rows.""" - settlement = await get_settlement(settlement_id) + """Process a pending settlement end-to-end. + + Concurrency-safe: an optimistic-lock claim flips the settlement to + 'processing' atomically and tags it with a per-invocation token. + Concurrent invocations on the same id can't both win — losers see the + claim mismatch on read-back and return without writing any legs. + Retries land via reset_settlement_for_retry which voids failed legs + and flips 'errored' back to 'pending'.""" + settlement = await claim_settlement_for_processing(settlement_id) if settlement is None: - logger.warning(f"distribution: settlement {settlement_id} not found") - return - if settlement.status != "pending": + # Either already claimed by a concurrent invocation, or not in a + # 'pending' state. Either way, nothing to do here. + logger.debug( + f"distribution: skip {settlement_id} — not claimable (already " + "processing or not pending)" + ) return machine = await get_machine(settlement.machine_id) if machine is None: diff --git a/migrations.py b/migrations.py index 8db8d0e..fbe3c88 100644 --- a/migrations.py +++ b/migrations.py @@ -446,3 +446,33 @@ async def m006_add_settlement_notes(db): await db.execute( "ALTER TABLE satoshimachine.dca_settlements ADD COLUMN notes TEXT" ) + + +async def m007_settlement_claim_and_machine_wallet_unique(db): + """Security + concurrency hardening (fix bundle 1). + + 1. Adds `processing_claim` to dca_settlements. The settlement processor + uses an optimistic-lock pattern: write a per-invocation claim token + alongside the status='processing' flip, then re-read and confirm the + persisted token matches. Two concurrent process_settlement invocations + on the same id can't both win the claim, so no duplicate leg + creation / double-pay. + + 2. Adds a UNIQUE index on dca_machines.wallet_id so two machine rows + can never claim the same wallet. Closes a wallet-IDOR funds-theft + vector where operator A could register a machine on operator B's + wallet_id and drain it via the settlement processor's pay_invoice. + Defence-in-depth on top of the API-layer ownership check; if a future + endpoint forgets the check, the DB still rejects. + + CREATE UNIQUE INDEX is portable across SQLite and PostgreSQL + (ALTER TABLE ADD CONSTRAINT is not on SQLite). + """ + await db.execute( + "ALTER TABLE satoshimachine.dca_settlements " + "ADD COLUMN processing_claim TEXT" + ) + await db.execute( + "CREATE UNIQUE INDEX dca_machines_wallet_id_uq " + "ON satoshimachine.dca_machines (wallet_id)" + ) diff --git a/models.py b/models.py index f3e94e2..c6509f3 100644 --- a/models.py +++ b/models.py @@ -229,6 +229,11 @@ class DcaSettlement(BaseModel): # entry timestamped + records original values so the overwrite is # auditable from the settlement detail view alone. Never edited in place. notes: Optional[str] = None + # Optimistic-lock claim token written when status flips to 'processing'. + # Two concurrent process_settlement invocations can't both win the claim + # (only one matching read-back). Cleared back to NULL when the leg- + # writing pass completes (status='processed' or 'errored'). + processing_claim: Optional[str] = None # ============================================================================= diff --git a/views_api.py b/views_api.py index 7fc176f..5bd68f5 100644 --- a/views_api.py +++ b/views_api.py @@ -37,6 +37,7 @@ from .crud import ( get_settlements_for_operator, get_super_config, replace_commission_splits, + reset_settlement_for_retry, update_dca_client, update_deposit, update_deposit_status, @@ -45,6 +46,7 @@ from .crud import ( ) from .distribution import ( apply_partial_dispense_and_redistribute, + process_settlement, settle_lp_balance, ) from .models import ( @@ -73,6 +75,19 @@ from .models import ( satmachineadmin_api_router = APIRouter() +async def _assert_wallet_owned_by(wallet_id: str, user_id: str) -> None: + """Defence-in-depth: refuse to bind any DB row to a wallet the caller + doesn't own. Used on every endpoint that accepts a wallet_id from the + request body. The DB-side UNIQUE on dca_machines.wallet_id (m007) is a + second line of defence; this check is the primary gate.""" + wallet = await get_wallet(wallet_id) + if wallet is None or wallet.user != user_id: + raise HTTPException( + HTTPStatus.BAD_REQUEST, + "wallet_id is not owned by the authenticated operator", + ) + + # ============================================================================= # Machines # ============================================================================= @@ -82,6 +97,7 @@ satmachineadmin_api_router = APIRouter() async def api_create_machine( data: CreateMachineData, user: User = Depends(check_user_exists) ) -> Machine: + await _assert_wallet_owned_by(data.wallet_id, user.id) return await create_machine(user.id, data) @@ -117,6 +133,8 @@ async def api_update_machine( machine = await get_machine(machine_id) if machine is None or machine.operator_user_id != user.id: raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") + if data.wallet_id is not None: + await _assert_wallet_owned_by(data.wallet_id, user.id) updated = await update_machine(machine_id, data) if updated is None: raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") @@ -451,6 +469,41 @@ async def api_partial_dispense( raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc +@satmachineadmin_api_router.post( + "/api/v1/dca/settlements/{settlement_id}/retry", + response_model=DcaSettlement, +) +async def api_retry_settlement( + settlement_id: str, user: User = Depends(check_user_exists) +) -> DcaSettlement: + """Operator retry path for an errored settlement. + + Voids any failed legs (completed legs are NEVER re-paid — Lightning + sats already moved) and flips status 'errored' → 'pending', then + re-invokes process_settlement. The optimistic-lock claim guards + against a concurrent listener re-fire racing this retry.""" + settlement = await get_settlement(settlement_id) + if settlement is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + machine = await get_machine(settlement.machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + if settlement.status != "errored": + raise HTTPException( + HTTPStatus.BAD_REQUEST, + f"settlement status must be 'errored' to retry " + f"(currently '{settlement.status}')", + ) + updated = await reset_settlement_for_retry(settlement_id) + if updated is None or updated.status != "pending": + raise HTTPException( + HTTPStatus.INTERNAL_SERVER_ERROR, "failed to reset settlement" + ) + await process_settlement(settlement_id) + after = await get_settlement(settlement_id) + return after if after is not None else updated + + @satmachineadmin_api_router.post( "/api/v1/dca/settlements/{settlement_id}/notes", response_model=DcaSettlement,