feat(v2): bootstrap consumer task — auto-populate cassette_configs (#29 v1)
Some checks failed: ci.yml (pull_request) failing after 0s

Long-running task, wired into satmachineadmin_start, that subscribes to
kind-30078 bitspire-cassettes-state:<atm_pubkey_hex> events from every
active machine's ATM and upserts cassette_configs via apply_bootstrap_state
on receipt. Pairs with bitspire's one-shot bootstrap publish in
aiolabs/lamassu-next#56, so the operator's first config publish validates
against a non-empty denomination set.
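
A relay matches this subscription using NIP-01 filter semantics. The
"kinds" field restricts the event kind, and "#d" matches any value of the
event's d tag. Below is a minimal local re-implementation of just those
two fields. The name matches_filter is hypothetical and not part of
nostrclient or LNbits. It is useful for checking which events the
consumer's filter would let through.

```python
def matches_filter(nostr_filter: dict, event: dict) -> bool:
    """Apply the two NIP-01 filter fields used by this consumer.

    - "kinds": the event's kind must be listed.
    - "#d": at least one of the event's d-tag values must be listed.
    Every other filter field is ignored in this sketch.
    """
    if "kinds" in nostr_filter and event.get("kind") not in nostr_filter["kinds"]:
        return False
    wanted = nostr_filter.get("#d")
    if wanted is not None:
        # A tag is a list like ["d", "<value>", ...]; collect the d values.
        d_values = {
            tag[1]
            for tag in event.get("tags", [])
            if len(tag) >= 2 and tag[0] == "d"
        }
        if not d_values.intersection(wanted):
            return False
    return True
```

An event published with kind 1, or with a d tag naming some other ATM,
is rejected by the same filter the consumer registers.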

Pattern mirrors wait_for_paid_invoices (try/except per event, never lets
the loop die). Uses the same nostr_client.relay_manager singleton that
cassette_transport.publish_to_atm uses, just on the subscribe side.
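
Below is a minimal sketch of that per-event isolation. It assumes a
queue-fed source, like the one LNbits invoice listeners consume; the
consumer in this commit polls a list instead. The names drain_forever and
its stop event are hypothetical, for illustration only. The point is that
the try/except wraps each handler call, not the loop.

```python
import asyncio
import logging

logger = logging.getLogger("cassette-consumer-sketch")


async def drain_forever(queue: asyncio.Queue, handle, stop: asyncio.Event) -> int:
    """Feed queued events to `handle` until `stop` is set.

    A handler exception is logged and the event skipped; it never ends
    the loop. Returns how many events were handled successfully.
    """
    ok = 0
    while not stop.is_set():
        try:
            event = await asyncio.wait_for(queue.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue  # nothing queued this tick; re-check `stop`
        try:
            await handle(event)
            ok += 1
        except Exception as exc:  # per-event isolation: log + skip
            logger.warning("handler failed (skipping): %s", exc)
    return ok
```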

Implementation: poll the singleton NostrRouter.received_subscription_events
dict keyed by our subscription_id (satmachineadmin-cassette-bootstrap).
This is the same drain pattern nostrclient's per-WebSocket NostrRouter
uses. Because our sub_id is distinct, our events never mix with those
delivered to nostrclient's WebSocket-connected clients.
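
The drain step can be sketched with a hypothetical FakeRouter. It keeps
one inbound list per subscription id at class level, which is the shape
the commit describes for NostrRouter.received_subscription_events; it is
not nostrclient's code. Draining by our own id leaves other
subscriptions' lists untouched.

```python
class FakeRouter:
    """Hypothetical stand-in: one inbound list per subscription id,
    shared at class level (not nostrclient's implementation)."""

    received_subscription_events: dict[str, list] = {}


def drain(sub_id: str) -> list:
    """Pop every pending event for `sub_id`, oldest first."""
    inbound = FakeRouter.received_subscription_events.get(sub_id)
    drained = []
    while inbound:  # None (never registered) and [] both stop the loop
        drained.append(inbound.pop(0))
    return drained
```

A collections.deque with popleft() would make each pop O(1). The sketch
keeps a list to match the described shape.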

Filter is re-derived from active machines each tick — newly-added
machines start receiving bootstrap events without an LNbits restart.
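
Re-deriving the filter every tick costs a relay round-trip only when the
machine set actually changes. Below is a minimal sketch of that change
detection, assuming an order-insensitive key built by sorting and joining
the d-tags. SubscriptionTracker and its register/close callbacks are
hypothetical stand-ins for the relay manager calls.

```python
def filter_key(d_tags: list[str]) -> str:
    """Order-insensitive fingerprint of the subscription's d-tag set."""
    return ",".join(sorted(d_tags))


class SubscriptionTracker:
    """Re-registers the subscription only when the d-tag set changes."""

    def __init__(self, register, close):
        self._register = register  # hypothetical: sends the new filter
        self._close = close        # hypothetical: closes the subscription
        self.current_key = None

    def tick(self, d_tags: list[str]) -> None:
        key = filter_key(d_tags)
        if key == self.current_key:
            return  # same machine set as last tick: nothing to send
        if d_tags:
            self._register([{"kinds": [30078], "#d": sorted(d_tags)}])
        else:
            self._close()
        self.current_key = key
```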

Soft-fail surfaces (none crash the listener):
  - nostrclient extension not installed → log + 30s backoff
  - inbound event sig-verify fails → log + skip
  - sender pubkey not in dca_machines → log + skip (relay noise)
  - operator privkey not on file → log + skip
  - NIP-44 v2 decrypt / payload validation fails → log + skip
  - apply_bootstrap_state error → log + skip
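
The sig-verify surface rests on NIP-01's event id. The id is the SHA-256
of the compact JSON array [0, pubkey, created_at, kind, tags, content],
and the signature is a BIP-340 Schnorr signature over that id. The sketch
below covers only the id half, using the standard library;
compute_event_id and id_matches are hypothetical names. The Schnorr check
needs a secp256k1 library and is left out. json.dumps with compact
separators and ensure_ascii=False matches NIP-01's serialization for
ordinary content, though control-character escaping may differ in edge
cases.

```python
import hashlib
import json


def compute_event_id(event: dict) -> str:
    """NIP-01 event id: sha256 of the compact serialized event array."""
    serialized = json.dumps(
        [0, event["pubkey"], event["created_at"], event["kind"],
         event["tags"], event["content"]],
        separators=(",", ":"),
        ensure_ascii=False,
    )
    return hashlib.sha256(serialized.encode("utf-8")).hexdigest()


def id_matches(event: dict) -> bool:
    """First half of verification: the claimed id must match the body.
    A missing field counts as a mismatch rather than an exception."""
    try:
        return event.get("id") == compute_event_id(event)
    except (KeyError, TypeError):
        return False
```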

Per-event handler routes to the right operator's privkey by looking up
the machine via get_machine_by_atm_pubkey_hex (O(N) over active
machines — fine for small fleets; if fleets grow, normalize machine_npub
at write + add an index).
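
If fleets grow, the suggested fix (normalize at write, then index) could
look like the SQLite sketch below. The table machines_sketch, its columns,
and both helpers are hypothetical; this is not the dca_machines schema.
Lowercasing once on insert turns the per-event O(N) scan into an indexed
equality lookup.

```python
import sqlite3
from typing import Optional

# Hypothetical schema, not the real dca_machines table.
SCHEMA = """
CREATE TABLE machines_sketch (
    id TEXT PRIMARY KEY,
    operator_user_id TEXT NOT NULL,
    atm_pubkey_hex TEXT NOT NULL  -- canonical lowercase hex, set at write
);
CREATE UNIQUE INDEX idx_machines_sketch_pubkey
    ON machines_sketch (atm_pubkey_hex);
"""


def insert_machine(db: sqlite3.Connection, machine_id: str,
                   operator_user_id: str, pubkey_hex: str) -> None:
    # Normalize once at write time so reads can use plain equality.
    db.execute(
        "INSERT INTO machines_sketch (id, operator_user_id, atm_pubkey_hex) "
        "VALUES (?, ?, ?)",
        (machine_id, operator_user_id, pubkey_hex.lower()),
    )


def get_machine_by_pubkey(db: sqlite3.Connection,
                          pubkey_hex: str) -> Optional[tuple]:
    # Indexed lookup instead of scanning every active machine in Python.
    return db.execute(
        "SELECT id, operator_user_id FROM machines_sketch "
        "WHERE atm_pubkey_hex = ?",
        (pubkey_hex.lower(),),
    ).fetchone()
```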

CRUD additions:
  - list_all_active_machines: cross-operator query for the subscription
    filter
  - get_machine_by_atm_pubkey_hex: route inbound events to the right
    machine row + operator account; accepts hex or bech32 storage
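
Accepting both storage forms means canonicalizing npub strings to hex.
NIP-19 npubs are BIP-173 bech32 strings with hrp "npub" that carry the
32-byte pubkey. The sketch below is a self-contained encoder/decoder
written from BIP-173. npub_encode and to_pubkey_hex are hypothetical
names, not the helpers in cassette_transport or crud. A production build
would more likely reuse an existing bech32 implementation.

```python
# BIP-173 bech32 alphabet and checksum generator constants.
CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"
_GEN = (0x3B6A57B2, 0x26508E6D, 0x1EA119FA, 0x3D4233DD, 0x2A1462B3)
_HEX = set("0123456789abcdef")


def _polymod(values: list[int]) -> int:
    chk = 1
    for v in values:
        top = chk >> 25
        chk = ((chk & 0x1FFFFFF) << 5) ^ v
        for i in range(5):
            if (top >> i) & 1:
                chk ^= _GEN[i]
    return chk


def _hrp_expand(hrp: str) -> list[int]:
    return [ord(c) >> 5 for c in hrp] + [0] + [ord(c) & 31 for c in hrp]


def _convertbits(data, frombits: int, tobits: int, pad: bool) -> list[int]:
    """Regroup a bit stream, e.g. 8-bit bytes into 5-bit bech32 words."""
    acc, bits, out = 0, 0, []
    maxv = (1 << tobits) - 1
    max_acc = (1 << (frombits + tobits - 1)) - 1
    for value in data:
        acc = ((acc << frombits) | value) & max_acc
        bits += frombits
        while bits >= tobits:
            bits -= tobits
            out.append((acc >> bits) & maxv)
    if pad:
        if bits:
            out.append((acc << (tobits - bits)) & maxv)
    elif bits >= frombits or (acc << (tobits - bits)) & maxv:
        raise ValueError("invalid bech32 padding")
    return out


def npub_encode(pubkey_hex: str) -> str:
    data = _convertbits(bytes.fromhex(pubkey_hex), 8, 5, pad=True)
    pm = _polymod(_hrp_expand("npub") + data + [0] * 6) ^ 1
    checksum = [(pm >> 5 * (5 - i)) & 31 for i in range(6)]
    return "npub1" + "".join(CHARSET[d] for d in data + checksum)


def to_pubkey_hex(value: str) -> str:
    """Canonicalize a stored pubkey (64-char hex or npub) to lowercase hex."""
    raw = value.strip()
    v = raw.lower()
    if len(v) == 64 and set(v) <= _HEX:
        return v
    # BIP-173 rejects mixed-case strings.
    if not v.startswith("npub1") or raw not in (v, v.upper()):
        raise ValueError("expected 64-char hex or single-case npub")
    data = [CHARSET.find(c) for c in v[5:]]
    if len(data) < 6 or -1 in data:
        raise ValueError("invalid bech32 character")
    if _polymod(_hrp_expand("npub") + data) != 1:
        raise ValueError("bad bech32 checksum")
    key = bytes(_convertbits(data[:-6], 5, 8, pad=False))
    if len(key) != 32:
        raise ValueError("npub payload is not 32 bytes")
    return key.hex()
```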

14 tests in test_cassette_state_consumer.py covering:
  - decrypt_and_parse_state_event happy path + 6 negative paths (tamper,
    wrong privkey, malformed pubkey, missing fields, garbage JSON,
    wrong-shape payload)
  - d-tag construction regression guard (REGRESSION GUARD: d-tag uses
    ATM hex pubkey not internal UUID — pins the load-bearing detail
    from coord-log 11:50Z)
  - build_state_d_tags_for_machines + bech32 → hex canonicalisation
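
The regression guard pins one detail: the d-tag is built from the ATM's
hex pubkey, never the machine's internal UUID. Below is a minimal sketch
of such a builder, assuming rows that already carry a canonical hex
pubkey. MachineRow and build_state_d_tags are hypothetical; the real
function is build_state_d_tags_for_machines. Returning a sorted,
de-duplicated list also yields a stable filter key.

```python
from dataclasses import dataclass

STATE_D_TAG_PREFIX = "bitspire-cassettes-state:"


@dataclass
class MachineRow:
    """Hypothetical row shape: only the two fields that matter here."""

    id: str              # internal UUID: must never appear in the d-tag
    atm_pubkey_hex: str  # ATM's Nostr pubkey, assumed canonical hex


def build_state_d_tags(machines: list[MachineRow]) -> list[str]:
    """One d-tag per distinct ATM pubkey, sorted for a stable filter key."""
    return sorted({STATE_D_TAG_PREFIX + m.atm_pubkey_hex for m in machines})
```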

Full handler dispatch (verify_event → get_machine_by_atm_pubkey_hex →
apply_bootstrap_state) needs a live LNbits DB; smoke-tested manually
per the existing project convention.
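
_handle_cassette_state_event already receives get_machine_by_atm_pubkey_hex
and apply_bootstrap_state as parameters. Injecting the remaining
collaborators as well would let the whole chain run against in-memory
fakes. Below is a minimal sketch under that assumption. The function
dispatch_state_event is a hypothetical, simplified mirror of the handler's
step order: signature check, sender lookup, privkey lookup, decrypt,
apply. It returns an outcome label for assertions.

```python
import asyncio
from datetime import datetime, timezone


async def dispatch_state_event(
    event: dict,
    *,
    verify,
    lookup_machine,
    lookup_privkey,
    decrypt,
    apply_state,
) -> str:
    """Hypothetical mirror of the handler; every dependency is injected."""
    if not verify(event):
        return "bad-sig"
    machine = await lookup_machine(event.get("pubkey", ""))
    if machine is None:
        return "unknown-sender"
    privkey = await lookup_privkey(machine["operator_user_id"])
    if not privkey:
        return "no-privkey"
    payload = decrypt(event, privkey)  # may raise; the caller logs + skips
    created_at = datetime.fromtimestamp(
        int(event.get("created_at", 0)), tz=timezone.utc
    )
    applied = await apply_state(machine["id"], event["id"], created_at, payload)
    return "applied" if applied else "replay"
```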

Total: 146 passed, 1 skipped (cross-test fixture pending), 1 pre-existing
async-plugin failure unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Padreug 2026-05-30 18:19:15 +02:00
commit e57a73083e
4 changed files with 535 additions and 1 deletion

tasks.py (224 lines added)

@@ -25,6 +25,7 @@
# sat-amount invariants (range/sum).
import asyncio
from typing import Optional
from lnbits.core.models import Payment
from lnbits.tasks import register_invoice_listener
@@ -237,3 +238,226 @@ async def _record_rejected(
f"(machine={machine.machine_npub[:12]}..., "
f"payment_hash={payment.payment_hash[:12]}...): {exc}"
)


# =============================================================================
# Cassette bootstrap consumer (#29 v1)
# =============================================================================
# Subscribes to kind-30078 bitspire-cassettes-state:<atm_pubkey_hex> events
# published by each active machine's ATM on first boot (lamassu-next#56's
# bootstrap publish path). Decrypts the NIP-44 v2 content with the operator's
# privkey + ATM sender pubkey, validates as PublishCassettesPayload, and
# upserts cassette_configs via apply_bootstrap_state.
#
# v1 = one-shot per machine (the ATM's meta.bootstrapPublishedAt makes the
# publish idempotent across ATM-side restarts; satmachineadmin's
# apply_bootstrap_state dedups on state_event_id for relay re-delivery).
#
# v2 (separate issue) = continuous reverse-channel consumer with a
# last_state_created_at watermark for reconciliation UI.
#
# Implementation: polls the class-level dict
# nostrclient.router.NostrRouter.received_subscription_events, keyed by our
# subscription_id. nostrclient's NostrRouter design is per-WebSocket-client;
# the singleton dict it drains into is the only server-side hook to consume
# events without standing up an in-process websocket. The relay manager is
# the same singleton publish_to_atm uses, so add_subscription registers a
# filter against the same relay pool.

CASSETTE_BOOTSTRAP_SUB_ID = "satmachineadmin-cassette-bootstrap"
_CASSETTE_POLL_INTERVAL_S = 2.0
_CASSETTE_BACKOFF_S = 30.0  # when nostrclient isn't installed yet


async def wait_for_cassette_state_events() -> None:
    """Long-running task: subscribe to bitspire-cassettes-state events from
    every active machine's ATM and upsert cassette_configs on receipt.

    Pattern mirrors wait_for_paid_invoices (try/except wraps each event,
    never lets the loop die). Re-derives the subscription filter on each
    tick from the current active-machines list, so newly-added machines
    start receiving bootstrap events without an LNbits restart.

    Soft-fail surfaces:
      - nostrclient not installed → log + sleep _CASSETTE_BACKOFF_S
        between retries (operator may install it later)
      - inbound event fails sig-verify / decrypt / parse → log + skip
        the event, continue the loop
      - apply_bootstrap_state errors → log + skip
    """
    logger.info(
        "satmachineadmin v2: cassette bootstrap consumer starting "
        f"(sub_id={CASSETTE_BOOTSTRAP_SUB_ID})"
    )
    current_filter_key: Optional[str] = None
    while True:
        try:
            current_filter_key = await _cassette_consumer_tick(current_filter_key)
            await asyncio.sleep(_CASSETTE_POLL_INTERVAL_S)
        except _NostrclientUnavailable:
            logger.warning(
                "satmachineadmin: nostrclient extension not installed; "
                f"cassette bootstrap consumer sleeping {_CASSETTE_BACKOFF_S}s "
                "before retry. Install + activate nostrclient on this "
                "LNbits instance."
            )
            current_filter_key = None
            await asyncio.sleep(_CASSETTE_BACKOFF_S)
        except Exception as exc:  # listener must never die
            logger.error(
                "satmachineadmin: cassette consumer loop error (continuing): "
                f"{exc}"
            )
            await asyncio.sleep(_CASSETTE_POLL_INTERVAL_S)


class _NostrclientUnavailable(Exception):
    """Internal sentinel: nostrclient extension import failed. Caller
    sleeps a backoff then retries; the operator may install nostrclient
    at any time."""


async def _cassette_consumer_tick(current_filter_key: Optional[str]) -> str:
    """Single iteration of the bootstrap-consumer loop. Returns the filter
    key used this tick so the caller can detect filter-set changes.

    Raises _NostrclientUnavailable if nostrclient can't be imported (the
    outer loop backs off + retries).
    """
    try:
        from nostrclient.router import (  # type: ignore[import-not-found]
            NostrRouter,
            nostr_client,
        )
    except ImportError as exc:
        raise _NostrclientUnavailable() from exc

    from .cassette_transport import build_state_d_tags_for_machines
    from .crud import (
        apply_bootstrap_state,
        get_machine_by_atm_pubkey_hex,
        list_all_active_machines,
    )

    machines = await list_all_active_machines()
    d_tags = build_state_d_tags_for_machines(machines)
    filter_key = ",".join(sorted(d_tags))

    if filter_key != current_filter_key:
        if d_tags:
            filters = [{"kinds": [30078], "#d": d_tags}]
            nostr_client.relay_manager.add_subscription(
                CASSETTE_BOOTSTRAP_SUB_ID, filters
            )
            logger.info(
                "satmachineadmin: (re)registered cassette bootstrap "
                f"subscription with {len(d_tags)} d-tag(s)"
            )
        else:
            nostr_client.relay_manager.close_subscription(
                CASSETTE_BOOTSTRAP_SUB_ID
            )
            logger.info(
                "satmachineadmin: no active machines; closed cassette "
                "bootstrap subscription"
            )

    inbound = NostrRouter.received_subscription_events.get(
        CASSETTE_BOOTSTRAP_SUB_ID
    )
    # Drain everything currently queued for our sub_id (None or [] = no-op).
    while inbound:
        event_message = inbound.pop(0)
        try:
            await _handle_cassette_state_event(
                event_message,
                get_machine_by_atm_pubkey_hex,
                apply_bootstrap_state,
            )
        except Exception as exc:  # noqa: BLE001
            logger.warning(
                "satmachineadmin: cassette state event handler "
                f"failed (skipping): {exc}"
            )

    return filter_key


async def _handle_cassette_state_event(
    event_message,
    get_machine_by_atm_pubkey_hex,
    apply_bootstrap_state,
) -> None:
    """Verify signature, route to the right operator's privkey, decrypt,
    parse, upsert. Rejections are logged at WARNING rather than ERROR:
    this is data from a public relay, so garbage is expected and does not
    indicate a server fault. Exceptions (e.g. from decrypt/parse)
    propagate to the caller, which logs them at WARNING and skips the
    event."""
    import json as _json
    from datetime import datetime as _datetime
    from datetime import timezone as _timezone

    from lnbits.core.crud.users import get_account
    from lnbits.utils.nostr import verify_event

    from .cassette_transport import decrypt_and_parse_state_event

    event_raw = event_message.event
    if isinstance(event_raw, str):
        event_obj = _json.loads(event_raw)
    elif isinstance(event_raw, dict):
        event_obj = event_raw
    else:
        logger.warning(
            "satmachineadmin: cassette event of unexpected type "
            f"{type(event_raw).__name__}; skipping"
        )
        return

    if not verify_event(event_obj):
        logger.warning(
            "satmachineadmin: cassette state event sig verify failed "
            f"(id={event_obj.get('id', '?')[:12]}...)"
        )
        return

    sender_pubkey = event_obj.get("pubkey", "")
    machine = await get_machine_by_atm_pubkey_hex(sender_pubkey)
    if machine is None:
        # Unknown sender: could be relay noise or an attacker. Don't
        # treat as our problem.
        logger.warning(
            "satmachineadmin: cassette state event from unknown ATM "
            f"pubkey {sender_pubkey[:12]}... (not in dca_machines); "
            "skipping"
        )
        return

    account = await get_account(machine.operator_user_id)
    if account is None or not account.prvkey:
        logger.warning(
            f"satmachineadmin: operator {machine.operator_user_id[:8]}... "
            "has no privkey on file; can't decrypt cassette state event for "
            f"machine {machine.id}. Onboard via Nostr-login."
        )
        return

    payload = decrypt_and_parse_state_event(event_obj, account.prvkey)
    event_id = event_obj.get("id", "")
    created_at_unix = event_obj.get("created_at", 0)
    event_created_at = _datetime.fromtimestamp(
        int(created_at_unix), tz=_timezone.utc
    )

    applied = await apply_bootstrap_state(
        machine.id, event_id, event_created_at, payload
    )
    if applied:
        logger.info(
            f"satmachineadmin: applied bootstrap state event {event_id[:12]}... "
            f"to machine {machine.id} ({len(payload.denominations)} cassettes)"
        )
    else:
        # Replay: event_id already on file. Normal on relay reconnect.
        logger.debug(
            f"satmachineadmin: cassette state event {event_id[:12]}... "
            f"already applied to machine {machine.id} (replay no-op)"
        )
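
The replay branch above depends on apply_bootstrap_state returning False
for an event id it has already stored. Below is a minimal SQLite sketch
of that contract, assuming a hypothetical one-row-per-machine table.
cassette_state_sketch, apply_state and event_time are illustrative names;
this is not the real cassette_configs schema. A repeated event id is a
no-op, while a new event id for the same machine overwrites the stored
state. event_time performs the same created_at conversion as the handler.

```python
import sqlite3
from datetime import datetime, timezone

# Hypothetical schema: one stored bootstrap state per machine.
SCHEMA = """
CREATE TABLE cassette_state_sketch (
    machine_id TEXT PRIMARY KEY,
    state_event_id TEXT NOT NULL,
    state_created_at TEXT NOT NULL,
    denominations TEXT NOT NULL
);
"""


def event_time(created_at_unix: int) -> datetime:
    """Nostr created_at is Unix seconds; keep it as an aware UTC datetime."""
    return datetime.fromtimestamp(int(created_at_unix), tz=timezone.utc)


def apply_state(db: sqlite3.Connection, machine_id: str, event_id: str,
                created_at: datetime, denominations: list[int]) -> bool:
    """Upsert one machine's state; False (no write) if event_id is on file."""
    row = db.execute(
        "SELECT state_event_id FROM cassette_state_sketch WHERE machine_id = ?",
        (machine_id,),
    ).fetchone()
    if row is not None and row[0] == event_id:
        return False  # relay re-delivery: nothing to do
    db.execute(
        "INSERT INTO cassette_state_sketch VALUES (?, ?, ?, ?) "
        "ON CONFLICT(machine_id) DO UPDATE SET "
        "state_event_id = excluded.state_event_id, "
        "state_created_at = excluded.state_created_at, "
        "denominations = excluded.denominations",
        (machine_id, event_id, created_at.isoformat(),
         ",".join(map(str, denominations))),
    )
    db.commit()
    return True
```

The read-then-write is safe on a single connection. With concurrent
writers, a conditional upsert or a unique constraint on the event id
would close the race.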