fix: cap DCA allocations when ATM cash exceeds tracked balances

When there's a sync mismatch (more cash in ATM than tracked client
balances), cap each client's allocation to their remaining fiat
balance equivalent in sats. Orphan sats stay in the source wallet.

This prevents over-allocation when deposits haven't been recorded
yet or when there's a timing mismatch between ATM transactions
and balance tracking.

- Detect sync mismatch: total_confirmed_deposits < fiat_amount
- In sync mismatch mode: allocate based on client balance, not tx amount
- Track orphan_sats that couldn't be distributed
- Normal mode unchanged: proportional distribution using calculate_distribution()

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
padreug 2026-01-11 15:54:48 +01:00
parent 49f3670bac
commit 946d5ab8bb

View file

@ -669,15 +669,21 @@ class LamassuTransactionProcessor:
logger.error(f"Error fetching transactions from Lamassu database: {e}")
return []
async def calculate_distribution_amounts(self, transaction: Dict[str, Any]) -> Dict[str, int]:
"""Calculate how much each Flow Mode client should receive"""
async def calculate_distribution_amounts(self, transaction: Dict[str, Any]) -> tuple[Dict[str, Any], int]:
"""Calculate how much each Flow Mode client should receive.
Returns:
tuple: (distributions dict, orphan_sats int)
- distributions: {client_id: {fiat_amount, sats_amount, exchange_rate}}
- orphan_sats: sats that couldn't be distributed due to sync mismatch
"""
try:
# Get all active Flow Mode clients
flow_clients = await get_flow_mode_clients()
if not flow_clients:
logger.info("No Flow Mode clients found - skipping distribution")
return {}
return {}, 0
# Extract transaction details - guaranteed clean from data ingestion
crypto_atoms = transaction.get("crypto_amount", 0) # Total sats with commission baked in
@ -701,10 +707,10 @@ class LamassuTransactionProcessor:
# Validate required fields
if crypto_atoms is None:
logger.error(f"Missing crypto_amount in transaction: {transaction}")
return {}
return {}, 0
if fiat_amount is None:
logger.error(f"Missing fiat_amount in transaction: {transaction}")
return {}
return {}, 0
if commission_percentage is None:
logger.warning(f"Missing commission_percentage in transaction: {transaction}, defaulting to 0")
commission_percentage = 0.0
@ -750,37 +756,76 @@ class LamassuTransactionProcessor:
if total_confirmed_deposits == 0:
logger.info("No clients with remaining DCA balance - skipping distribution")
return {}
return {}, 0
# Calculate sat allocations using the extracted pure function
sat_allocations = calculate_distribution(base_crypto_atoms, client_balances)
# Detect sync mismatch: more money in ATM than tracked client balances
sync_mismatch = total_confirmed_deposits < fiat_amount
if sync_mismatch:
orphan_fiat = fiat_amount - total_confirmed_deposits
logger.warning(
f"Sync mismatch detected: tracked balances ({total_confirmed_deposits:.2f} GTQ) "
f"< transaction ({fiat_amount} GTQ). Orphan amount: {orphan_fiat:.2f} GTQ"
)
if not sat_allocations:
logger.info("No allocations calculated - skipping distribution")
return {}
# Build final distributions dict with additional tracking fields
# Calculate distribution amounts
distributions = {}
for client_id, client_sats_amount in sat_allocations.items():
# Calculate proportion for logging
proportion = client_balances[client_id] / total_confirmed_deposits
# Calculate equivalent fiat value in GTQ for tracking purposes
client_fiat_amount = round(client_sats_amount / exchange_rate, 2) if exchange_rate > 0 else 0.0
if sync_mismatch:
# SYNC MISMATCH MODE: Cap each client's allocation to their remaining fiat balance
# Each client gets sats equivalent to their full remaining balance
for client_id, client_balance in client_balances.items():
# Calculate sats equivalent to this client's remaining fiat balance
client_sats_amount = round(client_balance * exchange_rate)
proportion = client_balance / total_confirmed_deposits
distributions[client_id] = {
"fiat_amount": client_fiat_amount,
"sats_amount": client_sats_amount,
"exchange_rate": exchange_rate
}
# Calculate equivalent fiat value in GTQ for tracking purposes
client_fiat_amount = round(client_sats_amount / exchange_rate, 2) if exchange_rate > 0 else 0.0
logger.info(f"Client {client_id[:8]}... gets {client_sats_amount} sats (≈{client_fiat_amount:.2f} GTQ, {proportion:.2%} share)")
distributions[client_id] = {
"fiat_amount": client_fiat_amount,
"sats_amount": client_sats_amount,
"exchange_rate": exchange_rate
}
# Verification: ensure total distribution equals base amount
total_distributed = sum(dist["sats_amount"] for dist in distributions.values())
if total_distributed != base_crypto_atoms:
logger.error(f"Distribution mismatch! Expected: {base_crypto_atoms} sats, Distributed: {total_distributed} sats")
raise ValueError(f"Satoshi distribution calculation error: {base_crypto_atoms} != {total_distributed}")
logger.info(f"Client {client_id[:8]}... gets {client_sats_amount} sats (≈{client_fiat_amount:.2f} GTQ, {proportion:.2%} share)")
# Calculate orphan sats (difference between base amount and distributed)
total_distributed = sum(dist["sats_amount"] for dist in distributions.values())
orphan_sats = base_crypto_atoms - total_distributed
logger.info(
f"Sync mismatch distribution: {total_distributed} sats to clients, "
f"{orphan_sats} sats orphaned (staying in source wallet)"
)
else:
# NORMAL MODE: Proportional distribution based on transaction amount
sat_allocations = calculate_distribution(base_crypto_atoms, client_balances)
if not sat_allocations:
logger.info("No allocations calculated - skipping distribution")
return {}, 0
# Build final distributions dict with additional tracking fields
for client_id, client_sats_amount in sat_allocations.items():
# Calculate proportion for logging
proportion = client_balances[client_id] / total_confirmed_deposits
# Calculate equivalent fiat value in GTQ for tracking purposes
client_fiat_amount = round(client_sats_amount / exchange_rate, 2) if exchange_rate > 0 else 0.0
distributions[client_id] = {
"fiat_amount": client_fiat_amount,
"sats_amount": client_sats_amount,
"exchange_rate": exchange_rate
}
logger.info(f"Client {client_id[:8]}... gets {client_sats_amount} sats (≈{client_fiat_amount:.2f} GTQ, {proportion:.2%} share)")
# Verification: ensure total distribution equals base amount
total_distributed = sum(dist["sats_amount"] for dist in distributions.values())
if total_distributed != base_crypto_atoms:
logger.error(f"Distribution mismatch! Expected: {base_crypto_atoms} sats, Distributed: {total_distributed} sats")
raise ValueError(f"Satoshi distribution calculation error: {base_crypto_atoms} != {total_distributed}")
orphan_sats = 0
# Safety check: Re-verify all clients still have positive balances before finalizing distributions
# This prevents race conditions where balances changed during calculation
@ -800,18 +845,18 @@ class LamassuTransactionProcessor:
# Recalculate proportions if some clients were rejected
if len(final_distributions) == 0:
logger.info("All clients rejected due to negative balances - no distributions")
return {}
return {}, orphan_sats
# For simplicity, we'll still return the original distributions but log the warning
# In a production system, you might want to recalculate the entire distribution
logger.warning("Proceeding with original distribution despite balance warnings - manual review recommended")
logger.info(f"Distribution verified: {total_distributed} sats distributed across {len(distributions)} clients (clients with positive allocations only)")
return distributions
return distributions, orphan_sats
except Exception as e:
logger.error(f"Error calculating distribution amounts: {e}")
return {}
return {}, 0
async def distribute_to_clients(self, transaction: Dict[str, Any], distributions: Dict[str, Dict[str, int]]) -> None:
"""Send Bitcoin payments to DCA clients"""
@ -1140,10 +1185,16 @@ class LamassuTransactionProcessor:
stored_transaction = await self.store_lamassu_transaction(transaction)
# Calculate distribution amounts
distributions = await self.calculate_distribution_amounts(transaction)
distributions, orphan_sats = await self.calculate_distribution_amounts(transaction)
if not distributions:
logger.info(f"No distributions calculated for transaction {transaction_id}")
if orphan_sats > 0:
logger.warning(
f"No client distributions for transaction {transaction_id}, "
f"but {orphan_sats} orphan sats remain in source wallet"
)
else:
logger.info(f"No distributions calculated for transaction {transaction_id}")
return
# Calculate commission amount for sending to commission wallet