feat(pairing): optional token TTL + revoke endpoint (#9/#12, #22)
Some checks failed
ci.yml / feat(pairing): optional token TTL + revoke endpoint (#9/#12, #22) (pull_request) Failing after 0s

Builds on the seed-URL pairing in #21 (stacked).

(b) TTL — PairMachineData.duration_hours (validated > 0) threads through
    pair_spire -> create_new_token (lnbits#55). None = non-expiring.

(c) Revoke — POST /machines/{id}/revoke -> revoke_spire ->
    admin_client.revoke_key_user(spire-<id>). Per spirekeeper#22, revoke
    MUST go through KeyUser.revokedAt (revoke_key_user), NOT token revoke:
    lnbits eager-binds (redeems) the connect token at provision, so
    nsecbunkerd has materialised the policy into per-KeyUser grants its
    ACL checks BEFORE the Token.revokedAt filter -> token revoke is a
    silent no-op. Returns RevokeResult{revoked_count}: >=1 = cut, 0 =
    never bound. set_machine_unpaired clears paired_at (keeps npub +
    bunker_spire_key_name for audit / re-pair).

7 new tests (duration threading + default-None; revoke routes to
revoke_key_user and never token-revoke + error mapping; endpoint wiring
revoke happy/zero/502). 210 green; new code black/ruff-clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Padreug 2026-06-18 18:51:54 +02:00
commit a5efdf22a1
6 changed files with 223 additions and 10 deletions

View file

@ -66,7 +66,7 @@ def _wire(monkeypatch, *, pair="ok"):
async def fake_owned(machine_id, user_id):
return _machine()
async def fake_pair(machine, *, relays, admin_client):
async def fake_pair(machine, *, relays, admin_client, duration_hours=None):
if pair == "error":
raise PairingError("boom")
return _result()
@ -118,3 +118,50 @@ def test_pair_failure_maps_to_bad_gateway(monkeypatch):
assert ei.value.status_code == 502
# nothing persisted on failure
assert state["persisted"] is None
def _wire_revoke(monkeypatch, *, revoke="ok", count=2):
state = {"unpaired": None}
async def fake_owned(machine_id, user_id):
return _machine()
async def fake_revoke(machine, *, admin_client):
if revoke == "error":
raise PairingError("boom")
return count
async def fake_unpaired(machine_id):
state["unpaired"] = machine_id
return _machine()
monkeypatch.setattr(views_api, "_machine_owned_by", fake_owned)
monkeypatch.setattr(views_api, "NsecBunkerAdminClient", _FakeAdmin)
monkeypatch.setattr(views_api, "revoke_spire", fake_revoke)
monkeypatch.setattr(views_api, "set_machine_unpaired", fake_unpaired)
return state
def _call_revoke():
user = SimpleNamespace(id="op1")
return asyncio.run(views_api.api_revoke_machine("m1", user))
def test_revoke_cuts_access_and_marks_unpaired(monkeypatch):
state = _wire_revoke(monkeypatch, count=2)
result = _call_revoke()
assert result.revoked_count == 2
assert state["unpaired"] == "m1"
def test_revoke_zero_when_nothing_bound(monkeypatch):
_wire_revoke(monkeypatch, count=0)
assert _call_revoke().revoked_count == 0
def test_revoke_failure_maps_to_bad_gateway(monkeypatch):
state = _wire_revoke(monkeypatch, revoke="error")
with pytest.raises(HTTPException) as ei:
_call_revoke()
assert ei.value.status_code == 502
assert state["unpaired"] is None # not persisted on failure

View file

@ -16,6 +16,7 @@ import json
from datetime import datetime, timezone
import pytest
from lnbits.core.services.nsec_bunker import NsecBunkerError
from lnbits.utils.nostr import hex_to_npub
from ..models import Machine
@ -27,6 +28,7 @@ from ..pairing import (
PairingError,
build_seed_url,
pair_spire,
revoke_spire,
spire_key_name,
)
@ -70,9 +72,10 @@ class FakeBunker:
admin_pubkey = "fake-admin-pubkey"
# pragma: allowlist secret
def __init__(self, *, policies=None, token_secret="s3cr3t"):
def __init__(self, *, policies=None, token_secret="s3cr3t", revoke_count=1):
self._policies = policies or []
self._token_secret = token_secret
self._revoke_count = revoke_count
self.calls: list[tuple] = []
self._next_policy_id = 7
@ -93,8 +96,16 @@ class FakeBunker:
async def add_policy_rule(self, policy_id, rule):
self.calls.append(("add_policy_rule", policy_id, rule))
async def create_new_token(self, key_name, client_name, policy_id):
self.calls.append(("create_new_token", key_name, client_name, policy_id))
async def create_new_token(
self, key_name, client_name, policy_id, duration_hours=None
):
self.calls.append(
("create_new_token", key_name, client_name, policy_id, duration_hours)
)
async def revoke_key_user(self, key_name):
self.calls.append(("revoke_key_user", key_name))
return self._revoke_count
async def get_key_tokens(self, key_name):
self.calls.append(("get_key_tokens", key_name))
@ -251,3 +262,46 @@ def test_build_seed_url_roundtrip():
payload = json.loads(base64.urlsafe_b64decode(blob + "=" * (-len(blob) % 4)))
assert payload["spire_pubkey"] == _SPIRE_HEX
assert payload["relays"] == _RELAYS
def test_pair_threads_duration_hours():
bunker = FakeBunker()
asyncio.run(
pair_spire(
_machine(),
relays=_RELAYS,
admin_client=bunker,
bunker_relay=_BUNKER_RELAY,
keystore_passphrase=_PASSPHRASE,
duration_hours=720,
)
)
# create_new_token tuple is (name, key, client, policy_id, duration_hours)
assert bunker.named("create_new_token")[0][4] == 720
def test_pair_default_duration_is_none():
bunker = FakeBunker()
_pair(bunker) # no duration_hours
assert bunker.named("create_new_token")[0][4] is None
def test_revoke_spire_calls_revoke_key_user():
# revoke MUST go through revoke_key_user (KeyUser.revokedAt), not token
# revoke — token revoke is a no-op once redeemed (spirekeeper#22).
bunker = FakeBunker(revoke_count=2)
count = asyncio.run(revoke_spire(_machine(), admin_client=bunker))
assert count == 2
assert bunker.named("revoke_key_user") == [("revoke_key_user", "spire-m1")]
assert not bunker.named("revoke_token") # never token-revoke
def test_revoke_spire_maps_bunker_error():
bunker = FakeBunker()
async def _boom(key_name):
raise NsecBunkerError("nope")
bunker.revoke_key_user = _boom
with pytest.raises(PairingError, match="revoke"):
asyncio.run(revoke_spire(_machine(), admin_client=bunker))