Compare commits
3 commits
refactor/d
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 6eb076d5f6 | |||
| 545a0284a7 | |||
| 49f3670bac |
2 changed files with 94 additions and 43 deletions
|
|
@ -1,6 +1,6 @@
|
||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "satmachineadmin"
|
name = "satmachineadmin"
|
||||||
version = "0.0.0"
|
version = "0.0.4"
|
||||||
description = "Eightball is a simple API that allows you to create a random number generator."
|
description = "Eightball is a simple API that allows you to create a random number generator."
|
||||||
authors = ["benarc", "dni <dni@lnbits.com>"]
|
authors = ["benarc", "dni <dni@lnbits.com>"]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -669,15 +669,21 @@ class LamassuTransactionProcessor:
|
||||||
logger.error(f"Error fetching transactions from Lamassu database: {e}")
|
logger.error(f"Error fetching transactions from Lamassu database: {e}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
async def calculate_distribution_amounts(self, transaction: Dict[str, Any]) -> Dict[str, int]:
|
async def calculate_distribution_amounts(self, transaction: Dict[str, Any]) -> tuple[Dict[str, Any], int]:
|
||||||
"""Calculate how much each Flow Mode client should receive"""
|
"""Calculate how much each Flow Mode client should receive.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple: (distributions dict, orphan_sats int)
|
||||||
|
- distributions: {client_id: {fiat_amount, sats_amount, exchange_rate}}
|
||||||
|
- orphan_sats: sats that couldn't be distributed due to sync mismatch
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
# Get all active Flow Mode clients
|
# Get all active Flow Mode clients
|
||||||
flow_clients = await get_flow_mode_clients()
|
flow_clients = await get_flow_mode_clients()
|
||||||
|
|
||||||
if not flow_clients:
|
if not flow_clients:
|
||||||
logger.info("No Flow Mode clients found - skipping distribution")
|
logger.info("No Flow Mode clients found - skipping distribution")
|
||||||
return {}
|
return {}, 0
|
||||||
|
|
||||||
# Extract transaction details - guaranteed clean from data ingestion
|
# Extract transaction details - guaranteed clean from data ingestion
|
||||||
crypto_atoms = transaction.get("crypto_amount", 0) # Total sats with commission baked in
|
crypto_atoms = transaction.get("crypto_amount", 0) # Total sats with commission baked in
|
||||||
|
|
@ -701,10 +707,10 @@ class LamassuTransactionProcessor:
|
||||||
# Validate required fields
|
# Validate required fields
|
||||||
if crypto_atoms is None:
|
if crypto_atoms is None:
|
||||||
logger.error(f"Missing crypto_amount in transaction: {transaction}")
|
logger.error(f"Missing crypto_amount in transaction: {transaction}")
|
||||||
return {}
|
return {}, 0
|
||||||
if fiat_amount is None:
|
if fiat_amount is None:
|
||||||
logger.error(f"Missing fiat_amount in transaction: {transaction}")
|
logger.error(f"Missing fiat_amount in transaction: {transaction}")
|
||||||
return {}
|
return {}, 0
|
||||||
if commission_percentage is None:
|
if commission_percentage is None:
|
||||||
logger.warning(f"Missing commission_percentage in transaction: {transaction}, defaulting to 0")
|
logger.warning(f"Missing commission_percentage in transaction: {transaction}, defaulting to 0")
|
||||||
commission_percentage = 0.0
|
commission_percentage = 0.0
|
||||||
|
|
@ -750,37 +756,76 @@ class LamassuTransactionProcessor:
|
||||||
|
|
||||||
if total_confirmed_deposits == 0:
|
if total_confirmed_deposits == 0:
|
||||||
logger.info("No clients with remaining DCA balance - skipping distribution")
|
logger.info("No clients with remaining DCA balance - skipping distribution")
|
||||||
return {}
|
return {}, 0
|
||||||
|
|
||||||
# Calculate sat allocations using the extracted pure function
|
# Detect sync mismatch: more money in ATM than tracked client balances
|
||||||
sat_allocations = calculate_distribution(base_crypto_atoms, client_balances)
|
sync_mismatch = total_confirmed_deposits < fiat_amount
|
||||||
|
if sync_mismatch:
|
||||||
|
orphan_fiat = fiat_amount - total_confirmed_deposits
|
||||||
|
logger.warning(
|
||||||
|
f"Sync mismatch detected: tracked balances ({total_confirmed_deposits:.2f} GTQ) "
|
||||||
|
f"< transaction ({fiat_amount} GTQ). Orphan amount: {orphan_fiat:.2f} GTQ"
|
||||||
|
)
|
||||||
|
|
||||||
if not sat_allocations:
|
# Calculate distribution amounts
|
||||||
logger.info("No allocations calculated - skipping distribution")
|
|
||||||
return {}
|
|
||||||
|
|
||||||
# Build final distributions dict with additional tracking fields
|
|
||||||
distributions = {}
|
distributions = {}
|
||||||
for client_id, client_sats_amount in sat_allocations.items():
|
|
||||||
# Calculate proportion for logging
|
|
||||||
proportion = client_balances[client_id] / total_confirmed_deposits
|
|
||||||
|
|
||||||
# Calculate equivalent fiat value in GTQ for tracking purposes
|
if sync_mismatch:
|
||||||
client_fiat_amount = round(client_sats_amount / exchange_rate, 2) if exchange_rate > 0 else 0.0
|
# SYNC MISMATCH MODE: Cap each client's allocation to their remaining fiat balance
|
||||||
|
# Each client gets sats equivalent to their full remaining balance
|
||||||
|
for client_id, client_balance in client_balances.items():
|
||||||
|
# Calculate sats equivalent to this client's remaining fiat balance
|
||||||
|
client_sats_amount = round(client_balance * exchange_rate)
|
||||||
|
proportion = client_balance / total_confirmed_deposits
|
||||||
|
|
||||||
distributions[client_id] = {
|
# Calculate equivalent fiat value in GTQ for tracking purposes
|
||||||
"fiat_amount": client_fiat_amount,
|
client_fiat_amount = round(client_sats_amount / exchange_rate, 2) if exchange_rate > 0 else 0.0
|
||||||
"sats_amount": client_sats_amount,
|
|
||||||
"exchange_rate": exchange_rate
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info(f"Client {client_id[:8]}... gets {client_sats_amount} sats (≈{client_fiat_amount:.2f} GTQ, {proportion:.2%} share)")
|
distributions[client_id] = {
|
||||||
|
"fiat_amount": client_fiat_amount,
|
||||||
# Verification: ensure total distribution equals base amount
|
"sats_amount": client_sats_amount,
|
||||||
total_distributed = sum(dist["sats_amount"] for dist in distributions.values())
|
"exchange_rate": exchange_rate
|
||||||
if total_distributed != base_crypto_atoms:
|
}
|
||||||
logger.error(f"Distribution mismatch! Expected: {base_crypto_atoms} sats, Distributed: {total_distributed} sats")
|
|
||||||
raise ValueError(f"Satoshi distribution calculation error: {base_crypto_atoms} != {total_distributed}")
|
logger.info(f"Client {client_id[:8]}... gets {client_sats_amount} sats (≈{client_fiat_amount:.2f} GTQ, {proportion:.2%} share)")
|
||||||
|
|
||||||
|
# Calculate orphan sats (difference between base amount and distributed)
|
||||||
|
total_distributed = sum(dist["sats_amount"] for dist in distributions.values())
|
||||||
|
orphan_sats = base_crypto_atoms - total_distributed
|
||||||
|
logger.info(
|
||||||
|
f"Sync mismatch distribution: {total_distributed} sats to clients, "
|
||||||
|
f"{orphan_sats} sats orphaned (staying in source wallet)"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# NORMAL MODE: Proportional distribution based on transaction amount
|
||||||
|
sat_allocations = calculate_distribution(base_crypto_atoms, client_balances)
|
||||||
|
|
||||||
|
if not sat_allocations:
|
||||||
|
logger.info("No allocations calculated - skipping distribution")
|
||||||
|
return {}, 0
|
||||||
|
|
||||||
|
# Build final distributions dict with additional tracking fields
|
||||||
|
for client_id, client_sats_amount in sat_allocations.items():
|
||||||
|
# Calculate proportion for logging
|
||||||
|
proportion = client_balances[client_id] / total_confirmed_deposits
|
||||||
|
|
||||||
|
# Calculate equivalent fiat value in GTQ for tracking purposes
|
||||||
|
client_fiat_amount = round(client_sats_amount / exchange_rate, 2) if exchange_rate > 0 else 0.0
|
||||||
|
|
||||||
|
distributions[client_id] = {
|
||||||
|
"fiat_amount": client_fiat_amount,
|
||||||
|
"sats_amount": client_sats_amount,
|
||||||
|
"exchange_rate": exchange_rate
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(f"Client {client_id[:8]}... gets {client_sats_amount} sats (≈{client_fiat_amount:.2f} GTQ, {proportion:.2%} share)")
|
||||||
|
|
||||||
|
# Verification: ensure total distribution equals base amount
|
||||||
|
total_distributed = sum(dist["sats_amount"] for dist in distributions.values())
|
||||||
|
if total_distributed != base_crypto_atoms:
|
||||||
|
logger.error(f"Distribution mismatch! Expected: {base_crypto_atoms} sats, Distributed: {total_distributed} sats")
|
||||||
|
raise ValueError(f"Satoshi distribution calculation error: {base_crypto_atoms} != {total_distributed}")
|
||||||
|
orphan_sats = 0
|
||||||
|
|
||||||
# Safety check: Re-verify all clients still have positive balances before finalizing distributions
|
# Safety check: Re-verify all clients still have positive balances before finalizing distributions
|
||||||
# This prevents race conditions where balances changed during calculation
|
# This prevents race conditions where balances changed during calculation
|
||||||
|
|
@ -800,18 +845,18 @@ class LamassuTransactionProcessor:
|
||||||
# Recalculate proportions if some clients were rejected
|
# Recalculate proportions if some clients were rejected
|
||||||
if len(final_distributions) == 0:
|
if len(final_distributions) == 0:
|
||||||
logger.info("All clients rejected due to negative balances - no distributions")
|
logger.info("All clients rejected due to negative balances - no distributions")
|
||||||
return {}
|
return {}, orphan_sats
|
||||||
|
|
||||||
# For simplicity, we'll still return the original distributions but log the warning
|
# For simplicity, we'll still return the original distributions but log the warning
|
||||||
# In a production system, you might want to recalculate the entire distribution
|
# In a production system, you might want to recalculate the entire distribution
|
||||||
logger.warning("Proceeding with original distribution despite balance warnings - manual review recommended")
|
logger.warning("Proceeding with original distribution despite balance warnings - manual review recommended")
|
||||||
|
|
||||||
logger.info(f"Distribution verified: {total_distributed} sats distributed across {len(distributions)} clients (clients with positive allocations only)")
|
logger.info(f"Distribution verified: {total_distributed} sats distributed across {len(distributions)} clients (clients with positive allocations only)")
|
||||||
return distributions
|
return distributions, orphan_sats
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error calculating distribution amounts: {e}")
|
logger.error(f"Error calculating distribution amounts: {e}")
|
||||||
return {}
|
return {}, 0
|
||||||
|
|
||||||
async def distribute_to_clients(self, transaction: Dict[str, Any], distributions: Dict[str, Dict[str, int]]) -> None:
|
async def distribute_to_clients(self, transaction: Dict[str, Any], distributions: Dict[str, Dict[str, int]]) -> None:
|
||||||
"""Send Bitcoin payments to DCA clients"""
|
"""Send Bitcoin payments to DCA clients"""
|
||||||
|
|
@ -920,7 +965,7 @@ class LamassuTransactionProcessor:
|
||||||
}
|
}
|
||||||
new_payment = await create_invoice(
|
new_payment = await create_invoice(
|
||||||
wallet_id=target_wallet.id,
|
wallet_id=target_wallet.id,
|
||||||
amount=amount_sats, # LNBits create_invoice expects sats
|
amount=float(amount_sats), # LNBits create_invoice expects float
|
||||||
internal=True, # Internal transfer within LNBits
|
internal=True, # Internal transfer within LNBits
|
||||||
memo=memo,
|
memo=memo,
|
||||||
extra=extra
|
extra=extra
|
||||||
|
|
@ -1085,7 +1130,7 @@ class LamassuTransactionProcessor:
|
||||||
|
|
||||||
commission_payment = await create_invoice(
|
commission_payment = await create_invoice(
|
||||||
wallet_id=admin_config.commission_wallet_id,
|
wallet_id=admin_config.commission_wallet_id,
|
||||||
amount=commission_amount_sats,
|
amount=float(commission_amount_sats), # LNbits create_invoice expects float
|
||||||
internal=True,
|
internal=True,
|
||||||
memo=commission_memo,
|
memo=commission_memo,
|
||||||
extra={
|
extra={
|
||||||
|
|
@ -1140,10 +1185,16 @@ class LamassuTransactionProcessor:
|
||||||
stored_transaction = await self.store_lamassu_transaction(transaction)
|
stored_transaction = await self.store_lamassu_transaction(transaction)
|
||||||
|
|
||||||
# Calculate distribution amounts
|
# Calculate distribution amounts
|
||||||
distributions = await self.calculate_distribution_amounts(transaction)
|
distributions, orphan_sats = await self.calculate_distribution_amounts(transaction)
|
||||||
|
|
||||||
if not distributions:
|
if not distributions:
|
||||||
logger.info(f"No distributions calculated for transaction {transaction_id}")
|
if orphan_sats > 0:
|
||||||
|
logger.warning(
|
||||||
|
f"No client distributions for transaction {transaction_id}, "
|
||||||
|
f"but {orphan_sats} orphan sats remain in source wallet"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.info(f"No distributions calculated for transaction {transaction_id}")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Calculate commission amount for sending to commission wallet
|
# Calculate commission amount for sending to commission wallet
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue