Replace NIP-04 messaging with NIP-17 (NIP-44 + NIP-59 gift wrapping)
Modernize the entire customer-merchant communication layer from deprecated NIP-04 (kind 4, AES-256-CBC) to NIP-17 private direct messages using NIP-44 v2 encryption (ChaCha20 + HMAC-SHA256) and NIP-59 gift wrapping (rumor/seal/gift-wrap protocol). No backwards compatibility retained. New modules: - nostr/nip44.py: NIP-44 v2 encryption verified against official spec vectors - nostr/nip59.py: NIP-59 gift wrap with wrap/unwrap convenience functions - tests/: 44 unit tests for NIP-44 and NIP-59 Key changes: - Subscription filters: kind 4 → kind 1059 gift wraps - Message handler: _handle_nip04_message → _handle_gift_wrap (unwrap + route) - send_dm/reply_to_structured_dm: NIP-59 gift wrap to recipient + self-archive - Merchant model: removed NIP-04 crypto methods (decrypt/encrypt/build_dm_event) - helpers.py: removed NIP-04 functions, kept Schnorr signing + key normalization - views_api.py: consolidated DM sending through send_dm() service function Reliability improvements: - Event deduplication via bounded LRU set in NostrClient - Subscription health monitor (resubscribes after 120s of silence) - Preserved 5-minute lenient time window from prior work Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
319d5eeb04
commit
725944ae9c
13 changed files with 869 additions and 165 deletions
27
tests/conftest.py
Normal file
27
tests/conftest.py
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
"""
|
||||
Stub out the nostrmarket root package and all LNbits dependencies so that
|
||||
nostr/* unit tests can run without the full LNbits environment.
|
||||
|
||||
pytest walks up from tests/ and tries to import the parent __init__.py,
|
||||
which pulls in fastapi, lnbits, websocket, etc. We preemptively register
|
||||
the parent package as a simple module so that import never happens.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
|
||||
# Register 'nostrmarket' as an already-imported namespace package
|
||||
# pointing at the extension root, so pytest doesn't try to exec __init__.py
|
||||
_ext_root = Path(__file__).resolve().parent.parent
|
||||
_pkg = types.ModuleType("nostrmarket")
|
||||
_pkg.__path__ = [str(_ext_root)]
|
||||
_pkg.__package__ = "nostrmarket"
|
||||
sys.modules["nostrmarket"] = _pkg
|
||||
|
||||
# Also ensure the nostr subpackage is importable
|
||||
_nostr_dir = _ext_root / "nostr"
|
||||
_nostr_pkg = types.ModuleType("nostrmarket.nostr")
|
||||
_nostr_pkg.__path__ = [str(_nostr_dir)]
|
||||
_nostr_pkg.__package__ = "nostrmarket.nostr"
|
||||
sys.modules["nostrmarket.nostr"] = _nostr_pkg
|
||||
139
tests/test_nip44.py
Normal file
139
tests/test_nip44.py
Normal file
|
|
@ -0,0 +1,139 @@
|
|||
"""Tests for NIP-44 v2 encryption against official spec test vectors."""
|
||||
|
||||
import coincurve
|
||||
import pytest
|
||||
|
||||
from nostr.nip44 import (
|
||||
calc_padded_len,
|
||||
decrypt,
|
||||
encrypt,
|
||||
get_conversation_key,
|
||||
get_message_keys,
|
||||
)
|
||||
|
||||
|
||||
def pubkey_from_secret(secret_hex: str) -> str:
|
||||
"""Derive x-only public key hex from secret key hex."""
|
||||
sk = coincurve.PrivateKey(bytes.fromhex(secret_hex))
|
||||
return sk.public_key.format(compressed=True)[1:].hex()
|
||||
|
||||
|
||||
# --- Test vector from NIP-44 spec ---
|
||||
|
||||
SPEC_VECTOR = {
|
||||
"sec1": "0000000000000000000000000000000000000000000000000000000000000001",
|
||||
"sec2": "0000000000000000000000000000000000000000000000000000000000000002",
|
||||
"conversation_key": "c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d",
|
||||
"nonce": "0000000000000000000000000000000000000000000000000000000000000001",
|
||||
"plaintext": "a",
|
||||
"payload": "AgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABee0G5VSK0/9YypIObAtDKfYEAjD35uVkHyB0F4DwrcNaCXlCWZKaArsGrY6M9wnuTMxWfp1RTN9Xga8no+kF5Vsb",
|
||||
}
|
||||
|
||||
|
||||
class TestConversationKey:
|
||||
def test_spec_vector(self):
|
||||
pub2 = pubkey_from_secret(SPEC_VECTOR["sec2"])
|
||||
key = get_conversation_key(SPEC_VECTOR["sec1"], pub2)
|
||||
assert key.hex() == SPEC_VECTOR["conversation_key"]
|
||||
|
||||
def test_symmetric(self):
|
||||
"""conv(a, B) == conv(b, A)"""
|
||||
pub1 = pubkey_from_secret(SPEC_VECTOR["sec1"])
|
||||
pub2 = pubkey_from_secret(SPEC_VECTOR["sec2"])
|
||||
key_ab = get_conversation_key(SPEC_VECTOR["sec1"], pub2)
|
||||
key_ba = get_conversation_key(SPEC_VECTOR["sec2"], pub1)
|
||||
assert key_ab == key_ba
|
||||
|
||||
|
||||
class TestMessageKeys:
|
||||
def test_returns_correct_lengths(self):
|
||||
conv_key = bytes.fromhex(SPEC_VECTOR["conversation_key"])
|
||||
nonce = bytes.fromhex(SPEC_VECTOR["nonce"])
|
||||
chacha_key, chacha_nonce, hmac_key = get_message_keys(conv_key, nonce)
|
||||
assert len(chacha_key) == 32
|
||||
assert len(chacha_nonce) == 12
|
||||
assert len(hmac_key) == 32
|
||||
|
||||
def test_rejects_bad_key_length(self):
|
||||
with pytest.raises(ValueError):
|
||||
get_message_keys(b"\x00" * 16, b"\x00" * 32)
|
||||
|
||||
def test_rejects_bad_nonce_length(self):
|
||||
with pytest.raises(ValueError):
|
||||
get_message_keys(b"\x00" * 32, b"\x00" * 16)
|
||||
|
||||
|
||||
class TestPadding:
|
||||
@pytest.mark.parametrize(
|
||||
"unpadded,expected",
|
||||
[
|
||||
(1, 32),
|
||||
(2, 32),
|
||||
(31, 32),
|
||||
(32, 32),
|
||||
(33, 64),
|
||||
(64, 64),
|
||||
(65, 96),
|
||||
(256, 256),
|
||||
(257, 320),
|
||||
(1024, 1024),
|
||||
(65535, 65536),
|
||||
],
|
||||
)
|
||||
def test_calc_padded_len(self, unpadded, expected):
|
||||
assert calc_padded_len(unpadded) == expected
|
||||
|
||||
def test_rejects_zero(self):
|
||||
with pytest.raises(ValueError):
|
||||
calc_padded_len(0)
|
||||
|
||||
|
||||
class TestEncryptDecrypt:
|
||||
def test_spec_vector(self):
|
||||
conv_key = bytes.fromhex(SPEC_VECTOR["conversation_key"])
|
||||
nonce = bytes.fromhex(SPEC_VECTOR["nonce"])
|
||||
payload = encrypt(SPEC_VECTOR["plaintext"], conv_key, nonce)
|
||||
assert payload == SPEC_VECTOR["payload"]
|
||||
|
||||
def test_spec_vector_decrypt(self):
|
||||
conv_key = bytes.fromhex(SPEC_VECTOR["conversation_key"])
|
||||
plaintext = decrypt(SPEC_VECTOR["payload"], conv_key)
|
||||
assert plaintext == SPEC_VECTOR["plaintext"]
|
||||
|
||||
def test_round_trip_short(self):
|
||||
pub2 = pubkey_from_secret(SPEC_VECTOR["sec2"])
|
||||
conv_key = get_conversation_key(SPEC_VECTOR["sec1"], pub2)
|
||||
msg = "x"
|
||||
assert decrypt(encrypt(msg, conv_key), conv_key) == msg
|
||||
|
||||
def test_round_trip_long(self):
|
||||
pub2 = pubkey_from_secret(SPEC_VECTOR["sec2"])
|
||||
conv_key = get_conversation_key(SPEC_VECTOR["sec1"], pub2)
|
||||
msg = "A" * 65535
|
||||
assert decrypt(encrypt(msg, conv_key), conv_key) == msg
|
||||
|
||||
def test_round_trip_unicode(self):
|
||||
pub2 = pubkey_from_secret(SPEC_VECTOR["sec2"])
|
||||
conv_key = get_conversation_key(SPEC_VECTOR["sec1"], pub2)
|
||||
msg = "hello world! \U0001f680\U0001f30e\U0001f4ac"
|
||||
assert decrypt(encrypt(msg, conv_key), conv_key) == msg
|
||||
|
||||
def test_tampered_mac_rejected(self):
|
||||
conv_key = bytes.fromhex(SPEC_VECTOR["conversation_key"])
|
||||
payload = SPEC_VECTOR["payload"]
|
||||
tampered = payload[:-1] + ("a" if payload[-1] != "a" else "b")
|
||||
with pytest.raises(ValueError, match="invalid MAC"):
|
||||
decrypt(tampered, conv_key)
|
||||
|
||||
def test_empty_plaintext_rejected(self):
|
||||
conv_key = bytes.fromhex(SPEC_VECTOR["conversation_key"])
|
||||
with pytest.raises(ValueError, match="invalid plaintext length"):
|
||||
encrypt("", conv_key)
|
||||
|
||||
def test_unknown_version_rejected(self):
|
||||
with pytest.raises(ValueError, match="unknown version"):
|
||||
decrypt("#invalid", bytes(32))
|
||||
|
||||
def test_short_payload_rejected(self):
|
||||
with pytest.raises(ValueError, match="invalid payload size"):
|
||||
decrypt("AAAA", bytes(32))
|
||||
191
tests/test_nip59.py
Normal file
191
tests/test_nip59.py
Normal file
|
|
@ -0,0 +1,191 @@
|
|||
"""Tests for NIP-59 gift wrap protocol."""
|
||||
|
||||
import json
|
||||
import time
|
||||
|
||||
import coincurve
|
||||
import pytest
|
||||
|
||||
from nostr.nip59 import (
|
||||
create_gift_wrap,
|
||||
create_rumor,
|
||||
create_seal,
|
||||
unseal,
|
||||
unwrap_gift_wrap,
|
||||
unwrap_message,
|
||||
wrap_message,
|
||||
)
|
||||
|
||||
|
||||
def _generate_keypair() -> tuple[str, str]:
|
||||
"""Generate a (privkey_hex, pubkey_hex) pair."""
|
||||
sk = coincurve.PrivateKey()
|
||||
privkey = sk.secret.hex()
|
||||
pubkey = sk.public_key.format(compressed=True)[1:].hex()
|
||||
return privkey, pubkey
|
||||
|
||||
|
||||
SENDER_PRIV, SENDER_PUB = _generate_keypair()
|
||||
RECIPIENT_PRIV, RECIPIENT_PUB = _generate_keypair()
|
||||
|
||||
|
||||
class TestCreateRumor:
|
||||
def test_has_id_but_no_sig(self):
|
||||
rumor = create_rumor(SENDER_PUB, "hello", kind=14)
|
||||
assert rumor.id != ""
|
||||
assert rumor.sig is None
|
||||
|
||||
def test_kind_and_content(self):
|
||||
rumor = create_rumor(SENDER_PUB, "test message", kind=14, tags=[["p", RECIPIENT_PUB]])
|
||||
assert rumor.kind == 14
|
||||
assert rumor.content == "test message"
|
||||
assert rumor.pubkey == SENDER_PUB
|
||||
assert ["p", RECIPIENT_PUB] in rumor.tags
|
||||
|
||||
def test_custom_timestamp(self):
|
||||
ts = 1700000000
|
||||
rumor = create_rumor(SENDER_PUB, "hello", created_at=ts)
|
||||
assert rumor.created_at == ts
|
||||
|
||||
|
||||
class TestCreateSeal:
|
||||
def test_kind_13_with_empty_tags(self):
|
||||
rumor = create_rumor(SENDER_PUB, "hello")
|
||||
seal = create_seal(rumor, SENDER_PRIV, RECIPIENT_PUB)
|
||||
assert seal.kind == 13
|
||||
assert seal.tags == []
|
||||
assert seal.pubkey == SENDER_PUB
|
||||
|
||||
def test_is_signed(self):
|
||||
rumor = create_rumor(SENDER_PUB, "hello")
|
||||
seal = create_seal(rumor, SENDER_PRIV, RECIPIENT_PUB)
|
||||
assert seal.sig is not None
|
||||
assert len(seal.sig) == 128 # 64 bytes hex
|
||||
|
||||
def test_content_is_encrypted(self):
|
||||
rumor = create_rumor(SENDER_PUB, "hello")
|
||||
seal = create_seal(rumor, SENDER_PRIV, RECIPIENT_PUB)
|
||||
# Content should be base64 NIP-44 payload, not plaintext JSON
|
||||
assert "hello" not in seal.content
|
||||
|
||||
def test_timestamp_is_randomized(self):
|
||||
rumor = create_rumor(SENDER_PUB, "hello")
|
||||
seal = create_seal(rumor, SENDER_PRIV, RECIPIENT_PUB)
|
||||
now = int(time.time())
|
||||
# Seal timestamp should be in the past (up to 2 days)
|
||||
assert seal.created_at <= now
|
||||
assert seal.created_at >= now - (2 * 24 * 60 * 60 + 10)
|
||||
|
||||
|
||||
class TestCreateGiftWrap:
|
||||
def test_kind_1059_with_p_tag(self):
|
||||
rumor = create_rumor(SENDER_PUB, "hello")
|
||||
seal = create_seal(rumor, SENDER_PRIV, RECIPIENT_PUB)
|
||||
wrap = create_gift_wrap(seal, RECIPIENT_PUB)
|
||||
assert wrap.kind == 1059
|
||||
assert ["p", RECIPIENT_PUB] in wrap.tags
|
||||
|
||||
def test_uses_ephemeral_key(self):
|
||||
rumor = create_rumor(SENDER_PUB, "hello")
|
||||
seal = create_seal(rumor, SENDER_PRIV, RECIPIENT_PUB)
|
||||
wrap = create_gift_wrap(seal, RECIPIENT_PUB)
|
||||
# Gift wrap pubkey should be neither sender nor recipient
|
||||
assert wrap.pubkey != SENDER_PUB
|
||||
assert wrap.pubkey != RECIPIENT_PUB
|
||||
|
||||
def test_different_wraps_have_different_ephemeral_keys(self):
|
||||
rumor = create_rumor(SENDER_PUB, "hello")
|
||||
seal = create_seal(rumor, SENDER_PRIV, RECIPIENT_PUB)
|
||||
wrap1 = create_gift_wrap(seal, RECIPIENT_PUB)
|
||||
wrap2 = create_gift_wrap(seal, RECIPIENT_PUB)
|
||||
assert wrap1.pubkey != wrap2.pubkey
|
||||
|
||||
|
||||
class TestUnwrap:
|
||||
def test_unwrap_gift_wrap_returns_seal(self):
|
||||
rumor = create_rumor(SENDER_PUB, "hello")
|
||||
seal = create_seal(rumor, SENDER_PRIV, RECIPIENT_PUB)
|
||||
wrap = create_gift_wrap(seal, RECIPIENT_PUB)
|
||||
|
||||
recovered_seal = unwrap_gift_wrap(wrap, RECIPIENT_PRIV)
|
||||
assert recovered_seal.kind == 13
|
||||
assert recovered_seal.pubkey == SENDER_PUB
|
||||
|
||||
def test_unseal_returns_rumor(self):
|
||||
rumor = create_rumor(SENDER_PUB, "hello world")
|
||||
seal = create_seal(rumor, SENDER_PRIV, RECIPIENT_PUB)
|
||||
|
||||
recovered_rumor = unseal(seal, RECIPIENT_PRIV)
|
||||
assert recovered_rumor.content == "hello world"
|
||||
assert recovered_rumor.pubkey == SENDER_PUB
|
||||
assert recovered_rumor.kind == 14
|
||||
|
||||
def test_wrong_key_fails(self):
|
||||
rumor = create_rumor(SENDER_PUB, "secret")
|
||||
seal = create_seal(rumor, SENDER_PRIV, RECIPIENT_PUB)
|
||||
wrap = create_gift_wrap(seal, RECIPIENT_PUB)
|
||||
|
||||
wrong_priv, _ = _generate_keypair()
|
||||
with pytest.raises(Exception):
|
||||
unwrap_message(wrap, wrong_priv)
|
||||
|
||||
|
||||
class TestFullRoundTrip:
|
||||
def test_wrap_unwrap_message(self):
|
||||
content = "Are you going to the party tonight?"
|
||||
wrap = wrap_message(content, SENDER_PRIV, SENDER_PUB, RECIPIENT_PUB)
|
||||
|
||||
assert wrap.kind == 1059
|
||||
assert ["p", RECIPIENT_PUB] in wrap.tags
|
||||
|
||||
rumor = unwrap_message(wrap, RECIPIENT_PRIV)
|
||||
assert rumor.content == content
|
||||
assert rumor.pubkey == SENDER_PUB
|
||||
assert rumor.kind == 14
|
||||
assert rumor.sig is None
|
||||
|
||||
def test_wrap_with_custom_kind_and_tags(self):
|
||||
tags = [["p", RECIPIENT_PUB], ["subject", "test"]]
|
||||
wrap = wrap_message(
|
||||
"order data",
|
||||
SENDER_PRIV,
|
||||
SENDER_PUB,
|
||||
RECIPIENT_PUB,
|
||||
kind=14,
|
||||
tags=tags,
|
||||
)
|
||||
|
||||
rumor = unwrap_message(wrap, RECIPIENT_PRIV)
|
||||
assert rumor.content == "order data"
|
||||
assert rumor.kind == 14
|
||||
assert ["subject", "test"] in rumor.tags
|
||||
|
||||
def test_self_wrap_for_archival(self):
|
||||
"""Merchant wraps a copy to self (same sender and recipient)."""
|
||||
content = '{"type": 1, "payment_options": [{"type": "ln", "link": "lnbc..."}]}'
|
||||
wrap = wrap_message(content, SENDER_PRIV, SENDER_PUB, SENDER_PUB)
|
||||
|
||||
rumor = unwrap_message(wrap, SENDER_PRIV)
|
||||
assert rumor.content == content
|
||||
assert rumor.pubkey == SENDER_PUB
|
||||
|
||||
def test_json_content_preserved(self):
|
||||
"""Order JSON payloads survive the wrap/unwrap cycle."""
|
||||
order = {
|
||||
"type": 0,
|
||||
"id": "test-order-123",
|
||||
"items": [{"product_id": "abc", "quantity": 2}],
|
||||
"shipping_id": "zone-1",
|
||||
}
|
||||
content = json.dumps(order)
|
||||
wrap = wrap_message(content, SENDER_PRIV, SENDER_PUB, RECIPIENT_PUB)
|
||||
|
||||
rumor = unwrap_message(wrap, RECIPIENT_PRIV)
|
||||
recovered_order = json.loads(rumor.content)
|
||||
assert recovered_order == order
|
||||
|
||||
def test_unicode_content(self):
|
||||
content = "Payment received! \u2705 Your order is being processed \U0001f4e6"
|
||||
wrap = wrap_message(content, SENDER_PRIV, SENDER_PUB, RECIPIENT_PUB)
|
||||
rumor = unwrap_message(wrap, RECIPIENT_PRIV)
|
||||
assert rumor.content == content
|
||||
Loading…
Add table
Add a link
Reference in a new issue