diff --git a/bitspire.py b/bitspire.py index 195c0a9..9c60432 100644 --- a/bitspire.py +++ b/bitspire.py @@ -65,6 +65,52 @@ def is_bitspire_payment(extra: dict) -> bool: return isinstance(extra, dict) and extra.get("source") == BITSPIRE_SOURCE +class SettlementAttributionError(ValueError): + """The signer of the kind-21000 invoice doesn't match the machine identity. + + Raised by `assert_nostr_attribution`. The caller records the + settlement with `status='rejected'` and the exception message in + `error_message`, then skips distribution. + """ + + +def assert_nostr_attribution(machine: Machine, extra: dict) -> None: + """Assert that the originating Nostr signer pubkey matches the machine. + + Reads `extra["nostr_sender_pubkey"]` — populated by LNbits' + nostr-transport dispatcher from the signature-verified kind-21000 + event that triggered invoice creation (aiolabs/lnbits PR #4, S5/G5). + Normalises both sides to lowercase hex via + `lnbits.utils.nostr.normalize_public_key` (the UI lets operators + enter either hex or `npub1...` bech32 for `machine.machine_npub`). + + Raises `SettlementAttributionError` if the stamp is missing, + unparseable, or doesn't match. In v2 every bitSpire ATM creates + invoices via nostr-transport, so a settlement landing on a machine + wallet without the stamp means the invoice was issued by some other + path (HTTP API, manual UI, a different extension) — always wrong + for a `dca_machines` wallet. + """ + sender_pubkey = _coerce_str(extra.get("nostr_sender_pubkey")) + if not sender_pubkey: + raise SettlementAttributionError( + "missing nostr_sender_pubkey on Payment.extra — invoice was not " + "issued through the nostr-transport path" + ) + from lnbits.utils.nostr import normalize_public_key + + try: + expected = normalize_public_key(machine.machine_npub).lower() + actual = normalize_public_key(sender_pubkey).lower() + except (ValueError, AssertionError) as exc: + raise SettlementAttributionError(f"unparseable pubkey: {exc}") from exc + if expected != actual: + raise SettlementAttributionError( + f"signer {actual[:12]}... does not match " + f"machine identity {expected[:12]}..." + ) + + def parse_settlement( machine: Machine, payment_hash: str, diff --git a/crud.py b/crud.py index 948852c..8fd40d3 100644 --- a/crud.py +++ b/crud.py @@ -69,9 +69,7 @@ async def update_super_config(data: UpdateSuperConfigData) -> Optional[SuperConf # ============================================================================= -async def create_machine( - operator_user_id: str, data: CreateMachineData -) -> Machine: +async def create_machine(operator_user_id: str, data: CreateMachineData) -> Machine: machine_id = urlsafe_short_hash() now = datetime.now() await db.execute( @@ -143,9 +141,7 @@ async def get_machines_for_operator(operator_user_id: str) -> List[Machine]: ) -async def update_machine( - machine_id: str, data: UpdateMachineData -) -> Optional[Machine]: +async def update_machine(machine_id: str, data: UpdateMachineData) -> Optional[Machine]: update_data = {k: v for k, v in data.dict().items() if v is not None} if not update_data: return await get_machine(machine_id) @@ -308,9 +304,7 @@ async def delete_dca_client(client_id: str) -> None: # ============================================================================= -async def create_deposit( - creator_user_id: str, data: CreateDepositData -) -> DcaDeposit: +async def create_deposit(creator_user_id: str, data: CreateDepositData) -> DcaDeposit: deposit_id = urlsafe_short_hash() await db.execute( """ @@ -422,11 +416,24 @@ async def delete_deposit(deposit_id: str) -> None: async def create_settlement_idempotent( data: CreateDcaSettlementData, + initial_status: str, + error_message: Optional[str] = None, ) -> Optional[DcaSettlement]: - """Insert a settlement keyed by payment_hash. Returns the inserted row on - first sight; returns the existing row if the payment_hash was already seen - (subscription replay, dispatcher double-fire). The UNIQUE constraint on - payment_hash is the source of truth.""" + """Insert a settlement keyed by payment_hash. + + Returns the inserted row on first sight; returns the existing row + if the payment_hash was already seen (subscription replay, + dispatcher double-fire). The UNIQUE constraint on payment_hash is + the source of truth. + + `initial_status` is the row's status at insert time. Normal + settlements arrive as 'pending' and the distribution processor + transitions them through 'processing' → 'processed' / 'errored'. + A row that fails the Nostr attribution cross-check (bitspire. + assert_nostr_attribution) is inserted directly as 'rejected' with + the failure reason in `error_message` — never goes near the + distribution path. + """ existing = await get_settlement_by_payment_hash(data.payment_hash) if existing is not None: return existing @@ -438,12 +445,13 @@ async def create_settlement_idempotent( gross_sats, fiat_amount, fiat_code, exchange_rate, net_sats, commission_sats, platform_fee_sats, operator_fee_sats, used_fallback_split, tx_type, bills_json, cassettes_json, - status, created_at) + status, error_message, created_at) VALUES (:id, :machine_id, :payment_hash, :bitspire_event_id, :bitspire_txid, :gross_sats, :fiat_amount, :fiat_code, :exchange_rate, :net_sats, :commission_sats, :platform_fee_sats, :operator_fee_sats, :used_fallback_split, - :tx_type, :bills_json, :cassettes_json, :status, :created_at) + :tx_type, :bills_json, :cassettes_json, :status, + :error_message, :created_at) """, { "id": settlement_id, @@ -463,7 +471,8 @@ async def create_settlement_idempotent( "tx_type": data.tx_type, "bills_json": data.bills_json, "cassettes_json": data.cassettes_json, - "status": "pending", + "status": initial_status, + "error_message": error_message, "created_at": datetime.now(), }, ) @@ -511,18 +520,34 @@ async def get_stuck_settlements_for_operator( ) -> dict: """Operator worklist of settlements that didn't process cleanly. - Returns a dict with three keyed lists: - - 'errored': any status='errored' for this operator (no age filter — - operators always want to see these) - - 'stuck_pending': status='pending' AND older than threshold (listener - crashed before invoking process_settlement) + Returns a dict with four keyed lists: + - 'rejected': any status='rejected' (Nostr attribution cross-check + failed — signer didn't match the machine identity). Distinct + from 'errored' because retry is wrong: the row was misrouted, + not operationally failed. Operator must investigate the machine. + - 'errored': any status='errored' (distribution failed for an + operational reason — wallet error, network, downstream payment). + Operator retries from this bucket. + - 'stuck_pending': status='pending' AND older than threshold + (listener crashed before invoking process_settlement). - 'stuck_processing': status='processing' AND older than threshold (processor crashed mid-flight; processing_claim is set but no - completion landed) + completion landed). """ from datetime import timedelta threshold_at = datetime.now() - timedelta(minutes=threshold_minutes) + rejected = await db.fetchall( + """ + SELECT s.* + FROM satoshimachine.dca_settlements s + JOIN satoshimachine.dca_machines m ON m.id = s.machine_id + WHERE m.operator_user_id = :uid AND s.status = 'rejected' + ORDER BY s.created_at DESC + """, + {"uid": operator_user_id}, + DcaSettlement, + ) errored = await db.fetchall( """ SELECT s.* @@ -561,6 +586,7 @@ async def get_stuck_settlements_for_operator( DcaSettlement, ) return { + "rejected": rejected, "errored": errored, "stuck_pending": stuck_pending, "stuck_processing": stuck_processing, diff --git a/models.py b/models.py index 3b0025b..fe652b4 100644 --- a/models.py +++ b/models.py @@ -220,7 +220,16 @@ class DcaSettlement(BaseModel): tx_type: str bills_json: Optional[str] cassettes_json: Optional[str] - status: str # 'pending' | 'processed' | 'partial' | 'refunded' | 'errored' + # 'pending' (default at insert) + # 'processing' (claim taken by distribution processor) + # 'processed' (all legs paid) + # 'partial' (operator marked partial-dispense after the fact) + # 'refunded' (operator-initiated refund) + # 'errored' (operational distribution failure — retry path applies) + # 'rejected' (Nostr attribution cross-check failed at land time; + # never went near distribution. error_message holds the + # reason. Retry is wrong — investigate the machine.) + status: str error_message: Optional[str] processed_at: Optional[datetime] created_at: datetime @@ -433,21 +442,27 @@ class PartialDispenseData(BaseModel): class StuckSettlementsResponse(BaseModel): """Operator worklist surfacing settlements that didn't process cleanly. - Three categories, segregated so the UI can render them with appropriate - affordances (retry / investigate / force-error): + Four categories, segregated so the UI can render them with the + right affordances (investigate / retry / force-error): - - errored: distribution failed; one or more legs reported a payment + - rejected: Nostr attribution cross-check failed at land time — + the kind-21000 invoice signer didn't match the machine identity. + Distribution never ran. Retry is *wrong* for these: the row was + misrouted, not operationally failed. Operator investigates the + machine. + - errored: distribution ran and one or more legs reported a payment error. Operator retry endpoint handles these directly. - - stuck_pending: landed but never picked up by the processor (listener - crashed before invoking process_settlement, or the claim was lost). - Older than `threshold_minutes`. + - stuck_pending: landed but never picked up by the processor + (listener crashed before invoking process_settlement, or the + claim was lost). Older than `threshold_minutes`. - stuck_processing: claim was taken but no completion in - `threshold_minutes`. The processor likely crashed mid-flight. + `threshold_minutes`. Processor likely crashed mid-flight. Operator can force-recover via POST .../force-reset. """ threshold_minutes: int - errored: list # list[DcaSettlement] + rejected: list # list[DcaSettlement] + errored: list stuck_pending: list stuck_processing: list diff --git a/static/js/index.js b/static/js/index.js index 5d8091e..33c58c6 100644 --- a/static/js/index.js +++ b/static/js/index.js @@ -33,7 +33,8 @@ const SETTLEMENT_STATUS_COLOR = { processed: 'green', partial: 'orange', refunded: 'purple', - errored: 'red' + errored: 'red', + rejected: 'deep-orange' } window.app = Vue.createApp({ @@ -69,6 +70,7 @@ window.app = Vue.createApp({ // Worklist (P9g) worklist: { + rejected: [], errored: [], stuck_pending: [], stuck_processing: [], @@ -262,6 +264,13 @@ window.app = Vue.createApp({ }, worklistBuckets() { return [ + { + key: 'rejected', + label: 'Rejected — Nostr attribution failed; investigate machine', + icon: 'gpp_bad', + color: 'deep-orange', + rows: this.worklist.rejected + }, { key: 'errored', label: 'Errored — needs retry', @@ -443,6 +452,7 @@ window.app = Vue.createApp({ try { const {data} = await LNbits.api.request('GET', STUCK_PATH) this.worklistCount = + (data?.rejected?.length || 0) + (data?.errored?.length || 0) + (data?.stuck_pending?.length || 0) + (data?.stuck_processing?.length || 0) @@ -457,10 +467,12 @@ window.app = Vue.createApp({ const {data} = await LNbits.api.request( 'GET', `${STUCK_PATH}?threshold_minutes=${this.worklistThreshold}` ) + this.worklist.rejected = data?.rejected || [] this.worklist.errored = data?.errored || [] this.worklist.stuck_pending = data?.stuck_pending || [] this.worklist.stuck_processing = data?.stuck_processing || [] this.worklist.totalCount = + this.worklist.rejected.length + this.worklist.errored.length + this.worklist.stuck_pending.length + this.worklist.stuck_processing.length diff --git a/tasks.py b/tasks.py index 1e6cf48..6bd8f13 100644 --- a/tasks.py +++ b/tasks.py @@ -18,7 +18,11 @@ from lnbits.core.models import Payment from lnbits.tasks import register_invoice_listener from loguru import logger -from .bitspire import parse_settlement +from .bitspire import ( + SettlementAttributionError, + assert_nostr_attribution, + parse_settlement, +) from .crud import ( create_settlement_idempotent, get_active_machine_by_wallet_id, @@ -59,16 +63,50 @@ async def _handle_payment(payment: Payment) -> None: machine = await get_active_machine_by_wallet_id(payment.wallet_id) if machine is None: return + extra = payment.extra or {} super_config = await get_super_config() super_fee_pct = float(super_config.super_fee_pct) if super_config else 0.0 data, used_fallback = parse_settlement( machine=machine, payment_hash=payment.payment_hash, gross_sats=payment.sat, - extra=payment.extra or {}, + extra=extra, super_fee_pct=super_fee_pct, ) - settlement = await create_settlement_idempotent(data) + # Stamp the originating Nostr event id (the kind-21000 create_invoice + # RPC) onto the row for post-hoc forensics — pairs with the + # assert_nostr_attribution check below so an auditor can trace + # settlement -> RPC event -> signing key without trusting our DB. + nostr_event_id = extra.get("nostr_event_id") + if isinstance(nostr_event_id, str) and nostr_event_id: + data.bitspire_event_id = nostr_event_id + + # Cross-check the signature-verified signer pubkey (stamped by + # LNbits' nostr-transport dispatcher onto Payment.extra) against + # the machine identity. Routing today is wallet_id-only with no + # cryptographic binding — this restores end-to-end attribution + # between "the npub that asked LNbits for the invoice" and "the + # machine we're crediting" (aiolabs/satmachineadmin#19, G5). + try: + assert_nostr_attribution(machine, extra) + except SettlementAttributionError as exc: + rejected = await create_settlement_idempotent( + data, initial_status="rejected", error_message=str(exc) + ) + if rejected is None: + logger.error( + f"satmachineadmin: failed to insert rejected settlement for " + f"payment_hash={payment.payment_hash[:12]}..." + ) + return + logger.error( + f"satmachineadmin: rejected settlement {rejected.id} " + f"(machine={machine.machine_npub[:12]}..., " + f"payment_hash={payment.payment_hash[:12]}...): {exc}" + ) + return + + settlement = await create_settlement_idempotent(data, initial_status="pending") if settlement is None: logger.error( f"satmachineadmin: failed to insert settlement for " @@ -93,5 +131,3 @@ async def _handle_payment(payment: Payment) -> None: task = asyncio.create_task(process_settlement(settlement.id)) _inflight_distributions.add(task) task.add_done_callback(_inflight_distributions.discard) - - diff --git a/templates/satmachineadmin/index.html b/templates/satmachineadmin/index.html index 6e43866..2bdf02c 100644 --- a/templates/satmachineadmin/index.html +++ b/templates/satmachineadmin/index.html @@ -653,7 +653,7 @@ @click="confirmRetryFromWorklist(props.row)"> Retry distribution - diff --git a/tests/test_nostr_attribution.py b/tests/test_nostr_attribution.py new file mode 100644 index 0000000..84877de --- /dev/null +++ b/tests/test_nostr_attribution.py @@ -0,0 +1,112 @@ +""" +Tests for `bitspire.assert_nostr_attribution` — the S5 consumer-side +cross-check that pairs the signature-verified signer pubkey LNbits +stamps onto Payment.extra (post aiolabs/lnbits PR #4) with the machine +record we're about to credit. + +In v2 every bitSpire ATM creates invoices via nostr-transport, so any +inbound payment landing on a `dca_machines` wallet must carry +`extra["nostr_sender_pubkey"]` and that pubkey must canonicalise to +the same hex as `machine.machine_npub`. Anything else raises +`SettlementAttributionError` and the listener records the row with +`status='rejected'` instead of distributing. +""" + +from datetime import datetime, timezone + +import pytest + +from ..bitspire import SettlementAttributionError, assert_nostr_attribution +from ..models import Machine + +# A real Nostr pubkey pair (hex + canonical bech32). Throwaway fixture — +# never used to sign anything live. +_PUBKEY_HEX = "82341f882b6eabcbd6b1c2da5cd14df14b8e91dd0e6da41a72b78ad8f3a7d3b9" +_PUBKEY_NPUB = "npub1sg6plzptd64uh443ctd9e52d799caywapek6gxnjk79d3ua86wuszhap5a" +_OTHER_HEX = "deadbeef" * 8 + + +def _machine(npub: str) -> Machine: + now = datetime.now(timezone.utc) + return Machine( + id="m1", + operator_user_id="op1", + machine_npub=npub, + wallet_id="w1", + name="sintra-1", + location=None, + fiat_code="EUR", + is_active=True, + fallback_commission_pct=0.05, + created_at=now, + updated_at=now, + ) + + +def test_returns_silently_when_sender_hex_matches_machine_hex(): + assert_nostr_attribution( + _machine(_PUBKEY_HEX), + {"source": "bitspire", "nostr_sender_pubkey": _PUBKEY_HEX}, + ) + + +def test_returns_silently_when_sender_hex_matches_machine_bech32(): + """Operator entered npub1... in the UI; LNbits stamps hex. Both must + normalise to the same canonical hex before comparison.""" + assert_nostr_attribution( + _machine(_PUBKEY_NPUB), + {"source": "bitspire", "nostr_sender_pubkey": _PUBKEY_HEX}, + ) + + +def test_returns_silently_under_case_variance(): + assert_nostr_attribution( + _machine(_PUBKEY_HEX.upper()), + {"nostr_sender_pubkey": _PUBKEY_HEX.lower()}, + ) + + +@pytest.mark.parametrize( + "extra", + [ + {}, + {"source": "bitspire"}, + {"nostr_sender_pubkey": ""}, + {"nostr_sender_pubkey": None}, + ], +) +def test_raises_when_attribution_absent(extra): + """Every cash-out invoice goes through nostr-transport in v2; a + settlement reaching a machine wallet without `nostr_sender_pubkey` + means it was issued by some other path (HTTP API, manual UI, a + different extension). Always wrong for a `dca_machines` wallet.""" + with pytest.raises(SettlementAttributionError) as exc: + assert_nostr_attribution(_machine(_PUBKEY_HEX), extra) + assert "missing nostr_sender_pubkey" in str(exc.value) + + +def test_raises_when_sender_differs_from_machine(): + with pytest.raises(SettlementAttributionError) as exc: + assert_nostr_attribution( + _machine(_PUBKEY_HEX), + {"nostr_sender_pubkey": _OTHER_HEX}, + ) + assert "does not match" in str(exc.value) + + +def test_raises_when_sender_pubkey_unparseable(): + with pytest.raises(SettlementAttributionError) as exc: + assert_nostr_attribution( + _machine(_PUBKEY_HEX), + {"nostr_sender_pubkey": "not-a-real-pubkey"}, + ) + assert "unparseable pubkey" in str(exc.value) + + +def test_raises_when_machine_npub_unparseable(): + with pytest.raises(SettlementAttributionError) as exc: + assert_nostr_attribution( + _machine("not-a-real-pubkey"), + {"nostr_sender_pubkey": _PUBKEY_HEX}, + ) + assert "unparseable pubkey" in str(exc.value) diff --git a/views_api.py b/views_api.py index e09ea00..7cdd4db 100644 --- a/views_api.py +++ b/views_api.py @@ -105,9 +105,7 @@ async def api_create_machine( return await create_machine(user.id, data) -@satmachineadmin_api_router.get( - "/api/v1/dca/machines", response_model=list[Machine] -) +@satmachineadmin_api_router.get("/api/v1/dca/machines", response_model=list[Machine]) async def api_list_machines( user: User = Depends(check_user_exists), ) -> list[Machine]: @@ -183,9 +181,7 @@ async def _client_owned_by(client_id: str, user_id: str) -> DcaClient: return client -@satmachineadmin_api_router.post( - "/api/v1/dca/clients", response_model=DcaClient -) +@satmachineadmin_api_router.post("/api/v1/dca/clients", response_model=DcaClient) async def api_create_client( data: CreateDcaClientData, user: User = Depends(check_user_exists) ) -> DcaClient: @@ -194,9 +190,7 @@ async def api_create_client( return await create_dca_client(data) -@satmachineadmin_api_router.get( - "/api/v1/dca/clients", response_model=list[DcaClient] -) +@satmachineadmin_api_router.get("/api/v1/dca/clients", response_model=list[DcaClient]) async def api_list_clients( machine_id: str | None = None, user: User = Depends(check_user_exists), @@ -306,9 +300,7 @@ async def _deposit_owned_by(deposit_id: str, user_id: str) -> DcaDeposit: return deposit -@satmachineadmin_api_router.post( - "/api/v1/dca/deposits", response_model=DcaDeposit -) +@satmachineadmin_api_router.post("/api/v1/dca/deposits", response_model=DcaDeposit) async def api_create_deposit( data: CreateDepositData, user: User = Depends(check_user_exists) ) -> DcaDeposit: @@ -322,9 +314,7 @@ async def api_create_deposit( return await create_deposit(user.id, data) -@satmachineadmin_api_router.get( - "/api/v1/dca/deposits", response_model=list[DcaDeposit] -) +@satmachineadmin_api_router.get("/api/v1/dca/deposits", response_model=list[DcaDeposit]) async def api_list_deposits( client_id: str | None = None, user: User = Depends(check_user_exists), @@ -439,8 +429,10 @@ async def api_list_stuck_settlements( ) -> StuckSettlementsResponse: """Operator worklist of settlements that didn't process cleanly. - Returns three lists: - - errored: distribution failed; retry endpoint handles these + Returns four lists: + - rejected: Nostr attribution cross-check failed — signer didn't + match the machine identity. Investigate; do not retry. + - errored: distribution ran and failed; retry endpoint handles these - stuck_pending: landed but never picked up by the processor - stuck_processing: claim taken but no completion in N minutes @@ -448,12 +440,11 @@ async def api_list_stuck_settlements( Operators can force-recover stuck-processing settlements via POST /api/v1/dca/settlements/{id}/force-reset.""" if threshold_minutes < 1: - raise HTTPException( - HTTPStatus.BAD_REQUEST, "threshold_minutes must be >= 1" - ) + raise HTTPException(HTTPStatus.BAD_REQUEST, "threshold_minutes must be >= 1") buckets = await get_stuck_settlements_for_operator(user.id, threshold_minutes) return StuckSettlementsResponse( threshold_minutes=threshold_minutes, + rejected=buckets["rejected"], errored=buckets["errored"], stuck_pending=buckets["stuck_pending"], stuck_processing=buckets["stuck_processing"], @@ -556,9 +547,7 @@ async def api_force_reset_settlement( ) updated = await force_reset_stuck_settlement(settlement_id) if updated is None: - raise HTTPException( - HTTPStatus.INTERNAL_SERVER_ERROR, "failed to force-reset" - ) + raise HTTPException(HTTPStatus.INTERNAL_SERVER_ERROR, "failed to force-reset") return updated @@ -648,9 +637,7 @@ async def api_append_settlement_note( # ============================================================================= -@satmachineadmin_api_router.get( - "/api/v1/dca/payments", response_model=list[DcaPayment] -) +@satmachineadmin_api_router.get("/api/v1/dca/payments", response_model=list[DcaPayment]) async def api_list_payments( leg_type: str | None = None, user: User = Depends(check_user_exists), @@ -723,9 +710,7 @@ async def api_delete_commission_splits( # ============================================================================= -@satmachineadmin_api_router.get( - "/api/v1/dca/super-config", response_model=SuperConfig -) +@satmachineadmin_api_router.get("/api/v1/dca/super-config", response_model=SuperConfig) async def api_get_super_config( _user: User = Depends(check_user_exists), ) -> SuperConfig: @@ -734,15 +719,11 @@ async def api_get_super_config( instance-wide; operators see it but can't change it.""" config = await get_super_config() if config is None: - raise HTTPException( - HTTPStatus.NOT_FOUND, "Super config not initialised" - ) + raise HTTPException(HTTPStatus.NOT_FOUND, "Super config not initialised") return config -@satmachineadmin_api_router.put( - "/api/v1/dca/super-config", response_model=SuperConfig -) +@satmachineadmin_api_router.put("/api/v1/dca/super-config", response_model=SuperConfig) async def api_update_super_config( data: UpdateSuperConfigData, _user: User = Depends(check_super_user), @@ -757,5 +738,3 @@ async def api_update_super_config( HTTPStatus.INTERNAL_SERVER_ERROR, "Failed to update super config" ) return config - -