spirekeeper/tests/test_cassette_state_consumer.py
Padreug d448fab0d2 chore(v2): lint pass — black + ruff auto-fix + mypy regressions (#29 v1.1)
Pre-merge lint hygiene on the PR #30 touched files:

- `black` reformatted 9 files (cassette_transport, crud, models, tasks,
  views_api, nip44, all 3 cassette test files, migrations). Cosmetic:
  line lengths, trailing commas, multi-line argument layout.
- `ruff check --fix` cleared 176 of 202 errors auto-fixed. Mostly
  `UP006` `typing.Optional` → `| None` modernization, `I001` import
  sort order, `UP035` typing-extensions cleanup.
- Two new mypy regressions introduced by the migration commit dcb7de0
  fixed:
  - `crud.py:apply_bootstrap_state` — annotated `existing_first: dict
    | None` on the dedup fetch.
  - `tasks.py:_cassette_consumer_tick` — `# type: ignore[arg-type]` on
    the `nostr_client.relay_manager.add_subscription` call; nostrclient's
    upstream typing declares `list[str]` for filters but the actual
    Nostr protocol takes `list[<filter-dict>]`. The runtime accepts it
    (live smoke at 13:43Z dispatched `nip44_decrypt` cleanly through
    this subscription); the typing mismatch is upstream's.

Remaining lint state, intentionally not addressed in this commit
(all pre-existing baseline, not regressions):
- 8 mypy errors in `calculations.py` + the unchanged-by-this-PR parts
  of `crud.py` — pre-existing on v2-bitspire.
- 26 ruff style warnings: 14 are N805 false-positives on Pydantic
  validators (`cls` first-arg is correct for `@validator`-decorated
  methods); 4 are N818 exception-name-suffix preferences on my new
  exception classes (renaming would touch many call sites; keep
  `OperatorIdentityMissing` / `SignerUnavailable` / `RelayUnavailable`
  / `_NostrclientUnavailable` as-is for clarity); 5 are E501 line-too-
  long on docstrings (the long lines are formatted for clarity);
  1 RUF002 unicode-minus in a docstring.

Tests: 155 passed, 1 pre-existing async-plugin failure unchanged.
Live smoke (both publish + consume directions through the bunker)
unaffected — this is purely a code-style pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-31 15:50:14 +02:00

485 lines
19 KiB
Python

"""
Tests for the cassette bootstrap consumer's transport-decrypt path
(`cassette_transport.decrypt_and_parse_state_event`) and d-tag construction.
Post-PR-#38 migration (2026-05-31): the function takes an Account +
NostrSigner instead of a raw privkey, and is async. Tests use:
- `_FakeBunkerSigner` — implements async `nip44_decrypt/encrypt` against
the hand-rolled `nip44` impl so tests don't need a live bunker.
Exercises the "happy" RemoteBunkerSigner path.
- `_FakeLocalSignerStub` — raises `SignerUnavailableError` from
`nip44_decrypt`, mimicking the post-#38 `LocalSigner` stub. Combined
with an Account that has `signer_type="LocalSigner"` + `prvkey`,
exercises the transitional fallback path in
`_nip44_decrypt_via_signer`.
- `_FakeRaisingSigner` — raises an arbitrary exception, used to
exercise the `NsecBunkerTimeoutError` → `CassetteEventTransientError`
and `NsecBunkerRpcError` → `CassetteEventDecodeError` mappings.
Coroutines are driven via `asyncio.run` so no pytest-asyncio config is
required. Matches the existing project test pattern (test_init.py
demonstrates the project lacks an asyncio plugin in CI; using asyncio.run
inside the test body sidesteps that without changing project config).
Full handler tests (the dispatch through verify_event →
get_machine_by_atm_pubkey_hex → apply_bootstrap_state) need a live LNbits
DB; smoke-tested manually via the dev container per the project
convention (see test_deposit_currency.py rationale).
"""
import asyncio
import json
from types import SimpleNamespace
import coincurve
import pytest
from lnbits.core.services.nip46_bunker_client import (
NsecBunkerRpcError,
NsecBunkerTimeoutError,
)
from lnbits.core.signers.base import SignerUnavailableError
from ..cassette_transport import (
CassetteEventDecodeError,
CassetteEventTransientError,
_atm_hex_pubkey,
_config_d_tag,
_state_d_tag,
build_state_d_tags_for_machines,
decrypt_and_parse_state_event,
)
from ..models import Machine, PublishCassettesPayload
from ..nip44 import (
decrypt_from as _nip44_decrypt,
)
from ..nip44 import (
encrypt_with_conversation_key,
get_conversation_key,
)
# Canonical keys (integer 1 + integer 2, the paulmillr/nip44 reference pair).
_OP_SEC = "00" * 31 + "01"
_ATM_SEC = "00" * 31 + "02"
def _pub_hex(sec_hex: str) -> str:
return (
coincurve.PrivateKey(bytes.fromhex(sec_hex))
.public_key.format(compressed=True)[1:]
.hex()
)
_OP_PUB = _pub_hex(_OP_SEC)
_ATM_PUB = _pub_hex(_ATM_SEC)
# =============================================================================
# Fake signers + account-shaped helper
# =============================================================================
class _FakeBunkerSigner:
"""Test double for RemoteBunkerSigner — implements async nip44_*
against the hand-rolled `nip44` impl. Used to exercise the
"signer.nip44_decrypt returns successfully" path without standing up
a live bunker process."""
def __init__(self, privkey_hex: str):
self._privkey_hex = privkey_hex
@property
def pubkey(self) -> str:
return _pub_hex(self._privkey_hex)
def can_sign(self) -> bool:
return True
async def nip44_encrypt(self, plaintext: str, peer_pubkey_hex: str) -> str:
ck = get_conversation_key(self._privkey_hex, peer_pubkey_hex)
return encrypt_with_conversation_key(plaintext, ck)
async def nip44_decrypt(self, ciphertext: str, peer_pubkey_hex: str) -> str:
return _nip44_decrypt(ciphertext, self._privkey_hex, peer_pubkey_hex)
class _FakeLocalSignerStub:
"""Test double for the post-#38 LocalSigner stub — its nip44_* always
raises SignerUnavailableError. Combined with an Account that has
`signer_type='LocalSigner'` + `prvkey` populated, exercises the
transitional fallback in `_nip44_decrypt_via_signer` (which catches
the SignerUnavailableError and falls back to direct-prvkey via the
hand-rolled impl)."""
def can_sign(self) -> bool:
return True
async def nip44_encrypt(self, plaintext: str, peer_pubkey_hex: str) -> str:
raise SignerUnavailableError("LocalSigner does not implement nip44_encrypt")
async def nip44_decrypt(self, ciphertext: str, peer_pubkey_hex: str) -> str:
raise SignerUnavailableError("LocalSigner does not implement nip44_decrypt")
class _FakeRaisingSigner:
"""Test double that raises a configurable exception on nip44_decrypt.
Used to validate the bunker-error-mapping branches in
decrypt_and_parse_state_event."""
def __init__(self, exc):
self._exc = exc
def can_sign(self) -> bool:
return True
async def nip44_encrypt(self, plaintext: str, peer_pubkey_hex: str) -> str:
raise self._exc
async def nip44_decrypt(self, ciphertext: str, peer_pubkey_hex: str) -> str:
raise self._exc
def _fake_account(
signer_type: str = "RemoteBunkerSigner",
prvkey: str | None = None,
):
"""Account-shaped duck-typed object. decrypt_and_parse_state_event +
_nip44_decrypt_via_signer only read `.signer_type` and `.prvkey`; the
rest is irrelevant."""
return SimpleNamespace(
id="test-operator",
pubkey=_OP_PUB,
prvkey=prvkey,
signer_type=signer_type,
signer_config=None,
)
def _make_state_event(
payload: PublishCassettesPayload,
*,
atm_sec: str = _ATM_SEC,
op_pub: str = _OP_PUB,
atm_pub: str = _ATM_PUB,
event_id: str = "fake-event-id",
created_at: int = 1234567890,
) -> dict:
"""Build a state event the way bitspire's ATM publisher would. Skips
the sig-verify step (handler-level concern); the transport-decrypt
path doesn't depend on sig validity, only on conversation-key match."""
plaintext = json.dumps(payload.to_wire_dict(), separators=(",", ":"))
ck = get_conversation_key(atm_sec, op_pub)
content = encrypt_with_conversation_key(plaintext, ck)
return {
"kind": 30078,
"pubkey": atm_pub,
"content": content,
"tags": [
["d", f"bitspire-cassettes-state:{atm_pub}"],
["p", op_pub],
],
"created_at": created_at,
"id": event_id,
}
# =============================================================================
# decrypt_and_parse_state_event — RemoteBunkerSigner happy path
# =============================================================================
class TestDecryptViaBunkerSigner:
"""The expected production path post-#38: operator account is bunker-
backed, signer.nip44_decrypt routes through the bunker (mocked here
via _FakeBunkerSigner), and the wire payload round-trips cleanly."""
def test_happy_path_recovers_positions_keyed_payload(self):
payload = PublishCassettesPayload(
positions={
"1": {"denomination": 20, "count": 49},
"2": {"denomination": 50, "count": 100},
}
)
event = _make_state_event(payload)
account = _fake_account(signer_type="RemoteBunkerSigner")
signer = _FakeBunkerSigner(_OP_SEC)
recovered = asyncio.run(decrypt_and_parse_state_event(event, account, signer))
assert sorted(recovered.positions.keys()) == [1, 2]
assert recovered.positions[1].denomination == 20
assert recovered.positions[1].count == 49
assert recovered.positions[2].denomination == 50
assert recovered.positions[2].count == 100
def test_round_trips_multiple_same_denomination(self):
"""v1.1 operational case (coord-log 2026-05-30T18:45Z) — multiple
bays carrying the same denomination."""
payload = PublishCassettesPayload(
positions={
"1": {"denomination": 20, "count": 100},
"2": {"denomination": 20, "count": 100},
"3": {"denomination": 20, "count": 100},
"4": {"denomination": 20, "count": 100},
}
)
event = _make_state_event(payload)
account = _fake_account()
signer = _FakeBunkerSigner(_OP_SEC)
recovered = asyncio.run(decrypt_and_parse_state_event(event, account, signer))
assert len(recovered.positions) == 4
for pos in (1, 2, 3, 4):
assert recovered.positions[pos].denomination == 20
assert recovered.positions[pos].count == 100
# =============================================================================
# decrypt_and_parse_state_event — LocalSigner transitional fallback
# =============================================================================
class TestDecryptViaLocalSignerFallback:
"""When the operator account is still on LocalSigner (pre-bunker
migration), the LocalSigner stub raises SignerUnavailableError from
nip44_decrypt. `_nip44_decrypt_via_signer` catches that and falls
back to the hand-rolled impl using `account.prvkey`. Same wire
output; transitional until S7 retires LocalSigner accounts entirely."""
def test_localsigner_with_prvkey_decrypts_via_fallback(self):
payload = PublishCassettesPayload(
positions={"1": {"denomination": 20, "count": 49}}
)
event = _make_state_event(payload)
account = _fake_account(signer_type="LocalSigner", prvkey=_OP_SEC)
signer = _FakeLocalSignerStub()
recovered = asyncio.run(decrypt_and_parse_state_event(event, account, signer))
assert recovered.positions[1].denomination == 20
assert recovered.positions[1].count == 49
def test_localsigner_without_prvkey_raises_decode_error(self):
"""A LocalSigner account whose prvkey field is None (impossible
in practice — LocalSigner requires prvkey at provision time, but
defensive in case the row is corrupt) should surface as a
decode error, not silently succeed."""
payload = PublishCassettesPayload(
positions={"1": {"denomination": 20, "count": 49}}
)
event = _make_state_event(payload)
account = _fake_account(signer_type="LocalSigner", prvkey=None)
signer = _FakeLocalSignerStub()
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
def test_clientonlysigner_raises_decode_error(self):
"""ClientSideOnlySigner has no server-side decrypt path at all;
falling back to direct-prvkey is also impossible (no prvkey).
Surface as a decode error so the consumer logs + skips."""
payload = PublishCassettesPayload(
positions={"1": {"denomination": 20, "count": 49}}
)
event = _make_state_event(payload)
account = _fake_account(signer_type="ClientSideOnlySigner", prvkey=None)
signer = _FakeLocalSignerStub() # behaves the same way: raises
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
# =============================================================================
# decrypt_and_parse_state_event — bunker error mapping
# =============================================================================
class TestBunkerErrorMapping:
"""The post-#38 error hierarchy splits transient (bunker partitioned)
from terminal (bunker policy reject, MAC failure). Consumer behaves
differently — transient retries, terminal logs + skips. Validate the
mapping from NsecBunker* exceptions to our CassetteEvent* types."""
def test_timeout_maps_to_transient_error(self):
"""Bunker unreachable → NsecBunkerTimeoutError → caller-visible
CassetteEventTransientError. Consumer treats this as retry-
eligible (don't advance state_event_id)."""
payload = PublishCassettesPayload(
positions={"1": {"denomination": 20, "count": 49}}
)
event = _make_state_event(payload)
account = _fake_account()
signer = _FakeRaisingSigner(NsecBunkerTimeoutError("bunker unreachable"))
with pytest.raises(CassetteEventTransientError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
def test_rpc_reject_maps_to_decode_error(self):
"""Bunker rejected the RPC (policy / MAC / config) →
NsecBunkerRpcError → caller-visible CassetteEventDecodeError.
Terminal — retrying won't help."""
payload = PublishCassettesPayload(
positions={"1": {"denomination": 20, "count": 49}}
)
event = _make_state_event(payload)
account = _fake_account()
signer = _FakeRaisingSigner(
NsecBunkerRpcError("bunker policy reject: kind 30078 not authorised")
)
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
# =============================================================================
# decrypt_and_parse_state_event — payload + envelope validation
# =============================================================================
class TestPayloadValidation:
"""Errors that originate at the parse layer (post-decrypt), not the
signer. Same set as pre-migration — covered through the bunker-signer
path since LocalSigner is going away."""
def test_tampered_content_rejected(self):
payload = PublishCassettesPayload(
positions={"1": {"denomination": 20, "count": 49}}
)
event = _make_state_event(payload)
event["content"] = event["content"][:-2] + "AA"
account = _fake_account()
signer = _FakeBunkerSigner(_OP_SEC)
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
def test_wrong_signer_privkey_rejected(self):
"""Wrong privkey on the signer → wrong conversation key → MAC
verification fails inside nip44_decrypt → surfaces as decode
error (via the hand-rolled Nip44Error since this is the fake
bunker signer; in production the bunker would raise
NsecBunkerRpcError which also maps to CassetteEventDecodeError)."""
payload = PublishCassettesPayload(
positions={"1": {"denomination": 20, "count": 49}}
)
event = _make_state_event(payload)
account = _fake_account()
wrong_sec = "00" * 31 + "03"
signer = _FakeBunkerSigner(wrong_sec)
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
def test_missing_content_rejected(self):
event = _make_state_event(
PublishCassettesPayload(positions={"1": {"denomination": 20, "count": 49}})
)
del event["content"]
account = _fake_account()
signer = _FakeBunkerSigner(_OP_SEC)
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
def test_missing_pubkey_rejected(self):
event = _make_state_event(
PublishCassettesPayload(positions={"1": {"denomination": 20, "count": 49}})
)
del event["pubkey"]
account = _fake_account()
signer = _FakeBunkerSigner(_OP_SEC)
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
def test_decrypted_garbage_json_rejected(self):
"""If plaintext decrypts cleanly but isn't valid JSON, surface
as decode error (not crash the consumer loop)."""
ck = get_conversation_key(_ATM_SEC, _OP_PUB)
event = {
"kind": 30078,
"pubkey": _ATM_PUB,
"content": encrypt_with_conversation_key("definitely not json", ck),
"tags": [],
"created_at": 0,
"id": "x",
}
account = _fake_account()
signer = _FakeBunkerSigner(_OP_SEC)
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
def test_decrypted_wrong_shape_rejected(self):
"""Well-formed JSON but missing 'positions' → payload-shape
validation failure."""
ck = get_conversation_key(_ATM_SEC, _OP_PUB)
event = {
"kind": 30078,
"pubkey": _ATM_PUB,
"content": encrypt_with_conversation_key('{"wrong_field": 42}', ck),
"tags": [],
"created_at": 0,
"id": "x",
}
account = _fake_account()
signer = _FakeBunkerSigner(_OP_SEC)
with pytest.raises(CassetteEventDecodeError):
asyncio.run(decrypt_and_parse_state_event(event, account, signer))
# =============================================================================
# d-tag construction — unchanged by the migration, kept as regression guard
# =============================================================================
class TestDTagConstruction:
"""The `<m>` placeholder in d-tags = ATM hex pubkey (load-bearing per
coord-log 2026-05-30T11:50Z). These tests pin the canonical
substitution so a refactor can't silently break wire compatibility."""
def _machine(self, npub: str, id_: str = "m1") -> Machine:
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
return Machine(
id=id_,
operator_user_id="op1",
machine_npub=npub,
wallet_id="w1",
name=None,
location=None,
fiat_code="EUR",
is_active=True,
created_at=now,
updated_at=now,
)
def test_atm_hex_pubkey_from_hex_storage(self):
assert _atm_hex_pubkey(self._machine(_ATM_PUB)) == _ATM_PUB
def test_atm_hex_pubkey_lowercases_uppercase_hex(self):
assert _atm_hex_pubkey(self._machine(_ATM_PUB.upper())) == _ATM_PUB
def test_atm_hex_pubkey_canonicalises_bech32_to_hex(self):
from lnbits.utils.nostr import hex_to_npub
npub_bech32 = hex_to_npub(_ATM_PUB)
assert _atm_hex_pubkey(self._machine(npub_bech32)) == _ATM_PUB
def test_config_d_tag_uses_hex_pubkey_not_id(self):
"""REGRESSION GUARD: d-tag must contain the ATM hex pubkey, NOT
the internal machine UUID."""
m = self._machine(_ATM_PUB, id_="some-uuid-not-the-pubkey")
d_tag = _config_d_tag(_atm_hex_pubkey(m))
assert d_tag == f"bitspire-cassettes:{_ATM_PUB}"
assert "some-uuid" not in d_tag
def test_state_d_tag_uses_hex_pubkey_not_id(self):
m = self._machine(_ATM_PUB, id_="some-uuid-not-the-pubkey")
d_tag = _state_d_tag(_atm_hex_pubkey(m))
assert d_tag == f"bitspire-cassettes-state:{_ATM_PUB}"
assert "some-uuid" not in d_tag
def test_build_state_d_tags_for_machines(self):
atm2_pub = _pub_hex("00" * 31 + "03")
machines = [
self._machine(_ATM_PUB, id_="m1"),
self._machine(atm2_pub, id_="m2"),
]
tags = build_state_d_tags_for_machines(machines)
assert tags == [
f"bitspire-cassettes-state:{_ATM_PUB}",
f"bitspire-cassettes-state:{atm2_pub}",
]