diff --git a/__init__.py b/__init__.py index 162f3dc..2d0ebcf 100644 --- a/__init__.py +++ b/__init__.py @@ -5,7 +5,7 @@ from lnbits.tasks import create_permanent_unique_task from loguru import logger from .crud import db -from .tasks import wait_for_paid_invoices +from .tasks import wait_for_cassette_state_events, wait_for_paid_invoices from .views import satmachineadmin_generic_router from .views_api import satmachineadmin_api_router @@ -42,6 +42,14 @@ def satmachineadmin_start(): "ext_satmachineadmin", wait_for_paid_invoices ) scheduled_tasks.append(invoice_task) + # Cassette bootstrap consumer (#29 v1) — subscribes to + # bitspire-cassettes-state events from each active ATM and upserts + # cassette_configs on receipt. Soft-fails if nostrclient isn't + # installed (logs + backs off, never crashes). + cassette_task = create_permanent_unique_task( + "ext_satmachineadmin_cassette_bootstrap", wait_for_cassette_state_events + ) + scheduled_tasks.append(cassette_task) __all__ = [ diff --git a/crud.py b/crud.py index fc87070..9da14c8 100644 --- a/crud.py +++ b/crud.py @@ -144,6 +144,45 @@ async def get_machines_for_operator(operator_user_id: str) -> List[Machine]: ) +async def list_all_active_machines() -> List[Machine]: + """Used by the cassette bootstrap consumer task to build a single + cross-operator subscription filter. Each event's pubkey routes to + the right operator via get_machine_by_atm_pubkey_hex + the machine's + operator_user_id. + """ + return await db.fetchall( + """ + SELECT * FROM satoshimachine.dca_machines + WHERE is_active = true + ORDER BY created_at DESC + """, + {}, + Machine, + ) + + +async def get_machine_by_atm_pubkey_hex(atm_pubkey_hex: str) -> Optional[Machine]: + """Look up an active machine by its ATM pubkey, accepting hex or bech32 + in machine_npub. Used by the cassette bootstrap consumer to route an + incoming state event to the right machine row (and therefore operator + privkey for decryption). + + O(N) over active machines — fine for small fleets. If fleet sizes + grow, normalise machine_npub-at-write to hex and add an index. + """ + from lnbits.utils.nostr import normalize_public_key + + target = atm_pubkey_hex.lower() + machines = await list_all_active_machines() + for m in machines: + try: + if normalize_public_key(m.machine_npub).lower() == target: + return m + except (ValueError, AssertionError): + continue + return None + + async def update_machine(machine_id: str, data: UpdateMachineData) -> Optional[Machine]: update_data = {k: v for k, v in data.dict().items() if v is not None} if not update_data: diff --git a/tasks.py b/tasks.py index 7d77f0e..8be382f 100644 --- a/tasks.py +++ b/tasks.py @@ -25,6 +25,7 @@ # sat-amount invariants (range/sum). import asyncio +from typing import Optional from lnbits.core.models import Payment from lnbits.tasks import register_invoice_listener @@ -237,3 +238,226 @@ async def _record_rejected( f"(machine={machine.machine_npub[:12]}..., " f"payment_hash={payment.payment_hash[:12]}...): {exc}" ) + + +# ============================================================================= +# Cassette bootstrap consumer (#29 v1) +# ============================================================================= +# Subscribes to kind-30078 bitspire-cassettes-state: events +# published by each active machine's ATM on first boot (lamassu-next#56's +# bootstrap publish path). Decrypts the NIP-44 v2 content with the operator's +# privkey + ATM sender pubkey, validates as PublishCassettesPayload, and +# upserts cassette_configs via apply_bootstrap_state. +# +# v1 = one-shot per machine (ATM's meta.bootstrapPublishedAt makes the +# publish idempotent on ATM-side restart; satmachineadmin's apply_bootstrap_ +# state dedups on state_event_id for relay re-delivery). +# +# v2 (separate issue) = continuous reverse-channel consumer with a +# last_state_created_at watermark for reconciliation UI. +# +# Implementation: polls nostrclient.router.NostrRouter.received_subscription_ +# events keyed by our subscription_id. nostrclient's NostrRouter design is +# per-WebSocket-client; the singleton dict it drains into is the only +# server-side hook to consume events without standing up an in-process +# websocket. The relay manager is the same singleton publish_to_atm uses, +# so add_subscription registers a filter against the same relay pool. + +CASSETTE_BOOTSTRAP_SUB_ID = "satmachineadmin-cassette-bootstrap" +_CASSETTE_POLL_INTERVAL_S = 2.0 +_CASSETTE_BACKOFF_S = 30.0 # when nostrclient isn't installed yet + + +async def wait_for_cassette_state_events() -> None: + """Long-running task: subscribe to bitspire-cassettes-state events from + every active machine's ATM and upsert cassette_configs on receipt. + + Pattern mirrors wait_for_paid_invoices (try/except wraps each event, + never lets the loop die). Re-derives the subscription filter on each + tick from the current active-machines list — newly-added machines + start receiving bootstrap events without an LNbits restart. + + Soft-fail surfaces: + - nostrclient not installed → log + sleep _CASSETTE_BACKOFF_S + between retries (operator may install it later) + - inbound event fails sig-verify / decrypt / parse → log + skip + the event, continue the loop + - apply_bootstrap_state errors → log + skip + """ + logger.info( + "satmachineadmin v2: cassette bootstrap consumer starting " + f"(sub_id={CASSETTE_BOOTSTRAP_SUB_ID})" + ) + current_filter_key: Optional[str] = None + while True: + try: + current_filter_key = await _cassette_consumer_tick(current_filter_key) + await asyncio.sleep(_CASSETTE_POLL_INTERVAL_S) + except _NostrclientUnavailable: + logger.warning( + "satmachineadmin: nostrclient extension not installed; " + f"cassette bootstrap consumer sleeping {_CASSETTE_BACKOFF_S}s " + "before retry. Install + activate nostrclient on this " + "LNbits instance." + ) + current_filter_key = None + await asyncio.sleep(_CASSETTE_BACKOFF_S) + except Exception as exc: # listener must never die + logger.error( + f"satmachineadmin: cassette consumer loop error (continuing): " + f"{exc}" + ) + await asyncio.sleep(_CASSETTE_POLL_INTERVAL_S) + + +class _NostrclientUnavailable(Exception): + """Internal sentinel — nostrclient extension import failed. Caller + sleeps a backoff then retries; the operator may install nostrclient + at any time.""" + + +async def _cassette_consumer_tick(current_filter_key: Optional[str]) -> str: + """Single iteration of the bootstrap-consumer loop. Returns the filter + key used this tick so the caller can detect filter-set changes. + + Raises _NostrclientUnavailable if nostrclient can't be imported (the + outer loop backs off + retries). + """ + try: + from nostrclient.router import ( # type: ignore[import-not-found] + NostrRouter, + nostr_client, + ) + except ImportError as exc: + raise _NostrclientUnavailable() from exc + + from .cassette_transport import build_state_d_tags_for_machines + from .crud import ( + apply_bootstrap_state, + get_machine_by_atm_pubkey_hex, + list_all_active_machines, + ) + + machines = await list_all_active_machines() + d_tags = build_state_d_tags_for_machines(machines) + filter_key = ",".join(sorted(d_tags)) + + if filter_key != current_filter_key: + if d_tags: + filters = [{"kinds": [30078], "#d": d_tags}] + nostr_client.relay_manager.add_subscription( + CASSETTE_BOOTSTRAP_SUB_ID, filters + ) + logger.info( + "satmachineadmin: (re)registered cassette bootstrap " + f"subscription with {len(d_tags)} d-tag(s)" + ) + else: + nostr_client.relay_manager.close_subscription( + CASSETTE_BOOTSTRAP_SUB_ID + ) + logger.info( + "satmachineadmin: no active machines; closed cassette " + "bootstrap subscription" + ) + + inbound = NostrRouter.received_subscription_events.get( + CASSETTE_BOOTSTRAP_SUB_ID + ) + if inbound: + while inbound: + event_message = inbound.pop(0) + try: + await _handle_cassette_state_event( + event_message, get_machine_by_atm_pubkey_hex, + apply_bootstrap_state, + ) + except Exception as exc: # noqa: BLE001 — log + skip + logger.warning( + f"satmachineadmin: cassette state event handler " + f"failed (skipping): {exc}" + ) + + return filter_key + + +async def _handle_cassette_state_event( + event_message, + get_machine_by_atm_pubkey_hex, + apply_bootstrap_state, +) -> None: + """Verify signature, route to the right operator's privkey, decrypt, + parse, upsert. Each step that fails is logged at WARNING (not ERROR) + so a noisy attacker can't fill the logs — this is data on a public + relay, garbage is expected.""" + import json as _json + from datetime import datetime as _datetime + from datetime import timezone as _timezone + + from lnbits.core.crud.users import get_account + from lnbits.utils.nostr import verify_event + + from .cassette_transport import decrypt_and_parse_state_event + + event_raw = event_message.event + if isinstance(event_raw, str): + event_obj = _json.loads(event_raw) + elif isinstance(event_raw, dict): + event_obj = event_raw + else: + logger.warning( + f"satmachineadmin: cassette event of unexpected type " + f"{type(event_raw).__name__}; skipping" + ) + return + + if not verify_event(event_obj): + logger.warning( + f"satmachineadmin: cassette state event sig verify failed " + f"(id={event_obj.get('id', '?')[:12]}...)" + ) + return + + sender_pubkey = event_obj.get("pubkey", "") + machine = await get_machine_by_atm_pubkey_hex(sender_pubkey) + if machine is None: + # Unknown sender — could be relay noise or an attacker. Don't + # treat as our problem. + logger.warning( + f"satmachineadmin: cassette state event from unknown ATM " + f"pubkey {sender_pubkey[:12]}... (not in dca_machines); " + "skipping" + ) + return + + account = await get_account(machine.operator_user_id) + if account is None or not account.prvkey: + logger.warning( + f"satmachineadmin: operator {machine.operator_user_id[:8]}... " + "has no privkey on file; can't decrypt cassette state event for " + f"machine {machine.id}. Onboard via Nostr-login." + ) + return + + payload = decrypt_and_parse_state_event(event_obj, account.prvkey) + + event_id = event_obj.get("id", "") + created_at_unix = event_obj.get("created_at", 0) + event_created_at = _datetime.fromtimestamp( + int(created_at_unix), tz=_timezone.utc + ) + + applied = await apply_bootstrap_state( + machine.id, event_id, event_created_at, payload + ) + if applied: + logger.info( + f"satmachineadmin: applied bootstrap state event {event_id[:12]}... " + f"to machine {machine.id} ({len(payload.denominations)} cassettes)" + ) + else: + # Replay: event_id already on file. Normal on relay reconnect. + logger.debug( + f"satmachineadmin: cassette state event {event_id[:12]}... " + f"already applied to machine {machine.id} (replay no-op)" + ) diff --git a/tests/test_cassette_state_consumer.py b/tests/test_cassette_state_consumer.py new file mode 100644 index 0000000..4cd228e --- /dev/null +++ b/tests/test_cassette_state_consumer.py @@ -0,0 +1,263 @@ +""" +Tests for the cassette bootstrap consumer (`tasks._handle_cassette_state_event` +and `cassette_transport.decrypt_and_parse_state_event`). + +Covers the consumer-side validation path end-to-end without standing up +the full nostrclient relay subscription: + - happy path: signed event from a known ATM → decrypt → parse → returns + a PublishCassettesPayload + - sig-verify failure path (covered at the transport-decrypt level + the + handler-level rejection test) + - tampered ciphertext → CassetteEventDecodeError + - unknown sender pubkey → CassetteEventDecodeError (well-formed but + decrypt fails because conversation key is wrong) + - malformed pubkey → CassetteEventDecodeError + +Full handler tests (the dispatch through verify_event → get_machine_by_atm_ +pubkey_hex → apply_bootstrap_state) need a live LNbits DB; they're +smoke-tested manually via the dev container per the project's existing +convention (see test_deposit_currency.py). +""" + +import json + +import coincurve +import pytest + +from ..cassette_transport import ( + CassetteEventDecodeError, + _atm_hex_pubkey, + _config_d_tag, + _state_d_tag, + build_state_d_tags_for_machines, + decrypt_and_parse_state_event, +) +from ..models import Machine, PublishCassettesPayload +from ..nip44 import encrypt_with_conversation_key, get_conversation_key + + +# Canonical keys (integer 1 + integer 2, the paulmillr/nip44 reference pair). +_OP_SEC = "00" * 31 + "01" +_ATM_SEC = "00" * 31 + "02" + + +def _pub_hex(sec_hex: str) -> str: + return ( + coincurve.PrivateKey(bytes.fromhex(sec_hex)) + .public_key.format(compressed=True)[1:] + .hex() + ) + + +_OP_PUB = _pub_hex(_OP_SEC) +_ATM_PUB = _pub_hex(_ATM_SEC) + + +def _make_state_event( + payload: PublishCassettesPayload, + *, + atm_sec: str = _ATM_SEC, + op_pub: str = _OP_PUB, + atm_pub: str = _ATM_PUB, + event_id: str = "fake-event-id", + created_at: int = 1234567890, +) -> dict: + """Build a state event the way bitspire's ATM publisher would. + Skips the actual sig-verify step (the handler-level test does + that against verify_event); the transport-level decrypt path + doesn't care about sig validity, only about the conversation key.""" + plaintext = json.dumps(payload.to_wire_dict(), separators=(",", ":")) + ck = get_conversation_key(atm_sec, op_pub) + content = encrypt_with_conversation_key(plaintext, ck) + return { + "kind": 30078, + "pubkey": atm_pub, + "content": content, + "tags": [ + ["d", f"bitspire-cassettes-state:{atm_pub}"], + ["p", op_pub], + ], + "created_at": created_at, + "id": event_id, + } + + +# ============================================================================= +# decrypt_and_parse_state_event — transport-decrypt path +# ============================================================================= + + +class TestDecryptAndParseStateEvent: + """The function the consumer task calls per inbound event. Verifies + NIP-44 v2 decrypt + JSON-parse + PublishCassettesPayload validation. + Sig verification is the caller's responsibility (the handler does it + before reaching here).""" + + def test_happy_path(self): + payload = PublishCassettesPayload( + denominations={ + "20": {"position": 1, "count": 49}, + "50": {"position": 2, "count": 100}, + } + ) + event = _make_state_event(payload) + recovered = decrypt_and_parse_state_event(event, _OP_SEC) + assert sorted(recovered.denominations.keys()) == [20, 50] + assert recovered.denominations[20].position == 1 + assert recovered.denominations[20].count == 49 + assert recovered.denominations[50].count == 100 + + def test_tampered_content_rejected(self): + payload = PublishCassettesPayload( + denominations={"20": {"position": 1, "count": 49}} + ) + event = _make_state_event(payload) + # Flip a base64 character — corrupts the ciphertext or MAC + # depending on where the flip lands. + event["content"] = event["content"][:-2] + "AA" + with pytest.raises(CassetteEventDecodeError): + decrypt_and_parse_state_event(event, _OP_SEC) + + def test_wrong_operator_privkey_rejected(self): + """The conversation key derives from operator-privkey + sender-pubkey. + A wrong privkey gives a different conversation key, which yields a + different hmac_key, so MAC verification inside NIP-44 v2 decrypt + fails — surfaced as CassetteEventDecodeError.""" + payload = PublishCassettesPayload( + denominations={"20": {"position": 1, "count": 49}} + ) + event = _make_state_event(payload) + wrong_sec = "00" * 31 + "03" + with pytest.raises(CassetteEventDecodeError): + decrypt_and_parse_state_event(event, wrong_sec) + + def test_malformed_sender_pubkey_rejected(self): + payload = PublishCassettesPayload( + denominations={"20": {"position": 1, "count": 49}} + ) + event = _make_state_event(payload) + event["pubkey"] = "not-a-real-pubkey" + with pytest.raises(CassetteEventDecodeError): + decrypt_and_parse_state_event(event, _OP_SEC) + + def test_missing_content_rejected(self): + event = _make_state_event( + PublishCassettesPayload(denominations={"20": {"position": 1, "count": 49}}) + ) + del event["content"] + with pytest.raises(CassetteEventDecodeError): + decrypt_and_parse_state_event(event, _OP_SEC) + + def test_missing_pubkey_rejected(self): + event = _make_state_event( + PublishCassettesPayload(denominations={"20": {"position": 1, "count": 49}}) + ) + del event["pubkey"] + with pytest.raises(CassetteEventDecodeError): + decrypt_and_parse_state_event(event, _OP_SEC) + + def test_decrypted_garbage_json_rejected(self): + """If the plaintext decrypts but isn't JSON, we surface an error + rather than crashing the consumer loop.""" + # Encrypt something that isn't JSON + ck = get_conversation_key(_ATM_SEC, _OP_PUB) + bad_plaintext_event = { + "kind": 30078, + "pubkey": _ATM_PUB, + "content": encrypt_with_conversation_key( + "definitely not json", ck + ), + "tags": [], + "created_at": 0, + "id": "x", + } + with pytest.raises(CassetteEventDecodeError) as exc: + decrypt_and_parse_state_event(bad_plaintext_event, _OP_SEC) + assert "JSON" in str(exc.value) or "didn't validate" in str(exc.value) + + def test_decrypted_json_with_wrong_shape_rejected(self): + """Well-formed JSON but missing the 'denominations' field is + a payload-shape failure, not a decrypt failure.""" + ck = get_conversation_key(_ATM_SEC, _OP_PUB) + bad_shape_event = { + "kind": 30078, + "pubkey": _ATM_PUB, + "content": encrypt_with_conversation_key( + '{"wrong_field": 42}', ck + ), + "tags": [], + "created_at": 0, + "id": "x", + } + with pytest.raises(CassetteEventDecodeError) as exc: + decrypt_and_parse_state_event(bad_shape_event, _OP_SEC) + assert "didn't validate" in str(exc.value) + + +# ============================================================================= +# d-tag construction — _atm_hex_pubkey, _config_d_tag, _state_d_tag, helper +# ============================================================================= + + +class TestDTagConstruction: + """The `` placeholder in d-tags = ATM hex pubkey (load-bearing per + coord-log 11:50Z). These tests pin the canonical substitution so a + refactor can't silently break wire compatibility.""" + + def _machine(self, npub: str, id_: str = "m1") -> Machine: + from datetime import datetime, timezone + + now = datetime.now(timezone.utc) + return Machine( + id=id_, + operator_user_id="op1", + machine_npub=npub, + wallet_id="w1", + name=None, + location=None, + fiat_code="EUR", + is_active=True, + created_at=now, + updated_at=now, + ) + + def test_atm_hex_pubkey_from_hex_storage(self): + assert _atm_hex_pubkey(self._machine(_ATM_PUB)) == _ATM_PUB + + def test_atm_hex_pubkey_lowercases_uppercase_hex(self): + assert _atm_hex_pubkey(self._machine(_ATM_PUB.upper())) == _ATM_PUB + + def test_atm_hex_pubkey_canonicalises_bech32_to_hex(self): + """Operator may have entered npub1... in the UI; canonical d-tag + substitution is always the hex form.""" + from lnbits.utils.nostr import hex_to_npub + + npub_bech32 = hex_to_npub(_ATM_PUB) + assert _atm_hex_pubkey(self._machine(npub_bech32)) == _ATM_PUB + + def test_config_d_tag_uses_hex_pubkey_not_id(self): + """REGRESSION GUARD: d-tag must contain the ATM hex pubkey, NOT + the internal machine UUID. If this test fails, bitspire's ATM + won't see our publishes.""" + m = self._machine(_ATM_PUB, id_="some-uuid-not-the-pubkey") + d_tag = _config_d_tag(_atm_hex_pubkey(m)) + assert d_tag == f"bitspire-cassettes:{_ATM_PUB}" + assert "some-uuid" not in d_tag + + def test_state_d_tag_uses_hex_pubkey_not_id(self): + m = self._machine(_ATM_PUB, id_="some-uuid-not-the-pubkey") + d_tag = _state_d_tag(_atm_hex_pubkey(m)) + assert d_tag == f"bitspire-cassettes-state:{_ATM_PUB}" + assert "some-uuid" not in d_tag + + def test_build_state_d_tags_for_machines(self): + atm2_pub = _pub_hex("00" * 31 + "03") + machines = [ + self._machine(_ATM_PUB, id_="m1"), + self._machine(atm2_pub, id_="m2"), + ] + tags = build_state_d_tags_for_machines(machines) + assert tags == [ + f"bitspire-cassettes-state:{_ATM_PUB}", + f"bitspire-cassettes-state:{atm2_pub}", + ]