Compare commits

..

No commits in common. "3ede66ff92f0d63431d1f74a4417db6f38ed4f22" and "937749f149ebbffd7f6dac60d1a969850c154e42" have entirely different histories.

9 changed files with 62 additions and 2006 deletions

View file

@ -1,176 +0,0 @@
# Satoshi Machine v2 — bitSpire payment parser.
#
# Translates an inbound LNbits Payment (cash-out customer paid the ATM's
# invoice) into the principal/commission split needed by satmachineadmin.
#
# Happy path: bitSpire populates Payment.extra with the canonical split
# fields per aiolabs/lamassu-next#44 — we read them directly.
#
# Fallback path: extra is missing (older bitSpire, edge case). We back-derive
# the split from the machine's fallback_commission_pct using the Lamassu-era
# formula (base = total / (1 + commission)) and mark used_fallback_split=true
# so the audit trail shows we estimated.
from __future__ import annotations
import json
from typing import Any, Optional, Tuple
from loguru import logger
from .calculations import calculate_commission
from .models import CreateDcaSettlementData, Machine
# Sentinel value bitSpire sets in Payment.extra.source so we know an inbound
# payment originated from an ATM cash-out and not some other extension or
# customer-initiated transfer.
BITSPIRE_SOURCE = "bitspire"
def _coerce_int(v: Any) -> Optional[int]:
if v is None:
return None
try:
return int(v)
except (TypeError, ValueError):
return None
def _coerce_float(v: Any) -> Optional[float]:
if v is None:
return None
try:
return float(v)
except (TypeError, ValueError):
return None
def _coerce_str(v: Any) -> Optional[str]:
if v is None:
return None
return str(v) if not isinstance(v, str) else v
def _json_dumps(v: Any) -> Optional[str]:
if v is None:
return None
try:
return json.dumps(v)
except (TypeError, ValueError):
return None
def is_bitspire_payment(extra: dict) -> bool:
"""True if Payment.extra carries the bitSpire source marker (post-#44)."""
return isinstance(extra, dict) and extra.get("source") == BITSPIRE_SOURCE
def parse_settlement(
machine: Machine,
payment_hash: str,
gross_sats: int,
extra: dict,
super_fee_pct: float,
) -> Tuple[CreateDcaSettlementData, bool]:
"""Build a CreateDcaSettlementData for an inbound payment landing on
`machine`'s wallet.
Returns (data, used_fallback): when `used_fallback` is True, bitSpire
didn't populate Payment.extra so we back-derived the split. Caller
should log this for visibility once aiolabs/lamassu-next#44 ships,
fallback usage should drop to zero.
"""
if is_bitspire_payment(extra):
data = _parse_extra(machine, payment_hash, gross_sats, extra, super_fee_pct)
return data, False
logger.warning(
f"satmachineadmin: settlement on machine {machine.machine_npub[:12]}... "
f"missing bitSpire extra metadata; back-deriving via "
f"fallback_commission_pct={machine.fallback_commission_pct}. "
f"See aiolabs/lamassu-next#44."
)
return _parse_fallback(machine, payment_hash, gross_sats, super_fee_pct), True
def _parse_extra(
machine: Machine,
payment_hash: str,
gross_sats: int,
extra: dict,
super_fee_pct: float,
) -> CreateDcaSettlementData:
"""Happy path: bitSpire populated Payment.extra per lamassu-next#44."""
net_sats = _coerce_int(extra.get("net_sats"))
fee_sats = _coerce_int(extra.get("fee_sats"))
if net_sats is None or fee_sats is None:
# Missing key fields — shouldn't happen post-#44 but defensive.
return _parse_fallback(machine, payment_hash, gross_sats, super_fee_pct)
commission_sats = fee_sats
platform_fee_sats = round(commission_sats * super_fee_pct)
operator_fee_sats = commission_sats - platform_fee_sats
exchange_rate = _coerce_float(extra.get("exchange_rate"))
if exchange_rate is None or exchange_rate <= 0:
# Without exchange rate we can't compute fiat. Use 1.0 as a stand-in
# and let the operator correct via manual reconciliation.
exchange_rate = 1.0
fiat_amount = round(gross_sats / exchange_rate, 2) if exchange_rate > 0 else 0.0
fiat_code = _coerce_str(extra.get("currency")) or machine.fiat_code
return CreateDcaSettlementData(
machine_id=machine.id,
payment_hash=payment_hash,
bitspire_event_id=None,
bitspire_txid=_coerce_str(extra.get("txid")),
gross_sats=gross_sats,
fiat_amount=fiat_amount,
fiat_code=fiat_code,
exchange_rate=exchange_rate,
net_sats=net_sats,
commission_sats=commission_sats,
platform_fee_sats=platform_fee_sats,
operator_fee_sats=operator_fee_sats,
used_fallback_split=False,
tx_type=_coerce_str(extra.get("type")) or "cash_out",
bills_json=_json_dumps(extra.get("bills")),
cassettes_json=_json_dumps(extra.get("cassettes")),
)
def _parse_fallback(
machine: Machine,
payment_hash: str,
gross_sats: int,
super_fee_pct: float,
) -> CreateDcaSettlementData:
"""Back-derive the split using the machine's fallback_commission_pct.
Same formula as the Lamassu integration used:
base_amount = round(gross / (1 + commission_pct))
commission = gross - base_amount
"""
net_sats, commission_sats, _effective = calculate_commission(
crypto_atoms=gross_sats,
commission_percentage=machine.fallback_commission_pct,
discount=0.0,
)
platform_fee_sats = round(commission_sats * super_fee_pct)
operator_fee_sats = commission_sats - platform_fee_sats
# No exchange rate from the wire; leave fiat_amount=0 so it's visibly
# incomplete on the operator's reconciliation screen.
return CreateDcaSettlementData(
machine_id=machine.id,
payment_hash=payment_hash,
bitspire_event_id=None,
bitspire_txid=None,
gross_sats=gross_sats,
fiat_amount=0.0,
fiat_code=machine.fiat_code,
exchange_rate=0.0,
net_sats=net_sats,
commission_sats=commission_sats,
platform_fee_sats=platform_fee_sats,
operator_fee_sats=operator_fee_sats,
used_fallback_split=True,
tx_type="cash_out",
bills_json=None,
cassettes_json=None,
)

View file

