Compare commits
No commits in common. "3ede66ff92f0d63431d1f74a4417db6f38ed4f22" and "937749f149ebbffd7f6dac60d1a969850c154e42" have entirely different histories.
3ede66ff92
...
937749f149
9 changed files with 62 additions and 2006 deletions
176
bitspire.py
176
bitspire.py
|
|
@ -1,176 +0,0 @@
|
||||||
# Satoshi Machine v2 — bitSpire payment parser.
|
|
||||||
#
|
|
||||||
# Translates an inbound LNbits Payment (cash-out customer paid the ATM's
|
|
||||||
# invoice) into the principal/commission split needed by satmachineadmin.
|
|
||||||
#
|
|
||||||
# Happy path: bitSpire populates Payment.extra with the canonical split
|
|
||||||
# fields per aiolabs/lamassu-next#44 — we read them directly.
|
|
||||||
#
|
|
||||||
# Fallback path: extra is missing (older bitSpire, edge case). We back-derive
|
|
||||||
# the split from the machine's fallback_commission_pct using the Lamassu-era
|
|
||||||
# formula (base = total / (1 + commission)) and mark used_fallback_split=true
|
|
||||||
# so the audit trail shows we estimated.
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import json
|
|
||||||
from typing import Any, Optional, Tuple
|
|
||||||
|
|
||||||
from loguru import logger
|
|
||||||
|
|
||||||
from .calculations import calculate_commission
|
|
||||||
from .models import CreateDcaSettlementData, Machine
|
|
||||||
|
|
||||||
# Sentinel value bitSpire sets in Payment.extra.source so we know an inbound
|
|
||||||
# payment originated from an ATM cash-out and not some other extension or
|
|
||||||
# customer-initiated transfer.
|
|
||||||
BITSPIRE_SOURCE = "bitspire"
|
|
||||||
|
|
||||||
|
|
||||||
def _coerce_int(v: Any) -> Optional[int]:
|
|
||||||
if v is None:
|
|
||||||
return None
|
|
||||||
try:
|
|
||||||
return int(v)
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def _coerce_float(v: Any) -> Optional[float]:
|
|
||||||
if v is None:
|
|
||||||
return None
|
|
||||||
try:
|
|
||||||
return float(v)
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def _coerce_str(v: Any) -> Optional[str]:
|
|
||||||
if v is None:
|
|
||||||
return None
|
|
||||||
return str(v) if not isinstance(v, str) else v
|
|
||||||
|
|
||||||
|
|
||||||
def _json_dumps(v: Any) -> Optional[str]:
|
|
||||||
if v is None:
|
|
||||||
return None
|
|
||||||
try:
|
|
||||||
return json.dumps(v)
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def is_bitspire_payment(extra: dict) -> bool:
|
|
||||||
"""True if Payment.extra carries the bitSpire source marker (post-#44)."""
|
|
||||||
return isinstance(extra, dict) and extra.get("source") == BITSPIRE_SOURCE
|
|
||||||
|
|
||||||
|
|
||||||
def parse_settlement(
|
|
||||||
machine: Machine,
|
|
||||||
payment_hash: str,
|
|
||||||
gross_sats: int,
|
|
||||||
extra: dict,
|
|
||||||
super_fee_pct: float,
|
|
||||||
) -> Tuple[CreateDcaSettlementData, bool]:
|
|
||||||
"""Build a CreateDcaSettlementData for an inbound payment landing on
|
|
||||||
`machine`'s wallet.
|
|
||||||
|
|
||||||
Returns (data, used_fallback): when `used_fallback` is True, bitSpire
|
|
||||||
didn't populate Payment.extra so we back-derived the split. Caller
|
|
||||||
should log this for visibility — once aiolabs/lamassu-next#44 ships,
|
|
||||||
fallback usage should drop to zero.
|
|
||||||
"""
|
|
||||||
if is_bitspire_payment(extra):
|
|
||||||
data = _parse_extra(machine, payment_hash, gross_sats, extra, super_fee_pct)
|
|
||||||
return data, False
|
|
||||||
logger.warning(
|
|
||||||
f"satmachineadmin: settlement on machine {machine.machine_npub[:12]}... "
|
|
||||||
f"missing bitSpire extra metadata; back-deriving via "
|
|
||||||
f"fallback_commission_pct={machine.fallback_commission_pct}. "
|
|
||||||
f"See aiolabs/lamassu-next#44."
|
|
||||||
)
|
|
||||||
return _parse_fallback(machine, payment_hash, gross_sats, super_fee_pct), True
|
|
||||||
|
|
||||||
|
|
||||||
def _parse_extra(
|
|
||||||
machine: Machine,
|
|
||||||
payment_hash: str,
|
|
||||||
gross_sats: int,
|
|
||||||
extra: dict,
|
|
||||||
super_fee_pct: float,
|
|
||||||
) -> CreateDcaSettlementData:
|
|
||||||
"""Happy path: bitSpire populated Payment.extra per lamassu-next#44."""
|
|
||||||
net_sats = _coerce_int(extra.get("net_sats"))
|
|
||||||
fee_sats = _coerce_int(extra.get("fee_sats"))
|
|
||||||
if net_sats is None or fee_sats is None:
|
|
||||||
# Missing key fields — shouldn't happen post-#44 but defensive.
|
|
||||||
return _parse_fallback(machine, payment_hash, gross_sats, super_fee_pct)
|
|
||||||
commission_sats = fee_sats
|
|
||||||
platform_fee_sats = round(commission_sats * super_fee_pct)
|
|
||||||
operator_fee_sats = commission_sats - platform_fee_sats
|
|
||||||
exchange_rate = _coerce_float(extra.get("exchange_rate"))
|
|
||||||
if exchange_rate is None or exchange_rate <= 0:
|
|
||||||
# Without exchange rate we can't compute fiat. Use 1.0 as a stand-in
|
|
||||||
# and let the operator correct via manual reconciliation.
|
|
||||||
exchange_rate = 1.0
|
|
||||||
fiat_amount = round(gross_sats / exchange_rate, 2) if exchange_rate > 0 else 0.0
|
|
||||||
fiat_code = _coerce_str(extra.get("currency")) or machine.fiat_code
|
|
||||||
return CreateDcaSettlementData(
|
|
||||||
machine_id=machine.id,
|
|
||||||
payment_hash=payment_hash,
|
|
||||||
bitspire_event_id=None,
|
|
||||||
bitspire_txid=_coerce_str(extra.get("txid")),
|
|
||||||
gross_sats=gross_sats,
|
|
||||||
fiat_amount=fiat_amount,
|
|
||||||
fiat_code=fiat_code,
|
|
||||||
exchange_rate=exchange_rate,
|
|
||||||
net_sats=net_sats,
|
|
||||||
commission_sats=commission_sats,
|
|
||||||
platform_fee_sats=platform_fee_sats,
|
|
||||||
operator_fee_sats=operator_fee_sats,
|
|
||||||
used_fallback_split=False,
|
|
||||||
tx_type=_coerce_str(extra.get("type")) or "cash_out",
|
|
||||||
bills_json=_json_dumps(extra.get("bills")),
|
|
||||||
cassettes_json=_json_dumps(extra.get("cassettes")),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _parse_fallback(
|
|
||||||
machine: Machine,
|
|
||||||
payment_hash: str,
|
|
||||||
gross_sats: int,
|
|
||||||
super_fee_pct: float,
|
|
||||||
) -> CreateDcaSettlementData:
|
|
||||||
"""Back-derive the split using the machine's fallback_commission_pct.
|
|
||||||
|
|
||||||
Same formula as the Lamassu integration used:
|
|
||||||
base_amount = round(gross / (1 + commission_pct))
|
|
||||||
commission = gross - base_amount
|
|
||||||
"""
|
|
||||||
net_sats, commission_sats, _effective = calculate_commission(
|
|
||||||
crypto_atoms=gross_sats,
|
|
||||||
commission_percentage=machine.fallback_commission_pct,
|
|
||||||
discount=0.0,
|
|
||||||
)
|
|
||||||
platform_fee_sats = round(commission_sats * super_fee_pct)
|
|
||||||
operator_fee_sats = commission_sats - platform_fee_sats
|
|
||||||
# No exchange rate from the wire; leave fiat_amount=0 so it's visibly
|
|
||||||
# incomplete on the operator's reconciliation screen.
|
|
||||||
return CreateDcaSettlementData(
|
|
||||||
machine_id=machine.id,
|
|
||||||
payment_hash=payment_hash,
|
|
||||||
bitspire_event_id=None,
|
|
||||||
bitspire_txid=None,
|
|
||||||
gross_sats=gross_sats,
|
|
||||||
fiat_amount=0.0,
|
|
||||||
fiat_code=machine.fiat_code,
|
|
||||||
exchange_rate=0.0,
|
|
||||||
net_sats=net_sats,
|
|
||||||
commission_sats=commission_sats,
|
|
||||||
platform_fee_sats=platform_fee_sats,
|
|
||||||
operator_fee_sats=operator_fee_sats,
|
|
||||||
used_fallback_split=True,
|
|
||||||
tx_type="cash_out",
|
|
||||||
bills_json=None,
|
|
||||||
cassettes_json=None,
|
|
||||||
)
|
|
||||||
|
|
@ -131,70 +131,6 @@ def calculate_distribution(
|
||||||
return distributions
|
return distributions
|
||||||
|
|
||||||
|
|
||||||
def split_two_stage_commission(
|
|
||||||
commission_sats: int, super_fee_pct: float
|
|
||||||
) -> Tuple[int, int]:
|
|
||||||
"""Stage-1 of the v2 commission split: super takes `super_fee_pct` of the
|
|
||||||
total commission; the remainder is what the operator's own ruleset acts on.
|
|
||||||
|
|
||||||
Returns (platform_fee_sats, operator_fee_sats). Platform is rounded;
|
|
||||||
operator absorbs the rounding remainder so platform_fee + operator_fee
|
|
||||||
== commission_sats exactly.
|
|
||||||
|
|
||||||
Examples:
|
|
||||||
>>> split_two_stage_commission(100, 0.30)
|
|
||||||
(30, 70)
|
|
||||||
>>> split_two_stage_commission(7965, 0.30)
|
|
||||||
(2390, 5575)
|
|
||||||
>>> split_two_stage_commission(100, 0.0)
|
|
||||||
(0, 100)
|
|
||||||
>>> split_two_stage_commission(100, 1.0)
|
|
||||||
(100, 0)
|
|
||||||
"""
|
|
||||||
if commission_sats <= 0:
|
|
||||||
return 0, 0
|
|
||||||
platform = round(commission_sats * super_fee_pct)
|
|
||||||
platform = max(0, min(platform, commission_sats))
|
|
||||||
operator = commission_sats - platform
|
|
||||||
return platform, operator
|
|
||||||
|
|
||||||
|
|
||||||
def allocate_operator_split_legs(
|
|
||||||
operator_fee_sats: int, leg_pcts: list
|
|
||||||
) -> list:
|
|
||||||
"""Stage-2 of the v2 commission split: the operator's remainder is sliced
|
|
||||||
across N leg wallets per `leg_pcts` (each in 0..1, sum should equal 1.0).
|
|
||||||
|
|
||||||
The last leg absorbs the rounding remainder so the sum of allocations
|
|
||||||
exactly equals operator_fee_sats (assuming pcts sum to ~1.0). Returns
|
|
||||||
a list of integer sat amounts in the same order as leg_pcts.
|
|
||||||
|
|
||||||
Examples:
|
|
||||||
>>> allocate_operator_split_legs(70, [0.5, 0.3, 0.2])
|
|
||||||
[35, 21, 14]
|
|
||||||
>>> allocate_operator_split_legs(5575, [0.5, 0.3, 0.2])
|
|
||||||
[2787, 1672, 1116]
|
|
||||||
>>> allocate_operator_split_legs(100, [1.0])
|
|
||||||
[100]
|
|
||||||
>>> allocate_operator_split_legs(0, [0.5, 0.5])
|
|
||||||
[0, 0]
|
|
||||||
"""
|
|
||||||
if not leg_pcts:
|
|
||||||
return []
|
|
||||||
if operator_fee_sats <= 0:
|
|
||||||
return [0] * len(leg_pcts)
|
|
||||||
allocations: list = []
|
|
||||||
remaining = operator_fee_sats
|
|
||||||
for idx, pct in enumerate(leg_pcts):
|
|
||||||
if idx == len(leg_pcts) - 1:
|
|
||||||
allocations.append(remaining)
|
|
||||||
else:
|
|
||||||
amount = round(operator_fee_sats * float(pct))
|
|
||||||
allocations.append(amount)
|
|
||||||
remaining -= amount
|
|
||||||
return allocations
|
|
||||||
|
|
||||||
|
|
||||||
def calculate_exchange_rate(base_crypto_atoms: int, fiat_amount: float) -> float:
|
def calculate_exchange_rate(base_crypto_atoms: int, fiat_amount: float) -> float:
|
||||||
"""
|
"""
|
||||||
Calculate exchange rate in sats per fiat unit.
|
Calculate exchange rate in sats per fiat unit.
|
||||||
|
|
|
||||||
212
crud.py
212
crud.py
|
|
@ -118,19 +118,6 @@ async def get_machine_by_npub(machine_npub: str) -> Optional[Machine]:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def get_active_machine_by_wallet_id(wallet_id: str) -> Optional[Machine]:
|
|
||||||
"""Used by the invoice listener to route an incoming payment to a machine."""
|
|
||||||
return await db.fetchone(
|
|
||||||
"""
|
|
||||||
SELECT * FROM satoshimachine.dca_machines
|
|
||||||
WHERE wallet_id = :wid AND is_active = true
|
|
||||||
LIMIT 1
|
|
||||||
""",
|
|
||||||
{"wid": wallet_id},
|
|
||||||
Machine,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def get_machines_for_operator(operator_user_id: str) -> List[Machine]:
|
async def get_machines_for_operator(operator_user_id: str) -> List[Machine]:
|
||||||
return await db.fetchall(
|
return await db.fetchall(
|
||||||
"""
|
"""
|
||||||
|
|
@ -423,24 +410,24 @@ async def delete_deposit(deposit_id: str) -> None:
|
||||||
async def create_settlement_idempotent(
|
async def create_settlement_idempotent(
|
||||||
data: CreateDcaSettlementData,
|
data: CreateDcaSettlementData,
|
||||||
) -> Optional[DcaSettlement]:
|
) -> Optional[DcaSettlement]:
|
||||||
"""Insert a settlement keyed by payment_hash. Returns the inserted row on
|
"""Insert a settlement keyed by bitspire_event_id. Returns the inserted row
|
||||||
first sight; returns the existing row if the payment_hash was already seen
|
on first sight; returns the existing row if the event_id was already seen
|
||||||
(subscription replay, dispatcher double-fire). The UNIQUE constraint on
|
(subscription replay, relay double-delivery). The UNIQUE constraint on
|
||||||
payment_hash is the source of truth."""
|
bitspire_event_id is the source of truth."""
|
||||||
existing = await get_settlement_by_payment_hash(data.payment_hash)
|
existing = await get_settlement_by_event_id(data.bitspire_event_id)
|
||||||
if existing is not None:
|
if existing is not None:
|
||||||
return existing
|
return existing
|
||||||
settlement_id = urlsafe_short_hash()
|
settlement_id = urlsafe_short_hash()
|
||||||
await db.execute(
|
await db.execute(
|
||||||
"""
|
"""
|
||||||
INSERT INTO satoshimachine.dca_settlements
|
INSERT INTO satoshimachine.dca_settlements
|
||||||
(id, machine_id, payment_hash, bitspire_event_id, bitspire_txid,
|
(id, machine_id, bitspire_event_id, bitspire_txid, payment_hash,
|
||||||
gross_sats, fiat_amount, fiat_code, exchange_rate, net_sats,
|
gross_sats, fiat_amount, fiat_code, exchange_rate, net_sats,
|
||||||
commission_sats, platform_fee_sats, operator_fee_sats,
|
commission_sats, platform_fee_sats, operator_fee_sats,
|
||||||
used_fallback_split, tx_type, bills_json, cassettes_json,
|
used_fallback_split, tx_type, bills_json, cassettes_json,
|
||||||
status, created_at)
|
status, created_at)
|
||||||
VALUES (:id, :machine_id, :payment_hash, :bitspire_event_id,
|
VALUES (:id, :machine_id, :bitspire_event_id, :bitspire_txid,
|
||||||
:bitspire_txid, :gross_sats, :fiat_amount, :fiat_code,
|
:payment_hash, :gross_sats, :fiat_amount, :fiat_code,
|
||||||
:exchange_rate, :net_sats, :commission_sats,
|
:exchange_rate, :net_sats, :commission_sats,
|
||||||
:platform_fee_sats, :operator_fee_sats, :used_fallback_split,
|
:platform_fee_sats, :operator_fee_sats, :used_fallback_split,
|
||||||
:tx_type, :bills_json, :cassettes_json, :status, :created_at)
|
:tx_type, :bills_json, :cassettes_json, :status, :created_at)
|
||||||
|
|
@ -448,9 +435,9 @@ async def create_settlement_idempotent(
|
||||||
{
|
{
|
||||||
"id": settlement_id,
|
"id": settlement_id,
|
||||||
"machine_id": data.machine_id,
|
"machine_id": data.machine_id,
|
||||||
"payment_hash": data.payment_hash,
|
|
||||||
"bitspire_event_id": data.bitspire_event_id,
|
"bitspire_event_id": data.bitspire_event_id,
|
||||||
"bitspire_txid": data.bitspire_txid,
|
"bitspire_txid": data.bitspire_txid,
|
||||||
|
"payment_hash": data.payment_hash,
|
||||||
"gross_sats": data.gross_sats,
|
"gross_sats": data.gross_sats,
|
||||||
"fiat_amount": data.fiat_amount,
|
"fiat_amount": data.fiat_amount,
|
||||||
"fiat_code": data.fiat_code,
|
"fiat_code": data.fiat_code,
|
||||||
|
|
@ -478,15 +465,15 @@ async def get_settlement(settlement_id: str) -> Optional[DcaSettlement]:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def get_settlement_by_payment_hash(
|
async def get_settlement_by_event_id(
|
||||||
payment_hash: str,
|
bitspire_event_id: str,
|
||||||
) -> Optional[DcaSettlement]:
|
) -> Optional[DcaSettlement]:
|
||||||
return await db.fetchone(
|
return await db.fetchone(
|
||||||
"""
|
"""
|
||||||
SELECT * FROM satoshimachine.dca_settlements
|
SELECT * FROM satoshimachine.dca_settlements
|
||||||
WHERE payment_hash = :hash
|
WHERE bitspire_event_id = :eid
|
||||||
""",
|
""",
|
||||||
{"hash": payment_hash},
|
{"eid": bitspire_event_id},
|
||||||
DcaSettlement,
|
DcaSettlement,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -528,9 +515,7 @@ async def mark_settlement_status(
|
||||||
status: str,
|
status: str,
|
||||||
error_message: Optional[str] = None,
|
error_message: Optional[str] = None,
|
||||||
) -> Optional[DcaSettlement]:
|
) -> Optional[DcaSettlement]:
|
||||||
"""Status: 'pending' | 'processing' | 'processed' | 'partial' |
|
"""Status: 'pending' | 'processed' | 'partial' | 'refunded' | 'errored'."""
|
||||||
'refunded' | 'errored'. Clears processing_claim on terminal states so a
|
|
||||||
fresh claim attempt won't see a stale token."""
|
|
||||||
await db.execute(
|
await db.execute(
|
||||||
"""
|
"""
|
||||||
UPDATE satoshimachine.dca_settlements
|
UPDATE satoshimachine.dca_settlements
|
||||||
|
|
@ -539,10 +524,6 @@ async def mark_settlement_status(
|
||||||
processed_at = CASE
|
processed_at = CASE
|
||||||
WHEN :status IN ('processed', 'partial', 'refunded')
|
WHEN :status IN ('processed', 'partial', 'refunded')
|
||||||
THEN :now ELSE processed_at
|
THEN :now ELSE processed_at
|
||||||
END,
|
|
||||||
processing_claim = CASE
|
|
||||||
WHEN :status = 'processing' THEN processing_claim
|
|
||||||
ELSE NULL
|
|
||||||
END
|
END
|
||||||
WHERE id = :id
|
WHERE id = :id
|
||||||
""",
|
""",
|
||||||
|
|
@ -556,165 +537,6 @@ async def mark_settlement_status(
|
||||||
return await get_settlement(settlement_id)
|
return await get_settlement(settlement_id)
|
||||||
|
|
||||||
|
|
||||||
async def claim_settlement_for_processing(
|
|
||||||
settlement_id: str,
|
|
||||||
) -> Optional[DcaSettlement]:
|
|
||||||
"""Optimistic-lock claim: atomically flip a settlement to 'processing'
|
|
||||||
and tag it with a per-invocation token. Returns the claimed row on
|
|
||||||
success; None if another caller already won the claim or the settlement
|
|
||||||
is not in a claimable state ('pending').
|
|
||||||
|
|
||||||
Pattern is portable across SQLite + PostgreSQL (doesn't rely on
|
|
||||||
UPDATE ... RETURNING). Two concurrent invocations may both run the
|
|
||||||
UPDATE, but only one row matches the WHERE clause; the loser's UPDATE
|
|
||||||
is a no-op against status='processing'. The read-back check on the
|
|
||||||
token disambiguates."""
|
|
||||||
token = urlsafe_short_hash()
|
|
||||||
await db.execute(
|
|
||||||
"""
|
|
||||||
UPDATE satoshimachine.dca_settlements
|
|
||||||
SET status = 'processing', processing_claim = :token
|
|
||||||
WHERE id = :id AND status = 'pending'
|
|
||||||
""",
|
|
||||||
{"id": settlement_id, "token": token},
|
|
||||||
)
|
|
||||||
after = await get_settlement(settlement_id)
|
|
||||||
if after is None:
|
|
||||||
return None
|
|
||||||
if after.processing_claim != token:
|
|
||||||
return None
|
|
||||||
return after
|
|
||||||
|
|
||||||
|
|
||||||
async def reset_settlement_for_retry(
|
|
||||||
settlement_id: str,
|
|
||||||
) -> Optional[DcaSettlement]:
|
|
||||||
"""Operator retry path. Flips 'errored' → 'pending' and voids any
|
|
||||||
'failed' legs so process_settlement re-runs them fresh. Completed legs
|
|
||||||
are left in place — we never re-pay sats that already moved."""
|
|
||||||
await db.execute(
|
|
||||||
"""
|
|
||||||
UPDATE satoshimachine.dca_payments
|
|
||||||
SET status = 'voided'
|
|
||||||
WHERE settlement_id = :sid AND status = 'failed'
|
|
||||||
""",
|
|
||||||
{"sid": settlement_id},
|
|
||||||
)
|
|
||||||
await db.execute(
|
|
||||||
"""
|
|
||||||
UPDATE satoshimachine.dca_settlements
|
|
||||||
SET status = 'pending',
|
|
||||||
error_message = NULL,
|
|
||||||
processing_claim = NULL,
|
|
||||||
processed_at = NULL
|
|
||||||
WHERE id = :id AND status = 'errored'
|
|
||||||
""",
|
|
||||||
{"id": settlement_id},
|
|
||||||
)
|
|
||||||
return await get_settlement(settlement_id)
|
|
||||||
|
|
||||||
|
|
||||||
async def apply_partial_dispense(
|
|
||||||
settlement_id: str,
|
|
||||||
*,
|
|
||||||
new_gross_sats: int,
|
|
||||||
new_net_sats: int,
|
|
||||||
new_commission_sats: int,
|
|
||||||
new_platform_fee_sats: int,
|
|
||||||
new_operator_fee_sats: int,
|
|
||||||
new_fiat_amount: float,
|
|
||||||
appended_note: str,
|
|
||||||
) -> Optional[DcaSettlement]:
|
|
||||||
"""Overwrite the monetary fields on a settlement (partial-dispense
|
|
||||||
recompute) and prepend `appended_note` to the notes column.
|
|
||||||
|
|
||||||
Notes are append-only: new lines go at the top (newest first) so the
|
|
||||||
settlement detail view shows the most recent adjustment first without
|
|
||||||
needing to scroll. Resets status to 'pending' so process_settlement
|
|
||||||
can re-distribute via the existing idempotent path."""
|
|
||||||
await db.execute(
|
|
||||||
"""
|
|
||||||
UPDATE satoshimachine.dca_settlements
|
|
||||||
SET gross_sats = :gross,
|
|
||||||
net_sats = :net,
|
|
||||||
commission_sats = :commission,
|
|
||||||
platform_fee_sats = :platform,
|
|
||||||
operator_fee_sats = :operator,
|
|
||||||
fiat_amount = :fiat,
|
|
||||||
status = 'pending',
|
|
||||||
error_message = NULL,
|
|
||||||
processed_at = NULL,
|
|
||||||
notes = CASE
|
|
||||||
WHEN notes IS NULL OR notes = '' THEN :note
|
|
||||||
ELSE :note || char(10) || char(10) || notes
|
|
||||||
END
|
|
||||||
WHERE id = :id
|
|
||||||
""",
|
|
||||||
{
|
|
||||||
"id": settlement_id,
|
|
||||||
"gross": new_gross_sats,
|
|
||||||
"net": new_net_sats,
|
|
||||||
"commission": new_commission_sats,
|
|
||||||
"platform": new_platform_fee_sats,
|
|
||||||
"operator": new_operator_fee_sats,
|
|
||||||
"fiat": new_fiat_amount,
|
|
||||||
"note": appended_note,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return await get_settlement(settlement_id)
|
|
||||||
|
|
||||||
|
|
||||||
async def count_completed_legs_for_settlement(settlement_id: str) -> int:
|
|
||||||
"""Used by partial-dispense to refuse adjustments after any leg has
|
|
||||||
successfully moved sats (Lightning payments can't be clawed back)."""
|
|
||||||
row = await db.fetchone(
|
|
||||||
"""
|
|
||||||
SELECT COUNT(*) AS n FROM satoshimachine.dca_payments
|
|
||||||
WHERE settlement_id = :sid AND status = 'completed'
|
|
||||||
""",
|
|
||||||
{"sid": settlement_id},
|
|
||||||
)
|
|
||||||
return int(row["n"]) if row else 0
|
|
||||||
|
|
||||||
|
|
||||||
async def append_settlement_note(
|
|
||||||
settlement_id: str, note: str, author_user_id: str
|
|
||||||
) -> Optional[DcaSettlement]:
|
|
||||||
"""Prepend an operator-authored note to settlement.notes. Each entry is
|
|
||||||
timestamped (UTC) and tagged with the author's user id so the trail
|
|
||||||
is accountable. Append-only: existing entries are never edited."""
|
|
||||||
from datetime import timezone
|
|
||||||
|
|
||||||
ts = datetime.now(timezone.utc).isoformat(timespec="seconds")
|
|
||||||
formatted = f"[{ts} by {author_user_id}] {note}"
|
|
||||||
await db.execute(
|
|
||||||
"""
|
|
||||||
UPDATE satoshimachine.dca_settlements
|
|
||||||
SET notes = CASE
|
|
||||||
WHEN notes IS NULL OR notes = '' THEN :note
|
|
||||||
ELSE :note || char(10) || char(10) || notes
|
|
||||||
END
|
|
||||||
WHERE id = :id
|
|
||||||
""",
|
|
||||||
{"id": settlement_id, "note": formatted},
|
|
||||||
)
|
|
||||||
return await get_settlement(settlement_id)
|
|
||||||
|
|
||||||
|
|
||||||
async def void_open_legs_for_settlement(settlement_id: str) -> None:
|
|
||||||
"""Marks pending/failed legs as 'voided' before re-running distribution
|
|
||||||
on a partial-dispense recompute. Preserves the rows for audit but stops
|
|
||||||
them from being interpreted as live."""
|
|
||||||
await db.execute(
|
|
||||||
"""
|
|
||||||
UPDATE satoshimachine.dca_payments
|
|
||||||
SET status = 'voided'
|
|
||||||
WHERE settlement_id = :sid AND status IN ('pending', 'failed')
|
|
||||||
""",
|
|
||||||
{"sid": settlement_id},
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# Commission splits — operator's remainder-distribution rules.
|
# Commission splits — operator's remainder-distribution rules.
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|
@ -955,15 +777,11 @@ async def get_client_balance_summary(
|
||||||
""",
|
""",
|
||||||
{"cid": client_id},
|
{"cid": client_id},
|
||||||
)
|
)
|
||||||
# Both DCA legs (auto, from bitSpire settlements) and balance-settle legs
|
|
||||||
# (operator-initiated under #4) reduce the LP's remaining fiat balance.
|
|
||||||
payments_row = await db.fetchone(
|
payments_row = await db.fetchone(
|
||||||
"""
|
"""
|
||||||
SELECT COALESCE(SUM(amount_fiat), 0) AS total
|
SELECT COALESCE(SUM(amount_fiat), 0) AS total
|
||||||
FROM satoshimachine.dca_payments
|
FROM satoshimachine.dca_payments
|
||||||
WHERE client_id = :cid
|
WHERE client_id = :cid AND leg_type = 'dca' AND status = 'completed'
|
||||||
AND leg_type IN ('dca', 'settlement')
|
|
||||||
AND status = 'completed'
|
|
||||||
""",
|
""",
|
||||||
{"cid": client_id},
|
{"cid": client_id},
|
||||||
)
|
)
|
||||||
|
|
|
||||||
572
distribution.py
572
distribution.py
|
|
@ -1,572 +0,0 @@
|
||||||
# Satoshi Machine v2 — settlement distribution (P2).
|
|
||||||
#
|
|
||||||
# Picks up a dca_settlements row with status='pending' and pays out the
|
|
||||||
# three leg groups via LNbits internal transfers (create_invoice +
|
|
||||||
# pay_invoice on the same instance auto-detect internal). All legs land
|
|
||||||
# in dca_payments with the appropriate leg_type discriminator and inherit
|
|
||||||
# the Payment.tag "satmachine:{machine_npub}" so LNbits payment-history
|
|
||||||
# filters work natively.
|
|
||||||
#
|
|
||||||
# Leg order:
|
|
||||||
# 1. super_fee — platform_fee_sats → super_fee_wallet_id (if set)
|
|
||||||
# 2. operator_split — operator_fee_sats split per operator's rules
|
|
||||||
# 3. dca — net_sats distributed proportionally to active LPs,
|
|
||||||
# each leg capped at the LP's remaining fiat balance
|
|
||||||
# (preserves the v1 sync-mismatch fix from PR #2)
|
|
||||||
#
|
|
||||||
# Atomicity: LN payments cannot be rolled back. We attempt each leg, record
|
|
||||||
# success/failure per dca_payments row, and mark the settlement 'processed'
|
|
||||||
# only when every leg completed. Any failure marks 'errored' with a message
|
|
||||||
# but leaves the successful legs in place. Sats that don't get paid out
|
|
||||||
# (failed legs, no LP coverage, missing super wallet) remain in the
|
|
||||||
# machine's wallet — visible to the operator on the dashboard.
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from datetime import datetime, timezone
|
|
||||||
from typing import List
|
|
||||||
|
|
||||||
from lnbits.core.services import create_invoice, pay_invoice
|
|
||||||
from loguru import logger
|
|
||||||
|
|
||||||
from .calculations import (
|
|
||||||
allocate_operator_split_legs,
|
|
||||||
calculate_distribution,
|
|
||||||
split_two_stage_commission,
|
|
||||||
)
|
|
||||||
from .crud import (
|
|
||||||
apply_partial_dispense,
|
|
||||||
claim_settlement_for_processing,
|
|
||||||
count_completed_legs_for_settlement,
|
|
||||||
create_dca_payment,
|
|
||||||
get_client_balance_summary,
|
|
||||||
get_effective_commission_splits,
|
|
||||||
get_flow_mode_clients_for_machine,
|
|
||||||
get_machine,
|
|
||||||
get_settlement,
|
|
||||||
get_super_config,
|
|
||||||
mark_settlement_status,
|
|
||||||
update_payment_status,
|
|
||||||
void_open_legs_for_settlement,
|
|
||||||
)
|
|
||||||
from .models import (
|
|
||||||
CreateDcaPaymentData,
|
|
||||||
DcaClient,
|
|
||||||
DcaPayment,
|
|
||||||
DcaSettlement,
|
|
||||||
Machine,
|
|
||||||
PartialDispenseData,
|
|
||||||
SettleBalanceData,
|
|
||||||
SuperConfig,
|
|
||||||
)
|
|
||||||
|
|
||||||
PAYMENT_TAG_PREFIX = "satmachine"
|
|
||||||
|
|
||||||
|
|
||||||
def _payment_tag(machine: Machine) -> str:
|
|
||||||
return f"{PAYMENT_TAG_PREFIX}:{machine.machine_npub}"
|
|
||||||
|
|
||||||
|
|
||||||
def _resolve_partial_dispense_gross(
|
|
||||||
settlement: DcaSettlement, data: PartialDispenseData
|
|
||||||
) -> int:
|
|
||||||
if data.dispensed_sats is not None:
|
|
||||||
new_gross = int(data.dispensed_sats)
|
|
||||||
elif data.dispensed_fraction is not None:
|
|
||||||
new_gross = round(settlement.gross_sats * float(data.dispensed_fraction))
|
|
||||||
else:
|
|
||||||
raise ValueError("provide one of dispensed_sats or dispensed_fraction")
|
|
||||||
if new_gross < 0:
|
|
||||||
raise ValueError("partial dispense cannot be negative")
|
|
||||||
if new_gross > settlement.gross_sats:
|
|
||||||
raise ValueError(
|
|
||||||
f"partial dispense ({new_gross} sats) cannot exceed the original "
|
|
||||||
f"gross ({settlement.gross_sats} sats)"
|
|
||||||
)
|
|
||||||
return new_gross
|
|
||||||
|
|
||||||
|
|
||||||
def _build_partial_dispense_memo(
|
|
||||||
settlement: DcaSettlement,
|
|
||||||
data: PartialDispenseData,
|
|
||||||
*,
|
|
||||||
new_gross: int,
|
|
||||||
new_net: int,
|
|
||||||
new_commission: int,
|
|
||||||
new_platform: int,
|
|
||||||
new_operator: int,
|
|
||||||
) -> str:
|
|
||||||
reason = (data.notes or "").strip() or "(no reason given)"
|
|
||||||
if data.dispensed_sats is not None:
|
|
||||||
adjust = f"dispensed_sats={data.dispensed_sats}"
|
|
||||||
else:
|
|
||||||
adjust = f"dispensed_fraction={data.dispensed_fraction}"
|
|
||||||
ts = datetime.now(timezone.utc).isoformat(timespec="seconds")
|
|
||||||
return (
|
|
||||||
f"[{ts}] partial dispense applied — {adjust}. "
|
|
||||||
f"Original gross={settlement.gross_sats} net={settlement.net_sats} "
|
|
||||||
f"commission={settlement.commission_sats} "
|
|
||||||
f"(super_fee={settlement.platform_fee_sats} "
|
|
||||||
f"operator_fee={settlement.operator_fee_sats}). "
|
|
||||||
f"New gross={new_gross} net={new_net} commission={new_commission} "
|
|
||||||
f"(super_fee={new_platform} operator_fee={new_operator}). "
|
|
||||||
f"Reason: {reason}"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def settle_lp_balance(
|
|
||||||
client: DcaClient, machine: Machine, data: SettleBalanceData
|
|
||||||
) -> DcaPayment:
|
|
||||||
"""Operator UX action — closes satmachineadmin#4.
|
|
||||||
|
|
||||||
Settle an LP's remaining fiat balance from the operator's chosen funding
|
|
||||||
wallet at the rate the operator specified. Records a leg_type='settlement'
|
|
||||||
row that counts against the LP's balance summary (so a subsequent
|
|
||||||
get_client_balance_summary reflects the new zero/reduced balance).
|
|
||||||
|
|
||||||
Caller is responsible for verifying the operator owns both the client's
|
|
||||||
machine and the funding wallet (API endpoint does this). The amount_fiat
|
|
||||||
is capped at the LP's remaining balance — operators cannot accidentally
|
|
||||||
over-pay via this path.
|
|
||||||
"""
|
|
||||||
summary = await get_client_balance_summary(client.id)
|
|
||||||
if summary is None:
|
|
||||||
raise ValueError(f"client {client.id} balance not available")
|
|
||||||
remaining = float(summary.remaining_balance)
|
|
||||||
if remaining <= 0:
|
|
||||||
raise ValueError(
|
|
||||||
f"client {client.id} has no remaining balance to settle"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Resolve fiat amount: explicit if given (capped at remaining), else full.
|
|
||||||
requested = (
|
|
||||||
float(data.amount_fiat) if data.amount_fiat is not None else remaining
|
|
||||||
)
|
|
||||||
amount_fiat = round(min(requested, remaining), 2)
|
|
||||||
if amount_fiat <= 0:
|
|
||||||
raise ValueError("computed settlement amount is zero")
|
|
||||||
|
|
||||||
exchange_rate = float(data.exchange_rate)
|
|
||||||
amount_sats = round(amount_fiat * exchange_rate)
|
|
||||||
if amount_sats <= 0:
|
|
||||||
raise ValueError(
|
|
||||||
f"computed sat amount is zero (amount_fiat={amount_fiat}, "
|
|
||||||
f"exchange_rate={exchange_rate})"
|
|
||||||
)
|
|
||||||
|
|
||||||
reason = (data.notes or "").strip() or "(no reason given)"
|
|
||||||
memo = (
|
|
||||||
f"satmachine balance settle — {amount_fiat:.2f} "
|
|
||||||
f"{machine.fiat_code} @ {exchange_rate:g} sat/{machine.fiat_code} "
|
|
||||||
f"= {amount_sats} sats. Reason: {reason}"
|
|
||||||
)
|
|
||||||
|
|
||||||
leg_row = await create_dca_payment(
|
|
||||||
CreateDcaPaymentData(
|
|
||||||
settlement_id=None,
|
|
||||||
client_id=client.id,
|
|
||||||
machine_id=machine.id,
|
|
||||||
operator_user_id=machine.operator_user_id,
|
|
||||||
leg_type="settlement",
|
|
||||||
destination_wallet_id=client.wallet_id,
|
|
||||||
destination_ln_address=None,
|
|
||||||
amount_sats=amount_sats,
|
|
||||||
amount_fiat=amount_fiat,
|
|
||||||
exchange_rate=exchange_rate,
|
|
||||||
transaction_time=datetime.now(timezone.utc),
|
|
||||||
external_payment_hash=None,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
extra = {
|
|
||||||
"satmachine_leg": "settlement",
|
|
||||||
"satmachine_client_id": client.id,
|
|
||||||
"satmachine_machine_npub": machine.machine_npub,
|
|
||||||
"satmachine_exchange_rate": exchange_rate,
|
|
||||||
}
|
|
||||||
try:
|
|
||||||
new_invoice = await create_invoice(
|
|
||||||
wallet_id=client.wallet_id,
|
|
||||||
amount=float(amount_sats),
|
|
||||||
internal=True,
|
|
||||||
memo=memo,
|
|
||||||
extra=extra,
|
|
||||||
)
|
|
||||||
if not new_invoice or not new_invoice.bolt11:
|
|
||||||
await update_payment_status(
|
|
||||||
leg_row.id, "failed", None, "create_invoice returned empty"
|
|
||||||
)
|
|
||||||
raise ValueError("create_invoice returned empty")
|
|
||||||
paid = await pay_invoice(
|
|
||||||
wallet_id=data.funding_wallet_id,
|
|
||||||
payment_request=new_invoice.bolt11,
|
|
||||||
description=memo,
|
|
||||||
tag=_payment_tag(machine),
|
|
||||||
extra=extra,
|
|
||||||
)
|
|
||||||
completed = await update_payment_status(
|
|
||||||
leg_row.id, "completed", paid.payment_hash, None
|
|
||||||
)
|
|
||||||
return completed if completed is not None else leg_row
|
|
||||||
except Exception as exc:
|
|
||||||
logger.error(
|
|
||||||
f"distribution: balance-settle failed for client {client.id} "
|
|
||||||
f"({amount_sats} sats from wallet {data.funding_wallet_id}): {exc}"
|
|
||||||
)
|
|
||||||
await update_payment_status(leg_row.id, "failed", None, str(exc)[:512])
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
async def apply_partial_dispense_and_redistribute(
|
|
||||||
settlement_id: str, data: PartialDispenseData
|
|
||||||
) -> DcaSettlement:
|
|
||||||
"""Operator UX action — closes satmachineadmin#3.
|
|
||||||
|
|
||||||
When a bitSpire dispense fails mid-transaction (e.g., dispenser jam after
|
|
||||||
6 of 10 bills), the operator confirms the actual amount dispensed and we
|
|
||||||
re-allocate the split against that partial gross. Sat amounts scale
|
|
||||||
linearly, preserving the original commission ratio exactly; the two-stage
|
|
||||||
super/operator split is recomputed using the CURRENT super_fee_pct
|
|
||||||
(super may have changed the rate since the original landed).
|
|
||||||
|
|
||||||
Hard guard: refuses if any dca_payments leg has already completed.
|
|
||||||
Lightning payments can't be clawed back, so we won't try.
|
|
||||||
|
|
||||||
Side effects:
|
|
||||||
- Voids pending/failed legs (status → 'voided').
|
|
||||||
- Overwrites the settlement's monetary fields with the new totals.
|
|
||||||
- Appends a timestamped memo to settlement.notes capturing the
|
|
||||||
original values + operator's reason.
|
|
||||||
- Resets settlement.status to 'pending' and triggers process_settlement.
|
|
||||||
"""
|
|
||||||
settlement = await get_settlement(settlement_id)
|
|
||||||
if settlement is None:
|
|
||||||
raise ValueError(f"settlement {settlement_id} not found")
|
|
||||||
if settlement.gross_sats <= 0:
|
|
||||||
raise ValueError("cannot partial-dispense a zero-gross settlement")
|
|
||||||
completed = await count_completed_legs_for_settlement(settlement_id)
|
|
||||||
if completed > 0:
|
|
||||||
raise ValueError(
|
|
||||||
f"cannot partial-dispense: {completed} leg(s) already completed "
|
|
||||||
"(Lightning payments can't be clawed back)"
|
|
||||||
)
|
|
||||||
|
|
||||||
new_gross = _resolve_partial_dispense_gross(settlement, data)
|
|
||||||
# Linear scale preserves the original commission ratio exactly.
|
|
||||||
scale = new_gross / settlement.gross_sats
|
|
||||||
new_commission = round(settlement.commission_sats * scale)
|
|
||||||
new_net = new_gross - new_commission
|
|
||||||
new_fiat = round(float(settlement.fiat_amount) * scale, 2)
|
|
||||||
|
|
||||||
# Re-stage-1 split using the CURRENT super_fee_pct.
|
|
||||||
super_config = await get_super_config()
|
|
||||||
super_fee_pct = float(super_config.super_fee_pct) if super_config else 0.0
|
|
||||||
new_platform, new_operator = split_two_stage_commission(
|
|
||||||
new_commission, super_fee_pct
|
|
||||||
)
|
|
||||||
|
|
||||||
memo = _build_partial_dispense_memo(
|
|
||||||
settlement,
|
|
||||||
data,
|
|
||||||
new_gross=new_gross,
|
|
||||||
new_net=new_net,
|
|
||||||
new_commission=new_commission,
|
|
||||||
new_platform=new_platform,
|
|
||||||
new_operator=new_operator,
|
|
||||||
)
|
|
||||||
|
|
||||||
await void_open_legs_for_settlement(settlement_id)
|
|
||||||
updated = await apply_partial_dispense(
|
|
||||||
settlement_id,
|
|
||||||
new_gross_sats=new_gross,
|
|
||||||
new_net_sats=new_net,
|
|
||||||
new_commission_sats=new_commission,
|
|
||||||
new_platform_fee_sats=new_platform,
|
|
||||||
new_operator_fee_sats=new_operator,
|
|
||||||
new_fiat_amount=new_fiat,
|
|
||||||
appended_note=memo,
|
|
||||||
)
|
|
||||||
if updated is None:
|
|
||||||
raise ValueError(f"settlement {settlement_id} disappeared mid-update")
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
f"distribution: partial-dispense applied to settlement "
|
|
||||||
f"{settlement_id} — re-running distribution"
|
|
||||||
)
|
|
||||||
await process_settlement(settlement_id)
|
|
||||||
after = await get_settlement(settlement_id)
|
|
||||||
return after if after is not None else updated
|
|
||||||
|
|
||||||
|
|
||||||
async def process_settlement(settlement_id: str) -> None:
|
|
||||||
"""Process a pending settlement end-to-end.
|
|
||||||
|
|
||||||
Concurrency-safe: an optimistic-lock claim flips the settlement to
|
|
||||||
'processing' atomically and tags it with a per-invocation token.
|
|
||||||
Concurrent invocations on the same id can't both win — losers see the
|
|
||||||
claim mismatch on read-back and return without writing any legs.
|
|
||||||
Retries land via reset_settlement_for_retry which voids failed legs
|
|
||||||
and flips 'errored' back to 'pending'."""
|
|
||||||
settlement = await claim_settlement_for_processing(settlement_id)
|
|
||||||
if settlement is None:
|
|
||||||
# Either already claimed by a concurrent invocation, or not in a
|
|
||||||
# 'pending' state. Either way, nothing to do here.
|
|
||||||
logger.debug(
|
|
||||||
f"distribution: skip {settlement_id} — not claimable (already "
|
|
||||||
"processing or not pending)"
|
|
||||||
)
|
|
||||||
return
|
|
||||||
machine = await get_machine(settlement.machine_id)
|
|
||||||
if machine is None:
|
|
||||||
logger.error(
|
|
||||||
f"distribution: settlement {settlement_id} references missing "
|
|
||||||
f"machine {settlement.machine_id}"
|
|
||||||
)
|
|
||||||
await mark_settlement_status(
|
|
||||||
settlement_id, "errored", "machine missing"
|
|
||||||
)
|
|
||||||
return
|
|
||||||
super_config = await get_super_config()
|
|
||||||
errors: List[str] = []
|
|
||||||
|
|
||||||
try:
|
|
||||||
await _pay_super_fee(settlement, machine, super_config, errors)
|
|
||||||
await _pay_operator_splits(settlement, machine, errors)
|
|
||||||
await _pay_dca_distributions(settlement, machine, errors)
|
|
||||||
except Exception as exc: # last-resort guard
|
|
||||||
logger.exception("distribution: unexpected error processing settlement")
|
|
||||||
errors.append(f"unexpected: {exc}")
|
|
||||||
|
|
||||||
if errors:
|
|
||||||
await mark_settlement_status(
|
|
||||||
settlement_id, "errored", "; ".join(errors)[:512]
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
await mark_settlement_status(settlement_id, "processed", None)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Leg 1 — super fee
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
async def _pay_super_fee(
|
|
||||||
settlement: DcaSettlement,
|
|
||||||
machine: Machine,
|
|
||||||
super_config: SuperConfig | None,
|
|
||||||
errors: List[str],
|
|
||||||
) -> None:
|
|
||||||
if settlement.platform_fee_sats <= 0:
|
|
||||||
return
|
|
||||||
if super_config is None or not super_config.super_fee_wallet_id:
|
|
||||||
# Super has configured a fee but not a destination wallet — leave
|
|
||||||
# the sats in the machine wallet and warn. The super needs to
|
|
||||||
# configure their wallet before they can collect.
|
|
||||||
logger.warning(
|
|
||||||
f"distribution: super_fee_sats={settlement.platform_fee_sats} "
|
|
||||||
f"left in machine wallet (super_fee_wallet_id not set)"
|
|
||||||
)
|
|
||||||
return
|
|
||||||
await _pay_internal(
|
|
||||||
settlement=settlement,
|
|
||||||
machine=machine,
|
|
||||||
leg_type="super_fee",
|
|
||||||
client_id=None,
|
|
||||||
destination_wallet_id=super_config.super_fee_wallet_id,
|
|
||||||
amount_sats=settlement.platform_fee_sats,
|
|
||||||
memo=f"satmachine super fee — {machine.name or machine.machine_npub[:12]}",
|
|
||||||
errors=errors,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Leg 2 — operator commission splits
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
async def _pay_operator_splits(
|
|
||||||
settlement: DcaSettlement,
|
|
||||||
machine: Machine,
|
|
||||||
errors: List[str],
|
|
||||||
) -> None:
|
|
||||||
if settlement.operator_fee_sats <= 0:
|
|
||||||
return
|
|
||||||
splits = await get_effective_commission_splits(
|
|
||||||
machine.operator_user_id, machine.id
|
|
||||||
)
|
|
||||||
if not splits:
|
|
||||||
logger.warning(
|
|
||||||
f"distribution: operator_fee_sats={settlement.operator_fee_sats} "
|
|
||||||
f"left in machine wallet (operator has no commission_splits ruleset "
|
|
||||||
f"for machine {machine.id})"
|
|
||||||
)
|
|
||||||
return
|
|
||||||
# Pure allocator handles the rounding rule (last leg absorbs remainder).
|
|
||||||
leg_amounts = allocate_operator_split_legs(
|
|
||||||
settlement.operator_fee_sats,
|
|
||||||
[float(leg.pct) for leg in splits],
|
|
||||||
)
|
|
||||||
for idx, (leg, amount) in enumerate(zip(splits, leg_amounts, strict=True)):
|
|
||||||
if amount <= 0:
|
|
||||||
continue
|
|
||||||
label = leg.label or f"split-{idx + 1}"
|
|
||||||
memo = (
|
|
||||||
f"satmachine operator split — "
|
|
||||||
f"{machine.name or machine.machine_npub[:12]} ({label})"
|
|
||||||
)
|
|
||||||
await _pay_internal(
|
|
||||||
settlement=settlement,
|
|
||||||
machine=machine,
|
|
||||||
leg_type="operator_split",
|
|
||||||
client_id=None,
|
|
||||||
destination_wallet_id=leg.wallet_id,
|
|
||||||
amount_sats=amount,
|
|
||||||
memo=memo,
|
|
||||||
errors=errors,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Leg 3 — DCA distribution to active LPs
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
async def _pay_dca_distributions(
|
|
||||||
settlement: DcaSettlement,
|
|
||||||
machine: Machine,
|
|
||||||
errors: List[str],
|
|
||||||
) -> None:
|
|
||||||
if settlement.net_sats <= 0:
|
|
||||||
return
|
|
||||||
if settlement.exchange_rate <= 0:
|
|
||||||
# Fallback path with no exchange rate (bitSpire Payment.extra absent).
|
|
||||||
# Without a rate we can't compute fiat balances → can't compute
|
|
||||||
# proportional shares → leave net_sats in the machine wallet for
|
|
||||||
# the operator to manually reconcile.
|
|
||||||
logger.warning(
|
|
||||||
f"distribution: net_sats={settlement.net_sats} left in machine "
|
|
||||||
f"wallet (no exchange_rate; fallback path; see lamassu-next#44)"
|
|
||||||
)
|
|
||||||
return
|
|
||||||
clients = await get_flow_mode_clients_for_machine(machine.id)
|
|
||||||
if not clients:
|
|
||||||
return
|
|
||||||
# Build {client_id: remaining_fiat_balance} for proportional allocation.
|
|
||||||
client_balances: dict[str, float] = {}
|
|
||||||
for client in clients:
|
|
||||||
summary = await get_client_balance_summary(client.id)
|
|
||||||
if summary is None or summary.remaining_balance <= 0:
|
|
||||||
continue
|
|
||||||
client_balances[client.id] = summary.remaining_balance
|
|
||||||
if not client_balances:
|
|
||||||
return
|
|
||||||
# Compute proportional sat allocations, then cap each at the client's
|
|
||||||
# remaining-fiat-balance-in-sats (the v1 sync-mismatch safeguard).
|
|
||||||
raw_allocations = calculate_distribution(
|
|
||||||
base_amount_sats=settlement.net_sats,
|
|
||||||
client_balances=client_balances,
|
|
||||||
)
|
|
||||||
capped_allocations: dict[str, int] = {}
|
|
||||||
for client_id, raw_sats in raw_allocations.items():
|
|
||||||
remaining_fiat = client_balances[client_id]
|
|
||||||
cap_sats = int(remaining_fiat * float(settlement.exchange_rate))
|
|
||||||
capped_allocations[client_id] = min(raw_sats, cap_sats)
|
|
||||||
# Pay each capped allocation.
|
|
||||||
client_by_id = {c.id: c for c in clients}
|
|
||||||
for client_id, amount_sats in capped_allocations.items():
|
|
||||||
if amount_sats <= 0:
|
|
||||||
continue
|
|
||||||
client = client_by_id[client_id]
|
|
||||||
amount_fiat = round(amount_sats / float(settlement.exchange_rate), 2)
|
|
||||||
memo = (
|
|
||||||
f"DCA: {amount_sats} sats • {amount_fiat:.2f} {settlement.fiat_code}"
|
|
||||||
)
|
|
||||||
await _pay_internal(
|
|
||||||
settlement=settlement,
|
|
||||||
machine=machine,
|
|
||||||
leg_type="dca",
|
|
||||||
client_id=client.id,
|
|
||||||
destination_wallet_id=client.wallet_id,
|
|
||||||
amount_sats=amount_sats,
|
|
||||||
amount_fiat=amount_fiat,
|
|
||||||
exchange_rate=float(settlement.exchange_rate),
|
|
||||||
memo=memo,
|
|
||||||
errors=errors,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Internal transfer helper
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
async def _pay_internal(
|
|
||||||
*,
|
|
||||||
settlement: DcaSettlement,
|
|
||||||
machine: Machine,
|
|
||||||
leg_type: str,
|
|
||||||
client_id: str | None,
|
|
||||||
destination_wallet_id: str,
|
|
||||||
amount_sats: int,
|
|
||||||
memo: str,
|
|
||||||
errors: List[str],
|
|
||||||
amount_fiat: float | None = None,
|
|
||||||
exchange_rate: float | None = None,
|
|
||||||
) -> DcaPayment | None:
|
|
||||||
"""Create an invoice on the destination wallet, pay it from the machine
|
|
||||||
wallet, and record the leg in dca_payments. Returns the dca_payments row
|
|
||||||
on success (including the failed case — the row stays for audit)."""
|
|
||||||
tag = _payment_tag(machine)
|
|
||||||
leg_row = await create_dca_payment(
|
|
||||||
CreateDcaPaymentData(
|
|
||||||
settlement_id=settlement.id,
|
|
||||||
client_id=client_id,
|
|
||||||
machine_id=machine.id,
|
|
||||||
operator_user_id=machine.operator_user_id,
|
|
||||||
leg_type=leg_type,
|
|
||||||
destination_wallet_id=destination_wallet_id,
|
|
||||||
destination_ln_address=None,
|
|
||||||
amount_sats=amount_sats,
|
|
||||||
amount_fiat=amount_fiat,
|
|
||||||
exchange_rate=exchange_rate,
|
|
||||||
transaction_time=datetime.now(),
|
|
||||||
external_payment_hash=None,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
extra = {
|
|
||||||
"satmachine_leg": leg_type,
|
|
||||||
"satmachine_settlement_id": settlement.id,
|
|
||||||
"satmachine_machine_npub": machine.machine_npub,
|
|
||||||
}
|
|
||||||
try:
|
|
||||||
new_invoice = await create_invoice(
|
|
||||||
wallet_id=destination_wallet_id,
|
|
||||||
amount=float(amount_sats),
|
|
||||||
internal=True,
|
|
||||||
memo=memo,
|
|
||||||
extra=extra,
|
|
||||||
)
|
|
||||||
if not new_invoice or not new_invoice.bolt11:
|
|
||||||
await update_payment_status(
|
|
||||||
leg_row.id, "failed", None, "create_invoice returned empty"
|
|
||||||
)
|
|
||||||
errors.append(f"{leg_type}: create_invoice empty")
|
|
||||||
return leg_row
|
|
||||||
paid = await pay_invoice(
|
|
||||||
wallet_id=machine.wallet_id,
|
|
||||||
payment_request=new_invoice.bolt11,
|
|
||||||
description=memo,
|
|
||||||
tag=tag,
|
|
||||||
extra=extra,
|
|
||||||
)
|
|
||||||
await update_payment_status(
|
|
||||||
leg_row.id, "completed", paid.payment_hash, None
|
|
||||||
)
|
|
||||||
return leg_row
|
|
||||||
except Exception as exc:
|
|
||||||
logger.error(
|
|
||||||
f"distribution: {leg_type} leg failed "
|
|
||||||
f"(settlement={settlement.id} amount={amount_sats}): {exc}"
|
|
||||||
)
|
|
||||||
await update_payment_status(leg_row.id, "failed", None, str(exc)[:512])
|
|
||||||
errors.append(f"{leg_type}: {exc}")
|
|
||||||
return leg_row
|
|
||||||
|
|
@ -290,28 +290,20 @@ async def m005_satmachine_v2_overhaul(db):
|
||||||
"ON satoshimachine.dca_deposits (client_id, created_at DESC)"
|
"ON satoshimachine.dca_deposits (client_id, created_at DESC)"
|
||||||
)
|
)
|
||||||
|
|
||||||
# dca_settlements — idempotency table for bitSpire-driven settlements.
|
# dca_settlements — idempotency table for bitSpire kind-21000 events.
|
||||||
# The natural unique key is payment_hash (every LN invoice has a globally
|
# CRITICAL: platform_fee_sats and operator_fee_sats are stored as absolute BIGINT
|
||||||
# unique hash; subscription replays / dispatcher double-fires dedup via the
|
# (not as a derived percentage). Today this is just the contractual split. Once
|
||||||
# UNIQUE constraint). bitspire_event_id is reserved for a future path where
|
# the v2 promotion engine ships, the two values diverge when discounts fire and
|
||||||
# we subscribe to raw Nostr events directly (kind-30078/30079 ingestion
|
# this row is the only audit-grade record of who forgave what. Do not collapse
|
||||||
# uses dca_telemetry; bitspire_event_id is kept here for future bookkeeping
|
# them into a single commission_pct field. See plan section "Customer discounts".
|
||||||
# if we ever bypass the LNbits Payment system).
|
|
||||||
#
|
|
||||||
# CRITICAL: platform_fee_sats and operator_fee_sats are stored as absolute
|
|
||||||
# BIGINT (not a derived percentage). Today this is just the contractual
|
|
||||||
# split. Once the v2 promotion engine ships, the two values diverge when
|
|
||||||
# discounts fire and this row is the only audit-grade record of who forgave
|
|
||||||
# what. Do not collapse them into a single commission_pct field. See plan
|
|
||||||
# section "Customer discounts".
|
|
||||||
await db.execute(
|
await db.execute(
|
||||||
f"""
|
f"""
|
||||||
CREATE TABLE satoshimachine.dca_settlements (
|
CREATE TABLE satoshimachine.dca_settlements (
|
||||||
id TEXT PRIMARY KEY,
|
id TEXT PRIMARY KEY,
|
||||||
machine_id TEXT NOT NULL,
|
machine_id TEXT NOT NULL,
|
||||||
payment_hash TEXT NOT NULL UNIQUE,
|
bitspire_event_id TEXT NOT NULL UNIQUE,
|
||||||
bitspire_event_id TEXT,
|
|
||||||
bitspire_txid TEXT,
|
bitspire_txid TEXT,
|
||||||
|
payment_hash TEXT NOT NULL,
|
||||||
gross_sats BIGINT NOT NULL,
|
gross_sats BIGINT NOT NULL,
|
||||||
fiat_amount DECIMAL(10,2) NOT NULL,
|
fiat_amount DECIMAL(10,2) NOT NULL,
|
||||||
fiat_code TEXT NOT NULL DEFAULT 'GTQ',
|
fiat_code TEXT NOT NULL DEFAULT 'GTQ',
|
||||||
|
|
@ -335,7 +327,10 @@ async def m005_satmachine_v2_overhaul(db):
|
||||||
"CREATE INDEX dca_settlements_machine_idx "
|
"CREATE INDEX dca_settlements_machine_idx "
|
||||||
"ON satoshimachine.dca_settlements (machine_id, created_at DESC)"
|
"ON satoshimachine.dca_settlements (machine_id, created_at DESC)"
|
||||||
)
|
)
|
||||||
# payment_hash UNIQUE already creates a lookup index — no extra index needed.
|
await db.execute(
|
||||||
|
"CREATE INDEX dca_settlements_payment_hash_idx "
|
||||||
|
"ON satoshimachine.dca_settlements (payment_hash)"
|
||||||
|
)
|
||||||
|
|
||||||
# dca_commission_splits — operator's rules for distributing the *remainder*
|
# dca_commission_splits — operator's rules for distributing the *remainder*
|
||||||
# of each commission (commission_sats - platform_fee_sats). One row per leg.
|
# of each commission (commission_sats - platform_fee_sats). One row per leg.
|
||||||
|
|
@ -428,51 +423,3 @@ async def m005_satmachine_v2_overhaul(db):
|
||||||
);
|
);
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def m006_add_settlement_notes(db):
|
|
||||||
"""Audit memo on dca_settlements.
|
|
||||||
|
|
||||||
When an operator triggers an in-place adjustment (partial-dispense,
|
|
||||||
manual reconciliation override, etc.), the settlement row's monetary
|
|
||||||
fields are overwritten with the new numbers. To preserve the audit
|
|
||||||
trail without a separate history table, we append a timestamped memo
|
|
||||||
to this notes column capturing the previous values and the reason.
|
|
||||||
|
|
||||||
Operators see this directly in the settlement detail view, so any
|
|
||||||
overwrite is visible and dated. Append-only convention: new memos
|
|
||||||
are prepended with a timestamp; never edited in place.
|
|
||||||
"""
|
|
||||||
await db.execute(
|
|
||||||
"ALTER TABLE satoshimachine.dca_settlements ADD COLUMN notes TEXT"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def m007_settlement_claim_and_machine_wallet_unique(db):
|
|
||||||
"""Security + concurrency hardening (fix bundle 1).
|
|
||||||
|
|
||||||
1. Adds `processing_claim` to dca_settlements. The settlement processor
|
|
||||||
uses an optimistic-lock pattern: write a per-invocation claim token
|
|
||||||
alongside the status='processing' flip, then re-read and confirm the
|
|
||||||
persisted token matches. Two concurrent process_settlement invocations
|
|
||||||
on the same id can't both win the claim, so no duplicate leg
|
|
||||||
creation / double-pay.
|
|
||||||
|
|
||||||
2. Adds a UNIQUE index on dca_machines.wallet_id so two machine rows
|
|
||||||
can never claim the same wallet. Closes a wallet-IDOR funds-theft
|
|
||||||
vector where operator A could register a machine on operator B's
|
|
||||||
wallet_id and drain it via the settlement processor's pay_invoice.
|
|
||||||
Defence-in-depth on top of the API-layer ownership check; if a future
|
|
||||||
endpoint forgets the check, the DB still rejects.
|
|
||||||
|
|
||||||
CREATE UNIQUE INDEX is portable across SQLite and PostgreSQL
|
|
||||||
(ALTER TABLE ADD CONSTRAINT is not on SQLite).
|
|
||||||
"""
|
|
||||||
await db.execute(
|
|
||||||
"ALTER TABLE satoshimachine.dca_settlements "
|
|
||||||
"ADD COLUMN processing_claim TEXT"
|
|
||||||
)
|
|
||||||
await db.execute(
|
|
||||||
"CREATE UNIQUE INDEX dca_machines_wallet_id_uq "
|
|
||||||
"ON satoshimachine.dca_machines (wallet_id)"
|
|
||||||
)
|
|
||||||
|
|
|
||||||
70
models.py
70
models.py
|
|
@ -185,9 +185,9 @@ class UpdateDepositStatusData(BaseModel):
|
||||||
|
|
||||||
class CreateDcaSettlementData(BaseModel):
|
class CreateDcaSettlementData(BaseModel):
|
||||||
machine_id: str
|
machine_id: str
|
||||||
payment_hash: str # the idempotency key (UNIQUE in the dca_settlements table)
|
bitspire_event_id: str # nostr event id — the idempotency key
|
||||||
bitspire_event_id: Optional[str] = None # reserved for direct-Nostr ingestion
|
|
||||||
bitspire_txid: Optional[str] = None
|
bitspire_txid: Optional[str] = None
|
||||||
|
payment_hash: str
|
||||||
gross_sats: int
|
gross_sats: int
|
||||||
fiat_amount: float
|
fiat_amount: float
|
||||||
fiat_code: str = "GTQ"
|
fiat_code: str = "GTQ"
|
||||||
|
|
@ -205,9 +205,9 @@ class CreateDcaSettlementData(BaseModel):
|
||||||
class DcaSettlement(BaseModel):
|
class DcaSettlement(BaseModel):
|
||||||
id: str
|
id: str
|
||||||
machine_id: str
|
machine_id: str
|
||||||
payment_hash: str
|
bitspire_event_id: str
|
||||||
bitspire_event_id: Optional[str]
|
|
||||||
bitspire_txid: Optional[str]
|
bitspire_txid: Optional[str]
|
||||||
|
payment_hash: str
|
||||||
gross_sats: int
|
gross_sats: int
|
||||||
fiat_amount: float
|
fiat_amount: float
|
||||||
fiat_code: str
|
fiat_code: str
|
||||||
|
|
@ -224,16 +224,6 @@ class DcaSettlement(BaseModel):
|
||||||
error_message: Optional[str]
|
error_message: Optional[str]
|
||||||
processed_at: Optional[datetime]
|
processed_at: Optional[datetime]
|
||||||
created_at: datetime
|
created_at: datetime
|
||||||
# Append-only audit memo. Populated when an operator triggers an in-place
|
|
||||||
# adjustment (partial-dispense, manual reconciliation override). Each
|
|
||||||
# entry timestamped + records original values so the overwrite is
|
|
||||||
# auditable from the settlement detail view alone. Never edited in place.
|
|
||||||
notes: Optional[str] = None
|
|
||||||
# Optimistic-lock claim token written when status flips to 'processing'.
|
|
||||||
# Two concurrent process_settlement invocations can't both win the claim
|
|
||||||
# (only one matching read-back). Cleared back to NULL when the leg-
|
|
||||||
# writing pass completes (status='processed' or 'errored').
|
|
||||||
processing_claim: Optional[str] = None
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|
@ -407,60 +397,18 @@ class PartialDispenseData(BaseModel):
|
||||||
return v
|
return v
|
||||||
|
|
||||||
|
|
||||||
class AppendSettlementNoteData(BaseModel):
|
|
||||||
"""Operator-authored free-form note on a settlement.
|
|
||||||
|
|
||||||
Notes are prepended (newest first) to the settlement's `notes` column,
|
|
||||||
with a UTC timestamp and the author's user id so each entry is
|
|
||||||
accountable. Useful for cash-drawer reconciliation context, off-the-
|
|
||||||
record refund records, or any narrative an operator wants to attach
|
|
||||||
for future reference.
|
|
||||||
"""
|
|
||||||
|
|
||||||
note: str
|
|
||||||
|
|
||||||
@validator("note")
|
|
||||||
def non_empty(cls, v):
|
|
||||||
v = v.strip() if isinstance(v, str) else v
|
|
||||||
if not v:
|
|
||||||
raise ValueError("note cannot be empty")
|
|
||||||
if len(v) > 2000:
|
|
||||||
raise ValueError("note too long (max 2000 chars)")
|
|
||||||
return v
|
|
||||||
|
|
||||||
|
|
||||||
class SettleBalanceData(BaseModel):
|
class SettleBalanceData(BaseModel):
|
||||||
"""Resolves satmachineadmin#4 — operator settles small remaining LP balance
|
"""Resolves satmachineadmin#4 — operator settles small remaining LP balance
|
||||||
from their own wallet at a specified exchange rate.
|
from their own wallet at the current exchange rate."""
|
||||||
|
|
||||||
Use case: an LP has a small remaining fiat balance (e.g. 47 GTQ) that
|
|
||||||
keeps shrinking proportionally on each new transaction (Zeno's paradox).
|
|
||||||
Operator hits 'Settle', specifies the exchange rate they're willing to
|
|
||||||
honor, and the system pays out the remaining balance in sats from the
|
|
||||||
operator's wallet. The LP's balance goes to zero; settlement legs count
|
|
||||||
against the LP's balance summary alongside DCA legs.
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
client_id: str
|
||||||
funding_wallet_id: str
|
funding_wallet_id: str
|
||||||
# The exchange rate the operator is settling at (sats per 1 fiat unit).
|
# If None, settle the full remaining balance.
|
||||||
# Operator picks the rate so they can use exchange spot, a market
|
|
||||||
# midpoint, or a favorable rate as a gesture. Required and explicit so
|
|
||||||
# there's no ambiguity about what rate was used.
|
|
||||||
exchange_rate: float
|
|
||||||
# If None, settle the LP's full remaining balance. Else partial.
|
|
||||||
amount_fiat: Optional[float] = None
|
amount_fiat: Optional[float] = None
|
||||||
notes: Optional[str] = None
|
notes: Optional[str] = None
|
||||||
|
|
||||||
@validator("exchange_rate")
|
|
||||||
def positive_rate(cls, v):
|
|
||||||
if v is None or v <= 0:
|
|
||||||
raise ValueError("exchange_rate must be > 0 (sats per fiat unit)")
|
|
||||||
return float(v)
|
|
||||||
|
|
||||||
@validator("amount_fiat")
|
@validator("amount_fiat")
|
||||||
def round_amount(cls, v):
|
def round_amount(cls, v):
|
||||||
if v is None:
|
if v is not None:
|
||||||
return v
|
|
||||||
if v <= 0:
|
|
||||||
raise ValueError("amount_fiat must be > 0 if specified")
|
|
||||||
return round(float(v), 2)
|
return round(float(v), 2)
|
||||||
|
return v
|
||||||
|
|
|
||||||
87
tasks.py
87
tasks.py
|
|
@ -1,88 +1,27 @@
|
||||||
# Satoshi Machine v2 — invoice listener (P1).
|
# Satoshi Machine v2 — task placeholders.
|
||||||
#
|
#
|
||||||
# Subscribes to LNbits' invoice dispatcher (register_invoice_listener), then
|
# The v1 SSH/PostgreSQL polling + invoice listener are intentionally absent.
|
||||||
# for each successful inbound payment:
|
# They will be replaced in P1 (Nostr subscription manager: subscribes via
|
||||||
# 1. Checks if wallet_id belongs to an active dca_machines row. If not, skip.
|
# lnbits.core.services.nostr_transport to kind-21000 settlements + kind-30078
|
||||||
# 2. Parses Payment.extra for bitSpire split metadata (post-lamassu-next#44).
|
# beacons + kind-30079 telemetry per registered machine, with auto-reconnect).
|
||||||
# Falls back to machine.fallback_commission_pct if extra is absent.
|
|
||||||
# 3. Computes the two-stage split (super_fee first, operator remainder).
|
|
||||||
# 4. Inserts a dca_settlements row idempotently (keyed by payment_hash).
|
|
||||||
#
|
#
|
||||||
# The actual distribution of sats — paying out the LP DCA legs, the super-fee
|
# These no-op stubs keep __init__.py importable in the interim so the
|
||||||
# leg, and the operator's commission-split legs — happens in a separate
|
# extension can be activated even before P1 lands.
|
||||||
# settlement-processor task (P2). This listener only LANDS the settlement
|
|
||||||
# row; status='pending' tells the processor it still needs to move the money.
|
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
from lnbits.core.models import Payment
|
|
||||||
from lnbits.tasks import register_invoice_listener
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
from .bitspire import parse_settlement
|
|
||||||
from .crud import (
|
|
||||||
create_settlement_idempotent,
|
|
||||||
get_active_machine_by_wallet_id,
|
|
||||||
get_super_config,
|
|
||||||
)
|
|
||||||
from .distribution import process_settlement
|
|
||||||
|
|
||||||
LISTENER_NAME = "ext_satmachineadmin"
|
|
||||||
|
|
||||||
|
|
||||||
async def wait_for_paid_invoices() -> None:
|
async def wait_for_paid_invoices() -> None:
|
||||||
invoice_queue: asyncio.Queue = asyncio.Queue()
|
"""No-op placeholder pending P1 Nostr subscription manager."""
|
||||||
register_invoice_listener(invoice_queue, LISTENER_NAME)
|
logger.debug(
|
||||||
logger.info(
|
"satmachineadmin v2: invoice listener stub running. "
|
||||||
"satmachineadmin v2: invoice listener registered as "
|
"Real Nostr-transport subscription pending P1."
|
||||||
f"`{LISTENER_NAME}` — waiting for bitSpire settlements."
|
|
||||||
)
|
)
|
||||||
|
# Sleep forever; the task system expects a long-lived coroutine.
|
||||||
while True:
|
while True:
|
||||||
payment: Payment = await invoice_queue.get()
|
await asyncio.sleep(3600)
|
||||||
try:
|
|
||||||
await _handle_payment(payment)
|
|
||||||
except Exception as exc: # listener must never die
|
|
||||||
logger.error(
|
|
||||||
f"satmachineadmin: error handling payment "
|
|
||||||
f"{payment.payment_hash[:12]}...: {exc}"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def _handle_payment(payment: Payment) -> None:
|
|
||||||
if not payment.is_in or not payment.success:
|
|
||||||
return
|
|
||||||
machine = await get_active_machine_by_wallet_id(payment.wallet_id)
|
|
||||||
if machine is None:
|
|
||||||
return
|
|
||||||
super_config = await get_super_config()
|
|
||||||
super_fee_pct = float(super_config.super_fee_pct) if super_config else 0.0
|
|
||||||
data, used_fallback = parse_settlement(
|
|
||||||
machine=machine,
|
|
||||||
payment_hash=payment.payment_hash,
|
|
||||||
gross_sats=payment.sat,
|
|
||||||
extra=payment.extra or {},
|
|
||||||
super_fee_pct=super_fee_pct,
|
|
||||||
)
|
|
||||||
settlement = await create_settlement_idempotent(data)
|
|
||||||
if settlement is None:
|
|
||||||
logger.error(
|
|
||||||
f"satmachineadmin: failed to insert settlement for "
|
|
||||||
f"payment_hash={payment.payment_hash[:12]}..."
|
|
||||||
)
|
|
||||||
return
|
|
||||||
fb = " (fallback split)" if used_fallback else ""
|
|
||||||
logger.info(
|
|
||||||
f"satmachineadmin: landed settlement {settlement.id} for "
|
|
||||||
f"machine={machine.machine_npub[:12]}... "
|
|
||||||
f"gross={data.gross_sats}sats net={data.net_sats}sats "
|
|
||||||
f"commission={data.commission_sats}sats "
|
|
||||||
f"(super_fee={data.platform_fee_sats} "
|
|
||||||
f"operator_fee={data.operator_fee_sats}){fb}"
|
|
||||||
)
|
|
||||||
# Trigger distribution synchronously so latency is one bitSpire-tx wide.
|
|
||||||
# process_settlement is idempotent (status='processed' guard); if this
|
|
||||||
# task crashes mid-process, the next manual or scheduled retry resumes.
|
|
||||||
await process_settlement(settlement.id)
|
|
||||||
|
|
||||||
|
|
||||||
async def hourly_transaction_polling() -> None:
|
async def hourly_transaction_polling() -> None:
|
||||||
|
|
|
||||||
|
|
@ -1,144 +0,0 @@
|
||||||
"""
|
|
||||||
Tests for the v2 two-stage commission split (super first, operator remainder).
|
|
||||||
|
|
||||||
The plan calls out a verification scenario explicitly:
|
|
||||||
super_fee_pct=30%, operator split 50/30/20 on a 100-sat commission
|
|
||||||
→ super_wallet gets 30, operator_self gets 35, employee 21, maint 14.
|
|
||||||
|
|
||||||
Also covers the edge cases: super_fee_pct=0 (no super), super_fee_pct=1.0
|
|
||||||
(everything to super), single-leg operator ruleset, zero operator fee.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from ..calculations import (
|
|
||||||
allocate_operator_split_legs,
|
|
||||||
split_two_stage_commission,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TestSplitTwoStageCommission:
|
|
||||||
"""Stage-1: super takes super_fee_pct of commission; operator gets rest."""
|
|
||||||
|
|
||||||
def test_plan_example_100sats_30pct(self):
|
|
||||||
platform, operator = split_two_stage_commission(100, 0.30)
|
|
||||||
assert platform == 30
|
|
||||||
assert operator == 70
|
|
||||||
assert platform + operator == 100
|
|
||||||
|
|
||||||
def test_realistic_7965sats_30pct(self):
|
|
||||||
# From the plan's 2000 GTQ → 266800 sats @ 3% commission example.
|
|
||||||
platform, operator = split_two_stage_commission(7965, 0.30)
|
|
||||||
assert platform == 2390 # round(7965 * 0.30) = 2389.5 → 2390
|
|
||||||
assert operator == 5575 # 7965 - 2390
|
|
||||||
assert platform + operator == 7965
|
|
||||||
|
|
||||||
def test_super_pct_zero_leaves_all_to_operator(self):
|
|
||||||
platform, operator = split_two_stage_commission(7965, 0.0)
|
|
||||||
assert platform == 0
|
|
||||||
assert operator == 7965
|
|
||||||
|
|
||||||
def test_super_pct_one_takes_everything(self):
|
|
||||||
platform, operator = split_two_stage_commission(7965, 1.0)
|
|
||||||
assert platform == 7965
|
|
||||||
assert operator == 0
|
|
||||||
|
|
||||||
def test_zero_commission(self):
|
|
||||||
platform, operator = split_two_stage_commission(0, 0.30)
|
|
||||||
assert platform == 0
|
|
||||||
assert operator == 0
|
|
||||||
|
|
||||||
def test_negative_commission_clamps_to_zero(self):
|
|
||||||
# Defensive: should never happen, but verify we don't go negative.
|
|
||||||
platform, operator = split_two_stage_commission(-100, 0.30)
|
|
||||||
assert platform == 0
|
|
||||||
assert operator == 0
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("commission_sats", [1, 7, 100, 7965, 1_000_000])
|
|
||||||
@pytest.mark.parametrize("super_pct", [0.0, 0.1, 0.30, 0.5, 0.777, 1.0])
|
|
||||||
def test_invariant_sum_equals_commission(self, commission_sats, super_pct):
|
|
||||||
platform, operator = split_two_stage_commission(commission_sats, super_pct)
|
|
||||||
assert platform + operator == commission_sats
|
|
||||||
assert 0 <= platform <= commission_sats
|
|
||||||
assert 0 <= operator <= commission_sats
|
|
||||||
|
|
||||||
|
|
||||||
class TestAllocateOperatorSplitLegs:
|
|
||||||
"""Stage-2: operator's remainder split across N leg wallets per pct rules."""
|
|
||||||
|
|
||||||
def test_plan_example_50_30_20_on_70(self):
|
|
||||||
amounts = allocate_operator_split_legs(70, [0.5, 0.3, 0.2])
|
|
||||||
assert amounts == [35, 21, 14]
|
|
||||||
assert sum(amounts) == 70
|
|
||||||
|
|
||||||
def test_realistic_50_30_20_on_5575(self):
|
|
||||||
amounts = allocate_operator_split_legs(5575, [0.5, 0.3, 0.2])
|
|
||||||
# 50%: round(2787.5) = 2788; 30%: round(1672.5) = 1672; last absorbs
|
|
||||||
# remainder: 5575 - 2788 - 1672 = 1115.
|
|
||||||
# Note: round() uses banker's rounding so 2787.5 → 2788 actually
|
|
||||||
# because 2788 is even. Confirm by total invariant.
|
|
||||||
assert sum(amounts) == 5575
|
|
||||||
assert len(amounts) == 3
|
|
||||||
|
|
||||||
def test_single_leg_full_remainder(self):
|
|
||||||
amounts = allocate_operator_split_legs(100, [1.0])
|
|
||||||
assert amounts == [100]
|
|
||||||
|
|
||||||
def test_zero_operator_fee_zeros_all_legs(self):
|
|
||||||
amounts = allocate_operator_split_legs(0, [0.5, 0.5])
|
|
||||||
assert amounts == [0, 0]
|
|
||||||
|
|
||||||
def test_empty_legs_list_returns_empty(self):
|
|
||||||
amounts = allocate_operator_split_legs(100, [])
|
|
||||||
assert amounts == []
|
|
||||||
|
|
||||||
def test_last_leg_absorbs_rounding_remainder(self):
|
|
||||||
# 100 / 3 ≈ 33.33 each; rounding makes the first two 33 and last 34.
|
|
||||||
amounts = allocate_operator_split_legs(100, [1 / 3, 1 / 3, 1 / 3])
|
|
||||||
assert sum(amounts) == 100
|
|
||||||
assert amounts[0] == round(100 / 3) # 33
|
|
||||||
assert amounts[1] == round(100 / 3) # 33
|
|
||||||
# Last leg absorbs the rounding (34, not 33) so total == 100.
|
|
||||||
assert amounts[2] == 100 - amounts[0] - amounts[1]
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
"operator_fee,pcts",
|
|
||||||
[
|
|
||||||
(1, [0.5, 0.5]),
|
|
||||||
(7, [0.5, 0.3, 0.2]),
|
|
||||||
(100, [0.5, 0.5]),
|
|
||||||
(5575, [0.5, 0.3, 0.2]),
|
|
||||||
(1_000_000, [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
def test_invariant_sum_equals_operator_fee(self, operator_fee, pcts):
|
|
||||||
amounts = allocate_operator_split_legs(operator_fee, pcts)
|
|
||||||
assert sum(amounts) == operator_fee
|
|
||||||
assert all(a >= 0 for a in amounts)
|
|
||||||
|
|
||||||
|
|
||||||
class TestEndToEndScenarios:
|
|
||||||
"""The full two-stage split — super then operator legs — composed."""
|
|
||||||
|
|
||||||
def test_plan_example_full(self):
|
|
||||||
# 100 sats commission, super=30%, operator splits 50/30/20.
|
|
||||||
platform, operator = split_two_stage_commission(100, 0.30)
|
|
||||||
legs = allocate_operator_split_legs(operator, [0.5, 0.3, 0.2])
|
|
||||||
assert platform == 30
|
|
||||||
assert legs == [35, 21, 14]
|
|
||||||
assert platform + sum(legs) == 100
|
|
||||||
|
|
||||||
def test_super_pct_zero_full_pipeline(self):
|
|
||||||
platform, operator = split_two_stage_commission(7965, 0.0)
|
|
||||||
legs = allocate_operator_split_legs(operator, [1.0])
|
|
||||||
assert platform == 0
|
|
||||||
assert legs == [7965]
|
|
||||||
assert platform + sum(legs) == 7965
|
|
||||||
|
|
||||||
def test_super_pct_one_full_pipeline(self):
|
|
||||||
platform, operator = split_two_stage_commission(7965, 1.0)
|
|
||||||
legs = allocate_operator_split_legs(operator, [0.5, 0.5])
|
|
||||||
assert platform == 7965
|
|
||||||
# Operator has zero to distribute; both legs get zero.
|
|
||||||
assert legs == [0, 0]
|
|
||||||
assert platform + sum(legs) == 7965
|
|
||||||
664
views_api.py
664
views_api.py
|
|
@ -1,661 +1,20 @@
|
||||||
# Satoshi Machine v2 — operator API surface (P1b).
|
# Satoshi Machine v2 — API placeholder.
|
||||||
#
|
#
|
||||||
# All endpoints are operator-scoped via check_user_exists. Every query
|
# The v1 super-only Lamassu endpoints have been removed. The v2 operator-
|
||||||
# filters by the authenticated user's id so two operators on the same
|
# scoped surface (machines / clients / deposits / settlements / commission
|
||||||
# LNbits instance can never see each other's machines, settlements, or
|
# splits / partial-tx / balance-settle / super platform-fee) lands in P1+.
|
||||||
# clients. The super-only platform-fee write endpoint lands in P2.
|
# See plan section "Critical files to modify".
|
||||||
|
#
|
||||||
|
# This stub keeps __init__.py importable and surfaces a clear 503 on every
|
||||||
|
# v1 route so existing clients get a precise error instead of a silent 404.
|
||||||
|
|
||||||
from http import HTTPStatus
|
from http import HTTPStatus
|
||||||
|
|
||||||
from fastapi import APIRouter, Depends, HTTPException
|
from fastapi import APIRouter, HTTPException
|
||||||
from lnbits.core.crud import get_wallet
|
|
||||||
from lnbits.core.models import User
|
|
||||||
from lnbits.decorators import check_super_user, check_user_exists
|
|
||||||
|
|
||||||
from .crud import (
|
|
||||||
append_settlement_note,
|
|
||||||
create_dca_client,
|
|
||||||
create_deposit,
|
|
||||||
create_machine,
|
|
||||||
delete_dca_client,
|
|
||||||
delete_deposit,
|
|
||||||
delete_machine,
|
|
||||||
get_client_balance_summary,
|
|
||||||
get_commission_splits,
|
|
||||||
get_dca_client,
|
|
||||||
get_dca_clients_for_machine,
|
|
||||||
get_dca_clients_for_operator,
|
|
||||||
get_deposit,
|
|
||||||
get_deposits_for_client,
|
|
||||||
get_deposits_for_operator,
|
|
||||||
get_effective_commission_splits,
|
|
||||||
get_machine,
|
|
||||||
get_machines_for_operator,
|
|
||||||
get_payments_for_operator,
|
|
||||||
get_settlement,
|
|
||||||
get_settlements_for_machine,
|
|
||||||
get_settlements_for_operator,
|
|
||||||
get_super_config,
|
|
||||||
replace_commission_splits,
|
|
||||||
reset_settlement_for_retry,
|
|
||||||
update_dca_client,
|
|
||||||
update_deposit,
|
|
||||||
update_deposit_status,
|
|
||||||
update_machine,
|
|
||||||
update_super_config,
|
|
||||||
)
|
|
||||||
from .distribution import (
|
|
||||||
apply_partial_dispense_and_redistribute,
|
|
||||||
process_settlement,
|
|
||||||
settle_lp_balance,
|
|
||||||
)
|
|
||||||
from .models import (
|
|
||||||
AppendSettlementNoteData,
|
|
||||||
ClientBalanceSummary,
|
|
||||||
CommissionSplit,
|
|
||||||
CreateDcaClientData,
|
|
||||||
CreateDepositData,
|
|
||||||
CreateMachineData,
|
|
||||||
DcaClient,
|
|
||||||
DcaDeposit,
|
|
||||||
DcaPayment,
|
|
||||||
DcaSettlement,
|
|
||||||
Machine,
|
|
||||||
PartialDispenseData,
|
|
||||||
SetCommissionSplitsData,
|
|
||||||
SettleBalanceData,
|
|
||||||
SuperConfig,
|
|
||||||
UpdateDcaClientData,
|
|
||||||
UpdateDepositData,
|
|
||||||
UpdateDepositStatusData,
|
|
||||||
UpdateMachineData,
|
|
||||||
UpdateSuperConfigData,
|
|
||||||
)
|
|
||||||
|
|
||||||
satmachineadmin_api_router = APIRouter()
|
satmachineadmin_api_router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
async def _assert_wallet_owned_by(wallet_id: str, user_id: str) -> None:
|
|
||||||
"""Defence-in-depth: refuse to bind any DB row to a wallet the caller
|
|
||||||
doesn't own. Used on every endpoint that accepts a wallet_id from the
|
|
||||||
request body. The DB-side UNIQUE on dca_machines.wallet_id (m007) is a
|
|
||||||
second line of defence; this check is the primary gate."""
|
|
||||||
wallet = await get_wallet(wallet_id)
|
|
||||||
if wallet is None or wallet.user != user_id:
|
|
||||||
raise HTTPException(
|
|
||||||
HTTPStatus.BAD_REQUEST,
|
|
||||||
"wallet_id is not owned by the authenticated operator",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Machines
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.post("/api/v1/dca/machines", response_model=Machine)
|
|
||||||
async def api_create_machine(
|
|
||||||
data: CreateMachineData, user: User = Depends(check_user_exists)
|
|
||||||
) -> Machine:
|
|
||||||
await _assert_wallet_owned_by(data.wallet_id, user.id)
|
|
||||||
return await create_machine(user.id, data)
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.get(
|
|
||||||
"/api/v1/dca/machines", response_model=list[Machine]
|
|
||||||
)
|
|
||||||
async def api_list_machines(
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> list[Machine]:
|
|
||||||
return await get_machines_for_operator(user.id)
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.get(
|
|
||||||
"/api/v1/dca/machines/{machine_id}", response_model=Machine
|
|
||||||
)
|
|
||||||
async def api_get_machine(
|
|
||||||
machine_id: str, user: User = Depends(check_user_exists)
|
|
||||||
) -> Machine:
|
|
||||||
machine = await get_machine(machine_id)
|
|
||||||
if machine is None or machine.operator_user_id != user.id:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
|
|
||||||
return machine
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.put(
|
|
||||||
"/api/v1/dca/machines/{machine_id}", response_model=Machine
|
|
||||||
)
|
|
||||||
async def api_update_machine(
|
|
||||||
machine_id: str,
|
|
||||||
data: UpdateMachineData,
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> Machine:
|
|
||||||
machine = await get_machine(machine_id)
|
|
||||||
if machine is None or machine.operator_user_id != user.id:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
|
|
||||||
if data.wallet_id is not None:
|
|
||||||
await _assert_wallet_owned_by(data.wallet_id, user.id)
|
|
||||||
updated = await update_machine(machine_id, data)
|
|
||||||
if updated is None:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
|
|
||||||
return updated
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.delete(
|
|
||||||
"/api/v1/dca/machines/{machine_id}", status_code=HTTPStatus.NO_CONTENT
|
|
||||||
)
|
|
||||||
async def api_delete_machine(
|
|
||||||
machine_id: str, user: User = Depends(check_user_exists)
|
|
||||||
) -> None:
|
|
||||||
machine = await get_machine(machine_id)
|
|
||||||
if machine is None or machine.operator_user_id != user.id:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
|
|
||||||
await delete_machine(machine_id)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# DCA Clients (LPs) — scoped per (machine, user).
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
async def _machine_owned_by(machine_id: str, user_id: str) -> Machine:
|
|
||||||
"""Lookup-with-ownership guard. 404 (not 403) so operators can't probe
|
|
||||||
for other operators' machines."""
|
|
||||||
machine = await get_machine(machine_id)
|
|
||||||
if machine is None or machine.operator_user_id != user_id:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
|
|
||||||
return machine
|
|
||||||
|
|
||||||
|
|
||||||
async def _client_owned_by(client_id: str, user_id: str) -> DcaClient:
|
|
||||||
"""Lookup-with-ownership guard for an LP record; ownership is checked
|
|
||||||
transitively via the client's machine. 404 if either doesn't match."""
|
|
||||||
client = await get_dca_client(client_id)
|
|
||||||
if client is None:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found")
|
|
||||||
machine = await get_machine(client.machine_id)
|
|
||||||
if machine is None or machine.operator_user_id != user_id:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found")
|
|
||||||
return client
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.post(
|
|
||||||
"/api/v1/dca/clients", response_model=DcaClient
|
|
||||||
)
|
|
||||||
async def api_create_client(
|
|
||||||
data: CreateDcaClientData, user: User = Depends(check_user_exists)
|
|
||||||
) -> DcaClient:
|
|
||||||
# Operator can only register LPs on machines they own.
|
|
||||||
await _machine_owned_by(data.machine_id, user.id)
|
|
||||||
return await create_dca_client(data)
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.get(
|
|
||||||
"/api/v1/dca/clients", response_model=list[DcaClient]
|
|
||||||
)
|
|
||||||
async def api_list_clients(
|
|
||||||
machine_id: str | None = None,
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> list[DcaClient]:
|
|
||||||
"""List the operator's LPs. Without ?machine_id, returns all LPs across
|
|
||||||
the operator's fleet. With ?machine_id, scoped to that machine (with
|
|
||||||
ownership check)."""
|
|
||||||
if machine_id is None:
|
|
||||||
return await get_dca_clients_for_operator(user.id)
|
|
||||||
await _machine_owned_by(machine_id, user.id)
|
|
||||||
return await get_dca_clients_for_machine(machine_id)
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.get(
|
|
||||||
"/api/v1/dca/clients/{client_id}", response_model=DcaClient
|
|
||||||
)
|
|
||||||
async def api_get_client(
|
|
||||||
client_id: str, user: User = Depends(check_user_exists)
|
|
||||||
) -> DcaClient:
|
|
||||||
return await _client_owned_by(client_id, user.id)
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.put(
|
|
||||||
"/api/v1/dca/clients/{client_id}", response_model=DcaClient
|
|
||||||
)
|
|
||||||
async def api_update_client(
|
|
||||||
client_id: str,
|
|
||||||
data: UpdateDcaClientData,
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> DcaClient:
|
|
||||||
await _client_owned_by(client_id, user.id)
|
|
||||||
updated = await update_dca_client(client_id, data)
|
|
||||||
if updated is None:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found")
|
|
||||||
return updated
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.delete(
|
|
||||||
"/api/v1/dca/clients/{client_id}", status_code=HTTPStatus.NO_CONTENT
|
|
||||||
)
|
|
||||||
async def api_delete_client(
|
|
||||||
client_id: str, user: User = Depends(check_user_exists)
|
|
||||||
) -> None:
|
|
||||||
await _client_owned_by(client_id, user.id)
|
|
||||||
await delete_dca_client(client_id)
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.get(
|
|
||||||
"/api/v1/dca/clients/{client_id}/balance",
|
|
||||||
response_model=ClientBalanceSummary,
|
|
||||||
)
|
|
||||||
async def api_get_client_balance(
|
|
||||||
client_id: str, user: User = Depends(check_user_exists)
|
|
||||||
) -> ClientBalanceSummary:
|
|
||||||
await _client_owned_by(client_id, user.id)
|
|
||||||
summary = await get_client_balance_summary(client_id)
|
|
||||||
if summary is None:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Client not found")
|
|
||||||
return summary
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.post(
|
|
||||||
"/api/v1/dca/clients/{client_id}/settle", response_model=DcaPayment
|
|
||||||
)
|
|
||||||
async def api_settle_client_balance(
|
|
||||||
client_id: str,
|
|
||||||
data: SettleBalanceData,
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> DcaPayment:
|
|
||||||
"""Operator UX — closes satmachineadmin#4.
|
|
||||||
|
|
||||||
Settle an LP's remaining fiat balance from the operator's chosen funding
|
|
||||||
wallet at the specified exchange rate. The amount_fiat is capped at the
|
|
||||||
LP's remaining balance; if omitted, settles the full remaining.
|
|
||||||
|
|
||||||
Use case: avoid the Zeno's-paradox of vanishing tiny shares for small
|
|
||||||
remaining balances. Operator hits 'Settle' on the LP, gets to specify
|
|
||||||
the rate, and the system pays out the rest in sats from their wallet.
|
|
||||||
"""
|
|
||||||
client = await _client_owned_by(client_id, user.id)
|
|
||||||
machine = await _machine_owned_by(client.machine_id, user.id)
|
|
||||||
# Verify the operator owns the funding wallet.
|
|
||||||
funding_wallet = await get_wallet(data.funding_wallet_id)
|
|
||||||
if funding_wallet is None or funding_wallet.user != user.id:
|
|
||||||
raise HTTPException(
|
|
||||||
HTTPStatus.BAD_REQUEST,
|
|
||||||
"funding_wallet_id is not owned by the authenticated operator",
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
return await settle_lp_balance(client, machine, data)
|
|
||||||
except ValueError as exc:
|
|
||||||
raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Deposits — operator records fiat handed in by an LP at a machine.
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
async def _deposit_owned_by(deposit_id: str, user_id: str) -> DcaDeposit:
|
|
||||||
deposit = await get_deposit(deposit_id)
|
|
||||||
if deposit is None:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found")
|
|
||||||
machine = await get_machine(deposit.machine_id)
|
|
||||||
if machine is None or machine.operator_user_id != user_id:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found")
|
|
||||||
return deposit
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.post(
|
|
||||||
"/api/v1/dca/deposits", response_model=DcaDeposit
|
|
||||||
)
|
|
||||||
async def api_create_deposit(
|
|
||||||
data: CreateDepositData, user: User = Depends(check_user_exists)
|
|
||||||
) -> DcaDeposit:
|
|
||||||
# Verify the (client_id, machine_id) pair belongs to the operator.
|
|
||||||
client = await _client_owned_by(data.client_id, user.id)
|
|
||||||
if client.machine_id != data.machine_id:
|
|
||||||
raise HTTPException(
|
|
||||||
HTTPStatus.BAD_REQUEST,
|
|
||||||
"client_id and machine_id refer to different machines",
|
|
||||||
)
|
|
||||||
return await create_deposit(user.id, data)
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.get(
|
|
||||||
"/api/v1/dca/deposits", response_model=list[DcaDeposit]
|
|
||||||
)
|
|
||||||
async def api_list_deposits(
|
|
||||||
client_id: str | None = None,
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> list[DcaDeposit]:
|
|
||||||
"""Operator's deposits across all their machines; ?client_id scopes to
|
|
||||||
a single LP (with ownership check)."""
|
|
||||||
if client_id is not None:
|
|
||||||
await _client_owned_by(client_id, user.id)
|
|
||||||
return await get_deposits_for_client(client_id)
|
|
||||||
return await get_deposits_for_operator(user.id)
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.get(
|
|
||||||
"/api/v1/dca/deposits/{deposit_id}", response_model=DcaDeposit
|
|
||||||
)
|
|
||||||
async def api_get_deposit(
|
|
||||||
deposit_id: str, user: User = Depends(check_user_exists)
|
|
||||||
) -> DcaDeposit:
|
|
||||||
return await _deposit_owned_by(deposit_id, user.id)
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.put(
|
|
||||||
"/api/v1/dca/deposits/{deposit_id}", response_model=DcaDeposit
|
|
||||||
)
|
|
||||||
async def api_update_deposit(
|
|
||||||
deposit_id: str,
|
|
||||||
data: UpdateDepositData,
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> DcaDeposit:
|
|
||||||
existing = await _deposit_owned_by(deposit_id, user.id)
|
|
||||||
if existing.status != "pending":
|
|
||||||
raise HTTPException(
|
|
||||||
HTTPStatus.BAD_REQUEST,
|
|
||||||
"Only pending deposits can be edited",
|
|
||||||
)
|
|
||||||
updated = await update_deposit(deposit_id, data)
|
|
||||||
if updated is None:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found")
|
|
||||||
return updated
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.put(
|
|
||||||
"/api/v1/dca/deposits/{deposit_id}/status", response_model=DcaDeposit
|
|
||||||
)
|
|
||||||
async def api_update_deposit_status(
|
|
||||||
deposit_id: str,
|
|
||||||
data: UpdateDepositStatusData,
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> DcaDeposit:
|
|
||||||
await _deposit_owned_by(deposit_id, user.id)
|
|
||||||
updated = await update_deposit_status(deposit_id, data)
|
|
||||||
if updated is None:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Deposit not found")
|
|
||||||
return updated
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.delete(
|
|
||||||
"/api/v1/dca/deposits/{deposit_id}", status_code=HTTPStatus.NO_CONTENT
|
|
||||||
)
|
|
||||||
async def api_delete_deposit(
|
|
||||||
deposit_id: str, user: User = Depends(check_user_exists)
|
|
||||||
) -> None:
|
|
||||||
existing = await _deposit_owned_by(deposit_id, user.id)
|
|
||||||
if existing.status != "pending":
|
|
||||||
raise HTTPException(
|
|
||||||
HTTPStatus.BAD_REQUEST,
|
|
||||||
"Only pending deposits can be deleted",
|
|
||||||
)
|
|
||||||
await delete_deposit(deposit_id)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Settlements (read-only at this phase; landing happens in tasks.py)
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.get(
|
|
||||||
"/api/v1/dca/settlements", response_model=list[DcaSettlement]
|
|
||||||
)
|
|
||||||
async def api_list_settlements(
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> list[DcaSettlement]:
|
|
||||||
return await get_settlements_for_operator(user.id)
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.get(
|
|
||||||
"/api/v1/dca/machines/{machine_id}/settlements",
|
|
||||||
response_model=list[DcaSettlement],
|
|
||||||
)
|
|
||||||
async def api_list_settlements_for_machine(
|
|
||||||
machine_id: str, user: User = Depends(check_user_exists)
|
|
||||||
) -> list[DcaSettlement]:
|
|
||||||
machine = await get_machine(machine_id)
|
|
||||||
if machine is None or machine.operator_user_id != user.id:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Machine not found")
|
|
||||||
return await get_settlements_for_machine(machine_id)
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.get(
|
|
||||||
"/api/v1/dca/settlements/{settlement_id}", response_model=DcaSettlement
|
|
||||||
)
|
|
||||||
async def api_get_settlement(
|
|
||||||
settlement_id: str, user: User = Depends(check_user_exists)
|
|
||||||
) -> DcaSettlement:
|
|
||||||
settlement = await get_settlement(settlement_id)
|
|
||||||
if settlement is None:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
|
|
||||||
machine = await get_machine(settlement.machine_id)
|
|
||||||
if machine is None or machine.operator_user_id != user.id:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
|
|
||||||
return settlement
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.post(
|
|
||||||
"/api/v1/dca/settlements/{settlement_id}/partial-dispense",
|
|
||||||
response_model=DcaSettlement,
|
|
||||||
)
|
|
||||||
async def api_partial_dispense(
|
|
||||||
settlement_id: str,
|
|
||||||
data: PartialDispenseData,
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> DcaSettlement:
|
|
||||||
"""Operator UX — resolves satmachineadmin#3.
|
|
||||||
|
|
||||||
Recompute the split for a settlement that didn't dispense the full
|
|
||||||
amount (jam, mid-tx error). Provide one of dispensed_fraction (0..1)
|
|
||||||
or dispensed_sats. Optionally include a reason in notes.
|
|
||||||
|
|
||||||
Refuses when any leg has already completed — Lightning payments can't
|
|
||||||
be clawed back. Use balance settlement (P3e) for those cases.
|
|
||||||
"""
|
|
||||||
settlement = await get_settlement(settlement_id)
|
|
||||||
if settlement is None:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
|
|
||||||
machine = await get_machine(settlement.machine_id)
|
|
||||||
if machine is None or machine.operator_user_id != user.id:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
|
|
||||||
if (data.dispensed_fraction is None) == (data.dispensed_sats is None):
|
|
||||||
raise HTTPException(
|
|
||||||
HTTPStatus.BAD_REQUEST,
|
|
||||||
"Provide exactly one of dispensed_fraction or dispensed_sats",
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
return await apply_partial_dispense_and_redistribute(settlement_id, data)
|
|
||||||
except ValueError as exc:
|
|
||||||
raise HTTPException(HTTPStatus.BAD_REQUEST, str(exc)) from exc
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.post(
|
|
||||||
"/api/v1/dca/settlements/{settlement_id}/retry",
|
|
||||||
response_model=DcaSettlement,
|
|
||||||
)
|
|
||||||
async def api_retry_settlement(
|
|
||||||
settlement_id: str, user: User = Depends(check_user_exists)
|
|
||||||
) -> DcaSettlement:
|
|
||||||
"""Operator retry path for an errored settlement.
|
|
||||||
|
|
||||||
Voids any failed legs (completed legs are NEVER re-paid — Lightning
|
|
||||||
sats already moved) and flips status 'errored' → 'pending', then
|
|
||||||
re-invokes process_settlement. The optimistic-lock claim guards
|
|
||||||
against a concurrent listener re-fire racing this retry."""
|
|
||||||
settlement = await get_settlement(settlement_id)
|
|
||||||
if settlement is None:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
|
|
||||||
machine = await get_machine(settlement.machine_id)
|
|
||||||
if machine is None or machine.operator_user_id != user.id:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
|
|
||||||
if settlement.status != "errored":
|
|
||||||
raise HTTPException(
|
|
||||||
HTTPStatus.BAD_REQUEST,
|
|
||||||
f"settlement status must be 'errored' to retry "
|
|
||||||
f"(currently '{settlement.status}')",
|
|
||||||
)
|
|
||||||
updated = await reset_settlement_for_retry(settlement_id)
|
|
||||||
if updated is None or updated.status != "pending":
|
|
||||||
raise HTTPException(
|
|
||||||
HTTPStatus.INTERNAL_SERVER_ERROR, "failed to reset settlement"
|
|
||||||
)
|
|
||||||
await process_settlement(settlement_id)
|
|
||||||
after = await get_settlement(settlement_id)
|
|
||||||
return after if after is not None else updated
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.post(
|
|
||||||
"/api/v1/dca/settlements/{settlement_id}/notes",
|
|
||||||
response_model=DcaSettlement,
|
|
||||||
)
|
|
||||||
async def api_append_settlement_note(
|
|
||||||
settlement_id: str,
|
|
||||||
data: AppendSettlementNoteData,
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> DcaSettlement:
|
|
||||||
"""Operator appends a free-form note to the settlement. Useful for cash-
|
|
||||||
drawer reconciliation context, off-LN refund records, or any narrative
|
|
||||||
an operator wants to attach. Each entry is timestamped (UTC) and tagged
|
|
||||||
with the author's user id; existing entries are never modified.
|
|
||||||
|
|
||||||
For richer queryable audit (filter by author, time range, action type),
|
|
||||||
see aiolabs/satmachineadmin (future audit-table feature)."""
|
|
||||||
settlement = await get_settlement(settlement_id)
|
|
||||||
if settlement is None:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
|
|
||||||
machine = await get_machine(settlement.machine_id)
|
|
||||||
if machine is None or machine.operator_user_id != user.id:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
|
|
||||||
updated = await append_settlement_note(settlement_id, data.note, user.id)
|
|
||||||
if updated is None:
|
|
||||||
raise HTTPException(HTTPStatus.NOT_FOUND, "Settlement not found")
|
|
||||||
return updated
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Payments (read-only — the leg-typed breakdown of distributions)
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.get(
|
|
||||||
"/api/v1/dca/payments", response_model=list[DcaPayment]
|
|
||||||
)
|
|
||||||
async def api_list_payments(
|
|
||||||
leg_type: str | None = None,
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> list[DcaPayment]:
|
|
||||||
return await get_payments_for_operator(user.id, leg_type=leg_type)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Commission splits — operator's rules for distributing the commission
|
|
||||||
# remainder (post-super-fee). Sum-to-1.0 invariant enforced at the model
|
|
||||||
# boundary by SetCommissionSplitsData.
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.get(
|
|
||||||
"/api/v1/dca/commission-splits", response_model=list[CommissionSplit]
|
|
||||||
)
|
|
||||||
async def api_get_commission_splits(
|
|
||||||
machine_id: str | None = None,
|
|
||||||
effective: bool = False,
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> list[CommissionSplit]:
|
|
||||||
"""No machine_id: operator's default ruleset (rows where machine_id IS NULL).
|
|
||||||
With machine_id: per-machine override only (404 the machine if not yours).
|
|
||||||
With machine_id and ?effective=true: per-machine override if set, else
|
|
||||||
operator default — what the settlement processor actually applies."""
|
|
||||||
if machine_id is not None:
|
|
||||||
await _machine_owned_by(machine_id, user.id)
|
|
||||||
if effective:
|
|
||||||
return await get_effective_commission_splits(user.id, machine_id)
|
|
||||||
return await get_commission_splits(user.id, machine_id)
|
|
||||||
return await get_commission_splits(user.id, None)
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.put(
|
|
||||||
"/api/v1/dca/commission-splits", response_model=list[CommissionSplit]
|
|
||||||
)
|
|
||||||
async def api_replace_commission_splits(
|
|
||||||
data: SetCommissionSplitsData,
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> list[CommissionSplit]:
|
|
||||||
"""Atomic replace for the (operator, machine) scope. If
|
|
||||||
data.machine_id is None, replaces the operator's default ruleset;
|
|
||||||
otherwise replaces the per-machine override (machine must be owned).
|
|
||||||
Sum-to-1.0 invariant enforced upstream by the Pydantic validator."""
|
|
||||||
if data.machine_id is not None:
|
|
||||||
await _machine_owned_by(data.machine_id, user.id)
|
|
||||||
return await replace_commission_splits(user.id, data.machine_id, data.legs)
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.delete(
|
|
||||||
"/api/v1/dca/commission-splits",
|
|
||||||
status_code=HTTPStatus.NO_CONTENT,
|
|
||||||
)
|
|
||||||
async def api_delete_commission_splits(
|
|
||||||
machine_id: str | None = None,
|
|
||||||
user: User = Depends(check_user_exists),
|
|
||||||
) -> None:
|
|
||||||
"""Clear a ruleset. With machine_id: clears the per-machine override
|
|
||||||
(machine falls back to operator default). Without: clears the operator
|
|
||||||
default (any per-machine overrides keep applying)."""
|
|
||||||
if machine_id is not None:
|
|
||||||
await _machine_owned_by(machine_id, user.id)
|
|
||||||
# Atomic replace with an empty leg list — same effect as DELETE WHERE.
|
|
||||||
await replace_commission_splits(user.id, machine_id, [])
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Super config — operators read; super (LNbits instance admin) writes.
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.get(
|
|
||||||
"/api/v1/dca/super-config", response_model=SuperConfig
|
|
||||||
)
|
|
||||||
async def api_get_super_config(
|
|
||||||
_user: User = Depends(check_user_exists),
|
|
||||||
) -> SuperConfig:
|
|
||||||
"""Returns the platform-fee config so operators can display it as a
|
|
||||||
read-only line item in their UI. The fee is set by the LNbits super
|
|
||||||
instance-wide; operators see it but can't change it."""
|
|
||||||
config = await get_super_config()
|
|
||||||
if config is None:
|
|
||||||
raise HTTPException(
|
|
||||||
HTTPStatus.NOT_FOUND, "Super config not initialised"
|
|
||||||
)
|
|
||||||
return config
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.put(
|
|
||||||
"/api/v1/dca/super-config", response_model=SuperConfig
|
|
||||||
)
|
|
||||||
async def api_update_super_config(
|
|
||||||
data: UpdateSuperConfigData,
|
|
||||||
_user: User = Depends(check_super_user),
|
|
||||||
) -> SuperConfig:
|
|
||||||
"""Super-only: set the platform fee % charged on every operator's
|
|
||||||
commission, plus the destination wallet for collecting it. The fee is
|
|
||||||
enforced before the operator's own commission_splits ruleset fires
|
|
||||||
(see distribution.process_settlement)."""
|
|
||||||
config = await update_super_config(data)
|
|
||||||
if config is None:
|
|
||||||
raise HTTPException(
|
|
||||||
HTTPStatus.INTERNAL_SERVER_ERROR, "Failed to update super config"
|
|
||||||
)
|
|
||||||
return config
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Catch-all stub for endpoints not yet implemented (clients, deposits,
|
|
||||||
# commission splits, partial-tx, balance-settle, super-config write). Each
|
|
||||||
# lands in a follow-up commit. The catch-all comes LAST so specific routes
|
|
||||||
# above take precedence.
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
@satmachineadmin_api_router.api_route(
|
@satmachineadmin_api_router.api_route(
|
||||||
"/api/v1/dca/{full_path:path}",
|
"/api/v1/dca/{full_path:path}",
|
||||||
methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
|
methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
|
||||||
|
|
@ -663,6 +22,7 @@ async def api_update_super_config(
|
||||||
async def v2_in_progress_stub(full_path: str) -> None:
|
async def v2_in_progress_stub(full_path: str) -> None:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
HTTPStatus.SERVICE_UNAVAILABLE,
|
HTTPStatus.SERVICE_UNAVAILABLE,
|
||||||
f"satmachineadmin v2: /api/v1/dca/{full_path} not yet implemented "
|
f"satmachineadmin v2 API not yet implemented (path: /{full_path}). "
|
||||||
"(landing in P2+). See aiolabs/satmachineadmin#9.",
|
"The v1 Lamassu surface has been removed; per-operator endpoints "
|
||||||
|
"land in P1. See plan.",
|
||||||
)
|
)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue