diff --git a/crud.py b/crud.py index fc87070..51b07a4 100644 --- a/crud.py +++ b/crud.py @@ -12,7 +12,6 @@ from lnbits.db import Database from lnbits.helpers import urlsafe_short_hash from .models import ( - CassetteConfig, ClientBalanceSummary, CommissionSplit, CommissionSplitLeg, @@ -27,7 +26,6 @@ from .models import ( DcaPayment, DcaSettlement, Machine, - PublishCassettesPayload, SuperConfig, TelemetrySnapshot, UpdateDcaClientData, @@ -35,7 +33,6 @@ from .models import ( UpdateDepositStatusData, UpdateMachineData, UpdateSuperConfigData, - UpsertCassetteConfigData, UpsertDcaLpData, ) @@ -1337,152 +1334,3 @@ async def upsert_fleet_snapshot( {"mid": machine_id, "json": telemetry_json, "now": now}, ) return await get_telemetry(machine_id) - - -# ============================================================================= -# Cassette configs — operator-driven ATM cassette inventory (#29). -# ============================================================================= -# Row lifecycle per #29: -# - First population for a (machine_id, denomination) pair → apply_bootstrap_state -# (consumer reading the ATM's one-shot bitspire-cassettes-state event) -# - Operator edit of count or position → update_cassette_config (refuses to -# create new rows; the denomination set is hardware-determined) -# - Row creation/deletion for a new denomination → admin only, via ATM -# re-provisioning + new bootstrap event (not exposed in v1 here) - - -def _should_apply_bootstrap_state( - existing_state_event_id: Optional[str], incoming_event_id: str -) -> bool: - """Pure-function dedup gate for apply_bootstrap_state. - - Returns False if any existing row for this machine already references - the incoming event_id (relay re-delivery after restart). True otherwise. - - Extracted as a pure function so the dedup decision is unit-testable - without a database round-trip. The actual idempotency check in - apply_bootstrap_state fetches one existing row and passes its - state_event_id here. - """ - return existing_state_event_id != incoming_event_id - - -async def get_cassette_config( - machine_id: str, denomination: int -) -> Optional[CassetteConfig]: - return await db.fetchone( - "SELECT * FROM satoshimachine.cassette_configs " - "WHERE machine_id = :mid AND denomination = :denom", - {"mid": machine_id, "denom": denomination}, - CassetteConfig, - ) - - -async def list_cassette_configs_for_machine( - machine_id: str, -) -> List[CassetteConfig]: - return await db.fetchall( - "SELECT * FROM satoshimachine.cassette_configs " - "WHERE machine_id = :mid ORDER BY position, denomination", - {"mid": machine_id}, - CassetteConfig, - ) - - -async def update_cassette_config( - machine_id: str, - denomination: int, - data: UpsertCassetteConfigData, - *, - updated_by: Optional[str] = None, -) -> Optional[CassetteConfig]: - """Operator-driven row update: change count and/or position for a single - cassette. Refuses to create new rows — those only land via - apply_bootstrap_state() consuming an ATM bootstrap event (per #29 row - lifecycle: hardware-determined denomination set, not operator-creatable). - Returns None if the (machine_id, denomination) row doesn't exist. - """ - existing = await get_cassette_config(machine_id, denomination) - if existing is None: - return None - update_data: dict = {k: v for k, v in data.dict().items() if v is not None} - if not update_data: - return existing - update_data["updated_at"] = datetime.now() - update_data["updated_by"] = updated_by - set_clause = ", ".join(f"{k} = :{k}" for k in update_data) - update_data["mid"] = machine_id - update_data["denom"] = denomination - await db.execute( - f"UPDATE satoshimachine.cassette_configs SET {set_clause} " - "WHERE machine_id = :mid AND denomination = :denom", - update_data, - ) - return await get_cassette_config(machine_id, denomination) - - -async def apply_bootstrap_state( - machine_id: str, - event_id: str, - event_created_at: datetime, - payload: PublishCassettesPayload, -) -> bool: - """Consume an ATM-published kind-30078 bitspire-cassettes-state: event - and upsert one cassette_configs row per denomination in the payload. - - Returns True if the upsert ran; False if any existing row for this - machine already references this event_id (idempotent on relay - re-delivery / restart). - - Populates both the operator-believed columns (count, position, - updated_at, updated_by='atm-bootstrap') AND the v2 reverse-channel - columns (state_count, state_at, state_event_id) so the operator's - initial view matches the ATM's reported state. v2 reconciliation UI - will diverge them when continuous reverse-channel events land. - """ - existing_first = await db.fetchone( - "SELECT state_event_id FROM satoshimachine.cassette_configs " - "WHERE machine_id = :mid LIMIT 1", - {"mid": machine_id}, - ) - existing_event_id: Optional[str] = None - if existing_first is not None: - existing_event_id = ( - existing_first.get("state_event_id") - if isinstance(existing_first, dict) - else getattr(existing_first, "state_event_id", None) - ) - if not _should_apply_bootstrap_state(existing_event_id, event_id): - return False - - now = datetime.now() - for denom, row in payload.denominations.items(): - await db.execute( - """ - INSERT INTO satoshimachine.cassette_configs - (machine_id, denomination, count, position, updated_at, - updated_by, state_count, state_at, state_event_id) - VALUES (:mid, :denom, :count, :pos, :now, :by, - :state_count, :state_at, :event_id) - ON CONFLICT (machine_id, denomination) DO UPDATE SET - count = excluded.count, - position = excluded.position, - updated_at = excluded.updated_at, - updated_by = excluded.updated_by, - state_count = excluded.state_count, - state_at = excluded.state_at, - state_event_id = excluded.state_event_id - """, - { - "mid": machine_id, - "denom": denom, - "count": row.count, - "pos": row.position, - "now": now, - "by": "atm-bootstrap", - "state_count": row.count, - "state_at": event_created_at, - "event_id": event_id, - }, - ) - return True diff --git a/migrations.py b/migrations.py index 807292f..38b29d0 100644 --- a/migrations.py +++ b/migrations.py @@ -538,40 +538,3 @@ async def m005_lock_deposit_currency_to_machine_fiat_code(db): AND m.fiat_code != d.currency ) """) - - -async def m007_add_cassette_configs(db): - """Add cassette_configs table for operator-driven ATM cassette inventory. - - Tracks per-machine cassette state (denomination, count, position) editable - via the satmachineadmin dashboard and published to the ATM as encrypted - kind-30078 events. See aiolabs/satmachineadmin#29 + lamassu-next#56. - - Schema choice: PK (machine_id, denomination) mirrors the ATM-side - denomination-as-key invariant in - bitspire/atm-tui/src/db.zig:31 and - lamassu-next/apps/machine/electron/state-store.ts:54 - (the cassettes table PK is denomination; HAL inventory map keys on - denomination; dispense lookup is cassetteDenominations.indexOf — - duplicates collapse silently). Position is operator-assignable display - order, not the addressable unit. - - Reserved nullable columns (state_count, state_at, state_event_id) hold - the latest bitspire-cassettes-state: event the ATM - publishes (one-shot bootstrap in v1; continuous in v2). v1 UI doesn't - render them; v2 reconciliation UI consumes them without a migration. - """ - await db.execute(f""" - CREATE TABLE IF NOT EXISTS satoshimachine.cassette_configs ( - machine_id TEXT NOT NULL, - denomination INTEGER NOT NULL, - count INTEGER NOT NULL, - position INTEGER NOT NULL, - updated_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now}, - updated_by TEXT, - state_count INTEGER, - state_at TIMESTAMP, - state_event_id TEXT, - PRIMARY KEY (machine_id, denomination) - ); - """) diff --git a/models.py b/models.py index 5f88067..d683cac 100644 --- a/models.py +++ b/models.py @@ -546,127 +546,3 @@ class SettleBalanceData(BaseModel): if v <= 0: raise ValueError("amount_fiat must be > 0 if specified") return round(float(v), 2) - - -# ============================================================================= -# Cassette configs — operator-driven ATM cassette inventory (#29). -# ============================================================================= -# Schema is denomination-keyed per the locked design (#29 body + the -# 06:40Z coord-log audit): every ATM-side layer below the wire keys on -# denomination (state-store.ts:54, hal-service.ts:116/189). The -# satmachineadmin schema mirrors this so the operator UI can't author a -# duplicate-denomination payload that the ATM would silently collapse. -# -# Position is operator-assignable display order (and used by the ATM as -# the HAL slot-index assignment), not the addressable unit. -# -# state_count / state_at / state_event_id are reserved nullable from day 1 -# for the v2 reverse-channel reconciliation consumer (bitspire-cassettes- -# state:). v1 populates them on bootstrap-event receipt -# but the UI doesn't render reconciliation. - - -class CassetteConfig(BaseModel): - machine_id: str - denomination: int - count: int - position: int - updated_at: datetime - updated_by: Optional[str] - state_count: Optional[int] - state_at: Optional[datetime] - state_event_id: Optional[str] - - -class UpsertCassetteConfigData(BaseModel): - """Operator edits a single cassette row's count or position from the - dashboard. Both fields optional; pass only those changed.""" - - count: Optional[int] = None - position: Optional[int] = None - - @validator("count") - def count_non_negative(cls, v): - if v is None: - return v - if v < 0: - raise ValueError("count must be >= 0") - return v - - @validator("position") - def position_positive(cls, v): - if v is None: - return v - if v <= 0: - raise ValueError("position must be > 0") - return v - - -class CassettePayloadRow(BaseModel): - """One denomination's payload values in the wire-format - `{"denominations": {"": {"position", "count"}}}`.""" - - position: int - count: int - - @validator("position") - def position_positive(cls, v): - if v <= 0: - raise ValueError("position must be > 0") - return v - - @validator("count") - def count_non_negative(cls, v): - if v < 0: - raise ValueError("count must be >= 0") - return v - - -class PublishCassettesPayload(BaseModel): - """The decrypted JSON content of a kind-30078 cassette event, both - directions: - - operator → ATM (d-tag `bitspire-cassettes:`) - - ATM → operator (d-tag `bitspire-cassettes-state:`) - - Wire shape: `{"denominations": {"": {"position", "count"}}}`. - JSON object keys are always strings; the validator coerces back to - int on parse. The denomination key set MUST match what the receiver - already has (no add / no remove from this payload). - """ - - denominations: dict[int, CassettePayloadRow] - - @validator("denominations", pre=True) - def coerce_string_keys_to_int(cls, v): - if not isinstance(v, dict): - raise ValueError("denominations must be a dict") - out = {} - for k, val in v.items(): - try: - key_int = int(k) - except (TypeError, ValueError) as exc: - raise ValueError( - f"denomination key {k!r} is not an int" - ) from exc - if key_int <= 0: - raise ValueError(f"denomination must be > 0 (got {key_int})") - out[key_int] = val - return out - - @validator("denominations") - def no_duplicate_positions(cls, v): - positions = [row.position for row in v.values()] - if len(set(positions)) != len(positions): - raise ValueError("duplicate position values in payload") - return v - - def to_wire_dict(self) -> dict: - """Serialise back to the wire format with string keys for JSON - object compatibility. Used by the publisher to build the kind-30078 - event content before NIP-44 v2 encryption.""" - return { - "denominations": { - str(denom): {"position": row.position, "count": row.count} - for denom, row in self.denominations.items() - } - } diff --git a/nip44.py b/nip44.py deleted file mode 100644 index 928e9de..0000000 --- a/nip44.py +++ /dev/null @@ -1,271 +0,0 @@ -""" -NIP-44 v2 — versioned encrypted payloads (https://github.com/nostr-protocol/nips/blob/master/44.md). - -Hand-rolled because lnbits ships only NIP-04 (AES-CBC) in `lnbits.utils.nostr.encrypt_content`, -and the locked design at aiolabs/satmachineadmin#29 (paired with lamassu-next#56) wires -cassette config over kind-30078 with NIP-44 v2 encrypted content. Adding a Python NIP-44 -v2 lib dep was an option per the plan; chose the hand-roll path to stay dep-light and -keep the impl auditable inline. - -Two safety nets keep this honest: - 1. tests/test_nip44_v2.py runs reference vectors + round-trip + tamper-detection. - 2. bitspire posts a sample event encrypted on their nostr-tools side to the coord log; - test_decrypts_bitspire_sample_event_from_coord_log cross-checks our impl against - theirs by decrypting that event with a known privkey. - -Wire format (per spec): - payload = base64( 0x02 || nonce (32B) || ciphertext (var) || mac (32B) ) - -Key derivation: - conversation_key = HKDF-extract(salt=b"nip44-v2", IKM=ecdh_shared_x) # 32B PRK, stable per pair - per-message: - nonce = csprng(32 bytes) - temp = HKDF-expand(PRK=conversation_key, info=nonce, L=76) - chacha_key = temp[0:32] - chacha_nonce = temp[32:44] - hmac_key = temp[44:76] - -Padding scheme (NIP-44 v2 length-prefixed, variable-chunk): - padded = uint16_be(len(plaintext)) || plaintext || zeros - such that 2 + padded_data_len matches a fixed step. -""" - -from __future__ import annotations - -import base64 -import hashlib -import hmac as hmac_stdlib -import os -import struct -from typing import Optional - -import coincurve -from cryptography.hazmat.primitives import hashes, hmac -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms -from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand - -# Spec constants. -_VERSION = 0x02 -_HKDF_SALT = b"nip44-v2" -_MIN_PLAINTEXT_LEN = 1 -_MAX_PLAINTEXT_LEN = 65535 -_NONCE_LEN = 32 -_MAC_LEN = 32 -_MIN_PAYLOAD_LEN = 1 + _NONCE_LEN + (2 + 32) + _MAC_LEN # version + nonce + min padded + mac -_MAX_PAYLOAD_LEN = 1 + _NONCE_LEN + (2 + 65536) + _MAC_LEN - - -class Nip44Error(Exception): - """Generic NIP-44 v2 envelope error. Subclasses distinguish failure modes.""" - - -class Nip44VersionError(Nip44Error): - """First payload byte was not 0x02. Could be a NIP-04 envelope, a v1 NIP-44, or garbage.""" - - -class Nip44MacError(Nip44Error): - """HMAC verification failed — payload was tampered, wrong conversation key, or corrupted in transit.""" - - -class Nip44LengthError(Nip44Error): - """Plaintext or payload length outside the spec-allowed range, or padding header lies.""" - - -# ============================================================================= -# Padding (NIP-44 v2) -# ============================================================================= - - -def _calc_padded_len(plaintext_len: int) -> int: - """Per NIP-44 v2 padding scheme: - if L <= 32: padded_len = 32 - else: chunk = max(32, next_power_2(L-1) // 8); padded_len = chunk * ((L-1) // chunk + 1) - """ - if plaintext_len <= 32: - return 32 - next_power = 1 << (plaintext_len - 1).bit_length() - chunk = max(32, next_power // 8) - return chunk * ((plaintext_len - 1) // chunk + 1) - - -def _pad(plaintext: bytes) -> bytes: - """Prefix uint16_be length + plaintext + zero-fill to the NIP-44 v2 boundary.""" - n = len(plaintext) - if n < _MIN_PLAINTEXT_LEN or n > _MAX_PLAINTEXT_LEN: - raise Nip44LengthError( - f"plaintext length {n} outside [{_MIN_PLAINTEXT_LEN}, {_MAX_PLAINTEXT_LEN}]" - ) - padded_data_len = _calc_padded_len(n) - zeros = b"\x00" * (padded_data_len - n) - return struct.pack(">H", n) + plaintext + zeros - - -def _unpad(padded: bytes) -> bytes: - """Strip the uint16_be length prefix and zero padding. Validates that the - declared length is consistent with the padded payload (rejects a forged - length prefix that would slice past the buffer or imply a different - padded_data_len than what we received).""" - if len(padded) < 2: - raise Nip44LengthError("padded payload too short to hold length prefix") - declared_len = struct.unpack(">H", padded[0:2])[0] - if declared_len < _MIN_PLAINTEXT_LEN or declared_len > _MAX_PLAINTEXT_LEN: - raise Nip44LengthError(f"declared plaintext length {declared_len} out of range") - if len(padded) != 2 + _calc_padded_len(declared_len): - raise Nip44LengthError( - f"padded buffer length {len(padded)} doesn't match the calculated padding " - f"for declared length {declared_len}" - ) - return padded[2 : 2 + declared_len] - - -# ============================================================================= -# Conversation + message-key derivation -# ============================================================================= - - -def get_conversation_key(privkey_hex: str, pubkey_hex: str) -> bytes: - """Derive the per-pair stable conversation key (PRK) used for all messages - between sender (privkey) and recipient (pubkey). - - Steps: - shared_x = ECDH(privkey, pubkey).x # 32 bytes, x-coordinate - prk = HKDF-extract(salt=b"nip44-v2", IKM=shared_x) - - coincurve's `.multiply(secret).format(compressed=True)[1:]` strips the - leading 0x02/0x03 parity byte to return the raw x-coord — same trick - `lnbits.utils.nostr.encrypt_content` uses for NIP-04. - """ - sender = coincurve.PrivateKey(bytes.fromhex(privkey_hex)) - recipient_pub = coincurve.PublicKey(b"\x02" + bytes.fromhex(pubkey_hex)) - shared_x = recipient_pub.multiply(sender.secret).format(compressed=True)[1:] - # HKDF-extract is HMAC-SHA256(key=salt, msg=ikm) per RFC 5869. - return hmac_stdlib.new(_HKDF_SALT, shared_x, hashlib.sha256).digest() - - -def _derive_message_keys( - conversation_key: bytes, nonce: bytes -) -> tuple[bytes, bytes, bytes]: - """Per-message key expansion: HKDF-expand(PRK=conversation_key, info=nonce, L=76). - Returns (chacha_key 32B, chacha_nonce 12B, hmac_key 32B).""" - hkdf = HKDFExpand(algorithm=hashes.SHA256(), length=76, info=nonce) - okm = hkdf.derive(conversation_key) - return okm[0:32], okm[32:44], okm[44:76] - - -def _hmac_aad(hmac_key: bytes, nonce: bytes, ciphertext: bytes) -> bytes: - """HMAC-SHA256(key=hmac_key, msg=nonce || ciphertext). Returns 32-byte MAC.""" - h = hmac.HMAC(hmac_key, hashes.SHA256()) - h.update(nonce) - h.update(ciphertext) - return h.finalize() - - -def _chacha20(key: bytes, nonce: bytes, data: bytes) -> bytes: - """ChaCha20 stream cipher (symmetric: encrypt == decrypt). Used both directions. - - The `cryptography` lib's `algorithms.ChaCha20(key, nonce)` expects a - 16-byte nonce arg: a 4-byte little-endian initial counter prefix + - 12-byte actual nonce. NIP-44 v2 starts the counter at 0 and uses the - HKDF-derived 12-byte chacha_nonce, so we prefix four zero bytes here. - """ - if len(nonce) != 12: - raise Nip44LengthError( - f"chacha_nonce must be 12 bytes (NIP-44 v2), got {len(nonce)}" - ) - cipher = Cipher(algorithms.ChaCha20(key, b"\x00\x00\x00\x00" + nonce), mode=None) - return cipher.encryptor().update(data) - - -# ============================================================================= -# Public API — low-level (nonce-controllable for testability) -# ============================================================================= - - -def encrypt_with_conversation_key( - plaintext: str, - conversation_key: bytes, - *, - nonce: Optional[bytes] = None, -) -> str: - """Encrypt `plaintext` under a precomputed `conversation_key` (32B PRK). - - `nonce` is 32 random bytes when omitted (the production path). Tests pass - it explicitly to assert pinned reference vectors. - - Returns the base64-encoded payload string suitable as a Nostr event's - `content` field for kind-30078 (and any other kind that uses NIP-44 v2). - """ - if nonce is None: - nonce = os.urandom(_NONCE_LEN) - elif len(nonce) != _NONCE_LEN: - raise Nip44LengthError(f"nonce must be exactly {_NONCE_LEN} bytes") - - padded = _pad(plaintext.encode("utf-8")) - chacha_key, chacha_nonce, hmac_key = _derive_message_keys(conversation_key, nonce) - ciphertext = _chacha20(chacha_key, chacha_nonce, padded) - mac = _hmac_aad(hmac_key, nonce, ciphertext) - return base64.b64encode( - bytes([_VERSION]) + nonce + ciphertext + mac - ).decode("ascii") - - -def decrypt_with_conversation_key(payload_b64: str, conversation_key: bytes) -> str: - """Decrypt a NIP-44 v2 payload using a precomputed `conversation_key`. - - Raises: - Nip44VersionError — payload's first byte isn't 0x02 - Nip44LengthError — payload too short / too long / declared length lies - Nip44MacError — HMAC verification failed (tamper, wrong key, corruption) - """ - try: - raw = base64.b64decode(payload_b64, validate=True) - except Exception as exc: # noqa: BLE001 — we want any base64 failure surfaced uniformly - raise Nip44LengthError(f"payload is not valid base64: {exc}") from exc - - if len(raw) < _MIN_PAYLOAD_LEN or len(raw) > _MAX_PAYLOAD_LEN: - raise Nip44LengthError(f"payload length {len(raw)} outside valid range") - if raw[0] != _VERSION: - raise Nip44VersionError(f"unsupported NIP-44 version: 0x{raw[0]:02x}") - - nonce = raw[1 : 1 + _NONCE_LEN] - mac_received = raw[-_MAC_LEN:] - ciphertext = raw[1 + _NONCE_LEN : -_MAC_LEN] - - chacha_key, chacha_nonce, hmac_key = _derive_message_keys(conversation_key, nonce) - mac_expected = _hmac_aad(hmac_key, nonce, ciphertext) - # constant-time compare to avoid timing-leak in MAC verification - if not hmac_stdlib.compare_digest(mac_received, mac_expected): - raise Nip44MacError("HMAC verification failed") - - padded = _chacha20(chacha_key, chacha_nonce, ciphertext) - plaintext_bytes = _unpad(padded) - return plaintext_bytes.decode("utf-8") - - -# ============================================================================= -# Public API — high-level (pair-keyed, the call shape app code reaches for) -# ============================================================================= - - -def encrypt_for( - plaintext: str, - sender_privkey_hex: str, - recipient_pubkey_hex: str, - *, - nonce: Optional[bytes] = None, -) -> str: - """Encrypt `plaintext` from the sender (holding the privkey) to the recipient - (identified by pubkey). The recipient can decrypt with `decrypt_from( - payload, recipient_privkey_hex, sender_pubkey_hex)` — symmetric on the - conversation key, which is the same derived value from either side.""" - conversation_key = get_conversation_key(sender_privkey_hex, recipient_pubkey_hex) - return encrypt_with_conversation_key(plaintext, conversation_key, nonce=nonce) - - -def decrypt_from( - payload_b64: str, recipient_privkey_hex: str, sender_pubkey_hex: str -) -> str: - """Decrypt a payload that the recipient (holding the privkey) received from - the sender (identified by pubkey).""" - conversation_key = get_conversation_key(recipient_privkey_hex, sender_pubkey_hex) - return decrypt_with_conversation_key(payload_b64, conversation_key) diff --git a/tests/test_cassette_configs.py b/tests/test_cassette_configs.py deleted file mode 100644 index f9d9a4a..0000000 --- a/tests/test_cassette_configs.py +++ /dev/null @@ -1,220 +0,0 @@ -""" -Tests for the v1 cassette-config layer (aiolabs/satmachineadmin#29). - -Covers the pure pieces that don't need a live DB: - - Pydantic validator behaviour on PublishCassettesPayload + the row / - upsert models (denomination key coercion, integer ranges, no-duplicate- - positions, wire-format round-trip) - - _should_apply_bootstrap_state dedup helper (extracted from - apply_bootstrap_state so the relay-re-delivery decision is testable - without a database round-trip) - -DB-touching tests (apply_bootstrap_state actually upserting, list-by- -machine ordering, etc.) follow the project convention from -test_deposit_currency.py: "Layer 2 is an endpoint-level behaviour better -covered by an integration test against a running LNbits; tracked in #26 -as a follow-up." Smoke-tested manually via the dev container. -""" - -import pytest - -from ..crud import _should_apply_bootstrap_state -from ..models import ( - CassettePayloadRow, - PublishCassettesPayload, - UpsertCassetteConfigData, -) - - -# ============================================================================= -# PublishCassettesPayload — wire-shape validators -# ============================================================================= - - -class TestPublishCassettesPayload: - """The kind-30078 content payload, bidirectional (operator→ATM and - ATM→operator share the shape). String JSON keys must coerce to int; - duplicate positions must reject; per-row int constraints enforced.""" - - def test_happy_path_coerces_string_keys_to_int(self): - p = PublishCassettesPayload( - denominations={ - "20": {"position": 1, "count": 49}, - "50": {"position": 2, "count": 100}, - } - ) - assert set(p.denominations.keys()) == {20, 50} - assert p.denominations[20].position == 1 - assert p.denominations[20].count == 49 - assert p.denominations[50].count == 100 - - def test_wire_dict_round_trip_restringifies_keys(self): - """to_wire_dict() must restringify denomination keys so the - resulting JSON is parseable by clients (including the ATM-side - nostr-tools NIP-44 v2 consumer per the byte-compat cross-test).""" - original = PublishCassettesPayload( - denominations={ - "20": {"position": 1, "count": 49}, - "50": {"position": 2, "count": 100}, - } - ) - wire = original.to_wire_dict() - assert wire == { - "denominations": { - "20": {"position": 1, "count": 49}, - "50": {"position": 2, "count": 100}, - } - } - # And the wire form round-trips back through the parser cleanly. - reparsed = PublishCassettesPayload(**wire) - assert reparsed.denominations == original.denominations - - def test_rejects_non_int_key(self): - with pytest.raises(ValueError) as exc: - PublishCassettesPayload( - denominations={"abc": {"position": 1, "count": 1}} - ) - assert "is not an int" in str(exc.value) - - def test_rejects_non_positive_denomination(self): - with pytest.raises(ValueError) as exc: - PublishCassettesPayload( - denominations={"0": {"position": 1, "count": 1}} - ) - assert "denomination must be > 0" in str(exc.value) - - def test_rejects_negative_denomination(self): - with pytest.raises(ValueError) as exc: - PublishCassettesPayload( - denominations={"-20": {"position": 1, "count": 1}} - ) - assert "denomination must be > 0" in str(exc.value) - - def test_rejects_duplicate_position(self): - """Two cassettes can't occupy the same physical slot. The schema - PK is (machine_id, denomination), so duplicates land via the - payload; reject at the validator layer before the publish path - builds an event the ATM will misinterpret.""" - with pytest.raises(ValueError) as exc: - PublishCassettesPayload( - denominations={ - "20": {"position": 1, "count": 49}, - "50": {"position": 1, "count": 100}, - } - ) - assert "duplicate position" in str(exc.value) - - def test_rejects_negative_count(self): - with pytest.raises(ValueError): - PublishCassettesPayload( - denominations={"20": {"position": 1, "count": -1}} - ) - - def test_rejects_zero_position(self): - with pytest.raises(ValueError): - PublishCassettesPayload( - denominations={"20": {"position": 0, "count": 1}} - ) - - def test_allows_zero_count(self): - """An empty cassette is a legal state — operator must be able to - record `count=0` after a dispatcher pulled the cassette mid-day.""" - p = PublishCassettesPayload( - denominations={"20": {"position": 1, "count": 0}} - ) - assert p.denominations[20].count == 0 - - -# ============================================================================= -# CassettePayloadRow — per-row int constraints (single-row tests) -# ============================================================================= - - -class TestCassettePayloadRow: - def test_happy_path(self): - row = CassettePayloadRow(position=1, count=49) - assert row.position == 1 - assert row.count == 49 - - @pytest.mark.parametrize("bad_position", [0, -1, -100]) - def test_rejects_non_positive_position(self, bad_position): - with pytest.raises(ValueError): - CassettePayloadRow(position=bad_position, count=1) - - def test_rejects_negative_count(self): - with pytest.raises(ValueError): - CassettePayloadRow(position=1, count=-1) - - -# ============================================================================= -# UpsertCassetteConfigData — operator-edit form -# ============================================================================= - - -class TestUpsertCassetteConfigData: - """Operator-driven row edit. Both fields optional; same int constraints - as the wire-format row but applied independently per-edit.""" - - def test_partial_update_count_only(self): - d = UpsertCassetteConfigData(count=80) - assert d.count == 80 - assert d.position is None - - def test_partial_update_position_only(self): - d = UpsertCassetteConfigData(position=3) - assert d.position == 3 - assert d.count is None - - def test_empty_update_is_legal(self): - """An empty UpsertCassetteConfigData parses fine; the CRUD short- - circuits a no-op on empty payload (no SQL emitted).""" - d = UpsertCassetteConfigData() - assert d.count is None - assert d.position is None - - def test_rejects_negative_count(self): - with pytest.raises(ValueError): - UpsertCassetteConfigData(count=-1) - - def test_rejects_non_positive_position(self): - with pytest.raises(ValueError): - UpsertCassetteConfigData(position=0) - - -# ============================================================================= -# _should_apply_bootstrap_state — relay re-delivery dedup -# ============================================================================= - - -class TestShouldApplyBootstrapState: - """Pure-function dedup gate extracted from apply_bootstrap_state so the - decision is testable without a DB. Logic: apply if-and-only-if the - existing row's state_event_id differs from the incoming event_id. - - In v1 the ATM publishes the bootstrap event exactly once per machine, - so this is sufficient for replay protection. v2 will need a - `last_state_created_at` watermark in addition (per bitspire's - `meta.lastKnownConfigCreatedAt` on the ATM side) — flagged in #29's - v2 forward-look section. - """ - - def test_applies_when_no_existing_row(self): - assert _should_apply_bootstrap_state(None, "new-event-id") is True - - def test_applies_when_existing_event_id_differs(self): - assert ( - _should_apply_bootstrap_state("old-event-id", "new-event-id") is True - ) - - def test_skips_when_existing_event_id_matches(self): - """The same bootstrap event re-delivered after a relay reconnect - or satmachineadmin restart should no-op, not re-upsert the same - rows (which would clobber any operator edits since).""" - assert ( - _should_apply_bootstrap_state("same-event", "same-event") is False - ) - - def test_applies_when_existing_is_empty_string_and_incoming_is_id(self): - """Defensive — a sentinel empty-string existing_state_event_id - shouldn't block a real incoming event from applying.""" - assert _should_apply_bootstrap_state("", "real-event-id") is True diff --git a/tests/test_nip44_v2.py b/tests/test_nip44_v2.py deleted file mode 100644 index 247c0ac..0000000 --- a/tests/test_nip44_v2.py +++ /dev/null @@ -1,272 +0,0 @@ -""" -Tests for the hand-rolled NIP-44 v2 implementation in `nip44.py`. - -Three layers of validation, ordered by trust: - 1. Pinned reference vector from the canonical paulmillr/nip44 test suite — - the conversation_key for (sec=1, sec=2) is widely-published as - c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d. If - our get_conversation_key() ever drifts from that value, the impl is - broken at the key-derivation layer. - 2. Round-trip + tamper detection — verifies the encrypt/decrypt loop - under random nonces, catches HMAC + version + padding tampering. - 3. Cross-test (TBD) — bitspire will post one sample event encrypted on - their nostr-tools side to the coord log; test_decrypts_bitspire_sample - wires it as a fixture and asserts byte-compatibility with the - nostr-tools NIP-44 v2 impl. Placeholder stub until the sample lands. -""" - -import base64 - -import coincurve -import pytest - -from ..nip44 import ( - Nip44LengthError, - Nip44MacError, - Nip44VersionError, - _calc_padded_len, - decrypt_from, - decrypt_with_conversation_key, - encrypt_for, - encrypt_with_conversation_key, - get_conversation_key, -) - -# Helper: derive a compressed-x-coord pubkey hex string from a secret hex. -def _pub_hex(sec_hex: str) -> str: - return ( - coincurve.PrivateKey(bytes.fromhex(sec_hex)) - .public_key.format(compressed=True)[1:] - .hex() - ) - - -# Canonical test keys widely used across NIP-44 reference vectors. -_SEC_ONE = "00" * 31 + "01" # integer 1 -_SEC_TWO = "00" * 31 + "02" # integer 2 -_PUB_ONE = _pub_hex(_SEC_ONE) -_PUB_TWO = _pub_hex(_SEC_TWO) - - -# ============================================================================= -# Layer 1 — pinned reference vector (paulmillr/nip44) -# ============================================================================= - - -class TestConversationKeyReferenceVector: - """Pinned reference vector from the canonical NIP-44 v2 test suite - (paulmillr/nip44). If get_conversation_key drifts from this value we - have a key-derivation regression — fail loudly.""" - - REFERENCE_CK_HEX = ( - "c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d" - ) - - def test_sec_one_pub_two(self): - ck = get_conversation_key(_SEC_ONE, _PUB_TWO) - assert ck.hex() == self.REFERENCE_CK_HEX - - def test_sec_two_pub_one_is_symmetric(self): - """Conversation key is symmetric: ck(privA, pubB) == ck(privB, pubA). - Both sides of a NIP-44 conversation derive the identical PRK; this - is what lets the recipient decrypt with their own privkey + the - sender's pubkey.""" - ck_ab = get_conversation_key(_SEC_ONE, _PUB_TWO) - ck_ba = get_conversation_key(_SEC_TWO, _PUB_ONE) - assert ck_ab == ck_ba - - -# ============================================================================= -# Layer 2 — round-trip + tamper detection -# ============================================================================= - - -class TestRoundTrip: - """Encrypt then decrypt under the high-level pair-keyed API.""" - - @pytest.mark.parametrize( - "plaintext", - [ - "a", # 1 byte (minimum) - "hello, nip44 v2", # short - "x" * 32, # exactly the small-payload boundary - "x" * 33, # just over - "y" * 1000, # medium - "z" * 5000, # large - '{"denominations": {"20": {"position": 1, "count": 49}}}', # realistic - ], - ) - def test_round_trip_various_lengths(self, plaintext): - payload = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO) - recovered = decrypt_from(payload, _SEC_TWO, _PUB_ONE) - assert recovered == plaintext - - def test_payloads_are_unique_under_random_nonce(self): - """Same plaintext + same key pair should produce different payloads - each time because the nonce is fresh CSPRNG bytes. Catches a - regression where the nonce is accidentally pinned.""" - plaintext = "the same message" - p1 = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO) - p2 = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO) - assert p1 != p2 - assert decrypt_from(p1, _SEC_TWO, _PUB_ONE) == plaintext - assert decrypt_from(p2, _SEC_TWO, _PUB_ONE) == plaintext - - def test_pinned_nonce_is_deterministic(self): - """Same plaintext + same key pair + same nonce = byte-identical - payload. Regression-locks the chacha20 + hmac chain.""" - ck = get_conversation_key(_SEC_ONE, _PUB_TWO) - nonce = bytes(32) # 32 zero bytes - p1 = encrypt_with_conversation_key("a", ck, nonce=nonce) - p2 = encrypt_with_conversation_key("a", ck, nonce=nonce) - assert p1 == p2 - assert decrypt_with_conversation_key(p1, ck) == "a" - - -class TestTamperDetection: - """HMAC-SHA256 verification catches tampered envelopes. The cryptographic - construction depends on this — if HMAC verification ever no-ops, a - relay-MITM could forge ATM state events.""" - - def _payload(self) -> str: - return encrypt_for("important message", _SEC_ONE, _PUB_TWO) - - def test_flipped_mac_byte_rejected(self): - raw = bytearray(base64.b64decode(self._payload())) - raw[-1] ^= 0x01 - tampered = base64.b64encode(bytes(raw)).decode("ascii") - with pytest.raises(Nip44MacError): - decrypt_from(tampered, _SEC_TWO, _PUB_ONE) - - def test_flipped_ciphertext_byte_rejected(self): - raw = bytearray(base64.b64decode(self._payload())) - # Flip a byte in the middle of the ciphertext segment - # (version[1] + nonce[32..32] + ciphertext[33..-32] + mac[-32..]) - ct_start = 1 + 32 - raw[ct_start + 5] ^= 0x01 - tampered = base64.b64encode(bytes(raw)).decode("ascii") - with pytest.raises(Nip44MacError): - decrypt_from(tampered, _SEC_TWO, _PUB_ONE) - - def test_flipped_nonce_byte_rejected(self): - raw = bytearray(base64.b64decode(self._payload())) - # Nonce starts at byte 1 (after version) - raw[1] ^= 0x01 - tampered = base64.b64encode(bytes(raw)).decode("ascii") - with pytest.raises(Nip44MacError): - decrypt_from(tampered, _SEC_TWO, _PUB_ONE) - - def test_wrong_recipient_privkey_rejected(self): - """The MAC is derived from the conversation key, so a wrong - recipient privkey produces a different conversation key → - different hmac_key → MAC verification fails. (Doesn't decrypt - to garbage; fails fast.)""" - sec_three = "00" * 31 + "03" - with pytest.raises(Nip44MacError): - decrypt_from(self._payload(), sec_three, _PUB_ONE) - - -class TestVersionRejection: - def test_v1_byte_rejected(self): - raw = bytearray(base64.b64decode(encrypt_for("x", _SEC_ONE, _PUB_TWO))) - raw[0] = 0x01 - bad = base64.b64encode(bytes(raw)).decode("ascii") - with pytest.raises(Nip44VersionError): - decrypt_from(bad, _SEC_TWO, _PUB_ONE) - - def test_unknown_version_byte_rejected(self): - raw = bytearray(base64.b64decode(encrypt_for("x", _SEC_ONE, _PUB_TWO))) - raw[0] = 0xFF - bad = base64.b64encode(bytes(raw)).decode("ascii") - with pytest.raises(Nip44VersionError): - decrypt_from(bad, _SEC_TWO, _PUB_ONE) - - -class TestLengthGuards: - def test_empty_plaintext_rejected(self): - with pytest.raises(Nip44LengthError): - encrypt_for("", _SEC_ONE, _PUB_TWO) - - def test_plaintext_at_max_length_accepted(self): - plaintext = "x" * 65535 - payload = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO) - assert decrypt_from(payload, _SEC_TWO, _PUB_ONE) == plaintext - - def test_plaintext_over_max_rejected(self): - with pytest.raises(Nip44LengthError): - encrypt_for("x" * 65536, _SEC_ONE, _PUB_TWO) - - def test_invalid_base64_payload_rejected(self): - with pytest.raises(Nip44LengthError): - decrypt_from("not!!!base64@@@", _SEC_TWO, _PUB_ONE) - - def test_payload_too_short_rejected(self): - # 50 bytes is well under the 99-byte minimum - too_short = base64.b64encode(b"\x02" + b"\x00" * 49).decode("ascii") - with pytest.raises(Nip44LengthError): - decrypt_from(too_short, _SEC_TWO, _PUB_ONE) - - -class TestPaddingFormula: - """Spot-check the _calc_padded_len formula against hand-computed cases. - Locks in the NIP-44 v2 padding scheme so a refactor can't silently - break wire compatibility (which would only surface as cross-impl - decryption failures — exactly what test_decrypts_bitspire_sample is - meant to catch end-to-end, but a unit test here is cheaper).""" - - @pytest.mark.parametrize( - "plaintext_len,expected_padded", - [ - (1, 32), # <= 32 → 32 - (16, 32), - (32, 32), - (33, 64), # > 32 → next chunk - (64, 64), - (65, 96), # chunk = 32 for L=65 (next_power(64) = 64; 64//8 = 8; max(32, 8) = 32) - (100, 128), - (128, 128), - # L=129: next_power(128) = 1<<8 = 256; chunk = max(32, 256//8) = 32; - # padded = 32 * (128//32 + 1) = 32 * 5 = 160. - (129, 160), - (256, 256), # chunk = 32 for L=256 (next_power(255)=256; max(32, 32) = 32) - (257, 320), - (1000, 1024), # chunk = 128 for L=1000 (next_power(999)=1024; max(32, 128) = 128) - ], - ) - def test_calc_padded_len(self, plaintext_len, expected_padded): - assert _calc_padded_len(plaintext_len) == expected_padded - - -# ============================================================================= -# Layer 3 — byte-compat cross-test against nostr-tools (bitspire's impl) -# ============================================================================= - - -@pytest.mark.skip( - reason=( - "Waiting on bitspire to post one sample encrypted event to " - "~/dev/coordination/log.md per the 2026-05-30T15:55Z entry. Once " - "posted, hardcode the (event_id, content, recipient_privkey, " - "expected_plaintext) fixture here and remove the skip — this test " - "is the byte-compat cross-test between our hand-rolled NIP-44 v2 " - "and the nostr-tools impl the ATM uses." - ) -) -def test_decrypts_bitspire_sample_event_from_coord_log(): - """Cross-impl byte-compatibility test. Bitspire generates one event on - their side (nostr-tools NIP-44 v2 impl), posts the raw event JSON + - a known throwaway recipient privkey to the coord log, and we assert - our `decrypt_from` recovers the expected `{"denominations": {...}}` - plaintext. - - If this passes, both impls produce byte-identical wire format. If it - fails, the spec ambiguity surfaces before either side ships — exactly - what bitspire flagged in the plan review (`07:55Z`). - """ - # event_b64_content = "..." # paste from coord log - # sender_pubkey_hex = "..." - # recipient_privkey_hex = "..." - # expected_plaintext = '{"denominations": {"20": {"position": 1, "count": 49}}}' - # recovered = decrypt_from(event_b64_content, recipient_privkey_hex, sender_pubkey_hex) - # assert recovered == expected_plaintext - raise NotImplementedError("fixture pending — see skip reason")