feat(v2): operator-scoped CRUD + stub legacy entry points

Replaces v1's super-only single-config CRUD with the v2 operator-scoped data
layer that matches the m005 schema:

- Machines: create/get/get_by_npub/list_for_operator/update/delete
- Clients: scoped per (machine, user). Adds list_for_operator (across an
  operator's fleet) and list_for_user (LP cross-operator view), plus
  get_flow_mode_clients_for_machine for the distribution algorithm.
- Deposits: now carry machine_id and creator_user_id; per-operator listing.
- Settlements: create_settlement_idempotent treats bitspire_event_id as the
  uniqueness key, returning the existing row on replay so subscription
  re-delivery is safe by construction. mark_settlement_status drives the
  pending → processed/partial/refunded/errored lifecycle.
- Commission splits: replace_commission_splits is an atomic per-scope
  replace; the SetCommissionSplitsData model already validates legs sum
  to 1.0 at the boundary. get_effective_commission_splits handles the
  per-machine-override-or-operator-default precedence.
- Payments: leg-typed (dca / super_fee / operator_split / settlement /
  autoforward / refund) with helpers for settlement/client/operator scopes.
- Balance summary: sums confirmed deposits minus completed dca legs.
- Telemetry: upsert_beacon_snapshot uses COALESCE so today's sparse
  kind-30078 payload doesn't clobber post-#43 fields when they start
  arriving. upsert_fleet_snapshot stores raw JSON until lamassu-next#42
  fixes the kind-30079 schema.
- Super config: singleton get/update.

Also stubs three legacy entry points so __init__.py imports cleanly while
the rest of P0/P1 is in flight:

- tasks.py: no-op stubs for wait_for_paid_invoices + hourly_transaction_polling.
  Real Nostr subscription manager lands in P1.
- views_api.py: a single /api/v1/dca/{...} catch-all returns 503 with a
  precise message. v2 endpoints land in P1+.
- views.py: drops the super-only check on the index page (v2 is
  operator-installable); platform-fee config moves to a super-only API in P1.

transaction_processor.py is left untouched but is now orphaned (no one
imports it) — gets a full rewrite in P1.

Refs: plan at ~/.claude/plans/snug-gliding-shamir.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Padreug 2026-05-14 14:37:48 +02:00
commit 937749f149
4 changed files with 798 additions and 969 deletions

1152
crud.py

File diff suppressed because it is too large Load diff

View file

@ -1,53 +1,32 @@
import asyncio # Satoshi Machine v2 — task placeholders.
from datetime import datetime #
# The v1 SSH/PostgreSQL polling + invoice listener are intentionally absent.
# They will be replaced in P1 (Nostr subscription manager: subscribes via
# lnbits.core.services.nostr_transport to kind-21000 settlements + kind-30078
# beacons + kind-30079 telemetry per registered machine, with auto-reconnect).
#
# These no-op stubs keep __init__.py importable in the interim so the
# extension can be activated even before P1 lands.
import asyncio
from lnbits.core.models import Payment
from lnbits.core.services import websocket_updater
from lnbits.tasks import register_invoice_listener
from loguru import logger from loguru import logger
from .transaction_processor import poll_lamassu_transactions
####################################### async def wait_for_paid_invoices() -> None:
########## RUN YOUR TASKS HERE ######## """No-op placeholder pending P1 Nostr subscription manager."""
####################################### logger.debug(
"satmachineadmin v2: invoice listener stub running. "
# The usual task is to listen to invoices related to this extension "Real Nostr-transport subscription pending P1."
)
# Sleep forever; the task system expects a long-lived coroutine.
async def wait_for_paid_invoices():
"""Invoice listener for DCA-related payments"""
invoice_queue = asyncio.Queue()
register_invoice_listener(invoice_queue, "ext_satmachineadmin")
while True: while True:
payment = await invoice_queue.get() await asyncio.sleep(3600)
await on_invoice_paid(payment)
async def hourly_transaction_polling(): async def hourly_transaction_polling() -> None:
"""Background task that polls Lamassu database every hour for new transactions""" """No-op placeholder. The v1 Lamassu PostgreSQL poller is gone — bitSpire
logger.info("Starting hourly Lamassu transaction polling task") settlements arrive push-based via Nostr kind-21000 in v2."""
logger.debug("satmachineadmin v2: legacy polling stub (no-op).")
while True: while True:
try: await asyncio.sleep(3600)
logger.info(f"Running Lamassu transaction poll at {datetime.now()}")
await poll_lamassu_transactions()
logger.info("Completed Lamassu transaction poll, sleeping for 1 hour")
# Sleep for 1 hour (3600 seconds)
await asyncio.sleep(3600)
except Exception as e:
logger.error(f"Error in hourly polling task: {e}")
# Sleep for 5 minutes before retrying on error
await asyncio.sleep(300)
async def on_invoice_paid(payment: Payment) -> None:
"""Handle DCA-related invoice payments"""
# DCA payments are handled internally by the transaction processor
# This function can be extended if needed for additional payment processing
if payment.extra.get("tag") in ["dca_distribution", "dca_commission"]:
logger.info(f"DCA payment processed: {payment.checking_id} - {payment.amount} sats")
# Could add websocket notifications here if needed
pass

