From 5e8110f322d4f8f5d5df2fd810e84b542b259411 Mon Sep 17 00:00:00 2001 From: Patrick Mulligan Date: Wed, 31 Dec 2025 23:41:16 +0100 Subject: [PATCH] fix: cap DCA allocations when ATM cash exceeds tracked balances MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When total tracked client balances < transaction fiat amount (sync mismatch), the proportional allocation formula was giving each client more GTQ-equivalent sats than their actual remaining balance, causing all payments to be refused. Changes: - Add sync mismatch detection in calculate_distribution_amounts() - Cap allocations to remaining balance when sync mismatch detected - Track orphan sats (unallocated due to sync mismatch) - Update return type to tuple (distributions, orphan_sats) - Log orphan amounts for reconciliation tracking Fixes #1 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- transaction_processor.py | 197 ++++++++++++++++++++++++++------------- 1 file changed, 130 insertions(+), 67 deletions(-) diff --git a/transaction_processor.py b/transaction_processor.py index bd6ae77..1686842 100644 --- a/transaction_processor.py +++ b/transaction_processor.py @@ -668,15 +668,21 @@ class LamassuTransactionProcessor: logger.error(f"Error fetching transactions from Lamassu database: {e}") return [] - async def calculate_distribution_amounts(self, transaction: Dict[str, Any]) -> Dict[str, int]: - """Calculate how much each Flow Mode client should receive""" + async def calculate_distribution_amounts(self, transaction: Dict[str, Any]) -> tuple[Dict[str, Any], int]: + """Calculate how much each Flow Mode client should receive. + + Returns: + tuple: (distributions dict, orphan_sats int) + - distributions: {client_id: {fiat_amount, sats_amount, exchange_rate}} + - orphan_sats: sats that couldn't be distributed due to sync mismatch + """ try: # Get all active Flow Mode clients flow_clients = await get_flow_mode_clients() if not flow_clients: logger.info("No Flow Mode clients found - skipping distribution") - return {} + return {}, 0 # Extract transaction details - guaranteed clean from data ingestion crypto_atoms = transaction.get("crypto_amount", 0) # Total sats with commission baked in @@ -700,10 +706,10 @@ class LamassuTransactionProcessor: # Validate required fields if crypto_atoms is None: logger.error(f"Missing crypto_amount in transaction: {transaction}") - return {} + return {}, 0 if fiat_amount is None: logger.error(f"Missing fiat_amount in transaction: {transaction}") - return {} + return {}, 0 if commission_percentage is None: logger.warning(f"Missing commission_percentage in transaction: {transaction}, defaulting to 0") commission_percentage = 0.0 @@ -757,52 +763,85 @@ class LamassuTransactionProcessor: if total_confirmed_deposits == 0: logger.info("No clients with remaining DCA balance - skipping distribution") - return {} + return {}, 0 + + # Detect sync mismatch: more money in ATM than tracked client balances + sync_mismatch = total_confirmed_deposits < fiat_amount + if sync_mismatch: + orphan_fiat = fiat_amount - total_confirmed_deposits + logger.warning( + f"Sync mismatch detected: tracked balances ({total_confirmed_deposits:.2f} GTQ) " + f"< transaction ({fiat_amount} GTQ). Orphan amount: {orphan_fiat:.2f} GTQ" + ) - # Calculate proportional distribution with remainder allocation + # Calculate distribution amounts distributions = {} distributed_sats = 0 client_calculations = [] - - # First pass: calculate base amounts and track remainders - for client_id, client_balance in client_balances.items(): - # Calculate this client's proportion of the total DCA pool - proportion = client_balance / total_confirmed_deposits - - # Calculate exact share (with decimals) - exact_share = base_crypto_atoms * proportion - - # Use banker's rounding for base allocation - client_sats_amount = round(exact_share) - - client_calculations.append({ - 'client_id': client_id, - 'proportion': proportion, - 'exact_share': exact_share, - 'allocated_sats': client_sats_amount, - 'client_balance': client_balance - }) - - distributed_sats += client_sats_amount - - # Handle any remainder due to rounding (should be small) - remainder = base_crypto_atoms - distributed_sats - - if remainder != 0: - logger.info(f"Distributing remainder: {remainder} sats among {len(client_calculations)} clients") - - # Sort clients by largest fractional remainder to distribute fairly - client_calculations.sort( - key=lambda x: x['exact_share'] - x['allocated_sats'], - reverse=True - ) - - # Distribute remainder one sat at a time to clients with largest fractional parts - for i in range(abs(remainder)): - if remainder > 0: - client_calculations[i % len(client_calculations)]['allocated_sats'] += 1 - else: - client_calculations[i % len(client_calculations)]['allocated_sats'] -= 1 + + if sync_mismatch: + # SYNC MISMATCH MODE: Cap each client's allocation to their remaining fiat balance + # Each client gets sats equivalent to their full remaining balance + for client_id, client_balance in client_balances.items(): + # Calculate sats equivalent to this client's remaining fiat balance + client_sats_amount = round(client_balance * exchange_rate) + proportion = client_balance / total_confirmed_deposits + + client_calculations.append({ + 'client_id': client_id, + 'proportion': proportion, + 'exact_share': client_sats_amount, # In sync mismatch, exact_share = allocated + 'allocated_sats': client_sats_amount, + 'client_balance': client_balance + }) + + distributed_sats += client_sats_amount + + logger.info(f"Sync mismatch mode: distributing {distributed_sats} sats based on client balances (not transaction amount)") + else: + # NORMAL MODE: Proportional distribution based on transaction amount + # First pass: calculate base amounts and track remainders + for client_id, client_balance in client_balances.items(): + # Calculate this client's proportion of the total DCA pool + proportion = client_balance / total_confirmed_deposits + + # Calculate exact share (with decimals) + exact_share = base_crypto_atoms * proportion + + # Use banker's rounding for base allocation + client_sats_amount = round(exact_share) + + client_calculations.append({ + 'client_id': client_id, + 'proportion': proportion, + 'exact_share': exact_share, + 'allocated_sats': client_sats_amount, + 'client_balance': client_balance + }) + + distributed_sats += client_sats_amount + + # Handle any remainder due to rounding (should be small) + remainder = base_crypto_atoms - distributed_sats + + if remainder != 0: + logger.info(f"Distributing remainder: {remainder} sats among {len(client_calculations)} clients") + + # Sort clients by largest fractional remainder to distribute fairly + client_calculations.sort( + key=lambda x: x['exact_share'] - x['allocated_sats'], + reverse=True + ) + + # Distribute remainder one sat at a time to clients with largest fractional parts + for i in range(abs(remainder)): + if remainder > 0: + client_calculations[i % len(client_calculations)]['allocated_sats'] += 1 + else: + client_calculations[i % len(client_calculations)]['allocated_sats'] -= 1 + + # Recalculate distributed_sats after remainder distribution + distributed_sats = sum(calc['allocated_sats'] for calc in client_calculations) # Second pass: create distributions with final amounts for calc in client_calculations: @@ -821,12 +860,23 @@ class LamassuTransactionProcessor: logger.info(f"Client {client_id[:8]}... gets {client_sats_amount} sats (≈{client_fiat_amount:.2f} GTQ, {proportion:.2%} share)") - # Verification: ensure total distribution equals base amount + # Verification and orphan calculation total_distributed = sum(dist["sats_amount"] for dist in distributions.values()) - if total_distributed != base_crypto_atoms: - logger.error(f"Distribution mismatch! Expected: {base_crypto_atoms} sats, Distributed: {total_distributed} sats") - raise ValueError(f"Satoshi distribution calculation error: {base_crypto_atoms} != {total_distributed}") - + + if sync_mismatch: + # In sync mismatch mode, orphan_sats is the difference between base amount and distributed + orphan_sats = base_crypto_atoms - total_distributed + logger.info( + f"Sync mismatch distribution: {total_distributed} sats to clients, " + f"{orphan_sats} sats orphaned (staying in source wallet)" + ) + else: + # In normal mode, verify distribution equals base amount + orphan_sats = 0 + if total_distributed != base_crypto_atoms: + logger.error(f"Distribution mismatch! Expected: {base_crypto_atoms} sats, Distributed: {total_distributed} sats") + raise ValueError(f"Satoshi distribution calculation error: {base_crypto_atoms} != {total_distributed}") + # Safety check: Re-verify all clients still have positive balances before finalizing distributions # This prevents race conditions where balances changed during calculation final_distributions = {} @@ -838,25 +888,25 @@ class LamassuTransactionProcessor: logger.info(f"Client {client_id[:8]}... final balance check: {current_balance.remaining_balance:.2f} GTQ - APPROVED for {distribution['sats_amount']} sats") else: logger.warning(f"Client {client_id[:8]}... final balance check: {current_balance.remaining_balance:.2f} GTQ - REJECTED (negative balance)") - + if len(final_distributions) != len(distributions): logger.warning(f"Rejected {len(distributions) - len(final_distributions)} clients due to negative balances during final check") - + # Recalculate proportions if some clients were rejected if len(final_distributions) == 0: logger.info("All clients rejected due to negative balances - no distributions") - return {} - + return {}, orphan_sats + # For simplicity, we'll still return the original distributions but log the warning # In a production system, you might want to recalculate the entire distribution logger.warning("Proceeding with original distribution despite balance warnings - manual review recommended") - + logger.info(f"Distribution verified: {total_distributed} sats distributed across {len(distributions)} clients (clients with positive allocations only)") - return distributions - + return distributions, orphan_sats + except Exception as e: logger.error(f"Error calculating distribution amounts: {e}") - return {} + return {}, 0 async def distribute_to_clients(self, transaction: Dict[str, Any], distributions: Dict[str, Dict[str, int]]) -> None: """Send Bitcoin payments to DCA clients""" @@ -1190,10 +1240,16 @@ class LamassuTransactionProcessor: stored_transaction = await self.store_lamassu_transaction(transaction) # Calculate distribution amounts - distributions = await self.calculate_distribution_amounts(transaction) - + distributions, orphan_sats = await self.calculate_distribution_amounts(transaction) + if not distributions: - logger.info(f"No distributions calculated for transaction {transaction_id}") + if orphan_sats > 0: + logger.warning( + f"No client distributions for transaction {transaction_id}, " + f"but {orphan_sats} orphan sats remain in source wallet" + ) + else: + logger.info(f"No distributions calculated for transaction {transaction_id}") return # Calculate commission amount for sending to commission wallet @@ -1220,12 +1276,19 @@ class LamassuTransactionProcessor: clients_count = len(distributions) distributions_total_sats = sum(dist["sats_amount"] for dist in distributions.values()) await update_lamassu_transaction_distribution_stats( - stored_transaction, - clients_count, + stored_transaction, + clients_count, distributions_total_sats ) - - logger.info(f"Successfully processed transaction {transaction_id}") + + # Log final summary including orphan sats if any + if orphan_sats > 0: + logger.info( + f"Successfully processed transaction {transaction_id} " + f"({orphan_sats} orphan sats remain in source wallet due to sync mismatch)" + ) + else: + logger.info(f"Successfully processed transaction {transaction_id}") except Exception as e: logger.error(f"Error processing transaction {transaction.get('transaction_id', 'unknown')}: {e}") -- 2.52.0