Pre-merge lint hygiene on the PR #30 touched files:
- `black` reformatted 9 files (cassette_transport, crud, models, tasks,
views_api, nip44, all 3 cassette test files, migrations). Cosmetic:
line lengths, trailing commas, multi-line argument layout.
- `ruff check --fix` auto-fixed 176 of 202 errors. Mostly
`UP006` `typing.Optional` → `| None` modernization, `I001` import
sort order, `UP035` typing-extensions cleanup.
- Fixed two new mypy regressions introduced by the migration commit
dcb7de0:
- `crud.py:apply_bootstrap_state` — annotated `existing_first: dict
| None` on the dedup fetch.
- `tasks.py:_cassette_consumer_tick` — `# type: ignore[arg-type]` on
the `nostr_client.relay_manager.add_subscription` call; nostrclient's
upstream typing declares `list[str]` for filters but the actual
Nostr protocol takes `list[<filter-dict>]`. The runtime accepts it
(live smoke at 13:43Z dispatched `nip44_decrypt` cleanly through
this subscription); the typing mismatch is upstream's.
Remaining lint state, intentionally not addressed in this commit
(all pre-existing baseline, not regressions):
- 8 mypy errors in `calculations.py` + the unchanged-by-this-PR parts
of `crud.py` — pre-existing on v2-bitspire.
- 26 ruff style warnings: 14 are N805 false-positives on Pydantic
validators (`cls` first-arg is correct for `@validator`-decorated
methods); 4 are N818 exception-name-suffix preferences on my new
exception classes (renaming would touch many call sites; keep
`OperatorIdentityMissing` / `SignerUnavailable` / `RelayUnavailable`
/ `_NostrclientUnavailable` as-is for clarity); 5 are E501 line-too-
long on docstrings (the long lines are formatted for clarity);
1 RUF002 unicode-minus in a docstring.
Tests: 155 passed, 1 pre-existing async-plugin failure unchanged.
Live smoke (both publish + consume directions through the bunker)
unaffected — this is purely a code-style pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
390 lines
16 KiB
Python
390 lines
16 KiB
Python
"""
|
|
Tests for the hand-rolled NIP-44 v2 implementation in `nip44.py`.
|
|
|
|
Three layers of validation, ordered by trust:
|
|
1. Pinned reference vector from the canonical paulmillr/nip44 test suite —
|
|
the conversation_key for (sec=1, sec=2) is widely-published as
|
|
c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d. If
|
|
our get_conversation_key() ever drifts from that value, the impl is
|
|
broken at the key-derivation layer.
|
|
2. Round-trip + tamper detection — verifies the encrypt/decrypt loop
|
|
under random nonces, catches HMAC + version + padding tampering.
|
|
3. Cross-test — bitspire posted one sample event, encrypted on their
nostr-tools side, to the coord log; TestBitspireCrossTest embeds it as
_BITSPIRE_FIXTURE and asserts byte-compatibility with the nostr-tools
NIP-44 v2 impl.
|
|
"""
|
|
|
|
import base64
|
|
|
|
import coincurve
|
|
import pytest
|
|
|
|
from ..nip44 import (
|
|
Nip44LengthError,
|
|
Nip44MacError,
|
|
Nip44VersionError,
|
|
_calc_padded_len,
|
|
decrypt_from,
|
|
decrypt_with_conversation_key,
|
|
encrypt_for,
|
|
encrypt_with_conversation_key,
|
|
get_conversation_key,
|
|
)
|
|
|
|
|
|
# Helper: derive a compressed-x-coord pubkey hex string from a secret hex.
|
|
def _pub_hex(sec_hex: str) -> str:
    """Return the public key hex for the secret key given as hex.

    The public key is serialized in compressed form and its leading prefix
    byte is sliced off before hex-encoding, leaving the x-coordinate only.
    """
    secret_key = coincurve.PrivateKey(bytes.fromhex(sec_hex))
    compressed = secret_key.public_key.format(compressed=True)
    return compressed[1:].hex()
