Modernize the entire customer-merchant communication layer from deprecated NIP-04 (kind 4, AES-256-CBC) to NIP-17 private direct messages using NIP-44 v2 encryption (ChaCha20 + HMAC-SHA256) and NIP-59 gift wrapping (rumor/seal/gift-wrap protocol). No backwards compatibility retained. New modules: - nostr/nip44.py: NIP-44 v2 encryption verified against official spec vectors - nostr/nip59.py: NIP-59 gift wrap with wrap/unwrap convenience functions - tests/: 44 unit tests for NIP-44 and NIP-59 Key changes: - Subscription filters: kind 4 → kind 1059 gift wraps - Message handler: _handle_nip04_message → _handle_gift_wrap (unwrap + route) - send_dm/reply_to_structured_dm: NIP-59 gift wrap to recipient + self-archive - Merchant model: removed NIP-04 crypto methods (decrypt/encrypt/build_dm_event) - helpers.py: removed NIP-04 functions, kept Schnorr signing + key normalization - views_api.py: consolidated DM sending through send_dm() service function Reliability improvements: - Event deduplication via bounded LRU set in NostrClient - Subscription health monitor (resubscribes after 120s of silence) - Preserved 5-minute lenient time window from prior work Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
180 lines
5.8 KiB
Python
180 lines
5.8 KiB
Python
"""
|
|
NIP-44 v2: Encrypted Payloads (Versioned)
|
|
|
|
secp256k1 ECDH, HKDF, padding, ChaCha20, HMAC-SHA256, base64
|
|
|
|
Reference: https://github.com/nostr-protocol/nips/blob/master/44.md
|
|
"""
|
|
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import math
|
|
import secrets
|
|
import struct
|
|
|
|
import coincurve
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
|
|
from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand
|
|
from cryptography.hazmat.primitives import hashes
|
|
|
|
# Version byte written as the first byte of every payload and checked on decode.
VERSION = 2
# Inclusive bounds, in bytes, on the UTF-8 encoded plaintext length.
# The upper bound is the largest value a 2-byte unsigned length prefix can hold.
MIN_PLAINTEXT_SIZE = 1
MAX_PLAINTEXT_SIZE = 65535
|
|
|
|
|
|
def get_conversation_key(private_key_hex: str, public_key_hex: str) -> bytes:
    """
    Derive the long-term conversation key shared by two users.

    Runs secp256k1 ECDH between our private key and the peer's public key,
    then applies HKDF-extract (one HMAC-SHA256 keyed with the salt
    ``nip44-v2``) to the X coordinate of the shared point.

    Symmetric: get_conversation_key(a, B) == get_conversation_key(b, A)

    Args:
        private_key_hex: our private key, hex-encoded.
        public_key_hex: the peer's public key, hex-encoded without a parity
            prefix byte (``02`` is prepended here before parsing).

    Returns:
        The 32-byte conversation key.
    """
    # The key arrives without a parity byte; prepend 02 so coincurve can parse
    # it as a compressed point. Only the X coordinate of the product is used.
    their_point = coincurve.PublicKey(bytes.fromhex("02" + public_key_hex))
    our_key = coincurve.PrivateKey(bytes.fromhex(private_key_hex))

    # Uncompressed encoding is 0x04 || X (32 bytes) || Y (32 bytes).
    uncompressed = their_point.multiply(our_key.secret).format(compressed=False)
    shared_x = uncompressed[1:33]

    # HKDF-extract only (no expand step): HMAC-SHA256(salt, input key material).
    return hmac.new(b"nip44-v2", shared_x, hashlib.sha256).digest()
|
|
|
|
|
|
def get_message_keys(
    conversation_key: bytes, nonce: bytes
) -> tuple[bytes, bytes, bytes]:
    """
    Derive the per-message keys for one payload.

    Expands ``conversation_key`` with HKDF-expand (SHA-256), using ``nonce``
    as the ``info`` input, into 76 bytes of key material.

    Returns (chacha_key, chacha_nonce, hmac_key) of 32, 12 and 32 bytes.

    Raises:
        ValueError: if ``conversation_key`` or ``nonce`` is not 32 bytes long.
    """
    if len(conversation_key) != 32:
        raise ValueError("invalid conversation_key length")
    if len(nonce) != 32:
        raise ValueError("invalid nonce length")

    expander = HKDFExpand(algorithm=hashes.SHA256(), length=76, info=nonce)
    okm = expander.derive(conversation_key)

    # Split the 76 bytes into consecutive 32-, 12- and 32-byte fields.
    chacha_key, chacha_nonce, hmac_key = struct.unpack("32s12s32s", okm)
    return chacha_key, chacha_nonce, hmac_key
