From 13684e7134a7e7956f3c990acdb55a6e6965145d Mon Sep 17 00:00:00 2001 From: Padreug Date: Sat, 30 May 2026 18:00:11 +0200 Subject: [PATCH 1/3] feat(v2): m007 cassette_configs schema + Pydantic models (#29 v1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the operator-side schema for per-machine ATM cassette inventory (aiolabs/satmachineadmin#29). Schema choice mirrors the ATM-side denomination-as-key invariant audited at coord-log 2026-05-30T06:40Z across bitspire/atm-tui/src/db.zig:31, lamassu-next state-store.ts:54, and hal-service.ts:116/189 — every ATM layer keys on denomination, so the operator-side PK is (machine_id, denomination) to make duplicate-denomination payloads impossible at the schema boundary. Reserved nullable columns (state_count, state_at, state_event_id) hold the latest bitspire-cassettes-state: event the ATM publishes. v1 populates them on bootstrap-event receipt; v1 UI doesn't render reconciliation. v2 reconciliation UI consumes them without a migration. Pydantic models in this commit: - CassetteConfig — read model for a stored row - UpsertCassetteConfigData — operator-edit form (count and/or position) - CassettePayloadRow — one denomination's wire-format values - PublishCassettesPayload — the full kind-30078 content payload, bidirectional (operator → ATM and ATM → operator share the shape). Validates int-coerced denomination keys, positive ints, no duplicate positions, and exposes to_wire_dict() that re-stringifies keys for JSON compatibility. CRUD + transport + API + UI land in subsequent commits. Co-Authored-By: Claude Opus 4.7 (1M context) --- migrations.py | 37 +++++++++++++++ models.py | 124 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+) diff --git a/migrations.py b/migrations.py index 38b29d0..807292f 100644 --- a/migrations.py +++ b/migrations.py @@ -538,3 +538,40 @@ async def m005_lock_deposit_currency_to_machine_fiat_code(db): AND m.fiat_code != d.currency ) """) + + +async def m007_add_cassette_configs(db): + """Add cassette_configs table for operator-driven ATM cassette inventory. + + Tracks per-machine cassette state (denomination, count, position) editable + via the satmachineadmin dashboard and published to the ATM as encrypted + kind-30078 events. See aiolabs/satmachineadmin#29 + lamassu-next#56. + + Schema choice: PK (machine_id, denomination) mirrors the ATM-side + denomination-as-key invariant in + bitspire/atm-tui/src/db.zig:31 and + lamassu-next/apps/machine/electron/state-store.ts:54 + (the cassettes table PK is denomination; HAL inventory map keys on + denomination; dispense lookup is cassetteDenominations.indexOf — + duplicates collapse silently). Position is operator-assignable display + order, not the addressable unit. + + Reserved nullable columns (state_count, state_at, state_event_id) hold + the latest bitspire-cassettes-state: event the ATM + publishes (one-shot bootstrap in v1; continuous in v2). v1 UI doesn't + render them; v2 reconciliation UI consumes them without a migration. + """ + await db.execute(f""" + CREATE TABLE IF NOT EXISTS satoshimachine.cassette_configs ( + machine_id TEXT NOT NULL, + denomination INTEGER NOT NULL, + count INTEGER NOT NULL, + position INTEGER NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now}, + updated_by TEXT, + state_count INTEGER, + state_at TIMESTAMP, + state_event_id TEXT, + PRIMARY KEY (machine_id, denomination) + ); + """) diff --git a/models.py b/models.py index d683cac..5f88067 100644 --- a/models.py +++ b/models.py @@ -546,3 +546,127 @@ class SettleBalanceData(BaseModel): if v <= 0: raise ValueError("amount_fiat must be > 0 if specified") return round(float(v), 2) + + +# ============================================================================= +# Cassette configs — operator-driven ATM cassette inventory (#29). +# ============================================================================= +# Schema is denomination-keyed per the locked design (#29 body + the +# 06:40Z coord-log audit): every ATM-side layer below the wire keys on +# denomination (state-store.ts:54, hal-service.ts:116/189). The +# satmachineadmin schema mirrors this so the operator UI can't author a +# duplicate-denomination payload that the ATM would silently collapse. +# +# Position is operator-assignable display order (and used by the ATM as +# the HAL slot-index assignment), not the addressable unit. +# +# state_count / state_at / state_event_id are reserved nullable from day 1 +# for the v2 reverse-channel reconciliation consumer (bitspire-cassettes- +# state:). v1 populates them on bootstrap-event receipt +# but the UI doesn't render reconciliation. + + +class CassetteConfig(BaseModel): + machine_id: str + denomination: int + count: int + position: int + updated_at: datetime + updated_by: Optional[str] + state_count: Optional[int] + state_at: Optional[datetime] + state_event_id: Optional[str] + + +class UpsertCassetteConfigData(BaseModel): + """Operator edits a single cassette row's count or position from the + dashboard. Both fields optional; pass only those changed.""" + + count: Optional[int] = None + position: Optional[int] = None + + @validator("count") + def count_non_negative(cls, v): + if v is None: + return v + if v < 0: + raise ValueError("count must be >= 0") + return v + + @validator("position") + def position_positive(cls, v): + if v is None: + return v + if v <= 0: + raise ValueError("position must be > 0") + return v + + +class CassettePayloadRow(BaseModel): + """One denomination's payload values in the wire-format + `{"denominations": {"": {"position", "count"}}}`.""" + + position: int + count: int + + @validator("position") + def position_positive(cls, v): + if v <= 0: + raise ValueError("position must be > 0") + return v + + @validator("count") + def count_non_negative(cls, v): + if v < 0: + raise ValueError("count must be >= 0") + return v + + +class PublishCassettesPayload(BaseModel): + """The decrypted JSON content of a kind-30078 cassette event, both + directions: + - operator → ATM (d-tag `bitspire-cassettes:`) + - ATM → operator (d-tag `bitspire-cassettes-state:`) + + Wire shape: `{"denominations": {"": {"position", "count"}}}`. + JSON object keys are always strings; the validator coerces back to + int on parse. The denomination key set MUST match what the receiver + already has (no add / no remove from this payload). + """ + + denominations: dict[int, CassettePayloadRow] + + @validator("denominations", pre=True) + def coerce_string_keys_to_int(cls, v): + if not isinstance(v, dict): + raise ValueError("denominations must be a dict") + out = {} + for k, val in v.items(): + try: + key_int = int(k) + except (TypeError, ValueError) as exc: + raise ValueError( + f"denomination key {k!r} is not an int" + ) from exc + if key_int <= 0: + raise ValueError(f"denomination must be > 0 (got {key_int})") + out[key_int] = val + return out + + @validator("denominations") + def no_duplicate_positions(cls, v): + positions = [row.position for row in v.values()] + if len(set(positions)) != len(positions): + raise ValueError("duplicate position values in payload") + return v + + def to_wire_dict(self) -> dict: + """Serialise back to the wire format with string keys for JSON + object compatibility. Used by the publisher to build the kind-30078 + event content before NIP-44 v2 encryption.""" + return { + "denominations": { + str(denom): {"position": row.position, "count": row.count} + for denom, row in self.denominations.items() + } + } From 9b8008db1ff74e01ed4c7cf3b67bc5afc5059f43 Mon Sep 17 00:00:00 2001 From: Padreug Date: Sat, 30 May 2026 18:03:52 +0200 Subject: [PATCH 2/3] feat(v2): cassette_configs CRUD + unit tests (#29 v1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire up the cassette_configs storage layer: - get_cassette_config / list_cassette_configs_for_machine — reads - update_cassette_config — operator UI per-row edit (count + position). Refuses to create new rows; the denomination set is hardware-determined per #29 row lifecycle. - apply_bootstrap_state — consumer-side upsert from an ATM-published kind-30078 bitspire-cassettes-state event. Populates both the operator-believed columns and the v2 reverse-channel columns (state_count, state_at, state_event_id) in one transaction. Returns False on relay re-delivery (any existing row's state_event_id matches the incoming event_id). - _should_apply_bootstrap_state — pure-function dedup gate extracted from apply_bootstrap_state so the relay-re-delivery decision is unit-testable without a database round-trip. 23 new pure-function/model tests in tests/test_cassette_configs.py covering the wire-shape validators (denomination key coercion, no-duplicate- positions, int ranges, wire-dict round-trip) and the dedup-helper logic. DB-touching CRUD follows the existing project convention (see test_deposit_currency.py rationale): smoke-tested manually via the dev container, integration tests deferred. Total: 98 passed, 1 pre-existing async-plugin failure unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- crud.py | 152 +++++++++++++++++++++++ tests/test_cassette_configs.py | 220 +++++++++++++++++++++++++++++++++ 2 files changed, 372 insertions(+) create mode 100644 tests/test_cassette_configs.py diff --git a/crud.py b/crud.py index 51b07a4..fc87070 100644 --- a/crud.py +++ b/crud.py @@ -12,6 +12,7 @@ from lnbits.db import Database from lnbits.helpers import urlsafe_short_hash from .models import ( + CassetteConfig, ClientBalanceSummary, CommissionSplit, CommissionSplitLeg, @@ -26,6 +27,7 @@ from .models import ( DcaPayment, DcaSettlement, Machine, + PublishCassettesPayload, SuperConfig, TelemetrySnapshot, UpdateDcaClientData, @@ -33,6 +35,7 @@ from .models import ( UpdateDepositStatusData, UpdateMachineData, UpdateSuperConfigData, + UpsertCassetteConfigData, UpsertDcaLpData, ) @@ -1334,3 +1337,152 @@ async def upsert_fleet_snapshot( {"mid": machine_id, "json": telemetry_json, "now": now}, ) return await get_telemetry(machine_id) + + +# ============================================================================= +# Cassette configs — operator-driven ATM cassette inventory (#29). +# ============================================================================= +# Row lifecycle per #29: +# - First population for a (machine_id, denomination) pair → apply_bootstrap_state +# (consumer reading the ATM's one-shot bitspire-cassettes-state event) +# - Operator edit of count or position → update_cassette_config (refuses to +# create new rows; the denomination set is hardware-determined) +# - Row creation/deletion for a new denomination → admin only, via ATM +# re-provisioning + new bootstrap event (not exposed in v1 here) + + +def _should_apply_bootstrap_state( + existing_state_event_id: Optional[str], incoming_event_id: str +) -> bool: + """Pure-function dedup gate for apply_bootstrap_state. + + Returns False if any existing row for this machine already references + the incoming event_id (relay re-delivery after restart). True otherwise. + + Extracted as a pure function so the dedup decision is unit-testable + without a database round-trip. The actual idempotency check in + apply_bootstrap_state fetches one existing row and passes its + state_event_id here. + """ + return existing_state_event_id != incoming_event_id + + +async def get_cassette_config( + machine_id: str, denomination: int +) -> Optional[CassetteConfig]: + return await db.fetchone( + "SELECT * FROM satoshimachine.cassette_configs " + "WHERE machine_id = :mid AND denomination = :denom", + {"mid": machine_id, "denom": denomination}, + CassetteConfig, + ) + + +async def list_cassette_configs_for_machine( + machine_id: str, +) -> List[CassetteConfig]: + return await db.fetchall( + "SELECT * FROM satoshimachine.cassette_configs " + "WHERE machine_id = :mid ORDER BY position, denomination", + {"mid": machine_id}, + CassetteConfig, + ) + + +async def update_cassette_config( + machine_id: str, + denomination: int, + data: UpsertCassetteConfigData, + *, + updated_by: Optional[str] = None, +) -> Optional[CassetteConfig]: + """Operator-driven row update: change count and/or position for a single + cassette. Refuses to create new rows — those only land via + apply_bootstrap_state() consuming an ATM bootstrap event (per #29 row + lifecycle: hardware-determined denomination set, not operator-creatable). + Returns None if the (machine_id, denomination) row doesn't exist. + """ + existing = await get_cassette_config(machine_id, denomination) + if existing is None: + return None + update_data: dict = {k: v for k, v in data.dict().items() if v is not None} + if not update_data: + return existing + update_data["updated_at"] = datetime.now() + update_data["updated_by"] = updated_by + set_clause = ", ".join(f"{k} = :{k}" for k in update_data) + update_data["mid"] = machine_id + update_data["denom"] = denomination + await db.execute( + f"UPDATE satoshimachine.cassette_configs SET {set_clause} " + "WHERE machine_id = :mid AND denomination = :denom", + update_data, + ) + return await get_cassette_config(machine_id, denomination) + + +async def apply_bootstrap_state( + machine_id: str, + event_id: str, + event_created_at: datetime, + payload: PublishCassettesPayload, +) -> bool: + """Consume an ATM-published kind-30078 bitspire-cassettes-state: event + and upsert one cassette_configs row per denomination in the payload. + + Returns True if the upsert ran; False if any existing row for this + machine already references this event_id (idempotent on relay + re-delivery / restart). + + Populates both the operator-believed columns (count, position, + updated_at, updated_by='atm-bootstrap') AND the v2 reverse-channel + columns (state_count, state_at, state_event_id) so the operator's + initial view matches the ATM's reported state. v2 reconciliation UI + will diverge them when continuous reverse-channel events land. + """ + existing_first = await db.fetchone( + "SELECT state_event_id FROM satoshimachine.cassette_configs " + "WHERE machine_id = :mid LIMIT 1", + {"mid": machine_id}, + ) + existing_event_id: Optional[str] = None + if existing_first is not None: + existing_event_id = ( + existing_first.get("state_event_id") + if isinstance(existing_first, dict) + else getattr(existing_first, "state_event_id", None) + ) + if not _should_apply_bootstrap_state(existing_event_id, event_id): + return False + + now = datetime.now() + for denom, row in payload.denominations.items(): + await db.execute( + """ + INSERT INTO satoshimachine.cassette_configs + (machine_id, denomination, count, position, updated_at, + updated_by, state_count, state_at, state_event_id) + VALUES (:mid, :denom, :count, :pos, :now, :by, + :state_count, :state_at, :event_id) + ON CONFLICT (machine_id, denomination) DO UPDATE SET + count = excluded.count, + position = excluded.position, + updated_at = excluded.updated_at, + updated_by = excluded.updated_by, + state_count = excluded.state_count, + state_at = excluded.state_at, + state_event_id = excluded.state_event_id + """, + { + "mid": machine_id, + "denom": denom, + "count": row.count, + "pos": row.position, + "now": now, + "by": "atm-bootstrap", + "state_count": row.count, + "state_at": event_created_at, + "event_id": event_id, + }, + ) + return True diff --git a/tests/test_cassette_configs.py b/tests/test_cassette_configs.py new file mode 100644 index 0000000..f9d9a4a --- /dev/null +++ b/tests/test_cassette_configs.py @@ -0,0 +1,220 @@ +""" +Tests for the v1 cassette-config layer (aiolabs/satmachineadmin#29). + +Covers the pure pieces that don't need a live DB: + - Pydantic validator behaviour on PublishCassettesPayload + the row / + upsert models (denomination key coercion, integer ranges, no-duplicate- + positions, wire-format round-trip) + - _should_apply_bootstrap_state dedup helper (extracted from + apply_bootstrap_state so the relay-re-delivery decision is testable + without a database round-trip) + +DB-touching tests (apply_bootstrap_state actually upserting, list-by- +machine ordering, etc.) follow the project convention from +test_deposit_currency.py: "Layer 2 is an endpoint-level behaviour better +covered by an integration test against a running LNbits; tracked in #26 +as a follow-up." Smoke-tested manually via the dev container. +""" + +import pytest + +from ..crud import _should_apply_bootstrap_state +from ..models import ( + CassettePayloadRow, + PublishCassettesPayload, + UpsertCassetteConfigData, +) + + +# ============================================================================= +# PublishCassettesPayload — wire-shape validators +# ============================================================================= + + +class TestPublishCassettesPayload: + """The kind-30078 content payload, bidirectional (operator→ATM and + ATM→operator share the shape). String JSON keys must coerce to int; + duplicate positions must reject; per-row int constraints enforced.""" + + def test_happy_path_coerces_string_keys_to_int(self): + p = PublishCassettesPayload( + denominations={ + "20": {"position": 1, "count": 49}, + "50": {"position": 2, "count": 100}, + } + ) + assert set(p.denominations.keys()) == {20, 50} + assert p.denominations[20].position == 1 + assert p.denominations[20].count == 49 + assert p.denominations[50].count == 100 + + def test_wire_dict_round_trip_restringifies_keys(self): + """to_wire_dict() must restringify denomination keys so the + resulting JSON is parseable by clients (including the ATM-side + nostr-tools NIP-44 v2 consumer per the byte-compat cross-test).""" + original = PublishCassettesPayload( + denominations={ + "20": {"position": 1, "count": 49}, + "50": {"position": 2, "count": 100}, + } + ) + wire = original.to_wire_dict() + assert wire == { + "denominations": { + "20": {"position": 1, "count": 49}, + "50": {"position": 2, "count": 100}, + } + } + # And the wire form round-trips back through the parser cleanly. + reparsed = PublishCassettesPayload(**wire) + assert reparsed.denominations == original.denominations + + def test_rejects_non_int_key(self): + with pytest.raises(ValueError) as exc: + PublishCassettesPayload( + denominations={"abc": {"position": 1, "count": 1}} + ) + assert "is not an int" in str(exc.value) + + def test_rejects_non_positive_denomination(self): + with pytest.raises(ValueError) as exc: + PublishCassettesPayload( + denominations={"0": {"position": 1, "count": 1}} + ) + assert "denomination must be > 0" in str(exc.value) + + def test_rejects_negative_denomination(self): + with pytest.raises(ValueError) as exc: + PublishCassettesPayload( + denominations={"-20": {"position": 1, "count": 1}} + ) + assert "denomination must be > 0" in str(exc.value) + + def test_rejects_duplicate_position(self): + """Two cassettes can't occupy the same physical slot. The schema + PK is (machine_id, denomination), so duplicates land via the + payload; reject at the validator layer before the publish path + builds an event the ATM will misinterpret.""" + with pytest.raises(ValueError) as exc: + PublishCassettesPayload( + denominations={ + "20": {"position": 1, "count": 49}, + "50": {"position": 1, "count": 100}, + } + ) + assert "duplicate position" in str(exc.value) + + def test_rejects_negative_count(self): + with pytest.raises(ValueError): + PublishCassettesPayload( + denominations={"20": {"position": 1, "count": -1}} + ) + + def test_rejects_zero_position(self): + with pytest.raises(ValueError): + PublishCassettesPayload( + denominations={"20": {"position": 0, "count": 1}} + ) + + def test_allows_zero_count(self): + """An empty cassette is a legal state — operator must be able to + record `count=0` after a dispatcher pulled the cassette mid-day.""" + p = PublishCassettesPayload( + denominations={"20": {"position": 1, "count": 0}} + ) + assert p.denominations[20].count == 0 + + +# ============================================================================= +# CassettePayloadRow — per-row int constraints (single-row tests) +# ============================================================================= + + +class TestCassettePayloadRow: + def test_happy_path(self): + row = CassettePayloadRow(position=1, count=49) + assert row.position == 1 + assert row.count == 49 + + @pytest.mark.parametrize("bad_position", [0, -1, -100]) + def test_rejects_non_positive_position(self, bad_position): + with pytest.raises(ValueError): + CassettePayloadRow(position=bad_position, count=1) + + def test_rejects_negative_count(self): + with pytest.raises(ValueError): + CassettePayloadRow(position=1, count=-1) + + +# ============================================================================= +# UpsertCassetteConfigData — operator-edit form +# ============================================================================= + + +class TestUpsertCassetteConfigData: + """Operator-driven row edit. Both fields optional; same int constraints + as the wire-format row but applied independently per-edit.""" + + def test_partial_update_count_only(self): + d = UpsertCassetteConfigData(count=80) + assert d.count == 80 + assert d.position is None + + def test_partial_update_position_only(self): + d = UpsertCassetteConfigData(position=3) + assert d.position == 3 + assert d.count is None + + def test_empty_update_is_legal(self): + """An empty UpsertCassetteConfigData parses fine; the CRUD short- + circuits a no-op on empty payload (no SQL emitted).""" + d = UpsertCassetteConfigData() + assert d.count is None + assert d.position is None + + def test_rejects_negative_count(self): + with pytest.raises(ValueError): + UpsertCassetteConfigData(count=-1) + + def test_rejects_non_positive_position(self): + with pytest.raises(ValueError): + UpsertCassetteConfigData(position=0) + + +# ============================================================================= +# _should_apply_bootstrap_state — relay re-delivery dedup +# ============================================================================= + + +class TestShouldApplyBootstrapState: + """Pure-function dedup gate extracted from apply_bootstrap_state so the + decision is testable without a DB. Logic: apply if-and-only-if the + existing row's state_event_id differs from the incoming event_id. + + In v1 the ATM publishes the bootstrap event exactly once per machine, + so this is sufficient for replay protection. v2 will need a + `last_state_created_at` watermark in addition (per bitspire's + `meta.lastKnownConfigCreatedAt` on the ATM side) — flagged in #29's + v2 forward-look section. + """ + + def test_applies_when_no_existing_row(self): + assert _should_apply_bootstrap_state(None, "new-event-id") is True + + def test_applies_when_existing_event_id_differs(self): + assert ( + _should_apply_bootstrap_state("old-event-id", "new-event-id") is True + ) + + def test_skips_when_existing_event_id_matches(self): + """The same bootstrap event re-delivered after a relay reconnect + or satmachineadmin restart should no-op, not re-upsert the same + rows (which would clobber any operator edits since).""" + assert ( + _should_apply_bootstrap_state("same-event", "same-event") is False + ) + + def test_applies_when_existing_is_empty_string_and_incoming_is_id(self): + """Defensive — a sentinel empty-string existing_state_event_id + shouldn't block a real incoming event from applying.""" + assert _should_apply_bootstrap_state("", "real-event-id") is True From da07bae554976e9044134c1b4e47b3ea41f0062d Mon Sep 17 00:00:00 2001 From: Padreug Date: Sat, 30 May 2026 18:10:30 +0200 Subject: [PATCH 3/3] feat(v2): hand-rolled NIP-44 v2 crypto + reference-vector tests (#29 v1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit LNbits ships only NIP-04 (AES-CBC) in lnbits.utils.nostr.encrypt_content, but the locked design at #29 (paired with lamassu-next#56) wires kind-30078 cassette config with NIP-44 v2 content per the privacy-by-default architecture (dcd0874). Hand-rolling rather than adding a Python lib dep per the plan-approval (option A) — keeps the impl auditable inline and avoids pulling in a non-trivial dep tree. nip44.py covers the full envelope: - get_conversation_key — ECDH x-coord + HKDF-extract with salt b"nip44-v2" - encrypt_with_conversation_key / decrypt_with_conversation_key — low-level, nonce-controllable for testing pinned vectors - encrypt_for / decrypt_from — high-level pair-keyed API (the shape app code reaches for) - _pad / _unpad — NIP-44 v2 length-prefixed padding scheme - HMAC-SHA256 verification on nonce || ciphertext, constant-time compare via hmac.compare_digest - Typed errors (Nip44VersionError / Nip44MacError / Nip44LengthError) so callers can distinguish tamper from corruption from spec mismatch Stack: coincurve for ECDH (already a transitive lnbits dep), cryptography for ChaCha20 + HKDF-expand (also already there). No new pyproject deps. 34 tests in tests/test_nip44_v2.py, three layers: 1. Pinned reference vector — conversation_key for (sec=1, sec=2) matches the canonical paulmillr/nip44 published value (c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d). Regression-fails loudly if key derivation drifts. 2. Round-trip + tamper detection — encrypt/decrypt across plaintext lengths (1, 32, 33, 1000, 5000, 65535 bytes); flipped MAC byte; flipped ciphertext byte; flipped nonce byte; wrong recipient privkey; version-byte rejection; padding-formula spot checks. 3. Cross-impl byte-compat — placeholder test_decrypts_bitspire_sample marked @pytest.mark.skip, pending bitspire posting a sample event encrypted on their nostr-tools side to the coord log (per the 2026-05-30T15:55Z entry). Wire that fixture and unskip when posted. Total: 132 passed, 1 skipped (cross-test fixture pending), 1 pre-existing async-plugin failure unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- nip44.py | 271 ++++++++++++++++++++++++++++++++++++++++ tests/test_nip44_v2.py | 272 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 543 insertions(+) create mode 100644 nip44.py create mode 100644 tests/test_nip44_v2.py diff --git a/nip44.py b/nip44.py new file mode 100644 index 0000000..928e9de --- /dev/null +++ b/nip44.py @@ -0,0 +1,271 @@ +""" +NIP-44 v2 — versioned encrypted payloads (https://github.com/nostr-protocol/nips/blob/master/44.md). + +Hand-rolled because lnbits ships only NIP-04 (AES-CBC) in `lnbits.utils.nostr.encrypt_content`, +and the locked design at aiolabs/satmachineadmin#29 (paired with lamassu-next#56) wires +cassette config over kind-30078 with NIP-44 v2 encrypted content. Adding a Python NIP-44 +v2 lib dep was an option per the plan; chose the hand-roll path to stay dep-light and +keep the impl auditable inline. + +Two safety nets keep this honest: + 1. tests/test_nip44_v2.py runs reference vectors + round-trip + tamper-detection. + 2. bitspire posts a sample event encrypted on their nostr-tools side to the coord log; + test_decrypts_bitspire_sample_event_from_coord_log cross-checks our impl against + theirs by decrypting that event with a known privkey. + +Wire format (per spec): + payload = base64( 0x02 || nonce (32B) || ciphertext (var) || mac (32B) ) + +Key derivation: + conversation_key = HKDF-extract(salt=b"nip44-v2", IKM=ecdh_shared_x) # 32B PRK, stable per pair + per-message: + nonce = csprng(32 bytes) + temp = HKDF-expand(PRK=conversation_key, info=nonce, L=76) + chacha_key = temp[0:32] + chacha_nonce = temp[32:44] + hmac_key = temp[44:76] + +Padding scheme (NIP-44 v2 length-prefixed, variable-chunk): + padded = uint16_be(len(plaintext)) || plaintext || zeros + such that 2 + padded_data_len matches a fixed step. +""" + +from __future__ import annotations + +import base64 +import hashlib +import hmac as hmac_stdlib +import os +import struct +from typing import Optional + +import coincurve +from cryptography.hazmat.primitives import hashes, hmac +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms +from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand + +# Spec constants. +_VERSION = 0x02 +_HKDF_SALT = b"nip44-v2" +_MIN_PLAINTEXT_LEN = 1 +_MAX_PLAINTEXT_LEN = 65535 +_NONCE_LEN = 32 +_MAC_LEN = 32 +_MIN_PAYLOAD_LEN = 1 + _NONCE_LEN + (2 + 32) + _MAC_LEN # version + nonce + min padded + mac +_MAX_PAYLOAD_LEN = 1 + _NONCE_LEN + (2 + 65536) + _MAC_LEN + + +class Nip44Error(Exception): + """Generic NIP-44 v2 envelope error. Subclasses distinguish failure modes.""" + + +class Nip44VersionError(Nip44Error): + """First payload byte was not 0x02. Could be a NIP-04 envelope, a v1 NIP-44, or garbage.""" + + +class Nip44MacError(Nip44Error): + """HMAC verification failed — payload was tampered, wrong conversation key, or corrupted in transit.""" + + +class Nip44LengthError(Nip44Error): + """Plaintext or payload length outside the spec-allowed range, or padding header lies.""" + + +# ============================================================================= +# Padding (NIP-44 v2) +# ============================================================================= + + +def _calc_padded_len(plaintext_len: int) -> int: + """Per NIP-44 v2 padding scheme: + if L <= 32: padded_len = 32 + else: chunk = max(32, next_power_2(L-1) // 8); padded_len = chunk * ((L-1) // chunk + 1) + """ + if plaintext_len <= 32: + return 32 + next_power = 1 << (plaintext_len - 1).bit_length() + chunk = max(32, next_power // 8) + return chunk * ((plaintext_len - 1) // chunk + 1) + + +def _pad(plaintext: bytes) -> bytes: + """Prefix uint16_be length + plaintext + zero-fill to the NIP-44 v2 boundary.""" + n = len(plaintext) + if n < _MIN_PLAINTEXT_LEN or n > _MAX_PLAINTEXT_LEN: + raise Nip44LengthError( + f"plaintext length {n} outside [{_MIN_PLAINTEXT_LEN}, {_MAX_PLAINTEXT_LEN}]" + ) + padded_data_len = _calc_padded_len(n) + zeros = b"\x00" * (padded_data_len - n) + return struct.pack(">H", n) + plaintext + zeros + + +def _unpad(padded: bytes) -> bytes: + """Strip the uint16_be length prefix and zero padding. Validates that the + declared length is consistent with the padded payload (rejects a forged + length prefix that would slice past the buffer or imply a different + padded_data_len than what we received).""" + if len(padded) < 2: + raise Nip44LengthError("padded payload too short to hold length prefix") + declared_len = struct.unpack(">H", padded[0:2])[0] + if declared_len < _MIN_PLAINTEXT_LEN or declared_len > _MAX_PLAINTEXT_LEN: + raise Nip44LengthError(f"declared plaintext length {declared_len} out of range") + if len(padded) != 2 + _calc_padded_len(declared_len): + raise Nip44LengthError( + f"padded buffer length {len(padded)} doesn't match the calculated padding " + f"for declared length {declared_len}" + ) + return padded[2 : 2 + declared_len] + + +# ============================================================================= +# Conversation + message-key derivation +# ============================================================================= + + +def get_conversation_key(privkey_hex: str, pubkey_hex: str) -> bytes: + """Derive the per-pair stable conversation key (PRK) used for all messages + between sender (privkey) and recipient (pubkey). + + Steps: + shared_x = ECDH(privkey, pubkey).x # 32 bytes, x-coordinate + prk = HKDF-extract(salt=b"nip44-v2", IKM=shared_x) + + coincurve's `.multiply(secret).format(compressed=True)[1:]` strips the + leading 0x02/0x03 parity byte to return the raw x-coord — same trick + `lnbits.utils.nostr.encrypt_content` uses for NIP-04. + """ + sender = coincurve.PrivateKey(bytes.fromhex(privkey_hex)) + recipient_pub = coincurve.PublicKey(b"\x02" + bytes.fromhex(pubkey_hex)) + shared_x = recipient_pub.multiply(sender.secret).format(compressed=True)[1:] + # HKDF-extract is HMAC-SHA256(key=salt, msg=ikm) per RFC 5869. + return hmac_stdlib.new(_HKDF_SALT, shared_x, hashlib.sha256).digest() + + +def _derive_message_keys( + conversation_key: bytes, nonce: bytes +) -> tuple[bytes, bytes, bytes]: + """Per-message key expansion: HKDF-expand(PRK=conversation_key, info=nonce, L=76). + Returns (chacha_key 32B, chacha_nonce 12B, hmac_key 32B).""" + hkdf = HKDFExpand(algorithm=hashes.SHA256(), length=76, info=nonce) + okm = hkdf.derive(conversation_key) + return okm[0:32], okm[32:44], okm[44:76] + + +def _hmac_aad(hmac_key: bytes, nonce: bytes, ciphertext: bytes) -> bytes: + """HMAC-SHA256(key=hmac_key, msg=nonce || ciphertext). Returns 32-byte MAC.""" + h = hmac.HMAC(hmac_key, hashes.SHA256()) + h.update(nonce) + h.update(ciphertext) + return h.finalize() + + +def _chacha20(key: bytes, nonce: bytes, data: bytes) -> bytes: + """ChaCha20 stream cipher (symmetric: encrypt == decrypt). Used both directions. + + The `cryptography` lib's `algorithms.ChaCha20(key, nonce)` expects a + 16-byte nonce arg: a 4-byte little-endian initial counter prefix + + 12-byte actual nonce. NIP-44 v2 starts the counter at 0 and uses the + HKDF-derived 12-byte chacha_nonce, so we prefix four zero bytes here. + """ + if len(nonce) != 12: + raise Nip44LengthError( + f"chacha_nonce must be 12 bytes (NIP-44 v2), got {len(nonce)}" + ) + cipher = Cipher(algorithms.ChaCha20(key, b"\x00\x00\x00\x00" + nonce), mode=None) + return cipher.encryptor().update(data) + + +# ============================================================================= +# Public API — low-level (nonce-controllable for testability) +# ============================================================================= + + +def encrypt_with_conversation_key( + plaintext: str, + conversation_key: bytes, + *, + nonce: Optional[bytes] = None, +) -> str: + """Encrypt `plaintext` under a precomputed `conversation_key` (32B PRK). + + `nonce` is 32 random bytes when omitted (the production path). Tests pass + it explicitly to assert pinned reference vectors. + + Returns the base64-encoded payload string suitable as a Nostr event's + `content` field for kind-30078 (and any other kind that uses NIP-44 v2). + """ + if nonce is None: + nonce = os.urandom(_NONCE_LEN) + elif len(nonce) != _NONCE_LEN: + raise Nip44LengthError(f"nonce must be exactly {_NONCE_LEN} bytes") + + padded = _pad(plaintext.encode("utf-8")) + chacha_key, chacha_nonce, hmac_key = _derive_message_keys(conversation_key, nonce) + ciphertext = _chacha20(chacha_key, chacha_nonce, padded) + mac = _hmac_aad(hmac_key, nonce, ciphertext) + return base64.b64encode( + bytes([_VERSION]) + nonce + ciphertext + mac + ).decode("ascii") + + +def decrypt_with_conversation_key(payload_b64: str, conversation_key: bytes) -> str: + """Decrypt a NIP-44 v2 payload using a precomputed `conversation_key`. + + Raises: + Nip44VersionError — payload's first byte isn't 0x02 + Nip44LengthError — payload too short / too long / declared length lies + Nip44MacError — HMAC verification failed (tamper, wrong key, corruption) + """ + try: + raw = base64.b64decode(payload_b64, validate=True) + except Exception as exc: # noqa: BLE001 — we want any base64 failure surfaced uniformly + raise Nip44LengthError(f"payload is not valid base64: {exc}") from exc + + if len(raw) < _MIN_PAYLOAD_LEN or len(raw) > _MAX_PAYLOAD_LEN: + raise Nip44LengthError(f"payload length {len(raw)} outside valid range") + if raw[0] != _VERSION: + raise Nip44VersionError(f"unsupported NIP-44 version: 0x{raw[0]:02x}") + + nonce = raw[1 : 1 + _NONCE_LEN] + mac_received = raw[-_MAC_LEN:] + ciphertext = raw[1 + _NONCE_LEN : -_MAC_LEN] + + chacha_key, chacha_nonce, hmac_key = _derive_message_keys(conversation_key, nonce) + mac_expected = _hmac_aad(hmac_key, nonce, ciphertext) + # constant-time compare to avoid timing-leak in MAC verification + if not hmac_stdlib.compare_digest(mac_received, mac_expected): + raise Nip44MacError("HMAC verification failed") + + padded = _chacha20(chacha_key, chacha_nonce, ciphertext) + plaintext_bytes = _unpad(padded) + return plaintext_bytes.decode("utf-8") + + +# ============================================================================= +# Public API — high-level (pair-keyed, the call shape app code reaches for) +# ============================================================================= + + +def encrypt_for( + plaintext: str, + sender_privkey_hex: str, + recipient_pubkey_hex: str, + *, + nonce: Optional[bytes] = None, +) -> str: + """Encrypt `plaintext` from the sender (holding the privkey) to the recipient + (identified by pubkey). The recipient can decrypt with `decrypt_from( + payload, recipient_privkey_hex, sender_pubkey_hex)` — symmetric on the + conversation key, which is the same derived value from either side.""" + conversation_key = get_conversation_key(sender_privkey_hex, recipient_pubkey_hex) + return encrypt_with_conversation_key(plaintext, conversation_key, nonce=nonce) + + +def decrypt_from( + payload_b64: str, recipient_privkey_hex: str, sender_pubkey_hex: str +) -> str: + """Decrypt a payload that the recipient (holding the privkey) received from + the sender (identified by pubkey).""" + conversation_key = get_conversation_key(recipient_privkey_hex, sender_pubkey_hex) + return decrypt_with_conversation_key(payload_b64, conversation_key) diff --git a/tests/test_nip44_v2.py b/tests/test_nip44_v2.py new file mode 100644 index 0000000..247c0ac --- /dev/null +++ b/tests/test_nip44_v2.py @@ -0,0 +1,272 @@ +""" +Tests for the hand-rolled NIP-44 v2 implementation in `nip44.py`. + +Three layers of validation, ordered by trust: + 1. Pinned reference vector from the canonical paulmillr/nip44 test suite — + the conversation_key for (sec=1, sec=2) is widely-published as + c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d. If + our get_conversation_key() ever drifts from that value, the impl is + broken at the key-derivation layer. + 2. Round-trip + tamper detection — verifies the encrypt/decrypt loop + under random nonces, catches HMAC + version + padding tampering. + 3. Cross-test (TBD) — bitspire will post one sample event encrypted on + their nostr-tools side to the coord log; test_decrypts_bitspire_sample + wires it as a fixture and asserts byte-compatibility with the + nostr-tools NIP-44 v2 impl. Placeholder stub until the sample lands. +""" + +import base64 + +import coincurve +import pytest + +from ..nip44 import ( + Nip44LengthError, + Nip44MacError, + Nip44VersionError, + _calc_padded_len, + decrypt_from, + decrypt_with_conversation_key, + encrypt_for, + encrypt_with_conversation_key, + get_conversation_key, +) + +# Helper: derive a compressed-x-coord pubkey hex string from a secret hex. +def _pub_hex(sec_hex: str) -> str: + return ( + coincurve.PrivateKey(bytes.fromhex(sec_hex)) + .public_key.format(compressed=True)[1:] + .hex() + ) + + +# Canonical test keys widely used across NIP-44 reference vectors. +_SEC_ONE = "00" * 31 + "01" # integer 1 +_SEC_TWO = "00" * 31 + "02" # integer 2 +_PUB_ONE = _pub_hex(_SEC_ONE) +_PUB_TWO = _pub_hex(_SEC_TWO) + + +# ============================================================================= +# Layer 1 — pinned reference vector (paulmillr/nip44) +# ============================================================================= + + +class TestConversationKeyReferenceVector: + """Pinned reference vector from the canonical NIP-44 v2 test suite + (paulmillr/nip44). If get_conversation_key drifts from this value we + have a key-derivation regression — fail loudly.""" + + REFERENCE_CK_HEX = ( + "c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d" + ) + + def test_sec_one_pub_two(self): + ck = get_conversation_key(_SEC_ONE, _PUB_TWO) + assert ck.hex() == self.REFERENCE_CK_HEX + + def test_sec_two_pub_one_is_symmetric(self): + """Conversation key is symmetric: ck(privA, pubB) == ck(privB, pubA). + Both sides of a NIP-44 conversation derive the identical PRK; this + is what lets the recipient decrypt with their own privkey + the + sender's pubkey.""" + ck_ab = get_conversation_key(_SEC_ONE, _PUB_TWO) + ck_ba = get_conversation_key(_SEC_TWO, _PUB_ONE) + assert ck_ab == ck_ba + + +# ============================================================================= +# Layer 2 — round-trip + tamper detection +# ============================================================================= + + +class TestRoundTrip: + """Encrypt then decrypt under the high-level pair-keyed API.""" + + @pytest.mark.parametrize( + "plaintext", + [ + "a", # 1 byte (minimum) + "hello, nip44 v2", # short + "x" * 32, # exactly the small-payload boundary + "x" * 33, # just over + "y" * 1000, # medium + "z" * 5000, # large + '{"denominations": {"20": {"position": 1, "count": 49}}}', # realistic + ], + ) + def test_round_trip_various_lengths(self, plaintext): + payload = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO) + recovered = decrypt_from(payload, _SEC_TWO, _PUB_ONE) + assert recovered == plaintext + + def test_payloads_are_unique_under_random_nonce(self): + """Same plaintext + same key pair should produce different payloads + each time because the nonce is fresh CSPRNG bytes. Catches a + regression where the nonce is accidentally pinned.""" + plaintext = "the same message" + p1 = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO) + p2 = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO) + assert p1 != p2 + assert decrypt_from(p1, _SEC_TWO, _PUB_ONE) == plaintext + assert decrypt_from(p2, _SEC_TWO, _PUB_ONE) == plaintext + + def test_pinned_nonce_is_deterministic(self): + """Same plaintext + same key pair + same nonce = byte-identical + payload. Regression-locks the chacha20 + hmac chain.""" + ck = get_conversation_key(_SEC_ONE, _PUB_TWO) + nonce = bytes(32) # 32 zero bytes + p1 = encrypt_with_conversation_key("a", ck, nonce=nonce) + p2 = encrypt_with_conversation_key("a", ck, nonce=nonce) + assert p1 == p2 + assert decrypt_with_conversation_key(p1, ck) == "a" + + +class TestTamperDetection: + """HMAC-SHA256 verification catches tampered envelopes. The cryptographic + construction depends on this — if HMAC verification ever no-ops, a + relay-MITM could forge ATM state events.""" + + def _payload(self) -> str: + return encrypt_for("important message", _SEC_ONE, _PUB_TWO) + + def test_flipped_mac_byte_rejected(self): + raw = bytearray(base64.b64decode(self._payload())) + raw[-1] ^= 0x01 + tampered = base64.b64encode(bytes(raw)).decode("ascii") + with pytest.raises(Nip44MacError): + decrypt_from(tampered, _SEC_TWO, _PUB_ONE) + + def test_flipped_ciphertext_byte_rejected(self): + raw = bytearray(base64.b64decode(self._payload())) + # Flip a byte in the middle of the ciphertext segment + # (version[1] + nonce[32..32] + ciphertext[33..-32] + mac[-32..]) + ct_start = 1 + 32 + raw[ct_start + 5] ^= 0x01 + tampered = base64.b64encode(bytes(raw)).decode("ascii") + with pytest.raises(Nip44MacError): + decrypt_from(tampered, _SEC_TWO, _PUB_ONE) + + def test_flipped_nonce_byte_rejected(self): + raw = bytearray(base64.b64decode(self._payload())) + # Nonce starts at byte 1 (after version) + raw[1] ^= 0x01 + tampered = base64.b64encode(bytes(raw)).decode("ascii") + with pytest.raises(Nip44MacError): + decrypt_from(tampered, _SEC_TWO, _PUB_ONE) + + def test_wrong_recipient_privkey_rejected(self): + """The MAC is derived from the conversation key, so a wrong + recipient privkey produces a different conversation key → + different hmac_key → MAC verification fails. (Doesn't decrypt + to garbage; fails fast.)""" + sec_three = "00" * 31 + "03" + with pytest.raises(Nip44MacError): + decrypt_from(self._payload(), sec_three, _PUB_ONE) + + +class TestVersionRejection: + def test_v1_byte_rejected(self): + raw = bytearray(base64.b64decode(encrypt_for("x", _SEC_ONE, _PUB_TWO))) + raw[0] = 0x01 + bad = base64.b64encode(bytes(raw)).decode("ascii") + with pytest.raises(Nip44VersionError): + decrypt_from(bad, _SEC_TWO, _PUB_ONE) + + def test_unknown_version_byte_rejected(self): + raw = bytearray(base64.b64decode(encrypt_for("x", _SEC_ONE, _PUB_TWO))) + raw[0] = 0xFF + bad = base64.b64encode(bytes(raw)).decode("ascii") + with pytest.raises(Nip44VersionError): + decrypt_from(bad, _SEC_TWO, _PUB_ONE) + + +class TestLengthGuards: + def test_empty_plaintext_rejected(self): + with pytest.raises(Nip44LengthError): + encrypt_for("", _SEC_ONE, _PUB_TWO) + + def test_plaintext_at_max_length_accepted(self): + plaintext = "x" * 65535 + payload = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO) + assert decrypt_from(payload, _SEC_TWO, _PUB_ONE) == plaintext + + def test_plaintext_over_max_rejected(self): + with pytest.raises(Nip44LengthError): + encrypt_for("x" * 65536, _SEC_ONE, _PUB_TWO) + + def test_invalid_base64_payload_rejected(self): + with pytest.raises(Nip44LengthError): + decrypt_from("not!!!base64@@@", _SEC_TWO, _PUB_ONE) + + def test_payload_too_short_rejected(self): + # 50 bytes is well under the 99-byte minimum + too_short = base64.b64encode(b"\x02" + b"\x00" * 49).decode("ascii") + with pytest.raises(Nip44LengthError): + decrypt_from(too_short, _SEC_TWO, _PUB_ONE) + + +class TestPaddingFormula: + """Spot-check the _calc_padded_len formula against hand-computed cases. + Locks in the NIP-44 v2 padding scheme so a refactor can't silently + break wire compatibility (which would only surface as cross-impl + decryption failures — exactly what test_decrypts_bitspire_sample is + meant to catch end-to-end, but a unit test here is cheaper).""" + + @pytest.mark.parametrize( + "plaintext_len,expected_padded", + [ + (1, 32), # <= 32 → 32 + (16, 32), + (32, 32), + (33, 64), # > 32 → next chunk + (64, 64), + (65, 96), # chunk = 32 for L=65 (next_power(64) = 64; 64//8 = 8; max(32, 8) = 32) + (100, 128), + (128, 128), + # L=129: next_power(128) = 1<<8 = 256; chunk = max(32, 256//8) = 32; + # padded = 32 * (128//32 + 1) = 32 * 5 = 160. + (129, 160), + (256, 256), # chunk = 32 for L=256 (next_power(255)=256; max(32, 32) = 32) + (257, 320), + (1000, 1024), # chunk = 128 for L=1000 (next_power(999)=1024; max(32, 128) = 128) + ], + ) + def test_calc_padded_len(self, plaintext_len, expected_padded): + assert _calc_padded_len(plaintext_len) == expected_padded + + +# ============================================================================= +# Layer 3 — byte-compat cross-test against nostr-tools (bitspire's impl) +# ============================================================================= + + +@pytest.mark.skip( + reason=( + "Waiting on bitspire to post one sample encrypted event to " + "~/dev/coordination/log.md per the 2026-05-30T15:55Z entry. Once " + "posted, hardcode the (event_id, content, recipient_privkey, " + "expected_plaintext) fixture here and remove the skip — this test " + "is the byte-compat cross-test between our hand-rolled NIP-44 v2 " + "and the nostr-tools impl the ATM uses." + ) +) +def test_decrypts_bitspire_sample_event_from_coord_log(): + """Cross-impl byte-compatibility test. Bitspire generates one event on + their side (nostr-tools NIP-44 v2 impl), posts the raw event JSON + + a known throwaway recipient privkey to the coord log, and we assert + our `decrypt_from` recovers the expected `{"denominations": {...}}` + plaintext. + + If this passes, both impls produce byte-identical wire format. If it + fails, the spec ambiguity surfaces before either side ships — exactly + what bitspire flagged in the plan review (`07:55Z`). + """ + # event_b64_content = "..." # paste from coord log + # sender_pubkey_hex = "..." + # recipient_privkey_hex = "..." + # expected_plaintext = '{"denominations": {"20": {"position": 1, "count": 49}}}' + # recovered = decrypt_from(event_b64_content, recipient_privkey_hex, sender_pubkey_hex) + # assert recovered == expected_plaintext + raise NotImplementedError("fixture pending — see skip reason")