Phase-1 observability per coord-log §2026-06-01T07:00Z (option A locked: always record, no enforce_fee_match gate): fee_mismatch_sats = bitspire_fee_sats - (platform_fee_sats + operator_fee_sats) Positive = bitspire over-reported; negative = under-reported; zero = exact match. Recorded unconditionally on every settlement; WARN-logged via loguru only when |delta| > tolerance, where tolerance = max(1, int(principal_sats * 0.001)) — 1-sat floor with 0.1% relative ceiling. bitspire.py:parse_settlement: - Computes the delta after split_principal_based returns. - WARN log line carries bitspire_fee_sats / expected / delta / tolerance / principal / both fractions / tx_type / machine-npub prefix for triage queries. - Always stamps fee_mismatch_sats onto CreateDcaSettlementData. - Comment explains the pre-Layer-3 expectation: large deltas are expected while the ATM hardcodes 7.77% cash-out (aiolabs/lamassu-next#57); the data here will quiet once Layer 3 ships. crud.py:create_settlement_idempotent: extends the INSERT to persist the new column. Tests: - tests/conftest.py: `loguru_capture` fixture — loguru routes to a pre-bound stderr sink that pytest's caplog (stdlib only) misses and capsys can't see; the fixture adds a list-sink for the test's duration. Reusable for future log-behavior tests. - tests/test_fee_mismatch_recording.py: 8 cases covering exact-match zero delta, bitspire over- and under-reporting, the pre-Layer-3 large-delta scenario, within-tolerance silence, over-tolerance warning, diagnostic-fields presence in the WARN line, and the 1-sat floor on tiny-principal settlements. 164/164 tests green. Phase 2 (reject on out-of-tolerance) lands as a follow-up once observability data justifies the tighter posture. Refs: aiolabs/satmachineadmin#38 (Layer 1), coord-log §2026-06-01T07:00Z (lnbits advisory + option A lock). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1537 lines
52 KiB
Python
1537 lines
52 KiB
Python
# Satoshi Machine v2 — CRUD layer over the m005 schema.
|
|
#
|
|
# All operator-scoped queries take an operator_user_id and enforce isolation
|
|
# at the SQL boundary. Cross-operator LP queries (for satmachineclient) join
|
|
# through dca_machines.operator_user_id. See plan section "Identity & multi-
|
|
# machine model".
|
|
|
|
from datetime import datetime
|
|
|
|
from lnbits.db import Database
|
|
from lnbits.helpers import urlsafe_short_hash
|
|
|
|
from .models import (
|
|
CassetteConfig,
|
|
ClientBalanceSummary,
|
|
CommissionSplit,
|
|
CommissionSplitLeg,
|
|
CreateDcaClientData,
|
|
CreateDcaPaymentData,
|
|
CreateDcaSettlementData,
|
|
CreateDepositData,
|
|
CreateMachineData,
|
|
DcaClient,
|
|
DcaDeposit,
|
|
DcaLpPreferences,
|
|
DcaPayment,
|
|
DcaSettlement,
|
|
Machine,
|
|
PublishCassettesPayload,
|
|
SuperConfig,
|
|
TelemetrySnapshot,
|
|
UpdateDcaClientData,
|
|
UpdateDepositData,
|
|
UpdateDepositStatusData,
|
|
UpdateMachineData,
|
|
UpdateSuperConfigData,
|
|
UpsertCassetteConfigData,
|
|
UpsertDcaLpData,
|
|
)
|
|
|
|
# Shared database handle for this extension; every CRUD helper in this
# module issues its queries through it.
db = Database("ext_satoshimachine")
|
|
|
|
|
|
# =============================================================================
|
|
# Super config
|
|
# =============================================================================
|
|
|
|
|
|
async def get_super_config() -> SuperConfig | None:
    """Fetch the singleton super-config row (the one with id ``'default'``).

    Returns:
        The row mapped onto ``SuperConfig``, or ``None`` if the lookup
        yields nothing.
    """
    query = "SELECT * FROM satoshimachine.super_config WHERE id = :id"
    params = {"id": "default"}
    return await db.fetchone(query, params, SuperConfig)
|
|
|
|
|
|
async def update_super_config(data: UpdateSuperConfigData) -> SuperConfig | None:
    """Apply a partial update to the singleton super-config row.

    Only fields of ``data`` whose value is not ``None`` are written, and
    ``updated_at`` is stamped with ``datetime.now()``. If every field is
    ``None`` no UPDATE is issued and the current row is returned unchanged.

    Returns:
        The super-config row as re-read after the update.
    """
    changes = {}
    for field, value in data.dict().items():
        if value is not None:
            changes[field] = value
    if not changes:
        return await get_super_config()

    changes["updated_at"] = datetime.now()
    # Column names are the keys of data.dict(); the values are bound parameters.
    assignments = ", ".join(f"{column} = :{column}" for column in changes)
    changes["id"] = "default"
    await db.execute(
        f"UPDATE satoshimachine.super_config SET {assignments} WHERE id = :id",
        changes,
    )
    return await get_super_config()
|
|
|
|
|
|
# =============================================================================
|
|
# Machines
|
|
# =============================================================================
|
|
|
|
|
|
async def create_machine(operator_user_id: str, data: CreateMachineData) -> Machine:
    """Insert a new machine row owned by ``operator_user_id``.

    A fresh id is generated, the row is stored with ``is_active`` set to
    true and identical ``created_at`` / ``updated_at`` timestamps, and the
    stored row is read back and returned.
    """
    new_id = urlsafe_short_hash()
    stamp = datetime.now()
    row = {
        "id": new_id,
        "operator_user_id": operator_user_id,
        "machine_npub": data.machine_npub,
        "wallet_id": data.wallet_id,
        "name": data.name,
        "location": data.location,
        "fiat_code": data.fiat_code,
        "is_active": True,
        "operator_cash_in_fee_fraction": data.operator_cash_in_fee_fraction,
        "operator_cash_out_fee_fraction": data.operator_cash_out_fee_fraction,
        "created_at": stamp,
        "updated_at": stamp,
    }
    await db.execute(
        """
        INSERT INTO satoshimachine.dca_machines
            (id, operator_user_id, machine_npub, wallet_id, name, location,
             fiat_code, is_active,
             operator_cash_in_fee_fraction, operator_cash_out_fee_fraction,
             created_at, updated_at)
        VALUES (:id, :operator_user_id, :machine_npub, :wallet_id, :name,
                :location, :fiat_code, :is_active,
                :operator_cash_in_fee_fraction, :operator_cash_out_fee_fraction,
                :created_at, :updated_at)
        """,
        row,
    )
    created = await get_machine(new_id)
    assert created is not None
    return created
|
|
|
|
|
|
async def get_machine(machine_id: str) -> Machine | None:
    """Look up a single machine row by its primary key."""
    query = "SELECT * FROM satoshimachine.dca_machines WHERE id = :id"
    return await db.fetchone(query, {"id": machine_id}, Machine)
|
|
|
|
|
|
async def get_machine_by_npub(machine_npub: str) -> Machine | None:
    """Look up a machine row whose ``machine_npub`` equals the given value.

    The comparison is an exact SQL equality on the stored string.
    """
    query = "SELECT * FROM satoshimachine.dca_machines WHERE machine_npub = :npub"
    return await db.fetchone(query, {"npub": machine_npub}, Machine)
|
|
|
|
|
|
async def get_active_machine_by_wallet_id(wallet_id: str) -> Machine | None:
    """Return one active machine bound to ``wallet_id``, if any.

    Used by the invoice listener to route an incoming payment to a machine.
    The query is capped with ``LIMIT 1`` and has no ORDER BY.
    """
    params = {"wid": wallet_id}
    return await db.fetchone(
        """
        SELECT * FROM satoshimachine.dca_machines
        WHERE wallet_id = :wid AND is_active = true
        LIMIT 1
        """,
        params,
        Machine,
    )
|
|
|
|
|
|
async def get_machines_for_operator(operator_user_id: str) -> list[Machine]:
    """List every machine owned by the operator, newest first."""
    params = {"uid": operator_user_id}
    return await db.fetchall(
        """
        SELECT * FROM satoshimachine.dca_machines
        WHERE operator_user_id = :uid
        ORDER BY created_at DESC
        """,
        params,
        Machine,
    )
|
|
|
|
|
|
async def list_all_active_machines() -> list[Machine]:
    """List every active machine across all operators, newest first.

    Used by the cassette bootstrap consumer task to build a single
    cross-operator subscription filter. Each event's pubkey is then routed
    to the right operator via get_machine_by_atm_pubkey_hex plus the
    machine's operator_user_id.
    """
    no_params: dict = {}
    return await db.fetchall(
        """
        SELECT * FROM satoshimachine.dca_machines
        WHERE is_active = true
        ORDER BY created_at DESC
        """,
        no_params,
        Machine,
    )
|
|
|
|
|
|
async def get_machine_by_atm_pubkey_hex(atm_pubkey_hex: str) -> Machine | None:
    """Find the active machine whose normalised ``machine_npub`` matches.

    Each active machine's ``machine_npub`` is passed through
    ``normalize_public_key`` and compared case-insensitively with
    ``atm_pubkey_hex``, so the stored value may be hex or bech32. Used by
    the cassette bootstrap consumer to route an incoming state event to the
    right machine row (and therefore operator privkey for decryption).

    Machines whose stored key fails normalisation (ValueError or
    AssertionError) are skipped rather than aborting the lookup.

    O(N) over active machines — fine for small fleets. If fleet sizes grow,
    normalise machine_npub-at-write to hex and add an index.
    """
    from lnbits.utils.nostr import normalize_public_key

    wanted = atm_pubkey_hex.lower()
    for candidate in await list_all_active_machines():
        try:
            normalised = normalize_public_key(candidate.machine_npub).lower()
        except (ValueError, AssertionError):
            continue
        if normalised == wanted:
            return candidate
    return None
|
|
|
|
|
|
async def update_machine(machine_id: str, data: UpdateMachineData) -> Machine | None:
    """Apply a partial update to one machine row.

    Only fields of ``data`` whose value is not ``None`` are written, and
    ``updated_at`` is stamped with ``datetime.now()``. If every field is
    ``None`` no UPDATE is issued.

    Returns:
        The machine as re-read after the (possibly skipped) update.
    """
    changes = {}
    for field, value in data.dict().items():
        if value is not None:
            changes[field] = value
    if not changes:
        return await get_machine(machine_id)

    changes["updated_at"] = datetime.now()
    # Column names are the keys of data.dict(); the values are bound parameters.
    assignments = ", ".join(f"{column} = :{column}" for column in changes)
    changes["id"] = machine_id
    await db.execute(
        f"UPDATE satoshimachine.dca_machines SET {assignments} WHERE id = :id",
        changes,
    )
    return await get_machine(machine_id)
|
|
|
|
|
|
async def delete_machine(machine_id: str) -> None:
    """Delete the machine row with the given id."""
    statement = "DELETE FROM satoshimachine.dca_machines WHERE id = :id"
    await db.execute(statement, {"id": machine_id})
|
|
|
|
|
|
# =============================================================================
|
|
# DCA Clients (LPs)
|
|
# =============================================================================
|
|
|
|
|
|
async def create_dca_client(data: CreateDcaClientData) -> DcaClient:
    """Enrol an LP at one of the operator's machines.

    This is a pure (machine, LP) record. Wallet / mode / autoforward live
    on dca_lp (per-user) and are populated by the LP via satmachineclient.
    Enrolment does not require the LP to be onboarded yet, but deposits do
    (see `create_deposit`).

    The row is inserted with status ``'active'`` and then read back.
    """
    new_id = urlsafe_short_hash()
    stamp = datetime.now()
    row = {
        "id": new_id,
        "machine_id": data.machine_id,
        "user_id": data.user_id,
        "username": data.username,
        "status": "active",
        "created_at": stamp,
        "updated_at": stamp,
    }
    await db.execute(
        """
        INSERT INTO satoshimachine.dca_clients
            (id, machine_id, user_id, username, status, created_at, updated_at)
        VALUES (:id, :machine_id, :user_id, :username, :status,
                :created_at, :updated_at)
        """,
        row,
    )
    created = await get_dca_client(new_id)
    assert created is not None
    return created
|
|
|
|
|
|
# Shared SELECT fragment: the dca_clients columns (alias `c`) plus the
# LP-onboarded flag, computed as "a dca_lp row (alias `lp`) exists for this
# user". Returned as `lp_onboarded` (boolean 0/1 in SQLite, which Pydantic
# coerces to bool on the DcaClient model). Any query interpolating this
# fragment must define both the `c` and `lp` aliases.
_CLIENT_SELECT = """
    c.id, c.machine_id, c.user_id, c.username, c.status,
    c.created_at, c.updated_at,
    (lp.user_id IS NOT NULL) AS lp_onboarded
"""
# Matching FROM fragment: LEFT JOIN so that clients without a dca_lp row
# are still returned (with lp.user_id NULL, i.e. lp_onboarded false).
_CLIENT_FROM = (
    "satoshimachine.dca_clients c "
    "LEFT JOIN satoshimachine.dca_lp lp ON lp.user_id = c.user_id"
)
|
|
|
|
|
|
async def get_dca_client(client_id: str) -> DcaClient | None:
    """Look up one client enrolment by id, including its ``lp_onboarded`` flag."""
    query = f"SELECT {_CLIENT_SELECT} FROM {_CLIENT_FROM} WHERE c.id = :id"
    return await db.fetchone(query, {"id": client_id}, DcaClient)
|
|
|
|
|
|
async def get_dca_client_for_machine_user(
    machine_id: str, user_id: str
) -> DcaClient | None:
    """Look up the enrolment of ``user_id`` at ``machine_id``, if one exists."""
    params = {"machine_id": machine_id, "user_id": user_id}
    return await db.fetchone(
        f"""
        SELECT {_CLIENT_SELECT} FROM {_CLIENT_FROM}
        WHERE c.machine_id = :machine_id AND c.user_id = :user_id
        """,
        params,
        DcaClient,
    )
|
|
|
|
|
|
async def get_dca_clients_for_machine(machine_id: str) -> list[DcaClient]:
    """List every client enrolled at the machine, newest enrolment first."""
    params = {"machine_id": machine_id}
    return await db.fetchall(
        f"""
        SELECT {_CLIENT_SELECT} FROM {_CLIENT_FROM}
        WHERE c.machine_id = :machine_id
        ORDER BY c.created_at DESC
        """,
        params,
        DcaClient,
    )
|
|
|
|
|
|
async def get_dca_clients_for_operator(operator_user_id: str) -> list[DcaClient]:
    """List all clients across every machine this operator owns, newest first.

    Operator scoping is enforced in SQL by joining through
    ``dca_machines.operator_user_id``.
    """
    params = {"uid": operator_user_id}
    return await db.fetchall(
        f"""
        SELECT {_CLIENT_SELECT}
        FROM {_CLIENT_FROM}
        JOIN satoshimachine.dca_machines m ON m.id = c.machine_id
        WHERE m.operator_user_id = :uid
        ORDER BY c.created_at DESC
        """,
        params,
        DcaClient,
    )
|
|
|
|
|
|
async def get_dca_clients_for_user(user_id: str) -> list[DcaClient]:
    """LP cross-operator view: every enrolment of this LP, newest first.

    No operator filter is applied, so rows from machines of different
    operators are returned together.
    """
    params = {"user_id": user_id}
    return await db.fetchall(
        f"""
        SELECT {_CLIENT_SELECT} FROM {_CLIENT_FROM}
        WHERE c.user_id = :user_id
        ORDER BY c.created_at DESC
        """,
        params,
        DcaClient,
    )
|
|
|
|
|
|
async def get_flow_mode_clients_for_machine(machine_id: str) -> list[DcaClient]:
    """Return the active flow-mode LPs enrolled at a machine, oldest first.

    A client qualifies when its enrolment status is ``'active'`` and the
    LP's per-user `dca_lp` row has `default_dca_mode = 'flow'`. Used by the
    distribution algorithm.

    An LP enrolment without a matching `dca_lp` row (i.e., the LP hasn't
    onboarded via satmachineclient yet) is filtered out by the INNER JOIN —
    there's no destination wallet to pay to.

    The column list is the shared ``_CLIENT_SELECT`` fragment rather than
    ``c.*`` so that ``lp_onboarded`` is populated the same way as in every
    other DcaClient query; because of the INNER JOIN it is always true here.
    """
    return await db.fetchall(
        f"""
        SELECT {_CLIENT_SELECT}
        FROM satoshimachine.dca_clients c
        JOIN satoshimachine.dca_lp lp ON lp.user_id = c.user_id
        WHERE c.machine_id = :machine_id
          AND lp.default_dca_mode = 'flow'
          AND c.status = 'active'
        ORDER BY c.created_at ASC
        """,
        {"machine_id": machine_id},
        DcaClient,
    )
|
|
|
|
|
|
# =============================================================================
|
|
# DCA LP preferences (per-user) — wallet + mode + autoforward
|
|
# =============================================================================
|
|
|
|
|
|
async def get_dca_lp(user_id: str) -> DcaLpPreferences | None:
    """Fetch the LP's per-user preferences row.

    Returns ``None`` when the LP has no row yet, i.e. they haven't
    onboarded via satmachineclient.
    """
    query = "SELECT * FROM satoshimachine.dca_lp WHERE user_id = :uid"
    return await db.fetchone(query, {"uid": user_id}, DcaLpPreferences)
|
|
|
|
|
|
async def lp_is_onboarded(user_id: str) -> bool:
    """Tell whether a `dca_lp` row exists for ``user_id``.

    Cheap existence check used by the deposit-creation gate; only the key
    column is selected.
    """
    found = await db.fetchone(
        "SELECT user_id FROM satoshimachine.dca_lp WHERE user_id = :uid",
        {"uid": user_id},
    )
    if found is None:
        return False
    return True
|
|
|
|
|
|
async def upsert_dca_lp(
    user_id: str,
    data: UpsertDcaLpData,
    *,
    fallback_wallet_id: str | None = None,
) -> DcaLpPreferences:
    """Create the LP's preferences row, or update the existing one.

    Row exists: only the fields of ``data`` that are not ``None`` are
    written (plus ``updated_at``); when there is nothing to write the
    existing row is returned untouched.

    No row yet: a wallet is required — ``data.dca_wallet_id`` or, failing
    that, ``fallback_wallet_id`` (satmachineclient passes the LP's default
    LNbits wallet here when auto-seeding on first dashboard visit). The mode
    defaults to ``'flow'`` and autoforward to disabled when not supplied.

    Raises:
        ValueError: on first upsert when neither wallet id is provided.
    """
    current = await get_dca_lp(user_id)
    stamp = datetime.now()

    if current is not None:
        changes: dict = {}
        for field, value in data.dict().items():
            if value is not None:
                changes[field] = value
        if not changes:
            return current
        changes["updated_at"] = stamp
        # Column names are the keys of data.dict(); values are bound parameters.
        assignments = ", ".join(f"{column} = :{column}" for column in changes)
        changes["uid"] = user_id
        await db.execute(
            f"UPDATE satoshimachine.dca_lp SET {assignments} WHERE user_id = :uid",
            changes,
        )
    else:
        wallet_id = data.dca_wallet_id or fallback_wallet_id
        if not wallet_id:
            raise ValueError(
                "first upsert requires dca_wallet_id (or fallback_wallet_id)"
            )
        row = {
            "uid": user_id,
            "wallet": wallet_id,
            "mode": data.default_dca_mode or "flow",
            "limit": data.fixed_mode_daily_limit,
            "ln_addr": data.autoforward_ln_address,
            "auto": data.autoforward_enabled or False,
            "now": stamp,
        }
        await db.execute(
            """
            INSERT INTO satoshimachine.dca_lp
                (user_id, dca_wallet_id, default_dca_mode, fixed_mode_daily_limit,
                 autoforward_ln_address, autoforward_enabled,
                 created_at, updated_at)
            VALUES (:uid, :wallet, :mode, :limit, :ln_addr, :auto,
                    :now, :now)
            """,
            row,
        )

    stored = await get_dca_lp(user_id)
    assert stored is not None
    return stored
|
|
|
|
|
|
async def update_dca_client(
    client_id: str, data: UpdateDcaClientData
) -> DcaClient | None:
    """Apply a partial update to one client enrolment.

    Only fields of ``data`` whose value is not ``None`` are written, and
    ``updated_at`` is stamped with ``datetime.now()``. If every field is
    ``None`` no UPDATE is issued.

    Returns:
        The client as re-read after the (possibly skipped) update.
    """
    changes = {}
    for field, value in data.dict().items():
        if value is not None:
            changes[field] = value
    if not changes:
        return await get_dca_client(client_id)

    changes["updated_at"] = datetime.now()
    # Column names are the keys of data.dict(); the values are bound parameters.
    assignments = ", ".join(f"{column} = :{column}" for column in changes)
    changes["id"] = client_id
    await db.execute(
        f"UPDATE satoshimachine.dca_clients SET {assignments} WHERE id = :id",
        changes,
    )
    return await get_dca_client(client_id)
|
|
|
|
|
|
async def delete_dca_client(client_id: str) -> None:
    """Delete the client enrolment row with the given id."""
    statement = "DELETE FROM satoshimachine.dca_clients WHERE id = :id"
    await db.execute(statement, {"id": client_id})
|
|
|
|
|
|
# =============================================================================
|
|
# Deposits
|
|
# =============================================================================
|
|
|
|
|
|
async def create_deposit(
    creator_user_id: str, data: CreateDepositData, *, currency: str
) -> DcaDeposit:
    """Insert a deposit row in status ``'pending'`` and return it.

    `currency` is passed explicitly by the caller (the API endpoint resolves
    it from the target machine's `fiat_code`) rather than coming off the
    request body — the operator doesn't get to choose it
    (`aiolabs/satmachineadmin#26`).
    """
    new_id = urlsafe_short_hash()
    row = {
        "id": new_id,
        "client_id": data.client_id,
        "machine_id": data.machine_id,
        "creator_user_id": creator_user_id,
        "amount": data.amount,
        "currency": currency,
        "status": "pending",
        "notes": data.notes,
        "created_at": datetime.now(),
    }
    await db.execute(
        """
        INSERT INTO satoshimachine.dca_deposits
            (id, client_id, machine_id, creator_user_id, amount, currency,
             status, notes, created_at)
        VALUES (:id, :client_id, :machine_id, :creator_user_id, :amount,
                :currency, :status, :notes, :created_at)
        """,
        row,
    )
    created = await get_deposit(new_id)
    assert created is not None
    return created
|
|
|
|
|
|
async def get_deposit(deposit_id: str) -> DcaDeposit | None:
    """Look up a single deposit row by its primary key."""
    query = "SELECT * FROM satoshimachine.dca_deposits WHERE id = :id"
    return await db.fetchone(query, {"id": deposit_id}, DcaDeposit)
|
|
|
|
|
|
async def get_deposits_for_client(client_id: str) -> list[DcaDeposit]:
    """List every deposit recorded for the client, newest first."""
    params = {"client_id": client_id}
    return await db.fetchall(
        """
        SELECT * FROM satoshimachine.dca_deposits
        WHERE client_id = :client_id
        ORDER BY created_at DESC
        """,
        params,
        DcaDeposit,
    )
|
|
|
|
|
|
async def get_deposits_for_operator(operator_user_id: str) -> list[DcaDeposit]:
    """List deposits on every machine the operator owns, newest first.

    Operator scoping is enforced in SQL by joining through
    ``dca_machines.operator_user_id``.
    """
    params = {"uid": operator_user_id}
    return await db.fetchall(
        """
        SELECT d.*
        FROM satoshimachine.dca_deposits d
        JOIN satoshimachine.dca_machines m ON m.id = d.machine_id
        WHERE m.operator_user_id = :uid
        ORDER BY d.created_at DESC
        """,
        params,
        DcaDeposit,
    )
|
|
|
|
|
|
async def update_deposit(
    deposit_id: str, data: UpdateDepositData
) -> DcaDeposit | None:
    """Apply a partial update to one deposit row.

    Only fields of ``data`` whose value is not ``None`` are written; no
    timestamp column is touched. If every field is ``None`` no UPDATE is
    issued.

    Returns:
        The deposit as re-read after the (possibly skipped) update.
    """
    changes = {}
    for field, value in data.dict().items():
        if value is not None:
            changes[field] = value
    if not changes:
        return await get_deposit(deposit_id)

    # Column names are the keys of data.dict(); the values are bound parameters.
    assignments = ", ".join(f"{column} = :{column}" for column in changes)
    changes["id"] = deposit_id
    await db.execute(
        f"UPDATE satoshimachine.dca_deposits SET {assignments} WHERE id = :id",
        changes,
    )
    return await get_deposit(deposit_id)
|
|
|
|
|
|
async def update_deposit_status(
    deposit_id: str, data: UpdateDepositStatusData
) -> DcaDeposit | None:
    """Set a deposit's status, optionally replacing its notes.

    ``notes`` is only overwritten when ``data.notes`` is not NULL, and
    ``confirmed_at`` is only stamped when the new status is ``'confirmed'``;
    in both cases COALESCE keeps the stored value otherwise.

    Returns:
        The deposit as re-read after the update.
    """
    if data.status == "confirmed":
        confirmed_at = datetime.now()
    else:
        confirmed_at = None
    params = {
        "id": deposit_id,
        "status": data.status,
        "notes": data.notes,
        "confirmed_at": confirmed_at,
    }
    await db.execute(
        """
        UPDATE satoshimachine.dca_deposits
        SET status = :status,
            notes = COALESCE(:notes, notes),
            confirmed_at = COALESCE(:confirmed_at, confirmed_at)
        WHERE id = :id
        """,
        params,
    )
    return await get_deposit(deposit_id)
|
|
|
|
|
|
async def delete_deposit(deposit_id: str) -> None:
    """Delete the deposit row with the given id."""
    statement = "DELETE FROM satoshimachine.dca_deposits WHERE id = :id"
    await db.execute(statement, {"id": deposit_id})
|
|
|
|
|
|
# =============================================================================
|
|
# Settlements (bitSpire kind-21000 events)
|
|
# =============================================================================
|
|
|
|
|
|
async def create_settlement_idempotent(
    data: CreateDcaSettlementData,
    initial_status: str,
    error_message: str | None = None,
) -> DcaSettlement | None:
    """Insert a settlement keyed by payment_hash, or return the existing one.

    Returns the inserted row on first sight; returns the existing row if the
    payment_hash was already seen (subscription replay, dispatcher
    double-fire). The UNIQUE constraint on payment_hash is the source of
    truth: if the INSERT fails because a concurrent caller stored the same
    payment_hash between the existence check and the INSERT, that caller's
    row is returned instead of the error propagating.

    Args:
        data: Parsed settlement fields to persist.
        initial_status: The row's status at insert time. Normal settlements
            arrive as 'pending' and the distribution processor transitions
            them through 'processing' → 'processed' / 'errored'. A row that
            fails the Nostr attribution cross-check
            (bitspire.assert_nostr_attribution) is inserted directly as
            'rejected' — it never goes near the distribution path.
        error_message: Failure reason stored alongside a 'rejected' row.

    Raises:
        Exception: whatever the INSERT raised, when the failure was not
            caused by an already-existing row for this payment_hash.
    """
    existing = await get_settlement_by_payment_hash(data.payment_hash)
    if existing is not None:
        return existing

    settlement_id = urlsafe_short_hash()
    row = {
        "id": settlement_id,
        "machine_id": data.machine_id,
        "payment_hash": data.payment_hash,
        "bitspire_event_id": data.bitspire_event_id,
        "bitspire_txid": data.bitspire_txid,
        "wire_sats": data.wire_sats,
        "fiat_amount": data.fiat_amount,
        "fiat_code": data.fiat_code,
        "exchange_rate": data.exchange_rate,
        "principal_sats": data.principal_sats,
        "fee_sats": data.fee_sats,
        "platform_fee_sats": data.platform_fee_sats,
        "operator_fee_sats": data.operator_fee_sats,
        "fee_mismatch_sats": data.fee_mismatch_sats,
        "tx_type": data.tx_type,
        "bills_json": data.bills_json,
        "cassettes_json": data.cassettes_json,
        "status": initial_status,
        "error_message": error_message,
        "created_at": datetime.now(),
    }
    try:
        await db.execute(
            """
            INSERT INTO satoshimachine.dca_settlements
                (id, machine_id, payment_hash, bitspire_event_id, bitspire_txid,
                 wire_sats, fiat_amount, fiat_code, exchange_rate, principal_sats,
                 fee_sats, platform_fee_sats, operator_fee_sats, fee_mismatch_sats,
                 tx_type, bills_json, cassettes_json,
                 status, error_message, created_at)
            VALUES (:id, :machine_id, :payment_hash, :bitspire_event_id,
                    :bitspire_txid, :wire_sats, :fiat_amount, :fiat_code,
                    :exchange_rate, :principal_sats, :fee_sats,
                    :platform_fee_sats, :operator_fee_sats, :fee_mismatch_sats,
                    :tx_type, :bills_json, :cassettes_json, :status,
                    :error_message, :created_at)
            """,
            row,
        )
    except Exception:
        # The driver-specific integrity error type is not visible here, so
        # decide by outcome: if a row for this payment_hash now exists we
        # lost the check-then-insert race and the call is still idempotent.
        # Anything else is a genuine failure and is re-raised unchanged.
        raced = await get_settlement_by_payment_hash(data.payment_hash)
        if raced is None:
            raise
        return raced
    return await get_settlement(settlement_id)
|
|
|
|
|
|
async def get_settlement(settlement_id: str) -> DcaSettlement | None:
    """Look up a single settlement row by its primary key."""
    query = "SELECT * FROM satoshimachine.dca_settlements WHERE id = :id"
    return await db.fetchone(query, {"id": settlement_id}, DcaSettlement)
|
|
|
|
|
|
async def get_settlement_by_payment_hash(
    payment_hash: str,
) -> DcaSettlement | None:
    """Look up the settlement recorded for ``payment_hash``, if any."""
    params = {"hash": payment_hash}
    return await db.fetchone(
        """
        SELECT * FROM satoshimachine.dca_settlements
        WHERE payment_hash = :hash
        """,
        params,
        DcaSettlement,
    )
|
|
|
|
|
|
async def get_settlements_for_machine(
    machine_id: str, limit: int = 100
) -> list[DcaSettlement]:
    """List the machine's most recent settlements, newest first.

    Args:
        machine_id: Machine whose settlements are listed.
        limit: Maximum number of rows returned (bound into ``LIMIT``).
    """
    params = {"machine_id": machine_id, "lim": limit}
    return await db.fetchall(
        """
        SELECT * FROM satoshimachine.dca_settlements
        WHERE machine_id = :machine_id
        ORDER BY created_at DESC
        LIMIT :lim
        """,
        params,
        DcaSettlement,
    )
|
|
|
|
|
|
async def _operator_settlements_by_status(
    operator_user_id: str,
    status: str,
    *,
    created_before: datetime | None = None,
) -> list[DcaSettlement]:
    """Fetch one operator's settlements in a given status.

    Without ``created_before`` every matching row is returned newest first.
    With it, only rows created strictly before that instant are returned,
    oldest first.
    """
    params: dict = {"uid": operator_user_id, "status": status}
    if created_before is None:
        age_filter = ""
        direction = "DESC"
    else:
        age_filter = "AND s.created_at < :threshold"
        direction = "ASC"
        params["threshold"] = created_before
    # Only the two fixed fragments above are interpolated; every value is bound.
    return await db.fetchall(
        f"""
        SELECT s.*
        FROM satoshimachine.dca_settlements s
        JOIN satoshimachine.dca_machines m ON m.id = s.machine_id
        WHERE m.operator_user_id = :uid
          AND s.status = :status
          {age_filter}
        ORDER BY s.created_at {direction}
        """,
        params,
        DcaSettlement,
    )


async def get_stuck_settlements_for_operator(
    operator_user_id: str, threshold_minutes: int = 30
) -> dict:
    """Operator worklist of settlements that didn't process cleanly.

    Args:
        operator_user_id: Operator whose machines' settlements are inspected.
        threshold_minutes: Age (by ``created_at``) beyond which a 'pending'
            or 'processing' settlement counts as stuck.

    Returns:
        A dict with four keyed lists:

        - 'rejected': any status='rejected' (Nostr attribution cross-check
          failed — signer didn't match the machine identity). Distinct from
          'errored' because retry is wrong: the row was misrouted, not
          operationally failed. Operator must investigate the machine.
          Newest first.
        - 'errored': any status='errored' (distribution failed for an
          operational reason — wallet error, network, downstream payment).
          Operator retries from this bucket. Newest first.
        - 'stuck_pending': status='pending' AND older than threshold
          (listener crashed before invoking process_settlement). Oldest
          first.
        - 'stuck_processing': status='processing' AND older than threshold
          (processor crashed mid-flight; processing_claim is set but no
          completion landed). Oldest first.
    """
    from datetime import timedelta

    threshold_at = datetime.now() - timedelta(minutes=threshold_minutes)
    return {
        "rejected": await _operator_settlements_by_status(
            operator_user_id, "rejected"
        ),
        "errored": await _operator_settlements_by_status(
            operator_user_id, "errored"
        ),
        "stuck_pending": await _operator_settlements_by_status(
            operator_user_id, "pending", created_before=threshold_at
        ),
        "stuck_processing": await _operator_settlements_by_status(
            operator_user_id, "processing", created_before=threshold_at
        ),
    }
|
|
|
|
|
|
async def force_reset_stuck_settlement(
    settlement_id: str,
) -> DcaSettlement | None:
    """Operator escape hatch for genuinely stuck settlements.

    Flips a 'pending' or 'processing' settlement (processor crashed
    mid-flight, etc.) to 'errored' so the existing retry endpoint can take
    over, clears processing_claim and records a fixed error message.
    Settlements in any other status are left untouched.

    The caller is responsible for verifying the settlement is *actually*
    stuck (e.g., via a threshold check on created_at); this function trusts
    that decision.

    Returns:
        The settlement as re-read after the update.
    """
    params = {"id": settlement_id}
    await db.execute(
        """
        UPDATE satoshimachine.dca_settlements
        SET status = 'errored',
            processing_claim = NULL,
            error_message = 'force-reset by operator (was stuck)'
        WHERE id = :id AND status IN ('pending', 'processing')
        """,
        params,
    )
    return await get_settlement(settlement_id)
|
|
|
|
|
|
async def get_settlements_for_operator(
    operator_user_id: str, limit: int = 200
) -> list[DcaSettlement]:
    """List the most recent settlements across the operator's machines.

    Operator scoping is enforced in SQL by joining through
    ``dca_machines.operator_user_id``. Rows come back newest first.

    Args:
        operator_user_id: Operator whose settlements are listed.
        limit: Maximum number of rows returned (bound into ``LIMIT``).
    """
    params = {"uid": operator_user_id, "lim": limit}
    return await db.fetchall(
        """
        SELECT s.*
        FROM satoshimachine.dca_settlements s
        JOIN satoshimachine.dca_machines m ON m.id = s.machine_id
        WHERE m.operator_user_id = :uid
        ORDER BY s.created_at DESC
        LIMIT :lim
        """,
        params,
        DcaSettlement,
    )
|
|
|
|
|
|
async def mark_settlement_status(
    settlement_id: str,
    status: str,
    error_message: str | None = None,
) -> DcaSettlement | None:
    """Set a settlement's status and error message.

    Status: 'pending' | 'processing' | 'processed' | 'partial' |
    'refunded' | 'errored'.

    Side effects of the UPDATE:
    - ``error_message`` is always overwritten (with NULL when omitted).
    - ``processed_at`` is stamped with the current time only for
      'processed', 'partial' and 'refunded'; otherwise it is kept.
    - ``processing_claim`` is kept only when the new status is
      'processing'; every other status clears it so a fresh claim attempt
      won't see a stale token.

    Returns:
        The settlement as re-read after the update.
    """
    params = {
        "id": settlement_id,
        "status": status,
        "err": error_message,
        "now": datetime.now(),
    }
    await db.execute(
        """
        UPDATE satoshimachine.dca_settlements
        SET status = :status,
            error_message = :err,
            processed_at = CASE
                WHEN :status IN ('processed', 'partial', 'refunded')
                THEN :now ELSE processed_at
            END,
            processing_claim = CASE
                WHEN :status = 'processing' THEN processing_claim
                ELSE NULL
            END
        WHERE id = :id
        """,
        params,
    )
    return await get_settlement(settlement_id)
|
|
|
|
|
|
async def claim_settlement_for_processing(
    settlement_id: str,
) -> DcaSettlement | None:
    """Optimistic-lock claim of a 'pending' settlement.

    Atomically flips the settlement to 'processing' and tags it with a
    per-invocation token. Returns the claimed row on success; ``None`` if
    the settlement does not exist, another caller already won the claim, or
    the settlement is not in a claimable state ('pending').

    The pattern is portable across SQLite + PostgreSQL (it doesn't rely on
    UPDATE ... RETURNING). Two concurrent invocations may both run the
    UPDATE, but only one matches the WHERE clause; the loser's UPDATE is a
    no-op against status='processing'. Reading the row back and comparing
    the stored token with ours disambiguates winner from loser.
    """
    claim_token = urlsafe_short_hash()
    await db.execute(
        """
        UPDATE satoshimachine.dca_settlements
        SET status = 'processing', processing_claim = :token
        WHERE id = :id AND status = 'pending'
        """,
        {"id": settlement_id, "token": claim_token},
    )
    claimed = await get_settlement(settlement_id)
    if claimed is not None and claimed.processing_claim == claim_token:
        return claimed
    return None
|
|
|
|
|
|
async def reset_settlement_for_retry(
    settlement_id: str,
) -> DcaSettlement | None:
    """Operator retry path for an 'errored' settlement.

    Flips 'errored' → 'pending' (clearing error_message, processing_claim
    and processed_at) and voids any 'failed' legs so process_settlement
    re-runs them fresh. Completed legs are left in place — we never re-pay
    sats that already moved.

    Both updates apply only while the settlement is 'errored'. For a
    settlement in any other status nothing is changed, so its failed legs
    are not voided out from under a settlement that is not being retried.

    Returns:
        The settlement as re-read after the update(s).
    """
    # Legs first: the EXISTS guard must observe the settlement while it is
    # still 'errored', i.e. before the second statement flips it to 'pending'.
    await db.execute(
        """
        UPDATE satoshimachine.dca_payments
        SET status = 'voided'
        WHERE settlement_id = :sid
          AND status = 'failed'
          AND EXISTS (
              SELECT 1 FROM satoshimachine.dca_settlements s
              WHERE s.id = :sid AND s.status = 'errored'
          )
        """,
        {"sid": settlement_id},
    )
    await db.execute(
        """
        UPDATE satoshimachine.dca_settlements
        SET status = 'pending',
            error_message = NULL,
            processing_claim = NULL,
            processed_at = NULL
        WHERE id = :id AND status = 'errored'
        """,
        {"id": settlement_id},
    )
    return await get_settlement(settlement_id)
|
|
|
|
|
|
async def apply_partial_dispense(
    settlement_id: str,
    *,
    new_wire_sats: int,
    new_principal_sats: int,
    new_fee_sats: int,
    new_platform_fee_sats: int,
    new_operator_fee_sats: int,
    new_fiat_amount: float,
    appended_note: str,
) -> DcaSettlement | None:
    """Overwrite a settlement's monetary fields after a partial dispense.

    Replaces wire/principal/fee/platform-fee/operator-fee sats and the fiat
    amount, clears error_message and processed_at, and resets status to
    'pending' so process_settlement can re-distribute via the existing
    idempotent path.

    `appended_note` is prepended to the notes column. Notes are append-only:
    new entries go at the top (newest first), separated from older ones by
    a blank line, so the settlement detail view shows the most recent
    adjustment first without needing to scroll.

    Returns:
        The settlement as re-read after the update.
    """
    params = {
        "id": settlement_id,
        "gross": new_wire_sats,
        "principal": new_principal_sats,
        "commission": new_fee_sats,
        "platform": new_platform_fee_sats,
        "operator": new_operator_fee_sats,
        "fiat": new_fiat_amount,
        "note": appended_note,
    }
    await db.execute(
        """
        UPDATE satoshimachine.dca_settlements
        SET wire_sats = :gross,
            principal_sats = :principal,
            fee_sats = :commission,
            platform_fee_sats = :platform,
            operator_fee_sats = :operator,
            fiat_amount = :fiat,
            status = 'pending',
            error_message = NULL,
            processed_at = NULL,
            notes = CASE
                WHEN notes IS NULL OR notes = '' THEN :note
                ELSE :note || char(10) || char(10) || notes
            END
        WHERE id = :id
        """,
        params,
    )
    return await get_settlement(settlement_id)
|
|
|
|
|
|
async def count_completed_legs_for_settlement(settlement_id: str) -> int:
    """Count the settlement's payment legs whose status is 'completed'.

    Used by partial-dispense to refuse adjustments after any leg has
    successfully moved sats (Lightning payments can't be clawed back).

    Returns:
        The count, or 0 when the query yields no row.
    """
    result = await db.fetchone(
        """
        SELECT COUNT(*) AS n FROM satoshimachine.dca_payments
        WHERE settlement_id = :sid AND status = 'completed'
        """,
        {"sid": settlement_id},
    )
    if not result:
        return 0
    return int(result["n"])
|
|
|
|
|
|
async def append_settlement_note(
    settlement_id: str, note: str, author_user_id: str
) -> DcaSettlement | None:
    """Prepend an operator-authored note to settlement.notes.

    Each entry is stamped with the current UTC time (second precision) and
    the author's user id so the trail is accountable. Append-only: the new
    entry goes on top, separated from older entries by a blank line, and
    existing entries are never edited.

    Returns whatever get_settlement() yields for `settlement_id` after the
    update.
    """
    from datetime import timezone

    ts = datetime.now(timezone.utc).isoformat(timespec="seconds")
    formatted = f"[{ts} by {author_user_id}] {note}"
    await db.execute(
        """
        UPDATE satoshimachine.dca_settlements
        SET notes = CASE
            WHEN notes IS NULL OR notes = '' THEN :note
            ELSE :note_prefix || notes
        END
        WHERE id = :id
        """,
        {
            "id": settlement_id,
            "note": formatted,
            # The blank-line separator is bound from Python instead of being
            # built with SQL char(10): char() is a SQLite function, while in
            # PostgreSQL `char(n)` is a type name, so the old form was not
            # portable across backends.
            "note_prefix": f"{formatted}\n\n",
        },
    )
    return await get_settlement(settlement_id)
|
|
|
|
|
|
async def void_open_legs_for_settlement(settlement_id: str) -> None:
    """Flip a settlement's open payment legs to 'voided'.

    Run before distribution is repeated on a partial-dispense recompute.
    Rows are kept for audit but no longer read as live. Legs in 'pending',
    'failed' and 'skipped' are all voided; 'skipped' is included so audit
    rows from a prior attempt don't double-count once the new attempt
    writes its own (possibly different) skipped reasons.
    """
    statement = """
        UPDATE satoshimachine.dca_payments
        SET status = 'voided'
        WHERE settlement_id = :sid
          AND status IN ('pending', 'failed', 'skipped')
    """
    await db.execute(statement, {"sid": settlement_id})
|
|
|
|
|
|
# =============================================================================
|
|
# Commission splits — operator's remainder-distribution rules.
|
|
# =============================================================================
|
|
|
|
|
|
async def get_commission_splits(
    operator_user_id: str, machine_id: str | None = None
) -> list[CommissionSplit]:
    """Fetch the commission-split rules stored for exactly one scope.

    With a `machine_id`, returns that machine's override rows; with
    `machine_id=None`, returns the operator's default rows (those whose
    machine_id IS NULL). Rows are ordered by sort_order ascending.

    No fallback happens here: precedence (try the per-machine override
    first, fall back to the operator default if empty) is the caller's
    responsibility.
    """
    if machine_id is not None:
        # Per-machine override scope.
        return await db.fetchall(
            """
            SELECT * FROM satoshimachine.dca_commission_splits
            WHERE operator_user_id = :uid AND machine_id = :mid
            ORDER BY sort_order ASC
            """,
            {"uid": operator_user_id, "mid": machine_id},
            CommissionSplit,
        )

    # Operator-default scope: NULL needs IS NULL, not an equality bind.
    return await db.fetchall(
        """
        SELECT * FROM satoshimachine.dca_commission_splits
        WHERE operator_user_id = :uid AND machine_id IS NULL
        ORDER BY sort_order ASC
        """,
        {"uid": operator_user_id},
        CommissionSplit,
    )
|
|
|
|
|
|
async def get_effective_commission_splits(
    operator_user_id: str, machine_id: str
) -> list[CommissionSplit]:
    """Resolve the ruleset that applies to a machine.

    Returns the per-machine override when it has any rows; otherwise the
    operator's default ruleset.
    """
    machine_rules = await get_commission_splits(operator_user_id, machine_id)
    if not machine_rules:
        return await get_commission_splits(operator_user_id, None)
    return machine_rules
|
|
|
|
|
|
async def replace_commission_splits(
    operator_user_id: str,
    machine_id: str | None,
    legs: list[CommissionSplitLeg],
) -> list[CommissionSplit]:
    """Replace every split rule in the (operator, machine) scope with `legs`.

    Existing rows for the scope are deleted, then one row per leg is
    inserted with a fresh id and a shared created_at timestamp. The caller
    should already have validated that the legs sum to 1.0 via the Pydantic
    model. Returns the rows re-read for the same scope.

    NOTE(review): the DELETE and the INSERTs are issued as separate
    db.execute() calls; whether they share one transaction depends on how
    `db` is configured — confirm before relying on atomicity.
    """
    if machine_id is not None:
        await db.execute(
            """
            DELETE FROM satoshimachine.dca_commission_splits
            WHERE operator_user_id = :uid AND machine_id = :mid
            """,
            {"uid": operator_user_id, "mid": machine_id},
        )
    else:
        await db.execute(
            """
            DELETE FROM satoshimachine.dca_commission_splits
            WHERE operator_user_id = :uid AND machine_id IS NULL
            """,
            {"uid": operator_user_id},
        )

    # One timestamp for the whole batch so all legs of a ruleset match.
    created_at = datetime.now()
    insert_sql = """
        INSERT INTO satoshimachine.dca_commission_splits
            (id, machine_id, operator_user_id, target, label, fraction,
             sort_order, created_at)
        VALUES (:id, :machine_id, :uid, :target, :label, :fraction,
                :sort_order, :created_at)
    """
    for leg in legs:
        row_values = {
            "id": urlsafe_short_hash(),
            "machine_id": machine_id,
            "uid": operator_user_id,
            "target": leg.target,
            "label": leg.label,
            "fraction": leg.fraction,
            "sort_order": leg.sort_order,
            "created_at": created_at,
        }
        await db.execute(insert_sql, row_values)

    return await get_commission_splits(operator_user_id, machine_id)
|
|
|
|
|
|
# =============================================================================
|
|
# Payments — distribution legs.
|
|
# =============================================================================
|
|
|
|
|
|
async def create_dca_payment(data: CreateDcaPaymentData) -> DcaPayment:
    """Insert a new payment leg in 'pending' status and return the stored row.

    A fresh id is generated for the leg and created_at is set to the
    current local time. The row is re-read after the insert; an
    AssertionError is raised if it cannot be found.
    """
    new_id = urlsafe_short_hash()
    insert_sql = """
        INSERT INTO satoshimachine.dca_payments
            (id, settlement_id, client_id, machine_id, operator_user_id,
             leg_type, destination_wallet_id, destination_ln_address,
             amount_sats, amount_fiat, exchange_rate, transaction_time,
             external_payment_hash, status, created_at)
        VALUES (:id, :settlement_id, :client_id, :machine_id,
                :operator_user_id, :leg_type, :destination_wallet_id,
                :destination_ln_address, :amount_sats, :amount_fiat,
                :exchange_rate, :transaction_time, :external_payment_hash,
                :status, :created_at)
    """
    row_values = {
        "id": new_id,
        "settlement_id": data.settlement_id,
        "client_id": data.client_id,
        "machine_id": data.machine_id,
        "operator_user_id": data.operator_user_id,
        "leg_type": data.leg_type,
        "destination_wallet_id": data.destination_wallet_id,
        "destination_ln_address": data.destination_ln_address,
        "amount_sats": data.amount_sats,
        "amount_fiat": data.amount_fiat,
        "exchange_rate": data.exchange_rate,
        "transaction_time": data.transaction_time,
        "external_payment_hash": data.external_payment_hash,
        # Every leg starts out pending; update_payment_status moves it on.
        "status": "pending",
        "created_at": datetime.now(),
    }
    await db.execute(insert_sql, row_values)

    created = await get_dca_payment(new_id)
    assert created is not None
    return created
|
|
|
|
|
|
async def get_dca_payment(payment_id: str) -> DcaPayment | None:
    """Look up a single payment leg by its id."""
    query = "SELECT * FROM satoshimachine.dca_payments WHERE id = :id"
    return await db.fetchone(query, {"id": payment_id}, DcaPayment)
|
|
|
|
|
|
async def get_payments_for_settlement(settlement_id: str) -> list[DcaPayment]:
    """List every payment leg of a settlement, oldest first (by created_at)."""
    query = """
        SELECT * FROM satoshimachine.dca_payments
        WHERE settlement_id = :sid
        ORDER BY created_at ASC
    """
    return await db.fetchall(query, {"sid": settlement_id}, DcaPayment)
|
|
|
|
|
|
async def get_payments_for_client(client_id: str) -> list[DcaPayment]:
    """List every payment leg of a client, newest first (by created_at)."""
    query = """
        SELECT * FROM satoshimachine.dca_payments
        WHERE client_id = :cid
        ORDER BY created_at DESC
    """
    return await db.fetchall(query, {"cid": client_id}, DcaPayment)
|
|
|
|
|
|
async def get_payments_for_operator(
    operator_user_id: str, leg_type: str | None = None, limit: int = 200
) -> list[DcaPayment]:
    """List an operator's most recent payment legs, newest first.

    At most `limit` rows are returned. When `leg_type` is given, only legs
    of that type are included; otherwise all leg types are returned.
    """
    if leg_type is not None:
        return await db.fetchall(
            """
            SELECT * FROM satoshimachine.dca_payments
            WHERE operator_user_id = :uid AND leg_type = :leg
            ORDER BY created_at DESC
            LIMIT :lim
            """,
            {"uid": operator_user_id, "leg": leg_type, "lim": limit},
            DcaPayment,
        )

    return await db.fetchall(
        """
        SELECT * FROM satoshimachine.dca_payments
        WHERE operator_user_id = :uid
        ORDER BY created_at DESC
        LIMIT :lim
        """,
        {"uid": operator_user_id, "lim": limit},
        DcaPayment,
    )
|
|
|
|
|
|
async def update_payment_status(
    payment_id: str,
    status: str,
    external_payment_hash: str | None = None,
    error_message: str | None = None,
) -> DcaPayment | None:
    """Set a payment leg's status and return the row as re-read afterwards.

    `external_payment_hash` only overwrites the stored hash when it is not
    None (COALESCE keeps the existing value otherwise). `error_message` is
    always written, so omitting it clears any previously stored message.
    """
    bindings = {
        "id": payment_id,
        "status": status,
        "hash": external_payment_hash,
        "err": error_message,
    }
    statement = """
        UPDATE satoshimachine.dca_payments
        SET status = :status,
            external_payment_hash = COALESCE(:hash, external_payment_hash),
            error_message = :err
        WHERE id = :id
    """
    await db.execute(statement, bindings)
    return await get_dca_payment(payment_id)
|
|
|
|
|
|
# =============================================================================
|
|
# Balance summaries
|
|
# =============================================================================
|
|
|
|
|
|
async def get_client_balance_summary(
    client_id: str,
) -> ClientBalanceSummary | None:
    """Per-client (and per-machine, since clients are per-machine in v2) summary.

    Returns None when the client does not exist. Otherwise:

    - total_deposits: sum of the client's deposits with status 'confirmed'.
    - total_payments: sum of amount_fiat over the client's completed legs of
      type 'dca' or 'settlement'. Other leg types are not counted against
      the balance.
    - remaining_balance: deposits minus payments.

    All three figures are rounded to 2 decimals. The currency is the fiat
    code of the client's machine, or "GTQ" when the machine is not found.
    """
    client = await get_dca_client(client_id)
    if client is None:
        return None

    bindings = {"cid": client_id}

    deposits_row = await db.fetchone(
        """
        SELECT COALESCE(SUM(amount), 0) AS total
        FROM satoshimachine.dca_deposits
        WHERE client_id = :cid AND status = 'confirmed'
        """,
        bindings,
    )
    # Both DCA legs (auto, from bitSpire settlements) and balance-settle legs
    # (operator-initiated under #4) reduce the LP's remaining fiat balance.
    payments_row = await db.fetchone(
        """
        SELECT COALESCE(SUM(amount_fiat), 0) AS total
        FROM satoshimachine.dca_payments
        WHERE client_id = :cid
          AND leg_type IN ('dca', 'settlement')
          AND status = 'completed'
        """,
        bindings,
    )

    deposited = float(deposits_row["total"]) if deposits_row else 0.0
    paid_out = float(payments_row["total"]) if payments_row else 0.0

    # Fiat code: clients inherit it from their machine.
    machine = await get_machine(client.machine_id)
    if machine:
        currency = machine.fiat_code
    else:
        currency = "GTQ"

    return ClientBalanceSummary(
        client_id=client_id,
        machine_id=client.machine_id,
        total_deposits=round(deposited, 2),
        total_payments=round(paid_out, 2),
        remaining_balance=round(deposited - paid_out, 2),
        currency=currency,
    )
|
|
|
|
|
|
# =============================================================================
|
|
# Telemetry — sparse beacon (kind-30078) and fleet snapshot (kind-30079) state.
|
|
# =============================================================================
|
|
|
|
|
|
async def get_telemetry(machine_id: str) -> TelemetrySnapshot | None:
    """Fetch the telemetry row stored for a machine, if there is one."""
    query = "SELECT * FROM satoshimachine.dca_telemetry WHERE machine_id = :mid"
    return await db.fetchone(query, {"mid": machine_id}, TelemetrySnapshot)
|
|
|
|
|
|
async def upsert_beacon_snapshot(
    machine_id: str,
    *,
    cash_in: bool | None = None,
    cash_out: bool | None = None,
    cash_level: str | None = None,
    fiat: str | None = None,
    model: str | None = None,
    name: str | None = None,
    location: str | None = None,
    geo: str | None = None,
    fees_json: str | None = None,
    limits_json: str | None = None,
    denominations_json: str | None = None,
    version: str | None = None,
) -> TelemetrySnapshot | None:
    """Insert or update the kind-30078 beacon columns of a machine's telemetry.

    Every field is nullable because today's upstream payload only carries
    cash_in/cash_out/cash_level/fiat/model (see lamassu-next#43 — the
    enrichment is not yet shipped).

    When no telemetry row exists yet, one is inserted with the values as
    given. When a row exists, a None argument leaves the stored column
    untouched (COALESCE), while beacon_received_at is always refreshed.
    Returns the row as re-read afterwards.
    """
    previous = await get_telemetry(machine_id)

    # The INSERT and the UPDATE bind exactly the same parameters.
    beacon_values = {
        "mid": machine_id,
        "cash_in": cash_in,
        "cash_out": cash_out,
        "cash_level": cash_level,
        "fiat": fiat,
        "model": model,
        "name": name,
        "location": location,
        "geo": geo,
        "fees": fees_json,
        "limits": limits_json,
        "denoms": denominations_json,
        "version": version,
        "now": datetime.now(),
    }

    if previous is not None:
        await db.execute(
            """
            UPDATE satoshimachine.dca_telemetry SET
                beacon_cash_in = COALESCE(:cash_in, beacon_cash_in),
                beacon_cash_out = COALESCE(:cash_out, beacon_cash_out),
                beacon_cash_level = COALESCE(:cash_level, beacon_cash_level),
                beacon_fiat = COALESCE(:fiat, beacon_fiat),
                beacon_model = COALESCE(:model, beacon_model),
                beacon_name = COALESCE(:name, beacon_name),
                beacon_location = COALESCE(:location, beacon_location),
                beacon_geo = COALESCE(:geo, beacon_geo),
                beacon_fees_json = COALESCE(:fees, beacon_fees_json),
                beacon_limits_json = COALESCE(:limits, beacon_limits_json),
                beacon_denominations_json =
                    COALESCE(:denoms, beacon_denominations_json),
                beacon_version = COALESCE(:version, beacon_version),
                beacon_received_at = :now
            WHERE machine_id = :mid
            """,
            beacon_values,
        )
    else:
        await db.execute(
            """
            INSERT INTO satoshimachine.dca_telemetry
                (machine_id, beacon_cash_in, beacon_cash_out, beacon_cash_level,
                 beacon_fiat, beacon_model, beacon_name, beacon_location,
                 beacon_geo, beacon_fees_json, beacon_limits_json,
                 beacon_denominations_json, beacon_version, beacon_received_at)
            VALUES (:mid, :cash_in, :cash_out, :cash_level, :fiat, :model,
                    :name, :location, :geo, :fees, :limits, :denoms,
                    :version, :now)
            """,
            beacon_values,
        )

    return await get_telemetry(machine_id)
|
|
|
|
|
|
async def upsert_fleet_snapshot(
    machine_id: str, telemetry_json: str
) -> TelemetrySnapshot | None:
    """Insert or update the kind-30079 operator-only telemetry of a machine.

    The payload is stored as a raw JSON blob until lamassu-next#42 produces
    a real schema. telemetry_received_at is set to the current local time on
    every call. Returns the row as re-read afterwards.
    """
    previous = await get_telemetry(machine_id)

    # The INSERT and the UPDATE bind exactly the same parameters.
    bindings = {"mid": machine_id, "json": telemetry_json, "now": datetime.now()}

    if previous is not None:
        await db.execute(
            """
            UPDATE satoshimachine.dca_telemetry
            SET telemetry_json = :json, telemetry_received_at = :now
            WHERE machine_id = :mid
            """,
            bindings,
        )
    else:
        await db.execute(
            """
            INSERT INTO satoshimachine.dca_telemetry
                (machine_id, telemetry_json, telemetry_received_at)
            VALUES (:mid, :json, :now)
            """,
            bindings,
        )

    return await get_telemetry(machine_id)
|
|
|
|
|
|
# =============================================================================
|
|
# Cassette configs — operator-driven ATM cassette inventory (#29 v1.1).
|
|
# =============================================================================
|
|
# Row lifecycle per #29:
|
|
# - First population for a (machine_id, position) pair → apply_bootstrap_state
|
|
# (consumer reading the ATM's one-shot bitspire-cassettes-state event)
|
|
# - Operator edit of denomination or count → update_cassette_config
|
|
# (refuses to create new rows; the slot count is hardware-determined)
|
|
# - Row creation/deletion for a new position → admin only, via ATM
|
|
# re-provisioning + new bootstrap event (not exposed in v1 here)
|
|
|
|
|
|
def _should_apply_bootstrap_state(
|
|
existing_state_event_id: str | None, incoming_event_id: str
|
|
) -> bool:
|
|
"""Pure-function dedup gate for apply_bootstrap_state.
|
|
|
|
Returns False if any existing row for this machine already references
|
|
the incoming event_id (relay re-delivery after restart). True otherwise.
|
|
|
|
Extracted as a pure function so the dedup decision is unit-testable
|
|
without a database round-trip. The actual idempotency check in
|
|
apply_bootstrap_state fetches one existing row and passes its
|
|
state_event_id here.
|
|
"""
|
|
return existing_state_event_id != incoming_event_id
|
|
|
|
|
|
async def get_cassette_config(
    machine_id: str, position: int
) -> CassetteConfig | None:
    """Fetch the cassette row for one (machine_id, position) slot, if any."""
    query = (
        "SELECT * FROM satoshimachine.cassette_configs "
        "WHERE machine_id = :mid AND position = :pos"
    )
    bindings = {"mid": machine_id, "pos": position}
    return await db.fetchone(query, bindings, CassetteConfig)
|
|
|
|
|
|
async def list_cassette_configs_for_machine(
    machine_id: str,
) -> list[CassetteConfig]:
    """List every cassette row of a machine, ordered by position."""
    query = (
        "SELECT * FROM satoshimachine.cassette_configs "
        "WHERE machine_id = :mid ORDER BY position"
    )
    return await db.fetchall(query, {"mid": machine_id}, CassetteConfig)
|
|
|
|
|
|
async def update_cassette_config(
    machine_id: str,
    position: int,
    data: UpsertCassetteConfigData,
    *,
    updated_by: str | None = None,
) -> CassetteConfig | None:
    """Apply an operator edit to a single existing cassette slot.

    Only the fields of `data` that are not None are written; updated_at
    (current local time) and updated_by are stamped alongside them. When
    `data` carries no non-None field, nothing is written and the existing
    row is returned as-is.

    New rows are never created here — those only land via
    apply_bootstrap_state() consuming an ATM bootstrap event (per #29 row
    lifecycle: hardware-determined slot count, not operator-creatable).
    Returns None if the (machine_id, position) row doesn't exist.
    """
    current = await get_cassette_config(machine_id, position)
    if current is None:
        return None

    changes: dict = {
        field: value for field, value in data.dict().items() if value is not None
    }
    if not changes:
        return current

    changes["updated_at"] = datetime.now()
    changes["updated_by"] = updated_by

    # Column names come from the model's field names; values stay bound.
    assignments = ", ".join(f"{column} = :{column}" for column in changes)
    bindings = {**changes, "mid": machine_id, "pos": position}
    await db.execute(
        f"UPDATE satoshimachine.cassette_configs SET {assignments} "
        "WHERE machine_id = :mid AND position = :pos",
        bindings,
    )
    return await get_cassette_config(machine_id, position)
|
|
|
|
|
|
async def apply_bootstrap_state(
    machine_id: str,
    event_id: str,
    event_created_at: datetime,
    payload: PublishCassettesPayload,
) -> bool:
    """Consume an ATM-published kind-30078 bitspire-cassettes-state:<m> event
    and upsert one cassette_configs row per position in the payload.

    Returns True if the upsert ran; False if any existing row for this
    machine already references this event_id (idempotent on relay
    re-delivery / restart).

    Populates both the operator-believed columns (denomination, count,
    updated_at, updated_by='atm-bootstrap') AND the v2 reverse-channel
    columns (state_denomination, state_count, state_at, state_event_id)
    so the operator's initial view matches the ATM's reported state. v2
    reconciliation UI will diverge them when continuous reverse-channel
    events land + the operator subsequently edits.

    NOTE(review): the per-position upserts are issued as separate
    db.execute() calls. If a run is interrupted part-way, the rows already
    written make a re-delivery of the same event return False — confirm
    whether the loop should share one transaction.
    """
    # Probe specifically for a row stamped with this event id. An unfiltered
    # `LIMIT 1` without ORDER BY returns an arbitrary row, so when a
    # machine's rows carry different state_event_ids the dedup decision
    # would depend on which row the database happened to return.
    matching_row: dict | None = await db.fetchone(
        "SELECT state_event_id FROM satoshimachine.cassette_configs "
        "WHERE machine_id = :mid AND state_event_id = :eid LIMIT 1",
        {"mid": machine_id, "eid": event_id},
    )
    matching_event_id: str | None = None
    if matching_row is not None:
        # The row may come back as a mapping or as an attribute-style
        # object; support both.
        matching_event_id = (
            matching_row.get("state_event_id")
            if isinstance(matching_row, dict)
            else getattr(matching_row, "state_event_id", None)
        )
    if not _should_apply_bootstrap_state(matching_event_id, event_id):
        return False

    now = datetime.now()
    for pos, row in payload.positions.items():
        await db.execute(
            """
            INSERT INTO satoshimachine.cassette_configs
                (machine_id, position, denomination, count, updated_at,
                 updated_by, state_denomination, state_count, state_at,
                 state_event_id)
            VALUES (:mid, :pos, :denom, :count, :now, :by,
                    :state_denom, :state_count, :state_at, :event_id)
            ON CONFLICT (machine_id, position) DO UPDATE SET
                denomination = excluded.denomination,
                count = excluded.count,
                updated_at = excluded.updated_at,
                updated_by = excluded.updated_by,
                state_denomination = excluded.state_denomination,
                state_count = excluded.state_count,
                state_at = excluded.state_at,
                state_event_id = excluded.state_event_id
            """,
            {
                "mid": machine_id,
                "pos": pos,
                "denom": row.denomination,
                "count": row.count,
                "now": now,
                "by": "atm-bootstrap",
                # At bootstrap the ATM-reported state and the
                # operator-believed values are the same.
                "state_denom": row.denomination,
                "state_count": row.count,
                "state_at": event_created_at,
                "event_id": event_id,
            },
        )
    return True
|