Structural half of S8 (aiolabs/satmachineadmin#22). Listener now accepts BOTH inbound and outbound payments instead of filtering on `is_in=True`; distribution gates the DCA leg on tx_type so the liquidity-flow direction at the ATM drives behaviour, not the Lightning protocol direction at the operator's wallet. tasks.py: - Drop the `if not payment.is_in` pre-filter; keep `payment.success`. - Pair-name the two axes (`is_lightning_inbound`/`_outbound` for protocol vs `tx_type ∈ {cash_out, cash_in}` for business) per the naming-inversion memory. - Outbound payments need `extra.source == "bitspire"` before we touch them — without it we can't tell the operator paying their landlord from a cash-in settlement; skip silently. - Cross-axis sanity gate: refuse to process when protocol direction disagrees with business direction (cash_out must be inbound, cash_in must be outbound). Catches a buggy/malicious upstream stamping `type=cash_out` on an outbound payment. distribution.py: - Gate `_pay_dca_distributions` on `tx_type == "cash_out"`. Cash-in liquidity stays in the operator's wallet — there's no LP share to distribute. Skipped leg is written as an audit row via `_record_skipped_leg` so the dashboard surfaces "DCA intentionally skipped" instead of a phantom missing leg. Still pending in S8: the UI marker (cash_in tx_type chip in the operator settlements table) and end-to-end test against a real LNURL-withdraw redemption. Tests: 75 passed (no regression vs prior green state; `test_router` remains a pre-existing pytest-asyncio plugin issue). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
874 lines
32 KiB
Python
874 lines
32 KiB
Python
# Satoshi Machine v2 — settlement distribution (P2).
|
|
#
|
|
# Picks up a dca_settlements row with status='pending' and pays out the
|
|
# three leg groups via LNbits internal transfers (create_invoice +
|
|
# pay_invoice on the same instance auto-detect internal). All legs land
|
|
# in dca_payments with the appropriate leg_type discriminator and inherit
|
|
# the Payment.tag "satmachine:{machine_npub}" so LNbits payment-history
|
|
# filters work natively.
|
|
#
|
|
# Leg order:
|
|
# 1. super_fee — platform_fee_sats → super_fee_wallet_id (if set)
|
|
# 2. operator_split — operator_fee_sats split per operator's rules
|
|
# 3. dca — principal_sats distributed proportionally to active LPs,
|
|
# each leg capped at the LP's remaining fiat balance
|
|
# (preserves the v1 sync-mismatch fix from PR #2)
|
|
#
|
|
# Atomicity: LN payments cannot be rolled back. We attempt each leg, record
|
|
# success/failure per dca_payments row, and mark the settlement 'processed'
|
|
# only when every leg completed. Any failure marks 'errored' with a message
|
|
# but leaves the successful legs in place. Sats that don't get paid out
|
|
# (failed legs, no LP coverage, missing super wallet) remain in the
|
|
# machine's wallet — visible to the operator on the dashboard.
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
from typing import List, Optional
|
|
|
|
from lnbits.core.services import create_invoice, pay_invoice
|
|
from lnbits.core.services.lnurl import get_pr_from_lnurl
|
|
from lnurl import LnAddress
|
|
from loguru import logger
|
|
|
|
from .calculations import (
|
|
allocate_operator_split_legs,
|
|
calculate_distribution,
|
|
)
|
|
from .crud import (
|
|
apply_partial_dispense,
|
|
claim_settlement_for_processing,
|
|
count_completed_legs_for_settlement,
|
|
create_dca_payment,
|
|
get_client_balance_summary,
|
|
get_dca_lp,
|
|
get_effective_commission_splits,
|
|
get_flow_mode_clients_for_machine,
|
|
get_machine,
|
|
get_settlement,
|
|
get_super_config,
|
|
mark_settlement_status,
|
|
update_payment_status,
|
|
void_open_legs_for_settlement,
|
|
)
|
|
from .models import (
|
|
CreateDcaPaymentData,
|
|
DcaClient,
|
|
DcaLpPreferences,
|
|
DcaPayment,
|
|
DcaSettlement,
|
|
Machine,
|
|
PartialDispenseData,
|
|
SettleBalanceData,
|
|
SuperConfig,
|
|
)
|
|
|
|
PAYMENT_TAG_PREFIX = "satmachine"
|
|
|
|
|
|
def _payment_tag(machine: Machine) -> str:
|
|
return f"{PAYMENT_TAG_PREFIX}:{machine.machine_npub}"
|
|
|
|
|
|
async def _record_skipped_leg(
|
|
settlement: DcaSettlement,
|
|
machine: Machine,
|
|
leg_type: str,
|
|
amount_sats: int,
|
|
reason: str,
|
|
client_id: str | None = None,
|
|
) -> None:
|
|
"""Audit row for sats intentionally left in the machine wallet.
|
|
|
|
Distinct from 'failed' (which means pay_invoice errored). 'skipped' means
|
|
we never attempted the pay — by design, because some prerequisite was
|
|
missing (super wallet not configured, no operator ruleset, no exchange
|
|
rate, no eligible LPs). Operator sees these in payment history and on
|
|
the settlement detail blob; the audit trail explains where un-paid
|
|
sats are sitting.
|
|
"""
|
|
if amount_sats <= 0:
|
|
return
|
|
leg = await create_dca_payment(
|
|
CreateDcaPaymentData(
|
|
settlement_id=settlement.id,
|
|
client_id=client_id,
|
|
machine_id=machine.id,
|
|
operator_user_id=machine.operator_user_id,
|
|
leg_type=leg_type,
|
|
destination_wallet_id=None,
|
|
destination_ln_address=None,
|
|
amount_sats=amount_sats,
|
|
amount_fiat=None,
|
|
exchange_rate=None,
|
|
transaction_time=datetime.now(timezone.utc),
|
|
external_payment_hash=None,
|
|
)
|
|
)
|
|
await update_payment_status(leg.id, "skipped", None, reason[:512])
|
|
logger.info(
|
|
f"distribution: skipped {leg_type} leg " f"({amount_sats} sats) — {reason}"
|
|
)
|
|
|
|
|
|
def _resolve_partial_dispense_wire(
|
|
settlement: DcaSettlement, data: PartialDispenseData
|
|
) -> int:
|
|
if data.dispensed_sats is not None:
|
|
new_wire = int(data.dispensed_sats)
|
|
elif data.dispensed_fraction is not None:
|
|
new_wire = round(settlement.wire_sats * float(data.dispensed_fraction))
|
|
else:
|
|
raise ValueError("provide one of dispensed_sats or dispensed_fraction")
|
|
if new_wire < 0:
|
|
raise ValueError("partial dispense cannot be negative")
|
|
if new_wire > settlement.wire_sats:
|
|
raise ValueError(
|
|
f"partial dispense ({new_wire} sats) cannot exceed the original "
|
|
f"wire amount ({settlement.wire_sats} sats)"
|
|
)
|
|
return new_wire
|
|
|
|
|
|
def _build_partial_dispense_memo(
|
|
settlement: DcaSettlement,
|
|
data: PartialDispenseData,
|
|
*,
|
|
new_wire: int,
|
|
new_principal: int,
|
|
new_fee: int,
|
|
new_platform: int,
|
|
new_operator: int,
|
|
) -> str:
|
|
reason = (data.notes or "").strip() or "(no reason given)"
|
|
if data.dispensed_sats is not None:
|
|
adjust = f"dispensed_sats={data.dispensed_sats}"
|
|
else:
|
|
adjust = f"dispensed_fraction={data.dispensed_fraction}"
|
|
ts = datetime.now(timezone.utc).isoformat(timespec="seconds")
|
|
return (
|
|
f"[{ts}] partial dispense applied — {adjust}. "
|
|
f"Original wire={settlement.wire_sats} "
|
|
f"principal={settlement.principal_sats} "
|
|
f"fee={settlement.fee_sats} "
|
|
f"(super_fee={settlement.platform_fee_sats} "
|
|
f"operator_fee={settlement.operator_fee_sats}). "
|
|
f"New wire={new_wire} principal={new_principal} "
|
|
f"fee={new_fee} "
|
|
f"(super_fee={new_platform} operator_fee={new_operator}). "
|
|
f"Reason: {reason}"
|
|
)
|
|
|
|
|
|
async def settle_lp_balance(
    client: DcaClient, machine: Machine, data: SettleBalanceData
) -> DcaPayment:
    """Operator UX action — closes satmachineadmin#4.

    Settle an LP's remaining fiat balance from the operator's chosen funding
    wallet at the rate the operator specified. Records a leg_type='settlement'
    row that counts against the LP's balance summary (so a subsequent
    get_client_balance_summary reflects the new zero/reduced balance).

    Caller is responsible for verifying the operator owns both the client's
    machine and the funding wallet (API endpoint does this). The amount_fiat
    is capped at the LP's remaining balance — operators cannot accidentally
    over-pay via this path.

    The destination wallet is the LP's own `dca_lp.dca_wallet_id` — the
    operator can't redirect this; if the LP hasn't onboarded yet there's
    no destination and we refuse.

    Returns:
        The dca_payments row for the settlement leg: the row returned by
        ``update_payment_status`` when it yields one, otherwise the row as
        originally created.

    Raises:
        ValueError: if the LP has no dca_lp preferences, no balance summary,
            no remaining balance, the computed fiat or sat amount is not
            positive, or ``create_invoice`` returns no bolt11.
        Exception: any error raised while creating or paying the invoice is
            logged, recorded on the leg as 'failed', and re-raised.
    """
    prefs = await get_dca_lp(client.user_id)
    if prefs is None:
        raise ValueError(
            f"client {client.id} (user {client.user_id[:8]}...) has not "
            f"onboarded via satmachineclient — no DCA wallet configured"
        )
    summary = await get_client_balance_summary(client.id)
    if summary is None:
        raise ValueError(f"client {client.id} balance not available")
    remaining = float(summary.remaining_balance)
    if remaining <= 0:
        raise ValueError(f"client {client.id} has no remaining balance to settle")

    # Resolve fiat amount: explicit if given (capped at remaining), else full.
    requested = float(data.amount_fiat) if data.amount_fiat is not None else remaining
    amount_fiat = round(min(requested, remaining), 2)
    if amount_fiat <= 0:
        raise ValueError("computed settlement amount is zero")

    exchange_rate = float(data.exchange_rate)
    amount_sats = round(amount_fiat * exchange_rate)
    # Also rejects a zero or negative operator-supplied rate.
    if amount_sats <= 0:
        raise ValueError(
            f"computed sat amount is zero (amount_fiat={amount_fiat}, "
            f"exchange_rate={exchange_rate})"
        )

    reason = (data.notes or "").strip() or "(no reason given)"
    memo = (
        f"satmachine balance settle — {amount_fiat:.2f} "
        f"{machine.fiat_code} @ {exchange_rate:g} sat/{machine.fiat_code} "
        f"= {amount_sats} sats. Reason: {reason}"
    )

    # The audit row is written before any money moves so that a failure is
    # still visible in dca_payments.
    leg_row = await create_dca_payment(
        CreateDcaPaymentData(
            settlement_id=None,
            client_id=client.id,
            machine_id=machine.id,
            operator_user_id=machine.operator_user_id,
            leg_type="settlement",
            destination_wallet_id=prefs.dca_wallet_id,
            destination_ln_address=None,
            amount_sats=amount_sats,
            amount_fiat=amount_fiat,
            exchange_rate=exchange_rate,
            transaction_time=datetime.now(timezone.utc),
            external_payment_hash=None,
        )
    )
    extra = {
        "satmachine_leg": "settlement",
        "satmachine_client_id": client.id,
        "satmachine_machine_npub": machine.machine_npub,
        "satmachine_exchange_rate": exchange_rate,
    }
    try:
        new_invoice = await create_invoice(
            wallet_id=prefs.dca_wallet_id,
            amount=float(amount_sats),
            internal=True,
            memo=memo,
            extra=extra,
        )
        if not new_invoice or not new_invoice.bolt11:
            # The handler below records the 'failed' status with this exact
            # message; writing it here as well would just duplicate the
            # status update.
            raise ValueError("create_invoice returned empty")
        paid = await pay_invoice(
            wallet_id=data.funding_wallet_id,
            payment_request=new_invoice.bolt11,
            description=memo,
            tag=_payment_tag(machine),
            extra=extra,
        )
        completed = await update_payment_status(
            leg_row.id, "completed", paid.payment_hash, None
        )
        return completed if completed is not None else leg_row
    except Exception as exc:
        logger.error(
            f"distribution: balance-settle failed for client {client.id} "
            f"({amount_sats} sats from wallet {data.funding_wallet_id}): {exc}"
        )
        await update_payment_status(leg_row.id, "failed", None, str(exc)[:512])
        raise
|
|
|
|
|
|
async def apply_partial_dispense_and_redistribute(
    settlement_id: str, data: PartialDispenseData
) -> DcaSettlement:
    """Operator UX action — closes satmachineadmin#3.

    Used when a bitSpire dispense fails part-way (e.g. a dispenser jam after
    6 of 10 bills): the operator confirms what was actually dispensed and
    the split is re-allocated against that partial wire amount. All sat
    amounts scale linearly, so the original commission ratio is preserved.
    The super/operator split is re-derived from the *stored*
    platform_fee_sats / fee_sats ratio rather than the current
    super_fee_fraction — the absolute fields on the settlement are the
    source of truth even if super changed the global rate afterwards
    (closes #11 H6).

    Hard guard: refuses if any dca_payments leg has already completed,
    because Lightning payments can't be clawed back.

    Side effects, in order:
      - Voids the settlement's open legs (void_open_legs_for_settlement).
      - Overwrites the settlement's monetary fields with the new totals and
        appends a timestamped memo with the original values and the
        operator's reason (apply_partial_dispense).
      - Re-runs process_settlement on the settlement.

    Returns:
        The settlement as re-read after redistribution, or the row returned
        by apply_partial_dispense if the re-read yields nothing.

    Raises:
        ValueError: if the settlement is missing, has a non-positive wire
            amount, already has completed legs, the requested dispense is
            invalid, or the row vanishes during the update.
    """
    settlement = await get_settlement(settlement_id)
    if settlement is None:
        raise ValueError(f"settlement {settlement_id} not found")
    if settlement.wire_sats <= 0:
        raise ValueError("cannot partial-dispense a zero-wire settlement")

    completed_legs = await count_completed_legs_for_settlement(settlement_id)
    if completed_legs > 0:
        raise ValueError(
            f"cannot partial-dispense: {completed_legs} leg(s) already completed "
            "(Lightning payments can't be clawed back)"
        )

    new_wire = _resolve_partial_dispense_wire(settlement, data)

    # Scale fee and fiat by the dispensed share; the principal is whatever
    # is left of the new wire amount after the scaled fee.
    factor = new_wire / settlement.wire_sats
    new_fee = round(settlement.fee_sats * factor)
    new_principal = new_wire - new_fee
    new_fiat = round(float(settlement.fiat_amount) * factor, 2)

    # Stage-1 split uses the ratio locked in on this settlement row, NOT the
    # current super_fee_fraction. The platform share is clamped to
    # [0, new_fee] and the operator absorbs the rounding remainder, so
    # platform + operator == new_fee exactly.
    platform_share = (
        settlement.platform_fee_sats / settlement.fee_sats
        if settlement.fee_sats > 0
        else 0.0
    )
    new_platform = max(0, min(round(new_fee * platform_share), new_fee))
    new_operator = new_fee - new_platform

    # Build the memo before the row is overwritten so it captures the
    # original values.
    memo = _build_partial_dispense_memo(
        settlement,
        data,
        new_wire=new_wire,
        new_principal=new_principal,
        new_fee=new_fee,
        new_platform=new_platform,
        new_operator=new_operator,
    )

    await void_open_legs_for_settlement(settlement_id)
    rewritten = await apply_partial_dispense(
        settlement_id,
        new_wire_sats=new_wire,
        new_principal_sats=new_principal,
        new_fee_sats=new_fee,
        new_platform_fee_sats=new_platform,
        new_operator_fee_sats=new_operator,
        new_fiat_amount=new_fiat,
        appended_note=memo,
    )
    if rewritten is None:
        raise ValueError(f"settlement {settlement_id} disappeared mid-update")

    logger.info(
        f"distribution: partial-dispense applied to settlement "
        f"{settlement_id} — re-running distribution"
    )
    await process_settlement(settlement_id)

    refreshed = await get_settlement(settlement_id)
    if refreshed is None:
        return rewritten
    return refreshed
|
|
|
|
|
|
async def process_settlement(settlement_id: str) -> None:
    """Process a pending settlement end-to-end.

    Concurrency-safe: claim_settlement_for_processing is an optimistic-lock
    claim, so concurrent invocations on the same id can't both win — a
    caller that gets no settlement back returns without writing any legs.
    Retries land via reset_settlement_for_retry, which voids failed legs
    and flips 'errored' back to 'pending'.

    Legs run in order: super fee, operator splits, then DCA. The DCA leg is
    only paid for ``tx_type == "cash_out"``; any other tx_type gets a
    'skipped' audit row instead. The settlement ends as 'processed' when no
    leg reported an error, otherwise 'errored' with the joined messages
    (truncated to 512 characters).
    """
    settlement = await claim_settlement_for_processing(settlement_id)
    if settlement is None:
        # Already claimed by a concurrent invocation, or not in a 'pending'
        # state — nothing to do in either case.
        logger.debug(
            f"distribution: skip {settlement_id} — not claimable (already "
            "processing or not pending)"
        )
        return

    machine = await get_machine(settlement.machine_id)
    if machine is None:
        logger.error(
            f"distribution: settlement {settlement_id} references missing "
            f"machine {settlement.machine_id}"
        )
        await mark_settlement_status(settlement_id, "errored", "machine missing")
        return

    super_config = await get_super_config()
    leg_errors: List[str] = []

    try:
        await _pay_super_fee(settlement, machine, super_config, leg_errors)
        await _pay_operator_splits(settlement, machine, leg_errors)
        # DCA only applies to cash_out (LPs share the principal the customer
        # paid into BTC). For cash_in the liquidity is coming IN to the
        # operator's wallet rather than going OUT to LPs, so the leg is
        # recorded as skipped — the dashboard then shows "DCA intentionally
        # skipped" instead of a phantom missing leg.
        # See aiolabs/satmachineadmin#22 (S8 — wire cash-in path).
        if settlement.tx_type != "cash_out":
            skip_reason = (
                f"DCA distribution does not apply to tx_type="
                f"{settlement.tx_type!r}; principal stays in the "
                "operator's wallet as liquidity received from the "
                "cash-in customer."
            )
            await _record_skipped_leg(
                settlement,
                machine,
                leg_type="dca",
                amount_sats=settlement.principal_sats,
                reason=skip_reason,
            )
        else:
            await _pay_dca_distributions(settlement, machine, leg_errors)
    except Exception as exc:  # last-resort guard
        logger.exception("distribution: unexpected error processing settlement")
        leg_errors.append(f"unexpected: {exc}")

    if not leg_errors:
        await mark_settlement_status(settlement_id, "processed", None)
        return
    await mark_settlement_status(
        settlement_id, "errored", "; ".join(leg_errors)[:512]
    )
|
|
|
|
|
|
# =============================================================================
|
|
# Leg 1 — super fee
|
|
# =============================================================================
|
|
|
|
|
|
async def _pay_super_fee(
|
|
settlement: DcaSettlement,
|
|
machine: Machine,
|
|
super_config: SuperConfig | None,
|
|
errors: List[str],
|
|
) -> None:
|
|
if settlement.platform_fee_sats <= 0:
|
|
return
|
|
if super_config is None or not super_config.super_fee_wallet_id:
|
|
# Super has configured a fee but not a destination wallet — leave
|
|
# the sats in the machine wallet and record a skipped audit row.
|
|
# The super needs to configure their wallet before they can collect.
|
|
await _record_skipped_leg(
|
|
settlement,
|
|
machine,
|
|
leg_type="super_fee",
|
|
amount_sats=settlement.platform_fee_sats,
|
|
reason="super_fee_wallet_id not configured by LNbits super",
|
|
)
|
|
return
|
|
await _pay_internal(
|
|
settlement=settlement,
|
|
machine=machine,
|
|
leg_type="super_fee",
|
|
client_id=None,
|
|
destination_wallet_id=super_config.super_fee_wallet_id,
|
|
amount_sats=settlement.platform_fee_sats,
|
|
memo=f"satmachine super fee — {machine.name or machine.machine_npub[:12]}",
|
|
errors=errors,
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Leg 2 — operator commission splits
|
|
# =============================================================================
|
|
|
|
|
|
async def _pay_operator_splits(
|
|
settlement: DcaSettlement,
|
|
machine: Machine,
|
|
errors: List[str],
|
|
) -> None:
|
|
if settlement.operator_fee_sats <= 0:
|
|
return
|
|
splits = await get_effective_commission_splits(machine.operator_user_id, machine.id)
|
|
if not splits:
|
|
await _record_skipped_leg(
|
|
settlement,
|
|
machine,
|
|
leg_type="operator_split",
|
|
amount_sats=settlement.operator_fee_sats,
|
|
reason=(
|
|
"operator has no commission_splits ruleset for this machine "
|
|
"(neither per-machine override nor operator default)"
|
|
),
|
|
)
|
|
return
|
|
# Pure allocator handles the rounding rule (last leg absorbs remainder).
|
|
leg_amounts = allocate_operator_split_legs(
|
|
settlement.operator_fee_sats,
|
|
[float(leg.fraction) for leg in splits],
|
|
)
|
|
for idx, (leg, amount) in enumerate(zip(splits, leg_amounts, strict=True)):
|
|
if amount <= 0:
|
|
continue
|
|
label = leg.label or f"split-{idx + 1}"
|
|
memo = (
|
|
f"satmachine operator split — "
|
|
f"{machine.name or machine.machine_npub[:12]} ({label})"
|
|
)
|
|
await _pay_split_leg(
|
|
settlement=settlement,
|
|
machine=machine,
|
|
target=leg.target,
|
|
amount_sats=amount,
|
|
memo=memo,
|
|
errors=errors,
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Leg 3 — DCA distribution to active LPs
|
|
# =============================================================================
|
|
|
|
|
|
async def _pay_dca_distributions(
|
|
settlement: DcaSettlement,
|
|
machine: Machine,
|
|
errors: List[str],
|
|
) -> None:
|
|
if settlement.principal_sats <= 0:
|
|
return
|
|
if settlement.exchange_rate <= 0:
|
|
# Fallback path with no exchange rate (bitSpire Payment.extra absent).
|
|
# Without a rate we can't compute fiat balances → can't compute
|
|
# proportional shares → leave principal_sats in the machine wallet
|
|
# for manual reconciliation. Audit row makes the strand visible.
|
|
await _record_skipped_leg(
|
|
settlement,
|
|
machine,
|
|
leg_type="dca",
|
|
amount_sats=settlement.principal_sats,
|
|
reason=(
|
|
"no exchange_rate on settlement (bitSpire fallback path; "
|
|
"see aiolabs/lamassu-next#44)"
|
|
),
|
|
)
|
|
return
|
|
clients = await get_flow_mode_clients_for_machine(machine.id)
|
|
if not clients:
|
|
await _record_skipped_leg(
|
|
settlement,
|
|
machine,
|
|
leg_type="dca",
|
|
amount_sats=settlement.principal_sats,
|
|
reason="no active flow-mode LPs registered at this machine",
|
|
)
|
|
return
|
|
# Build {client_id: remaining_fiat_balance} for proportional allocation.
|
|
client_balances: dict[str, float] = {}
|
|
for client in clients:
|
|
summary = await get_client_balance_summary(client.id)
|
|
if summary is None or summary.remaining_balance <= 0:
|
|
continue
|
|
client_balances[client.id] = summary.remaining_balance
|
|
if not client_balances:
|
|
await _record_skipped_leg(
|
|
settlement,
|
|
machine,
|
|
leg_type="dca",
|
|
amount_sats=settlement.principal_sats,
|
|
reason=(
|
|
"no LP has remaining-fiat-balance > 0 — all confirmed deposits "
|
|
"already paid out"
|
|
),
|
|
)
|
|
return
|
|
# Compute proportional sat allocations, then cap each at the client's
|
|
# remaining-fiat-balance-in-sats (the v1 sync-mismatch safeguard).
|
|
raw_allocations = calculate_distribution(
|
|
base_amount_sats=settlement.principal_sats,
|
|
client_balances=client_balances,
|
|
)
|
|
capped_allocations: dict[str, int] = {}
|
|
for client_id, raw_sats in raw_allocations.items():
|
|
remaining_fiat = client_balances[client_id]
|
|
cap_sats = int(remaining_fiat * float(settlement.exchange_rate))
|
|
capped_allocations[client_id] = min(raw_sats, cap_sats)
|
|
client_by_id = {c.id: c for c in clients}
|
|
for client_id, amount_sats in capped_allocations.items():
|
|
await _pay_one_dca_leg(
|
|
settlement, machine, client_by_id[client_id], amount_sats, errors
|
|
)
|
|
|
|
|
|
async def _pay_one_dca_leg(
|
|
settlement: DcaSettlement,
|
|
machine: Machine,
|
|
client: DcaClient,
|
|
amount_sats: int,
|
|
errors: List[str],
|
|
) -> None:
|
|
"""Pay a single DCA leg + best-effort autoforward.
|
|
|
|
Reads the LP's destination wallet + autoforward config from `dca_lp`.
|
|
Callers reach this through `get_flow_mode_clients_for_machine` which
|
|
INNER JOINs on `dca_lp`, so a `prefs is None` here would indicate a
|
|
race (LP deleted their dca_lp row between query and pay) — we
|
|
defensively skip.
|
|
"""
|
|
if amount_sats <= 0:
|
|
return
|
|
prefs = await get_dca_lp(client.user_id)
|
|
if prefs is None:
|
|
errors.append(f"client {client.id}: dca_lp row disappeared mid-distribution")
|
|
return
|
|
amount_fiat = round(amount_sats / float(settlement.exchange_rate), 2)
|
|
memo = f"DCA: {amount_sats} sats • {amount_fiat:.2f} {settlement.fiat_code}"
|
|
dca_leg = await _pay_internal(
|
|
settlement=settlement,
|
|
machine=machine,
|
|
leg_type="dca",
|
|
client_id=client.id,
|
|
destination_wallet_id=prefs.dca_wallet_id,
|
|
amount_sats=amount_sats,
|
|
amount_fiat=amount_fiat,
|
|
exchange_rate=float(settlement.exchange_rate),
|
|
memo=memo,
|
|
errors=errors,
|
|
)
|
|
# Best-effort auto-forward to LP's external LN address (closes
|
|
# satmachineadmin#8). Skip if the DCA leg failed (nothing to forward).
|
|
# If autoforward fails, sats stay in the LP's LNbits wallet — the
|
|
# explicit safety constraint.
|
|
if (
|
|
dca_leg is not None
|
|
and dca_leg.status == "completed"
|
|
and prefs.autoforward_enabled
|
|
and prefs.autoforward_ln_address
|
|
):
|
|
await _attempt_autoforward(client, prefs, machine, settlement, amount_sats)
|
|
|
|
|
|
# =============================================================================
|
|
# Autoforward, split-leg, and internal transfer helpers
|
|
# =============================================================================
|
|
|
|
|
|
async def _attempt_autoforward(
|
|
client: DcaClient,
|
|
prefs: DcaLpPreferences,
|
|
machine: Machine,
|
|
settlement: DcaSettlement,
|
|
amount_sats: int,
|
|
) -> None:
|
|
"""LP auto-forward (best-effort) — closes satmachineadmin#8.
|
|
|
|
Resolves the LP's configured LN address, requests a bolt11 invoice for
|
|
the DCA leg's sat amount, and pays it from the LP's LNbits wallet. Each
|
|
attempt records a dca_payments row with leg_type='autoforward' for
|
|
audit, regardless of outcome.
|
|
|
|
Safety: on any failure (malformed address, LNURL resolution fail,
|
|
payment timeout, etc.) we log a warning and leave the sats in the LP's
|
|
LNbits wallet. The LP can move them manually via the LNbits UI. We
|
|
never re-raise; failed forwarding must not block subsequent legs.
|
|
"""
|
|
address = prefs.autoforward_ln_address
|
|
if not address:
|
|
return
|
|
leg = await create_dca_payment(
|
|
CreateDcaPaymentData(
|
|
settlement_id=settlement.id,
|
|
client_id=client.id,
|
|
machine_id=machine.id,
|
|
operator_user_id=machine.operator_user_id,
|
|
leg_type="autoforward",
|
|
destination_wallet_id=None,
|
|
destination_ln_address=address,
|
|
amount_sats=amount_sats,
|
|
amount_fiat=None,
|
|
exchange_rate=None,
|
|
transaction_time=datetime.now(timezone.utc),
|
|
external_payment_hash=None,
|
|
)
|
|
)
|
|
try:
|
|
lnaddr = LnAddress(address)
|
|
bolt11 = await get_pr_from_lnurl(
|
|
lnurl=lnaddr,
|
|
amount_msat=amount_sats * 1000,
|
|
comment=f"satmachine autoforward — {machine.machine_npub[:12]}",
|
|
)
|
|
paid = await pay_invoice(
|
|
wallet_id=prefs.dca_wallet_id,
|
|
payment_request=bolt11,
|
|
description=f"satmachine autoforward → {address}",
|
|
tag=_payment_tag(machine),
|
|
extra={
|
|
"satmachine_leg": "autoforward",
|
|
"satmachine_settlement_id": settlement.id,
|
|
"satmachine_machine_npub": machine.machine_npub,
|
|
"satmachine_destination": address,
|
|
},
|
|
)
|
|
await update_payment_status(leg.id, "completed", paid.payment_hash, None)
|
|
logger.info(
|
|
f"distribution: autoforward {amount_sats} sats from client "
|
|
f"{client.id} → {address} OK"
|
|
)
|
|
except Exception as exc:
|
|
logger.warning(
|
|
f"distribution: autoforward FAILED for client {client.id} "
|
|
f"→ {address}: {exc}. Sats stay in LP's LNbits wallet."
|
|
)
|
|
await update_payment_status(leg.id, "failed", None, str(exc)[:512])
|
|
|
|
|
|
async def _pay_split_leg(
    *,
    settlement: DcaSettlement,
    machine: Machine,
    target: str,
    amount_sats: int,
    memo: str,
    errors: List[str],
) -> Optional[DcaPayment]:
    """Pay a commission-split leg to an arbitrary target.

    `target` accepts (splitpayments pattern):
      - Lightning address (user@domain) — resolved via LNURL-pay
      - LNURL string (LNURL...) — resolved via LNURL-pay
      - LNbits wallet invoice key — resolved via get_wallet_for_key,
        then internal create_invoice + pay
      - LNbits wallet id — direct internal create_invoice + pay

    Records a dca_payments row regardless of outcome (success → 'completed',
    failure → 'failed'); operator sees the row in audit either way. Failures
    are appended to ``errors`` rather than raised.

    Returns:
        The leg's dca_payments row. For external targets this is the row
        returned by ``update_payment_status`` (so its status reflects the
        outcome), falling back to the row as created if that returns
        nothing. Internal targets return whatever ``_pay_internal`` returns.
    """
    target = (target or "").strip()

    # External target: Lightning address or LNURL.
    if "@" in target or target.upper().startswith("LNURL"):
        leg_row = await create_dca_payment(
            CreateDcaPaymentData(
                settlement_id=settlement.id,
                client_id=None,
                machine_id=machine.id,
                operator_user_id=machine.operator_user_id,
                leg_type="operator_split",
                destination_wallet_id=None,
                destination_ln_address=target,
                amount_sats=amount_sats,
                amount_fiat=None,
                exchange_rate=None,
                transaction_time=datetime.now(timezone.utc),
                external_payment_hash=None,
            )
        )
        extra = {
            "satmachine_leg": "operator_split",
            "satmachine_settlement_id": settlement.id,
            "satmachine_machine_npub": machine.machine_npub,
            "satmachine_destination": target,
        }
        try:
            ln_target = LnAddress(target) if "@" in target else target
            bolt11 = await get_pr_from_lnurl(
                lnurl=ln_target,
                amount_msat=amount_sats * 1000,  # LNURL-pay takes millisats
                comment=memo,
            )
            paid = await pay_invoice(
                wallet_id=machine.wallet_id,
                payment_request=bolt11,
                description=memo,
                tag=_payment_tag(machine),
                extra=extra,
            )
            completed = await update_payment_status(
                leg_row.id, "completed", paid.payment_hash, None
            )
            # Hand back the refreshed row so callers see the final status,
            # not the snapshot taken before the payment.
            return completed if completed is not None else leg_row
        except Exception as exc:
            logger.error(
                f"distribution: operator_split (LNURL/LN-addr) FAILED "
                f"target={target} settlement={settlement.id}: {exc}"
            )
            failed = await update_payment_status(
                leg_row.id, "failed", None, str(exc)[:512]
            )
            errors.append(f"operator_split→{target}: {exc}")
            return failed if failed is not None else leg_row

    # Internal LNbits target: try as invoice key first, fall back to wallet id.
    resolved_wallet_id = target
    try:
        from lnbits.core.crud.wallets import get_wallet_for_key
    except ImportError:
        # This LNbits version doesn't expose get_wallet_for_key at that
        # path — treat target as a wallet id directly.
        logger.debug(
            "distribution: get_wallet_for_key not importable; "
            "treating operator_split target as a wallet id"
        )
    else:
        try:
            wallet = await get_wallet_for_key(target)
            if wallet is not None:
                resolved_wallet_id = wallet.id
        except Exception as exc:
            # Deliberate best-effort: a failed key lookup still falls back
            # to using target as a wallet id, but is no longer silent.
            logger.debug(
                f"distribution: invoice-key lookup failed for operator_split "
                f"target; treating it as a wallet id: {exc}"
            )

    return await _pay_internal(
        settlement=settlement,
        machine=machine,
        leg_type="operator_split",
        client_id=None,
        destination_wallet_id=resolved_wallet_id,
        amount_sats=amount_sats,
        memo=memo,
        errors=errors,
    )
|
|
|
|
|
|
async def _pay_internal(
    *,
    settlement: DcaSettlement,
    machine: Machine,
    leg_type: str,
    client_id: str | None,
    destination_wallet_id: str,
    amount_sats: int,
    memo: str,
    errors: List[str],
    amount_fiat: float | None = None,
    exchange_rate: float | None = None,
) -> DcaPayment | None:
    """Pay one leg as an LNbits internal transfer and record it.

    Creates an internal invoice on ``destination_wallet_id``, pays it from
    the machine's wallet, and tracks the attempt in a dca_payments row that
    is created up front and then marked 'completed' or 'failed'. Failures
    are appended to ``errors`` rather than raised, so the caller can carry
    on with the remaining legs.

    Returns:
        The leg's dca_payments row in both the success and the failure
        case (the failed row stays for audit). The row returned by
        ``update_payment_status`` is preferred so that ``status`` reflects
        the outcome; the row as created is the fallback if that returns
        nothing.
    """
    tag = _payment_tag(machine)
    leg_row = await create_dca_payment(
        CreateDcaPaymentData(
            settlement_id=settlement.id,
            client_id=client_id,
            machine_id=machine.id,
            operator_user_id=machine.operator_user_id,
            leg_type=leg_type,
            destination_wallet_id=destination_wallet_id,
            destination_ln_address=None,
            amount_sats=amount_sats,
            amount_fiat=amount_fiat,
            exchange_rate=exchange_rate,
            # Timezone-aware UTC, matching every other leg writer in this
            # module (a naive local timestamp here would not line up).
            transaction_time=datetime.now(timezone.utc),
            external_payment_hash=None,
        )
    )
    extra = {
        "satmachine_leg": leg_type,
        "satmachine_settlement_id": settlement.id,
        "satmachine_machine_npub": machine.machine_npub,
    }
    try:
        new_invoice = await create_invoice(
            wallet_id=destination_wallet_id,
            amount=float(amount_sats),
            internal=True,
            memo=memo,
            extra=extra,
        )
        if not new_invoice or not new_invoice.bolt11:
            failed = await update_payment_status(
                leg_row.id, "failed", None, "create_invoice returned empty"
            )
            errors.append(f"{leg_type}: create_invoice empty")
            return failed if failed is not None else leg_row
        paid = await pay_invoice(
            wallet_id=machine.wallet_id,
            payment_request=new_invoice.bolt11,
            description=memo,
            tag=tag,
            extra=extra,
        )
        completed = await update_payment_status(
            leg_row.id, "completed", paid.payment_hash, None
        )
        # Callers inspect the returned row's status (the DCA leg gates
        # autoforward on status == "completed"), so return the refreshed
        # row rather than the snapshot taken before the payment.
        return completed if completed is not None else leg_row
    except Exception as exc:
        logger.error(
            f"distribution: {leg_type} leg failed "
            f"(settlement={settlement.id} amount={amount_sats}): {exc}"
        )
        failed = await update_payment_status(
            leg_row.id, "failed", None, str(exc)[:512]
        )
        errors.append(f"{leg_type}: {exc}")
        return failed if failed is not None else leg_row
|