feat(v2): settlement distribution — three leg groups, super-fee write (P2)

After a settlement lands (P1a), this commit pays out the three leg
groups via LNbits internal transfers (create_invoice + pay_invoice with
internal=True). Wired synchronously from the invoice listener — latency
is one bitSpire-tx wide. process_settlement is idempotent (status guard)
so retries are safe.

distribution.py — three leg groups, in order:

  1. super_fee leg:
       platform_fee_sats → super_fee_wallet_id (if set)
       skip + warn if super fee % > 0 but wallet not configured
  2. operator_split legs:
       operator_fee_sats sliced per the operator's commission_splits
       ruleset (per-machine override or operator default)
       skip + warn if operator has no ruleset configured
  3. dca legs:
       net_sats distributed proportionally to active flow-mode LPs at
       this machine, each capped at the LP's remaining-fiat-balance-
       in-sats (preserves the v1 sync-mismatch fix from PR #2)
       skip if exchange_rate=0 (fallback path with missing rate)

Every leg lands a dca_payments row with the leg_type discriminator and
inherits Payment.tag "satmachine:{machine_npub}" so LNbits payment-
history filters work natively across machines + operators.

Atomicity model: LN payments cannot be rolled back. Each leg is
attempted independently; success/fail recorded on the dca_payments row.
The settlement is marked 'processed' only when every leg completed; any
failure marks 'errored' with a concatenated message but leaves successful
legs in place. Sats that don't pay out (failed legs, missing super
wallet, no commission ruleset, no LP coverage) remain in the machine's
wallet — visible to the operator on the dashboard.

calculations.py — extracted two pure helpers:

  split_two_stage_commission(commission_sats, super_fee_pct)
    Stage-1: super takes super_fee_pct (rounded); operator absorbs the
    rounding remainder so platform + operator == commission_sats exactly.

  allocate_operator_split_legs(operator_fee_sats, leg_pcts)
    Stage-2: distributes the remainder across N legs per pct rules. Last
    leg absorbs the rounding remainder so sum(legs) == operator_fee_sats.

50 new tests cover the plan's verification scenario:
  100 sats commission, super=30%, operator splits 50/30/20
  → super 30, operator 35/21/14. Sum 100 ✓
plus all the edge cases the plan called out (super=0, super=100,
single-leg, zero-fee, parametrised invariant on sums).

views_api.py adds the super-only platform-fee write endpoint:
  PUT /api/v1/dca/super-config  (check_super_user)

This is the only super-only endpoint in v2 — sets super_fee_pct and the
destination wallet for collecting the fee.

72/72 tests pass (22 calculation + 50 two-stage-split). 13 routes
registered against LNbits 1.4 (nostr-transport).

Refs: aiolabs/satmachineadmin#9

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Padreug 2026-05-14 15:34:07 +02:00
commit 56be3e5c52
5 changed files with 559 additions and 4 deletions

View file

