Modernize the entire customer-merchant communication layer from deprecated NIP-04 (kind 4, AES-256-CBC) to NIP-17 private direct messages using NIP-44 v2 encryption (ChaCha20 + HMAC-SHA256) and NIP-59 gift wrapping (rumor/seal/gift-wrap protocol). No backwards compatibility retained. New modules: - nostr/nip44.py: NIP-44 v2 encryption verified against official spec vectors - nostr/nip59.py: NIP-59 gift wrap with wrap/unwrap convenience functions - tests/: 44 unit tests for NIP-44 and NIP-59 Key changes: - Subscription filters: kind 4 → kind 1059 gift wraps - Message handler: _handle_nip04_message → _handle_gift_wrap (unwrap + route) - send_dm/reply_to_structured_dm: NIP-59 gift wrap to recipient + self-archive - Merchant model: removed NIP-04 crypto methods (decrypt/encrypt/build_dm_event) - helpers.py: removed NIP-04 functions, kept Schnorr signing + key normalization - views_api.py: consolidated DM sending through send_dm() service function Reliability improvements: - Event deduplication via bounded LRU set in NostrClient - Subscription health monitor (resubscribes after 120s of silence) - Preserved 5-minute lenient time window from prior work Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
139 lines
4.9 KiB
Python
139 lines
4.9 KiB
Python
"""Tests for NIP-44 v2 encryption against official spec test vectors."""
|
|
|
|
import coincurve
|
|
import pytest
|
|
|
|
from nostr.nip44 import (
|
|
calc_padded_len,
|
|
decrypt,
|
|
encrypt,
|
|
get_conversation_key,
|
|
get_message_keys,
|
|
)
|
|
|
|
|
|
def pubkey_from_secret(secret_hex: str) -> str:
|
|
"""Derive x-only public key hex from secret key hex."""
|
|
sk = coincurve.PrivateKey(bytes.fromhex(secret_hex))
|
|
return sk.public_key.format(compressed=True)[1:].hex()
|
|
|
|
|
|
# --- Test vector from NIP-44 spec ---
|
|
|
|
SPEC_VECTOR = {
|
|
"sec1": "0000000000000000000000000000000000000000000000000000000000000001",
|
|
"sec2": "0000000000000000000000000000000000000000000000000000000000000002",
|
|
"conversation_key": "c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d",
|
|
"nonce": "0000000000000000000000000000000000000000000000000000000000000001",
|
|
"plaintext": "a",
|
|
"payload": "AgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABee0G5VSK0/9YypIObAtDKfYEAjD35uVkHyB0F4DwrcNaCXlCWZKaArsGrY6M9wnuTMxWfp1RTN9Xga8no+kF5Vsb",
|
|
}
|
|
|
|
|
|
class TestConversationKey:
|
|
def test_spec_vector(self):
|
|
pub2 = pubkey_from_secret(SPEC_VECTOR["sec2"])
|
|
key = get_conversation_key(SPEC_VECTOR["sec1"], pub2)
|
|
assert key.hex() == SPEC_VECTOR["conversation_key"]
|
|
|
|
def test_symmetric(self):
|
|
"""conv(a, B) == conv(b, A)"""
|
|
pub1 = pubkey_from_secret(SPEC_VECTOR["sec1"])
|
|
pub2 = pubkey_from_secret(SPEC_VECTOR["sec2"])
|
|
key_ab = get_conversation_key(SPEC_VECTOR["sec1"], pub2)
|
|
key_ba = get_conversation_key(SPEC_VECTOR["sec2"], pub1)
|
|
assert key_ab == key_ba
|
|
|
|
|
|
class TestMessageKeys:
|
|
def test_returns_correct_lengths(self):
|
|
conv_key = bytes.fromhex(SPEC_VECTOR["conversation_key"])
|
|
nonce = bytes.fromhex(SPEC_VECTOR["nonce"])
|
|
chacha_key, chacha_nonce, hmac_key = get_message_keys(conv_key, nonce)
|
|
assert len(chacha_key) == 32
|
|
assert len(chacha_nonce) == 12
|
|
assert len(hmac_key) == 32
|
|
|
|
def test_rejects_bad_key_length(self):
|
|
with pytest.raises(ValueError):
|
|
get_message_keys(b"\x00" * 16, b"\x00" * 32)
|
|
|
|
def test_rejects_bad_nonce_length(self):
|
|
with pytest.raises(ValueError):
|
|
get_message_keys(b"\x00" * 32, b"\x00" * 16)
|
|
|
|
|
|
class TestPadding:
|
|
@pytest.mark.parametrize(
|
|
"unpadded,expected",
|
|
[
|
|
(1, 32),
|
|
(2, 32),
|
|
(31, 32),
|
|
(32, 32),
|
|
(33, 64),
|
|
(64, 64),
|
|
(65, 96),
|
|
(256, 256),
|
|
(257, 320),
|
|
(1024, 1024),
|
|
(65535, 65536),
|
|
],
|
|
)
|
|
def test_calc_padded_len(self, unpadded, expected):
|
|
assert calc_padded_len(unpadded) == expected
|
|
|
|
def test_rejects_zero(self):
|
|
with pytest.raises(ValueError):
|
|
calc_padded_len(0)
|
|
|
|
|
|
class TestEncryptDecrypt:
|
|
def test_spec_vector(self):
|
|
conv_key = bytes.fromhex(SPEC_VECTOR["conversation_key"])
|
|
nonce = bytes.fromhex(SPEC_VECTOR["nonce"])
|
|
payload = encrypt(SPEC_VECTOR["plaintext"], conv_key, nonce)
|
|
assert payload == SPEC_VECTOR["payload"]
|
|
|
|
def test_spec_vector_decrypt(self):
|
|
conv_key = bytes.fromhex(SPEC_VECTOR["conversation_key"])
|
|
plaintext = decrypt(SPEC_VECTOR["payload"], conv_key)
|
|
assert plaintext == SPEC_VECTOR["plaintext"]
|
|
|
|
def test_round_trip_short(self):
|
|
pub2 = pubkey_from_secret(SPEC_VECTOR["sec2"])
|
|
conv_key = get_conversation_key(SPEC_VECTOR["sec1"], pub2)
|
|
msg = "x"
|
|
assert decrypt(encrypt(msg, conv_key), conv_key) == msg
|
|
|
|
def test_round_trip_long(self):
|
|
pub2 = pubkey_from_secret(SPEC_VECTOR["sec2"])
|
|
conv_key = get_conversation_key(SPEC_VECTOR["sec1"], pub2)
|
|
msg = "A" * 65535
|
|
assert decrypt(encrypt(msg, conv_key), conv_key) == msg
|
|
|
|
def test_round_trip_unicode(self):
|
|
pub2 = pubkey_from_secret(SPEC_VECTOR["sec2"])
|
|
conv_key = get_conversation_key(SPEC_VECTOR["sec1"], pub2)
|
|
msg = "hello world! \U0001f680\U0001f30e\U0001f4ac"
|
|
assert decrypt(encrypt(msg, conv_key), conv_key) == msg
|
|
|
|
def test_tampered_mac_rejected(self):
|
|
conv_key = bytes.fromhex(SPEC_VECTOR["conversation_key"])
|
|
payload = SPEC_VECTOR["payload"]
|
|
tampered = payload[:-1] + ("a" if payload[-1] != "a" else "b")
|
|
with pytest.raises(ValueError, match="invalid MAC"):
|
|
decrypt(tampered, conv_key)
|
|
|
|
def test_empty_plaintext_rejected(self):
|
|
conv_key = bytes.fromhex(SPEC_VECTOR["conversation_key"])
|
|
with pytest.raises(ValueError, match="invalid plaintext length"):
|
|
encrypt("", conv_key)
|
|
|
|
def test_unknown_version_rejected(self):
|
|
with pytest.raises(ValueError, match="unknown version"):
|
|
decrypt("#invalid", bytes(32))
|
|
|
|
def test_short_payload_rejected(self):
|
|
with pytest.raises(ValueError, match="invalid payload size"):
|
|
decrypt("AAAA", bytes(32))
|