Some checks failed
ci.yml / fix: complete the unpaired-machine sweep + regression test (pull_request) Failing after 0s
Full sweep of every machine_npub deref found one more reachable crash: _record_rejected (tasks.py) logs machine_npub[:12], and the assert_nostr_attribution guard now routes an unpaired machine there, so None[:12] -> TypeError. Fall back to machine.id. Every other deref is safe by the attribution-gate invariant: a settlement only flows past assert_nostr_attribution (now rejecting unpaired) for a paired machine, so the downstream distribution / parse-path / "landed" logs can't see None; the collision-loop display already uses `(m.machine_npub or m.id)`. Adds tests/test_unpaired_machine_guards.py: attribution rejects an unpaired machine with the domain SettlementAttributionError (not AttributeError), and build_state_d_tags skips it. New tests + every guard-affected suite pass. (Two pre-existing test_pair_endpoint failures — #29 drift: fake_pair lacks bunker_relay, and the test DB lacks super_config — are out of scope; filed separately.)
498 lines
21 KiB
Python
498 lines
21 KiB
Python
# Satoshi Machine v2 — invoice listener (P1 + fix bundle 2).
|
|
#
|
|
# Subscribes to LNbits' invoice dispatcher (register_invoice_listener), then
|
|
# for each successful inbound payment:
|
|
# 1. Checks if wallet_id belongs to an active dca_machines row. If not, skip.
|
|
# 2. Verifies the originating Nostr signer matches the machine identity
|
|
# (assert_nostr_attribution; uses Payment.extra.nostr_sender_pubkey
|
|
# stamped by lnbits nostr-transport dispatcher).
|
|
# 3. Parses Payment.extra for bitSpire's canonical split stamp per
|
|
# aiolabs/lamassu-next#44 (`source: "bitspire"`, principal_sats,
|
|
# fee_sats, exchange_rate). Raises if the stamp is missing or
|
|
# garbage (no more Lamassu-era reverse-derivation fallback).
|
|
# 4. Computes the two-stage split (super_fee first, operator remainder).
|
|
# 5. Inserts a dca_settlements row idempotently (keyed by payment_hash).
|
|
# 6. Spawns the distribution processor on a background task so the
|
|
# LNbits invoice queue (which serves ALL extensions on the node)
|
|
# keeps draining while we move sats. Concurrency is safe because
|
|
# process_settlement now uses an optimistic-lock claim (fix bundle 1).
|
|
#
|
|
# Rejection paths (settlement still recorded with status='rejected' for
|
|
# operator forensics, but distribution is skipped):
|
|
# - SettlementAttributionError: signer mismatch (G5).
|
|
# - SettlementMetadataError: Payment.extra missing bitSpire stamp.
|
|
# - SettlementInvariantError: stamped values violate the canonical
|
|
# sat-amount invariants (range/sum).
|
|
|
|
import asyncio
|
|
|
|
from lnbits.core.models import Payment
|
|
from lnbits.tasks import register_invoice_listener
|
|
from loguru import logger
|
|
|
|
from .bitspire import (
|
|
SettlementAttributionError,
|
|
SettlementInvariantError,
|
|
SettlementMetadataError,
|
|
assert_nostr_attribution,
|
|
parse_settlement,
|
|
)
|
|
from .crud import (
|
|
create_settlement_idempotent,
|
|
get_active_machine_by_wallet_id,
|
|
get_super_config,
|
|
)
|
|
from .distribution import process_settlement
|
|
from .models import CreateDcaSettlementData, Machine
|
|
|
|
LISTENER_NAME = "ext_spirekeeper"
|
|
|
|
# Holds strong refs to in-flight distribution tasks so Python's GC doesn't
|
|
# collect them mid-flight (asyncio.create_task only weakly references its
|
|
# task once awaiters drop). Tasks self-clean by removing themselves on
|
|
# completion via the done_callback below.
|
|
_inflight_distributions: set = set()
|
|
|
|
|
|
async def wait_for_paid_invoices() -> None:
    """Register with the LNbits invoice dispatcher and process payments forever.

    Creates a private queue, registers it under `LISTENER_NAME`, then hands
    every payment pulled from that queue to `_handle_payment`. Any
    `Exception` raised while handling a payment is logged and swallowed so
    that one bad payment cannot stop the loop.
    """
    paid_queue: asyncio.Queue = asyncio.Queue()
    register_invoice_listener(paid_queue, LISTENER_NAME)
    logger.info(
        "spirekeeper v2: invoice listener registered as "
        f"`{LISTENER_NAME}` — waiting for bitSpire settlements."
    )
    while True:
        payment: Payment = await paid_queue.get()
        try:
            await _handle_payment(payment)
        except Exception as exc:  # listener must never die
            # Slice the hash only here, on the failure path, so the happy
            # path does no extra work.
            hash_prefix = payment.payment_hash[:12]
            logger.error(
                f"spirekeeper: error handling payment {hash_prefix}...: {exc}"
            )
