satmachineadmin/crud.py
Padreug d9e8a04b8b feat(v2): record fee_mismatch_sats per settlement, Phase 1 (#38 4/5)
Phase-1 observability per coord-log §2026-06-01T07:00Z (option A
locked: always record, no enforce_fee_match gate):

  fee_mismatch_sats = bitspire_fee_sats - (platform_fee_sats + operator_fee_sats)

Positive = bitspire over-reported; negative = under-reported; zero =
exact match. Recorded unconditionally on every settlement; WARN-
logged via loguru only when |delta| > tolerance, where
tolerance = max(1, int(principal_sats * 0.001)) — 1-sat floor with
0.1% relative ceiling.

bitspire.py:parse_settlement:
- Computes the delta after split_principal_based returns.
- WARN log line carries bitspire_fee_sats / expected / delta /
  tolerance / principal / both fractions / tx_type / machine-npub
  prefix for triage queries.
- Always stamps fee_mismatch_sats onto CreateDcaSettlementData.
- Comment explains the pre-Layer-3 expectation: large deltas are
  expected while the ATM hardcodes 7.77% cash-out (aiolabs/lamassu-
  next#57); the data here will quiet once Layer 3 ships.

crud.py:create_settlement_idempotent: extends the INSERT to persist
the new column.

Tests:
- tests/conftest.py: `loguru_capture` fixture — loguru routes to a
  pre-bound stderr sink that pytest's caplog (stdlib only) misses and
  capsys can't see; the fixture adds a list-sink for the test's
  duration. Reusable for future log-behavior tests.
- tests/test_fee_mismatch_recording.py: 8 cases covering exact-match
  zero delta, bitspire over- and under-reporting, the pre-Layer-3
  large-delta scenario, within-tolerance silence, over-tolerance
  warning, diagnostic-fields presence in the WARN line, and the
  1-sat floor on tiny-principal settlements.

164/164 tests green. Phase 2 (reject on out-of-tolerance) lands as
a follow-up once observability data justifies the tighter posture.

Refs: aiolabs/satmachineadmin#38 (Layer 1), coord-log §2026-06-01T07:00Z
(lnbits advisory + option A lock).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 14:34:25 +02:00

1537 lines
52 KiB
Python

# Satoshi Machine v2 — CRUD layer over the m005 schema.
#
# All operator-scoped queries take an operator_user_id and enforce isolation
# at the SQL boundary. Cross-operator LP queries (for satmachineclient) join
# through dca_machines.operator_user_id. See plan section "Identity & multi-
# machine model".
from datetime import datetime
from lnbits.db import Database
from lnbits.helpers import urlsafe_short_hash
from .models import (
CassetteConfig,
ClientBalanceSummary,
CommissionSplit,
CommissionSplitLeg,
CreateDcaClientData,
CreateDcaPaymentData,
CreateDcaSettlementData,
CreateDepositData,
CreateMachineData,
DcaClient,
DcaDeposit,
DcaLpPreferences,
DcaPayment,
DcaSettlement,
Machine,
PublishCassettesPayload,
SuperConfig,
TelemetrySnapshot,
UpdateDcaClientData,
UpdateDepositData,
UpdateDepositStatusData,
UpdateMachineData,
UpdateSuperConfigData,
UpsertCassetteConfigData,
UpsertDcaLpData,
)
db = Database("ext_satoshimachine")
# =============================================================================
# Super config
# =============================================================================
async def get_super_config() -> SuperConfig | None:
    """Load the super-config row stored under the fixed id ``'default'``.

    Returns the row mapped onto ``SuperConfig``, or ``None`` when the
    lookup yields nothing.
    """
    query = "SELECT * FROM satoshimachine.super_config WHERE id = :id"
    params = {"id": "default"}
    return await db.fetchone(query, params, SuperConfig)
async def update_super_config(data: UpdateSuperConfigData) -> SuperConfig | None:
    """Patch the ``'default'`` super-config row with the non-None fields of *data*.

    Fields left as ``None`` are treated as "not provided" and are not
    written. When nothing is provided no UPDATE is issued. ``updated_at``
    is stamped on every real update. Always returns the current row.
    """
    changes = {
        field: value for field, value in data.dict().items() if value is not None
    }
    if changes:
        changes["updated_at"] = datetime.now()
        # Column names come from the Pydantic model's fields; values are bound.
        assignments = ", ".join(f"{column} = :{column}" for column in changes)
        await db.execute(
            f"UPDATE satoshimachine.super_config SET {assignments} WHERE id = :id",
            {**changes, "id": "default"},
        )
    return await get_super_config()
# =============================================================================
# Machines
# =============================================================================
async def create_machine(operator_user_id: str, data: CreateMachineData) -> Machine:
    """Insert a new machine owned by *operator_user_id* and return it.

    The row gets a freshly generated id, is created active, and has
    ``created_at`` / ``updated_at`` set to the same timestamp. The
    inserted row is read back before being returned.
    """
    new_id = urlsafe_short_hash()
    stamp = datetime.now()
    row = {
        "id": new_id,
        "operator_user_id": operator_user_id,
        "machine_npub": data.machine_npub,
        "wallet_id": data.wallet_id,
        "name": data.name,
        "location": data.location,
        "fiat_code": data.fiat_code,
        "is_active": True,
        "operator_cash_in_fee_fraction": data.operator_cash_in_fee_fraction,
        "operator_cash_out_fee_fraction": data.operator_cash_out_fee_fraction,
        "created_at": stamp,
        "updated_at": stamp,
    }
    insert_sql = """
        INSERT INTO satoshimachine.dca_machines
        (id, operator_user_id, machine_npub, wallet_id, name, location,
        fiat_code, is_active,
        operator_cash_in_fee_fraction, operator_cash_out_fee_fraction,
        created_at, updated_at)
        VALUES (:id, :operator_user_id, :machine_npub, :wallet_id, :name,
        :location, :fiat_code, :is_active,
        :operator_cash_in_fee_fraction, :operator_cash_out_fee_fraction,
        :created_at, :updated_at)
        """
    await db.execute(insert_sql, row)
    created = await get_machine(new_id)
    # Invariant: the row we just inserted must be readable.
    assert created is not None
    return created
async def get_machine(machine_id: str) -> Machine | None:
    """Look up a single machine by primary key; ``None`` if not found."""
    query = "SELECT * FROM satoshimachine.dca_machines WHERE id = :id"
    return await db.fetchone(query, {"id": machine_id}, Machine)
async def get_machine_by_npub(machine_npub: str) -> Machine | None:
    """Look up a machine by an exact match on its stored ``machine_npub``."""
    query = "SELECT * FROM satoshimachine.dca_machines WHERE machine_npub = :npub"
    return await db.fetchone(query, {"npub": machine_npub}, Machine)
async def get_active_machine_by_wallet_id(wallet_id: str) -> Machine | None:
    """Return one active machine bound to *wallet_id*, if any.

    Used by the invoice listener to route an incoming payment to a machine.
    The query is capped with ``LIMIT 1``.
    """
    query = """
        SELECT * FROM satoshimachine.dca_machines
        WHERE wallet_id = :wid AND is_active = true
        LIMIT 1
        """
    params = {"wid": wallet_id}
    return await db.fetchone(query, params, Machine)
async def get_machines_for_operator(operator_user_id: str) -> list[Machine]:
    """List every machine owned by the operator, newest first."""
    query = """
        SELECT * FROM satoshimachine.dca_machines
        WHERE operator_user_id = :uid
        ORDER BY created_at DESC
        """
    params = {"uid": operator_user_id}
    return await db.fetchall(query, params, Machine)
async def list_all_active_machines() -> list[Machine]:
    """List all active machines across every operator, newest first.

    Used by the cassette bootstrap consumer task to build a single
    cross-operator subscription filter. Each event's pubkey is then routed
    to the right operator via get_machine_by_atm_pubkey_hex plus the
    machine's operator_user_id.
    """
    query = """
        SELECT * FROM satoshimachine.dca_machines
        WHERE is_active = true
        ORDER BY created_at DESC
        """
    no_params: dict = {}
    return await db.fetchall(query, no_params, Machine)
async def get_machine_by_atm_pubkey_hex(atm_pubkey_hex: str) -> Machine | None:
    """Find the active machine whose ``machine_npub`` normalises to *atm_pubkey_hex*.

    ``machine_npub`` may be stored as hex or bech32; each candidate is
    passed through ``normalize_public_key`` and compared case-insensitively.
    Used by the cassette bootstrap consumer to route an incoming state
    event to the right machine row (and therefore operator privkey for
    decryption).

    This is a linear scan over all active machines — fine for small
    fleets. If fleet sizes grow, normalise machine_npub at write time to
    hex and add an index.
    """
    from lnbits.utils.nostr import normalize_public_key

    wanted = atm_pubkey_hex.lower()
    for candidate in await list_all_active_machines():
        try:
            candidate_hex = normalize_public_key(candidate.machine_npub).lower()
        except (ValueError, AssertionError):
            # Stored key could not be normalised; skip this row.
            continue
        if candidate_hex == wanted:
            return candidate
    return None
async def update_machine(machine_id: str, data: UpdateMachineData) -> Machine | None:
    """Patch a machine with the non-None fields of *data* and return the row.

    ``None`` fields are treated as "not provided". When nothing is
    provided no UPDATE is issued. ``updated_at`` is stamped on every real
    update.
    """
    changes = {
        field: value for field, value in data.dict().items() if value is not None
    }
    if changes:
        changes["updated_at"] = datetime.now()
        # Column names come from the Pydantic model's fields; values are bound.
        assignments = ", ".join(f"{column} = :{column}" for column in changes)
        await db.execute(
            f"UPDATE satoshimachine.dca_machines SET {assignments} WHERE id = :id",
            {**changes, "id": machine_id},
        )
    return await get_machine(machine_id)
async def delete_machine(machine_id: str) -> None:
    """Delete the machine row with the given id (no-op if it doesn't exist)."""
    statement = "DELETE FROM satoshimachine.dca_machines WHERE id = :id"
    await db.execute(statement, {"id": machine_id})
# =============================================================================
# DCA Clients (LPs)
# =============================================================================
async def create_dca_client(data: CreateDcaClientData) -> DcaClient:
    """Enrol an LP at one of the operator's machines.

    This is a pure (machine, LP) record with status ``'active'``. Wallet,
    mode and autoforward settings live on dca_lp (per-user) and are
    populated by the LP via satmachineclient. Enrolment doesn't require
    the LP to be onboarded yet, but deposits do (see `create_deposit`).
    """
    new_id = urlsafe_short_hash()
    stamp = datetime.now()
    row = {
        "id": new_id,
        "machine_id": data.machine_id,
        "user_id": data.user_id,
        "username": data.username,
        "status": "active",
        "created_at": stamp,
        "updated_at": stamp,
    }
    insert_sql = """
        INSERT INTO satoshimachine.dca_clients
        (id, machine_id, user_id, username, status, created_at, updated_at)
        VALUES (:id, :machine_id, :user_id, :username, :status,
        :created_at, :updated_at)
        """
    await db.execute(insert_sql, row)
    created = await get_dca_client(new_id)
    # Invariant: the row we just inserted must be readable.
    assert created is not None
    return created
# Shared SQL fragments for DcaClient reads.
#
# _CLIENT_SELECT: the client columns plus the LP-onboarded flag, computed
# via the LEFT JOIN on dca_lp declared in _CLIENT_FROM. The flag is
# returned as `lp_onboarded` (boolean 0/1 in SQLite, which Pydantic coerces
# to bool on the DcaClient model).
#
# _CLIENT_FROM: the matching FROM clause. It aliases dca_clients as `c`
# and dca_lp as `lp`, so callers must reference those aliases in any
# WHERE / ORDER BY / extra JOIN they append.
_CLIENT_SELECT = """
c.id, c.machine_id, c.user_id, c.username, c.status,
c.created_at, c.updated_at,
(lp.user_id IS NOT NULL) AS lp_onboarded
"""
_CLIENT_FROM = (
"satoshimachine.dca_clients c "
"LEFT JOIN satoshimachine.dca_lp lp ON lp.user_id = c.user_id"
)
async def get_dca_client(client_id: str) -> DcaClient | None:
    """Fetch one client enrolment by id, including its ``lp_onboarded`` flag."""
    query = f"SELECT {_CLIENT_SELECT} FROM {_CLIENT_FROM} WHERE c.id = :id"
    return await db.fetchone(query, {"id": client_id}, DcaClient)
async def get_dca_client_for_machine_user(
    machine_id: str, user_id: str
) -> DcaClient | None:
    """Fetch the enrolment of *user_id* at *machine_id*, if one exists."""
    query = f"""
        SELECT {_CLIENT_SELECT} FROM {_CLIENT_FROM}
        WHERE c.machine_id = :machine_id AND c.user_id = :user_id
        """
    params = {"machine_id": machine_id, "user_id": user_id}
    return await db.fetchone(query, params, DcaClient)
async def get_dca_clients_for_machine(machine_id: str) -> list[DcaClient]:
    """List every client enrolled at *machine_id*, newest first."""
    query = f"""
        SELECT {_CLIENT_SELECT} FROM {_CLIENT_FROM}
        WHERE c.machine_id = :machine_id
        ORDER BY c.created_at DESC
        """
    params = {"machine_id": machine_id}
    return await db.fetchall(query, params, DcaClient)
async def get_dca_clients_for_operator(operator_user_id: str) -> list[DcaClient]:
    """List all clients across every machine this operator owns, newest first.

    Operator scoping is enforced by joining through
    ``dca_machines.operator_user_id``.
    """
    query = f"""
        SELECT {_CLIENT_SELECT}
        FROM {_CLIENT_FROM}
        JOIN satoshimachine.dca_machines m ON m.id = c.machine_id
        WHERE m.operator_user_id = :uid
        ORDER BY c.created_at DESC
        """
    params = {"uid": operator_user_id}
    return await db.fetchall(query, params, DcaClient)
async def get_dca_clients_for_user(user_id: str) -> list[DcaClient]:
    """LP cross-operator view: every machine this LP is registered at, newest first."""
    query = f"""
        SELECT {_CLIENT_SELECT} FROM {_CLIENT_FROM}
        WHERE c.user_id = :user_id
        ORDER BY c.created_at DESC
        """
    params = {"user_id": user_id}
    return await db.fetchall(query, params, DcaClient)
async def get_flow_mode_clients_for_machine(machine_id: str) -> list[DcaClient]:
    """Active LPs enrolled at this machine whose per-user `dca_lp` row
    has `default_dca_mode = 'flow'`. Used by the distribution algorithm.

    An LP enrolment without a matching `dca_lp` row (i.e., the LP hasn't
    onboarded via satmachineclient yet) is filtered out by the INNER
    JOIN — there's no destination wallet to pay to.

    Rows are ordered oldest enrolment first. The column list is the shared
    `_CLIENT_SELECT` fragment (same as every other DcaClient read in this
    module) so `lp_onboarded` is populated; with the INNER JOIN it is
    always true for the rows returned here.
    """
    # `_CLIENT_SELECT` references the aliases `c` and `lp`, which this
    # query declares itself because it needs an INNER (not LEFT) join.
    query = f"""
        SELECT {_CLIENT_SELECT}
        FROM satoshimachine.dca_clients c
        JOIN satoshimachine.dca_lp lp ON lp.user_id = c.user_id
        WHERE c.machine_id = :machine_id
        AND lp.default_dca_mode = 'flow'
        AND c.status = 'active'
        ORDER BY c.created_at ASC
        """
    return await db.fetchall(query, {"machine_id": machine_id}, DcaClient)
# =============================================================================
# DCA LP preferences (per-user) — wallet + mode + autoforward
# =============================================================================
async def get_dca_lp(user_id: str) -> DcaLpPreferences | None:
    """Fetch the LP's per-user preferences row.

    Returns ``None`` if the LP hasn't onboarded via satmachineclient yet
    (i.e., no dca_lp row exists for *user_id*).
    """
    query = "SELECT * FROM satoshimachine.dca_lp WHERE user_id = :uid"
    return await db.fetchone(query, {"uid": user_id}, DcaLpPreferences)
async def lp_is_onboarded(user_id: str) -> bool:
    """Return True when a dca_lp row exists for *user_id*.

    Cheap existence check (selects only the key column) used by the
    deposit-creation gate.
    """
    query = "SELECT user_id FROM satoshimachine.dca_lp WHERE user_id = :uid"
    found = await db.fetchone(query, {"uid": user_id})
    return found is not None
async def upsert_dca_lp(
    user_id: str,
    data: UpsertDcaLpData,
    *,
    fallback_wallet_id: str | None = None,
) -> DcaLpPreferences:
    """Create the LP's preferences row, or patch it if it already exists.

    Insert path (no row yet): a wallet is required — either
    `data.dca_wallet_id` or `fallback_wallet_id` (satmachineclient passes
    the LP's default LNbits wallet here when auto-seeding on first
    dashboard visit). Mode defaults to ``'flow'`` and autoforward to
    disabled when not supplied.

    Update path (row exists): only the non-None fields of `data` are
    written; if there are none the existing row is returned untouched.

    Raises:
        ValueError: on the insert path when no wallet id is available.
    """
    current = await get_dca_lp(user_id)
    stamp = datetime.now()

    if current is not None:
        changes: dict = {
            field: value
            for field, value in data.dict().items()
            if value is not None
        }
        if not changes:
            return current
        changes["updated_at"] = stamp
        # Column names come from the Pydantic model's fields; values are bound.
        assignments = ", ".join(f"{column} = :{column}" for column in changes)
        await db.execute(
            f"UPDATE satoshimachine.dca_lp SET {assignments} WHERE user_id = :uid",
            {**changes, "uid": user_id},
        )
    else:
        wallet = data.dca_wallet_id or fallback_wallet_id
        if not wallet:
            raise ValueError(
                "first upsert requires dca_wallet_id (or fallback_wallet_id)"
            )
        insert_sql = """
            INSERT INTO satoshimachine.dca_lp
            (user_id, dca_wallet_id, default_dca_mode, fixed_mode_daily_limit,
            autoforward_ln_address, autoforward_enabled,
            created_at, updated_at)
            VALUES (:uid, :wallet, :mode, :limit, :ln_addr, :auto,
            :now, :now)
            """
        insert_params = {
            "uid": user_id,
            "wallet": wallet,
            "mode": data.default_dca_mode or "flow",
            "limit": data.fixed_mode_daily_limit,
            "ln_addr": data.autoforward_ln_address,
            "auto": data.autoforward_enabled or False,
            "now": stamp,
        }
        await db.execute(insert_sql, insert_params)

    result = await get_dca_lp(user_id)
    # Invariant: after an insert or update the row must exist.
    assert result is not None
    return result
async def update_dca_client(
    client_id: str, data: UpdateDcaClientData
) -> DcaClient | None:
    """Patch a client enrolment with the non-None fields of *data*.

    ``None`` fields are treated as "not provided". When nothing is
    provided no UPDATE is issued. ``updated_at`` is stamped on every real
    update. Returns the current row (or ``None`` if the id is unknown).
    """
    changes = {
        field: value for field, value in data.dict().items() if value is not None
    }
    if changes:
        changes["updated_at"] = datetime.now()
        # Column names come from the Pydantic model's fields; values are bound.
        assignments = ", ".join(f"{column} = :{column}" for column in changes)
        await db.execute(
            f"UPDATE satoshimachine.dca_clients SET {assignments} WHERE id = :id",
            {**changes, "id": client_id},
        )
    return await get_dca_client(client_id)
async def delete_dca_client(client_id: str) -> None:
    """Delete the client enrolment row with the given id."""
    statement = "DELETE FROM satoshimachine.dca_clients WHERE id = :id"
    await db.execute(statement, {"id": client_id})
# =============================================================================
# Deposits
# =============================================================================
async def create_deposit(
    creator_user_id: str, data: CreateDepositData, *, currency: str
) -> DcaDeposit:
    """Insert a deposit row in status ``'pending'`` and return it.

    `currency` is supplied explicitly by the caller (the API endpoint
    resolves it from the target machine's `fiat_code`) instead of being
    read from the request body — the operator doesn't get to choose it
    (`aiolabs/satmachineadmin#26`).
    """
    new_id = urlsafe_short_hash()
    insert_sql = """
        INSERT INTO satoshimachine.dca_deposits
        (id, client_id, machine_id, creator_user_id, amount, currency,
        status, notes, created_at)
        VALUES (:id, :client_id, :machine_id, :creator_user_id, :amount,
        :currency, :status, :notes, :created_at)
        """
    row = {
        "id": new_id,
        "client_id": data.client_id,
        "machine_id": data.machine_id,
        "creator_user_id": creator_user_id,
        "amount": data.amount,
        "currency": currency,
        "status": "pending",
        "notes": data.notes,
        "created_at": datetime.now(),
    }
    await db.execute(insert_sql, row)
    created = await get_deposit(new_id)
    # Invariant: the row we just inserted must be readable.
    assert created is not None
    return created
async def get_deposit(deposit_id: str) -> DcaDeposit | None:
    """Fetch a single deposit by id; ``None`` if not found."""
    query = "SELECT * FROM satoshimachine.dca_deposits WHERE id = :id"
    return await db.fetchone(query, {"id": deposit_id}, DcaDeposit)
async def get_deposits_for_client(client_id: str) -> list[DcaDeposit]:
    """List all deposits recorded for one client enrolment, newest first."""
    query = """
        SELECT * FROM satoshimachine.dca_deposits
        WHERE client_id = :client_id
        ORDER BY created_at DESC
        """
    params = {"client_id": client_id}
    return await db.fetchall(query, params, DcaDeposit)
async def get_deposits_for_operator(operator_user_id: str) -> list[DcaDeposit]:
    """List deposits across all machines owned by the operator, newest first.

    Operator scoping is enforced by joining through
    ``dca_machines.operator_user_id``.
    """
    query = """
        SELECT d.*
        FROM satoshimachine.dca_deposits d
        JOIN satoshimachine.dca_machines m ON m.id = d.machine_id
        WHERE m.operator_user_id = :uid
        ORDER BY d.created_at DESC
        """
    params = {"uid": operator_user_id}
    return await db.fetchall(query, params, DcaDeposit)
async def update_deposit(
    deposit_id: str, data: UpdateDepositData
) -> DcaDeposit | None:
    """Patch a deposit with the non-None fields of *data* and return the row.

    ``None`` fields are treated as "not provided". When nothing is
    provided no UPDATE is issued. No timestamp column is touched here.
    """
    changes = {
        field: value for field, value in data.dict().items() if value is not None
    }
    if changes:
        # Column names come from the Pydantic model's fields; values are bound.
        assignments = ", ".join(f"{column} = :{column}" for column in changes)
        await db.execute(
            f"UPDATE satoshimachine.dca_deposits SET {assignments} WHERE id = :id",
            {**changes, "id": deposit_id},
        )
    return await get_deposit(deposit_id)
async def update_deposit_status(
    deposit_id: str, data: UpdateDepositStatusData
) -> DcaDeposit | None:
    """Set a deposit's status and return the updated row.

    ``notes`` is overwritten only when a non-NULL value is supplied.
    ``confirmed_at`` is stamped with the current time when the new status
    is ``'confirmed'``; for any other status the existing value is kept
    (both via ``COALESCE``).
    """
    confirmed_at = None
    if data.status == "confirmed":
        confirmed_at = datetime.now()

    statement = """
        UPDATE satoshimachine.dca_deposits
        SET status = :status,
        notes = COALESCE(:notes, notes),
        confirmed_at = COALESCE(:confirmed_at, confirmed_at)
        WHERE id = :id
        """
    params = {
        "id": deposit_id,
        "status": data.status,
        "notes": data.notes,
        "confirmed_at": confirmed_at,
    }
    await db.execute(statement, params)
    return await get_deposit(deposit_id)
async def delete_deposit(deposit_id: str) -> None:
    """Delete the deposit row with the given id."""
    statement = "DELETE FROM satoshimachine.dca_deposits WHERE id = :id"
    await db.execute(statement, {"id": deposit_id})
# =============================================================================
# Settlements (bitSpire kind-21000 events)
# =============================================================================
async def create_settlement_idempotent(
    data: CreateDcaSettlementData,
    initial_status: str,
    error_message: str | None = None,
) -> DcaSettlement | None:
    """Insert a settlement keyed by payment_hash.

    Returns the inserted row on first sight; returns the existing row
    if the payment_hash was already seen (subscription replay,
    dispatcher double-fire). The UNIQUE constraint on payment_hash is
    the source of truth: the up-front lookup is only a fast path, and an
    INSERT that fails because a concurrent caller inserted the same
    payment_hash in between is resolved by returning that caller's row.

    `initial_status` is the row's status at insert time. Normal
    settlements arrive as 'pending' and the distribution processor
    transitions them through 'processing' -> 'processed' / 'errored'.
    A row that fails the Nostr attribution cross-check (bitspire.
    assert_nostr_attribution) is inserted directly as 'rejected' with
    the failure reason in `error_message` — never goes near the
    distribution path.

    Raises:
        Exception: whatever the INSERT raised, re-raised unchanged, when
            the failure was not caused by an already-existing row for
            this payment_hash.
    """
    # Fast path: replay of a payment_hash we've already recorded.
    existing = await get_settlement_by_payment_hash(data.payment_hash)
    if existing is not None:
        return existing

    settlement_id = urlsafe_short_hash()
    try:
        await db.execute(
            """
            INSERT INTO satoshimachine.dca_settlements
            (id, machine_id, payment_hash, bitspire_event_id, bitspire_txid,
            wire_sats, fiat_amount, fiat_code, exchange_rate, principal_sats,
            fee_sats, platform_fee_sats, operator_fee_sats, fee_mismatch_sats,
            tx_type, bills_json, cassettes_json,
            status, error_message, created_at)
            VALUES (:id, :machine_id, :payment_hash, :bitspire_event_id,
            :bitspire_txid, :wire_sats, :fiat_amount, :fiat_code,
            :exchange_rate, :principal_sats, :fee_sats,
            :platform_fee_sats, :operator_fee_sats, :fee_mismatch_sats,
            :tx_type, :bills_json, :cassettes_json, :status,
            :error_message, :created_at)
            """,
            {
                "id": settlement_id,
                "machine_id": data.machine_id,
                "payment_hash": data.payment_hash,
                "bitspire_event_id": data.bitspire_event_id,
                "bitspire_txid": data.bitspire_txid,
                "wire_sats": data.wire_sats,
                "fiat_amount": data.fiat_amount,
                "fiat_code": data.fiat_code,
                "exchange_rate": data.exchange_rate,
                "principal_sats": data.principal_sats,
                "fee_sats": data.fee_sats,
                "platform_fee_sats": data.platform_fee_sats,
                "operator_fee_sats": data.operator_fee_sats,
                "fee_mismatch_sats": data.fee_mismatch_sats,
                "tx_type": data.tx_type,
                "bills_json": data.bills_json,
                "cassettes_json": data.cassettes_json,
                "status": initial_status,
                "error_message": error_message,
                "created_at": datetime.now(),
            },
        )
    except Exception:
        # The driver-specific integrity error type differs between the
        # supported backends, so catch broadly and then disambiguate: if a
        # row for this payment_hash now exists we lost a double-fire race
        # and the winner's row is the answer; otherwise the failure is
        # unrelated and must propagate.
        winner = await get_settlement_by_payment_hash(data.payment_hash)
        if winner is None:
            raise
        return winner
    return await get_settlement(settlement_id)
async def get_settlement(settlement_id: str) -> DcaSettlement | None:
    """Fetch a single settlement by id; ``None`` if not found."""
    query = "SELECT * FROM satoshimachine.dca_settlements WHERE id = :id"
    return await db.fetchone(query, {"id": settlement_id}, DcaSettlement)
async def get_settlement_by_payment_hash(
    payment_hash: str,
) -> DcaSettlement | None:
    """Fetch the settlement recorded for *payment_hash*; ``None`` if unseen."""
    query = """
        SELECT * FROM satoshimachine.dca_settlements
        WHERE payment_hash = :hash
        """
    params = {"hash": payment_hash}
    return await db.fetchone(query, params, DcaSettlement)
async def get_settlements_for_machine(
    machine_id: str, limit: int = 100
) -> list[DcaSettlement]:
    """List the most recent settlements for one machine.

    Results are ordered newest first and capped at *limit* rows.
    """
    query = """
        SELECT * FROM satoshimachine.dca_settlements
        WHERE machine_id = :machine_id
        ORDER BY created_at DESC
        LIMIT :lim
        """
    params = {"machine_id": machine_id, "lim": limit}
    return await db.fetchall(query, params, DcaSettlement)
async def get_stuck_settlements_for_operator(
    operator_user_id: str, threshold_minutes: int = 30
) -> dict:
    """Build the operator's worklist of settlements that didn't process cleanly.

    The result is a dict with four list-valued keys:

    - 'rejected': every status='rejected' row (Nostr attribution
      cross-check failed — signer didn't match the machine identity).
      Kept apart from 'errored' because retry is wrong here: the row was
      misrouted, not operationally failed. Operator must investigate the
      machine. Newest first.
    - 'errored': every status='errored' row (distribution failed for an
      operational reason — wallet error, network, downstream payment).
      Operator retries from this bucket. Newest first.
    - 'stuck_pending': status='pending' rows created more than
      `threshold_minutes` ago (listener crashed before invoking
      process_settlement). Oldest first.
    - 'stuck_processing': status='processing' rows created more than
      `threshold_minutes` ago (processor crashed mid-flight;
      processing_claim is set but no completion landed). Oldest first.

    All four queries are scoped to the operator by joining through
    ``dca_machines.operator_user_id``.
    """
    from datetime import timedelta

    cutoff = datetime.now() - timedelta(minutes=threshold_minutes)

    rejected_sql = """
        SELECT s.*
        FROM satoshimachine.dca_settlements s
        JOIN satoshimachine.dca_machines m ON m.id = s.machine_id
        WHERE m.operator_user_id = :uid AND s.status = 'rejected'
        ORDER BY s.created_at DESC
        """
    rejected_rows = await db.fetchall(
        rejected_sql, {"uid": operator_user_id}, DcaSettlement
    )

    errored_sql = """
        SELECT s.*
        FROM satoshimachine.dca_settlements s
        JOIN satoshimachine.dca_machines m ON m.id = s.machine_id
        WHERE m.operator_user_id = :uid AND s.status = 'errored'
        ORDER BY s.created_at DESC
        """
    errored_rows = await db.fetchall(
        errored_sql, {"uid": operator_user_id}, DcaSettlement
    )

    pending_sql = """
        SELECT s.*
        FROM satoshimachine.dca_settlements s
        JOIN satoshimachine.dca_machines m ON m.id = s.machine_id
        WHERE m.operator_user_id = :uid
        AND s.status = 'pending'
        AND s.created_at < :threshold
        ORDER BY s.created_at ASC
        """
    pending_rows = await db.fetchall(
        pending_sql,
        {"uid": operator_user_id, "threshold": cutoff},
        DcaSettlement,
    )

    processing_sql = """
        SELECT s.*
        FROM satoshimachine.dca_settlements s
        JOIN satoshimachine.dca_machines m ON m.id = s.machine_id
        WHERE m.operator_user_id = :uid
        AND s.status = 'processing'
        AND s.created_at < :threshold
        ORDER BY s.created_at ASC
        """
    processing_rows = await db.fetchall(
        processing_sql,
        {"uid": operator_user_id, "threshold": cutoff},
        DcaSettlement,
    )

    worklist = {
        "rejected": rejected_rows,
        "errored": errored_rows,
        "stuck_pending": pending_rows,
        "stuck_processing": processing_rows,
    }
    return worklist
async def force_reset_stuck_settlement(
    settlement_id: str,
) -> DcaSettlement | None:
    """Operator escape hatch for genuinely stuck settlements.

    Flips 'pending' / 'processing' -> 'errored' (e.g. after the processor
    crashed mid-flight) so the existing retry endpoint can take over, and
    clears processing_claim. Rows in any other status are left untouched
    by the WHERE clause.

    The caller is responsible for verifying the settlement is *actually*
    stuck (e.g., via a threshold check on created_at); this function
    trusts that decision. Returns the row as it stands after the update.
    """
    statement = """
        UPDATE satoshimachine.dca_settlements
        SET status = 'errored',
        processing_claim = NULL,
        error_message = 'force-reset by operator (was stuck)'
        WHERE id = :id AND status IN ('pending', 'processing')
        """
    await db.execute(statement, {"id": settlement_id})
    return await get_settlement(settlement_id)
async def get_settlements_for_operator(
    operator_user_id: str, limit: int = 200
) -> list[DcaSettlement]:
    """List the most recent settlements across all of the operator's machines.

    Scoped via the join on ``dca_machines.operator_user_id``; ordered
    newest first and capped at *limit* rows.
    """
    query = """
        SELECT s.*
        FROM satoshimachine.dca_settlements s
        JOIN satoshimachine.dca_machines m ON m.id = s.machine_id
        WHERE m.operator_user_id = :uid
        ORDER BY s.created_at DESC
        LIMIT :lim
        """
    params = {"uid": operator_user_id, "lim": limit}
    return await db.fetchall(query, params, DcaSettlement)
async def mark_settlement_status(
    settlement_id: str,
    status: str,
    error_message: str | None = None,
) -> DcaSettlement | None:
    """Set a settlement's status and return the updated row.

    Status: 'pending' | 'processing' | 'processed' | 'partial' |
    'refunded' | 'errored'.

    Side effects of the single UPDATE:
    - ``error_message`` is always overwritten with the value passed
      (so the default ``None`` clears it).
    - ``processed_at`` is stamped with the current time for 'processed',
      'partial' and 'refunded'; otherwise it is left as-is.
    - ``processing_claim`` is kept only when the new status is
      'processing'; any other status clears it so a fresh claim attempt
      won't see a stale token.
    """
    statement = """
        UPDATE satoshimachine.dca_settlements
        SET status = :status,
        error_message = :err,
        processed_at = CASE
        WHEN :status IN ('processed', 'partial', 'refunded')
        THEN :now ELSE processed_at
        END,
        processing_claim = CASE
        WHEN :status = 'processing' THEN processing_claim
        ELSE NULL
        END
        WHERE id = :id
        """
    params = {
        "id": settlement_id,
        "status": status,
        "err": error_message,
        "now": datetime.now(),
    }
    await db.execute(statement, params)
    return await get_settlement(settlement_id)
async def claim_settlement_for_processing(
    settlement_id: str,
) -> DcaSettlement | None:
    """Try to claim a 'pending' settlement for processing.

    Optimistic-lock claim: the UPDATE flips the row to 'processing' and
    tags it with a token generated for this invocation, but only if the
    row is still 'pending'. The row is then read back and the claim is
    considered won only if it carries our token.

    Returns the claimed row on success; ``None`` if the settlement does
    not exist, another caller already won the claim, or the settlement is
    not in a claimable state.

    The pattern is portable across SQLite + PostgreSQL (it doesn't rely on
    UPDATE ... RETURNING). Two concurrent invocations may both run the
    UPDATE, but only one matches the WHERE clause; the loser's UPDATE is a
    no-op against status='processing', and the token read-back
    disambiguates.
    """
    claim_token = urlsafe_short_hash()
    statement = """
        UPDATE satoshimachine.dca_settlements
        SET status = 'processing', processing_claim = :token
        WHERE id = :id AND status = 'pending'
        """
    await db.execute(statement, {"id": settlement_id, "token": claim_token})

    current = await get_settlement(settlement_id)
    if current is None or current.processing_claim != claim_token:
        return None
    return current
async def reset_settlement_for_retry(
    settlement_id: str,
) -> DcaSettlement | None:
    """Operator retry path: flip 'errored' -> 'pending'.

    Two statements run in order:
    1. Every 'failed' leg of the settlement is marked 'voided' so
       process_settlement re-runs it fresh. Completed legs are left in
       place — we never re-pay sats that already moved.
    2. The settlement itself is reset to 'pending' with error_message,
       processing_claim and processed_at cleared — but only if it is
       currently 'errored'.

    Note that step 1 is not conditioned on the settlement's status.
    Returns the settlement as it stands afterwards.
    """
    void_failed_legs = """
        UPDATE satoshimachine.dca_payments
        SET status = 'voided'
        WHERE settlement_id = :sid AND status = 'failed'
        """
    await db.execute(void_failed_legs, {"sid": settlement_id})

    reopen_settlement = """
        UPDATE satoshimachine.dca_settlements
        SET status = 'pending',
        error_message = NULL,
        processing_claim = NULL,
        processed_at = NULL
        WHERE id = :id AND status = 'errored'
        """
    await db.execute(reopen_settlement, {"id": settlement_id})
    return await get_settlement(settlement_id)
async def apply_partial_dispense(
    settlement_id: str,
    *,
    new_wire_sats: int,
    new_principal_sats: int,
    new_fee_sats: int,
    new_platform_fee_sats: int,
    new_operator_fee_sats: int,
    new_fiat_amount: float,
    appended_note: str,
) -> DcaSettlement | None:
    """Overwrite a settlement's monetary fields after a partial dispense.

    Writes the recomputed wire / principal / fee / platform-fee /
    operator-fee sats and fiat amount, resets status to 'pending' (clearing
    error_message and processed_at) so process_settlement can
    re-distribute via the existing idempotent path, and prepends
    `appended_note` to the notes column.

    Notes are append-only with the newest entry on top (separated from
    older text by a blank line), so the settlement detail view shows the
    most recent adjustment first without needing to scroll.

    Returns the settlement as it stands after the update.
    """
    statement = """
        UPDATE satoshimachine.dca_settlements
        SET wire_sats = :gross,
        principal_sats = :principal,
        fee_sats = :commission,
        platform_fee_sats = :platform,
        operator_fee_sats = :operator,
        fiat_amount = :fiat,
        status = 'pending',
        error_message = NULL,
        processed_at = NULL,
        notes = CASE
        WHEN notes IS NULL OR notes = '' THEN :note
        ELSE :note || char(10) || char(10) || notes
        END
        WHERE id = :id
        """
    params = {
        "id": settlement_id,
        "gross": new_wire_sats,
        "principal": new_principal_sats,
        "commission": new_fee_sats,
        "platform": new_platform_fee_sats,
        "operator": new_operator_fee_sats,
        "fiat": new_fiat_amount,
        "note": appended_note,
    }
    await db.execute(statement, params)
    return await get_settlement(settlement_id)
async def count_completed_legs_for_settlement(settlement_id: str) -> int:
    """Count the settlement's payment legs with status 'completed'.

    Used by partial-dispense to refuse adjustments after any leg has
    successfully moved sats (Lightning payments can't be clawed back).
    Returns 0 when the query yields no row.
    """
    query = """
        SELECT COUNT(*) AS n FROM satoshimachine.dca_payments
        WHERE settlement_id = :sid AND status = 'completed'
        """
    result = await db.fetchone(query, {"sid": settlement_id})
    if not result:
        return 0
    return int(result["n"])
async def append_settlement_note(
    settlement_id: str, note: str, author_user_id: str
) -> DcaSettlement | None:
    """Prepend an operator-authored note to ``settlement.notes``.

    The entry is formatted as ``[<UTC ISO timestamp> by <author>] <note>``
    so the trail is accountable, and is placed above any existing notes
    (separated by a blank line). Append-only: existing entries are never
    edited. Returns the settlement as it stands after the update.
    """
    from datetime import timezone

    stamp = datetime.now(timezone.utc).isoformat(timespec="seconds")
    entry = f"[{stamp} by {author_user_id}] {note}"
    statement = """
        UPDATE satoshimachine.dca_settlements
        SET notes = CASE
        WHEN notes IS NULL OR notes = '' THEN :note
        ELSE :note || char(10) || char(10) || notes
        END
        WHERE id = :id
        """
    await db.execute(statement, {"id": settlement_id, "note": entry})
    return await get_settlement(settlement_id)
async def void_open_legs_for_settlement(settlement_id: str) -> None:
    """Mark the settlement's open legs as 'voided'.

    Run before re-distributing on a partial-dispense recompute. Legs in
    status 'pending', 'failed' or 'skipped' are voided; the rows are
    preserved for audit but are no longer interpreted as live. 'skipped'
    is included so that audit rows from a prior attempt don't double-count
    once the new attempt writes its own (possibly different) skipped
    reasons.
    """
    statement = """
        UPDATE satoshimachine.dca_payments
        SET status = 'voided'
        WHERE settlement_id = :sid
        AND status IN ('pending', 'failed', 'skipped')
        """
    await db.execute(statement, {"sid": settlement_id})
# =============================================================================
# Commission splits — operator's remainder-distribution rules.
# =============================================================================
async def get_commission_splits(
    operator_user_id: str, machine_id: str | None = None
) -> list[CommissionSplit]:
    """Return the commission-split rule set for exactly one scope.

    With ``machine_id=None`` the operator's default rules are returned
    (rows where machine_id IS NULL); otherwise the rules stored for that
    specific machine. Rows are ordered by ``sort_order``.

    No fallback happens here. Precedence is the caller's responsibility:
    try the per-machine override first and, if empty, fall back to the
    operator default.
    """
    if machine_id is None:
        query = """
            SELECT * FROM satoshimachine.dca_commission_splits
            WHERE operator_user_id = :uid AND machine_id IS NULL
            ORDER BY sort_order ASC
            """
        params = {"uid": operator_user_id}
    else:
        query = """
            SELECT * FROM satoshimachine.dca_commission_splits
            WHERE operator_user_id = :uid AND machine_id = :mid
            ORDER BY sort_order ASC
            """
        params = {"uid": operator_user_id, "mid": machine_id}
    return await db.fetchall(query, params, CommissionSplit)
async def get_effective_commission_splits(
    operator_user_id: str, machine_id: str
) -> list[CommissionSplit]:
    """Resolve the rule set that applies to a machine.

    Returns the per-machine override if any rows exist for it; otherwise
    the operator's default rule set (which is only queried in that case).
    """
    machine_rules = await get_commission_splits(operator_user_id, machine_id)
    return machine_rules or await get_commission_splits(operator_user_id, None)
async def replace_commission_splits(
    operator_user_id: str,
    machine_id: str | None,
    legs: list[CommissionSplitLeg],
) -> list[CommissionSplit]:
    """Replace the whole rule set for the (operator, machine) scope.

    Deletes every existing rule in the scope (``machine_id=None`` targets
    the operator default), inserts one row per entry in *legs* sharing a
    single ``created_at`` timestamp, then returns the stored rule set.
    The caller should have already validated that the legs sum to 1.0 via
    the Pydantic model.

    NOTE(review): intended as an atomic replace, but the DELETE and each
    INSERT are issued as separate ``db.execute`` calls — whether they share
    a transaction depends on the db layer; confirm before relying on it.
    """
    if machine_id is not None:
        clear_sql = """
            DELETE FROM satoshimachine.dca_commission_splits
            WHERE operator_user_id = :uid AND machine_id = :mid
            """
        clear_params = {"uid": operator_user_id, "mid": machine_id}
    else:
        clear_sql = """
            DELETE FROM satoshimachine.dca_commission_splits
            WHERE operator_user_id = :uid AND machine_id IS NULL
            """
        clear_params = {"uid": operator_user_id}
    await db.execute(clear_sql, clear_params)

    stamp = datetime.now()
    insert_sql = """
        INSERT INTO satoshimachine.dca_commission_splits
        (id, machine_id, operator_user_id, target, label, fraction,
        sort_order, created_at)
        VALUES (:id, :machine_id, :uid, :target, :label, :fraction,
        :sort_order, :created_at)
        """
    for rule in legs:
        row = {
            "id": urlsafe_short_hash(),
            "machine_id": machine_id,
            "uid": operator_user_id,
            "target": rule.target,
            "label": rule.label,
            "fraction": rule.fraction,
            "sort_order": rule.sort_order,
            "created_at": stamp,
        }
        await db.execute(insert_sql, row)
    return await get_commission_splits(operator_user_id, machine_id)
# =============================================================================
# Payments — distribution legs.
# =============================================================================
async def create_dca_payment(data: CreateDcaPaymentData) -> DcaPayment:
    """Insert one payment leg and return the stored row.

    A fresh id is generated with ``urlsafe_short_hash``; the row is always
    written with ``status='pending'`` and ``created_at`` set to the current
    local time. Every other column is copied from the same-named attribute
    on ``data``.
    """
    payment_id = urlsafe_short_hash()

    # Columns whose value comes straight from the same-named attribute.
    copied_fields = (
        "settlement_id",
        "client_id",
        "machine_id",
        "operator_user_id",
        "leg_type",
        "destination_wallet_id",
        "destination_ln_address",
        "amount_sats",
        "amount_fiat",
        "exchange_rate",
        "transaction_time",
        "external_payment_hash",
    )
    params = {"id": payment_id}
    for field in copied_fields:
        params[field] = getattr(data, field)
    params["status"] = "pending"
    params["created_at"] = datetime.now()

    await db.execute(
        """
        INSERT INTO satoshimachine.dca_payments
        (id, settlement_id, client_id, machine_id, operator_user_id,
        leg_type, destination_wallet_id, destination_ln_address,
        amount_sats, amount_fiat, exchange_rate, transaction_time,
        external_payment_hash, status, created_at)
        VALUES (:id, :settlement_id, :client_id, :machine_id,
        :operator_user_id, :leg_type, :destination_wallet_id,
        :destination_ln_address, :amount_sats, :amount_fiat,
        :exchange_rate, :transaction_time, :external_payment_hash,
        :status, :created_at)
        """,
        params,
    )

    # Read the row back so the caller receives the persisted model.
    stored = await get_dca_payment(payment_id)
    assert stored is not None
    return stored
async def get_dca_payment(payment_id: str) -> DcaPayment | None:
    """Fetch a single payment leg by id, or None when no row matches."""
    query = "SELECT * FROM satoshimachine.dca_payments WHERE id = :id"
    params = {"id": payment_id}
    return await db.fetchone(query, params, DcaPayment)
async def get_payments_for_settlement(settlement_id: str) -> list[DcaPayment]:
    """List every payment leg recorded for one settlement, oldest first."""
    params = {"sid": settlement_id}
    legs = await db.fetchall(
        """
        SELECT * FROM satoshimachine.dca_payments
        WHERE settlement_id = :sid
        ORDER BY created_at ASC
        """,
        params,
        DcaPayment,
    )
    return legs
async def get_payments_for_client(client_id: str) -> list[DcaPayment]:
    """List every payment leg recorded for one client, newest first."""
    params = {"cid": client_id}
    legs = await db.fetchall(
        """
        SELECT * FROM satoshimachine.dca_payments
        WHERE client_id = :cid
        ORDER BY created_at DESC
        """,
        params,
        DcaPayment,
    )
    return legs
async def get_payments_for_operator(
    operator_user_id: str, leg_type: str | None = None, limit: int = 200
) -> list[DcaPayment]:
    """List an operator's payment legs, newest first, capped at ``limit``.

    When ``leg_type`` is given, only legs of that type are returned;
    otherwise legs of every type are included.
    """
    if leg_type is not None:
        # Filtered variant: restrict to a single leg type.
        return await db.fetchall(
            """
            SELECT * FROM satoshimachine.dca_payments
            WHERE operator_user_id = :uid AND leg_type = :leg
            ORDER BY created_at DESC
            LIMIT :lim
            """,
            {"uid": operator_user_id, "leg": leg_type, "lim": limit},
            DcaPayment,
        )

    return await db.fetchall(
        """
        SELECT * FROM satoshimachine.dca_payments
        WHERE operator_user_id = :uid
        ORDER BY created_at DESC
        LIMIT :lim
        """,
        {"uid": operator_user_id, "lim": limit},
        DcaPayment,
    )
async def update_payment_status(
    payment_id: str,
    status: str,
    external_payment_hash: str | None = None,
    error_message: str | None = None,
) -> DcaPayment | None:
    """Set a payment leg's status and return the row as stored afterwards.

    ``external_payment_hash`` replaces the stored hash only when a non-None
    value is passed (COALESCE keeps the old one otherwise).
    ``error_message`` is always written, so passing None clears any earlier
    message. Returns None when no row has the given id.
    """
    params = {
        "id": payment_id,
        "status": status,
        "hash": external_payment_hash,
        "err": error_message,
    }
    await db.execute(
        """
        UPDATE satoshimachine.dca_payments
        SET status = :status,
        external_payment_hash = COALESCE(:hash, external_payment_hash),
        error_message = :err
        WHERE id = :id
        """,
        params,
    )
    updated = await get_dca_payment(payment_id)
    return updated
# =============================================================================
# Balance summaries
# =============================================================================
async def get_client_balance_summary(
    client_id: str,
) -> ClientBalanceSummary | None:
    """Build the fiat balance summary for one client.

    Returns None when the client does not exist. Otherwise:

    - ``total_deposits`` sums ``amount`` over the client's deposits with
      ``status = 'confirmed'``.
    - ``total_payments`` sums ``amount_fiat`` over the client's completed
      payment legs whose ``leg_type`` is ``'dca'`` or ``'settlement'``;
      other leg types are not counted against the balance.
    - ``remaining_balance`` is deposits minus payments.

    All three figures are rounded to 2 decimals. The currency is the fiat
    code of the client's machine, with ``"GTQ"`` used when the machine
    lookup returns nothing.
    """
    client = await get_dca_client(client_id)
    if client is None:
        return None

    deposits_row = await db.fetchone(
        """
        SELECT COALESCE(SUM(amount), 0) AS total
        FROM satoshimachine.dca_deposits
        WHERE client_id = :cid AND status = 'confirmed'
        """,
        {"cid": client_id},
    )

    # DCA legs (automatic) and balance-settle legs (operator-initiated)
    # both draw down the client's remaining fiat balance.
    payments_row = await db.fetchone(
        """
        SELECT COALESCE(SUM(amount_fiat), 0) AS total
        FROM satoshimachine.dca_payments
        WHERE client_id = :cid
        AND leg_type IN ('dca', 'settlement')
        AND status = 'completed'
        """,
        {"cid": client_id},
    )

    deposited = 0.0
    if deposits_row:
        deposited = float(deposits_row["total"])
    paid_out = 0.0
    if payments_row:
        paid_out = float(payments_row["total"])

    # Clients inherit the fiat currency of the machine they belong to.
    machine = await get_machine(client.machine_id)
    if machine:
        currency = machine.fiat_code
    else:
        currency = "GTQ"

    return ClientBalanceSummary(
        client_id=client_id,
        machine_id=client.machine_id,
        total_deposits=round(deposited, 2),
        total_payments=round(paid_out, 2),
        # Subtract before rounding so the two roundings do not compound.
        remaining_balance=round(deposited - paid_out, 2),
        currency=currency,
    )
# =============================================================================
# Telemetry — sparse beacon (kind-30078) and fleet snapshot (kind-30079) state.
# =============================================================================
async def get_telemetry(machine_id: str) -> TelemetrySnapshot | None:
    """Fetch the stored telemetry row for a machine, or None if absent."""
    query = "SELECT * FROM satoshimachine.dca_telemetry WHERE machine_id = :mid"
    params = {"mid": machine_id}
    return await db.fetchone(query, params, TelemetrySnapshot)
async def upsert_beacon_snapshot(
    machine_id: str,
    *,
    cash_in: bool | None = None,
    cash_out: bool | None = None,
    cash_level: str | None = None,
    fiat: str | None = None,
    model: str | None = None,
    name: str | None = None,
    location: str | None = None,
    geo: str | None = None,
    fees_json: str | None = None,
    limits_json: str | None = None,
    denominations_json: str | None = None,
    version: str | None = None,
) -> TelemetrySnapshot | None:
    """Store the kind-30078 beacon fields for a machine.

    If the machine has no telemetry row yet, one is inserted with the given
    values (None is stored as-is). If a row exists, each beacon column is
    overwritten only when the matching argument is not None (COALESCE keeps
    the stored value otherwise). ``beacon_received_at`` is set to the
    current local time in both cases. Returns the row as stored afterwards.

    Every field is optional because today's upstream payload only carries
    cash_in/cash_out/cash_level/fiat/model (see lamassu-next#43 — the
    enrichment is not yet shipped).
    """
    current = await get_telemetry(machine_id)

    # Both statements bind the same named parameters, so build them once.
    params = {
        "mid": machine_id,
        "cash_in": cash_in,
        "cash_out": cash_out,
        "cash_level": cash_level,
        "fiat": fiat,
        "model": model,
        "name": name,
        "location": location,
        "geo": geo,
        "fees": fees_json,
        "limits": limits_json,
        "denoms": denominations_json,
        "version": version,
        "now": datetime.now(),
    }

    if current is not None:
        await db.execute(
            """
            UPDATE satoshimachine.dca_telemetry SET
            beacon_cash_in = COALESCE(:cash_in, beacon_cash_in),
            beacon_cash_out = COALESCE(:cash_out, beacon_cash_out),
            beacon_cash_level = COALESCE(:cash_level, beacon_cash_level),
            beacon_fiat = COALESCE(:fiat, beacon_fiat),
            beacon_model = COALESCE(:model, beacon_model),
            beacon_name = COALESCE(:name, beacon_name),
            beacon_location = COALESCE(:location, beacon_location),
            beacon_geo = COALESCE(:geo, beacon_geo),
            beacon_fees_json = COALESCE(:fees, beacon_fees_json),
            beacon_limits_json = COALESCE(:limits, beacon_limits_json),
            beacon_denominations_json =
            COALESCE(:denoms, beacon_denominations_json),
            beacon_version = COALESCE(:version, beacon_version),
            beacon_received_at = :now
            WHERE machine_id = :mid
            """,
            params,
        )
    else:
        await db.execute(
            """
            INSERT INTO satoshimachine.dca_telemetry
            (machine_id, beacon_cash_in, beacon_cash_out, beacon_cash_level,
            beacon_fiat, beacon_model, beacon_name, beacon_location,
            beacon_geo, beacon_fees_json, beacon_limits_json,
            beacon_denominations_json, beacon_version, beacon_received_at)
            VALUES (:mid, :cash_in, :cash_out, :cash_level, :fiat, :model,
            :name, :location, :geo, :fees, :limits, :denoms,
            :version, :now)
            """,
            params,
        )

    return await get_telemetry(machine_id)
async def upsert_fleet_snapshot(
    machine_id: str, telemetry_json: str
) -> TelemetrySnapshot | None:
    """Store the kind-30079 operator-only telemetry blob for a machine.

    Inserts a telemetry row when the machine has none, otherwise overwrites
    ``telemetry_json`` on the existing row; ``telemetry_received_at`` is set
    to the current local time either way. The JSON is kept as a raw string
    until lamassu-next#42 produces a real schema. Returns the row as stored
    afterwards.
    """
    current = await get_telemetry(machine_id)
    params = {"mid": machine_id, "json": telemetry_json, "now": datetime.now()}

    if current is not None:
        await db.execute(
            """
            UPDATE satoshimachine.dca_telemetry
            SET telemetry_json = :json, telemetry_received_at = :now
            WHERE machine_id = :mid
            """,
            params,
        )
    else:
        await db.execute(
            """
            INSERT INTO satoshimachine.dca_telemetry
            (machine_id, telemetry_json, telemetry_received_at)
            VALUES (:mid, :json, :now)
            """,
            params,
        )

    return await get_telemetry(machine_id)
# =============================================================================
# Cassette configs — operator-driven ATM cassette inventory (#29 v1.1).
# =============================================================================
# Row lifecycle per #29:
# - First population for a (machine_id, position) pair → apply_bootstrap_state
# (consumer reading the ATM's one-shot bitspire-cassettes-state event)
# - Operator edit of denomination or count → update_cassette_config
# (refuses to create new rows; the slot count is hardware-determined)
# - Row creation/deletion for a new position → admin only, via ATM
# re-provisioning + new bootstrap event (not exposed in v1 here)
def _should_apply_bootstrap_state(
existing_state_event_id: str | None, incoming_event_id: str
) -> bool:
"""Pure-function dedup gate for apply_bootstrap_state.
Returns False if any existing row for this machine already references
the incoming event_id (relay re-delivery after restart). True otherwise.
Extracted as a pure function so the dedup decision is unit-testable
without a database round-trip. The actual idempotency check in
apply_bootstrap_state fetches one existing row and passes its
state_event_id here.
"""
return existing_state_event_id != incoming_event_id
async def get_cassette_config(
    machine_id: str, position: int
) -> CassetteConfig | None:
    """Fetch the cassette row for one (machine, position) slot, or None."""
    query = (
        "SELECT * FROM satoshimachine.cassette_configs "
        "WHERE machine_id = :mid AND position = :pos"
    )
    params = {"mid": machine_id, "pos": position}
    return await db.fetchone(query, params, CassetteConfig)
async def list_cassette_configs_for_machine(
    machine_id: str,
) -> list[CassetteConfig]:
    """List every cassette row for a machine, ordered by slot position."""
    query = (
        "SELECT * FROM satoshimachine.cassette_configs "
        "WHERE machine_id = :mid ORDER BY position"
    )
    return await db.fetchall(query, {"mid": machine_id}, CassetteConfig)
async def update_cassette_config(
    machine_id: str,
    position: int,
    data: UpsertCassetteConfigData,
    *,
    updated_by: str | None = None,
) -> CassetteConfig | None:
    """Apply an operator edit to one existing cassette slot.

    Only the fields of ``data`` that are not None are written, together
    with ``updated_at`` (current local time) and ``updated_by``. When every
    field of ``data`` is None the row is returned untouched.

    This function never creates rows: new slots only land via
    apply_bootstrap_state() consuming an ATM bootstrap event (per #29 row
    lifecycle: hardware-determined slot count, not operator-creatable).

    Returns None if the (machine_id, position) row doesn't exist, otherwise
    the row as stored after the update.
    """
    current = await get_cassette_config(machine_id, position)
    if current is None:
        return None

    changes: dict = {}
    for column, value in data.dict().items():
        if value is not None:
            changes[column] = value
    if not changes:
        return current

    changes["updated_at"] = datetime.now()
    changes["updated_by"] = updated_by

    # Column names come from the model's own field names and the two keys
    # added above; the values themselves are bound as named parameters.
    assignments = ", ".join(f"{column} = :{column}" for column in changes)
    params = {**changes, "mid": machine_id, "pos": position}
    await db.execute(
        f"UPDATE satoshimachine.cassette_configs SET {assignments} "
        "WHERE machine_id = :mid AND position = :pos",
        params,
    )
    return await get_cassette_config(machine_id, position)
async def apply_bootstrap_state(
    machine_id: str,
    event_id: str,
    event_created_at: datetime,
    payload: PublishCassettesPayload,
) -> bool:
    """Consume an ATM-published kind-30078 bitspire-cassettes-state:<m> event
    and upsert one cassette_configs row per position in the payload.

    Returns True if the upsert ran; False if any existing row for this
    machine already references this event_id (idempotent on relay
    re-delivery / restart).

    Populates both the operator-believed columns (denomination, count,
    updated_at, updated_by='atm-bootstrap') AND the v2 reverse-channel
    columns (state_denomination, state_count, state_at, state_event_id)
    so the operator's initial view matches the ATM's reported state. v2
    reconciliation UI will diverge them when continuous reverse-channel
    events land + the operator subsequently edits.

    Args:
        machine_id: Machine whose cassette rows are being populated.
        event_id: Id of the bootstrap event; stored as ``state_event_id``
            and used as the dedup key.
        event_created_at: Timestamp of the event; stored as ``state_at``.
        payload: Parsed event body; ``payload.positions`` maps each slot
            position to an object exposing ``denomination`` and ``count``.
    """
    # Probe specifically for a row stamped with THIS event id. Looking at
    # one arbitrary row ("LIMIT 1" with no filter and no ORDER BY) is not
    # enough: if the machine's rows carry different state_event_id values
    # (e.g. a later payload covered only some positions), the sampled row
    # may belong to another event and a re-delivered event would be applied
    # again, overwriting operator edits made since.
    matching_row: dict | None = await db.fetchone(
        "SELECT state_event_id FROM satoshimachine.cassette_configs "
        "WHERE machine_id = :mid AND state_event_id = :event_id LIMIT 1",
        {"mid": machine_id, "event_id": event_id},
    )

    existing_event_id: str | None = None
    if matching_row is not None:
        # The db layer may hand back a dict or a row object; accept either.
        existing_event_id = (
            matching_row.get("state_event_id")
            if isinstance(matching_row, dict)
            else getattr(matching_row, "state_event_id", None)
        )

    if not _should_apply_bootstrap_state(existing_event_id, event_id):
        return False

    # One write timestamp shared by every row touched by this event.
    now = datetime.now()
    for pos, row in payload.positions.items():
        await db.execute(
            """
            INSERT INTO satoshimachine.cassette_configs
            (machine_id, position, denomination, count, updated_at,
            updated_by, state_denomination, state_count, state_at,
            state_event_id)
            VALUES (:mid, :pos, :denom, :count, :now, :by,
            :state_denom, :state_count, :state_at, :event_id)
            ON CONFLICT (machine_id, position) DO UPDATE SET
            denomination = excluded.denomination,
            count = excluded.count,
            updated_at = excluded.updated_at,
            updated_by = excluded.updated_by,
            state_denomination = excluded.state_denomination,
            state_count = excluded.state_count,
            state_at = excluded.state_at,
            state_event_id = excluded.state_event_id
            """,
            {
                "mid": machine_id,
                "pos": pos,
                # Operator-believed and ATM-reported columns start equal.
                "denom": row.denomination,
                "count": row.count,
                "now": now,
                "by": "atm-bootstrap",
                "state_denom": row.denomination,
                "state_count": row.count,
                "state_at": event_created_at,
                "event_id": event_id,
            },
        )
    return True