diff --git a/__init__.py b/__init__.py index 162f3dc..2d0ebcf 100644 --- a/__init__.py +++ b/__init__.py @@ -5,7 +5,7 @@ from lnbits.tasks import create_permanent_unique_task from loguru import logger from .crud import db -from .tasks import wait_for_paid_invoices +from .tasks import wait_for_cassette_state_events, wait_for_paid_invoices from .views import satmachineadmin_generic_router from .views_api import satmachineadmin_api_router @@ -42,6 +42,14 @@ def satmachineadmin_start(): "ext_satmachineadmin", wait_for_paid_invoices ) scheduled_tasks.append(invoice_task) + # Cassette bootstrap consumer (#29 v1) — subscribes to + # bitspire-cassettes-state events from each active ATM and upserts + # cassette_configs on receipt. Soft-fails if nostrclient isn't + # installed (logs + backs off, never crashes). + cassette_task = create_permanent_unique_task( + "ext_satmachineadmin_cassette_bootstrap", wait_for_cassette_state_events + ) + scheduled_tasks.append(cassette_task) __all__ = [ diff --git a/cassette_transport.py b/cassette_transport.py new file mode 100644 index 0000000..1725dde --- /dev/null +++ b/cassette_transport.py @@ -0,0 +1,424 @@ +""" +Cassette-config Nostr transport — operator ↔ ATM kind-30078 publish + consume. + +Per the locked design at aiolabs/satmachineadmin#29 (paired with +lamassu-next#56) and the dcd0874 privacy-by-default pivot, the operator +publishes position-keyed cassette config to a target ATM via: + + kind = 30078 (NIP-78, replaceable) + tags = [ + ["d", "bitspire-cassettes:"], + ["p", ""] + ] + content = NIP-44 v2 encrypted JSON of PublishCassettesPayload.to_wire_dict() + pubkey = operator pubkey + sig = operator signature + +The ATM-side consumer (lamassu-next#56) subscribes by the d-tag + its own +npub, decrypts, validates, applies, hot-reloads HAL. + +Reverse direction (ATM → operator, v1 = one-shot bootstrap on first boot, +v2 = continuous reverse channel for reconciliation): + + kind = 30078 + tags = [ + ["d", "bitspire-cassettes-state:"], + ["p", ""] + ] + content = NIP-44 v2 encrypted JSON, same PublishCassettesPayload shape + pubkey = ATM pubkey + +This module owns the wire-format side of both directions. The consumer +task (tasks.py) calls `decrypt_and_parse_state_event` per incoming event; +the API endpoint (views_api.py) calls `publish_to_atm` per operator submit. + +The `` placeholder semantics (load-bearing per the 2026-05-30T11:50Z +coord-log entry): always the ATM's hex pubkey, NEVER satmachineadmin's +internal dca_machines.id UUID. Helper `_atm_hex_pubkey(machine)` +centralises the canonicalisation via lnbits.utils.nostr.normalize_public_key. +""" + +from __future__ import annotations + +import json +import time + +from lnbits.core.crud.users import get_account +from lnbits.core.services.nip46_bunker_client import ( + NsecBunkerRpcError, + NsecBunkerTimeoutError, +) +from lnbits.core.signers import resolve_signer +from lnbits.core.signers.base import ( + NostrSigner, + SignerError, + SignerUnavailableError, +) +from lnbits.utils.nostr import normalize_public_key +from loguru import logger + +from .models import Machine, PublishCassettesPayload +from .nip44 import Nip44Error +from .nip44 import decrypt_from as _nip44_local_decrypt +from .nip44 import encrypt_for as _nip44_local_encrypt + +_KIND_NIP78 = 30078 +_D_TAG_CONFIG_PREFIX = "bitspire-cassettes:" # operator → ATM +_D_TAG_STATE_PREFIX = "bitspire-cassettes-state:" # ATM → operator + + +# ============================================================================= +# Errors +# ============================================================================= + + +class CassetteTransportError(Exception): + """Generic transport-layer error. Subclasses distinguish failure modes + so the API can surface meaningful HTTP statuses + the consumer task + can log + skip without crashing.""" + + +class OperatorIdentityMissing(CassetteTransportError): + """Operator account has no Nostr pubkey on file, or no signer is + available (pre-bunker rollout — operator hasn't onboarded via + Nostr-login).""" + + +class SignerUnavailable(CassetteTransportError): + """Resolved signer can't sign server-side (client-side-only signer, + or transient bunker unreachability post-lnbits#18). Publish skipped.""" + + +class RelayUnavailable(CassetteTransportError): + """nostrclient extension isn't installed or its relay manager isn't + reachable. Treated as soft-fail; publish skipped + logged.""" + + +class CassetteEventDecodeError(CassetteTransportError): + """Inbound state event failed validation: bad signature, NIP-44 v2 + decrypt failure, or payload didn't conform to PublishCassettesPayload. + Terminal — caller should log + skip, advancing past the event.""" + + +class CassetteEventTransientError(CassetteTransportError): + """Inbound state event couldn't be decrypted because the signer + component (typically the bunker) is transiently unavailable. Caller + should NOT advance past the event; retry on next tick. + + Distinct from CassetteEventDecodeError so the consumer task can + differentiate "MAC failed, give up" from "bunker is partitioned, try + again in a few seconds" — surfaced by lnbits at coord-log + 2026-05-31T07:10Z as the load-bearing distinction post-PR-#38.""" + + +# ============================================================================= +# Helpers — canonical pubkey + d-tag construction +# ============================================================================= + + +def _atm_hex_pubkey(machine: Machine) -> str: + """Canonicalise machine.machine_npub (hex OR npub bech32 — operator + enters either in the UI) to lowercase hex. ALL d-tag substitutions + use this value; using the internal machine.id UUID would silently + no-op the wire-level filter (per coord-log 11:50Z load-bearing nudge). + """ + return normalize_public_key(machine.machine_npub).lower() + + +def _config_d_tag(atm_pubkey_hex: str) -> str: + """d-tag for operator → ATM publish. ATM subscribes by this tag.""" + return f"{_D_TAG_CONFIG_PREFIX}{atm_pubkey_hex}" + + +def _state_d_tag(atm_pubkey_hex: str) -> str: + """d-tag for ATM → operator publish (bootstrap in v1, continuous v2).""" + return f"{_D_TAG_STATE_PREFIX}{atm_pubkey_hex}" + + +def build_state_d_tags_for_machines(machines: list[Machine]) -> list[str]: + """Bootstrap-consumer subscription filter helper: returns the full + `#d=[...]` list for all known ATMs an operator subscribes to.""" + return [_state_d_tag(_atm_hex_pubkey(m)) for m in machines] + + +# ============================================================================= +# Sign-as-operator — hybrid path (resolve_signer post #17, prvkey fallback) +# ============================================================================= + + +async def _resolve_operator_signer(operator_user_id: str): + """Fetch the operator's account + resolve to a NostrSigner. + + Single source of truth for "give me the signer for this operator, + or raise an operator-facing error if we can't." Returns + `(account, signer)` so callers that need both (publish path needs + `account.pubkey` for the event author and the signer for both + encrypt + sign) don't double-fetch. + + Raises: + - OperatorIdentityMissing — no account, or no pubkey on file + - SignerUnavailable — signer resolve failed, or signer can't sign + server-side (ClientSideOnly) + """ + account = await get_account(operator_user_id) + if account is None or not account.pubkey: + raise OperatorIdentityMissing( + f"operator {operator_user_id[:8]}... has no Nostr pubkey on file. " + "Onboard via the LNbits Nostr-login flow to publish cassette " + "config to your ATMs." + ) + try: + signer = resolve_signer(account) + except SignerError as exc: + raise SignerUnavailable( + f"signer resolve failed for operator {operator_user_id[:8]}...: " f"{exc}" + ) from exc + if not signer.can_sign(): + raise SignerUnavailable( + f"operator {operator_user_id[:8]}... has a client-side-only " + "signer; server can't sign or NIP-44-encrypt on their behalf. " + "Operator must hold their nsec via a NIP-46 bunker (lnbits#18) " + "or migrate to a server-signing account." + ) + return account, signer + + +async def _sign_as_operator(operator_user_id: str, event: dict) -> dict | None: + """Sign `event` using the operator's signer (LocalSigner or + RemoteBunkerSigner). Mutates `event` to add `created_at` (now), + `pubkey`, `id`, and `sig`. + + Raises typed CassetteTransportError subclasses on hard failure + (the publish endpoint maps these to HTTP statuses); never returns + None on the publish path. + """ + _account, signer = await _resolve_operator_signer(operator_user_id) + # created_at is part of the BIP-340 event-id hash; set before signing. + event["created_at"] = int(time.time()) + try: + return await signer.sign_event(event) + except SignerUnavailableError as exc: + raise SignerUnavailable( + f"signer unavailable for operator {operator_user_id[:8]}...: " f"{exc}" + ) from exc + + +async def _nip44_encrypt_via_signer( + account, signer: NostrSigner, plaintext: str, peer_pubkey_hex: str +) -> str: + """NIP-44 v2 encrypt via the signer abstraction, with a transitional + fallback to direct-prvkey for LocalSigner accounts. + + The bunker (RemoteBunkerSigner) implements `nip44_encrypt` natively — + the operator's nsec never leaves the bunker process. LocalSigner's + `nip44_encrypt` stub explicitly raises SignerUnavailableError + ("LocalSigner does not implement nip44_encrypt") per the + post-PR-#38 ABC — the spec is "migrate to bunker." For the + transitional window where some operators are still on LocalSigner + + their `account.prvkey` is intact, we catch that signal and use + our hand-rolled NIP-44 v2 impl against the stored prvkey. Same + wire output either way. + + Removed once every operator account on this instance is bunker- + backed (S7 fully landed). At that point this helper collapses to + `return await signer.nip44_encrypt(plaintext, peer_pubkey_hex)`. + """ + try: + return await signer.nip44_encrypt(plaintext, peer_pubkey_hex) + except SignerUnavailableError: + if account.signer_type == "LocalSigner" and account.prvkey: + return _nip44_local_encrypt(plaintext, account.prvkey, peer_pubkey_hex) + # ClientSideOnly, or RemoteBunkerSigner with bunker comms failure + # at config time — re-raise without wrapping; caller maps it. + raise + + +async def _nip44_decrypt_via_signer( + account, signer: NostrSigner, ciphertext: str, peer_pubkey_hex: str +) -> str: + """Decrypt mirror of `_nip44_encrypt_via_signer`. Same LocalSigner + transitional fallback.""" + try: + return await signer.nip44_decrypt(ciphertext, peer_pubkey_hex) + except SignerUnavailableError: + if account.signer_type == "LocalSigner" and account.prvkey: + return _nip44_local_decrypt(ciphertext, account.prvkey, peer_pubkey_hex) + raise + + +# ============================================================================= +# Publish — operator → ATM (the satmachineadmin API path) +# ============================================================================= + + +async def _publish_signed_event(signed_event: dict) -> None: + """Send a signed Nostr event to all relays via the nostrclient + extension's singleton RelayManager. + + Lazy import + typed-error so the API can surface "your LNbits doesn't + have nostrclient installed" as a 503 rather than a 500. Pattern + matches the cross-extension import guards in + `lnbits.core.services.users` (nostrmarket / nostrrelay). + """ + try: + from nostrclient.router import ( # type: ignore[import-not-found] + nostr_client, + ) + except ImportError as exc: + raise RelayUnavailable( + "nostrclient extension is not installed; cassette config " + "publish requires it. Install + activate the nostrclient " + "extension on this LNbits instance." + ) from exc + msg = json.dumps(["EVENT", signed_event]) + nostr_client.relay_manager.publish_message(msg) + + +async def publish_to_atm( + machine: Machine, + payload: PublishCassettesPayload, + operator_user_id: str, +) -> dict: + """Build, encrypt, sign, and publish a kind-30078 cassette config event + from the operator to the target ATM. + + Returns the signed event dict on success (caller may log event.id for + audit). Raises CassetteTransportError subclasses on hard failures: + - OperatorIdentityMissing → 400: operator hasn't onboarded + - SignerUnavailable → 503: signer offline / client-side-only / bunker + timeout at the encrypt or sign step + - RelayUnavailable → 503: nostrclient not installed + - CassetteTransportError → 500: anything else + """ + atm_pubkey_hex = _atm_hex_pubkey(machine) + + # Single fetch + resolve — same signer is used for both encrypt and sign. + account, signer = await _resolve_operator_signer(operator_user_id) + + # NIP-44 v2 encrypt the wire payload. Bunker round-trip on + # RemoteBunkerSigner; direct prvkey on LocalSigner (transitional). + plaintext = json.dumps(payload.to_wire_dict(), separators=(",", ":")) + try: + content = await _nip44_encrypt_via_signer( + account, signer, plaintext, atm_pubkey_hex + ) + except NsecBunkerTimeoutError as exc: + raise SignerUnavailable( + f"bunker unreachable while encrypting cassette config for " + f"operator {operator_user_id[:8]}...: {exc}" + ) from exc + except NsecBunkerRpcError as exc: + raise SignerUnavailable( + f"bunker rejected nip44_encrypt for operator " + f"{operator_user_id[:8]}... (policy / MAC / config issue): " + f"{exc}" + ) from exc + except SignerUnavailableError as exc: + raise SignerUnavailable( + f"signer cannot nip44-encrypt for operator " + f"{operator_user_id[:8]}...: {exc}" + ) from exc + + event: dict = { + "kind": _KIND_NIP78, + "tags": [ + ["d", _config_d_tag(atm_pubkey_hex)], + ["p", atm_pubkey_hex], + ], + "content": content, + # created_at is set inside _sign_as_operator before signing. + } + signed = await _sign_as_operator(operator_user_id, event) + if signed is None: + raise CassetteTransportError( + "sign_as_operator returned None unexpectedly — soft-fail path " + "shouldn't be reachable on a publish-initiated flow" + ) + + await _publish_signed_event(signed) + logger.info( + f"satmachineadmin: published kind-30078 cassette config to ATM " + f"{atm_pubkey_hex[:12]}... (event_id={signed['id'][:12]}..., " + f"machine_id={machine.id}, positions={sorted(payload.positions.keys())})" + ) + return signed + + +# ============================================================================= +# Consume — ATM → operator (the bootstrap consumer task) +# ============================================================================= + + +async def decrypt_and_parse_state_event( + event: dict, account, signer: NostrSigner +) -> PublishCassettesPayload: + """Decrypt + parse an inbound `bitspire-cassettes-state:` + event the ATM published toward the operator. + + Caller is responsible for: + - filtering on `kind=30078` and the expected `#d` tag list + - verifying the event signature (lnbits.utils.nostr.verify_event) + - confirming `event["pubkey"]` matches a known ATM (= machine.machine_npub + canonicalised) — the consumer task does this before calling here + - resolving the operator's account + signer via + `_resolve_operator_signer(...)` and passing them in + + This function does: + - NIP-44 v2 decrypt of event["content"] via `signer.nip44_decrypt` + (bunker round-trip on RemoteBunkerSigner; direct prvkey on the + transitional LocalSigner path) + - JSON parse + PublishCassettesPayload validation + + Error mapping: + - CassetteEventTransientError on NsecBunkerTimeoutError → caller + should NOT advance state_event_id; retry on next consumer tick + - CassetteEventDecodeError on anything else (bunker RPC reject, + signer unavailable, MAC failure, JSON parse, payload shape) → + terminal; caller logs + skips + """ + sender_pubkey = event.get("pubkey") + content = event.get("content") + if not isinstance(sender_pubkey, str) or not isinstance(content, str): + raise CassetteEventDecodeError( + "event missing required pubkey or content fields" + ) + + try: + plaintext = await _nip44_decrypt_via_signer( + account, signer, content, sender_pubkey + ) + except NsecBunkerTimeoutError as exc: + raise CassetteEventTransientError( + f"bunker unreachable while decrypting cassette state event: {exc}" + ) from exc + except NsecBunkerRpcError as exc: + raise CassetteEventDecodeError( + f"bunker rejected nip44_decrypt (policy / MAC / config): {exc}" + ) from exc + except SignerUnavailableError as exc: + raise CassetteEventDecodeError(f"signer cannot nip44-decrypt: {exc}") from exc + except Nip44Error as exc: + # Hand-rolled LocalSigner fallback path (transitional) — MAC fail + # / version mismatch / length issue. + raise CassetteEventDecodeError( + f"NIP-44 v2 decrypt failed (LocalSigner fallback path): {exc}" + ) from exc + except ValueError as exc: + # coincurve raises ValueError on a malformed pubkey hex (only + # reachable via the LocalSigner fallback path; the bunker handles + # pubkey validation server-side). + raise CassetteEventDecodeError(f"sender pubkey is malformed: {exc}") from exc + + try: + raw = json.loads(plaintext) + except json.JSONDecodeError as exc: + raise CassetteEventDecodeError( + f"decrypted content isn't valid JSON: {exc}" + ) from exc + + try: + return PublishCassettesPayload(**raw) + except Exception as exc: + raise CassetteEventDecodeError( + f"payload didn't validate as PublishCassettesPayload: {exc}" + ) from exc diff --git a/crud.py b/crud.py index 51b07a4..1144b0c 100644 --- a/crud.py +++ b/crud.py @@ -6,12 +6,12 @@ # machine model". from datetime import datetime -from typing import List, Optional from lnbits.db import Database from lnbits.helpers import urlsafe_short_hash from .models import ( + CassetteConfig, ClientBalanceSummary, CommissionSplit, CommissionSplitLeg, @@ -26,6 +26,7 @@ from .models import ( DcaPayment, DcaSettlement, Machine, + PublishCassettesPayload, SuperConfig, TelemetrySnapshot, UpdateDcaClientData, @@ -33,6 +34,7 @@ from .models import ( UpdateDepositStatusData, UpdateMachineData, UpdateSuperConfigData, + UpsertCassetteConfigData, UpsertDcaLpData, ) @@ -44,7 +46,7 @@ db = Database("ext_satoshimachine") # ============================================================================= -async def get_super_config() -> Optional[SuperConfig]: +async def get_super_config() -> SuperConfig | None: return await db.fetchone( "SELECT * FROM satoshimachine.super_config WHERE id = :id", {"id": "default"}, @@ -52,7 +54,7 @@ async def get_super_config() -> Optional[SuperConfig]: ) -async def update_super_config(data: UpdateSuperConfigData) -> Optional[SuperConfig]: +async def update_super_config(data: UpdateSuperConfigData) -> SuperConfig | None: update_data = {k: v for k, v in data.dict().items() if v is not None} if not update_data: return await get_super_config() @@ -100,7 +102,7 @@ async def create_machine(operator_user_id: str, data: CreateMachineData) -> Mach return machine -async def get_machine(machine_id: str) -> Optional[Machine]: +async def get_machine(machine_id: str) -> Machine | None: return await db.fetchone( "SELECT * FROM satoshimachine.dca_machines WHERE id = :id", {"id": machine_id}, @@ -108,7 +110,7 @@ async def get_machine(machine_id: str) -> Optional[Machine]: ) -async def get_machine_by_npub(machine_npub: str) -> Optional[Machine]: +async def get_machine_by_npub(machine_npub: str) -> Machine | None: return await db.fetchone( "SELECT * FROM satoshimachine.dca_machines WHERE machine_npub = :npub", {"npub": machine_npub}, @@ -116,7 +118,7 @@ async def get_machine_by_npub(machine_npub: str) -> Optional[Machine]: ) -async def get_active_machine_by_wallet_id(wallet_id: str) -> Optional[Machine]: +async def get_active_machine_by_wallet_id(wallet_id: str) -> Machine | None: """Used by the invoice listener to route an incoming payment to a machine.""" return await db.fetchone( """ @@ -129,7 +131,7 @@ async def get_active_machine_by_wallet_id(wallet_id: str) -> Optional[Machine]: ) -async def get_machines_for_operator(operator_user_id: str) -> List[Machine]: +async def get_machines_for_operator(operator_user_id: str) -> list[Machine]: return await db.fetchall( """ SELECT * FROM satoshimachine.dca_machines @@ -141,7 +143,46 @@ async def get_machines_for_operator(operator_user_id: str) -> List[Machine]: ) -async def update_machine(machine_id: str, data: UpdateMachineData) -> Optional[Machine]: +async def list_all_active_machines() -> list[Machine]: + """Used by the cassette bootstrap consumer task to build a single + cross-operator subscription filter. Each event's pubkey routes to + the right operator via get_machine_by_atm_pubkey_hex + the machine's + operator_user_id. + """ + return await db.fetchall( + """ + SELECT * FROM satoshimachine.dca_machines + WHERE is_active = true + ORDER BY created_at DESC + """, + {}, + Machine, + ) + + +async def get_machine_by_atm_pubkey_hex(atm_pubkey_hex: str) -> Machine | None: + """Look up an active machine by its ATM pubkey, accepting hex or bech32 + in machine_npub. Used by the cassette bootstrap consumer to route an + incoming state event to the right machine row (and therefore operator + privkey for decryption). + + O(N) over active machines — fine for small fleets. If fleet sizes + grow, normalise machine_npub-at-write to hex and add an index. + """ + from lnbits.utils.nostr import normalize_public_key + + target = atm_pubkey_hex.lower() + machines = await list_all_active_machines() + for m in machines: + try: + if normalize_public_key(m.machine_npub).lower() == target: + return m + except (ValueError, AssertionError): + continue + return None + + +async def update_machine(machine_id: str, data: UpdateMachineData) -> Machine | None: update_data = {k: v for k, v in data.dict().items() if v is not None} if not update_data: return await get_machine(machine_id) @@ -213,7 +254,7 @@ _CLIENT_FROM = ( ) -async def get_dca_client(client_id: str) -> Optional[DcaClient]: +async def get_dca_client(client_id: str) -> DcaClient | None: return await db.fetchone( f"SELECT {_CLIENT_SELECT} FROM {_CLIENT_FROM} WHERE c.id = :id", {"id": client_id}, @@ -223,7 +264,7 @@ async def get_dca_client(client_id: str) -> Optional[DcaClient]: async def get_dca_client_for_machine_user( machine_id: str, user_id: str -) -> Optional[DcaClient]: +) -> DcaClient | None: return await db.fetchone( f""" SELECT {_CLIENT_SELECT} FROM {_CLIENT_FROM} @@ -234,7 +275,7 @@ async def get_dca_client_for_machine_user( ) -async def get_dca_clients_for_machine(machine_id: str) -> List[DcaClient]: +async def get_dca_clients_for_machine(machine_id: str) -> list[DcaClient]: return await db.fetchall( f""" SELECT {_CLIENT_SELECT} FROM {_CLIENT_FROM} @@ -246,7 +287,7 @@ async def get_dca_clients_for_machine(machine_id: str) -> List[DcaClient]: ) -async def get_dca_clients_for_operator(operator_user_id: str) -> List[DcaClient]: +async def get_dca_clients_for_operator(operator_user_id: str) -> list[DcaClient]: """All clients across every machine this operator owns.""" return await db.fetchall( f""" @@ -261,7 +302,7 @@ async def get_dca_clients_for_operator(operator_user_id: str) -> List[DcaClient] ) -async def get_dca_clients_for_user(user_id: str) -> List[DcaClient]: +async def get_dca_clients_for_user(user_id: str) -> list[DcaClient]: """LP cross-operator view — every machine this LP is registered at.""" return await db.fetchall( f""" @@ -274,7 +315,7 @@ async def get_dca_clients_for_user(user_id: str) -> List[DcaClient]: ) -async def get_flow_mode_clients_for_machine(machine_id: str) -> List[DcaClient]: +async def get_flow_mode_clients_for_machine(machine_id: str) -> list[DcaClient]: """Active LPs enrolled at this machine whose per-user `dca_lp` row has `default_dca_mode = 'flow'`. Used by the distribution algorithm. @@ -302,7 +343,7 @@ async def get_flow_mode_clients_for_machine(machine_id: str) -> List[DcaClient]: # ============================================================================= -async def get_dca_lp(user_id: str) -> Optional[DcaLpPreferences]: +async def get_dca_lp(user_id: str) -> DcaLpPreferences | None: """Return the LP's preferences row, or None if they haven't onboarded via satmachineclient yet.""" return await db.fetchone( @@ -325,7 +366,7 @@ async def upsert_dca_lp( user_id: str, data: UpsertDcaLpData, *, - fallback_wallet_id: Optional[str] = None, + fallback_wallet_id: str | None = None, ) -> DcaLpPreferences: """Create or update the LP's preferences row. @@ -380,7 +421,7 @@ async def upsert_dca_lp( async def update_dca_client( client_id: str, data: UpdateDcaClientData -) -> Optional[DcaClient]: +) -> DcaClient | None: update_data = {k: v for k, v in data.dict().items() if v is not None} if not update_data: return await get_dca_client(client_id) @@ -442,7 +483,7 @@ async def create_deposit( return deposit -async def get_deposit(deposit_id: str) -> Optional[DcaDeposit]: +async def get_deposit(deposit_id: str) -> DcaDeposit | None: return await db.fetchone( "SELECT * FROM satoshimachine.dca_deposits WHERE id = :id", {"id": deposit_id}, @@ -450,7 +491,7 @@ async def get_deposit(deposit_id: str) -> Optional[DcaDeposit]: ) -async def get_deposits_for_client(client_id: str) -> List[DcaDeposit]: +async def get_deposits_for_client(client_id: str) -> list[DcaDeposit]: return await db.fetchall( """ SELECT * FROM satoshimachine.dca_deposits @@ -462,7 +503,7 @@ async def get_deposits_for_client(client_id: str) -> List[DcaDeposit]: ) -async def get_deposits_for_operator(operator_user_id: str) -> List[DcaDeposit]: +async def get_deposits_for_operator(operator_user_id: str) -> list[DcaDeposit]: return await db.fetchall( """ SELECT d.* @@ -478,7 +519,7 @@ async def get_deposits_for_operator(operator_user_id: str) -> List[DcaDeposit]: async def update_deposit( deposit_id: str, data: UpdateDepositData -) -> Optional[DcaDeposit]: +) -> DcaDeposit | None: update_data = {k: v for k, v in data.dict().items() if v is not None} if not update_data: return await get_deposit(deposit_id) @@ -493,7 +534,7 @@ async def update_deposit( async def update_deposit_status( deposit_id: str, data: UpdateDepositStatusData -) -> Optional[DcaDeposit]: +) -> DcaDeposit | None: payload = { "id": deposit_id, "status": data.status, @@ -528,8 +569,8 @@ async def delete_deposit(deposit_id: str) -> None: async def create_settlement_idempotent( data: CreateDcaSettlementData, initial_status: str, - error_message: Optional[str] = None, -) -> Optional[DcaSettlement]: + error_message: str | None = None, +) -> DcaSettlement | None: """Insert a settlement keyed by payment_hash. Returns the inserted row on first sight; returns the existing row @@ -589,7 +630,7 @@ async def create_settlement_idempotent( return await get_settlement(settlement_id) -async def get_settlement(settlement_id: str) -> Optional[DcaSettlement]: +async def get_settlement(settlement_id: str) -> DcaSettlement | None: return await db.fetchone( "SELECT * FROM satoshimachine.dca_settlements WHERE id = :id", {"id": settlement_id}, @@ -599,7 +640,7 @@ async def get_settlement(settlement_id: str) -> Optional[DcaSettlement]: async def get_settlement_by_payment_hash( payment_hash: str, -) -> Optional[DcaSettlement]: +) -> DcaSettlement | None: return await db.fetchone( """ SELECT * FROM satoshimachine.dca_settlements @@ -612,7 +653,7 @@ async def get_settlement_by_payment_hash( async def get_settlements_for_machine( machine_id: str, limit: int = 100 -) -> List[DcaSettlement]: +) -> list[DcaSettlement]: return await db.fetchall( """ SELECT * FROM satoshimachine.dca_settlements @@ -705,7 +746,7 @@ async def get_stuck_settlements_for_operator( async def force_reset_stuck_settlement( settlement_id: str, -) -> Optional[DcaSettlement]: +) -> DcaSettlement | None: """Operator escape hatch for genuinely stuck settlements (processor crashed mid-flight, etc.). Flips 'pending'/'processing' → 'errored' so the existing retry endpoint can take over. Clears processing_claim. @@ -728,7 +769,7 @@ async def force_reset_stuck_settlement( async def get_settlements_for_operator( operator_user_id: str, limit: int = 200 -) -> List[DcaSettlement]: +) -> list[DcaSettlement]: return await db.fetchall( """ SELECT s.* @@ -746,8 +787,8 @@ async def get_settlements_for_operator( async def mark_settlement_status( settlement_id: str, status: str, - error_message: Optional[str] = None, -) -> Optional[DcaSettlement]: + error_message: str | None = None, +) -> DcaSettlement | None: """Status: 'pending' | 'processing' | 'processed' | 'partial' | 'refunded' | 'errored'. Clears processing_claim on terminal states so a fresh claim attempt won't see a stale token.""" @@ -778,7 +819,7 @@ async def mark_settlement_status( async def claim_settlement_for_processing( settlement_id: str, -) -> Optional[DcaSettlement]: +) -> DcaSettlement | None: """Optimistic-lock claim: atomically flip a settlement to 'processing' and tag it with a per-invocation token. Returns the claimed row on success; None if another caller already won the claim or the settlement @@ -808,7 +849,7 @@ async def claim_settlement_for_processing( async def reset_settlement_for_retry( settlement_id: str, -) -> Optional[DcaSettlement]: +) -> DcaSettlement | None: """Operator retry path. Flips 'errored' → 'pending' and voids any 'failed' legs so process_settlement re-runs them fresh. Completed legs are left in place — we never re-pay sats that already moved.""" @@ -844,7 +885,7 @@ async def apply_partial_dispense( new_operator_fee_sats: int, new_fiat_amount: float, appended_note: str, -) -> Optional[DcaSettlement]: +) -> DcaSettlement | None: """Overwrite the monetary fields on a settlement (partial-dispense recompute) and prepend `appended_note` to the notes column. @@ -899,7 +940,7 @@ async def count_completed_legs_for_settlement(settlement_id: str) -> int: async def append_settlement_note( settlement_id: str, note: str, author_user_id: str -) -> Optional[DcaSettlement]: +) -> DcaSettlement | None: """Prepend an operator-authored note to settlement.notes. Each entry is timestamped (UTC) and tagged with the author's user id so the trail is accountable. Append-only: existing entries are never edited.""" @@ -944,8 +985,8 @@ async def void_open_legs_for_settlement(settlement_id: str) -> None: async def get_commission_splits( - operator_user_id: str, machine_id: Optional[str] = None -) -> List[CommissionSplit]: + operator_user_id: str, machine_id: str | None = None +) -> list[CommissionSplit]: """Returns the rule set for the given scope. Precedence (caller's responsibility): try per-machine override first; @@ -974,7 +1015,7 @@ async def get_commission_splits( async def get_effective_commission_splits( operator_user_id: str, machine_id: str -) -> List[CommissionSplit]: +) -> list[CommissionSplit]: """Per-machine override if set, otherwise operator's default ruleset.""" overrides = await get_commission_splits(operator_user_id, machine_id) if overrides: @@ -984,9 +1025,9 @@ async def get_effective_commission_splits( async def replace_commission_splits( operator_user_id: str, - machine_id: Optional[str], - legs: List[CommissionSplitLeg], -) -> List[CommissionSplit]: + machine_id: str | None, + legs: list[CommissionSplitLeg], +) -> list[CommissionSplit]: """Atomic replace for the (operator, machine) scope. Caller should have already validated legs sum to 1.0 via the Pydantic model.""" if machine_id is None: @@ -1072,7 +1113,7 @@ async def create_dca_payment(data: CreateDcaPaymentData) -> DcaPayment: return payment -async def get_dca_payment(payment_id: str) -> Optional[DcaPayment]: +async def get_dca_payment(payment_id: str) -> DcaPayment | None: return await db.fetchone( "SELECT * FROM satoshimachine.dca_payments WHERE id = :id", {"id": payment_id}, @@ -1080,7 +1121,7 @@ async def get_dca_payment(payment_id: str) -> Optional[DcaPayment]: ) -async def get_payments_for_settlement(settlement_id: str) -> List[DcaPayment]: +async def get_payments_for_settlement(settlement_id: str) -> list[DcaPayment]: return await db.fetchall( """ SELECT * FROM satoshimachine.dca_payments @@ -1092,7 +1133,7 @@ async def get_payments_for_settlement(settlement_id: str) -> List[DcaPayment]: ) -async def get_payments_for_client(client_id: str) -> List[DcaPayment]: +async def get_payments_for_client(client_id: str) -> list[DcaPayment]: return await db.fetchall( """ SELECT * FROM satoshimachine.dca_payments @@ -1105,8 +1146,8 @@ async def get_payments_for_client(client_id: str) -> List[DcaPayment]: async def get_payments_for_operator( - operator_user_id: str, leg_type: Optional[str] = None, limit: int = 200 -) -> List[DcaPayment]: + operator_user_id: str, leg_type: str | None = None, limit: int = 200 +) -> list[DcaPayment]: if leg_type is None: return await db.fetchall( """ @@ -1133,9 +1174,9 @@ async def get_payments_for_operator( async def update_payment_status( payment_id: str, status: str, - external_payment_hash: Optional[str] = None, - error_message: Optional[str] = None, -) -> Optional[DcaPayment]: + external_payment_hash: str | None = None, + error_message: str | None = None, +) -> DcaPayment | None: await db.execute( """ UPDATE satoshimachine.dca_payments @@ -1161,7 +1202,7 @@ async def update_payment_status( async def get_client_balance_summary( client_id: str, -) -> Optional[ClientBalanceSummary]: +) -> ClientBalanceSummary | None: """Per-client (and per-machine, since clients are per-machine in v2) summary. DCA legs only — settlement/autoforward/super_fee/operator_split legs are @@ -1210,7 +1251,7 @@ async def get_client_balance_summary( # ============================================================================= -async def get_telemetry(machine_id: str) -> Optional[TelemetrySnapshot]: +async def get_telemetry(machine_id: str) -> TelemetrySnapshot | None: return await db.fetchone( "SELECT * FROM satoshimachine.dca_telemetry WHERE machine_id = :mid", {"mid": machine_id}, @@ -1221,19 +1262,19 @@ async def get_telemetry(machine_id: str) -> Optional[TelemetrySnapshot]: async def upsert_beacon_snapshot( machine_id: str, *, - cash_in: Optional[bool] = None, - cash_out: Optional[bool] = None, - cash_level: Optional[str] = None, - fiat: Optional[str] = None, - model: Optional[str] = None, - name: Optional[str] = None, - location: Optional[str] = None, - geo: Optional[str] = None, - fees_json: Optional[str] = None, - limits_json: Optional[str] = None, - denominations_json: Optional[str] = None, - version: Optional[str] = None, -) -> Optional[TelemetrySnapshot]: + cash_in: bool | None = None, + cash_out: bool | None = None, + cash_level: str | None = None, + fiat: str | None = None, + model: str | None = None, + name: str | None = None, + location: str | None = None, + geo: str | None = None, + fees_json: str | None = None, + limits_json: str | None = None, + denominations_json: str | None = None, + version: str | None = None, +) -> TelemetrySnapshot | None: """Upsert kind-30078 beacon fields. All fields are nullable because today's upstream payload only carries cash_in/cash_out/cash_level/fiat/model (see lamassu-next#43 — the enrichment is not yet shipped).""" @@ -1310,7 +1351,7 @@ async def upsert_beacon_snapshot( async def upsert_fleet_snapshot( machine_id: str, telemetry_json: str -) -> Optional[TelemetrySnapshot]: +) -> TelemetrySnapshot | None: """Upsert kind-30079 operator-only telemetry. Awaits lamassu-next#42 to produce a real schema; we store the raw JSON blob until then.""" existing = await get_telemetry(machine_id) @@ -1334,3 +1375,156 @@ async def upsert_fleet_snapshot( {"mid": machine_id, "json": telemetry_json, "now": now}, ) return await get_telemetry(machine_id) + + +# ============================================================================= +# Cassette configs — operator-driven ATM cassette inventory (#29 v1.1). +# ============================================================================= +# Row lifecycle per #29: +# - First population for a (machine_id, position) pair → apply_bootstrap_state +# (consumer reading the ATM's one-shot bitspire-cassettes-state event) +# - Operator edit of denomination or count → update_cassette_config +# (refuses to create new rows; the slot count is hardware-determined) +# - Row creation/deletion for a new position → admin only, via ATM +# re-provisioning + new bootstrap event (not exposed in v1 here) + + +def _should_apply_bootstrap_state( + existing_state_event_id: str | None, incoming_event_id: str +) -> bool: + """Pure-function dedup gate for apply_bootstrap_state. + + Returns False if any existing row for this machine already references + the incoming event_id (relay re-delivery after restart). True otherwise. + + Extracted as a pure function so the dedup decision is unit-testable + without a database round-trip. The actual idempotency check in + apply_bootstrap_state fetches one existing row and passes its + state_event_id here. + """ + return existing_state_event_id != incoming_event_id + + +async def get_cassette_config( + machine_id: str, position: int +) -> CassetteConfig | None: + return await db.fetchone( + "SELECT * FROM satoshimachine.cassette_configs " + "WHERE machine_id = :mid AND position = :pos", + {"mid": machine_id, "pos": position}, + CassetteConfig, + ) + + +async def list_cassette_configs_for_machine( + machine_id: str, +) -> list[CassetteConfig]: + return await db.fetchall( + "SELECT * FROM satoshimachine.cassette_configs " + "WHERE machine_id = :mid ORDER BY position", + {"mid": machine_id}, + CassetteConfig, + ) + + +async def update_cassette_config( + machine_id: str, + position: int, + data: UpsertCassetteConfigData, + *, + updated_by: str | None = None, +) -> CassetteConfig | None: + """Operator-driven row update: change denomination and/or count for a + single cassette slot. Refuses to create new rows — those only land via + apply_bootstrap_state() consuming an ATM bootstrap event (per #29 row + lifecycle: hardware-determined slot count, not operator-creatable). + Returns None if the (machine_id, position) row doesn't exist. + """ + existing = await get_cassette_config(machine_id, position) + if existing is None: + return None + update_data: dict = {k: v for k, v in data.dict().items() if v is not None} + if not update_data: + return existing + update_data["updated_at"] = datetime.now() + update_data["updated_by"] = updated_by + set_clause = ", ".join(f"{k} = :{k}" for k in update_data) + update_data["mid"] = machine_id + update_data["pos"] = position + await db.execute( + f"UPDATE satoshimachine.cassette_configs SET {set_clause} " + "WHERE machine_id = :mid AND position = :pos", + update_data, + ) + return await get_cassette_config(machine_id, position) + + +async def apply_bootstrap_state( + machine_id: str, + event_id: str, + event_created_at: datetime, + payload: PublishCassettesPayload, +) -> bool: + """Consume an ATM-published kind-30078 bitspire-cassettes-state: event + and upsert one cassette_configs row per position in the payload. + + Returns True if the upsert ran; False if any existing row for this + machine already references this event_id (idempotent on relay + re-delivery / restart). + + Populates both the operator-believed columns (denomination, count, + updated_at, updated_by='atm-bootstrap') AND the v2 reverse-channel + columns (state_denomination, state_count, state_at, state_event_id) + so the operator's initial view matches the ATM's reported state. v2 + reconciliation UI will diverge them when continuous reverse-channel + events land + the operator subsequently edits. + """ + existing_first: dict | None = await db.fetchone( + "SELECT state_event_id FROM satoshimachine.cassette_configs " + "WHERE machine_id = :mid LIMIT 1", + {"mid": machine_id}, + ) + existing_event_id: str | None = None + if existing_first is not None: + existing_event_id = ( + existing_first.get("state_event_id") + if isinstance(existing_first, dict) + else getattr(existing_first, "state_event_id", None) + ) + if not _should_apply_bootstrap_state(existing_event_id, event_id): + return False + + now = datetime.now() + for pos, row in payload.positions.items(): + await db.execute( + """ + INSERT INTO satoshimachine.cassette_configs + (machine_id, position, denomination, count, updated_at, + updated_by, state_denomination, state_count, state_at, + state_event_id) + VALUES (:mid, :pos, :denom, :count, :now, :by, + :state_denom, :state_count, :state_at, :event_id) + ON CONFLICT (machine_id, position) DO UPDATE SET + denomination = excluded.denomination, + count = excluded.count, + updated_at = excluded.updated_at, + updated_by = excluded.updated_by, + state_denomination = excluded.state_denomination, + state_count = excluded.state_count, + state_at = excluded.state_at, + state_event_id = excluded.state_event_id + """, + { + "mid": machine_id, + "pos": pos, + "denom": row.denomination, + "count": row.count, + "now": now, + "by": "atm-bootstrap", + "state_denom": row.denomination, + "state_count": row.count, + "state_at": event_created_at, + "event_id": event_id, + }, + ) + return True diff --git a/migrations.py b/migrations.py index 38b29d0..e1d957c 100644 --- a/migrations.py +++ b/migrations.py @@ -474,9 +474,7 @@ async def m006_rename_to_canonical_sat_vocabulary(db): ] for table, old_col, new_col in renames: try: - await db.fetchone( - f"SELECT {old_col} FROM satoshimachine.{table} LIMIT 1" - ) + await db.fetchone(f"SELECT {old_col} FROM satoshimachine.{table} LIMIT 1") except Exception: # old column doesn't exist; either rename already landed or # m001 produced the canonical schema directly on fresh install. @@ -496,15 +494,11 @@ async def m006_rename_to_canonical_sat_vocabulary(db): ] for table, col in drops: try: - await db.fetchone( - f"SELECT {col} FROM satoshimachine.{table} LIMIT 1" - ) + await db.fetchone(f"SELECT {col} FROM satoshimachine.{table} LIMIT 1") except Exception: # column doesn't exist; either already dropped or never present. continue - await db.execute( - f"ALTER TABLE satoshimachine.{table} DROP COLUMN {col}" - ) + await db.execute(f"ALTER TABLE satoshimachine.{table} DROP COLUMN {col}") async def m005_lock_deposit_currency_to_machine_fiat_code(db): @@ -538,3 +532,115 @@ async def m005_lock_deposit_currency_to_machine_fiat_code(db): AND m.fiat_code != d.currency ) """) + + +async def m007_add_cassette_configs(db): + """Add cassette_configs table for operator-driven ATM cassette inventory. + + Tracks per-machine cassette state (denomination, count, position) editable + via the satmachineadmin dashboard and published to the ATM as encrypted + kind-30078 events. See aiolabs/satmachineadmin#29 + lamassu-next#56. + + Schema choice: PK (machine_id, denomination) mirrors the ATM-side + denomination-as-key invariant in + bitspire/atm-tui/src/db.zig:31 and + lamassu-next/apps/machine/electron/state-store.ts:54 + (the cassettes table PK is denomination; HAL inventory map keys on + denomination; dispense lookup is cassetteDenominations.indexOf — + duplicates collapse silently). Position is operator-assignable display + order, not the addressable unit. + + Reserved nullable columns (state_count, state_at, state_event_id) hold + the latest bitspire-cassettes-state: event the ATM + publishes (one-shot bootstrap in v1; continuous in v2). v1 UI doesn't + render them; v2 reconciliation UI consumes them without a migration. + """ + await db.execute(f""" + CREATE TABLE IF NOT EXISTS satoshimachine.cassette_configs ( + machine_id TEXT NOT NULL, + denomination INTEGER NOT NULL, + count INTEGER NOT NULL, + position INTEGER NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now}, + updated_by TEXT, + state_count INTEGER, + state_at TIMESTAMP, + state_event_id TEXT, + PRIMARY KEY (machine_id, denomination) + ); + """) + + +async def m008_flip_cassette_configs_pk_to_position(db): + """Flip cassette_configs PK from (machine_id, denomination) to + (machine_id, position). The denomination-keyed shape from m007 was + wrong: real machines have N cartridges of the same denomination + (cash-out throughput requires multiple bays for one denom), and the + operator needs to swap cartridge denominations during refill ($20 + bay becomes $50 bay) without a re-provisioning event. + + Coordinated v1.1 fix with the ATM side per the 2026-05-30T18:30Z + + 18:45Z log entries: + - Wire shape flips from {denominations: {: {position, count}}} + to {positions: {