|
|
|
|
|
|
async def _handle_payment(payment: Payment) -> None:
    """Validate one LNbits payment and, if it is a bitSpire settlement,
    record it and start distribution.

    Steps, in order:
      1. Ignore payments that are not successful, or whose wallet does not
         belong to an active machine.
      2. Ignore outbound payments that are not stamped `source=bitspire`.
      3. Attribution gate (`assert_nostr_attribution`).
      4. Parse + invariant checks (`parse_settlement`).
      5. Cross-check the protocol direction against the parsed `tx_type`.
      6. Insert the settlement row as `pending` and spawn
         `process_settlement` on a background task.

    A failure in steps 3-5 is written as a `rejected` row through
    `_record_rejected`, and the function returns without distributing.

    Raises:
        RuntimeError: if `get_super_config()` returns None. The caller's
            catch-all logs it; a descriptive message is used because a
            bare assertion would log an empty string and would be
            stripped entirely under `python -O`.
    """
    if not payment.success:
        return
    machine = await get_active_machine_by_wallet_id(payment.wallet_id)
    if machine is None:
        return
    extra = payment.extra or {}

    # Two axes, deliberately named in pairs so they are not confused:
    #
    #   - is_lightning_inbound / is_lightning_outbound: PROTOCOL direction
    #     at the operator's wallet, taken from `payment.is_in`.
    #   - tx_type in {"cash_out", "cash_in"}: BUSINESS direction at the ATM,
    #     taken from the parsed Payment.extra stamp.
    #
    # Canonical mapping:
    #   cash_out <-> inbound  (customer pays the ATM's invoice; the
    #                          operator wallet receives sats)
    #   cash_in  <-> outbound (customer redeems the ATM's LNURL-withdraw;
    #                          the operator wallet sends sats)
    #
    # Both directions are processed; mismatches are rejected further down.
    is_lightning_inbound = payment.is_in
    is_lightning_outbound = not payment.is_in

    # An operator may send sats from this wallet for reasons unrelated to
    # the ATM. Without `source=bitspire` on Payment.extra an outbound
    # payment cannot be told apart from a cash-in settlement, so skip it
    # silently. Inbound payments are already gated by the machine-owned
    # wallet lookup above.
    if is_lightning_outbound and extra.get("source") != "bitspire":
        return

    # 1) Attribution FIRST. If this fails, nothing else on `extra` can be
    #    trusted, so a minimal rejected row is recorded instead of parsing.
    try:
        assert_nostr_attribution(machine, extra)
    except SettlementAttributionError as exc:
        await _record_rejected(payment, machine, exc)
        return

    # 2) Parse + invariants.
    super_config = await get_super_config()
    if super_config is None:
        # Explicit raise instead of `assert`: it survives `python -O`, and
        # the message ends up in the listener's error log.
        raise RuntimeError(
            "super_config row is missing; cannot compute the settlement "
            "split (the m001 migration is expected to insert the default "
            "singleton)"
        )
    try:
        data = parse_settlement(
            machine=machine,
            payment_hash=payment.payment_hash,
            # `payment.sat` is signed by protocol direction (negative for
            # outbound). `wire_sats` is a magnitude; direction is carried
            # separately by `tx_type`, so pass the absolute value.
            wire_sats=abs(payment.sat),
            extra=extra,
            super_config=super_config,
        )
    except (SettlementMetadataError, SettlementInvariantError) as exc:
        await _record_rejected(payment, machine, exc)
        return

    # Cross-axis sanity: protocol direction must agree with business
    # direction per the mapping above. Example this catches: a payment
    # stamped `source=bitspire, type=cash_out` that is actually outbound
    # from the operator's wallet.
    expected_inbound = data.tx_type == "cash_out"
    if is_lightning_inbound != expected_inbound:
        await _record_rejected(
            payment,
            machine,
            SettlementInvariantError(
                f"direction mismatch: payment.is_in={is_lightning_inbound} "
                f"but tx_type={data.tx_type!r}. Expected cash_out ↔ inbound, "
                "cash_in ↔ outbound."
            ),
        )
        return

    # Stamp the originating Nostr event id onto the row for post-hoc
    # forensics, but only when it is a non-empty string.
    nostr_event_id = extra.get("nostr_event_id")
    if isinstance(nostr_event_id, str) and nostr_event_id:
        data.bitspire_event_id = nostr_event_id

    # 3) Insert + distribute.
    settlement = await create_settlement_idempotent(data, initial_status="pending")
    if settlement is None:
        logger.error(
            f"spirekeeper: failed to insert settlement for "
            f"payment_hash={payment.payment_hash[:12]}..."
        )
        return

    # Same fallback as the rejection log: never slice a None npub. A
    # paired machine yields exactly the same text as before.
    machine_label = (machine.machine_npub or machine.id)[:12]
    logger.info(
        f"spirekeeper: landed settlement {settlement.id} for "
        f"machine={machine_label}... "
        f"wire={data.wire_sats}sats principal={data.principal_sats}sats "
        f"fee={data.fee_sats}sats "
        f"(super_fee={data.platform_fee_sats} "
        f"operator_fee={data.operator_fee_sats})"
    )

    # Run distribution on a background task so this handler returns right
    # after the insert and the caller's queue keeps draining. The task is
    # kept in `_inflight_distributions` until it finishes so it is not
    # garbage-collected mid-flight.
    task = asyncio.create_task(process_settlement(settlement.id))
    _inflight_distributions.add(task)
    task.add_done_callback(_inflight_distributions.discard)
