Some checks failed
ci.yml / docs(pairing): TTL + token-revoke now enforced post-bind (nsecbunkerd#27) (pull_request) Failing after 0s
nsecbunkerd#27 (deployed 2026-06-19) reverses the #24 finding: the sign-time ACL now evaluates token lifecycle live on every request (checkIfPubkeyAllowed step 4 joins through a liveWhere filter; applyToken stopped photocopying grants into SigningConditions). So: - duration_hours / token expiresAt now bounds an ESTABLISHED binding — an expired token stops signing post-bind, not just at connect. The prior docstring (connect-window-only, pointing at the now-closed nsecbunkerd#24) is corrected. - Token-revoke is no longer a post-redeem no-op (closes the #22 mechanism bunker-side). revoke_spire keeps using revoke_key_user because that's the subject-level ban cutting the whole binding, not just one token's grant — rationale updated, behavior unchanged. Doc/comment only; 20 pairing tests green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
320 lines
11 KiB
Python
320 lines
11 KiB
Python
"""Unit tests for the seed-URL pairing service (S0 / #9, model A1).
|
|
|
|
The bunker admin client is faked — these exercise the orchestration
|
|
(create_new_key -> ensure-policy -> create_new_token -> get_key_tokens),
|
|
the policy reconciliation, and the seed-URL / bunker:// wire shape, with
|
|
no live nsecbunkerd. npub<->hex round-trips through lnbits' real helpers
|
|
so the parsing is exercised for real.
|
|
|
|
Async is driven via asyncio.run (this venv has no pytest-asyncio), matching
|
|
the rest of the suite.
|
|
"""
|
|
|
|
import asyncio
|
|
import base64
|
|
import json
|
|
from datetime import datetime, timezone
|
|
|
|
import pytest
|
|
from lnbits.core.services.nsec_bunker import NsecBunkerError
|
|
from lnbits.utils.nostr import hex_to_npub
|
|
|
|
from ..models import Machine
|
|
from ..pairing import (
|
|
SEED_URL_SCHEME,
|
|
SPIRE_POLICY_METHODS_NO_KIND,
|
|
SPIRE_POLICY_NAME,
|
|
SPIRE_POLICY_RULES,
|
|
PairingError,
|
|
build_seed_url,
|
|
pair_spire,
|
|
revoke_spire,
|
|
spire_key_name,
|
|
)
|
|
|
|
# Fixed, timezone-aware timestamp used for the Machine fixture's
# created_at / updated_at so the fixture is deterministic.
_NOW = datetime(2026, 6, 16, tzinfo=timezone.utc)
# The spire's public key in hex; the npub form is derived through lnbits'
# real helper so the hex<->npub conversion is exercised for real.
_SPIRE_HEX = "522a4538f1df96508d9ee8b14072344dd4a566acfe03c25a92a39179c6fca891"
_SPIRE_NPUB = hex_to_npub(_SPIRE_HEX)
# Relay list handed to pair_spire as ``relays``.
_RELAYS = ["wss://lnbits.demo.aiolabs.dev/nostrrelay/demo"]
# Relay handed to pair_spire as ``bunker_relay``.
_BUNKER_RELAY = "wss://bunker.internal/relay"
# Passphrase handed to pair_spire as ``keystore_passphrase``.
_PASSPHRASE = "keystore-pass"  # pragma: allowlist secret
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _clear_policy_cache():
    """Empty lnbits' policy-id cache before each test runs.

    ensure_policy caches resolved policy ids keyed on (admin_pubkey, name).
    Without this reset a stale id from an earlier test would be used
    instead of the canned policy state of the current FakeBunker.
    """
    from lnbits.core.signers import remote_bunker as bunker_signer

    policy_id_cache = bunker_signer._POLICY_ID_CACHE
    policy_id_cache.clear()
    yield
|
|
|
|
|
|
def _machine(mid: str = "m1") -> Machine:
    """Return an active Machine fixture with id *mid* and canned fields."""
    fields = {
        "id": mid,
        "operator_user_id": "op1",
        "machine_npub": "placeholder",
        "wallet_id": "w1",
        "name": "sintra",
        "location": None,
        "fiat_code": "EUR",
        "is_active": True,
        # Same fixed instant for both timestamps keeps the fixture stable.
        "created_at": _NOW,
        "updated_at": _NOW,
    }
    return Machine(**fields)
|
|
|
|
|
|
class FakeBunker:
    """In-memory stand-in for the bunker admin client.

    Every method appends a ``(method_name, *args)`` tuple to ``calls`` and
    returns a canned response, so tests can assert on the orchestration
    without a live nsecbunkerd.
    """

    admin_pubkey = "fake-admin-pubkey"

    # pragma: allowlist secret
    def __init__(self, *, policies=None, token_secret="s3cr3t", revoke_count=1):
        """Set up the canned state.

        Args:
            policies: Policy dicts (``id`` / ``name`` / ``rules``) the fake
                reports as already existing. Copied, never mutated.
            token_secret: Secret placed after the ``#`` in the token that
                ``get_key_tokens`` returns.
            revoke_count: Value ``revoke_key_user`` returns.
        """
        # Copy the seed list: create_new_policy appends to the internal
        # list, and the caller's fixture data must not grow as a side effect.
        self._policies = list(policies) if policies else []
        self._token_secret = token_secret
        self._revoke_count = revoke_count
        self.calls: list[tuple] = []
        # Id handed to the first policy created through this fake.
        self._next_policy_id = 7

    async def create_new_key(self, name, passphrase):
        """Record the call and return the canned spire npub."""
        self.calls.append(("create_new_key", name, passphrase))
        return _SPIRE_NPUB

    async def get_policies(self):
        """Record the call and return a shallow copy of the known policies."""
        self.calls.append(("get_policies",))
        return list(self._policies)

    async def create_new_policy(self, name, rules):
        """Record the call, register the policy and return its new id."""
        self.calls.append(("create_new_policy", name, rules))
        pid = self._next_policy_id
        # Advance the counter so a second policy gets a distinct id.
        self._next_policy_id += 1
        self._policies.append({"id": pid, "name": name, "rules": list(rules)})
        return pid

    async def add_policy_rule(self, policy_id, rule):
        """Record the call; the stored policies are left unchanged."""
        self.calls.append(("add_policy_rule", policy_id, rule))

    async def create_new_token(
        self, key_name, client_name, policy_id, duration_hours=None
    ):
        """Record the call, including ``duration_hours``; returns nothing."""
        self.calls.append(
            ("create_new_token", key_name, client_name, policy_id, duration_hours)
        )

    async def revoke_key_user(self, key_name):
        """Record the call and return the configured revoke count."""
        self.calls.append(("revoke_key_user", key_name))
        return self._revoke_count

    async def get_key_tokens(self, key_name):
        """Record the call and return a single canned token entry.

        The client name reuses whatever follows the first ``-`` in
        *key_name*; the token has the ``<npub>#<secret>`` shape.
        """
        self.calls.append(("get_key_tokens", key_name))
        return [
            {
                "clientName": f"spire-client-{key_name.split('-', 1)[1]}",
                "token": f"{_SPIRE_NPUB}#{self._token_secret}",
            }
        ]

    def named(self, name):
        """Return the recorded calls whose method name equals *name*."""
        return [c for c in self.calls if c[0] == name]
|
|
|
|
|
|
def _pair(bunker, machine=None):
    """Run pair_spire to completion against *bunker* with the canned inputs.

    Falls back to the default Machine fixture when *machine* is not given.
    """
    target = machine or _machine()
    pairing = pair_spire(
        target,
        relays=_RELAYS,
        admin_client=bunker,
        bunker_relay=_BUNKER_RELAY,
        keystore_passphrase=_PASSPHRASE,
    )
    return asyncio.run(pairing)
|
|
|
|
|
|
def test_pair_happy_path_mints_key_policy_token():
    """Pairing creates the key, the policy and a token bound to both."""
    bunker = FakeBunker(token_secret="abc123")  # pragma: allowlist secret
    outcome = _pair(bunker)

    # Key: minted under the machine-derived name with the passphrase.
    assert ("create_new_key", "spire-m1", _PASSPHRASE) in bunker.calls
    key_name = spire_key_name("m1")
    assert key_name == "spire-m1"
    assert outcome.bunker_key_name == key_name

    # Identity: npub and hex both reflect the bunker-issued key.
    assert outcome.spire_npub == _SPIRE_NPUB
    assert outcome.spire_pubkey_hex == _SPIRE_HEX

    # Policy: created under the shared spire policy name.
    policy_calls = bunker.named("create_new_policy")
    assert policy_calls
    assert policy_calls[0][1] == SPIRE_POLICY_NAME

    # Token: tied to the key, the client name and the fresh policy id.
    _, token_key, token_client, token_policy, _ = bunker.named("create_new_token")[0]
    assert token_key == "spire-m1"  # key_name
    assert token_client == "spire-client-m1"  # client_name
    assert token_policy == 7  # policy_id from the fake's create_new_policy
|
|
|
|
|
|
def test_bunker_url_carries_pubkey_relay_secret():
    """The bunker:// URL embeds the hex pubkey, the encoded relay and the secret."""
    paired = _pair(FakeBunker(token_secret="topsecret"))  # pragma: allowlist secret
    url = paired.bunker_url
    assert url.startswith(f"bunker://{_SPIRE_HEX}?")
    expected_fragments = (
        "relay=wss%3A%2F%2Fbunker.internal%2Frelay",
        "secret=topsecret",
    )
    for fragment in expected_fragments:
        assert fragment in url
|
|
|
|
|
|
def test_seed_url_decodes_to_contract():
    """The seed URL is the scheme plus a base64url JSON blob of the contract."""
    result = _pair(FakeBunker(token_secret="zzz"))  # pragma: allowlist secret
    seed_url = result.seed_url
    assert seed_url.startswith(SEED_URL_SCHEME)

    encoded = seed_url[len(SEED_URL_SCHEME) :]
    # The blob may come without padding; restore it before decoding.
    padding = "=" * (-len(encoded) % 4)
    decoded = json.loads(base64.urlsafe_b64decode(encoded + padding))

    expected = {
        "v": 1,
        "spire_npub": _SPIRE_NPUB,
        "spire_pubkey": _SPIRE_HEX,
        "bunker_url": result.bunker_url,
        "relays": _RELAYS,
    }
    assert decoded == expected
|
|
|
|
|
|
def test_fresh_policy_adds_kindless_nip44_rules():
    """With no pre-existing policy, the kind-less methods arrive via add_policy_rule."""
    bunker = FakeBunker()  # no existing policies
    _pair(bunker)
    # Kind-scoped rules travel with create_new_policy; only the kind-less
    # nip44 methods are reconciled in through add_policy_rule.
    reconciled = {call[2]["method"] for call in bunker.named("add_policy_rule")}
    assert reconciled == set(SPIRE_POLICY_METHODS_NO_KIND)
|
|
|
|
|
|
def test_existing_policy_reused_not_recreated():
    """A policy already on the bunker is reused and only topped up."""
    kind_scoped_rules = list(map(dict, SPIRE_POLICY_RULES))
    preexisting = {"id": 42, "name": SPIRE_POLICY_NAME, "rules": kind_scoped_rules}
    bunker = FakeBunker(policies=[preexisting])

    _pair(bunker)

    assert not bunker.named("create_new_policy")  # reused, not recreated
    token_policy_id = bunker.named("create_new_token")[0][3]
    assert token_policy_id == 42  # used existing id
    reconciled = {call[2]["method"] for call in bunker.named("add_policy_rule")}
    assert reconciled == set(SPIRE_POLICY_METHODS_NO_KIND)
|
|
|
|
|
|
def test_fully_provisioned_policy_adds_nothing():
    """A policy that already holds every rule triggers no bunker writes."""
    kind_scoped = [dict(rule) for rule in SPIRE_POLICY_RULES]
    kindless = [
        {"method": method, "kind": None} for method in SPIRE_POLICY_METHODS_NO_KIND
    ]
    complete_policy = {
        "id": 9,
        "name": SPIRE_POLICY_NAME,
        "rules": [*kind_scoped, *kindless],
    }
    bunker = FakeBunker(policies=[complete_policy])

    _pair(bunker)

    assert not bunker.named("add_policy_rule")
    assert not bunker.named("create_new_policy")
|
|
|
|
|
|
def test_malformed_token_raises():
    """A token lacking the ``#`` separator surfaces as a PairingError."""
    bunker = FakeBunker()

    async def _tokens_without_hash(key_name):
        del key_name  # argument is irrelevant to the canned reply
        return [{"token": "no-hash-here"}]

    # Swap the fake's token listing for one that returns a malformed token.
    bunker.get_key_tokens = _tokens_without_hash

    with pytest.raises(PairingError, match="malformed token"):
        _pair(bunker)
|
|
|
|
|
|
def test_missing_relay_or_passphrase_raises():
    """Each blank required input is rejected with its own PairingError message."""
    valid_inputs = {
        "relays": _RELAYS,
        "bunker_relay": _BUNKER_RELAY,
        "keystore_passphrase": _PASSPHRASE,
    }
    # (input blanked out, pattern the resulting error must match)
    scenarios = (
        ({"bunker_relay": ""}, "LNBITS_NSEC_BUNKER_URL"),
        ({"keystore_passphrase": ""}, "PASSPHRASE"),
        ({"relays": []}, "relay is required"),
    )
    for blanked, pattern in scenarios:
        inputs = {**valid_inputs, **blanked}
        with pytest.raises(PairingError, match=pattern):
            asyncio.run(
                pair_spire(
                    _machine(),
                    admin_client=FakeBunker(),
                    **inputs,
                )
            )
|
|
|
|
|
|
def test_build_seed_url_roundtrip():
    """build_seed_url output decodes back to the pubkey and relays it was given."""
    seed_url = build_seed_url(
        spire_npub=_SPIRE_NPUB,
        spire_pubkey_hex=_SPIRE_HEX,
        bunker_url="bunker://x?relay=r&secret=s",
        relays=_RELAYS,
    )
    encoded = seed_url[len(SEED_URL_SCHEME) :]
    # Restore any base64 padding the URL form left off.
    padded = encoded + "=" * (-len(encoded) % 4)
    decoded = json.loads(base64.urlsafe_b64decode(padded))
    assert decoded["spire_pubkey"] == _SPIRE_HEX
    assert decoded["relays"] == _RELAYS
|
|
|
|
|
|
def test_pair_threads_duration_hours():
    """An explicit duration_hours is forwarded to create_new_token."""
    bunker = FakeBunker()
    pairing = pair_spire(
        _machine(),
        relays=_RELAYS,
        admin_client=bunker,
        bunker_relay=_BUNKER_RELAY,
        keystore_passphrase=_PASSPHRASE,
        duration_hours=720,
    )
    asyncio.run(pairing)

    # Recorded tuple layout: (name, key, client, policy_id, duration_hours)
    recorded = bunker.named("create_new_token")[0]
    assert recorded[4] == 720
|
|
|
|
|
|
def test_pair_default_duration_is_none():
    """Without duration_hours, create_new_token receives None."""
    bunker = FakeBunker()
    _pair(bunker)  # no duration_hours
    recorded = bunker.named("create_new_token")[0]
    assert recorded[4] is None
|
|
|
|
|
|
def test_revoke_spire_calls_revoke_key_user():
    """revoke_spire deauthorises through revoke_key_user and returns its count."""
    # revoke_key_user (KeyUser.revokedAt) is the subject-level ban: it cuts
    # the whole binding rather than a single token's grant. Token-revoke
    # also works post-bind since nsecbunkerd#27, but it severs only one
    # token, so revoke_key_user remains the full-deauth call.
    bunker = FakeBunker(revoke_count=2)
    revoked = asyncio.run(revoke_spire(_machine(), admin_client=bunker))

    assert revoked == 2
    expected_calls = [("revoke_key_user", "spire-m1")]
    assert bunker.named("revoke_key_user") == expected_calls
    assert not bunker.named("revoke_token")  # never token-revoke
|
|
|
|
|
|
def test_revoke_spire_maps_bunker_error():
    """An NsecBunkerError during revoke is re-raised as a PairingError."""
    bunker = FakeBunker()

    async def _failing_revoke(key_name):
        raise NsecBunkerError("nope")

    # Make the fake's revoke call blow up with the bunker-level error.
    bunker.revoke_key_user = _failing_revoke

    with pytest.raises(PairingError, match="revoke"):
        asyncio.run(revoke_spire(_machine(), admin_client=bunker))
|
|
|
|
|
|
def test_policy_authorizes_required_signing_kinds():
    """The spire policy covers every kind the spire signs and omits nip04."""
    # These are the kinds the spire signs as its OWN identity, confirmed
    # against the consumer signing sites in bitspire#52 (2026-06-18). A
    # missing kind is a silent bunker reject. 22242 is NIP-42 relay AUTH and
    # must be bunker-signed because it proves control of spire_pubkey.
    signed_kinds = set()
    for rule in SPIRE_POLICY_RULES:
        if rule["method"] == "sign_event":
            signed_kinds.add(rule["kind"])
    assert signed_kinds.issuperset({21000, 30078, 22242})

    # nip04 stays out: the v1 path is dead.
    for dead_method in ("nip04_encrypt", "nip04_decrypt"):
        assert dead_method not in SPIRE_POLICY_METHODS_NO_KIND
|