Compare commits
No commits in common. "da07bae554976e9044134c1b4e47b3ea41f0062d" and "58a097411723228d0c382a92b57572af9e2598aa" have entirely different histories.
da07bae554
...
58a0974117
6 changed files with 0 additions and 1076 deletions
152
crud.py
152
crud.py
|
|
@ -12,7 +12,6 @@ from lnbits.db import Database
|
||||||
from lnbits.helpers import urlsafe_short_hash
|
from lnbits.helpers import urlsafe_short_hash
|
||||||
|
|
||||||
from .models import (
|
from .models import (
|
||||||
CassetteConfig,
|
|
||||||
ClientBalanceSummary,
|
ClientBalanceSummary,
|
||||||
CommissionSplit,
|
CommissionSplit,
|
||||||
CommissionSplitLeg,
|
CommissionSplitLeg,
|
||||||
|
|
@ -27,7 +26,6 @@ from .models import (
|
||||||
DcaPayment,
|
DcaPayment,
|
||||||
DcaSettlement,
|
DcaSettlement,
|
||||||
Machine,
|
Machine,
|
||||||
PublishCassettesPayload,
|
|
||||||
SuperConfig,
|
SuperConfig,
|
||||||
TelemetrySnapshot,
|
TelemetrySnapshot,
|
||||||
UpdateDcaClientData,
|
UpdateDcaClientData,
|
||||||
|
|
@ -35,7 +33,6 @@ from .models import (
|
||||||
UpdateDepositStatusData,
|
UpdateDepositStatusData,
|
||||||
UpdateMachineData,
|
UpdateMachineData,
|
||||||
UpdateSuperConfigData,
|
UpdateSuperConfigData,
|
||||||
UpsertCassetteConfigData,
|
|
||||||
UpsertDcaLpData,
|
UpsertDcaLpData,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -1337,152 +1334,3 @@ async def upsert_fleet_snapshot(
|
||||||
{"mid": machine_id, "json": telemetry_json, "now": now},
|
{"mid": machine_id, "json": telemetry_json, "now": now},
|
||||||
)
|
)
|
||||||
return await get_telemetry(machine_id)
|
return await get_telemetry(machine_id)
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Cassette configs — operator-driven ATM cassette inventory (#29).
|
|
||||||
# =============================================================================
|
|
||||||
# Row lifecycle per #29:
|
|
||||||
# - First population for a (machine_id, denomination) pair → apply_bootstrap_state
|
|
||||||
# (consumer reading the ATM's one-shot bitspire-cassettes-state event)
|
|
||||||
# - Operator edit of count or position → update_cassette_config (refuses to
|
|
||||||
# create new rows; the denomination set is hardware-determined)
|
|
||||||
# - Row creation/deletion for a new denomination → admin only, via ATM
|
|
||||||
# re-provisioning + new bootstrap event (not exposed in v1 here)
|
|
||||||
|
|
||||||
|
|
||||||
def _should_apply_bootstrap_state(
|
|
||||||
existing_state_event_id: Optional[str], incoming_event_id: str
|
|
||||||
) -> bool:
|
|
||||||
"""Pure-function dedup gate for apply_bootstrap_state.
|
|
||||||
|
|
||||||
Returns False if any existing row for this machine already references
|
|
||||||
the incoming event_id (relay re-delivery after restart). True otherwise.
|
|
||||||
|
|
||||||
Extracted as a pure function so the dedup decision is unit-testable
|
|
||||||
without a database round-trip. The actual idempotency check in
|
|
||||||
apply_bootstrap_state fetches one existing row and passes its
|
|
||||||
state_event_id here.
|
|
||||||
"""
|
|
||||||
return existing_state_event_id != incoming_event_id
|
|
||||||
|
|
||||||
|
|
||||||
async def get_cassette_config(
|
|
||||||
machine_id: str, denomination: int
|
|
||||||
) -> Optional[CassetteConfig]:
|
|
||||||
return await db.fetchone(
|
|
||||||
"SELECT * FROM satoshimachine.cassette_configs "
|
|
||||||
"WHERE machine_id = :mid AND denomination = :denom",
|
|
||||||
{"mid": machine_id, "denom": denomination},
|
|
||||||
CassetteConfig,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def list_cassette_configs_for_machine(
|
|
||||||
machine_id: str,
|
|
||||||
) -> List[CassetteConfig]:
|
|
||||||
return await db.fetchall(
|
|
||||||
"SELECT * FROM satoshimachine.cassette_configs "
|
|
||||||
"WHERE machine_id = :mid ORDER BY position, denomination",
|
|
||||||
{"mid": machine_id},
|
|
||||||
CassetteConfig,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def update_cassette_config(
|
|
||||||
machine_id: str,
|
|
||||||
denomination: int,
|
|
||||||
data: UpsertCassetteConfigData,
|
|
||||||
*,
|
|
||||||
updated_by: Optional[str] = None,
|
|
||||||
) -> Optional[CassetteConfig]:
|
|
||||||
"""Operator-driven row update: change count and/or position for a single
|
|
||||||
cassette. Refuses to create new rows — those only land via
|
|
||||||
apply_bootstrap_state() consuming an ATM bootstrap event (per #29 row
|
|
||||||
lifecycle: hardware-determined denomination set, not operator-creatable).
|
|
||||||
Returns None if the (machine_id, denomination) row doesn't exist.
|
|
||||||
"""
|
|
||||||
existing = await get_cassette_config(machine_id, denomination)
|
|
||||||
if existing is None:
|
|
||||||
return None
|
|
||||||
update_data: dict = {k: v for k, v in data.dict().items() if v is not None}
|
|
||||||
if not update_data:
|
|
||||||
return existing
|
|
||||||
update_data["updated_at"] = datetime.now()
|
|
||||||
update_data["updated_by"] = updated_by
|
|
||||||
set_clause = ", ".join(f"{k} = :{k}" for k in update_data)
|
|
||||||
update_data["mid"] = machine_id
|
|
||||||
update_data["denom"] = denomination
|
|
||||||
await db.execute(
|
|
||||||
f"UPDATE satoshimachine.cassette_configs SET {set_clause} "
|
|
||||||
"WHERE machine_id = :mid AND denomination = :denom",
|
|
||||||
update_data,
|
|
||||||
)
|
|
||||||
return await get_cassette_config(machine_id, denomination)
|
|
||||||
|
|
||||||
|
|
||||||
async def apply_bootstrap_state(
|
|
||||||
machine_id: str,
|
|
||||||
event_id: str,
|
|
||||||
event_created_at: datetime,
|
|
||||||
payload: PublishCassettesPayload,
|
|
||||||
) -> bool:
|
|
||||||
"""Consume an ATM-published kind-30078 bitspire-cassettes-state:<m> event
|
|
||||||
and upsert one cassette_configs row per denomination in the payload.
|
|
||||||
|
|
||||||
Returns True if the upsert ran; False if any existing row for this
|
|
||||||
machine already references this event_id (idempotent on relay
|
|
||||||
re-delivery / restart).
|
|
||||||
|
|
||||||
Populates both the operator-believed columns (count, position,
|
|
||||||
updated_at, updated_by='atm-bootstrap') AND the v2 reverse-channel
|
|
||||||
columns (state_count, state_at, state_event_id) so the operator's
|
|
||||||
initial view matches the ATM's reported state. v2 reconciliation UI
|
|
||||||
will diverge them when continuous reverse-channel events land.
|
|
||||||
"""
|
|
||||||
existing_first = await db.fetchone(
|
|
||||||
"SELECT state_event_id FROM satoshimachine.cassette_configs "
|
|
||||||
"WHERE machine_id = :mid LIMIT 1",
|
|
||||||
{"mid": machine_id},
|
|
||||||
)
|
|
||||||
existing_event_id: Optional[str] = None
|
|
||||||
if existing_first is not None:
|
|
||||||
existing_event_id = (
|
|
||||||
existing_first.get("state_event_id")
|
|
||||||
if isinstance(existing_first, dict)
|
|
||||||
else getattr(existing_first, "state_event_id", None)
|
|
||||||
)
|
|
||||||
if not _should_apply_bootstrap_state(existing_event_id, event_id):
|
|
||||||
return False
|
|
||||||
|
|
||||||
now = datetime.now()
|
|
||||||
for denom, row in payload.denominations.items():
|
|
||||||
await db.execute(
|
|
||||||
"""
|
|
||||||
INSERT INTO satoshimachine.cassette_configs
|
|
||||||
(machine_id, denomination, count, position, updated_at,
|
|
||||||
updated_by, state_count, state_at, state_event_id)
|
|
||||||
VALUES (:mid, :denom, :count, :pos, :now, :by,
|
|
||||||
:state_count, :state_at, :event_id)
|
|
||||||
ON CONFLICT (machine_id, denomination) DO UPDATE SET
|
|
||||||
count = excluded.count,
|
|
||||||
position = excluded.position,
|
|
||||||
updated_at = excluded.updated_at,
|
|
||||||
updated_by = excluded.updated_by,
|
|
||||||
state_count = excluded.state_count,
|
|
||||||
state_at = excluded.state_at,
|
|
||||||
state_event_id = excluded.state_event_id
|
|
||||||
""",
|
|
||||||
{
|
|
||||||
"mid": machine_id,
|
|
||||||
"denom": denom,
|
|
||||||
"count": row.count,
|
|
||||||
"pos": row.position,
|
|
||||||
"now": now,
|
|
||||||
"by": "atm-bootstrap",
|
|
||||||
"state_count": row.count,
|
|
||||||
"state_at": event_created_at,
|
|
||||||
"event_id": event_id,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return True
|
|
||||||
|
|
|
||||||
|
|
@ -538,40 +538,3 @@ async def m005_lock_deposit_currency_to_machine_fiat_code(db):
|
||||||
AND m.fiat_code != d.currency
|
AND m.fiat_code != d.currency
|
||||||
)
|
)
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
|
||||||
async def m007_add_cassette_configs(db):
|
|
||||||
"""Add cassette_configs table for operator-driven ATM cassette inventory.
|
|
||||||
|
|
||||||
Tracks per-machine cassette state (denomination, count, position) editable
|
|
||||||
via the satmachineadmin dashboard and published to the ATM as encrypted
|
|
||||||
kind-30078 events. See aiolabs/satmachineadmin#29 + lamassu-next#56.
|
|
||||||
|
|
||||||
Schema choice: PK (machine_id, denomination) mirrors the ATM-side
|
|
||||||
denomination-as-key invariant in
|
|
||||||
bitspire/atm-tui/src/db.zig:31 and
|
|
||||||
lamassu-next/apps/machine/electron/state-store.ts:54
|
|
||||||
(the cassettes table PK is denomination; HAL inventory map keys on
|
|
||||||
denomination; dispense lookup is cassetteDenominations.indexOf —
|
|
||||||
duplicates collapse silently). Position is operator-assignable display
|
|
||||||
order, not the addressable unit.
|
|
||||||
|
|
||||||
Reserved nullable columns (state_count, state_at, state_event_id) hold
|
|
||||||
the latest bitspire-cassettes-state:<atm_pubkey_hex> event the ATM
|
|
||||||
publishes (one-shot bootstrap in v1; continuous in v2). v1 UI doesn't
|
|
||||||
render them; v2 reconciliation UI consumes them without a migration.
|
|
||||||
"""
|
|
||||||
await db.execute(f"""
|
|
||||||
CREATE TABLE IF NOT EXISTS satoshimachine.cassette_configs (
|
|
||||||
machine_id TEXT NOT NULL,
|
|
||||||
denomination INTEGER NOT NULL,
|
|
||||||
count INTEGER NOT NULL,
|
|
||||||
position INTEGER NOT NULL,
|
|
||||||
updated_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now},
|
|
||||||
updated_by TEXT,
|
|
||||||
state_count INTEGER,
|
|
||||||
state_at TIMESTAMP,
|
|
||||||
state_event_id TEXT,
|
|
||||||
PRIMARY KEY (machine_id, denomination)
|
|
||||||
);
|
|
||||||
""")
|
|
||||||
|
|
|
||||||
124
models.py
124
models.py
|
|
@ -546,127 +546,3 @@ class SettleBalanceData(BaseModel):
|
||||||
if v <= 0:
|
if v <= 0:
|
||||||
raise ValueError("amount_fiat must be > 0 if specified")
|
raise ValueError("amount_fiat must be > 0 if specified")
|
||||||
return round(float(v), 2)
|
return round(float(v), 2)
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Cassette configs — operator-driven ATM cassette inventory (#29).
|
|
||||||
# =============================================================================
|
|
||||||
# Schema is denomination-keyed per the locked design (#29 body + the
|
|
||||||
# 06:40Z coord-log audit): every ATM-side layer below the wire keys on
|
|
||||||
# denomination (state-store.ts:54, hal-service.ts:116/189). The
|
|
||||||
# satmachineadmin schema mirrors this so the operator UI can't author a
|
|
||||||
# duplicate-denomination payload that the ATM would silently collapse.
|
|
||||||
#
|
|
||||||
# Position is operator-assignable display order (and used by the ATM as
|
|
||||||
# the HAL slot-index assignment), not the addressable unit.
|
|
||||||
#
|
|
||||||
# state_count / state_at / state_event_id are reserved nullable from day 1
|
|
||||||
# for the v2 reverse-channel reconciliation consumer (bitspire-cassettes-
|
|
||||||
# state:<atm_pubkey_hex>). v1 populates them on bootstrap-event receipt
|
|
||||||
# but the UI doesn't render reconciliation.
|
|
||||||
|
|
||||||
|
|
||||||
class CassetteConfig(BaseModel):
|
|
||||||
machine_id: str
|
|
||||||
denomination: int
|
|
||||||
count: int
|
|
||||||
position: int
|
|
||||||
updated_at: datetime
|
|
||||||
updated_by: Optional[str]
|
|
||||||
state_count: Optional[int]
|
|
||||||
state_at: Optional[datetime]
|
|
||||||
state_event_id: Optional[str]
|
|
||||||
|
|
||||||
|
|
||||||
class UpsertCassetteConfigData(BaseModel):
|
|
||||||
"""Operator edits a single cassette row's count or position from the
|
|
||||||
dashboard. Both fields optional; pass only those changed."""
|
|
||||||
|
|
||||||
count: Optional[int] = None
|
|
||||||
position: Optional[int] = None
|
|
||||||
|
|
||||||
@validator("count")
|
|
||||||
def count_non_negative(cls, v):
|
|
||||||
if v is None:
|
|
||||||
return v
|
|
||||||
if v < 0:
|
|
||||||
raise ValueError("count must be >= 0")
|
|
||||||
return v
|
|
||||||
|
|
||||||
@validator("position")
|
|
||||||
def position_positive(cls, v):
|
|
||||||
if v is None:
|
|
||||||
return v
|
|
||||||
if v <= 0:
|
|
||||||
raise ValueError("position must be > 0")
|
|
||||||
return v
|
|
||||||
|
|
||||||
|
|
||||||
class CassettePayloadRow(BaseModel):
|
|
||||||
"""One denomination's payload values in the wire-format
|
|
||||||
`{"denominations": {"<denom>": {"position", "count"}}}`."""
|
|
||||||
|
|
||||||
position: int
|
|
||||||
count: int
|
|
||||||
|
|
||||||
@validator("position")
|
|
||||||
def position_positive(cls, v):
|
|
||||||
if v <= 0:
|
|
||||||
raise ValueError("position must be > 0")
|
|
||||||
return v
|
|
||||||
|
|
||||||
@validator("count")
|
|
||||||
def count_non_negative(cls, v):
|
|
||||||
if v < 0:
|
|
||||||
raise ValueError("count must be >= 0")
|
|
||||||
return v
|
|
||||||
|
|
||||||
|
|
||||||
class PublishCassettesPayload(BaseModel):
|
|
||||||
"""The decrypted JSON content of a kind-30078 cassette event, both
|
|
||||||
directions:
|
|
||||||
- operator → ATM (d-tag `bitspire-cassettes:<atm_pubkey_hex>`)
|
|
||||||
- ATM → operator (d-tag `bitspire-cassettes-state:<atm_pubkey_hex>`)
|
|
||||||
|
|
||||||
Wire shape: `{"denominations": {"<denom_str>": {"position", "count"}}}`.
|
|
||||||
JSON object keys are always strings; the validator coerces back to
|
|
||||||
int on parse. The denomination key set MUST match what the receiver
|
|
||||||
already has (no add / no remove from this payload).
|
|
||||||
"""
|
|
||||||
|
|
||||||
denominations: dict[int, CassettePayloadRow]
|
|
||||||
|
|
||||||
@validator("denominations", pre=True)
|
|
||||||
def coerce_string_keys_to_int(cls, v):
|
|
||||||
if not isinstance(v, dict):
|
|
||||||
raise ValueError("denominations must be a dict")
|
|
||||||
out = {}
|
|
||||||
for k, val in v.items():
|
|
||||||
try:
|
|
||||||
key_int = int(k)
|
|
||||||
except (TypeError, ValueError) as exc:
|
|
||||||
raise ValueError(
|
|
||||||
f"denomination key {k!r} is not an int"
|
|
||||||
) from exc
|
|
||||||
if key_int <= 0:
|
|
||||||
raise ValueError(f"denomination must be > 0 (got {key_int})")
|
|
||||||
out[key_int] = val
|
|
||||||
return out
|
|
||||||
|
|
||||||
@validator("denominations")
|
|
||||||
def no_duplicate_positions(cls, v):
|
|
||||||
positions = [row.position for row in v.values()]
|
|
||||||
if len(set(positions)) != len(positions):
|
|
||||||
raise ValueError("duplicate position values in payload")
|
|
||||||
return v
|
|
||||||
|
|
||||||
def to_wire_dict(self) -> dict:
|
|
||||||
"""Serialise back to the wire format with string keys for JSON
|
|
||||||
object compatibility. Used by the publisher to build the kind-30078
|
|
||||||
event content before NIP-44 v2 encryption."""
|
|
||||||
return {
|
|
||||||
"denominations": {
|
|
||||||
str(denom): {"position": row.position, "count": row.count}
|
|
||||||
for denom, row in self.denominations.items()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
271
nip44.py
271
nip44.py
|
|
@ -1,271 +0,0 @@
|
||||||
"""
|
|
||||||
NIP-44 v2 — versioned encrypted payloads (https://github.com/nostr-protocol/nips/blob/master/44.md).
|
|
||||||
|
|
||||||
Hand-rolled because lnbits ships only NIP-04 (AES-CBC) in `lnbits.utils.nostr.encrypt_content`,
|
|
||||||
and the locked design at aiolabs/satmachineadmin#29 (paired with lamassu-next#56) wires
|
|
||||||
cassette config over kind-30078 with NIP-44 v2 encrypted content. Adding a Python NIP-44
|
|
||||||
v2 lib dep was an option per the plan; chose the hand-roll path to stay dep-light and
|
|
||||||
keep the impl auditable inline.
|
|
||||||
|
|
||||||
Two safety nets keep this honest:
|
|
||||||
1. tests/test_nip44_v2.py runs reference vectors + round-trip + tamper-detection.
|
|
||||||
2. bitspire posts a sample event encrypted on their nostr-tools side to the coord log;
|
|
||||||
test_decrypts_bitspire_sample_event_from_coord_log cross-checks our impl against
|
|
||||||
theirs by decrypting that event with a known privkey.
|
|
||||||
|
|
||||||
Wire format (per spec):
|
|
||||||
payload = base64( 0x02 || nonce (32B) || ciphertext (var) || mac (32B) )
|
|
||||||
|
|
||||||
Key derivation:
|
|
||||||
conversation_key = HKDF-extract(salt=b"nip44-v2", IKM=ecdh_shared_x) # 32B PRK, stable per pair
|
|
||||||
per-message:
|
|
||||||
nonce = csprng(32 bytes)
|
|
||||||
temp = HKDF-expand(PRK=conversation_key, info=nonce, L=76)
|
|
||||||
chacha_key = temp[0:32]
|
|
||||||
chacha_nonce = temp[32:44]
|
|
||||||
hmac_key = temp[44:76]
|
|
||||||
|
|
||||||
Padding scheme (NIP-44 v2 length-prefixed, variable-chunk):
|
|
||||||
padded = uint16_be(len(plaintext)) || plaintext || zeros
|
|
||||||
such that 2 + padded_data_len matches a fixed step.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import base64
|
|
||||||
import hashlib
|
|
||||||
import hmac as hmac_stdlib
|
|
||||||
import os
|
|
||||||
import struct
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
import coincurve
|
|
||||||
from cryptography.hazmat.primitives import hashes, hmac
|
|
||||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
|
|
||||||
from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand
|
|
||||||
|
|
||||||
# Spec constants.
|
|
||||||
_VERSION = 0x02
|
|
||||||
_HKDF_SALT = b"nip44-v2"
|
|
||||||
_MIN_PLAINTEXT_LEN = 1
|
|
||||||
_MAX_PLAINTEXT_LEN = 65535
|
|
||||||
_NONCE_LEN = 32
|
|
||||||
_MAC_LEN = 32
|
|
||||||
_MIN_PAYLOAD_LEN = 1 + _NONCE_LEN + (2 + 32) + _MAC_LEN # version + nonce + min padded + mac
|
|
||||||
_MAX_PAYLOAD_LEN = 1 + _NONCE_LEN + (2 + 65536) + _MAC_LEN
|
|
||||||
|
|
||||||
|
|
||||||
class Nip44Error(Exception):
|
|
||||||
"""Generic NIP-44 v2 envelope error. Subclasses distinguish failure modes."""
|
|
||||||
|
|
||||||
|
|
||||||
class Nip44VersionError(Nip44Error):
|
|
||||||
"""First payload byte was not 0x02. Could be a NIP-04 envelope, a v1 NIP-44, or garbage."""
|
|
||||||
|
|
||||||
|
|
||||||
class Nip44MacError(Nip44Error):
|
|
||||||
"""HMAC verification failed — payload was tampered, wrong conversation key, or corrupted in transit."""
|
|
||||||
|
|
||||||
|
|
||||||
class Nip44LengthError(Nip44Error):
|
|
||||||
"""Plaintext or payload length outside the spec-allowed range, or padding header lies."""
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Padding (NIP-44 v2)
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
def _calc_padded_len(plaintext_len: int) -> int:
|
|
||||||
"""Per NIP-44 v2 padding scheme:
|
|
||||||
if L <= 32: padded_len = 32
|
|
||||||
else: chunk = max(32, next_power_2(L-1) // 8); padded_len = chunk * ((L-1) // chunk + 1)
|
|
||||||
"""
|
|
||||||
if plaintext_len <= 32:
|
|
||||||
return 32
|
|
||||||
next_power = 1 << (plaintext_len - 1).bit_length()
|
|
||||||
chunk = max(32, next_power // 8)
|
|
||||||
return chunk * ((plaintext_len - 1) // chunk + 1)
|
|
||||||
|
|
||||||
|
|
||||||
def _pad(plaintext: bytes) -> bytes:
|
|
||||||
"""Prefix uint16_be length + plaintext + zero-fill to the NIP-44 v2 boundary."""
|
|
||||||
n = len(plaintext)
|
|
||||||
if n < _MIN_PLAINTEXT_LEN or n > _MAX_PLAINTEXT_LEN:
|
|
||||||
raise Nip44LengthError(
|
|
||||||
f"plaintext length {n} outside [{_MIN_PLAINTEXT_LEN}, {_MAX_PLAINTEXT_LEN}]"
|
|
||||||
)
|
|
||||||
padded_data_len = _calc_padded_len(n)
|
|
||||||
zeros = b"\x00" * (padded_data_len - n)
|
|
||||||
return struct.pack(">H", n) + plaintext + zeros
|
|
||||||
|
|
||||||
|
|
||||||
def _unpad(padded: bytes) -> bytes:
|
|
||||||
"""Strip the uint16_be length prefix and zero padding. Validates that the
|
|
||||||
declared length is consistent with the padded payload (rejects a forged
|
|
||||||
length prefix that would slice past the buffer or imply a different
|
|
||||||
padded_data_len than what we received)."""
|
|
||||||
if len(padded) < 2:
|
|
||||||
raise Nip44LengthError("padded payload too short to hold length prefix")
|
|
||||||
declared_len = struct.unpack(">H", padded[0:2])[0]
|
|
||||||
if declared_len < _MIN_PLAINTEXT_LEN or declared_len > _MAX_PLAINTEXT_LEN:
|
|
||||||
raise Nip44LengthError(f"declared plaintext length {declared_len} out of range")
|
|
||||||
if len(padded) != 2 + _calc_padded_len(declared_len):
|
|
||||||
raise Nip44LengthError(
|
|
||||||
f"padded buffer length {len(padded)} doesn't match the calculated padding "
|
|
||||||
f"for declared length {declared_len}"
|
|
||||||
)
|
|
||||||
return padded[2 : 2 + declared_len]
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Conversation + message-key derivation
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
def get_conversation_key(privkey_hex: str, pubkey_hex: str) -> bytes:
|
|
||||||
"""Derive the per-pair stable conversation key (PRK) used for all messages
|
|
||||||
between sender (privkey) and recipient (pubkey).
|
|
||||||
|
|
||||||
Steps:
|
|
||||||
shared_x = ECDH(privkey, pubkey).x # 32 bytes, x-coordinate
|
|
||||||
prk = HKDF-extract(salt=b"nip44-v2", IKM=shared_x)
|
|
||||||
|
|
||||||
coincurve's `.multiply(secret).format(compressed=True)[1:]` strips the
|
|
||||||
leading 0x02/0x03 parity byte to return the raw x-coord — same trick
|
|
||||||
`lnbits.utils.nostr.encrypt_content` uses for NIP-04.
|
|
||||||
"""
|
|
||||||
sender = coincurve.PrivateKey(bytes.fromhex(privkey_hex))
|
|
||||||
recipient_pub = coincurve.PublicKey(b"\x02" + bytes.fromhex(pubkey_hex))
|
|
||||||
shared_x = recipient_pub.multiply(sender.secret).format(compressed=True)[1:]
|
|
||||||
# HKDF-extract is HMAC-SHA256(key=salt, msg=ikm) per RFC 5869.
|
|
||||||
return hmac_stdlib.new(_HKDF_SALT, shared_x, hashlib.sha256).digest()
|
|
||||||
|
|
||||||
|
|
||||||
def _derive_message_keys(
|
|
||||||
conversation_key: bytes, nonce: bytes
|
|
||||||
) -> tuple[bytes, bytes, bytes]:
|
|
||||||
"""Per-message key expansion: HKDF-expand(PRK=conversation_key, info=nonce, L=76).
|
|
||||||
Returns (chacha_key 32B, chacha_nonce 12B, hmac_key 32B)."""
|
|
||||||
hkdf = HKDFExpand(algorithm=hashes.SHA256(), length=76, info=nonce)
|
|
||||||
okm = hkdf.derive(conversation_key)
|
|
||||||
return okm[0:32], okm[32:44], okm[44:76]
|
|
||||||
|
|
||||||
|
|
||||||
def _hmac_aad(hmac_key: bytes, nonce: bytes, ciphertext: bytes) -> bytes:
|
|
||||||
"""HMAC-SHA256(key=hmac_key, msg=nonce || ciphertext). Returns 32-byte MAC."""
|
|
||||||
h = hmac.HMAC(hmac_key, hashes.SHA256())
|
|
||||||
h.update(nonce)
|
|
||||||
h.update(ciphertext)
|
|
||||||
return h.finalize()
|
|
||||||
|
|
||||||
|
|
||||||
def _chacha20(key: bytes, nonce: bytes, data: bytes) -> bytes:
|
|
||||||
"""ChaCha20 stream cipher (symmetric: encrypt == decrypt). Used both directions.
|
|
||||||
|
|
||||||
The `cryptography` lib's `algorithms.ChaCha20(key, nonce)` expects a
|
|
||||||
16-byte nonce arg: a 4-byte little-endian initial counter prefix +
|
|
||||||
12-byte actual nonce. NIP-44 v2 starts the counter at 0 and uses the
|
|
||||||
HKDF-derived 12-byte chacha_nonce, so we prefix four zero bytes here.
|
|
||||||
"""
|
|
||||||
if len(nonce) != 12:
|
|
||||||
raise Nip44LengthError(
|
|
||||||
f"chacha_nonce must be 12 bytes (NIP-44 v2), got {len(nonce)}"
|
|
||||||
)
|
|
||||||
cipher = Cipher(algorithms.ChaCha20(key, b"\x00\x00\x00\x00" + nonce), mode=None)
|
|
||||||
return cipher.encryptor().update(data)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Public API — low-level (nonce-controllable for testability)
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
def encrypt_with_conversation_key(
|
|
||||||
plaintext: str,
|
|
||||||
conversation_key: bytes,
|
|
||||||
*,
|
|
||||||
nonce: Optional[bytes] = None,
|
|
||||||
) -> str:
|
|
||||||
"""Encrypt `plaintext` under a precomputed `conversation_key` (32B PRK).
|
|
||||||
|
|
||||||
`nonce` is 32 random bytes when omitted (the production path). Tests pass
|
|
||||||
it explicitly to assert pinned reference vectors.
|
|
||||||
|
|
||||||
Returns the base64-encoded payload string suitable as a Nostr event's
|
|
||||||
`content` field for kind-30078 (and any other kind that uses NIP-44 v2).
|
|
||||||
"""
|
|
||||||
if nonce is None:
|
|
||||||
nonce = os.urandom(_NONCE_LEN)
|
|
||||||
elif len(nonce) != _NONCE_LEN:
|
|
||||||
raise Nip44LengthError(f"nonce must be exactly {_NONCE_LEN} bytes")
|
|
||||||
|
|
||||||
padded = _pad(plaintext.encode("utf-8"))
|
|
||||||
chacha_key, chacha_nonce, hmac_key = _derive_message_keys(conversation_key, nonce)
|
|
||||||
ciphertext = _chacha20(chacha_key, chacha_nonce, padded)
|
|
||||||
mac = _hmac_aad(hmac_key, nonce, ciphertext)
|
|
||||||
return base64.b64encode(
|
|
||||||
bytes([_VERSION]) + nonce + ciphertext + mac
|
|
||||||
).decode("ascii")
|
|
||||||
|
|
||||||
|
|
||||||
def decrypt_with_conversation_key(payload_b64: str, conversation_key: bytes) -> str:
|
|
||||||
"""Decrypt a NIP-44 v2 payload using a precomputed `conversation_key`.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
Nip44VersionError — payload's first byte isn't 0x02
|
|
||||||
Nip44LengthError — payload too short / too long / declared length lies
|
|
||||||
Nip44MacError — HMAC verification failed (tamper, wrong key, corruption)
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
raw = base64.b64decode(payload_b64, validate=True)
|
|
||||||
except Exception as exc: # noqa: BLE001 — we want any base64 failure surfaced uniformly
|
|
||||||
raise Nip44LengthError(f"payload is not valid base64: {exc}") from exc
|
|
||||||
|
|
||||||
if len(raw) < _MIN_PAYLOAD_LEN or len(raw) > _MAX_PAYLOAD_LEN:
|
|
||||||
raise Nip44LengthError(f"payload length {len(raw)} outside valid range")
|
|
||||||
if raw[0] != _VERSION:
|
|
||||||
raise Nip44VersionError(f"unsupported NIP-44 version: 0x{raw[0]:02x}")
|
|
||||||
|
|
||||||
nonce = raw[1 : 1 + _NONCE_LEN]
|
|
||||||
mac_received = raw[-_MAC_LEN:]
|
|
||||||
ciphertext = raw[1 + _NONCE_LEN : -_MAC_LEN]
|
|
||||||
|
|
||||||
chacha_key, chacha_nonce, hmac_key = _derive_message_keys(conversation_key, nonce)
|
|
||||||
mac_expected = _hmac_aad(hmac_key, nonce, ciphertext)
|
|
||||||
# constant-time compare to avoid timing-leak in MAC verification
|
|
||||||
if not hmac_stdlib.compare_digest(mac_received, mac_expected):
|
|
||||||
raise Nip44MacError("HMAC verification failed")
|
|
||||||
|
|
||||||
padded = _chacha20(chacha_key, chacha_nonce, ciphertext)
|
|
||||||
plaintext_bytes = _unpad(padded)
|
|
||||||
return plaintext_bytes.decode("utf-8")
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Public API — high-level (pair-keyed, the call shape app code reaches for)
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
def encrypt_for(
|
|
||||||
plaintext: str,
|
|
||||||
sender_privkey_hex: str,
|
|
||||||
recipient_pubkey_hex: str,
|
|
||||||
*,
|
|
||||||
nonce: Optional[bytes] = None,
|
|
||||||
) -> str:
|
|
||||||
"""Encrypt `plaintext` from the sender (holding the privkey) to the recipient
|
|
||||||
(identified by pubkey). The recipient can decrypt with `decrypt_from(
|
|
||||||
payload, recipient_privkey_hex, sender_pubkey_hex)` — symmetric on the
|
|
||||||
conversation key, which is the same derived value from either side."""
|
|
||||||
conversation_key = get_conversation_key(sender_privkey_hex, recipient_pubkey_hex)
|
|
||||||
return encrypt_with_conversation_key(plaintext, conversation_key, nonce=nonce)
|
|
||||||
|
|
||||||
|
|
||||||
def decrypt_from(
|
|
||||||
payload_b64: str, recipient_privkey_hex: str, sender_pubkey_hex: str
|
|
||||||
) -> str:
|
|
||||||
"""Decrypt a payload that the recipient (holding the privkey) received from
|
|
||||||
the sender (identified by pubkey)."""
|
|
||||||
conversation_key = get_conversation_key(recipient_privkey_hex, sender_pubkey_hex)
|
|
||||||
return decrypt_with_conversation_key(payload_b64, conversation_key)
|
|
||||||
|
|
@ -1,220 +0,0 @@
|
||||||
"""
|
|
||||||
Tests for the v1 cassette-config layer (aiolabs/satmachineadmin#29).
|
|
||||||
|
|
||||||
Covers the pure pieces that don't need a live DB:
|
|
||||||
- Pydantic validator behaviour on PublishCassettesPayload + the row /
|
|
||||||
upsert models (denomination key coercion, integer ranges, no-duplicate-
|
|
||||||
positions, wire-format round-trip)
|
|
||||||
- _should_apply_bootstrap_state dedup helper (extracted from
|
|
||||||
apply_bootstrap_state so the relay-re-delivery decision is testable
|
|
||||||
without a database round-trip)
|
|
||||||
|
|
||||||
DB-touching tests (apply_bootstrap_state actually upserting, list-by-
|
|
||||||
machine ordering, etc.) follow the project convention from
|
|
||||||
test_deposit_currency.py: "Layer 2 is an endpoint-level behaviour better
|
|
||||||
covered by an integration test against a running LNbits; tracked in #26
|
|
||||||
as a follow-up." Smoke-tested manually via the dev container.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from ..crud import _should_apply_bootstrap_state
|
|
||||||
from ..models import (
|
|
||||||
CassettePayloadRow,
|
|
||||||
PublishCassettesPayload,
|
|
||||||
UpsertCassetteConfigData,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# PublishCassettesPayload — wire-shape validators
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
class TestPublishCassettesPayload:
|
|
||||||
"""The kind-30078 content payload, bidirectional (operator→ATM and
|
|
||||||
ATM→operator share the shape). String JSON keys must coerce to int;
|
|
||||||
duplicate positions must reject; per-row int constraints enforced."""
|
|
||||||
|
|
||||||
def test_happy_path_coerces_string_keys_to_int(self):
|
|
||||||
p = PublishCassettesPayload(
|
|
||||||
denominations={
|
|
||||||
"20": {"position": 1, "count": 49},
|
|
||||||
"50": {"position": 2, "count": 100},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
assert set(p.denominations.keys()) == {20, 50}
|
|
||||||
assert p.denominations[20].position == 1
|
|
||||||
assert p.denominations[20].count == 49
|
|
||||||
assert p.denominations[50].count == 100
|
|
||||||
|
|
||||||
def test_wire_dict_round_trip_restringifies_keys(self):
|
|
||||||
"""to_wire_dict() must restringify denomination keys so the
|
|
||||||
resulting JSON is parseable by clients (including the ATM-side
|
|
||||||
nostr-tools NIP-44 v2 consumer per the byte-compat cross-test)."""
|
|
||||||
original = PublishCassettesPayload(
|
|
||||||
denominations={
|
|
||||||
"20": {"position": 1, "count": 49},
|
|
||||||
"50": {"position": 2, "count": 100},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
wire = original.to_wire_dict()
|
|
||||||
assert wire == {
|
|
||||||
"denominations": {
|
|
||||||
"20": {"position": 1, "count": 49},
|
|
||||||
"50": {"position": 2, "count": 100},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
# And the wire form round-trips back through the parser cleanly.
|
|
||||||
reparsed = PublishCassettesPayload(**wire)
|
|
||||||
assert reparsed.denominations == original.denominations
|
|
||||||
|
|
||||||
def test_rejects_non_int_key(self):
|
|
||||||
with pytest.raises(ValueError) as exc:
|
|
||||||
PublishCassettesPayload(
|
|
||||||
denominations={"abc": {"position": 1, "count": 1}}
|
|
||||||
)
|
|
||||||
assert "is not an int" in str(exc.value)
|
|
||||||
|
|
||||||
def test_rejects_non_positive_denomination(self):
|
|
||||||
with pytest.raises(ValueError) as exc:
|
|
||||||
PublishCassettesPayload(
|
|
||||||
denominations={"0": {"position": 1, "count": 1}}
|
|
||||||
)
|
|
||||||
assert "denomination must be > 0" in str(exc.value)
|
|
||||||
|
|
||||||
def test_rejects_negative_denomination(self):
|
|
||||||
with pytest.raises(ValueError) as exc:
|
|
||||||
PublishCassettesPayload(
|
|
||||||
denominations={"-20": {"position": 1, "count": 1}}
|
|
||||||
)
|
|
||||||
assert "denomination must be > 0" in str(exc.value)
|
|
||||||
|
|
||||||
def test_rejects_duplicate_position(self):
|
|
||||||
"""Two cassettes can't occupy the same physical slot. The schema
|
|
||||||
PK is (machine_id, denomination), so duplicates land via the
|
|
||||||
payload; reject at the validator layer before the publish path
|
|
||||||
builds an event the ATM will misinterpret."""
|
|
||||||
with pytest.raises(ValueError) as exc:
|
|
||||||
PublishCassettesPayload(
|
|
||||||
denominations={
|
|
||||||
"20": {"position": 1, "count": 49},
|
|
||||||
"50": {"position": 1, "count": 100},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
assert "duplicate position" in str(exc.value)
|
|
||||||
|
|
||||||
def test_rejects_negative_count(self):
|
|
||||||
with pytest.raises(ValueError):
|
|
||||||
PublishCassettesPayload(
|
|
||||||
denominations={"20": {"position": 1, "count": -1}}
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_rejects_zero_position(self):
|
|
||||||
with pytest.raises(ValueError):
|
|
||||||
PublishCassettesPayload(
|
|
||||||
denominations={"20": {"position": 0, "count": 1}}
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_allows_zero_count(self):
|
|
||||||
"""An empty cassette is a legal state — operator must be able to
|
|
||||||
record `count=0` after a dispatcher pulled the cassette mid-day."""
|
|
||||||
p = PublishCassettesPayload(
|
|
||||||
denominations={"20": {"position": 1, "count": 0}}
|
|
||||||
)
|
|
||||||
assert p.denominations[20].count == 0
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# CassettePayloadRow — per-row int constraints (single-row tests)
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
class TestCassettePayloadRow:
|
|
||||||
def test_happy_path(self):
|
|
||||||
row = CassettePayloadRow(position=1, count=49)
|
|
||||||
assert row.position == 1
|
|
||||||
assert row.count == 49
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("bad_position", [0, -1, -100])
|
|
||||||
def test_rejects_non_positive_position(self, bad_position):
|
|
||||||
with pytest.raises(ValueError):
|
|
||||||
CassettePayloadRow(position=bad_position, count=1)
|
|
||||||
|
|
||||||
def test_rejects_negative_count(self):
|
|
||||||
with pytest.raises(ValueError):
|
|
||||||
CassettePayloadRow(position=1, count=-1)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# UpsertCassetteConfigData — operator-edit form
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
class TestUpsertCassetteConfigData:
|
|
||||||
"""Operator-driven row edit. Both fields optional; same int constraints
|
|
||||||
as the wire-format row but applied independently per-edit."""
|
|
||||||
|
|
||||||
def test_partial_update_count_only(self):
|
|
||||||
d = UpsertCassetteConfigData(count=80)
|
|
||||||
assert d.count == 80
|
|
||||||
assert d.position is None
|
|
||||||
|
|
||||||
def test_partial_update_position_only(self):
|
|
||||||
d = UpsertCassetteConfigData(position=3)
|
|
||||||
assert d.position == 3
|
|
||||||
assert d.count is None
|
|
||||||
|
|
||||||
def test_empty_update_is_legal(self):
|
|
||||||
"""An empty UpsertCassetteConfigData parses fine; the CRUD short-
|
|
||||||
circuits a no-op on empty payload (no SQL emitted)."""
|
|
||||||
d = UpsertCassetteConfigData()
|
|
||||||
assert d.count is None
|
|
||||||
assert d.position is None
|
|
||||||
|
|
||||||
def test_rejects_negative_count(self):
|
|
||||||
with pytest.raises(ValueError):
|
|
||||||
UpsertCassetteConfigData(count=-1)
|
|
||||||
|
|
||||||
def test_rejects_non_positive_position(self):
|
|
||||||
with pytest.raises(ValueError):
|
|
||||||
UpsertCassetteConfigData(position=0)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# _should_apply_bootstrap_state — relay re-delivery dedup
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
class TestShouldApplyBootstrapState:
|
|
||||||
"""Pure-function dedup gate extracted from apply_bootstrap_state so the
|
|
||||||
decision is testable without a DB. Logic: apply if-and-only-if the
|
|
||||||
existing row's state_event_id differs from the incoming event_id.
|
|
||||||
|
|
||||||
In v1 the ATM publishes the bootstrap event exactly once per machine,
|
|
||||||
so this is sufficient for replay protection. v2 will need a
|
|
||||||
`last_state_created_at` watermark in addition (per bitspire's
|
|
||||||
`meta.lastKnownConfigCreatedAt` on the ATM side) — flagged in #29's
|
|
||||||
v2 forward-look section.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def test_applies_when_no_existing_row(self):
|
|
||||||
assert _should_apply_bootstrap_state(None, "new-event-id") is True
|
|
||||||
|
|
||||||
def test_applies_when_existing_event_id_differs(self):
|
|
||||||
assert (
|
|
||||||
_should_apply_bootstrap_state("old-event-id", "new-event-id") is True
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_skips_when_existing_event_id_matches(self):
|
|
||||||
"""The same bootstrap event re-delivered after a relay reconnect
|
|
||||||
or satmachineadmin restart should no-op, not re-upsert the same
|
|
||||||
rows (which would clobber any operator edits since)."""
|
|
||||||
assert (
|
|
||||||
_should_apply_bootstrap_state("same-event", "same-event") is False
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_applies_when_existing_is_empty_string_and_incoming_is_id(self):
|
|
||||||
"""Defensive — a sentinel empty-string existing_state_event_id
|
|
||||||
shouldn't block a real incoming event from applying."""
|
|
||||||
assert _should_apply_bootstrap_state("", "real-event-id") is True
|
|
||||||
|
|
@ -1,272 +0,0 @@
|
||||||
"""
|
|
||||||
Tests for the hand-rolled NIP-44 v2 implementation in `nip44.py`.
|
|
||||||
|
|
||||||
Three layers of validation, ordered by trust:
|
|
||||||
1. Pinned reference vector from the canonical paulmillr/nip44 test suite —
|
|
||||||
the conversation_key for (sec=1, sec=2) is widely-published as
|
|
||||||
c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d. If
|
|
||||||
our get_conversation_key() ever drifts from that value, the impl is
|
|
||||||
broken at the key-derivation layer.
|
|
||||||
2. Round-trip + tamper detection — verifies the encrypt/decrypt loop
|
|
||||||
under random nonces, catches HMAC + version + padding tampering.
|
|
||||||
3. Cross-test (TBD) — bitspire will post one sample event encrypted on
|
|
||||||
their nostr-tools side to the coord log; test_decrypts_bitspire_sample
|
|
||||||
wires it as a fixture and asserts byte-compatibility with the
|
|
||||||
nostr-tools NIP-44 v2 impl. Placeholder stub until the sample lands.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import base64
|
|
||||||
|
|
||||||
import coincurve
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from ..nip44 import (
|
|
||||||
Nip44LengthError,
|
|
||||||
Nip44MacError,
|
|
||||||
Nip44VersionError,
|
|
||||||
_calc_padded_len,
|
|
||||||
decrypt_from,
|
|
||||||
decrypt_with_conversation_key,
|
|
||||||
encrypt_for,
|
|
||||||
encrypt_with_conversation_key,
|
|
||||||
get_conversation_key,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Helper: derive a compressed-x-coord pubkey hex string from a secret hex.
|
|
||||||
def _pub_hex(sec_hex: str) -> str:
|
|
||||||
return (
|
|
||||||
coincurve.PrivateKey(bytes.fromhex(sec_hex))
|
|
||||||
.public_key.format(compressed=True)[1:]
|
|
||||||
.hex()
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# Canonical test keys widely used across NIP-44 reference vectors.
|
|
||||||
_SEC_ONE = "00" * 31 + "01" # integer 1
|
|
||||||
_SEC_TWO = "00" * 31 + "02" # integer 2
|
|
||||||
_PUB_ONE = _pub_hex(_SEC_ONE)
|
|
||||||
_PUB_TWO = _pub_hex(_SEC_TWO)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Layer 1 — pinned reference vector (paulmillr/nip44)
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
class TestConversationKeyReferenceVector:
|
|
||||||
"""Pinned reference vector from the canonical NIP-44 v2 test suite
|
|
||||||
(paulmillr/nip44). If get_conversation_key drifts from this value we
|
|
||||||
have a key-derivation regression — fail loudly."""
|
|
||||||
|
|
||||||
REFERENCE_CK_HEX = (
|
|
||||||
"c41c775356fd92eadc63ff5a0dc1da211b268cbea22316767095b2871ea1412d"
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_sec_one_pub_two(self):
|
|
||||||
ck = get_conversation_key(_SEC_ONE, _PUB_TWO)
|
|
||||||
assert ck.hex() == self.REFERENCE_CK_HEX
|
|
||||||
|
|
||||||
def test_sec_two_pub_one_is_symmetric(self):
|
|
||||||
"""Conversation key is symmetric: ck(privA, pubB) == ck(privB, pubA).
|
|
||||||
Both sides of a NIP-44 conversation derive the identical PRK; this
|
|
||||||
is what lets the recipient decrypt with their own privkey + the
|
|
||||||
sender's pubkey."""
|
|
||||||
ck_ab = get_conversation_key(_SEC_ONE, _PUB_TWO)
|
|
||||||
ck_ba = get_conversation_key(_SEC_TWO, _PUB_ONE)
|
|
||||||
assert ck_ab == ck_ba
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Layer 2 — round-trip + tamper detection
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
class TestRoundTrip:
|
|
||||||
"""Encrypt then decrypt under the high-level pair-keyed API."""
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
"plaintext",
|
|
||||||
[
|
|
||||||
"a", # 1 byte (minimum)
|
|
||||||
"hello, nip44 v2", # short
|
|
||||||
"x" * 32, # exactly the small-payload boundary
|
|
||||||
"x" * 33, # just over
|
|
||||||
"y" * 1000, # medium
|
|
||||||
"z" * 5000, # large
|
|
||||||
'{"denominations": {"20": {"position": 1, "count": 49}}}', # realistic
|
|
||||||
],
|
|
||||||
)
|
|
||||||
def test_round_trip_various_lengths(self, plaintext):
|
|
||||||
payload = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO)
|
|
||||||
recovered = decrypt_from(payload, _SEC_TWO, _PUB_ONE)
|
|
||||||
assert recovered == plaintext
|
|
||||||
|
|
||||||
def test_payloads_are_unique_under_random_nonce(self):
|
|
||||||
"""Same plaintext + same key pair should produce different payloads
|
|
||||||
each time because the nonce is fresh CSPRNG bytes. Catches a
|
|
||||||
regression where the nonce is accidentally pinned."""
|
|
||||||
plaintext = "the same message"
|
|
||||||
p1 = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO)
|
|
||||||
p2 = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO)
|
|
||||||
assert p1 != p2
|
|
||||||
assert decrypt_from(p1, _SEC_TWO, _PUB_ONE) == plaintext
|
|
||||||
assert decrypt_from(p2, _SEC_TWO, _PUB_ONE) == plaintext
|
|
||||||
|
|
||||||
def test_pinned_nonce_is_deterministic(self):
|
|
||||||
"""Same plaintext + same key pair + same nonce = byte-identical
|
|
||||||
payload. Regression-locks the chacha20 + hmac chain."""
|
|
||||||
ck = get_conversation_key(_SEC_ONE, _PUB_TWO)
|
|
||||||
nonce = bytes(32) # 32 zero bytes
|
|
||||||
p1 = encrypt_with_conversation_key("a", ck, nonce=nonce)
|
|
||||||
p2 = encrypt_with_conversation_key("a", ck, nonce=nonce)
|
|
||||||
assert p1 == p2
|
|
||||||
assert decrypt_with_conversation_key(p1, ck) == "a"
|
|
||||||
|
|
||||||
|
|
||||||
class TestTamperDetection:
|
|
||||||
"""HMAC-SHA256 verification catches tampered envelopes. The cryptographic
|
|
||||||
construction depends on this — if HMAC verification ever no-ops, a
|
|
||||||
relay-MITM could forge ATM state events."""
|
|
||||||
|
|
||||||
def _payload(self) -> str:
|
|
||||||
return encrypt_for("important message", _SEC_ONE, _PUB_TWO)
|
|
||||||
|
|
||||||
def test_flipped_mac_byte_rejected(self):
|
|
||||||
raw = bytearray(base64.b64decode(self._payload()))
|
|
||||||
raw[-1] ^= 0x01
|
|
||||||
tampered = base64.b64encode(bytes(raw)).decode("ascii")
|
|
||||||
with pytest.raises(Nip44MacError):
|
|
||||||
decrypt_from(tampered, _SEC_TWO, _PUB_ONE)
|
|
||||||
|
|
||||||
def test_flipped_ciphertext_byte_rejected(self):
|
|
||||||
raw = bytearray(base64.b64decode(self._payload()))
|
|
||||||
# Flip a byte in the middle of the ciphertext segment
|
|
||||||
# (version[1] + nonce[32..32] + ciphertext[33..-32] + mac[-32..])
|
|
||||||
ct_start = 1 + 32
|
|
||||||
raw[ct_start + 5] ^= 0x01
|
|
||||||
tampered = base64.b64encode(bytes(raw)).decode("ascii")
|
|
||||||
with pytest.raises(Nip44MacError):
|
|
||||||
decrypt_from(tampered, _SEC_TWO, _PUB_ONE)
|
|
||||||
|
|
||||||
def test_flipped_nonce_byte_rejected(self):
|
|
||||||
raw = bytearray(base64.b64decode(self._payload()))
|
|
||||||
# Nonce starts at byte 1 (after version)
|
|
||||||
raw[1] ^= 0x01
|
|
||||||
tampered = base64.b64encode(bytes(raw)).decode("ascii")
|
|
||||||
with pytest.raises(Nip44MacError):
|
|
||||||
decrypt_from(tampered, _SEC_TWO, _PUB_ONE)
|
|
||||||
|
|
||||||
def test_wrong_recipient_privkey_rejected(self):
|
|
||||||
"""The MAC is derived from the conversation key, so a wrong
|
|
||||||
recipient privkey produces a different conversation key →
|
|
||||||
different hmac_key → MAC verification fails. (Doesn't decrypt
|
|
||||||
to garbage; fails fast.)"""
|
|
||||||
sec_three = "00" * 31 + "03"
|
|
||||||
with pytest.raises(Nip44MacError):
|
|
||||||
decrypt_from(self._payload(), sec_three, _PUB_ONE)
|
|
||||||
|
|
||||||
|
|
||||||
class TestVersionRejection:
|
|
||||||
def test_v1_byte_rejected(self):
|
|
||||||
raw = bytearray(base64.b64decode(encrypt_for("x", _SEC_ONE, _PUB_TWO)))
|
|
||||||
raw[0] = 0x01
|
|
||||||
bad = base64.b64encode(bytes(raw)).decode("ascii")
|
|
||||||
with pytest.raises(Nip44VersionError):
|
|
||||||
decrypt_from(bad, _SEC_TWO, _PUB_ONE)
|
|
||||||
|
|
||||||
def test_unknown_version_byte_rejected(self):
|
|
||||||
raw = bytearray(base64.b64decode(encrypt_for("x", _SEC_ONE, _PUB_TWO)))
|
|
||||||
raw[0] = 0xFF
|
|
||||||
bad = base64.b64encode(bytes(raw)).decode("ascii")
|
|
||||||
with pytest.raises(Nip44VersionError):
|
|
||||||
decrypt_from(bad, _SEC_TWO, _PUB_ONE)
|
|
||||||
|
|
||||||
|
|
||||||
class TestLengthGuards:
|
|
||||||
def test_empty_plaintext_rejected(self):
|
|
||||||
with pytest.raises(Nip44LengthError):
|
|
||||||
encrypt_for("", _SEC_ONE, _PUB_TWO)
|
|
||||||
|
|
||||||
def test_plaintext_at_max_length_accepted(self):
|
|
||||||
plaintext = "x" * 65535
|
|
||||||
payload = encrypt_for(plaintext, _SEC_ONE, _PUB_TWO)
|
|
||||||
assert decrypt_from(payload, _SEC_TWO, _PUB_ONE) == plaintext
|
|
||||||
|
|
||||||
def test_plaintext_over_max_rejected(self):
|
|
||||||
with pytest.raises(Nip44LengthError):
|
|
||||||
encrypt_for("x" * 65536, _SEC_ONE, _PUB_TWO)
|
|
||||||
|
|
||||||
def test_invalid_base64_payload_rejected(self):
|
|
||||||
with pytest.raises(Nip44LengthError):
|
|
||||||
decrypt_from("not!!!base64@@@", _SEC_TWO, _PUB_ONE)
|
|
||||||
|
|
||||||
def test_payload_too_short_rejected(self):
|
|
||||||
# 50 bytes is well under the 99-byte minimum
|
|
||||||
too_short = base64.b64encode(b"\x02" + b"\x00" * 49).decode("ascii")
|
|
||||||
with pytest.raises(Nip44LengthError):
|
|
||||||
decrypt_from(too_short, _SEC_TWO, _PUB_ONE)
|
|
||||||
|
|
||||||
|
|
||||||
class TestPaddingFormula:
|
|
||||||
"""Spot-check the _calc_padded_len formula against hand-computed cases.
|
|
||||||
Locks in the NIP-44 v2 padding scheme so a refactor can't silently
|
|
||||||
break wire compatibility (which would only surface as cross-impl
|
|
||||||
decryption failures — exactly what test_decrypts_bitspire_sample is
|
|
||||||
meant to catch end-to-end, but a unit test here is cheaper)."""
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
"plaintext_len,expected_padded",
|
|
||||||
[
|
|
||||||
(1, 32), # <= 32 → 32
|
|
||||||
(16, 32),
|
|
||||||
(32, 32),
|
|
||||||
(33, 64), # > 32 → next chunk
|
|
||||||
(64, 64),
|
|
||||||
(65, 96), # chunk = 32 for L=65 (next_power(64) = 64; 64//8 = 8; max(32, 8) = 32)
|
|
||||||
(100, 128),
|
|
||||||
(128, 128),
|
|
||||||
# L=129: next_power(128) = 1<<8 = 256; chunk = max(32, 256//8) = 32;
|
|
||||||
# padded = 32 * (128//32 + 1) = 32 * 5 = 160.
|
|
||||||
(129, 160),
|
|
||||||
(256, 256), # chunk = 32 for L=256 (next_power(255)=256; max(32, 32) = 32)
|
|
||||||
(257, 320),
|
|
||||||
(1000, 1024), # chunk = 128 for L=1000 (next_power(999)=1024; max(32, 128) = 128)
|
|
||||||
],
|
|
||||||
)
|
|
||||||
def test_calc_padded_len(self, plaintext_len, expected_padded):
|
|
||||||
assert _calc_padded_len(plaintext_len) == expected_padded
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Layer 3 — byte-compat cross-test against nostr-tools (bitspire's impl)
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skip(
|
|
||||||
reason=(
|
|
||||||
"Waiting on bitspire to post one sample encrypted event to "
|
|
||||||
"~/dev/coordination/log.md per the 2026-05-30T15:55Z entry. Once "
|
|
||||||
"posted, hardcode the (event_id, content, recipient_privkey, "
|
|
||||||
"expected_plaintext) fixture here and remove the skip — this test "
|
|
||||||
"is the byte-compat cross-test between our hand-rolled NIP-44 v2 "
|
|
||||||
"and the nostr-tools impl the ATM uses."
|
|
||||||
)
|
|
||||||
)
|
|
||||||
def test_decrypts_bitspire_sample_event_from_coord_log():
|
|
||||||
"""Cross-impl byte-compatibility test. Bitspire generates one event on
|
|
||||||
their side (nostr-tools NIP-44 v2 impl), posts the raw event JSON +
|
|
||||||
a known throwaway recipient privkey to the coord log, and we assert
|
|
||||||
our `decrypt_from` recovers the expected `{"denominations": {...}}`
|
|
||||||
plaintext.
|
|
||||||
|
|
||||||
If this passes, both impls produce byte-identical wire format. If it
|
|
||||||
fails, the spec ambiguity surfaces before either side ships — exactly
|
|
||||||
what bitspire flagged in the plan review (`07:55Z`).
|
|
||||||
"""
|
|
||||||
# event_b64_content = "..." # paste from coord log
|
|
||||||
# sender_pubkey_hex = "..."
|
|
||||||
# recipient_privkey_hex = "..."
|
|
||||||
# expected_plaintext = '{"denominations": {"20": {"position": 1, "count": 49}}}'
|
|
||||||
# recovered = decrypt_from(event_b64_content, recipient_privkey_hex, sender_pubkey_hex)
|
|
||||||
# assert recovered == expected_plaintext
|
|
||||||
raise NotImplementedError("fixture pending — see skip reason")
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue