test: add integration tests for CSV parsing and full distribution flow

Tests verify the complete data flow:
- CSV parsing → Decimal conversion (simulates execute_ssh_query)
- Commission calculations with parsed Decimal values
- Distribution calculations with 2 and 4 client scenarios
- Pydantic model creation with Decimal types
- Exchange rate precision and round-trip accuracy

Uses real Lamassu transaction data (8.75%, 5.5% commission rates,
discounts, multiple client configurations).

37 tests now pass (23 unit + 14 integration).

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
padreug 2026-01-11 14:51:23 +01:00
parent 6e86f53962
commit 49dd4d1844

379
tests/test_integration.py Normal file
View file

@ -0,0 +1,379 @@
"""
Integration tests for the full transaction processing flow.
These tests verify that data flows correctly from:
CSV parsing Decimal conversion calculations model creation
This gives us confidence that real Lamassu transactions will be
processed correctly end-to-end.
"""
import pytest
from decimal import Decimal
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
from typing import Dict, Any
import csv
import io
from ..calculations import calculate_commission, calculate_distribution, calculate_exchange_rate
from ..models import (
CreateDcaPaymentData,
CreateLamassuTransactionData,
ClientBalanceSummary,
)
# =============================================================================
# TEST DATA: Real Lamassu CSV output format
# =============================================================================
# This simulates what execute_ssh_query receives from the database
LAMASSU_CSV_DATA = {
"8.75pct_large": """transaction_id,fiat_amount,crypto_amount,transaction_time,device_id,status,commission_percentage,discount,crypto_code,fiat_code
abc123,2000,309200,2025-01-10 14:30:00+00,device1,confirmed,0.0875,0,BTC,GTQ""",
"5.5pct_no_discount": """transaction_id,fiat_amount,crypto_amount,transaction_time,device_id,status,commission_percentage,discount,crypto_code,fiat_code
def456,2000,309500,2025-01-10 15:00:00+00,device1,confirmed,0.055,0,BTC,GTQ""",
"5.5pct_90pct_discount": """transaction_id,fiat_amount,crypto_amount,transaction_time,device_id,status,commission_percentage,discount,crypto_code,fiat_code
ghi789,800,115000,2025-01-10 16:00:00+00,device1,confirmed,0.055,90,BTC,GTQ""",
"5.5pct_1300gtq_4clients": """transaction_id,fiat_amount,crypto_amount,transaction_time,device_id,status,commission_percentage,discount,crypto_code,fiat_code
jkl012,1300,205600,2025-01-10 17:00:00+00,device1,confirmed,0.055,0,BTC,GTQ""",
}
def parse_csv_like_transaction_processor(csv_data: str) -> Dict[str, Any]:
"""
Parse CSV data exactly like transaction_processor.execute_ssh_query does.
This is a copy of the parsing logic to test it in isolation.
"""
reader = csv.DictReader(io.StringIO(csv_data))
results = []
for row in reader:
processed_row = {}
for key, value in row.items():
if value == '' or value is None:
if key == 'crypto_amount':
processed_row[key] = 0
elif key == 'fiat_amount':
processed_row[key] = Decimal("0")
elif key in ['commission_percentage', 'discount']:
processed_row[key] = Decimal("0")
else:
processed_row[key] = None
elif key in ['transaction_id', 'device_id', 'crypto_code', 'fiat_code', 'status']:
processed_row[key] = str(value)
elif key == 'crypto_amount':
try:
processed_row[key] = int(float(value))
except (ValueError, TypeError):
processed_row[key] = 0
elif key == 'fiat_amount':
try:
processed_row[key] = Decimal(str(value))
except (ValueError, TypeError):
processed_row[key] = Decimal("0")
elif key in ['commission_percentage', 'discount']:
try:
processed_row[key] = Decimal(str(value))
except (ValueError, TypeError):
processed_row[key] = Decimal("0")
elif key == 'transaction_time':
timestamp_str = value
if timestamp_str.endswith('+00'):
timestamp_str = timestamp_str + ':00'
elif timestamp_str.endswith('Z'):
timestamp_str = timestamp_str.replace('Z', '+00:00')
try:
dt = datetime.fromisoformat(timestamp_str)
except ValueError:
dt = datetime.now(timezone.utc)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
elif dt.tzinfo != timezone.utc:
dt = dt.astimezone(timezone.utc)
processed_row[key] = dt
else:
processed_row[key] = value
results.append(processed_row)
return results[0] if results else {}
# =============================================================================
# CSV PARSING TESTS
# =============================================================================
class TestCsvParsing:
"""Test that CSV parsing produces correct Decimal types."""
def test_parse_8_75pct_transaction(self):
"""Parse 8.75% commission transaction."""
tx = parse_csv_like_transaction_processor(LAMASSU_CSV_DATA["8.75pct_large"])
assert tx["transaction_id"] == "abc123"
assert tx["crypto_amount"] == 309200
assert tx["fiat_amount"] == Decimal("2000")
assert tx["commission_percentage"] == Decimal("0.0875")
assert tx["discount"] == Decimal("0")
assert isinstance(tx["fiat_amount"], Decimal)
assert isinstance(tx["commission_percentage"], Decimal)
assert isinstance(tx["discount"], Decimal)
def test_parse_5_5pct_with_discount(self):
"""Parse 5.5% commission with 90% discount."""
tx = parse_csv_like_transaction_processor(LAMASSU_CSV_DATA["5.5pct_90pct_discount"])
assert tx["crypto_amount"] == 115000
assert tx["fiat_amount"] == Decimal("800")
assert tx["commission_percentage"] == Decimal("0.055")
assert tx["discount"] == Decimal("90")
def test_timestamp_parsing(self):
"""Verify timestamp is parsed to UTC datetime."""
tx = parse_csv_like_transaction_processor(LAMASSU_CSV_DATA["8.75pct_large"])
assert isinstance(tx["transaction_time"], datetime)
assert tx["transaction_time"].tzinfo == timezone.utc
# =============================================================================
# END-TO-END CALCULATION TESTS
# =============================================================================
class TestEndToEndCalculations:
"""
Test the full flow: CSV Decimal calculations expected results.
These use the same empirical data as test_calculations.py but verify
the data flows correctly through parsing.
"""
@pytest.mark.parametrize("csv_key,expected_base,expected_commission", [
("8.75pct_large", 284322, 24878),
("5.5pct_no_discount", 293365, 16135),
("5.5pct_90pct_discount", 114371, 629),
("5.5pct_1300gtq_4clients", 194882, 10718),
])
def test_csv_to_commission_calculation(self, csv_key, expected_base, expected_commission):
"""Verify CSV parsing → commission calculation produces expected results."""
# Parse CSV like transaction_processor does
tx = parse_csv_like_transaction_processor(LAMASSU_CSV_DATA[csv_key])
# Calculate commission using parsed Decimal values
base, commission, effective = calculate_commission(
tx["crypto_amount"],
tx["commission_percentage"],
tx["discount"]
)
assert base == expected_base, f"Base mismatch for {csv_key}"
assert commission == expected_commission, f"Commission mismatch for {csv_key}"
assert base + commission == tx["crypto_amount"], "Invariant: base + commission = total"
def test_full_distribution_flow_two_equal_clients(self):
"""Test full flow with two equal-balance clients."""
tx = parse_csv_like_transaction_processor(LAMASSU_CSV_DATA["8.75pct_large"])
# Calculate commission
base_sats, commission_sats, effective = calculate_commission(
tx["crypto_amount"],
tx["commission_percentage"],
tx["discount"]
)
# Simulate client balances (as would come from database)
client_balances = {
"client_a": Decimal("1000.00"),
"client_b": Decimal("1000.00"),
}
# Calculate distribution
distributions = calculate_distribution(base_sats, client_balances)
# Verify results
assert sum(distributions.values()) == base_sats
assert len(distributions) == 2
# With equal balances, should be roughly equal (±1 sat for rounding)
assert abs(distributions["client_a"] - distributions["client_b"]) <= 1
def test_full_distribution_flow_four_clients(self):
"""Test the 1300 GTQ transaction with 4 clients of varying balances."""
tx = parse_csv_like_transaction_processor(LAMASSU_CSV_DATA["5.5pct_1300gtq_4clients"])
# Calculate commission
base_sats, commission_sats, effective = calculate_commission(
tx["crypto_amount"],
tx["commission_percentage"],
tx["discount"]
)
assert base_sats == 194882
assert commission_sats == 10718
# Use the actual balance proportions from the real scenario
client_balances = {
"client_a": Decimal("1"),
"client_b": Decimal("986"),
"client_c": Decimal("14"),
"client_d": Decimal("4"),
}
distributions = calculate_distribution(base_sats, client_balances)
# Verify invariant
assert sum(distributions.values()) == base_sats
# Verify proportions are reasonable
total_balance = sum(client_balances.values())
for client_id, sats in distributions.items():
expected_proportion = client_balances[client_id] / total_balance
actual_proportion = Decimal(sats) / Decimal(base_sats)
# Allow 1% tolerance for rounding
assert abs(actual_proportion - expected_proportion) < Decimal("0.01"), \
f"Client {client_id} proportion off: {actual_proportion} vs {expected_proportion}"
# =============================================================================
# MODEL CREATION TESTS
# =============================================================================
class TestModelCreation:
"""Test that Pydantic models accept Decimal values correctly."""
def test_create_lamassu_transaction_data_with_decimals(self):
"""Verify CreateLamassuTransactionData accepts Decimal values."""
tx = parse_csv_like_transaction_processor(LAMASSU_CSV_DATA["8.75pct_large"])
base_sats, commission_sats, effective = calculate_commission(
tx["crypto_amount"],
tx["commission_percentage"],
tx["discount"]
)
exchange_rate = calculate_exchange_rate(base_sats, tx["fiat_amount"])
# This should not raise any validation errors
data = CreateLamassuTransactionData(
lamassu_transaction_id=tx["transaction_id"],
fiat_amount=tx["fiat_amount"],
crypto_amount=tx["crypto_amount"],
commission_percentage=tx["commission_percentage"],
discount=tx["discount"],
effective_commission=effective,
commission_amount_sats=commission_sats,
base_amount_sats=base_sats,
exchange_rate=exchange_rate,
crypto_code=tx["crypto_code"],
fiat_code=tx["fiat_code"],
device_id=tx["device_id"],
transaction_time=tx["transaction_time"],
)
assert data.fiat_amount == Decimal("2000")
assert data.commission_percentage == Decimal("0.0875")
assert data.base_amount_sats == 284322
assert data.commission_amount_sats == 24878
def test_create_dca_payment_data_with_decimals(self):
"""Verify CreateDcaPaymentData accepts Decimal values."""
tx = parse_csv_like_transaction_processor(LAMASSU_CSV_DATA["5.5pct_no_discount"])
base_sats, _, _ = calculate_commission(
tx["crypto_amount"],
tx["commission_percentage"],
tx["discount"]
)
exchange_rate = calculate_exchange_rate(base_sats, tx["fiat_amount"])
# Simulate a client getting half the distribution
client_sats = base_sats // 2
client_fiat = (Decimal(client_sats) / exchange_rate).quantize(Decimal("0.01"))
# This should not raise any validation errors
data = CreateDcaPaymentData(
client_id="test_client_123",
amount_sats=client_sats,
amount_fiat=client_fiat,
exchange_rate=exchange_rate,
transaction_type="flow",
lamassu_transaction_id=tx["transaction_id"],
transaction_time=tx["transaction_time"],
)
assert isinstance(data.amount_fiat, Decimal)
assert isinstance(data.exchange_rate, Decimal)
assert data.amount_sats == client_sats
def test_client_balance_summary_with_decimals(self):
"""Verify ClientBalanceSummary accepts Decimal values."""
summary = ClientBalanceSummary(
client_id="test_client",
total_deposits=Decimal("5000.00"),
total_payments=Decimal("1234.56"),
remaining_balance=Decimal("3765.44"),
currency="GTQ",
)
assert summary.remaining_balance == Decimal("3765.44")
assert isinstance(summary.total_deposits, Decimal)
# =============================================================================
# EXCHANGE RATE PRECISION TESTS
# =============================================================================
class TestExchangeRatePrecision:
"""Test that exchange rate calculations maintain precision."""
def test_exchange_rate_round_trip(self):
"""Verify sats → fiat → sats round-trip maintains precision."""
tx = parse_csv_like_transaction_processor(LAMASSU_CSV_DATA["5.5pct_1300gtq_4clients"])
base_sats, _, _ = calculate_commission(
tx["crypto_amount"],
tx["commission_percentage"],
tx["discount"]
)
exchange_rate = calculate_exchange_rate(base_sats, tx["fiat_amount"])
# Convert sats to fiat and back
fiat_equivalent = Decimal(base_sats) / exchange_rate
sats_back = int((fiat_equivalent * exchange_rate).quantize(Decimal("1")))
# Should be within 1 sat of original
assert abs(sats_back - base_sats) <= 1
def test_per_client_fiat_sums_to_total(self):
"""Verify per-client fiat amounts sum to total fiat (within tolerance)."""
tx = parse_csv_like_transaction_processor(LAMASSU_CSV_DATA["5.5pct_1300gtq_4clients"])
base_sats, _, _ = calculate_commission(
tx["crypto_amount"],
tx["commission_percentage"],
tx["discount"]
)
exchange_rate = calculate_exchange_rate(base_sats, tx["fiat_amount"])
client_balances = {
"client_a": Decimal("1"),
"client_b": Decimal("986"),
"client_c": Decimal("14"),
"client_d": Decimal("4"),
}
distributions = calculate_distribution(base_sats, client_balances)
# Calculate per-client fiat and sum
total_fiat_distributed = Decimal("0")
for client_id, sats in distributions.items():
client_fiat = (Decimal(sats) / exchange_rate).quantize(Decimal("0.01"))
total_fiat_distributed += client_fiat
# Should be within 0.05 GTQ of original (accounting for per-client rounding)
# This is the 0.01 discrepancy we discussed, multiplied by number of clients
assert abs(total_fiat_distributed - tx["fiat_amount"]) < Decimal("0.05"), \
f"Total distributed {total_fiat_distributed} vs original {tx['fiat_amount']}"