: {denomination, count}}} + - Position becomes the fixed row identity (hardware bay number); + denomination + count are operator-editable per row + - NO unique constraint on denomination (multiple same-denom cassettes + are operationally valid) + + Also adds `state_denomination` nullable column reserved for v2 + reverse-channel reconciliation (operator-believed denomination per + slot vs ATM-reported denomination — diff highlighting in v2 UI). + + SQLite doesn't support ALTER PRIMARY KEY directly; the migration + does the standard create-copy-drop-rename dance. Idempotent via the + column-probe trick used elsewhere in this file. + """ + try: + # Probe: does the old PK shape still exist? If state_denomination + # column already exists, m008 already ran — no-op. + await db.fetchone( + "SELECT state_denomination FROM satoshimachine.cassette_configs " "LIMIT 1" + ) + return + except Exception: + pass + + await db.execute(f""" + CREATE TABLE IF NOT EXISTS satoshimachine.cassette_configs_new ( + machine_id TEXT NOT NULL, + position INTEGER NOT NULL, + denomination INTEGER NOT NULL, + count INTEGER NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now}, + updated_by TEXT, + state_denomination INTEGER, + state_count INTEGER, + state_at TIMESTAMP, + state_event_id TEXT, + PRIMARY KEY (machine_id, position) + ); + """) + + # Backfill from the old table — column-by-column copy. In the v1 + # m007 schema the row's `denomination` was simultaneously the + # operator-believed denomination AND the ATM-reported denomination + # (because the only write path was the bootstrap consumer copying + # from the ATM's state.db). So state_denomination at migration time + # = current denomination as a best-guess baseline; the next bootstrap + # event re-populates the state_* columns authoritatively. + await db.execute(""" + INSERT INTO satoshimachine.cassette_configs_new + (machine_id, position, denomination, count, + updated_at, updated_by, + state_denomination, state_count, state_at, state_event_id) + SELECT machine_id, position, denomination, count, + updated_at, updated_by, + denomination, state_count, state_at, state_event_id + FROM satoshimachine.cassette_configs + """) + + await db.execute("DROP TABLE satoshimachine.cassette_configs") + await db.execute( + "ALTER TABLE satoshimachine.cassette_configs_new " "RENAME TO cassette_configs" + ) diff --git a/models.py b/models.py index d683cac..6a18815 100644 --- a/models.py +++ b/models.py @@ -6,7 +6,6 @@ # the plan at ~/.claude/plans/snug-gliding-shamir.md. from datetime import datetime -from typing import Optional from pydantic import BaseModel, validator @@ -26,8 +25,8 @@ class CreateMachineData(BaseModel): machine_npub: str wallet_id: str - name: Optional[str] = None - location: Optional[str] = None + name: str | None = None + location: str | None = None fiat_code: str = "GTQ" @@ -36,8 +35,8 @@ class Machine(BaseModel): operator_user_id: str machine_npub: str wallet_id: str - name: Optional[str] - location: Optional[str] + name: str | None + location: str | None fiat_code: str is_active: bool created_at: datetime @@ -45,11 +44,11 @@ class Machine(BaseModel): class UpdateMachineData(BaseModel): - name: Optional[str] = None - location: Optional[str] = None - fiat_code: Optional[str] = None - is_active: Optional[bool] = None - wallet_id: Optional[str] = None + name: str | None = None + location: str | None = None + fiat_code: str | None = None + is_active: bool | None = None + wallet_id: str | None = None # ============================================================================= @@ -69,14 +68,14 @@ class CreateDcaClientData(BaseModel): machine_id: str user_id: str - username: Optional[str] = None + username: str | None = None class DcaClient(BaseModel): id: str machine_id: str user_id: str - username: Optional[str] + username: str | None status: str created_at: datetime updated_at: datetime @@ -92,8 +91,8 @@ class UpdateDcaClientData(BaseModel): / mode / autoforward changes go through satmachineclient against `dca_lp` instead.""" - username: Optional[str] = None - status: Optional[str] = None + username: str | None = None + status: str | None = None class DcaLpPreferences(BaseModel): @@ -109,8 +108,8 @@ class DcaLpPreferences(BaseModel): user_id: str dca_wallet_id: str default_dca_mode: str # 'flow' | 'fixed' - fixed_mode_daily_limit: Optional[float] - autoforward_ln_address: Optional[str] + fixed_mode_daily_limit: float | None + autoforward_ln_address: str | None autoforward_enabled: bool created_at: datetime updated_at: datetime @@ -121,11 +120,11 @@ class UpsertDcaLpData(BaseModel): edits their preferences. All fields optional on update — pass only the ones being changed.""" - dca_wallet_id: Optional[str] = None - default_dca_mode: Optional[str] = None - fixed_mode_daily_limit: Optional[float] = None - autoforward_ln_address: Optional[str] = None - autoforward_enabled: Optional[bool] = None + dca_wallet_id: str | None = None + default_dca_mode: str | None = None + fixed_mode_daily_limit: float | None = None + autoforward_ln_address: str | None = None + autoforward_enabled: bool | None = None class ClientBalanceSummary(BaseModel): @@ -156,7 +155,7 @@ class CreateDepositData(BaseModel): client_id: str machine_id: str amount: float - notes: Optional[str] = None + notes: str | None = None @validator("amount") def round_amount(cls, v): @@ -173,9 +172,9 @@ class DcaDeposit(BaseModel): amount: float currency: str status: str # 'pending' | 'confirmed' | 'rejected' - notes: Optional[str] + notes: str | None created_at: datetime - confirmed_at: Optional[datetime] + confirmed_at: datetime | None class UpdateDepositData(BaseModel): @@ -183,8 +182,8 @@ class UpdateDepositData(BaseModel): `CreateDepositData`; the currency is bound to the machine and not editable after the row lands.""" - amount: Optional[float] = None - notes: Optional[str] = None + amount: float | None = None + notes: str | None = None @validator("amount") def round_amount(cls, v): @@ -195,7 +194,7 @@ class UpdateDepositData(BaseModel): class UpdateDepositStatusData(BaseModel): status: str # 'pending' | 'confirmed' | 'rejected' - notes: Optional[str] = None + notes: str | None = None # ============================================================================= @@ -210,8 +209,8 @@ class UpdateDepositStatusData(BaseModel): class CreateDcaSettlementData(BaseModel): machine_id: str payment_hash: str # the idempotency key (UNIQUE in the dca_settlements table) - bitspire_event_id: Optional[str] = None # reserved for direct-Nostr ingestion - bitspire_txid: Optional[str] = None + bitspire_event_id: str | None = None # reserved for direct-Nostr ingestion + bitspire_txid: str | None = None wire_sats: int fiat_amount: float fiat_code: str = "GTQ" @@ -221,16 +220,16 @@ class CreateDcaSettlementData(BaseModel): platform_fee_sats: int operator_fee_sats: int tx_type: str # 'cash_out' | 'cash_in' - bills_json: Optional[str] = None - cassettes_json: Optional[str] = None + bills_json: str | None = None + cassettes_json: str | None = None class DcaSettlement(BaseModel): id: str machine_id: str payment_hash: str - bitspire_event_id: Optional[str] - bitspire_txid: Optional[str] + bitspire_event_id: str | None + bitspire_txid: str | None wire_sats: int fiat_amount: float fiat_code: str @@ -240,8 +239,8 @@ class DcaSettlement(BaseModel): platform_fee_sats: int operator_fee_sats: int tx_type: str - bills_json: Optional[str] - cassettes_json: Optional[str] + bills_json: str | None + cassettes_json: str | None # 'pending' (default at insert) # 'processing' (claim taken by distribution processor) # 'processed' (all legs paid) @@ -252,19 +251,19 @@ class DcaSettlement(BaseModel): # never went near distribution. error_message holds the # reason. Retry is wrong — investigate the machine.) status: str - error_message: Optional[str] - processed_at: Optional[datetime] + error_message: str | None + processed_at: datetime | None created_at: datetime # Append-only audit memo. Populated when an operator triggers an in-place # adjustment (partial-dispense, manual reconciliation override). Each # entry timestamped + records original values so the overwrite is # auditable from the settlement detail view alone. Never edited in place. - notes: Optional[str] = None + notes: str | None = None # Optimistic-lock claim token written when status flips to 'processing'. # Two concurrent process_settlement invocations can't both win the claim # (only one matching read-back). Cleared back to NULL when the leg- # writing pass completes (status='processed' or 'errored'). - processing_claim: Optional[str] = None + processing_claim: str | None = None # ============================================================================= @@ -286,7 +285,7 @@ class CommissionSplitLeg(BaseModel): """ target: str - label: Optional[str] = None + label: str | None = None fraction: float sort_order: int = 0 @@ -306,10 +305,10 @@ class CommissionSplitLeg(BaseModel): class CommissionSplit(BaseModel): id: str - machine_id: Optional[str] # None = operator's default ruleset + machine_id: str | None # None = operator's default ruleset operator_user_id: str target: str - label: Optional[str] + label: str | None fraction: float sort_order: int created_at: datetime @@ -322,7 +321,7 @@ class SetCommissionSplitsData(BaseModel): machine without an explicit override). Otherwise scoped per machine. """ - machine_id: Optional[str] = None + machine_id: str | None = None legs: list[CommissionSplitLeg] @validator("legs") @@ -339,35 +338,35 @@ class SetCommissionSplitsData(BaseModel): class CreateDcaPaymentData(BaseModel): - settlement_id: Optional[str] = None - client_id: Optional[str] = None + settlement_id: str | None = None + client_id: str | None = None machine_id: str operator_user_id: str leg_type: str # 'dca' | 'super_fee' | 'operator_split' | 'settlement' | 'autoforward' | 'refund' - destination_wallet_id: Optional[str] = None - destination_ln_address: Optional[str] = None + destination_wallet_id: str | None = None + destination_ln_address: str | None = None amount_sats: int - amount_fiat: Optional[float] = None - exchange_rate: Optional[float] = None + amount_fiat: float | None = None + exchange_rate: float | None = None transaction_time: datetime - external_payment_hash: Optional[str] = None + external_payment_hash: str | None = None class DcaPayment(BaseModel): id: str - settlement_id: Optional[str] - client_id: Optional[str] + settlement_id: str | None + client_id: str | None machine_id: str operator_user_id: str leg_type: str - destination_wallet_id: Optional[str] - destination_ln_address: Optional[str] + destination_wallet_id: str | None + destination_ln_address: str | None amount_sats: int - amount_fiat: Optional[float] - exchange_rate: Optional[float] + amount_fiat: float | None + exchange_rate: float | None transaction_time: datetime - external_payment_hash: Optional[str] + external_payment_hash: str | None status: str # Leg status enum: # 'pending' — row written, payment not yet attempted @@ -378,7 +377,7 @@ class DcaPayment(BaseModel): # 'skipped' — intentionally not paid (no super wallet configured, # no commission ruleset, no exchange rate, no LPs) # 'refunded' — reserved for future refund flows - error_message: Optional[str] + error_message: str | None created_at: datetime @@ -391,22 +390,22 @@ class TelemetrySnapshot(BaseModel): machine_id: str # Beacon (kind-30078) — all fields are nullable because the upstream payload # is sparse today. As lamassu-next#43 lands, the post-#43 columns fill in. - beacon_cash_in: Optional[bool] = None - beacon_cash_out: Optional[bool] = None - beacon_cash_level: Optional[str] = None - beacon_fiat: Optional[str] = None - beacon_model: Optional[str] = None - beacon_name: Optional[str] = None - beacon_location: Optional[str] = None - beacon_geo: Optional[str] = None - beacon_fees_json: Optional[str] = None - beacon_limits_json: Optional[str] = None - beacon_denominations_json: Optional[str] = None - beacon_version: Optional[str] = None - beacon_received_at: Optional[datetime] = None + beacon_cash_in: bool | None = None + beacon_cash_out: bool | None = None + beacon_cash_level: str | None = None + beacon_fiat: str | None = None + beacon_model: str | None = None + beacon_name: str | None = None + beacon_location: str | None = None + beacon_geo: str | None = None + beacon_fees_json: str | None = None + beacon_limits_json: str | None = None + beacon_denominations_json: str | None = None + beacon_version: str | None = None + beacon_received_at: datetime | None = None # Fleet telemetry (kind-30079) — operator-only, awaits lamassu-next#42. - telemetry_json: Optional[str] = None - telemetry_received_at: Optional[datetime] = None + telemetry_json: str | None = None + telemetry_received_at: datetime | None = None # ============================================================================= @@ -417,13 +416,13 @@ class TelemetrySnapshot(BaseModel): class SuperConfig(BaseModel): id: str super_fee_fraction: float - super_fee_wallet_id: Optional[str] + super_fee_wallet_id: str | None updated_at: datetime class UpdateSuperConfigData(BaseModel): - super_fee_fraction: Optional[float] = None - super_fee_wallet_id: Optional[str] = None + super_fee_fraction: float | None = None + super_fee_wallet_id: str | None = None @validator("super_fee_fraction") def fee_in_unit_range(cls, v): @@ -448,9 +447,9 @@ class PartialDispenseData(BaseModel): """ settlement_id: str - dispensed_fraction: Optional[float] = None - dispensed_sats: Optional[int] = None - notes: Optional[str] = None + dispensed_fraction: float | None = None + dispensed_sats: int | None = None + notes: str | None = None @validator("dispensed_fraction") def fraction_in_unit_range(cls, v): @@ -530,8 +529,8 @@ class SettleBalanceData(BaseModel): # there's no ambiguity about what rate was used. exchange_rate: float # If None, settle the LP's full remaining balance. Else partial. - amount_fiat: Optional[float] = None - notes: Optional[str] = None + amount_fiat: float | None = None + notes: str | None = None @validator("exchange_rate") def positive_rate(cls, v): @@ -546,3 +545,140 @@ class SettleBalanceData(BaseModel): if v <= 0: raise ValueError("amount_fiat must be > 0 if specified") return round(float(v), 2) + + +# ============================================================================= +# Cassette configs — operator-driven ATM cassette inventory (#29 v1.1). +# ============================================================================= +# Schema is position-keyed per the coordinated v1.1 redesign at coord-log +# 2026-05-30T18:30Z + 18:45Z. The earlier denomination-keyed shape (m007) +# was wrong: real machines have N cassettes of the same denomination for +# cash-out throughput, and operators need to swap cartridge denominations +# during refill ($20 bay becomes a $50 bay) without re-provisioning. +# +# Wire shape: +# {"positions": {"": {"denomination": N, "count": M}}} +# +# Editable surface per row: +# - denomination: yes (operator swaps cartridges during refill) +# - count: yes (refill / decrement) +# Read-only per row: +# - position: hardware bay number; the slot count is fixed by the +# dispenser model (e.g., Tejo has 4 positions). +# +# No "denomination must be unique within payload" constraint: multiple +# same-denom cassettes are operationally valid. The ATM HAL distributes +# a dispense request greedy across all positions matching the requested +# denomination (lamassu-next#56 v1.1 HAL refactor). +# +# state_* columns are reserved nullable for the v2 reverse-channel +# reconciliation consumer (bitspire-cassettes-state:). +# v1 populates them on bootstrap-event receipt but the UI doesn't render +# reconciliation. state_denomination (added in m008) lets v2 highlight +# operator-believed-vs-ATM-reported denomination drift per slot. + + +class CassetteConfig(BaseModel): + machine_id: str + position: int + denomination: int + count: int + updated_at: datetime + updated_by: str | None + state_denomination: int | None + state_count: int | None + state_at: datetime | None + state_event_id: str | None + + +class UpsertCassetteConfigData(BaseModel): + """Operator edits a single cassette row's denomination or count from + the dashboard. Both fields optional; pass only those changed. + Position is not edited — it's the row's identity (hardware bay).""" + + denomination: int | None = None + count: int | None = None + + @validator("denomination") + def denomination_positive(cls, v): + if v is None: + return v + if v <= 0: + raise ValueError("denomination must be > 0") + return v + + @validator("count") + def count_non_negative(cls, v): + if v is None: + return v + if v < 0: + raise ValueError("count must be >= 0") + return v + + +class CassettePayloadRow(BaseModel): + """One position's payload values in the wire-format + `{"positions": {"": {"denomination", "count"}}}`.""" + + denomination: int + count: int + + @validator("denomination") + def denomination_positive(cls, v): + if v <= 0: + raise ValueError("denomination must be > 0") + return v + + @validator("count") + def count_non_negative(cls, v): + if v < 0: + raise ValueError("count must be >= 0") + return v + + +class PublishCassettesPayload(BaseModel): + """The decrypted JSON content of a kind-30078 cassette event, both + directions: + - operator → ATM (d-tag `bitspire-cassettes:`) + - ATM → operator (d-tag `bitspire-cassettes-state:`) + + Wire shape: `{"positions": {"": {"denomination", "count"}}}`. + JSON object keys are always strings; the validator coerces back to + int on parse. The position key set MUST match what the receiver + already has (slot count is hardware-fixed; no add/remove from this + payload). + + No denomination-unique constraint: multiple same-denom cassettes are + operationally valid (cash-out throughput on a popular denom). + """ + + positions: dict[int, CassettePayloadRow] + + @validator("positions", pre=True) + def coerce_string_keys_to_int(cls, v): + if not isinstance(v, dict): + raise ValueError("positions must be a dict") + out = {} + for k, val in v.items(): + try: + key_int = int(k) + except (TypeError, ValueError) as exc: + raise ValueError(f"position key {k!r} is not an int") from exc + if key_int <= 0: + raise ValueError(f"position must be > 0 (got {key_int})") + out[key_int] = val + return out + + def to_wire_dict(self) -> dict: + """Serialise back to the wire format with string keys for JSON + object compatibility. Used by the publisher to build the kind-30078 + event content before NIP-44 v2 encryption.""" + return { + "positions": { + str(pos): { + "denomination": row.denomination, + "count": row.count, + } + for pos, row in self.positions.items() + } + } diff --git a/nip44.py b/nip44.py new file mode 100644 index 0000000..109860d --- /dev/null +++ b/nip44.py @@ -0,0 +1,294 @@ +""" +NIP-44 v2 — versioned encrypted payloads (https://github.com/nostr-protocol/nips/blob/master/44.md). + +Hand-rolled because lnbits historically shipped only NIP-04 (AES-CBC) in +`lnbits.utils.nostr.encrypt_content`, and the locked design at +aiolabs/satmachineadmin#29 (paired with lamassu-next#56) wires cassette config +over kind-30078 with NIP-44 v2 encrypted content. + +## Runtime status (post lnbits PR #38, 2026-05-31) + +**Runtime usage has migrated to the signer abstraction** via +`signer.nip44_encrypt` / `signer.nip44_decrypt` on `lnbits.core.signers.base. +NostrSigner`. For RemoteBunkerSigner-backed accounts the bunker performs the +crypto and the operator's nsec never leaves the bunker process; for the +transitional LocalSigner path `cassette_transport._nip44_*_via_signer` falls +back to the helpers in this module against the stored `account.prvkey`. + +This module's runtime export footprint is therefore: + - `encrypt_for` / `decrypt_from` — called by the LocalSigner fallback in + `cassette_transport` until every operator on the instance is bunker-backed + (S7 / aiolabs/satmachineadmin#21). Then those calls disappear too. + - Everything else (encrypt_with_conversation_key, decrypt_with_conversation_key, + get_conversation_key, padding helpers, error classes) is **test-only**: + referenced by `tests/test_nip44_v2.py` to validate the wire format against + the canonical paulmillr/nip44 reference vectors and the bitspire cross-test + fixture posted to the coordination log. + +Don't add new runtime call sites here. The signer abstraction is the path. + +Two safety nets keep the impl honest: + 1. tests/test_nip44_v2.py runs reference vectors + round-trip + tamper-detection. + 2. bitspire posts a sample event encrypted on their nostr-tools side to the + coord log; test_decrypts_bitspire_sample_event cross-checks our impl + against theirs by decrypting that event with a known privkey. + +Wire format (per spec): + payload = base64( 0x02 || nonce (32B) || ciphertext (var) || mac (32B) ) + +Key derivation: + conversation_key = HKDF-extract(salt=b"nip44-v2", IKM=ecdh_shared_x) # 32B PRK, stable per pair + per-message: + nonce = csprng(32 bytes) + temp = HKDF-expand(PRK=conversation_key, info=nonce, L=76) + chacha_key = temp[0:32] + chacha_nonce = temp[32:44] + hmac_key = temp[44:76] + +Padding scheme (NIP-44 v2 length-prefixed, variable-chunk): + padded = uint16_be(len(plaintext)) || plaintext || zeros + such that 2 + padded_data_len matches a fixed step. +""" + +from __future__ import annotations + +import base64 +import hashlib +import hmac as hmac_stdlib +import os +import struct + +import coincurve +from cryptography.hazmat.primitives import hashes, hmac +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms +from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand + +# Spec constants. +_VERSION = 0x02 +_HKDF_SALT = b"nip44-v2" +_MIN_PLAINTEXT_LEN = 1 +_MAX_PLAINTEXT_LEN = 65535 +_NONCE_LEN = 32 +_MAC_LEN = 32 +_MIN_PAYLOAD_LEN = ( + 1 + _NONCE_LEN + (2 + 32) + _MAC_LEN +) # version + nonce + min padded + mac +_MAX_PAYLOAD_LEN = 1 + _NONCE_LEN + (2 + 65536) + _MAC_LEN + + +class Nip44Error(Exception): + """Generic NIP-44 v2 envelope error. Subclasses distinguish failure modes.""" + + +class Nip44VersionError(Nip44Error): + """First payload byte was not 0x02. Could be a NIP-04 envelope, a v1 NIP-44, or garbage.""" + + +class Nip44MacError(Nip44Error): + """HMAC verification failed — payload was tampered, wrong conversation key, or corrupted in transit.""" + + +class Nip44LengthError(Nip44Error): + """Plaintext or payload length outside the spec-allowed range, or padding header lies.""" + + +# ============================================================================= +# Padding (NIP-44 v2) +# ============================================================================= + + +def _calc_padded_len(plaintext_len: int) -> int: + """Per NIP-44 v2 padding scheme: + if L <= 32: padded_len = 32 + else: chunk = max(32, next_power_2(L-1) // 8); padded_len = chunk * ((L-1) // chunk + 1) + """ + if plaintext_len <= 32: + return 32 + next_power = 1 << (plaintext_len - 1).bit_length() + chunk = max(32, next_power // 8) + return chunk * ((plaintext_len - 1) // chunk + 1) + + +def _pad(plaintext: bytes) -> bytes: + """Prefix uint16_be length + plaintext + zero-fill to the NIP-44 v2 boundary.""" + n = len(plaintext) + if n < _MIN_PLAINTEXT_LEN or n > _MAX_PLAINTEXT_LEN: + raise Nip44LengthError( + f"plaintext length {n} outside [{_MIN_PLAINTEXT_LEN}, {_MAX_PLAINTEXT_LEN}]" + ) + padded_data_len = _calc_padded_len(n) + zeros = b"\x00" * (padded_data_len - n) + return struct.pack(">H", n) + plaintext + zeros + + +def _unpad(padded: bytes) -> bytes: + """Strip the uint16_be length prefix and zero padding. Validates that the + declared length is consistent with the padded payload (rejects a forged + length prefix that would slice past the buffer or imply a different + padded_data_len than what we received).""" + if len(padded) < 2: + raise Nip44LengthError("padded payload too short to hold length prefix") + declared_len = struct.unpack(">H", padded[0:2])[0] + if declared_len < _MIN_PLAINTEXT_LEN or declared_len > _MAX_PLAINTEXT_LEN: + raise Nip44LengthError(f"declared plaintext length {declared_len} out of range") + if len(padded) != 2 + _calc_padded_len(declared_len): + raise Nip44LengthError( + f"padded buffer length {len(padded)} doesn't match the calculated padding " + f"for declared length {declared_len}" + ) + return padded[2 : 2 + declared_len] + + +# ============================================================================= +# Conversation + message-key derivation +# ============================================================================= + + +def get_conversation_key(privkey_hex: str, pubkey_hex: str) -> bytes: + """Derive the per-pair stable conversation key (PRK) used for all messages + between sender (privkey) and recipient (pubkey). + + Steps: + shared_x = ECDH(privkey, pubkey).x # 32 bytes, x-coordinate + prk = HKDF-extract(salt=b"nip44-v2", IKM=shared_x) + + coincurve's `.multiply(secret).format(compressed=True)[1:]` strips the + leading 0x02/0x03 parity byte to return the raw x-coord — same trick + `lnbits.utils.nostr.encrypt_content` uses for NIP-04. + """ + sender = coincurve.PrivateKey(bytes.fromhex(privkey_hex)) + recipient_pub = coincurve.PublicKey(b"\x02" + bytes.fromhex(pubkey_hex)) + shared_x = recipient_pub.multiply(sender.secret).format(compressed=True)[1:] + # HKDF-extract is HMAC-SHA256(key=salt, msg=ikm) per RFC 5869. + return hmac_stdlib.new(_HKDF_SALT, shared_x, hashlib.sha256).digest() + + +def _derive_message_keys( + conversation_key: bytes, nonce: bytes +) -> tuple[bytes, bytes, bytes]: + """Per-message key expansion: HKDF-expand(PRK=conversation_key, info=nonce, L=76). + Returns (chacha_key 32B, chacha_nonce 12B, hmac_key 32B).""" + hkdf = HKDFExpand(algorithm=hashes.SHA256(), length=76, info=nonce) + okm = hkdf.derive(conversation_key) + return okm[0:32], okm[32:44], okm[44:76] + + +def _hmac_aad(hmac_key: bytes, nonce: bytes, ciphertext: bytes) -> bytes: + """HMAC-SHA256(key=hmac_key, msg=nonce || ciphertext). Returns 32-byte MAC.""" + h = hmac.HMAC(hmac_key, hashes.SHA256()) + h.update(nonce) + h.update(ciphertext) + return h.finalize() + + +def _chacha20(key: bytes, nonce: bytes, data: bytes) -> bytes: + """ChaCha20 stream cipher (symmetric: encrypt == decrypt). Used both directions. + + The `cryptography` lib's `algorithms.ChaCha20(key, nonce)` expects a + 16-byte nonce arg: a 4-byte little-endian initial counter prefix + + 12-byte actual nonce. NIP-44 v2 starts the counter at 0 and uses the + HKDF-derived 12-byte chacha_nonce, so we prefix four zero bytes here. + """ + if len(nonce) != 12: + raise Nip44LengthError( + f"chacha_nonce must be 12 bytes (NIP-44 v2), got {len(nonce)}" + ) + cipher = Cipher(algorithms.ChaCha20(key, b"\x00\x00\x00\x00" + nonce), mode=None) + return cipher.encryptor().update(data) + + +# ============================================================================= +# Public API — low-level (nonce-controllable for testability) +# ============================================================================= + + +def encrypt_with_conversation_key( + plaintext: str, + conversation_key: bytes, + *, + nonce: bytes | None = None, +) -> str: + """Encrypt `plaintext` under a precomputed `conversation_key` (32B PRK). + + `nonce` is 32 random bytes when omitted (the production path). Tests pass + it explicitly to assert pinned reference vectors. + + Returns the base64-encoded payload string suitable as a Nostr event's + `content` field for kind-30078 (and any other kind that uses NIP-44 v2). + """ + if nonce is None: + nonce = os.urandom(_NONCE_LEN) + elif len(nonce) != _NONCE_LEN: + raise Nip44LengthError(f"nonce must be exactly {_NONCE_LEN} bytes") + + padded = _pad(plaintext.encode("utf-8")) + chacha_key, chacha_nonce, hmac_key = _derive_message_keys(conversation_key, nonce) + ciphertext = _chacha20(chacha_key, chacha_nonce, padded) + mac = _hmac_aad(hmac_key, nonce, ciphertext) + return base64.b64encode(bytes([_VERSION]) + nonce + ciphertext + mac).decode( + "ascii" + ) + + +def decrypt_with_conversation_key(payload_b64: str, conversation_key: bytes) -> str: + """Decrypt a NIP-44 v2 payload using a precomputed `conversation_key`. + + Raises: + Nip44VersionError — payload's first byte isn't 0x02 + Nip44LengthError — payload too short / too long / declared length lies + Nip44MacError — HMAC verification failed (tamper, wrong key, corruption) + """ + try: + raw = base64.b64decode(payload_b64, validate=True) + except ( + Exception + ) as exc: + raise Nip44LengthError(f"payload is not valid base64: {exc}") from exc + + if len(raw) < _MIN_PAYLOAD_LEN or len(raw) > _MAX_PAYLOAD_LEN: + raise Nip44LengthError(f"payload length {len(raw)} outside valid range") + if raw[0] != _VERSION: + raise Nip44VersionError(f"unsupported NIP-44 version: 0x{raw[0]:02x}") + + nonce = raw[1 : 1 + _NONCE_LEN] + mac_received = raw[-_MAC_LEN:] + ciphertext = raw[1 + _NONCE_LEN : -_MAC_LEN] + + chacha_key, chacha_nonce, hmac_key = _derive_message_keys(conversation_key, nonce) + mac_expected = _hmac_aad(hmac_key, nonce, ciphertext) + # constant-time compare to avoid timing-leak in MAC verification + if not hmac_stdlib.compare_digest(mac_received, mac_expected): + raise Nip44MacError("HMAC verification failed") + + padded = _chacha20(chacha_key, chacha_nonce, ciphertext) + plaintext_bytes = _unpad(padded) + return plaintext_bytes.decode("utf-8") + + +# ============================================================================= +# Public API — high-level (pair-keyed, the call shape app code reaches for) +# ============================================================================= + + +def encrypt_for( + plaintext: str, + sender_privkey_hex: str, + recipient_pubkey_hex: str, + *, + nonce: bytes | None = None, +) -> str: + """Encrypt `plaintext` from the sender (holding the privkey) to the recipient + (identified by pubkey). The recipient can decrypt with `decrypt_from( + payload, recipient_privkey_hex, sender_pubkey_hex)` — symmetric on the + conversation key, which is the same derived value from either side.""" + conversation_key = get_conversation_key(sender_privkey_hex, recipient_pubkey_hex) + return encrypt_with_conversation_key(plaintext, conversation_key, nonce=nonce) + + +def decrypt_from( + payload_b64: str, recipient_privkey_hex: str, sender_pubkey_hex: str +) -> str: + """Decrypt a payload that the recipient (holding the privkey) received from + the sender (identified by pubkey).""" + conversation_key = get_conversation_key(recipient_privkey_hex, sender_pubkey_hex) + return decrypt_with_conversation_key(payload_b64, conversation_key) diff --git a/static/js/index.js b/static/js/index.js index cdb493f..96b98ef 100644 --- a/static/js/index.js +++ b/static/js/index.js @@ -191,7 +191,30 @@ window.app = Vue.createApp({ show: false, loading: false, machine: null, - settlements: [] + settlements: [], + // Cassettes sub-tab state (#29 v1) — see openCassettePublishConfirm / + // submitCassettePublish methods + the cassettes panel in + // templates/satmachineadmin/index.html. + activeTab: 'settlements', + cassetteEdits: [], // editable working copy of cassette_configs rows + cassettesPristine: [], // last-known-clean snapshot for revert + cassettesLoading: false, + cassettesPublishing: false, + cassettesDirty: false, + cassettesError: null + }, + cassettesTable: { + columns: [ + {name: 'position', label: 'Bay', field: 'position', align: 'right'}, + {name: 'denomination', label: 'Denomination', field: 'denomination', align: 'right'}, + {name: 'count', label: 'Count', field: 'count', align: 'right'}, + {name: 'state', label: 'ATM-reported', field: 'state_denomination', align: 'right'}, + {name: 'updated_at', label: 'Updated', field: 'updated_at', align: 'left'} + ], + pagination: {rowsPerPage: 0} // hide pagination — cassette count is small + }, + cassettePublishConfirm: { + show: false }, partialDispenseDialog: { show: false, @@ -741,6 +764,11 @@ window.app = Vue.createApp({ async viewMachine(machine) { this.machineDetail.machine = machine this.machineDetail.settlements = [] + this.machineDetail.cassetteEdits = [] + this.machineDetail.cassettesPristine = [] + this.machineDetail.cassettesDirty = false + this.machineDetail.cassettesError = null + this.machineDetail.activeTab = 'settlements' this.machineDetail.show = true await this.reloadMachineDetail() }, @@ -759,6 +787,102 @@ window.app = Vue.createApp({ } finally { this.machineDetail.loading = false } + // Cassettes load in parallel; UI only renders them when the tab + // is active, but pre-loading means no flicker on tab switch. + await this.loadMachineCassettes() + }, + + // ----------------------------------------------------------------- + // Cassette inventory (#29 v1) + // ----------------------------------------------------------------- + async loadMachineCassettes() { + if (!this.machineDetail.machine) return + this.machineDetail.cassettesLoading = true + this.machineDetail.cassettesError = null + try { + const {data} = await LNbits.api.request( + 'GET', + `${MACHINES_PATH}/${this.machineDetail.machine.id}/cassettes` + ) + const rows = (data || []).map(row => ({...row, _dirty: false})) + this.machineDetail.cassetteEdits = rows + this.machineDetail.cassettesPristine = JSON.parse(JSON.stringify(rows)) + this.machineDetail.cassettesDirty = false + } catch (e) { + this._notifyError(e, 'Failed to load cassettes') + } finally { + this.machineDetail.cassettesLoading = false + } + }, + + markCassetteDirty(row) { + // Find pristine match by position (the row identity) and compare; + // flip _dirty + overall dirty flag accordingly. Editable fields + // are denomination + count; position is the immutable row key. + const pristine = this.machineDetail.cassettesPristine.find( + p => p.position === row.position + ) + row._dirty = + !pristine || + Number(row.denomination) !== Number(pristine.denomination) || + Number(row.count) !== Number(pristine.count) + this.machineDetail.cassettesDirty = + this.machineDetail.cassetteEdits.some(r => r._dirty) + }, + + revertCassetteEdits() { + this.machineDetail.cassetteEdits = JSON.parse( + JSON.stringify(this.machineDetail.cassettesPristine) + ) + this.machineDetail.cassettesDirty = false + this.machineDetail.cassettesError = null + }, + + openCassettePublishConfirm() { + if (!this.machineDetail.cassettesDirty) return + this.machineDetail.cassettesError = null + this.cassettePublishConfirm.show = true + }, + + async submitCassettePublish() { + // Build the PublishCassettesPayload shape (v1.1, position-keyed): + // { positions: { "": { denomination, count }, ... } } + // The API enforces the position set matches what's stored — + // since we only edit existing rows, this should always pass. + const positions = {} + for (const row of this.machineDetail.cassetteEdits) { + positions[String(row.position)] = { + denomination: Number(row.denomination), + count: Number(row.count) + } + } + const payload = {positions} + this.machineDetail.cassettesPublishing = true + try { + const {data} = await LNbits.api.request( + 'POST', + `${MACHINES_PATH}/${this.machineDetail.machine.id}/cassettes/publish`, + null, + payload + ) + const fresh = (data || []).map(r => ({...r, _dirty: false})) + this.machineDetail.cassetteEdits = fresh + this.machineDetail.cassettesPristine = JSON.parse(JSON.stringify(fresh)) + this.machineDetail.cassettesDirty = false + this.cassettePublishConfirm.show = false + Quasar.Notify.create({ + type: 'positive', + message: 'Cassette config published to ATM' + }) + } catch (e) { + const detail = + (e && e.response && e.response.data && e.response.data.detail) || + 'Publish failed' + this.machineDetail.cassettesError = detail + this._notifyError(e, 'Publish failed') + } finally { + this.machineDetail.cassettesPublishing = false + } }, settlementStatusColor(status) { diff --git a/tasks.py b/tasks.py index 7d77f0e..7f2a276 100644 --- a/tasks.py +++ b/tasks.py @@ -125,9 +125,7 @@ async def _handle_payment(payment: Payment) -> None: # stamp is missing, SettlementInvariantError on any range/sum # breach. super_config = await get_super_config() - super_fee_fraction = ( - float(super_config.super_fee_fraction) if super_config else 0.0 - ) + super_fee_fraction = float(super_config.super_fee_fraction) if super_config else 0.0 try: data = parse_settlement( machine=machine, @@ -194,9 +192,7 @@ async def _handle_payment(payment: Payment) -> None: task.add_done_callback(_inflight_distributions.discard) -async def _record_rejected( - payment: Payment, machine: Machine, exc: Exception -) -> None: +async def _record_rejected(payment: Payment, machine: Machine, exc: Exception) -> None: """Insert a minimal `dca_settlements` row with `status='rejected'` and the exception message for operator forensics. @@ -237,3 +233,258 @@ async def _record_rejected( f"(machine={machine.machine_npub[:12]}..., " f"payment_hash={payment.payment_hash[:12]}...): {exc}" ) + + +# ============================================================================= +# Cassette bootstrap consumer (#29 v1) +# ============================================================================= +# Subscribes to kind-30078 bitspire-cassettes-state: events +# published by each active machine's ATM on first boot (lamassu-next#56's +# bootstrap publish path). Decrypts the NIP-44 v2 content with the operator's +# privkey + ATM sender pubkey, validates as PublishCassettesPayload, and +# upserts cassette_configs via apply_bootstrap_state. +# +# v1 = one-shot per machine (ATM's meta.bootstrapPublishedAt makes the +# publish idempotent on ATM-side restart; satmachineadmin's apply_bootstrap_ +# state dedups on state_event_id for relay re-delivery). +# +# v2 (separate issue) = continuous reverse-channel consumer with a +# last_state_created_at watermark for reconciliation UI. +# +# Implementation: polls nostrclient.router.NostrRouter.received_subscription_ +# events keyed by our subscription_id. nostrclient's NostrRouter design is +# per-WebSocket-client; the singleton dict it drains into is the only +# server-side hook to consume events without standing up an in-process +# websocket. The relay manager is the same singleton publish_to_atm uses, +# so add_subscription registers a filter against the same relay pool. + +CASSETTE_BOOTSTRAP_SUB_ID = "satmachineadmin-cassette-bootstrap" +_CASSETTE_POLL_INTERVAL_S = 2.0 +_CASSETTE_BACKOFF_S = 30.0 # when nostrclient isn't installed yet + + +async def wait_for_cassette_state_events() -> None: + """Long-running task: subscribe to bitspire-cassettes-state events from + every active machine's ATM and upsert cassette_configs on receipt. + + Pattern mirrors wait_for_paid_invoices (try/except wraps each event, + never lets the loop die). Re-derives the subscription filter on each + tick from the current active-machines list — newly-added machines + start receiving bootstrap events without an LNbits restart. + + Soft-fail surfaces: + - nostrclient not installed → log + sleep _CASSETTE_BACKOFF_S + between retries (operator may install it later) + - inbound event fails sig-verify / decrypt / parse → log + skip + the event, continue the loop + - apply_bootstrap_state errors → log + skip + """ + logger.info( + "satmachineadmin v2: cassette bootstrap consumer starting " + f"(sub_id={CASSETTE_BOOTSTRAP_SUB_ID})" + ) + current_filter_key: str | None = None + while True: + try: + current_filter_key = await _cassette_consumer_tick(current_filter_key) + await asyncio.sleep(_CASSETTE_POLL_INTERVAL_S) + except _NostrclientUnavailable: + logger.warning( + "satmachineadmin: nostrclient extension not installed; " + f"cassette bootstrap consumer sleeping {_CASSETTE_BACKOFF_S}s " + "before retry. Install + activate nostrclient on this " + "LNbits instance." + ) + current_filter_key = None + await asyncio.sleep(_CASSETTE_BACKOFF_S) + except Exception as exc: # listener must never die + logger.error( + f"satmachineadmin: cassette consumer loop error (continuing): " f"{exc}" + ) + await asyncio.sleep(_CASSETTE_POLL_INTERVAL_S) + + +class _NostrclientUnavailable(Exception): + """Internal sentinel — nostrclient extension import failed. Caller + sleeps a backoff then retries; the operator may install nostrclient + at any time.""" + + +async def _cassette_consumer_tick(current_filter_key: str | None) -> str: + """Single iteration of the bootstrap-consumer loop. Returns the filter + key used this tick so the caller can detect filter-set changes. + + Raises _NostrclientUnavailable if nostrclient can't be imported (the + outer loop backs off + retries). + """ + try: + from nostrclient.router import ( # type: ignore[import-not-found] + NostrRouter, + nostr_client, + ) + except ImportError as exc: + raise _NostrclientUnavailable() from exc + + from .cassette_transport import build_state_d_tags_for_machines + from .crud import ( + apply_bootstrap_state, + get_machine_by_atm_pubkey_hex, + list_all_active_machines, + ) + + machines = await list_all_active_machines() + d_tags = build_state_d_tags_for_machines(machines) + filter_key = ",".join(sorted(d_tags)) + + if filter_key != current_filter_key: + if d_tags: + filters = [{"kinds": [30078], "#d": d_tags}] + # nostrclient's add_subscription is typed as list[str] but the + # actual relay protocol accepts list[Filter-dict] — type ignore + # the upstream typing mismatch. + nostr_client.relay_manager.add_subscription( + CASSETTE_BOOTSTRAP_SUB_ID, filters # type: ignore[arg-type] + ) + logger.info( + "satmachineadmin: (re)registered cassette bootstrap " + f"subscription with {len(d_tags)} d-tag(s)" + ) + else: + nostr_client.relay_manager.close_subscription(CASSETTE_BOOTSTRAP_SUB_ID) + logger.info( + "satmachineadmin: no active machines; closed cassette " + "bootstrap subscription" + ) + + inbound = NostrRouter.received_subscription_events.get(CASSETTE_BOOTSTRAP_SUB_ID) + if inbound: + while inbound: + event_message = inbound.pop(0) + try: + await _handle_cassette_state_event( + event_message, + get_machine_by_atm_pubkey_hex, + apply_bootstrap_state, + ) + except Exception as exc: + logger.warning( + f"satmachineadmin: cassette state event handler " + f"failed (skipping): {exc}" + ) + + return filter_key + + +async def _handle_cassette_state_event( + event_message, + get_machine_by_atm_pubkey_hex, + apply_bootstrap_state, +) -> None: + """Verify signature, resolve the operator's signer, decrypt via the + signer abstraction (bunker round-trip for RemoteBunkerSigner; direct + prvkey on the LocalSigner transitional fallback inside the transport + helper), parse, upsert. + + Each step logs at WARNING (not ERROR) so a noisy attacker can't fill + the logs — this is data on a public relay, garbage is expected. + + Two skip outcomes: + - Terminal (CassetteEventDecodeError / SignerUnavailable / + OperatorIdentityMissing / etc.): log + return. `apply_bootstrap_ + state` is never called → `state_event_id` is not advanced → + same event would re-process on next poll cycle but the consumer's + WARN log surfaces the underlying issue immediately. + - Transient (CassetteEventTransientError): log at INFO (less noisy) + + return. Same retry-via-no-advance semantics, just less + alarming in the operator log feed. + """ + import json as _json + from datetime import datetime as _datetime + from datetime import timezone as _timezone + + from lnbits.utils.nostr import verify_event + + from .cassette_transport import ( + CassetteEventDecodeError, + CassetteEventTransientError, + CassetteTransportError, + _resolve_operator_signer, + decrypt_and_parse_state_event, + ) + + event_raw = event_message.event + if isinstance(event_raw, str): + event_obj = _json.loads(event_raw) + elif isinstance(event_raw, dict): + event_obj = event_raw + else: + logger.warning( + f"satmachineadmin: cassette event of unexpected type " + f"{type(event_raw).__name__}; skipping" + ) + return + + if not verify_event(event_obj): + logger.warning( + f"satmachineadmin: cassette state event sig verify failed " + f"(id={event_obj.get('id', '?')[:12]}...)" + ) + return + + sender_pubkey = event_obj.get("pubkey", "") + machine = await get_machine_by_atm_pubkey_hex(sender_pubkey) + if machine is None: + # Unknown sender — could be relay noise or an attacker. Don't + # treat as our problem. + logger.warning( + f"satmachineadmin: cassette state event from unknown ATM " + f"pubkey {sender_pubkey[:12]}... (not in dca_machines); " + "skipping" + ) + return + + try: + account, signer = await _resolve_operator_signer(machine.operator_user_id) + except CassetteTransportError as exc: + # OperatorIdentityMissing / SignerUnavailable — log + skip. + logger.warning( + f"satmachineadmin: can't resolve signer for operator " + f"{machine.operator_user_id[:8]}... (machine {machine.id}): " + f"{exc}" + ) + return + + try: + payload = await decrypt_and_parse_state_event(event_obj, account, signer) + except CassetteEventTransientError as exc: + logger.info( + f"satmachineadmin: cassette state event for machine {machine.id} " + f"hit a transient signer error (will retry next poll): {exc}" + ) + return + except CassetteEventDecodeError as exc: + logger.warning( + f"satmachineadmin: cassette state event decode failed for " + f"machine {machine.id} (id={event_obj.get('id', '?')[:12]}...): " + f"{exc}" + ) + return + + event_id = event_obj.get("id", "") + created_at_unix = event_obj.get("created_at", 0) + event_created_at = _datetime.fromtimestamp(int(created_at_unix), tz=_timezone.utc) + + applied = await apply_bootstrap_state( + machine.id, event_id, event_created_at, payload + ) + if applied: + logger.info( + f"satmachineadmin: applied bootstrap state event {event_id[:12]}... " + f"to machine {machine.id} ({len(payload.positions)} cassettes)" + ) + else: + # Replay: event_id already on file. Normal on relay reconnect. + logger.debug( + f"satmachineadmin: cassette state event {event_id[:12]}... " + f"already applied to machine {machine.id} (replay no-op)" + ) diff --git a/templates/satmachineadmin/index.html b/templates/satmachineadmin/index.html index 6278ef9..8b3ddf3 100644 --- a/templates/satmachineadmin/index.html +++ b/templates/satmachineadmin/index.html @@ -818,7 +818,7 @@ - Reload settlements + Reload Close @@ -845,7 +845,21 @@ -

+ + + + + + + + + +
Settlements

@@ -959,10 +973,158 @@ + + + + +

+
+
Cassettes
+

+ Per-cassette count and physical bay position. Denomination + set is hardware-determined (re-provision via atm-tui to + change). "Publish to ATM" encrypts + signs + sends the new + config to the machine via Nostr. +

+
+
+ + Discard unsaved edits + + +
+
+ + + + + + + + + Waiting for the ATM's bootstrap state event. Power on the ATM + and confirm it has reached the configured relay; cassette + rows will auto-populate on receipt. + + + + + + + + + + + + + + +
Publish cassette config to ATM
+ + +
+ + + + This publish will overwrite the ATM's currently-tracked + counts. If the ATM has dispensed cash since your last + refill or count baseline, those decrements will be lost. + Publish only after a physical refill (a known total), not to + "tweak" counts mid-day. v2 reconciliation will replace this + modal with reconciled state display. + +

Sending to ATM:

+ + + + + + + + + + + · count + + + + + +
+ + + + +
+
+ diff --git a/tests/test_cassette_configs.py b/tests/test_cassette_configs.py new file mode 100644 index 0000000..08a7e00 --- /dev/null +++ b/tests/test_cassette_configs.py @@ -0,0 +1,220 @@ +""" +Tests for the v1.1 cassette-config layer (aiolabs/satmachineadmin#29). + +Covers the pure pieces that don't need a live DB: + - Pydantic validator behaviour on PublishCassettesPayload + the row / + upsert models (position key coercion, integer ranges, multiple-same- + denomination payloads, wire-format round-trip) + - _should_apply_bootstrap_state dedup helper (extracted from + apply_bootstrap_state so the relay-re-delivery decision is testable + without a database round-trip) + +DB-touching tests (apply_bootstrap_state actually upserting, list-by- +machine ordering, etc.) follow the project convention from +test_deposit_currency.py: "Layer 2 is an endpoint-level behaviour better +covered by an integration test against a running LNbits; tracked in #26 +as a follow-up." Smoke-tested manually via the dev container. + +Wire shape pivot from m007 → m008 is the v1.1 coordination point per +coord-log 2026-05-30T18:30Z + 18:45Z: position is the row identity, +denomination + count are operator-editable per row, multiple same-denom +cassettes are valid. +""" + +import pytest + +from ..crud import _should_apply_bootstrap_state +from ..models import ( + CassettePayloadRow, + PublishCassettesPayload, + UpsertCassetteConfigData, +) + +# ============================================================================= +# PublishCassettesPayload — wire-shape validators +# ============================================================================= + + +class TestPublishCassettesPayload: + """The kind-30078 content payload, bidirectional (operator→ATM and + ATM→operator share the shape). String JSON keys must coerce to int; + per-row int constraints enforced; multiple same-denom rows are valid.""" + + def test_happy_path_coerces_string_keys_to_int(self): + p = PublishCassettesPayload( + positions={ + "1": {"denomination": 20, "count": 49}, + "2": {"denomination": 50, "count": 100}, + } + ) + assert set(p.positions.keys()) == {1, 2} + assert p.positions[1].denomination == 20 + assert p.positions[1].count == 49 + assert p.positions[2].denomination == 50 + assert p.positions[2].count == 100 + + def test_wire_dict_round_trip_restringifies_keys(self): + """to_wire_dict() must restringify position keys so the resulting + JSON is parseable by clients (including the ATM-side nostr-tools + NIP-44 v2 consumer per the byte-compat cross-test).""" + original = PublishCassettesPayload( + positions={ + "1": {"denomination": 20, "count": 49}, + "2": {"denomination": 50, "count": 100}, + } + ) + wire = original.to_wire_dict() + assert wire == { + "positions": { + "1": {"denomination": 20, "count": 49}, + "2": {"denomination": 50, "count": 100}, + } + } + # And the wire form round-trips back through the parser cleanly. + reparsed = PublishCassettesPayload(**wire) + assert reparsed.positions == original.positions + + def test_accepts_multiple_same_denomination_cassettes(self): + """v1.1 operational case: real machines have N cassettes loaded + with the same denomination for cash-out throughput. The wire shape + must accept this, and we explicitly do NOT validate uniqueness on + denomination. Coord-log 2026-05-30T18:45Z bitspire response.""" + p = PublishCassettesPayload( + positions={ + "1": {"denomination": 20, "count": 100}, + "2": {"denomination": 20, "count": 100}, + "3": {"denomination": 50, "count": 50}, + "4": {"denomination": 100, "count": 25}, + } + ) + assert len(p.positions) == 4 + denoms = [row.denomination for row in p.positions.values()] + assert denoms.count(20) == 2 # two $20 cassettes + assert sorted(denoms) == [20, 20, 50, 100] + + def test_rejects_non_int_position_key(self): + with pytest.raises(ValueError) as exc: + PublishCassettesPayload(positions={"abc": {"denomination": 20, "count": 1}}) + assert "is not an int" in str(exc.value) + + def test_rejects_non_positive_position(self): + with pytest.raises(ValueError) as exc: + PublishCassettesPayload(positions={"0": {"denomination": 20, "count": 1}}) + assert "position must be > 0" in str(exc.value) + + def test_rejects_negative_position(self): + with pytest.raises(ValueError) as exc: + PublishCassettesPayload(positions={"-1": {"denomination": 20, "count": 1}}) + assert "position must be > 0" in str(exc.value) + + def test_rejects_negative_count(self): + with pytest.raises(ValueError): + PublishCassettesPayload(positions={"1": {"denomination": 20, "count": -1}}) + + def test_rejects_zero_denomination(self): + with pytest.raises(ValueError): + PublishCassettesPayload(positions={"1": {"denomination": 0, "count": 49}}) + + def test_rejects_negative_denomination(self): + with pytest.raises(ValueError): + PublishCassettesPayload(positions={"1": {"denomination": -20, "count": 49}}) + + def test_allows_zero_count(self): + """An empty cassette is a legal state — operator must be able to + record `count=0` after a dispatcher pulled the cassette mid-day.""" + p = PublishCassettesPayload(positions={"1": {"denomination": 20, "count": 0}}) + assert p.positions[1].count == 0 + + +# ============================================================================= +# CassettePayloadRow — per-row int constraints +# ============================================================================= + + +class TestCassettePayloadRow: + def test_happy_path(self): + row = CassettePayloadRow(denomination=20, count=49) + assert row.denomination == 20 + assert row.count == 49 + + @pytest.mark.parametrize("bad_denom", [0, -1, -100]) + def test_rejects_non_positive_denomination(self, bad_denom): + with pytest.raises(ValueError): + CassettePayloadRow(denomination=bad_denom, count=1) + + def test_rejects_negative_count(self): + with pytest.raises(ValueError): + CassettePayloadRow(denomination=20, count=-1) + + +# ============================================================================= +# UpsertCassetteConfigData — operator-edit form +# ============================================================================= + + +class TestUpsertCassetteConfigData: + """Operator-driven row edit. Both fields optional; same int constraints + as the wire-format row but applied independently per-edit. Position is + NOT editable — it's the row's identity (the hardware bay number).""" + + def test_partial_update_count_only(self): + d = UpsertCassetteConfigData(count=80) + assert d.count == 80 + assert d.denomination is None + + def test_partial_update_denomination_only(self): + """v1.1 operational case: operator records a cartridge swap at + refill — slot 1 was $20, dispatcher replaced with $50.""" + d = UpsertCassetteConfigData(denomination=50) + assert d.denomination == 50 + assert d.count is None + + def test_empty_update_is_legal(self): + """An empty UpsertCassetteConfigData parses fine; the CRUD short- + circuits a no-op on empty payload (no SQL emitted).""" + d = UpsertCassetteConfigData() + assert d.count is None + assert d.denomination is None + + def test_rejects_negative_count(self): + with pytest.raises(ValueError): + UpsertCassetteConfigData(count=-1) + + def test_rejects_non_positive_denomination(self): + with pytest.raises(ValueError): + UpsertCassetteConfigData(denomination=0) + + +# ============================================================================= +# _should_apply_bootstrap_state — relay re-delivery dedup +# ============================================================================= + + +class TestShouldApplyBootstrapState: + """Pure-function dedup gate extracted from apply_bootstrap_state so the + decision is testable without a DB. Logic: apply if-and-only-if the + existing row's state_event_id differs from the incoming event_id. + + In v1.1 the ATM publishes the bootstrap event exactly once per machine, + so this is sufficient for replay protection. v2 will need a + `last_state_created_at` watermark in addition (per bitspire's + `meta.lastKnownConfigCreatedAt` on the ATM side) — flagged in #29's + v2 forward-look section. + """ + + def test_applies_when_no_existing_row(self): + assert _should_apply_bootstrap_state(None, "new-event-id") is True + + def test_applies_when_existing_event_id_differs(self): + assert _should_apply_bootstrap_state("old-event-id", "new-event-id") is True + + def test_skips_when_existing_event_id_matches(self): + """The same bootstrap event re-delivered after a relay reconnect + or satmachineadmin restart should no-op, not re-upsert the same + rows (which would clobber any operator edits since).""" + assert _should_apply_bootstrap_state("same-event", "same-event") is False + + def test_applies_when_existing_is_empty_string_and_incoming_is_id(self): + """Defensive — a sentinel empty-string existing_state_event_id + shouldn't block a real incoming event from applying.""" + assert _should_apply_bootstrap_state("", "real-event-id") is True diff --git a/tests/test_cassette_state_consumer.py b/tests/test_cassette_state_consumer.py new file mode 100644 index 0000000..a0840bc --- /dev/null +++ b/tests/test_cassette_state_consumer.py @@ -0,0 +1,485 @@ +""" +Tests for the cassette bootstrap consumer's transport-decrypt path +(`cassette_transport.decrypt_and_parse_state_event`) and d-tag construction. + +Post-PR-#38 migration (2026-05-31): the function takes an Account + +NostrSigner instead of a raw privkey, and is async. Tests use: + - `_FakeBunkerSigner` — implements async `nip44_decrypt/encrypt` against + the hand-rolled `nip44` impl so tests don't need a live bunker. + Exercises the "happy" RemoteBunkerSigner path. + - `_FakeLocalSignerStub` — raises `SignerUnavailableError` from + `nip44_decrypt`, mimicking the post-#38 `LocalSigner` stub. Combined + with an Account that has `signer_type="LocalSigner"` + `prvkey`, + exercises the transitional fallback path in + `_nip44_decrypt_via_signer`. + - `_FakeRaisingSigner` — raises an arbitrary exception, used to + exercise the `NsecBunkerTimeoutError` → `CassetteEventTransientError` + and `NsecBunkerRpcError` → `CassetteEventDecodeError` mappings. + +Coroutines are driven via `asyncio.run` so no pytest-asyncio config is +required. Matches the existing project test pattern (test_init.py +demonstrates the project lacks an asyncio plugin in CI; using asyncio.run +inside the test body sidesteps that without changing project config). + +Full handler tests (the dispatch through verify_event → +get_machine_by_atm_pubkey_hex → apply_bootstrap_state) need a live LNbits +DB; smoke-tested manually via the dev container per the project +convention (see test_deposit_currency.py rationale). +""" + +import asyncio +import json +from types import SimpleNamespace + +import coincurve +import pytest +from lnbits.core.services.nip46_bunker_client import ( + NsecBunkerRpcError, + NsecBunkerTimeoutError, +) +from lnbits.core.signers.base import SignerUnavailableError + +from ..cassette_transport import ( + CassetteEventDecodeError, + CassetteEventTransientError, + _atm_hex_pubkey, + _config_d_tag, + _state_d_tag, + build_state_d_tags_for_machines, + decrypt_and_parse_state_event, +) +from ..models import Machine, PublishCassettesPayload +from ..nip44 import ( + decrypt_from as _nip44_decrypt, +) +from ..nip44 import ( + encrypt_with_conversation_key, + get_conversation_key, +) + +# Canonical keys (integer 1 + integer 2, the paulmillr/nip44 reference pair). +_OP_SEC = "00" * 31 + "01" +_ATM_SEC = "00" * 31 + "02" + + +def _pub_hex(sec_hex: str) -> str: + return ( + coincurve.PrivateKey(bytes.fromhex(sec_hex)) + .public_key.format(compressed=True)[1:] + .hex() + ) + + +_OP_PUB = _pub_hex(_OP_SEC) +_ATM_PUB = _pub_hex(_ATM_SEC) + + +# ============================================================================= +# Fake signers + account-shaped helper +# ============================================================================= + + +class _FakeBunkerSigner: + """Test double for RemoteBunkerSigner — implements async nip44_* + against the hand-rolled `nip44` impl. Used to exercise the + "signer.nip44_decrypt returns successfully" path without standing up + a live bunker process.""" + + def __init__(self, privkey_hex: str): + self._privkey_hex = privkey_hex + + @property + def pubkey(self) -> str: + return _pub_hex(self._privkey_hex) + + def can_sign(self) -> bool: + return True + + async def nip44_encrypt(self, plaintext: str, peer_pubkey_hex: str) -> str: + ck = get_conversation_key(self._privkey_hex, peer_pubkey_hex) + return encrypt_with_conversation_key(plaintext, ck) + + async def nip44_decrypt(self, ciphertext: str, peer_pubkey_hex: str) -> str: + return _nip44_decrypt(ciphertext, self._privkey_hex, peer_pubkey_hex) + + +class _FakeLocalSignerStub: + """Test double for the post-#38 LocalSigner stub — its nip44_* always + raises SignerUnavailableError. Combined with an Account that has + `signer_type='LocalSigner'` + `prvkey` populated, exercises the + transitional fallback in `_nip44_decrypt_via_signer` (which catches + the SignerUnavailableError and falls back to direct-prvkey via the + hand-rolled impl).""" + + def can_sign(self) -> bool: + return True + + async def nip44_encrypt(self, plaintext: str, peer_pubkey_hex: str) -> str: + raise SignerUnavailableError("LocalSigner does not implement nip44_encrypt") + + async def nip44_decrypt(self, ciphertext: str, peer_pubkey_hex: str) -> str: + raise SignerUnavailableError("LocalSigner does not implement nip44_decrypt") + + +class _FakeRaisingSigner: + """Test double that raises a configurable exception on nip44_decrypt. + Used to validate the bunker-error-mapping branches in + decrypt_and_parse_state_event.""" + + def __init__(self, exc): + self._exc = exc + + def can_sign(self) -> bool: + return True + + async def nip44_encrypt(self, plaintext: str, peer_pubkey_hex: str) -> str: + raise self._exc + + async def nip44_decrypt(self, ciphertext: str, peer_pubkey_hex: str) -> str: + raise self._exc + + +def _fake_account( + signer_type: str = "RemoteBunkerSigner", + prvkey: str | None = None, +): + """Account-shaped duck-typed object. decrypt_and_parse_state_event + + _nip44_decrypt_via_signer only read `.signer_type` and `.prvkey`; the + rest is irrelevant.""" + return SimpleNamespace( + id="test-operator", + pubkey=_OP_PUB, + prvkey=prvkey, + signer_type=signer_type, + signer_config=None, + ) + + +def _make_state_event( + payload: PublishCassettesPayload, + *, + atm_sec: str = _ATM_SEC, + op_pub: str = _OP_PUB, + atm_pub: str = _ATM_PUB, + event_id: str = "fake-event-id", + created_at: int = 1234567890, +) -> dict: + """Build a state event the way bitspire's ATM publisher would. Skips + the sig-verify step (handler-level concern); the transport-decrypt + path doesn't depend on sig validity, only on conversation-key match.""" + plaintext = json.dumps(payload.to_wire_dict(), separators=(",", ":")) + ck = get_conversation_key(atm_sec, op_pub) + content = encrypt_with_conversation_key(plaintext, ck) + return { + "kind": 30078, + "pubkey": atm_pub, + "content": content, + "tags": [ + ["d", f"bitspire-cassettes-state:{atm_pub}"], + ["p", op_pub], + ], + "created_at": created_at, + "id": event_id, + } + + +# ============================================================================= +# decrypt_and_parse_state_event — RemoteBunkerSigner happy path +# ============================================================================= + + +class TestDecryptViaBunkerSigner: + """The expected production path post-#38: operator account is bunker- + backed, signer.nip44_decrypt routes through the bunker (mocked here + via _FakeBunkerSigner), and the wire payload round-trips cleanly.""" + + def test_happy_path_recovers_positions_keyed_payload(self): + payload = PublishCassettesPayload( + positions={ + "1": {"denomination": 20, "count": 49}, + "2": {"denomination": 50, "count": 100}, + } + ) + event = _make_state_event(payload) + account = _fake_account(signer_type="RemoteBunkerSigner") + signer = _FakeBunkerSigner(_OP_SEC) + + recovered = asyncio.run(decrypt_and_parse_state_event(event, account, signer)) + assert sorted(recovered.positions.keys()) == [1, 2] + assert recovered.positions[1].denomination == 20 + assert recovered.positions[1].count == 49 + assert recovered.positions[2].denomination == 50 + assert recovered.positions[2].count == 100 + + def test_round_trips_multiple_same_denomination(self): + """v1.1 operational case (coord-log 2026-05-30T18:45Z) — multiple + bays carrying the same denomination.""" + payload = PublishCassettesPayload( + positions={ + "1": {"denomination": 20, "count": 100}, + "2": {"denomination": 20, "count": 100}, + "3": {"denomination": 20, "count": 100}, + "4": {"denomination": 20, "count": 100}, + } + ) + event = _make_state_event(payload) + account = _fake_account() + signer = _FakeBunkerSigner(_OP_SEC) + + recovered = asyncio.run(decrypt_and_parse_state_event(event, account, signer)) + assert len(recovered.positions) == 4 + for pos in (1, 2, 3, 4): + assert recovered.positions[pos].denomination == 20 + assert recovered.positions[pos].count == 100 + + +# ============================================================================= +# decrypt_and_parse_state_event — LocalSigner transitional fallback +# ============================================================================= + + +class TestDecryptViaLocalSignerFallback: + """When the operator account is still on LocalSigner (pre-bunker + migration), the LocalSigner stub raises SignerUnavailableError from + nip44_decrypt. `_nip44_decrypt_via_signer` catches that and falls + back to the hand-rolled impl using `account.prvkey`. Same wire + output; transitional until S7 retires LocalSigner accounts entirely.""" + + def test_localsigner_with_prvkey_decrypts_via_fallback(self): + payload = PublishCassettesPayload( + positions={"1": {"denomination": 20, "count": 49}} + ) + event = _make_state_event(payload) + account = _fake_account(signer_type="LocalSigner", prvkey=_OP_SEC) + signer = _FakeLocalSignerStub() + + recovered = asyncio.run(decrypt_and_parse_state_event(event, account, signer)) + assert recovered.positions[1].denomination == 20 + assert recovered.positions[1].count == 49 + + def test_localsigner_without_prvkey_raises_decode_error(self): + """A LocalSigner account whose prvkey field is None (impossible + in practice — LocalSigner requires prvkey at provision time, but + defensive in case the row is corrupt) should surface as a + decode error, not silently succeed.""" + payload = PublishCassettesPayload( + positions={"1": {"denomination": 20, "count": 49}} + ) + event = _make_state_event(payload) + account = _fake_account(signer_type="LocalSigner", prvkey=None) + signer = _FakeLocalSignerStub() + + with pytest.raises(CassetteEventDecodeError): + asyncio.run(decrypt_and_parse_state_event(event, account, signer)) + + def test_clientonlysigner_raises_decode_error(self): + """ClientSideOnlySigner has no server-side decrypt path at all; + falling back to direct-prvkey is also impossible (no prvkey). + Surface as a decode error so the consumer logs + skips.""" + payload = PublishCassettesPayload( + positions={"1": {"denomination": 20, "count": 49}} + ) + event = _make_state_event(payload) + account = _fake_account(signer_type="ClientSideOnlySigner", prvkey=None) + signer = _FakeLocalSignerStub() # behaves the same way: raises + + with pytest.raises(CassetteEventDecodeError): + asyncio.run(decrypt_and_parse_state_event(event, account, signer)) + + +# ============================================================================= +# decrypt_and_parse_state_event — bunker error mapping +# ============================================================================= + + +class TestBunkerErrorMapping: + """The post-#38 error hierarchy splits transient (bunker partitioned) + from terminal (bunker policy reject, MAC failure). Consumer behaves + differently — transient retries, terminal logs + skips. Validate the + mapping from NsecBunker* exceptions to our CassetteEvent* types.""" + + def test_timeout_maps_to_transient_error(self): + """Bunker unreachable → NsecBunkerTimeoutError → caller-visible + CassetteEventTransientError. Consumer treats this as retry- + eligible (don't advance state_event_id).""" + payload = PublishCassettesPayload( + positions={"1": {"denomination": 20, "count": 49}} + ) + event = _make_state_event(payload) + account = _fake_account() + signer = _FakeRaisingSigner(NsecBunkerTimeoutError("bunker unreachable")) + with pytest.raises(CassetteEventTransientError): + asyncio.run(decrypt_and_parse_state_event(event, account, signer)) + + def test_rpc_reject_maps_to_decode_error(self): + """Bunker rejected the RPC (policy / MAC / config) → + NsecBunkerRpcError → caller-visible CassetteEventDecodeError. + Terminal — retrying won't help.""" + payload = PublishCassettesPayload( + positions={"1": {"denomination": 20, "count": 49}} + ) + event = _make_state_event(payload) + account = _fake_account() + signer = _FakeRaisingSigner( + NsecBunkerRpcError("bunker policy reject: kind 30078 not authorised") + ) + with pytest.raises(CassetteEventDecodeError): + asyncio.run(decrypt_and_parse_state_event(event, account, signer)) + + +# ============================================================================= +# decrypt_and_parse_state_event — payload + envelope validation +# ============================================================================= + + +class TestPayloadValidation: + """Errors that originate at the parse layer (post-decrypt), not the + signer. Same set as pre-migration — covered through the bunker-signer + path since LocalSigner is going away.""" + + def test_tampered_content_rejected(self): + payload = PublishCassettesPayload( + positions={"1": {"denomination": 20, "count": 49}} + ) + event = _make_state_event(payload) + event["content"] = event["content"][:-2] + "AA" + account = _fake_account() + signer = _FakeBunkerSigner(_OP_SEC) + with pytest.raises(CassetteEventDecodeError): + asyncio.run(decrypt_and_parse_state_event(event, account, signer)) + + def test_wrong_signer_privkey_rejected(self): + """Wrong privkey on the signer → wrong conversation key → MAC + verification fails inside nip44_decrypt → surfaces as decode + error (via the hand-rolled Nip44Error since this is the fake + bunker signer; in production the bunker would raise + NsecBunkerRpcError which also maps to CassetteEventDecodeError).""" + payload = PublishCassettesPayload( + positions={"1": {"denomination": 20, "count": 49}} + ) + event = _make_state_event(payload) + account = _fake_account() + wrong_sec = "00" * 31 + "03" + signer = _FakeBunkerSigner(wrong_sec) + with pytest.raises(CassetteEventDecodeError): + asyncio.run(decrypt_and_parse_state_event(event, account, signer)) + + def test_missing_content_rejected(self): + event = _make_state_event( + PublishCassettesPayload(positions={"1": {"denomination": 20, "count": 49}}) + ) + del event["content"] + account = _fake_account() + signer = _FakeBunkerSigner(_OP_SEC) + with pytest.raises(CassetteEventDecodeError): + asyncio.run(decrypt_and_parse_state_event(event, account, signer)) + + def test_missing_pubkey_rejected(self): + event = _make_state_event( + PublishCassettesPayload(positions={"1": {"denomination": 20, "count": 49}}) + ) + del event["pubkey"] + account = _fake_account() + signer = _FakeBunkerSigner(_OP_SEC) + with pytest.raises(CassetteEventDecodeError): + asyncio.run(decrypt_and_parse_state_event(event, account, signer)) + + def test_decrypted_garbage_json_rejected(self): + """If plaintext decrypts cleanly but isn't valid JSON, surface + as decode error (not crash the consumer loop).""" + ck = get_conversation_key(_ATM_SEC, _OP_PUB) + event = { + "kind": 30078, + "pubkey": _ATM_PUB, + "content": encrypt_with_conversation_key("definitely not json", ck), + "tags": [], + "created_at": 0, + "id": "x", + } + account = _fake_account() + signer = _FakeBunkerSigner(_OP_SEC) + with pytest.raises(CassetteEventDecodeError): + asyncio.run(decrypt_and_parse_state_event(event, account, signer)) + + def test_decrypted_wrong_shape_rejected(self): + """Well-formed JSON but missing 'positions' → payload-shape + validation failure.""" + ck = get_conversation_key(_ATM_SEC, _OP_PUB) + event = { + "kind": 30078, + "pubkey": _ATM_PUB, + "content": encrypt_with_conversation_key('{"wrong_field": 42}', ck), + "tags": [], + "created_at": 0, + "id": "x", + } + account = _fake_account() + signer = _FakeBunkerSigner(_OP_SEC) + with pytest.raises(CassetteEventDecodeError): + asyncio.run(decrypt_and_parse_state_event(event, account, signer)) + + +# ============================================================================= +# d-tag construction — unchanged by the migration, kept as regression guard +# ============================================================================= + + +class TestDTagConstruction: + """The `` placeholder in d-tags = ATM hex pubkey (load-bearing per + coord-log 2026-05-30T11:50Z). These tests pin the canonical + substitution so a refactor can't silently break wire compatibility.""" + + def _machine(self, npub: str, id_: str = "m1") -> Machine: + from datetime import datetime, timezone + + now = datetime.now(timezone.utc) + return Machine( + id=id_, + operator_user_id="op1", + machine_npub=npub, + wallet_id="w1", + name=None, + location=None, + fiat_code="EUR", + is_active=True, + created_at=now, + updated_at=now, + ) + + def test_atm_hex_pubkey_from_hex_storage(self): + assert _atm_hex_pubkey(self._machine(_ATM_PUB)) == _ATM_PUB + + def test_atm_hex_pubkey_lowercases_uppercase_hex(self): + assert _atm_hex_pubkey(self._machine(_ATM_PUB.upper())) == _ATM_PUB + + def test_atm_hex_pubkey_canonicalises_bech32_to_hex(self): + from lnbits.utils.nostr import hex_to_npub + + npub_bech32 = hex_to_npub(_ATM_PUB) + assert _atm_hex_pubkey(self._machine(npub_bech32)) == _ATM_PUB + + def test_config_d_tag_uses_hex_pubkey_not_id(self): + """REGRESSION GUARD: d-tag must contain the ATM hex pubkey, NOT + the internal machine UUID.""" + m = self._machine(_ATM_PUB, id_="some-uuid-not-the-pubkey") + d_tag = _config_d_tag(_atm_hex_pubkey(m)) + assert d_tag == f"bitspire-cassettes:{_ATM_PUB}" + assert "some-uuid" not in d_tag + + def test_state_d_tag_uses_hex_pubkey_not_id(self): + m = self._machine(_ATM_PUB, id_="some-uuid-not-the-pubkey") + d_tag = _state_d_tag(_atm_hex_pubkey(m)) + assert d_tag == f"bitspire-cassettes-state:{_ATM_PUB}" + assert "some-uuid" not in d_tag + + def test_build_state_d_tags_for_machines(self): + atm2_pub = _pub_hex("00" * 31 + "03") + machines = [ + self._machine(_ATM_PUB, id_="m1"), + self._machine(atm2_pub, id_="m2"), + ] + tags = build_state_d_tags_for_machines(machines) + assert tags == [ + f"bitspire-cassettes-state:{_ATM_PUB}", + f"bitspire-cassettes-state:{atm2_pub}", + ] diff --git a/tests/test_nip44_v2.py b/tests/test_nip44_v2.py new file mode 100644 index 0000000..31f996b --- /dev/null +++ b/tests/test_nip44_v2.py @@ -0,0 +1,390 @@ +""" +Tests for the hand-rolled NIP-44 v2 implementation in `nip44.py`. + +Three layers of validation, ordered by trust: + 1. Pinned reference vector from the canonical paulmillr/nip44 test suite — + the conversation_key for (sec=1, sec=2) is widely-published as + c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d. If + our get_conversation_key() ever drifts from that value, the impl is + broken at the key-derivation layer. + 2. Round-trip + tamper detection — verifies the encrypt/decrypt loop + under random nonces, catches HMAC + version + padding tampering. + 3. Cross-test (TBD) — bitspire will post one sample event encrypted on + their nostr-tools side to the coord log; test_decrypts_bitspire_sample + wires it as a fixture and asserts byte-compatibility with the + nostr-tools NIP-44 v2 impl. Placeholder stub until the sample lands. +""" + +import base64 + +import coincurve +import pytest + +from ..nip44 import ( + Nip44LengthError, + Nip44MacError, + Nip44VersionError, + _calc_padded_len, + decrypt_from, + decrypt_with_conversation_key, + encrypt_for, + encrypt_with_conversation_key, + get_conversation_key, +) + + +# Helper: derive a compressed-x-coord pubkey hex string from a secret hex. +def _pub_hex(sec_hex: str) -> str: + return ( + coincurve.PrivateKey(bytes.fromhex(sec_hex)) + .public_key.format(compressed=True)[1:] + .hex() + ) + + +# Canonical test keys widely used across NIP-44 reference vectors. +_SEC_ONE = "00" * 31 + "01" # integer 1 +_SEC_TWO = "00" * 31 + "02" # integer 2 +_PUB_ONE = _pub_hex(_SEC_ONE) +_PUB_TWO = _pub_hex(_SEC_TWO) + + +# ============================================================================= +# Layer 1 — pinned reference vector (paulmillr/nip44) +# ============================================================================= + + +class TestConversationKeyReferenceVector: + """Pinned reference vector from the canonical NIP-44 v2 test suite + (paulmillr/nip44). If get_conversation_key drifts from this value we + have a key-derivation regression — fail loudly.""" + + REFERENCE_CK_HEX = ( + "c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d" + ) + + def test_sec_one_pub_two(self): + ck = get_conversation_key(_SEC_ONE, _PUB_TWO) + assert ck.hex() == self.REFERENCE_CK_HEX + + def test_sec_two_pub_one_is_symmetric(self): + """Conversation key is symmetric: ck(privA, pubB) == ck(privB, pubA). + Both sides of a NIP-44 conversation derive the identical PRK; this + is what lets the recipient decrypt with their own privkey + the + sender's pubkey.""" + ck_ab = get_conversation_key(_SEC_ONE, _PUB_TWO) + ck_ba = get_conversation_key(_SEC_TWO, _PUB_ONE) + assert ck_ab == ck_ba + + +# ============================================================================= +# Layer 2 — round-trip + tamper detection +# ============================================================================= + + +class TestRoundTrip: + """Encrypt then decrypt under the high-level pair-keyed API.""" + + @pytest.mark.parametrize( + "plaintext", + [ + "a", # 1 byte (minimum) + "hello, nip44 v2", # short + "x" * 32, # exactly the small-payload boundary + "x" * 33, # just over + "y" * 1000, # medium + "z" * 5000, # large + '{"denominations": {"20": {"position": 1, "count": 49}}}', # realistic + ], + ) + def test_round_trip_various_lengths(self, plaintext): + payload = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO) + recovered = decrypt_from(payload, _SEC_TWO, _PUB_ONE) + assert recovered == plaintext + + def test_payloads_are_unique_under_random_nonce(self): + """Same plaintext + same key pair should produce different payloads + each time because the nonce is fresh CSPRNG bytes. Catches a + regression where the nonce is accidentally pinned.""" + plaintext = "the same message" + p1 = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO) + p2 = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO) + assert p1 != p2 + assert decrypt_from(p1, _SEC_TWO, _PUB_ONE) == plaintext + assert decrypt_from(p2, _SEC_TWO, _PUB_ONE) == plaintext + + def test_pinned_nonce_is_deterministic(self): + """Same plaintext + same key pair + same nonce = byte-identical + payload. Regression-locks the chacha20 + hmac chain.""" + ck = get_conversation_key(_SEC_ONE, _PUB_TWO) + nonce = bytes(32) # 32 zero bytes + p1 = encrypt_with_conversation_key("a", ck, nonce=nonce) + p2 = encrypt_with_conversation_key("a", ck, nonce=nonce) + assert p1 == p2 + assert decrypt_with_conversation_key(p1, ck) == "a" + + +class TestTamperDetection: + """HMAC-SHA256 verification catches tampered envelopes. The cryptographic + construction depends on this — if HMAC verification ever no-ops, a + relay-MITM could forge ATM state events.""" + + def _payload(self) -> str: + return encrypt_for("important message", _SEC_ONE, _PUB_TWO) + + def test_flipped_mac_byte_rejected(self): + raw = bytearray(base64.b64decode(self._payload())) + raw[-1] ^= 0x01 + tampered = base64.b64encode(bytes(raw)).decode("ascii") + with pytest.raises(Nip44MacError): + decrypt_from(tampered, _SEC_TWO, _PUB_ONE) + + def test_flipped_ciphertext_byte_rejected(self): + raw = bytearray(base64.b64decode(self._payload())) + # Flip a byte in the middle of the ciphertext segment + # (version[1] + nonce[32..32] + ciphertext[33..-32] + mac[-32..]) + ct_start = 1 + 32 + raw[ct_start + 5] ^= 0x01 + tampered = base64.b64encode(bytes(raw)).decode("ascii") + with pytest.raises(Nip44MacError): + decrypt_from(tampered, _SEC_TWO, _PUB_ONE) + + def test_flipped_nonce_byte_rejected(self): + raw = bytearray(base64.b64decode(self._payload())) + # Nonce starts at byte 1 (after version) + raw[1] ^= 0x01 + tampered = base64.b64encode(bytes(raw)).decode("ascii") + with pytest.raises(Nip44MacError): + decrypt_from(tampered, _SEC_TWO, _PUB_ONE) + + def test_wrong_recipient_privkey_rejected(self): + """The MAC is derived from the conversation key, so a wrong + recipient privkey produces a different conversation key → + different hmac_key → MAC verification fails. (Doesn't decrypt + to garbage; fails fast.)""" + sec_three = "00" * 31 + "03" + with pytest.raises(Nip44MacError): + decrypt_from(self._payload(), sec_three, _PUB_ONE) + + +class TestVersionRejection: + def test_v1_byte_rejected(self): + raw = bytearray(base64.b64decode(encrypt_for("x", _SEC_ONE, _PUB_TWO))) + raw[0] = 0x01 + bad = base64.b64encode(bytes(raw)).decode("ascii") + with pytest.raises(Nip44VersionError): + decrypt_from(bad, _SEC_TWO, _PUB_ONE) + + def test_unknown_version_byte_rejected(self): + raw = bytearray(base64.b64decode(encrypt_for("x", _SEC_ONE, _PUB_TWO))) + raw[0] = 0xFF + bad = base64.b64encode(bytes(raw)).decode("ascii") + with pytest.raises(Nip44VersionError): + decrypt_from(bad, _SEC_TWO, _PUB_ONE) + + +class TestLengthGuards: + def test_empty_plaintext_rejected(self): + with pytest.raises(Nip44LengthError): + encrypt_for("", _SEC_ONE, _PUB_TWO) + + def test_plaintext_at_max_length_accepted(self): + plaintext = "x" * 65535 + payload = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO) + assert decrypt_from(payload, _SEC_TWO, _PUB_ONE) == plaintext + + def test_plaintext_over_max_rejected(self): + with pytest.raises(Nip44LengthError): + encrypt_for("x" * 65536, _SEC_ONE, _PUB_TWO) + + def test_invalid_base64_payload_rejected(self): + with pytest.raises(Nip44LengthError): + decrypt_from("not!!!base64@@@", _SEC_TWO, _PUB_ONE) + + def test_payload_too_short_rejected(self): + # 50 bytes is well under the 99-byte minimum + too_short = base64.b64encode(b"\x02" + b"\x00" * 49).decode("ascii") + with pytest.raises(Nip44LengthError): + decrypt_from(too_short, _SEC_TWO, _PUB_ONE) + + +class TestPaddingFormula: + """Spot-check the _calc_padded_len formula against hand-computed cases. + Locks in the NIP-44 v2 padding scheme so a refactor can't silently + break wire compatibility (which would only surface as cross-impl + decryption failures — exactly what test_decrypts_bitspire_sample is + meant to catch end-to-end, but a unit test here is cheaper).""" + + @pytest.mark.parametrize( + "plaintext_len,expected_padded", + [ + (1, 32), # <= 32 → 32 + (16, 32), + (32, 32), + (33, 64), # > 32 → next chunk + (64, 64), + ( + 65, + 96, + ), # chunk = 32 for L=65 (next_power(64) = 64; 64//8 = 8; max(32, 8) = 32) + (100, 128), + (128, 128), + # L=129: next_power(128) = 1<<8 = 256; chunk = max(32, 256//8) = 32; + # padded = 32 * (128//32 + 1) = 32 * 5 = 160. + (129, 160), + (256, 256), # chunk = 32 for L=256 (next_power(255)=256; max(32, 32) = 32) + (257, 320), + ( + 1000, + 1024, + ), # chunk = 128 for L=1000 (next_power(999)=1024; max(32, 128) = 128) + ], + ) + def test_calc_padded_len(self, plaintext_len, expected_padded): + assert _calc_padded_len(plaintext_len) == expected_padded + + +# ============================================================================= +# Layer 3 — byte-compat cross-test against nostr-tools (bitspire's impl) +# ============================================================================= + + +# ----------------------------------------------------------------------------- +# Bitspire-side v1.1 fixture, posted to ~/dev/coordination/log.md at +# 2026-05-30T19:00Z. Positions-keyed wire shape per the v1.1 redesign +# (18:30Z + 18:45Z); intentionally includes two positions sharing +# denomination=20 to exercise the multi-same-denom round-trip on our +# decrypt + payload-validate path. Throwaway keypairs (one-shot, never +# sign anything else) — safe to embed verbatim. +# Generated by apps/machine/src/services/operator-config.ts-shape code +# path using the @bitSpire/nostr-client encryptContentV2 + +# createSignedEvent helpers (same code the production bootstrap publish +# uses). Round-tripped on bitspire side via decryptContentV2 before posting. +# ----------------------------------------------------------------------------- + +_BITSPIRE_FIXTURE = { + "atm_keypair": { + "privkey_hex": ( + "814e6188d017102bbf301ba5b38fba95b2556dc79a60df4cd50605c4593578e6" + ), + "pubkey_hex": ( + "217bdc9a65b571c4d9b59da6227a7aa6ca5bbfd5280af791417c57a79d92852b" + ), + }, + "operator_keypair": { + "privkey_hex": ( + "cca7dd9fe4874f6b9f3f3fae21648da686b7e714bfd4786e8fa8745933fd3185" + ), + "pubkey_hex": ( + "49bd8e615769f8b6a5aa8ce9617b919996abecf234599ba196789461cf239146" + ), + }, + "expected_plaintext": { + "positions": { + "1": {"denomination": 20, "count": 49}, + "2": {"denomination": 20, "count": 38}, + "3": {"denomination": 50, "count": 100}, + }, + }, + "event": { + "kind": 30078, + "content": ( + "AqOHsCcjN2W8L/Cx0uH+n++VA13W+wy7z1EcuuNX49sSagelX2lI0HEKyd+ActOc" + "iaPsHrp9ecJTkEZOD86ioldbLbEVColJwK4g1uVZSbpDeqRe+97woxVDqPnzj507" + "tFaVLF/dRmda+oKHUzkVPhE4PHQJzp9Fqji38J3nU6N68qo7KOt3qg1nSy5eDfAu" + "zt7djRBx63+/veub0rWTMMQLBgci8+Ms6Y+Zb1mki3L6NWuIR0Or+8DhcD+ZJiOu" + "WTcx" + ), + "tags": [ + [ + "d", + "bitspire-cassettes-state:" + "217bdc9a65b571c4d9b59da6227a7aa6ca5bbfd5280af791417c57a79d92852b", + ], + [ + "p", + "49bd8e615769f8b6a5aa8ce9617b919996abecf234599ba196789461cf239146", + ], + ], + "created_at": 1780173222, + "pubkey": ("217bdc9a65b571c4d9b59da6227a7aa6ca5bbfd5280af791417c57a79d92852b"), + "id": ("72c09f333386dd4ad6125f8c69823824eea50d8091b694458bcd60701517eece"), + "sig": ( + "07ecafacf0169f074e564a999ee1c31446930b43391d007c4a1f9ef7ad890d6c" + "2aa6e3ecc5318edeb5748fbd64c7ca33407099a97154e2ff7e0c626e48d71925" + ), + }, +} + + +class TestBitspireCrossTest: + """Byte-compat cross-test between our hand-rolled NIP-44 v2 (`nip44.py`) + and the nostr-tools NIP-44 v2 impl that bitspire uses on the ATM side + (via @bitSpire/nostr-client). If these tests pass, the wire format + agrees across both implementations and the joint round-trip (operator + publish → ATM apply / ATM bootstrap → operator consume) is byte-safe. + If any fail, the spec ambiguity surfaces before sintra ships.""" + + def test_decrypts_bitspire_sample_event(self): + """The load-bearing assertion: our `decrypt_from` recovers the + expected `{"positions": {...}}` plaintext from bitspire's encrypted + event content. v1.1 fixture intentionally exercises the multi-same- + denomination round-trip (positions 1 + 2 both hold $20).""" + import json + + event = _BITSPIRE_FIXTURE["event"] + operator_privkey = _BITSPIRE_FIXTURE["operator_keypair"]["privkey_hex"] + + from ..nip44 import decrypt_from + + plaintext = decrypt_from( + event["content"], + operator_privkey, + event["pubkey"], + ) + payload = json.loads(plaintext) + assert payload == _BITSPIRE_FIXTURE["expected_plaintext"] + + # v1.1 invariant: two positions can carry the same denomination. + # Pin it explicitly so a future "fix" that re-introduces denom- + # uniqueness validation surfaces here instead of as a runtime + # rejection on real machines. + assert payload["positions"]["1"]["denomination"] == 20 + assert payload["positions"]["2"]["denomination"] == 20 + assert payload["positions"]["1"]["count"] != payload["positions"]["2"]["count"] + + def test_signature_verifies_via_lnbits_helper(self): + """Optional extra per bitspire's 13:15Z note (3). The consumer + path runs verify_event before NIP-44 decrypt — locking the sig- + algorithm agreement here means both sides hash the event id the + same way + Schnorr-verify under the same x-only public-key + convention.""" + from lnbits.utils.nostr import verify_event + + assert verify_event(_BITSPIRE_FIXTURE["event"]) is True + + def test_encrypt_round_trip_via_our_impl_decrypts_with_their_keys(self): + """Optional extra per bitspire's 13:15Z note (3). Encrypt the + expected plaintext using OUR impl with the ATM keypair as + sender + operator pubkey as recipient. The resulting ciphertext + won't be byte-identical to the fixture (NIP-44 v2 nonces are + random) but it MUST decrypt back to the same plaintext when + passed to our decrypt path. Locks the encrypt direction too, + not just decrypt.""" + import json + + from ..nip44 import decrypt_from, encrypt_for + + plaintext = json.dumps( + _BITSPIRE_FIXTURE["expected_plaintext"], separators=(",", ":") + ) + atm_sec = _BITSPIRE_FIXTURE["atm_keypair"]["privkey_hex"] + atm_pub = _BITSPIRE_FIXTURE["atm_keypair"]["pubkey_hex"] + op_sec = _BITSPIRE_FIXTURE["operator_keypair"]["privkey_hex"] + op_pub = _BITSPIRE_FIXTURE["operator_keypair"]["pubkey_hex"] + + our_ciphertext = encrypt_for(plaintext, atm_sec, op_pub) + recovered = decrypt_from(our_ciphertext, op_sec, atm_pub) + assert json.loads(recovered) == _BITSPIRE_FIXTURE["expected_plaintext"] + # The two ciphertexts SHOULD differ (random nonce per encrypt) + assert our_ciphertext != _BITSPIRE_FIXTURE["event"]["content"] diff --git a/views_api.py b/views_api.py index 93ceeeb..66d9e76 100644 --- a/views_api.py +++ b/views_api.py @@ -12,6 +12,13 @@ from lnbits.core.crud import get_wallet from lnbits.core.models import User from lnbits.decorators import check_super_user, check_user_exists +from .cassette_transport import ( + CassetteTransportError, + OperatorIdentityMissing, + RelayUnavailable, + SignerUnavailable, + publish_to_atm, +) from .crud import ( append_settlement_note, count_completed_legs_for_settlement, @@ -39,9 +46,11 @@ from .crud import ( get_settlements_for_operator, get_stuck_settlements_for_operator, get_super_config, + list_cassette_configs_for_machine, lp_is_onboarded, replace_commission_splits, reset_settlement_for_retry, + update_cassette_config, update_dca_client, update_deposit, update_deposit_status, @@ -55,6 +64,7 @@ from .distribution import ( ) from .models import ( AppendSettlementNoteData, + CassetteConfig, ClientBalanceSummary, CommissionSplit, CreateDcaClientData, @@ -66,6 +76,7 @@ from .models import ( DcaSettlement, Machine, PartialDispenseData, + PublishCassettesPayload, SetCommissionSplitsData, SettleBalanceData, StuckSettlementsResponse, @@ -75,6 +86,7 @@ from .models import ( UpdateDepositStatusData, UpdateMachineData, UpdateSuperConfigData, + UpsertCassetteConfigData, ) satmachineadmin_api_router = APIRouter() @@ -759,3 +771,127 @@ async def api_update_super_config( HTTPStatus.INTERNAL_SERVER_ERROR, "Failed to update super config" ) return config + + +# ============================================================================= +# Cassette configs (#29 v1.1) — per-machine ATM cassette inventory +# ============================================================================= +# v1.1 surface, paired with aiolabs/lamassu-next#56 ATM-side. Two endpoints: +# GET /machines/{id}/cassettes — list rows for the operator UI +# POST /machines/{id}/cassettes/publish — apply edits + publish kind-30078 +# +# Row creation (new (machine_id, position) pairs) is admin-only via the +# bootstrap consumer task — slot count is hardware-determined. Operator- +# side flow is edit-and-publish over the existing rows only; the editable +# fields per row are denomination and count. + + +@satmachineadmin_api_router.get( + "/api/v1/dca/machines/{machine_id}/cassettes", + response_model=list[CassetteConfig], +) +async def api_list_machine_cassettes( + machine_id: str, user: User = Depends(check_user_exists) +) -> list[CassetteConfig]: + """List the cassette config rows for one of the operator's machines, + ordered by position. Empty list = ATM hasn't yet published its + bootstrap event (or the bootstrap consumer hasn't processed it yet); + UI should show a "waiting for ATM" state.""" + await _machine_owned_by(machine_id, user.id) + return await list_cassette_configs_for_machine(machine_id) + + +@satmachineadmin_api_router.post( + "/api/v1/dca/machines/{machine_id}/cassettes/publish", + response_model=list[CassetteConfig], +) +async def api_publish_machine_cassettes( + machine_id: str, + payload: PublishCassettesPayload, + user: User = Depends(check_user_exists), +) -> list[CassetteConfig]: + """Operator submits the full per-machine cassette state for publish to + the ATM. Validates the position set matches what's currently in + cassette_configs for the machine (slot count is hardware-fixed), + upserts each row, then encrypts + signs + publishes a kind-30078 + event tagged with d=bitspire-cassettes: and + p=. + + The `` placeholder in the published d-tag is the ATM's hex pubkey + from machine.machine_npub (canonicalised via normalize_public_key), + NOT the internal dca_machines.id UUID — see #29 'machine_id semantics' + section and coord-log 2026-05-30T11:50Z load-bearing nudge. + + Returns the fresh cassette_configs rows after the upserts so the UI + can refresh its table from one round-trip. + + Errors: + 400 — payload position set doesn't match the machine's stored set + (operator publishing for a slot that doesn't exist on the + ATM; or the bootstrap hasn't landed yet so no rows exist) + 400 — operator hasn't onboarded a Nostr identity + 503 — signer offline / client-side-only, or nostrclient extension + not installed on this LNbits instance + 500 — anything else from the publish path + """ + machine = await _machine_owned_by(machine_id, user.id) + + existing = await list_cassette_configs_for_machine(machine_id) + existing_positions = {row.position for row in existing} + incoming_positions = set(payload.positions.keys()) + + if not existing: + raise HTTPException( + HTTPStatus.BAD_REQUEST, + ( + "No cassette_configs rows exist for this machine yet — " + "waiting for the ATM's bootstrap state event. Power on the " + "ATM and confirm it has reached the configured relay; " + "satmachineadmin will auto-populate cassette_configs on " + "receipt." + ), + ) + if existing_positions != incoming_positions: + missing = existing_positions - incoming_positions + extra = incoming_positions - existing_positions + raise HTTPException( + HTTPStatus.BAD_REQUEST, + ( + "Payload position set doesn't match the machine's stored " + f"set. Missing from payload: {sorted(missing)}; extra in " + f"payload: {sorted(extra)}. Slot count is hardware-fixed " + "— re-provision the ATM via atm-tui to add/remove physical " + "bays, then re-publish." + ), + ) + + # Apply each per-row edit so the operator-believed state on + # satmachineadmin reflects the published payload, even if the ATM + # ack lands later (v2). updated_by audit-stamps the operator user id. + for pos, row in payload.positions.items(): + updated = await update_cassette_config( + machine_id, + pos, + UpsertCassetteConfigData(denomination=row.denomination, count=row.count), + updated_by=user.id, + ) + if updated is None: + # Defensive — we just validated the row exists, but a + # concurrent delete could land between. Surface as 500. + raise HTTPException( + HTTPStatus.INTERNAL_SERVER_ERROR, + f"cassette row for position {pos} disappeared mid-publish", + ) + + try: + await publish_to_atm(machine, payload, user.id) + except OperatorIdentityMissing as exc: + raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc + except SignerUnavailable as exc: + raise HTTPException(HTTPStatus.SERVICE_UNAVAILABLE, str(exc)) from exc + except RelayUnavailable as exc: + raise HTTPException(HTTPStatus.SERVICE_UNAVAILABLE, str(exc)) from exc + except CassetteTransportError as exc: + raise HTTPException(HTTPStatus.INTERNAL_SERVER_ERROR, str(exc)) from exc + + return await list_cassette_configs_for_machine(machine_id)