refactor: use Decimal instead of float for monetary calculations

- calculations.py: Use Decimal for commission percentages, exchange rates,
  and client balances. Added to_decimal() helper for safe float conversion.
  Changed from banker's rounding to ROUND_HALF_UP.

- models.py: Changed all fiat amounts, percentages, and exchange rates to
  Decimal. Added json_encoders for API serialization.

- transaction_processor.py: Convert to Decimal at data ingestion boundary
  (CSV parsing). Updated all defaults and calculations to use Decimal.

- tests: Updated to work with Decimal return types.

This prevents floating-point precision issues in financial calculations.
All 23 tests pass.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
padreug 2026-01-11 14:47:56 +01:00
parent 397fd4b002
commit 6e86f53962
4 changed files with 180 additions and 101 deletions

View file

@ -3,16 +3,34 @@ Pure calculation functions for DCA transaction processing.
These functions have no external dependencies (no lnbits, no database)
and can be easily tested in isolation.
All monetary calculations use Decimal for precision. Satoshi amounts
remain as int since they are the smallest indivisible unit.
"""
from typing import Dict, Tuple
from decimal import Decimal, ROUND_HALF_UP
from typing import Dict, Tuple, Union
# Type alias for values that can be Decimal or numeric types that will be converted
DecimalLike = Union[Decimal, float, int, str]
def to_decimal(value: DecimalLike) -> Decimal:
"""Convert a value to Decimal, handling floats carefully."""
if isinstance(value, Decimal):
return value
# Convert floats via string to avoid binary float precision issues
# e.g., Decimal(0.055) gives 0.054999999... but Decimal("0.055") is exact
if isinstance(value, float):
return Decimal(str(value))
return Decimal(value)
def calculate_commission(
crypto_atoms: int,
commission_percentage: float,
discount: float = 0.0
) -> Tuple[int, int, float]:
commission_percentage: DecimalLike,
discount: DecimalLike = Decimal("0")
) -> Tuple[int, int, Decimal]:
"""
Calculate commission split from a Lamassu transaction.
@ -34,15 +52,25 @@ def calculate_commission(
Tuple of (base_amount_sats, commission_amount_sats, effective_commission_rate)
Example:
>>> calculate_commission(266800, 0.03, 0.0)
(259029, 7771, 0.03)
>>> calculate_commission(266800, Decimal("0.03"), Decimal("0"))
(259029, 7771, Decimal('0.03'))
"""
if commission_percentage > 0:
effective_commission = commission_percentage * (100 - discount) / 100
base_crypto_atoms = round(crypto_atoms / (1 + effective_commission))
# Convert inputs to Decimal for precise calculations
comm_pct = to_decimal(commission_percentage)
disc = to_decimal(discount)
if comm_pct > 0:
# effective = commission_percentage * (100 - discount) / 100
effective_commission = comm_pct * (Decimal("100") - disc) / Decimal("100")
# base = crypto_atoms / (1 + effective_commission), rounded to nearest int
divisor = Decimal("1") + effective_commission
exact_base = Decimal(crypto_atoms) / divisor
# Use ROUND_HALF_UP (standard rounding: 0.5 rounds up)
base_crypto_atoms = int(exact_base.quantize(Decimal("1"), rounding=ROUND_HALF_UP))
commission_amount_sats = crypto_atoms - base_crypto_atoms
else:
effective_commission = 0.0
effective_commission = Decimal("0")
base_crypto_atoms = crypto_atoms
commission_amount_sats = 0
@ -51,8 +79,8 @@ def calculate_commission(
def calculate_distribution(
base_amount_sats: int,
client_balances: Dict[str, float],
min_balance_threshold: float = 0.01
client_balances: Dict[str, DecimalLike],
min_balance_threshold: DecimalLike = Decimal("0.01")
) -> Dict[str, int]:
"""
Calculate proportional distribution of sats to clients based on their fiat balances.
@ -69,15 +97,18 @@ def calculate_distribution(
Dict of {client_id: allocated_sats}
Example:
>>> calculate_distribution(100000, {"a": 500.0, "b": 500.0})
>>> calculate_distribution(100000, {"a": Decimal("500"), "b": Decimal("500")})
{"a": 50000, "b": 50000}
"""
# Filter out clients with balance below threshold
active_balances = {
client_id: balance
for client_id, balance in client_balances.items()
if balance >= min_balance_threshold
}
# Convert threshold to Decimal
threshold = to_decimal(min_balance_threshold)
# Filter out clients with balance below threshold, converting to Decimal
active_balances: Dict[str, Decimal] = {}
for client_id, balance in client_balances.items():
bal = to_decimal(balance)
if bal >= threshold:
active_balances[client_id] = bal
if not active_balances:
return {}
@ -90,11 +121,13 @@ def calculate_distribution(
# First pass: calculate base allocations and track for remainder distribution
client_calculations = []
distributed_sats = 0
base_sats_decimal = Decimal(base_amount_sats)
for client_id, balance in active_balances.items():
proportion = balance / total_balance
exact_share = base_amount_sats * proportion
allocated_sats = round(exact_share)
exact_share = base_sats_decimal * proportion
# Round to nearest integer using ROUND_HALF_UP
allocated_sats = int(exact_share.quantize(Decimal("1"), rounding=ROUND_HALF_UP))
client_calculations.append({
'client_id': client_id,
@ -109,8 +142,9 @@ def calculate_distribution(
if remainder != 0:
# Sort by largest fractional remainder to distribute fairly
# The fractional part is exact_share - allocated_sats
client_calculations.sort(
key=lambda x: x['exact_share'] - x['allocated_sats'],
key=lambda x: x['exact_share'] - Decimal(x['allocated_sats']),
reverse=True
)
@ -131,7 +165,7 @@ def calculate_distribution(
return distributions
def calculate_exchange_rate(base_crypto_atoms: int, fiat_amount: float) -> float:
def calculate_exchange_rate(base_crypto_atoms: int, fiat_amount: DecimalLike) -> Decimal:
"""
Calculate exchange rate in sats per fiat unit.
@ -140,8 +174,9 @@ def calculate_exchange_rate(base_crypto_atoms: int, fiat_amount: float) -> float
fiat_amount: Fiat amount dispensed
Returns:
Exchange rate as sats per fiat unit
Exchange rate as sats per fiat unit (Decimal for precision)
"""
if fiat_amount <= 0:
return 0.0
return base_crypto_atoms / fiat_amount
fiat = to_decimal(fiat_amount)
if fiat <= 0:
return Decimal("0")
return Decimal(base_crypto_atoms) / fiat

View file

@ -1,6 +1,7 @@
# Description: Pydantic data models dictate what is passed between frontend and backend.
from datetime import datetime
from decimal import Decimal
from typing import Optional
from pydantic import BaseModel, validator
@ -12,7 +13,7 @@ class CreateDcaClientData(BaseModel):
wallet_id: str
username: str
dca_mode: str = "flow" # 'flow' or 'fixed'
fixed_mode_daily_limit: Optional[float] = None
fixed_mode_daily_limit: Optional[Decimal] = None
class DcaClient(BaseModel):
@ -21,44 +22,52 @@ class DcaClient(BaseModel):
wallet_id: str
username: Optional[str]
dca_mode: str
fixed_mode_daily_limit: Optional[int]
fixed_mode_daily_limit: Optional[Decimal]
status: str
created_at: datetime
updated_at: datetime
class Config:
json_encoders = {Decimal: lambda v: float(v)}
class UpdateDcaClientData(BaseModel):
username: Optional[str] = None
dca_mode: Optional[str] = None
fixed_mode_daily_limit: Optional[float] = None
fixed_mode_daily_limit: Optional[Decimal] = None
status: Optional[str] = None
# Deposit Models (Now storing GTQ directly)
class CreateDepositData(BaseModel):
client_id: str
amount: float # Amount in GTQ (e.g., 150.75)
amount: Decimal # Amount in GTQ (e.g., 150.75)
currency: str = "GTQ"
notes: Optional[str] = None
@validator('amount')
@validator('amount', pre=True)
def round_amount_to_cents(cls, v):
"""Ensure amount is rounded to 2 decimal places for DECIMAL(10,2) storage"""
if v is not None:
return round(float(v), 2)
# Convert to Decimal via string to avoid float precision issues
d = Decimal(str(v)) if not isinstance(v, Decimal) else v
return d.quantize(Decimal("0.01"))
return v
class DcaDeposit(BaseModel):
id: str
client_id: str
amount: float # Amount in GTQ (e.g., 150.75)
amount: Decimal # Amount in GTQ (e.g., 150.75)
currency: str
status: str # 'pending' or 'confirmed'
notes: Optional[str]
created_at: datetime
confirmed_at: Optional[datetime]
class Config:
json_encoders = {Decimal: lambda v: float(v)}
class UpdateDepositStatusData(BaseModel):
status: str
@ -69,8 +78,8 @@ class UpdateDepositStatusData(BaseModel):
class CreateDcaPaymentData(BaseModel):
client_id: str
amount_sats: int
amount_fiat: float # Amount in GTQ (e.g., 150.75)
exchange_rate: float
amount_fiat: Decimal # Amount in GTQ (e.g., 150.75)
exchange_rate: Decimal
transaction_type: str # 'flow', 'fixed', 'manual', 'commission'
lamassu_transaction_id: Optional[str] = None
payment_hash: Optional[str] = None
@ -81,8 +90,8 @@ class DcaPayment(BaseModel):
id: str
client_id: str
amount_sats: int
amount_fiat: float # Amount in GTQ (e.g., 150.75)
exchange_rate: float
amount_fiat: Decimal # Amount in GTQ (e.g., 150.75)
exchange_rate: Decimal
transaction_type: str
lamassu_transaction_id: Optional[str]
payment_hash: Optional[str]
@ -90,38 +99,47 @@ class DcaPayment(BaseModel):
created_at: datetime
transaction_time: Optional[datetime] = None # Original ATM transaction time
class Config:
json_encoders = {Decimal: lambda v: float(v)}
# Client Balance Summary (Now storing GTQ directly)
class ClientBalanceSummary(BaseModel):
client_id: str
total_deposits: float # Total confirmed deposits in GTQ
total_payments: float # Total payments made in GTQ
remaining_balance: float # Available balance for DCA in GTQ
total_deposits: Decimal # Total confirmed deposits in GTQ
total_payments: Decimal # Total payments made in GTQ
remaining_balance: Decimal # Available balance for DCA in GTQ
currency: str
class Config:
json_encoders = {Decimal: lambda v: float(v)}
# Transaction Processing Models
class LamassuTransaction(BaseModel):
transaction_id: str
amount_fiat: float # Amount in GTQ (e.g., 150.75)
amount_fiat: Decimal # Amount in GTQ (e.g., 150.75)
amount_crypto: int
exchange_rate: float
exchange_rate: Decimal
transaction_type: str # 'cash_in' or 'cash_out'
status: str
timestamp: datetime
class Config:
json_encoders = {Decimal: lambda v: float(v)}
# Lamassu Transaction Storage Models
class CreateLamassuTransactionData(BaseModel):
lamassu_transaction_id: str
fiat_amount: float # Amount in GTQ (e.g., 150.75)
fiat_amount: Decimal # Amount in GTQ (e.g., 150.75)
crypto_amount: int
commission_percentage: float
discount: float = 0.0
effective_commission: float
commission_percentage: Decimal
discount: Decimal = Decimal("0")
effective_commission: Decimal
commission_amount_sats: int
base_amount_sats: int
exchange_rate: float
exchange_rate: Decimal
crypto_code: str = "BTC"
fiat_code: str = "GTQ"
device_id: Optional[str] = None
@ -131,14 +149,14 @@ class CreateLamassuTransactionData(BaseModel):
class StoredLamassuTransaction(BaseModel):
id: str
lamassu_transaction_id: str
fiat_amount: float # Amount in GTQ (e.g., 150.75)
fiat_amount: Decimal # Amount in GTQ (e.g., 150.75)
crypto_amount: int
commission_percentage: float
discount: float
effective_commission: float
commission_percentage: Decimal
discount: Decimal
effective_commission: Decimal
commission_amount_sats: int
base_amount_sats: int
exchange_rate: float
exchange_rate: Decimal
crypto_code: str
fiat_code: str
device_id: Optional[str]
@ -147,6 +165,9 @@ class StoredLamassuTransaction(BaseModel):
clients_count: int # Number of clients who received distributions
distributions_total_sats: int # Total sats distributed to clients
class Config:
json_encoders = {Decimal: lambda v: float(v)}
# Lamassu Configuration Models
class CreateLamassuConfigData(BaseModel):
@ -167,13 +188,14 @@ class CreateLamassuConfigData(BaseModel):
ssh_password: Optional[str] = None
ssh_private_key: Optional[str] = None # Path to private key file or key content
# DCA Client Limits
max_daily_limit_gtq: float = 2000.0 # Maximum daily limit for Fixed mode clients
@validator('max_daily_limit_gtq')
max_daily_limit_gtq: Decimal = Decimal("2000") # Maximum daily limit for Fixed mode clients
@validator('max_daily_limit_gtq', pre=True)
def round_max_daily_limit(cls, v):
"""Ensure max daily limit is rounded to 2 decimal places"""
if v is not None:
return round(float(v), 2)
d = Decimal(str(v)) if not isinstance(v, Decimal) else v
return d.quantize(Decimal("0.01"))
return v
@ -204,7 +226,10 @@ class LamassuConfig(BaseModel):
last_poll_time: Optional[datetime] = None
last_successful_poll: Optional[datetime] = None
# DCA Client Limits
max_daily_limit_gtq: float = 2000.0 # Maximum daily limit for Fixed mode clients
max_daily_limit_gtq: Decimal = Decimal("2000") # Maximum daily limit for Fixed mode clients
class Config:
json_encoders = {Decimal: lambda v: float(v)}
class UpdateLamassuConfigData(BaseModel):
@ -226,6 +251,6 @@ class UpdateLamassuConfigData(BaseModel):
ssh_password: Optional[str] = None
ssh_private_key: Optional[str] = None
# DCA Client Limits
max_daily_limit_gtq: Optional[int] = None
max_daily_limit_gtq: Optional[Decimal] = None

View file

@ -10,7 +10,7 @@ from decimal import Decimal
from typing import Dict, List, Tuple
# Import from the parent package (following lnurlp pattern)
from ..calculations import calculate_commission, calculate_distribution, calculate_exchange_rate
from ..calculations import calculate_commission, calculate_distribution, calculate_exchange_rate, to_decimal
# =============================================================================
@ -245,11 +245,12 @@ class TestDistributionCalculation:
# Convert each client's sats back to fiat
total_fiat_distributed = sum(
sats / exchange_rate for sats in distributions.values()
Decimal(sats) / exchange_rate for sats in distributions.values()
)
# Should equal original fiat amount (within small rounding tolerance)
assert abs(total_fiat_distributed - fiat_amount) < 0.01, \
fiat_decimal = to_decimal(fiat_amount)
assert abs(total_fiat_distributed - fiat_decimal) < Decimal("0.01"), \
f"Fiat round-trip failed: {total_fiat_distributed:.2f} != {fiat_amount:.2f} " \
f"(crypto={crypto_atoms}, comm={comm_pct}, discount={discount})"
@ -287,11 +288,12 @@ class TestEmpiricalTransactions:
"expected_base_sats": 259029,
"expected_commission_sats": 7771,
"expected_distributions": {
# 259029 / 2 = 129514.5 → both get 129514 or 129515
# With banker's rounding: 129514.5 → 129514 (even)
# Remainder of 1 sat goes to first client by fractional sort
"client_a": 129515,
"client_b": 129514,
# 259029 / 2 = 129514.5 → both round to 129515 (ROUND_HALF_UP)
# Total = 259030, remainder = -1
# Both have same fractional (-0.5), client_a is first alphabetically
# So client_a gets -1 adjustment
"client_a": 129514,
"client_b": 129515,
},
},
# Add more scenarios from your real data!

View file

@ -3,6 +3,7 @@
import asyncio
import asyncpg
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import List, Optional, Dict, Any
from loguru import logger
import socket
@ -492,28 +493,38 @@ class LamassuTransactionProcessor:
results = []
for row in reader:
# Convert string values to appropriate types
# Use Decimal for monetary and percentage values
processed_row = {}
for key, value in row.items():
# Handle None/empty values consistently at data ingestion boundary
if value == '' or value is None:
if key in ['fiat_amount', 'crypto_amount']:
processed_row[key] = 0 # Default numeric fields to 0
if key == 'crypto_amount':
processed_row[key] = 0 # Sats are always int
elif key == 'fiat_amount':
processed_row[key] = Decimal("0") # Fiat as Decimal
elif key in ['commission_percentage', 'discount']:
processed_row[key] = 0.0 # Default percentage fields to 0.0
processed_row[key] = Decimal("0") # Percentages as Decimal
else:
processed_row[key] = None # Keep None for non-numeric fields
elif key in ['transaction_id', 'device_id', 'crypto_code', 'fiat_code']:
processed_row[key] = str(value)
elif key in ['fiat_amount', 'crypto_amount']:
elif key == 'crypto_amount':
try:
processed_row[key] = int(float(value))
processed_row[key] = int(float(value)) # Sats are always int
except (ValueError, TypeError):
processed_row[key] = 0 # Fallback to 0 for invalid values
processed_row[key] = 0
elif key == 'fiat_amount':
try:
# Convert via string to avoid float precision issues
processed_row[key] = Decimal(str(value))
except (ValueError, TypeError):
processed_row[key] = Decimal("0")
elif key in ['commission_percentage', 'discount']:
try:
processed_row[key] = float(value)
# Convert via string to avoid float precision issues
processed_row[key] = Decimal(str(value))
except (ValueError, TypeError):
processed_row[key] = 0.0 # Fallback to 0.0 for invalid values
processed_row[key] = Decimal("0")
elif key == 'transaction_time':
from datetime import datetime
# Parse PostgreSQL timestamp format and ensure it's in UTC for consistency
@ -679,13 +690,13 @@ class LamassuTransactionProcessor:
logger.info("No Flow Mode clients found - skipping distribution")
return {}
# Extract transaction details - guaranteed clean from data ingestion
# Extract transaction details - guaranteed clean from data ingestion (Decimal types)
crypto_atoms = transaction.get("crypto_amount", 0) # Total sats with commission baked in
fiat_amount = transaction.get("fiat_amount", 0) # Actual fiat dispensed (principal only)
commission_percentage = transaction.get("commission_percentage", 0.0) # Already stored as decimal (e.g., 0.045)
discount = transaction.get("discount", 0.0) # Discount percentage
fiat_amount = transaction.get("fiat_amount", Decimal("0")) # Actual fiat dispensed (principal only)
commission_percentage = transaction.get("commission_percentage", Decimal("0")) # Already stored as Decimal (e.g., 0.045)
discount = transaction.get("discount", Decimal("0")) # Discount percentage
transaction_time = transaction.get("transaction_time") # ATM transaction timestamp for temporal accuracy
# Normalize transaction_time to UTC if present
if transaction_time is not None:
if transaction_time.tzinfo is None:
@ -697,7 +708,7 @@ class LamassuTransactionProcessor:
original_tz = transaction_time.tzinfo
transaction_time = transaction_time.astimezone(timezone.utc)
logger.info(f"Converted transaction time from {original_tz} to UTC")
# Validate required fields
if crypto_atoms is None:
logger.error(f"Missing crypto_amount in transaction: {transaction}")
@ -707,10 +718,10 @@ class LamassuTransactionProcessor:
return {}
if commission_percentage is None:
logger.warning(f"Missing commission_percentage in transaction: {transaction}, defaulting to 0")
commission_percentage = 0.0
commission_percentage = Decimal("0")
if discount is None:
logger.info(f"Missing discount in transaction: {transaction}, defaulting to 0")
discount = 0.0
discount = Decimal("0")
if transaction_time is None:
logger.warning(f"Missing transaction_time in transaction: {transaction}")
# Could use current time as fallback, but this indicates a data issue
@ -733,15 +744,16 @@ class LamassuTransactionProcessor:
logger.warning("No transaction time available - using current balances (may be inaccurate)")
# Get balance summaries for all clients to calculate proportions
client_balances = {}
total_confirmed_deposits = 0
client_balances: Dict[str, Decimal] = {}
total_confirmed_deposits = Decimal("0")
min_balance = Decimal("0.01")
for client in flow_clients:
# Get balance as of the transaction time for temporal accuracy
balance = await get_client_balance_summary(client.id, as_of_time=transaction_time)
# Only include clients with positive remaining balance
# NOTE: This works for fiat amounts that use cents
if balance.remaining_balance >= 0.01:
if balance.remaining_balance >= min_balance:
client_balances[client.id] = balance.remaining_balance
total_confirmed_deposits += balance.remaining_balance
logger.debug(f"Client {client.id[:8]}... included with balance: {balance.remaining_balance:.2f} GTQ")
@ -766,7 +778,10 @@ class LamassuTransactionProcessor:
proportion = client_balances[client_id] / total_confirmed_deposits
# Calculate equivalent fiat value in GTQ for tracking purposes
client_fiat_amount = round(client_sats_amount / exchange_rate, 2) if exchange_rate > 0 else 0.0
if exchange_rate > 0:
client_fiat_amount = (Decimal(client_sats_amount) / exchange_rate).quantize(Decimal("0.01"))
else:
client_fiat_amount = Decimal("0")
distributions[client_id] = {
"fiat_amount": client_fiat_amount,
@ -1003,20 +1018,20 @@ class LamassuTransactionProcessor:
async def store_lamassu_transaction(self, transaction: Dict[str, Any]) -> Optional[str]:
"""Store the Lamassu transaction in our database for audit and UI"""
try:
# Extract transaction data - guaranteed clean from data ingestion boundary
# Extract transaction data - guaranteed clean from data ingestion boundary (Decimal types)
crypto_atoms = transaction.get("crypto_amount", 0)
fiat_amount = transaction.get("fiat_amount", 0)
commission_percentage = transaction.get("commission_percentage", 0.0)
discount = transaction.get("discount", 0.0)
fiat_amount = transaction.get("fiat_amount", Decimal("0"))
commission_percentage = transaction.get("commission_percentage", Decimal("0"))
discount = transaction.get("discount", Decimal("0"))
transaction_time = transaction.get("transaction_time")
# Normalize transaction_time to UTC if present
if transaction_time is not None:
if transaction_time.tzinfo is None:
transaction_time = transaction_time.replace(tzinfo=timezone.utc)
elif transaction_time.tzinfo != timezone.utc:
transaction_time = transaction_time.astimezone(timezone.utc)
# Calculate commission metrics using the extracted pure function
base_crypto_atoms, commission_amount_sats, effective_commission = calculate_commission(
crypto_atoms, commission_percentage, discount
@ -1024,11 +1039,13 @@ class LamassuTransactionProcessor:
# Calculate exchange rate
exchange_rate = calculate_exchange_rate(base_crypto_atoms, fiat_amount)
# Create transaction data with GTQ amounts
# Create transaction data with GTQ amounts (Decimal already, just ensure 2 decimal places)
fiat_amount_rounded = fiat_amount.quantize(Decimal("0.01")) if isinstance(fiat_amount, Decimal) else Decimal(str(fiat_amount)).quantize(Decimal("0.01"))
transaction_data = CreateLamassuTransactionData(
lamassu_transaction_id=transaction["transaction_id"],
fiat_amount=round(fiat_amount, 2), # Store GTQ with 2 decimal places
fiat_amount=fiat_amount_rounded,
crypto_amount=crypto_atoms,
commission_percentage=commission_percentage,
discount=discount,
@ -1148,9 +1165,9 @@ class LamassuTransactionProcessor:
# Calculate commission amount for sending to commission wallet
crypto_atoms = transaction.get("crypto_amount", 0)
commission_percentage = transaction.get("commission_percentage", 0.0)
discount = transaction.get("discount", 0.0)
commission_percentage = transaction.get("commission_percentage", Decimal("0"))
discount = transaction.get("discount", Decimal("0"))
# Calculate commission amount using the extracted pure function
_, commission_amount_sats, _ = calculate_commission(
crypto_atoms, commission_percentage, discount