spirekeeper/tests/test_pairing.py
Padreug b55fc8bc1c
Some checks failed
ci.yml / fix(pairing): default bunker_relay to the spire's public event relay, not localhost (pull_request) Failing after 0s
fix(pairing): default bunker_relay to the spire's public event relay, not localhost
The seed minted via the Pair UI baked an unreachable bunker relay into
bunker_url. The UI form has no bunker_relay field, so pair_spire fell back to
its default `settings.lnbits_nsec_bunker_url` — which on a deployed instance is
the INTERNAL relay lnbits uses to reach the co-located bunker (e.g.
ws://127.0.0.1:5000/nostrrelay/demo). The remote ATM can't reach localhost, so
connectNewSeed hangs -> BunkerTimeoutError "Signer Unreachable". (Flagged by
bitspire on the demo; the localhost-relay /pair gotcha the coord thread called out.)

Default bunker_relay to the spire's own public event relay (relays[0]) instead:
the bunker lives on the same operator nostrrelay the spire publishes its events
to, so that URL is machine-reachable. An explicit `bunker_relay` still overrides
for split-relay deploys. An empty override now falls back to the same default
rather than raising.

Regression test: with no (or empty) bunker_relay, bunker_url embeds relays[0]
and contains no 127.0.0.1.

NOTE: relays[0] is a pragmatic default; whether the seed should carry multiple
relays / be sourced from the operator's nostrclient relay is a follow-up.
2026-06-22 17:18:24 +02:00

332 lines
11 KiB
Python

"""Unit tests for the seed-URL pairing service (S0 / #9, model A1).
The bunker admin client is faked — these exercise the orchestration
(create_new_key -> ensure-policy -> create_new_token -> get_key_tokens),
the policy reconciliation, and the seed-URL / bunker:// wire shape, with
no live nsecbunkerd. npub<->hex round-trips through lnbits' real helpers
so the parsing is exercised for real.
Async is driven via asyncio.run (this venv has no pytest-asyncio), matching
the rest of the suite.
"""
import asyncio
import base64
import json
from datetime import datetime, timezone
import pytest
from lnbits.core.services.nsec_bunker import NsecBunkerError
from lnbits.utils.nostr import hex_to_npub
from ..models import Machine
from ..pairing import (
SEED_URL_SCHEME,
SPIRE_POLICY_METHODS_NO_KIND,
SPIRE_POLICY_NAME,
SPIRE_POLICY_RULES,
PairingError,
build_seed_url,
pair_spire,
revoke_spire,
spire_key_name,
)
# Fixed, timezone-aware timestamp used for the Machine fixture's
# created_at / updated_at fields.
_NOW = datetime(2026, 6, 16, tzinfo=timezone.utc)
# Hex pubkey of the spire key the fake bunker hands back, plus its npub
# encoding derived through lnbits' real hex_to_npub helper.
_SPIRE_HEX = "522a4538f1df96508d9ee8b14072344dd4a566acfe03c25a92a39179c6fca891"
_SPIRE_NPUB = hex_to_npub(_SPIRE_HEX)
# Event relays passed to pair_spire as `relays`.
_RELAYS = ["wss://lnbits.demo.aiolabs.dev/nostrrelay/demo"]
# Explicit `bunker_relay` override used by the tests that do not exercise
# the default-relay fallback.
_BUNKER_RELAY = "wss://bunker.internal/relay"
# Passed to pair_spire as `keystore_passphrase`.
_PASSPHRASE = "keystore-pass" # pragma: allowlist secret
@pytest.fixture(autouse=True)
def _clear_policy_cache():
    """Empty lnbits' policy-id cache before every test.

    ensure_policy caches resolved policy ids on (admin_pubkey, name), and
    every FakeBunker exposes the same admin_pubkey. Clearing the cache up
    front makes each test see its own FakeBunker's canned policy state
    instead of an id cached by an earlier test.
    """
    from lnbits.core.signers import remote_bunker

    policy_id_cache = remote_bunker._POLICY_ID_CACHE
    policy_id_cache.clear()
    yield
def _machine(mid: str = "m1") -> Machine:
    """Return an active EUR Machine fixture whose id is *mid*.

    All other fields are fixed; machine_npub is a placeholder value and the
    timestamps are pinned to _NOW.
    """
    fields = {
        "id": mid,
        "operator_user_id": "op1",
        "machine_npub": "placeholder",
        "wallet_id": "w1",
        "name": "sintra",
        "location": None,
        "fiat_code": "EUR",
        "is_active": True,
        "created_at": _NOW,
        "updated_at": _NOW,
    }
    return Machine(**fields)
class FakeBunker:
    """In-memory stand-in for the bunker admin client.

    Every method appends a tuple to ``self.calls`` (method name first, then
    the arguments it received) and returns a canned response, so tests can
    assert on the orchestration without a live nsecbunkerd.
    """

    admin_pubkey = "fake-admin-pubkey"

    # pragma: allowlist secret
    def __init__(self, *, policies=None, token_secret="s3cr3t", revoke_count=1):
        """Configure the canned state.

        Args:
            policies: policy dicts the bunker already "has"; defaults to none.
            token_secret: secret part of the token returned by get_key_tokens.
            revoke_count: value returned by revoke_key_user.
        """
        # Copy the seed list: create_new_policy appends to it, and the
        # caller's list must not be mutated behind its back.
        self._policies = list(policies) if policies else []
        self._token_secret = token_secret
        self._revoke_count = revoke_count
        self.calls: list[tuple] = []
        # Id handed out by the next create_new_policy call.
        self._next_policy_id = 7

    async def create_new_key(self, name, passphrase):
        """Record the call and return the fixed spire npub."""
        self.calls.append(("create_new_key", name, passphrase))
        return _SPIRE_NPUB

    async def get_policies(self):
        """Record the call and return a shallow copy of the known policies."""
        self.calls.append(("get_policies",))
        return list(self._policies)

    async def create_new_policy(self, name, rules):
        """Record the call, store the new policy and return its id.

        Ids start at 7 and advance by one per created policy, so two
        policies created on the same fake never share an id.
        """
        self.calls.append(("create_new_policy", name, rules))
        pid = self._next_policy_id
        self._next_policy_id += 1
        self._policies.append({"id": pid, "name": name, "rules": list(rules)})
        return pid

    async def add_policy_rule(self, policy_id, rule):
        """Record the call; the stored policies are left untouched."""
        self.calls.append(("add_policy_rule", policy_id, rule))

    async def create_new_token(
        self, key_name, client_name, policy_id, duration_hours=None
    ):
        """Record the call; the token itself is served by get_key_tokens."""
        self.calls.append(
            ("create_new_token", key_name, client_name, policy_id, duration_hours)
        )

    async def revoke_key_user(self, key_name):
        """Record the call and return the configured revoke count."""
        self.calls.append(("revoke_key_user", key_name))
        return self._revoke_count

    async def get_key_tokens(self, key_name):
        """Record the call and return one ``<npub>#<secret>`` token entry.

        The client name is derived from the part of *key_name* after its
        first ``-``.
        """
        self.calls.append(("get_key_tokens", key_name))
        return [
            {
                "clientName": f"spire-client-{key_name.split('-', 1)[1]}",
                "token": f"{_SPIRE_NPUB}#{self._token_secret}",
            }
        ]

    def named(self, name):
        """Return the recorded calls whose method name equals *name*."""
        return [c for c in self.calls if c[0] == name]
def _pair(bunker, machine=None):
    """Run pair_spire to completion with the suite's default arguments.

    Uses *machine* when given, otherwise a fresh default Machine fixture,
    and always passes the explicit _BUNKER_RELAY override.
    """
    target = machine or _machine()
    pairing = pair_spire(
        target,
        relays=_RELAYS,
        admin_client=bunker,
        bunker_relay=_BUNKER_RELAY,
        keystore_passphrase=_PASSPHRASE,
    )
    return asyncio.run(pairing)
def test_pair_happy_path_mints_key_policy_token():
    """Happy path: pairing machine m1 mints a key, a policy and a token."""
    bunker = FakeBunker(token_secret="abc123")  # pragma: allowlist secret
    result = _pair(bunker)

    # The key is created under the machine-derived name, and the result
    # echoes back the identity the bunker returned.
    assert ("create_new_key", "spire-m1", _PASSPHRASE) in bunker.calls
    assert result.bunker_key_name == spire_key_name("m1") == "spire-m1"
    assert result.spire_npub == _SPIRE_NPUB
    assert result.spire_pubkey_hex == _SPIRE_HEX

    policy_calls = bunker.named("create_new_policy")
    assert policy_calls and policy_calls[0][1] == SPIRE_POLICY_NAME

    # Recorded tuple: (method, key_name, client_name, policy_id, duration).
    _, key_name, client_name, policy_id, _ = bunker.named("create_new_token")[0]
    assert key_name == "spire-m1"
    assert client_name == "spire-client-m1"
    assert policy_id == 7  # policy_id from the fake's create_new_policy
def test_bunker_url_carries_pubkey_relay_secret():
    """bunker_url holds the spire hex pubkey, the encoded relay and the secret."""
    bunker = FakeBunker(token_secret="topsecret")  # pragma: allowlist secret
    url = _pair(bunker).bunker_url

    expected_prefix = f"bunker://{_SPIRE_HEX}?"
    assert url.startswith(expected_prefix)
    # The explicit bunker relay appears percent-encoded in the query string.
    assert "relay=wss%3A%2F%2Fbunker.internal%2Frelay" in url
    assert "secret=topsecret" in url
def test_seed_url_decodes_to_contract():
    """The seed URL is the scheme plus url-safe base64 JSON of the v1 payload."""
    result = _pair(FakeBunker(token_secret="zzz"))  # pragma: allowlist secret
    seed_url = result.seed_url
    assert seed_url.startswith(SEED_URL_SCHEME)

    encoded = seed_url[len(SEED_URL_SCHEME) :]
    # Re-add the '=' padding needed to reach a multiple of four characters.
    padding = "=" * (-len(encoded) % 4)
    decoded = json.loads(base64.urlsafe_b64decode(encoded + padding))

    expected = {
        "v": 1,
        "spire_npub": _SPIRE_NPUB,
        "spire_pubkey": _SPIRE_HEX,
        "bunker_url": result.bunker_url,
        "relays": _RELAYS,
    }
    assert decoded == expected
def test_fresh_policy_adds_kindless_nip44_rules():
    """With no pre-existing policy, only the kind-less methods are added."""
    bunker = FakeBunker()  # no existing policies
    _pair(bunker)

    # kind-scoped rules went in via create_new_policy; only the kind-less
    # nip44 methods are reconciled in via add_policy_rule.
    added_methods = {call[2]["method"] for call in bunker.named("add_policy_rule")}
    assert added_methods == set(SPIRE_POLICY_METHODS_NO_KIND)
def test_existing_policy_reused_not_recreated():
    """A policy already on the bunker is reused, then topped up with rules."""
    preexisting = {
        "id": 42,
        "name": SPIRE_POLICY_NAME,
        "rules": [dict(rule) for rule in SPIRE_POLICY_RULES],
    }
    bunker = FakeBunker(policies=[preexisting])
    _pair(bunker)

    assert not bunker.named("create_new_policy")  # reused, not recreated
    token_call = bunker.named("create_new_token")[0]
    assert token_call[3] == 42  # used existing id

    added_methods = {call[2]["method"] for call in bunker.named("add_policy_rule")}
    assert added_methods == set(SPIRE_POLICY_METHODS_NO_KIND)
def test_fully_provisioned_policy_adds_nothing():
    """A policy that already carries every rule triggers no bunker writes."""
    kind_scoped = [dict(rule) for rule in SPIRE_POLICY_RULES]
    kindless = [
        {"method": method, "kind": None} for method in SPIRE_POLICY_METHODS_NO_KIND
    ]
    complete_policy = {
        "id": 9,
        "name": SPIRE_POLICY_NAME,
        "rules": kind_scoped + kindless,
    }
    bunker = FakeBunker(policies=[complete_policy])
    _pair(bunker)

    assert not bunker.named("add_policy_rule")
    assert not bunker.named("create_new_policy")
def test_malformed_token_raises():
    """A token without the '#' separator surfaces as a PairingError."""

    async def _tokens_without_hash(key_name):
        _ = key_name
        return [{"token": "no-hash-here"}]

    bunker = FakeBunker()
    # Swap the canned token listing for one with a malformed token.
    bunker.get_key_tokens = _tokens_without_hash

    with pytest.raises(PairingError, match="malformed token"):
        _pair(bunker)
def test_bunker_relay_defaults_to_spire_event_relay():
    """Without an explicit bunker_relay, bunker_url embeds relays[0].

    The relay baked into bunker_url must be the spire's own public event
    relay, NOT lnbits' internal bunker URL. This is the localhost-relay /pair
    gotcha: a UI-minted seed (the form has no bunker_relay field) has to
    embed a machine-reachable relay, not ws://127.0.0.1. An empty
    bunker_relay falls back to the same default as a missing one.
    """
    from urllib.parse import quote

    expected_relay_param = f"relay={quote(_RELAYS[0], safe='')}"
    for unset_relay in (None, ""):
        pairing = pair_spire(
            _machine(),
            relays=_RELAYS,
            admin_client=FakeBunker(token_secret="s"),  # pragma: allowlist secret
            bunker_relay=unset_relay,
            keystore_passphrase=_PASSPHRASE,
        )
        bunker_url = asyncio.run(pairing).bunker_url
        assert expected_relay_param in bunker_url
        assert "127.0.0.1" not in bunker_url
def test_missing_relay_or_passphrase_raises():
    """An empty passphrase or an empty relay list is rejected."""

    def _attempt(**overrides):
        # Start from a valid argument set and override one field per case.
        kwargs = {
            "relays": _RELAYS,
            "admin_client": FakeBunker(),
            "bunker_relay": _BUNKER_RELAY,
            "keystore_passphrase": _PASSPHRASE,
        }
        kwargs.update(overrides)
        return asyncio.run(pair_spire(_machine(), **kwargs))

    with pytest.raises(PairingError, match="PASSPHRASE"):
        _attempt(keystore_passphrase="")

    with pytest.raises(PairingError, match="relay is required"):
        _attempt(relays=[])
def test_build_seed_url_roundtrip():
    """build_seed_url output decodes back to the pubkey and relays given."""
    seed_url = build_seed_url(
        spire_npub=_SPIRE_NPUB,
        spire_pubkey_hex=_SPIRE_HEX,
        bunker_url="bunker://x?relay=r&secret=s",
        relays=_RELAYS,
    )

    encoded = seed_url[len(SEED_URL_SCHEME) :]
    # Re-add the '=' padding needed to reach a multiple of four characters.
    padding = "=" * (-len(encoded) % 4)
    decoded = json.loads(base64.urlsafe_b64decode(encoded + padding))

    assert decoded["spire_pubkey"] == _SPIRE_HEX
    assert decoded["relays"] == _RELAYS
def test_pair_threads_duration_hours():
    """duration_hours given to pair_spire reaches create_new_token unchanged."""
    bunker = FakeBunker()
    pairing = pair_spire(
        _machine(),
        relays=_RELAYS,
        admin_client=bunker,
        bunker_relay=_BUNKER_RELAY,
        keystore_passphrase=_PASSPHRASE,
        duration_hours=720,
    )
    asyncio.run(pairing)

    # create_new_token tuple is (name, key, client, policy_id, duration_hours)
    *_, duration_hours = bunker.named("create_new_token")[0]
    assert duration_hours == 720
def test_pair_default_duration_is_none():
    """Pairing without duration_hours records None for the token duration."""
    bunker = FakeBunker()
    _pair(bunker)  # no duration_hours

    *_, duration_hours = bunker.named("create_new_token")[0]
    assert duration_hours is None
def test_revoke_spire_calls_revoke_key_user():
    """revoke_spire issues one revoke_key_user call and returns its count."""
    # revoke goes through revoke_key_user (KeyUser.revokedAt) — the subject-
    # level ban that cuts the whole binding, not just one token's grant.
    # (Token-revoke also works post-bind since nsecbunkerd#27, but only
    # severs a single token; revoke_key_user is the full-deauth call.)
    bunker = FakeBunker(revoke_count=2)
    revoked = asyncio.run(revoke_spire(_machine(), admin_client=bunker))

    assert revoked == 2
    expected_calls = [("revoke_key_user", "spire-m1")]
    assert bunker.named("revoke_key_user") == expected_calls
    assert not bunker.named("revoke_token")  # never token-revoke
def test_revoke_spire_maps_bunker_error():
    """An NsecBunkerError raised during revoke surfaces as a PairingError."""

    async def _failing_revoke(key_name):
        raise NsecBunkerError("nope")

    bunker = FakeBunker()
    # Make the revoke call itself fail at the bunker.
    bunker.revoke_key_user = _failing_revoke

    with pytest.raises(PairingError, match="revoke"):
        asyncio.run(revoke_spire(_machine(), admin_client=bunker))
def test_policy_authorizes_required_signing_kinds():
    """The spire policy allows the required sign_event kinds and no nip04."""
    # Kinds the spire signs as its OWN identity, confirmed against the
    # consumer signing sites in bitspire#52 (2026-06-18). A missing kind is a
    # silent bunker reject. 22242 = NIP-42 relay AUTH (must be bunker-signed —
    # it proves control of spire_pubkey). nip04 stays out (v1 path is dead).
    signable_kinds = {
        rule["kind"] for rule in SPIRE_POLICY_RULES if rule["method"] == "sign_event"
    }
    required_kinds = {21000, 30078, 22242}
    assert required_kinds.issubset(signable_kinds)

    for dead_method in ("nip04_encrypt", "nip04_decrypt"):
        assert dead_method not in SPIRE_POLICY_METHODS_NO_KIND