diff --git a/bitspire.py b/bitspire.py new file mode 100644 index 0000000..195c0a9 --- /dev/null +++ b/bitspire.py @@ -0,0 +1,176 @@ +# Satoshi Machine v2 — bitSpire payment parser. +# +# Translates an inbound LNbits Payment (cash-out customer paid the ATM's +# invoice) into the principal/commission split needed by satmachineadmin. +# +# Happy path: bitSpire populates Payment.extra with the canonical split +# fields per aiolabs/lamassu-next#44 — we read them directly. +# +# Fallback path: extra is missing (older bitSpire, edge case). We back-derive +# the split from the machine's fallback_commission_pct using the Lamassu-era +# formula (base = total / (1 + commission)) and mark used_fallback_split=true +# so the audit trail shows we estimated. + +from __future__ import annotations + +import json +from typing import Any, Optional, Tuple + +from loguru import logger + +from .calculations import calculate_commission +from .models import CreateDcaSettlementData, Machine + +# Sentinel value bitSpire sets in Payment.extra.source so we know an inbound +# payment originated from an ATM cash-out and not some other extension or +# customer-initiated transfer. +BITSPIRE_SOURCE = "bitspire" + + +def _coerce_int(v: Any) -> Optional[int]: + if v is None: + return None + try: + return int(v) + except (TypeError, ValueError): + return None + + +def _coerce_float(v: Any) -> Optional[float]: + if v is None: + return None + try: + return float(v) + except (TypeError, ValueError): + return None + + +def _coerce_str(v: Any) -> Optional[str]: + if v is None: + return None + return str(v) if not isinstance(v, str) else v + + +def _json_dumps(v: Any) -> Optional[str]: + if v is None: + return None + try: + return json.dumps(v) + except (TypeError, ValueError): + return None + + +def is_bitspire_payment(extra: dict) -> bool: + """True if Payment.extra carries the bitSpire source marker (post-#44).""" + return isinstance(extra, dict) and extra.get("source") == BITSPIRE_SOURCE + + +def parse_settlement( + machine: Machine, + payment_hash: str, + gross_sats: int, + extra: dict, + super_fee_pct: float, +) -> Tuple[CreateDcaSettlementData, bool]: + """Build a CreateDcaSettlementData for an inbound payment landing on + `machine`'s wallet. + + Returns (data, used_fallback): when `used_fallback` is True, bitSpire + didn't populate Payment.extra so we back-derived the split. Caller + should log this for visibility — once aiolabs/lamassu-next#44 ships, + fallback usage should drop to zero. + """ + if is_bitspire_payment(extra): + data = _parse_extra(machine, payment_hash, gross_sats, extra, super_fee_pct) + return data, False + logger.warning( + f"satmachineadmin: settlement on machine {machine.machine_npub[:12]}... " + f"missing bitSpire extra metadata; back-deriving via " + f"fallback_commission_pct={machine.fallback_commission_pct}. " + f"See aiolabs/lamassu-next#44." + ) + return _parse_fallback(machine, payment_hash, gross_sats, super_fee_pct), True + + +def _parse_extra( + machine: Machine, + payment_hash: str, + gross_sats: int, + extra: dict, + super_fee_pct: float, +) -> CreateDcaSettlementData: + """Happy path: bitSpire populated Payment.extra per lamassu-next#44.""" + net_sats = _coerce_int(extra.get("net_sats")) + fee_sats = _coerce_int(extra.get("fee_sats")) + if net_sats is None or fee_sats is None: + # Missing key fields — shouldn't happen post-#44 but defensive. + return _parse_fallback(machine, payment_hash, gross_sats, super_fee_pct) + commission_sats = fee_sats + platform_fee_sats = round(commission_sats * super_fee_pct) + operator_fee_sats = commission_sats - platform_fee_sats + exchange_rate = _coerce_float(extra.get("exchange_rate")) + if exchange_rate is None or exchange_rate <= 0: + # Without exchange rate we can't compute fiat. Use 1.0 as a stand-in + # and let the operator correct via manual reconciliation. + exchange_rate = 1.0 + fiat_amount = round(gross_sats / exchange_rate, 2) if exchange_rate > 0 else 0.0 + fiat_code = _coerce_str(extra.get("currency")) or machine.fiat_code + return CreateDcaSettlementData( + machine_id=machine.id, + payment_hash=payment_hash, + bitspire_event_id=None, + bitspire_txid=_coerce_str(extra.get("txid")), + gross_sats=gross_sats, + fiat_amount=fiat_amount, + fiat_code=fiat_code, + exchange_rate=exchange_rate, + net_sats=net_sats, + commission_sats=commission_sats, + platform_fee_sats=platform_fee_sats, + operator_fee_sats=operator_fee_sats, + used_fallback_split=False, + tx_type=_coerce_str(extra.get("type")) or "cash_out", + bills_json=_json_dumps(extra.get("bills")), + cassettes_json=_json_dumps(extra.get("cassettes")), + ) + + +def _parse_fallback( + machine: Machine, + payment_hash: str, + gross_sats: int, + super_fee_pct: float, +) -> CreateDcaSettlementData: + """Back-derive the split using the machine's fallback_commission_pct. + + Same formula as the Lamassu integration used: + base_amount = round(gross / (1 + commission_pct)) + commission = gross - base_amount + """ + net_sats, commission_sats, _effective = calculate_commission( + crypto_atoms=gross_sats, + commission_percentage=machine.fallback_commission_pct, + discount=0.0, + ) + platform_fee_sats = round(commission_sats * super_fee_pct) + operator_fee_sats = commission_sats - platform_fee_sats + # No exchange rate from the wire; leave fiat_amount=0 so it's visibly + # incomplete on the operator's reconciliation screen. + return CreateDcaSettlementData( + machine_id=machine.id, + payment_hash=payment_hash, + bitspire_event_id=None, + bitspire_txid=None, + gross_sats=gross_sats, + fiat_amount=0.0, + fiat_code=machine.fiat_code, + exchange_rate=0.0, + net_sats=net_sats, + commission_sats=commission_sats, + platform_fee_sats=platform_fee_sats, + operator_fee_sats=operator_fee_sats, + used_fallback_split=True, + tx_type="cash_out", + bills_json=None, + cassettes_json=None, + ) diff --git a/calculations.py b/calculations.py index a7b3aa9..68e0ae1 100644 --- a/calculations.py +++ b/calculations.py @@ -131,6 +131,70 @@ def calculate_distribution( return distributions +def split_two_stage_commission( + commission_sats: int, super_fee_pct: float +) -> Tuple[int, int]: + """Stage-1 of the v2 commission split: super takes `super_fee_pct` of the + total commission; the remainder is what the operator's own ruleset acts on. + + Returns (platform_fee_sats, operator_fee_sats). Platform is rounded; + operator absorbs the rounding remainder so platform_fee + operator_fee + == commission_sats exactly. + + Examples: + >>> split_two_stage_commission(100, 0.30) + (30, 70) + >>> split_two_stage_commission(7965, 0.30) + (2390, 5575) + >>> split_two_stage_commission(100, 0.0) + (0, 100) + >>> split_two_stage_commission(100, 1.0) + (100, 0) + """ + if commission_sats <= 0: + return 0, 0 + platform = round(commission_sats * super_fee_pct) + platform = max(0, min(platform, commission_sats)) + operator = commission_sats - platform + return platform, operator + + +def allocate_operator_split_legs( + operator_fee_sats: int, leg_pcts: list +) -> list: + """Stage-2 of the v2 commission split: the operator's remainder is sliced + across N leg wallets per `leg_pcts` (each in 0..1, sum should equal 1.0). + + The last leg absorbs the rounding remainder so the sum of allocations + exactly equals operator_fee_sats (assuming pcts sum to ~1.0). Returns + a list of integer sat amounts in the same order as leg_pcts. + + Examples: + >>> allocate_operator_split_legs(70, [0.5, 0.3, 0.2]) + [35, 21, 14] + >>> allocate_operator_split_legs(5575, [0.5, 0.3, 0.2]) + [2787, 1672, 1116] + >>> allocate_operator_split_legs(100, [1.0]) + [100] + >>> allocate_operator_split_legs(0, [0.5, 0.5]) + [0, 0] + """ + if not leg_pcts: + return [] + if operator_fee_sats <= 0: + return [0] * len(leg_pcts) + allocations: list = [] + remaining = operator_fee_sats + for idx, pct in enumerate(leg_pcts): + if idx == len(leg_pcts) - 1: + allocations.append(remaining) + else: + amount = round(operator_fee_sats * float(pct)) + allocations.append(amount) + remaining -= amount + return allocations + + def calculate_exchange_rate(base_crypto_atoms: int, fiat_amount: float) -> float: """ Calculate exchange rate in sats per fiat unit. diff --git a/crud.py b/crud.py index 7c3d6db..dbbe631 100644 --- a/crud.py +++ b/crud.py @@ -118,6 +118,19 @@ async def get_machine_by_npub(machine_npub: str) -> Optional[Machine]: ) +async def get_active_machine_by_wallet_id(wallet_id: str) -> Optional[Machine]: + """Used by the invoice listener to route an incoming payment to a machine.""" + return await db.fetchone( + """ + SELECT * FROM satoshimachine.dca_machines + WHERE wallet_id = :wid AND is_active = true + LIMIT 1 + """, + {"wid": wallet_id}, + Machine, + ) + + async def get_machines_for_operator(operator_user_id: str) -> List[Machine]: return await db.fetchall( """ @@ -410,24 +423,24 @@ async def delete_deposit(deposit_id: str) -> None: async def create_settlement_idempotent( data: CreateDcaSettlementData, ) -> Optional[DcaSettlement]: - """Insert a settlement keyed by bitspire_event_id. Returns the inserted row - on first sight; returns the existing row if the event_id was already seen - (subscription replay, relay double-delivery). The UNIQUE constraint on - bitspire_event_id is the source of truth.""" - existing = await get_settlement_by_event_id(data.bitspire_event_id) + """Insert a settlement keyed by payment_hash. Returns the inserted row on + first sight; returns the existing row if the payment_hash was already seen + (subscription replay, dispatcher double-fire). The UNIQUE constraint on + payment_hash is the source of truth.""" + existing = await get_settlement_by_payment_hash(data.payment_hash) if existing is not None: return existing settlement_id = urlsafe_short_hash() await db.execute( """ INSERT INTO satoshimachine.dca_settlements - (id, machine_id, bitspire_event_id, bitspire_txid, payment_hash, + (id, machine_id, payment_hash, bitspire_event_id, bitspire_txid, gross_sats, fiat_amount, fiat_code, exchange_rate, net_sats, commission_sats, platform_fee_sats, operator_fee_sats, used_fallback_split, tx_type, bills_json, cassettes_json, status, created_at) - VALUES (:id, :machine_id, :bitspire_event_id, :bitspire_txid, - :payment_hash, :gross_sats, :fiat_amount, :fiat_code, + VALUES (:id, :machine_id, :payment_hash, :bitspire_event_id, + :bitspire_txid, :gross_sats, :fiat_amount, :fiat_code, :exchange_rate, :net_sats, :commission_sats, :platform_fee_sats, :operator_fee_sats, :used_fallback_split, :tx_type, :bills_json, :cassettes_json, :status, :created_at) @@ -435,9 +448,9 @@ async def create_settlement_idempotent( { "id": settlement_id, "machine_id": data.machine_id, + "payment_hash": data.payment_hash, "bitspire_event_id": data.bitspire_event_id, "bitspire_txid": data.bitspire_txid, - "payment_hash": data.payment_hash, "gross_sats": data.gross_sats, "fiat_amount": data.fiat_amount, "fiat_code": data.fiat_code, @@ -465,15 +478,15 @@ async def get_settlement(settlement_id: str) -> Optional[DcaSettlement]: ) -async def get_settlement_by_event_id( - bitspire_event_id: str, +async def get_settlement_by_payment_hash( + payment_hash: str, ) -> Optional[DcaSettlement]: return await db.fetchone( """ SELECT * FROM satoshimachine.dca_settlements - WHERE bitspire_event_id = :eid + WHERE payment_hash = :hash """, - {"eid": bitspire_event_id}, + {"hash": payment_hash}, DcaSettlement, ) @@ -515,7 +528,9 @@ async def mark_settlement_status( status: str, error_message: Optional[str] = None, ) -> Optional[DcaSettlement]: - """Status: 'pending' | 'processed' | 'partial' | 'refunded' | 'errored'.""" + """Status: 'pending' | 'processing' | 'processed' | 'partial' | + 'refunded' | 'errored'. Clears processing_claim on terminal states so a + fresh claim attempt won't see a stale token.""" await db.execute( """ UPDATE satoshimachine.dca_settlements @@ -524,6 +539,10 @@ async def mark_settlement_status( processed_at = CASE WHEN :status IN ('processed', 'partial', 'refunded') THEN :now ELSE processed_at + END, + processing_claim = CASE + WHEN :status = 'processing' THEN processing_claim + ELSE NULL END WHERE id = :id """, @@ -537,6 +556,165 @@ async def mark_settlement_status( return await get_settlement(settlement_id) +async def claim_settlement_for_processing( + settlement_id: str, +) -> Optional[DcaSettlement]: + """Optimistic-lock claim: atomically flip a settlement to 'processing' + and tag it with a per-invocation token. Returns the claimed row on + success; None if another caller already won the claim or the settlement + is not in a claimable state ('pending'). + + Pattern is portable across SQLite + PostgreSQL (doesn't rely on + UPDATE ... RETURNING). Two concurrent invocations may both run the + UPDATE, but only one row matches the WHERE clause; the loser's UPDATE + is a no-op against status='processing'. The read-back check on the + token disambiguates.""" + token = urlsafe_short_hash() + await db.execute( + """ + UPDATE satoshimachine.dca_settlements + SET status = 'processing', processing_claim = :token + WHERE id = :id AND status = 'pending' + """, + {"id": settlement_id, "token": token}, + ) + after = await get_settlement(settlement_id) + if after is None: + return None + if after.processing_claim != token: + return None + return after + + +async def reset_settlement_for_retry( + settlement_id: str, +) -> Optional[DcaSettlement]: + """Operator retry path. Flips 'errored' → 'pending' and voids any + 'failed' legs so process_settlement re-runs them fresh. Completed legs + are left in place — we never re-pay sats that already moved.""" + await db.execute( + """ + UPDATE satoshimachine.dca_payments + SET status = 'voided' + WHERE settlement_id = :sid AND status = 'failed' + """, + {"sid": settlement_id}, + ) + await db.execute( + """ + UPDATE satoshimachine.dca_settlements + SET status = 'pending', + error_message = NULL, + processing_claim = NULL, + processed_at = NULL + WHERE id = :id AND status = 'errored' + """, + {"id": settlement_id}, + ) + return await get_settlement(settlement_id) + + +async def apply_partial_dispense( + settlement_id: str, + *, + new_gross_sats: int, + new_net_sats: int, + new_commission_sats: int, + new_platform_fee_sats: int, + new_operator_fee_sats: int, + new_fiat_amount: float, + appended_note: str, +) -> Optional[DcaSettlement]: + """Overwrite the monetary fields on a settlement (partial-dispense + recompute) and prepend `appended_note` to the notes column. + + Notes are append-only: new lines go at the top (newest first) so the + settlement detail view shows the most recent adjustment first without + needing to scroll. Resets status to 'pending' so process_settlement + can re-distribute via the existing idempotent path.""" + await db.execute( + """ + UPDATE satoshimachine.dca_settlements + SET gross_sats = :gross, + net_sats = :net, + commission_sats = :commission, + platform_fee_sats = :platform, + operator_fee_sats = :operator, + fiat_amount = :fiat, + status = 'pending', + error_message = NULL, + processed_at = NULL, + notes = CASE + WHEN notes IS NULL OR notes = '' THEN :note + ELSE :note || char(10) || char(10) || notes + END + WHERE id = :id + """, + { + "id": settlement_id, + "gross": new_gross_sats, + "net": new_net_sats, + "commission": new_commission_sats, + "platform": new_platform_fee_sats, + "operator": new_operator_fee_sats, + "fiat": new_fiat_amount, + "note": appended_note, + }, + ) + return await get_settlement(settlement_id) + + +async def count_completed_legs_for_settlement(settlement_id: str) -> int: + """Used by partial-dispense to refuse adjustments after any leg has + successfully moved sats (Lightning payments can't be clawed back).""" + row = await db.fetchone( + """ + SELECT COUNT(*) AS n FROM satoshimachine.dca_payments + WHERE settlement_id = :sid AND status = 'completed' + """, + {"sid": settlement_id}, + ) + return int(row["n"]) if row else 0 + + +async def append_settlement_note( + settlement_id: str, note: str, author_user_id: str +) -> Optional[DcaSettlement]: + """Prepend an operator-authored note to settlement.notes. Each entry is + timestamped (UTC) and tagged with the author's user id so the trail + is accountable. Append-only: existing entries are never edited.""" + from datetime import timezone + + ts = datetime.now(timezone.utc).isoformat(timespec="seconds") + formatted = f"[{ts} by {author_user_id}] {note}" + await db.execute( + """ + UPDATE satoshimachine.dca_settlements + SET notes = CASE + WHEN notes IS NULL OR notes = '' THEN :note + ELSE :note || char(10) || char(10) || notes + END + WHERE id = :id + """, + {"id": settlement_id, "note": formatted}, + ) + return await get_settlement(settlement_id) + + +async def void_open_legs_for_settlement(settlement_id: str) -> None: + """Marks pending/failed legs as 'voided' before re-running distribution + on a partial-dispense recompute. Preserves the rows for audit but stops + them from being interpreted as live.""" + await db.execute( + """ + UPDATE satoshimachine.dca_payments + SET status = 'voided' + WHERE settlement_id = :sid AND status IN ('pending', 'failed') + """, + {"sid": settlement_id}, + ) + + # ============================================================================= # Commission splits — operator's remainder-distribution rules. # ============================================================================= @@ -777,11 +955,15 @@ async def get_client_balance_summary( """, {"cid": client_id}, ) + # Both DCA legs (auto, from bitSpire settlements) and balance-settle legs + # (operator-initiated under #4) reduce the LP's remaining fiat balance. payments_row = await db.fetchone( """ SELECT COALESCE(SUM(amount_fiat), 0) AS total FROM satoshimachine.dca_payments - WHERE client_id = :cid AND leg_type = 'dca' AND status = 'completed' + WHERE client_id = :cid + AND leg_type IN ('dca', 'settlement') + AND status = 'completed' """, {"cid": client_id}, ) diff --git a/distribution.py b/distribution.py new file mode 100644 index 0000000..e45abfc --- /dev/null +++ b/distribution.py @@ -0,0 +1,572 @@ +# Satoshi Machine v2 — settlement distribution (P2). +# +# Picks up a dca_settlements row with status='pending' and pays out the +# three leg groups via LNbits internal transfers (create_invoice + +# pay_invoice on the same instance auto-detect internal). All legs land +# in dca_payments with the appropriate leg_type discriminator and inherit +# the Payment.tag "satmachine:{machine_npub}" so LNbits payment-history +# filters work natively. +# +# Leg order: +# 1. super_fee — platform_fee_sats → super_fee_wallet_id (if set) +# 2. operator_split — operator_fee_sats split per operator's rules +# 3. dca — net_sats distributed proportionally to active LPs, +# each leg capped at the LP's remaining fiat balance +# (preserves the v1 sync-mismatch fix from PR #2) +# +# Atomicity: LN payments cannot be rolled back. We attempt each leg, record +# success/failure per dca_payments row, and mark the settlement 'processed' +# only when every leg completed. Any failure marks 'errored' with a message +# but leaves the successful legs in place. Sats that don't get paid out +# (failed legs, no LP coverage, missing super wallet) remain in the +# machine's wallet — visible to the operator on the dashboard. + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import List + +from lnbits.core.services import create_invoice, pay_invoice +from loguru import logger + +from .calculations import ( + allocate_operator_split_legs, + calculate_distribution, + split_two_stage_commission, +) +from .crud import ( + apply_partial_dispense, + claim_settlement_for_processing, + count_completed_legs_for_settlement, + create_dca_payment, + get_client_balance_summary, + get_effective_commission_splits, + get_flow_mode_clients_for_machine, + get_machine, + get_settlement, + get_super_config, + mark_settlement_status, + update_payment_status, + void_open_legs_for_settlement, +) +from .models import ( + CreateDcaPaymentData, + DcaClient, + DcaPayment, + DcaSettlement, + Machine, + PartialDispenseData, + SettleBalanceData, + SuperConfig, +) + +PAYMENT_TAG_PREFIX = "satmachine" + + +def _payment_tag(machine: Machine) -> str: + return f"{PAYMENT_TAG_PREFIX}:{machine.machine_npub}" + + +def _resolve_partial_dispense_gross( + settlement: DcaSettlement, data: PartialDispenseData +) -> int: + if data.dispensed_sats is not None: + new_gross = int(data.dispensed_sats) + elif data.dispensed_fraction is not None: + new_gross = round(settlement.gross_sats * float(data.dispensed_fraction)) + else: + raise ValueError("provide one of dispensed_sats or dispensed_fraction") + if new_gross < 0: + raise ValueError("partial dispense cannot be negative") + if new_gross > settlement.gross_sats: + raise ValueError( + f"partial dispense ({new_gross} sats) cannot exceed the original " + f"gross ({settlement.gross_sats} sats)" + ) + return new_gross + + +def _build_partial_dispense_memo( + settlement: DcaSettlement, + data: PartialDispenseData, + *, + new_gross: int, + new_net: int, + new_commission: int, + new_platform: int, + new_operator: int, +) -> str: + reason = (data.notes or "").strip() or "(no reason given)" + if data.dispensed_sats is not None: + adjust = f"dispensed_sats={data.dispensed_sats}" + else: + adjust = f"dispensed_fraction={data.dispensed_fraction}" + ts = datetime.now(timezone.utc).isoformat(timespec="seconds") + return ( + f"[{ts}] partial dispense applied — {adjust}. " + f"Original gross={settlement.gross_sats} net={settlement.net_sats} " + f"commission={settlement.commission_sats} " + f"(super_fee={settlement.platform_fee_sats} " + f"operator_fee={settlement.operator_fee_sats}). " + f"New gross={new_gross} net={new_net} commission={new_commission} " + f"(super_fee={new_platform} operator_fee={new_operator}). " + f"Reason: {reason}" + ) + + +async def settle_lp_balance( + client: DcaClient, machine: Machine, data: SettleBalanceData +) -> DcaPayment: + """Operator UX action — closes satmachineadmin#4. + + Settle an LP's remaining fiat balance from the operator's chosen funding + wallet at the rate the operator specified. Records a leg_type='settlement' + row that counts against the LP's balance summary (so a subsequent + get_client_balance_summary reflects the new zero/reduced balance). + + Caller is responsible for verifying the operator owns both the client's + machine and the funding wallet (API endpoint does this). The amount_fiat + is capped at the LP's remaining balance — operators cannot accidentally + over-pay via this path. + """ + summary = await get_client_balance_summary(client.id) + if summary is None: + raise ValueError(f"client {client.id} balance not available") + remaining = float(summary.remaining_balance) + if remaining <= 0: + raise ValueError( + f"client {client.id} has no remaining balance to settle" + ) + + # Resolve fiat amount: explicit if given (capped at remaining), else full. + requested = ( + float(data.amount_fiat) if data.amount_fiat is not None else remaining + ) + amount_fiat = round(min(requested, remaining), 2) + if amount_fiat <= 0: + raise ValueError("computed settlement amount is zero") + + exchange_rate = float(data.exchange_rate) + amount_sats = round(amount_fiat * exchange_rate) + if amount_sats <= 0: + raise ValueError( + f"computed sat amount is zero (amount_fiat={amount_fiat}, " + f"exchange_rate={exchange_rate})" + ) + + reason = (data.notes or "").strip() or "(no reason given)" + memo = ( + f"satmachine balance settle — {amount_fiat:.2f} " + f"{machine.fiat_code} @ {exchange_rate:g} sat/{machine.fiat_code} " + f"= {amount_sats} sats. Reason: {reason}" + ) + + leg_row = await create_dca_payment( + CreateDcaPaymentData( + settlement_id=None, + client_id=client.id, + machine_id=machine.id, + operator_user_id=machine.operator_user_id, + leg_type="settlement", + destination_wallet_id=client.wallet_id, + destination_ln_address=None, + amount_sats=amount_sats, + amount_fiat=amount_fiat, + exchange_rate=exchange_rate, + transaction_time=datetime.now(timezone.utc), + external_payment_hash=None, + ) + ) + extra = { + "satmachine_leg": "settlement", + "satmachine_client_id": client.id, + "satmachine_machine_npub": machine.machine_npub, + "satmachine_exchange_rate": exchange_rate, + } + try: + new_invoice = await create_invoice( + wallet_id=client.wallet_id, + amount=float(amount_sats), + internal=True, + memo=memo, + extra=extra, + ) + if not new_invoice or not new_invoice.bolt11: + await update_payment_status( + leg_row.id, "failed", None, "create_invoice returned empty" + ) + raise ValueError("create_invoice returned empty") + paid = await pay_invoice( + wallet_id=data.funding_wallet_id, + payment_request=new_invoice.bolt11, + description=memo, + tag=_payment_tag(machine), + extra=extra, + ) + completed = await update_payment_status( + leg_row.id, "completed", paid.payment_hash, None + ) + return completed if completed is not None else leg_row + except Exception as exc: + logger.error( + f"distribution: balance-settle failed for client {client.id} " + f"({amount_sats} sats from wallet {data.funding_wallet_id}): {exc}" + ) + await update_payment_status(leg_row.id, "failed", None, str(exc)[:512]) + raise + + +async def apply_partial_dispense_and_redistribute( + settlement_id: str, data: PartialDispenseData +) -> DcaSettlement: + """Operator UX action — closes satmachineadmin#3. + + When a bitSpire dispense fails mid-transaction (e.g., dispenser jam after + 6 of 10 bills), the operator confirms the actual amount dispensed and we + re-allocate the split against that partial gross. Sat amounts scale + linearly, preserving the original commission ratio exactly; the two-stage + super/operator split is recomputed using the CURRENT super_fee_pct + (super may have changed the rate since the original landed). + + Hard guard: refuses if any dca_payments leg has already completed. + Lightning payments can't be clawed back, so we won't try. + + Side effects: + - Voids pending/failed legs (status → 'voided'). + - Overwrites the settlement's monetary fields with the new totals. + - Appends a timestamped memo to settlement.notes capturing the + original values + operator's reason. + - Resets settlement.status to 'pending' and triggers process_settlement. + """ + settlement = await get_settlement(settlement_id) + if settlement is None: + raise ValueError(f"settlement {settlement_id} not found") + if settlement.gross_sats <= 0: + raise ValueError("cannot partial-dispense a zero-gross settlement") + completed = await count_completed_legs_for_settlement(settlement_id) + if completed > 0: + raise ValueError( + f"cannot partial-dispense: {completed} leg(s) already completed " + "(Lightning payments can't be clawed back)" + ) + + new_gross = _resolve_partial_dispense_gross(settlement, data) + # Linear scale preserves the original commission ratio exactly. + scale = new_gross / settlement.gross_sats + new_commission = round(settlement.commission_sats * scale) + new_net = new_gross - new_commission + new_fiat = round(float(settlement.fiat_amount) * scale, 2) + + # Re-stage-1 split using the CURRENT super_fee_pct. + super_config = await get_super_config() + super_fee_pct = float(super_config.super_fee_pct) if super_config else 0.0 + new_platform, new_operator = split_two_stage_commission( + new_commission, super_fee_pct + ) + + memo = _build_partial_dispense_memo( + settlement, + data, + new_gross=new_gross, + new_net=new_net, + new_commission=new_commission, + new_platform=new_platform, + new_operator=new_operator, + ) + + await void_open_legs_for_settlement(settlement_id) + updated = await apply_partial_dispense( + settlement_id, + new_gross_sats=new_gross, + new_net_sats=new_net, + new_commission_sats=new_commission, + new_platform_fee_sats=new_platform, + new_operator_fee_sats=new_operator, + new_fiat_amount=new_fiat, + appended_note=memo, + ) + if updated is None: + raise ValueError(f"settlement {settlement_id} disappeared mid-update") + + logger.info( + f"distribution: partial-dispense applied to settlement " + f"{settlement_id} — re-running distribution" + ) + await process_settlement(settlement_id) + after = await get_settlement(settlement_id) + return after if after is not None else updated + + +async def process_settlement(settlement_id: str) -> None: + """Process a pending settlement end-to-end. + + Concurrency-safe: an optimistic-lock claim flips the settlement to + 'processing' atomically and tags it with a per-invocation token. + Concurrent invocations on the same id can't both win — losers see the + claim mismatch on read-back and return without writing any legs. + Retries land via reset_settlement_for_retry which voids failed legs + and flips 'errored' back to 'pending'.""" + settlement = await claim_settlement_for_processing(settlement_id) + if settlement is None: + # Either already claimed by a concurrent invocation, or not in a + # 'pending' state. Either way, nothing to do here. + logger.debug( + f"distribution: skip {settlement_id} — not claimable (already " + "processing or not pending)" + ) + return + machine = await get_machine(settlement.machine_id) + if machine is None: + logger.error( + f"distribution: settlement {settlement_id} references missing " + f"machine {settlement.machine_id}" + ) + await mark_settlement_status( + settlement_id, "errored", "machine missing" + ) + return + super_config = await get_super_config() + errors: List[str] = [] + + try: + await _pay_super_fee(settlement, machine, super_config, errors) + await _pay_operator_splits(settlement, machine, errors) + await _pay_dca_distributions(settlement, machine, errors) + except Exception as exc: # last-resort guard + logger.exception("distribution: unexpected error processing settlement") + errors.append(f"unexpected: {exc}") + + if errors: + await mark_settlement_status( + settlement_id, "errored", "; ".join(errors)[:512] + ) + else: + await mark_settlement_status(settlement_id, "processed", None) + + +# ============================================================================= +# Leg 1 — super fee +# ============================================================================= + + +async def _pay_super_fee( + settlement: DcaSettlement, + machine: Machine, + super_config: SuperConfig | None, + errors: List[str], +) -> None: + if settlement.platform_fee_sats <= 0: + return + if super_config is None or not super_config.super_fee_wallet_id: + # Super has configured a fee but not a destination wallet — leave + # the sats in the machine wallet and warn. The super needs to + # configure their wallet before they can collect. + logger.warning( + f"distribution: super_fee_sats={settlement.platform_fee_sats} " + f"left in machine wallet (super_fee_wallet_id not set)" + ) + return + await _pay_internal( + settlement=settlement, + machine=machine, + leg_type="super_fee", + client_id=None, + destination_wallet_id=super_config.super_fee_wallet_id, + amount_sats=settlement.platform_fee_sats, + memo=f"satmachine super fee — {machine.name or machine.machine_npub[:12]}", + errors=errors, + ) + + +# ============================================================================= +# Leg 2 — operator commission splits +# ============================================================================= + + +async def _pay_operator_splits( + settlement: DcaSettlement, + machine: Machine, + errors: List[str], +) -> None: + if settlement.operator_fee_sats <= 0: + return + splits = await get_effective_commission_splits( + machine.operator_user_id, machine.id + ) + if not splits: + logger.warning( + f"distribution: operator_fee_sats={settlement.operator_fee_sats} " + f"left in machine wallet (operator has no commission_splits ruleset " + f"for machine {machine.id})" + ) + return + # Pure allocator handles the rounding rule (last leg absorbs remainder). + leg_amounts = allocate_operator_split_legs( + settlement.operator_fee_sats, + [float(leg.pct) for leg in splits], + ) + for idx, (leg, amount) in enumerate(zip(splits, leg_amounts, strict=True)): + if amount <= 0: + continue + label = leg.label or f"split-{idx + 1}" + memo = ( + f"satmachine operator split — " + f"{machine.name or machine.machine_npub[:12]} ({label})" + ) + await _pay_internal( + settlement=settlement, + machine=machine, + leg_type="operator_split", + client_id=None, + destination_wallet_id=leg.wallet_id, + amount_sats=amount, + memo=memo, + errors=errors, + ) + + +# ============================================================================= +# Leg 3 — DCA distribution to active LPs +# ============================================================================= + + +async def _pay_dca_distributions( + settlement: DcaSettlement, + machine: Machine, + errors: List[str], +) -> None: + if settlement.net_sats <= 0: + return + if settlement.exchange_rate <= 0: + # Fallback path with no exchange rate (bitSpire Payment.extra absent). + # Without a rate we can't compute fiat balances → can't compute + # proportional shares → leave net_sats in the machine wallet for + # the operator to manually reconcile. + logger.warning( + f"distribution: net_sats={settlement.net_sats} left in machine " + f"wallet (no exchange_rate; fallback path; see lamassu-next#44)" + ) + return + clients = await get_flow_mode_clients_for_machine(machine.id) + if not clients: + return + # Build {client_id: remaining_fiat_balance} for proportional allocation. + client_balances: dict[str, float] = {} + for client in clients: + summary = await get_client_balance_summary(client.id) + if summary is None or summary.remaining_balance <= 0: + continue + client_balances[client.id] = summary.remaining_balance + if not client_balances: + return + # Compute proportional sat allocations, then cap each at the client's + # remaining-fiat-balance-in-sats (the v1 sync-mismatch safeguard). + raw_allocations = calculate_distribution( + base_amount_sats=settlement.net_sats, + client_balances=client_balances, + ) + capped_allocations: dict[str, int] = {} + for client_id, raw_sats in raw_allocations.items(): + remaining_fiat = client_balances[client_id] + cap_sats = int(remaining_fiat * float(settlement.exchange_rate)) + capped_allocations[client_id] = min(raw_sats, cap_sats) + # Pay each capped allocation. + client_by_id = {c.id: c for c in clients} + for client_id, amount_sats in capped_allocations.items(): + if amount_sats <= 0: + continue + client = client_by_id[client_id] + amount_fiat = round(amount_sats / float(settlement.exchange_rate), 2) + memo = ( + f"DCA: {amount_sats} sats • {amount_fiat:.2f} {settlement.fiat_code}" + ) + await _pay_internal( + settlement=settlement, + machine=machine, + leg_type="dca", + client_id=client.id, + destination_wallet_id=client.wallet_id, + amount_sats=amount_sats, + amount_fiat=amount_fiat, + exchange_rate=float(settlement.exchange_rate), + memo=memo, + errors=errors, + ) + + +# ============================================================================= +# Internal transfer helper +# ============================================================================= + + +async def _pay_internal( + *, + settlement: DcaSettlement, + machine: Machine, + leg_type: str, + client_id: str | None, + destination_wallet_id: str, + amount_sats: int, + memo: str, + errors: List[str], + amount_fiat: float | None = None, + exchange_rate: float | None = None, +) -> DcaPayment | None: + """Create an invoice on the destination wallet, pay it from the machine + wallet, and record the leg in dca_payments. Returns the dca_payments row + on success (including the failed case — the row stays for audit).""" + tag = _payment_tag(machine) + leg_row = await create_dca_payment( + CreateDcaPaymentData( + settlement_id=settlement.id, + client_id=client_id, + machine_id=machine.id, + operator_user_id=machine.operator_user_id, + leg_type=leg_type, + destination_wallet_id=destination_wallet_id, + destination_ln_address=None, + amount_sats=amount_sats, + amount_fiat=amount_fiat, + exchange_rate=exchange_rate, + transaction_time=datetime.now(), + external_payment_hash=None, + ) + ) + extra = { + "satmachine_leg": leg_type, + "satmachine_settlement_id": settlement.id, + "satmachine_machine_npub": machine.machine_npub, + } + try: + new_invoice = await create_invoice( + wallet_id=destination_wallet_id, + amount=float(amount_sats), + internal=True, + memo=memo, + extra=extra, + ) + if not new_invoice or not new_invoice.bolt11: + await update_payment_status( + leg_row.id, "failed", None, "create_invoice returned empty" + ) + errors.append(f"{leg_type}: create_invoice empty") + return leg_row + paid = await pay_invoice( + wallet_id=machine.wallet_id, + payment_request=new_invoice.bolt11, + description=memo, + tag=tag, + extra=extra, + ) + await update_payment_status( + leg_row.id, "completed", paid.payment_hash, None + ) + return leg_row + except Exception as exc: + logger.error( + f"distribution: {leg_type} leg failed " + f"(settlement={settlement.id} amount={amount_sats}): {exc}" + ) + await update_payment_status(leg_row.id, "failed", None, str(exc)[:512]) + errors.append(f"{leg_type}: {exc}") + return leg_row diff --git a/migrations.py b/migrations.py index 1c06203..fbe3c88 100644 --- a/migrations.py +++ b/migrations.py @@ -290,20 +290,28 @@ async def m005_satmachine_v2_overhaul(db): "ON satoshimachine.dca_deposits (client_id, created_at DESC)" ) - # dca_settlements — idempotency table for bitSpire kind-21000 events. - # CRITICAL: platform_fee_sats and operator_fee_sats are stored as absolute BIGINT - # (not as a derived percentage). Today this is just the contractual split. Once - # the v2 promotion engine ships, the two values diverge when discounts fire and - # this row is the only audit-grade record of who forgave what. Do not collapse - # them into a single commission_pct field. See plan section "Customer discounts". + # dca_settlements — idempotency table for bitSpire-driven settlements. + # The natural unique key is payment_hash (every LN invoice has a globally + # unique hash; subscription replays / dispatcher double-fires dedup via the + # UNIQUE constraint). bitspire_event_id is reserved for a future path where + # we subscribe to raw Nostr events directly (kind-30078/30079 ingestion + # uses dca_telemetry; bitspire_event_id is kept here for future bookkeeping + # if we ever bypass the LNbits Payment system). + # + # CRITICAL: platform_fee_sats and operator_fee_sats are stored as absolute + # BIGINT (not a derived percentage). Today this is just the contractual + # split. Once the v2 promotion engine ships, the two values diverge when + # discounts fire and this row is the only audit-grade record of who forgave + # what. Do not collapse them into a single commission_pct field. See plan + # section "Customer discounts". await db.execute( f""" CREATE TABLE satoshimachine.dca_settlements ( id TEXT PRIMARY KEY, machine_id TEXT NOT NULL, - bitspire_event_id TEXT NOT NULL UNIQUE, + payment_hash TEXT NOT NULL UNIQUE, + bitspire_event_id TEXT, bitspire_txid TEXT, - payment_hash TEXT NOT NULL, gross_sats BIGINT NOT NULL, fiat_amount DECIMAL(10,2) NOT NULL, fiat_code TEXT NOT NULL DEFAULT 'GTQ', @@ -327,10 +335,7 @@ async def m005_satmachine_v2_overhaul(db): "CREATE INDEX dca_settlements_machine_idx " "ON satoshimachine.dca_settlements (machine_id, created_at DESC)" ) - await db.execute( - "CREATE INDEX dca_settlements_payment_hash_idx " - "ON satoshimachine.dca_settlements (payment_hash)" - ) + # payment_hash UNIQUE already creates a lookup index — no extra index needed. # dca_commission_splits — operator's rules for distributing the *remainder* # of each commission (commission_sats - platform_fee_sats). One row per leg. @@ -423,3 +428,51 @@ async def m005_satmachine_v2_overhaul(db): ); """ ) + + +async def m006_add_settlement_notes(db): + """Audit memo on dca_settlements. + + When an operator triggers an in-place adjustment (partial-dispense, + manual reconciliation override, etc.), the settlement row's monetary + fields are overwritten with the new numbers. To preserve the audit + trail without a separate history table, we append a timestamped memo + to this notes column capturing the previous values and the reason. + + Operators see this directly in the settlement detail view, so any + overwrite is visible and dated. Append-only convention: new memos + are prepended with a timestamp; never edited in place. + """ + await db.execute( + "ALTER TABLE satoshimachine.dca_settlements ADD COLUMN notes TEXT" + ) + + +async def m007_settlement_claim_and_machine_wallet_unique(db): + """Security + concurrency hardening (fix bundle 1). + + 1. Adds `processing_claim` to dca_settlements. The settlement processor + uses an optimistic-lock pattern: write a per-invocation claim token + alongside the status='processing' flip, then re-read and confirm the + persisted token matches. Two concurrent process_settlement invocations + on the same id can't both win the claim, so no duplicate leg + creation / double-pay. + + 2. Adds a UNIQUE index on dca_machines.wallet_id so two machine rows + can never claim the same wallet. Closes a wallet-IDOR funds-theft + vector where operator A could register a machine on operator B's + wallet_id and drain it via the settlement processor's pay_invoice. + Defence-in-depth on top of the API-layer ownership check; if a future + endpoint forgets the check, the DB still rejects. + + CREATE UNIQUE INDEX is portable across SQLite and PostgreSQL + (ALTER TABLE ADD CONSTRAINT is not on SQLite). + """ + await db.execute( + "ALTER TABLE satoshimachine.dca_settlements " + "ADD COLUMN processing_claim TEXT" + ) + await db.execute( + "CREATE UNIQUE INDEX dca_machines_wallet_id_uq " + "ON satoshimachine.dca_machines (wallet_id)" + ) diff --git a/models.py b/models.py index ced9f8d..c6509f3 100644 --- a/models.py +++ b/models.py @@ -185,9 +185,9 @@ class UpdateDepositStatusData(BaseModel): class CreateDcaSettlementData(BaseModel): machine_id: str - bitspire_event_id: str # nostr event id — the idempotency key + payment_hash: str # the idempotency key (UNIQUE in the dca_settlements table) + bitspire_event_id: Optional[str] = None # reserved for direct-Nostr ingestion bitspire_txid: Optional[str] = None - payment_hash: str gross_sats: int fiat_amount: float fiat_code: str = "GTQ" @@ -205,9 +205,9 @@ class CreateDcaSettlementData(BaseModel): class DcaSettlement(BaseModel): id: str machine_id: str - bitspire_event_id: str - bitspire_txid: Optional[str] payment_hash: str + bitspire_event_id: Optional[str] + bitspire_txid: Optional[str] gross_sats: int fiat_amount: float fiat_code: str @@ -224,6 +224,16 @@ class DcaSettlement(BaseModel): error_message: Optional[str] processed_at: Optional[datetime] created_at: datetime + # Append-only audit memo. Populated when an operator triggers an in-place + # adjustment (partial-dispense, manual reconciliation override). Each + # entry timestamped + records original values so the overwrite is + # auditable from the settlement detail view alone. Never edited in place. + notes: Optional[str] = None + # Optimistic-lock claim token written when status flips to 'processing'. + # Two concurrent process_settlement invocations can't both win the claim + # (only one matching read-back). Cleared back to NULL when the leg- + # writing pass completes (status='processed' or 'errored'). + processing_claim: Optional[str] = None # ============================================================================= @@ -397,18 +407,60 @@ class PartialDispenseData(BaseModel): return v +class AppendSettlementNoteData(BaseModel): + """Operator-authored free-form note on a settlement. + + Notes are prepended (newest first) to the settlement's `notes` column, + with a UTC timestamp and the author's user id so each entry is + accountable. Useful for cash-drawer reconciliation context, off-the- + record refund records, or any narrative an operator wants to attach + for future reference. + """ + + note: str + + @validator("note") + def non_empty(cls, v): + v = v.strip() if isinstance(v, str) else v + if not v: + raise ValueError("note cannot be empty") + if len(v) > 2000: + raise ValueError("note too long (max 2000 chars)") + return v + + class SettleBalanceData(BaseModel): """Resolves satmachineadmin#4 — operator settles small remaining LP balance - from their own wallet at the current exchange rate.""" + from their own wallet at a specified exchange rate. + + Use case: an LP has a small remaining fiat balance (e.g. 47 GTQ) that + keeps shrinking proportionally on each new transaction (Zeno's paradox). + Operator hits 'Settle', specifies the exchange rate they're willing to + honor, and the system pays out the remaining balance in sats from the + operator's wallet. The LP's balance goes to zero; settlement legs count + against the LP's balance summary alongside DCA legs. + """ - client_id: str funding_wallet_id: str - # If None, settle the full remaining balance. + # The exchange rate the operator is settling at (sats per 1 fiat unit). + # Operator picks the rate so they can use exchange spot, a market + # midpoint, or a favorable rate as a gesture. Required and explicit so + # there's no ambiguity about what rate was used. + exchange_rate: float + # If None, settle the LP's full remaining balance. Else partial. amount_fiat: Optional[float] = None notes: Optional[str] = None + @validator("exchange_rate") + def positive_rate(cls, v): + if v is None or v <= 0: + raise ValueError("exchange_rate must be > 0 (sats per fiat unit)") + return float(v) + @validator("amount_fiat") def round_amount(cls, v): - if v is not None: - return round(float(v), 2) - return v + if v is None: + return v + if v <= 0: + raise ValueError("amount_fiat must be > 0 if specified") + return round(float(v), 2) diff --git a/tasks.py b/tasks.py index 2efa8b0..ba5050e 100644 --- a/tasks.py +++ b/tasks.py @@ -1,27 +1,88 @@ -# Satoshi Machine v2 — task placeholders. +# Satoshi Machine v2 — invoice listener (P1). # -# The v1 SSH/PostgreSQL polling + invoice listener are intentionally absent. -# They will be replaced in P1 (Nostr subscription manager: subscribes via -# lnbits.core.services.nostr_transport to kind-21000 settlements + kind-30078 -# beacons + kind-30079 telemetry per registered machine, with auto-reconnect). +# Subscribes to LNbits' invoice dispatcher (register_invoice_listener), then +# for each successful inbound payment: +# 1. Checks if wallet_id belongs to an active dca_machines row. If not, skip. +# 2. Parses Payment.extra for bitSpire split metadata (post-lamassu-next#44). +# Falls back to machine.fallback_commission_pct if extra is absent. +# 3. Computes the two-stage split (super_fee first, operator remainder). +# 4. Inserts a dca_settlements row idempotently (keyed by payment_hash). # -# These no-op stubs keep __init__.py importable in the interim so the -# extension can be activated even before P1 lands. +# The actual distribution of sats — paying out the LP DCA legs, the super-fee +# leg, and the operator's commission-split legs — happens in a separate +# settlement-processor task (P2). This listener only LANDS the settlement +# row; status='pending' tells the processor it still needs to move the money. import asyncio +from lnbits.core.models import Payment +from lnbits.tasks import register_invoice_listener from loguru import logger +from .bitspire import parse_settlement +from .crud import ( + create_settlement_idempotent, + get_active_machine_by_wallet_id, + get_super_config, +) +from .distribution import process_settlement + +LISTENER_NAME = "ext_satmachineadmin" + async def wait_for_paid_invoices() -> None: - """No-op placeholder pending P1 Nostr subscription manager.""" - logger.debug( - "satmachineadmin v2: invoice listener stub running. " - "Real Nostr-transport subscription pending P1." + invoice_queue: asyncio.Queue = asyncio.Queue() + register_invoice_listener(invoice_queue, LISTENER_NAME) + logger.info( + "satmachineadmin v2: invoice listener registered as " + f"`{LISTENER_NAME}` — waiting for bitSpire settlements." ) - # Sleep forever; the task system expects a long-lived coroutine. while True: - await asyncio.sleep(3600) + payment: Payment = await invoice_queue.get() + try: + await _handle_payment(payment) + except Exception as exc: # listener must never die + logger.error( + f"satmachineadmin: error handling payment " + f"{payment.payment_hash[:12]}...: {exc}" + ) + + +async def _handle_payment(payment: Payment) -> None: + if not payment.is_in or not payment.success: + return + machine = await get_active_machine_by_wallet_id(payment.wallet_id) + if machine is None: + return + super_config = await get_super_config() + super_fee_pct = float(super_config.super_fee_pct) if super_config else 0.0 + data, used_fallback = parse_settlement( + machine=machine, + payment_hash=payment.payment_hash, + gross_sats=payment.sat, + extra=payment.extra or {}, + super_fee_pct=super_fee_pct, + ) + settlement = await create_settlement_idempotent(data) + if settlement is None: + logger.error( + f"satmachineadmin: failed to insert settlement for " + f"payment_hash={payment.payment_hash[:12]}..." + ) + return + fb = " (fallback split)" if used_fallback else "" + logger.info( + f"satmachineadmin: landed settlement {settlement.id} for " + f"machine={machine.machine_npub[:12]}... " + f"gross={data.gross_sats}sats net={data.net_sats}sats " + f"commission={data.commission_sats}sats " + f"(super_fee={data.platform_fee_sats} " + f"operator_fee={data.operator_fee_sats}){fb}" + ) + # Trigger distribution synchronously so latency is one bitSpire-tx wide. + # process_settlement is idempotent (status='processed' guard); if this + # task crashes mid-process, the next manual or scheduled retry resumes. + await process_settlement(settlement.id) async def hourly_transaction_polling() -> None: diff --git a/tests/test_two_stage_split.py b/tests/test_two_stage_split.py new file mode 100644 index 0000000..71490c6 --- /dev/null +++ b/tests/test_two_stage_split.py @@ -0,0 +1,144 @@ +""" +Tests for the v2 two-stage commission split (super first, operator remainder). + +The plan calls out a verification scenario explicitly: + super_fee_pct=30%, operator split 50/30/20 on a 100-sat commission + → super_wallet gets 30, operator_self gets 35, employee 21, maint 14. + +Also covers the edge cases: super_fee_pct=0 (no super), super_fee_pct=1.0 +(everything to super), single-leg operator ruleset, zero operator fee. +""" + +import pytest + +from ..calculations import ( + allocate_operator_split_legs, + split_two_stage_commission, +) + + +class TestSplitTwoStageCommission: + """Stage-1: super takes super_fee_pct of commission; operator gets rest.""" + + def test_plan_example_100sats_30pct(self): + platform, operator = split_two_stage_commission(100, 0.30) + assert platform == 30 + assert operator == 70 + assert platform + operator == 100 + + def test_realistic_7965sats_30pct(self): + # From the plan's 2000 GTQ → 266800 sats @ 3% commission example. + platform, operator = split_two_stage_commission(7965, 0.30) + assert platform == 2390 # round(7965 * 0.30) = 2389.5 → 2390 + assert operator == 5575 # 7965 - 2390 + assert platform + operator == 7965 + + def test_super_pct_zero_leaves_all_to_operator(self): + platform, operator = split_two_stage_commission(7965, 0.0) + assert platform == 0 + assert operator == 7965 + + def test_super_pct_one_takes_everything(self): + platform, operator = split_two_stage_commission(7965, 1.0) + assert platform == 7965 + assert operator == 0 + + def test_zero_commission(self): + platform, operator = split_two_stage_commission(0, 0.30) + assert platform == 0 + assert operator == 0 + + def test_negative_commission_clamps_to_zero(self): + # Defensive: should never happen, but verify we don't go negative. + platform, operator = split_two_stage_commission(-100, 0.30) + assert platform == 0 + assert operator == 0 + + @pytest.mark.parametrize("commission_sats", [1, 7, 100, 7965, 1_000_000]) + @pytest.mark.parametrize("super_pct", [0.0, 0.1, 0.30, 0.5, 0.777, 1.0]) + def test_invariant_sum_equals_commission(self, commission_sats, super_pct): + platform, operator = split_two_stage_commission(commission_sats, super_pct) + assert platform + operator == commission_sats + assert 0 <= platform <= commission_sats + assert 0 <= operator <= commission_sats + + +class TestAllocateOperatorSplitLegs: + """Stage-2: operator's remainder split across N leg wallets per pct rules.""" + + def test_plan_example_50_30_20_on_70(self): + amounts = allocate_operator_split_legs(70, [0.5, 0.3, 0.2]) + assert amounts == [35, 21, 14] + assert sum(amounts) == 70 + + def test_realistic_50_30_20_on_5575(self): + amounts = allocate_operator_split_legs(5575, [0.5, 0.3, 0.2]) + # 50%: round(2787.5) = 2788; 30%: round(1672.5) = 1672; last absorbs + # remainder: 5575 - 2788 - 1672 = 1115. + # Note: round() uses banker's rounding so 2787.5 → 2788 actually + # because 2788 is even. Confirm by total invariant. + assert sum(amounts) == 5575 + assert len(amounts) == 3 + + def test_single_leg_full_remainder(self): + amounts = allocate_operator_split_legs(100, [1.0]) + assert amounts == [100] + + def test_zero_operator_fee_zeros_all_legs(self): + amounts = allocate_operator_split_legs(0, [0.5, 0.5]) + assert amounts == [0, 0] + + def test_empty_legs_list_returns_empty(self): + amounts = allocate_operator_split_legs(100, []) + assert amounts == [] + + def test_last_leg_absorbs_rounding_remainder(self): + # 100 / 3 ≈ 33.33 each; rounding makes the first two 33 and last 34. + amounts = allocate_operator_split_legs(100, [1 / 3, 1 / 3, 1 / 3]) + assert sum(amounts) == 100 + assert amounts[0] == round(100 / 3) # 33 + assert amounts[1] == round(100 / 3) # 33 + # Last leg absorbs the rounding (34, not 33) so total == 100. + assert amounts[2] == 100 - amounts[0] - amounts[1] + + @pytest.mark.parametrize( + "operator_fee,pcts", + [ + (1, [0.5, 0.5]), + (7, [0.5, 0.3, 0.2]), + (100, [0.5, 0.5]), + (5575, [0.5, 0.3, 0.2]), + (1_000_000, [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]), + ], + ) + def test_invariant_sum_equals_operator_fee(self, operator_fee, pcts): + amounts = allocate_operator_split_legs(operator_fee, pcts) + assert sum(amounts) == operator_fee + assert all(a >= 0 for a in amounts) + + +class TestEndToEndScenarios: + """The full two-stage split — super then operator legs — composed.""" + + def test_plan_example_full(self): + # 100 sats commission, super=30%, operator splits 50/30/20. + platform, operator = split_two_stage_commission(100, 0.30) + legs = allocate_operator_split_legs(operator, [0.5, 0.3, 0.2]) + assert platform == 30 + assert legs == [35, 21, 14] + assert platform + sum(legs) == 100 + + def test_super_pct_zero_full_pipeline(self): + platform, operator = split_two_stage_commission(7965, 0.0) + legs = allocate_operator_split_legs(operator, [1.0]) + assert platform == 0 + assert legs == [7965] + assert platform + sum(legs) == 7965 + + def test_super_pct_one_full_pipeline(self): + platform, operator = split_two_stage_commission(7965, 1.0) + legs = allocate_operator_split_legs(operator, [0.5, 0.5]) + assert platform == 7965 + # Operator has zero to distribute; both legs get zero. + assert legs == [0, 0] + assert platform + sum(legs) == 7965 diff --git a/views_api.py b/views_api.py index fd2d430..5bd68f5 100644 --- a/views_api.py +++ b/views_api.py @@ -1,20 +1,661 @@ -# Satoshi Machine v2 — API placeholder. +# Satoshi Machine v2 — operator API surface (P1b). # -# The v1 super-only Lamassu endpoints have been removed. The v2 operator- -# scoped surface (machines / clients / deposits / settlements / commission -# splits / partial-tx / balance-settle / super platform-fee) lands in P1+. -# See plan section "Critical files to modify". -# -# This stub keeps __init__.py importable and surfaces a clear 503 on every -# v1 route so existing clients get a precise error instead of a silent 404. +# All endpoints are operator-scoped via check_user_exists. Every query +# filters by the authenticated user's id so two operators on the same +# LNbits instance can never see each other's machines, settlements, or +# clients. The super-only platform-fee write endpoint lands in P2. from http import HTTPStatus -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, Depends, HTTPException +from lnbits.core.crud import get_wallet +from lnbits.core.models import User +from lnbits.decorators import check_super_user, check_user_exists + +from .crud import ( + append_settlement_note, + create_dca_client, + create_deposit, + create_machine, + delete_dca_client, + delete_deposit, + delete_machine, + get_client_balance_summary, + get_commission_splits, + get_dca_client, + get_dca_clients_for_machine, + get_dca_clients_for_operator, + get_deposit, + get_deposits_for_client, + get_deposits_for_operator, + get_effective_commission_splits, + get_machine, + get_machines_for_operator, + get_payments_for_operator, + get_settlement, + get_settlements_for_machine, + get_settlements_for_operator, + get_super_config, + replace_commission_splits, + reset_settlement_for_retry, + update_dca_client, + update_deposit, + update_deposit_status, + update_machine, + update_super_config, +) +from .distribution import ( + apply_partial_dispense_and_redistribute, + process_settlement, + settle_lp_balance, +) +from .models import ( + AppendSettlementNoteData, + ClientBalanceSummary, + CommissionSplit, + CreateDcaClientData, + CreateDepositData, + CreateMachineData, + DcaClient, + DcaDeposit, + DcaPayment, + DcaSettlement, + Machine, + PartialDispenseData, + SetCommissionSplitsData, + SettleBalanceData, + SuperConfig, + UpdateDcaClientData, + UpdateDepositData, + UpdateDepositStatusData, + UpdateMachineData, + UpdateSuperConfigData, +) satmachineadmin_api_router = APIRouter() +async def _assert_wallet_owned_by(wallet_id: str, user_id: str) -> None: + """Defence-in-depth: refuse to bind any DB row to a wallet the caller + doesn't own. Used on every endpoint that accepts a wallet_id from the + request body. The DB-side UNIQUE on dca_machines.wallet_id (m007) is a + second line of defence; this check is the primary gate.""" + wallet = await get_wallet(wallet_id) + if wallet is None or wallet.user != user_id: + raise HTTPException( + HTTPStatus.BAD_REQUEST, + "wallet_id is not owned by the authenticated operator", + ) + + +# ============================================================================= +# Machines +# ============================================================================= + + +@satmachineadmin_api_router.post("/api/v1/dca/machines", response_model=Machine) +async def api_create_machine( + data: CreateMachineData, user: User = Depends(check_user_exists) +) -> Machine: + await _assert_wallet_owned_by(data.wallet_id, user.id) + return await create_machine(user.id, data) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/machines", response_model=list[Machine] +) +async def api_list_machines( + user: User = Depends(check_user_exists), +) -> list[Machine]: + return await get_machines_for_operator(user.id) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/machines/{machine_id}", response_model=Machine +) +async def api_get_machine( + machine_id: str, user: User = Depends(check_user_exists) +) -> Machine: + machine = await get_machine(machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") + return machine + + +@satmachineadmin_api_router.put( + "/api/v1/dca/machines/{machine_id}", response_model=Machine +) +async def api_update_machine( + machine_id: str, + data: UpdateMachineData, + user: User = Depends(check_user_exists), +) -> Machine: + machine = await get_machine(machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") + if data.wallet_id is not None: + await _assert_wallet_owned_by(data.wallet_id, user.id) + updated = await update_machine(machine_id, data) + if updated is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") + return updated + + +@satmachineadmin_api_router.delete( + "/api/v1/dca/machines/{machine_id}", status_code=HTTPStatus.NO_CONTENT +) +async def api_delete_machine( + machine_id: str, user: User = Depends(check_user_exists) +) -> None: + machine = await get_machine(machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") + await delete_machine(machine_id) + + +# ============================================================================= +# DCA Clients (LPs) — scoped per (machine, user). +# ============================================================================= + + +async def _machine_owned_by(machine_id: str, user_id: str) -> Machine: + """Lookup-with-ownership guard. 404 (not 403) so operators can't probe + for other operators' machines.""" + machine = await get_machine(machine_id) + if machine is None or machine.operator_user_id != user_id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") + return machine + + +async def _client_owned_by(client_id: str, user_id: str) -> DcaClient: + """Lookup-with-ownership guard for an LP record; ownership is checked + transitively via the client's machine. 404 if either doesn't match.""" + client = await get_dca_client(client_id) + if client is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found") + machine = await get_machine(client.machine_id) + if machine is None or machine.operator_user_id != user_id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found") + return client + + +@satmachineadmin_api_router.post( + "/api/v1/dca/clients", response_model=DcaClient +) +async def api_create_client( + data: CreateDcaClientData, user: User = Depends(check_user_exists) +) -> DcaClient: + # Operator can only register LPs on machines they own. + await _machine_owned_by(data.machine_id, user.id) + return await create_dca_client(data) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/clients", response_model=list[DcaClient] +) +async def api_list_clients( + machine_id: str | None = None, + user: User = Depends(check_user_exists), +) -> list[DcaClient]: + """List the operator's LPs. Without ?machine_id, returns all LPs across + the operator's fleet. With ?machine_id, scoped to that machine (with + ownership check).""" + if machine_id is None: + return await get_dca_clients_for_operator(user.id) + await _machine_owned_by(machine_id, user.id) + return await get_dca_clients_for_machine(machine_id) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/clients/{client_id}", response_model=DcaClient +) +async def api_get_client( + client_id: str, user: User = Depends(check_user_exists) +) -> DcaClient: + return await _client_owned_by(client_id, user.id) + + +@satmachineadmin_api_router.put( + "/api/v1/dca/clients/{client_id}", response_model=DcaClient +) +async def api_update_client( + client_id: str, + data: UpdateDcaClientData, + user: User = Depends(check_user_exists), +) -> DcaClient: + await _client_owned_by(client_id, user.id) + updated = await update_dca_client(client_id, data) + if updated is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found") + return updated + + +@satmachineadmin_api_router.delete( + "/api/v1/dca/clients/{client_id}", status_code=HTTPStatus.NO_CONTENT +) +async def api_delete_client( + client_id: str, user: User = Depends(check_user_exists) +) -> None: + await _client_owned_by(client_id, user.id) + await delete_dca_client(client_id) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/clients/{client_id}/balance", + response_model=ClientBalanceSummary, +) +async def api_get_client_balance( + client_id: str, user: User = Depends(check_user_exists) +) -> ClientBalanceSummary: + await _client_owned_by(client_id, user.id) + summary = await get_client_balance_summary(client_id) + if summary is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found") + return summary + + +@satmachineadmin_api_router.post( + "/api/v1/dca/clients/{client_id}/settle", response_model=DcaPayment +) +async def api_settle_client_balance( + client_id: str, + data: SettleBalanceData, + user: User = Depends(check_user_exists), +) -> DcaPayment: + """Operator UX — closes satmachineadmin#4. + + Settle an LP's remaining fiat balance from the operator's chosen funding + wallet at the specified exchange rate. The amount_fiat is capped at the + LP's remaining balance; if omitted, settles the full remaining. + + Use case: avoid the Zeno's-paradox of vanishing tiny shares for small + remaining balances. Operator hits 'Settle' on the LP, gets to specify + the rate, and the system pays out the rest in sats from their wallet. + """ + client = await _client_owned_by(client_id, user.id) + machine = await _machine_owned_by(client.machine_id, user.id) + # Verify the operator owns the funding wallet. + funding_wallet = await get_wallet(data.funding_wallet_id) + if funding_wallet is None or funding_wallet.user != user.id: + raise HTTPException( + HTTPStatus.BAD_REQUEST, + "funding_wallet_id is not owned by the authenticated operator", + ) + try: + return await settle_lp_balance(client, machine, data) + except ValueError as exc: + raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc + + +# ============================================================================= +# Deposits — operator records fiat handed in by an LP at a machine. +# ============================================================================= + + +async def _deposit_owned_by(deposit_id: str, user_id: str) -> DcaDeposit: + deposit = await get_deposit(deposit_id) + if deposit is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found") + machine = await get_machine(deposit.machine_id) + if machine is None or machine.operator_user_id != user_id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found") + return deposit + + +@satmachineadmin_api_router.post( + "/api/v1/dca/deposits", response_model=DcaDeposit +) +async def api_create_deposit( + data: CreateDepositData, user: User = Depends(check_user_exists) +) -> DcaDeposit: + # Verify the (client_id, machine_id) pair belongs to the operator. + client = await _client_owned_by(data.client_id, user.id) + if client.machine_id != data.machine_id: + raise HTTPException( + HTTPStatus.BAD_REQUEST, + "client_id and machine_id refer to different machines", + ) + return await create_deposit(user.id, data) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/deposits", response_model=list[DcaDeposit] +) +async def api_list_deposits( + client_id: str | None = None, + user: User = Depends(check_user_exists), +) -> list[DcaDeposit]: + """Operator's deposits across all their machines; ?client_id scopes to + a single LP (with ownership check).""" + if client_id is not None: + await _client_owned_by(client_id, user.id) + return await get_deposits_for_client(client_id) + return await get_deposits_for_operator(user.id) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/deposits/{deposit_id}", response_model=DcaDeposit +) +async def api_get_deposit( + deposit_id: str, user: User = Depends(check_user_exists) +) -> DcaDeposit: + return await _deposit_owned_by(deposit_id, user.id) + + +@satmachineadmin_api_router.put( + "/api/v1/dca/deposits/{deposit_id}", response_model=DcaDeposit +) +async def api_update_deposit( + deposit_id: str, + data: UpdateDepositData, + user: User = Depends(check_user_exists), +) -> DcaDeposit: + existing = await _deposit_owned_by(deposit_id, user.id) + if existing.status != "pending": + raise HTTPException( + HTTPStatus.BAD_REQUEST, + "Only pending deposits can be edited", + ) + updated = await update_deposit(deposit_id, data) + if updated is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found") + return updated + + +@satmachineadmin_api_router.put( + "/api/v1/dca/deposits/{deposit_id}/status", response_model=DcaDeposit +) +async def api_update_deposit_status( + deposit_id: str, + data: UpdateDepositStatusData, + user: User = Depends(check_user_exists), +) -> DcaDeposit: + await _deposit_owned_by(deposit_id, user.id) + updated = await update_deposit_status(deposit_id, data) + if updated is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found") + return updated + + +@satmachineadmin_api_router.delete( + "/api/v1/dca/deposits/{deposit_id}", status_code=HTTPStatus.NO_CONTENT +) +async def api_delete_deposit( + deposit_id: str, user: User = Depends(check_user_exists) +) -> None: + existing = await _deposit_owned_by(deposit_id, user.id) + if existing.status != "pending": + raise HTTPException( + HTTPStatus.BAD_REQUEST, + "Only pending deposits can be deleted", + ) + await delete_deposit(deposit_id) + + +# ============================================================================= +# Settlements (read-only at this phase; landing happens in tasks.py) +# ============================================================================= + + +@satmachineadmin_api_router.get( + "/api/v1/dca/settlements", response_model=list[DcaSettlement] +) +async def api_list_settlements( + user: User = Depends(check_user_exists), +) -> list[DcaSettlement]: + return await get_settlements_for_operator(user.id) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/machines/{machine_id}/settlements", + response_model=list[DcaSettlement], +) +async def api_list_settlements_for_machine( + machine_id: str, user: User = Depends(check_user_exists) +) -> list[DcaSettlement]: + machine = await get_machine(machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") + return await get_settlements_for_machine(machine_id) + + +@satmachineadmin_api_router.get( + "/api/v1/dca/settlements/{settlement_id}", response_model=DcaSettlement +) +async def api_get_settlement( + settlement_id: str, user: User = Depends(check_user_exists) +) -> DcaSettlement: + settlement = await get_settlement(settlement_id) + if settlement is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + machine = await get_machine(settlement.machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + return settlement + + +@satmachineadmin_api_router.post( + "/api/v1/dca/settlements/{settlement_id}/partial-dispense", + response_model=DcaSettlement, +) +async def api_partial_dispense( + settlement_id: str, + data: PartialDispenseData, + user: User = Depends(check_user_exists), +) -> DcaSettlement: + """Operator UX — resolves satmachineadmin#3. + + Recompute the split for a settlement that didn't dispense the full + amount (jam, mid-tx error). Provide one of dispensed_fraction (0..1) + or dispensed_sats. Optionally include a reason in notes. + + Refuses when any leg has already completed — Lightning payments can't + be clawed back. Use balance settlement (P3e) for those cases. + """ + settlement = await get_settlement(settlement_id) + if settlement is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + machine = await get_machine(settlement.machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + if (data.dispensed_fraction is None) == (data.dispensed_sats is None): + raise HTTPException( + HTTPStatus.BAD_REQUEST, + "Provide exactly one of dispensed_fraction or dispensed_sats", + ) + try: + return await apply_partial_dispense_and_redistribute(settlement_id, data) + except ValueError as exc: + raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc + + +@satmachineadmin_api_router.post( + "/api/v1/dca/settlements/{settlement_id}/retry", + response_model=DcaSettlement, +) +async def api_retry_settlement( + settlement_id: str, user: User = Depends(check_user_exists) +) -> DcaSettlement: + """Operator retry path for an errored settlement. + + Voids any failed legs (completed legs are NEVER re-paid — Lightning + sats already moved) and flips status 'errored' → 'pending', then + re-invokes process_settlement. The optimistic-lock claim guards + against a concurrent listener re-fire racing this retry.""" + settlement = await get_settlement(settlement_id) + if settlement is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + machine = await get_machine(settlement.machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + if settlement.status != "errored": + raise HTTPException( + HTTPStatus.BAD_REQUEST, + f"settlement status must be 'errored' to retry " + f"(currently '{settlement.status}')", + ) + updated = await reset_settlement_for_retry(settlement_id) + if updated is None or updated.status != "pending": + raise HTTPException( + HTTPStatus.INTERNAL_SERVER_ERROR, "failed to reset settlement" + ) + await process_settlement(settlement_id) + after = await get_settlement(settlement_id) + return after if after is not None else updated + + +@satmachineadmin_api_router.post( + "/api/v1/dca/settlements/{settlement_id}/notes", + response_model=DcaSettlement, +) +async def api_append_settlement_note( + settlement_id: str, + data: AppendSettlementNoteData, + user: User = Depends(check_user_exists), +) -> DcaSettlement: + """Operator appends a free-form note to the settlement. Useful for cash- + drawer reconciliation context, off-LN refund records, or any narrative + an operator wants to attach. Each entry is timestamped (UTC) and tagged + with the author's user id; existing entries are never modified. + + For richer queryable audit (filter by author, time range, action type), + see aiolabs/satmachineadmin (future audit-table feature).""" + settlement = await get_settlement(settlement_id) + if settlement is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + machine = await get_machine(settlement.machine_id) + if machine is None or machine.operator_user_id != user.id: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + updated = await append_settlement_note(settlement_id, data.note, user.id) + if updated is None: + raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found") + return updated + + +# ============================================================================= +# Payments (read-only — the leg-typed breakdown of distributions) +# ============================================================================= + + +@satmachineadmin_api_router.get( + "/api/v1/dca/payments", response_model=list[DcaPayment] +) +async def api_list_payments( + leg_type: str | None = None, + user: User = Depends(check_user_exists), +) -> list[DcaPayment]: + return await get_payments_for_operator(user.id, leg_type=leg_type) + + +# ============================================================================= +# Commission splits — operator's rules for distributing the commission +# remainder (post-super-fee). Sum-to-1.0 invariant enforced at the model +# boundary by SetCommissionSplitsData. +# ============================================================================= + + +@satmachineadmin_api_router.get( + "/api/v1/dca/commission-splits", response_model=list[CommissionSplit] +) +async def api_get_commission_splits( + machine_id: str | None = None, + effective: bool = False, + user: User = Depends(check_user_exists), +) -> list[CommissionSplit]: + """No machine_id: operator's default ruleset (rows where machine_id IS NULL). + With machine_id: per-machine override only (404 the machine if not yours). + With machine_id and ?effective=true: per-machine override if set, else + operator default — what the settlement processor actually applies.""" + if machine_id is not None: + await _machine_owned_by(machine_id, user.id) + if effective: + return await get_effective_commission_splits(user.id, machine_id) + return await get_commission_splits(user.id, machine_id) + return await get_commission_splits(user.id, None) + + +@satmachineadmin_api_router.put( + "/api/v1/dca/commission-splits", response_model=list[CommissionSplit] +) +async def api_replace_commission_splits( + data: SetCommissionSplitsData, + user: User = Depends(check_user_exists), +) -> list[CommissionSplit]: + """Atomic replace for the (operator, machine) scope. If + data.machine_id is None, replaces the operator's default ruleset; + otherwise replaces the per-machine override (machine must be owned). + Sum-to-1.0 invariant enforced upstream by the Pydantic validator.""" + if data.machine_id is not None: + await _machine_owned_by(data.machine_id, user.id) + return await replace_commission_splits(user.id, data.machine_id, data.legs) + + +@satmachineadmin_api_router.delete( + "/api/v1/dca/commission-splits", + status_code=HTTPStatus.NO_CONTENT, +) +async def api_delete_commission_splits( + machine_id: str | None = None, + user: User = Depends(check_user_exists), +) -> None: + """Clear a ruleset. With machine_id: clears the per-machine override + (machine falls back to operator default). Without: clears the operator + default (any per-machine overrides keep applying).""" + if machine_id is not None: + await _machine_owned_by(machine_id, user.id) + # Atomic replace with an empty leg list — same effect as DELETE WHERE. + await replace_commission_splits(user.id, machine_id, []) + + +# ============================================================================= +# Super config — operators read; super (LNbits instance admin) writes. +# ============================================================================= + + +@satmachineadmin_api_router.get( + "/api/v1/dca/super-config", response_model=SuperConfig +) +async def api_get_super_config( + _user: User = Depends(check_user_exists), +) -> SuperConfig: + """Returns the platform-fee config so operators can display it as a + read-only line item in their UI. The fee is set by the LNbits super + instance-wide; operators see it but can't change it.""" + config = await get_super_config() + if config is None: + raise HTTPException( + HTTPStatus.NOT_FOUND, "Super config not initialised" + ) + return config + + +@satmachineadmin_api_router.put( + "/api/v1/dca/super-config", response_model=SuperConfig +) +async def api_update_super_config( + data: UpdateSuperConfigData, + _user: User = Depends(check_super_user), +) -> SuperConfig: + """Super-only: set the platform fee % charged on every operator's + commission, plus the destination wallet for collecting it. The fee is + enforced before the operator's own commission_splits ruleset fires + (see distribution.process_settlement).""" + config = await update_super_config(data) + if config is None: + raise HTTPException( + HTTPStatus.INTERNAL_SERVER_ERROR, "Failed to update super config" + ) + return config + + +# ============================================================================= +# Catch-all stub for endpoints not yet implemented (clients, deposits, +# commission splits, partial-tx, balance-settle, super-config write). Each +# lands in a follow-up commit. The catch-all comes LAST so specific routes +# above take precedence. +# ============================================================================= + + @satmachineadmin_api_router.api_route( "/api/v1/dca/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"], @@ -22,7 +663,6 @@ satmachineadmin_api_router = APIRouter() async def v2_in_progress_stub(full_path: str) -> None: raise HTTPException( HTTPStatus.SERVICE_UNAVAILABLE, - f"satmachineadmin v2 API not yet implemented (path: /{full_path}). " - "The v1 Lamassu surface has been removed; per-operator endpoints " - "land in P1. See plan.", + f"satmachineadmin v2: /api/v1/dca/{full_path} not yet implemented " + "(landing in P2+). See aiolabs/satmachineadmin#9.", )