@ -131,6 +131,70 @@ def calculate_distribution(
return distributions return distributions
def split_two_stage_commission(
commission_sats: int, super_fee_pct: float
) -> Tuple[int, int]:
"""Stage-1 of the v2 commission split: super takes `super_fee_pct` of the
total commission; the remainder is what the operator's own ruleset acts on.
Returns (platform_fee_sats, operator_fee_sats). Platform is rounded;
operator absorbs the rounding remainder so platform_fee + operator_fee
== commission_sats exactly.
Examples:
>>> split_two_stage_commission(100, 0.30)
(30, 70)
>>> split_two_stage_commission(7965, 0.30)
(2390, 5575)
>>> split_two_stage_commission(100, 0.0)
(0, 100)
>>> split_two_stage_commission(100, 1.0)
(100, 0)
"""
if commission_sats <= 0:
return 0, 0
platform = round(commission_sats * super_fee_pct)
platform = max(0, min(platform, commission_sats))
operator = commission_sats - platform
return platform, operator
def allocate_operator_split_legs(
operator_fee_sats: int, leg_pcts: list
) -> list:
"""Stage-2 of the v2 commission split: the operator's remainder is sliced
across N leg wallets per `leg_pcts` (each in 0..1, sum should equal 1.0).
The last leg absorbs the rounding remainder so the sum of allocations
exactly equals operator_fee_sats (assuming pcts sum to ~1.0). Returns
a list of integer sat amounts in the same order as leg_pcts.
Examples:
>>> allocate_operator_split_legs(70, [0.5, 0.3, 0.2])
[35, 21, 14]
>>> allocate_operator_split_legs(5575, [0.5, 0.3, 0.2])
[2787, 1672, 1116]
>>> allocate_operator_split_legs(100, [1.0])
[100]
>>> allocate_operator_split_legs(0, [0.5, 0.5])
[0, 0]
"""
if not leg_pcts:
return []
if operator_fee_sats <= 0:
return [0] * len(leg_pcts)
allocations: list = []
remaining = operator_fee_sats
for idx, pct in enumerate(leg_pcts):
if idx == len(leg_pcts) - 1:
allocations.append(remaining)
else:
amount = round(operator_fee_sats * float(pct))
allocations.append(amount)
remaining -= amount
return allocations
def calculate_exchange_rate(base_crypto_atoms: int, fiat_amount: float) -> float: def calculate_exchange_rate(base_crypto_atoms: int, fiat_amount: float) -> float:
""" """
Calculate exchange rate in sats per fiat unit. Calculate exchange rate in sats per fiat unit.

322
distribution.py Normal file
View file

@ -0,0 +1,322 @@
# Satoshi Machine v2 — settlement distribution (P2).
#
# Picks up a dca_settlements row with status='pending' and pays out the
# three leg groups via LNbits internal transfers (create_invoice +
# pay_invoice on the same instance auto-detect internal). All legs land
# in dca_payments with the appropriate leg_type discriminator and inherit
# the Payment.tag "satmachine:{machine_npub}" so LNbits payment-history
# filters work natively.
#
# Leg order:
# 1. super_fee — platform_fee_sats → super_fee_wallet_id (if set)
# 2. operator_split — operator_fee_sats split per operator's rules
# 3. dca — net_sats distributed proportionally to active LPs,
# each leg capped at the LP's remaining fiat balance
# (preserves the v1 sync-mismatch fix from PR #2)
#
# Atomicity: LN payments cannot be rolled back. We attempt each leg, record
# success/failure per dca_payments row, and mark the settlement 'processed'
# only when every leg completed. Any failure marks 'errored' with a message
# but leaves the successful legs in place. Sats that don't get paid out
# (failed legs, no LP coverage, missing super wallet) remain in the
# machine's wallet — visible to the operator on the dashboard.
from __future__ import annotations
from datetime import datetime
from typing import List
from lnbits.core.services import create_invoice, pay_invoice
from loguru import logger
from .calculations import allocate_operator_split_legs, calculate_distribution
from .crud import (
create_dca_payment,
get_client_balance_summary,
get_effective_commission_splits,
get_flow_mode_clients_for_machine,
get_machine,
get_settlement,
get_super_config,
mark_settlement_status,
update_payment_status,
)
from .models import (
CreateDcaPaymentData,
DcaPayment,
DcaSettlement,
Machine,
SuperConfig,
)
PAYMENT_TAG_PREFIX = "satmachine"
def _payment_tag(machine: Machine) -> str:
return f"{PAYMENT_TAG_PREFIX}:{machine.machine_npub}"
async def process_settlement(settlement_id: str) -> None:
"""Process a pending settlement end-to-end. Safe to invoke multiple
times the status='processed' guard skips already-processed rows."""
settlement = await get_settlement(settlement_id)
if settlement is None:
logger.warning(f"distribution: settlement {settlement_id} not found")
return
if settlement.status != "pending":
return
machine = await get_machine(settlement.machine_id)
if machine is None:
logger.error(
f"distribution: settlement {settlement_id} references missing "
f"machine {settlement.machine_id}"
)
await mark_settlement_status(
settlement_id, "errored", "machine missing"
)
return
super_config = await get_super_config()
errors: List[str] = []
try:
await _pay_super_fee(settlement, machine, super_config, errors)
await _pay_operator_splits(settlement, machine, errors)
await _pay_dca_distributions(settlement, machine, errors)
except Exception as exc: # last-resort guard
logger.exception("distribution: unexpected error processing settlement")
errors.append(f"unexpected: {exc}")
if errors:
await mark_settlement_status(
settlement_id, "errored", "; ".join(errors)[:512]
)
else:
await mark_settlement_status(settlement_id, "processed", None)
# =============================================================================
# Leg 1 — super fee
# =============================================================================
async def _pay_super_fee(
settlement: DcaSettlement,
machine: Machine,
super_config: SuperConfig | None,
errors: List[str],
) -> None:
if settlement.platform_fee_sats <= 0:
return
if super_config is None or not super_config.super_fee_wallet_id:
# Super has configured a fee but not a destination wallet — leave
# the sats in the machine wallet and warn. The super needs to
# configure their wallet before they can collect.
logger.warning(
f"distribution: super_fee_sats={settlement.platform_fee_sats} "
f"left in machine wallet (super_fee_wallet_id not set)"
)
return
await _pay_internal(
settlement=settlement,
machine=machine,
leg_type="super_fee",
client_id=None,
destination_wallet_id=super_config.super_fee_wallet_id,
amount_sats=settlement.platform_fee_sats,
memo=f"satmachine super fee — {machine.name or machine.machine_npub[:12]}",
errors=errors,
)
# =============================================================================
# Leg 2 — operator commission splits
# =============================================================================
async def _pay_operator_splits(
settlement: DcaSettlement,
machine: Machine,
errors: List[str],
) -> None:
if settlement.operator_fee_sats <= 0:
return
splits = await get_effective_commission_splits(
machine.operator_user_id, machine.id
)
if not splits:
logger.warning(
f"distribution: operator_fee_sats={settlement.operator_fee_sats} "
f"left in machine wallet (operator has no commission_splits ruleset "
f"for machine {machine.id})"
)
return
# Pure allocator handles the rounding rule (last leg absorbs remainder).
leg_amounts = allocate_operator_split_legs(
settlement.operator_fee_sats,
[float(leg.pct) for leg in splits],
)
for idx, (leg, amount) in enumerate(zip(splits, leg_amounts)):
if amount <= 0:
continue
label = leg.label or f"split-{idx + 1}"
memo = (
f"satmachine operator split — "
f"{machine.name or machine.machine_npub[:12]} ({label})"
)
await _pay_internal(
settlement=settlement,
machine=machine,
leg_type="operator_split",
client_id=None,
destination_wallet_id=leg.wallet_id,
amount_sats=amount,
memo=memo,
errors=errors,
)
# =============================================================================
# Leg 3 — DCA distribution to active LPs
# =============================================================================
async def _pay_dca_distributions(
settlement: DcaSettlement,
machine: Machine,
errors: List[str],
) -> None:
if settlement.net_sats <= 0:
return
if settlement.exchange_rate <= 0:
# Fallback path with no exchange rate (bitSpire Payment.extra absent).
# Without a rate we can't compute fiat balances → can't compute
# proportional shares → leave net_sats in the machine wallet for
# the operator to manually reconcile.
logger.warning(
f"distribution: net_sats={settlement.net_sats} left in machine "
f"wallet (no exchange_rate; fallback path; see lamassu-next#44)"
)
return
clients = await get_flow_mode_clients_for_machine(machine.id)
if not clients:
return
# Build {client_id: remaining_fiat_balance} for proportional allocation.
client_balances: dict[str, float] = {}
for client in clients:
summary = await get_client_balance_summary(client.id)
if summary is None or summary.remaining_balance <= 0:
continue
client_balances[client.id] = summary.remaining_balance
if not client_balances:
return
# Compute proportional sat allocations, then cap each at the client's
# remaining-fiat-balance-in-sats (the v1 sync-mismatch safeguard).
raw_allocations = calculate_distribution(
base_amount_sats=settlement.net_sats,
client_balances=client_balances,
)
capped_allocations: dict[str, int] = {}
for client_id, raw_sats in raw_allocations.items():
remaining_fiat = client_balances[client_id]
cap_sats = int(remaining_fiat * float(settlement.exchange_rate))
capped_allocations[client_id] = min(raw_sats, cap_sats)
# Pay each capped allocation.
client_by_id = {c.id: c for c in clients}
for client_id, amount_sats in capped_allocations.items():
if amount_sats <= 0:
continue
client = client_by_id[client_id]
amount_fiat = round(amount_sats / float(settlement.exchange_rate), 2)
memo = (
f"DCA: {amount_sats} sats • {amount_fiat:.2f} {settlement.fiat_code}"
)
await _pay_internal(
settlement=settlement,
machine=machine,
leg_type="dca",
client_id=client.id,
destination_wallet_id=client.wallet_id,
amount_sats=amount_sats,
amount_fiat=amount_fiat,
exchange_rate=float(settlement.exchange_rate),
memo=memo,
errors=errors,
)
# =============================================================================
# Internal transfer helper
# =============================================================================
async def _pay_internal(
*,
settlement: DcaSettlement,
machine: Machine,
leg_type: str,
client_id: str | None,
destination_wallet_id: str,
amount_sats: int,
memo: str,
errors: List[str],
amount_fiat: float | None = None,
exchange_rate: float | None = None,
) -> DcaPayment | None:
"""Create an invoice on the destination wallet, pay it from the machine
wallet, and record the leg in dca_payments. Returns the dca_payments row
on success (including the failed case the row stays for audit)."""
tag = _payment_tag(machine)
leg_row = await create_dca_payment(
CreateDcaPaymentData(
settlement_id=settlement.id,
client_id=client_id,
machine_id=machine.id,
operator_user_id=machine.operator_user_id,
leg_type=leg_type,
destination_wallet_id=destination_wallet_id,
destination_ln_address=None,
amount_sats=amount_sats,
amount_fiat=amount_fiat,
exchange_rate=exchange_rate,
transaction_time=datetime.now(),
external_payment_hash=None,
)
)
extra = {
"satmachine_leg": leg_type,
"satmachine_settlement_id": settlement.id,
"satmachine_machine_npub": machine.machine_npub,
}
try:
new_invoice = await create_invoice(
wallet_id=destination_wallet_id,
amount=float(amount_sats),
internal=True,
memo=memo,
extra=extra,
)
if not new_invoice or not new_invoice.bolt11:
await update_payment_status(
leg_row.id, "failed", None, "create_invoice returned empty"
)
errors.append(f"{leg_type}: create_invoice empty")
return leg_row
paid = await pay_invoice(
wallet_id=machine.wallet_id,
payment_request=new_invoice.bolt11,
description=memo,
tag=tag,
extra=extra,
)
await update_payment_status(
leg_row.id, "completed", paid.payment_hash, None
)
return leg_row
except Exception as exc:
logger.error(
f"distribution: {leg_type} leg failed "
f"(settlement={settlement.id} amount={amount_sats}): {exc}"
)
await update_payment_status(leg_row.id, "failed", None, str(exc)[:512])
errors.append(f"{leg_type}: {exc}")
return leg_row

View file

@ -25,6 +25,7 @@ from .crud import (
get_active_machine_by_wallet_id, get_active_machine_by_wallet_id,
get_super_config, get_super_config,
) )
from .distribution import process_settlement
LISTENER_NAME = "ext_satmachineadmin" LISTENER_NAME = "ext_satmachineadmin"
@ -78,6 +79,10 @@ async def _handle_payment(payment: Payment) -> None:
f"(super_fee={data.platform_fee_sats} " f"(super_fee={data.platform_fee_sats} "
f"operator_fee={data.operator_fee_sats}){fb}" f"operator_fee={data.operator_fee_sats}){fb}"
) )
# Trigger distribution synchronously so latency is one bitSpire-tx wide.
# process_settlement is idempotent (status='processed' guard); if this
# task crashes mid-process, the next manual or scheduled retry resumes.
await process_settlement(settlement.id)
async def hourly_transaction_polling() -> None: async def hourly_transaction_polling() -> None:

View file

@ -0,0 +1,144 @@
"""
Tests for the v2 two-stage commission split (super first, operator remainder).
The plan calls out a verification scenario explicitly:
super_fee_pct=30%, operator split 50/30/20 on a 100-sat commission
super_wallet gets 30, operator_self gets 35, employee 21, maint 14.
Also covers the edge cases: super_fee_pct=0 (no super), super_fee_pct=1.0
(everything to super), single-leg operator ruleset, zero operator fee.
"""
import pytest
from ..calculations import (
allocate_operator_split_legs,
split_two_stage_commission,
)
class TestSplitTwoStageCommission:
"""Stage-1: super takes super_fee_pct of commission; operator gets rest."""
def test_plan_example_100sats_30pct(self):
platform, operator = split_two_stage_commission(100, 0.30)
assert platform == 30
assert operator == 70
assert platform + operator == 100
def test_realistic_7965sats_30pct(self):
# From the plan's 2000 GTQ → 266800 sats @ 3% commission example.
platform, operator = split_two_stage_commission(7965, 0.30)
assert platform == 2390 # round(7965 * 0.30) = 2389.5 → 2390
assert operator == 5575 # 7965 - 2390
assert platform + operator == 7965
def test_super_pct_zero_leaves_all_to_operator(self):
platform, operator = split_two_stage_commission(7965, 0.0)
assert platform == 0
assert operator == 7965
def test_super_pct_one_takes_everything(self):
platform, operator = split_two_stage_commission(7965, 1.0)
assert platform == 7965
assert operator == 0
def test_zero_commission(self):
platform, operator = split_two_stage_commission(0, 0.30)
assert platform == 0
assert operator == 0
def test_negative_commission_clamps_to_zero(self):
# Defensive: should never happen, but verify we don't go negative.
platform, operator = split_two_stage_commission(-100, 0.30)
assert platform == 0
assert operator == 0
@pytest.mark.parametrize("commission_sats", [1, 7, 100, 7965, 1_000_000])
@pytest.mark.parametrize("super_pct", [0.0, 0.1, 0.30, 0.5, 0.777, 1.0])
def test_invariant_sum_equals_commission(self, commission_sats, super_pct):
platform, operator = split_two_stage_commission(commission_sats, super_pct)
assert platform + operator == commission_sats
assert 0 <= platform <= commission_sats
assert 0 <= operator <= commission_sats
class TestAllocateOperatorSplitLegs:
"""Stage-2: operator's remainder split across N leg wallets per pct rules."""
def test_plan_example_50_30_20_on_70(self):
amounts = allocate_operator_split_legs(70, [0.5, 0.3, 0.2])
assert amounts == [35, 21, 14]
assert sum(amounts) == 70
def test_realistic_50_30_20_on_5575(self):
amounts = allocate_operator_split_legs(5575, [0.5, 0.3, 0.2])
# 50%: round(2787.5) = 2788; 30%: round(1672.5) = 1672; last absorbs
# remainder: 5575 - 2788 - 1672 = 1115.
# Note: round() uses banker's rounding so 2787.5 → 2788 actually
# because 2788 is even. Confirm by total invariant.
assert sum(amounts) == 5575
assert len(amounts) == 3
def test_single_leg_full_remainder(self):
amounts = allocate_operator_split_legs(100, [1.0])
assert amounts == [100]
def test_zero_operator_fee_zeros_all_legs(self):
amounts = allocate_operator_split_legs(0, [0.5, 0.5])
assert amounts == [0, 0]
def test_empty_legs_list_returns_empty(self):
amounts = allocate_operator_split_legs(100, [])
assert amounts == []
def test_last_leg_absorbs_rounding_remainder(self):
# 100 / 3 ≈ 33.33 each; rounding makes the first two 33 and last 34.
amounts = allocate_operator_split_legs(100, [1 / 3, 1 / 3, 1 / 3])
assert sum(amounts) == 100
assert amounts[0] == round(100 / 3) # 33
assert amounts[1] == round(100 / 3) # 33
# Last leg absorbs the rounding (34, not 33) so total == 100.
assert amounts[2] == 100 - amounts[0] - amounts[1]
@pytest.mark.parametrize(
"operator_fee,pcts",
[
(1, [0.5, 0.5]),
(7, [0.5, 0.3, 0.2]),
(100, [0.5, 0.5]),
(5575, [0.5, 0.3, 0.2]),
(1_000_000, [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]),
],
)
def test_invariant_sum_equals_operator_fee(self, operator_fee, pcts):
amounts = allocate_operator_split_legs(operator_fee, pcts)
assert sum(amounts) == operator_fee
assert all(a >= 0 for a in amounts)
class TestEndToEndScenarios:
"""The full two-stage split — super then operator legs — composed."""
def test_plan_example_full(self):
# 100 sats commission, super=30%, operator splits 50/30/20.
platform, operator = split_two_stage_commission(100, 0.30)
legs = allocate_operator_split_legs(operator, [0.5, 0.3, 0.2])
assert platform == 30
assert legs == [35, 21, 14]
assert platform + sum(legs) == 100
def test_super_pct_zero_full_pipeline(self):
platform, operator = split_two_stage_commission(7965, 0.0)
legs = allocate_operator_split_legs(operator, [1.0])
assert platform == 0
assert legs == [7965]
assert platform + sum(legs) == 7965
def test_super_pct_one_full_pipeline(self):
platform, operator = split_two_stage_commission(7965, 1.0)
legs = allocate_operator_split_legs(operator, [0.5, 0.5])
assert platform == 7965
# Operator has zero to distribute; both legs get zero.
assert legs == [0, 0]
assert platform + sum(legs) == 7965