View file

@ -1,8 +1,10 @@
# Description: DCA Admin page endpoints. # Satoshi Machine v2 — page route.
#
# v2 is operator-installable (any LNbits user, not super-only). The super-only
# check in v1's index() is gone. Super-only controls (platform fee config)
# move to a dedicated API endpoint protected by check_super_user in P1.
from http import HTTPStatus from fastapi import APIRouter, Depends, Request
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse from fastapi.responses import HTMLResponse
from lnbits.core.models import User from lnbits.core.models import User
from lnbits.decorators import check_user_exists from lnbits.decorators import check_user_exists
@ -15,13 +17,9 @@ def satmachineadmin_renderer():
return template_renderer(["satmachineadmin/templates"]) return template_renderer(["satmachineadmin/templates"])
# DCA Admin page - Requires superuser access
@satmachineadmin_generic_router.get("/", response_class=HTMLResponse) @satmachineadmin_generic_router.get("/", response_class=HTMLResponse)
async def index(req: Request, user: User = Depends(check_user_exists)): async def index(req: Request, user: User = Depends(check_user_exists)):
if not user.super_user:
raise HTTPException(
HTTPStatus.FORBIDDEN, "User not authorized. No super user privileges."
)
return satmachineadmin_renderer().TemplateResponse( return satmachineadmin_renderer().TemplateResponse(
"satmachineadmin/index.html", {"request": req, "user": user.json()} "satmachineadmin/index.html",
{"request": req, "user": user.json()},
) )

View file

