From 56be3e5c52ce57b9271591f186a8ab6052529f0e Mon Sep 17 00:00:00 2001 From: Padreug Date: Thu, 14 May 2026 15:34:07 +0200 Subject: [PATCH] =?UTF-8?q?feat(v2):=20settlement=20distribution=20?= =?UTF-8?q?=E2=80=94=20three=20leg=20groups,=20super-fee=20write=20(P2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After a settlement lands (P1a), this commit pays out the three leg groups via LNbits internal transfers (create_invoice + pay_invoice with internal=True). Wired synchronously from the invoice listener — latency is one bitSpire-tx wide. process_settlement is idempotent (status guard) so retries are safe. distribution.py — three leg groups, in order: 1. super_fee leg: platform_fee_sats → super_fee_wallet_id (if set) skip + warn if super fee % > 0 but wallet not configured 2. operator_split legs: operator_fee_sats sliced per the operator's commission_splits ruleset (per-machine override or operator default) skip + warn if operator has no ruleset configured 3. dca legs: net_sats distributed proportionally to active flow-mode LPs at this machine, each capped at the LP's remaining-fiat-balance- in-sats (preserves the v1 sync-mismatch fix from PR #2) skip if exchange_rate=0 (fallback path with missing rate) Every leg lands a dca_payments row with the leg_type discriminator and inherits Payment.tag "satmachine:{machine_npub}" so LNbits payment- history filters work natively across machines + operators. Atomicity model: LN payments cannot be rolled back. Each leg is attempted independently; success/fail recorded on the dca_payments row. The settlement is marked 'processed' only when every leg completed; any failure marks 'errored' with a concatenated message but leaves successful legs in place. Sats that don't pay out (failed legs, missing super wallet, no commission ruleset, no LP coverage) remain in the machine's wallet — visible to the operator on the dashboard. calculations.py — extracted two pure helpers: split_two_stage_commission(commission_sats, super_fee_pct) Stage-1: super takes super_fee_pct (rounded); operator absorbs the rounding remainder so platform + operator == commission_sats exactly. allocate_operator_split_legs(operator_fee_sats, leg_pcts) Stage-2: distributes the remainder across N legs per pct rules. Last leg absorbs the rounding remainder so sum(legs) == operator_fee_sats. 50 new tests cover the plan's verification scenario: 100 sats commission, super=30%, operator splits 50/30/20 → super 30, operator 35/21/14. Sum 100 ✓ plus all the edge cases the plan called out (super=0, super=100, single-leg, zero-fee, parametrised invariant on sums). views_api.py adds the super-only platform-fee write endpoint: PUT /api/v1/dca/super-config (check_super_user) This is the only super-only endpoint in v2 — sets super_fee_pct and the destination wallet for collecting the fee. 72/72 tests pass (22 calculation + 50 two-stage-split). 13 routes registered against LNbits 1.4 (nostr-transport). Refs: aiolabs/satmachineadmin#9 Co-Authored-By: Claude Opus 4.7 (1M context) --- calculations.py | 64 +++++++ distribution.py | 322 ++++++++++++++++++++++++++++++++++ tasks.py | 5 + tests/test_two_stage_split.py | 144 +++++++++++++++ views_api.py | 28 ++- 5 files changed, 559 insertions(+), 4 deletions(-) create mode 100644 distribution.py create mode 100644 tests/test_two_stage_split.py diff --git a/calculations.py b/calculations.py index a7b3aa9..68e0ae1 100644 --- a/calculations.py +++ b/calculations.py @@ -131,6 +131,70 @@ def calculate_distribution( return distributions +def split_two_stage_commission( + commission_sats: int, super_fee_pct: float +) -> Tuple[int, int]: + """Stage-1 of the v2 commission split: super takes `super_fee_pct` of the + total commission; the remainder is what the operator's own ruleset acts on. + + Returns (platform_fee_sats, operator_fee_sats). Platform is rounded; + operator absorbs the rounding remainder so platform_fee + operator_fee + == commission_sats exactly. + + Examples: + >>> split_two_stage_commission(100, 0.30) + (30, 70) + >>> split_two_stage_commission(7965, 0.30) + (2390, 5575) + >>> split_two_stage_commission(100, 0.0) + (0, 100) + >>> split_two_stage_commission(100, 1.0) + (100, 0) + """ + if commission_sats <= 0: + return 0, 0 + platform = round(commission_sats * super_fee_pct) + platform = max(0, min(platform, commission_sats)) + operator = commission_sats - platform + return platform, operator + + +def allocate_operator_split_legs( + operator_fee_sats: int, leg_pcts: list +) -> list: + """Stage-2 of the v2 commission split: the operator's remainder is sliced + across N leg wallets per `leg_pcts` (each in 0..1, sum should equal 1.0). + + The last leg absorbs the rounding remainder so the sum of allocations + exactly equals operator_fee_sats (assuming pcts sum to ~1.0). Returns + a list of integer sat amounts in the same order as leg_pcts. + + Examples: + >>> allocate_operator_split_legs(70, [0.5, 0.3, 0.2]) + [35, 21, 14] + >>> allocate_operator_split_legs(5575, [0.5, 0.3, 0.2]) + [2787, 1672, 1116] + >>> allocate_operator_split_legs(100, [1.0]) + [100] + >>> allocate_operator_split_legs(0, [0.5, 0.5]) + [0, 0] + """ + if not leg_pcts: + return [] + if operator_fee_sats <= 0: + return [0] * len(leg_pcts) + allocations: list = [] + remaining = operator_fee_sats + for idx, pct in enumerate(leg_pcts): + if idx == len(leg_pcts) - 1: + allocations.append(remaining) + else: + amount = round(operator_fee_sats * float(pct)) + allocations.append(amount) + remaining -= amount + return allocations + + def calculate_exchange_rate(base_crypto_atoms: int, fiat_amount: float) -> float: """ Calculate exchange rate in sats per fiat unit. diff --git a/distribution.py b/distribution.py new file mode 100644 index 0000000..91dc401 --- /dev/null +++ b/distribution.py @@ -0,0 +1,322 @@ +# Satoshi Machine v2 — settlement distribution (P2). +# +# Picks up a dca_settlements row with status='pending' and pays out the +# three leg groups via LNbits internal transfers (create_invoice + +# pay_invoice on the same instance auto-detect internal). All legs land +# in dca_payments with the appropriate leg_type discriminator and inherit +# the Payment.tag "satmachine:{machine_npub}" so LNbits payment-history +# filters work natively. +# +# Leg order: +# 1. super_fee — platform_fee_sats → super_fee_wallet_id (if set) +# 2. operator_split — operator_fee_sats split per operator's rules +# 3. dca — net_sats distributed proportionally to active LPs, +# each leg capped at the LP's remaining fiat balance +# (preserves the v1 sync-mismatch fix from PR #2) +# +# Atomicity: LN payments cannot be rolled back. We attempt each leg, record +# success/failure per dca_payments row, and mark the settlement 'processed' +# only when every leg completed. Any failure marks 'errored' with a message +# but leaves the successful legs in place. Sats that don't get paid out +# (failed legs, no LP coverage, missing super wallet) remain in the +# machine's wallet — visible to the operator on the dashboard. + +from __future__ import annotations + +from datetime import datetime +from typing import List + +from lnbits.core.services import create_invoice, pay_invoice +from loguru import logger + +from .calculations import allocate_operator_split_legs, calculate_distribution +from .crud import ( + create_dca_payment, + get_client_balance_summary, + get_effective_commission_splits, + get_flow_mode_clients_for_machine, + get_machine, + get_settlement, + get_super_config, + mark_settlement_status, + update_payment_status, +) +from .models import ( + CreateDcaPaymentData, + DcaPayment, + DcaSettlement, + Machine, + SuperConfig, +) + +PAYMENT_TAG_PREFIX = "satmachine" + + +def _payment_tag(machine: Machine) -> str: + return f"{PAYMENT_TAG_PREFIX}:{machine.machine_npub}" + + +async def process_settlement(settlement_id: str) -> None: + """Process a pending settlement end-to-end. Safe to invoke multiple + times — the status='processed' guard skips already-processed rows.""" + settlement = await get_settlement(settlement_id) + if settlement is None: + logger.warning(f"distribution: settlement {settlement_id} not found") + return + if settlement.status != "pending": + return + machine = await get_machine(settlement.machine_id) + if machine is None: + logger.error( + f"distribution: settlement {settlement_id} references missing " + f"machine {settlement.machine_id}" + ) + await mark_settlement_status( + settlement_id, "errored", "machine missing" + ) + return + super_config = await get_super_config() + errors: List[str] = [] + + try: + await _pay_super_fee(settlement, machine, super_config, errors) + await _pay_operator_splits(settlement, machine, errors) + await _pay_dca_distributions(settlement, machine, errors) + except Exception as exc: # last-resort guard + logger.exception("distribution: unexpected error processing settlement") + errors.append(f"unexpected: {exc}") + + if errors: + await mark_settlement_status( + settlement_id, "errored", "; ".join(errors)[:512] + ) + else: + await mark_settlement_status(settlement_id, "processed", None) + + +# ============================================================================= +# Leg 1 — super fee +# ============================================================================= + + +async def _pay_super_fee( + settlement: DcaSettlement, + machine: Machine, + super_config: SuperConfig | None, + errors: List[str], +) -> None: + if settlement.platform_fee_sats <= 0: + return + if super_config is None or not super_config.super_fee_wallet_id: + # Super has configured a fee but not a destination wallet — leave + # the sats in the machine wallet and warn. The super needs to + # configure their wallet before they can collect. + logger.warning( + f"distribution: super_fee_sats={settlement.platform_fee_sats} " + f"left in machine wallet (super_fee_wallet_id not set)" + ) + return + await _pay_internal( + settlement=settlement, + machine=machine, + leg_type="super_fee", + client_id=None, + destination_wallet_id=super_config.super_fee_wallet_id, + amount_sats=settlement.platform_fee_sats, + memo=f"satmachine super fee — {machine.name or machine.machine_npub[:12]}", + errors=errors, + ) + + +# ============================================================================= +# Leg 2 — operator commission splits +# ============================================================================= + + +async def _pay_operator_splits( + settlement: DcaSettlement, + machine: Machine, + errors: List[str], +) -> None: + if settlement.operator_fee_sats <= 0: + return + splits = await get_effective_commission_splits( + machine.operator_user_id, machine.id + ) + if not splits: + logger.warning( + f"distribution: operator_fee_sats={settlement.operator_fee_sats} " + f"left in machine wallet (operator has no commission_splits ruleset " + f"for machine {machine.id})" + ) + return + # Pure allocator handles the rounding rule (last leg absorbs remainder). + leg_amounts = allocate_operator_split_legs( + settlement.operator_fee_sats, + [float(leg.pct) for leg in splits], + ) + for idx, (leg, amount) in enumerate(zip(splits, leg_amounts)): + if amount <= 0: + continue + label = leg.label or f"split-{idx + 1}" + memo = ( + f"satmachine operator split — " + f"{machine.name or machine.machine_npub[:12]} ({label})" + ) + await _pay_internal( + settlement=settlement, + machine=machine, + leg_type="operator_split", + client_id=None, + destination_wallet_id=leg.wallet_id, + amount_sats=amount, + memo=memo, + errors=errors, + ) + + +# ============================================================================= +# Leg 3 — DCA distribution to active LPs +# ============================================================================= + + +async def _pay_dca_distributions( + settlement: DcaSettlement, + machine: Machine, + errors: List[str], +) -> None: + if settlement.net_sats <= 0: + return + if settlement.exchange_rate <= 0: + # Fallback path with no exchange rate (bitSpire Payment.extra absent). + # Without a rate we can't compute fiat balances → can't compute + # proportional shares → leave net_sats in the machine wallet for + # the operator to manually reconcile. + logger.warning( + f"distribution: net_sats={settlement.net_sats} left in machine " + f"wallet (no exchange_rate; fallback path; see lamassu-next#44)" + ) + return + clients = await get_flow_mode_clients_for_machine(machine.id) + if not clients: + return + # Build {client_id: remaining_fiat_balance} for proportional allocation. + client_balances: dict[str, float] = {} + for client in clients: + summary = await get_client_balance_summary(client.id) + if summary is None or summary.remaining_balance <= 0: + continue + client_balances[client.id] = summary.remaining_balance + if not client_balances: + return + # Compute proportional sat allocations, then cap each at the client's + # remaining-fiat-balance-in-sats (the v1 sync-mismatch safeguard). + raw_allocations = calculate_distribution( + base_amount_sats=settlement.net_sats, + client_balances=client_balances, + ) + capped_allocations: dict[str, int] = {} + for client_id, raw_sats in raw_allocations.items(): + remaining_fiat = client_balances[client_id] + cap_sats = int(remaining_fiat * float(settlement.exchange_rate)) + capped_allocations[client_id] = min(raw_sats, cap_sats) + # Pay each capped allocation. + client_by_id = {c.id: c for c in clients} + for client_id, amount_sats in capped_allocations.items(): + if amount_sats <= 0: + continue + client = client_by_id[client_id] + amount_fiat = round(amount_sats / float(settlement.exchange_rate), 2) + memo = ( + f"DCA: {amount_sats} sats • {amount_fiat:.2f} {settlement.fiat_code}" + ) + await _pay_internal( + settlement=settlement, + machine=machine, + leg_type="dca", + client_id=client.id, + destination_wallet_id=client.wallet_id, + amount_sats=amount_sats, + amount_fiat=amount_fiat, + exchange_rate=float(settlement.exchange_rate), + memo=memo, + errors=errors, + ) + + +# ============================================================================= +# Internal transfer helper +# ============================================================================= + + +async def _pay_internal( + *, + settlement: DcaSettlement, + machine: Machine, + leg_type: str, + client_id: str | None, + destination_wallet_id: str, + amount_sats: int, + memo: str, + errors: List[str], + amount_fiat: float | None = None, + exchange_rate: float | None = None, +) -> DcaPayment | None: + """Create an invoice on the destination wallet, pay it from the machine + wallet, and record the leg in dca_payments. Returns the dca_payments row + on success (including the failed case — the row stays for audit).""" + tag = _payment_tag(machine) + leg_row = await create_dca_payment( + CreateDcaPaymentData( + settlement_id=settlement.id, + client_id=client_id, + machine_id=machine.id, + operator_user_id=machine.operator_user_id, + leg_type=leg_type, + destination_wallet_id=destination_wallet_id, + destination_ln_address=None, + amount_sats=amount_sats, + amount_fiat=amount_fiat, + exchange_rate=exchange_rate, + transaction_time=datetime.now(), + external_payment_hash=None, + ) + ) + extra = { + "satmachine_leg": leg_type, + "satmachine_settlement_id": settlement.id, + "satmachine_machine_npub": machine.machine_npub, + } + try: + new_invoice = await create_invoice( + wallet_id=destination_wallet_id, + amount=float(amount_sats), + internal=True, + memo=memo, + extra=extra, + ) + if not new_invoice or not new_invoice.bolt11: + await update_payment_status( + leg_row.id, "failed", None, "create_invoice returned empty" + ) + errors.append(f"{leg_type}: create_invoice empty") + return leg_row + paid = await pay_invoice( + wallet_id=machine.wallet_id, + payment_request=new_invoice.bolt11, + description=memo, + tag=tag, + extra=extra, + ) + await update_payment_status( + leg_row.id, "completed", paid.payment_hash, None + ) + return leg_row + except Exception as exc: + logger.error( + f"distribution: {leg_type} leg failed " + f"(settlement={settlement.id} amount={amount_sats}): {exc}" + ) + await update_payment_status(leg_row.id, "failed", None, str(exc)[:512]) + errors.append(f"{leg_type}: {exc}") + return leg_row diff --git a/tasks.py b/tasks.py index a67372d..ba5050e 100644 --- a/tasks.py +++ b/tasks.py @@ -25,6 +25,7 @@ from .crud import ( get_active_machine_by_wallet_id, get_super_config, ) +from .distribution import process_settlement LISTENER_NAME = "ext_satmachineadmin" @@ -78,6 +79,10 @@ async def _handle_payment(payment: Payment) -> None: f"(super_fee={data.platform_fee_sats} " f"operator_fee={data.operator_fee_sats}){fb}" ) + # Trigger distribution synchronously so latency is one bitSpire-tx wide. + # process_settlement is idempotent (status='processed' guard); if this + # task crashes mid-process, the next manual or scheduled retry resumes. + await process_settlement(settlement.id) async def hourly_transaction_polling() -> None: diff --git a/tests/test_two_stage_split.py b/tests/test_two_stage_split.py new file mode 100644 index 0000000..71490c6 --- /dev/null +++ b/tests/test_two_stage_split.py @@ -0,0 +1,144 @@ +""" +Tests for the v2 two-stage commission split (super first, operator remainder). + +The plan calls out a verification scenario explicitly: + super_fee_pct=30%, operator split 50/30/20 on a 100-sat commission + → super_wallet gets 30, operator_self gets 35, employee 21, maint 14. + +Also covers the edge cases: super_fee_pct=0 (no super), super_fee_pct=1.0 +(everything to super), single-leg operator ruleset, zero operator fee. +""" + +import pytest + +from ..calculations import ( + allocate_operator_split_legs, + split_two_stage_commission, +) + + +class TestSplitTwoStageCommission: + """Stage-1: super takes super_fee_pct of commission; operator gets rest.""" + + def test_plan_example_100sats_30pct(self): + platform, operator = split_two_stage_commission(100, 0.30) + assert platform == 30 + assert operator == 70 + assert platform + operator == 100 + + def test_realistic_7965sats_30pct(self): + # From the plan's 2000 GTQ → 266800 sats @ 3% commission example. + platform, operator = split_two_stage_commission(7965, 0.30) + assert platform == 2390 # round(7965 * 0.30) = 2389.5 → 2390 + assert operator == 5575 # 7965 - 2390 + assert platform + operator == 7965 + + def test_super_pct_zero_leaves_all_to_operator(self): + platform, operator = split_two_stage_commission(7965, 0.0) + assert platform == 0 + assert operator == 7965 + + def test_super_pct_one_takes_everything(self): + platform, operator = split_two_stage_commission(7965, 1.0) + assert platform == 7965 + assert operator == 0 + + def test_zero_commission(self): + platform, operator = split_two_stage_commission(0, 0.30) + assert platform == 0 + assert operator == 0 + + def test_negative_commission_clamps_to_zero(self): + # Defensive: should never happen, but verify we don't go negative. + platform, operator = split_two_stage_commission(-100, 0.30) + assert platform == 0 + assert operator == 0 + + @pytest.mark.parametrize("commission_sats", [1, 7, 100, 7965, 1_000_000]) + @pytest.mark.parametrize("super_pct", [0.0, 0.1, 0.30, 0.5, 0.777, 1.0]) + def test_invariant_sum_equals_commission(self, commission_sats, super_pct): + platform, operator = split_two_stage_commission(commission_sats, super_pct) + assert platform + operator == commission_sats + assert 0 <= platform <= commission_sats + assert 0 <= operator <= commission_sats + + +class TestAllocateOperatorSplitLegs: + """Stage-2: operator's remainder split across N leg wallets per pct rules.""" + + def test_plan_example_50_30_20_on_70(self): + amounts = allocate_operator_split_legs(70, [0.5, 0.3, 0.2]) + assert amounts == [35, 21, 14] + assert sum(amounts) == 70 + + def test_realistic_50_30_20_on_5575(self): + amounts = allocate_operator_split_legs(5575, [0.5, 0.3, 0.2]) + # 50%: round(2787.5) = 2788; 30%: round(1672.5) = 1672; last absorbs + # remainder: 5575 - 2788 - 1672 = 1115. + # Note: round() uses banker's rounding so 2787.5 → 2788 actually + # because 2788 is even. Confirm by total invariant. + assert sum(amounts) == 5575 + assert len(amounts) == 3 + + def test_single_leg_full_remainder(self): + amounts = allocate_operator_split_legs(100, [1.0]) + assert amounts == [100] + + def test_zero_operator_fee_zeros_all_legs(self): + amounts = allocate_operator_split_legs(0, [0.5, 0.5]) + assert amounts == [0, 0] + + def test_empty_legs_list_returns_empty(self): + amounts = allocate_operator_split_legs(100, []) + assert amounts == [] + + def test_last_leg_absorbs_rounding_remainder(self): + # 100 / 3 ≈ 33.33 each; rounding makes the first two 33 and last 34. + amounts = allocate_operator_split_legs(100, [1 / 3, 1 / 3, 1 / 3]) + assert sum(amounts) == 100 + assert amounts[0] == round(100 / 3) # 33 + assert amounts[1] == round(100 / 3) # 33 + # Last leg absorbs the rounding (34, not 33) so total == 100. + assert amounts[2] == 100 - amounts[0] - amounts[1] + + @pytest.mark.parametrize( + "operator_fee,pcts", + [ + (1, [0.5, 0.5]), + (7, [0.5, 0.3, 0.2]), + (100, [0.5, 0.5]), + (5575, [0.5, 0.3, 0.2]), + (1_000_000, [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]), + ], + ) + def test_invariant_sum_equals_operator_fee(self, operator_fee, pcts): + amounts = allocate_operator_split_legs(operator_fee, pcts) + assert sum(amounts) == operator_fee + assert all(a >= 0 for a in amounts) + + +class TestEndToEndScenarios: + """The full two-stage split — super then operator legs — composed.""" + + def test_plan_example_full(self): + # 100 sats commission, super=30%, operator splits 50/30/20. + platform, operator = split_two_stage_commission(100, 0.30) + legs = allocate_operator_split_legs(operator, [0.5, 0.3, 0.2]) + assert platform == 30 + assert legs == [35, 21, 14] + assert platform + sum(legs) == 100 + + def test_super_pct_zero_full_pipeline(self): + platform, operator = split_two_stage_commission(7965, 0.0) + legs = allocate_operator_split_legs(operator, [1.0]) + assert platform == 0 + assert legs == [7965] + assert platform + sum(legs) == 7965 + + def test_super_pct_one_full_pipeline(self): + platform, operator = split_two_stage_commission(7965, 1.0) + legs = allocate_operator_split_legs(operator, [0.5, 0.5]) + assert platform == 7965 + # Operator has zero to distribute; both legs get zero. + assert legs == [0, 0] + assert platform + sum(legs) == 7965 diff --git a/views_api.py b/views_api.py index 13dd53c..8840425 100644 --- a/views_api.py +++ b/views_api.py @@ -9,7 +9,7 @@ from http import HTTPStatus from fastapi import APIRouter, Depends, HTTPException from lnbits.core.models import User -from lnbits.decorators import check_user_exists +from lnbits.decorators import check_super_user, check_user_exists from .crud import ( create_machine, @@ -22,6 +22,7 @@ from .crud import ( get_settlements_for_operator, get_super_config, update_machine, + update_super_config, ) from .models import ( CreateMachineData, @@ -30,6 +31,7 @@ from .models import ( Machine, SuperConfig, UpdateMachineData, + UpdateSuperConfigData, ) satmachineadmin_api_router = APIRouter() @@ -155,7 +157,7 @@ async def api_list_payments( # ============================================================================= -# Super config — read-only at this phase. Super-only write endpoint lands in P2. +# Super config — operators read; super (LNbits instance admin) writes. # ============================================================================= @@ -167,8 +169,7 @@ async def api_get_super_config( ) -> SuperConfig: """Returns the platform-fee config so operators can display it as a read-only line item in their UI. The fee is set by the LNbits super - instance-wide; operators see it but can't change it (write endpoint - protected by check_super_user, landing in P2).""" + instance-wide; operators see it but can't change it.""" config = await get_super_config() if config is None: raise HTTPException( @@ -177,6 +178,25 @@ async def api_get_super_config( return config +@satmachineadmin_api_router.put( + "/api/v1/dca/super-config", response_model=SuperConfig +) +async def api_update_super_config( + data: UpdateSuperConfigData, + _user: User = Depends(check_super_user), +) -> SuperConfig: + """Super-only: set the platform fee % charged on every operator's + commission, plus the destination wallet for collecting it. The fee is + enforced before the operator's own commission_splits ruleset fires + (see distribution.process_settlement).""" + config = await update_super_config(data) + if config is None: + raise HTTPException( + HTTPStatus.INTERNAL_SERVER_ERROR, "Failed to update super config" + ) + return config + + # ============================================================================= # Catch-all stub for endpoints not yet implemented (clients, deposits, # commission splits, partial-tx, balance-settle, super-config write). Each