|
|
|
|
|
|
async def _record_rejected(payment: Payment, machine: Machine, exc: Exception) -> None:
    """Persist a placeholder `rejected` settlement row for a refused payment.

    Shared by every rejection path. All split-related fields are written as
    zeros so that numbers taken from an untrusted Payment.extra never reach
    the stored row; only the wire amount (magnitude of `payment.sat`), the
    payment hash, the machine id / fiat code and the protocol direction are
    carried over. `str(exc)` is stored as the row's error message.
    """
    # No parsed tx_type exists on this path, so derive the direction from
    # the payment itself: inbound is recorded as cash-out, outbound as
    # cash-in.
    direction = "cash_out" if payment.is_in else "cash_in"

    placeholder = CreateDcaSettlementData(
        machine_id=machine.id,
        payment_hash=payment.payment_hash,
        # Magnitude only; `payment.sat` is negative for outbound payments.
        wire_sats=abs(payment.sat),
        fiat_amount=0.0,
        fiat_code=machine.fiat_code,
        exchange_rate=0.0,
        principal_sats=0,
        fee_sats=0,
        platform_fee_sats=0,
        operator_fee_sats=0,
        tx_type=direction,
    )
    row = await create_settlement_idempotent(
        placeholder, initial_status="rejected", error_message=str(exc)
    )

    if row is None:
        logger.error(
            f"spirekeeper: failed to insert rejected settlement for "
            f"payment_hash={payment.payment_hash[:12]}..."
        )
        return

    # An unpaired machine has machine_npub None; fall back to the machine
    # id so building the log line cannot fail on None[:12].
    machine_label = (machine.machine_npub or machine.id)[:12]
    logger.error(
        f"spirekeeper: rejected settlement {row.id} "
        f"(machine={machine_label}..., "
        f"payment_hash={payment.payment_hash[:12]}...): {exc}"
    )
