diff --git a/crud.py b/crud.py
index dbbe631..1e6780e 100644
--- a/crud.py
+++ b/crud.py
@@ -506,6 +506,101 @@ async def get_settlements_for_machine(
     )
 
 
+async def get_stuck_settlements_for_operator(
+    operator_user_id: str, threshold_minutes: int = 30
+) -> dict:
+    """Operator worklist of settlements that didn't process cleanly.
+
+    Returns a dict with three keyed lists:
+      - 'errored': any status='errored' for this operator (no age filter —
+        operators always want to see these)
+      - 'stuck_pending': status='pending' AND older than threshold (listener
+        crashed before invoking process_settlement)
+      - 'stuck_processing': status='processing' AND older than threshold
+        (processor crashed mid-flight; processing_claim is set but no
+        completion landed)
+
+    "Older than threshold" means created_at is before now minus
+    `threshold_minutes`. The cutoff comes from a timezone-aware UTC clock,
+    the same clock the force-reset endpoint uses for its age check, so the
+    two do not disagree on hosts whose local timezone is not UTC.
+    """
+    from datetime import timedelta, timezone
+
+    now_utc = datetime.now(timezone.utc)
+    threshold_at = now_utc - timedelta(minutes=threshold_minutes)
+    errored = await db.fetchall(
+        """
+        SELECT s.*
+        FROM satoshimachine.dca_settlements s
+        JOIN satoshimachine.dca_machines m ON m.id = s.machine_id
+        WHERE m.operator_user_id = :uid AND s.status = 'errored'
+        ORDER BY s.created_at DESC
+        """,
+        {"uid": operator_user_id},
+        DcaSettlement,
+    )
+    stuck_pending = await db.fetchall(
+        """
+        SELECT s.*
+        FROM satoshimachine.dca_settlements s
+        JOIN satoshimachine.dca_machines m ON m.id = s.machine_id
+        WHERE m.operator_user_id = :uid
+          AND s.status = 'pending'
+          AND s.created_at < :threshold
+        ORDER BY s.created_at ASC
+        """,
+        {"uid": operator_user_id, "threshold": threshold_at},
+        DcaSettlement,
+    )
+    stuck_processing = await db.fetchall(
+        """
+        SELECT s.*
+        FROM satoshimachine.dca_settlements s
+        JOIN satoshimachine.dca_machines m ON m.id = s.machine_id
+        WHERE m.operator_user_id = :uid
+          AND s.status = 'processing'
+          AND s.created_at < :threshold
+        ORDER BY s.created_at ASC
+        """,
+        {"uid": operator_user_id, "threshold": threshold_at},
+        DcaSettlement,
+    )
+    return {
+        "errored": errored,
+        "stuck_pending": stuck_pending,
+        "stuck_processing": stuck_processing,
+    }
+
+
+async def force_reset_stuck_settlement(
+    settlement_id: str,
+) -> Optional[DcaSettlement]:
+    """Operator escape hatch for genuinely stuck settlements (processor
+    crashed mid-flight, etc.). Flips 'pending'/'processing' → 'errored' so
+    the existing retry endpoint can take over. Clears processing_claim.
+
+    Caller is responsible for verifying the settlement is *actually* stuck
+    (e.g., via threshold check on created_at). This function trusts the
+    decision.
+
+    The UPDATE only matches rows still in 'pending'/'processing'; if the
+    status changed in the meantime nothing is written. Either way the row is
+    re-read and returned, so callers must inspect the returned status to
+    learn whether the reset took effect."""
+    await db.execute(
+        """
+        UPDATE satoshimachine.dca_settlements
+        SET status = 'errored',
+            processing_claim = NULL,
+            error_message = 'force-reset by operator (was stuck)'
+        WHERE id = :id AND status IN ('pending', 'processing')
+        """,
+        {"id": settlement_id},
+    )
+    return await get_settlement(settlement_id)
+
+
 async def get_settlements_for_operator(
     operator_user_id: str, limit: int = 200
 ) -> List[DcaSettlement]:
diff --git a/models.py b/models.py
index c6509f3..1d23a77 100644
--- a/models.py
+++ b/models.py
@@ -407,6 +407,28 @@ class PartialDispenseData(BaseModel):
         return v
 
 