|
|
|
|
|
|
# Canonical test keys widely used across NIP-44 reference vectors: the
# secret scalars 1 and 2 as 32-byte big-endian hex, plus their public keys.
_SEC_ONE = (1).to_bytes(32, "big").hex()  # integer 1
_SEC_TWO = (2).to_bytes(32, "big").hex()  # integer 2
_PUB_ONE, _PUB_TWO = _pub_hex(_SEC_ONE), _pub_hex(_SEC_TWO)
|
|
|
|
|
|
# =============================================================================
|
|
# Layer 1 — pinned reference vector (paulmillr/nip44)
|
|
# =============================================================================
|
|
|
|
|
|
class TestConversationKeyReferenceVector:
    """Pin get_conversation_key to the canonical NIP-44 v2 reference vector.

    The expected value comes from the paulmillr/nip44 test suite. Any drift
    from it means a key-derivation regression and must fail loudly.
    """

    REFERENCE_CK_HEX = (
        "c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d"
    )

    def test_sec_one_pub_two(self):
        """The key derived from (sec=1, pub of sec=2) equals the pinned hex."""
        derived_hex = get_conversation_key(_SEC_ONE, _PUB_TWO).hex()
        assert derived_hex == self.REFERENCE_CK_HEX

    def test_sec_two_pub_one_is_symmetric(self):
        """ck(privA, pubB) == ck(privB, pubA).

        Both parties of a NIP-44 conversation must derive the identical key;
        that is what allows the recipient to decrypt using their own private
        key together with the sender's public key.
        """
        sender_view = get_conversation_key(_SEC_ONE, _PUB_TWO)
        recipient_view = get_conversation_key(_SEC_TWO, _PUB_ONE)
        assert sender_view == recipient_view
|
|
|
|
|
|
# =============================================================================
|
|
# Layer 2 — round-trip + tamper detection
|
|
# =============================================================================
|
|
|
|
|
|
class TestRoundTrip:
    """Round-trips through the encrypt/decrypt API."""

    @pytest.mark.parametrize(
        "plaintext",
        [
            "a",  # 1 byte (minimum)
            "hello, nip44 v2",  # short
            "x" * 32,  # exactly the small-payload boundary
            "x" * 33,  # just over
            "y" * 1000,  # medium
            "z" * 5000,  # large
            '{"denominations": {"20": {"position": 1, "count": 49}}}',  # realistic
        ],
    )
    def test_round_trip_various_lengths(self, plaintext):
        """Sender-encrypted plaintext is recovered verbatim by the recipient."""
        ciphertext = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO)
        assert decrypt_from(ciphertext, _SEC_TWO, _PUB_ONE) == plaintext

    def test_payloads_are_unique_under_random_nonce(self):
        """Two encryptions of the same message under the same keys differ.

        The nonce is expected to be fresh random bytes per call, so this
        guards against a regression that accidentally pins it. Both payloads
        must still decrypt to the original message.
        """
        message = "the same message"
        first, second = (
            encrypt_for(message, _SEC_ONE, _PUB_TWO) for _ in range(2)
        )
        assert first != second
        for payload in (first, second):
            assert decrypt_from(payload, _SEC_TWO, _PUB_ONE) == message

    def test_pinned_nonce_is_deterministic(self):
        """A fixed nonce makes encryption deterministic.

        Same plaintext, same conversation key and same nonce must give a
        byte-identical payload, and that payload must decrypt back.
        """
        conversation_key = get_conversation_key(_SEC_ONE, _PUB_TWO)
        zero_nonce = b"\x00" * 32  # 32 zero bytes
        payloads = [
            encrypt_with_conversation_key("a", conversation_key, nonce=zero_nonce)
            for _ in range(2)
        ]
        assert payloads[0] == payloads[1]
        assert decrypt_with_conversation_key(payloads[0], conversation_key) == "a"
