feat(pairing): seed-URL pairing — operator-side producer (S0 / #9) #21
2 changed files with 508 additions and 0 deletions
feat(pairing): bunker pairing service — mint per-spire key + seed URL (#9)
Operator-side producer for seed-URL pairing (S0/#9, model A1). pair_spire()
orchestrates the nsecbunkerd admin chain via lnbits' NsecBunkerAdminClient:
create_new_key(spire-<id>) -> _ensure_spire_policy -> create_new_token ->
get_key_tokens -> package the <npub>#secret token into a bunker:// URL +
base64url seed URL {spire_npub, spire_pubkey, bunker_url, relays}.
The spire later self-signs all its events as that bunker-held key; lnbits'
path-B roster maps the npub to the operator wallet — no nsec on the spire.
spirekeeper does steps 1-4 only; the NIP-46 connect/bind happens spire-side
(bitspire#52) with the spire's own client keypair.
Scoped policy 'spirekeeper-spire': sign_event 21000/21001-3/30078 + nip44
(kind-less via add_policy_rule). Local _ensure_spire_policy (no cache)
avoids lnbits' admin-pubkey-keyed _ensure_policy cache (policy-name-blind).
9 unit tests with a fake bunker (orchestration, policy reconcile, seed/
bunker:// wire shape, error paths); npub<->hex via lnbits' real helpers.
200 tests green.
Known gaps (lnbits NsecBunkerAdminClient): no token-expiry param, no revoke
RPC — re-pair works; 'revoke spire access' deferred to a bunker follow-up.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
commit
a77f5bcb5c
268
pairing.py
Normal file
268
pairing.py
Normal file
|
|
@ -0,0 +1,268 @@
|
|||
"""Seed-URL pairing for bitSpire machines (S0 / aiolabs/spirekeeper#9), model A1.
|
||||
|
||||
Mints a per-spire signing key *inside the operator's nsecbunkerd*, issues a
|
||||
scoped NIP-46 connect token, and builds the one-shot **seed URL** the spire
|
||||
redeems at first boot. The spire then self-signs all of its own events
|
||||
(kind-21000 cash RPC, kind-30078 beacon + cassette-state, CLINK 21001-21003)
|
||||
as that bunker-held key; lnbits' path-B roster (`nostr_transport/roster.py`)
|
||||
maps the spire npub to the operator's wallet. No nsec ever lands on the
|
||||
spire's disk.
|
||||
|
||||
Division of labour (vs. lnbits' `RemoteBunkerSigner.provision`, which is the
|
||||
reference for the admin chain):
|
||||
|
||||
spirekeeper (here) spire, at first boot (bitspire#52)
|
||||
────────────────── ──────────────────────────────────
|
||||
1. create_new_key 5. NIP-46 connect — redeem the token with a
|
||||
2. _ensure_spire_policy freshly-generated *client* keypair; bunker
|
||||
3. create_new_token binds (client_pubkey → spire key). The
|
||||
4. get_key_tokens ─ seed ─► client_nsec stays on the spire; the
|
||||
(package token in URL) signing key never leaves the bunker.
|
||||
|
||||
We deliberately do NOT run the connect/eager-bind step here: the spire is the
|
||||
NIP-46 client, so the binding must happen spire-side with the spire's own
|
||||
client keypair. spirekeeper only mints + packages.
|
||||
|
||||
Seed URL wire format (contract shared with bitspire#52):
|
||||
|
||||
spire-seed:v1:<base64url(json)> json = {
|
||||
"v": 1,
|
||||
"spire_npub": "npub1…", # the bunker-minted spire identity
|
||||
"spire_pubkey": "<64-hex>", # same key, hex (consumer convenience)
|
||||
"bunker_url": "bunker://<spire_pubkey>?relay=<bunker_relay>&secret=<sec>",
|
||||
"relays": ["wss://…"], # relays for the spire's own events
|
||||
}
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
from urllib.parse import quote
|
||||
|
||||
from lnbits.core.services.nsec_bunker import (
|
||||
NsecBunkerAdminClient,
|
||||
NsecBunkerError,
|
||||
NsecBunkerNotConfiguredError,
|
||||
npub_to_hex,
|
||||
)
|
||||
from lnbits.settings import settings
|
||||
from pydantic import BaseModel
|
||||
|
||||
from .models import Machine
|
||||
|
||||
SEED_URL_SCHEME = "spire-seed:v1:"
|
||||
|
||||
# Policy granted to every spire's connect token. Scoped to exactly what a
|
||||
# bitSpire signs as itself:
|
||||
# - 21000 nostr-transport cash RPC envelope to lnbits
|
||||
# - 21001-21003 CLINK Offer / Debit / Manage (payment flow)
|
||||
# - 30078 NIP-78 beacon + bitspire-cassettes-state hello-event
|
||||
# Kind-scoped rules go in create_new_policy; kind-less methods (nip44, for
|
||||
# encrypting cassette-state to the operator) are added via add_policy_rule
|
||||
# because nsecbunkerd's create_new_policy chokes on null `kind`
|
||||
# (rule.kind.toString()). Mirrors lnbits' DEFAULT_POLICY_* split.
|
||||
#
|
||||
# NOTE (reconcile when bitspire#52 lands): confirm this kind set against the
|
||||
# spire's actual createSignedEvent / finalizeEvent call sites. Over-granting
|
||||
# here only widens what a spire may sign *as its own key* — low blast radius —
|
||||
# but under-granting makes the bunker reject the spire's events.
|
||||
SPIRE_POLICY_NAME = "spirekeeper-spire"
|
||||
SPIRE_POLICY_RULES = [
|
||||
{"method": "sign_event", "kind": 21000},
|
||||
{"method": "sign_event", "kind": 21001},
|
||||
{"method": "sign_event", "kind": 21002},
|
||||
{"method": "sign_event", "kind": 21003},
|
||||
{"method": "sign_event", "kind": 30078},
|
||||
]
|
||||
SPIRE_POLICY_METHODS_NO_KIND = ["nip44_encrypt", "nip44_decrypt"]
|
||||
|
||||
|
||||
class PairingError(Exception):
|
||||
"""Pairing could not be completed (bunker unreachable, misconfigured,
|
||||
or returned an unusable response). The caller maps this to a 4xx/5xx;
|
||||
no machine state is mutated on failure."""
|
||||
|
||||
|
||||
class PairResult(BaseModel):
|
||||
"""Output of a successful pair. The API layer persists
|
||||
`bunker_spire_key_name` + `spire_npub` (→ machine_npub) + `paired_at`,
|
||||
and returns `seed_url` to the operator (QR + copy)."""
|
||||
|
||||
spire_npub: str
|
||||
spire_pubkey_hex: str
|
||||
bunker_key_name: str
|
||||
bunker_url: str
|
||||
seed_url: str
|
||||
|
||||
|
||||
def spire_key_name(machine_id: str) -> str:
|
||||
"""The spire's key name in the bunker keystore. Stable across re-pairs
|
||||
so re-issuing a token reuses the same underlying key (create_new_key
|
||||
is replace-by-name on the bunker side)."""
|
||||
return f"spire-{machine_id}"
|
||||
|
||||
|
||||
async def _ensure_spire_policy(client: NsecBunkerAdminClient) -> int:
|
||||
"""Find-or-create the shared `spirekeeper-spire` policy and reconcile
|
||||
(add-only) any missing rules. Returns its bunker-assigned id.
|
||||
|
||||
Mirrors lnbits' `_ensure_policy` but with the spire rule set and **no
|
||||
admin-pubkey cache** — lnbits' cache is keyed on `admin_pubkey` alone
|
||||
(remote_bunker.py:157), so reusing its helper for a second policy name
|
||||
would return the wrong (cached lnbits-default) id. Pairing is an
|
||||
infrequent operator action, so the extra `get_policies` round-trip is
|
||||
cheap and correct. (Filed as an lnbits follow-up: cache key should
|
||||
include policy_name.)
|
||||
|
||||
Add-only, never remove: the policy may be shared across spires (and in
|
||||
principle other consumers); removing a rule would affect them all.
|
||||
"""
|
||||
policies = await client.get_policies()
|
||||
existing = [p for p in policies if p.get("name") == SPIRE_POLICY_NAME]
|
||||
if existing:
|
||||
policy = max(existing, key=lambda p: int(p["id"]))
|
||||
policy_id = int(policy["id"])
|
||||
live_rules = policy.get("rules") or []
|
||||
else:
|
||||
policy_id = await client.create_new_policy(
|
||||
SPIRE_POLICY_NAME, SPIRE_POLICY_RULES
|
||||
)
|
||||
live_rules = list(SPIRE_POLICY_RULES)
|
||||
|
||||
def _norm(method: str, kind) -> tuple[str, str | None]:
|
||||
# nsecbunkerd stores kind as a string; None = kind-less rule.
|
||||
return (method, str(kind) if kind is not None else None)
|
||||
|
||||
live_keys = {
|
||||
_norm(r["method"], r.get("kind"))
|
||||
for r in live_rules
|
||||
if isinstance(r, dict) and r.get("method")
|
||||
}
|
||||
for rule in SPIRE_POLICY_RULES:
|
||||
key = _norm(rule["method"], rule.get("kind"))
|
||||
if key not in live_keys:
|
||||
await client.add_policy_rule(policy_id, rule)
|
||||
live_keys.add(key)
|
||||
for method in SPIRE_POLICY_METHODS_NO_KIND:
|
||||
key = _norm(method, None)
|
||||
if key not in live_keys:
|
||||
await client.add_policy_rule(policy_id, {"method": method})
|
||||
live_keys.add(key)
|
||||
return policy_id
|
||||
|
||||
|
||||
def _recover_token(tokens: list[dict], client_name: str) -> str:
|
||||
"""Pull the freshly-issued `<npub>#<secret>` token out of the bunker's
|
||||
`get_key_tokens` response. Match by client name when the bunker
|
||||
serializes it; otherwise fall back to the most-recent entry (same
|
||||
defensiveness as lnbits' provision())."""
|
||||
matching = [
|
||||
t
|
||||
for t in tokens
|
||||
if t.get("clientName") == client_name or t.get("client_name") == client_name
|
||||
] or tokens
|
||||
if not matching:
|
||||
raise PairingError("bunker returned no tokens after create_new_token")
|
||||
token = matching[-1].get("token")
|
||||
if not isinstance(token, str) or "#" not in token:
|
||||
raise PairingError(f"bunker returned a malformed token: {token!r}")
|
||||
return token
|
||||
|
||||
|
||||
def build_seed_url(
|
||||
*, spire_npub: str, spire_pubkey_hex: str, bunker_url: str, relays: list[str]
|
||||
) -> str:
|
||||
payload = {
|
||||
"v": 1,
|
||||
"spire_npub": spire_npub,
|
||||
"spire_pubkey": spire_pubkey_hex,
|
||||
"bunker_url": bunker_url,
|
||||
"relays": relays,
|
||||
}
|
||||
blob = (
|
||||
base64.urlsafe_b64encode(json.dumps(payload, separators=(",", ":")).encode())
|
||||
.decode()
|
||||
.rstrip("=")
|
||||
)
|
||||
return SEED_URL_SCHEME + blob
|
||||
|
||||
|
||||
async def pair_spire(
|
||||
machine: Machine,
|
||||
*,
|
||||
relays: list[str],
|
||||
admin_client: NsecBunkerAdminClient,
|
||||
bunker_relay: str | None = None,
|
||||
keystore_passphrase: str | None = None,
|
||||
) -> PairResult:
|
||||
"""Mint a bunker-held key + scoped connect token for `machine` and
|
||||
return the seed URL the spire redeems at first boot.
|
||||
|
||||
`admin_client` must already be connected (the caller owns the
|
||||
`async with NsecBunkerAdminClient.from_settings()` context) — keeps
|
||||
connection lifecycle out of the orchestration so this is unit-testable
|
||||
with a fake client.
|
||||
|
||||
`bunker_relay` / `keystore_passphrase` default to the lnbits bunker
|
||||
settings; injectable for tests. `relays` are the relays the spire will
|
||||
use for its *own* events (kind-21000/30078) — typically the operator's
|
||||
nostrrelay; supplied by the API layer.
|
||||
|
||||
Raises PairingError on any bunker failure; no state is persisted here
|
||||
(the API layer persists on success).
|
||||
"""
|
||||
relay = (
|
||||
bunker_relay if bunker_relay is not None else settings.lnbits_nsec_bunker_url
|
||||
)
|
||||
passphrase = (
|
||||
keystore_passphrase
|
||||
if keystore_passphrase is not None
|
||||
else settings.lnbits_nsec_bunker_keystore_passphrase
|
||||
)
|
||||
if not relay:
|
||||
raise PairingError(
|
||||
"LNBITS_NSEC_BUNKER_URL is not set — cannot build a spire bunker connection"
|
||||
)
|
||||
if not passphrase:
|
||||
raise PairingError(
|
||||
"LNBITS_NSEC_BUNKER_KEYSTORE_PASSPHRASE is not set — "
|
||||
"cannot mint a spire key"
|
||||
)
|
||||
if not relays:
|
||||
raise PairingError("at least one relay is required for the seed URL")
|
||||
|
||||
key_name = spire_key_name(machine.id)
|
||||
client_name = f"spire-client-{machine.id}"
|
||||
|
||||
try:
|
||||
spire_npub = await admin_client.create_new_key(key_name, passphrase)
|
||||
spire_pubkey_hex = npub_to_hex(spire_npub)
|
||||
policy_id = await _ensure_spire_policy(admin_client)
|
||||
await admin_client.create_new_token(key_name, client_name, policy_id)
|
||||
tokens = await admin_client.get_key_tokens(key_name)
|
||||
except NsecBunkerNotConfiguredError as exc:
|
||||
raise PairingError(f"nsecbunkerd is not configured: {exc}") from exc
|
||||
except NsecBunkerError as exc:
|
||||
raise PairingError(f"bunker admin RPC failed during pairing: {exc}") from exc
|
||||
|
||||
token = _recover_token(tokens, client_name)
|
||||
_, _, secret = token.partition("#")
|
||||
|
||||
bunker_url = (
|
||||
f"bunker://{spire_pubkey_hex}?relay={quote(relay, safe='')}"
|
||||
f"&secret={quote(secret, safe='')}"
|
||||
)
|
||||
seed_url = build_seed_url(
|
||||
spire_npub=spire_npub,
|
||||
spire_pubkey_hex=spire_pubkey_hex,
|
||||
bunker_url=bunker_url,
|
||||
relays=relays,
|
||||
)
|
||||
return PairResult(
|
||||
spire_npub=spire_npub,
|
||||
spire_pubkey_hex=spire_pubkey_hex,
|
||||
bunker_key_name=key_name,
|
||||
bunker_url=bunker_url,
|
||||
seed_url=seed_url,
|
||||
)
|
||||
240
tests/test_pairing.py
Normal file
240
tests/test_pairing.py
Normal file
|
|
@ -0,0 +1,240 @@
|
|||
"""Unit tests for the seed-URL pairing service (S0 / #9, model A1).
|
||||
|
||||
The bunker admin client is faked — these exercise the orchestration
|
||||
(create_new_key -> ensure-policy -> create_new_token -> get_key_tokens),
|
||||
the policy reconciliation, and the seed-URL / bunker:// wire shape, with
|
||||
no live nsecbunkerd. npub<->hex round-trips through lnbits' real helpers
|
||||
so the parsing is exercised for real.
|
||||
|
||||
Async is driven via asyncio.run (this venv has no pytest-asyncio), matching
|
||||
the rest of the suite.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
from lnbits.utils.nostr import hex_to_npub
|
||||
|
||||
from ..models import Machine
|
||||
from ..pairing import (
|
||||
SEED_URL_SCHEME,
|
||||
SPIRE_POLICY_METHODS_NO_KIND,
|
||||
SPIRE_POLICY_NAME,
|
||||
SPIRE_POLICY_RULES,
|
||||
PairingError,
|
||||
build_seed_url,
|
||||
pair_spire,
|
||||
spire_key_name,
|
||||
)
|
||||
|
||||
_NOW = datetime(2026, 6, 16, tzinfo=timezone.utc)
|
||||
_SPIRE_HEX = "522a4538f1df96508d9ee8b14072344dd4a566acfe03c25a92a39179c6fca891"
|
||||
_SPIRE_NPUB = hex_to_npub(_SPIRE_HEX)
|
||||
_RELAYS = ["wss://lnbits.demo.aiolabs.dev/nostrrelay/demo"]
|
||||
_BUNKER_RELAY = "wss://bunker.internal/relay"
|
||||
_PASSPHRASE = "keystore-pass" # pragma: allowlist secret
|
||||
|
||||
|
||||
def _machine(mid: str = "m1") -> Machine:
|
||||
return Machine(
|
||||
id=mid,
|
||||
operator_user_id="op1",
|
||||
machine_npub="placeholder",
|
||||
wallet_id="w1",
|
||||
name="sintra",
|
||||
location=None,
|
||||
fiat_code="EUR",
|
||||
is_active=True,
|
||||
created_at=_NOW,
|
||||
updated_at=_NOW,
|
||||
)
|
||||
|
||||
|
||||
class FakeBunker:
|
||||
"""Records calls; returns canned bunker responses."""
|
||||
|
||||
# pragma: allowlist secret
|
||||
def __init__(self, *, policies=None, token_secret="s3cr3t"):
|
||||
self._policies = policies or []
|
||||
self._token_secret = token_secret
|
||||
self.calls: list[tuple] = []
|
||||
self._next_policy_id = 7
|
||||
|
||||
async def create_new_key(self, name, passphrase):
|
||||
self.calls.append(("create_new_key", name, passphrase))
|
||||
return _SPIRE_NPUB
|
||||
|
||||
async def get_policies(self):
|
||||
self.calls.append(("get_policies",))
|
||||
return list(self._policies)
|
||||
|
||||
async def create_new_policy(self, name, rules):
|
||||
self.calls.append(("create_new_policy", name, rules))
|
||||
pid = self._next_policy_id
|
||||
self._policies.append({"id": pid, "name": name, "rules": list(rules)})
|
||||
return pid
|
||||
|
||||
async def add_policy_rule(self, policy_id, rule):
|
||||
self.calls.append(("add_policy_rule", policy_id, rule))
|
||||
|
||||
async def create_new_token(self, key_name, client_name, policy_id):
|
||||
self.calls.append(("create_new_token", key_name, client_name, policy_id))
|
||||
|
||||
async def get_key_tokens(self, key_name):
|
||||
self.calls.append(("get_key_tokens", key_name))
|
||||
return [
|
||||
{
|
||||
"clientName": f"spire-client-{key_name.split('-', 1)[1]}",
|
||||
"token": f"{_SPIRE_NPUB}#{self._token_secret}",
|
||||
}
|
||||
]
|
||||
|
||||
def named(self, name):
|
||||
return [c for c in self.calls if c[0] == name]
|
||||
|
||||
|
||||
def _pair(bunker, machine=None):
|
||||
return asyncio.run(
|
||||
pair_spire(
|
||||
machine or _machine(),
|
||||
relays=_RELAYS,
|
||||
admin_client=bunker,
|
||||
bunker_relay=_BUNKER_RELAY,
|
||||
keystore_passphrase=_PASSPHRASE,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def test_pair_happy_path_mints_key_policy_token():
|
||||
bunker = FakeBunker(token_secret="abc123") # pragma: allowlist secret
|
||||
result = _pair(bunker)
|
||||
|
||||
assert ("create_new_key", "spire-m1", _PASSPHRASE) in bunker.calls
|
||||
assert result.bunker_key_name == spire_key_name("m1") == "spire-m1"
|
||||
|
||||
assert result.spire_npub == _SPIRE_NPUB
|
||||
assert result.spire_pubkey_hex == _SPIRE_HEX
|
||||
|
||||
created = bunker.named("create_new_policy")
|
||||
assert created and created[0][1] == SPIRE_POLICY_NAME
|
||||
token_call = bunker.named("create_new_token")[0]
|
||||
assert token_call[1] == "spire-m1" # key_name
|
||||
assert token_call[2] == "spire-client-m1" # client_name
|
||||
assert token_call[3] == 7 # policy_id from the fake's create_new_policy
|
||||
|
||||
|
||||
def test_bunker_url_carries_pubkey_relay_secret():
|
||||
result = _pair(FakeBunker(token_secret="topsecret")) # pragma: allowlist secret
|
||||
assert result.bunker_url.startswith(f"bunker://{_SPIRE_HEX}?")
|
||||
assert "relay=wss%3A%2F%2Fbunker.internal%2Frelay" in result.bunker_url
|
||||
assert "secret=topsecret" in result.bunker_url
|
||||
|
||||
|
||||
def test_seed_url_decodes_to_contract():
|
||||
result = _pair(FakeBunker(token_secret="zzz")) # pragma: allowlist secret
|
||||
assert result.seed_url.startswith(SEED_URL_SCHEME)
|
||||
blob = result.seed_url[len(SEED_URL_SCHEME) :]
|
||||
payload = json.loads(base64.urlsafe_b64decode(blob + "=" * (-len(blob) % 4)))
|
||||
assert payload == {
|
||||
"v": 1,
|
||||
"spire_npub": _SPIRE_NPUB,
|
||||
"spire_pubkey": _SPIRE_HEX,
|
||||
"bunker_url": result.bunker_url,
|
||||
"relays": _RELAYS,
|
||||
}
|
||||
|
||||
|
||||
def test_fresh_policy_adds_kindless_nip44_rules():
|
||||
bunker = FakeBunker() # no existing policies
|
||||
_pair(bunker)
|
||||
added = [c[2]["method"] for c in bunker.named("add_policy_rule")]
|
||||
# kind-scoped rules went in via create_new_policy; only the kind-less
|
||||
# nip44 methods are reconciled in via add_policy_rule.
|
||||
assert set(added) == set(SPIRE_POLICY_METHODS_NO_KIND)
|
||||
|
||||
|
||||
def test_existing_policy_reused_not_recreated():
|
||||
existing = [
|
||||
{
|
||||
"id": 42,
|
||||
"name": SPIRE_POLICY_NAME,
|
||||
"rules": [dict(r) for r in SPIRE_POLICY_RULES],
|
||||
}
|
||||
]
|
||||
bunker = FakeBunker(policies=existing)
|
||||
_pair(bunker)
|
||||
assert not bunker.named("create_new_policy") # reused, not recreated
|
||||
assert bunker.named("create_new_token")[0][3] == 42 # used existing id
|
||||
added = [c[2]["method"] for c in bunker.named("add_policy_rule")]
|
||||
assert set(added) == set(SPIRE_POLICY_METHODS_NO_KIND)
|
||||
|
||||
|
||||
def test_fully_provisioned_policy_adds_nothing():
|
||||
rules = [dict(r) for r in SPIRE_POLICY_RULES] + [
|
||||
{"method": m, "kind": None} for m in SPIRE_POLICY_METHODS_NO_KIND
|
||||
]
|
||||
bunker = FakeBunker(policies=[{"id": 9, "name": SPIRE_POLICY_NAME, "rules": rules}])
|
||||
_pair(bunker)
|
||||
assert not bunker.named("add_policy_rule")
|
||||
assert not bunker.named("create_new_policy")
|
||||
|
||||
|
||||
def test_malformed_token_raises():
|
||||
bunker = FakeBunker()
|
||||
|
||||
async def _bad_tokens(key_name):
|
||||
_ = key_name
|
||||
return [{"token": "no-hash-here"}]
|
||||
|
||||
bunker.get_key_tokens = _bad_tokens
|
||||
with pytest.raises(PairingError, match="malformed token"):
|
||||
_pair(bunker)
|
||||
|
||||
|
||||
def test_missing_relay_or_passphrase_raises():
|
||||
with pytest.raises(PairingError, match="LNBITS_NSEC_BUNKER_URL"):
|
||||
asyncio.run(
|
||||
pair_spire(
|
||||
_machine(),
|
||||
relays=_RELAYS,
|
||||
admin_client=FakeBunker(),
|
||||
bunker_relay="",
|
||||
keystore_passphrase=_PASSPHRASE,
|
||||
)
|
||||
)
|
||||
with pytest.raises(PairingError, match="PASSPHRASE"):
|
||||
asyncio.run(
|
||||
pair_spire(
|
||||
_machine(),
|
||||
relays=_RELAYS,
|
||||
admin_client=FakeBunker(),
|
||||
bunker_relay=_BUNKER_RELAY,
|
||||
keystore_passphrase="",
|
||||
)
|
||||
)
|
||||
with pytest.raises(PairingError, match="relay is required"):
|
||||
asyncio.run(
|
||||
pair_spire(
|
||||
_machine(),
|
||||
relays=[],
|
||||
admin_client=FakeBunker(),
|
||||
bunker_relay=_BUNKER_RELAY,
|
||||
keystore_passphrase=_PASSPHRASE,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def test_build_seed_url_roundtrip():
|
||||
url = build_seed_url(
|
||||
spire_npub=_SPIRE_NPUB,
|
||||
spire_pubkey_hex=_SPIRE_HEX,
|
||||
bunker_url="bunker://x?relay=r&secret=s",
|
||||
relays=_RELAYS,
|
||||
)
|
||||
blob = url[len(SEED_URL_SCHEME) :]
|
||||
payload = json.loads(base64.urlsafe_b64decode(blob + "=" * (-len(blob) % 4)))
|
||||
assert payload["spire_pubkey"] == _SPIRE_HEX
|
||||
assert payload["relays"] == _RELAYS
|
||||
Loading…
Add table
Add a link
Reference in a new issue