+class StuckSettlementsResponse(BaseModel):
+    """Operator worklist surfacing settlements that didn't process cleanly.
+
+    Three categories, segregated so the UI can render them with appropriate
+    affordances (retry / investigate / force-error):
+
+    - errored: distribution failed; one or more legs reported a payment
+      error. Operator retry endpoint handles these directly.
+    - stuck_pending: landed but never picked up by the processor (listener
+      crashed before invoking process_settlement, or the claim was lost).
+      Older than `threshold_minutes`.
+    - stuck_processing: claim was taken but no completion in
+      `threshold_minutes`. The processor likely crashed mid-flight.
+      Operator can force-recover via POST .../force-reset.
+    """
+
+    threshold_minutes: int
+    errored: list  # list[DcaSettlement]
+    stuck_pending: list
+    stuck_processing: list
+
+
 class AppendSettlementNoteData(BaseModel):
     """Operator-authored free-form note on a settlement.
 
diff --git a/views_api.py b/views_api.py
index 5bd68f5..d60fe15 100644
--- a/views_api.py
+++ b/views_api.py
@@ -20,6 +20,7 @@ from .crud import (
     delete_dca_client,
     delete_deposit,
     delete_machine,
+    force_reset_stuck_settlement,
     get_client_balance_summary,
     get_commission_splits,
     get_dca_client,
@@ -35,6 +36,7 @@ from .crud import (
     get_settlement,
     get_settlements_for_machine,
     get_settlements_for_operator,
+    get_stuck_settlements_for_operator,
     get_super_config,
     replace_commission_splits,
     reset_settlement_for_retry,
@@ -64,6 +66,7 @@ from .models import (
     PartialDispenseData,
     SetCommissionSplitsData,
     SettleBalanceData,
+    StuckSettlementsResponse,
     SuperConfig,
     UpdateDcaClientData,
     UpdateDepositData,
@@ -469,6 +472,108 @@ async def api_partial_dispense(
         raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc
 
 
+@satmachineadmin_api_router.get(
+    "/api/v1/dca/settlements/stuck", response_model=StuckSettlementsResponse
+)
+async def api_list_stuck_settlements(
+    threshold_minutes: int = 30,
+    user: User = Depends(check_user_exists),
+) -> StuckSettlementsResponse:
+    """Operator worklist of settlements that didn't process cleanly.
+
+    Returns three lists:
+      - errored: distribution failed; retry endpoint handles these
+      - stuck_pending: landed but never picked up by the processor
+      - stuck_processing: claim taken but no completion in N minutes
+
+    `threshold_minutes` controls the age threshold for 'stuck' (default 30).
+    Operators can force-recover stuck-processing settlements via
+    POST /api/v1/dca/settlements/{id}/force-reset."""
+    if threshold_minutes < 1:
+        raise HTTPException(
+            HTTPStatus.BAD_REQUEST, "threshold_minutes must be >= 1"
+        )
+    buckets = await get_stuck_settlements_for_operator(user.id, threshold_minutes)
+    return StuckSettlementsResponse(
+        threshold_minutes=threshold_minutes,
+        errored=buckets["errored"],
+        stuck_pending=buckets["stuck_pending"],
+        stuck_processing=buckets["stuck_processing"],
+    )
+
+
+@satmachineadmin_api_router.post(
+    "/api/v1/dca/settlements/{settlement_id}/force-reset",
+    response_model=DcaSettlement,
+)
+async def api_force_reset_settlement(
+    settlement_id: str,
+    threshold_minutes: int = 30,
+    user: User = Depends(check_user_exists),
+) -> DcaSettlement:
+    """Operator escape hatch for genuinely stuck settlements (processor
+    crashed mid-flight, claim never released). Flips status
+    'pending'/'processing' → 'errored' so the retry endpoint can take over.
+
+    Refuses unless the settlement is older than `threshold_minutes` so an
+    operator can't accidentally interrupt a slow-but-running settlement.
+    Threshold check uses created_at as a proxy — adequate for v1 since the
+    processor either completes fast or it crashed.
+
+    `threshold_minutes` must be >= 1 (same rule as the stuck-list endpoint);
+    otherwise 0 or a negative value would switch the age guard off.
+
+    Responds 404 when the settlement or its machine is missing or owned by
+    another operator, 400 for a bad threshold / wrong status / too-fresh
+    settlement, and 409 when the status changed before the reset applied."""
+    from datetime import datetime, timedelta, timezone
+
+    if threshold_minutes < 1:
+        raise HTTPException(
+            HTTPStatus.BAD_REQUEST, "threshold_minutes must be >= 1"
+        )
+    settlement = await get_settlement(settlement_id)
+    if settlement is None:
+        raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
+    machine = await get_machine(settlement.machine_id)
+    if machine is None or machine.operator_user_id != user.id:
+        raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
+    if settlement.status not in ("pending", "processing"):
+        raise HTTPException(
+            HTTPStatus.BAD_REQUEST,
+            f"settlement status must be 'pending' or 'processing' to "
+            f"force-reset (currently '{settlement.status}')",
+        )
+    # Age check — refuse if settlement is fresh (processor might still
+    # be running normally). Both sides made timezone-aware before compare.
+    created = settlement.created_at
+    if created.tzinfo is None:
+        created = created.replace(tzinfo=timezone.utc)
+    age = datetime.now(timezone.utc) - created
+    if age < timedelta(minutes=threshold_minutes):
+        raise HTTPException(
+            HTTPStatus.BAD_REQUEST,
+            f"settlement is only {age.total_seconds() / 60:.1f} minutes "
+            f"old (threshold {threshold_minutes}m); refusing to force-reset "
+            "a possibly-still-running settlement",
+        )
+    updated = await force_reset_stuck_settlement(settlement_id)
+    if updated is None:
+        raise HTTPException(
+            HTTPStatus.INTERNAL_SERVER_ERROR, "failed to force-reset"
+        )
+    # The UPDATE is conditional on status; if the processor finished (or the
+    # row otherwise changed) after the checks above, nothing was reset.
+    # Report that instead of returning an un-reset settlement as success.
+    if updated.status != "errored":
+        raise HTTPException(
+            HTTPStatus.CONFLICT,
+            f"settlement status changed to '{updated.status}' before it "
+            "could be force-reset",
+        )
+    return updated
+
+
 @satmachineadmin_api_router.post(
     "/api/v1/dca/settlements/{settlement_id}/retry",
     response_model=DcaSettlement,