@ -131,70 +131,6 @@ def calculate_distribution(
return distributions return distributions
def split_two_stage_commission(
commission_sats: int, super_fee_pct: float
) -> Tuple[int, int]:
"""Stage-1 of the v2 commission split: super takes `super_fee_pct` of the
total commission; the remainder is what the operator's own ruleset acts on.
Returns (platform_fee_sats, operator_fee_sats). Platform is rounded;
operator absorbs the rounding remainder so platform_fee + operator_fee
== commission_sats exactly.
Examples:
>>> split_two_stage_commission(100, 0.30)
(30, 70)
>>> split_two_stage_commission(7965, 0.30)
(2390, 5575)
>>> split_two_stage_commission(100, 0.0)
(0, 100)
>>> split_two_stage_commission(100, 1.0)
(100, 0)
"""
if commission_sats <= 0:
return 0, 0
platform = round(commission_sats * super_fee_pct)
platform = max(0, min(platform, commission_sats))
operator = commission_sats - platform
return platform, operator
def allocate_operator_split_legs(
operator_fee_sats: int, leg_pcts: list
) -> list:
"""Stage-2 of the v2 commission split: the operator's remainder is sliced
across N leg wallets per `leg_pcts` (each in 0..1, sum should equal 1.0).
The last leg absorbs the rounding remainder so the sum of allocations
exactly equals operator_fee_sats (assuming pcts sum to ~1.0). Returns
a list of integer sat amounts in the same order as leg_pcts.
Examples:
>>> allocate_operator_split_legs(70, [0.5, 0.3, 0.2])
[35, 21, 14]
>>> allocate_operator_split_legs(5575, [0.5, 0.3, 0.2])
[2787, 1672, 1116]
>>> allocate_operator_split_legs(100, [1.0])
[100]
>>> allocate_operator_split_legs(0, [0.5, 0.5])
[0, 0]
"""
if not leg_pcts:
return []
if operator_fee_sats <= 0:
return [0] * len(leg_pcts)
allocations: list = []
remaining = operator_fee_sats
for idx, pct in enumerate(leg_pcts):
if idx == len(leg_pcts) - 1:
allocations.append(remaining)
else:
amount = round(operator_fee_sats * float(pct))
allocations.append(amount)
remaining -= amount
return allocations
def calculate_exchange_rate(base_crypto_atoms: int, fiat_amount: float) -> float: def calculate_exchange_rate(base_crypto_atoms: int, fiat_amount: float) -> float:
""" """
Calculate exchange rate in sats per fiat unit. Calculate exchange rate in sats per fiat unit.

212
crud.py
View file

@ -118,19 +118,6 @@ async def get_machine_by_npub(machine_npub: str) -> Optional[Machine]:
) )
async def get_active_machine_by_wallet_id(wallet_id: str) -> Optional[Machine]:
"""Used by the invoice listener to route an incoming payment to a machine."""
return await db.fetchone(
"""
SELECT * FROM satoshimachine.dca_machines
WHERE wallet_id = :wid AND is_active = true
LIMIT 1
""",
{"wid": wallet_id},
Machine,
)
async def get_machines_for_operator(operator_user_id: str) -> List[Machine]: async def get_machines_for_operator(operator_user_id: str) -> List[Machine]:
return await db.fetchall( return await db.fetchall(
""" """
@ -423,24 +410,24 @@ async def delete_deposit(deposit_id: str) -> None:
async def create_settlement_idempotent( async def create_settlement_idempotent(
data: CreateDcaSettlementData, data: CreateDcaSettlementData,
) -> Optional[DcaSettlement]: ) -> Optional[DcaSettlement]:
"""Insert a settlement keyed by payment_hash. Returns the inserted row on """Insert a settlement keyed by bitspire_event_id. Returns the inserted row
first sight; returns the existing row if the payment_hash was already seen on first sight; returns the existing row if the event_id was already seen
(subscription replay, dispatcher double-fire). The UNIQUE constraint on (subscription replay, relay double-delivery). The UNIQUE constraint on
payment_hash is the source of truth.""" bitspire_event_id is the source of truth."""
existing = await get_settlement_by_payment_hash(data.payment_hash) existing = await get_settlement_by_event_id(data.bitspire_event_id)
if existing is not None: if existing is not None:
return existing return existing
settlement_id = urlsafe_short_hash() settlement_id = urlsafe_short_hash()
await db.execute( await db.execute(
""" """
INSERT INTO satoshimachine.dca_settlements INSERT INTO satoshimachine.dca_settlements
(id, machine_id, payment_hash, bitspire_event_id, bitspire_txid, (id, machine_id, bitspire_event_id, bitspire_txid, payment_hash,
gross_sats, fiat_amount, fiat_code, exchange_rate, net_sats, gross_sats, fiat_amount, fiat_code, exchange_rate, net_sats,
commission_sats, platform_fee_sats, operator_fee_sats, commission_sats, platform_fee_sats, operator_fee_sats,
used_fallback_split, tx_type, bills_json, cassettes_json, used_fallback_split, tx_type, bills_json, cassettes_json,
status, created_at) status, created_at)
VALUES (:id, :machine_id, :payment_hash, :bitspire_event_id, VALUES (:id, :machine_id, :bitspire_event_id, :bitspire_txid,
:bitspire_txid, :gross_sats, :fiat_amount, :fiat_code, :payment_hash, :gross_sats, :fiat_amount, :fiat_code,
:exchange_rate, :net_sats, :commission_sats, :exchange_rate, :net_sats, :commission_sats,
:platform_fee_sats, :operator_fee_sats, :used_fallback_split, :platform_fee_sats, :operator_fee_sats, :used_fallback_split,
:tx_type, :bills_json, :cassettes_json, :status, :created_at) :tx_type, :bills_json, :cassettes_json, :status, :created_at)
@ -448,9 +435,9 @@ async def create_settlement_idempotent(
{ {
"id": settlement_id, "id": settlement_id,
"machine_id": data.machine_id, "machine_id": data.machine_id,
"payment_hash": data.payment_hash,
"bitspire_event_id": data.bitspire_event_id, "bitspire_event_id": data.bitspire_event_id,
"bitspire_txid": data.bitspire_txid, "bitspire_txid": data.bitspire_txid,
"payment_hash": data.payment_hash,
"gross_sats": data.gross_sats, "gross_sats": data.gross_sats,
"fiat_amount": data.fiat_amount, "fiat_amount": data.fiat_amount,
"fiat_code": data.fiat_code, "fiat_code": data.fiat_code,
@ -478,15 +465,15 @@ async def get_settlement(settlement_id: str) -> Optional[DcaSettlement]:
) )
async def get_settlement_by_payment_hash( async def get_settlement_by_event_id(
payment_hash: str, bitspire_event_id: str,
) -> Optional[DcaSettlement]: ) -> Optional[DcaSettlement]:
return await db.fetchone( return await db.fetchone(
""" """
SELECT * FROM satoshimachine.dca_settlements SELECT * FROM satoshimachine.dca_settlements
WHERE payment_hash = :hash WHERE bitspire_event_id = :eid
""", """,
{"hash": payment_hash}, {"eid": bitspire_event_id},
DcaSettlement, DcaSettlement,
) )
@ -528,9 +515,7 @@ async def mark_settlement_status(
status: str, status: str,
error_message: Optional[str] = None, error_message: Optional[str] = None,
) -> Optional[DcaSettlement]: ) -> Optional[DcaSettlement]:
"""Status: 'pending' | 'processing' | 'processed' | 'partial' | """Status: 'pending' | 'processed' | 'partial' | 'refunded' | 'errored'."""
'refunded' | 'errored'. Clears processing_claim on terminal states so a
fresh claim attempt won't see a stale token."""
await db.execute( await db.execute(
""" """
UPDATE satoshimachine.dca_settlements UPDATE satoshimachine.dca_settlements
@ -539,10 +524,6 @@ async def mark_settlement_status(
processed_at = CASE processed_at = CASE
WHEN :status IN ('processed', 'partial', 'refunded') WHEN :status IN ('processed', 'partial', 'refunded')
THEN :now ELSE processed_at THEN :now ELSE processed_at
END,
processing_claim = CASE
WHEN :status = 'processing' THEN processing_claim
ELSE NULL
END END
WHERE id = :id WHERE id = :id
""", """,
@ -556,165 +537,6 @@ async def mark_settlement_status(
return await get_settlement(settlement_id) return await get_settlement(settlement_id)
async def claim_settlement_for_processing(
settlement_id: str,
) -> Optional[DcaSettlement]:
"""Optimistic-lock claim: atomically flip a settlement to 'processing'
and tag it with a per-invocation token. Returns the claimed row on
success; None if another caller already won the claim or the settlement
is not in a claimable state ('pending').
Pattern is portable across SQLite + PostgreSQL (doesn't rely on
UPDATE ... RETURNING). Two concurrent invocations may both run the
UPDATE, but only one row matches the WHERE clause; the loser's UPDATE
is a no-op against status='processing'. The read-back check on the
token disambiguates."""
token = urlsafe_short_hash()
await db.execute(
"""
UPDATE satoshimachine.dca_settlements
SET status = 'processing', processing_claim = :token
WHERE id = :id AND status = 'pending'
""",
{"id": settlement_id, "token": token},
)
after = await get_settlement(settlement_id)
if after is None:
return None
if after.processing_claim != token:
return None
return after
async def reset_settlement_for_retry(
settlement_id: str,
) -> Optional[DcaSettlement]:
"""Operator retry path. Flips 'errored''pending' and voids any
'failed' legs so process_settlement re-runs them fresh. Completed legs
are left in place we never re-pay sats that already moved."""
await db.execute(
"""
UPDATE satoshimachine.dca_payments
SET status = 'voided'
WHERE settlement_id = :sid AND status = 'failed'
""",
{"sid": settlement_id},
)
await db.execute(
"""
UPDATE satoshimachine.dca_settlements
SET status = 'pending',
error_message = NULL,
processing_claim = NULL,
processed_at = NULL
WHERE id = :id AND status = 'errored'
""",
{"id": settlement_id},
)
return await get_settlement(settlement_id)
async def apply_partial_dispense(
settlement_id: str,
*,
new_gross_sats: int,
new_net_sats: int,
new_commission_sats: int,
new_platform_fee_sats: int,
new_operator_fee_sats: int,
new_fiat_amount: float,
appended_note: str,
) -> Optional[DcaSettlement]:
"""Overwrite the monetary fields on a settlement (partial-dispense
recompute) and prepend `appended_note` to the notes column.
Notes are append-only: new lines go at the top (newest first) so the
settlement detail view shows the most recent adjustment first without
needing to scroll. Resets status to 'pending' so process_settlement
can re-distribute via the existing idempotent path."""
await db.execute(
"""
UPDATE satoshimachine.dca_settlements
SET gross_sats = :gross,
net_sats = :net,
commission_sats = :commission,
platform_fee_sats = :platform,
operator_fee_sats = :operator,
fiat_amount = :fiat,
status = 'pending',
error_message = NULL,
processed_at = NULL,
notes = CASE
WHEN notes IS NULL OR notes = '' THEN :note
ELSE :note || char(10) || char(10) || notes
END
WHERE id = :id
""",
{
"id": settlement_id,
"gross": new_gross_sats,
"net": new_net_sats,
"commission": new_commission_sats,
"platform": new_platform_fee_sats,
"operator": new_operator_fee_sats,
"fiat": new_fiat_amount,
"note": appended_note,
},
)
return await get_settlement(settlement_id)
async def count_completed_legs_for_settlement(settlement_id: str) -> int:
"""Used by partial-dispense to refuse adjustments after any leg has
successfully moved sats (Lightning payments can't be clawed back)."""
row = await db.fetchone(
"""
SELECT COUNT(*) AS n FROM satoshimachine.dca_payments
WHERE settlement_id = :sid AND status = 'completed'
""",
{"sid": settlement_id},
)
return int(row["n"]) if row else 0
async def append_settlement_note(
settlement_id: str, note: str, author_user_id: str
) -> Optional[DcaSettlement]:
"""Prepend an operator-authored note to settlement.notes. Each entry is
timestamped (UTC) and tagged with the author's user id so the trail
is accountable. Append-only: existing entries are never edited."""
from datetime import timezone
ts = datetime.now(timezone.utc).isoformat(timespec="seconds")
formatted = f"[{ts} by {author_user_id}] {note}"
await db.execute(
"""
UPDATE satoshimachine.dca_settlements
SET notes = CASE
WHEN notes IS NULL OR notes = '' THEN :note
ELSE :note || char(10) || char(10) || notes
END
WHERE id = :id
""",
{"id": settlement_id, "note": formatted},
)
return await get_settlement(settlement_id)
async def void_open_legs_for_settlement(settlement_id: str) -> None:
"""Marks pending/failed legs as 'voided' before re-running distribution
on a partial-dispense recompute. Preserves the rows for audit but stops
them from being interpreted as live."""
await db.execute(
"""
UPDATE satoshimachine.dca_payments
SET status = 'voided'
WHERE settlement_id = :sid AND status IN ('pending', 'failed')
""",
{"sid": settlement_id},
)
# ============================================================================= # =============================================================================
# Commission splits — operator's remainder-distribution rules. # Commission splits — operator's remainder-distribution rules.
# ============================================================================= # =============================================================================
@ -955,15 +777,11 @@ async def get_client_balance_summary(
""", """,
{"cid": client_id}, {"cid": client_id},
) )
# Both DCA legs (auto, from bitSpire settlements) and balance-settle legs
# (operator-initiated under #4) reduce the LP's remaining fiat balance.
payments_row = await db.fetchone( payments_row = await db.fetchone(
""" """
SELECT COALESCE(SUM(amount_fiat), 0) AS total SELECT COALESCE(SUM(amount_fiat), 0) AS total
FROM satoshimachine.dca_payments FROM satoshimachine.dca_payments
WHERE client_id = :cid WHERE client_id = :cid AND leg_type = 'dca' AND status = 'completed'
AND leg_type IN ('dca', 'settlement')
AND status = 'completed'
""", """,
{"cid": client_id}, {"cid": client_id},
) )

View file

@ -1,572 +0,0 @@
# Satoshi Machine v2 — settlement distribution (P2).
#
# Picks up a dca_settlements row with status='pending' and pays out the
# three leg groups via LNbits internal transfers (create_invoice +
# pay_invoice on the same instance auto-detect internal). All legs land
# in dca_payments with the appropriate leg_type discriminator and inherit
# the Payment.tag "satmachine:{machine_npub}" so LNbits payment-history
# filters work natively.
#
# Leg order:
# 1. super_fee — platform_fee_sats → super_fee_wallet_id (if set)
# 2. operator_split — operator_fee_sats split per operator's rules
# 3. dca — net_sats distributed proportionally to active LPs,
# each leg capped at the LP's remaining fiat balance
# (preserves the v1 sync-mismatch fix from PR #2)
#
# Atomicity: LN payments cannot be rolled back. We attempt each leg, record
# success/failure per dca_payments row, and mark the settlement 'processed'
# only when every leg completed. Any failure marks 'errored' with a message
# but leaves the successful legs in place. Sats that don't get paid out
# (failed legs, no LP coverage, missing super wallet) remain in the
# machine's wallet — visible to the operator on the dashboard.
from __future__ import annotations
from datetime import datetime, timezone
from typing import List
from lnbits.core.services import create_invoice, pay_invoice
from loguru import logger
from .calculations import (
allocate_operator_split_legs,
calculate_distribution,
split_two_stage_commission,
)
from .crud import (
apply_partial_dispense,
claim_settlement_for_processing,
count_completed_legs_for_settlement,
create_dca_payment,
get_client_balance_summary,
get_effective_commission_splits,
get_flow_mode_clients_for_machine,
get_machine,
get_settlement,
get_super_config,
mark_settlement_status,
update_payment_status,
void_open_legs_for_settlement,
)
from .models import (
CreateDcaPaymentData,
DcaClient,
DcaPayment,
DcaSettlement,
Machine,
PartialDispenseData,
SettleBalanceData,
SuperConfig,
)
PAYMENT_TAG_PREFIX = "satmachine"
def _payment_tag(machine: Machine) -> str:
return f"{PAYMENT_TAG_PREFIX}:{machine.machine_npub}"
def _resolve_partial_dispense_gross(
settlement: DcaSettlement, data: PartialDispenseData
) -> int:
if data.dispensed_sats is not None:
new_gross = int(data.dispensed_sats)
elif data.dispensed_fraction is not None:
new_gross = round(settlement.gross_sats * float(data.dispensed_fraction))
else:
raise ValueError("provide one of dispensed_sats or dispensed_fraction")
if new_gross < 0:
raise ValueError("partial dispense cannot be negative")
if new_gross > settlement.gross_sats:
raise ValueError(
f"partial dispense ({new_gross} sats) cannot exceed the original "
f"gross ({settlement.gross_sats} sats)"
)
return new_gross
def _build_partial_dispense_memo(
settlement: DcaSettlement,
data: PartialDispenseData,
*,
new_gross: int,
new_net: int,
new_commission: int,
new_platform: int,
new_operator: int,
) -> str:
reason = (data.notes or "").strip() or "(no reason given)"
if data.dispensed_sats is not None:
adjust = f"dispensed_sats={data.dispensed_sats}"
else:
adjust = f"dispensed_fraction={data.dispensed_fraction}"
ts = datetime.now(timezone.utc).isoformat(timespec="seconds")
return (
f"[{ts}] partial dispense applied — {adjust}. "
f"Original gross={settlement.gross_sats} net={settlement.net_sats} "
f"commission={settlement.commission_sats} "
f"(super_fee={settlement.platform_fee_sats} "
f"operator_fee={settlement.operator_fee_sats}). "
f"New gross={new_gross} net={new_net} commission={new_commission} "
f"(super_fee={new_platform} operator_fee={new_operator}). "
f"Reason: {reason}"
)
async def settle_lp_balance(
client: DcaClient, machine: Machine, data: SettleBalanceData
) -> DcaPayment:
"""Operator UX action — closes satmachineadmin#4.
Settle an LP's remaining fiat balance from the operator's chosen funding
wallet at the rate the operator specified. Records a leg_type='settlement'
row that counts against the LP's balance summary (so a subsequent
get_client_balance_summary reflects the new zero/reduced balance).
Caller is responsible for verifying the operator owns both the client's
machine and the funding wallet (API endpoint does this). The amount_fiat
is capped at the LP's remaining balance — operators cannot accidentally
over-pay via this path.
"""
summary = await get_client_balance_summary(client.id)
if summary is None:
raise ValueError(f"client {client.id} balance not available")
remaining = float(summary.remaining_balance)
if remaining <= 0:
raise ValueError(
f"client {client.id} has no remaining balance to settle"
)
# Resolve fiat amount: explicit if given (capped at remaining), else full.
requested = (
float(data.amount_fiat) if data.amount_fiat is not None else remaining
)
amount_fiat = round(min(requested, remaining), 2)
if amount_fiat <= 0:
raise ValueError("computed settlement amount is zero")
exchange_rate = float(data.exchange_rate)
amount_sats = round(amount_fiat * exchange_rate)
if amount_sats <= 0:
raise ValueError(
f"computed sat amount is zero (amount_fiat={amount_fiat}, "
f"exchange_rate={exchange_rate})"
)
reason = (data.notes or "").strip() or "(no reason given)"
memo = (
f"satmachine balance settle — {amount_fiat:.2f} "
f"{machine.fiat_code} @ {exchange_rate:g} sat/{machine.fiat_code} "
f"= {amount_sats} sats. Reason: {reason}"
)
leg_row = await create_dca_payment(
CreateDcaPaymentData(
settlement_id=None,
client_id=client.id,
machine_id=machine.id,
operator_user_id=machine.operator_user_id,
leg_type="settlement",
destination_wallet_id=client.wallet_id,
destination_ln_address=None,
amount_sats=amount_sats,
amount_fiat=amount_fiat,
exchange_rate=exchange_rate,
transaction_time=datetime.now(timezone.utc),
external_payment_hash=None,
)
)
extra = {
"satmachine_leg": "settlement",
"satmachine_client_id": client.id,
"satmachine_machine_npub": machine.machine_npub,
"satmachine_exchange_rate": exchange_rate,
}
try:
new_invoice = await create_invoice(
wallet_id=client.wallet_id,
amount=float(amount_sats),
internal=True,
memo=memo,
extra=extra,
)
if not new_invoice or not new_invoice.bolt11:
await update_payment_status(
leg_row.id, "failed", None, "create_invoice returned empty"
)
raise ValueError("create_invoice returned empty")
paid = await pay_invoice(
wallet_id=data.funding_wallet_id,
payment_request=new_invoice.bolt11,
description=memo,
tag=_payment_tag(machine),
extra=extra,
)
completed = await update_payment_status(
leg_row.id, "completed", paid.payment_hash, None
)
return completed if completed is not None else leg_row
except Exception as exc:
logger.error(
f"distribution: balance-settle failed for client {client.id} "
f"({amount_sats} sats from wallet {data.funding_wallet_id}): {exc}"
)
await update_payment_status(leg_row.id, "failed", None, str(exc)[:512])
raise
async def apply_partial_dispense_and_redistribute(
settlement_id: str, data: PartialDispenseData
) -> DcaSettlement:
"""Operator UX action — closes satmachineadmin#3.
When a bitSpire dispense fails mid-transaction (e.g., dispenser jam after
6 of 10 bills), the operator confirms the actual amount dispensed and we
re-allocate the split against that partial gross. Sat amounts scale
linearly, preserving the original commission ratio exactly; the two-stage
super/operator split is recomputed using the CURRENT super_fee_pct
(super may have changed the rate since the original landed).
Hard guard: refuses if any dca_payments leg has already completed.
Lightning payments can't be clawed back, so we won't try.
Side effects:
- Voids pending/failed legs (status 'voided').
- Overwrites the settlement's monetary fields with the new totals.
- Appends a timestamped memo to settlement.notes capturing the
original values + operator's reason.
- Resets settlement.status to 'pending' and triggers process_settlement.
"""
settlement = await get_settlement(settlement_id)
if settlement is None:
raise ValueError(f"settlement {settlement_id} not found")
if settlement.gross_sats <= 0:
raise ValueError("cannot partial-dispense a zero-gross settlement")
completed = await count_completed_legs_for_settlement(settlement_id)
if completed > 0:
raise ValueError(
f"cannot partial-dispense: {completed} leg(s) already completed "
"(Lightning payments can't be clawed back)"
)
new_gross = _resolve_partial_dispense_gross(settlement, data)
# Linear scale preserves the original commission ratio exactly.
scale = new_gross / settlement.gross_sats
new_commission = round(settlement.commission_sats * scale)
new_net = new_gross - new_commission
new_fiat = round(float(settlement.fiat_amount) * scale, 2)
# Re-stage-1 split using the CURRENT super_fee_pct.
super_config = await get_super_config()
super_fee_pct = float(super_config.super_fee_pct) if super_config else 0.0
new_platform, new_operator = split_two_stage_commission(
new_commission, super_fee_pct
)
memo = _build_partial_dispense_memo(
settlement,
data,
new_gross=new_gross,
new_net=new_net,
new_commission=new_commission,
new_platform=new_platform,
new_operator=new_operator,
)
await void_open_legs_for_settlement(settlement_id)
updated = await apply_partial_dispense(
settlement_id,
new_gross_sats=new_gross,
new_net_sats=new_net,
new_commission_sats=new_commission,
new_platform_fee_sats=new_platform,
new_operator_fee_sats=new_operator,
new_fiat_amount=new_fiat,
appended_note=memo,
)
if updated is None:
raise ValueError(f"settlement {settlement_id} disappeared mid-update")
logger.info(
f"distribution: partial-dispense applied to settlement "
f"{settlement_id} — re-running distribution"
)
await process_settlement(settlement_id)
after = await get_settlement(settlement_id)
return after if after is not None else updated
async def process_settlement(settlement_id: str) -> None:
"""Process a pending settlement end-to-end.
Concurrency-safe: an optimistic-lock claim flips the settlement to
'processing' atomically and tags it with a per-invocation token.
Concurrent invocations on the same id can't both win — losers see the
claim mismatch on read-back and return without writing any legs.
Retries land via reset_settlement_for_retry which voids failed legs
and flips 'errored' back to 'pending'."""
settlement = await claim_settlement_for_processing(settlement_id)
if settlement is None:
# Either already claimed by a concurrent invocation, or not in a
# 'pending' state. Either way, nothing to do here.
logger.debug(
f"distribution: skip {settlement_id} — not claimable (already "
"processing or not pending)"
)
return
machine = await get_machine(settlement.machine_id)
if machine is None:
logger.error(
f"distribution: settlement {settlement_id} references missing "
f"machine {settlement.machine_id}"
)
await mark_settlement_status(
settlement_id, "errored", "machine missing"
)
return
super_config = await get_super_config()
errors: List[str] = []
try:
await _pay_super_fee(settlement, machine, super_config, errors)
await _pay_operator_splits(settlement, machine, errors)
await _pay_dca_distributions(settlement, machine, errors)
except Exception as exc: # last-resort guard
logger.exception("distribution: unexpected error processing settlement")
errors.append(f"unexpected: {exc}")
if errors:
await mark_settlement_status(
settlement_id, "errored", "; ".join(errors)[:512]
)
else:
await mark_settlement_status(settlement_id, "processed", None)
# =============================================================================
# Leg 1 — super fee
# =============================================================================
async def _pay_super_fee(
settlement: DcaSettlement,
machine: Machine,
super_config: SuperConfig | None,
errors: List[str],
) -> None:
if settlement.platform_fee_sats <= 0:
return
if super_config is None or not super_config.super_fee_wallet_id:
# Super has configured a fee but not a destination wallet — leave
# the sats in the machine wallet and warn. The super needs to
# configure their wallet before they can collect.
logger.warning(
f"distribution: super_fee_sats={settlement.platform_fee_sats} "
f"left in machine wallet (super_fee_wallet_id not set)"
)
return
await _pay_internal(
settlement=settlement,
machine=machine,
leg_type="super_fee",
client_id=None,
destination_wallet_id=super_config.super_fee_wallet_id,
amount_sats=settlement.platform_fee_sats,
memo=f"satmachine super fee — {machine.name or machine.machine_npub[:12]}",
errors=errors,
)
# =============================================================================
# Leg 2 — operator commission splits
# =============================================================================
async def _pay_operator_splits(
settlement: DcaSettlement,
machine: Machine,
errors: List[str],
) -> None:
if settlement.operator_fee_sats <= 0:
return
splits = await get_effective_commission_splits(
machine.operator_user_id, machine.id
)
if not splits:
logger.warning(
f"distribution: operator_fee_sats={settlement.operator_fee_sats} "
f"left in machine wallet (operator has no commission_splits ruleset "
f"for machine {machine.id})"
)
return
# Pure allocator handles the rounding rule (last leg absorbs remainder).
leg_amounts = allocate_operator_split_legs(
settlement.operator_fee_sats,
[float(leg.pct) for leg in splits],
)
for idx, (leg, amount) in enumerate(zip(splits, leg_amounts, strict=True)):
if amount <= 0:
continue
label = leg.label or f"split-{idx + 1}"
memo = (
f"satmachine operator split — "
f"{machine.name or machine.machine_npub[:12]} ({label})"
)
await _pay_internal(
settlement=settlement,
machine=machine,
leg_type="operator_split",
client_id=None,
destination_wallet_id=leg.wallet_id,
amount_sats=amount,
memo=memo,
errors=errors,
)
# =============================================================================
# Leg 3 — DCA distribution to active LPs
# =============================================================================
async def _pay_dca_distributions(
settlement: DcaSettlement,
machine: Machine,
errors: List[str],
) -> None:
if settlement.net_sats <= 0:
return
if settlement.exchange_rate <= 0:
# Fallback path with no exchange rate (bitSpire Payment.extra absent).
# Without a rate we can't compute fiat balances → can't compute
# proportional shares → leave net_sats in the machine wallet for
# the operator to manually reconcile.
logger.warning(
f"distribution: net_sats={settlement.net_sats} left in machine "
f"wallet (no exchange_rate; fallback path; see lamassu-next#44)"
)
return
clients = await get_flow_mode_clients_for_machine(machine.id)
if not clients:
return
# Build {client_id: remaining_fiat_balance} for proportional allocation.
client_balances: dict[str, float] = {}
for client in clients:
summary = await get_client_balance_summary(client.id)
if summary is None or summary.remaining_balance <= 0:
continue
client_balances[client.id] = summary.remaining_balance
if not client_balances:
return
# Compute proportional sat allocations, then cap each at the client's
# remaining-fiat-balance-in-sats (the v1 sync-mismatch safeguard).
raw_allocations = calculate_distribution(
base_amount_sats=settlement.net_sats,
client_balances=client_balances,
)
capped_allocations: dict[str, int] = {}
for client_id, raw_sats in raw_allocations.items():
remaining_fiat = client_balances[client_id]
cap_sats = int(remaining_fiat * float(settlement.exchange_rate))
capped_allocations[client_id] = min(raw_sats, cap_sats)
# Pay each capped allocation.
client_by_id = {c.id: c for c in clients}
for client_id, amount_sats in capped_allocations.items():
if amount_sats <= 0:
continue
client = client_by_id[client_id]
amount_fiat = round(amount_sats / float(settlement.exchange_rate), 2)
memo = (
f"DCA: {amount_sats} sats • {amount_fiat:.2f} {settlement.fiat_code}"
)
await _pay_internal(
settlement=settlement,
machine=machine,
leg_type="dca",
client_id=client.id,
destination_wallet_id=client.wallet_id,
amount_sats=amount_sats,
amount_fiat=amount_fiat,
exchange_rate=float(settlement.exchange_rate),
memo=memo,
errors=errors,
)
# =============================================================================
# Internal transfer helper
# =============================================================================
async def _pay_internal(
*,
settlement: DcaSettlement,
machine: Machine,
leg_type: str,
client_id: str | None,
destination_wallet_id: str,
amount_sats: int,
memo: str,
errors: List[str],
amount_fiat: float | None = None,
exchange_rate: float | None = None,
) -> DcaPayment | None:
"""Create an invoice on the destination wallet, pay it from the machine
wallet, and record the leg in dca_payments. Returns the dca_payments row
on success (including the failed case the row stays for audit)."""
tag = _payment_tag(machine)
leg_row = await create_dca_payment(
CreateDcaPaymentData(
settlement_id=settlement.id,
client_id=client_id,
machine_id=machine.id,
operator_user_id=machine.operator_user_id,
leg_type=leg_type,
destination_wallet_id=destination_wallet_id,
destination_ln_address=None,
amount_sats=amount_sats,
amount_fiat=amount_fiat,
exchange_rate=exchange_rate,
transaction_time=datetime.now(),
external_payment_hash=None,
)
)
extra = {
"satmachine_leg": leg_type,
"satmachine_settlement_id": settlement.id,
"satmachine_machine_npub": machine.machine_npub,
}
try:
new_invoice = await create_invoice(
wallet_id=destination_wallet_id,
amount=float(amount_sats),
internal=True,
memo=memo,
extra=extra,
)
if not new_invoice or not new_invoice.bolt11:
await update_payment_status(
leg_row.id, "failed", None, "create_invoice returned empty"
)
errors.append(f"{leg_type}: create_invoice empty")
return leg_row
paid = await pay_invoice(
wallet_id=machine.wallet_id,
payment_request=new_invoice.bolt11,
description=memo,
tag=tag,
extra=extra,
)
await update_payment_status(
leg_row.id, "completed", paid.payment_hash, None
)
return leg_row
except Exception as exc:
logger.error(
f"distribution: {leg_type} leg failed "
f"(settlement={settlement.id} amount={amount_sats}): {exc}"
)
await update_payment_status(leg_row.id, "failed", None, str(exc)[:512])
errors.append(f"{leg_type}: {exc}")
return leg_row

View file

@ -290,28 +290,20 @@ async def m005_satmachine_v2_overhaul(db):
"ON satoshimachine.dca_deposits (client_id, created_at DESC)" "ON satoshimachine.dca_deposits (client_id, created_at DESC)"
) )
# dca_settlements — idempotency table for bitSpire-driven settlements. # dca_settlements — idempotency table for bitSpire kind-21000 events.
# The natural unique key is payment_hash (every LN invoice has a globally # CRITICAL: platform_fee_sats and operator_fee_sats are stored as absolute BIGINT
# unique hash; subscription replays / dispatcher double-fires dedup via the # (not as a derived percentage). Today this is just the contractual split. Once
# UNIQUE constraint). bitspire_event_id is reserved for a future path where # the v2 promotion engine ships, the two values diverge when discounts fire and
# we subscribe to raw Nostr events directly (kind-30078/30079 ingestion # this row is the only audit-grade record of who forgave what. Do not collapse
# uses dca_telemetry; bitspire_event_id is kept here for future bookkeeping # them into a single commission_pct field. See plan section "Customer discounts".
# if we ever bypass the LNbits Payment system).
#
# CRITICAL: platform_fee_sats and operator_fee_sats are stored as absolute
# BIGINT (not a derived percentage). Today this is just the contractual
# split. Once the v2 promotion engine ships, the two values diverge when
# discounts fire and this row is the only audit-grade record of who forgave
# what. Do not collapse them into a single commission_pct field. See plan
# section "Customer discounts".
await db.execute( await db.execute(
f""" f"""
CREATE TABLE satoshimachine.dca_settlements ( CREATE TABLE satoshimachine.dca_settlements (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
machine_id TEXT NOT NULL, machine_id TEXT NOT NULL,
payment_hash TEXT NOT NULL UNIQUE, bitspire_event_id TEXT NOT NULL UNIQUE,
bitspire_event_id TEXT,
bitspire_txid TEXT, bitspire_txid TEXT,
payment_hash TEXT NOT NULL,
gross_sats BIGINT NOT NULL, gross_sats BIGINT NOT NULL,
fiat_amount DECIMAL(10,2) NOT NULL, fiat_amount DECIMAL(10,2) NOT NULL,
fiat_code TEXT NOT NULL DEFAULT 'GTQ', fiat_code TEXT NOT NULL DEFAULT 'GTQ',
@ -335,7 +327,10 @@ async def m005_satmachine_v2_overhaul(db):
"CREATE INDEX dca_settlements_machine_idx " "CREATE INDEX dca_settlements_machine_idx "
"ON satoshimachine.dca_settlements (machine_id, created_at DESC)" "ON satoshimachine.dca_settlements (machine_id, created_at DESC)"
) )
# payment_hash UNIQUE already creates a lookup index — no extra index needed. await db.execute(
"CREATE INDEX dca_settlements_payment_hash_idx "
"ON satoshimachine.dca_settlements (payment_hash)"
)
# dca_commission_splits — operator's rules for distributing the *remainder* # dca_commission_splits — operator's rules for distributing the *remainder*
# of each commission (commission_sats - platform_fee_sats). One row per leg. # of each commission (commission_sats - platform_fee_sats). One row per leg.
@ -428,51 +423,3 @@ async def m005_satmachine_v2_overhaul(db):
); );
""" """
) )
async def m006_add_settlement_notes(db):
"""Audit memo on dca_settlements.
When an operator triggers an in-place adjustment (partial-dispense,
manual reconciliation override, etc.), the settlement row's monetary
fields are overwritten with the new numbers. To preserve the audit
trail without a separate history table, we append a timestamped memo
to this notes column capturing the previous values and the reason.
Operators see this directly in the settlement detail view, so any
overwrite is visible and dated. Append-only convention: new memos
are prepended with a timestamp; never edited in place.
"""
await db.execute(
"ALTER TABLE satoshimachine.dca_settlements ADD COLUMN notes TEXT"
)
async def m007_settlement_claim_and_machine_wallet_unique(db):
"""Security + concurrency hardening (fix bundle 1).
1. Adds `processing_claim` to dca_settlements. The settlement processor
uses an optimistic-lock pattern: write a per-invocation claim token
alongside the status='processing' flip, then re-read and confirm the
persisted token matches. Two concurrent process_settlement invocations
on the same id can't both win the claim, so no duplicate leg
creation / double-pay.
2. Adds a UNIQUE index on dca_machines.wallet_id so two machine rows
can never claim the same wallet. Closes a wallet-IDOR funds-theft
vector where operator A could register a machine on operator B's
wallet_id and drain it via the settlement processor's pay_invoice.
Defence-in-depth on top of the API-layer ownership check; if a future
endpoint forgets the check, the DB still rejects.
CREATE UNIQUE INDEX is portable across SQLite and PostgreSQL
(ALTER TABLE ADD CONSTRAINT is not on SQLite).
"""
await db.execute(
"ALTER TABLE satoshimachine.dca_settlements "
"ADD COLUMN processing_claim TEXT"
)
await db.execute(
"CREATE UNIQUE INDEX dca_machines_wallet_id_uq "
"ON satoshimachine.dca_machines (wallet_id)"
)

View file

@ -185,9 +185,9 @@ class UpdateDepositStatusData(BaseModel):
class CreateDcaSettlementData(BaseModel): class CreateDcaSettlementData(BaseModel):
machine_id: str machine_id: str
payment_hash: str # the idempotency key (UNIQUE in the dca_settlements table) bitspire_event_id: str # nostr event id — the idempotency key
bitspire_event_id: Optional[str] = None # reserved for direct-Nostr ingestion
bitspire_txid: Optional[str] = None bitspire_txid: Optional[str] = None
payment_hash: str
gross_sats: int gross_sats: int
fiat_amount: float fiat_amount: float
fiat_code: str = "GTQ" fiat_code: str = "GTQ"
@ -205,9 +205,9 @@ class CreateDcaSettlementData(BaseModel):
class DcaSettlement(BaseModel): class DcaSettlement(BaseModel):
id: str id: str
machine_id: str machine_id: str
payment_hash: str bitspire_event_id: str
bitspire_event_id: Optional[str]
bitspire_txid: Optional[str] bitspire_txid: Optional[str]
payment_hash: str
gross_sats: int gross_sats: int
fiat_amount: float fiat_amount: float
fiat_code: str fiat_code: str
@ -224,16 +224,6 @@ class DcaSettlement(BaseModel):
error_message: Optional[str] error_message: Optional[str]
processed_at: Optional[datetime] processed_at: Optional[datetime]
created_at: datetime created_at: datetime
# Append-only audit memo. Populated when an operator triggers an in-place
# adjustment (partial-dispense, manual reconciliation override). Each
# entry timestamped + records original values so the overwrite is
# auditable from the settlement detail view alone. Never edited in place.
notes: Optional[str] = None
# Optimistic-lock claim token written when status flips to 'processing'.
# Two concurrent process_settlement invocations can't both win the claim
# (only one matching read-back). Cleared back to NULL when the leg-
# writing pass completes (status='processed' or 'errored').
processing_claim: Optional[str] = None
# ============================================================================= # =============================================================================
@ -407,60 +397,18 @@ class PartialDispenseData(BaseModel):
return v return v
class AppendSettlementNoteData(BaseModel):
"""Operator-authored free-form note on a settlement.
Notes are prepended (newest first) to the settlement's `notes` column,
with a UTC timestamp and the author's user id so each entry is
accountable. Useful for cash-drawer reconciliation context, off-the-
record refund records, or any narrative an operator wants to attach
for future reference.
"""
note: str
@validator("note")
def non_empty(cls, v):
v = v.strip() if isinstance(v, str) else v
if not v:
raise ValueError("note cannot be empty")
if len(v) > 2000:
raise ValueError("note too long (max 2000 chars)")
return v
class SettleBalanceData(BaseModel): class SettleBalanceData(BaseModel):
"""Resolves satmachineadmin#4 — operator settles small remaining LP balance """Resolves satmachineadmin#4 — operator settles small remaining LP balance
from their own wallet at a specified exchange rate. from their own wallet at the current exchange rate."""
Use case: an LP has a small remaining fiat balance (e.g. 47 GTQ) that
keeps shrinking proportionally on each new transaction (Zeno's paradox).
Operator hits 'Settle', specifies the exchange rate they're willing to
honor, and the system pays out the remaining balance in sats from the
operator's wallet. The LP's balance goes to zero; settlement legs count
against the LP's balance summary alongside DCA legs.
"""
client_id: str
funding_wallet_id: str funding_wallet_id: str
# The exchange rate the operator is settling at (sats per 1 fiat unit). # If None, settle the full remaining balance.
# Operator picks the rate so they can use exchange spot, a market
# midpoint, or a favorable rate as a gesture. Required and explicit so
# there's no ambiguity about what rate was used.
exchange_rate: float
# If None, settle the LP's full remaining balance. Else partial.
amount_fiat: Optional[float] = None amount_fiat: Optional[float] = None
notes: Optional[str] = None notes: Optional[str] = None
@validator("exchange_rate")
def positive_rate(cls, v):
if v is None or v <= 0:
raise ValueError("exchange_rate must be > 0 (sats per fiat unit)")
return float(v)
@validator("amount_fiat") @validator("amount_fiat")
def round_amount(cls, v): def round_amount(cls, v):
if v is None: if v is not None:
return v return round(float(v), 2)
if v <= 0: return v
raise ValueError("amount_fiat must be > 0 if specified")
return round(float(v), 2)

View file

@ -1,88 +1,27 @@
# Satoshi Machine v2 — invoice listener (P1). # Satoshi Machine v2 — task placeholders.
# #
# Subscribes to LNbits' invoice dispatcher (register_invoice_listener), then # The v1 SSH/PostgreSQL polling + invoice listener are intentionally absent.
# for each successful inbound payment: # They will be replaced in P1 (Nostr subscription manager: subscribes via
# 1. Checks if wallet_id belongs to an active dca_machines row. If not, skip. # lnbits.core.services.nostr_transport to kind-21000 settlements + kind-30078
# 2. Parses Payment.extra for bitSpire split metadata (post-lamassu-next#44). # beacons + kind-30079 telemetry per registered machine, with auto-reconnect).
# Falls back to machine.fallback_commission_pct if extra is absent.
# 3. Computes the two-stage split (super_fee first, operator remainder).
# 4. Inserts a dca_settlements row idempotently (keyed by payment_hash).
# #
# The actual distribution of sats — paying out the LP DCA legs, the super-fee # These no-op stubs keep __init__.py importable in the interim so the
# leg, and the operator's commission-split legs — happens in a separate # extension can be activated even before P1 lands.
# settlement-processor task (P2). This listener only LANDS the settlement
# row; status='pending' tells the processor it still needs to move the money.
import asyncio import asyncio
from lnbits.core.models import Payment
from lnbits.tasks import register_invoice_listener
from loguru import logger from loguru import logger
from .bitspire import parse_settlement
from .crud import (
create_settlement_idempotent,
get_active_machine_by_wallet_id,
get_super_config,
)
from .distribution import process_settlement
LISTENER_NAME = "ext_satmachineadmin"
async def wait_for_paid_invoices() -> None: async def wait_for_paid_invoices() -> None:
invoice_queue: asyncio.Queue = asyncio.Queue() """No-op placeholder pending P1 Nostr subscription manager."""
register_invoice_listener(invoice_queue, LISTENER_NAME) logger.debug(
logger.info( "satmachineadmin v2: invoice listener stub running. "
"satmachineadmin v2: invoice listener registered as " "Real Nostr-transport subscription pending P1."
f"`{LISTENER_NAME}` — waiting for bitSpire settlements."
) )
# Sleep forever; the task system expects a long-lived coroutine.
while True: while True:
payment: Payment = await invoice_queue.get() await asyncio.sleep(3600)
try:
await _handle_payment(payment)
except Exception as exc: # listener must never die
logger.error(
f"satmachineadmin: error handling payment "
f"{payment.payment_hash[:12]}...: {exc}"
)
async def _handle_payment(payment: Payment) -> None:
if not payment.is_in or not payment.success:
return
machine = await get_active_machine_by_wallet_id(payment.wallet_id)
if machine is None:
return
super_config = await get_super_config()
super_fee_pct = float(super_config.super_fee_pct) if super_config else 0.0
data, used_fallback = parse_settlement(
machine=machine,
payment_hash=payment.payment_hash,
gross_sats=payment.sat,
extra=payment.extra or {},
super_fee_pct=super_fee_pct,
)
settlement = await create_settlement_idempotent(data)
if settlement is None:
logger.error(
f"satmachineadmin: failed to insert settlement for "
f"payment_hash={payment.payment_hash[:12]}..."
)
return
fb = " (fallback split)" if used_fallback else ""
logger.info(
f"satmachineadmin: landed settlement {settlement.id} for "
f"machine={machine.machine_npub[:12]}... "
f"gross={data.gross_sats}sats net={data.net_sats}sats "
f"commission={data.commission_sats}sats "
f"(super_fee={data.platform_fee_sats} "
f"operator_fee={data.operator_fee_sats}){fb}"
)
# Trigger distribution synchronously so latency is one bitSpire-tx wide.
# process_settlement is idempotent (status='processed' guard); if this
# task crashes mid-process, the next manual or scheduled retry resumes.
await process_settlement(settlement.id)
async def hourly_transaction_polling() -> None: async def hourly_transaction_polling() -> None:

View file

@ -1,144 +0,0 @@
"""
Tests for the v2 two-stage commission split (super first, operator remainder).
The plan calls out a verification scenario explicitly:
super_fee_pct=30%, operator split 50/30/20 on a 100-sat commission
super_wallet gets 30, operator_self gets 35, employee 21, maint 14.
Also covers the edge cases: super_fee_pct=0 (no super), super_fee_pct=1.0
(everything to super), single-leg operator ruleset, zero operator fee.
"""
import pytest
from ..calculations import (
allocate_operator_split_legs,
split_two_stage_commission,
)
class TestSplitTwoStageCommission:
"""Stage-1: super takes super_fee_pct of commission; operator gets rest."""
def test_plan_example_100sats_30pct(self):
platform, operator = split_two_stage_commission(100, 0.30)
assert platform == 30
assert operator == 70
assert platform + operator == 100
def test_realistic_7965sats_30pct(self):
# From the plan's 2000 GTQ → 266800 sats @ 3% commission example.
platform, operator = split_two_stage_commission(7965, 0.30)
assert platform == 2390 # round(7965 * 0.30) = 2389.5 → 2390
assert operator == 5575 # 7965 - 2390
assert platform + operator == 7965
def test_super_pct_zero_leaves_all_to_operator(self):
platform, operator = split_two_stage_commission(7965, 0.0)
assert platform == 0
assert operator == 7965
def test_super_pct_one_takes_everything(self):
platform, operator = split_two_stage_commission(7965, 1.0)
assert platform == 7965
assert operator == 0
def test_zero_commission(self):
platform, operator = split_two_stage_commission(0, 0.30)
assert platform == 0
assert operator == 0
def test_negative_commission_clamps_to_zero(self):
# Defensive: should never happen, but verify we don't go negative.
platform, operator = split_two_stage_commission(-100, 0.30)
assert platform == 0
assert operator == 0
@pytest.mark.parametrize("commission_sats", [1, 7, 100, 7965, 1_000_000])
@pytest.mark.parametrize("super_pct", [0.0, 0.1, 0.30, 0.5, 0.777, 1.0])
def test_invariant_sum_equals_commission(self, commission_sats, super_pct):
platform, operator = split_two_stage_commission(commission_sats, super_pct)
assert platform + operator == commission_sats
assert 0 <= platform <= commission_sats
assert 0 <= operator <= commission_sats
class TestAllocateOperatorSplitLegs:
"""Stage-2: operator's remainder split across N leg wallets per pct rules."""
def test_plan_example_50_30_20_on_70(self):
amounts = allocate_operator_split_legs(70, [0.5, 0.3, 0.2])
assert amounts == [35, 21, 14]
assert sum(amounts) == 70
def test_realistic_50_30_20_on_5575(self):
amounts = allocate_operator_split_legs(5575, [0.5, 0.3, 0.2])
# 50%: round(2787.5) = 2788; 30%: round(1672.5) = 1672; last absorbs
# remainder: 5575 - 2788 - 1672 = 1115.
# Note: round() uses banker's rounding so 2787.5 → 2788 actually
# because 2788 is even. Confirm by total invariant.
assert sum(amounts) == 5575
assert len(amounts) == 3
def test_single_leg_full_remainder(self):
amounts = allocate_operator_split_legs(100, [1.0])
assert amounts == [100]
def test_zero_operator_fee_zeros_all_legs(self):
amounts = allocate_operator_split_legs(0, [0.5, 0.5])
assert amounts == [0, 0]
def test_empty_legs_list_returns_empty(self):
amounts = allocate_operator_split_legs(100, [])
assert amounts == []
def test_last_leg_absorbs_rounding_remainder(self):
# 100 / 3 ≈ 33.33 each; rounding makes the first two 33 and last 34.
amounts = allocate_operator_split_legs(100, [1 / 3, 1 / 3, 1 / 3])
assert sum(amounts) == 100
assert amounts[0] == round(100 / 3) # 33
assert amounts[1] == round(100 / 3) # 33
# Last leg absorbs the rounding (34, not 33) so total == 100.
assert amounts[2] == 100 - amounts[0] - amounts[1]
@pytest.mark.parametrize(
"operator_fee,pcts",
[
(1, [0.5, 0.5]),
(7, [0.5, 0.3, 0.2]),
(100, [0.5, 0.5]),
(5575, [0.5, 0.3, 0.2]),
(1_000_000, [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]),
],
)
def test_invariant_sum_equals_operator_fee(self, operator_fee, pcts):
amounts = allocate_operator_split_legs(operator_fee, pcts)
assert sum(amounts) == operator_fee
assert all(a >= 0 for a in amounts)
class TestEndToEndScenarios:
"""The full two-stage split — super then operator legs — composed."""
def test_plan_example_full(self):
# 100 sats commission, super=30%, operator splits 50/30/20.
platform, operator = split_two_stage_commission(100, 0.30)
legs = allocate_operator_split_legs(operator, [0.5, 0.3, 0.2])
assert platform == 30
assert legs == [35, 21, 14]
assert platform + sum(legs) == 100
def test_super_pct_zero_full_pipeline(self):
platform, operator = split_two_stage_commission(7965, 0.0)
legs = allocate_operator_split_legs(operator, [1.0])
assert platform == 0
assert legs == [7965]
assert platform + sum(legs) == 7965
def test_super_pct_one_full_pipeline(self):
platform, operator = split_two_stage_commission(7965, 1.0)
legs = allocate_operator_split_legs(operator, [0.5, 0.5])
assert platform == 7965
# Operator has zero to distribute; both legs get zero.
assert legs == [0, 0]
assert platform + sum(legs) == 7965

View file

@ -1,661 +1,20 @@
# Satoshi Machine v2 — operator API surface (P1b). # Satoshi Machine v2 — API placeholder.
# #
# All endpoints are operator-scoped via check_user_exists. Every query # The v1 super-only Lamassu endpoints have been removed. The v2 operator-
# filters by the authenticated user's id so two operators on the same # scoped surface (machines / clients / deposits / settlements / commission
# LNbits instance can never see each other's machines, settlements, or # splits / partial-tx / balance-settle / super platform-fee) lands in P1+.
# clients. The super-only platform-fee write endpoint lands in P2. # See plan section "Critical files to modify".
#
# This stub keeps __init__.py importable and surfaces a clear 503 on every
# v1 route so existing clients get a precise error instead of a silent 404.
from http import HTTPStatus from http import HTTPStatus
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, HTTPException
from lnbits.core.crud import get_wallet
from lnbits.core.models import User
from lnbits.decorators import check_super_user, check_user_exists
from .crud import (
append_settlement_note,
create_dca_client,
create_deposit,
create_machine,
delete_dca_client,
delete_deposit,
delete_machine,
get_client_balance_summary,
get_commission_splits,
get_dca_client,
get_dca_clients_for_machine,
get_dca_clients_for_operator,
get_deposit,
get_deposits_for_client,
get_deposits_for_operator,
get_effective_commission_splits,
get_machine,
get_machines_for_operator,
get_payments_for_operator,
get_settlement,
get_settlements_for_machine,
get_settlements_for_operator,
get_super_config,
replace_commission_splits,
reset_settlement_for_retry,
update_dca_client,
update_deposit,
update_deposit_status,
update_machine,
update_super_config,
)
from .distribution import (
apply_partial_dispense_and_redistribute,
process_settlement,
settle_lp_balance,
)
from .models import (
AppendSettlementNoteData,
ClientBalanceSummary,
CommissionSplit,
CreateDcaClientData,
CreateDepositData,
CreateMachineData,
DcaClient,
DcaDeposit,
DcaPayment,
DcaSettlement,
Machine,
PartialDispenseData,
SetCommissionSplitsData,
SettleBalanceData,
SuperConfig,
UpdateDcaClientData,
UpdateDepositData,
UpdateDepositStatusData,
UpdateMachineData,
UpdateSuperConfigData,
)
satmachineadmin_api_router = APIRouter() satmachineadmin_api_router = APIRouter()
async def _assert_wallet_owned_by(wallet_id: str, user_id: str) -> None:
"""Defence-in-depth: refuse to bind any DB row to a wallet the caller
doesn't own. Used on every endpoint that accepts a wallet_id from the
request body. The DB-side UNIQUE on dca_machines.wallet_id (m007) is a
second line of defence; this check is the primary gate."""
wallet = await get_wallet(wallet_id)
if wallet is None or wallet.user != user_id:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
"wallet_id is not owned by the authenticated operator",
)
# =============================================================================
# Machines
# =============================================================================
@satmachineadmin_api_router.post("/api/v1/dca/machines", response_model=Machine)
async def api_create_machine(
data: CreateMachineData, user: User = Depends(check_user_exists)
) -> Machine:
await _assert_wallet_owned_by(data.wallet_id, user.id)
return await create_machine(user.id, data)
@satmachineadmin_api_router.get(
"/api/v1/dca/machines", response_model=list[Machine]
)
async def api_list_machines(
user: User = Depends(check_user_exists),
) -> list[Machine]:
return await get_machines_for_operator(user.id)
@satmachineadmin_api_router.get(
"/api/v1/dca/machines/{machine_id}", response_model=Machine
)
async def api_get_machine(
machine_id: str, user: User = Depends(check_user_exists)
) -> Machine:
machine = await get_machine(machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
return machine
@satmachineadmin_api_router.put(
"/api/v1/dca/machines/{machine_id}", response_model=Machine
)
async def api_update_machine(
machine_id: str,
data: UpdateMachineData,
user: User = Depends(check_user_exists),
) -> Machine:
machine = await get_machine(machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
if data.wallet_id is not None:
await _assert_wallet_owned_by(data.wallet_id, user.id)
updated = await update_machine(machine_id, data)
if updated is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
return updated
@satmachineadmin_api_router.delete(
"/api/v1/dca/machines/{machine_id}", status_code=HTTPStatus.NO_CONTENT
)
async def api_delete_machine(
machine_id: str, user: User = Depends(check_user_exists)
) -> None:
machine = await get_machine(machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
await delete_machine(machine_id)
# =============================================================================
# DCA Clients (LPs) — scoped per (machine, user).
# =============================================================================
async def _machine_owned_by(machine_id: str, user_id: str) -> Machine:
"""Lookup-with-ownership guard. 404 (not 403) so operators can't probe
for other operators' machines."""
machine = await get_machine(machine_id)
if machine is None or machine.operator_user_id != user_id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
return machine
async def _client_owned_by(client_id: str, user_id: str) -> DcaClient:
"""Lookup-with-ownership guard for an LP record; ownership is checked
transitively via the client's machine. 404 if either doesn't match."""
client = await get_dca_client(client_id)
if client is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found")
machine = await get_machine(client.machine_id)
if machine is None or machine.operator_user_id != user_id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found")
return client
@satmachineadmin_api_router.post(
"/api/v1/dca/clients", response_model=DcaClient
)
async def api_create_client(
data: CreateDcaClientData, user: User = Depends(check_user_exists)
) -> DcaClient:
# Operator can only register LPs on machines they own.
await _machine_owned_by(data.machine_id, user.id)
return await create_dca_client(data)
@satmachineadmin_api_router.get(
"/api/v1/dca/clients", response_model=list[DcaClient]
)
async def api_list_clients(
machine_id: str | None = None,
user: User = Depends(check_user_exists),
) -> list[DcaClient]:
"""List the operator's LPs. Without ?machine_id, returns all LPs across
the operator's fleet. With ?machine_id, scoped to that machine (with
ownership check)."""
if machine_id is None:
return await get_dca_clients_for_operator(user.id)
await _machine_owned_by(machine_id, user.id)
return await get_dca_clients_for_machine(machine_id)
@satmachineadmin_api_router.get(
"/api/v1/dca/clients/{client_id}", response_model=DcaClient
)
async def api_get_client(
client_id: str, user: User = Depends(check_user_exists)
) -> DcaClient:
return await _client_owned_by(client_id, user.id)
@satmachineadmin_api_router.put(
"/api/v1/dca/clients/{client_id}", response_model=DcaClient
)
async def api_update_client(
client_id: str,
data: UpdateDcaClientData,
user: User = Depends(check_user_exists),
) -> DcaClient:
await _client_owned_by(client_id, user.id)
updated = await update_dca_client(client_id, data)
if updated is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found")
return updated
@satmachineadmin_api_router.delete(
"/api/v1/dca/clients/{client_id}", status_code=HTTPStatus.NO_CONTENT
)
async def api_delete_client(
client_id: str, user: User = Depends(check_user_exists)
) -> None:
await _client_owned_by(client_id, user.id)
await delete_dca_client(client_id)
@satmachineadmin_api_router.get(
"/api/v1/dca/clients/{client_id}/balance",
response_model=ClientBalanceSummary,
)
async def api_get_client_balance(
client_id: str, user: User = Depends(check_user_exists)
) -> ClientBalanceSummary:
await _client_owned_by(client_id, user.id)
summary = await get_client_balance_summary(client_id)
if summary is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found")
return summary
@satmachineadmin_api_router.post(
"/api/v1/dca/clients/{client_id}/settle", response_model=DcaPayment
)
async def api_settle_client_balance(
client_id: str,
data: SettleBalanceData,
user: User = Depends(check_user_exists),
) -> DcaPayment:
"""Operator UX — closes satmachineadmin#4.
Settle an LP's remaining fiat balance from the operator's chosen funding
wallet at the specified exchange rate. The amount_fiat is capped at the
LP's remaining balance; if omitted, settles the full remaining.
Use case: avoid the Zeno's-paradox of vanishing tiny shares for small
remaining balances. Operator hits 'Settle' on the LP, gets to specify
the rate, and the system pays out the rest in sats from their wallet.
"""
client = await _client_owned_by(client_id, user.id)
machine = await _machine_owned_by(client.machine_id, user.id)
# Verify the operator owns the funding wallet.
funding_wallet = await get_wallet(data.funding_wallet_id)
if funding_wallet is None or funding_wallet.user != user.id:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
"funding_wallet_id is not owned by the authenticated operator",
)
try:
return await settle_lp_balance(client, machine, data)
except ValueError as exc:
raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc
# =============================================================================
# Deposits — operator records fiat handed in by an LP at a machine.
# =============================================================================
async def _deposit_owned_by(deposit_id: str, user_id: str) -> DcaDeposit:
deposit = await get_deposit(deposit_id)
if deposit is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found")
machine = await get_machine(deposit.machine_id)
if machine is None or machine.operator_user_id != user_id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found")
return deposit
@satmachineadmin_api_router.post(
"/api/v1/dca/deposits", response_model=DcaDeposit
)
async def api_create_deposit(
data: CreateDepositData, user: User = Depends(check_user_exists)
) -> DcaDeposit:
# Verify the (client_id, machine_id) pair belongs to the operator.
client = await _client_owned_by(data.client_id, user.id)
if client.machine_id != data.machine_id:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
"client_id and machine_id refer to different machines",
)
return await create_deposit(user.id, data)
@satmachineadmin_api_router.get(
"/api/v1/dca/deposits", response_model=list[DcaDeposit]
)
async def api_list_deposits(
client_id: str | None = None,
user: User = Depends(check_user_exists),
) -> list[DcaDeposit]:
"""Operator's deposits across all their machines; ?client_id scopes to
a single LP (with ownership check)."""
if client_id is not None:
await _client_owned_by(client_id, user.id)
return await get_deposits_for_client(client_id)
return await get_deposits_for_operator(user.id)
@satmachineadmin_api_router.get(
"/api/v1/dca/deposits/{deposit_id}", response_model=DcaDeposit
)
async def api_get_deposit(
deposit_id: str, user: User = Depends(check_user_exists)
) -> DcaDeposit:
return await _deposit_owned_by(deposit_id, user.id)
@satmachineadmin_api_router.put(
"/api/v1/dca/deposits/{deposit_id}", response_model=DcaDeposit
)
async def api_update_deposit(
deposit_id: str,
data: UpdateDepositData,
user: User = Depends(check_user_exists),
) -> DcaDeposit:
existing = await _deposit_owned_by(deposit_id, user.id)
if existing.status != "pending":
raise HTTPException(
HTTPStatus.BAD_REQUEST,
"Only pending deposits can be edited",
)
updated = await update_deposit(deposit_id, data)
if updated is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found")
return updated
@satmachineadmin_api_router.put(
"/api/v1/dca/deposits/{deposit_id}/status", response_model=DcaDeposit
)
async def api_update_deposit_status(
deposit_id: str,
data: UpdateDepositStatusData,
user: User = Depends(check_user_exists),
) -> DcaDeposit:
await _deposit_owned_by(deposit_id, user.id)
updated = await update_deposit_status(deposit_id, data)
if updated is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found")
return updated
@satmachineadmin_api_router.delete(
"/api/v1/dca/deposits/{deposit_id}", status_code=HTTPStatus.NO_CONTENT
)
async def api_delete_deposit(
deposit_id: str, user: User = Depends(check_user_exists)
) -> None:
existing = await _deposit_owned_by(deposit_id, user.id)
if existing.status != "pending":
raise HTTPException(
HTTPStatus.BAD_REQUEST,
"Only pending deposits can be deleted",
)
await delete_deposit(deposit_id)
# =============================================================================
# Settlements (read-only at this phase; landing happens in tasks.py)
# =============================================================================
@satmachineadmin_api_router.get(
"/api/v1/dca/settlements", response_model=list[DcaSettlement]
)
async def api_list_settlements(
user: User = Depends(check_user_exists),
) -> list[DcaSettlement]:
return await get_settlements_for_operator(user.id)
@satmachineadmin_api_router.get(
"/api/v1/dca/machines/{machine_id}/settlements",
response_model=list[DcaSettlement],
)
async def api_list_settlements_for_machine(
machine_id: str, user: User = Depends(check_user_exists)
) -> list[DcaSettlement]:
machine = await get_machine(machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
return await get_settlements_for_machine(machine_id)
@satmachineadmin_api_router.get(
"/api/v1/dca/settlements/{settlement_id}", response_model=DcaSettlement
)
async def api_get_settlement(
settlement_id: str, user: User = Depends(check_user_exists)
) -> DcaSettlement:
settlement = await get_settlement(settlement_id)
if settlement is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
machine = await get_machine(settlement.machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
return settlement
@satmachineadmin_api_router.post(
"/api/v1/dca/settlements/{settlement_id}/partial-dispense",
response_model=DcaSettlement,
)
async def api_partial_dispense(
settlement_id: str,
data: PartialDispenseData,
user: User = Depends(check_user_exists),
) -> DcaSettlement:
"""Operator UX — resolves satmachineadmin#3.
Recompute the split for a settlement that didn't dispense the full
amount (jam, mid-tx error). Provide one of dispensed_fraction (0..1)
or dispensed_sats. Optionally include a reason in notes.
Refuses when any leg has already completed Lightning payments can't
be clawed back. Use balance settlement (P3e) for those cases.
"""
settlement = await get_settlement(settlement_id)
if settlement is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
machine = await get_machine(settlement.machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
if (data.dispensed_fraction is None) == (data.dispensed_sats is None):
raise HTTPException(
HTTPStatus.BAD_REQUEST,
"Provide exactly one of dispensed_fraction or dispensed_sats",
)
try:
return await apply_partial_dispense_and_redistribute(settlement_id, data)
except ValueError as exc:
raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc
@satmachineadmin_api_router.post(
"/api/v1/dca/settlements/{settlement_id}/retry",
response_model=DcaSettlement,
)
async def api_retry_settlement(
settlement_id: str, user: User = Depends(check_user_exists)
) -> DcaSettlement:
"""Operator retry path for an errored settlement.
Voids any failed legs (completed legs are NEVER re-paid Lightning
sats already moved) and flips status 'errored' 'pending', then
re-invokes process_settlement. The optimistic-lock claim guards
against a concurrent listener re-fire racing this retry."""
settlement = await get_settlement(settlement_id)
if settlement is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
machine = await get_machine(settlement.machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
if settlement.status != "errored":
raise HTTPException(
HTTPStatus.BAD_REQUEST,
f"settlement status must be 'errored' to retry "
f"(currently '{settlement.status}')",
)
updated = await reset_settlement_for_retry(settlement_id)
if updated is None or updated.status != "pending":
raise HTTPException(
HTTPStatus.INTERNAL_SERVER_ERROR, "failed to reset settlement"
)
await process_settlement(settlement_id)
after = await get_settlement(settlement_id)
return after if after is not None else updated
@satmachineadmin_api_router.post(
"/api/v1/dca/settlements/{settlement_id}/notes",
response_model=DcaSettlement,
)
async def api_append_settlement_note(
settlement_id: str,
data: AppendSettlementNoteData,
user: User = Depends(check_user_exists),
) -> DcaSettlement:
"""Operator appends a free-form note to the settlement. Useful for cash-
drawer reconciliation context, off-LN refund records, or any narrative
an operator wants to attach. Each entry is timestamped (UTC) and tagged
with the author's user id; existing entries are never modified.
For richer queryable audit (filter by author, time range, action type),
see aiolabs/satmachineadmin (future audit-table feature)."""
settlement = await get_settlement(settlement_id)
if settlement is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
machine = await get_machine(settlement.machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
updated = await append_settlement_note(settlement_id, data.note, user.id)
if updated is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
return updated
# =============================================================================
# Payments (read-only — the leg-typed breakdown of distributions)
# =============================================================================
@satmachineadmin_api_router.get(
"/api/v1/dca/payments", response_model=list[DcaPayment]
)
async def api_list_payments(
leg_type: str | None = None,
user: User = Depends(check_user_exists),
) -> list[DcaPayment]:
return await get_payments_for_operator(user.id, leg_type=leg_type)
# =============================================================================
# Commission splits — operator's rules for distributing the commission
# remainder (post-super-fee). Sum-to-1.0 invariant enforced at the model
# boundary by SetCommissionSplitsData.
# =============================================================================
@satmachineadmin_api_router.get(
"/api/v1/dca/commission-splits", response_model=list[CommissionSplit]
)
async def api_get_commission_splits(
machine_id: str | None = None,
effective: bool = False,
user: User = Depends(check_user_exists),
) -> list[CommissionSplit]:
"""No machine_id: operator's default ruleset (rows where machine_id IS NULL).
With machine_id: per-machine override only (404 the machine if not yours).
With machine_id and ?effective=true: per-machine override if set, else
operator default what the settlement processor actually applies."""
if machine_id is not None:
await _machine_owned_by(machine_id, user.id)
if effective:
return await get_effective_commission_splits(user.id, machine_id)
return await get_commission_splits(user.id, machine_id)
return await get_commission_splits(user.id, None)
@satmachineadmin_api_router.put(
"/api/v1/dca/commission-splits", response_model=list[CommissionSplit]
)
async def api_replace_commission_splits(
data: SetCommissionSplitsData,
user: User = Depends(check_user_exists),
) -> list[CommissionSplit]:
"""Atomic replace for the (operator, machine) scope. If
data.machine_id is None, replaces the operator's default ruleset;
otherwise replaces the per-machine override (machine must be owned).
Sum-to-1.0 invariant enforced upstream by the Pydantic validator."""
if data.machine_id is not None:
await _machine_owned_by(data.machine_id, user.id)
return await replace_commission_splits(user.id, data.machine_id, data.legs)
@satmachineadmin_api_router.delete(
"/api/v1/dca/commission-splits",
status_code=HTTPStatus.NO_CONTENT,
)
async def api_delete_commission_splits(
machine_id: str | None = None,
user: User = Depends(check_user_exists),
) -> None:
"""Clear a ruleset. With machine_id: clears the per-machine override
(machine falls back to operator default). Without: clears the operator
default (any per-machine overrides keep applying)."""
if machine_id is not None:
await _machine_owned_by(machine_id, user.id)
# Atomic replace with an empty leg list — same effect as DELETE WHERE.
await replace_commission_splits(user.id, machine_id, [])
# =============================================================================
# Super config — operators read; super (LNbits instance admin) writes.
# =============================================================================
@satmachineadmin_api_router.get(
"/api/v1/dca/super-config", response_model=SuperConfig
)
async def api_get_super_config(
_user: User = Depends(check_user_exists),
) -> SuperConfig:
"""Returns the platform-fee config so operators can display it as a
read-only line item in their UI. The fee is set by the LNbits super
instance-wide; operators see it but can't change it."""
config = await get_super_config()
if config is None:
raise HTTPException(
HTTPStatus.NOT_FOUND, "Super config not initialised"
)
return config
@satmachineadmin_api_router.put(
"/api/v1/dca/super-config", response_model=SuperConfig
)
async def api_update_super_config(
data: UpdateSuperConfigData,
_user: User = Depends(check_super_user),
) -> SuperConfig:
"""Super-only: set the platform fee % charged on every operator's
commission, plus the destination wallet for collecting it. The fee is
enforced before the operator's own commission_splits ruleset fires
(see distribution.process_settlement)."""
config = await update_super_config(data)
if config is None:
raise HTTPException(
HTTPStatus.INTERNAL_SERVER_ERROR, "Failed to update super config"
)
return config
# =============================================================================
# Catch-all stub for endpoints not yet implemented (clients, deposits,
# commission splits, partial-tx, balance-settle, super-config write). Each
# lands in a follow-up commit. The catch-all comes LAST so specific routes
# above take precedence.
# =============================================================================
@satmachineadmin_api_router.api_route( @satmachineadmin_api_router.api_route(
"/api/v1/dca/{full_path:path}", "/api/v1/dca/{full_path:path}",
methods=["GET", "POST", "PUT", "DELETE", "PATCH"], methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
@ -663,6 +22,7 @@ async def api_update_super_config(
async def v2_in_progress_stub(full_path: str) -> None: async def v2_in_progress_stub(full_path: str) -> None:
raise HTTPException( raise HTTPException(
HTTPStatus.SERVICE_UNAVAILABLE, HTTPStatus.SERVICE_UNAVAILABLE,
f"satmachineadmin v2: /api/v1/dca/{full_path} not yet implemented " f"satmachineadmin v2 API not yet implemented (path: /{full_path}). "
"(landing in P2+). See aiolabs/satmachineadmin#9.", "The v1 Lamassu surface has been removed; per-operator endpoints "
"land in P1. See plan.",
) )