|
|
|
|
|
|
# =============================================================================
|
|
# Cassette bootstrap consumer (#29 v1)
|
|
# =============================================================================
|
|
# Subscribes to kind-30078 bitspire-cassettes-state:<atm_pubkey_hex> events
|
|
# published by each active machine's ATM on first boot (lamassu-next#56's
|
|
# bootstrap publish path). Decrypts the NIP-44 v2 content with the operator's
|
|
# privkey + ATM sender pubkey, validates as PublishCassettesPayload, and
|
|
# upserts cassette_configs via apply_bootstrap_state.
|
|
#
|
|
# v1 = one-shot per machine (ATM's meta.bootstrapPublishedAt makes the
|
|
# publish idempotent on ATM-side restart; spirekeeper's apply_bootstrap_
|
|
# state dedups on state_event_id for relay re-delivery).
|
|
#
|
|
# v2 (separate issue) = continuous reverse-channel consumer with a
|
|
# last_state_created_at watermark for reconciliation UI.
|
|
#
|
|
# Implementation: polls nostrclient.router.NostrRouter.received_subscription_
|
|
# events keyed by our subscription_id. nostrclient's NostrRouter design is
|
|
# per-WebSocket-client; the singleton dict it drains into is the only
|
|
# server-side hook to consume events without standing up an in-process
|
|
# websocket. The relay manager is the same singleton publish_to_atm uses,
|
|
# so add_subscription registers a filter against the same relay pool.
|
|
|
|
CASSETTE_BOOTSTRAP_SUB_ID = "spirekeeper-cassette-bootstrap"
|
|
_CASSETTE_POLL_INTERVAL_S = 2.0
|
|
_CASSETTE_BACKOFF_S = 30.0 # when nostrclient isn't installed yet
|
|
|
|
|
|
async def wait_for_cassette_state_events() -> None:
    """Run the cassette bootstrap consumer until cancelled.

    Each iteration calls `_cassette_consumer_tick` with the filter key
    returned by the previous successful tick, then sleeps:

      - tick succeeded            -> sleep `_CASSETTE_POLL_INTERVAL_S`
      - `_NostrclientUnavailable` -> warn, forget the filter key, sleep
                                     `_CASSETTE_BACKOFF_S`
      - any other `Exception`     -> log, keep the filter key, sleep
                                     `_CASSETTE_POLL_INTERVAL_S`

    As in `wait_for_paid_invoices`, no ordinary exception is allowed to
    end the loop.
    """
    logger.info(
        "spirekeeper v2: cassette bootstrap consumer starting "
        f"(sub_id={CASSETTE_BOOTSTRAP_SUB_ID})"
    )
    filter_key: str | None = None
    while True:
        # Default delay; only the "nostrclient missing" branch overrides it.
        pause = _CASSETTE_POLL_INTERVAL_S
        try:
            filter_key = await _cassette_consumer_tick(filter_key)
        except _NostrclientUnavailable:
            logger.warning(
                "spirekeeper: nostrclient extension not installed; "
                f"cassette bootstrap consumer sleeping {_CASSETTE_BACKOFF_S}s "
                "before retry. Install + activate nostrclient on this "
                "LNbits instance."
            )
            # Forget the key so the next successful tick re-registers the
            # subscription from scratch.
            filter_key = None
            pause = _CASSETTE_BACKOFF_S
        except Exception as exc:  # listener must never die
            logger.error(
                f"spirekeeper: cassette consumer loop error (continuing): {exc}"
            )
        await asyncio.sleep(pause)
|
|
|
|
|
|
class _NostrclientUnavailable(Exception):
|
|
"""Internal sentinel — nostrclient extension import failed. Caller
|
|
sleeps a backoff then retries; the operator may install nostrclient
|
|
at any time."""
|
|
|
|
|
|
async def _cassette_consumer_tick(current_filter_key: str | None) -> str:
    """Run one pass of the cassette bootstrap consumer.

    1. Import nostrclient lazily; raise `_NostrclientUnavailable` if that
       fails so the caller can back off.
    2. Build the d-tag list from the currently active machines and derive
       a filter key (the sorted d-tags joined by commas).
    3. If the key differs from `current_filter_key`, either (re)register
       the subscription with the new d-tags or, when there are none,
       close it.
    4. Drain every event queued under `CASSETTE_BOOTSTRAP_SUB_ID`, passing
       each to `_handle_cassette_state_event`. A handler exception is
       logged at WARNING and the event is skipped.

    Returns the filter key computed in step 2.
    """
    try:
        from nostrclient.router import (  # type: ignore[import-not-found]
            NostrRouter,
            nostr_client,
        )
    except ImportError as exc:
        raise _NostrclientUnavailable() from exc

    from .cassette_transport import build_state_d_tags_for_machines
    from .crud import (
        apply_bootstrap_state,
        get_machine_by_atm_pubkey_hex,
        list_all_active_machines,
    )

    active_machines = await list_all_active_machines()
    d_tags = build_state_d_tags_for_machines(active_machines)
    filter_key = ",".join(sorted(d_tags))

    filter_changed = filter_key != current_filter_key
    if filter_changed and d_tags:
        # add_subscription is annotated upstream as taking list[str], but
        # filter dicts are what gets passed here; hence the type ignore.
        nostr_client.relay_manager.add_subscription(
            CASSETTE_BOOTSTRAP_SUB_ID,
            [{"kinds": [30078], "#d": d_tags}],  # type: ignore[arg-type]
        )
        logger.info(
            "spirekeeper: (re)registered cassette bootstrap "
            f"subscription with {len(d_tags)} d-tag(s)"
        )
    elif filter_changed:
        nostr_client.relay_manager.close_subscription(CASSETTE_BOOTSTRAP_SUB_ID)
        logger.info(
            "spirekeeper: no active machines; closed cassette "
            "bootstrap subscription"
        )

    # NOTE(review): each event is popped *before* it is handled, so an
    # event the handler skips (or fails on) is not re-queued by this
    # function. Confirm whether a retry is expected to come from elsewhere
    # (e.g. relay re-delivery).
    pending = NostrRouter.received_subscription_events.get(CASSETTE_BOOTSTRAP_SUB_ID)
    while pending:
        event_message = pending.pop(0)
        try:
            await _handle_cassette_state_event(
                event_message,
                get_machine_by_atm_pubkey_hex,
                apply_bootstrap_state,
            )
        except Exception as exc:
            logger.warning(
                f"spirekeeper: cassette state event handler "
                f"failed (skipping): {exc}"
            )

    return filter_key
