feat(v2): abandoned-tx queue + force-reset for stuck settlements (P3f)
Completes the P3 operator-UX cluster. Surfaces settlements that didn't
process cleanly as a queryable worklist so operators can investigate +
retry without scanning the full settlement history.
New endpoints:
GET /api/v1/dca/settlements/stuck?threshold_minutes=30
Returns StuckSettlementsResponse with three buckets:
- errored: distribution failed; existing /retry endpoint handles
- stuck_pending: landed but never picked up (listener crashed
before invoking process_settlement)
- stuck_processing: claim taken but no completion in N minutes;
processor crashed mid-flight, processing_claim is set but no
terminal state landed
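The bucketing rules above can be sketched as a pure function. This is only an illustration: `Settlement`, `bucket_for`, and the 'completed' status in the usage note are hypothetical names, not part of this commit, and the real endpoint builds the buckets with SQL queries rather than in Python.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class Settlement:
    # Hypothetical stand-in for DcaSettlement; only the fields the rules need.
    id: str
    status: str
    created_at: datetime


def bucket_for(
    s: Settlement, now: datetime, threshold_minutes: int = 30
) -> Optional[str]:
    """Return the worklist bucket for a settlement, or None if it is not listed."""
    if s.status == "errored":
        return "errored"  # errored rows are always listed, regardless of age
    cutoff = now - timedelta(minutes=threshold_minutes)
    if s.status in ("pending", "processing") and s.created_at < cutoff:
        return f"stuck_{s.status}"  # 'stuck_pending' or 'stuck_processing'
    return None  # recent in-flight rows and any other status are not listed
```

Under these assumptions, a 'processing' row created 45 minutes ago lands in stuck_processing, while one created 5 minutes ago is left out of the worklist.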
POST /api/v1/dca/settlements/{id}/force-reset
Operator escape hatch for genuinely stuck settlements. Flips
'pending'/'processing' → 'errored' so the /retry endpoint can take
over. Refuses unless the settlement is older than threshold_minutes
(default 30) so operators can't accidentally interrupt a
slow-but-running settlement. Age check uses created_at as proxy.
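A minimal sketch of the age guard described above, assuming the endpoint compares created_at against the current time. `force_reset_allowed` is a hypothetical helper name, and rejecting rows that are not 'pending'/'processing' is an assumption about the endpoint; the diff only shows that the UPDATE leaves such rows untouched.

```python
from datetime import datetime, timedelta


def force_reset_allowed(
    status: str,
    created_at: datetime,
    now: datetime,
    threshold_minutes: int = 30,
) -> bool:
    """True only for 'pending'/'processing' rows older than the threshold."""
    if status not in ("pending", "processing"):
        return False  # assumption: nothing to reset for other statuses
    # Strictly older than the threshold, matching `created_at < :threshold`.
    return now - created_at > timedelta(minutes=threshold_minutes)
```

Because the age is measured from created_at rather than from when the claim was taken, a row that waited a long time and was claimed only recently would still pass this check.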
CRUD:
- get_stuck_settlements_for_operator(uid, threshold_minutes) joins
dca_settlements → dca_machines and returns the three lists
scoped per operator. No age filter on 'errored' (operators always
want to see those); age filter applies to 'pending'/'processing'.
- force_reset_stuck_settlement(id) UPDATEs 'pending'/'processing' to
'errored', clears processing_claim, sets a marker error_message.
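The per-operator scoping and age filter can be tried out against an in-memory SQLite database. This is a sketch under assumptions: the two-table schema is cut down to the columns the queries touch, the schema prefix is dropped, timestamps are stored as ISO strings, and the sample rows are invented for illustration.

```python
import sqlite3
from datetime import datetime, timedelta

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dca_machines (id TEXT PRIMARY KEY, operator_user_id TEXT);
    CREATE TABLE dca_settlements (
        id TEXT PRIMARY KEY, machine_id TEXT, status TEXT, created_at TEXT
    );
""")

now = datetime(2024, 1, 1, 12, 0, 0)  # fixed clock so the result is stable
old = (now - timedelta(minutes=45)).isoformat()
fresh = (now - timedelta(minutes=5)).isoformat()

conn.executemany(
    "INSERT INTO dca_machines VALUES (?, ?)", [("m1", "alice"), ("m2", "bob")]
)
conn.executemany(
    "INSERT INTO dca_settlements VALUES (?, ?, ?, ?)",
    [
        ("s1", "m1", "pending", old),    # alice, old: listed as stuck
        ("s2", "m1", "pending", fresh),  # alice, recent: not listed
        ("s3", "m2", "pending", old),    # bob's machine: out of scope
        ("s4", "m1", "errored", fresh),  # errored: listed regardless of age
    ],
)

threshold_at = (now - timedelta(minutes=30)).isoformat()
stuck_pending = [row[0] for row in conn.execute(
    """
    SELECT s.id FROM dca_settlements s
    JOIN dca_machines m ON m.id = s.machine_id
    WHERE m.operator_user_id = :uid
      AND s.status = 'pending'
      AND s.created_at < :threshold
    ORDER BY s.created_at ASC
    """,
    {"uid": "alice", "threshold": threshold_at},
)]
errored = [row[0] for row in conn.execute(
    """
    SELECT s.id FROM dca_settlements s
    JOIN dca_machines m ON m.id = s.machine_id
    WHERE m.operator_user_id = :uid AND s.status = 'errored'
    ORDER BY s.created_at DESC
    """,
    {"uid": "alice"},
)]
print(stuck_pending, errored)
```

The join on dca_machines is what keeps another operator's settlement (s3 here) out of the result even though it is old enough to count as stuck.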
The retry endpoint shipped in fix bundle 1 (commit 3ede66f) is the
intended downstream step: the operator sees a stuck-processing row,
hits force-reset (flips it to errored), then hits retry (flips it to
pending, voids failed legs, and re-runs process_settlement via the
claim path).
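The force-reset then retry sequence can be modelled as two state transitions on an in-memory row. This is a sketch only: `Row`, `force_reset`, and `retry` are hypothetical stand-ins, and the retry step here flips only the status; voiding failed legs and re-running process_settlement are left out.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Row:
    # Hypothetical in-memory stand-in for a dca_settlements row.
    status: str
    processing_claim: Optional[str] = None
    error_message: Optional[str] = None


def force_reset(row: Row) -> Row:
    """Mirror of the UPDATE: only 'pending'/'processing' rows are touched."""
    if row.status in ("pending", "processing"):
        row.status = "errored"
        row.processing_claim = None
        row.error_message = "force-reset by operator (was stuck)"
    return row


def retry(row: Row) -> Row:
    """Assumed shape of the existing retry step: 'errored' goes back to 'pending'."""
    if row.status == "errored":
        row.status = "pending"
    return row


# Walk a stuck-processing row through the operator flow.
row = Row(status="processing", processing_claim="worker-1")
force_reset(row)  # processing -> errored, claim cleared
retry(row)        # errored -> pending, ready to be claimed again
```

Note that force_reset is a no-op for any other status, which is why the function in crud.py can return the current row without raising.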
34 routes registered. 72/72 tests pass.
Refs: aiolabs/satmachineadmin#9 — completes P3 operator-UX cluster
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent 3ede66ff92
commit 578f2c142d
3 changed files with 191 additions and 0 deletions
84
crud.py
@@ -506,6 +506,90 @@ async def get_settlements_for_machine(
     )
 
 
+async def get_stuck_settlements_for_operator(
+    operator_user_id: str, threshold_minutes: int = 30
+) -> dict:
+    """Operator worklist of settlements that didn't process cleanly.
+
+    Returns a dict with three keyed lists:
+      - 'errored': any status='errored' for this operator (no age filter —
+        operators always want to see these)
+      - 'stuck_pending': status='pending' AND older than threshold (listener
+        crashed before invoking process_settlement)
+      - 'stuck_processing': status='processing' AND older than threshold
+        (processor crashed mid-flight; processing_claim is set but no
+        completion landed)
+    """
+    from datetime import timedelta
+
+    threshold_at = datetime.now() - timedelta(minutes=threshold_minutes)
+    errored = await db.fetchall(
+        """
+        SELECT s.*
+        FROM satoshimachine.dca_settlements s
+        JOIN satoshimachine.dca_machines m ON m.id = s.machine_id
+        WHERE m.operator_user_id = :uid AND s.status = 'errored'
+        ORDER BY s.created_at DESC
+        """,
+        {"uid": operator_user_id},
+        DcaSettlement,
+    )
+    stuck_pending = await db.fetchall(
+        """
+        SELECT s.*
+        FROM satoshimachine.dca_settlements s
+        JOIN satoshimachine.dca_machines m ON m.id = s.machine_id
+        WHERE m.operator_user_id = :uid
+          AND s.status = 'pending'
+          AND s.created_at < :threshold
+        ORDER BY s.created_at ASC
+        """,
+        {"uid": operator_user_id, "threshold": threshold_at},
+        DcaSettlement,
+    )
+    stuck_processing = await db.fetchall(
+        """
+        SELECT s.*
+        FROM satoshimachine.dca_settlements s
+        JOIN satoshimachine.dca_machines m ON m.id = s.machine_id
+        WHERE m.operator_user_id = :uid
+          AND s.status = 'processing'
+          AND s.created_at < :threshold
+        ORDER BY s.created_at ASC
+        """,
+        {"uid": operator_user_id, "threshold": threshold_at},
+        DcaSettlement,
+    )
+    return {
+        "errored": errored,
+        "stuck_pending": stuck_pending,
+        "stuck_processing": stuck_processing,
+    }
+
+
+async def force_reset_stuck_settlement(
+    settlement_id: str,
+) -> Optional[DcaSettlement]:
+    """Operator escape hatch for genuinely stuck settlements (processor
+    crashed mid-flight, etc.). Flips 'pending'/'processing' → 'errored' so
+    the existing retry endpoint can take over. Clears processing_claim.
+
+    Caller is responsible for verifying the settlement is *actually* stuck
+    (e.g., via threshold check on created_at). This function trusts the
+    decision."""
+    await db.execute(
+        """
+        UPDATE satoshimachine.dca_settlements
+        SET status = 'errored',
+            processing_claim = NULL,
+            error_message = 'force-reset by operator (was stuck)'
+        WHERE id = :id AND status IN ('pending', 'processing')
+        """,
+        {"id": settlement_id},
+    )
+    return await get_settlement(settlement_id)
+
+
 async def get_settlements_for_operator(
     operator_user_id: str, limit: int = 200
 ) -> List[DcaSettlement]: