Pre-merge lint hygiene on the PR #30 touched files:
- `black` reformatted 9 files (cassette_transport, crud, models, tasks,
views_api, nip44, all 3 cassette test files, migrations). Cosmetic:
line lengths, trailing commas, multi-line argument layout.
- `ruff check --fix` auto-fixed 176 of 202 errors. Mostly
`UP006` `typing.Optional` → `| None` modernization, `I001` import
sort order, `UP035` typing-extensions cleanup.
- Fixed two new mypy regressions introduced by the migration commit
dcb7de0:
- `crud.py:apply_bootstrap_state` — annotated `existing_first: dict
| None` on the dedup fetch.
- `tasks.py:_cassette_consumer_tick` — `# type: ignore[arg-type]` on
the `nostr_client.relay_manager.add_subscription` call; nostrclient's
upstream typing declares `list[str]` for filters but the actual
Nostr protocol takes `list[<filter-dict>]`. The runtime accepts it
(live smoke at 13:43Z dispatched `nip44_decrypt` cleanly through
this subscription); the typing mismatch is upstream's.
Remaining lint state, intentionally not addressed in this commit
(all pre-existing baseline, not regressions):
- 8 mypy errors in `calculations.py` + the unchanged-by-this-PR parts
of `crud.py` — pre-existing on v2-bitspire.
- 26 ruff style warnings: 14 are N805 false-positives on Pydantic
validators (`cls` first-arg is correct for `@validator`-decorated
methods); 4 are N818 exception-name-suffix preferences on my new
exception classes (renaming would touch many call sites; keep
`OperatorIdentityMissing` / `SignerUnavailable` / `RelayUnavailable`
/ `_NostrclientUnavailable` as-is for clarity); 5 are E501 line-too-
long on docstrings (the long lines are formatted for clarity);
1 RUF002 unicode-minus in a docstring.
Tests: 155 passed, 1 pre-existing async-plugin failure unchanged.
Live smoke (both publish + consume directions through the bunker)
unaffected — this is purely a code-style pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
485 lines
19 KiB
Python
485 lines
19 KiB
Python
"""
|
|
Tests for the cassette bootstrap consumer's transport-decrypt path
|
|
(`cassette_transport.decrypt_and_parse_state_event`) and d-tag construction.
|
|
|
|
Post-PR-#38 migration (2026-05-31): the function takes an Account +
|
|
NostrSigner instead of a raw privkey, and is async. Tests use:
|
|
- `_FakeBunkerSigner` — implements async `nip44_decrypt/encrypt` against
|
|
the hand-rolled `nip44` impl so tests don't need a live bunker.
|
|
Exercises the "happy" RemoteBunkerSigner path.
|
|
- `_FakeLocalSignerStub` — raises `SignerUnavailableError` from
|
|
`nip44_decrypt`, mimicking the post-#38 `LocalSigner` stub. Combined
|
|
with an Account that has `signer_type="LocalSigner"` + `prvkey`,
|
|
exercises the transitional fallback path in
|
|
`_nip44_decrypt_via_signer`.
|
|
- `_FakeRaisingSigner` — raises an arbitrary exception, used to
|
|
exercise the `NsecBunkerTimeoutError` → `CassetteEventTransientError`
|
|
and `NsecBunkerRpcError` → `CassetteEventDecodeError` mappings.
|
|
|
|
Coroutines are driven via `asyncio.run` so no pytest-asyncio config is
|
|
required. Matches the existing project test pattern (test_init.py
|
|
demonstrates the project lacks an asyncio plugin in CI; using asyncio.run
|
|
inside the test body sidesteps that without changing project config).
|
|
|
|
Full handler tests (the dispatch through verify_event →
|
|
get_machine_by_atm_pubkey_hex → apply_bootstrap_state) need a live LNbits
|
|
DB; smoke-tested manually via the dev container per the project
|
|
convention (see test_deposit_currency.py rationale).
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
from types import SimpleNamespace
|
|
|
|
import coincurve
|
|
import pytest
|
|
from lnbits.core.services.nip46_bunker_client import (
|
|
NsecBunkerRpcError,
|
|
NsecBunkerTimeoutError,
|
|
)
|
|
from lnbits.core.signers.base import SignerUnavailableError
|
|
|
|
from ..cassette_transport import (
|
|
CassetteEventDecodeError,
|
|
CassetteEventTransientError,
|
|
_atm_hex_pubkey,
|
|
_config_d_tag,
|
|
_state_d_tag,
|
|
build_state_d_tags_for_machines,
|
|
decrypt_and_parse_state_event,
|
|
)
|
|
from ..models import Machine, PublishCassettesPayload
|
|
from ..nip44 import (
|
|
decrypt_from as _nip44_decrypt,
|
|
)
|
|
from ..nip44 import (
|
|
encrypt_with_conversation_key,
|
|
get_conversation_key,
|
|
)
|
|
|
|
# Canonical secret keys: the integers 1 and 2 rendered as 64 hex digits
# (the paulmillr/nip44 reference pair).
_OP_SEC = f"{1:064x}"
_ATM_SEC = f"{2:064x}"
|
|
|
|
|
|
def _pub_hex(sec_hex: str) -> str:
    """Derive a hex public key from a hex-encoded secret key.

    The key is serialised in coincurve's compressed form and the first
    byte of that encoding is dropped before hex-encoding the remainder.
    """
    secret_key = coincurve.PrivateKey(bytes.fromhex(sec_hex))
    compressed = secret_key.public_key.format(compressed=True)
    # Skip the leading byte of the compressed encoding; keep the rest.
    without_prefix = compressed[1:]
    return without_prefix.hex()
|
|
|
|
|
|
# Public keys derived from the operator and ATM secret keys, in that order.
_OP_PUB, _ATM_PUB = (_pub_hex(secret) for secret in (_OP_SEC, _ATM_SEC))
|
|
|
|
|
|
# =============================================================================
|
|
# Fake signers + account-shaped helper
|
|
# =============================================================================
|
|
|
|
|
|
class _FakeBunkerSigner:
    """In-process stand-in for RemoteBunkerSigner.

    Answers the async ``nip44_encrypt`` / ``nip44_decrypt`` calls with the
    hand-rolled `nip44` impl and a private key held in memory, so the
    "signer.nip44_decrypt returns successfully" path can be exercised
    without standing up a live bunker process.
    """

    def __init__(self, privkey_hex: str):
        # Hex secret key used for every encrypt / decrypt call.
        self._privkey_hex = privkey_hex

    @property
    def pubkey(self) -> str:
        """Hex public key derived from the held private key."""
        derived = _pub_hex(self._privkey_hex)
        return derived

    def can_sign(self) -> bool:
        """Always report that the signer is usable."""
        return True

    async def nip44_encrypt(self, plaintext: str, peer_pubkey_hex: str) -> str:
        """Encrypt ``plaintext`` under the conversation key shared with the peer."""
        conversation_key = get_conversation_key(self._privkey_hex, peer_pubkey_hex)
        ciphertext = encrypt_with_conversation_key(plaintext, conversation_key)
        return ciphertext

    async def nip44_decrypt(self, ciphertext: str, peer_pubkey_hex: str) -> str:
        """Decrypt ``ciphertext`` sent by the peer using the held private key."""
        plaintext = _nip44_decrypt(ciphertext, self._privkey_hex, peer_pubkey_hex)
        return plaintext
|
|
|
|
|
|
class _FakeLocalSignerStub:
    """Stand-in for the post-#38 LocalSigner stub.

    Both nip44_* coroutines always raise SignerUnavailableError. Paired
    with an Account carrying `signer_type='LocalSigner'` and a populated
    `prvkey`, this drives the transitional fallback in
    `_nip44_decrypt_via_signer` (which catches the SignerUnavailableError
    and falls back to direct-prvkey via the hand-rolled impl).
    """

    def can_sign(self) -> bool:
        """Always report that the signer is usable."""
        return True

    async def nip44_encrypt(self, plaintext: str, peer_pubkey_hex: str) -> str:
        """Never encrypts; always raises SignerUnavailableError."""
        error = SignerUnavailableError("LocalSigner does not implement nip44_encrypt")
        raise error

    async def nip44_decrypt(self, ciphertext: str, peer_pubkey_hex: str) -> str:
        """Never decrypts; always raises SignerUnavailableError."""
        error = SignerUnavailableError("LocalSigner does not implement nip44_decrypt")
        raise error
|
|
|
|
|
|
class _FakeRaisingSigner:
    """Signer double whose nip44_* calls always fail with a preset exception.

    The exception is supplied at construction time. Used to validate the
    bunker-error-mapping branches in decrypt_and_parse_state_event.
    """

    def __init__(self, exc):
        # Exception re-raised by every nip44_* call.
        self._exc = exc

    def can_sign(self) -> bool:
        """Always report that the signer is usable."""
        return True

    def _fail(self):
        """Raise the exception this signer was configured with."""
        raise self._exc

    async def nip44_encrypt(self, plaintext: str, peer_pubkey_hex: str) -> str:
        """Never encrypts; raises the configured exception."""
        return self._fail()

    async def nip44_decrypt(self, ciphertext: str, peer_pubkey_hex: str) -> str:
        """Never decrypts; raises the configured exception."""
        return self._fail()
|
|
|
|
|
|
def _fake_account(
    signer_type: str = "RemoteBunkerSigner",
    prvkey: str | None = None,
):
    """Return a duck-typed stand-in for an operator Account.

    decrypt_and_parse_state_event + _nip44_decrypt_via_signer only read
    `.signer_type` and `.prvkey`; the remaining attributes are filler.
    """
    attributes = {
        "id": "test-operator",
        "pubkey": _OP_PUB,
        "prvkey": prvkey,
        "signer_type": signer_type,
        "signer_config": None,
    }
    return SimpleNamespace(**attributes)
|
|
|
|
|
|
def _make_state_event(
    payload: PublishCassettesPayload,
    *,
    atm_sec: str = _ATM_SEC,
    op_pub: str = _OP_PUB,
    atm_pub: str = _ATM_PUB,
    event_id: str = "fake-event-id",
    created_at: int = 1234567890,
) -> dict:
    """Assemble a kind-30078 state event the way bitspire's ATM publisher would.

    The payload's wire dict is serialised as compact JSON and encrypted
    with the ATM-secret / operator-pubkey conversation key. No signature
    is produced: sig-verify is a handler-level concern, and the
    transport-decrypt path depends only on the conversation key matching.
    """
    wire_json = json.dumps(payload.to_wire_dict(), separators=(",", ":"))
    conversation_key = get_conversation_key(atm_sec, op_pub)
    ciphertext = encrypt_with_conversation_key(wire_json, conversation_key)

    d_tag = ["d", f"bitspire-cassettes-state:{atm_pub}"]
    p_tag = ["p", op_pub]
    event = {
        "kind": 30078,
        "pubkey": atm_pub,
        "content": ciphertext,
        "tags": [d_tag, p_tag],
        "created_at": created_at,
        "id": event_id,
    }
    return event
|
|
|
|
|
|
# =============================================================================
|
|
# decrypt_and_parse_state_event — RemoteBunkerSigner happy path
|
|
# =============================================================================
|
|
|
|
|
|
class TestDecryptViaBunkerSigner:
    """Expected production path post-#38.

    The operator account is bunker-backed, signer.nip44_decrypt routes
    through the bunker (stood in for by _FakeBunkerSigner), and the wire
    payload round-trips cleanly.
    """

    @staticmethod
    def _decrypt(payload, account):
        """Encrypt ``payload`` as a state event and decrypt it for ``account``."""
        event = _make_state_event(payload)
        signer = _FakeBunkerSigner(_OP_SEC)
        return asyncio.run(decrypt_and_parse_state_event(event, account, signer))

    def test_happy_path_recovers_positions_keyed_payload(self):
        payload = PublishCassettesPayload(
            positions={
                "1": {"denomination": 20, "count": 49},
                "2": {"denomination": 50, "count": 100},
            }
        )
        account = _fake_account(signer_type="RemoteBunkerSigner")

        positions = self._decrypt(payload, account).positions

        assert sorted(positions.keys()) == [1, 2]
        expected = {1: (20, 49), 2: (50, 100)}
        for bay, (denomination, count) in expected.items():
            assert positions[bay].denomination == denomination
            assert positions[bay].count == count

    def test_round_trips_multiple_same_denomination(self):
        """v1.1 operational case (coord-log 2026-05-30T18:45Z) — multiple
        bays carrying the same denomination."""
        bays = range(1, 5)
        payload = PublishCassettesPayload(
            positions={str(bay): {"denomination": 20, "count": 100} for bay in bays}
        )

        positions = self._decrypt(payload, _fake_account()).positions

        assert len(positions) == 4
        for bay in bays:
            assert positions[bay].denomination == 20
            assert positions[bay].count == 100
|
|
|
|
|
|
# =============================================================================
|
|
# decrypt_and_parse_state_event — LocalSigner transitional fallback
|
|
# =============================================================================
|
|
|
|
|
|
class TestDecryptViaLocalSignerFallback:
    """Transitional path for operator accounts still on LocalSigner.

    The LocalSigner stub raises SignerUnavailableError from nip44_decrypt;
    `_nip44_decrypt_via_signer` catches that and falls back to the
    hand-rolled impl using `account.prvkey`. Same wire output; the path
    stays only until S7 retires LocalSigner accounts entirely.
    """

    @staticmethod
    def _single_bay_event() -> dict:
        """Build an encrypted state event carrying one cassette bay."""
        payload = PublishCassettesPayload(
            positions={"1": {"denomination": 20, "count": 49}}
        )
        return _make_state_event(payload)

    def test_localsigner_with_prvkey_decrypts_via_fallback(self):
        event = self._single_bay_event()
        account = _fake_account(signer_type="LocalSigner", prvkey=_OP_SEC)
        signer = _FakeLocalSignerStub()

        recovered = asyncio.run(decrypt_and_parse_state_event(event, account, signer))

        first_bay = recovered.positions[1]
        assert first_bay.denomination == 20
        assert first_bay.count == 49

    def test_localsigner_without_prvkey_raises_decode_error(self):
        """A LocalSigner account with `prvkey=None` must surface as a
        decode error rather than silently succeed. Impossible in practice
        (LocalSigner requires prvkey at provision time), but kept as a
        defence against a corrupt row."""
        event = self._single_bay_event()
        account = _fake_account(signer_type="LocalSigner", prvkey=None)
        signer = _FakeLocalSignerStub()

        with pytest.raises(CassetteEventDecodeError):
            asyncio.run(decrypt_and_parse_state_event(event, account, signer))

    def test_clientonlysigner_raises_decode_error(self):
        """ClientSideOnlySigner has no server-side decrypt path, and with
        no prvkey the direct-prvkey fallback is impossible too. Surface as
        a decode error so the consumer logs + skips."""
        event = self._single_bay_event()
        account = _fake_account(signer_type="ClientSideOnlySigner", prvkey=None)
        # The LocalSigner stub behaves the same way here: it raises.
        signer = _FakeLocalSignerStub()

        with pytest.raises(CassetteEventDecodeError):
            asyncio.run(decrypt_and_parse_state_event(event, account, signer))
|
|
|
|
|
|
# =============================================================================
|
|
# decrypt_and_parse_state_event — bunker error mapping
|
|
# =============================================================================
|
|
|
|
|
|
class TestBunkerErrorMapping:
    """Mapping from NsecBunker* exceptions to our CassetteEvent* types.

    The post-#38 error hierarchy separates transient failures (bunker
    partitioned) from terminal ones (bunker policy reject, MAC failure).
    The consumer reacts differently — transient retries, terminal logs +
    skips — so the mapping is pinned here.
    """

    @staticmethod
    def _decrypt_args(bunker_error):
        """Return (event, account, signer) where the signer raises ``bunker_error``."""
        payload = PublishCassettesPayload(
            positions={"1": {"denomination": 20, "count": 49}}
        )
        event = _make_state_event(payload)
        return event, _fake_account(), _FakeRaisingSigner(bunker_error)

    def test_timeout_maps_to_transient_error(self):
        """Bunker unreachable → NsecBunkerTimeoutError → caller-visible
        CassetteEventTransientError. The consumer treats this as retry-
        eligible (don't advance state_event_id)."""
        args = self._decrypt_args(NsecBunkerTimeoutError("bunker unreachable"))
        with pytest.raises(CassetteEventTransientError):
            asyncio.run(decrypt_and_parse_state_event(*args))

    def test_rpc_reject_maps_to_decode_error(self):
        """Bunker rejected the RPC (policy / MAC / config) →
        NsecBunkerRpcError → caller-visible CassetteEventDecodeError.
        Terminal — retrying won't help."""
        rejection = NsecBunkerRpcError(
            "bunker policy reject: kind 30078 not authorised"
        )
        args = self._decrypt_args(rejection)
        with pytest.raises(CassetteEventDecodeError):
            asyncio.run(decrypt_and_parse_state_event(*args))
|
|
|
|
|
|
# =============================================================================
|
|
# decrypt_and_parse_state_event — payload + envelope validation
|
|
# =============================================================================
|
|
|
|
|
|
class TestPayloadValidation:
    """Errors that originate at the envelope / parse layer, not the signer.

    Same set as pre-migration — covered through the bunker-signer path
    since LocalSigner is going away. Every case expects
    CassetteEventDecodeError from decrypt_and_parse_state_event.
    """

    @staticmethod
    def _single_bay_event() -> dict:
        """Build a valid encrypted state event carrying one cassette bay."""
        payload = PublishCassettesPayload(
            positions={"1": {"denomination": 20, "count": 49}}
        )
        return _make_state_event(payload)

    @staticmethod
    def _event_with_plaintext(plaintext: str) -> dict:
        """Encrypt ``plaintext`` from the ATM to the operator and wrap it in
        a minimal kind-30078 event (no tags)."""
        ck = get_conversation_key(_ATM_SEC, _OP_PUB)
        return {
            "kind": 30078,
            "pubkey": _ATM_PUB,
            "content": encrypt_with_conversation_key(plaintext, ck),
            "tags": [],
            "created_at": 0,
            "id": "x",
        }

    @staticmethod
    def _assert_decode_error(event: dict, signer_sec: str) -> None:
        """Assert that decrypting ``event`` with a fake bunker signer holding
        ``signer_sec`` raises CassetteEventDecodeError."""
        account = _fake_account()
        signer = _FakeBunkerSigner(signer_sec)
        with pytest.raises(CassetteEventDecodeError):
            asyncio.run(decrypt_and_parse_state_event(event, account, signer))

    def test_tampered_content_rejected(self):
        event = self._single_bay_event()
        content = event["content"]
        # Overwrite the last two characters with a value guaranteed to differ
        # from the original. A fixed "AA" is a no-op whenever the ciphertext
        # already ends in "AA", which would make this test fail spuriously.
        replacement = "BB" if content.endswith("AA") else "AA"
        event["content"] = content[:-2] + replacement
        self._assert_decode_error(event, _OP_SEC)

    def test_wrong_signer_privkey_rejected(self):
        """Wrong privkey on the signer → wrong conversation key → MAC
        verification fails inside nip44_decrypt → surfaces as decode
        error (via the hand-rolled Nip44Error since this is the fake
        bunker signer; in production the bunker would raise
        NsecBunkerRpcError which also maps to CassetteEventDecodeError)."""
        event = self._single_bay_event()
        wrong_sec = "00" * 31 + "03"
        self._assert_decode_error(event, wrong_sec)

    def test_missing_content_rejected(self):
        event = self._single_bay_event()
        del event["content"]
        self._assert_decode_error(event, _OP_SEC)

    def test_missing_pubkey_rejected(self):
        event = self._single_bay_event()
        del event["pubkey"]
        self._assert_decode_error(event, _OP_SEC)

    def test_decrypted_garbage_json_rejected(self):
        """If plaintext decrypts cleanly but isn't valid JSON, surface
        as decode error (not crash the consumer loop)."""
        event = self._event_with_plaintext("definitely not json")
        self._assert_decode_error(event, _OP_SEC)

    def test_decrypted_wrong_shape_rejected(self):
        """Well-formed JSON but missing 'positions' → payload-shape
        validation failure."""
        event = self._event_with_plaintext('{"wrong_field": 42}')
        self._assert_decode_error(event, _OP_SEC)
|
|
|
|
|
|
# =============================================================================
|
|
# d-tag construction — unchanged by the migration, kept as regression guard
|
|
# =============================================================================
|
|
|
|
|
|
class TestDTagConstruction:
    """Pins the `<m>` placeholder in d-tags to the ATM hex pubkey.

    The substitution is load-bearing (coord-log 2026-05-30T11:50Z); these
    tests fix the canonical form so a refactor can't silently break wire
    compatibility.
    """

    def _machine(self, npub: str, id_: str = "m1") -> Machine:
        """Build a Machine with the given stored npub and internal id."""
        from datetime import datetime, timezone

        timestamp = datetime.now(timezone.utc)
        fields = {
            "id": id_,
            "operator_user_id": "op1",
            "machine_npub": npub,
            "wallet_id": "w1",
            "name": None,
            "location": None,
            "fiat_code": "EUR",
            "is_active": True,
            "created_at": timestamp,
            "updated_at": timestamp,
        }
        return Machine(**fields)

    def test_atm_hex_pubkey_from_hex_storage(self):
        machine = self._machine(_ATM_PUB)
        assert _atm_hex_pubkey(machine) == _ATM_PUB

    def test_atm_hex_pubkey_lowercases_uppercase_hex(self):
        machine = self._machine(_ATM_PUB.upper())
        assert _atm_hex_pubkey(machine) == _ATM_PUB

    def test_atm_hex_pubkey_canonicalises_bech32_to_hex(self):
        from lnbits.utils.nostr import hex_to_npub

        machine = self._machine(hex_to_npub(_ATM_PUB))
        assert _atm_hex_pubkey(machine) == _ATM_PUB

    def test_config_d_tag_uses_hex_pubkey_not_id(self):
        """REGRESSION GUARD: the d-tag must carry the ATM hex pubkey, NOT
        the internal machine UUID."""
        machine = self._machine(_ATM_PUB, id_="some-uuid-not-the-pubkey")
        atm_hex = _atm_hex_pubkey(machine)
        d_tag = _config_d_tag(atm_hex)
        assert d_tag == f"bitspire-cassettes:{_ATM_PUB}"
        assert "some-uuid" not in d_tag

    def test_state_d_tag_uses_hex_pubkey_not_id(self):
        machine = self._machine(_ATM_PUB, id_="some-uuid-not-the-pubkey")
        atm_hex = _atm_hex_pubkey(machine)
        d_tag = _state_d_tag(atm_hex)
        assert d_tag == f"bitspire-cassettes-state:{_ATM_PUB}"
        assert "some-uuid" not in d_tag

    def test_build_state_d_tags_for_machines(self):
        atm2_pub = _pub_hex("00" * 31 + "03")
        pubkeys = [_ATM_PUB, atm2_pub]
        machines = [
            self._machine(pub, id_=machine_id)
            for machine_id, pub in zip(("m1", "m2"), pubkeys)
        ]
        tags = build_state_d_tags_for_machines(machines)
        expected = [f"bitspire-cassettes-state:{pub}" for pub in pubkeys]
        assert tags == expected
|