satmachineadmin/tasks.py
Padreug aeaee1f568 refactor(v2): extract kind-30078 publish primitives to nostr_publish.py (#39 1/3)
Layer 2 prep: a second consumer (fee_transport.py for #39) is about to
land that uses the same operator-signer + NIP-44 v2 + nostrclient publish
flow as cassette_transport.py. Extracting shared primitives now rather
than duplicating ~100 lines.

New `nostr_publish.py` module:
- Error hierarchy: NostrPublishError base + OperatorIdentityMissing,
  SignerUnavailable, RelayUnavailable subclasses (all transport-layer
  failures, domain-agnostic).
- `resolve_operator_signer(operator_user_id)` — fetch account + resolve
  to NostrSigner, with the can-sign + has-pubkey checks.
- `sign_as_operator(operator_user_id, event)` — wrap signer.sign_event,
  set created_at before signing.
- `nip44_encrypt_via_signer` + `nip44_decrypt_via_signer` — transitional
  LocalSigner → RemoteBunkerSigner cascade (bunker handles natively;
  LocalSigner falls back to hand-rolled NIP-44 v2 against the stored
  prvkey).
- `publish_signed_event(signed)` — nostrclient relay-manager publish
  with lazy import + RelayUnavailable on missing extension.
- High-level `publish_encrypted_kind_30078(operator_user_id,
  recipient_pubkey_hex, d_tag, payload)` — builds event, encrypts via
  signer, signs, publishes. The whole flow in one call; callers
  (cassette_transport, soon fee_transport) just specify domain.

`cassette_transport.py`:
- Imports from nostr_publish; CassetteTransportError becomes a subclass
  of NostrPublishError so existing catches still work.
- `publish_to_atm` reduces to a thin wrapper that builds the
  cassette-specific payload + d-tag and delegates to
  `publish_encrypted_kind_30078`.
- Consumer path (`decrypt_and_parse_state_event`) still owns
  cassette-specific decode/transient distinctions; uses imported
  `nip44_decrypt_via_signer`.
- Re-exports OperatorIdentityMissing / SignerUnavailable /
  RelayUnavailable so views_api can keep importing from
  cassette_transport without change.

`tasks.py` — cassette bootstrap consumer imports `resolve_operator_signer`
from nostr_publish directly instead of the cassette_transport
underscore-prefixed name.

164/164 tests green; behavior unchanged.

Refs: aiolabs/satmachineadmin#37 (parent), #39 (Layer 2, this commit
is prep).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 19:54:08 +02:00

490 lines
20 KiB
Python

# Satoshi Machine v2 — invoice listener (P1 + fix bundle 2).
#
# Subscribes to LNbits' invoice dispatcher (register_invoice_listener), then
# for each successful inbound payment:
# 1. Checks if wallet_id belongs to an active dca_machines row. If not, skip.
# 2. Verifies the originating Nostr signer matches the machine identity
# (assert_nostr_attribution; uses Payment.extra.nostr_sender_pubkey
# stamped by lnbits nostr-transport dispatcher).
# 3. Parses Payment.extra for bitSpire's canonical split stamp per
# aiolabs/lamassu-next#44 (`source: "bitspire"`, principal_sats,
# fee_sats, exchange_rate). Raises if the stamp is missing or
# garbage (no more Lamassu-era reverse-derivation fallback).
# 4. Computes the two-stage split (super_fee first, operator remainder).
# 5. Inserts a dca_settlements row idempotently (keyed by payment_hash).
# 6. Spawns the distribution processor on a background task so the
# LNbits invoice queue (which serves ALL extensions on the node)
# keeps draining while we move sats. Concurrency is safe because
# process_settlement now uses an optimistic-lock claim (fix bundle 1).
#
# Rejection paths (settlement still recorded with status='rejected' for
# operator forensics, but distribution is skipped):
# - SettlementAttributionError: signer mismatch (G5).
# - SettlementMetadataError: Payment.extra missing bitSpire stamp.
# - SettlementInvariantError: stamped values violate the canonical
# sat-amount invariants (range/sum).
import asyncio
from lnbits.core.models import Payment
from lnbits.tasks import register_invoice_listener
from loguru import logger
from .bitspire import (
SettlementAttributionError,
SettlementInvariantError,
SettlementMetadataError,
assert_nostr_attribution,
parse_settlement,
)
from .crud import (
create_settlement_idempotent,
get_active_machine_by_wallet_id,
get_super_config,
)
from .distribution import process_settlement
from .models import CreateDcaSettlementData, Machine
LISTENER_NAME = "ext_satmachineadmin"
# Holds strong refs to in-flight distribution tasks so Python's GC doesn't
# collect them mid-flight (asyncio.create_task only weakly references its
# task once awaiters drop). Tasks self-clean by removing themselves on
# completion via the done_callback below.
_inflight_distributions: set = set()
async def wait_for_paid_invoices() -> None:
invoice_queue: asyncio.Queue = asyncio.Queue()
register_invoice_listener(invoice_queue, LISTENER_NAME)
logger.info(
"satmachineadmin v2: invoice listener registered as "
f"`{LISTENER_NAME}` — waiting for bitSpire settlements."
)
while True:
payment: Payment = await invoice_queue.get()
try:
await _handle_payment(payment)
except Exception as exc: # listener must never die
logger.error(
f"satmachineadmin: error handling payment "
f"{payment.payment_hash[:12]}...: {exc}"
)
async def _handle_payment(payment: Payment) -> None:
if not payment.success:
return
machine = await get_active_machine_by_wallet_id(payment.wallet_id)
if machine is None:
return
extra = payment.extra or {}
# Two axes, deliberately named in pairs to avoid the inversion trap
# documented at `~/.claude/projects/.../memory/feedback_naming_business_vs_protocol.md`:
#
# - is_lightning_inbound / is_lightning_outbound: PROTOCOL direction
# at the operator's wallet. `payment.is_in` from LNbits.
# - tx_type ∈ {"cash_out", "cash_in"}: BUSINESS direction at the ATM.
# Sourced from Payment.extra (canonical, stamped by bitSpire).
#
# Canonical mapping:
# cash_out ↔ is_lightning_inbound (customer pays ATM's invoice in BTC,
# operator wallet receives sats)
# cash_in ↔ is_lightning_outbound (customer redeems ATM's LNURL-
# withdraw, operator wallet sends sats)
#
# Process BOTH directions; reject mismatches at the discriminator gate.
is_lightning_inbound = payment.is_in
is_lightning_outbound = not payment.is_in
# Outbound payments from the operator's wallet need an extra
# discriminator before we touch them. An operator may legitimately
# send sats for non-ATM reasons (manual send, different extension,
# etc.). Without `source=bitspire` on Payment.extra we can't tell
# the operator paying their landlord from a cash-in settlement —
# skip silently. (For cash-out / inbound payments we already gate
# on machine-owned wallet via `get_active_machine_by_wallet_id`.)
if is_lightning_outbound and extra.get("source") != "bitspire":
return
# 1) Attribution FIRST — uses only `extra.nostr_sender_pubkey` (no parse
# needed). If this fails, every subsequent field on `extra` is
# attacker-controlled and untrustworthy — record a minimal rejected
# row with placeholder zeros (don't display unverified split numbers
# in the operator dashboard).
try:
assert_nostr_attribution(machine, extra)
except SettlementAttributionError as exc:
await _record_rejected(payment, machine, exc)
return
# 2) Parse + invariants. parse_settlement enforces the canonical
# sat-amount invariants on the bitSpire-stamped numbers (range +
# direction-specific sum). Raises SettlementMetadataError if the
# stamp is missing, SettlementInvariantError on any range/sum
# breach.
super_config = await get_super_config()
assert super_config is not None # m001 inserts the default singleton
try:
data = parse_settlement(
machine=machine,
payment_hash=payment.payment_hash,
wire_sats=payment.sat,
extra=extra,
super_config=super_config,
)
except (SettlementMetadataError, SettlementInvariantError) as exc:
await _record_rejected(payment, machine, exc)
return
# Cross-axis sanity: protocol direction must agree with business
# direction per the canonical mapping above. A mismatch means
# something upstream is confused — refuse to process. Concrete
# symptom this catches: an attacker (or a buggy extension) stamps
# `source=bitspire, type=cash_out` on an outbound payment from the
# operator's wallet to attempt a fake "we just received sats" row.
expected_inbound = data.tx_type == "cash_out"
if is_lightning_inbound != expected_inbound:
await _record_rejected(
payment,
machine,
SettlementInvariantError(
f"direction mismatch: payment.is_in={is_lightning_inbound} "
f"but tx_type={data.tx_type!r}. Expected cash_out ↔ inbound, "
"cash_in ↔ outbound."
),
)
return
del is_lightning_outbound # only used for the discriminator above
# Stamp the originating Nostr event id (the kind-21000 create_invoice
# RPC) onto the row for post-hoc forensics — an auditor can trace
# settlement → RPC event → signing key without trusting our DB.
nostr_event_id = extra.get("nostr_event_id")
if isinstance(nostr_event_id, str) and nostr_event_id:
data.bitspire_event_id = nostr_event_id
# 3) Insert + distribute.
settlement = await create_settlement_idempotent(data, initial_status="pending")
if settlement is None:
logger.error(
f"satmachineadmin: failed to insert settlement for "
f"payment_hash={payment.payment_hash[:12]}..."
)
return
logger.info(
f"satmachineadmin: landed settlement {settlement.id} for "
f"machine={machine.machine_npub[:12]}... "
f"wire={data.wire_sats}sats principal={data.principal_sats}sats "
f"fee={data.fee_sats}sats "
f"(super_fee={data.platform_fee_sats} "
f"operator_fee={data.operator_fee_sats})"
)
# Spawn distribution on a background task so the LNbits invoice queue
# (shared across all extensions) keeps draining while we move sats.
# Concurrency-safe: process_settlement uses claim_settlement_for_processing
# so a listener re-fire can't double-process. Listener latency is now
# bounded by the create_settlement_idempotent insert, not by the N+M
# internal pay_invoice round-trips of a full distribution.
task = asyncio.create_task(process_settlement(settlement.id))
_inflight_distributions.add(task)
task.add_done_callback(_inflight_distributions.discard)
async def _record_rejected(payment: Payment, machine: Machine, exc: Exception) -> None:
"""Insert a minimal `dca_settlements` row with `status='rejected'` and
the exception message for operator forensics.
Used for every rejection path (attribution / metadata / invariant).
The split fields are zero placeholders — we deliberately do NOT
display attacker-supplied numbers in the operator dashboard. The
wire amount (`payment.sat`) is the only value LNbits authenticated;
everything else from Payment.extra is untrusted in this branch.
"""
data = CreateDcaSettlementData(
machine_id=machine.id,
payment_hash=payment.payment_hash,
wire_sats=payment.sat,
fiat_amount=0.0,
fiat_code=machine.fiat_code,
exchange_rate=0.0,
principal_sats=0,
fee_sats=0,
platform_fee_sats=0,
operator_fee_sats=0,
# tx_type is unknown for rejection paths; default to cash_out
# (the only direction currently wired). When S8 lands the
# listener will branch on tx_type from extra, and this default
# gets revisited.
tx_type="cash_out",
)
rejected = await create_settlement_idempotent(
data, initial_status="rejected", error_message=str(exc)
)
if rejected is None:
logger.error(
f"satmachineadmin: failed to insert rejected settlement for "
f"payment_hash={payment.payment_hash[:12]}..."
)
return
logger.error(
f"satmachineadmin: rejected settlement {rejected.id} "
f"(machine={machine.machine_npub[:12]}..., "
f"payment_hash={payment.payment_hash[:12]}...): {exc}"
)
# =============================================================================
# Cassette bootstrap consumer (#29 v1)
# =============================================================================
# Subscribes to kind-30078 bitspire-cassettes-state:<atm_pubkey_hex> events
# published by each active machine's ATM on first boot (lamassu-next#56's
# bootstrap publish path). Decrypts the NIP-44 v2 content with the operator's
# privkey + ATM sender pubkey, validates as PublishCassettesPayload, and
# upserts cassette_configs via apply_bootstrap_state.
#
# v1 = one-shot per machine (ATM's meta.bootstrapPublishedAt makes the
# publish idempotent on ATM-side restart; satmachineadmin's apply_bootstrap_
# state dedups on state_event_id for relay re-delivery).
#
# v2 (separate issue) = continuous reverse-channel consumer with a
# last_state_created_at watermark for reconciliation UI.
#
# Implementation: polls nostrclient.router.NostrRouter.received_subscription_
# events keyed by our subscription_id. nostrclient's NostrRouter design is
# per-WebSocket-client; the singleton dict it drains into is the only
# server-side hook to consume events without standing up an in-process
# websocket. The relay manager is the same singleton publish_to_atm uses,
# so add_subscription registers a filter against the same relay pool.
CASSETTE_BOOTSTRAP_SUB_ID = "satmachineadmin-cassette-bootstrap"
_CASSETTE_POLL_INTERVAL_S = 2.0
_CASSETTE_BACKOFF_S = 30.0 # when nostrclient isn't installed yet
async def wait_for_cassette_state_events() -> None:
"""Long-running task: subscribe to bitspire-cassettes-state events from
every active machine's ATM and upsert cassette_configs on receipt.
Pattern mirrors wait_for_paid_invoices (try/except wraps each event,
never lets the loop die). Re-derives the subscription filter on each
tick from the current active-machines list — newly-added machines
start receiving bootstrap events without an LNbits restart.
Soft-fail surfaces:
- nostrclient not installed → log + sleep _CASSETTE_BACKOFF_S
between retries (operator may install it later)
- inbound event fails sig-verify / decrypt / parse → log + skip
the event, continue the loop
- apply_bootstrap_state errors → log + skip
"""
logger.info(
"satmachineadmin v2: cassette bootstrap consumer starting "
f"(sub_id={CASSETTE_BOOTSTRAP_SUB_ID})"
)
current_filter_key: str | None = None
while True:
try:
current_filter_key = await _cassette_consumer_tick(current_filter_key)
await asyncio.sleep(_CASSETTE_POLL_INTERVAL_S)
except _NostrclientUnavailable:
logger.warning(
"satmachineadmin: nostrclient extension not installed; "
f"cassette bootstrap consumer sleeping {_CASSETTE_BACKOFF_S}s "
"before retry. Install + activate nostrclient on this "
"LNbits instance."
)
current_filter_key = None
await asyncio.sleep(_CASSETTE_BACKOFF_S)
except Exception as exc: # listener must never die
logger.error(
f"satmachineadmin: cassette consumer loop error (continuing): " f"{exc}"
)
await asyncio.sleep(_CASSETTE_POLL_INTERVAL_S)
class _NostrclientUnavailable(Exception):
"""Internal sentinel — nostrclient extension import failed. Caller
sleeps a backoff then retries; the operator may install nostrclient
at any time."""
async def _cassette_consumer_tick(current_filter_key: str | None) -> str:
"""Single iteration of the bootstrap-consumer loop. Returns the filter
key used this tick so the caller can detect filter-set changes.
Raises _NostrclientUnavailable if nostrclient can't be imported (the
outer loop backs off + retries).
"""
try:
from nostrclient.router import ( # type: ignore[import-not-found]
NostrRouter,
nostr_client,
)
except ImportError as exc:
raise _NostrclientUnavailable() from exc
from .cassette_transport import build_state_d_tags_for_machines
from .crud import (
apply_bootstrap_state,
get_machine_by_atm_pubkey_hex,
list_all_active_machines,
)
machines = await list_all_active_machines()
d_tags = build_state_d_tags_for_machines(machines)
filter_key = ",".join(sorted(d_tags))
if filter_key != current_filter_key:
if d_tags:
filters = [{"kinds": [30078], "#d": d_tags}]
# nostrclient's add_subscription is typed as list[str] but the
# actual relay protocol accepts list[Filter-dict] — type ignore
# the upstream typing mismatch.
nostr_client.relay_manager.add_subscription(
CASSETTE_BOOTSTRAP_SUB_ID, filters # type: ignore[arg-type]
)
logger.info(
"satmachineadmin: (re)registered cassette bootstrap "
f"subscription with {len(d_tags)} d-tag(s)"
)
else:
nostr_client.relay_manager.close_subscription(CASSETTE_BOOTSTRAP_SUB_ID)
logger.info(
"satmachineadmin: no active machines; closed cassette "
"bootstrap subscription"
)
inbound = NostrRouter.received_subscription_events.get(CASSETTE_BOOTSTRAP_SUB_ID)
if inbound:
while inbound:
event_message = inbound.pop(0)
try:
await _handle_cassette_state_event(
event_message,
get_machine_by_atm_pubkey_hex,
apply_bootstrap_state,
)
except Exception as exc:
logger.warning(
f"satmachineadmin: cassette state event handler "
f"failed (skipping): {exc}"
)
return filter_key
async def _handle_cassette_state_event(
event_message,
get_machine_by_atm_pubkey_hex,
apply_bootstrap_state,
) -> None:
"""Verify signature, resolve the operator's signer, decrypt via the
signer abstraction (bunker round-trip for RemoteBunkerSigner; direct
prvkey on the LocalSigner transitional fallback inside the transport
helper), parse, upsert.
Each step logs at WARNING (not ERROR) so a noisy attacker can't fill
the logs — this is data on a public relay, garbage is expected.
Two skip outcomes:
- Terminal (CassetteEventDecodeError / SignerUnavailable /
OperatorIdentityMissing / etc.): log + return. `apply_bootstrap_
state` is never called → `state_event_id` is not advanced →
same event would re-process on next poll cycle but the consumer's
WARN log surfaces the underlying issue immediately.
- Transient (CassetteEventTransientError): log at INFO (less noisy)
+ return. Same retry-via-no-advance semantics, just less
alarming in the operator log feed.
"""
import json as _json
from datetime import datetime as _datetime
from datetime import timezone as _timezone
from lnbits.utils.nostr import verify_event
from .cassette_transport import (
CassetteEventDecodeError,
CassetteEventTransientError,
CassetteTransportError,
decrypt_and_parse_state_event,
)
from .nostr_publish import resolve_operator_signer
event_raw = event_message.event
if isinstance(event_raw, str):
event_obj = _json.loads(event_raw)
elif isinstance(event_raw, dict):
event_obj = event_raw
else:
logger.warning(
f"satmachineadmin: cassette event of unexpected type "
f"{type(event_raw).__name__}; skipping"
)
return
if not verify_event(event_obj):
logger.warning(
f"satmachineadmin: cassette state event sig verify failed "
f"(id={event_obj.get('id', '?')[:12]}...)"
)
return
sender_pubkey = event_obj.get("pubkey", "")
machine = await get_machine_by_atm_pubkey_hex(sender_pubkey)
if machine is None:
# Unknown sender — could be relay noise or an attacker. Don't
# treat as our problem.
logger.warning(
f"satmachineadmin: cassette state event from unknown ATM "
f"pubkey {sender_pubkey[:12]}... (not in dca_machines); "
"skipping"
)
return
try:
account, signer = await resolve_operator_signer(machine.operator_user_id)
except CassetteTransportError as exc:
# OperatorIdentityMissing / SignerUnavailable — log + skip.
logger.warning(
f"satmachineadmin: can't resolve signer for operator "
f"{machine.operator_user_id[:8]}... (machine {machine.id}): "
f"{exc}"
)
return
try:
payload = await decrypt_and_parse_state_event(event_obj, account, signer)
except CassetteEventTransientError as exc:
logger.info(
f"satmachineadmin: cassette state event for machine {machine.id} "
f"hit a transient signer error (will retry next poll): {exc}"
)
return
except CassetteEventDecodeError as exc:
logger.warning(
f"satmachineadmin: cassette state event decode failed for "
f"machine {machine.id} (id={event_obj.get('id', '?')[:12]}...): "
f"{exc}"
)
return
event_id = event_obj.get("id", "")
created_at_unix = event_obj.get("created_at", 0)
event_created_at = _datetime.fromtimestamp(int(created_at_unix), tz=_timezone.utc)
applied = await apply_bootstrap_state(
machine.id, event_id, event_created_at, payload
)
if applied:
logger.info(
f"satmachineadmin: applied bootstrap state event {event_id[:12]}... "
f"to machine {machine.id} ({len(payload.positions)} cassettes)"
)
else:
# Replay: event_id already on file. Normal on relay reconnect.
logger.debug(
f"satmachineadmin: cassette state event {event_id[:12]}... "
f"already applied to machine {machine.id} (replay no-op)"
)