satmachineadmin/views_api.py
Padreug 794d7e5395
Some checks failed
ci.yml / feat(v2): wire fee-config publish into machine + super-config triggers (#39 3/3) (pull_request) Failing after 0s
feat(v2): wire fee-config publish into machine + super-config triggers (#39 3/3)
Three trigger points wire fee_transport.publish_fee_config into the
satmachineadmin API endpoints per the #39 spec. All three soft-fail on
transport errors — the underlying CRUD operation (machine create /
update / super-config save) succeeds even when the publish couldn't
reach the relay or the signer, and the operator can re-trigger by
editing again.

views_api.py:
- api_create_machine — publishes always after create, even when
  operator fees default to 0/0 (the resulting super-only payload is
  what unblocks the ATM past its `awaiting-fees` maintenance gate).
  Reads super_config singleton; if absent (m001 should have inserted
  it, so this is an impossible state), skips the publish to avoid
  crashing create.
- api_update_machine — publishes only when either
  operator_cash_*_fee_fraction is in the patch payload. Skip on
  name/location/wallet_id/is_active/fiat_code edits since those don't
  affect the fee model the ATM enforces (avoids unnecessary relay
  churn).
- api_update_super_config — publishes to every active machine when
  either super fraction changes. Per-machine: that machine's
  operator_user_id is the signer (machines owned by different
  operators sign with different keys); each soft-fail is independent.
  Skip if only super_fee_wallet_id changed (no fee-model impact).

Tests (9 cases, all green):
- 3 create-machine triggers: default 0/0 operator fees still publishes
  super-only payload, nonzero operator fees publish full payload,
  None super_config short-circuits without crashing
- 4 update-machine triggers: publishes on cash_in change, publishes on
  cash_out change, skips on name-only, skips on is_active-only
- 2 super-config triggers: publishes per-active-machine signed by
  each machine's operator on fraction change, skips entirely on
  wallet-id-only change (with an assertion that list_all_active_machines
  is never called, proving the short-circuit path)

191/191 tests green. Layer 2 (#39) complete; ready for joint smoke
once bitspire fixes the three deploy gaps from coord-log §2026-06-01T18:30Z
(`relay.aiolabs.dev` default, `VITE_LNBITS_HTTP_URL` dead echo,
operator-fees subscriber not running in maintenance state).

Refs: aiolabs/satmachineadmin#37 (parent), #39 (closes Layer 2),
aiolabs/lamassu-next#57 (Layer 3 consumer — blocked on bitspire-side
gaps).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 20:07:56 +02:00

1099 lines
44 KiB
Python

# Satoshi Machine v2 — operator API surface (P1b).
#
# All endpoints are operator-scoped via check_user_exists. Every query
# filters by the authenticated user's id so two operators on the same
# LNbits instance can never see each other's machines, settlements, or
# clients. The super-only platform-fee write endpoint lands in P2.
from http import HTTPStatus
from fastapi import APIRouter, Depends, HTTPException
from lnbits.core.crud import get_wallet
from lnbits.core.crud.users import get_account_by_pubkey
from lnbits.core.models import User
from lnbits.decorators import check_super_user, check_user_exists
from lnbits.utils.nostr import normalize_public_key
from .calculations import MAX_FEE_FRACTION_PER_DIRECTION
from .cassette_transport import (
CassetteTransportError,
OperatorIdentityMissing,
RelayUnavailable,
SignerUnavailable,
publish_to_atm,
)
from .fee_transport import publish_fee_config
from .crud import (
append_settlement_note,
count_completed_legs_for_settlement,
create_dca_client,
create_deposit,
create_machine,
delete_dca_client,
delete_deposit,
delete_machine,
force_reset_stuck_settlement,
get_client_balance_summary,
get_commission_splits,
get_dca_client,
get_dca_clients_for_machine,
get_dca_clients_for_operator,
get_deposit,
get_deposits_for_client,
get_deposits_for_operator,
get_effective_commission_splits,
get_machine,
get_machines_for_operator,
get_payments_for_operator,
get_settlement,
get_settlements_for_machine,
get_settlements_for_operator,
get_stuck_settlements_for_operator,
get_super_config,
list_all_active_machines,
list_cassette_configs_for_machine,
lp_is_onboarded,
replace_commission_splits,
reset_settlement_for_retry,
update_cassette_config,
update_dca_client,
update_deposit,
update_deposit_status,
update_machine,
update_super_config,
)
from .distribution import (
apply_partial_dispense_and_redistribute,
process_settlement,
settle_lp_balance,
)
from .models import (
AppendSettlementNoteData,
CassetteConfig,
ClientBalanceSummary,
CommissionSplit,
CreateDcaClientData,
CreateDepositData,
CreateMachineData,
DcaClient,
DcaDeposit,
DcaPayment,
DcaSettlement,
Machine,
PartialDispenseData,
PublishCassettesPayload,
SetCommissionSplitsData,
SettleBalanceData,
StuckSettlementsResponse,
SuperConfig,
UpdateDcaClientData,
UpdateDepositData,
UpdateDepositStatusData,
UpdateMachineData,
UpdateSuperConfigData,
UpsertCassetteConfigData,
)
satmachineadmin_api_router = APIRouter()
async def _assert_wallet_owned_by(wallet_id: str, user_id: str) -> None:
"""Defence-in-depth: refuse to bind any DB row to a wallet the caller
doesn't own. Used on every endpoint that accepts a wallet_id from the
request body. The DB-side UNIQUE on dca_machines.wallet_id (m007) is a
second line of defence; this check is the primary gate."""
wallet = await get_wallet(wallet_id)
if wallet is None or wallet.user != user_id:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
"wallet_id is not owned by the authenticated operator",
)
async def _assert_no_pubkey_collision(machine_npub: str) -> None:
"""Defence-in-depth: refuse to register a machine whose npub matches
any LNbits operator account's pubkey.
Such a collision causes lnbits' nostr-transport `auth.py:resolve_
nostr_auth` to route inbound kind-21000 RPCs from the ATM directly
to that operator's wallet — works by coincidence, but breaks silently
the moment the operator's pubkey rotates (because the auto-account-
from-npub flow then fires for the ATM's now-orphaned npub, and the
invoice lands on a fresh auto-account wallet instead). Reproducer:
Greg's Sintra silent-drop on 2026-05-30T21:33Z. See
aiolabs/satmachineadmin#32 for the failure mode + this guard's
design rationale.
Path B (`#20` roster-lookup) is the architectural fix at the
routing layer; this guard prevents new operators from inadvertently
setting up the collision in the first place. Two layers of defence.
Idempotent on the same caller re-attempting machine creation with
the same npub (the second attempt hits the dca_machines.machine_npub
UNIQUE on m001, not this guard — they only collide with operator-
account pubkeys, not other machine npubs).
"""
canonical = normalize_public_key(machine_npub).lower()
matching = await get_account_by_pubkey(canonical)
if matching is not None:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
(
f"machine_npub {canonical[:12]}... collides with an "
f"existing LNbits operator account's pubkey. Registering "
"an ATM under this npub would silently route invoices via "
"a pubkey-collision dependency that breaks on operator "
"pubkey rotation. Use a fresh ATM keypair: lamassu-next "
"`provision-atm` regenerates one with `ATM_PRIVATE_KEY` "
"unset. See aiolabs/satmachineadmin#32."
),
)
async def _assert_machine_fee_cap_safe(
operator_in: float,
operator_out: float,
) -> None:
"""Reject create/update if (super_X + operator_X) > 0.15 for either
direction. Locked at 15% per coord-log §2026-06-01T07:22Z; defense in
depth — the bitspire consumer enforces the same cap on the wire-format
side (aiolabs/lamassu-next#57).
Fetches the current super-config singleton to pair against the
candidate per-machine fractions. NULL super-config (uninitialised
instance) treats super contribution as 0 — the cap then degenerates
to a pure operator-fee check.
"""
super_config = await get_super_config()
super_in = (
float(super_config.super_cash_in_fee_fraction) if super_config else 0.0
)
super_out = (
float(super_config.super_cash_out_fee_fraction) if super_config else 0.0
)
# Fields are stored as DECIMAL(10,4) and Pydantic validators round to
# 4 decimals on the way in, so the source-of-truth precision is 1e-4.
# Round the float-arithmetic sum to that precision before comparison so
# `0.10 + 0.05 = 0.15000000000000002` (IEEE 754) doesn't trip the cap.
total_in = round(super_in + operator_in, 4)
total_out = round(super_out + operator_out, 4)
if total_in > MAX_FEE_FRACTION_PER_DIRECTION:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
(
f"cash-in fee cap exceeded: super {super_in:.4f} + operator "
f"{operator_in:.4f} = {total_in:.4f} > {MAX_FEE_FRACTION_PER_DIRECTION}"
),
)
if total_out > MAX_FEE_FRACTION_PER_DIRECTION:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
(
f"cash-out fee cap exceeded: super {super_out:.4f} + operator "
f"{operator_out:.4f} = {total_out:.4f} > {MAX_FEE_FRACTION_PER_DIRECTION}"
),
)
async def _assert_super_config_cap_safe(
new_super_in: float | None,
new_super_out: float | None,
) -> None:
"""Reject super-config update if any active machine's
(new_super + operator) > 0.15 for either direction. Same cap policy
as _assert_machine_fee_cap_safe but checked across the fleet because
a super update affects every machine.
`None` for a direction means "no change" — pulls the current value
from super-config so the cap check still runs against the resulting
post-update state.
"""
current = await get_super_config()
effective_in = (
float(new_super_in)
if new_super_in is not None
else (float(current.super_cash_in_fee_fraction) if current else 0.0)
)
effective_out = (
float(new_super_out)
if new_super_out is not None
else (float(current.super_cash_out_fee_fraction) if current else 0.0)
)
machines = await list_all_active_machines()
for m in machines:
op_in = float(m.operator_cash_in_fee_fraction)
op_out = float(m.operator_cash_out_fee_fraction)
# Round to DECIMAL(10,4) precision — see _assert_machine_fee_cap_safe
# for the IEEE 754 motivation.
total_in = round(effective_in + op_in, 4)
total_out = round(effective_out + op_out, 4)
if total_in > MAX_FEE_FRACTION_PER_DIRECTION:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
(
f"super cash-in fee {effective_in:.4f} would exceed cap "
f"on machine {m.id} ({m.name or m.machine_npub[:12]}): "
f"+ operator {op_in:.4f} = "
f"{total_in:.4f} > {MAX_FEE_FRACTION_PER_DIRECTION}"
),
)
if total_out > MAX_FEE_FRACTION_PER_DIRECTION:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
(
f"super cash-out fee {effective_out:.4f} would exceed cap "
f"on machine {m.id} ({m.name or m.machine_npub[:12]}): "
f"+ operator {op_out:.4f} = "
f"{total_out:.4f} > {MAX_FEE_FRACTION_PER_DIRECTION}"
),
)
# =============================================================================
# Machines
# =============================================================================
@satmachineadmin_api_router.post("/api/v1/dca/machines", response_model=Machine)
async def api_create_machine(
data: CreateMachineData, user: User = Depends(check_user_exists)
) -> Machine:
await _assert_wallet_owned_by(data.wallet_id, user.id)
await _assert_no_pubkey_collision(data.machine_npub)
await _assert_machine_fee_cap_safe(
data.operator_cash_in_fee_fraction,
data.operator_cash_out_fee_fraction,
)
machine = await create_machine(user.id, data)
# Layer 2 (#39): publish initial fee config to the ATM so it can
# unblock past its `awaiting-fees` maintenance gate. Soft-fails on
# transport errors — machine creation has already succeeded.
super_config = await get_super_config()
if super_config is not None:
await publish_fee_config(machine, super_config, user.id)
return machine
@satmachineadmin_api_router.get("/api/v1/dca/machines", response_model=list[Machine])
async def api_list_machines(
user: User = Depends(check_user_exists),
) -> list[Machine]:
return await get_machines_for_operator(user.id)
@satmachineadmin_api_router.get(
"/api/v1/dca/machines/{machine_id}", response_model=Machine
)
async def api_get_machine(
machine_id: str, user: User = Depends(check_user_exists)
) -> Machine:
machine = await get_machine(machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
return machine
@satmachineadmin_api_router.put(
"/api/v1/dca/machines/{machine_id}", response_model=Machine
)
async def api_update_machine(
machine_id: str,
data: UpdateMachineData,
user: User = Depends(check_user_exists),
) -> Machine:
machine = await get_machine(machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
if data.wallet_id is not None:
await _assert_wallet_owned_by(data.wallet_id, user.id)
# Cap check against post-update state — partial PATCH semantics:
# unset directional fields keep the machine's current value.
if (
data.operator_cash_in_fee_fraction is not None
or data.operator_cash_out_fee_fraction is not None
):
candidate_in = (
data.operator_cash_in_fee_fraction
if data.operator_cash_in_fee_fraction is not None
else float(machine.operator_cash_in_fee_fraction)
)
candidate_out = (
data.operator_cash_out_fee_fraction
if data.operator_cash_out_fee_fraction is not None
else float(machine.operator_cash_out_fee_fraction)
)
await _assert_machine_fee_cap_safe(candidate_in, candidate_out)
updated = await update_machine(machine_id, data)
if updated is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
# Layer 2 (#39): if either operator fee fraction changed, publish a
# fresh kind-30078 to the ATM so it picks up the new total. Skip
# otherwise — name/location/wallet_id/is_active edits don't change
# the fee model the ATM enforces.
fees_changed = (
data.operator_cash_in_fee_fraction is not None
or data.operator_cash_out_fee_fraction is not None
)
if fees_changed:
super_config = await get_super_config()
if super_config is not None:
await publish_fee_config(updated, super_config, user.id)
return updated
@satmachineadmin_api_router.delete(
"/api/v1/dca/machines/{machine_id}", status_code=HTTPStatus.NO_CONTENT
)
async def api_delete_machine(
machine_id: str, user: User = Depends(check_user_exists)
) -> None:
machine = await get_machine(machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
await delete_machine(machine_id)
# =============================================================================
# DCA Clients (LPs) — scoped per (machine, user).
# =============================================================================
async def _machine_owned_by(machine_id: str, user_id: str) -> Machine:
"""Lookup-with-ownership guard. 404 (not 403) so operators can't probe
for other operators' machines."""
machine = await get_machine(machine_id)
if machine is None or machine.operator_user_id != user_id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
return machine
async def _client_owned_by(client_id: str, user_id: str) -> DcaClient:
"""Lookup-with-ownership guard for an LP record; ownership is checked
transitively via the client's machine. 404 if either doesn't match."""
client = await get_dca_client(client_id)
if client is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found")
machine = await get_machine(client.machine_id)
if machine is None or machine.operator_user_id != user_id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found")
return client
@satmachineadmin_api_router.post("/api/v1/dca/clients", response_model=DcaClient)
async def api_create_client(
data: CreateDcaClientData, user: User = Depends(check_user_exists)
) -> DcaClient:
# Operator can only register LPs on machines they own.
await _machine_owned_by(data.machine_id, user.id)
return await create_dca_client(data)
@satmachineadmin_api_router.get("/api/v1/dca/clients", response_model=list[DcaClient])
async def api_list_clients(
machine_id: str | None = None,
user: User = Depends(check_user_exists),
) -> list[DcaClient]:
"""List the operator's LPs. Without ?machine_id, returns all LPs across
the operator's fleet. With ?machine_id, scoped to that machine (with
ownership check)."""
if machine_id is None:
return await get_dca_clients_for_operator(user.id)
await _machine_owned_by(machine_id, user.id)
return await get_dca_clients_for_machine(machine_id)
@satmachineadmin_api_router.get(
"/api/v1/dca/clients/{client_id}", response_model=DcaClient
)
async def api_get_client(
client_id: str, user: User = Depends(check_user_exists)
) -> DcaClient:
return await _client_owned_by(client_id, user.id)
@satmachineadmin_api_router.put(
"/api/v1/dca/clients/{client_id}", response_model=DcaClient
)
async def api_update_client(
client_id: str,
data: UpdateDcaClientData,
user: User = Depends(check_user_exists),
) -> DcaClient:
await _client_owned_by(client_id, user.id)
updated = await update_dca_client(client_id, data)
if updated is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found")
return updated
@satmachineadmin_api_router.delete(
"/api/v1/dca/clients/{client_id}", status_code=HTTPStatus.NO_CONTENT
)
async def api_delete_client(
client_id: str, user: User = Depends(check_user_exists)
) -> None:
await _client_owned_by(client_id, user.id)
await delete_dca_client(client_id)
@satmachineadmin_api_router.get(
"/api/v1/dca/clients/{client_id}/balance",
response_model=ClientBalanceSummary,
)
async def api_get_client_balance(
client_id: str, user: User = Depends(check_user_exists)
) -> ClientBalanceSummary:
await _client_owned_by(client_id, user.id)
summary = await get_client_balance_summary(client_id)
if summary is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found")
return summary
@satmachineadmin_api_router.post(
"/api/v1/dca/clients/{client_id}/settle", response_model=DcaPayment
)
async def api_settle_client_balance(
client_id: str,
data: SettleBalanceData,
user: User = Depends(check_user_exists),
) -> DcaPayment:
"""Operator UX — closes satmachineadmin#4.
Settle an LP's remaining fiat balance from the operator's chosen funding
wallet at the specified exchange rate. The amount_fiat is capped at the
LP's remaining balance; if omitted, settles the full remaining.
Use case: avoid the Zeno's-paradox of vanishing tiny shares for small
remaining balances. Operator hits 'Settle' on the LP, gets to specify
the rate, and the system pays out the rest in sats from their wallet.
"""
client = await _client_owned_by(client_id, user.id)
machine = await _machine_owned_by(client.machine_id, user.id)
# Verify the operator owns the funding wallet.
funding_wallet = await get_wallet(data.funding_wallet_id)
if funding_wallet is None or funding_wallet.user != user.id:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
"funding_wallet_id is not owned by the authenticated operator",
)
try:
return await settle_lp_balance(client, machine, data)
except ValueError as exc:
raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc
# =============================================================================
# Deposits — operator records fiat handed in by an LP at a machine.
# =============================================================================
async def _deposit_owned_by(deposit_id: str, user_id: str) -> DcaDeposit:
deposit = await get_deposit(deposit_id)
if deposit is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found")
machine = await get_machine(deposit.machine_id)
if machine is None or machine.operator_user_id != user_id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found")
return deposit
@satmachineadmin_api_router.post("/api/v1/dca/deposits", response_model=DcaDeposit)
async def api_create_deposit(
data: CreateDepositData, user: User = Depends(check_user_exists)
) -> DcaDeposit:
# Verify the (client_id, machine_id) pair belongs to the operator.
client = await _client_owned_by(data.client_id, user.id)
if client.machine_id != data.machine_id:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
"client_id and machine_id refer to different machines",
)
# Gate: refuse deposits for an LP who hasn't onboarded via
# satmachineclient. Without a dca_lp row we don't know where to
# send their DCA distributions, so accepting fiat against them
# would just queue up sats with nowhere to go. Forces the LP to
# actively register before any economic activity accrues.
if not await lp_is_onboarded(client.user_id):
raise HTTPException(
HTTPStatus.UNPROCESSABLE_ENTITY,
"LP has not onboarded yet — they must register via "
"satmachineclient and select a DCA wallet before deposits "
"can be recorded against them.",
)
# Currency is bound to the machine, not operator-choosable. Resolve
# it server-side so an operator with a UI bug / curl mistake / older
# client can't poison the LP balance with the wrong unit
# (aiolabs/satmachineadmin#26).
machine = await get_machine(data.machine_id)
if machine is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
return await create_deposit(user.id, data, currency=machine.fiat_code)
@satmachineadmin_api_router.get("/api/v1/dca/deposits", response_model=list[DcaDeposit])
async def api_list_deposits(
client_id: str | None = None,
user: User = Depends(check_user_exists),
) -> list[DcaDeposit]:
"""Operator's deposits across all their machines; ?client_id scopes to
a single LP (with ownership check)."""
if client_id is not None:
await _client_owned_by(client_id, user.id)
return await get_deposits_for_client(client_id)
return await get_deposits_for_operator(user.id)
@satmachineadmin_api_router.get(
"/api/v1/dca/deposits/{deposit_id}", response_model=DcaDeposit
)
async def api_get_deposit(
deposit_id: str, user: User = Depends(check_user_exists)
) -> DcaDeposit:
return await _deposit_owned_by(deposit_id, user.id)
@satmachineadmin_api_router.put(
"/api/v1/dca/deposits/{deposit_id}", response_model=DcaDeposit
)
async def api_update_deposit(
deposit_id: str,
data: UpdateDepositData,
user: User = Depends(check_user_exists),
) -> DcaDeposit:
existing = await _deposit_owned_by(deposit_id, user.id)
if existing.status != "pending":
raise HTTPException(
HTTPStatus.BAD_REQUEST,
"Only pending deposits can be edited",
)
updated = await update_deposit(deposit_id, data)
if updated is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found")
return updated
@satmachineadmin_api_router.put(
"/api/v1/dca/deposits/{deposit_id}/status", response_model=DcaDeposit
)
async def api_update_deposit_status(
deposit_id: str,
data: UpdateDepositStatusData,
user: User = Depends(check_user_exists),
) -> DcaDeposit:
await _deposit_owned_by(deposit_id, user.id)
updated = await update_deposit_status(deposit_id, data)
if updated is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found")
return updated
@satmachineadmin_api_router.delete(
"/api/v1/dca/deposits/{deposit_id}", status_code=HTTPStatus.NO_CONTENT
)
async def api_delete_deposit(
deposit_id: str, user: User = Depends(check_user_exists)
) -> None:
existing = await _deposit_owned_by(deposit_id, user.id)
if existing.status != "pending":
raise HTTPException(
HTTPStatus.BAD_REQUEST,
"Only pending deposits can be deleted",
)
await delete_deposit(deposit_id)
# =============================================================================
# Settlements (read-only at this phase; landing happens in tasks.py)
# =============================================================================
@satmachineadmin_api_router.get(
"/api/v1/dca/settlements", response_model=list[DcaSettlement]
)
async def api_list_settlements(
user: User = Depends(check_user_exists),
) -> list[DcaSettlement]:
return await get_settlements_for_operator(user.id)
@satmachineadmin_api_router.get(
"/api/v1/dca/machines/{machine_id}/settlements",
response_model=list[DcaSettlement],
)
async def api_list_settlements_for_machine(
machine_id: str, user: User = Depends(check_user_exists)
) -> list[DcaSettlement]:
machine = await get_machine(machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
return await get_settlements_for_machine(machine_id)
# NOTE on route ordering: FastAPI matches in declaration order. The literal
# /settlements/stuck must be registered BEFORE /settlements/{settlement_id}
# so the literal wins. Same applies to any future literal sub-route under
# /settlements/* (don't reshuffle this section without re-confirming the
# order).
@satmachineadmin_api_router.get(
"/api/v1/dca/settlements/stuck", response_model=StuckSettlementsResponse
)
async def api_list_stuck_settlements(
threshold_minutes: int = 30,
user: User = Depends(check_user_exists),
) -> StuckSettlementsResponse:
"""Operator worklist of settlements that didn't process cleanly.
Returns four lists:
- rejected: Nostr attribution cross-check failed — signer didn't
match the machine identity. Investigate; do not retry.
- errored: distribution ran and failed; retry endpoint handles these
- stuck_pending: landed but never picked up by the processor
- stuck_processing: claim taken but no completion in N minutes
`threshold_minutes` controls the age threshold for 'stuck' (default 30).
Operators can force-recover stuck-processing settlements via
POST /api/v1/dca/settlements/{id}/force-reset."""
if threshold_minutes < 1:
raise HTTPException(HTTPStatus.BAD_REQUEST, "threshold_minutes must be >= 1")
buckets = await get_stuck_settlements_for_operator(user.id, threshold_minutes)
return StuckSettlementsResponse(
threshold_minutes=threshold_minutes,
rejected=buckets["rejected"],
errored=buckets["errored"],
stuck_pending=buckets["stuck_pending"],
stuck_processing=buckets["stuck_processing"],
)
@satmachineadmin_api_router.get(
"/api/v1/dca/settlements/{settlement_id}", response_model=DcaSettlement
)
async def api_get_settlement(
settlement_id: str, user: User = Depends(check_user_exists)
) -> DcaSettlement:
settlement = await get_settlement(settlement_id)
if settlement is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
machine = await get_machine(settlement.machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
return settlement
@satmachineadmin_api_router.post(
"/api/v1/dca/settlements/{settlement_id}/partial-dispense",
response_model=DcaSettlement,
)
async def api_partial_dispense(
settlement_id: str,
data: PartialDispenseData,
user: User = Depends(check_user_exists),
) -> DcaSettlement:
"""Operator UX — resolves satmachineadmin#3.
Recompute the split for a settlement that didn't dispense the full
amount (jam, mid-tx error). Provide one of dispensed_fraction (0..1)
or dispensed_sats. Optionally include a reason in notes.
Refuses when any leg has already completed — Lightning payments can't
be clawed back. Use balance settlement (P3e) for those cases.
"""
settlement = await get_settlement(settlement_id)
if settlement is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
machine = await get_machine(settlement.machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
if (data.dispensed_fraction is None) == (data.dispensed_sats is None):
raise HTTPException(
HTTPStatus.BAD_REQUEST,
"Provide exactly one of dispensed_fraction or dispensed_sats",
)
try:
return await apply_partial_dispense_and_redistribute(settlement_id, data)
except ValueError as exc:
raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc
@satmachineadmin_api_router.post(
"/api/v1/dca/settlements/{settlement_id}/force-reset",
response_model=DcaSettlement,
)
async def api_force_reset_settlement(
settlement_id: str,
threshold_minutes: int = 30,
user: User = Depends(check_user_exists),
) -> DcaSettlement:
"""Operator escape hatch for genuinely stuck settlements (processor
crashed mid-flight, claim never released). Flips status
'pending'/'processing''errored' so the retry endpoint can take over.
Refuses unless the settlement is older than `threshold_minutes` so an
operator can't accidentally interrupt a slow-but-running settlement.
Threshold check uses created_at as a proxy — adequate for v1 since the
processor either completes fast or it crashed."""
from datetime import datetime, timedelta, timezone
settlement = await get_settlement(settlement_id)
if settlement is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
machine = await get_machine(settlement.machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
if settlement.status not in ("pending", "processing"):
raise HTTPException(
HTTPStatus.BAD_REQUEST,
f"settlement status must be 'pending' or 'processing' to "
f"force-reset (currently '{settlement.status}')",
)
# Age check — refuse if settlement is fresh (processor might still
# be running normally). Both sides made timezone-aware before compare.
created = settlement.created_at
if created.tzinfo is None:
created = created.replace(tzinfo=timezone.utc)
age = datetime.now(timezone.utc) - created
if age < timedelta(minutes=threshold_minutes):
raise HTTPException(
HTTPStatus.BAD_REQUEST,
f"settlement is only {age.total_seconds() / 60:.1f} minutes "
f"old (threshold {threshold_minutes}m); refusing to force-reset "
"a possibly-still-running settlement",
)
updated = await force_reset_stuck_settlement(settlement_id)
if updated is None:
raise HTTPException(HTTPStatus.INTERNAL_SERVER_ERROR, "failed to force-reset")
return updated
@satmachineadmin_api_router.post(
"/api/v1/dca/settlements/{settlement_id}/retry",
response_model=DcaSettlement,
)
async def api_retry_settlement(
settlement_id: str, user: User = Depends(check_user_exists)
) -> DcaSettlement:
"""Operator retry path for an errored settlement.
Voids any failed legs (completed legs are NEVER re-paid — Lightning
sats already moved) and flips status 'errored''pending', then
re-invokes process_settlement. The optimistic-lock claim guards
against a concurrent listener re-fire racing this retry.
REFUSES when any leg has already completed. Reason: process_settlement
re-creates every leg from scratch (super_fee + operator_split + dca);
if a previous attempt already completed some of them, retrying would
DOUBLE-PAY those legs. For partial-success failures, the operator
needs to either edit the commission_splits ruleset to remove the
already-paid targets before retry, or manually pay the missing legs
out-of-band.
"""
settlement = await get_settlement(settlement_id)
if settlement is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
machine = await get_machine(settlement.machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
if settlement.status != "errored":
raise HTTPException(
HTTPStatus.BAD_REQUEST,
f"settlement status must be 'errored' to retry "
f"(currently '{settlement.status}')",
)
completed = await count_completed_legs_for_settlement(settlement_id)
if completed > 0:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
f"refusing to retry: {completed} leg(s) already completed. "
"Re-running distribution would double-pay them. Edit the "
"commission_splits ruleset to remove the already-paid targets, "
"or manually pay the missing legs.",
)
updated = await reset_settlement_for_retry(settlement_id)
if updated is None or updated.status != "pending":
raise HTTPException(
HTTPStatus.INTERNAL_SERVER_ERROR, "failed to reset settlement"
)
await process_settlement(settlement_id)
after = await get_settlement(settlement_id)
return after if after is not None else updated
@satmachineadmin_api_router.post(
"/api/v1/dca/settlements/{settlement_id}/notes",
response_model=DcaSettlement,
)
async def api_append_settlement_note(
settlement_id: str,
data: AppendSettlementNoteData,
user: User = Depends(check_user_exists),
) -> DcaSettlement:
"""Operator appends a free-form note to the settlement. Useful for cash-
drawer reconciliation context, off-LN refund records, or any narrative
an operator wants to attach. Each entry is timestamped (UTC) and tagged
with the author's user id; existing entries are never modified.
For richer queryable audit (filter by author, time range, action type),
see aiolabs/satmachineadmin (future audit-table feature)."""
settlement = await get_settlement(settlement_id)
if settlement is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
machine = await get_machine(settlement.machine_id)
if machine is None or machine.operator_user_id != user.id:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
updated = await append_settlement_note(settlement_id, data.note, user.id)
if updated is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
return updated
# =============================================================================
# Payments (read-only — the leg-typed breakdown of distributions)
# =============================================================================
@satmachineadmin_api_router.get("/api/v1/dca/payments", response_model=list[DcaPayment])
async def api_list_payments(
leg_type: str | None = None,
user: User = Depends(check_user_exists),
) -> list[DcaPayment]:
return await get_payments_for_operator(user.id, leg_type=leg_type)
# =============================================================================
# Commission splits — operator's rules for distributing the commission
# remainder (post-super-fee). Sum-to-1.0 invariant enforced at the model
# boundary by SetCommissionSplitsData.
# =============================================================================
@satmachineadmin_api_router.get(
"/api/v1/dca/commission-splits", response_model=list[CommissionSplit]
)
async def api_get_commission_splits(
machine_id: str | None = None,
effective: bool = False,
user: User = Depends(check_user_exists),
) -> list[CommissionSplit]:
"""No machine_id: operator's default ruleset (rows where machine_id IS NULL).
With machine_id: per-machine override only (404 the machine if not yours).
With machine_id and ?effective=true: per-machine override if set, else
operator default — what the settlement processor actually applies."""
if machine_id is not None:
await _machine_owned_by(machine_id, user.id)
if effective:
return await get_effective_commission_splits(user.id, machine_id)
return await get_commission_splits(user.id, machine_id)
return await get_commission_splits(user.id, None)
@satmachineadmin_api_router.put(
"/api/v1/dca/commission-splits", response_model=list[CommissionSplit]
)
async def api_replace_commission_splits(
data: SetCommissionSplitsData,
user: User = Depends(check_user_exists),
) -> list[CommissionSplit]:
"""Atomic replace for the (operator, machine) scope. If
data.machine_id is None, replaces the operator's default ruleset;
otherwise replaces the per-machine override (machine must be owned).
Sum-to-1.0 invariant enforced upstream by the Pydantic validator."""
if data.machine_id is not None:
await _machine_owned_by(data.machine_id, user.id)
return await replace_commission_splits(user.id, data.machine_id, data.legs)
@satmachineadmin_api_router.delete(
"/api/v1/dca/commission-splits",
status_code=HTTPStatus.NO_CONTENT,
)
async def api_delete_commission_splits(
machine_id: str | None = None,
user: User = Depends(check_user_exists),
) -> None:
"""Clear a ruleset. With machine_id: clears the per-machine override
(machine falls back to operator default). Without: clears the operator
default (any per-machine overrides keep applying)."""
if machine_id is not None:
await _machine_owned_by(machine_id, user.id)
# Atomic replace with an empty leg list — same effect as DELETE WHERE.
await replace_commission_splits(user.id, machine_id, [])
# =============================================================================
# Super config — operators read; super (LNbits instance admin) writes.
# =============================================================================
@satmachineadmin_api_router.get("/api/v1/dca/super-config", response_model=SuperConfig)
async def api_get_super_config(
_user: User = Depends(check_user_exists),
) -> SuperConfig:
"""Returns the platform-fee config so operators can display it as a
read-only line item in their UI. The fee is set by the LNbits super
instance-wide; operators see it but can't change it."""
config = await get_super_config()
if config is None:
raise HTTPException(HTTPStatus.NOT_FOUND, "Super config not initialised")
return config
@satmachineadmin_api_router.put("/api/v1/dca/super-config", response_model=SuperConfig)
async def api_update_super_config(
data: UpdateSuperConfigData,
_user: User = Depends(check_super_user),
) -> SuperConfig:
"""Super-only: set the platform fee % charged on every operator's
commission, plus the destination wallet for collecting it. The fee is
enforced before the operator's own commission_splits ruleset fires
(see distribution.process_settlement)."""
await _assert_super_config_cap_safe(
data.super_cash_in_fee_fraction,
data.super_cash_out_fee_fraction,
)
config = await update_super_config(data)
if config is None:
raise HTTPException(
HTTPStatus.INTERNAL_SERVER_ERROR, "Failed to update super config"
)
# Layer 2 (#39): a super-fee change ripples to every active machine
# since each machine's total = super + machine.operator. Republish
# per-machine with that machine's operator as the signer.
# Soft-fails per machine independently; partial success is acceptable
# (the operator whose publish failed can re-trigger via a machine
# edit). Skip if neither directional fraction was touched in this
# update (e.g. caller only changed super_fee_wallet_id).
super_fractions_changed = (
data.super_cash_in_fee_fraction is not None
or data.super_cash_out_fee_fraction is not None
)
if super_fractions_changed:
for machine in await list_all_active_machines():
await publish_fee_config(machine, config, machine.operator_user_id)
return config
# =============================================================================
# Cassette configs (#29 v1.1) — per-machine ATM cassette inventory
# =============================================================================
# v1.1 surface, paired with aiolabs/lamassu-next#56 ATM-side. Two endpoints:
# GET /machines/{id}/cassettes — list rows for the operator UI
# POST /machines/{id}/cassettes/publish — apply edits + publish kind-30078
#
# Row creation (new (machine_id, position) pairs) is admin-only via the
# bootstrap consumer task — slot count is hardware-determined. Operator-
# side flow is edit-and-publish over the existing rows only; the editable
# fields per row are denomination and count.
@satmachineadmin_api_router.get(
"/api/v1/dca/machines/{machine_id}/cassettes",
response_model=list[CassetteConfig],
)
async def api_list_machine_cassettes(
machine_id: str, user: User = Depends(check_user_exists)
) -> list[CassetteConfig]:
"""List the cassette config rows for one of the operator's machines,
ordered by position. Empty list = ATM hasn't yet published its
bootstrap event (or the bootstrap consumer hasn't processed it yet);
UI should show a "waiting for ATM" state."""
await _machine_owned_by(machine_id, user.id)
return await list_cassette_configs_for_machine(machine_id)
@satmachineadmin_api_router.post(
"/api/v1/dca/machines/{machine_id}/cassettes/publish",
response_model=list[CassetteConfig],
)
async def api_publish_machine_cassettes(
machine_id: str,
payload: PublishCassettesPayload,
user: User = Depends(check_user_exists),
) -> list[CassetteConfig]:
"""Operator submits the full per-machine cassette state for publish to
the ATM. Validates the position set matches what's currently in
cassette_configs for the machine (slot count is hardware-fixed),
upserts each row, then encrypts + signs + publishes a kind-30078
event tagged with d=bitspire-cassettes:<atm_pubkey_hex> and
p=<atm_pubkey_hex>.
The `<m>` placeholder in the published d-tag is the ATM's hex pubkey
from machine.machine_npub (canonicalised via normalize_public_key),
NOT the internal dca_machines.id UUID — see #29 'machine_id semantics'
section and coord-log 2026-05-30T11:50Z load-bearing nudge.
Returns the fresh cassette_configs rows after the upserts so the UI
can refresh its table from one round-trip.
Errors:
400 — payload position set doesn't match the machine's stored set
(operator publishing for a slot that doesn't exist on the
ATM; or the bootstrap hasn't landed yet so no rows exist)
400 — operator hasn't onboarded a Nostr identity
503 — signer offline / client-side-only, or nostrclient extension
not installed on this LNbits instance
500 — anything else from the publish path
"""
machine = await _machine_owned_by(machine_id, user.id)
existing = await list_cassette_configs_for_machine(machine_id)
existing_positions = {row.position for row in existing}
incoming_positions = set(payload.positions.keys())
if not existing:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
(
"No cassette_configs rows exist for this machine yet — "
"waiting for the ATM's bootstrap state event. Power on the "
"ATM and confirm it has reached the configured relay; "
"satmachineadmin will auto-populate cassette_configs on "
"receipt."
),
)
if existing_positions != incoming_positions:
missing = existing_positions - incoming_positions
extra = incoming_positions - existing_positions
raise HTTPException(
HTTPStatus.BAD_REQUEST,
(
"Payload position set doesn't match the machine's stored "
f"set. Missing from payload: {sorted(missing)}; extra in "
f"payload: {sorted(extra)}. Slot count is hardware-fixed "
"— re-provision the ATM via atm-tui to add/remove physical "
"bays, then re-publish."
),
)
# Apply each per-row edit so the operator-believed state on
# satmachineadmin reflects the published payload, even if the ATM
# ack lands later (v2). updated_by audit-stamps the operator user id.
for pos, row in payload.positions.items():
updated = await update_cassette_config(
machine_id,
pos,
UpsertCassetteConfigData(denomination=row.denomination, count=row.count),
updated_by=user.id,
)
if updated is None:
# Defensive — we just validated the row exists, but a
# concurrent delete could land between. Surface as 500.
raise HTTPException(
HTTPStatus.INTERNAL_SERVER_ERROR,
f"cassette row for position {pos} disappeared mid-publish",
)
try:
await publish_to_atm(machine, payload, user.id)
except OperatorIdentityMissing as exc:
raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc
except SignerUnavailable as exc:
raise HTTPException(HTTPStatus.SERVICE_UNAVAILABLE, str(exc)) from exc
except RelayUnavailable as exc:
raise HTTPException(HTTPStatus.SERVICE_UNAVAILABLE, str(exc)) from exc
except CassetteTransportError as exc:
raise HTTPException(HTTPStatus.INTERNAL_SERVER_ERROR, str(exc)) from exc
return await list_cassette_configs_for_machine(machine_id)