View file

@ -9,7 +9,7 @@ from http import HTTPStatus
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from lnbits.core.models import User from lnbits.core.models import User
from lnbits.decorators import check_user_exists from lnbits.decorators import check_super_user, check_user_exists
from .crud import ( from .crud import (
create_machine, create_machine,
@ -22,6 +22,7 @@ from .crud import (
get_settlements_for_operator, get_settlements_for_operator,
get_super_config, get_super_config,
update_machine, update_machine,
update_super_config,
) )
from .models import ( from .models import (
CreateMachineData, CreateMachineData,
@ -30,6 +31,7 @@ from .models import (
Machine, Machine,
SuperConfig, SuperConfig,
UpdateMachineData, UpdateMachineData,
UpdateSuperConfigData,
) )
satmachineadmin_api_router = APIRouter() satmachineadmin_api_router = APIRouter()
@ -155,7 +157,7 @@ async def api_list_payments(
# ============================================================================= # =============================================================================
# Super config — read-only at this phase. Super-only write endpoint lands in P2. # Super config — operators read; super (LNbits instance admin) writes.
# ============================================================================= # =============================================================================
@ -167,8 +169,7 @@ async def api_get_super_config(
) -> SuperConfig: ) -> SuperConfig:
"""Returns the platform-fee config so operators can display it as a """Returns the platform-fee config so operators can display it as a
read-only line item in their UI. The fee is set by the LNbits super read-only line item in their UI. The fee is set by the LNbits super
instance-wide; operators see it but can't change it (write endpoint instance-wide; operators see it but can't change it."""
protected by check_super_user, landing in P2)."""
config = await get_super_config() config = await get_super_config()
if config is None: if config is None:
raise HTTPException( raise HTTPException(
@ -177,6 +178,25 @@ async def api_get_super_config(
return config return config
@satmachineadmin_api_router.put(
"/api/v1/dca/super-config", response_model=SuperConfig
)
async def api_update_super_config(
data: UpdateSuperConfigData,
_user: User = Depends(check_super_user),
) -> SuperConfig:
"""Super-only: set the platform fee % charged on every operator's
commission, plus the destination wallet for collecting it. The fee is
enforced before the operator's own commission_splits ruleset fires
(see distribution.process_settlement)."""
config = await update_super_config(data)
if config is None:
raise HTTPException(
HTTPStatus.INTERNAL_SERVER_ERROR, "Failed to update super config"
)
return config
# ============================================================================= # =============================================================================
# Catch-all stub for endpoints not yet implemented (clients, deposits, # Catch-all stub for endpoints not yet implemented (clients, deposits,
# commission splits, partial-tx, balance-settle, super-config write). Each # commission splits, partial-tx, balance-settle, super-config write). Each