|
|
|
|
|
|
def calc_padded_len(unpadded_len: int) -> int:
    """
    Calculate the padded length for a plaintext of ``unpadded_len`` bytes.

    Lengths up to 32 pad to 32. Larger lengths are rounded up to a multiple of
    a chunk size: 32 while the next power of two is at most 256, otherwise one
    eighth of the next power of two.

    Args:
        unpadded_len: plaintext length in bytes; must be positive.

    Returns:
        The padded length, always >= ``unpadded_len``.

    Raises:
        ValueError: if ``unpadded_len`` is zero or negative.
    """
    if unpadded_len <= 0:
        raise ValueError("invalid plaintext length")
    if unpadded_len <= 32:
        return 32

    # Smallest power of two >= unpadded_len. For x >= 1, x.bit_length() equals
    # floor(log2(x)) + 1, computed in exact integer arithmetic; the float-based
    # math.log2 rounds large integers and can land on the wrong power.
    next_power = 1 << (unpadded_len - 1).bit_length()
    chunk = 32 if next_power <= 256 else next_power // 8

    # Round up to a whole number of chunks using integer floor division.
    return chunk * ((unpadded_len - 1) // chunk + 1)
|
|
|
|
|
|
def _pad(plaintext: str) -> bytes:
    """
    Encode ``plaintext`` as UTF-8 and pad it for encryption.

    Layout of the result: a 2-byte big-endian length of the encoded text, the
    encoded text itself, then zero bytes up to ``calc_padded_len`` of that
    length.

    Raises:
        ValueError: if the encoded length is outside
            MIN_PLAINTEXT_SIZE..MAX_PLAINTEXT_SIZE.
    """
    encoded = plaintext.encode("utf-8")
    size = len(encoded)
    if not MIN_PLAINTEXT_SIZE <= size <= MAX_PLAINTEXT_SIZE:
        raise ValueError(
            f"invalid plaintext length: {size} "
            f"(must be {MIN_PLAINTEXT_SIZE}..{MAX_PLAINTEXT_SIZE})"
        )
    length_prefix = struct.pack(">H", size)
    # ljust fills with zero bytes on the right up to the padded length.
    body = encoded.ljust(calc_padded_len(size), b"\x00")
    return length_prefix + body
|
|
|
|
|
|
def _unpad(padded: bytes) -> str:
    """
    Strip the length prefix and zero padding and return the plaintext string.

    Expects a 2-byte big-endian length, that many bytes of UTF-8 text, then
    padding such that the total size is ``2 + calc_padded_len(length)``.

    Raises:
        ValueError: "invalid padding" if the input is too short to hold the
            length prefix, the declared length is zero, the text is truncated,
            or the total size does not match the expected padded size.
            A UnicodeDecodeError (a ValueError subclass) is raised if the text
            is not valid UTF-8.
    """
    # Without this guard struct.unpack raises struct.error on short input,
    # which is not a ValueError like every other failure in this module.
    if len(padded) < 2:
        raise ValueError("invalid padding")

    (unpadded_len,) = struct.unpack(">H", padded[:2])
    unpadded = padded[2 : 2 + unpadded_len]
    # The zero-length test comes first so calc_padded_len is never called
    # with 0, which it rejects.
    if (
        unpadded_len == 0
        or len(unpadded) != unpadded_len
        or len(padded) != 2 + calc_padded_len(unpadded_len)
    ):
        raise ValueError("invalid padding")
    return unpadded.decode("utf-8")
|
|
|
|
|
|
def _hmac_aad(key: bytes, message: bytes, aad: bytes) -> bytes:
    """
    Compute HMAC-SHA256 over ``aad || message`` with ``key``.

    Raises:
        ValueError: if ``aad`` is not exactly 32 bytes long.
    """
    if len(aad) != 32:
        raise ValueError("AAD associated data must be 32 bytes")
    mac = hmac.new(key, digestmod=hashlib.sha256)
    # Feeding the two parts in order is equivalent to hashing aad + message.
    mac.update(aad)
    mac.update(message)
    return mac.digest()
|
|
|
|
|
|
def _chacha20(key: bytes, nonce: bytes, data: bytes) -> bytes:
    """
    Apply the ChaCha20 keystream to ``data``, starting at block counter 0.

    The same call serves for both encryption and decryption.
    """
    # cryptography's ChaCha20 expects 16 bytes: a 4-byte little-endian block
    # counter followed by the 12-byte nonce. Four zero bytes = counter 0.
    initial_counter = bytes(4)
    algorithm = algorithms.ChaCha20(key, initial_counter + nonce)
    stream = Cipher(algorithm, mode=None).encryptor()
    return stream.update(data) + stream.finalize()
|
|
|
|
|
|
def _decode_payload(payload: str) -> tuple[bytes, bytes, bytes]:
    """
    Decode a base64 payload into (nonce, ciphertext, mac).

    Decoded layout: 1 version byte, 32-byte nonce, ciphertext, 32-byte MAC.

    Raises:
        ValueError: if the payload is empty or starts with "#"
            ("unknown version"), its encoded or decoded size is out of range,
            it is not strictly valid base64 ("invalid base64"), or the
            version byte is not VERSION.
    """
    plen = len(payload)
    # NOTE(review): per NIP-44 a leading "#" appears to flag a future,
    # non-base64 encoding; it is rejected as an unknown version.
    if plen == 0 or payload[0] == "#":
        raise ValueError("unknown version")
    if plen < 132 or plen > 87472:
        raise ValueError("invalid payload size")

    # validate=True rejects characters outside the base64 alphabet. The
    # default mode silently discards them, so a payload with junk inserted
    # would still decode to the same bytes and pass the MAC check.
    try:
        data = base64.b64decode(payload, validate=True)
    except ValueError as err:  # binascii.Error is a ValueError subclass
        raise ValueError("invalid base64") from err

    dlen = len(data)
    if dlen < 99 or dlen > 65603:
        raise ValueError("invalid data size")

    vers = data[0]
    if vers != VERSION:
        raise ValueError(f"unknown version {vers}")

    nonce = data[1:33]
    ciphertext = data[33 : dlen - 32]
    mac = data[dlen - 32 : dlen]
    return nonce, ciphertext, mac
|
|
|
|
|
|
def encrypt(
    plaintext: str,
    conversation_key: bytes,
    nonce: bytes | None = None,
) -> str:
    """
    Encrypt plaintext using NIP-44 v2.

    Args:
        plaintext: text to encrypt.
        conversation_key: 32-byte key from get_conversation_key().
        nonce: optional 32-byte nonce; a random one is generated with
            ``secrets`` when omitted.

    Returns:
        Base64 text of: version byte || nonce || ciphertext || MAC.

    Raises:
        ValueError: if the nonce is not 32 bytes, or a helper rejects the
            key or plaintext length.
    """
    nonce = secrets.token_bytes(32) if nonce is None else nonce
    if len(nonce) != 32:
        raise ValueError("invalid nonce length")

    chacha_key, chacha_nonce, hmac_key = get_message_keys(conversation_key, nonce)
    ciphertext = _chacha20(chacha_key, chacha_nonce, _pad(plaintext))
    # The MAC covers the ciphertext, with the nonce as associated data.
    mac = _hmac_aad(hmac_key, ciphertext, nonce)

    raw = b"".join((struct.pack("B", VERSION), nonce, ciphertext, mac))
    return base64.b64encode(raw).decode("ascii")
|
|
|
|
|
|
def decrypt(payload: str, conversation_key: bytes) -> str:
    """
    Decrypt a NIP-44 v2 base64 payload.

    The MAC is verified, using a constant-time comparison, before any
    ciphertext is decrypted.

    Returns:
        The plaintext string.

    Raises:
        ValueError: "invalid MAC" if authentication fails; decoding and
            unpadding helpers raise ValueError for malformed payloads.
    """
    nonce, ciphertext, received_mac = _decode_payload(payload)
    chacha_key, chacha_nonce, hmac_key = get_message_keys(conversation_key, nonce)

    expected_mac = _hmac_aad(hmac_key, ciphertext, nonce)
    if hmac.compare_digest(expected_mac, received_mac):
        return _unpad(_chacha20(chacha_key, chacha_nonce, ciphertext))
    raise ValueError("invalid MAC")
|