fix(v2)(security): wallet IDOR + settlement-processing concurrency
Closes the HIGH-severity security finding from the v2 branch review:
operator A could register a machine pointing at operator B's wallet_id
(or update their machine to do so), then drain B's wallet via the
settlement processor's pay_invoice call. LNbits' pay_invoice doesn't
enforce caller identity at the backend layer — wallet_id is trusted as
the source-of-truth for the source wallet.
Two-layer defence:
1. **API layer.** New _assert_wallet_owned_by helper in views_api.py
refuses any wallet_id from the request body that doesn't resolve to a
wallet owned by the authenticated operator. Applied on
api_create_machine and api_update_machine. Pattern lifted from the
existing api_settle_client_balance which already did this for
funding_wallet_id (260-265 in the original file).
2. **DB layer.** m007 adds a UNIQUE index on dca_machines.wallet_id —
even if a future endpoint forgets the API check, the DB rejects two
rows claiming the same wallet. CREATE UNIQUE INDEX is portable across
SQLite and PostgreSQL (ALTER TABLE ADD CONSTRAINT is not on SQLite).
Same commit also addresses concurrency findings H1+H2+H3 from the
architectural review (race conditions on process_settlement +
no retry path for errored settlements):
- m007 also adds processing_claim TEXT to dca_settlements.
- crud.claim_settlement_for_processing does optimistic-lock via
UPDATE ... SET status='processing', processing_claim=:token
WHERE id=:id AND status='pending' (portable; no UPDATE...RETURNING).
Read-back compares the token; only one concurrent caller wins.
- crud.reset_settlement_for_retry voids failed legs and flips
'errored' → 'pending' so process_settlement re-runs them. Completed
legs are LEFT IN PLACE — we never re-pay sats that already moved.
- crud.mark_settlement_status clears processing_claim on terminal
states so a fresh claim attempt won't see a stale token.
- distribution.process_settlement now uses the claim instead of the
status-read-and-check pattern. Concurrent listener re-fires +
partial-dispense recomputes can't double-pay legs.
- New endpoint:
POST /api/v1/dca/settlements/{id}/retry (operator-scoped)
Refuses if status != 'errored' (400). Resets, then re-runs
process_settlement via the claim path.
DcaSettlement gains a processing_claim: Optional[str] field. Visible to
operators in settlement detail; stale claims (status='processing' for
many minutes) are a "processor crashed mid-flight" signal — operator
can manually mark errored + retry.
32 routes registered. 72/72 tests pass.
Refs: aiolabs/satmachineadmin#9 — closes the v2-branch security finding
and HIGH-priority concurrency findings from the internal review.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
d0a947b7e6
commit
3ede66ff92
5 changed files with 169 additions and 7 deletions
66
crud.py
66
crud.py
|
|
@ -528,7 +528,9 @@ async def mark_settlement_status(
|
|||
status: str,
|
||||
error_message: Optional[str] = None,
|
||||
) -> Optional[DcaSettlement]:
|
||||
"""Status: 'pending' | 'processed' | 'partial' | 'refunded' | 'errored'."""
|
||||
"""Status: 'pending' | 'processing' | 'processed' | 'partial' |
|
||||
'refunded' | 'errored'. Clears processing_claim on terminal states so a
|
||||
fresh claim attempt won't see a stale token."""
|
||||
await db.execute(
|
||||
"""
|
||||
UPDATE satoshimachine.dca_settlements
|
||||
|
|
@ -537,6 +539,10 @@ async def mark_settlement_status(
|
|||
processed_at = CASE
|
||||
WHEN :status IN ('processed', 'partial', 'refunded')
|
||||
THEN :now ELSE processed_at
|
||||
END,
|
||||
processing_claim = CASE
|
||||
WHEN :status = 'processing' THEN processing_claim
|
||||
ELSE NULL
|
||||
END
|
||||
WHERE id = :id
|
||||
""",
|
||||
|
|
@ -550,6 +556,64 @@ async def mark_settlement_status(
|
|||
return await get_settlement(settlement_id)
|
||||
|
||||
|
||||
async def claim_settlement_for_processing(
|
||||
settlement_id: str,
|
||||
) -> Optional[DcaSettlement]:
|
||||
"""Optimistic-lock claim: atomically flip a settlement to 'processing'
|
||||
and tag it with a per-invocation token. Returns the claimed row on
|
||||
success; None if another caller already won the claim or the settlement
|
||||
is not in a claimable state ('pending').
|
||||
|
||||
Pattern is portable across SQLite + PostgreSQL (doesn't rely on
|
||||
UPDATE ... RETURNING). Two concurrent invocations may both run the
|
||||
UPDATE, but only one row matches the WHERE clause; the loser's UPDATE
|
||||
is a no-op against status='processing'. The read-back check on the
|
||||
token disambiguates."""
|
||||
token = urlsafe_short_hash()
|
||||
await db.execute(
|
||||
"""
|
||||
UPDATE satoshimachine.dca_settlements
|
||||
SET status = 'processing', processing_claim = :token
|
||||
WHERE id = :id AND status = 'pending'
|
||||
""",
|
||||
{"id": settlement_id, "token": token},
|
||||
)
|
||||
after = await get_settlement(settlement_id)
|
||||
if after is None:
|
||||
return None
|
||||
if after.processing_claim != token:
|
||||
return None
|
||||
return after
|
||||
|
||||
|
||||
async def reset_settlement_for_retry(
|
||||
settlement_id: str,
|
||||
) -> Optional[DcaSettlement]:
|
||||
"""Operator retry path. Flips 'errored' → 'pending' and voids any
|
||||
'failed' legs so process_settlement re-runs them fresh. Completed legs
|
||||
are left in place — we never re-pay sats that already moved."""
|
||||
await db.execute(
|
||||
"""
|
||||
UPDATE satoshimachine.dca_payments
|
||||
SET status = 'voided'
|
||||
WHERE settlement_id = :sid AND status = 'failed'
|
||||
""",
|
||||
{"sid": settlement_id},
|
||||
)
|
||||
await db.execute(
|
||||
"""
|
||||
UPDATE satoshimachine.dca_settlements
|
||||
SET status = 'pending',
|
||||
error_message = NULL,
|
||||
processing_claim = NULL,
|
||||
processed_at = NULL
|
||||
WHERE id = :id AND status = 'errored'
|
||||
""",
|
||||
{"id": settlement_id},
|
||||
)
|
||||
return await get_settlement(settlement_id)
|
||||
|
||||
|
||||
async def apply_partial_dispense(
|
||||
settlement_id: str,
|
||||
*,
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ from .calculations import (
|
|||
)
|
||||
from .crud import (
|
||||
apply_partial_dispense,
|
||||
claim_settlement_for_processing,
|
||||
count_completed_legs_for_settlement,
|
||||
create_dca_payment,
|
||||
get_client_balance_summary,
|
||||
|
|
@ -297,13 +298,22 @@ async def apply_partial_dispense_and_redistribute(
|
|||
|
||||
|
||||
async def process_settlement(settlement_id: str) -> None:
|
||||
"""Process a pending settlement end-to-end. Safe to invoke multiple
|
||||
times — the status='processed' guard skips already-processed rows."""
|
||||
settlement = await get_settlement(settlement_id)
|
||||
"""Process a pending settlement end-to-end.
|
||||
|
||||
Concurrency-safe: an optimistic-lock claim flips the settlement to
|
||||
'processing' atomically and tags it with a per-invocation token.
|
||||
Concurrent invocations on the same id can't both win — losers see the
|
||||
claim mismatch on read-back and return without writing any legs.
|
||||
Retries land via reset_settlement_for_retry which voids failed legs
|
||||
and flips 'errored' back to 'pending'."""
|
||||
settlement = await claim_settlement_for_processing(settlement_id)
|
||||
if settlement is None:
|
||||
logger.warning(f"distribution: settlement {settlement_id} not found")
|
||||
return
|
||||
if settlement.status != "pending":
|
||||
# Either already claimed by a concurrent invocation, or not in a
|
||||
# 'pending' state. Either way, nothing to do here.
|
||||
logger.debug(
|
||||
f"distribution: skip {settlement_id} — not claimable (already "
|
||||
"processing or not pending)"
|
||||
)
|
||||
return
|
||||
machine = await get_machine(settlement.machine_id)
|
||||
if machine is None:
|
||||
|
|
|
|||
|
|
@ -446,3 +446,33 @@ async def m006_add_settlement_notes(db):
|
|||
await db.execute(
|
||||
"ALTER TABLE satoshimachine.dca_settlements ADD COLUMN notes TEXT"
|
||||
)
|
||||
|
||||
|
||||
async def m007_settlement_claim_and_machine_wallet_unique(db):
|
||||
"""Security + concurrency hardening (fix bundle 1).
|
||||
|
||||
1. Adds `processing_claim` to dca_settlements. The settlement processor
|
||||
uses an optimistic-lock pattern: write a per-invocation claim token
|
||||
alongside the status='processing' flip, then re-read and confirm the
|
||||
persisted token matches. Two concurrent process_settlement invocations
|
||||
on the same id can't both win the claim, so no duplicate leg
|
||||
creation / double-pay.
|
||||
|
||||
2. Adds a UNIQUE index on dca_machines.wallet_id so two machine rows
|
||||
can never claim the same wallet. Closes a wallet-IDOR funds-theft
|
||||
vector where operator A could register a machine on operator B's
|
||||
wallet_id and drain it via the settlement processor's pay_invoice.
|
||||
Defence-in-depth on top of the API-layer ownership check; if a future
|
||||
endpoint forgets the check, the DB still rejects.
|
||||
|
||||
CREATE UNIQUE INDEX is portable across SQLite and PostgreSQL
|
||||
(ALTER TABLE ADD CONSTRAINT is not on SQLite).
|
||||
"""
|
||||
await db.execute(
|
||||
"ALTER TABLE satoshimachine.dca_settlements "
|
||||
"ADD COLUMN processing_claim TEXT"
|
||||
)
|
||||
await db.execute(
|
||||
"CREATE UNIQUE INDEX dca_machines_wallet_id_uq "
|
||||
"ON satoshimachine.dca_machines (wallet_id)"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -229,6 +229,11 @@ class DcaSettlement(BaseModel):
|
|||
# entry timestamped + records original values so the overwrite is
|
||||
# auditable from the settlement detail view alone. Never edited in place.
|
||||
notes: Optional[str] = None
|
||||
# Optimistic-lock claim token written when status flips to 'processing'.
|
||||
# Two concurrent process_settlement invocations can't both win the claim
|
||||
# (only one matching read-back). Cleared back to NULL when the leg-
|
||||
# writing pass completes (status='processed' or 'errored').
|
||||
processing_claim: Optional[str] = None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
|
|
|
|||
53
views_api.py
53
views_api.py
|
|
@ -37,6 +37,7 @@ from .crud import (
|
|||
get_settlements_for_operator,
|
||||
get_super_config,
|
||||
replace_commission_splits,
|
||||
reset_settlement_for_retry,
|
||||
update_dca_client,
|
||||
update_deposit,
|
||||
update_deposit_status,
|
||||
|
|
@ -45,6 +46,7 @@ from .crud import (
|
|||
)
|
||||
from .distribution import (
|
||||
apply_partial_dispense_and_redistribute,
|
||||
process_settlement,
|
||||
settle_lp_balance,
|
||||
)
|
||||
from .models import (
|
||||
|
|
@ -73,6 +75,19 @@ from .models import (
|
|||
satmachineadmin_api_router = APIRouter()
|
||||
|
||||
|
||||
async def _assert_wallet_owned_by(wallet_id: str, user_id: str) -> None:
|
||||
"""Defence-in-depth: refuse to bind any DB row to a wallet the caller
|
||||
doesn't own. Used on every endpoint that accepts a wallet_id from the
|
||||
request body. The DB-side UNIQUE on dca_machines.wallet_id (m007) is a
|
||||
second line of defence; this check is the primary gate."""
|
||||
wallet = await get_wallet(wallet_id)
|
||||
if wallet is None or wallet.user != user_id:
|
||||
raise HTTPException(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"wallet_id is not owned by the authenticated operator",
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Machines
|
||||
# =============================================================================
|
||||
|
|
@ -82,6 +97,7 @@ satmachineadmin_api_router = APIRouter()
|
|||
async def api_create_machine(
|
||||
data: CreateMachineData, user: User = Depends(check_user_exists)
|
||||
) -> Machine:
|
||||
await _assert_wallet_owned_by(data.wallet_id, user.id)
|
||||
return await create_machine(user.id, data)
|
||||
|
||||
|
||||
|
|
@ -117,6 +133,8 @@ async def api_update_machine(
|
|||
machine = await get_machine(machine_id)
|
||||
if machine is None or machine.operator_user_id != user.id:
|
||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
|
||||
if data.wallet_id is not None:
|
||||
await _assert_wallet_owned_by(data.wallet_id, user.id)
|
||||
updated = await update_machine(machine_id, data)
|
||||
if updated is None:
|
||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
|
||||
|
|
@ -451,6 +469,41 @@ async def api_partial_dispense(
|
|||
raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc
|
||||
|
||||
|
||||
@satmachineadmin_api_router.post(
|
||||
"/api/v1/dca/settlements/{settlement_id}/retry",
|
||||
response_model=DcaSettlement,
|
||||
)
|
||||
async def api_retry_settlement(
|
||||
settlement_id: str, user: User = Depends(check_user_exists)
|
||||
) -> DcaSettlement:
|
||||
"""Operator retry path for an errored settlement.
|
||||
|
||||
Voids any failed legs (completed legs are NEVER re-paid — Lightning
|
||||
sats already moved) and flips status 'errored' → 'pending', then
|
||||
re-invokes process_settlement. The optimistic-lock claim guards
|
||||
against a concurrent listener re-fire racing this retry."""
|
||||
settlement = await get_settlement(settlement_id)
|
||||
if settlement is None:
|
||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
|
||||
machine = await get_machine(settlement.machine_id)
|
||||
if machine is None or machine.operator_user_id != user.id:
|
||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
|
||||
if settlement.status != "errored":
|
||||
raise HTTPException(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
f"settlement status must be 'errored' to retry "
|
||||
f"(currently '{settlement.status}')",
|
||||
)
|
||||
updated = await reset_settlement_for_retry(settlement_id)
|
||||
if updated is None or updated.status != "pending":
|
||||
raise HTTPException(
|
||||
HTTPStatus.INTERNAL_SERVER_ERROR, "failed to reset settlement"
|
||||
)
|
||||
await process_settlement(settlement_id)
|
||||
after = await get_settlement(settlement_id)
|
||||
return after if after is not None else updated
|
||||
|
||||
|
||||
@satmachineadmin_api_router.post(
|
||||
"/api/v1/dca/settlements/{settlement_id}/notes",
|
||||
response_model=DcaSettlement,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue