diff --git a/crud.py b/crud.py index 326352c..7c3d6db 100644 --- a/crud.py +++ b/crud.py @@ -1,45 +1,194 @@ -# Description: This file contains the CRUD operations for talking to the database. +# Satoshi Machine v2 — CRUD layer over the m005 schema. +# +# All operator-scoped queries take an operator_user_id and enforce isolation +# at the SQL boundary. Cross-operator LP queries (for satmachineclient) join +# through dca_machines.operator_user_id. See plan section "Identity & multi- +# machine model". -from typing import List, Optional, Union -from datetime import datetime, timezone +from datetime import datetime +from typing import List, Optional from lnbits.db import Database from lnbits.helpers import urlsafe_short_hash from .models import ( - CreateDcaClientData, DcaClient, UpdateDcaClientData, - CreateDepositData, DcaDeposit, UpdateDepositData, UpdateDepositStatusData, - CreateDcaPaymentData, DcaPayment, ClientBalanceSummary, - CreateLamassuConfigData, LamassuConfig, UpdateLamassuConfigData, - CreateLamassuTransactionData, StoredLamassuTransaction + CommissionSplit, + CommissionSplitLeg, + CreateDcaClientData, + CreateDcaPaymentData, + CreateDcaSettlementData, + CreateDepositData, + CreateMachineData, + DcaClient, + DcaDeposit, + DcaPayment, + DcaSettlement, + Machine, + SuperConfig, + TelemetrySnapshot, + UpdateDcaClientData, + UpdateDepositData, + UpdateDepositStatusData, + UpdateMachineData, + UpdateSuperConfigData, ) db = Database("ext_satoshimachine") -# DCA Client CRUD Operations -async def create_dca_client(data: CreateDcaClientData) -> DcaClient: - client_id = urlsafe_short_hash() +# ============================================================================= +# Super config +# ============================================================================= + + +async def get_super_config() -> Optional[SuperConfig]: + return await db.fetchone( + "SELECT * FROM satoshimachine.super_config WHERE id = :id", + {"id": "default"}, + SuperConfig, + ) + + +async def update_super_config(data: UpdateSuperConfigData) -> Optional[SuperConfig]: + update_data = {k: v for k, v in data.dict().items() if v is not None} + if not update_data: + return await get_super_config() + update_data["updated_at"] = datetime.now() + set_clause = ", ".join(f"{k} = :{k}" for k in update_data) + update_data["id"] = "default" + await db.execute( + f"UPDATE satoshimachine.super_config SET {set_clause} WHERE id = :id", + update_data, + ) + return await get_super_config() + + +# ============================================================================= +# Machines +# ============================================================================= + + +async def create_machine( + operator_user_id: str, data: CreateMachineData +) -> Machine: + machine_id = urlsafe_short_hash() + now = datetime.now() await db.execute( """ - INSERT INTO satoshimachine.dca_clients - (id, user_id, wallet_id, username, dca_mode, fixed_mode_daily_limit, status, created_at, updated_at) - VALUES (:id, :user_id, :wallet_id, :username, :dca_mode, :fixed_mode_daily_limit, :status, :created_at, :updated_at) + INSERT INTO satoshimachine.dca_machines + (id, operator_user_id, machine_npub, wallet_id, name, location, + fiat_code, is_active, fallback_commission_pct, created_at, updated_at) + VALUES (:id, :operator_user_id, :machine_npub, :wallet_id, :name, + :location, :fiat_code, :is_active, :fallback_commission_pct, + :created_at, :updated_at) + """, + { + "id": machine_id, + "operator_user_id": operator_user_id, + "machine_npub": data.machine_npub, + "wallet_id": data.wallet_id, + "name": data.name, + "location": data.location, + "fiat_code": data.fiat_code, + "is_active": True, + "fallback_commission_pct": data.fallback_commission_pct, + "created_at": now, + "updated_at": now, + }, + ) + machine = await get_machine(machine_id) + assert machine is not None + return machine + + +async def get_machine(machine_id: str) -> Optional[Machine]: + return await db.fetchone( + "SELECT * FROM satoshimachine.dca_machines WHERE id = :id", + {"id": machine_id}, + Machine, + ) + + +async def get_machine_by_npub(machine_npub: str) -> Optional[Machine]: + return await db.fetchone( + "SELECT * FROM satoshimachine.dca_machines WHERE machine_npub = :npub", + {"npub": machine_npub}, + Machine, + ) + + +async def get_machines_for_operator(operator_user_id: str) -> List[Machine]: + return await db.fetchall( + """ + SELECT * FROM satoshimachine.dca_machines + WHERE operator_user_id = :uid + ORDER BY created_at DESC + """, + {"uid": operator_user_id}, + Machine, + ) + + +async def update_machine( + machine_id: str, data: UpdateMachineData +) -> Optional[Machine]: + update_data = {k: v for k, v in data.dict().items() if v is not None} + if not update_data: + return await get_machine(machine_id) + update_data["updated_at"] = datetime.now() + set_clause = ", ".join(f"{k} = :{k}" for k in update_data) + update_data["id"] = machine_id + await db.execute( + f"UPDATE satoshimachine.dca_machines SET {set_clause} WHERE id = :id", + update_data, + ) + return await get_machine(machine_id) + + +async def delete_machine(machine_id: str) -> None: + await db.execute( + "DELETE FROM satoshimachine.dca_machines WHERE id = :id", + {"id": machine_id}, + ) + + +# ============================================================================= +# DCA Clients (LPs) +# ============================================================================= + + +async def create_dca_client(data: CreateDcaClientData) -> DcaClient: + client_id = urlsafe_short_hash() + now = datetime.now() + await db.execute( + """ + INSERT INTO satoshimachine.dca_clients + (id, machine_id, user_id, wallet_id, username, dca_mode, + fixed_mode_daily_limit, autoforward_ln_address, autoforward_enabled, + status, created_at, updated_at) + VALUES (:id, :machine_id, :user_id, :wallet_id, :username, :dca_mode, + :fixed_mode_daily_limit, :autoforward_ln_address, + :autoforward_enabled, :status, :created_at, :updated_at) """, { "id": client_id, + "machine_id": data.machine_id, "user_id": data.user_id, "wallet_id": data.wallet_id, "username": data.username, "dca_mode": data.dca_mode, "fixed_mode_daily_limit": data.fixed_mode_daily_limit, + "autoforward_ln_address": data.autoforward_ln_address, + "autoforward_enabled": data.autoforward_enabled, "status": "active", - "created_at": datetime.now(), - "updated_at": datetime.now() - } + "created_at": now, + "updated_at": now, + }, ) - return await get_dca_client(client_id) + client = await get_dca_client(client_id) + assert client is not None + return client async def get_dca_client(client_id: str) -> Optional[DcaClient]: @@ -50,64 +199,129 @@ async def get_dca_client(client_id: str) -> Optional[DcaClient]: ) -async def get_dca_clients() -> List[DcaClient]: - return await db.fetchall( - "SELECT * FROM satoshimachine.dca_clients ORDER BY created_at DESC", - model=DcaClient, +async def get_dca_client_for_machine_user( + machine_id: str, user_id: str +) -> Optional[DcaClient]: + return await db.fetchone( + """ + SELECT * FROM satoshimachine.dca_clients + WHERE machine_id = :machine_id AND user_id = :user_id + """, + {"machine_id": machine_id, "user_id": user_id}, + DcaClient, ) -async def get_dca_client_by_user(user_id: str) -> Optional[DcaClient]: - return await db.fetchone( - "SELECT * FROM satoshimachine.dca_clients WHERE user_id = :user_id", +async def get_dca_clients_for_machine(machine_id: str) -> List[DcaClient]: + return await db.fetchall( + """ + SELECT * FROM satoshimachine.dca_clients + WHERE machine_id = :machine_id + ORDER BY created_at DESC + """, + {"machine_id": machine_id}, + DcaClient, + ) + + +async def get_dca_clients_for_operator(operator_user_id: str) -> List[DcaClient]: + """All clients across every machine this operator owns.""" + return await db.fetchall( + """ + SELECT c.* + FROM satoshimachine.dca_clients c + JOIN satoshimachine.dca_machines m ON m.id = c.machine_id + WHERE m.operator_user_id = :uid + ORDER BY c.created_at DESC + """, + {"uid": operator_user_id}, + DcaClient, + ) + + +async def get_dca_clients_for_user(user_id: str) -> List[DcaClient]: + """LP cross-operator view — every machine this LP is registered at.""" + return await db.fetchall( + """ + SELECT * FROM satoshimachine.dca_clients + WHERE user_id = :user_id + ORDER BY created_at DESC + """, {"user_id": user_id}, DcaClient, ) -async def update_dca_client(client_id: str, data: UpdateDcaClientData) -> Optional[DcaClient]: +async def get_flow_mode_clients_for_machine(machine_id: str) -> List[DcaClient]: + """Active flow-mode clients used by the distribution algorithm.""" + return await db.fetchall( + """ + SELECT * FROM satoshimachine.dca_clients + WHERE machine_id = :machine_id + AND dca_mode = 'flow' + AND status = 'active' + ORDER BY created_at ASC + """, + {"machine_id": machine_id}, + DcaClient, + ) + + +async def update_dca_client( + client_id: str, data: UpdateDcaClientData +) -> Optional[DcaClient]: update_data = {k: v for k, v in data.dict().items() if v is not None} if not update_data: return await get_dca_client(client_id) - update_data["updated_at"] = datetime.now() - set_clause = ", ".join([f"{k} = :{k}" for k in update_data.keys()]) + set_clause = ", ".join(f"{k} = :{k}" for k in update_data) update_data["id"] = client_id - await db.execute( f"UPDATE satoshimachine.dca_clients SET {set_clause} WHERE id = :id", - update_data + update_data, ) return await get_dca_client(client_id) async def delete_dca_client(client_id: str) -> None: await db.execute( - "DELETE FROM satoshimachine.dca_clients WHERE id = :id", - {"id": client_id} + "DELETE FROM satoshimachine.dca_clients WHERE id = :id", + {"id": client_id}, ) -# DCA Deposit CRUD Operations -async def create_deposit(data: CreateDepositData) -> DcaDeposit: +# ============================================================================= +# Deposits +# ============================================================================= + + +async def create_deposit( + creator_user_id: str, data: CreateDepositData +) -> DcaDeposit: deposit_id = urlsafe_short_hash() await db.execute( """ - INSERT INTO satoshimachine.dca_deposits - (id, client_id, amount, currency, status, notes, created_at) - VALUES (:id, :client_id, :amount, :currency, :status, :notes, :created_at) + INSERT INTO satoshimachine.dca_deposits + (id, client_id, machine_id, creator_user_id, amount, currency, + status, notes, created_at) + VALUES (:id, :client_id, :machine_id, :creator_user_id, :amount, + :currency, :status, :notes, :created_at) """, { "id": deposit_id, "client_id": data.client_id, + "machine_id": data.machine_id, + "creator_user_id": creator_user_id, "amount": data.amount, "currency": data.currency, "status": "pending", "notes": data.notes, - "created_at": datetime.now() - } + "created_at": datetime.now(), + }, ) - return await get_deposit(deposit_id) + deposit = await get_deposit(deposit_id) + assert deposit is not None + return deposit async def get_deposit(deposit_id: str) -> Optional[DcaDeposit]: @@ -118,52 +332,65 @@ async def get_deposit(deposit_id: str) -> Optional[DcaDeposit]: ) -async def get_deposits_by_client(client_id: str) -> List[DcaDeposit]: +async def get_deposits_for_client(client_id: str) -> List[DcaDeposit]: return await db.fetchall( - "SELECT * FROM satoshimachine.dca_deposits WHERE client_id = :client_id ORDER BY created_at DESC", + """ + SELECT * FROM satoshimachine.dca_deposits + WHERE client_id = :client_id + ORDER BY created_at DESC + """, {"client_id": client_id}, DcaDeposit, ) -async def get_all_deposits() -> List[DcaDeposit]: +async def get_deposits_for_operator(operator_user_id: str) -> List[DcaDeposit]: return await db.fetchall( - "SELECT * FROM satoshimachine.dca_deposits ORDER BY created_at DESC", - model=DcaDeposit, + """ + SELECT d.* + FROM satoshimachine.dca_deposits d + JOIN satoshimachine.dca_machines m ON m.id = d.machine_id + WHERE m.operator_user_id = :uid + ORDER BY d.created_at DESC + """, + {"uid": operator_user_id}, + DcaDeposit, ) -async def update_deposit_status(deposit_id: str, data: UpdateDepositStatusData) -> Optional[DcaDeposit]: - update_data = { - "status": data.status, - "notes": data.notes - } - - if data.status == "confirmed": - update_data["confirmed_at"] = datetime.now() - - set_clause = ", ".join([f"{k} = :{k}" for k, v in update_data.items() if v is not None]) - filtered_data = {k: v for k, v in update_data.items() if v is not None} - filtered_data["id"] = deposit_id - +async def update_deposit( + deposit_id: str, data: UpdateDepositData +) -> Optional[DcaDeposit]: + update_data = {k: v for k, v in data.dict().items() if v is not None} + if not update_data: + return await get_deposit(deposit_id) + set_clause = ", ".join(f"{k} = :{k}" for k in update_data) + update_data["id"] = deposit_id await db.execute( f"UPDATE satoshimachine.dca_deposits SET {set_clause} WHERE id = :id", - filtered_data + update_data, ) return await get_deposit(deposit_id) -async def update_deposit(deposit_id: str, data: UpdateDepositData) -> Optional[DcaDeposit]: - update_data = {k: v for k, v in data.dict().items() if v is not None} - if not update_data: - return await get_deposit(deposit_id) - - set_clause = ", ".join([f"{k} = :{k}" for k in update_data.keys()]) - update_data["id"] = deposit_id - +async def update_deposit_status( + deposit_id: str, data: UpdateDepositStatusData +) -> Optional[DcaDeposit]: + payload = { + "id": deposit_id, + "status": data.status, + "notes": data.notes, + "confirmed_at": datetime.now() if data.status == "confirmed" else None, + } await db.execute( - f"UPDATE satoshimachine.dca_deposits SET {set_clause} WHERE id = :id", - update_data + """ + UPDATE satoshimachine.dca_deposits + SET status = :status, + notes = COALESCE(:notes, notes), + confirmed_at = COALESCE(:confirmed_at, confirmed_at) + WHERE id = :id + """, + payload, ) return await get_deposit(deposit_id) @@ -171,36 +398,277 @@ async def update_deposit(deposit_id: str, data: UpdateDepositData) -> Optional[D async def delete_deposit(deposit_id: str) -> None: await db.execute( "DELETE FROM satoshimachine.dca_deposits WHERE id = :id", - {"id": deposit_id} + {"id": deposit_id}, ) -# DCA Payment CRUD Operations +# ============================================================================= +# Settlements (bitSpire kind-21000 events) +# ============================================================================= + + +async def create_settlement_idempotent( + data: CreateDcaSettlementData, +) -> Optional[DcaSettlement]: + """Insert a settlement keyed by bitspire_event_id. Returns the inserted row + on first sight; returns the existing row if the event_id was already seen + (subscription replay, relay double-delivery). The UNIQUE constraint on + bitspire_event_id is the source of truth.""" + existing = await get_settlement_by_event_id(data.bitspire_event_id) + if existing is not None: + return existing + settlement_id = urlsafe_short_hash() + await db.execute( + """ + INSERT INTO satoshimachine.dca_settlements + (id, machine_id, bitspire_event_id, bitspire_txid, payment_hash, + gross_sats, fiat_amount, fiat_code, exchange_rate, net_sats, + commission_sats, platform_fee_sats, operator_fee_sats, + used_fallback_split, tx_type, bills_json, cassettes_json, + status, created_at) + VALUES (:id, :machine_id, :bitspire_event_id, :bitspire_txid, + :payment_hash, :gross_sats, :fiat_amount, :fiat_code, + :exchange_rate, :net_sats, :commission_sats, + :platform_fee_sats, :operator_fee_sats, :used_fallback_split, + :tx_type, :bills_json, :cassettes_json, :status, :created_at) + """, + { + "id": settlement_id, + "machine_id": data.machine_id, + "bitspire_event_id": data.bitspire_event_id, + "bitspire_txid": data.bitspire_txid, + "payment_hash": data.payment_hash, + "gross_sats": data.gross_sats, + "fiat_amount": data.fiat_amount, + "fiat_code": data.fiat_code, + "exchange_rate": data.exchange_rate, + "net_sats": data.net_sats, + "commission_sats": data.commission_sats, + "platform_fee_sats": data.platform_fee_sats, + "operator_fee_sats": data.operator_fee_sats, + "used_fallback_split": data.used_fallback_split, + "tx_type": data.tx_type, + "bills_json": data.bills_json, + "cassettes_json": data.cassettes_json, + "status": "pending", + "created_at": datetime.now(), + }, + ) + return await get_settlement(settlement_id) + + +async def get_settlement(settlement_id: str) -> Optional[DcaSettlement]: + return await db.fetchone( + "SELECT * FROM satoshimachine.dca_settlements WHERE id = :id", + {"id": settlement_id}, + DcaSettlement, + ) + + +async def get_settlement_by_event_id( + bitspire_event_id: str, +) -> Optional[DcaSettlement]: + return await db.fetchone( + """ + SELECT * FROM satoshimachine.dca_settlements + WHERE bitspire_event_id = :eid + """, + {"eid": bitspire_event_id}, + DcaSettlement, + ) + + +async def get_settlements_for_machine( + machine_id: str, limit: int = 100 +) -> List[DcaSettlement]: + return await db.fetchall( + """ + SELECT * FROM satoshimachine.dca_settlements + WHERE machine_id = :machine_id + ORDER BY created_at DESC + LIMIT :lim + """, + {"machine_id": machine_id, "lim": limit}, + DcaSettlement, + ) + + +async def get_settlements_for_operator( + operator_user_id: str, limit: int = 200 +) -> List[DcaSettlement]: + return await db.fetchall( + """ + SELECT s.* + FROM satoshimachine.dca_settlements s + JOIN satoshimachine.dca_machines m ON m.id = s.machine_id + WHERE m.operator_user_id = :uid + ORDER BY s.created_at DESC + LIMIT :lim + """, + {"uid": operator_user_id, "lim": limit}, + DcaSettlement, + ) + + +async def mark_settlement_status( + settlement_id: str, + status: str, + error_message: Optional[str] = None, +) -> Optional[DcaSettlement]: + """Status: 'pending' | 'processed' | 'partial' | 'refunded' | 'errored'.""" + await db.execute( + """ + UPDATE satoshimachine.dca_settlements + SET status = :status, + error_message = :err, + processed_at = CASE + WHEN :status IN ('processed', 'partial', 'refunded') + THEN :now ELSE processed_at + END + WHERE id = :id + """, + { + "id": settlement_id, + "status": status, + "err": error_message, + "now": datetime.now(), + }, + ) + return await get_settlement(settlement_id) + + +# ============================================================================= +# Commission splits — operator's remainder-distribution rules. +# ============================================================================= + + +async def get_commission_splits( + operator_user_id: str, machine_id: Optional[str] = None +) -> List[CommissionSplit]: + """Returns the rule set for the given scope. + + Precedence (caller's responsibility): try per-machine override first; + if empty, fall back to operator default (machine_id IS NULL). + """ + if machine_id is None: + return await db.fetchall( + """ + SELECT * FROM satoshimachine.dca_commission_splits + WHERE operator_user_id = :uid AND machine_id IS NULL + ORDER BY sort_order ASC + """, + {"uid": operator_user_id}, + CommissionSplit, + ) + return await db.fetchall( + """ + SELECT * FROM satoshimachine.dca_commission_splits + WHERE operator_user_id = :uid AND machine_id = :mid + ORDER BY sort_order ASC + """, + {"uid": operator_user_id, "mid": machine_id}, + CommissionSplit, + ) + + +async def get_effective_commission_splits( + operator_user_id: str, machine_id: str +) -> List[CommissionSplit]: + """Per-machine override if set, otherwise operator's default ruleset.""" + overrides = await get_commission_splits(operator_user_id, machine_id) + if overrides: + return overrides + return await get_commission_splits(operator_user_id, None) + + +async def replace_commission_splits( + operator_user_id: str, + machine_id: Optional[str], + legs: List[CommissionSplitLeg], +) -> List[CommissionSplit]: + """Atomic replace for the (operator, machine) scope. Caller should have + already validated legs sum to 1.0 via the Pydantic model.""" + if machine_id is None: + await db.execute( + """ + DELETE FROM satoshimachine.dca_commission_splits + WHERE operator_user_id = :uid AND machine_id IS NULL + """, + {"uid": operator_user_id}, + ) + else: + await db.execute( + """ + DELETE FROM satoshimachine.dca_commission_splits + WHERE operator_user_id = :uid AND machine_id = :mid + """, + {"uid": operator_user_id, "mid": machine_id}, + ) + now = datetime.now() + for leg in legs: + await db.execute( + """ + INSERT INTO satoshimachine.dca_commission_splits + (id, machine_id, operator_user_id, wallet_id, label, pct, + sort_order, created_at) + VALUES (:id, :machine_id, :uid, :wallet_id, :label, :pct, + :sort_order, :created_at) + """, + { + "id": urlsafe_short_hash(), + "machine_id": machine_id, + "uid": operator_user_id, + "wallet_id": leg.wallet_id, + "label": leg.label, + "pct": leg.pct, + "sort_order": leg.sort_order, + "created_at": now, + }, + ) + return await get_commission_splits(operator_user_id, machine_id) + + +# ============================================================================= +# Payments — distribution legs. +# ============================================================================= + + async def create_dca_payment(data: CreateDcaPaymentData) -> DcaPayment: payment_id = urlsafe_short_hash() await db.execute( """ - INSERT INTO satoshimachine.dca_payments - (id, client_id, amount_sats, amount_fiat, exchange_rate, transaction_type, - lamassu_transaction_id, payment_hash, status, created_at, transaction_time) - VALUES (:id, :client_id, :amount_sats, :amount_fiat, :exchange_rate, :transaction_type, - :lamassu_transaction_id, :payment_hash, :status, :created_at, :transaction_time) + INSERT INTO satoshimachine.dca_payments + (id, settlement_id, client_id, machine_id, operator_user_id, + leg_type, destination_wallet_id, destination_ln_address, + amount_sats, amount_fiat, exchange_rate, transaction_time, + external_payment_hash, status, created_at) + VALUES (:id, :settlement_id, :client_id, :machine_id, + :operator_user_id, :leg_type, :destination_wallet_id, + :destination_ln_address, :amount_sats, :amount_fiat, + :exchange_rate, :transaction_time, :external_payment_hash, + :status, :created_at) """, { "id": payment_id, + "settlement_id": data.settlement_id, "client_id": data.client_id, + "machine_id": data.machine_id, + "operator_user_id": data.operator_user_id, + "leg_type": data.leg_type, + "destination_wallet_id": data.destination_wallet_id, + "destination_ln_address": data.destination_ln_address, "amount_sats": data.amount_sats, "amount_fiat": data.amount_fiat, "exchange_rate": data.exchange_rate, - "transaction_type": data.transaction_type, - "lamassu_transaction_id": data.lamassu_transaction_id, - "payment_hash": data.payment_hash, + "transaction_time": data.transaction_time, + "external_payment_hash": data.external_payment_hash, "status": "pending", "created_at": datetime.now(), - "transaction_time": data.transaction_time - } + }, ) - return await get_dca_payment(payment_id) + payment = await get_dca_payment(payment_id) + assert payment is not None + return payment async def get_dca_payment(payment_id: str) -> Optional[DcaPayment]: @@ -211,327 +679,253 @@ async def get_dca_payment(payment_id: str) -> Optional[DcaPayment]: ) -async def get_payments_by_client(client_id: str) -> List[DcaPayment]: +async def get_payments_for_settlement(settlement_id: str) -> List[DcaPayment]: return await db.fetchall( - "SELECT * FROM satoshimachine.dca_payments WHERE client_id = :client_id ORDER BY created_at DESC", - {"client_id": client_id}, + """ + SELECT * FROM satoshimachine.dca_payments + WHERE settlement_id = :sid + ORDER BY created_at ASC + """, + {"sid": settlement_id}, DcaPayment, ) -async def get_all_payments() -> List[DcaPayment]: +async def get_payments_for_client(client_id: str) -> List[DcaPayment]: return await db.fetchall( - "SELECT * FROM satoshimachine.dca_payments ORDER BY created_at DESC", - model=DcaPayment, + """ + SELECT * FROM satoshimachine.dca_payments + WHERE client_id = :cid + ORDER BY created_at DESC + """, + {"cid": client_id}, + DcaPayment, ) -async def update_dca_payment_status(payment_id: str, status: str) -> None: - """Update the status of a DCA payment""" +async def get_payments_for_operator( + operator_user_id: str, leg_type: Optional[str] = None, limit: int = 200 +) -> List[DcaPayment]: + if leg_type is None: + return await db.fetchall( + """ + SELECT * FROM satoshimachine.dca_payments + WHERE operator_user_id = :uid + ORDER BY created_at DESC + LIMIT :lim + """, + {"uid": operator_user_id, "lim": limit}, + DcaPayment, + ) + return await db.fetchall( + """ + SELECT * FROM satoshimachine.dca_payments + WHERE operator_user_id = :uid AND leg_type = :leg + ORDER BY created_at DESC + LIMIT :lim + """, + {"uid": operator_user_id, "leg": leg_type, "lim": limit}, + DcaPayment, + ) + + +async def update_payment_status( + payment_id: str, + status: str, + external_payment_hash: Optional[str] = None, + error_message: Optional[str] = None, +) -> Optional[DcaPayment]: await db.execute( - "UPDATE satoshimachine.dca_payments SET status = :status WHERE id = :id", - {"status": status, "id": payment_id} - ) - - -async def get_payments_by_lamassu_transaction(lamassu_transaction_id: str) -> List[DcaPayment]: - return await db.fetchall( - "SELECT * FROM satoshimachine.dca_payments WHERE lamassu_transaction_id = :transaction_id", - {"transaction_id": lamassu_transaction_id}, - DcaPayment, - ) - - -# Balance and Summary Operations -async def get_client_balance_summary(client_id: str, as_of_time: Optional[datetime] = None) -> ClientBalanceSummary: - """Get client balance summary, optionally as of a specific point in time""" - - # Build time filter for temporal accuracy - time_filter = "" - params = {"client_id": client_id} - - if as_of_time is not None: - time_filter = "AND confirmed_at <= :as_of_time" - params["as_of_time"] = as_of_time - - # Get total confirmed deposits (only those confirmed before the cutoff time) - total_deposits_result = await db.fetchone( - f""" - SELECT COALESCE(SUM(amount), 0) as total, currency - FROM satoshimachine.dca_deposits - WHERE client_id = :client_id AND status = 'confirmed' {time_filter} - GROUP BY currency + """ + UPDATE satoshimachine.dca_payments + SET status = :status, + external_payment_hash = COALESCE(:hash, external_payment_hash), + error_message = :err + WHERE id = :id """, - params + { + "id": payment_id, + "status": status, + "hash": external_payment_hash, + "err": error_message, + }, ) - - # Get total payments made (only those with ATM transaction time before the cutoff) - # Use transaction_time instead of created_at for temporal accuracy - payment_time_filter = "" - if as_of_time is not None: - payment_time_filter = "AND transaction_time <= :as_of_time" - - total_payments_result = await db.fetchone( - f""" - SELECT COALESCE(SUM(amount_fiat), 0) as total - FROM satoshimachine.dca_payments - WHERE client_id = :client_id AND status = 'confirmed' {payment_time_filter} + return await get_dca_payment(payment_id) + + +# ============================================================================= +# Balance summaries +# ============================================================================= + + +async def get_client_balance_summary( + client_id: str, +) -> Optional[ClientBalanceSummary]: + """Per-client (and per-machine, since clients are per-machine in v2) summary. + + DCA legs only — settlement/autoforward/super_fee/operator_split legs are + not credited against an LP's balance. + """ + client = await get_dca_client(client_id) + if client is None: + return None + deposits_row = await db.fetchone( + """ + SELECT COALESCE(SUM(amount), 0) AS total + FROM satoshimachine.dca_deposits + WHERE client_id = :cid AND status = 'confirmed' """, - params + {"cid": client_id}, ) - - total_deposits = total_deposits_result["total"] if total_deposits_result else 0 - total_payments = total_payments_result["total"] if total_payments_result else 0 - currency = total_deposits_result["currency"] if total_deposits_result else "GTQ" - - # Log temporal filtering if as_of_time was used - if as_of_time is not None: - from loguru import logger - # Verify timezone consistency for temporal filtering - tz_info = "UTC" if as_of_time.tzinfo == timezone.utc else f"TZ: {as_of_time.tzinfo}" - logger.info(f"Client {client_id[:8]}... balance as of {as_of_time} ({tz_info}): deposits.confirmed_at <= cutoff, payments.transaction_time <= cutoff → {total_deposits - total_payments:.2f} GTQ remaining") - + payments_row = await db.fetchone( + """ + SELECT COALESCE(SUM(amount_fiat), 0) AS total + FROM satoshimachine.dca_payments + WHERE client_id = :cid AND leg_type = 'dca' AND status = 'completed' + """, + {"cid": client_id}, + ) + total_deposits = float(deposits_row["total"]) if deposits_row else 0.0 + total_payments = float(payments_row["total"]) if payments_row else 0.0 + # fiat code: take it from the machine (clients inherit their machine's fiat) + machine = await get_machine(client.machine_id) + currency = machine.fiat_code if machine else "GTQ" return ClientBalanceSummary( client_id=client_id, - total_deposits=total_deposits, - total_payments=total_payments, - remaining_balance=total_deposits - total_payments, - currency=currency + machine_id=client.machine_id, + total_deposits=round(total_deposits, 2), + total_payments=round(total_payments, 2), + remaining_balance=round(total_deposits - total_payments, 2), + currency=currency, ) -async def get_flow_mode_clients() -> List[DcaClient]: - return await db.fetchall( - "SELECT * FROM satoshimachine.dca_clients WHERE dca_mode = 'flow' AND status = 'active'", - model=DcaClient, - ) +# ============================================================================= +# Telemetry — sparse beacon (kind-30078) and fleet snapshot (kind-30079) state. +# ============================================================================= -async def get_fixed_mode_clients() -> List[DcaClient]: - return await db.fetchall( - "SELECT * FROM satoshimachine.dca_clients WHERE dca_mode = 'fixed' AND status = 'active'", - model=DcaClient, - ) - - -# Lamassu Configuration CRUD Operations -async def create_lamassu_config(data: CreateLamassuConfigData) -> LamassuConfig: - config_id = urlsafe_short_hash() - - # Deactivate any existing configs first (only one active config allowed) - await db.execute( - "UPDATE satoshimachine.lamassu_config SET is_active = false, updated_at = :updated_at", - {"updated_at": datetime.now()} - ) - - await db.execute( - """ - INSERT INTO satoshimachine.lamassu_config - (id, host, port, database_name, username, password, source_wallet_id, commission_wallet_id, is_active, created_at, updated_at, - use_ssh_tunnel, ssh_host, ssh_port, ssh_username, ssh_password, ssh_private_key, max_daily_limit_gtq) - VALUES (:id, :host, :port, :database_name, :username, :password, :source_wallet_id, :commission_wallet_id, :is_active, :created_at, :updated_at, - :use_ssh_tunnel, :ssh_host, :ssh_port, :ssh_username, :ssh_password, :ssh_private_key, :max_daily_limit_gtq) - """, - { - "id": config_id, - "host": data.host, - "port": data.port, - "database_name": data.database_name, - "username": data.username, - "password": data.password, - "source_wallet_id": data.source_wallet_id, - "commission_wallet_id": data.commission_wallet_id, - "is_active": True, - "created_at": datetime.now(), - "updated_at": datetime.now(), - "use_ssh_tunnel": data.use_ssh_tunnel, - "ssh_host": data.ssh_host, - "ssh_port": data.ssh_port, - "ssh_username": data.ssh_username, - "ssh_password": data.ssh_password, - "ssh_private_key": data.ssh_private_key, - "max_daily_limit_gtq": data.max_daily_limit_gtq - } - ) - return await get_lamassu_config(config_id) - - -async def get_lamassu_config(config_id: str) -> Optional[LamassuConfig]: +async def get_telemetry(machine_id: str) -> Optional[TelemetrySnapshot]: return await db.fetchone( - "SELECT * FROM satoshimachine.lamassu_config WHERE id = :id", - {"id": config_id}, - LamassuConfig, + "SELECT * FROM satoshimachine.dca_telemetry WHERE machine_id = :mid", + {"mid": machine_id}, + TelemetrySnapshot, ) -async def get_active_lamassu_config() -> Optional[LamassuConfig]: - return await db.fetchone( - "SELECT * FROM satoshimachine.lamassu_config WHERE is_active = true ORDER BY created_at DESC LIMIT 1", - model=LamassuConfig, - ) +async def upsert_beacon_snapshot( + machine_id: str, + *, + cash_in: Optional[bool] = None, + cash_out: Optional[bool] = None, + cash_level: Optional[str] = None, + fiat: Optional[str] = None, + model: Optional[str] = None, + name: Optional[str] = None, + location: Optional[str] = None, + geo: Optional[str] = None, + fees_json: Optional[str] = None, + limits_json: Optional[str] = None, + denominations_json: Optional[str] = None, + version: Optional[str] = None, +) -> Optional[TelemetrySnapshot]: + """Upsert kind-30078 beacon fields. All fields are nullable because today's + upstream payload only carries cash_in/cash_out/cash_level/fiat/model (see + lamassu-next#43 — the enrichment is not yet shipped).""" + existing = await get_telemetry(machine_id) + now = datetime.now() + if existing is None: + await db.execute( + """ + INSERT INTO satoshimachine.dca_telemetry + (machine_id, beacon_cash_in, beacon_cash_out, beacon_cash_level, + beacon_fiat, beacon_model, beacon_name, beacon_location, + beacon_geo, beacon_fees_json, beacon_limits_json, + beacon_denominations_json, beacon_version, beacon_received_at) + VALUES (:mid, :cash_in, :cash_out, :cash_level, :fiat, :model, + :name, :location, :geo, :fees, :limits, :denoms, + :version, :now) + """, + { + "mid": machine_id, + "cash_in": cash_in, + "cash_out": cash_out, + "cash_level": cash_level, + "fiat": fiat, + "model": model, + "name": name, + "location": location, + "geo": geo, + "fees": fees_json, + "limits": limits_json, + "denoms": denominations_json, + "version": version, + "now": now, + }, + ) + else: + await db.execute( + """ + UPDATE satoshimachine.dca_telemetry SET + beacon_cash_in = COALESCE(:cash_in, beacon_cash_in), + beacon_cash_out = COALESCE(:cash_out, beacon_cash_out), + beacon_cash_level = COALESCE(:cash_level, beacon_cash_level), + beacon_fiat = COALESCE(:fiat, beacon_fiat), + beacon_model = COALESCE(:model, beacon_model), + beacon_name = COALESCE(:name, beacon_name), + beacon_location = COALESCE(:location, beacon_location), + beacon_geo = COALESCE(:geo, beacon_geo), + beacon_fees_json = COALESCE(:fees, beacon_fees_json), + beacon_limits_json = COALESCE(:limits, beacon_limits_json), + beacon_denominations_json = + COALESCE(:denoms, beacon_denominations_json), + beacon_version = COALESCE(:version, beacon_version), + beacon_received_at = :now + WHERE machine_id = :mid + """, + { + "mid": machine_id, + "cash_in": cash_in, + "cash_out": cash_out, + "cash_level": cash_level, + "fiat": fiat, + "model": model, + "name": name, + "location": location, + "geo": geo, + "fees": fees_json, + "limits": limits_json, + "denoms": denominations_json, + "version": version, + "now": now, + }, + ) + return await get_telemetry(machine_id) -async def get_all_lamassu_configs() -> List[LamassuConfig]: - return await db.fetchall( - "SELECT * FROM satoshimachine.lamassu_config ORDER BY created_at DESC", - model=LamassuConfig, - ) - - -async def update_lamassu_config(config_id: str, data: UpdateLamassuConfigData) -> Optional[LamassuConfig]: - update_data = {k: v for k, v in data.dict().items() if v is not None} - if not update_data: - return await get_lamassu_config(config_id) - - update_data["updated_at"] = datetime.now() - set_clause = ", ".join([f"{k} = :{k}" for k in update_data.keys()]) - update_data["id"] = config_id - - await db.execute( - f"UPDATE satoshimachine.lamassu_config SET {set_clause} WHERE id = :id", - update_data - ) - return await get_lamassu_config(config_id) - - -async def update_config_test_result(config_id: str, success: bool) -> None: - utc_now = datetime.now(timezone.utc) - await db.execute( - """ - UPDATE satoshimachine.lamassu_config - SET test_connection_last = :test_time, test_connection_success = :success, updated_at = :updated_at - WHERE id = :id - """, - { - "id": config_id, - "test_time": utc_now, - "success": success, - "updated_at": utc_now - } - ) - - -async def delete_lamassu_config(config_id: str) -> None: - await db.execute( - "DELETE FROM satoshimachine.lamassu_config WHERE id = :id", - {"id": config_id} - ) - - -async def update_poll_start_time(config_id: str) -> None: - """Update the last poll start time""" - utc_now = datetime.now(timezone.utc) - await db.execute( - """ - UPDATE satoshimachine.lamassu_config - SET last_poll_time = :poll_time, updated_at = :updated_at - WHERE id = :id - """, - { - "id": config_id, - "poll_time": utc_now, - "updated_at": utc_now - } - ) - - -async def update_poll_success_time(config_id: str) -> None: - """Update the last successful poll time""" - utc_now = datetime.now(timezone.utc) - await db.execute( - """ - UPDATE satoshimachine.lamassu_config - SET last_successful_poll = :poll_time, updated_at = :updated_at - WHERE id = :id - """, - { - "id": config_id, - "poll_time": utc_now, - "updated_at": utc_now - } - ) - - -# Lamassu Transaction Storage CRUD Operations -async def create_lamassu_transaction(data: CreateLamassuTransactionData) -> StoredLamassuTransaction: - """Store a processed Lamassu transaction""" - transaction_id = urlsafe_short_hash() - await db.execute( - """ - INSERT INTO satoshimachine.lamassu_transactions - (id, lamassu_transaction_id, fiat_amount, crypto_amount, commission_percentage, - discount, effective_commission, commission_amount_sats, base_amount_sats, - exchange_rate, crypto_code, fiat_code, device_id, transaction_time, processed_at, - clients_count, distributions_total_sats) - VALUES (:id, :lamassu_transaction_id, :fiat_amount, :crypto_amount, :commission_percentage, - :discount, :effective_commission, :commission_amount_sats, :base_amount_sats, - :exchange_rate, :crypto_code, :fiat_code, :device_id, :transaction_time, :processed_at, - :clients_count, :distributions_total_sats) - """, - { - "id": transaction_id, - "lamassu_transaction_id": data.lamassu_transaction_id, - "fiat_amount": data.fiat_amount, - "crypto_amount": data.crypto_amount, - "commission_percentage": data.commission_percentage, - "discount": data.discount, - "effective_commission": data.effective_commission, - "commission_amount_sats": data.commission_amount_sats, - "base_amount_sats": data.base_amount_sats, - "exchange_rate": data.exchange_rate, - "crypto_code": data.crypto_code, - "fiat_code": data.fiat_code, - "device_id": data.device_id, - "transaction_time": data.transaction_time, - "processed_at": datetime.now(), - "clients_count": 0, # Will be updated after distributions - "distributions_total_sats": 0 # Will be updated after distributions - } - ) - return await get_lamassu_transaction(transaction_id) - - -async def get_lamassu_transaction(transaction_id: str) -> Optional[StoredLamassuTransaction]: - """Get a stored Lamassu transaction by ID""" - return await db.fetchone( - "SELECT * FROM satoshimachine.lamassu_transactions WHERE id = :id", - {"id": transaction_id}, - StoredLamassuTransaction, - ) - - -async def get_lamassu_transaction_by_lamassu_id(lamassu_transaction_id: str) -> Optional[StoredLamassuTransaction]: - """Get a stored Lamassu transaction by Lamassu transaction ID""" - return await db.fetchone( - "SELECT * FROM satoshimachine.lamassu_transactions WHERE lamassu_transaction_id = :lamassu_id", - {"lamassu_id": lamassu_transaction_id}, - StoredLamassuTransaction, - ) - - -async def get_all_lamassu_transactions() -> List[StoredLamassuTransaction]: - """Get all stored Lamassu transactions""" - return await db.fetchall( - "SELECT * FROM satoshimachine.lamassu_transactions ORDER BY transaction_time DESC", - model=StoredLamassuTransaction, - ) - - -async def update_lamassu_transaction_distribution_stats( - transaction_id: str, - clients_count: int, - distributions_total_sats: int -) -> None: - """Update distribution statistics for a Lamassu transaction""" - await db.execute( - """ - UPDATE satoshimachine.lamassu_transactions - SET clients_count = :clients_count, distributions_total_sats = :distributions_total_sats - WHERE id = :id - """, - { - "clients_count": clients_count, - "distributions_total_sats": distributions_total_sats, - "id": transaction_id - } - ) +async def upsert_fleet_snapshot( + machine_id: str, telemetry_json: str +) -> Optional[TelemetrySnapshot]: + """Upsert kind-30079 operator-only telemetry. Awaits lamassu-next#42 to + produce a real schema; we store the raw JSON blob until then.""" + existing = await get_telemetry(machine_id) + now = datetime.now() + if existing is None: + await db.execute( + """ + INSERT INTO satoshimachine.dca_telemetry + (machine_id, telemetry_json, telemetry_received_at) + VALUES (:mid, :json, :now) + """, + {"mid": machine_id, "json": telemetry_json, "now": now}, + ) + else: + await db.execute( + """ + UPDATE satoshimachine.dca_telemetry + SET telemetry_json = :json, telemetry_received_at = :now + WHERE machine_id = :mid + """, + {"mid": machine_id, "json": telemetry_json, "now": now}, + ) + return await get_telemetry(machine_id) diff --git a/tasks.py b/tasks.py index 0ed9efe..2efa8b0 100644 --- a/tasks.py +++ b/tasks.py @@ -1,53 +1,32 @@ -import asyncio -from datetime import datetime +# Satoshi Machine v2 — task placeholders. +# +# The v1 SSH/PostgreSQL polling + invoice listener are intentionally absent. +# They will be replaced in P1 (Nostr subscription manager: subscribes via +# lnbits.core.services.nostr_transport to kind-21000 settlements + kind-30078 +# beacons + kind-30079 telemetry per registered machine, with auto-reconnect). +# +# These no-op stubs keep __init__.py importable in the interim so the +# extension can be activated even before P1 lands. + +import asyncio -from lnbits.core.models import Payment -from lnbits.core.services import websocket_updater -from lnbits.tasks import register_invoice_listener from loguru import logger -from .transaction_processor import poll_lamassu_transactions -####################################### -########## RUN YOUR TASKS HERE ######## -####################################### - -# The usual task is to listen to invoices related to this extension - - -async def wait_for_paid_invoices(): - """Invoice listener for DCA-related payments""" - invoice_queue = asyncio.Queue() - register_invoice_listener(invoice_queue, "ext_satmachineadmin") +async def wait_for_paid_invoices() -> None: + """No-op placeholder pending P1 Nostr subscription manager.""" + logger.debug( + "satmachineadmin v2: invoice listener stub running. " + "Real Nostr-transport subscription pending P1." + ) + # Sleep forever; the task system expects a long-lived coroutine. while True: - payment = await invoice_queue.get() - await on_invoice_paid(payment) + await asyncio.sleep(3600) -async def hourly_transaction_polling(): - """Background task that polls Lamassu database every hour for new transactions""" - logger.info("Starting hourly Lamassu transaction polling task") - +async def hourly_transaction_polling() -> None: + """No-op placeholder. The v1 Lamassu PostgreSQL poller is gone — bitSpire + settlements arrive push-based via Nostr kind-21000 in v2.""" + logger.debug("satmachineadmin v2: legacy polling stub (no-op).") while True: - try: - logger.info(f"Running Lamassu transaction poll at {datetime.now()}") - await poll_lamassu_transactions() - logger.info("Completed Lamassu transaction poll, sleeping for 1 hour") - - # Sleep for 1 hour (3600 seconds) - await asyncio.sleep(3600) - - except Exception as e: - logger.error(f"Error in hourly polling task: {e}") - # Sleep for 5 minutes before retrying on error - await asyncio.sleep(300) - - -async def on_invoice_paid(payment: Payment) -> None: - """Handle DCA-related invoice payments""" - # DCA payments are handled internally by the transaction processor - # This function can be extended if needed for additional payment processing - if payment.extra.get("tag") in ["dca_distribution", "dca_commission"]: - logger.info(f"DCA payment processed: {payment.checking_id} - {payment.amount} sats") - # Could add websocket notifications here if needed - pass + await asyncio.sleep(3600) diff --git a/views.py b/views.py index 6532836..1061e9f 100644 --- a/views.py +++ b/views.py @@ -1,8 +1,10 @@ -# Description: DCA Admin page endpoints. +# Satoshi Machine v2 — page route. +# +# v2 is operator-installable (any LNbits user, not super-only). The super-only +# check in v1's index() is gone. Super-only controls (platform fee config) +# move to a dedicated API endpoint protected by check_super_user in P1. -from http import HTTPStatus - -from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi import APIRouter, Depends, Request from fastapi.responses import HTMLResponse from lnbits.core.models import User from lnbits.decorators import check_user_exists @@ -15,13 +17,9 @@ def satmachineadmin_renderer(): return template_renderer(["satmachineadmin/templates"]) -# DCA Admin page - Requires superuser access @satmachineadmin_generic_router.get("/", response_class=HTMLResponse) async def index(req: Request, user: User = Depends(check_user_exists)): - if not user.super_user: - raise HTTPException( - HTTPStatus.FORBIDDEN, "User not authorized. No super user privileges." - ) return satmachineadmin_renderer().TemplateResponse( - "satmachineadmin/index.html", {"request": req, "user": user.json()} + "satmachineadmin/index.html", + {"request": req, "user": user.json()}, ) diff --git a/views_api.py b/views_api.py index eb38da8..fd2d430 100644 --- a/views_api.py +++ b/views_api.py @@ -1,570 +1,28 @@ -# Description: This file contains the extensions API endpoints. +# Satoshi Machine v2 — API placeholder. +# +# The v1 super-only Lamassu endpoints have been removed. The v2 operator- +# scoped surface (machines / clients / deposits / settlements / commission +# splits / partial-tx / balance-settle / super platform-fee) lands in P1+. +# See plan section "Critical files to modify". +# +# This stub keeps __init__.py importable and surfaces a clear 503 on every +# v1 route so existing clients get a precise error instead of a silent 404. from http import HTTPStatus -from typing import Optional -from fastapi import APIRouter, Depends, Request -from lnbits.core.crud import get_user -from lnbits.core.models import User, WalletTypeInfo -from lnbits.core.services import create_invoice -from lnbits.decorators import check_super_user -from starlette.exceptions import HTTPException - -from .crud import ( - # DCA CRUD operations - get_dca_clients, - get_dca_client, - update_dca_client, - delete_dca_client, - create_deposit, - get_all_deposits, - get_deposit, - update_deposit, - update_deposit_status, - delete_deposit, - get_client_balance_summary, - # Lamassu config CRUD operations - create_lamassu_config, - get_lamassu_config, - get_active_lamassu_config, - get_all_lamassu_configs, - update_lamassu_config, - update_config_test_result, - delete_lamassu_config, - # Lamassu transaction CRUD operations - get_all_lamassu_transactions, - get_lamassu_transaction, -) -from .models import ( - # DCA models - DcaClient, - UpdateDcaClientData, - CreateDepositData, - DcaDeposit, - UpdateDepositData, - UpdateDepositStatusData, - ClientBalanceSummary, - CreateLamassuConfigData, - LamassuConfig, - UpdateLamassuConfigData, - StoredLamassuTransaction, -) +from fastapi import APIRouter, HTTPException satmachineadmin_api_router = APIRouter() -################################################### -################ DCA API ENDPOINTS ################ -################################################### - -# DCA Client Endpoints - - -@satmachineadmin_api_router.get("/api/v1/dca/clients") -async def api_get_dca_clients( - wallet: WalletTypeInfo = Depends(check_super_user), -) -> list[DcaClient]: - """Get all DCA clients""" - return await get_dca_clients() - - -@satmachineadmin_api_router.get("/api/v1/dca/clients/{client_id}") -async def api_get_dca_client( - client_id: str, - wallet: WalletTypeInfo = Depends(check_super_user), -) -> DcaClient: - """Get a specific DCA client""" - client = await get_dca_client(client_id) - if not client: - raise HTTPException( - status_code=HTTPStatus.NOT_FOUND, detail="DCA client not found." - ) - return client - - -# Note: Client creation/update/delete will be handled by the DCA client extension -# Admin extension only reads existing clients and manages their deposits - - -@satmachineadmin_api_router.get("/api/v1/dca/clients/{client_id}/balance") -async def api_get_client_balance( - client_id: str, - wallet: WalletTypeInfo = Depends(check_super_user), -) -> ClientBalanceSummary: - """Get client balance summary""" - client = await get_dca_client(client_id) - if not client: - raise HTTPException( - status_code=HTTPStatus.NOT_FOUND, detail="DCA client not found." - ) - - return await get_client_balance_summary(client_id) - - -# DCA Deposit Endpoints - - -@satmachineadmin_api_router.get("/api/v1/dca/deposits") -async def api_get_deposits( - wallet: WalletTypeInfo = Depends(check_super_user), -) -> list[DcaDeposit]: - """Get all deposits""" - return await get_all_deposits() - - -@satmachineadmin_api_router.get("/api/v1/dca/deposits/{deposit_id}") -async def api_get_deposit( - deposit_id: str, - wallet: WalletTypeInfo = Depends(check_super_user), -) -> DcaDeposit: - """Get a specific deposit""" - deposit = await get_deposit(deposit_id) - if not deposit: - raise HTTPException( - status_code=HTTPStatus.NOT_FOUND, detail="Deposit not found." - ) - return deposit - - -@satmachineadmin_api_router.post("/api/v1/dca/deposits", status_code=HTTPStatus.CREATED) -async def api_create_deposit( - data: CreateDepositData, - user: User = Depends(check_super_user), -) -> DcaDeposit: - """Create a new deposit""" - # Verify client exists - client = await get_dca_client(data.client_id) - if not client: - raise HTTPException( - status_code=HTTPStatus.NOT_FOUND, detail="DCA client not found." - ) - - return await create_deposit(data) - - -@satmachineadmin_api_router.put("/api/v1/dca/deposits/{deposit_id}/status") -async def api_update_deposit_status( - deposit_id: str, - data: UpdateDepositStatusData, - user: User = Depends(check_super_user), -) -> DcaDeposit: - """Update deposit status (e.g., confirm deposit)""" - deposit = await get_deposit(deposit_id) - if not deposit: - raise HTTPException( - status_code=HTTPStatus.NOT_FOUND, detail="Deposit not found." - ) - - updated_deposit = await update_deposit_status(deposit_id, data) - if not updated_deposit: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR, - detail="Failed to update deposit.", - ) - return updated_deposit - - -@satmachineadmin_api_router.put("/api/v1/dca/deposits/{deposit_id}") -async def api_update_deposit( - deposit_id: str, - data: UpdateDepositData, - user: User = Depends(check_super_user), -) -> DcaDeposit: - """Update deposit fields (amount, currency, notes). Only pending deposits can be edited.""" - deposit = await get_deposit(deposit_id) - if not deposit: - raise HTTPException( - status_code=HTTPStatus.NOT_FOUND, detail="Deposit not found." - ) - - if deposit.status != "pending": - raise HTTPException( - status_code=HTTPStatus.BAD_REQUEST, - detail="Only pending deposits can be edited.", - ) - - updated_deposit = await update_deposit(deposit_id, data) - if not updated_deposit: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR, - detail="Failed to update deposit.", - ) - return updated_deposit - - -@satmachineadmin_api_router.delete("/api/v1/dca/deposits/{deposit_id}") -async def api_delete_deposit( - deposit_id: str, - user: User = Depends(check_super_user), -): - """Delete a deposit. Only pending deposits (not yet inserted into the machine) can be deleted.""" - deposit = await get_deposit(deposit_id) - if not deposit: - raise HTTPException( - status_code=HTTPStatus.NOT_FOUND, detail="Deposit not found." - ) - - if deposit.status != "pending": - raise HTTPException( - status_code=HTTPStatus.BAD_REQUEST, - detail="Only pending deposits can be deleted. Confirmed deposits have already been inserted into the machine.", - ) - - await delete_deposit(deposit_id) - return {"message": "Deposit deleted successfully"} - - -# Transaction Polling Endpoints - - -@satmachineadmin_api_router.post("/api/v1/dca/test-connection") -async def api_test_database_connection( - user: User = Depends(check_super_user), -): - """Test connection to Lamassu database with detailed reporting""" - try: - from .transaction_processor import transaction_processor - - # Use the detailed test method - result = await transaction_processor.test_connection_detailed() - return result - - except Exception as e: - return { - "success": False, - "message": f"Test connection error: {str(e)}", - "steps": [f"❌ Unexpected error: {str(e)}"], - "ssh_tunnel_used": False, - "ssh_tunnel_success": False, - "database_connection_success": False, - } - - -@satmachineadmin_api_router.post("/api/v1/dca/manual-poll") -async def api_manual_poll( - user: User = Depends(check_super_user), -): - """Manually trigger a poll of the Lamassu database""" - try: - from .transaction_processor import transaction_processor - from .crud import update_poll_start_time, update_poll_success_time - - # Get database configuration - db_config = await transaction_processor.connect_to_lamassu_db() - if not db_config: - raise HTTPException( - status_code=HTTPStatus.SERVICE_UNAVAILABLE, - detail="Could not get Lamassu database configuration", - ) - - config_id = db_config["config_id"] - - # Record manual poll start time - await update_poll_start_time(config_id) - - # Fetch and process transactions via SSH - new_transactions = await transaction_processor.fetch_new_transactions(db_config) - - transactions_processed = 0 - for transaction in new_transactions: - await transaction_processor.process_transaction(transaction) - transactions_processed += 1 - - # Record successful manual poll completion - await update_poll_success_time(config_id) - - return { - "success": True, - "transactions_processed": transactions_processed, - "message": f"Processed {transactions_processed} new transactions since last poll", - } - - except Exception as e: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR, - detail=f"Error during manual poll: {str(e)}", - ) - - -@satmachineadmin_api_router.post("/api/v1/dca/process-transaction/{transaction_id}") -async def api_process_specific_transaction( - transaction_id: str, - user: User = Depends(check_super_user), -): - """ - Manually process a specific Lamassu transaction by ID, bypassing all status filters. - - This endpoint is useful for processing transactions that were manually settled - or had dispense issues but need to be included in DCA distribution. - """ - try: - from .transaction_processor import transaction_processor - from .crud import get_payments_by_lamassu_transaction - - # Get database configuration - db_config = await transaction_processor.connect_to_lamassu_db() - if not db_config: - raise HTTPException( - status_code=HTTPStatus.SERVICE_UNAVAILABLE, - detail="Could not get Lamassu database configuration", - ) - - # Check if transaction was already processed - existing_payments = await get_payments_by_lamassu_transaction(transaction_id) - if existing_payments: - return { - "success": False, - "already_processed": True, - "message": f"Transaction {transaction_id} was already processed with {len(existing_payments)} distributions", - "payment_count": len(existing_payments), - } - - # Fetch the specific transaction from Lamassu (bypassing all filters) - transaction = await transaction_processor.fetch_transaction_by_id(db_config, transaction_id) - - if not transaction: - raise HTTPException( - status_code=HTTPStatus.NOT_FOUND, - detail=f"Transaction {transaction_id} not found in Lamassu database", - ) - - # Process the transaction through normal DCA flow - await transaction_processor.process_transaction(transaction) - - return { - "success": True, - "message": f"Transaction {transaction_id} processed successfully", - "transaction_details": { - "transaction_id": transaction_id, - "status": transaction.get("status"), - "dispense": transaction.get("dispense"), - "dispense_confirmed": transaction.get("dispense_confirmed"), - "crypto_amount": transaction.get("crypto_amount"), - "fiat_amount": transaction.get("fiat_amount"), - }, - } - - except HTTPException: - raise - except Exception as e: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR, - detail=f"Error processing transaction {transaction_id}: {str(e)}", - ) - - -# COMMENTED OUT FOR PRODUCTION - Test transaction endpoint disabled -# Uncomment only for development/debugging purposes -# -# @satmachineadmin_api_router.post("/api/v1/dca/test-transaction") -# async def api_test_transaction( -# user: User = Depends(check_super_user), -# crypto_atoms: int = 103, -# commission_percentage: float = 0.03, -# discount: float = 0.0, -# ) -> dict: -# """Test transaction processing with simulated Lamassu transaction data""" -# try: -# from .transaction_processor import transaction_processor -# import uuid -# from datetime import datetime, timezone -# -# # Create a mock transaction that mimics Lamassu database structure -# mock_transaction = { -# "transaction_id": str(uuid.uuid4())[:8], # Short ID for testing -# "crypto_amount": crypto_atoms, # Total sats including commission -# "fiat_amount": 100, # Mock fiat amount (100 centavos = 1 GTQ) -# "commission_percentage": commission_percentage, # Already as decimal -# "discount": discount, -# "transaction_time": datetime.now(timezone.utc), -# "crypto_code": "BTC", -# "fiat_code": "GTQ", -# "device_id": "test_device", -# "status": "confirmed", -# } -# -# # Process the mock transaction through the complete DCA flow -# await transaction_processor.process_transaction(mock_transaction) -# -# # Calculate commission for response -# if commission_percentage > 0: -# effective_commission = commission_percentage * (100 - discount) / 100 -# base_crypto_atoms = int(crypto_atoms / (1 + effective_commission)) -# commission_amount_sats = crypto_atoms - base_crypto_atoms -# else: -# base_crypto_atoms = crypto_atoms -# commission_amount_sats = 0 -# -# return { -# "success": True, -# "message": "Test transaction processed successfully", -# "transaction_details": { -# "transaction_id": mock_transaction["transaction_id"], -# "total_amount_sats": crypto_atoms, -# "base_amount_sats": base_crypto_atoms, -# "commission_amount_sats": commission_amount_sats, -# "commission_percentage": commission_percentage -# * 100, # Show as percentage -# "effective_commission": effective_commission * 100 -# if commission_percentage > 0 -# else 0, -# "discount": discount, -# }, -# } -# -# except Exception as e: -# raise HTTPException( -# status_code=HTTPStatus.INTERNAL_SERVER_ERROR, -# detail=f"Error processing test transaction: {str(e)}", -# ) - - -# Lamassu Transaction Endpoints - - -@satmachineadmin_api_router.get("/api/v1/dca/transactions") -async def api_get_lamassu_transactions( - wallet: WalletTypeInfo = Depends(check_super_user), -) -> list[StoredLamassuTransaction]: - """Get all processed Lamassu transactions""" - return await get_all_lamassu_transactions() - - -@satmachineadmin_api_router.get("/api/v1/dca/transactions/{transaction_id}") -async def api_get_lamassu_transaction( - transaction_id: str, - wallet: WalletTypeInfo = Depends(check_super_user), -) -> StoredLamassuTransaction: - """Get a specific Lamassu transaction with details""" - transaction = await get_lamassu_transaction(transaction_id) - if not transaction: - raise HTTPException( - status_code=HTTPStatus.NOT_FOUND, detail="Lamassu transaction not found." - ) - return transaction - - -@satmachineadmin_api_router.get( - "/api/v1/dca/transactions/{transaction_id}/distributions" +@satmachineadmin_api_router.api_route( + "/api/v1/dca/{full_path:path}", + methods=["GET", "POST", "PUT", "DELETE", "PATCH"], ) -async def api_get_transaction_distributions( - transaction_id: str, - wallet: WalletTypeInfo = Depends(check_super_user), -) -> list[dict]: - """Get distribution details for a specific Lamassu transaction""" - # Get the stored transaction - transaction = await get_lamassu_transaction(transaction_id) - if not transaction: - raise HTTPException( - status_code=HTTPStatus.NOT_FOUND, detail="Lamassu transaction not found." - ) - - # Get all DCA payments for this Lamassu transaction - from .crud import get_payments_by_lamassu_transaction, get_dca_client - - payments = await get_payments_by_lamassu_transaction( - transaction.lamassu_transaction_id +async def v2_in_progress_stub(full_path: str) -> None: + raise HTTPException( + HTTPStatus.SERVICE_UNAVAILABLE, + f"satmachineadmin v2 API not yet implemented (path: /{full_path}). " + "The v1 Lamassu surface has been removed; per-operator endpoints " + "land in P1. See plan.", ) - - # Enhance payments with client information - distributions = [] - for payment in payments: - client = await get_dca_client(payment.client_id) - distributions.append( - { - "payment_id": payment.id, - "client_id": payment.client_id, - "client_username": client.username if client else None, - "client_user_id": client.user_id if client else None, - "amount_sats": payment.amount_sats, - "amount_fiat": payment.amount_fiat, - "exchange_rate": payment.exchange_rate, - "status": payment.status, - "created_at": payment.created_at, - } - ) - - return distributions - - -# Lamassu Configuration Endpoints - - -@satmachineadmin_api_router.get("/api/v1/dca/config") -async def api_get_lamassu_config( - wallet: WalletTypeInfo = Depends(check_super_user), -) -> Optional[LamassuConfig]: - """Get active Lamassu database configuration""" - return await get_active_lamassu_config() - - -@satmachineadmin_api_router.post("/api/v1/dca/config", status_code=HTTPStatus.CREATED) -async def api_create_lamassu_config( - data: CreateLamassuConfigData, - user: User = Depends(check_super_user), -) -> LamassuConfig: - """Create/update Lamassu database configuration""" - return await create_lamassu_config(data) - - -@satmachineadmin_api_router.put("/api/v1/dca/config/{config_id}") -async def api_update_lamassu_config( - config_id: str, - data: UpdateLamassuConfigData, - user: User = Depends(check_super_user), -) -> LamassuConfig: - """Update Lamassu database configuration""" - config = await get_lamassu_config(config_id) - if not config: - raise HTTPException( - status_code=HTTPStatus.NOT_FOUND, detail="Configuration not found." - ) - - updated_config = await update_lamassu_config(config_id, data) - if not updated_config: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR, - detail="Failed to update configuration.", - ) - return updated_config - - -@satmachineadmin_api_router.delete("/api/v1/dca/config/{config_id}") -async def api_delete_lamassu_config( - config_id: str, - user: User = Depends(check_super_user), -): - """Delete Lamassu database configuration""" - config = await get_lamassu_config(config_id) - if not config: - raise HTTPException( - status_code=HTTPStatus.NOT_FOUND, detail="Configuration not found." - ) - - await delete_lamassu_config(config_id) - return {"message": "Configuration deleted successfully"} - - -@satmachineadmin_api_router.get("/api/v1/dca/client-limits") -async def api_get_client_limits(): - """Get client-safe configuration limits (public endpoint - no authentication)""" - try: - config = await get_active_lamassu_config() - if not config: - # Return sensible defaults if no config exists - return { - "max_daily_limit_gtq": 2000, - "currency": "GTQ" - } - - # Return only client-safe configuration fields - return { - "max_daily_limit_gtq": config.max_daily_limit_gtq, - "currency": "GTQ" # Could be made configurable later - } - except Exception: - # Return defaults on any error - return { - "max_daily_limit_gtq": 2000, - "currency": "GTQ" - }