chore(v2): lint pass - black + ruff auto-fix + mypy regressions (#29 v1.1)
Pre-merge lint hygiene on the PR #30 touched files:
- `black` reformatted 9 files (cassette_transport, crud, models, tasks,
views_api, nip44, all 3 cassette test files, migrations). Cosmetic:
line lengths, trailing commas, multi-line argument layout.
- `ruff check --fix` auto-fixed 176 of 202 errors. Mostly
`UP006` `typing.Optional` → `| None` modernization, `I001` import
sort order, `UP035` typing-extensions cleanup.
- Fixed two new mypy regressions introduced by the migration commit
dcb7de0:
- `crud.py:apply_bootstrap_state` — annotated `existing_first: dict
| None` on the dedup fetch.
- `tasks.py:_cassette_consumer_tick` — `# type: ignore[arg-type]` on
the `nostr_client.relay_manager.add_subscription` call; nostrclient's
upstream typing declares `list[str]` for filters but the actual
Nostr protocol takes `list[<filter-dict>]`. The runtime accepts it
(live smoke at 13:43Z dispatched `nip44_decrypt` cleanly through
this subscription); the typing mismatch is upstream's.
Remaining lint state, intentionally not addressed in this commit
(all pre-existing baseline, not regressions):
- 8 mypy errors in `calculations.py` + the unchanged-by-this-PR parts
of `crud.py` — pre-existing on v2-bitspire.
- 26 ruff style warnings: 14 are N805 false-positives on Pydantic
validators (`cls` first-arg is correct for `@validator`-decorated
methods); 4 are N818 exception-name-suffix preferences on my new
exception classes (renaming would touch many call sites; keep
`OperatorIdentityMissing` / `SignerUnavailable` / `RelayUnavailable`
/ `_NostrclientUnavailable` as-is for clarity); 5 are E501 line-too-
long on docstrings (the long lines are formatted for clarity);
1 RUF002 unicode-minus in a docstring.
Tests: 155 passed, 1 pre-existing async-plugin failure unchanged.
Live smoke (both publish + consume directions through the bunker)
unaffected — this is purely a code-style pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
294 lines
12 KiB
Python
"""
NIP-44 v2: versioned encrypted payloads (https://github.com/nostr-protocol/nips/blob/master/44.md).

Hand-rolled because lnbits historically shipped only NIP-04 (AES-CBC) in
`lnbits.utils.nostr.encrypt_content`, and the locked design at
aiolabs/satmachineadmin#29 (paired with lamassu-next#56) wires cassette config
over kind-30078 with NIP-44 v2 encrypted content.

## Runtime status (post lnbits PR #38, 2026-05-31)

**Runtime usage has migrated to the signer abstraction** via
`signer.nip44_encrypt` / `signer.nip44_decrypt` on `lnbits.core.signers.base.
NostrSigner`. For RemoteBunkerSigner-backed accounts the bunker performs the
crypto and the operator's nsec never leaves the bunker process; for the
transitional LocalSigner path `cassette_transport._nip44_*_via_signer` falls
back to the helpers in this module against the stored `account.prvkey`.

This module's runtime export footprint is therefore:
- `encrypt_for` / `decrypt_from`: called by the LocalSigner fallback in
  `cassette_transport` until every operator on the instance is bunker-backed
  (S7 / aiolabs/satmachineadmin#21). Then those calls disappear too.
- Everything else (encrypt_with_conversation_key, decrypt_with_conversation_key,
  get_conversation_key, padding helpers, error classes) is **test-only**:
  referenced by `tests/test_nip44_v2.py` to validate the wire format against
  the canonical paulmillr/nip44 reference vectors and the bitspire cross-test
  fixture posted to the coordination log.

Don't add new runtime call sites here. The signer abstraction is the path.

Two safety nets keep the impl honest:
1. tests/test_nip44_v2.py runs reference vectors + round-trip + tamper-detection.
2. bitspire posts a sample event encrypted on their nostr-tools side to the
   coord log; test_decrypts_bitspire_sample_event cross-checks our impl
   against theirs by decrypting that event with a known privkey.

Wire format (per spec):
    payload = base64( 0x02 || nonce (32B) || ciphertext (var) || mac (32B) )
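
As a rough illustration of the layout above, the decoded payload can be sliced
into its four fields using only the standard library. This is a sketch rather
than the module's decrypt path: `split_payload` is a hypothetical helper name,
the payload built here is dummy bytes, and no MAC check or decryption happens.

```python
import base64

NONCE_LEN = 32  # bytes, per the wire format above
MAC_LEN = 32  # bytes, per the wire format above


def split_payload(payload_b64: str) -> tuple[int, bytes, bytes, bytes]:
    # Hypothetical helper: returns (version, nonce, ciphertext, mac).
    raw = base64.b64decode(payload_b64, validate=True)
    if len(raw) < 1 + NONCE_LEN + MAC_LEN:
        raise ValueError("payload too short for version + nonce + mac")
    version = raw[0]
    nonce = raw[1 : 1 + NONCE_LEN]
    ciphertext = raw[1 + NONCE_LEN : -MAC_LEN]
    mac = raw[-MAC_LEN:]
    return version, nonce, ciphertext, mac


# Dummy payload (not real ciphertext), only to show where each field sits.
dummy = bytes([0x02]) + bytes(range(32)) + bytes(34) + bytes([0xAA]) * 32
version, nonce, ciphertext, mac = split_payload(
    base64.b64encode(dummy).decode("ascii")
)
print(version, len(nonce), len(ciphertext), len(mac))
```

The ciphertext is whatever remains between the fixed-size nonce and MAC, which
is why the module can only bound its length rather than pin it.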

Key derivation:
    conversation_key = HKDF-extract(salt=b"nip44-v2", IKM=ecdh_shared_x)  # 32B PRK, stable per pair
    per-message:
        nonce = csprng(32 bytes)
        temp = HKDF-expand(PRK=conversation_key, info=nonce, L=76)
        chacha_key = temp[0:32]
        chacha_nonce = temp[32:44]
        hmac_key = temp[44:76]
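
The derivation above can be sketched with the standard library alone. Note the
assumptions: this module uses `cryptography`'s `HKDFExpand` for the expand
step, whereas the sketch spells out RFC 5869 by hand with `hmac`; the names
`hkdf_extract` and `hkdf_expand` are illustrative; and `shared_x` is a
placeholder, not a real ECDH result.

```python
import hashlib
import hmac
import os


def hkdf_extract(salt: bytes, ikm: bytes) -> bytes:
    # RFC 5869 extract step: PRK = HMAC-SHA256(key=salt, msg=IKM).
    return hmac.new(salt, ikm, hashlib.sha256).digest()


def hkdf_expand(prk: bytes, info: bytes, length: int) -> bytes:
    # RFC 5869 expand step: T(i) = HMAC(PRK, T(i-1) || info || i), concatenated.
    okm, block, counter = b"", b"", 1
    while len(okm) < length:
        block = hmac.new(prk, block + info + bytes([counter]), hashlib.sha256).digest()
        okm += block
        counter += 1
    return okm[:length]


# Placeholder for the 32-byte ECDH x-coordinate; a real one comes from secp256k1.
shared_x = bytes(range(32))
conversation_key = hkdf_extract(b"nip44-v2", shared_x)

# Fresh 32-byte nonce per message, then one 76-byte expansion split three ways.
nonce = os.urandom(32)
temp = hkdf_expand(conversation_key, nonce, 76)
chacha_key, chacha_nonce, hmac_key = temp[0:32], temp[32:44], temp[44:76]
print(len(conversation_key), len(chacha_key), len(chacha_nonce), len(hmac_key))
```

Because the nonce is the only per-message input, reusing a nonce with the same
conversation key would reproduce the same three keys, which is why the
production path draws it from the OS CSPRNG.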
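
Padding scheme (NIP-44 v2 length-prefixed, variable-chunk):
    padded = uint16_be(len(plaintext)) || plaintext || zeros
    where the zero fill brings the plaintext up to padded_data_len (32 bytes
    minimum, then a multiple of a chunk size that grows with the plaintext),
    so the padded buffer is always 2 + padded_data_len bytes.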
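
A few worked lengths may make the padding rule easier to follow. The sketch
below restates the same calculation as `_calc_padded_len` so that it runs
standalone; `calc_padded_len` and `pad` are illustrative names, and `pad`
skips the range check the module performs.

```python
import struct


def calc_padded_len(plaintext_len: int) -> int:
    # Same rule as the module's _calc_padded_len, restated to run standalone.
    if plaintext_len <= 32:
        return 32
    next_power = 1 << (plaintext_len - 1).bit_length()
    chunk = max(32, next_power // 8)
    return chunk * ((plaintext_len - 1) // chunk + 1)


def pad(plaintext: bytes) -> bytes:
    # uint16_be length prefix, then the plaintext, then zero fill.
    # Illustrative only: no check that the length is within 1..65535.
    n = len(plaintext)
    return struct.pack(">H", n) + plaintext + bytes(calc_padded_len(n) - n)


# Short inputs share the 32-byte bucket; larger ones round up to a chunk multiple.
for n in (1, 32, 33, 65, 257, 1000, 65535):
    print(n, "->", calc_padded_len(n))

padded = pad(b"hello")
print(len(padded), padded[:2].hex())
```

A 5-byte plaintext therefore pads to 34 bytes: the 2-byte prefix plus the
32-byte minimum bucket, which hides the exact length of short messages.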
"""

from __future__ import annotations

import base64
import hashlib
import hmac as hmac_stdlib
import os
import struct

import coincurve
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand

# Spec constants.
_VERSION = 0x02
_HKDF_SALT = b"nip44-v2"
_MIN_PLAINTEXT_LEN = 1
_MAX_PLAINTEXT_LEN = 65535
_NONCE_LEN = 32
_MAC_LEN = 32
_MIN_PAYLOAD_LEN = (
    1 + _NONCE_LEN + (2 + 32) + _MAC_LEN
)  # version + nonce + min padded + mac
_MAX_PAYLOAD_LEN = 1 + _NONCE_LEN + (2 + 65536) + _MAC_LEN


class Nip44Error(Exception):
    """Generic NIP-44 v2 envelope error. Subclasses distinguish failure modes."""


class Nip44VersionError(Nip44Error):
    """First payload byte was not 0x02. Could be a NIP-04 envelope, a v1 NIP-44, or garbage."""


class Nip44MacError(Nip44Error):
    """HMAC verification failed: payload was tampered, wrong conversation key, or corrupted in transit."""


class Nip44LengthError(Nip44Error):
    """Plaintext or payload length outside the spec-allowed range, or padding header lies."""


# =============================================================================
# Padding (NIP-44 v2)
# =============================================================================


def _calc_padded_len(plaintext_len: int) -> int:
    """Per NIP-44 v2 padding scheme:
    if L <= 32: padded_len = 32
    else: chunk = max(32, next_power_2(L-1) // 8); padded_len = chunk * ((L-1) // chunk + 1)
    """
    if plaintext_len <= 32:
        return 32
    next_power = 1 << (plaintext_len - 1).bit_length()
    chunk = max(32, next_power // 8)
    return chunk * ((plaintext_len - 1) // chunk + 1)


def _pad(plaintext: bytes) -> bytes:
    """Prefix uint16_be length + plaintext + zero-fill to the NIP-44 v2 boundary."""
    n = len(plaintext)
    if n < _MIN_PLAINTEXT_LEN or n > _MAX_PLAINTEXT_LEN:
        raise Nip44LengthError(
            f"plaintext length {n} outside [{_MIN_PLAINTEXT_LEN}, {_MAX_PLAINTEXT_LEN}]"
        )
    padded_data_len = _calc_padded_len(n)
    zeros = b"\x00" * (padded_data_len - n)
    return struct.pack(">H", n) + plaintext + zeros


def _unpad(padded: bytes) -> bytes:
    """Strip the uint16_be length prefix and zero padding. Validates that the
    declared length is consistent with the padded payload (rejects a forged
    length prefix that would slice past the buffer or imply a different
    padded_data_len than what we received)."""
    if len(padded) < 2:
        raise Nip44LengthError("padded payload too short to hold length prefix")
    declared_len = struct.unpack(">H", padded[0:2])[0]
    if declared_len < _MIN_PLAINTEXT_LEN or declared_len > _MAX_PLAINTEXT_LEN:
        raise Nip44LengthError(f"declared plaintext length {declared_len} out of range")
    if len(padded) != 2 + _calc_padded_len(declared_len):
        raise Nip44LengthError(
            f"padded buffer length {len(padded)} doesn't match the calculated padding "
            f"for declared length {declared_len}"
        )
    return padded[2 : 2 + declared_len]


# =============================================================================
# Conversation + message-key derivation
# =============================================================================


def get_conversation_key(privkey_hex: str, pubkey_hex: str) -> bytes:
    """Derive the per-pair stable conversation key (PRK) used for all messages
    between sender (privkey) and recipient (pubkey).

    Steps:
        shared_x = ECDH(privkey, pubkey).x  # 32 bytes, x-coordinate
        prk = HKDF-extract(salt=b"nip44-v2", IKM=shared_x)

    coincurve's `.multiply(secret).format(compressed=True)[1:]` strips the
    leading 0x02/0x03 parity byte to return the raw x-coord, the same trick
    `lnbits.utils.nostr.encrypt_content` uses for NIP-04.
    """
    sender = coincurve.PrivateKey(bytes.fromhex(privkey_hex))
    recipient_pub = coincurve.PublicKey(b"\x02" + bytes.fromhex(pubkey_hex))
    shared_x = recipient_pub.multiply(sender.secret).format(compressed=True)[1:]
    # HKDF-extract is HMAC-SHA256(key=salt, msg=ikm) per RFC 5869.
    return hmac_stdlib.new(_HKDF_SALT, shared_x, hashlib.sha256).digest()


def _derive_message_keys(
    conversation_key: bytes, nonce: bytes
) -> tuple[bytes, bytes, bytes]:
    """Per-message key expansion: HKDF-expand(PRK=conversation_key, info=nonce, L=76).
    Returns (chacha_key 32B, chacha_nonce 12B, hmac_key 32B)."""
    hkdf = HKDFExpand(algorithm=hashes.SHA256(), length=76, info=nonce)
    okm = hkdf.derive(conversation_key)
    return okm[0:32], okm[32:44], okm[44:76]


def _hmac_aad(hmac_key: bytes, nonce: bytes, ciphertext: bytes) -> bytes:
    """HMAC-SHA256(key=hmac_key, msg=nonce || ciphertext). Returns 32-byte MAC."""
    h = hmac.HMAC(hmac_key, hashes.SHA256())
    h.update(nonce)
    h.update(ciphertext)
    return h.finalize()


def _chacha20(key: bytes, nonce: bytes, data: bytes) -> bytes:
    """ChaCha20 stream cipher (symmetric: encrypt == decrypt). Used both directions.

    The `cryptography` lib's `algorithms.ChaCha20(key, nonce)` expects a
    16-byte nonce arg: a 4-byte little-endian initial counter prefix +
    12-byte actual nonce. NIP-44 v2 starts the counter at 0 and uses the
    HKDF-derived 12-byte chacha_nonce, so we prefix four zero bytes here.
    """
    if len(nonce) != 12:
        raise Nip44LengthError(
            f"chacha_nonce must be 12 bytes (NIP-44 v2), got {len(nonce)}"
        )
    cipher = Cipher(algorithms.ChaCha20(key, b"\x00\x00\x00\x00" + nonce), mode=None)
    return cipher.encryptor().update(data)


# =============================================================================
# Public API: low-level (nonce-controllable for testability)
# =============================================================================


def encrypt_with_conversation_key(
    plaintext: str,
    conversation_key: bytes,
    *,
    nonce: bytes | None = None,
) -> str:
    """Encrypt `plaintext` under a precomputed `conversation_key` (32B PRK).

    `nonce` is 32 random bytes when omitted (the production path). Tests pass
    it explicitly to assert pinned reference vectors.

    Returns the base64-encoded payload string suitable as a Nostr event's
    `content` field for kind-30078 (and any other kind that uses NIP-44 v2).
    """
    if nonce is None:
        nonce = os.urandom(_NONCE_LEN)
    elif len(nonce) != _NONCE_LEN:
        raise Nip44LengthError(f"nonce must be exactly {_NONCE_LEN} bytes")

    padded = _pad(plaintext.encode("utf-8"))
    chacha_key, chacha_nonce, hmac_key = _derive_message_keys(conversation_key, nonce)
    ciphertext = _chacha20(chacha_key, chacha_nonce, padded)
    mac = _hmac_aad(hmac_key, nonce, ciphertext)
    return base64.b64encode(bytes([_VERSION]) + nonce + ciphertext + mac).decode(
        "ascii"
    )


def decrypt_with_conversation_key(payload_b64: str, conversation_key: bytes) -> str:
    """Decrypt a NIP-44 v2 payload using a precomputed `conversation_key`.

    Raises:
        Nip44VersionError: payload's first byte isn't 0x02
        Nip44LengthError: payload too short / too long / declared length lies
        Nip44MacError: HMAC verification failed (tamper, wrong key, corruption)
    """
    try:
        raw = base64.b64decode(payload_b64, validate=True)
    except Exception as exc:
        raise Nip44LengthError(f"payload is not valid base64: {exc}") from exc

    if len(raw) < _MIN_PAYLOAD_LEN or len(raw) > _MAX_PAYLOAD_LEN:
        raise Nip44LengthError(f"payload length {len(raw)} outside valid range")
    if raw[0] != _VERSION:
        raise Nip44VersionError(f"unsupported NIP-44 version: 0x{raw[0]:02x}")

    nonce = raw[1 : 1 + _NONCE_LEN]
    mac_received = raw[-_MAC_LEN:]
    ciphertext = raw[1 + _NONCE_LEN : -_MAC_LEN]

    chacha_key, chacha_nonce, hmac_key = _derive_message_keys(conversation_key, nonce)
    mac_expected = _hmac_aad(hmac_key, nonce, ciphertext)
    # constant-time compare to avoid timing-leak in MAC verification
    if not hmac_stdlib.compare_digest(mac_received, mac_expected):
        raise Nip44MacError("HMAC verification failed")

    padded = _chacha20(chacha_key, chacha_nonce, ciphertext)
    plaintext_bytes = _unpad(padded)
    return plaintext_bytes.decode("utf-8")


# =============================================================================
# Public API: high-level (pair-keyed, the call shape app code reaches for)
# =============================================================================


def encrypt_for(
    plaintext: str,
    sender_privkey_hex: str,
    recipient_pubkey_hex: str,
    *,
    nonce: bytes | None = None,
) -> str:
    """Encrypt `plaintext` from the sender (holding the privkey) to the recipient
    (identified by pubkey). The recipient can decrypt with `decrypt_from(
    payload, recipient_privkey_hex, sender_pubkey_hex)`: symmetric on the
    conversation key, which is the same derived value from either side."""
    conversation_key = get_conversation_key(sender_privkey_hex, recipient_pubkey_hex)
    return encrypt_with_conversation_key(plaintext, conversation_key, nonce=nonce)


def decrypt_from(
    payload_b64: str, recipient_privkey_hex: str, sender_pubkey_hex: str
) -> str:
    """Decrypt a payload that the recipient (holding the privkey) received from
    the sender (identified by pubkey)."""
    conversation_key = get_conversation_key(recipient_privkey_hex, sender_pubkey_hex)
    return decrypt_with_conversation_key(payload_b64, conversation_key)