diff --git a/distribution.py b/distribution.py index b4dfbeb..fc94f9c 100644 --- a/distribution.py +++ b/distribution.py @@ -391,28 +391,7 @@ async def process_settlement(settlement_id: str) -> None: try: await _pay_super_fee(settlement, machine, super_config, errors) await _pay_operator_splits(settlement, machine, errors) - # DCA distribution: applies to cash_out (LPs share the principal - # the customer paid into BTC). Does NOT apply to cash_in — that - # flow is liquidity coming IN to the operator's wallet, not - # going OUT to LPs. Skip with an audit row so the operator - # dashboard surfaces "DCA intentionally skipped for cash_in - # settlement" rather than displaying a phantom missing leg. - # See aiolabs/satmachineadmin#22 (S8 — wire cash-in path). - if settlement.tx_type == "cash_out": - await _pay_dca_distributions(settlement, machine, errors) - else: - await _record_skipped_leg( - settlement, - machine, - leg_type="dca", - amount_sats=settlement.principal_sats, - reason=( - f"DCA distribution does not apply to tx_type=" - f"{settlement.tx_type!r}; principal stays in the " - "operator's wallet as liquidity received from the " - "cash-in customer." - ), - ) + await _pay_dca_distributions(settlement, machine, errors) except Exception as exc: # last-resort guard logger.exception("distribution: unexpected error processing settlement") errors.append(f"unexpected: {exc}") diff --git a/nostr_publish.py b/nostr_publish.py new file mode 100644 index 0000000..e2851c9 --- /dev/null +++ b/nostr_publish.py @@ -0,0 +1,288 @@ +# Satoshi Machine v2 — kind:30078 publisher (S4 — NIP-78 fleet roster). +# +# Publishes operator-signed kind:30078 (NIP-78 addressable) events to the +# `nostrclient` LNbits extension on every machine CRUD event. Two d-tags: +# +# - `bitspire-config:` — per-machine config, one event each +# - `bitspire-fleet` — aggregate roster across operator's +# active fleet +# +# Read flow for external observers (status pages, the future lnbits +# nostr-transport roster-gating in S6, etc.): +# +# REQ ... {"kinds": [30078], "authors": [], +# "#d": ["bitspire-config:"]} → per-machine config +# REQ ... {"kinds": [30078], "authors": [], +# "#d": ["bitspire-fleet"]} → fleet roster +# +# Soft-failure model: publishing is best-effort. If the operator has no +# Nostr keypair on file (account never went through Nostr-login flow), if +# the `nostrclient` extension isn't installed, or if no relays are +# currently connected, the publish logs a warning and returns. The +# settlement / distribution path does NOT depend on the publish — these +# events exist for external observers, not internal flow control. +# +# Cross-codebase: this is the producer side. Future ATM-side consumer +# (lamassu-next) reads kind:30078 events with `#p=` to learn +# its operator's config; future LNbits-server-side roster-gating +# (lnbits#14 Item 3, S6) reads `bitspire-fleet` events to gate auto- +# account creation. Both are out of scope for this commit. + +from __future__ import annotations + +import json +import time +from typing import Optional + +import coincurve +from lnbits.core.crud.users import get_account +from lnbits.utils.nostr import sign_event +from loguru import logger + +from .crud import get_machines_for_operator +from .models import Machine + +_KIND_NIP78 = 30078 +_D_TAG_CONFIG_PREFIX = "bitspire-config:" +_D_TAG_FLEET = "bitspire-fleet" + + +def _machine_config_d_tag(machine_id: str) -> str: + return f"{_D_TAG_CONFIG_PREFIX}{machine_id}" + + +async def _publish_signed_event(signed_event: dict) -> None: + """Send a signed Nostr event to all configured relays via the + `nostrclient` extension's singleton RelayManager. + + Lazy import + try/except so satmachineadmin doesn't hard-fail at boot + when nostrclient isn't installed — pattern matches the cross-extension + import guards in `lnbits.core.services.users` (nostrmarket / nostrrelay). + """ + try: + from nostrclient.router import nostr_client # type: ignore[import-not-found] + except ImportError: + d_tag = next( + (t[1] for t in signed_event.get("tags", []) if t and t[0] == "d"), + "?", + ) + logger.warning( + "satmachineadmin: nostrclient extension not installed; " + f"skipping kind:{signed_event.get('kind')} publish " + f"(d={d_tag})" + ) + return + msg = json.dumps(["EVENT", signed_event]) + nostr_client.relay_manager.publish_message(msg) + + +async def _sign_as_operator( + operator_user_id: str, event: dict +) -> Optional[dict]: + """Sign `event` on behalf of the operator. + + Mutates `event` to add `created_at` (now), `pubkey`, `id`, and `sig`. + Returns the signed event; returns None (with a warning log) if the + operator account doesn't have an available signer — covers + (a) accounts created via non-Nostr login that never set up identity, + (b) accounts where the server has only the pubkey + (`ClientSideOnlySigner`), + (c) post-bunker future (lnbits#18) where signing routes through a + NIP-46 bunker that isn't reachable. + + Soft-failure is the right behaviour — publishing kind:30078 is a + side-effect of machine CRUD, not a precondition for it. The machine + row still gets written; only the public-facing event is skipped. + + Routing: post-`aiolabs/lnbits#17` (signer abstraction) we go through + `lnbits.core.signers.resolve_signer`, which transparently handles + `LocalSigner` (envelope-encrypted nsec at rest, decrypted on demand) + and `ClientSideOnlySigner` (raises `SignerUnavailableError` — we + treat as soft-fail). On pre-#17 lnbits versions the import fails and + we fall back to a direct `account.prvkey` read so this code keeps + working during the #17 cascade rollout window. Both paths produce + identical signed events; the hybrid avoids a hard ordering + dependency between this extension and the lnbits #17 PR landing. + """ + account = await get_account(operator_user_id) + if account is None or not account.pubkey: + logger.warning( + f"satmachineadmin: operator {operator_user_id[:8]}... has no " + f"Nostr pubkey on file; skipping kind:{event['kind']} publish. " + "Onboard via the LNbits Nostr-login flow." + ) + return None + + # `created_at` is part of the BIP-340 event-id hash; must be set + # before signing so both code paths below see the same value. + event["created_at"] = int(time.time()) + + try: + from lnbits.core.signers import ( # type: ignore[import-not-found] + SignerError, + SignerUnavailableError, + resolve_signer, + ) + except ImportError: + # Pre-#17 lnbits — direct prvkey read. Identical to the + # original implementation; the abstraction takes over once + # #17 cascades to this host. + if not account.prvkey: + logger.warning( + f"satmachineadmin: operator {operator_user_id[:8]}... has " + f"no signing key on file; skipping kind:{event['kind']} " + f"publish. Onboard via the LNbits Nostr-login flow, or " + f"wait for aiolabs/lnbits#18 bunker integration." + ) + return None + private_key = coincurve.PrivateKey(bytes.fromhex(account.prvkey)) + return sign_event(event, account.pubkey, private_key) + + # Post-#17 lnbits — route through the signer abstraction. + try: + signer = resolve_signer(account) + except SignerError as exc: + logger.warning( + f"satmachineadmin: signer resolve failed for operator " + f"{operator_user_id[:8]}...: {exc}. Skipping kind:" + f"{event['kind']} publish." + ) + return None + + if not signer.can_sign(): + logger.warning( + f"satmachineadmin: operator {operator_user_id[:8]}... has a " + f"client-side-only signer; server can't publish kind:" + f"{event['kind']} on their behalf. Wait for bunker " + f"integration (lnbits#18) or operator-driven publishing." + ) + return None + + try: + return signer.sign_event(event) + except SignerUnavailableError as exc: + logger.warning( + f"satmachineadmin: signer unavailable for operator " + f"{operator_user_id[:8]}...: {exc}. Skipping kind:" + f"{event['kind']} publish." + ) + return None + + +async def publish_machine_config(machine: Machine) -> None: + """Publish a per-machine kind:30078 config event signed by the operator. + + Idempotent — kind:30078 is replaceable, keyed by `(author_pubkey, + d-tag)`. Re-publishing simply replaces the previous version at + compliant relays. Safe to call from any CRUD path that mutates + machine fields visible in the published payload. + + Tagged with `p=` so external observers can find the + config event for a specific ATM via a single filter: + `{"kinds": [30078], "#p": [""]}`. + """ + content = json.dumps( + { + "atm_pubkey": machine.machine_npub, + "machine_id": machine.id, + "name": machine.name, + "location": machine.location, + "fiat_code": machine.fiat_code, + "is_active": machine.is_active, + } + ) + event: dict = { + "kind": _KIND_NIP78, + "tags": [ + ["d", _machine_config_d_tag(machine.id)], + ["p", machine.machine_npub], + ], + "content": content, + } + signed = await _sign_as_operator(machine.operator_user_id, event) + if signed is not None: + await _publish_signed_event(signed) + + +async def publish_fleet_roster(operator_user_id: str) -> None: + """Publish the operator's aggregate fleet roster as kind:30078. + + Lists every active machine's `atm_pubkey` + basic display fields. + External observers consume this to answer "is this ATM npub a real + machine of operator X?" without having to enumerate `bitspire-config:*` + events. The future LNbits-side roster-gating (S6 / lnbits#14 Item 3) + will read this event to gate auto-account-from-npub. + + Tagged with one `p=` per active machine — lets a filter + by ATM pubkey return both the machine's own config event AND the + operator's roster that lists it. Replaceable; safe to re-publish + after every CRUD. + """ + machines = await get_machines_for_operator(operator_user_id) + active = [m for m in machines if m.is_active] + content = json.dumps( + { + "machines": [ + { + "atm_pubkey": m.machine_npub, + "machine_id": m.id, + "name": m.name, + "location": m.location, + "fiat_code": m.fiat_code, + } + for m in active + ], + } + ) + event: dict = { + "kind": _KIND_NIP78, + "tags": [ + ["d", _D_TAG_FLEET], + *[["p", m.machine_npub] for m in active], + ], + "content": content, + } + signed = await _sign_as_operator(operator_user_id, event) + if signed is not None: + await _publish_signed_event(signed) + + +async def tombstone_machine_config( + operator_user_id: str, machine_id: str, machine_npub: str +) -> None: + """Mark a per-machine config event as deleted (tombstone pattern). + + kind:30078 is replaceable — the operator can't truly "delete" the + event, but they can replace it with a marker. Two NIP-compliant + options: + (a) Publish a new kind:30078 with the same d-tag whose content + signals deletion (e.g. `{"deleted": true}`). Relays replace + the previous payload but keep the tombstone. External readers + treat `content.deleted == true` as "this machine is gone." + (b) Publish a kind:5 (NIP-09 deletion) referencing the (kind, + d-tag) of the to-be-deleted event. Compliant relays drop the + original; non-compliant relays may keep both. + + We use (a) — pragmatic, survives non-NIP-09 relays, and the content + is small enough that the storage cost is negligible. The fleet + roster publish that follows on a delete then omits the machine, + so the roster + the tombstone together tell the full story. + """ + content = json.dumps( + { + "machine_id": machine_id, + "deleted": True, + "deleted_at": int(time.time()), + } + ) + event: dict = { + "kind": _KIND_NIP78, + "tags": [ + ["d", _machine_config_d_tag(machine_id)], + ["p", machine_npub], + ], + "content": content, + } + signed = await _sign_as_operator(operator_user_id, event) + if signed is not None: + await _publish_signed_event(signed) diff --git a/tasks.py b/tasks.py index 7d77f0e..6e0e8cb 100644 --- a/tasks.py +++ b/tasks.py @@ -73,41 +73,13 @@ async def wait_for_paid_invoices() -> None: async def _handle_payment(payment: Payment) -> None: - if not payment.success: + if not payment.is_in or not payment.success: return machine = await get_active_machine_by_wallet_id(payment.wallet_id) if machine is None: return extra = payment.extra or {} - # Two axes, deliberately named in pairs to avoid the inversion trap - # documented at `~/.claude/projects/.../memory/feedback_naming_business_vs_protocol.md`: - # - # - is_lightning_inbound / is_lightning_outbound: PROTOCOL direction - # at the operator's wallet. `payment.is_in` from LNbits. - # - tx_type ∈ {"cash_out", "cash_in"}: BUSINESS direction at the ATM. - # Sourced from Payment.extra (canonical, stamped by bitSpire). - # - # Canonical mapping: - # cash_out ↔ is_lightning_inbound (customer pays ATM's invoice in BTC, - # operator wallet receives sats) - # cash_in ↔ is_lightning_outbound (customer redeems ATM's LNURL- - # withdraw, operator wallet sends sats) - # - # Process BOTH directions; reject mismatches at the discriminator gate. - is_lightning_inbound = payment.is_in - is_lightning_outbound = not payment.is_in - - # Outbound payments from the operator's wallet need an extra - # discriminator before we touch them. An operator may legitimately - # send sats for non-ATM reasons (manual send, different extension, - # etc.). Without `source=bitspire` on Payment.extra we can't tell - # the operator paying their landlord from a cash-in settlement — - # skip silently. (For cash-out / inbound payments we already gate - # on machine-owned wallet via `get_active_machine_by_wallet_id`.) - if is_lightning_outbound and extra.get("source") != "bitspire": - return - # 1) Attribution FIRST — uses only `extra.nostr_sender_pubkey` (no parse # needed). If this fails, every subsequent field on `extra` is # attacker-controlled and untrustworthy — record a minimal rejected @@ -140,26 +112,6 @@ async def _handle_payment(payment: Payment) -> None: await _record_rejected(payment, machine, exc) return - # Cross-axis sanity: protocol direction must agree with business - # direction per the canonical mapping above. A mismatch means - # something upstream is confused — refuse to process. Concrete - # symptom this catches: an attacker (or a buggy extension) stamps - # `source=bitspire, type=cash_out` on an outbound payment from the - # operator's wallet to attempt a fake "we just received sats" row. - expected_inbound = data.tx_type == "cash_out" - if is_lightning_inbound != expected_inbound: - await _record_rejected( - payment, - machine, - SettlementInvariantError( - f"direction mismatch: payment.is_in={is_lightning_inbound} " - f"but tx_type={data.tx_type!r}. Expected cash_out ↔ inbound, " - "cash_in ↔ outbound." - ), - ) - return - del is_lightning_outbound # only used for the discriminator above - # Stamp the originating Nostr event id (the kind-21000 create_invoice # RPC) onto the row for post-hoc forensics — an auditor can trace # settlement → RPC event → signing key without trusting our DB. diff --git a/views_api.py b/views_api.py index 93ceeeb..6491f31 100644 --- a/views_api.py +++ b/views_api.py @@ -53,6 +53,11 @@ from .distribution import ( process_settlement, settle_lp_balance, ) +from .nostr_publish import ( + publish_fleet_roster, + publish_machine_config, + tombstone_machine_config, +) from .models import ( AppendSettlementNoteData, ClientBalanceSummary, @@ -104,6 +109,12 @@ async def api_create_machine( ) -> Machine: await _assert_wallet_owned_by(data.wallet_id, user.id) machine = await create_machine(user.id, data) + # NIP-78 (kind:30078) publish — operator-signed per-machine config + + # refreshed fleet roster so external observers see the new machine. + # Soft-failure: best-effort, log + skip if nostrclient or operator key + # not available (see nostr_publish module docstring). + await publish_machine_config(machine) + await publish_fleet_roster(user.id) return machine @@ -142,6 +153,11 @@ async def api_update_machine( updated = await update_machine(machine_id, data) if updated is None: raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") + # Re-publish: per-machine config picks up the changed fields, fleet + # roster picks up any is_active flip (deactivated machines drop out + # of the roster but their per-machine config stays — replaceable). + await publish_machine_config(updated) + await publish_fleet_roster(user.id) return updated @@ -155,6 +171,12 @@ async def api_delete_machine( if machine is None or machine.operator_user_id != user.id: raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found") await delete_machine(machine_id) + # Tombstone the per-machine config (replaceable event with + # `content.deleted=true`) + refresh the roster without the machine. + # External readers see the tombstone OR the absence from the roster + # and treat it as gone. + await tombstone_machine_config(user.id, machine_id, machine.machine_npub) + await publish_fleet_roster(user.id) # =============================================================================