|
|
|
|
|
|
async def _handle_cassette_state_event(
    event_message,
    get_machine_by_atm_pubkey_hex,
    apply_bootstrap_state,
) -> None:
    """Process one inbound cassette state event end to end.

    Pipeline: decode `event_message.event` (JSON string or dict), verify
    the signature, look up the machine by the event's `pubkey`, resolve
    the operator's signer, decrypt + parse the payload, then call
    `apply_bootstrap_state`.

    Every stage that cannot proceed logs and returns early without calling
    `apply_bootstrap_state`:

      - unexpected event type, bad signature, unknown sender pubkey,
        signer resolution failure (`CassetteTransportError`), decode
        failure (`CassetteEventDecodeError`): logged at WARNING.
      - transient signer error (`CassetteEventTransientError`): logged at
        INFO.

    When `apply_bootstrap_state` returns a falsy value the event is
    treated as a replay and only logged at DEBUG.

    The two lookup/apply callables are passed in by the caller rather than
    imported here.
    """
    import json as _json
    from datetime import datetime as _datetime
    from datetime import timezone as _timezone

    from lnbits.utils.nostr import verify_event

    from .cassette_transport import (
        CassetteEventDecodeError,
        CassetteEventTransientError,
        CassetteTransportError,
        decrypt_and_parse_state_event,
    )
    from .nostr_publish import resolve_operator_signer

    # --- decode -----------------------------------------------------------
    event_raw = event_message.event
    if not isinstance(event_raw, (str, dict)):
        logger.warning(
            f"spirekeeper: cassette event of unexpected type "
            f"{type(event_raw).__name__}; skipping"
        )
        return
    event_obj = _json.loads(event_raw) if isinstance(event_raw, str) else event_raw

    # --- signature ----------------------------------------------------------
    if not verify_event(event_obj):
        logger.warning(
            f"spirekeeper: cassette state event sig verify failed "
            f"(id={event_obj.get('id', '?')[:12]}...)"
        )
        return

    # --- sender -> machine --------------------------------------------------
    sender_pubkey = event_obj.get("pubkey", "")
    machine = await get_machine_by_atm_pubkey_hex(sender_pubkey)
    if machine is None:
        # Not one of our machines; nothing to act on.
        logger.warning(
            f"spirekeeper: cassette state event from unknown ATM "
            f"pubkey {sender_pubkey[:12]}... (not in dca_machines); "
            "skipping"
        )
        return

    # --- operator signer ----------------------------------------------------
    try:
        account, signer = await resolve_operator_signer(machine.operator_user_id)
    except CassetteTransportError as exc:
        logger.warning(
            f"spirekeeper: can't resolve signer for operator "
            f"{machine.operator_user_id[:8]}... (machine {machine.id}): "
            f"{exc}"
        )
        return

    # --- decrypt + parse ----------------------------------------------------
    try:
        payload = await decrypt_and_parse_state_event(event_obj, account, signer)
    except CassetteEventTransientError as exc:
        logger.info(
            f"spirekeeper: cassette state event for machine {machine.id} "
            f"hit a transient signer error (will retry next poll): {exc}"
        )
        return
    except CassetteEventDecodeError as exc:
        logger.warning(
            f"spirekeeper: cassette state event decode failed for "
            f"machine {machine.id} (id={event_obj.get('id', '?')[:12]}...): "
            f"{exc}"
        )
        return

    # --- apply --------------------------------------------------------------
    event_id = event_obj.get("id", "")
    created_at_unix = event_obj.get("created_at", 0)
    # The event's `created_at` is converted to a timezone-aware UTC datetime.
    event_created_at = _datetime.fromtimestamp(int(created_at_unix), tz=_timezone.utc)

    applied = await apply_bootstrap_state(
        machine.id, event_id, event_created_at, payload
    )
    if not applied:
        # Replay: this event id was already applied.
        logger.debug(
            f"spirekeeper: cassette state event {event_id[:12]}... "
            f"already applied to machine {machine.id} (replay no-op)"
        )
        return

    logger.info(
        f"spirekeeper: applied bootstrap state event {event_id[:12]}... "
        f"to machine {machine.id} ({len(payload.positions)} cassettes)"
    )
|