|
|
|
|
|
|
class TestTamperDetection:
    """Tampered envelopes must be rejected by MAC verification.

    The construction relies on the HMAC-SHA256 check: if it ever became a
    no-op, a relay-MITM could forge ATM state events.

    Decoded payload layout assumed by the offsets below:
    version (byte 0) + nonce (bytes 1..32) + ciphertext (byte 33 up to the
    MAC) + MAC (last 32 bytes).
    """

    def _payload(self) -> str:
        """Return a freshly encrypted payload from key one to key two."""
        return encrypt_for("important message", _SEC_ONE, _PUB_TWO)

    def _payload_with_flipped_bit(self, index: int) -> str:
        """Return a fresh payload with the low bit of byte ``index`` flipped.

        ``index`` addresses the base64-decoded payload; negative values
        count from the end.
        """
        raw = bytearray(base64.b64decode(self._payload()))
        raw[index] ^= 0x01
        return base64.b64encode(bytes(raw)).decode("ascii")

    def test_flipped_mac_byte_rejected(self):
        """Corrupting the final byte (inside the MAC) fails verification."""
        tampered = self._payload_with_flipped_bit(-1)
        with pytest.raises(Nip44MacError):
            decrypt_from(tampered, _SEC_TWO, _PUB_ONE)

    def test_flipped_ciphertext_byte_rejected(self):
        """Corrupting a ciphertext byte fails verification."""
        ciphertext_offset = 1 + 32  # skip version byte and nonce
        tampered = self._payload_with_flipped_bit(ciphertext_offset + 5)
        with pytest.raises(Nip44MacError):
            decrypt_from(tampered, _SEC_TWO, _PUB_ONE)

    def test_flipped_nonce_byte_rejected(self):
        """Corrupting the first nonce byte fails verification."""
        # The nonce starts at byte 1, right after the version byte.
        tampered = self._payload_with_flipped_bit(1)
        with pytest.raises(Nip44MacError):
            decrypt_from(tampered, _SEC_TWO, _PUB_ONE)

    def test_wrong_recipient_privkey_rejected(self):
        """Decrypting with an unrelated private key raises a MAC error.

        The MAC key is derived from the conversation key, so a wrong
        recipient key yields a different conversation key and a failed MAC
        check; decryption fails fast instead of returning garbage.
        """
        unrelated_sec = "00" * 31 + "03"
        with pytest.raises(Nip44MacError):
            decrypt_from(self._payload(), unrelated_sec, _PUB_ONE)
|
|
|
|
|
|
class TestVersionRejection:
    """Payloads whose leading version byte is not accepted must be refused."""

    @staticmethod
    def _payload_with_version(version_byte: int) -> str:
        """Encrypt "x" and overwrite byte 0 of the decoded payload."""
        raw = bytearray(base64.b64decode(encrypt_for("x", _SEC_ONE, _PUB_TWO)))
        raw[0] = version_byte
        return base64.b64encode(bytes(raw)).decode("ascii")

    def test_v1_byte_rejected(self):
        """A version byte of 0x01 raises Nip44VersionError."""
        bad = self._payload_with_version(0x01)
        with pytest.raises(Nip44VersionError):
            decrypt_from(bad, _SEC_TWO, _PUB_ONE)

    def test_unknown_version_byte_rejected(self):
        """A version byte of 0xFF raises Nip44VersionError."""
        bad = self._payload_with_version(0xFF)
        with pytest.raises(Nip44VersionError):
            decrypt_from(bad, _SEC_TWO, _PUB_ONE)
|
|
|
|
|
|
class TestLengthGuards:
    """Length limits on plaintext and on the encoded payload."""

    def test_empty_plaintext_rejected(self):
        """An empty plaintext cannot be encrypted."""
        empty = ""
        with pytest.raises(Nip44LengthError):
            encrypt_for(empty, _SEC_ONE, _PUB_TWO)

    def test_plaintext_at_max_length_accepted(self):
        """A 65535-character plaintext round-trips."""
        longest = "x" * 65535
        ciphertext = encrypt_for(longest, _SEC_ONE, _PUB_TWO)
        recovered = decrypt_from(ciphertext, _SEC_TWO, _PUB_ONE)
        assert recovered == longest

    def test_plaintext_over_max_rejected(self):
        """A 65536-character plaintext is refused."""
        too_long = "x" * 65536
        with pytest.raises(Nip44LengthError):
            encrypt_for(too_long, _SEC_ONE, _PUB_TWO)

    def test_invalid_base64_payload_rejected(self):
        """A payload that is not valid base64 raises Nip44LengthError."""
        garbage = "not!!!base64@@@"
        with pytest.raises(Nip44LengthError):
            decrypt_from(garbage, _SEC_TWO, _PUB_ONE)

    def test_payload_too_short_rejected(self):
        """A 50-byte decoded payload is below the minimum size."""
        # 50 bytes is well under the 99-byte minimum
        undersized = bytes([0x02]) + bytes(49)
        too_short = base64.b64encode(undersized).decode("ascii")
        with pytest.raises(Nip44LengthError):
            decrypt_from(too_short, _SEC_TWO, _PUB_ONE)
|
|
|
|
|
|
class TestPaddingFormula:
    """Spot-checks of _calc_padded_len against hand-computed values.

    Pins the NIP-44 v2 padding scheme so a refactor cannot silently break
    wire compatibility. Such a break would otherwise only surface as
    cross-implementation decryption failures, which the end-to-end
    cross-test is meant to catch; a unit test here is cheaper.
    """

    # (plaintext_len, expected_padded) pairs. In the notes, next_power(n) is
    # the smallest power of two strictly greater than n, evaluated at
    # n = L - 1, and chunk = max(32, next_power // 8).
    PADDED_LEN_CASES = [
        (1, 32),  # <= 32 → 32
        (16, 32),
        (32, 32),
        (33, 64),  # > 32 → next chunk
        (64, 64),
        (65, 96),  # next_power(64) = 128; chunk = max(32, 128 // 8) = 32
        (100, 128),
        (128, 128),
        # L=129: next_power(128) = 1<<8 = 256; chunk = max(32, 256//8) = 32;
        # padded = 32 * (128//32 + 1) = 32 * 5 = 160.
        (129, 160),
        (256, 256),  # next_power(255) = 256; chunk = max(32, 32) = 32
        (257, 320),
        (1000, 1024),  # next_power(999) = 1024; chunk = max(32, 128) = 128
    ]

    @pytest.mark.parametrize("plaintext_len,expected_padded", PADDED_LEN_CASES)
    def test_calc_padded_len(self, plaintext_len, expected_padded):
        """_calc_padded_len returns the expected padded size."""
        actual = _calc_padded_len(plaintext_len)
        assert actual == expected_padded
