diff --git a/crud.py b/crud.py
index b65826c..dbbe631 100644
--- a/crud.py
+++ b/crud.py
@@ -528,7 +528,9 @@ async def mark_settlement_status(
     status: str,
     error_message: Optional[str] = None,
 ) -> Optional[DcaSettlement]:
-    """Status: 'pending' | 'processed' | 'partial' | 'refunded' | 'errored'."""
+    """Status: 'pending' | 'processing' | 'processed' | 'partial' |
+    'refunded' | 'errored'. Clears processing_claim for every status other
+    than 'processing', so a fresh claim attempt won't see a stale token."""
     await db.execute(
         """
         UPDATE satoshimachine.dca_settlements
@@ -537,6 +539,10 @@ async def mark_settlement_status(
             processed_at = CASE
                 WHEN :status IN ('processed', 'partial', 'refunded') THEN :now
                 ELSE processed_at
+            END,
+            processing_claim = CASE
+                WHEN :status = 'processing' THEN processing_claim
+                ELSE NULL
             END
         WHERE id = :id
         """,
@@ -550,6 +556,75 @@ async def mark_settlement_status(
     return await get_settlement(settlement_id)
 
 
+async def claim_settlement_for_processing(
+    settlement_id: str,
+) -> Optional[DcaSettlement]:
+    """Optimistic-lock claim: atomically flip a settlement to 'processing'
+    and tag it with a per-invocation token. Returns the claimed row on
+    success; None if another caller already won the claim or the settlement
+    is not in a claimable state ('pending').
+
+    Pattern is portable across SQLite + PostgreSQL (doesn't rely on
+    UPDATE ... RETURNING). Two concurrent invocations may both run the
+    UPDATE, but only one row matches the WHERE clause; the loser's UPDATE
+    is a no-op against status='processing'. The read-back check on the
+    token disambiguates."""
+    token = urlsafe_short_hash()
+    await db.execute(
+        """
+        UPDATE satoshimachine.dca_settlements
+        SET status = 'processing', processing_claim = :token
+        WHERE id = :id AND status = 'pending'
+        """,
+        {"id": settlement_id, "token": token},
+    )
+    after = await get_settlement(settlement_id)
+    if after is None:
+        return None
+    if after.processing_claim != token:
+        return None
+    return after
+
+
+async def reset_settlement_for_retry(
+    settlement_id: str,
+) -> Optional[DcaSettlement]:
+    """Operator retry path. Flips 'errored' → 'pending' and voids any
+    'failed' legs so process_settlement re-runs them fresh. Completed legs
+    are left in place — we never re-pay sats that already moved.
+
+    Both UPDATEs are gated on the settlement still being 'errored', so a
+    stale or racing retry can't void legs of a settlement that is pending
+    or mid-processing. Returns the row as re-read afterwards; callers
+    check its status to learn whether the reset took effect."""
+    await db.execute(
+        """
+        UPDATE satoshimachine.dca_payments
+        SET status = 'voided'
+        WHERE settlement_id = :sid
+          AND status = 'failed'
+          AND EXISTS (
+              SELECT 1
+              FROM satoshimachine.dca_settlements AS s
+              WHERE s.id = :sid AND s.status = 'errored'
+          )
+        """,
+        {"sid": settlement_id},
+    )
+    await db.execute(
+        """
+        UPDATE satoshimachine.dca_settlements
+        SET status = 'pending',
+            error_message = NULL,
+            processing_claim = NULL,
+            processed_at = NULL
+        WHERE id = :id AND status = 'errored'
+        """,
+        {"id": settlement_id},
+    )
+    return await get_settlement(settlement_id)
+
+
 async def apply_partial_dispense(
     settlement_id: str,
     *,
diff --git a/distribution.py b/distribution.py
index 624f53c..e45abfc 100644
--- a/distribution.py
+++ b/distribution.py
@@ -36,6 +36,7 @@ from .calculations import (
 )
 from .crud import (
     apply_partial_dispense,
+    claim_settlement_for_processing,
     count_completed_legs_for_settlement,
     create_dca_payment,
     get_client_balance_summary,
@@ -297,13 +298,22 @@ async def apply_partial_dispense_and_redistribute(
 
 
 async def process_settlement(settlement_id: str) -> None:
-    """Process a pending settlement end-to-end. Safe to invoke multiple
-    times — the status='processed' guard skips already-processed rows."""
-    settlement = await get_settlement(settlement_id)
+    """Process a pending settlement end-to-end.
+
+    Concurrency-safe: an optimistic-lock claim flips the settlement to
+    'processing' atomically and tags it with a per-invocation token.
+    Concurrent invocations on the same id can't both win — losers see the
+    claim mismatch on read-back and return without writing any legs.
+    Retries land via reset_settlement_for_retry which voids failed legs
+    and flips 'errored' back to 'pending'."""
+    settlement = await claim_settlement_for_processing(settlement_id)
     if settlement is None:
-        logger.warning(f"distribution: settlement {settlement_id} not found")
-        return
-    if settlement.status != "pending":
+        # Missing, already claimed by a concurrent invocation, or not in a
+        # 'pending' state. Either way, nothing to do here.
+        logger.debug(
+            f"distribution: skip {settlement_id} — not claimable (missing, "
+            "already processing or not pending)"
+        )
         return
     machine = await get_machine(settlement.machine_id)
     if machine is None:
diff --git a/migrations.py b/migrations.py
index 8db8d0e..fbe3c88 100644
--- a/migrations.py
+++ b/migrations.py
@@ -446,3 +446,39 @@ async def m006_add_settlement_notes(db):
     await db.execute(
         "ALTER TABLE satoshimachine.dca_settlements ADD COLUMN notes TEXT"
     )
+
+
+async def m007_settlement_claim_and_machine_wallet_unique(db):
+    """Security + concurrency hardening (fix bundle 1).
+
+    1. Adds `processing_claim` to dca_settlements. The settlement processor
+       uses an optimistic-lock pattern: write a per-invocation claim token
+       alongside the status='processing' flip, then re-read and confirm the
+       persisted token matches. Two concurrent process_settlement invocations
+       on the same id can't both win the claim, so no duplicate leg
+       creation / double-pay.
+
+    2. Adds a UNIQUE index on dca_machines.wallet_id so two machine rows
+       can never claim the same wallet. Closes a wallet-IDOR funds-theft
+       vector where operator A could register a machine on operator B's
+       wallet_id and drain it via the settlement processor's pay_invoice.
+       Defence-in-depth on top of the API-layer ownership check; if a future
+       endpoint forgets the check, the DB still rejects.
+
+    CREATE UNIQUE INDEX is used because ALTER TABLE ADD CONSTRAINT is not
+    available on SQLite.
+
+    NOTE(review): SQLite's CREATE INDEX grammar takes the schema prefix on
+    the index name and an unqualified table after ON, while PostgreSQL
+    forbids a schema on the index name — confirm this statement runs on
+    both backends. Also confirm no existing rows share a wallet_id, since
+    building a UNIQUE index over duplicates would presumably abort.
+    """
+    await db.execute(
+        "ALTER TABLE satoshimachine.dca_settlements "
+        "ADD COLUMN processing_claim TEXT"
+    )
+    await db.execute(
+        "CREATE UNIQUE INDEX dca_machines_wallet_id_uq "
+        "ON satoshimachine.dca_machines (wallet_id)"
+    )
diff --git a/models.py b/models.py
index f3e94e2..c6509f3 100644
--- a/models.py
+++ b/models.py
@@ -229,6 +229,11 @@ class DcaSettlement(BaseModel):
     # entry timestamped + records original values so the overwrite is
     # auditable from the settlement detail view alone. Never edited in place.
     notes: Optional[str] = None
+    # Optimistic-lock claim token written when status flips to 'processing'.
+    # Two concurrent process_settlement invocations can't both win the claim
+    # (only one matching read-back). Reset to NULL whenever the status is
+    # set to anything other than 'processing'.
+    processing_claim: Optional[str] = None
 
 
 # =============================================================================
diff --git a/views_api.py b/views_api.py
index 7fc176f..5bd68f5 100644
--- a/views_api.py
+++ b/views_api.py
@@ -37,6 +37,7 @@ from .crud import (
     get_settlements_for_operator,
     get_super_config,
     replace_commission_splits,
+    reset_settlement_for_retry,
     update_dca_client,
     update_deposit,
     update_deposit_status,
@@ -45,6 +46,7 @@ from .crud import (
 )
 from .distribution import (
     apply_partial_dispense_and_redistribute,
+    process_settlement,
     settle_lp_balance,
 )
 from .models import (
@@ -73,6 +75,19 @@ from .models import (
 satmachineadmin_api_router = APIRouter()
 
 
+async def _assert_wallet_owned_by(wallet_id: str, user_id: str) -> None:
+    """Ownership gate: raise 400 unless wallet_id belongs to user_id.
+    Called by the machine create/update endpoints before a wallet_id from
+    the request body is stored. The DB-side UNIQUE on dca_machines.wallet_id
+    (m007) is a second line of defence; this check is the primary gate."""
+    wallet = await get_wallet(wallet_id)
+    if wallet is None or wallet.user != user_id:
+        raise HTTPException(
+            HTTPStatus.BAD_REQUEST,
+            "wallet_id is not owned by the authenticated operator",
+        )
+
+
 # =============================================================================
 # Machines
 # =============================================================================
@@ -82,6 +97,7 @@ satmachineadmin_api_router = APIRouter()
 async def api_create_machine(
     data: CreateMachineData, user: User = Depends(check_user_exists)
 ) -> Machine:
+    await _assert_wallet_owned_by(data.wallet_id, user.id)
     return await create_machine(user.id, data)
 
 
@@ -117,6 +133,8 @@ async def api_update_machine(
     machine = await get_machine(machine_id)
     if machine is None or machine.operator_user_id != user.id:
         raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
+    if data.wallet_id is not None:
+        await _assert_wallet_owned_by(data.wallet_id, user.id)
     updated = await update_machine(machine_id, data)
     if updated is None:
         raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
@@ -451,6 +469,43 @@ async def api_partial_dispense(
         raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc
 
 
+@satmachineadmin_api_router.post(
+    "/api/v1/dca/settlements/{settlement_id}/retry",
+    response_model=DcaSettlement,
+)
+async def api_retry_settlement(
+    settlement_id: str, user: User = Depends(check_user_exists)
+) -> DcaSettlement:
+    """Operator retry path for an errored settlement.
+
+    Voids any failed legs (completed legs are NEVER re-paid — Lightning
+    sats already moved) and flips status 'errored' → 'pending', then
+    re-invokes process_settlement. The optimistic-lock claim guards
+    against a concurrent listener re-fire racing this retry."""
+    settlement = await get_settlement(settlement_id)
+    if settlement is None:
+        raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
+    machine = await get_machine(settlement.machine_id)
+    if machine is None or machine.operator_user_id != user.id:
+        raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
+    if settlement.status != "errored":
+        raise HTTPException(
+            HTTPStatus.BAD_REQUEST,
+            "settlement status must be 'errored' to retry "
+            f"(currently '{settlement.status}')",
+        )
+    updated = await reset_settlement_for_retry(settlement_id)
+    if updated is None or updated.status != "pending":
+        # Only reachable when the row changed (or vanished) between the
+        # status check above and the reset — a conflict, not a server bug.
+        raise HTTPException(
+            HTTPStatus.CONFLICT, "settlement changed state during retry"
+        )
+    await process_settlement(settlement_id)
+    after = await get_settlement(settlement_id)
+    return after if after is not None else updated
+
+
 @satmachineadmin_api_router.post(
     "/api/v1/dca/settlements/{settlement_id}/notes",
     response_model=DcaSettlement,