Merge pull request 'feat(v2): operator-side cassette inventory v1.1 + signer.nip44_* migration (#29)' (#30) from feat/cassette-config-v1 into v2-bitspire

Reviewed-on: #30
This commit is contained in:
padreug 2026-05-31 13:54:18 +00:00
commit 44f6c0b1bd
13 changed files with 3095 additions and 165 deletions

View file

@ -5,7 +5,7 @@ from lnbits.tasks import create_permanent_unique_task
from loguru import logger
from .crud import db
from .tasks import wait_for_paid_invoices
from .tasks import wait_for_cassette_state_events, wait_for_paid_invoices
from .views import satmachineadmin_generic_router
from .views_api import satmachineadmin_api_router
@ -42,6 +42,14 @@ def satmachineadmin_start():
"ext_satmachineadmin", wait_for_paid_invoices
)
scheduled_tasks.append(invoice_task)
# Cassette bootstrap consumer (#29 v1) — subscribes to
# bitspire-cassettes-state events from each active ATM and upserts
# cassette_configs on receipt. Soft-fails if nostrclient isn't
# installed (logs + backs off, never crashes).
cassette_task = create_permanent_unique_task(
"ext_satmachineadmin_cassette_bootstrap", wait_for_cassette_state_events
)
scheduled_tasks.append(cassette_task)
__all__ = [

424
cassette_transport.py Normal file
View file

@ -0,0 +1,424 @@
"""
Cassette-config Nostr transport operator ATM kind-30078 publish + consume.
Per the locked design at aiolabs/satmachineadmin#29 (paired with
lamassu-next#56) and the dcd0874 privacy-by-default pivot, the operator
publishes position-keyed cassette config to a target ATM via:
kind = 30078 (NIP-78, replaceable)
tags = [
["d", "bitspire-cassettes:<atm_pubkey_hex>"],
["p", "<atm_pubkey_hex>"]
]
content = NIP-44 v2 encrypted JSON of PublishCassettesPayload.to_wire_dict()
pubkey = operator pubkey
sig = operator signature
The ATM-side consumer (lamassu-next#56) subscribes by the d-tag + its own
npub, decrypts, validates, applies, hot-reloads HAL.
Reverse direction (ATM operator, v1 = one-shot bootstrap on first boot,
v2 = continuous reverse channel for reconciliation):
kind = 30078
tags = [
["d", "bitspire-cassettes-state:<atm_pubkey_hex>"],
["p", "<operator_pubkey_hex>"]
]
content = NIP-44 v2 encrypted JSON, same PublishCassettesPayload shape
pubkey = ATM pubkey
This module owns the wire-format side of both directions. The consumer
task (tasks.py) calls `decrypt_and_parse_state_event` per incoming event;
the API endpoint (views_api.py) calls `publish_to_atm` per operator submit.
The `<m>` placeholder semantics (load-bearing per the 2026-05-30T11:50Z
coord-log entry): always the ATM's hex pubkey, NEVER satmachineadmin's
internal dca_machines.id UUID. Helper `_atm_hex_pubkey(machine)`
centralises the canonicalisation via lnbits.utils.nostr.normalize_public_key.
"""
from __future__ import annotations
import json
import time
from lnbits.core.crud.users import get_account
from lnbits.core.services.nip46_bunker_client import (
NsecBunkerRpcError,
NsecBunkerTimeoutError,
)
from lnbits.core.signers import resolve_signer
from lnbits.core.signers.base import (
NostrSigner,
SignerError,
SignerUnavailableError,
)
from lnbits.utils.nostr import normalize_public_key
from loguru import logger
from .models import Machine, PublishCassettesPayload
from .nip44 import Nip44Error
from .nip44 import decrypt_from as _nip44_local_decrypt
from .nip44 import encrypt_for as _nip44_local_encrypt
_KIND_NIP78 = 30078
_D_TAG_CONFIG_PREFIX = "bitspire-cassettes:" # operator → ATM
_D_TAG_STATE_PREFIX = "bitspire-cassettes-state:" # ATM → operator
# =============================================================================
# Errors
# =============================================================================
class CassetteTransportError(Exception):
"""Generic transport-layer error. Subclasses distinguish failure modes
so the API can surface meaningful HTTP statuses + the consumer task
can log + skip without crashing."""
class OperatorIdentityMissing(CassetteTransportError):
"""Operator account has no Nostr pubkey on file, or no signer is
available (pre-bunker rollout operator hasn't onboarded via
Nostr-login)."""
class SignerUnavailable(CassetteTransportError):
"""Resolved signer can't sign server-side (client-side-only signer,
or transient bunker unreachability post-lnbits#18). Publish skipped."""
class RelayUnavailable(CassetteTransportError):
"""nostrclient extension isn't installed or its relay manager isn't
reachable. Treated as soft-fail; publish skipped + logged."""
class CassetteEventDecodeError(CassetteTransportError):
"""Inbound state event failed validation: bad signature, NIP-44 v2
decrypt failure, or payload didn't conform to PublishCassettesPayload.
Terminal caller should log + skip, advancing past the event."""
class CassetteEventTransientError(CassetteTransportError):
"""Inbound state event couldn't be decrypted because the signer
component (typically the bunker) is transiently unavailable. Caller
should NOT advance past the event; retry on next tick.
Distinct from CassetteEventDecodeError so the consumer task can
differentiate "MAC failed, give up" from "bunker is partitioned, try
again in a few seconds" — surfaced by lnbits at coord-log
2026-05-31T07:10Z as the load-bearing distinction post-PR-#38."""
# =============================================================================
# Helpers — canonical pubkey + d-tag construction
# =============================================================================
def _atm_hex_pubkey(machine: Machine) -> str:
"""Canonicalise machine.machine_npub (hex OR npub bech32 — operator
enters either in the UI) to lowercase hex. ALL d-tag substitutions
use this value; using the internal machine.id UUID would silently
no-op the wire-level filter (per coord-log 11:50Z load-bearing nudge).
"""
return normalize_public_key(machine.machine_npub).lower()
def _config_d_tag(atm_pubkey_hex: str) -> str:
"""d-tag for operator → ATM publish. ATM subscribes by this tag."""
return f"{_D_TAG_CONFIG_PREFIX}{atm_pubkey_hex}"
def _state_d_tag(atm_pubkey_hex: str) -> str:
"""d-tag for ATM → operator publish (bootstrap in v1, continuous v2)."""
return f"{_D_TAG_STATE_PREFIX}{atm_pubkey_hex}"
def build_state_d_tags_for_machines(machines: list[Machine]) -> list[str]:
"""Bootstrap-consumer subscription filter helper: returns the full
`#d=[...]` list for all known ATMs an operator subscribes to."""
return [_state_d_tag(_atm_hex_pubkey(m)) for m in machines]
# =============================================================================
# Sign-as-operator — hybrid path (resolve_signer post #17, prvkey fallback)
# =============================================================================
async def _resolve_operator_signer(operator_user_id: str):
"""Fetch the operator's account + resolve to a NostrSigner.
Single source of truth for "give me the signer for this operator,
or raise an operator-facing error if we can't." Returns
`(account, signer)` so callers that need both (publish path needs
`account.pubkey` for the event author and the signer for both
encrypt + sign) don't double-fetch.
Raises:
- OperatorIdentityMissing no account, or no pubkey on file
- SignerUnavailable signer resolve failed, or signer can't sign
server-side (ClientSideOnly)
"""
account = await get_account(operator_user_id)
if account is None or not account.pubkey:
raise OperatorIdentityMissing(
f"operator {operator_user_id[:8]}... has no Nostr pubkey on file. "
"Onboard via the LNbits Nostr-login flow to publish cassette "
"config to your ATMs."
)
try:
signer = resolve_signer(account)
except SignerError as exc:
raise SignerUnavailable(
f"signer resolve failed for operator {operator_user_id[:8]}...: " f"{exc}"
) from exc
if not signer.can_sign():
raise SignerUnavailable(
f"operator {operator_user_id[:8]}... has a client-side-only "
"signer; server can't sign or NIP-44-encrypt on their behalf. "
"Operator must hold their nsec via a NIP-46 bunker (lnbits#18) "
"or migrate to a server-signing account."
)
return account, signer
async def _sign_as_operator(operator_user_id: str, event: dict) -> dict | None:
"""Sign `event` using the operator's signer (LocalSigner or
RemoteBunkerSigner). Mutates `event` to add `created_at` (now),
`pubkey`, `id`, and `sig`.
Raises typed CassetteTransportError subclasses on hard failure
(the publish endpoint maps these to HTTP statuses); never returns
None on the publish path.
"""
_account, signer = await _resolve_operator_signer(operator_user_id)
# created_at is part of the BIP-340 event-id hash; set before signing.
event["created_at"] = int(time.time())
try:
return await signer.sign_event(event)
except SignerUnavailableError as exc:
raise SignerUnavailable(
f"signer unavailable for operator {operator_user_id[:8]}...: " f"{exc}"
) from exc
async def _nip44_encrypt_via_signer(
account, signer: NostrSigner, plaintext: str, peer_pubkey_hex: str
) -> str:
"""NIP-44 v2 encrypt via the signer abstraction, with a transitional
fallback to direct-prvkey for LocalSigner accounts.
The bunker (RemoteBunkerSigner) implements `nip44_encrypt` natively
the operator's nsec never leaves the bunker process. LocalSigner's
`nip44_encrypt` stub explicitly raises SignerUnavailableError
("LocalSigner does not implement nip44_encrypt") per the
post-PR-#38 ABC — the spec is "migrate to bunker." For the
transitional window where some operators are still on LocalSigner
+ their `account.prvkey` is intact, we catch that signal and use
our hand-rolled NIP-44 v2 impl against the stored prvkey. Same
wire output either way.
Removed once every operator account on this instance is bunker-
backed (S7 fully landed). At that point this helper collapses to
`return await signer.nip44_encrypt(plaintext, peer_pubkey_hex)`.
"""
try:
return await signer.nip44_encrypt(plaintext, peer_pubkey_hex)
except SignerUnavailableError:
if account.signer_type == "LocalSigner" and account.prvkey:
return _nip44_local_encrypt(plaintext, account.prvkey, peer_pubkey_hex)
# ClientSideOnly, or RemoteBunkerSigner with bunker comms failure
# at config time — re-raise without wrapping; caller maps it.
raise
async def _nip44_decrypt_via_signer(
account, signer: NostrSigner, ciphertext: str, peer_pubkey_hex: str
) -> str:
"""Decrypt mirror of `_nip44_encrypt_via_signer`. Same LocalSigner
transitional fallback."""
try:
return await signer.nip44_decrypt(ciphertext, peer_pubkey_hex)
except SignerUnavailableError:
if account.signer_type == "LocalSigner" and account.prvkey:
return _nip44_local_decrypt(ciphertext, account.prvkey, peer_pubkey_hex)
raise
# =============================================================================
# Publish — operator → ATM (the satmachineadmin API path)
# =============================================================================
async def _publish_signed_event(signed_event: dict) -> None:
"""Send a signed Nostr event to all relays via the nostrclient
extension's singleton RelayManager.
Lazy import + typed-error so the API can surface "your LNbits doesn't
have nostrclient installed" as a 503 rather than a 500. Pattern
matches the cross-extension import guards in
`lnbits.core.services.users` (nostrmarket / nostrrelay).
"""
try:
from nostrclient.router import ( # type: ignore[import-not-found]
nostr_client,
)
except ImportError as exc:
raise RelayUnavailable(
"nostrclient extension is not installed; cassette config "
"publish requires it. Install + activate the nostrclient "
"extension on this LNbits instance."
) from exc
msg = json.dumps(["EVENT", signed_event])
nostr_client.relay_manager.publish_message(msg)
async def publish_to_atm(
machine: Machine,
payload: PublishCassettesPayload,
operator_user_id: str,
) -> dict:
"""Build, encrypt, sign, and publish a kind-30078 cassette config event
from the operator to the target ATM.
Returns the signed event dict on success (caller may log event.id for
audit). Raises CassetteTransportError subclasses on hard failures:
- OperatorIdentityMissing 400: operator hasn't onboarded
- SignerUnavailable 503: signer offline / client-side-only / bunker
timeout at the encrypt or sign step
- RelayUnavailable 503: nostrclient not installed
- CassetteTransportError 500: anything else
"""
atm_pubkey_hex = _atm_hex_pubkey(machine)
# Single fetch + resolve — same signer is used for both encrypt and sign.
account, signer = await _resolve_operator_signer(operator_user_id)
# NIP-44 v2 encrypt the wire payload. Bunker round-trip on
# RemoteBunkerSigner; direct prvkey on LocalSigner (transitional).
plaintext = json.dumps(payload.to_wire_dict(), separators=(",", ":"))
try:
content = await _nip44_encrypt_via_signer(
account, signer, plaintext, atm_pubkey_hex
)
except NsecBunkerTimeoutError as exc:
raise SignerUnavailable(
f"bunker unreachable while encrypting cassette config for "
f"operator {operator_user_id[:8]}...: {exc}"
) from exc
except NsecBunkerRpcError as exc:
raise SignerUnavailable(
f"bunker rejected nip44_encrypt for operator "
f"{operator_user_id[:8]}... (policy / MAC / config issue): "
f"{exc}"
) from exc
except SignerUnavailableError as exc:
raise SignerUnavailable(
f"signer cannot nip44-encrypt for operator "
f"{operator_user_id[:8]}...: {exc}"
) from exc
event: dict = {
"kind": _KIND_NIP78,
"tags": [
["d", _config_d_tag(atm_pubkey_hex)],
["p", atm_pubkey_hex],
],
"content": content,
# created_at is set inside _sign_as_operator before signing.
}
signed = await _sign_as_operator(operator_user_id, event)
if signed is None:
raise CassetteTransportError(
"sign_as_operator returned None unexpectedly — soft-fail path "
"shouldn't be reachable on a publish-initiated flow"
)
await _publish_signed_event(signed)
logger.info(
f"satmachineadmin: published kind-30078 cassette config to ATM "
f"{atm_pubkey_hex[:12]}... (event_id={signed['id'][:12]}..., "
f"machine_id={machine.id}, positions={sorted(payload.positions.keys())})"
)
return signed
# =============================================================================
# Consume — ATM → operator (the bootstrap consumer task)
# =============================================================================
async def decrypt_and_parse_state_event(
event: dict, account, signer: NostrSigner
) -> PublishCassettesPayload:
"""Decrypt + parse an inbound `bitspire-cassettes-state:<atm_pubkey_hex>`
event the ATM published toward the operator.
Caller is responsible for:
- filtering on `kind=30078` and the expected `#d` tag list
- verifying the event signature (lnbits.utils.nostr.verify_event)
- confirming `event["pubkey"]` matches a known ATM (= machine.machine_npub
canonicalised) the consumer task does this before calling here
- resolving the operator's account + signer via
`_resolve_operator_signer(...)` and passing them in
This function does:
- NIP-44 v2 decrypt of event["content"] via `signer.nip44_decrypt`
(bunker round-trip on RemoteBunkerSigner; direct prvkey on the
transitional LocalSigner path)
- JSON parse + PublishCassettesPayload validation
Error mapping:
- CassetteEventTransientError on NsecBunkerTimeoutError caller
should NOT advance state_event_id; retry on next consumer tick
- CassetteEventDecodeError on anything else (bunker RPC reject,
signer unavailable, MAC failure, JSON parse, payload shape)
terminal; caller logs + skips
"""
sender_pubkey = event.get("pubkey")
content = event.get("content")
if not isinstance(sender_pubkey, str) or not isinstance(content, str):
raise CassetteEventDecodeError(
"event missing required pubkey or content fields"
)
try:
plaintext = await _nip44_decrypt_via_signer(
account, signer, content, sender_pubkey
)
except NsecBunkerTimeoutError as exc:
raise CassetteEventTransientError(
f"bunker unreachable while decrypting cassette state event: {exc}"
) from exc
except NsecBunkerRpcError as exc:
raise CassetteEventDecodeError(
f"bunker rejected nip44_decrypt (policy / MAC / config): {exc}"
) from exc
except SignerUnavailableError as exc:
raise CassetteEventDecodeError(f"signer cannot nip44-decrypt: {exc}") from exc
except Nip44Error as exc:
# Hand-rolled LocalSigner fallback path (transitional) — MAC fail
# / version mismatch / length issue.
raise CassetteEventDecodeError(
f"NIP-44 v2 decrypt failed (LocalSigner fallback path): {exc}"
) from exc
except ValueError as exc:
# coincurve raises ValueError on a malformed pubkey hex (only
# reachable via the LocalSigner fallback path; the bunker handles
# pubkey validation server-side).
raise CassetteEventDecodeError(f"sender pubkey is malformed: {exc}") from exc
try:
raw = json.loads(plaintext)
except json.JSONDecodeError as exc:
raise CassetteEventDecodeError(
f"decrypted content isn't valid JSON: {exc}"
) from exc
try:
return PublishCassettesPayload(**raw)
except Exception as exc:
raise CassetteEventDecodeError(
f"payload didn't validate as PublishCassettesPayload: {exc}"
) from exc

324
crud.py
View file

@ -6,12 +6,12 @@
# machine model".
from datetime import datetime
from typing import List, Optional
from lnbits.db import Database
from lnbits.helpers import urlsafe_short_hash
from .models import (
CassetteConfig,
ClientBalanceSummary,
CommissionSplit,
CommissionSplitLeg,
@ -26,6 +26,7 @@ from .models import (
DcaPayment,
DcaSettlement,
Machine,
PublishCassettesPayload,
SuperConfig,
TelemetrySnapshot,
UpdateDcaClientData,
@ -33,6 +34,7 @@ from .models import (
UpdateDepositStatusData,
UpdateMachineData,
UpdateSuperConfigData,
UpsertCassetteConfigData,
UpsertDcaLpData,
)
@ -44,7 +46,7 @@ db = Database("ext_satoshimachine")
# =============================================================================
async def get_super_config() -> Optional[SuperConfig]:
async def get_super_config() -> SuperConfig | None:
return await db.fetchone(
"SELECT * FROM satoshimachine.super_config WHERE id = :id",
{"id": "default"},
@ -52,7 +54,7 @@ async def get_super_config() -> Optional[SuperConfig]:
)
async def update_super_config(data: UpdateSuperConfigData) -> Optional[SuperConfig]:
async def update_super_config(data: UpdateSuperConfigData) -> SuperConfig | None:
update_data = {k: v for k, v in data.dict().items() if v is not None}
if not update_data:
return await get_super_config()
@ -100,7 +102,7 @@ async def create_machine(operator_user_id: str, data: CreateMachineData) -> Mach
return machine
async def get_machine(machine_id: str) -> Optional[Machine]:
async def get_machine(machine_id: str) -> Machine | None:
return await db.fetchone(
"SELECT * FROM satoshimachine.dca_machines WHERE id = :id",
{"id": machine_id},
@ -108,7 +110,7 @@ async def get_machine(machine_id: str) -> Optional[Machine]:
)
async def get_machine_by_npub(machine_npub: str) -> Optional[Machine]:
async def get_machine_by_npub(machine_npub: str) -> Machine | None:
return await db.fetchone(
"SELECT * FROM satoshimachine.dca_machines WHERE machine_npub = :npub",
{"npub": machine_npub},
@ -116,7 +118,7 @@ async def get_machine_by_npub(machine_npub: str) -> Optional[Machine]:
)
async def get_active_machine_by_wallet_id(wallet_id: str) -> Optional[Machine]:
async def get_active_machine_by_wallet_id(wallet_id: str) -> Machine | None:
"""Used by the invoice listener to route an incoming payment to a machine."""
return await db.fetchone(
"""
@ -129,7 +131,7 @@ async def get_active_machine_by_wallet_id(wallet_id: str) -> Optional[Machine]:
)
async def get_machines_for_operator(operator_user_id: str) -> List[Machine]:
async def get_machines_for_operator(operator_user_id: str) -> list[Machine]:
return await db.fetchall(
"""
SELECT * FROM satoshimachine.dca_machines
@ -141,7 +143,46 @@ async def get_machines_for_operator(operator_user_id: str) -> List[Machine]:
)
async def update_machine(machine_id: str, data: UpdateMachineData) -> Optional[Machine]:
async def list_all_active_machines() -> list[Machine]:
"""Used by the cassette bootstrap consumer task to build a single
cross-operator subscription filter. Each event's pubkey routes to
the right operator via get_machine_by_atm_pubkey_hex + the machine's
operator_user_id.
"""
return await db.fetchall(
"""
SELECT * FROM satoshimachine.dca_machines
WHERE is_active = true
ORDER BY created_at DESC
""",
{},
Machine,
)
async def get_machine_by_atm_pubkey_hex(atm_pubkey_hex: str) -> Machine | None:
"""Look up an active machine by its ATM pubkey, accepting hex or bech32
in machine_npub. Used by the cassette bootstrap consumer to route an
incoming state event to the right machine row (and therefore operator
privkey for decryption).
O(N) over active machines fine for small fleets. If fleet sizes
grow, normalise machine_npub-at-write to hex and add an index.
"""
from lnbits.utils.nostr import normalize_public_key
target = atm_pubkey_hex.lower()
machines = await list_all_active_machines()
for m in machines:
try:
if normalize_public_key(m.machine_npub).lower() == target:
return m
except (ValueError, AssertionError):
continue
return None
async def update_machine(machine_id: str, data: UpdateMachineData) -> Machine | None:
update_data = {k: v for k, v in data.dict().items() if v is not None}
if not update_data:
return await get_machine(machine_id)
@ -213,7 +254,7 @@ _CLIENT_FROM = (
)
async def get_dca_client(client_id: str) -> Optional[DcaClient]:
async def get_dca_client(client_id: str) -> DcaClient | None:
return await db.fetchone(
f"SELECT {_CLIENT_SELECT} FROM {_CLIENT_FROM} WHERE c.id = :id",
{"id": client_id},
@ -223,7 +264,7 @@ async def get_dca_client(client_id: str) -> Optional[DcaClient]:
async def get_dca_client_for_machine_user(
machine_id: str, user_id: str
) -> Optional[DcaClient]:
) -> DcaClient | None:
return await db.fetchone(
f"""
SELECT {_CLIENT_SELECT} FROM {_CLIENT_FROM}
@ -234,7 +275,7 @@ async def get_dca_client_for_machine_user(
)
async def get_dca_clients_for_machine(machine_id: str) -> List[DcaClient]:
async def get_dca_clients_for_machine(machine_id: str) -> list[DcaClient]:
return await db.fetchall(
f"""
SELECT {_CLIENT_SELECT} FROM {_CLIENT_FROM}
@ -246,7 +287,7 @@ async def get_dca_clients_for_machine(machine_id: str) -> List[DcaClient]:
)
async def get_dca_clients_for_operator(operator_user_id: str) -> List[DcaClient]:
async def get_dca_clients_for_operator(operator_user_id: str) -> list[DcaClient]:
"""All clients across every machine this operator owns."""
return await db.fetchall(
f"""
@ -261,7 +302,7 @@ async def get_dca_clients_for_operator(operator_user_id: str) -> List[DcaClient]
)
async def get_dca_clients_for_user(user_id: str) -> List[DcaClient]:
async def get_dca_clients_for_user(user_id: str) -> list[DcaClient]:
"""LP cross-operator view — every machine this LP is registered at."""
return await db.fetchall(
f"""
@ -274,7 +315,7 @@ async def get_dca_clients_for_user(user_id: str) -> List[DcaClient]:
)
async def get_flow_mode_clients_for_machine(machine_id: str) -> List[DcaClient]:
async def get_flow_mode_clients_for_machine(machine_id: str) -> list[DcaClient]:
"""Active LPs enrolled at this machine whose per-user `dca_lp` row
has `default_dca_mode = 'flow'`. Used by the distribution algorithm.
@ -302,7 +343,7 @@ async def get_flow_mode_clients_for_machine(machine_id: str) -> List[DcaClient]:
# =============================================================================
async def get_dca_lp(user_id: str) -> Optional[DcaLpPreferences]:
async def get_dca_lp(user_id: str) -> DcaLpPreferences | None:
"""Return the LP's preferences row, or None if they haven't onboarded
via satmachineclient yet."""
return await db.fetchone(
@ -325,7 +366,7 @@ async def upsert_dca_lp(
user_id: str,
data: UpsertDcaLpData,
*,
fallback_wallet_id: Optional[str] = None,
fallback_wallet_id: str | None = None,
) -> DcaLpPreferences:
"""Create or update the LP's preferences row.
@ -380,7 +421,7 @@ async def upsert_dca_lp(
async def update_dca_client(
client_id: str, data: UpdateDcaClientData
) -> Optional[DcaClient]:
) -> DcaClient | None:
update_data = {k: v for k, v in data.dict().items() if v is not None}
if not update_data:
return await get_dca_client(client_id)
@ -442,7 +483,7 @@ async def create_deposit(
return deposit
async def get_deposit(deposit_id: str) -> Optional[DcaDeposit]:
async def get_deposit(deposit_id: str) -> DcaDeposit | None:
return await db.fetchone(
"SELECT * FROM satoshimachine.dca_deposits WHERE id = :id",
{"id": deposit_id},
@ -450,7 +491,7 @@ async def get_deposit(deposit_id: str) -> Optional[DcaDeposit]:
)
async def get_deposits_for_client(client_id: str) -> List[DcaDeposit]:
async def get_deposits_for_client(client_id: str) -> list[DcaDeposit]:
return await db.fetchall(
"""
SELECT * FROM satoshimachine.dca_deposits
@ -462,7 +503,7 @@ async def get_deposits_for_client(client_id: str) -> List[DcaDeposit]:
)
async def get_deposits_for_operator(operator_user_id: str) -> List[DcaDeposit]:
async def get_deposits_for_operator(operator_user_id: str) -> list[DcaDeposit]:
return await db.fetchall(
"""
SELECT d.*
@ -478,7 +519,7 @@ async def get_deposits_for_operator(operator_user_id: str) -> List[DcaDeposit]:
async def update_deposit(
deposit_id: str, data: UpdateDepositData
) -> Optional[DcaDeposit]:
) -> DcaDeposit | None:
update_data = {k: v for k, v in data.dict().items() if v is not None}
if not update_data:
return await get_deposit(deposit_id)
@ -493,7 +534,7 @@ async def update_deposit(
async def update_deposit_status(
deposit_id: str, data: UpdateDepositStatusData
) -> Optional[DcaDeposit]:
) -> DcaDeposit | None:
payload = {
"id": deposit_id,
"status": data.status,
@ -528,8 +569,8 @@ async def delete_deposit(deposit_id: str) -> None:
async def create_settlement_idempotent(
data: CreateDcaSettlementData,
initial_status: str,
error_message: Optional[str] = None,
) -> Optional[DcaSettlement]:
error_message: str | None = None,
) -> DcaSettlement | None:
"""Insert a settlement keyed by payment_hash.
Returns the inserted row on first sight; returns the existing row
@ -589,7 +630,7 @@ async def create_settlement_idempotent(
return await get_settlement(settlement_id)
async def get_settlement(settlement_id: str) -> Optional[DcaSettlement]:
async def get_settlement(settlement_id: str) -> DcaSettlement | None:
return await db.fetchone(
"SELECT * FROM satoshimachine.dca_settlements WHERE id = :id",
{"id": settlement_id},
@ -599,7 +640,7 @@ async def get_settlement(settlement_id: str) -> Optional[DcaSettlement]:
async def get_settlement_by_payment_hash(
payment_hash: str,
) -> Optional[DcaSettlement]:
) -> DcaSettlement | None:
return await db.fetchone(
"""
SELECT * FROM satoshimachine.dca_settlements
@ -612,7 +653,7 @@ async def get_settlement_by_payment_hash(
async def get_settlements_for_machine(
machine_id: str, limit: int = 100
) -> List[DcaSettlement]:
) -> list[DcaSettlement]:
return await db.fetchall(
"""
SELECT * FROM satoshimachine.dca_settlements
@ -705,7 +746,7 @@ async def get_stuck_settlements_for_operator(
async def force_reset_stuck_settlement(
settlement_id: str,
) -> Optional[DcaSettlement]:
) -> DcaSettlement | None:
"""Operator escape hatch for genuinely stuck settlements (processor
crashed mid-flight, etc.). Flips 'pending'/'processing' 'errored' so
the existing retry endpoint can take over. Clears processing_claim.
@ -728,7 +769,7 @@ async def force_reset_stuck_settlement(
async def get_settlements_for_operator(
operator_user_id: str, limit: int = 200
) -> List[DcaSettlement]:
) -> list[DcaSettlement]:
return await db.fetchall(
"""
SELECT s.*
@ -746,8 +787,8 @@ async def get_settlements_for_operator(
async def mark_settlement_status(
settlement_id: str,
status: str,
error_message: Optional[str] = None,
) -> Optional[DcaSettlement]:
error_message: str | None = None,
) -> DcaSettlement | None:
"""Status: 'pending' | 'processing' | 'processed' | 'partial' |
'refunded' | 'errored'. Clears processing_claim on terminal states so a
fresh claim attempt won't see a stale token."""
@ -778,7 +819,7 @@ async def mark_settlement_status(
async def claim_settlement_for_processing(
settlement_id: str,
) -> Optional[DcaSettlement]:
) -> DcaSettlement | None:
"""Optimistic-lock claim: atomically flip a settlement to 'processing'
and tag it with a per-invocation token. Returns the claimed row on
success; None if another caller already won the claim or the settlement
@ -808,7 +849,7 @@ async def claim_settlement_for_processing(
async def reset_settlement_for_retry(
settlement_id: str,
) -> Optional[DcaSettlement]:
) -> DcaSettlement | None:
"""Operator retry path. Flips 'errored''pending' and voids any
'failed' legs so process_settlement re-runs them fresh. Completed legs
are left in place we never re-pay sats that already moved."""
@ -844,7 +885,7 @@ async def apply_partial_dispense(
new_operator_fee_sats: int,
new_fiat_amount: float,
appended_note: str,
) -> Optional[DcaSettlement]:
) -> DcaSettlement | None:
"""Overwrite the monetary fields on a settlement (partial-dispense
recompute) and prepend `appended_note` to the notes column.
@ -899,7 +940,7 @@ async def count_completed_legs_for_settlement(settlement_id: str) -> int:
async def append_settlement_note(
settlement_id: str, note: str, author_user_id: str
) -> Optional[DcaSettlement]:
) -> DcaSettlement | None:
"""Prepend an operator-authored note to settlement.notes. Each entry is
timestamped (UTC) and tagged with the author's user id so the trail
is accountable. Append-only: existing entries are never edited."""
@ -944,8 +985,8 @@ async def void_open_legs_for_settlement(settlement_id: str) -> None:
async def get_commission_splits(
operator_user_id: str, machine_id: Optional[str] = None
) -> List[CommissionSplit]:
operator_user_id: str, machine_id: str | None = None
) -> list[CommissionSplit]:
"""Returns the rule set for the given scope.
Precedence (caller's responsibility): try per-machine override first;
@ -974,7 +1015,7 @@ async def get_commission_splits(
async def get_effective_commission_splits(
operator_user_id: str, machine_id: str
) -> List[CommissionSplit]:
) -> list[CommissionSplit]:
"""Per-machine override if set, otherwise operator's default ruleset."""
overrides = await get_commission_splits(operator_user_id, machine_id)
if overrides:
@ -984,9 +1025,9 @@ async def get_effective_commission_splits(
async def replace_commission_splits(
operator_user_id: str,
machine_id: Optional[str],
legs: List[CommissionSplitLeg],
) -> List[CommissionSplit]:
machine_id: str | None,
legs: list[CommissionSplitLeg],
) -> list[CommissionSplit]:
"""Atomic replace for the (operator, machine) scope. Caller should have
already validated legs sum to 1.0 via the Pydantic model."""
if machine_id is None:
@ -1072,7 +1113,7 @@ async def create_dca_payment(data: CreateDcaPaymentData) -> DcaPayment:
return payment
async def get_dca_payment(payment_id: str) -> Optional[DcaPayment]:
async def get_dca_payment(payment_id: str) -> DcaPayment | None:
return await db.fetchone(
"SELECT * FROM satoshimachine.dca_payments WHERE id = :id",
{"id": payment_id},
@ -1080,7 +1121,7 @@ async def get_dca_payment(payment_id: str) -> Optional[DcaPayment]:
)
async def get_payments_for_settlement(settlement_id: str) -> List[DcaPayment]:
async def get_payments_for_settlement(settlement_id: str) -> list[DcaPayment]:
return await db.fetchall(
"""
SELECT * FROM satoshimachine.dca_payments
@ -1092,7 +1133,7 @@ async def get_payments_for_settlement(settlement_id: str) -> List[DcaPayment]:
)
async def get_payments_for_client(client_id: str) -> List[DcaPayment]:
async def get_payments_for_client(client_id: str) -> list[DcaPayment]:
return await db.fetchall(
"""
SELECT * FROM satoshimachine.dca_payments
@ -1105,8 +1146,8 @@ async def get_payments_for_client(client_id: str) -> List[DcaPayment]:
async def get_payments_for_operator(
operator_user_id: str, leg_type: Optional[str] = None, limit: int = 200
) -> List[DcaPayment]:
operator_user_id: str, leg_type: str | None = None, limit: int = 200
) -> list[DcaPayment]:
if leg_type is None:
return await db.fetchall(
"""
@ -1133,9 +1174,9 @@ async def get_payments_for_operator(
async def update_payment_status(
payment_id: str,
status: str,
external_payment_hash: Optional[str] = None,
error_message: Optional[str] = None,
) -> Optional[DcaPayment]:
external_payment_hash: str | None = None,
error_message: str | None = None,
) -> DcaPayment | None:
await db.execute(
"""
UPDATE satoshimachine.dca_payments
@ -1161,7 +1202,7 @@ async def update_payment_status(
async def get_client_balance_summary(
client_id: str,
) -> Optional[ClientBalanceSummary]:
) -> ClientBalanceSummary | None:
"""Per-client (and per-machine, since clients are per-machine in v2) summary.
DCA legs only settlement/autoforward/super_fee/operator_split legs are
@ -1210,7 +1251,7 @@ async def get_client_balance_summary(
# =============================================================================
async def get_telemetry(machine_id: str) -> Optional[TelemetrySnapshot]:
async def get_telemetry(machine_id: str) -> TelemetrySnapshot | None:
return await db.fetchone(
"SELECT * FROM satoshimachine.dca_telemetry WHERE machine_id = :mid",
{"mid": machine_id},
@ -1221,19 +1262,19 @@ async def get_telemetry(machine_id: str) -> Optional[TelemetrySnapshot]:
async def upsert_beacon_snapshot(
machine_id: str,
*,
cash_in: Optional[bool] = None,
cash_out: Optional[bool] = None,
cash_level: Optional[str] = None,
fiat: Optional[str] = None,
model: Optional[str] = None,
name: Optional[str] = None,
location: Optional[str] = None,
geo: Optional[str] = None,
fees_json: Optional[str] = None,
limits_json: Optional[str] = None,
denominations_json: Optional[str] = None,
version: Optional[str] = None,
) -> Optional[TelemetrySnapshot]:
cash_in: bool | None = None,
cash_out: bool | None = None,
cash_level: str | None = None,
fiat: str | None = None,
model: str | None = None,
name: str | None = None,
location: str | None = None,
geo: str | None = None,
fees_json: str | None = None,
limits_json: str | None = None,
denominations_json: str | None = None,
version: str | None = None,
) -> TelemetrySnapshot | None:
"""Upsert kind-30078 beacon fields. All fields are nullable because today's
upstream payload only carries cash_in/cash_out/cash_level/fiat/model (see
lamassu-next#43 — the enrichment is not yet shipped)."""
@ -1310,7 +1351,7 @@ async def upsert_beacon_snapshot(
async def upsert_fleet_snapshot(
machine_id: str, telemetry_json: str
) -> Optional[TelemetrySnapshot]:
) -> TelemetrySnapshot | None:
"""Upsert kind-30079 operator-only telemetry. Awaits lamassu-next#42 to
produce a real schema; we store the raw JSON blob until then."""
existing = await get_telemetry(machine_id)
@ -1334,3 +1375,156 @@ async def upsert_fleet_snapshot(
{"mid": machine_id, "json": telemetry_json, "now": now},
)
return await get_telemetry(machine_id)
# =============================================================================
# Cassette configs — operator-driven ATM cassette inventory (#29 v1.1).
# =============================================================================
# Row lifecycle per #29:
# - First population for a (machine_id, position) pair → apply_bootstrap_state
# (consumer reading the ATM's one-shot bitspire-cassettes-state event)
# - Operator edit of denomination or count → update_cassette_config
# (refuses to create new rows; the slot count is hardware-determined)
# - Row creation/deletion for a new position → admin only, via ATM
# re-provisioning + new bootstrap event (not exposed in v1 here)
def _should_apply_bootstrap_state(
existing_state_event_id: str | None, incoming_event_id: str
) -> bool:
"""Pure-function dedup gate for apply_bootstrap_state.
Returns False if any existing row for this machine already references
the incoming event_id (relay re-delivery after restart). True otherwise.
Extracted as a pure function so the dedup decision is unit-testable
without a database round-trip. The actual idempotency check in
apply_bootstrap_state fetches one existing row and passes its
state_event_id here.
"""
return existing_state_event_id != incoming_event_id
async def get_cassette_config(
machine_id: str, position: int
) -> CassetteConfig | None:
return await db.fetchone(
"SELECT * FROM satoshimachine.cassette_configs "
"WHERE machine_id = :mid AND position = :pos",
{"mid": machine_id, "pos": position},
CassetteConfig,
)
async def list_cassette_configs_for_machine(
machine_id: str,
) -> list[CassetteConfig]:
return await db.fetchall(
"SELECT * FROM satoshimachine.cassette_configs "
"WHERE machine_id = :mid ORDER BY position",
{"mid": machine_id},
CassetteConfig,
)
async def update_cassette_config(
machine_id: str,
position: int,
data: UpsertCassetteConfigData,
*,
updated_by: str | None = None,
) -> CassetteConfig | None:
"""Operator-driven row update: change denomination and/or count for a
single cassette slot. Refuses to create new rows those only land via
apply_bootstrap_state() consuming an ATM bootstrap event (per #29 row
lifecycle: hardware-determined slot count, not operator-creatable).
Returns None if the (machine_id, position) row doesn't exist.
"""
existing = await get_cassette_config(machine_id, position)
if existing is None:
return None
update_data: dict = {k: v for k, v in data.dict().items() if v is not None}
if not update_data:
return existing
update_data["updated_at"] = datetime.now()
update_data["updated_by"] = updated_by
set_clause = ", ".join(f"{k} = :{k}" for k in update_data)
update_data["mid"] = machine_id
update_data["pos"] = position
await db.execute(
f"UPDATE satoshimachine.cassette_configs SET {set_clause} "
"WHERE machine_id = :mid AND position = :pos",
update_data,
)
return await get_cassette_config(machine_id, position)
async def apply_bootstrap_state(
machine_id: str,
event_id: str,
event_created_at: datetime,
payload: PublishCassettesPayload,
) -> bool:
"""Consume an ATM-published kind-30078 bitspire-cassettes-state:<m> event
and upsert one cassette_configs row per position in the payload.
Returns True if the upsert ran; False if any existing row for this
machine already references this event_id (idempotent on relay
re-delivery / restart).
Populates both the operator-believed columns (denomination, count,
updated_at, updated_by='atm-bootstrap') AND the v2 reverse-channel
columns (state_denomination, state_count, state_at, state_event_id)
so the operator's initial view matches the ATM's reported state. v2
reconciliation UI will diverge them when continuous reverse-channel
events land + the operator subsequently edits.
"""
existing_first: dict | None = await db.fetchone(
"SELECT state_event_id FROM satoshimachine.cassette_configs "
"WHERE machine_id = :mid LIMIT 1",
{"mid": machine_id},
)
existing_event_id: str | None = None
if existing_first is not None:
existing_event_id = (
existing_first.get("state_event_id")
if isinstance(existing_first, dict)
else getattr(existing_first, "state_event_id", None)
)
if not _should_apply_bootstrap_state(existing_event_id, event_id):
return False
now = datetime.now()
for pos, row in payload.positions.items():
await db.execute(
"""
INSERT INTO satoshimachine.cassette_configs
(machine_id, position, denomination, count, updated_at,
updated_by, state_denomination, state_count, state_at,
state_event_id)
VALUES (:mid, :pos, :denom, :count, :now, :by,
:state_denom, :state_count, :state_at, :event_id)
ON CONFLICT (machine_id, position) DO UPDATE SET
denomination = excluded.denomination,
count = excluded.count,
updated_at = excluded.updated_at,
updated_by = excluded.updated_by,
state_denomination = excluded.state_denomination,
state_count = excluded.state_count,
state_at = excluded.state_at,
state_event_id = excluded.state_event_id
""",
{
"mid": machine_id,
"pos": pos,
"denom": row.denomination,
"count": row.count,
"now": now,
"by": "atm-bootstrap",
"state_denom": row.denomination,
"state_count": row.count,
"state_at": event_created_at,
"event_id": event_id,
},
)
return True

View file

@ -474,9 +474,7 @@ async def m006_rename_to_canonical_sat_vocabulary(db):
]
for table, old_col, new_col in renames:
try:
await db.fetchone(
f"SELECT {old_col} FROM satoshimachine.{table} LIMIT 1"
)
await db.fetchone(f"SELECT {old_col} FROM satoshimachine.{table} LIMIT 1")
except Exception:
# old column doesn't exist; either rename already landed or
# m001 produced the canonical schema directly on fresh install.
@ -496,15 +494,11 @@ async def m006_rename_to_canonical_sat_vocabulary(db):
]
for table, col in drops:
try:
await db.fetchone(
f"SELECT {col} FROM satoshimachine.{table} LIMIT 1"
)
await db.fetchone(f"SELECT {col} FROM satoshimachine.{table} LIMIT 1")
except Exception:
# column doesn't exist; either already dropped or never present.
continue
await db.execute(
f"ALTER TABLE satoshimachine.{table} DROP COLUMN {col}"
)
await db.execute(f"ALTER TABLE satoshimachine.{table} DROP COLUMN {col}")
async def m005_lock_deposit_currency_to_machine_fiat_code(db):
@ -538,3 +532,115 @@ async def m005_lock_deposit_currency_to_machine_fiat_code(db):
AND m.fiat_code != d.currency
)
""")
async def m007_add_cassette_configs(db):
"""Add cassette_configs table for operator-driven ATM cassette inventory.
Tracks per-machine cassette state (denomination, count, position) editable
via the satmachineadmin dashboard and published to the ATM as encrypted
kind-30078 events. See aiolabs/satmachineadmin#29 + lamassu-next#56.
Schema choice: PK (machine_id, denomination) mirrors the ATM-side
denomination-as-key invariant in
bitspire/atm-tui/src/db.zig:31 and
lamassu-next/apps/machine/electron/state-store.ts:54
(the cassettes table PK is denomination; HAL inventory map keys on
denomination; dispense lookup is cassetteDenominations.indexOf
duplicates collapse silently). Position is operator-assignable display
order, not the addressable unit.
Reserved nullable columns (state_count, state_at, state_event_id) hold
the latest bitspire-cassettes-state:<atm_pubkey_hex> event the ATM
publishes (one-shot bootstrap in v1; continuous in v2). v1 UI doesn't
render them; v2 reconciliation UI consumes them without a migration.
"""
await db.execute(f"""
CREATE TABLE IF NOT EXISTS satoshimachine.cassette_configs (
machine_id TEXT NOT NULL,
denomination INTEGER NOT NULL,
count INTEGER NOT NULL,
position INTEGER NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now},
updated_by TEXT,
state_count INTEGER,
state_at TIMESTAMP,
state_event_id TEXT,
PRIMARY KEY (machine_id, denomination)
);
""")
async def m008_flip_cassette_configs_pk_to_position(db):
"""Flip cassette_configs PK from (machine_id, denomination) to
(machine_id, position). The denomination-keyed shape from m007 was
wrong: real machines have N cartridges of the same denomination
(cash-out throughput requires multiple bays for one denom), and the
operator needs to swap cartridge denominations during refill ($20
bay becomes $50 bay) without a re-provisioning event.
Coordinated v1.1 fix with the ATM side per the 2026-05-30T18:30Z +
18:45Z log entries:
- Wire shape flips from {denominations: {<d>: {position, count}}}
to {positions: {<p>: {denomination, count}}}
- Position becomes the fixed row identity (hardware bay number);
denomination + count are operator-editable per row
- NO unique constraint on denomination (multiple same-denom cassettes
are operationally valid)
Also adds `state_denomination` nullable column reserved for v2
reverse-channel reconciliation (operator-believed denomination per
slot vs ATM-reported denomination diff highlighting in v2 UI).
SQLite doesn't support ALTER PRIMARY KEY directly; the migration
does the standard create-copy-drop-rename dance. Idempotent via the
column-probe trick used elsewhere in this file.
"""
try:
# Probe: does the old PK shape still exist? If state_denomination
# column already exists, m008 already ran — no-op.
await db.fetchone(
"SELECT state_denomination FROM satoshimachine.cassette_configs " "LIMIT 1"
)
return
except Exception:
pass
await db.execute(f"""
CREATE TABLE IF NOT EXISTS satoshimachine.cassette_configs_new (
machine_id TEXT NOT NULL,
position INTEGER NOT NULL,
denomination INTEGER NOT NULL,
count INTEGER NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now},
updated_by TEXT,
state_denomination INTEGER,
state_count INTEGER,
state_at TIMESTAMP,
state_event_id TEXT,
PRIMARY KEY (machine_id, position)
);
""")
# Backfill from the old table — column-by-column copy. In the v1
# m007 schema the row's `denomination` was simultaneously the
# operator-believed denomination AND the ATM-reported denomination
# (because the only write path was the bootstrap consumer copying
# from the ATM's state.db). So state_denomination at migration time
# = current denomination as a best-guess baseline; the next bootstrap
# event re-populates the state_* columns authoritatively.
await db.execute("""
INSERT INTO satoshimachine.cassette_configs_new
(machine_id, position, denomination, count,
updated_at, updated_by,
state_denomination, state_count, state_at, state_event_id)
SELECT machine_id, position, denomination, count,
updated_at, updated_by,
denomination, state_count, state_at, state_event_id
FROM satoshimachine.cassette_configs
""")
await db.execute("DROP TABLE satoshimachine.cassette_configs")
await db.execute(
"ALTER TABLE satoshimachine.cassette_configs_new " "RENAME TO cassette_configs"
)

298
models.py
View file

@ -6,7 +6,6 @@
# the plan at ~/.claude/plans/snug-gliding-shamir.md.
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, validator
@ -26,8 +25,8 @@ class CreateMachineData(BaseModel):
machine_npub: str
wallet_id: str
name: Optional[str] = None
location: Optional[str] = None
name: str | None = None
location: str | None = None
fiat_code: str = "GTQ"
@ -36,8 +35,8 @@ class Machine(BaseModel):
operator_user_id: str
machine_npub: str
wallet_id: str
name: Optional[str]
location: Optional[str]
name: str | None
location: str | None
fiat_code: str
is_active: bool
created_at: datetime
@ -45,11 +44,11 @@ class Machine(BaseModel):
class UpdateMachineData(BaseModel):
name: Optional[str] = None
location: Optional[str] = None
fiat_code: Optional[str] = None
is_active: Optional[bool] = None
wallet_id: Optional[str] = None
name: str | None = None
location: str | None = None
fiat_code: str | None = None
is_active: bool | None = None
wallet_id: str | None = None
# =============================================================================
@ -69,14 +68,14 @@ class CreateDcaClientData(BaseModel):
machine_id: str
user_id: str
username: Optional[str] = None
username: str | None = None
class DcaClient(BaseModel):
id: str
machine_id: str
user_id: str
username: Optional[str]
username: str | None
status: str
created_at: datetime
updated_at: datetime
@ -92,8 +91,8 @@ class UpdateDcaClientData(BaseModel):
/ mode / autoforward changes go through satmachineclient against
`dca_lp` instead."""
username: Optional[str] = None
status: Optional[str] = None
username: str | None = None
status: str | None = None
class DcaLpPreferences(BaseModel):
@ -109,8 +108,8 @@ class DcaLpPreferences(BaseModel):
user_id: str
dca_wallet_id: str
default_dca_mode: str # 'flow' | 'fixed'
fixed_mode_daily_limit: Optional[float]
autoforward_ln_address: Optional[str]
fixed_mode_daily_limit: float | None
autoforward_ln_address: str | None
autoforward_enabled: bool
created_at: datetime
updated_at: datetime
@ -121,11 +120,11 @@ class UpsertDcaLpData(BaseModel):
edits their preferences. All fields optional on update pass only
the ones being changed."""
dca_wallet_id: Optional[str] = None
default_dca_mode: Optional[str] = None
fixed_mode_daily_limit: Optional[float] = None
autoforward_ln_address: Optional[str] = None
autoforward_enabled: Optional[bool] = None
dca_wallet_id: str | None = None
default_dca_mode: str | None = None
fixed_mode_daily_limit: float | None = None
autoforward_ln_address: str | None = None
autoforward_enabled: bool | None = None
class ClientBalanceSummary(BaseModel):
@ -156,7 +155,7 @@ class CreateDepositData(BaseModel):
client_id: str
machine_id: str
amount: float
notes: Optional[str] = None
notes: str | None = None
@validator("amount")
def round_amount(cls, v):
@ -173,9 +172,9 @@ class DcaDeposit(BaseModel):
amount: float
currency: str
status: str # 'pending' | 'confirmed' | 'rejected'
notes: Optional[str]
notes: str | None
created_at: datetime
confirmed_at: Optional[datetime]
confirmed_at: datetime | None
class UpdateDepositData(BaseModel):
@ -183,8 +182,8 @@ class UpdateDepositData(BaseModel):
`CreateDepositData`; the currency is bound to the machine and not
editable after the row lands."""
amount: Optional[float] = None
notes: Optional[str] = None
amount: float | None = None
notes: str | None = None
@validator("amount")
def round_amount(cls, v):
@ -195,7 +194,7 @@ class UpdateDepositData(BaseModel):
class UpdateDepositStatusData(BaseModel):
status: str # 'pending' | 'confirmed' | 'rejected'
notes: Optional[str] = None
notes: str | None = None
# =============================================================================
@ -210,8 +209,8 @@ class UpdateDepositStatusData(BaseModel):
class CreateDcaSettlementData(BaseModel):
machine_id: str
payment_hash: str # the idempotency key (UNIQUE in the dca_settlements table)
bitspire_event_id: Optional[str] = None # reserved for direct-Nostr ingestion
bitspire_txid: Optional[str] = None
bitspire_event_id: str | None = None # reserved for direct-Nostr ingestion
bitspire_txid: str | None = None
wire_sats: int
fiat_amount: float
fiat_code: str = "GTQ"
@ -221,16 +220,16 @@ class CreateDcaSettlementData(BaseModel):
platform_fee_sats: int
operator_fee_sats: int
tx_type: str # 'cash_out' | 'cash_in'
bills_json: Optional[str] = None
cassettes_json: Optional[str] = None
bills_json: str | None = None
cassettes_json: str | None = None
class DcaSettlement(BaseModel):
id: str
machine_id: str
payment_hash: str
bitspire_event_id: Optional[str]
bitspire_txid: Optional[str]
bitspire_event_id: str | None
bitspire_txid: str | None
wire_sats: int
fiat_amount: float
fiat_code: str
@ -240,8 +239,8 @@ class DcaSettlement(BaseModel):
platform_fee_sats: int
operator_fee_sats: int
tx_type: str
bills_json: Optional[str]
cassettes_json: Optional[str]
bills_json: str | None
cassettes_json: str | None
# 'pending' (default at insert)
# 'processing' (claim taken by distribution processor)
# 'processed' (all legs paid)
@ -252,19 +251,19 @@ class DcaSettlement(BaseModel):
# never went near distribution. error_message holds the
# reason. Retry is wrong — investigate the machine.)
status: str
error_message: Optional[str]
processed_at: Optional[datetime]
error_message: str | None
processed_at: datetime | None
created_at: datetime
# Append-only audit memo. Populated when an operator triggers an in-place
# adjustment (partial-dispense, manual reconciliation override). Each
# entry timestamped + records original values so the overwrite is
# auditable from the settlement detail view alone. Never edited in place.
notes: Optional[str] = None
notes: str | None = None
# Optimistic-lock claim token written when status flips to 'processing'.
# Two concurrent process_settlement invocations can't both win the claim
# (only one matching read-back). Cleared back to NULL when the leg-
# writing pass completes (status='processed' or 'errored').
processing_claim: Optional[str] = None
processing_claim: str | None = None
# =============================================================================
@ -286,7 +285,7 @@ class CommissionSplitLeg(BaseModel):
"""
target: str
label: Optional[str] = None
label: str | None = None
fraction: float
sort_order: int = 0
@ -306,10 +305,10 @@ class CommissionSplitLeg(BaseModel):
class CommissionSplit(BaseModel):
id: str
machine_id: Optional[str] # None = operator's default ruleset
machine_id: str | None # None = operator's default ruleset
operator_user_id: str
target: str
label: Optional[str]
label: str | None
fraction: float
sort_order: int
created_at: datetime
@ -322,7 +321,7 @@ class SetCommissionSplitsData(BaseModel):
machine without an explicit override). Otherwise scoped per machine.
"""
machine_id: Optional[str] = None
machine_id: str | None = None
legs: list[CommissionSplitLeg]
@validator("legs")
@ -339,35 +338,35 @@ class SetCommissionSplitsData(BaseModel):
class CreateDcaPaymentData(BaseModel):
settlement_id: Optional[str] = None
client_id: Optional[str] = None
settlement_id: str | None = None
client_id: str | None = None
machine_id: str
operator_user_id: str
leg_type: str
# 'dca' | 'super_fee' | 'operator_split' | 'settlement' | 'autoforward' | 'refund'
destination_wallet_id: Optional[str] = None
destination_ln_address: Optional[str] = None
destination_wallet_id: str | None = None
destination_ln_address: str | None = None
amount_sats: int
amount_fiat: Optional[float] = None
exchange_rate: Optional[float] = None
amount_fiat: float | None = None
exchange_rate: float | None = None
transaction_time: datetime
external_payment_hash: Optional[str] = None
external_payment_hash: str | None = None
class DcaPayment(BaseModel):
id: str
settlement_id: Optional[str]
client_id: Optional[str]
settlement_id: str | None
client_id: str | None
machine_id: str
operator_user_id: str
leg_type: str
destination_wallet_id: Optional[str]
destination_ln_address: Optional[str]
destination_wallet_id: str | None
destination_ln_address: str | None
amount_sats: int
amount_fiat: Optional[float]
exchange_rate: Optional[float]
amount_fiat: float | None
exchange_rate: float | None
transaction_time: datetime
external_payment_hash: Optional[str]
external_payment_hash: str | None
status: str
# Leg status enum:
# 'pending' — row written, payment not yet attempted
@ -378,7 +377,7 @@ class DcaPayment(BaseModel):
# 'skipped' — intentionally not paid (no super wallet configured,
# no commission ruleset, no exchange rate, no LPs)
# 'refunded' — reserved for future refund flows
error_message: Optional[str]
error_message: str | None
created_at: datetime
@ -391,22 +390,22 @@ class TelemetrySnapshot(BaseModel):
machine_id: str
# Beacon (kind-30078) — all fields are nullable because the upstream payload
# is sparse today. As lamassu-next#43 lands, the post-#43 columns fill in.
beacon_cash_in: Optional[bool] = None
beacon_cash_out: Optional[bool] = None
beacon_cash_level: Optional[str] = None
beacon_fiat: Optional[str] = None
beacon_model: Optional[str] = None
beacon_name: Optional[str] = None
beacon_location: Optional[str] = None
beacon_geo: Optional[str] = None
beacon_fees_json: Optional[str] = None
beacon_limits_json: Optional[str] = None
beacon_denominations_json: Optional[str] = None
beacon_version: Optional[str] = None
beacon_received_at: Optional[datetime] = None
beacon_cash_in: bool | None = None
beacon_cash_out: bool | None = None
beacon_cash_level: str | None = None
beacon_fiat: str | None = None
beacon_model: str | None = None
beacon_name: str | None = None
beacon_location: str | None = None
beacon_geo: str | None = None
beacon_fees_json: str | None = None
beacon_limits_json: str | None = None
beacon_denominations_json: str | None = None
beacon_version: str | None = None
beacon_received_at: datetime | None = None
# Fleet telemetry (kind-30079) — operator-only, awaits lamassu-next#42.
telemetry_json: Optional[str] = None
telemetry_received_at: Optional[datetime] = None
telemetry_json: str | None = None
telemetry_received_at: datetime | None = None
# =============================================================================
@ -417,13 +416,13 @@ class TelemetrySnapshot(BaseModel):
class SuperConfig(BaseModel):
id: str
super_fee_fraction: float
super_fee_wallet_id: Optional[str]
super_fee_wallet_id: str | None
updated_at: datetime
class UpdateSuperConfigData(BaseModel):
super_fee_fraction: Optional[float] = None
super_fee_wallet_id: Optional[str] = None
super_fee_fraction: float | None = None
super_fee_wallet_id: str | None = None
@validator("super_fee_fraction")
def fee_in_unit_range(cls, v):
@ -448,9 +447,9 @@ class PartialDispenseData(BaseModel):
"""
settlement_id: str
dispensed_fraction: Optional[float] = None
dispensed_sats: Optional[int] = None
notes: Optional[str] = None
dispensed_fraction: float | None = None
dispensed_sats: int | None = None
notes: str | None = None
@validator("dispensed_fraction")
def fraction_in_unit_range(cls, v):
@ -530,8 +529,8 @@ class SettleBalanceData(BaseModel):
# there's no ambiguity about what rate was used.
exchange_rate: float
# If None, settle the LP's full remaining balance. Else partial.
amount_fiat: Optional[float] = None
notes: Optional[str] = None
amount_fiat: float | None = None
notes: str | None = None
@validator("exchange_rate")
def positive_rate(cls, v):
@ -546,3 +545,140 @@ class SettleBalanceData(BaseModel):
if v <= 0:
raise ValueError("amount_fiat must be > 0 if specified")
return round(float(v), 2)
# =============================================================================
# Cassette configs — operator-driven ATM cassette inventory (#29 v1.1).
# =============================================================================
# Schema is position-keyed per the coordinated v1.1 redesign at coord-log
# 2026-05-30T18:30Z + 18:45Z. The earlier denomination-keyed shape (m007)
# was wrong: real machines have N cassettes of the same denomination for
# cash-out throughput, and operators need to swap cartridge denominations
# during refill ($20 bay becomes a $50 bay) without re-provisioning.
#
# Wire shape:
# {"positions": {"<position_str>": {"denomination": N, "count": M}}}
#
# Editable surface per row:
# - denomination: yes (operator swaps cartridges during refill)
# - count: yes (refill / decrement)
# Read-only per row:
# - position: hardware bay number; the slot count is fixed by the
# dispenser model (e.g., Tejo has 4 positions).
#
# No "denomination must be unique within payload" constraint: multiple
# same-denom cassettes are operationally valid. The ATM HAL distributes
# a dispense request greedy across all positions matching the requested
# denomination (lamassu-next#56 v1.1 HAL refactor).
#
# state_* columns are reserved nullable for the v2 reverse-channel
# reconciliation consumer (bitspire-cassettes-state:<atm_pubkey_hex>).
# v1 populates them on bootstrap-event receipt but the UI doesn't render
# reconciliation. state_denomination (added in m008) lets v2 highlight
# operator-believed-vs-ATM-reported denomination drift per slot.
class CassetteConfig(BaseModel):
machine_id: str
position: int
denomination: int
count: int
updated_at: datetime
updated_by: str | None
state_denomination: int | None
state_count: int | None
state_at: datetime | None
state_event_id: str | None
class UpsertCassetteConfigData(BaseModel):
"""Operator edits a single cassette row's denomination or count from
the dashboard. Both fields optional; pass only those changed.
Position is not edited it's the row's identity (hardware bay)."""
denomination: int | None = None
count: int | None = None
@validator("denomination")
def denomination_positive(cls, v):
if v is None:
return v
if v <= 0:
raise ValueError("denomination must be > 0")
return v
@validator("count")
def count_non_negative(cls, v):
if v is None:
return v
if v < 0:
raise ValueError("count must be >= 0")
return v
class CassettePayloadRow(BaseModel):
"""One position's payload values in the wire-format
`{"positions": {"<pos>": {"denomination", "count"}}}`."""
denomination: int
count: int
@validator("denomination")
def denomination_positive(cls, v):
if v <= 0:
raise ValueError("denomination must be > 0")
return v
@validator("count")
def count_non_negative(cls, v):
if v < 0:
raise ValueError("count must be >= 0")
return v
class PublishCassettesPayload(BaseModel):
"""The decrypted JSON content of a kind-30078 cassette event, both
directions:
- operator ATM (d-tag `bitspire-cassettes:<atm_pubkey_hex>`)
- ATM operator (d-tag `bitspire-cassettes-state:<atm_pubkey_hex>`)
Wire shape: `{"positions": {"<pos_str>": {"denomination", "count"}}}`.
JSON object keys are always strings; the validator coerces back to
int on parse. The position key set MUST match what the receiver
already has (slot count is hardware-fixed; no add/remove from this
payload).
No denomination-unique constraint: multiple same-denom cassettes are
operationally valid (cash-out throughput on a popular denom).
"""
positions: dict[int, CassettePayloadRow]
@validator("positions", pre=True)
def coerce_string_keys_to_int(cls, v):
if not isinstance(v, dict):
raise ValueError("positions must be a dict")
out = {}
for k, val in v.items():
try:
key_int = int(k)
except (TypeError, ValueError) as exc:
raise ValueError(f"position key {k!r} is not an int") from exc
if key_int <= 0:
raise ValueError(f"position must be > 0 (got {key_int})")
out[key_int] = val
return out
def to_wire_dict(self) -> dict:
"""Serialise back to the wire format with string keys for JSON
object compatibility. Used by the publisher to build the kind-30078
event content before NIP-44 v2 encryption."""
return {
"positions": {
str(pos): {
"denomination": row.denomination,
"count": row.count,
}
for pos, row in self.positions.items()
}
}

294
nip44.py Normal file
View file

@ -0,0 +1,294 @@
"""
NIP-44 v2 versioned encrypted payloads (https://github.com/nostr-protocol/nips/blob/master/44.md).
Hand-rolled because lnbits historically shipped only NIP-04 (AES-CBC) in
`lnbits.utils.nostr.encrypt_content`, and the locked design at
aiolabs/satmachineadmin#29 (paired with lamassu-next#56) wires cassette config
over kind-30078 with NIP-44 v2 encrypted content.
## Runtime status (post lnbits PR #38, 2026-05-31)
**Runtime usage has migrated to the signer abstraction** via
`signer.nip44_encrypt` / `signer.nip44_decrypt` on `lnbits.core.signers.base.
NostrSigner`. For RemoteBunkerSigner-backed accounts the bunker performs the
crypto and the operator's nsec never leaves the bunker process; for the
transitional LocalSigner path `cassette_transport._nip44_*_via_signer` falls
back to the helpers in this module against the stored `account.prvkey`.
This module's runtime export footprint is therefore:
- `encrypt_for` / `decrypt_from` called by the LocalSigner fallback in
`cassette_transport` until every operator on the instance is bunker-backed
(S7 / aiolabs/satmachineadmin#21). Then those calls disappear too.
- Everything else (encrypt_with_conversation_key, decrypt_with_conversation_key,
get_conversation_key, padding helpers, error classes) is **test-only**:
referenced by `tests/test_nip44_v2.py` to validate the wire format against
the canonical paulmillr/nip44 reference vectors and the bitspire cross-test
fixture posted to the coordination log.
Don't add new runtime call sites here. The signer abstraction is the path.
Two safety nets keep the impl honest:
1. tests/test_nip44_v2.py runs reference vectors + round-trip + tamper-detection.
2. bitspire posts a sample event encrypted on their nostr-tools side to the
coord log; test_decrypts_bitspire_sample_event cross-checks our impl
against theirs by decrypting that event with a known privkey.
Wire format (per spec):
payload = base64( 0x02 || nonce (32B) || ciphertext (var) || mac (32B) )
Key derivation:
conversation_key = HKDF-extract(salt=b"nip44-v2", IKM=ecdh_shared_x) # 32B PRK, stable per pair
per-message:
nonce = csprng(32 bytes)
temp = HKDF-expand(PRK=conversation_key, info=nonce, L=76)
chacha_key = temp[0:32]
chacha_nonce = temp[32:44]
hmac_key = temp[44:76]
Padding scheme (NIP-44 v2 length-prefixed, variable-chunk):
padded = uint16_be(len(plaintext)) || plaintext || zeros
such that 2 + padded_data_len matches a fixed step.
"""
from __future__ import annotations
import base64
import hashlib
import hmac as hmac_stdlib
import os
import struct
import coincurve
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand
# Spec constants.
_VERSION = 0x02
_HKDF_SALT = b"nip44-v2"
_MIN_PLAINTEXT_LEN = 1
_MAX_PLAINTEXT_LEN = 65535
_NONCE_LEN = 32
_MAC_LEN = 32
_MIN_PAYLOAD_LEN = (
1 + _NONCE_LEN + (2 + 32) + _MAC_LEN
) # version + nonce + min padded + mac
_MAX_PAYLOAD_LEN = 1 + _NONCE_LEN + (2 + 65536) + _MAC_LEN
class Nip44Error(Exception):
"""Generic NIP-44 v2 envelope error. Subclasses distinguish failure modes."""
class Nip44VersionError(Nip44Error):
"""First payload byte was not 0x02. Could be a NIP-04 envelope, a v1 NIP-44, or garbage."""
class Nip44MacError(Nip44Error):
"""HMAC verification failed — payload was tampered, wrong conversation key, or corrupted in transit."""
class Nip44LengthError(Nip44Error):
"""Plaintext or payload length outside the spec-allowed range, or padding header lies."""
# =============================================================================
# Padding (NIP-44 v2)
# =============================================================================
def _calc_padded_len(plaintext_len: int) -> int:
"""Per NIP-44 v2 padding scheme:
if L <= 32: padded_len = 32
else: chunk = max(32, next_power_2(L-1) // 8); padded_len = chunk * ((L-1) // chunk + 1)
"""
if plaintext_len <= 32:
return 32
next_power = 1 << (plaintext_len - 1).bit_length()
chunk = max(32, next_power // 8)
return chunk * ((plaintext_len - 1) // chunk + 1)
def _pad(plaintext: bytes) -> bytes:
"""Prefix uint16_be length + plaintext + zero-fill to the NIP-44 v2 boundary."""
n = len(plaintext)
if n < _MIN_PLAINTEXT_LEN or n > _MAX_PLAINTEXT_LEN:
raise Nip44LengthError(
f"plaintext length {n} outside [{_MIN_PLAINTEXT_LEN}, {_MAX_PLAINTEXT_LEN}]"
)
padded_data_len = _calc_padded_len(n)
zeros = b"\x00" * (padded_data_len - n)
return struct.pack(">H", n) + plaintext + zeros
def _unpad(padded: bytes) -> bytes:
"""Strip the uint16_be length prefix and zero padding. Validates that the
declared length is consistent with the padded payload (rejects a forged
length prefix that would slice past the buffer or imply a different
padded_data_len than what we received)."""
if len(padded) < 2:
raise Nip44LengthError("padded payload too short to hold length prefix")
declared_len = struct.unpack(">H", padded[0:2])[0]
if declared_len < _MIN_PLAINTEXT_LEN or declared_len > _MAX_PLAINTEXT_LEN:
raise Nip44LengthError(f"declared plaintext length {declared_len} out of range")
if len(padded) != 2 + _calc_padded_len(declared_len):
raise Nip44LengthError(
f"padded buffer length {len(padded)} doesn't match the calculated padding "
f"for declared length {declared_len}"
)
return padded[2 : 2 + declared_len]
# =============================================================================
# Conversation + message-key derivation
# =============================================================================
def get_conversation_key(privkey_hex: str, pubkey_hex: str) -> bytes:
"""Derive the per-pair stable conversation key (PRK) used for all messages
between sender (privkey) and recipient (pubkey).
Steps:
shared_x = ECDH(privkey, pubkey).x # 32 bytes, x-coordinate
prk = HKDF-extract(salt=b"nip44-v2", IKM=shared_x)
coincurve's `.multiply(secret).format(compressed=True)[1:]` strips the
leading 0x02/0x03 parity byte to return the raw x-coord same trick
`lnbits.utils.nostr.encrypt_content` uses for NIP-04.
"""
sender = coincurve.PrivateKey(bytes.fromhex(privkey_hex))
recipient_pub = coincurve.PublicKey(b"\x02" + bytes.fromhex(pubkey_hex))
shared_x = recipient_pub.multiply(sender.secret).format(compressed=True)[1:]
# HKDF-extract is HMAC-SHA256(key=salt, msg=ikm) per RFC 5869.
return hmac_stdlib.new(_HKDF_SALT, shared_x, hashlib.sha256).digest()
def _derive_message_keys(
conversation_key: bytes, nonce: bytes
) -> tuple[bytes, bytes, bytes]:
"""Per-message key expansion: HKDF-expand(PRK=conversation_key, info=nonce, L=76).
Returns (chacha_key 32B, chacha_nonce 12B, hmac_key 32B)."""
hkdf = HKDFExpand(algorithm=hashes.SHA256(), length=76, info=nonce)
okm = hkdf.derive(conversation_key)
return okm[0:32], okm[32:44], okm[44:76]
def _hmac_aad(hmac_key: bytes, nonce: bytes, ciphertext: bytes) -> bytes:
"""HMAC-SHA256(key=hmac_key, msg=nonce || ciphertext). Returns 32-byte MAC."""
h = hmac.HMAC(hmac_key, hashes.SHA256())
h.update(nonce)
h.update(ciphertext)
return h.finalize()
def _chacha20(key: bytes, nonce: bytes, data: bytes) -> bytes:
"""ChaCha20 stream cipher (symmetric: encrypt == decrypt). Used both directions.
The `cryptography` lib's `algorithms.ChaCha20(key, nonce)` expects a
16-byte nonce arg: a 4-byte little-endian initial counter prefix +
12-byte actual nonce. NIP-44 v2 starts the counter at 0 and uses the
HKDF-derived 12-byte chacha_nonce, so we prefix four zero bytes here.
"""
if len(nonce) != 12:
raise Nip44LengthError(
f"chacha_nonce must be 12 bytes (NIP-44 v2), got {len(nonce)}"
)
cipher = Cipher(algorithms.ChaCha20(key, b"\x00\x00\x00\x00" + nonce), mode=None)
return cipher.encryptor().update(data)
# =============================================================================
# Public API — low-level (nonce-controllable for testability)
# =============================================================================
def encrypt_with_conversation_key(
plaintext: str,
conversation_key: bytes,
*,
nonce: bytes | None = None,
) -> str:
"""Encrypt `plaintext` under a precomputed `conversation_key` (32B PRK).
`nonce` is 32 random bytes when omitted (the production path). Tests pass
it explicitly to assert pinned reference vectors.
Returns the base64-encoded payload string suitable as a Nostr event's
`content` field for kind-30078 (and any other kind that uses NIP-44 v2).
"""
if nonce is None:
nonce = os.urandom(_NONCE_LEN)
elif len(nonce) != _NONCE_LEN:
raise Nip44LengthError(f"nonce must be exactly {_NONCE_LEN} bytes")
padded = _pad(plaintext.encode("utf-8"))
chacha_key, chacha_nonce, hmac_key = _derive_message_keys(conversation_key, nonce)
ciphertext = _chacha20(chacha_key, chacha_nonce, padded)
mac = _hmac_aad(hmac_key, nonce, ciphertext)
return base64.b64encode(bytes([_VERSION]) + nonce + ciphertext + mac).decode(
"ascii"
)
def decrypt_with_conversation_key(payload_b64: str, conversation_key: bytes) -> str:
"""Decrypt a NIP-44 v2 payload using a precomputed `conversation_key`.
Raises:
Nip44VersionError payload's first byte isn't 0x02
Nip44LengthError payload too short / too long / declared length lies
Nip44MacError HMAC verification failed (tamper, wrong key, corruption)
"""
try:
raw = base64.b64decode(payload_b64, validate=True)
except (
Exception
) as exc:
raise Nip44LengthError(f"payload is not valid base64: {exc}") from exc
if len(raw) < _MIN_PAYLOAD_LEN or len(raw) > _MAX_PAYLOAD_LEN:
raise Nip44LengthError(f"payload length {len(raw)} outside valid range")
if raw[0] != _VERSION:
raise Nip44VersionError(f"unsupported NIP-44 version: 0x{raw[0]:02x}")
nonce = raw[1 : 1 + _NONCE_LEN]
mac_received = raw[-_MAC_LEN:]
ciphertext = raw[1 + _NONCE_LEN : -_MAC_LEN]
chacha_key, chacha_nonce, hmac_key = _derive_message_keys(conversation_key, nonce)
mac_expected = _hmac_aad(hmac_key, nonce, ciphertext)
# constant-time compare to avoid timing-leak in MAC verification
if not hmac_stdlib.compare_digest(mac_received, mac_expected):
raise Nip44MacError("HMAC verification failed")
padded = _chacha20(chacha_key, chacha_nonce, ciphertext)
plaintext_bytes = _unpad(padded)
return plaintext_bytes.decode("utf-8")
# =============================================================================
# Public API — high-level (pair-keyed, the call shape app code reaches for)
# =============================================================================
def encrypt_for(
plaintext: str,
sender_privkey_hex: str,
recipient_pubkey_hex: str,
*,
nonce: bytes | None = None,
) -> str:
"""Encrypt `plaintext` from the sender (holding the privkey) to the recipient
(identified by pubkey). The recipient can decrypt with `decrypt_from(
payload, recipient_privkey_hex, sender_pubkey_hex)` symmetric on the
conversation key, which is the same derived value from either side."""
conversation_key = get_conversation_key(sender_privkey_hex, recipient_pubkey_hex)
return encrypt_with_conversation_key(plaintext, conversation_key, nonce=nonce)
def decrypt_from(
payload_b64: str, recipient_privkey_hex: str, sender_pubkey_hex: str
) -> str:
"""Decrypt a payload that the recipient (holding the privkey) received from
the sender (identified by pubkey)."""
conversation_key = get_conversation_key(recipient_privkey_hex, sender_pubkey_hex)
return decrypt_with_conversation_key(payload_b64, conversation_key)

View file

@ -191,7 +191,30 @@ window.app = Vue.createApp({
show: false,
loading: false,
machine: null,
settlements: []
settlements: [],
// Cassettes sub-tab state (#29 v1) — see openCassettePublishConfirm /
// submitCassettePublish methods + the cassettes panel in
// templates/satmachineadmin/index.html.
activeTab: 'settlements',
cassetteEdits: [], // editable working copy of cassette_configs rows
cassettesPristine: [], // last-known-clean snapshot for revert
cassettesLoading: false,
cassettesPublishing: false,
cassettesDirty: false,
cassettesError: null
},
cassettesTable: {
columns: [
{name: 'position', label: 'Bay', field: 'position', align: 'right'},
{name: 'denomination', label: 'Denomination', field: 'denomination', align: 'right'},
{name: 'count', label: 'Count', field: 'count', align: 'right'},
{name: 'state', label: 'ATM-reported', field: 'state_denomination', align: 'right'},
{name: 'updated_at', label: 'Updated', field: 'updated_at', align: 'left'}
],
pagination: {rowsPerPage: 0} // hide pagination — cassette count is small
},
cassettePublishConfirm: {
show: false
},
partialDispenseDialog: {
show: false,
@ -741,6 +764,11 @@ window.app = Vue.createApp({
async viewMachine(machine) {
this.machineDetail.machine = machine
this.machineDetail.settlements = []
this.machineDetail.cassetteEdits = []
this.machineDetail.cassettesPristine = []
this.machineDetail.cassettesDirty = false
this.machineDetail.cassettesError = null
this.machineDetail.activeTab = 'settlements'
this.machineDetail.show = true
await this.reloadMachineDetail()
},
@ -759,6 +787,102 @@ window.app = Vue.createApp({
} finally {
this.machineDetail.loading = false
}
// Cassettes load in parallel; UI only renders them when the tab
// is active, but pre-loading means no flicker on tab switch.
await this.loadMachineCassettes()
},
// -----------------------------------------------------------------
// Cassette inventory (#29 v1)
// -----------------------------------------------------------------
async loadMachineCassettes() {
if (!this.machineDetail.machine) return
this.machineDetail.cassettesLoading = true
this.machineDetail.cassettesError = null
try {
const {data} = await LNbits.api.request(
'GET',
`${MACHINES_PATH}/${this.machineDetail.machine.id}/cassettes`
)
const rows = (data || []).map(row => ({...row, _dirty: false}))
this.machineDetail.cassetteEdits = rows
this.machineDetail.cassettesPristine = JSON.parse(JSON.stringify(rows))
this.machineDetail.cassettesDirty = false
} catch (e) {
this._notifyError(e, 'Failed to load cassettes')
} finally {
this.machineDetail.cassettesLoading = false
}
},
markCassetteDirty(row) {
// Find pristine match by position (the row identity) and compare;
// flip _dirty + overall dirty flag accordingly. Editable fields
// are denomination + count; position is the immutable row key.
const pristine = this.machineDetail.cassettesPristine.find(
p => p.position === row.position
)
row._dirty =
!pristine ||
Number(row.denomination) !== Number(pristine.denomination) ||
Number(row.count) !== Number(pristine.count)
this.machineDetail.cassettesDirty =
this.machineDetail.cassetteEdits.some(r => r._dirty)
},
revertCassetteEdits() {
this.machineDetail.cassetteEdits = JSON.parse(
JSON.stringify(this.machineDetail.cassettesPristine)
)
this.machineDetail.cassettesDirty = false
this.machineDetail.cassettesError = null
},
openCassettePublishConfirm() {
if (!this.machineDetail.cassettesDirty) return
this.machineDetail.cassettesError = null
this.cassettePublishConfirm.show = true
},
async submitCassettePublish() {
// Build the PublishCassettesPayload shape (v1.1, position-keyed):
// { positions: { "<pos>": { denomination, count }, ... } }
// The API enforces the position set matches what's stored —
// since we only edit existing rows, this should always pass.
const positions = {}
for (const row of this.machineDetail.cassetteEdits) {
positions[String(row.position)] = {
denomination: Number(row.denomination),
count: Number(row.count)
}
}
const payload = {positions}
this.machineDetail.cassettesPublishing = true
try {
const {data} = await LNbits.api.request(
'POST',
`${MACHINES_PATH}/${this.machineDetail.machine.id}/cassettes/publish`,
null,
payload
)
const fresh = (data || []).map(r => ({...r, _dirty: false}))
this.machineDetail.cassetteEdits = fresh
this.machineDetail.cassettesPristine = JSON.parse(JSON.stringify(fresh))
this.machineDetail.cassettesDirty = false
this.cassettePublishConfirm.show = false
Quasar.Notify.create({
type: 'positive',
message: 'Cassette config published to ATM'
})
} catch (e) {
const detail =
(e && e.response && e.response.data && e.response.data.detail) ||
'Publish failed'
this.machineDetail.cassettesError = detail
this._notifyError(e, 'Publish failed')
} finally {
this.machineDetail.cassettesPublishing = false
}
},
settlementStatusColor(status) {

263
tasks.py
View file

@ -125,9 +125,7 @@ async def _handle_payment(payment: Payment) -> None:
# stamp is missing, SettlementInvariantError on any range/sum
# breach.
super_config = await get_super_config()
super_fee_fraction = (
float(super_config.super_fee_fraction) if super_config else 0.0
)
super_fee_fraction = float(super_config.super_fee_fraction) if super_config else 0.0
try:
data = parse_settlement(
machine=machine,
@ -194,9 +192,7 @@ async def _handle_payment(payment: Payment) -> None:
task.add_done_callback(_inflight_distributions.discard)
async def _record_rejected(
payment: Payment, machine: Machine, exc: Exception
) -> None:
async def _record_rejected(payment: Payment, machine: Machine, exc: Exception) -> None:
"""Insert a minimal `dca_settlements` row with `status='rejected'` and
the exception message for operator forensics.
@ -237,3 +233,258 @@ async def _record_rejected(
f"(machine={machine.machine_npub[:12]}..., "
f"payment_hash={payment.payment_hash[:12]}...): {exc}"
)
# =============================================================================
# Cassette bootstrap consumer (#29 v1)
# =============================================================================
# Subscribes to kind-30078 bitspire-cassettes-state:<atm_pubkey_hex> events
# published by each active machine's ATM on first boot (lamassu-next#56's
# bootstrap publish path). Decrypts the NIP-44 v2 content with the operator's
# privkey + ATM sender pubkey, validates as PublishCassettesPayload, and
# upserts cassette_configs via apply_bootstrap_state.
#
# v1 = one-shot per machine (ATM's meta.bootstrapPublishedAt makes the
# publish idempotent on ATM-side restart; satmachineadmin's apply_bootstrap_
# state dedups on state_event_id for relay re-delivery).
#
# v2 (separate issue) = continuous reverse-channel consumer with a
# last_state_created_at watermark for reconciliation UI.
#
# Implementation: polls nostrclient.router.NostrRouter.received_subscription_
# events keyed by our subscription_id. nostrclient's NostrRouter design is
# per-WebSocket-client; the singleton dict it drains into is the only
# server-side hook to consume events without standing up an in-process
# websocket. The relay manager is the same singleton publish_to_atm uses,
# so add_subscription registers a filter against the same relay pool.
CASSETTE_BOOTSTRAP_SUB_ID = "satmachineadmin-cassette-bootstrap"
_CASSETTE_POLL_INTERVAL_S = 2.0
_CASSETTE_BACKOFF_S = 30.0 # when nostrclient isn't installed yet
async def wait_for_cassette_state_events() -> None:
"""Long-running task: subscribe to bitspire-cassettes-state events from
every active machine's ATM and upsert cassette_configs on receipt.
Pattern mirrors wait_for_paid_invoices (try/except wraps each event,
never lets the loop die). Re-derives the subscription filter on each
tick from the current active-machines list newly-added machines
start receiving bootstrap events without an LNbits restart.
Soft-fail surfaces:
- nostrclient not installed log + sleep _CASSETTE_BACKOFF_S
between retries (operator may install it later)
- inbound event fails sig-verify / decrypt / parse log + skip
the event, continue the loop
- apply_bootstrap_state errors log + skip
"""
logger.info(
"satmachineadmin v2: cassette bootstrap consumer starting "
f"(sub_id={CASSETTE_BOOTSTRAP_SUB_ID})"
)
current_filter_key: str | None = None
while True:
try:
current_filter_key = await _cassette_consumer_tick(current_filter_key)
await asyncio.sleep(_CASSETTE_POLL_INTERVAL_S)
except _NostrclientUnavailable:
logger.warning(
"satmachineadmin: nostrclient extension not installed; "
f"cassette bootstrap consumer sleeping {_CASSETTE_BACKOFF_S}s "
"before retry. Install + activate nostrclient on this "
"LNbits instance."
)
current_filter_key = None
await asyncio.sleep(_CASSETTE_BACKOFF_S)
except Exception as exc: # listener must never die
logger.error(
f"satmachineadmin: cassette consumer loop error (continuing): " f"{exc}"
)
await asyncio.sleep(_CASSETTE_POLL_INTERVAL_S)
class _NostrclientUnavailable(Exception):
"""Internal sentinel — nostrclient extension import failed. Caller
sleeps a backoff then retries; the operator may install nostrclient
at any time."""
async def _cassette_consumer_tick(current_filter_key: str | None) -> str:
"""Single iteration of the bootstrap-consumer loop. Returns the filter
key used this tick so the caller can detect filter-set changes.
Raises _NostrclientUnavailable if nostrclient can't be imported (the
outer loop backs off + retries).
"""
try:
from nostrclient.router import ( # type: ignore[import-not-found]
NostrRouter,
nostr_client,
)
except ImportError as exc:
raise _NostrclientUnavailable() from exc
from .cassette_transport import build_state_d_tags_for_machines
from .crud import (
apply_bootstrap_state,
get_machine_by_atm_pubkey_hex,
list_all_active_machines,
)
machines = await list_all_active_machines()
d_tags = build_state_d_tags_for_machines(machines)
filter_key = ",".join(sorted(d_tags))
if filter_key != current_filter_key:
if d_tags:
filters = [{"kinds": [30078], "#d": d_tags}]
# nostrclient's add_subscription is typed as list[str] but the
# actual relay protocol accepts list[Filter-dict] — type ignore
# the upstream typing mismatch.
nostr_client.relay_manager.add_subscription(
CASSETTE_BOOTSTRAP_SUB_ID, filters # type: ignore[arg-type]
)
logger.info(
"satmachineadmin: (re)registered cassette bootstrap "
f"subscription with {len(d_tags)} d-tag(s)"
)
else:
nostr_client.relay_manager.close_subscription(CASSETTE_BOOTSTRAP_SUB_ID)
logger.info(
"satmachineadmin: no active machines; closed cassette "
"bootstrap subscription"
)
inbound = NostrRouter.received_subscription_events.get(CASSETTE_BOOTSTRAP_SUB_ID)
if inbound:
while inbound:
event_message = inbound.pop(0)
try:
await _handle_cassette_state_event(
event_message,
get_machine_by_atm_pubkey_hex,
apply_bootstrap_state,
)
except Exception as exc:
logger.warning(
f"satmachineadmin: cassette state event handler "
f"failed (skipping): {exc}"
)
return filter_key
async def _handle_cassette_state_event(
event_message,
get_machine_by_atm_pubkey_hex,
apply_bootstrap_state,
) -> None:
"""Verify signature, resolve the operator's signer, decrypt via the
signer abstraction (bunker round-trip for RemoteBunkerSigner; direct
prvkey on the LocalSigner transitional fallback inside the transport
helper), parse, upsert.
Each step logs at WARNING (not ERROR) so a noisy attacker can't fill
the logs this is data on a public relay, garbage is expected.
Two skip outcomes:
- Terminal (CassetteEventDecodeError / SignerUnavailable /
OperatorIdentityMissing / etc.): log + return. `apply_bootstrap_
state` is never called `state_event_id` is not advanced
same event would re-process on next poll cycle but the consumer's
WARN log surfaces the underlying issue immediately.
- Transient (CassetteEventTransientError): log at INFO (less noisy)
+ return. Same retry-via-no-advance semantics, just less
alarming in the operator log feed.
"""
import json as _json
from datetime import datetime as _datetime
from datetime import timezone as _timezone
from lnbits.utils.nostr import verify_event
from .cassette_transport import (
CassetteEventDecodeError,
CassetteEventTransientError,
CassetteTransportError,
_resolve_operator_signer,
decrypt_and_parse_state_event,
)
event_raw = event_message.event
if isinstance(event_raw, str):
event_obj = _json.loads(event_raw)
elif isinstance(event_raw, dict):
event_obj = event_raw
else:
logger.warning(
f"satmachineadmin: cassette event of unexpected type "
f"{type(event_raw).__name__}; skipping"
)
return
if not verify_event(event_obj):
logger.warning(
f"satmachineadmin: cassette state event sig verify failed "
f"(id={event_obj.get('id', '?')[:12]}...)"
)
return
sender_pubkey = event_obj.get("pubkey", "")
machine = await get_machine_by_atm_pubkey_hex(sender_pubkey)
if machine is None:
# Unknown sender — could be relay noise or an attacker. Don't
# treat as our problem.
logger.warning(
f"satmachineadmin: cassette state event from unknown ATM "
f"pubkey {sender_pubkey[:12]}... (not in dca_machines); "
"skipping"
)
return
try:
account, signer = await _resolve_operator_signer(machine.operator_user_id)
except CassetteTransportError as exc:
# OperatorIdentityMissing / SignerUnavailable — log + skip.
logger.warning(
f"satmachineadmin: can't resolve signer for operator "
f"{machine.operator_user_id[:8]}... (machine {machine.id}): "
f"{exc}"
)
return
try:
payload = await decrypt_and_parse_state_event(event_obj, account, signer)
except CassetteEventTransientError as exc:
logger.info(
f"satmachineadmin: cassette state event for machine {machine.id} "
f"hit a transient signer error (will retry next poll): {exc}"
)
return
except CassetteEventDecodeError as exc:
logger.warning(
f"satmachineadmin: cassette state event decode failed for "
f"machine {machine.id} (id={event_obj.get('id', '?')[:12]}...): "
f"{exc}"
)
return
event_id = event_obj.get("id", "")
created_at_unix = event_obj.get("created_at", 0)
event_created_at = _datetime.fromtimestamp(int(created_at_unix), tz=_timezone.utc)
applied = await apply_bootstrap_state(
machine.id, event_id, event_created_at, payload
)
if applied:
logger.info(
f"satmachineadmin: applied bootstrap state event {event_id[:12]}... "
f"to machine {machine.id} ({len(payload.positions)} cassettes)"
)
else:
# Replay: event_id already on file. Normal on relay reconnect.
logger.debug(
f"satmachineadmin: cassette state event {event_id[:12]}... "
f"already applied to machine {machine.id} (replay no-op)"
)

View file

@ -818,7 +818,7 @@
<q-btn flat dense round icon="refresh"
@click="reloadMachineDetail"
:loading="machineDetail.loading">
<q-tooltip>Reload settlements</q-tooltip>
<q-tooltip>Reload</q-tooltip>
</q-btn>
<q-btn flat dense round icon="close" v-close-popup>
<q-tooltip>Close</q-tooltip>
@ -845,7 +845,21 @@
<q-separator class="q-mb-md"></q-separator>
<div class="row items-center q-mb-sm">
<q-tabs v-model="machineDetail.activeTab" dense
align="left" class="text-grey-7"
active-color="primary" indicator-color="primary"
narrow-indicator>
<q-tab name="settlements" icon="receipt_long"
label="Settlements"></q-tab>
<q-tab name="cassettes" icon="precision_manufacturing"
label="Cassettes"></q-tab>
</q-tabs>
<q-separator></q-separator>
<q-tab-panels v-model="machineDetail.activeTab" animated>
<q-tab-panel name="settlements" class="q-px-none">
<div class="row items-center q-mt-md q-mb-sm">
<div class="col">
<h6 class="q-my-none">Settlements</h6>
<p class="text-caption q-my-none" :style="{opacity: 0.7}">
@ -959,10 +973,158 @@
</q-tr>
</template>
</q-table>
</q-tab-panel>
<q-tab-panel name="cassettes" class="q-px-none">
<div class="row items-center q-mt-md q-mb-sm">
<div class="col">
<h6 class="q-my-none">Cassettes</h6>
<p class="text-caption q-my-none" :style="{opacity: 0.7}">
Per-cassette count and physical bay position. Denomination
set is hardware-determined (re-provision via atm-tui to
change). "Publish to ATM" encrypts + signs + sends the new
config to the machine via Nostr.
</p>
</div>
<div class="col-auto">
<q-btn flat dense icon="undo" label="Revert"
:disable="!machineDetail.cassettesDirty"
@click="revertCassetteEdits">
<q-tooltip>Discard unsaved edits</q-tooltip>
</q-btn>
<q-btn color="primary" icon="cloud_upload"
label="Publish to ATM"
:disable="!machineDetail.cassettesDirty"
:loading="machineDetail.cassettesPublishing"
@click="openCassettePublishConfirm"></q-btn>
</div>
</div>
<q-banner v-if="machineDetail.cassettesError"
class="bg-red-1 text-grey-9 q-mb-md">
<template v-slot:avatar>
<q-icon name="warning" color="negative"></q-icon>
</template>
<span v-text="machineDetail.cassettesError"></span>
</q-banner>
<q-banner v-if="!machineDetail.cassetteEdits.length
&& !machineDetail.cassettesLoading"
class="bg-blue-1 text-grey-9">
<template v-slot:avatar>
<q-icon name="hourglass_empty" color="blue"></q-icon>
</template>
Waiting for the ATM's bootstrap state event. Power on the ATM
and confirm it has reached the configured relay; cassette
rows will auto-populate on receipt.
</q-banner>
<q-table v-if="machineDetail.cassetteEdits.length"
dense flat
:rows="machineDetail.cassetteEdits"
row-key="position"
:columns="cassettesTable.columns"
:pagination="cassettesTable.pagination"
hide-pagination>
<template v-slot:body="props">
<q-tr :props="props"
:style="props.row._dirty
? {boxShadow: 'inset 4px 0 0 0 #fdd835'}
: {}">
<q-td key="position" class="text-right">
<b v-text="'Bay ' + props.row.position"></b>
</q-td>
<q-td key="denomination" class="text-right">
<q-input v-model.number="props.row.denomination"
type="number" min="1" step="1" dense outlined
:suffix="machineDetail.machine.fiat_code || ''"
:style="{width: '140px', display: 'inline-block'}"
@update:model-value="markCassetteDirty(props.row)"></q-input>
</q-td>
<q-td key="count" class="text-right">
<q-input v-model.number="props.row.count" type="number"
min="0" step="1" dense outlined
:style="{width: '120px', display: 'inline-block'}"
@update:model-value="markCassetteDirty(props.row)"></q-input>
</q-td>
<q-td key="state" class="text-right">
<span v-if="props.row.state_denomination !== null"
:style="{fontSize: '0.85em', opacity: 0.7}">
<span v-text="props.row.state_denomination"></span>
<span :style="{opacity: 0.6}"
v-text="' ' + (machineDetail.machine.fiat_code || '')"></span>
<span :style="{opacity: 0.6}"> · </span>
<span v-text="'×' + props.row.state_count"></span>
</span>
<span v-else :style="{opacity: 0.4}"></span>
</q-td>
<q-td key="updated_at">
<span :style="{fontSize: '0.85em', opacity: 0.7}"
v-text="formatTime(props.row.updated_at)"></span>
</q-td>
</q-tr>
</template>
</q-table>
</q-tab-panel>
</q-tab-panels>
</q-card-section>
</q-card>
</q-dialog>
<!-- =============================================================== -->
<!-- CASSETTE PUBLISH CONFIRM DIALOG -->
<!-- =============================================================== -->
<q-dialog v-model="cassettePublishConfirm.show" persistent>
<q-card :style="{minWidth: '480px', maxWidth: '95vw'}">
<q-card-section class="row items-center q-pb-none">
<div class="text-h6">Publish cassette config to ATM</div>
<q-space ></q-space>
<q-btn icon="close" flat round dense v-close-popup></q-btn>
</q-card-section>
<q-card-section>
<q-banner class="bg-orange-1 text-grey-9 q-mb-md">
<template v-slot:avatar>
<q-icon name="warning" color="warning"></q-icon>
</template>
<b>This publish will overwrite the ATM's currently-tracked
counts.</b> If the ATM has dispensed cash since your last
refill or count baseline, those decrements will be lost.
Publish only after a physical refill (a known total), not to
"tweak" counts mid-day. v2 reconciliation will replace this
modal with reconciled state display.
</q-banner>
<p class="q-mb-sm">Sending to ATM:</p>
<q-list dense bordered>
<q-item v-for="row in machineDetail.cassetteEdits"
:key="row.position">
<q-item-section>
<q-item-label>
<b v-text="'Bay ' + row.position"></b>
</q-item-label>
</q-item-section>
<q-item-section side>
<q-item-label caption>
<b v-text="row.denomination + ' ' +
(machineDetail.machine.fiat_code || '')"></b>
· count
<b v-text="row.count"></b>
</q-item-label>
</q-item-section>
</q-item>
</q-list>
</q-card-section>
<q-card-actions align="right">
<q-btn flat label="Cancel" v-close-popup></q-btn>
<q-btn color="primary"
label="Publish to ATM"
:loading="machineDetail.cassettesPublishing"
@click="submitCassettePublish"></q-btn>
</q-card-actions>
</q-card>
</q-dialog>
<!-- =============================================================== -->
<!-- PARTIAL-DISPENSE DIALOG -->
<!-- =============================================================== -->

View file

@ -0,0 +1,220 @@
"""
Tests for the v1.1 cassette-config layer (aiolabs/satmachineadmin#29).
Covers the pure pieces that don't need a live DB:
- Pydantic validator behaviour on PublishCassettesPayload + the row /
upsert models (position key coercion, integer ranges, multiple-same-
denomination payloads, wire-format round-trip)
- _should_apply_bootstrap_state dedup helper (extracted from
apply_bootstrap_state so the relay-re-delivery decision is testable
without a database round-trip)
DB-touching tests (apply_bootstrap_state actually upserting, list-by-
machine ordering, etc.) follow the project convention from
test_deposit_currency.py: "Layer 2 is an endpoint-level behaviour better
covered by an integration test against a running LNbits; tracked in #26
as a follow-up." Smoke-tested manually via the dev container.
Wire shape pivot from m007 m008 is the v1.1 coordination point per
coord-log 2026-05-30T18:30Z + 18:45Z: position is the row identity,
denomination + count are operator-editable per row, multiple same-denom
cassettes are valid.
"""
import pytest
from ..crud import _should_apply_bootstrap_state
from ..models import (
CassettePayloadRow,
PublishCassettesPayload,
UpsertCassetteConfigData,
)
# =============================================================================
# PublishCassettesPayload — wire-shape validators
# =============================================================================
class TestPublishCassettesPayload:
"""The kind-30078 content payload, bidirectional (operator→ATM and
ATMoperator share the shape). String JSON keys must coerce to int;
per-row int constraints enforced; multiple same-denom rows are valid."""
def test_happy_path_coerces_string_keys_to_int(self):
p = PublishCassettesPayload(
positions={
"1": {"denomination": 20, "count": 49},
"2": {"denomination": 50, "count": 100},
}
)
assert set(p.positions.keys()) == {1, 2}
assert p.positions[1].denomination == 20
assert p.positions[1].count == 49
assert p.positions[2].denomination == 50
assert p.positions[2].count == 100
def test_wire_dict_round_trip_restringifies_keys(self):
"""to_wire_dict() must restringify position keys so the resulting
JSON is parseable by clients (including the ATM-side nostr-tools
NIP-44 v2 consumer per the byte-compat cross-test)."""
original = PublishCassettesPayload(
positions={
"1": {"denomination": 20, "count": 49},
"2": {"denomination": 50, "count": 100},
}
)
wire = original.to_wire_dict()
assert wire == {
"positions": {
"1": {"denomination": 20, "count": 49},
"2": {"denomination": 50, "count": 100},
}
}
# And the wire form round-trips back through the parser cleanly.
reparsed = PublishCassettesPayload(**wire)
assert reparsed.positions == original.positions
def test_accepts_multiple_same_denomination_cassettes(self):
"""v1.1 operational case: real machines have N cassettes loaded
with the same denomination for cash-out throughput. The wire shape
must accept this, and we explicitly do NOT validate uniqueness on
denomination. Coord-log 2026-05-30T18:45Z bitspire response."""
p = PublishCassettesPayload(
positions={
"1": {"denomination": 20, "count": 100},
"2": {"denomination": 20, "count": 100},
"3": {"denomination": 50, "count": 50},
"4": {"denomination": 100, "count": 25},
}
)
assert len(p.positions) == 4
denoms = [row.denomination for row in p.positions.values()]
assert denoms.count(20) == 2 # two $20 cassettes
assert sorted(denoms) == [20, 20, 50, 100]
def test_rejects_non_int_position_key(self):
with pytest.raises(ValueError) as exc:
PublishCassettesPayload(positions={"abc": {"denomination": 20, "count": 1}})
assert "is not an int" in str(exc.value)
def test_rejects_non_positive_position(self):
with pytest.raises(ValueError) as exc:
PublishCassettesPayload(positions={"0": {"denomination": 20, "count": 1}})
assert "position must be > 0" in str(exc.value)
def test_rejects_negative_position(self):
with pytest.raises(ValueError) as exc:
PublishCassettesPayload(positions={"-1": {"denomination": 20, "count": 1}})
assert "position must be > 0" in str(exc.value)
def test_rejects_negative_count(self):
with pytest.raises(ValueError):
PublishCassettesPayload(positions={"1": {"denomination": 20, "count": -1}})
def test_rejects_zero_denomination(self):
with pytest.raises(ValueError):
PublishCassettesPayload(positions={"1": {"denomination": 0, "count": 49}})
def test_rejects_negative_denomination(self):
with pytest.raises(ValueError):
PublishCassettesPayload(positions={"1": {"denomination": -20, "count": 49}})
def test_allows_zero_count(self):
"""An empty cassette is a legal state — operator must be able to
record `count=0` after a dispatcher pulled the cassette mid-day."""
p = PublishCassettesPayload(positions={"1": {"denomination": 20, "count": 0}})
assert p.positions[1].count == 0
# =============================================================================
# CassettePayloadRow — per-row int constraints
# =============================================================================
class TestCassettePayloadRow:
def test_happy_path(self):
row = CassettePayloadRow(denomination=20, count=49)
assert row.denomination == 20
assert row.count == 49
@pytest.mark.parametrize("bad_denom", [0, -1, -100])
def test_rejects_non_positive_denomination(self, bad_denom):
with pytest.raises(ValueError):
CassettePayloadRow(denomination=bad_denom, count=1)
def test_rejects_negative_count(self):
with pytest.raises(ValueError):
CassettePayloadRow(denomination=20, count=-1)
# =============================================================================
# UpsertCassetteConfigData — operator-edit form
# =============================================================================
class TestUpsertCassetteConfigData:
"""Operator-driven row edit. Both fields optional; same int constraints
as the wire-format row but applied independently per-edit. Position is
NOT editable it's the row's identity (the hardware bay number)."""
def test_partial_update_count_only(self):
d = UpsertCassetteConfigData(count=80)
assert d.count == 80
assert d.denomination is None
def test_partial_update_denomination_only(self):
"""v1.1 operational case: operator records a cartridge swap at
refill slot 1 was $20, dispatcher replaced with $50."""
d = UpsertCassetteConfigData(denomination=50)
assert d.denomination == 50
assert d.count is None
def test_empty_update_is_legal(self):
"""An empty UpsertCassetteConfigData parses fine; the CRUD short-
circuits a no-op on empty payload (no SQL emitted)."""
d = UpsertCassetteConfigData()
assert d.count is None
assert d.denomination is None
def test_rejects_negative_count(self):
with pytest.raises(ValueError):
UpsertCassetteConfigData(count=-1)
def test_rejects_non_positive_denomination(self):
with pytest.raises(ValueError):
UpsertCassetteConfigData(denomination=0)
# =============================================================================
# _should_apply_bootstrap_state — relay re-delivery dedup
# =============================================================================
class TestShouldApplyBootstrapState:
"""Pure-function dedup gate extracted from apply_bootstrap_state so the
decision is testable without a DB. Logic: apply if-and-only-if the
existing row's state_event_id differs from the incoming event_id.
In v1.1 the ATM publishes the bootstrap event exactly once per machine,
so this is sufficient for replay protection. v2 will need a
`last_state_created_at` watermark in addition (per bitspire's
`meta.lastKnownConfigCreatedAt` on the ATM side) flagged in #29's
v2 forward-look section.
"""
def test_applies_when_no_existing_row(self):
assert _should_apply_bootstrap_state(None, "new-event-id") is True
def test_applies_when_existing_event_id_differs(self):
assert _should_apply_bootstrap_state("old-event-id", "new-event-id") is True
def test_skips_when_existing_event_id_matches(self):
"""The same bootstrap event re-delivered after a relay reconnect
or satmachineadmin restart should no-op, not re-upsert the same
rows (which would clobber any operator edits since)."""
assert _should_apply_bootstrap_state("same-event", "same-event") is False
def test_applies_when_existing_is_empty_string_and_incoming_is_id(self):
"""Defensive — a sentinel empty-string existing_state_event_id
shouldn't block a real incoming event from applying."""
assert _should_apply_bootstrap_state("", "real-event-id") is True

View file

@ -0,0 +1,485 @@
"""
Tests for the cassette bootstrap consumer's transport-decrypt path
(`cassette_transport.decrypt_and_parse_state_event`) and d-tag construction.
Post-PR-#38 migration (2026-05-31): the function takes an Account +
NostrSigner instead of a raw privkey, and is async. Tests use:
- `_FakeBunkerSigner` implements async `nip44_decrypt/encrypt` against
the hand-rolled `nip44` impl so tests don't need a live bunker.
Exercises the "happy" RemoteBunkerSigner path.
- `_FakeLocalSignerStub` raises `SignerUnavailableError` from
`nip44_decrypt`, mimicking the post-#38 `LocalSigner` stub. Combined
with an Account that has `signer_type="LocalSigner"` + `prvkey`,
exercises the transitional fallback path in
`_nip44_decrypt_via_signer`.
- `_FakeRaisingSigner` raises an arbitrary exception, used to
exercise the `NsecBunkerTimeoutError` `CassetteEventTransientError`
and `NsecBunkerRpcError` `CassetteEventDecodeError` mappings.
Coroutines are driven via `asyncio.run` so no pytest-asyncio config is
required. Matches the existing project test pattern (test_init.py
demonstrates the project lacks an asyncio plugin in CI; using asyncio.run
inside the test body sidesteps that without changing project config).
Full handler tests (the dispatch through verify_event
get_machine_by_atm_pubkey_hex apply_bootstrap_state) need a live LNbits
DB; smoke-tested manually via the dev container per the project
convention (see test_deposit_currency.py rationale).
"""
import asyncio
import json
from types import SimpleNamespace
import coincurve
import pytest
from lnbits.core.services.nip46_bunker_client import (
NsecBunkerRpcError,
NsecBunkerTimeoutError,
)
from lnbits.core.signers.base import SignerUnavailableError
from ..cassette_transport import (
CassetteEventDecodeError,
CassetteEventTransientError,
_atm_hex_pubkey,
_config_d_tag,
_state_d_tag,
build_state_d_tags_for_machines,
decrypt_and_parse_state_event,
)
from ..models import Machine, PublishCassettesPayload
from ..nip44 import (
decrypt_from as _nip44_decrypt,
)
from ..nip44 import (
encrypt_with_conversation_key,
get_conversation_key,
)
# Canonical keys (integer 1 + integer 2, the paulmillr/nip44 reference pair).
_OP_SEC = "00" * 31 + "01"
_ATM_SEC = "00" * 31 + "02"
def _pub_hex(sec_hex: str) -> str:
return (
coincurve.PrivateKey(bytes.fromhex(sec_hex))
.public_key.format(compressed=True)[1:]
.hex()
)
_OP_PUB = _pub_hex(_OP_SEC)
_ATM_PUB = _pub_hex(_ATM_SEC)
# =============================================================================
# Fake signers + account-shaped helper
# =============================================================================
class _FakeBunkerSigner:
"""Test double for RemoteBunkerSigner — implements async nip44_*
against the hand-rolled `nip44` impl. Used to exercise the
"signer.nip44_decrypt returns successfully" path without standing up
a live bunker process."""
def __init__(self, privkey_hex: str):
self._privkey_hex = privkey_hex
@property
def pubkey(self) -> str:
return _pub_hex(self._privkey_hex)
def can_sign(self) -> bool:
return True
async def nip44_encrypt(self, plaintext: str, peer_pubkey_hex: str) -> str:
ck = get_conversation_key(self._privkey_hex, peer_pubkey_hex)
return encrypt_with_conversation_key(plaintext, ck)
async def nip44_decrypt(self, ciphertext: str, peer_pubkey_hex: str) -> str:
return _nip44_decrypt(ciphertext, self._privkey_hex, peer_pubkey_hex)
class _FakeLocalSignerStub:
"""Test double for the post-#38 LocalSigner stub — its nip44_* always
raises SignerUnavailableError. Combined with an Account that has
`signer_type='LocalSigner'` + `prvkey` populated, exercises the
transitional fallback in `_nip44_decrypt_via_signer` (which catches
the SignerUnavailableError and falls back to direct-prvkey via the
hand-rolled impl)."""
def can_sign(self) -> bool:
return True
async def nip44_encrypt(self, plaintext: str, peer_pubkey_hex: str) -> str:
raise SignerUnavailableError("LocalSigner does not implement nip44_encrypt")
async def nip44_decrypt(self, ciphertext: str, peer_pubkey_hex: str) -> str:
raise SignerUnavailableError("LocalSigner does not implement nip44_decrypt")
class _FakeRaisingSigner:
"""Test double that raises a configurable exception on nip44_decrypt.
Used to validate the bunker-error-mapping branches in
decrypt_and_parse_state_event."""
def __init__(self, exc):
self._exc = exc
def can_sign(self) -> bool:
return True
async def nip44_encrypt(self, plaintext: str, peer_pubkey_hex: str) -> str:
raise self._exc
async def nip44_decrypt(self, ciphertext: str, peer_pubkey_hex: str) -> str:
raise self._exc
def _fake_account(
signer_type: str = "RemoteBunkerSigner",
prvkey: str | None = None,
):
"""Account-shaped duck-typed object. decrypt_and_parse_state_event +
_nip44_decrypt_via_signer only read `.signer_type` and `.prvkey`; the
rest is irrelevant."""
return SimpleNamespace(
id="test-operator",
pubkey=_OP_PUB,
prvkey=prvkey,
signer_type=signer_type,
signer_config=None,
)
def _make_state_event(
payload: PublishCassettesPayload,
*,
atm_sec: str = _ATM_SEC,
op_pub: str = _OP_PUB,
atm_pub: str = _ATM_PUB,
event_id: str = "fake-event-id",
created_at: int = 1234567890,
) -> dict:
"""Build a state event the way bitspire's ATM publisher would. Skips
the sig-verify step (handler-level concern); the transport-decrypt
path doesn't depend on sig validity, only on conversation-key match."""
plaintext = json.dumps(payload.to_wire_dict(), separators=(",", ":"))
ck = get_conversation_key(atm_sec, op_pub)
content = encrypt_with_conversation_key(plaintext, ck)
return {
"kind": 30078,
"pubkey": atm_pub,
"content": content,
"tags": [
["d", f"bitspire-cassettes-state:{atm_pub}"],
["p", op_pub],
],
"created_at": created_at,
"id": event_id,
}
# =============================================================================
# decrypt_and_parse_state_event — RemoteBunkerSigner happy path
# =============================================================================
class TestDecryptViaBunkerSigner:
"""The expected production path post-#38: operator account is bunker-
backed, signer.nip44_decrypt routes through the bunker (mocked here
via _FakeBunkerSigner), and the wire payload round-trips cleanly."""
def test_happy_path_recovers_positions_keyed_payload(self):
payload = PublishCassettesPayload(
positions={
"1": {"denomination": 20, "count": 49},
"2": {"denomination": 50, "count": 100},
}
)
event = _make_state_event(payload)
account = _fake_account(signer_type="RemoteBunkerSigner")
signer = _FakeBunkerSigner(_OP_SEC)
recovered = asyncio.run(decrypt_and_parse_state_event(event, account, signer))
assert sorted(recovered.positions.keys()) == [1, 2]
assert recovered.positions[1].denomination == 20
assert recovered.positions[1].count == 49
assert recovered.positions[2].denomination == 50
assert recovered.positions[2].count == 100
def test_round_trips_multiple_same_denomination(self):
"""v1.1 operational case (coord-log 2026-05-30T18:45Z) — multiple
bays carrying the same denomination."""
payload = PublishCassettesPayload(
positions={
"1": {"denomination": 20, "count": 100},
"2": {"denomination": 20, "count": 100},
"3": {"denomination": 20, "count": 100},
"4": {"denomination": 20, "count": 100},
}
)
event = _make_state_event(payload)
account = _fake_account()
signer = _FakeBunkerSigner(_OP_SEC)
recovered = asyncio.run(decrypt_and_parse_state_event(event, account, signer))
assert len(recovered.positions) == 4
for pos in (1, 2, 3, 4):
assert recovered.positions[pos].denomination == 20
assert recovered.positions[pos].count == 100
# =============================================================================
# decrypt_and_parse_state_event — LocalSigner transitional fallback
# =============================================================================
class TestDecryptViaLocalSignerFallback:
"""When the operator account is still on LocalSigner (pre-bunker
migration), the LocalSigner stub raises SignerUnavailableError from
nip44_decrypt. `_nip44_decrypt_via_signer` catches that and falls
back to the hand-rolled impl using `account.prvkey`. Same wire
output; transitional until S7 retires LocalSigner accounts entirely."""
def test_localsigner_with_prvkey_decrypts_via_fallback(self):
payload = PublishCassettesPayload(
positions={"1": {"denomination": 20, "count": 49}}
)
event = _make_state_event(payload)
account = _fake_account(signer_type="LocalSigner", prvkey=_OP_SEC)
signer = _FakeLocalSignerStub()
recovered = asyncio.run(decrypt_and_parse_state_event(event, account, signer))
assert recovered.positions[1].denomination == 20
assert recovered.positions[1].count == 49
def test_localsigner_without_prvkey_raises_decode_error(self):
"""A LocalSigner account whose prvkey field is None (impossible
in practice LocalSigner requires prvkey at provision time, but
defensive in case the row is corrupt) should surface as a
decode error, not silently succeed."""
payload = PublishCassettesPayload(
positions={"1": {"denomination": 20, "count": 49}}
)
event = _make_state_event(payload)
account = _fake_account(signer_type="LocalSigner", prvkey=None)
signer = _FakeLocalSignerStub()
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
def test_clientonlysigner_raises_decode_error(self):
"""ClientSideOnlySigner has no server-side decrypt path at all;
falling back to direct-prvkey is also impossible (no prvkey).
Surface as a decode error so the consumer logs + skips."""
payload = PublishCassettesPayload(
positions={"1": {"denomination": 20, "count": 49}}
)
event = _make_state_event(payload)
account = _fake_account(signer_type="ClientSideOnlySigner", prvkey=None)
signer = _FakeLocalSignerStub() # behaves the same way: raises
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
# =============================================================================
# decrypt_and_parse_state_event — bunker error mapping
# =============================================================================
class TestBunkerErrorMapping:
"""The post-#38 error hierarchy splits transient (bunker partitioned)
from terminal (bunker policy reject, MAC failure). Consumer behaves
differently transient retries, terminal logs + skips. Validate the
mapping from NsecBunker* exceptions to our CassetteEvent* types."""
def test_timeout_maps_to_transient_error(self):
"""Bunker unreachable → NsecBunkerTimeoutError → caller-visible
CassetteEventTransientError. Consumer treats this as retry-
eligible (don't advance state_event_id)."""
payload = PublishCassettesPayload(
positions={"1": {"denomination": 20, "count": 49}}
)
event = _make_state_event(payload)
account = _fake_account()
signer = _FakeRaisingSigner(NsecBunkerTimeoutError("bunker unreachable"))
with pytest.raises(CassetteEventTransientError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
def test_rpc_reject_maps_to_decode_error(self):
"""Bunker rejected the RPC (policy / MAC / config) →
NsecBunkerRpcError caller-visible CassetteEventDecodeError.
Terminal retrying won't help."""
payload = PublishCassettesPayload(
positions={"1": {"denomination": 20, "count": 49}}
)
event = _make_state_event(payload)
account = _fake_account()
signer = _FakeRaisingSigner(
NsecBunkerRpcError("bunker policy reject: kind 30078 not authorised")
)
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
# =============================================================================
# decrypt_and_parse_state_event — payload + envelope validation
# =============================================================================
class TestPayloadValidation:
"""Errors that originate at the parse layer (post-decrypt), not the
signer. Same set as pre-migration covered through the bunker-signer
path since LocalSigner is going away."""
def test_tampered_content_rejected(self):
payload = PublishCassettesPayload(
positions={"1": {"denomination": 20, "count": 49}}
)
event = _make_state_event(payload)
event["content"] = event["content"][:-2] + "AA"
account = _fake_account()
signer = _FakeBunkerSigner(_OP_SEC)
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
def test_wrong_signer_privkey_rejected(self):
"""Wrong privkey on the signer → wrong conversation key → MAC
verification fails inside nip44_decrypt surfaces as decode
error (via the hand-rolled Nip44Error since this is the fake
bunker signer; in production the bunker would raise
NsecBunkerRpcError which also maps to CassetteEventDecodeError)."""
payload = PublishCassettesPayload(
positions={"1": {"denomination": 20, "count": 49}}
)
event = _make_state_event(payload)
account = _fake_account()
wrong_sec = "00" * 31 + "03"
signer = _FakeBunkerSigner(wrong_sec)
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
def test_missing_content_rejected(self):
event = _make_state_event(
PublishCassettesPayload(positions={"1": {"denomination": 20, "count": 49}})
)
del event["content"]
account = _fake_account()
signer = _FakeBunkerSigner(_OP_SEC)
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
def test_missing_pubkey_rejected(self):
event = _make_state_event(
PublishCassettesPayload(positions={"1": {"denomination": 20, "count": 49}})
)
del event["pubkey"]
account = _fake_account()
signer = _FakeBunkerSigner(_OP_SEC)
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
def test_decrypted_garbage_json_rejected(self):
"""If plaintext decrypts cleanly but isn't valid JSON, surface
as decode error (not crash the consumer loop)."""
ck = get_conversation_key(_ATM_SEC, _OP_PUB)
event = {
"kind": 30078,
"pubkey": _ATM_PUB,
"content": encrypt_with_conversation_key("definitely not json", ck),
"tags": [],
"created_at": 0,
"id": "x",
}
account = _fake_account()
signer = _FakeBunkerSigner(_OP_SEC)
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
def test_decrypted_wrong_shape_rejected(self):
"""Well-formed JSON but missing 'positions' → payload-shape
validation failure."""
ck = get_conversation_key(_ATM_SEC, _OP_PUB)
event = {
"kind": 30078,
"pubkey": _ATM_PUB,
"content": encrypt_with_conversation_key('{"wrong_field": 42}', ck),
"tags": [],
"created_at": 0,
"id": "x",
}
account = _fake_account()
signer = _FakeBunkerSigner(_OP_SEC)
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
# =============================================================================
# d-tag construction — unchanged by the migration, kept as regression guard
# =============================================================================
class TestDTagConstruction:
"""The `<m>` placeholder in d-tags = ATM hex pubkey (load-bearing per
coord-log 2026-05-30T11:50Z). These tests pin the canonical
substitution so a refactor can't silently break wire compatibility."""
def _machine(self, npub: str, id_: str = "m1") -> Machine:
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
return Machine(
id=id_,
operator_user_id="op1",
machine_npub=npub,
wallet_id="w1",
name=None,
location=None,
fiat_code="EUR",
is_active=True,
created_at=now,
updated_at=now,
)
def test_atm_hex_pubkey_from_hex_storage(self):
assert _atm_hex_pubkey(self._machine(_ATM_PUB)) == _ATM_PUB
def test_atm_hex_pubkey_lowercases_uppercase_hex(self):
assert _atm_hex_pubkey(self._machine(_ATM_PUB.upper())) == _ATM_PUB
def test_atm_hex_pubkey_canonicalises_bech32_to_hex(self):
from lnbits.utils.nostr import hex_to_npub
npub_bech32 = hex_to_npub(_ATM_PUB)
assert _atm_hex_pubkey(self._machine(npub_bech32)) == _ATM_PUB
def test_config_d_tag_uses_hex_pubkey_not_id(self):
"""REGRESSION GUARD: d-tag must contain the ATM hex pubkey, NOT
the internal machine UUID."""
m = self._machine(_ATM_PUB, id_="some-uuid-not-the-pubkey")
d_tag = _config_d_tag(_atm_hex_pubkey(m))
assert d_tag == f"bitspire-cassettes:{_ATM_PUB}"
assert "some-uuid" not in d_tag
def test_state_d_tag_uses_hex_pubkey_not_id(self):
m = self._machine(_ATM_PUB, id_="some-uuid-not-the-pubkey")
d_tag = _state_d_tag(_atm_hex_pubkey(m))
assert d_tag == f"bitspire-cassettes-state:{_ATM_PUB}"
assert "some-uuid" not in d_tag
def test_build_state_d_tags_for_machines(self):
atm2_pub = _pub_hex("00" * 31 + "03")
machines = [
self._machine(_ATM_PUB, id_="m1"),
self._machine(atm2_pub, id_="m2"),
]
tags = build_state_d_tags_for_machines(machines)
assert tags == [
f"bitspire-cassettes-state:{_ATM_PUB}",
f"bitspire-cassettes-state:{atm2_pub}",
]

390
tests/test_nip44_v2.py Normal file
View file

@ -0,0 +1,390 @@
"""
Tests for the hand-rolled NIP-44 v2 implementation in `nip44.py`.
Three layers of validation, ordered by trust:
1. Pinned reference vector from the canonical paulmillr/nip44 test suite
the conversation_key for (sec=1, sec=2) is widely-published as
c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d. If
our get_conversation_key() ever drifts from that value, the impl is
broken at the key-derivation layer.
2. Round-trip + tamper detection verifies the encrypt/decrypt loop
under random nonces, catches HMAC + version + padding tampering.
3. Cross-test (TBD) bitspire will post one sample event encrypted on
their nostr-tools side to the coord log; test_decrypts_bitspire_sample
wires it as a fixture and asserts byte-compatibility with the
nostr-tools NIP-44 v2 impl. Placeholder stub until the sample lands.
"""
import base64
import coincurve
import pytest
from ..nip44 import (
Nip44LengthError,
Nip44MacError,
Nip44VersionError,
_calc_padded_len,
decrypt_from,
decrypt_with_conversation_key,
encrypt_for,
encrypt_with_conversation_key,
get_conversation_key,
)
# Helper: derive a compressed-x-coord pubkey hex string from a secret hex.
def _pub_hex(sec_hex: str) -> str:
return (
coincurve.PrivateKey(bytes.fromhex(sec_hex))
.public_key.format(compressed=True)[1:]
.hex()
)
# Canonical test keys widely used across NIP-44 reference vectors.
_SEC_ONE = "00" * 31 + "01" # integer 1
_SEC_TWO = "00" * 31 + "02" # integer 2
_PUB_ONE = _pub_hex(_SEC_ONE)
_PUB_TWO = _pub_hex(_SEC_TWO)
# =============================================================================
# Layer 1 — pinned reference vector (paulmillr/nip44)
# =============================================================================
class TestConversationKeyReferenceVector:
"""Pinned reference vector from the canonical NIP-44 v2 test suite
(paulmillr/nip44). If get_conversation_key drifts from this value we
have a key-derivation regression fail loudly."""
REFERENCE_CK_HEX = (
"c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d"
)
def test_sec_one_pub_two(self):
ck = get_conversation_key(_SEC_ONE, _PUB_TWO)
assert ck.hex() == self.REFERENCE_CK_HEX
def test_sec_two_pub_one_is_symmetric(self):
"""Conversation key is symmetric: ck(privA, pubB) == ck(privB, pubA).
Both sides of a NIP-44 conversation derive the identical PRK; this
is what lets the recipient decrypt with their own privkey + the
sender's pubkey."""
ck_ab = get_conversation_key(_SEC_ONE, _PUB_TWO)
ck_ba = get_conversation_key(_SEC_TWO, _PUB_ONE)
assert ck_ab == ck_ba
# =============================================================================
# Layer 2 — round-trip + tamper detection
# =============================================================================
class TestRoundTrip:
"""Encrypt then decrypt under the high-level pair-keyed API."""
@pytest.mark.parametrize(
"plaintext",
[
"a", # 1 byte (minimum)
"hello, nip44 v2", # short
"x" * 32, # exactly the small-payload boundary
"x" * 33, # just over
"y" * 1000, # medium
"z" * 5000, # large
'{"denominations": {"20": {"position": 1, "count": 49}}}', # realistic
],
)
def test_round_trip_various_lengths(self, plaintext):
payload = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO)
recovered = decrypt_from(payload, _SEC_TWO, _PUB_ONE)
assert recovered == plaintext
def test_payloads_are_unique_under_random_nonce(self):
"""Same plaintext + same key pair should produce different payloads
each time because the nonce is fresh CSPRNG bytes. Catches a
regression where the nonce is accidentally pinned."""
plaintext = "the same message"
p1 = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO)
p2 = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO)
assert p1 != p2
assert decrypt_from(p1, _SEC_TWO, _PUB_ONE) == plaintext
assert decrypt_from(p2, _SEC_TWO, _PUB_ONE) == plaintext
def test_pinned_nonce_is_deterministic(self):
"""Same plaintext + same key pair + same nonce = byte-identical
payload. Regression-locks the chacha20 + hmac chain."""
ck = get_conversation_key(_SEC_ONE, _PUB_TWO)
nonce = bytes(32) # 32 zero bytes
p1 = encrypt_with_conversation_key("a", ck, nonce=nonce)
p2 = encrypt_with_conversation_key("a", ck, nonce=nonce)
assert p1 == p2
assert decrypt_with_conversation_key(p1, ck) == "a"
class TestTamperDetection:
"""HMAC-SHA256 verification catches tampered envelopes. The cryptographic
construction depends on this if HMAC verification ever no-ops, a
relay-MITM could forge ATM state events."""
def _payload(self) -> str:
return encrypt_for("important message", _SEC_ONE, _PUB_TWO)
def test_flipped_mac_byte_rejected(self):
raw = bytearray(base64.b64decode(self._payload()))
raw[-1] ^= 0x01
tampered = base64.b64encode(bytes(raw)).decode("ascii")
with pytest.raises(Nip44MacError):
decrypt_from(tampered, _SEC_TWO, _PUB_ONE)
def test_flipped_ciphertext_byte_rejected(self):
raw = bytearray(base64.b64decode(self._payload()))
# Flip a byte in the middle of the ciphertext segment
# (version[1] + nonce[32..32] + ciphertext[33..-32] + mac[-32..])
ct_start = 1 + 32
raw[ct_start + 5] ^= 0x01
tampered = base64.b64encode(bytes(raw)).decode("ascii")
with pytest.raises(Nip44MacError):
decrypt_from(tampered, _SEC_TWO, _PUB_ONE)
def test_flipped_nonce_byte_rejected(self):
raw = bytearray(base64.b64decode(self._payload()))
# Nonce starts at byte 1 (after version)
raw[1] ^= 0x01
tampered = base64.b64encode(bytes(raw)).decode("ascii")
with pytest.raises(Nip44MacError):
decrypt_from(tampered, _SEC_TWO, _PUB_ONE)
def test_wrong_recipient_privkey_rejected(self):
"""The MAC is derived from the conversation key, so a wrong
recipient privkey produces a different conversation key
different hmac_key MAC verification fails. (Doesn't decrypt
to garbage; fails fast.)"""
sec_three = "00" * 31 + "03"
with pytest.raises(Nip44MacError):
decrypt_from(self._payload(), sec_three, _PUB_ONE)
class TestVersionRejection:
def test_v1_byte_rejected(self):
raw = bytearray(base64.b64decode(encrypt_for("x", _SEC_ONE, _PUB_TWO)))
raw[0] = 0x01
bad = base64.b64encode(bytes(raw)).decode("ascii")
with pytest.raises(Nip44VersionError):
decrypt_from(bad, _SEC_TWO, _PUB_ONE)
def test_unknown_version_byte_rejected(self):
raw = bytearray(base64.b64decode(encrypt_for("x", _SEC_ONE, _PUB_TWO)))
raw[0] = 0xFF
bad = base64.b64encode(bytes(raw)).decode("ascii")
with pytest.raises(Nip44VersionError):
decrypt_from(bad, _SEC_TWO, _PUB_ONE)
class TestLengthGuards:
def test_empty_plaintext_rejected(self):
with pytest.raises(Nip44LengthError):
encrypt_for("", _SEC_ONE, _PUB_TWO)
def test_plaintext_at_max_length_accepted(self):
plaintext = "x" * 65535
payload = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO)
assert decrypt_from(payload, _SEC_TWO, _PUB_ONE) == plaintext
def test_plaintext_over_max_rejected(self):
with pytest.raises(Nip44LengthError):
encrypt_for("x" * 65536, _SEC_ONE, _PUB_TWO)
def test_invalid_base64_payload_rejected(self):
with pytest.raises(Nip44LengthError):
decrypt_from("not!!!base64@@@", _SEC_TWO, _PUB_ONE)
def test_payload_too_short_rejected(self):
# 50 bytes is well under the 99-byte minimum
too_short = base64.b64encode(b"\x02" + b"\x00" * 49).decode("ascii")
with pytest.raises(Nip44LengthError):
decrypt_from(too_short, _SEC_TWO, _PUB_ONE)
class TestPaddingFormula:
"""Spot-check the _calc_padded_len formula against hand-computed cases.
Locks in the NIP-44 v2 padding scheme so a refactor can't silently
break wire compatibility (which would only surface as cross-impl
decryption failures exactly what test_decrypts_bitspire_sample is
meant to catch end-to-end, but a unit test here is cheaper)."""
@pytest.mark.parametrize(
"plaintext_len,expected_padded",
[
(1, 32), # <= 32 → 32
(16, 32),
(32, 32),
(33, 64), # > 32 → next chunk
(64, 64),
(
65,
96,
), # chunk = 32 for L=65 (next_power(64) = 64; 64//8 = 8; max(32, 8) = 32)
(100, 128),
(128, 128),
# L=129: next_power(128) = 1<<8 = 256; chunk = max(32, 256//8) = 32;
# padded = 32 * (128//32 + 1) = 32 * 5 = 160.
(129, 160),
(256, 256), # chunk = 32 for L=256 (next_power(255)=256; max(32, 32) = 32)
(257, 320),
(
1000,
1024,
), # chunk = 128 for L=1000 (next_power(999)=1024; max(32, 128) = 128)
],
)
def test_calc_padded_len(self, plaintext_len, expected_padded):
assert _calc_padded_len(plaintext_len) == expected_padded
# =============================================================================
# Layer 3 — byte-compat cross-test against nostr-tools (bitspire's impl)
# =============================================================================
# -----------------------------------------------------------------------------
# Bitspire-side v1.1 fixture, posted to ~/dev/coordination/log.md at
# 2026-05-30T19:00Z. Positions-keyed wire shape per the v1.1 redesign
# (18:30Z + 18:45Z); intentionally includes two positions sharing
# denomination=20 to exercise the multi-same-denom round-trip on our
# decrypt + payload-validate path. Throwaway keypairs (one-shot, never
# sign anything else) — safe to embed verbatim.
# Generated by apps/machine/src/services/operator-config.ts-shape code
# path using the @bitSpire/nostr-client encryptContentV2 +
# createSignedEvent helpers (same code the production bootstrap publish
# uses). Round-tripped on bitspire side via decryptContentV2 before posting.
# -----------------------------------------------------------------------------
_BITSPIRE_FIXTURE = {
"atm_keypair": {
"privkey_hex": (
"814e6188d017102bbf301ba5b38fba95b2556dc79a60df4cd50605c4593578e6"
),
"pubkey_hex": (
"217bdc9a65b571c4d9b59da6227a7aa6ca5bbfd5280af791417c57a79d92852b"
),
},
"operator_keypair": {
"privkey_hex": (
"cca7dd9fe4874f6b9f3f3fae21648da686b7e714bfd4786e8fa8745933fd3185"
),
"pubkey_hex": (
"49bd8e615769f8b6a5aa8ce9617b919996abecf234599ba196789461cf239146"
),
},
"expected_plaintext": {
"positions": {
"1": {"denomination": 20, "count": 49},
"2": {"denomination": 20, "count": 38},
"3": {"denomination": 50, "count": 100},
},
},
"event": {
"kind": 30078,
"content": (
"AqOHsCcjN2W8L/Cx0uH+n++VA13W+wy7z1EcuuNX49sSagelX2lI0HEKyd+ActOc"
"iaPsHrp9ecJTkEZOD86ioldbLbEVColJwK4g1uVZSbpDeqRe+97woxVDqPnzj507"
"tFaVLF/dRmda+oKHUzkVPhE4PHQJzp9Fqji38J3nU6N68qo7KOt3qg1nSy5eDfAu"
"zt7djRBx63+/veub0rWTMMQLBgci8+Ms6Y+Zb1mki3L6NWuIR0Or+8DhcD+ZJiOu"
"WTcx"
),
"tags": [
[
"d",
"bitspire-cassettes-state:"
"217bdc9a65b571c4d9b59da6227a7aa6ca5bbfd5280af791417c57a79d92852b",
],
[
"p",
"49bd8e615769f8b6a5aa8ce9617b919996abecf234599ba196789461cf239146",
],
],
"created_at": 1780173222,
"pubkey": ("217bdc9a65b571c4d9b59da6227a7aa6ca5bbfd5280af791417c57a79d92852b"),
"id": ("72c09f333386dd4ad6125f8c69823824eea50d8091b694458bcd60701517eece"),
"sig": (
"07ecafacf0169f074e564a999ee1c31446930b43391d007c4a1f9ef7ad890d6c"
"2aa6e3ecc5318edeb5748fbd64c7ca33407099a97154e2ff7e0c626e48d71925"
),
},
}
class TestBitspireCrossTest:
"""Byte-compat cross-test between our hand-rolled NIP-44 v2 (`nip44.py`)
and the nostr-tools NIP-44 v2 impl that bitspire uses on the ATM side
(via @bitSpire/nostr-client). If these tests pass, the wire format
agrees across both implementations and the joint round-trip (operator
publish ATM apply / ATM bootstrap operator consume) is byte-safe.
If any fail, the spec ambiguity surfaces before sintra ships."""
def test_decrypts_bitspire_sample_event(self):
"""The load-bearing assertion: our `decrypt_from` recovers the
expected `{"positions": {...}}` plaintext from bitspire's encrypted
event content. v1.1 fixture intentionally exercises the multi-same-
denomination round-trip (positions 1 + 2 both hold $20)."""
import json
event = _BITSPIRE_FIXTURE["event"]
operator_privkey = _BITSPIRE_FIXTURE["operator_keypair"]["privkey_hex"]
from ..nip44 import decrypt_from
plaintext = decrypt_from(
event["content"],
operator_privkey,
event["pubkey"],
)
payload = json.loads(plaintext)
assert payload == _BITSPIRE_FIXTURE["expected_plaintext"]
# v1.1 invariant: two positions can carry the same denomination.
# Pin it explicitly so a future "fix" that re-introduces denom-
# uniqueness validation surfaces here instead of as a runtime
# rejection on real machines.
assert payload["positions"]["1"]["denomination"] == 20
assert payload["positions"]["2"]["denomination"] == 20
assert payload["positions"]["1"]["count"] != payload["positions"]["2"]["count"]
def test_signature_verifies_via_lnbits_helper(self):
"""Optional extra per bitspire's 13:15Z note (3). The consumer
path runs verify_event before NIP-44 decrypt locking the sig-
algorithm agreement here means both sides hash the event id the
same way + Schnorr-verify under the same x-only public-key
convention."""
from lnbits.utils.nostr import verify_event
assert verify_event(_BITSPIRE_FIXTURE["event"]) is True
def test_encrypt_round_trip_via_our_impl_decrypts_with_their_keys(self):
"""Optional extra per bitspire's 13:15Z note (3). Encrypt the
expected plaintext using OUR impl with the ATM keypair as
sender + operator pubkey as recipient. The resulting ciphertext
won't be byte-identical to the fixture (NIP-44 v2 nonces are
random) but it MUST decrypt back to the same plaintext when
passed to our decrypt path. Locks the encrypt direction too,
not just decrypt."""
import json
from ..nip44 import decrypt_from, encrypt_for
plaintext = json.dumps(
_BITSPIRE_FIXTURE["expected_plaintext"], separators=(",", ":")
)
atm_sec = _BITSPIRE_FIXTURE["atm_keypair"]["privkey_hex"]
atm_pub = _BITSPIRE_FIXTURE["atm_keypair"]["pubkey_hex"]
op_sec = _BITSPIRE_FIXTURE["operator_keypair"]["privkey_hex"]
op_pub = _BITSPIRE_FIXTURE["operator_keypair"]["pubkey_hex"]
our_ciphertext = encrypt_for(plaintext, atm_sec, op_pub)
recovered = decrypt_from(our_ciphertext, op_sec, atm_pub)
assert json.loads(recovered) == _BITSPIRE_FIXTURE["expected_plaintext"]
# The two ciphertexts SHOULD differ (random nonce per encrypt)
assert our_ciphertext != _BITSPIRE_FIXTURE["event"]["content"]

View file

@ -12,6 +12,13 @@ from lnbits.core.crud import get_wallet
from lnbits.core.models import User
from lnbits.decorators import check_super_user, check_user_exists
from .cassette_transport import (
CassetteTransportError,
OperatorIdentityMissing,
RelayUnavailable,
SignerUnavailable,
publish_to_atm,
)
from .crud import (
append_settlement_note,
count_completed_legs_for_settlement,
@ -39,9 +46,11 @@ from .crud import (
get_settlements_for_operator,
get_stuck_settlements_for_operator,
get_super_config,
list_cassette_configs_for_machine,
lp_is_onboarded,
replace_commission_splits,
reset_settlement_for_retry,
update_cassette_config,
update_dca_client,
update_deposit,
update_deposit_status,
@ -55,6 +64,7 @@ from .distribution import (
)
from .models import (
AppendSettlementNoteData,
CassetteConfig,
ClientBalanceSummary,
CommissionSplit,
CreateDcaClientData,
@ -66,6 +76,7 @@ from .models import (
DcaSettlement,
Machine,
PartialDispenseData,
PublishCassettesPayload,
SetCommissionSplitsData,
SettleBalanceData,
StuckSettlementsResponse,
@ -75,6 +86,7 @@ from .models import (
UpdateDepositStatusData,
UpdateMachineData,
UpdateSuperConfigData,
UpsertCassetteConfigData,
)
satmachineadmin_api_router = APIRouter()
@ -759,3 +771,127 @@ async def api_update_super_config(
HTTPStatus.INTERNAL_SERVER_ERROR, "Failed to update super config"
)
return config
# =============================================================================
# Cassette configs (#29 v1.1) — per-machine ATM cassette inventory
# =============================================================================
# v1.1 surface, paired with aiolabs/lamassu-next#56 ATM-side. Two endpoints:
# GET /machines/{id}/cassettes — list rows for the operator UI
# POST /machines/{id}/cassettes/publish — apply edits + publish kind-30078
#
# Row creation (new (machine_id, position) pairs) is admin-only via the
# bootstrap consumer task — slot count is hardware-determined. Operator-
# side flow is edit-and-publish over the existing rows only; the editable
# fields per row are denomination and count.
@satmachineadmin_api_router.get(
"/api/v1/dca/machines/{machine_id}/cassettes",
response_model=list[CassetteConfig],
)
async def api_list_machine_cassettes(
machine_id: str, user: User = Depends(check_user_exists)
) -> list[CassetteConfig]:
"""List the cassette config rows for one of the operator's machines,
ordered by position. Empty list = ATM hasn't yet published its
bootstrap event (or the bootstrap consumer hasn't processed it yet);
UI should show a "waiting for ATM" state."""
await _machine_owned_by(machine_id, user.id)
return await list_cassette_configs_for_machine(machine_id)
@satmachineadmin_api_router.post(
"/api/v1/dca/machines/{machine_id}/cassettes/publish",
response_model=list[CassetteConfig],
)
async def api_publish_machine_cassettes(
machine_id: str,
payload: PublishCassettesPayload,
user: User = Depends(check_user_exists),
) -> list[CassetteConfig]:
"""Operator submits the full per-machine cassette state for publish to
the ATM. Validates the position set matches what's currently in
cassette_configs for the machine (slot count is hardware-fixed),
upserts each row, then encrypts + signs + publishes a kind-30078
event tagged with d=bitspire-cassettes:<atm_pubkey_hex> and
p=<atm_pubkey_hex>.
The `<m>` placeholder in the published d-tag is the ATM's hex pubkey
from machine.machine_npub (canonicalised via normalize_public_key),
NOT the internal dca_machines.id UUID see #29 'machine_id semantics'
section and coord-log 2026-05-30T11:50Z load-bearing nudge.
Returns the fresh cassette_configs rows after the upserts so the UI
can refresh its table from one round-trip.
Errors:
400 payload position set doesn't match the machine's stored set
(operator publishing for a slot that doesn't exist on the
ATM; or the bootstrap hasn't landed yet so no rows exist)
400 operator hasn't onboarded a Nostr identity
503 signer offline / client-side-only, or nostrclient extension
not installed on this LNbits instance
500 anything else from the publish path
"""
machine = await _machine_owned_by(machine_id, user.id)
existing = await list_cassette_configs_for_machine(machine_id)
existing_positions = {row.position for row in existing}
incoming_positions = set(payload.positions.keys())
if not existing:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
(
"No cassette_configs rows exist for this machine yet — "
"waiting for the ATM's bootstrap state event. Power on the "
"ATM and confirm it has reached the configured relay; "
"satmachineadmin will auto-populate cassette_configs on "
"receipt."
),
)
if existing_positions != incoming_positions:
missing = existing_positions - incoming_positions
extra = incoming_positions - existing_positions
raise HTTPException(
HTTPStatus.BAD_REQUEST,
(
"Payload position set doesn't match the machine's stored "
f"set. Missing from payload: {sorted(missing)}; extra in "
f"payload: {sorted(extra)}. Slot count is hardware-fixed "
"— re-provision the ATM via atm-tui to add/remove physical "
"bays, then re-publish."
),
)
# Apply each per-row edit so the operator-believed state on
# satmachineadmin reflects the published payload, even if the ATM
# ack lands later (v2). updated_by audit-stamps the operator user id.
for pos, row in payload.positions.items():
updated = await update_cassette_config(
machine_id,
pos,
UpsertCassetteConfigData(denomination=row.denomination, count=row.count),
updated_by=user.id,
)
if updated is None:
# Defensive — we just validated the row exists, but a
# concurrent delete could land between. Surface as 500.
raise HTTPException(
HTTPStatus.INTERNAL_SERVER_ERROR,
f"cassette row for position {pos} disappeared mid-publish",
)
try:
await publish_to_atm(machine, payload, user.id)
except OperatorIdentityMissing as exc:
raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc
except SignerUnavailable as exc:
raise HTTPException(HTTPStatus.SERVICE_UNAVAILABLE, str(exc)) from exc
except RelayUnavailable as exc:
raise HTTPException(HTTPStatus.SERVICE_UNAVAILABLE, str(exc)) from exc
except CassetteTransportError as exc:
raise HTTPException(HTTPStatus.INTERNAL_SERVER_ERROR, str(exc)) from exc
return await list_cassette_configs_for_machine(machine_id)