|
|
|
|
|
|
# =============================================================================
|
|
# Layer 3 — byte-compat cross-test against nostr-tools (bitspire's impl)
|
|
# =============================================================================
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Bitspire-side v1.1 fixture, posted to ~/dev/coordination/log.md at
|
|
# 2026-05-30T19:00Z. Positions-keyed wire shape per the v1.1 redesign
|
|
# (18:30Z + 18:45Z); intentionally includes two positions sharing
|
|
# denomination=20 to exercise the multi-same-denom round-trip on our
|
|
# decrypt + payload-validate path. Throwaway keypairs (one-shot, never
|
|
# sign anything else) — safe to embed verbatim.
|
|
# Generated by apps/machine/src/services/operator-config.ts-shape code
|
|
# path using the @bitSpire/nostr-client encryptContentV2 +
|
|
# createSignedEvent helpers (same code the production bootstrap publish
|
|
# uses). Round-tripped on bitspire side via decryptContentV2 before posting.
|
|
# -----------------------------------------------------------------------------
|
|
|
|
_BITSPIRE_FIXTURE = {
|
|
"atm_keypair": {
|
|
"privkey_hex": (
|
|
"814e6188d017102bbf301ba5b38fba95b2556dc79a60df4cd50605c4593578e6"
|
|
),
|
|
"pubkey_hex": (
|
|
"217bdc9a65b571c4d9b59da6227a7aa6ca5bbfd5280af791417c57a79d92852b"
|
|
),
|
|
},
|
|
"operator_keypair": {
|
|
"privkey_hex": (
|
|
"cca7dd9fe4874f6b9f3f3fae21648da686b7e714bfd4786e8fa8745933fd3185"
|
|
),
|
|
"pubkey_hex": (
|
|
"49bd8e615769f8b6a5aa8ce9617b919996abecf234599ba196789461cf239146"
|
|
),
|
|
},
|
|
"expected_plaintext": {
|
|
"positions": {
|
|
"1": {"denomination": 20, "count": 49},
|
|
"2": {"denomination": 20, "count": 38},
|
|
"3": {"denomination": 50, "count": 100},
|
|
},
|
|
},
|
|
"event": {
|
|
"kind": 30078,
|
|
"content": (
|
|
"AqOHsCcjN2W8L/Cx0uH+n++VA13W+wy7z1EcuuNX49sSagelX2lI0HEKyd+ActOc"
|
|
"iaPsHrp9ecJTkEZOD86ioldbLbEVColJwK4g1uVZSbpDeqRe+97woxVDqPnzj507"
|
|
"tFaVLF/dRmda+oKHUzkVPhE4PHQJzp9Fqji38J3nU6N68qo7KOt3qg1nSy5eDfAu"
|
|
"zt7djRBx63+/veub0rWTMMQLBgci8+Ms6Y+Zb1mki3L6NWuIR0Or+8DhcD+ZJiOu"
|
|
"WTcx"
|
|
),
|
|
"tags": [
|
|
[
|
|
"d",
|
|
"bitspire-cassettes-state:"
|
|
"217bdc9a65b571c4d9b59da6227a7aa6ca5bbfd5280af791417c57a79d92852b",
|
|
],
|
|
[
|
|
"p",
|
|
"49bd8e615769f8b6a5aa8ce9617b919996abecf234599ba196789461cf239146",
|
|
],
|
|
],
|
|
"created_at": 1780173222,
|
|
"pubkey": ("217bdc9a65b571c4d9b59da6227a7aa6ca5bbfd5280af791417c57a79d92852b"),
|
|
"id": ("72c09f333386dd4ad6125f8c69823824eea50d8091b694458bcd60701517eece"),
|
|
"sig": (
|
|
"07ecafacf0169f074e564a999ee1c31446930b43391d007c4a1f9ef7ad890d6c"
|
|
"2aa6e3ecc5318edeb5748fbd64c7ca33407099a97154e2ff7e0c626e48d71925"
|
|
),
|
|
},
|
|
}
|
|
|
|
|
|
class TestBitspireCrossTest:
    """Byte-compat cross-test between our hand-rolled NIP-44 v2 (`nip44.py`)
    and the nostr-tools NIP-44 v2 impl that bitspire uses on the ATM side
    (via @bitSpire/nostr-client). If these tests pass, the wire format
    agrees across both implementations and the joint round-trip (operator
    publish → ATM apply / ATM bootstrap → operator consume) is byte-safe.
    If any fail, the spec ambiguity surfaces before sintra ships.

    `decrypt_from` and `encrypt_for` come from the module-level import of
    `..nip44`; the tests do not re-import them locally.
    """

    def test_decrypts_bitspire_sample_event(self):
        """The load-bearing assertion: our `decrypt_from` recovers the
        expected `{"positions": {...}}` plaintext from bitspire's encrypted
        event content. v1.1 fixture intentionally exercises the multi-same-
        denomination round-trip (positions 1 + 2 both hold $20)."""
        import json

        event = _BITSPIRE_FIXTURE["event"]
        operator_privkey = _BITSPIRE_FIXTURE["operator_keypair"]["privkey_hex"]

        # Recipient side: operator private key + the event author's pubkey.
        plaintext = decrypt_from(
            event["content"],
            operator_privkey,
            event["pubkey"],
        )
        payload = json.loads(plaintext)
        assert payload == _BITSPIRE_FIXTURE["expected_plaintext"]

        # v1.1 invariant: two positions can carry the same denomination.
        # Pin it explicitly so a future "fix" that re-introduces denom-
        # uniqueness validation surfaces here instead of as a runtime
        # rejection on real machines.
        assert payload["positions"]["1"]["denomination"] == 20
        assert payload["positions"]["2"]["denomination"] == 20
        assert payload["positions"]["1"]["count"] != payload["positions"]["2"]["count"]

    def test_signature_verifies_via_lnbits_helper(self):
        """Optional extra per bitspire's 13:15Z note (3). The consumer
        path runs verify_event before NIP-44 decrypt — locking the sig-
        algorithm agreement here means both sides hash the event id the
        same way + Schnorr-verify under the same x-only public-key
        convention."""
        # Kept function-local, as in the original test.
        from lnbits.utils.nostr import verify_event

        assert verify_event(_BITSPIRE_FIXTURE["event"]) is True

    def test_encrypt_round_trip_via_our_impl_decrypts_with_their_keys(self):
        """Optional extra per bitspire's 13:15Z note (3). Encrypt the
        expected plaintext using OUR impl with the ATM keypair as
        sender + operator pubkey as recipient. The resulting ciphertext
        won't be byte-identical to the fixture (NIP-44 v2 nonces are
        random) but it MUST decrypt back to the same plaintext when
        passed to our decrypt path. Locks the encrypt direction too,
        not just decrypt."""
        import json

        plaintext = json.dumps(
            _BITSPIRE_FIXTURE["expected_plaintext"], separators=(",", ":")
        )
        atm_sec = _BITSPIRE_FIXTURE["atm_keypair"]["privkey_hex"]
        atm_pub = _BITSPIRE_FIXTURE["atm_keypair"]["pubkey_hex"]
        op_sec = _BITSPIRE_FIXTURE["operator_keypair"]["privkey_hex"]
        op_pub = _BITSPIRE_FIXTURE["operator_keypair"]["pubkey_hex"]

        our_ciphertext = encrypt_for(plaintext, atm_sec, op_pub)
        recovered = decrypt_from(our_ciphertext, op_sec, atm_pub)
        assert json.loads(recovered) == _BITSPIRE_FIXTURE["expected_plaintext"]
        # The two ciphertexts SHOULD differ (random nonce per encrypt)
        assert our_ciphertext != _BITSPIRE_FIXTURE["event"]["content"]
|