@ -1,570 +1,28 @@
# Description: This file contains the extensions API endpoints. # Satoshi Machine v2 — API placeholder.
#
# The v1 super-only Lamassu endpoints have been removed. The v2 operator-
# scoped surface (machines / clients / deposits / settlements / commission
# splits / partial-tx / balance-settle / super platform-fee) lands in P1+.
# See plan section "Critical files to modify".
#
# This stub keeps __init__.py importable and surfaces a clear 503 on every
# v1 route so existing clients get a precise error instead of a silent 404.
from http import HTTPStatus from http import HTTPStatus
from typing import Optional
from fastapi import APIRouter, Depends, Request from fastapi import APIRouter, HTTPException
from lnbits.core.crud import get_user
from lnbits.core.models import User, WalletTypeInfo
from lnbits.core.services import create_invoice
from lnbits.decorators import check_super_user
from starlette.exceptions import HTTPException
from .crud import (
# DCA CRUD operations
get_dca_clients,
get_dca_client,
update_dca_client,
delete_dca_client,
create_deposit,
get_all_deposits,
get_deposit,
update_deposit,
update_deposit_status,
delete_deposit,
get_client_balance_summary,
# Lamassu config CRUD operations
create_lamassu_config,
get_lamassu_config,
get_active_lamassu_config,
get_all_lamassu_configs,
update_lamassu_config,
update_config_test_result,
delete_lamassu_config,
# Lamassu transaction CRUD operations
get_all_lamassu_transactions,
get_lamassu_transaction,
)
from .models import (
# DCA models
DcaClient,
UpdateDcaClientData,
CreateDepositData,
DcaDeposit,
UpdateDepositData,
UpdateDepositStatusData,
ClientBalanceSummary,
CreateLamassuConfigData,
LamassuConfig,
UpdateLamassuConfigData,
StoredLamassuTransaction,
)
satmachineadmin_api_router = APIRouter() satmachineadmin_api_router = APIRouter()
################################################### @satmachineadmin_api_router.api_route(
################ DCA API ENDPOINTS ################ "/api/v1/dca/{full_path:path}",
################################################### methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
# DCA Client Endpoints
@satmachineadmin_api_router.get("/api/v1/dca/clients")
async def api_get_dca_clients(
wallet: WalletTypeInfo = Depends(check_super_user),
) -> list[DcaClient]:
"""Get all DCA clients"""
return await get_dca_clients()
@satmachineadmin_api_router.get("/api/v1/dca/clients/{client_id}")
async def api_get_dca_client(
client_id: str,
wallet: WalletTypeInfo = Depends(check_super_user),
) -> DcaClient:
"""Get a specific DCA client"""
client = await get_dca_client(client_id)
if not client:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="DCA client not found."
)
return client
# Note: Client creation/update/delete will be handled by the DCA client extension
# Admin extension only reads existing clients and manages their deposits
@satmachineadmin_api_router.get("/api/v1/dca/clients/{client_id}/balance")
async def api_get_client_balance(
client_id: str,
wallet: WalletTypeInfo = Depends(check_super_user),
) -> ClientBalanceSummary:
"""Get client balance summary"""
client = await get_dca_client(client_id)
if not client:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="DCA client not found."
)
return await get_client_balance_summary(client_id)
# DCA Deposit Endpoints
@satmachineadmin_api_router.get("/api/v1/dca/deposits")
async def api_get_deposits(
wallet: WalletTypeInfo = Depends(check_super_user),
) -> list[DcaDeposit]:
"""Get all deposits"""
return await get_all_deposits()
@satmachineadmin_api_router.get("/api/v1/dca/deposits/{deposit_id}")
async def api_get_deposit(
deposit_id: str,
wallet: WalletTypeInfo = Depends(check_super_user),
) -> DcaDeposit:
"""Get a specific deposit"""
deposit = await get_deposit(deposit_id)
if not deposit:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Deposit not found."
)
return deposit
@satmachineadmin_api_router.post("/api/v1/dca/deposits", status_code=HTTPStatus.CREATED)
async def api_create_deposit(
data: CreateDepositData,
user: User = Depends(check_super_user),
) -> DcaDeposit:
"""Create a new deposit"""
# Verify client exists
client = await get_dca_client(data.client_id)
if not client:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="DCA client not found."
)
return await create_deposit(data)
@satmachineadmin_api_router.put("/api/v1/dca/deposits/{deposit_id}/status")
async def api_update_deposit_status(
deposit_id: str,
data: UpdateDepositStatusData,
user: User = Depends(check_super_user),
) -> DcaDeposit:
"""Update deposit status (e.g., confirm deposit)"""
deposit = await get_deposit(deposit_id)
if not deposit:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Deposit not found."
)
updated_deposit = await update_deposit_status(deposit_id, data)
if not updated_deposit:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail="Failed to update deposit.",
)
return updated_deposit
@satmachineadmin_api_router.put("/api/v1/dca/deposits/{deposit_id}")
async def api_update_deposit(
deposit_id: str,
data: UpdateDepositData,
user: User = Depends(check_super_user),
) -> DcaDeposit:
"""Update deposit fields (amount, currency, notes). Only pending deposits can be edited."""
deposit = await get_deposit(deposit_id)
if not deposit:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Deposit not found."
)
if deposit.status != "pending":
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail="Only pending deposits can be edited.",
)
updated_deposit = await update_deposit(deposit_id, data)
if not updated_deposit:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail="Failed to update deposit.",
)
return updated_deposit
@satmachineadmin_api_router.delete("/api/v1/dca/deposits/{deposit_id}")
async def api_delete_deposit(
deposit_id: str,
user: User = Depends(check_super_user),
):
"""Delete a deposit. Only pending deposits (not yet inserted into the machine) can be deleted."""
deposit = await get_deposit(deposit_id)
if not deposit:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Deposit not found."
)
if deposit.status != "pending":
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail="Only pending deposits can be deleted. Confirmed deposits have already been inserted into the machine.",
)
await delete_deposit(deposit_id)
return {"message": "Deposit deleted successfully"}
# Transaction Polling Endpoints
@satmachineadmin_api_router.post("/api/v1/dca/test-connection")
async def api_test_database_connection(
user: User = Depends(check_super_user),
):
"""Test connection to Lamassu database with detailed reporting"""
try:
from .transaction_processor import transaction_processor
# Use the detailed test method
result = await transaction_processor.test_connection_detailed()
return result
except Exception as e:
return {
"success": False,
"message": f"Test connection error: {str(e)}",
"steps": [f"❌ Unexpected error: {str(e)}"],
"ssh_tunnel_used": False,
"ssh_tunnel_success": False,
"database_connection_success": False,
}
@satmachineadmin_api_router.post("/api/v1/dca/manual-poll")
async def api_manual_poll(
user: User = Depends(check_super_user),
):
"""Manually trigger a poll of the Lamassu database"""
try:
from .transaction_processor import transaction_processor
from .crud import update_poll_start_time, update_poll_success_time
# Get database configuration
db_config = await transaction_processor.connect_to_lamassu_db()
if not db_config:
raise HTTPException(
status_code=HTTPStatus.SERVICE_UNAVAILABLE,
detail="Could not get Lamassu database configuration",
)
config_id = db_config["config_id"]
# Record manual poll start time
await update_poll_start_time(config_id)
# Fetch and process transactions via SSH
new_transactions = await transaction_processor.fetch_new_transactions(db_config)
transactions_processed = 0
for transaction in new_transactions:
await transaction_processor.process_transaction(transaction)
transactions_processed += 1
# Record successful manual poll completion
await update_poll_success_time(config_id)
return {
"success": True,
"transactions_processed": transactions_processed,
"message": f"Processed {transactions_processed} new transactions since last poll",
}
except Exception as e:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Error during manual poll: {str(e)}",
)
@satmachineadmin_api_router.post("/api/v1/dca/process-transaction/{transaction_id}")
async def api_process_specific_transaction(
transaction_id: str,
user: User = Depends(check_super_user),
):
"""
Manually process a specific Lamassu transaction by ID, bypassing all status filters.
This endpoint is useful for processing transactions that were manually settled
or had dispense issues but need to be included in DCA distribution.
"""
try:
from .transaction_processor import transaction_processor
from .crud import get_payments_by_lamassu_transaction
# Get database configuration
db_config = await transaction_processor.connect_to_lamassu_db()
if not db_config:
raise HTTPException(
status_code=HTTPStatus.SERVICE_UNAVAILABLE,
detail="Could not get Lamassu database configuration",
)
# Check if transaction was already processed
existing_payments = await get_payments_by_lamassu_transaction(transaction_id)
if existing_payments:
return {
"success": False,
"already_processed": True,
"message": f"Transaction {transaction_id} was already processed with {len(existing_payments)} distributions",
"payment_count": len(existing_payments),
}
# Fetch the specific transaction from Lamassu (bypassing all filters)
transaction = await transaction_processor.fetch_transaction_by_id(db_config, transaction_id)
if not transaction:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND,
detail=f"Transaction {transaction_id} not found in Lamassu database",
)
# Process the transaction through normal DCA flow
await transaction_processor.process_transaction(transaction)
return {
"success": True,
"message": f"Transaction {transaction_id} processed successfully",
"transaction_details": {
"transaction_id": transaction_id,
"status": transaction.get("status"),
"dispense": transaction.get("dispense"),
"dispense_confirmed": transaction.get("dispense_confirmed"),
"crypto_amount": transaction.get("crypto_amount"),
"fiat_amount": transaction.get("fiat_amount"),
},
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Error processing transaction {transaction_id}: {str(e)}",
)
# COMMENTED OUT FOR PRODUCTION - Test transaction endpoint disabled
# Uncomment only for development/debugging purposes
#
# @satmachineadmin_api_router.post("/api/v1/dca/test-transaction")
# async def api_test_transaction(
# user: User = Depends(check_super_user),
# crypto_atoms: int = 103,
# commission_percentage: float = 0.03,
# discount: float = 0.0,
# ) -> dict:
# """Test transaction processing with simulated Lamassu transaction data"""
# try:
# from .transaction_processor import transaction_processor
# import uuid
# from datetime import datetime, timezone
#
# # Create a mock transaction that mimics Lamassu database structure
# mock_transaction = {
# "transaction_id": str(uuid.uuid4())[:8], # Short ID for testing
# "crypto_amount": crypto_atoms, # Total sats including commission
# "fiat_amount": 100, # Mock fiat amount (100 centavos = 1 GTQ)
# "commission_percentage": commission_percentage, # Already as decimal
# "discount": discount,
# "transaction_time": datetime.now(timezone.utc),
# "crypto_code": "BTC",
# "fiat_code": "GTQ",
# "device_id": "test_device",
# "status": "confirmed",
# }
#
# # Process the mock transaction through the complete DCA flow
# await transaction_processor.process_transaction(mock_transaction)
#
# # Calculate commission for response
# if commission_percentage > 0:
# effective_commission = commission_percentage * (100 - discount) / 100
# base_crypto_atoms = int(crypto_atoms / (1 + effective_commission))
# commission_amount_sats = crypto_atoms - base_crypto_atoms
# else:
# base_crypto_atoms = crypto_atoms
# commission_amount_sats = 0
#
# return {
# "success": True,
# "message": "Test transaction processed successfully",
# "transaction_details": {
# "transaction_id": mock_transaction["transaction_id"],
# "total_amount_sats": crypto_atoms,
# "base_amount_sats": base_crypto_atoms,
# "commission_amount_sats": commission_amount_sats,
# "commission_percentage": commission_percentage
# * 100, # Show as percentage
# "effective_commission": effective_commission * 100
# if commission_percentage > 0
# else 0,
# "discount": discount,
# },
# }
#
# except Exception as e:
# raise HTTPException(
# status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
# detail=f"Error processing test transaction: {str(e)}",
# )
# Lamassu Transaction Endpoints
@satmachineadmin_api_router.get("/api/v1/dca/transactions")
async def api_get_lamassu_transactions(
wallet: WalletTypeInfo = Depends(check_super_user),
) -> list[StoredLamassuTransaction]:
"""Get all processed Lamassu transactions"""
return await get_all_lamassu_transactions()
@satmachineadmin_api_router.get("/api/v1/dca/transactions/{transaction_id}")
async def api_get_lamassu_transaction(
transaction_id: str,
wallet: WalletTypeInfo = Depends(check_super_user),
) -> StoredLamassuTransaction:
"""Get a specific Lamassu transaction with details"""
transaction = await get_lamassu_transaction(transaction_id)
if not transaction:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Lamassu transaction not found."
)
return transaction
@satmachineadmin_api_router.get(
"/api/v1/dca/transactions/{transaction_id}/distributions"
) )
async def api_get_transaction_distributions( async def v2_in_progress_stub(full_path: str) -> None:
transaction_id: str, raise HTTPException(
wallet: WalletTypeInfo = Depends(check_super_user), HTTPStatus.SERVICE_UNAVAILABLE,
) -> list[dict]: f"satmachineadmin v2 API not yet implemented (path: /{full_path}). "
"""Get distribution details for a specific Lamassu transaction""" "The v1 Lamassu surface has been removed; per-operator endpoints "
# Get the stored transaction "land in P1. See plan.",
transaction = await get_lamassu_transaction(transaction_id)
if not transaction:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Lamassu transaction not found."
)
# Get all DCA payments for this Lamassu transaction
from .crud import get_payments_by_lamassu_transaction, get_dca_client
payments = await get_payments_by_lamassu_transaction(
transaction.lamassu_transaction_id
) )
# Enhance payments with client information
distributions = []
for payment in payments:
client = await get_dca_client(payment.client_id)
distributions.append(
{
"payment_id": payment.id,
"client_id": payment.client_id,
"client_username": client.username if client else None,
"client_user_id": client.user_id if client else None,
"amount_sats": payment.amount_sats,
"amount_fiat": payment.amount_fiat,
"exchange_rate": payment.exchange_rate,
"status": payment.status,
"created_at": payment.created_at,
}
)
return distributions
# Lamassu Configuration Endpoints
@satmachineadmin_api_router.get("/api/v1/dca/config")
async def api_get_lamassu_config(
wallet: WalletTypeInfo = Depends(check_super_user),
) -> Optional[LamassuConfig]:
"""Get active Lamassu database configuration"""
return await get_active_lamassu_config()
@satmachineadmin_api_router.post("/api/v1/dca/config", status_code=HTTPStatus.CREATED)
async def api_create_lamassu_config(
data: CreateLamassuConfigData,
user: User = Depends(check_super_user),
) -> LamassuConfig:
"""Create/update Lamassu database configuration"""
return await create_lamassu_config(data)
@satmachineadmin_api_router.put("/api/v1/dca/config/{config_id}")
async def api_update_lamassu_config(
config_id: str,
data: UpdateLamassuConfigData,
user: User = Depends(check_super_user),
) -> LamassuConfig:
"""Update Lamassu database configuration"""
config = await get_lamassu_config(config_id)
if not config:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Configuration not found."
)
updated_config = await update_lamassu_config(config_id, data)
if not updated_config:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail="Failed to update configuration.",
)
return updated_config
@satmachineadmin_api_router.delete("/api/v1/dca/config/{config_id}")
async def api_delete_lamassu_config(
config_id: str,
user: User = Depends(check_super_user),
):
"""Delete Lamassu database configuration"""
config = await get_lamassu_config(config_id)
if not config:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Configuration not found."
)
await delete_lamassu_config(config_id)
return {"message": "Configuration deleted successfully"}
@satmachineadmin_api_router.get("/api/v1/dca/client-limits")
async def api_get_client_limits():
"""Get client-safe configuration limits (public endpoint - no authentication)"""
try:
config = await get_active_lamassu_config()
if not config:
# Return sensible defaults if no config exists
return {
"max_daily_limit_gtq": 2000,
"currency": "GTQ"
}
# Return only client-safe configuration fields
return {
"max_daily_limit_gtq": config.max_daily_limit_gtq,
"currency": "GTQ" # Could be made configurable later
}
except Exception:
# Return defaults on any error
return {
"max_daily_limit_gtq": 2000,
"currency": "GTQ"
}