fix: cap DCA allocations when ATM cash exceeds tracked balances #2

Closed
padreug wants to merge 1 commit from fix/cap-dca-allocations-sync-mismatch into main
Showing only changes of commit 5e8110f322 - Show all commits

View file

@ -668,15 +668,21 @@ class LamassuTransactionProcessor:
logger.error(f"Error fetching transactions from Lamassu database: {e}") logger.error(f"Error fetching transactions from Lamassu database: {e}")
return [] return []
async def calculate_distribution_amounts(self, transaction: Dict[str, Any]) -> Dict[str, int]: async def calculate_distribution_amounts(self, transaction: Dict[str, Any]) -> tuple[Dict[str, Any], int]:
"""Calculate how much each Flow Mode client should receive""" """Calculate how much each Flow Mode client should receive.
Returns:
tuple: (distributions dict, orphan_sats int)
- distributions: {client_id: {fiat_amount, sats_amount, exchange_rate}}
- orphan_sats: sats that couldn't be distributed due to sync mismatch
"""
try: try:
# Get all active Flow Mode clients # Get all active Flow Mode clients
flow_clients = await get_flow_mode_clients() flow_clients = await get_flow_mode_clients()
if not flow_clients: if not flow_clients:
logger.info("No Flow Mode clients found - skipping distribution") logger.info("No Flow Mode clients found - skipping distribution")
return {} return {}, 0
# Extract transaction details - guaranteed clean from data ingestion # Extract transaction details - guaranteed clean from data ingestion
crypto_atoms = transaction.get("crypto_amount", 0) # Total sats with commission baked in crypto_atoms = transaction.get("crypto_amount", 0) # Total sats with commission baked in
@ -700,10 +706,10 @@ class LamassuTransactionProcessor:
# Validate required fields # Validate required fields
if crypto_atoms is None: if crypto_atoms is None:
logger.error(f"Missing crypto_amount in transaction: {transaction}") logger.error(f"Missing crypto_amount in transaction: {transaction}")
return {} return {}, 0
if fiat_amount is None: if fiat_amount is None:
logger.error(f"Missing fiat_amount in transaction: {transaction}") logger.error(f"Missing fiat_amount in transaction: {transaction}")
return {} return {}, 0
if commission_percentage is None: if commission_percentage is None:
logger.warning(f"Missing commission_percentage in transaction: {transaction}, defaulting to 0") logger.warning(f"Missing commission_percentage in transaction: {transaction}, defaulting to 0")
commission_percentage = 0.0 commission_percentage = 0.0
@ -757,13 +763,43 @@ class LamassuTransactionProcessor:
if total_confirmed_deposits == 0: if total_confirmed_deposits == 0:
logger.info("No clients with remaining DCA balance - skipping distribution") logger.info("No clients with remaining DCA balance - skipping distribution")
return {} return {}, 0
# Calculate proportional distribution with remainder allocation # Detect sync mismatch: more money in ATM than tracked client balances
sync_mismatch = total_confirmed_deposits < fiat_amount
if sync_mismatch:
orphan_fiat = fiat_amount - total_confirmed_deposits
logger.warning(
f"Sync mismatch detected: tracked balances ({total_confirmed_deposits:.2f} GTQ) "
f"< transaction ({fiat_amount} GTQ). Orphan amount: {orphan_fiat:.2f} GTQ"
)
# Calculate distribution amounts
distributions = {} distributions = {}
distributed_sats = 0 distributed_sats = 0
client_calculations = [] client_calculations = []
if sync_mismatch:
# SYNC MISMATCH MODE: Cap each client's allocation to their remaining fiat balance
# Each client gets sats equivalent to their full remaining balance
for client_id, client_balance in client_balances.items():
# Calculate sats equivalent to this client's remaining fiat balance
client_sats_amount = round(client_balance * exchange_rate)
proportion = client_balance / total_confirmed_deposits
client_calculations.append({
'client_id': client_id,
'proportion': proportion,
'exact_share': client_sats_amount, # In sync mismatch, exact_share = allocated
'allocated_sats': client_sats_amount,
'client_balance': client_balance
})
distributed_sats += client_sats_amount
logger.info(f"Sync mismatch mode: distributing {distributed_sats} sats based on client balances (not transaction amount)")
else:
# NORMAL MODE: Proportional distribution based on transaction amount
# First pass: calculate base amounts and track remainders # First pass: calculate base amounts and track remainders
for client_id, client_balance in client_balances.items(): for client_id, client_balance in client_balances.items():
# Calculate this client's proportion of the total DCA pool # Calculate this client's proportion of the total DCA pool
@ -804,6 +840,9 @@ class LamassuTransactionProcessor:
else: else:
client_calculations[i % len(client_calculations)]['allocated_sats'] -= 1 client_calculations[i % len(client_calculations)]['allocated_sats'] -= 1
# Recalculate distributed_sats after remainder distribution
distributed_sats = sum(calc['allocated_sats'] for calc in client_calculations)
# Second pass: create distributions with final amounts # Second pass: create distributions with final amounts
for calc in client_calculations: for calc in client_calculations:
client_id = calc['client_id'] client_id = calc['client_id']
@ -821,8 +860,19 @@ class LamassuTransactionProcessor:
logger.info(f"Client {client_id[:8]}... gets {client_sats_amount} sats (≈{client_fiat_amount:.2f} GTQ, {proportion:.2%} share)") logger.info(f"Client {client_id[:8]}... gets {client_sats_amount} sats (≈{client_fiat_amount:.2f} GTQ, {proportion:.2%} share)")
# Verification: ensure total distribution equals base amount # Verification and orphan calculation
total_distributed = sum(dist["sats_amount"] for dist in distributions.values()) total_distributed = sum(dist["sats_amount"] for dist in distributions.values())
if sync_mismatch:
# In sync mismatch mode, orphan_sats is the difference between base amount and distributed
orphan_sats = base_crypto_atoms - total_distributed
logger.info(
f"Sync mismatch distribution: {total_distributed} sats to clients, "
f"{orphan_sats} sats orphaned (staying in source wallet)"
)
else:
# In normal mode, verify distribution equals base amount
orphan_sats = 0
if total_distributed != base_crypto_atoms: if total_distributed != base_crypto_atoms:
logger.error(f"Distribution mismatch! Expected: {base_crypto_atoms} sats, Distributed: {total_distributed} sats") logger.error(f"Distribution mismatch! Expected: {base_crypto_atoms} sats, Distributed: {total_distributed} sats")
raise ValueError(f"Satoshi distribution calculation error: {base_crypto_atoms} != {total_distributed}") raise ValueError(f"Satoshi distribution calculation error: {base_crypto_atoms} != {total_distributed}")
@ -845,18 +895,18 @@ class LamassuTransactionProcessor:
# Recalculate proportions if some clients were rejected # Recalculate proportions if some clients were rejected
if len(final_distributions) == 0: if len(final_distributions) == 0:
logger.info("All clients rejected due to negative balances - no distributions") logger.info("All clients rejected due to negative balances - no distributions")
return {} return {}, orphan_sats
# For simplicity, we'll still return the original distributions but log the warning # For simplicity, we'll still return the original distributions but log the warning
# In a production system, you might want to recalculate the entire distribution # In a production system, you might want to recalculate the entire distribution
logger.warning("Proceeding with original distribution despite balance warnings - manual review recommended") logger.warning("Proceeding with original distribution despite balance warnings - manual review recommended")
logger.info(f"Distribution verified: {total_distributed} sats distributed across {len(distributions)} clients (clients with positive allocations only)") logger.info(f"Distribution verified: {total_distributed} sats distributed across {len(distributions)} clients (clients with positive allocations only)")
return distributions return distributions, orphan_sats
except Exception as e: except Exception as e:
logger.error(f"Error calculating distribution amounts: {e}") logger.error(f"Error calculating distribution amounts: {e}")
return {} return {}, 0
async def distribute_to_clients(self, transaction: Dict[str, Any], distributions: Dict[str, Dict[str, int]]) -> None: async def distribute_to_clients(self, transaction: Dict[str, Any], distributions: Dict[str, Dict[str, int]]) -> None:
"""Send Bitcoin payments to DCA clients""" """Send Bitcoin payments to DCA clients"""
@ -1190,9 +1240,15 @@ class LamassuTransactionProcessor:
stored_transaction = await self.store_lamassu_transaction(transaction) stored_transaction = await self.store_lamassu_transaction(transaction)
# Calculate distribution amounts # Calculate distribution amounts
distributions = await self.calculate_distribution_amounts(transaction) distributions, orphan_sats = await self.calculate_distribution_amounts(transaction)
if not distributions: if not distributions:
if orphan_sats > 0:
logger.warning(
f"No client distributions for transaction {transaction_id}, "
f"but {orphan_sats} orphan sats remain in source wallet"
)
else:
logger.info(f"No distributions calculated for transaction {transaction_id}") logger.info(f"No distributions calculated for transaction {transaction_id}")
return return
@ -1225,6 +1281,13 @@ class LamassuTransactionProcessor:
distributions_total_sats distributions_total_sats
) )
# Log final summary including orphan sats if any
if orphan_sats > 0:
logger.info(
f"Successfully processed transaction {transaction_id} "
f"({orphan_sats} orphan sats remain in source wallet due to sync mismatch)"
)
else:
logger.info(f"Successfully processed transaction {transaction_id}") logger.info(f"Successfully processed transaction {transaction_id}")
except Exception as e: except Exception as e: