feat(v2): cassette_transport — kind-30078 publish + decrypt (#29 v1)
Some checks failed
ci.yml / feat(v2): cassette_transport — kind-30078 publish + decrypt (#29 v1) (pull_request) Failing after 0s

The Nostr-wire layer for operator ↔ ATM cassette config. Owns both
directions:

  operator → ATM (publish_to_atm):
    build PublishCassettesPayload → NIP-44 v2 encrypt to ATM pubkey →
    sign as operator via _sign_as_operator hybrid → publish through
    nostrclient.router.nostr_client.relay_manager
    d-tag: bitspire-cassettes:<atm_pubkey_hex>
    p-tag: <atm_pubkey_hex>

  ATM → operator (decrypt_and_parse_state_event):
    consumer task feeds inbound events (already sig-verified by the
    subscription layer); we NIP-44 v2 decrypt with operator privkey +
    event sender pubkey, JSON-parse, validate as PublishCassettesPayload
    d-tag: bitspire-cassettes-state:<atm_pubkey_hex>
    p-tag: <operator_pubkey_hex>

`_sign_as_operator` recovers the hybrid signer pattern from commits
131ff92 / e13178d (removed in dcd0874 for the NIP-78 fleet rip): tries
`from lnbits.core.signers import resolve_signer` first (post-#17 path),
falls back to a direct `account.prvkey` read for pre-#17 lnbits hosts.
Both paths produce identical signed events. Unlike the prior fleet-
publish that soft-failed on missing identity (CRUD side-effect), this
publish is operator-initiated so missing identity raises
OperatorIdentityMissing for the API to surface as 400.

`_atm_hex_pubkey(machine)` centralises the `<m>` placeholder rule from
the 2026-05-30T11:50Z coord-log entry: always normalize_public_key on
machine.machine_npub, NEVER use the internal dca_machines.id UUID. The
build_state_d_tags_for_machines helper exposes the canonical d-tag
list for the consumer subscription filter to use.

Typed errors map cleanly to HTTP statuses in the API caller:
  - OperatorIdentityMissing → 400 (operator hasn't onboarded)
  - SignerUnavailable → 503 (signer offline / client-side-only)
  - RelayUnavailable → 503 (nostrclient not installed)
  - CassetteEventDecodeError → consumer-side log + skip (never crash)

NIP-44 v2 ECDH needs the raw operator scalar, which the signer
abstraction's high-level sign_event doesn't expose. v1 reads
account.prvkey directly (same surface as the pre-#17 sign fallback);
post-bunker (lnbits#18) this becomes a NIP-44-over-bunker RPC and the
operator nsec leaves the LNbits host — v2 follow-up.

Smoke-tested via docker exec: round-trip publish (build → encrypt →
parse) of the realistic {"denominations": {"20": ..., "50": ...}}
payload; tamper detection on a corrupted content field; malformed
pubkey rejection.

Full suite: 132 passed, 1 skipped, 1 pre-existing async-plugin failure.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Padreug 2026-05-30 18:14:16 +02:00
commit b9d5ea3c57

370
cassette_transport.py Normal file
View file

@ -0,0 +1,370 @@
"""
Cassette-config Nostr transport operator ATM kind-30078 publish + consume.
Per the locked design at aiolabs/satmachineadmin#29 (paired with
lamassu-next#56) and the dcd0874 privacy-by-default pivot, the operator
publishes denomination-keyed cassette config to a target ATM via:
kind = 30078 (NIP-78, replaceable)
tags = [
["d", "bitspire-cassettes:<atm_pubkey_hex>"],
["p", "<atm_pubkey_hex>"]
]
content = NIP-44 v2 encrypted JSON of PublishCassettesPayload.to_wire_dict()
pubkey = operator pubkey
sig = operator signature
The ATM-side consumer (lamassu-next#56) subscribes by the d-tag + its own
npub, decrypts, validates, applies, hot-reloads HAL.
Reverse direction (ATM operator, v1 = one-shot bootstrap on first boot,
v2 = continuous reverse channel for reconciliation):
kind = 30078
tags = [
["d", "bitspire-cassettes-state:<atm_pubkey_hex>"],
["p", "<operator_pubkey_hex>"]
]
content = NIP-44 v2 encrypted JSON, same PublishCassettesPayload shape
pubkey = ATM pubkey
This module owns the wire-format side of both directions. The consumer
task (tasks.py) calls `decrypt_and_parse_state_event` per incoming event;
the API endpoint (views_api.py) calls `publish_to_atm` per operator submit.
The `<m>` placeholder semantics (load-bearing per the 2026-05-30T11:50Z
coord-log entry): always the ATM's hex pubkey, NEVER satmachineadmin's
internal dca_machines.id UUID. Helper `_atm_hex_pubkey(machine)`
centralises the canonicalisation via lnbits.utils.nostr.normalize_public_key.
"""
from __future__ import annotations
import json
import time
from typing import Optional
import coincurve
from lnbits.core.crud.users import get_account
from lnbits.utils.nostr import normalize_public_key, sign_event
from loguru import logger
from .models import Machine, PublishCassettesPayload
from .nip44 import (
Nip44Error,
decrypt_with_conversation_key,
encrypt_with_conversation_key,
get_conversation_key,
)
_KIND_NIP78 = 30078
_D_TAG_CONFIG_PREFIX = "bitspire-cassettes:" # operator → ATM
_D_TAG_STATE_PREFIX = "bitspire-cassettes-state:" # ATM → operator
# =============================================================================
# Errors
# =============================================================================
class CassetteTransportError(Exception):
"""Generic transport-layer error. Subclasses distinguish failure modes
so the API can surface meaningful HTTP statuses + the consumer task
can log + skip without crashing."""
class OperatorIdentityMissing(CassetteTransportError):
"""Operator account has no Nostr pubkey on file, or no signer is
available (pre-bunker rollout operator hasn't onboarded via
Nostr-login)."""
class SignerUnavailable(CassetteTransportError):
"""Resolved signer can't sign server-side (client-side-only signer,
or transient bunker unreachability post-lnbits#18). Publish skipped."""
class RelayUnavailable(CassetteTransportError):
"""nostrclient extension isn't installed or its relay manager isn't
reachable. Treated as soft-fail; publish skipped + logged."""
class CassetteEventDecodeError(CassetteTransportError):
"""Inbound state event failed validation: bad signature, NIP-44 v2
decrypt failure, or payload didn't conform to PublishCassettesPayload."""
# =============================================================================
# Helpers — canonical pubkey + d-tag construction
# =============================================================================
def _atm_hex_pubkey(machine: Machine) -> str:
"""Canonicalise machine.machine_npub (hex OR npub bech32 — operator
enters either in the UI) to lowercase hex. ALL d-tag substitutions
use this value; using the internal machine.id UUID would silently
no-op the wire-level filter (per coord-log 11:50Z load-bearing nudge).
"""
return normalize_public_key(machine.machine_npub).lower()
def _config_d_tag(atm_pubkey_hex: str) -> str:
"""d-tag for operator → ATM publish. ATM subscribes by this tag."""
return f"{_D_TAG_CONFIG_PREFIX}{atm_pubkey_hex}"
def _state_d_tag(atm_pubkey_hex: str) -> str:
"""d-tag for ATM → operator publish (bootstrap in v1, continuous v2)."""
return f"{_D_TAG_STATE_PREFIX}{atm_pubkey_hex}"
def build_state_d_tags_for_machines(machines: list[Machine]) -> list[str]:
"""Bootstrap-consumer subscription filter helper: returns the full
`#d=[...]` list for all known ATMs an operator subscribes to."""
return [_state_d_tag(_atm_hex_pubkey(m)) for m in machines]
# =============================================================================
# Sign-as-operator — hybrid path (resolve_signer post #17, prvkey fallback)
# =============================================================================
async def _sign_as_operator(
operator_user_id: str, event: dict
) -> Optional[dict]:
"""Sign `event` using the operator's stored Nostr identity.
Mutates `event` to add `created_at` (now), `pubkey`, `id`, and `sig`.
Returns the signed event, or raises a typed CassetteTransportError
on a hard failure the caller should surface to the operator.
Routing: post-`aiolabs/lnbits#17` (signer abstraction) we go through
`lnbits.core.signers.resolve_signer`, which transparently handles
LocalSigner (envelope-encrypted nsec at rest, decrypted on demand)
and ClientSideOnlySigner (raises SignerUnavailableError). On pre-#17
lnbits versions the import fails and we fall back to a direct
`account.prvkey` read. Both paths produce identical signed events.
Pattern preserved from the removed nostr_publish.py at commit
e13178d / 131ff92 recovered here for the cassette transport.
Unlike the prior fleet-publish path (which soft-failed on missing
operator identity since the publish was a CRUD side-effect), the
cassette publish is operator-initiated so missing identity is a hard
error surfaced as HTTP 400 by the API caller.
"""
account = await get_account(operator_user_id)
if account is None or not account.pubkey:
raise OperatorIdentityMissing(
f"operator {operator_user_id[:8]}... has no Nostr pubkey on file. "
"Onboard via the LNbits Nostr-login flow to publish cassette "
"config to your ATMs."
)
# created_at is part of the BIP-340 event-id hash; must be set before
# signing so both code paths below see the same value.
event["created_at"] = int(time.time())
try:
from lnbits.core.signers import ( # type: ignore[import-not-found]
SignerError,
SignerUnavailableError,
resolve_signer,
)
except ImportError:
# Pre-#17 lnbits — direct prvkey read. Removed once the #17
# cascade lands on every host that runs this extension.
if not account.prvkey:
raise OperatorIdentityMissing(
f"operator {operator_user_id[:8]}... has no signing key "
"on file (pre-lnbits#17 path). Onboard via Nostr-login or "
"wait for aiolabs/lnbits#18 bunker integration."
)
private_key = coincurve.PrivateKey(bytes.fromhex(account.prvkey))
return sign_event(event, account.pubkey, private_key)
# Post-#17 lnbits — route through the signer abstraction.
try:
signer = resolve_signer(account)
except SignerError as exc:
raise SignerUnavailable(
f"signer resolve failed for operator {operator_user_id[:8]}...: "
f"{exc}"
) from exc
if not signer.can_sign():
raise SignerUnavailable(
f"operator {operator_user_id[:8]}... has a client-side-only "
"signer; server can't publish on their behalf. Wait for bunker "
"integration (lnbits#18) or operator-driven publishing."
)
try:
return signer.sign_event(event)
except SignerUnavailableError as exc:
raise SignerUnavailable(
f"signer unavailable for operator {operator_user_id[:8]}...: "
f"{exc}"
) from exc
async def _get_operator_privkey_hex(operator_user_id: str) -> str:
"""Fetch the operator's signing key hex for NIP-44 v2 encryption.
NIP-44 v2 ECDH needs the raw private scalar, which the signer
abstraction's high-level `sign_event` doesn't expose. For v1 we
read `account.prvkey` directly same surface that the pre-#17
fallback in `_sign_as_operator` uses. Post-bunker (lnbits#18)
this becomes a NIP-44-over-bunker call routed through the bunker
client (the operator's nsec never leaves the bunker process), but
that path is v2 follow-up.
Raises OperatorIdentityMissing on missing keys.
"""
account = await get_account(operator_user_id)
if account is None or not account.prvkey:
raise OperatorIdentityMissing(
f"operator {operator_user_id[:8]}... has no signing key on "
"file; can't NIP-44 v2 encrypt the cassette payload to the "
"ATM. Onboard via the LNbits Nostr-login flow."
)
return account.prvkey
# =============================================================================
# Publish — operator → ATM (the satmachineadmin API path)
# =============================================================================
async def _publish_signed_event(signed_event: dict) -> None:
"""Send a signed Nostr event to all relays via the nostrclient
extension's singleton RelayManager.
Lazy import + typed-error so the API can surface "your LNbits doesn't
have nostrclient installed" as a 503 rather than a 500. Pattern
matches the cross-extension import guards in
`lnbits.core.services.users` (nostrmarket / nostrrelay).
"""
try:
from nostrclient.router import ( # type: ignore[import-not-found]
nostr_client,
)
except ImportError as exc:
raise RelayUnavailable(
"nostrclient extension is not installed; cassette config "
"publish requires it. Install + activate the nostrclient "
"extension on this LNbits instance."
) from exc
msg = json.dumps(["EVENT", signed_event])
nostr_client.relay_manager.publish_message(msg)
async def publish_to_atm(
machine: Machine,
payload: PublishCassettesPayload,
operator_user_id: str,
) -> dict:
"""Build, encrypt, sign, and publish a kind-30078 cassette config event
from the operator to the target ATM.
Returns the signed event dict on success (caller may log event.id for
audit). Raises CassetteTransportError subclasses on hard failures:
- OperatorIdentityMissing 400: operator hasn't onboarded
- SignerUnavailable 503: signer offline / client-side-only
- RelayUnavailable 503: nostrclient not installed
- CassetteTransportError 500: anything else
"""
atm_pubkey_hex = _atm_hex_pubkey(machine)
# Build the NIP-44 v2 encrypted content using the operator's privkey
# as sender and the ATM pubkey as recipient.
operator_privkey_hex = await _get_operator_privkey_hex(operator_user_id)
plaintext = json.dumps(payload.to_wire_dict(), separators=(",", ":"))
conversation_key = get_conversation_key(operator_privkey_hex, atm_pubkey_hex)
content = encrypt_with_conversation_key(plaintext, conversation_key)
event: dict = {
"kind": _KIND_NIP78,
"tags": [
["d", _config_d_tag(atm_pubkey_hex)],
["p", atm_pubkey_hex],
],
"content": content,
}
signed = await _sign_as_operator(operator_user_id, event)
# _sign_as_operator raises on hard failure; a None return would mean
# an unexpected soft-path slipped through — treat as hard error here.
if signed is None:
raise CassetteTransportError(
"sign_as_operator returned None unexpectedly — soft-fail path "
"shouldn't be reachable on a publish-initiated flow"
)
await _publish_signed_event(signed)
logger.info(
f"satmachineadmin: published kind-30078 cassette config to ATM "
f"{atm_pubkey_hex[:12]}... (event_id={signed['id'][:12]}..., "
f"machine_id={machine.id}, denominations={list(payload.denominations.keys())})"
)
return signed
# =============================================================================
# Consume — ATM → operator (the bootstrap consumer task)
# =============================================================================
def decrypt_and_parse_state_event(
event: dict, operator_privkey_hex: str
) -> PublishCassettesPayload:
"""Decrypt + parse an inbound `bitspire-cassettes-state:<atm_pubkey_hex>`
event the ATM published toward the operator. Caller is responsible
for:
- filtering on `kind=30078` and the expected `#d` tag list
- verifying the event signature (lnbits.utils.nostr.verify_event)
- confirming `event["pubkey"]` matches a known ATM in the operator's
machines table (the d-tag suffix == event pubkey == machine.machine_npub
canonicalised)
This function does:
- NIP-44 v2 decrypt of event["content"] using the sender's pubkey
from event["pubkey"] and the operator's privkey
- JSON parse + PublishCassettesPayload validation
Raises CassetteEventDecodeError on any decode/validate failure.
"""
sender_pubkey = event.get("pubkey")
content = event.get("content")
if not isinstance(sender_pubkey, str) or not isinstance(content, str):
raise CassetteEventDecodeError(
"event missing required pubkey or content fields"
)
try:
conversation_key = get_conversation_key(
operator_privkey_hex, sender_pubkey
)
plaintext = decrypt_with_conversation_key(content, conversation_key)
except Nip44Error as exc:
raise CassetteEventDecodeError(
f"NIP-44 v2 decrypt failed: {exc}"
) from exc
except ValueError as exc:
# coincurve raises ValueError on a malformed pubkey hex.
raise CassetteEventDecodeError(
f"sender pubkey is malformed: {exc}"
) from exc
try:
raw = json.loads(plaintext)
except json.JSONDecodeError as exc:
raise CassetteEventDecodeError(
f"decrypted content isn't valid JSON: {exc}"
) from exc
try:
return PublishCassettesPayload(**raw)
except Exception as exc: # noqa: BLE001 — Pydantic raises various subclasses
raise CassetteEventDecodeError(
f"payload didn't validate as PublishCassettesPayload: {exc}"
) from exc