From b96837164eea01670f1f3b80d2b23d2166fbf862 Mon Sep 17 00:00:00 2001 From: Padreug Date: Thu, 14 May 2026 19:00:43 +0200 Subject: [PATCH] chore(v2): dead-code purge (fix bundle 3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ~1300 lines removed across four cleanups. Pure deletions; no behavioural changes. 1. **transaction_processor.py — DELETED (1274 lines).** Orphaned v1 file that hasn't been imported anywhere since fix-bundle-1 wired the v2 distribution chain. The historical Lamassu logic is preserved in git history at any commit on main. 2. **views_api.v2_in_progress_stub — DELETED.** The catch-all that returned 503 for any unmatched /api/v1/dca/* path. With P3a–P9g shipped, every documented endpoint is implemented; the catch-all was stale and (per issue #11 M7) unauthenticated, so it leaked the extension's existence to anonymous probes. Removed entirely. 3. **tasks.hourly_transaction_polling — DELETED.** v1 LegacyLamassu polling no-op. The associated `create_permanent_unique_task` spawn in __init__.py is also gone (was spawning a forever-sleeping task for no reason). 4. **__init__.py scaffolding artifacts.** - Replaced the placeholder "you can debug in your extension using 'import logger from loguru'" template log with a meaningful "satmachineadmin v2 loaded" INFO line. - Dropped the now-stale `hourly_transaction_polling` import + spawn. - Sorted __all__ (RUF022). Migration collapse (m001..m007 → single m001_v2_initial) was on the fix-bundle-3 list but is deferred to a separate PR. The current migrations are harmless on fresh installs (idempotent CREATE/DROP chain) and collapsing them risks breaking the LNbits version tracker on the off chance any operator has v1 data; better to do that as a dedicated migration-discipline change once we're confident no v1 operator data exists in the wild. Routes: 34 → 33 (catch-all gone). 76/76 tests pass. Refs: aiolabs/satmachineadmin#11 — fix bundle 3 ✅ (modulo migration collapse). Remaining in #11: M1-M12 + N1-N12. Co-Authored-By: Claude Opus 4.7 (1M context) --- __init__.py | 25 +- tasks.py | 6 - transaction_processor.py | 1274 -------------------------------------- views_api.py | 18 - 4 files changed, 11 insertions(+), 1312 deletions(-) delete mode 100644 transaction_processor.py diff --git a/__init__.py b/__init__.py index 8c16954..162f3dc 100644 --- a/__init__.py +++ b/__init__.py @@ -5,17 +5,16 @@ from lnbits.tasks import create_permanent_unique_task from loguru import logger from .crud import db -from .tasks import wait_for_paid_invoices, hourly_transaction_polling +from .tasks import wait_for_paid_invoices from .views import satmachineadmin_generic_router from .views_api import satmachineadmin_api_router -logger.debug( - "This logged message is from satmachineadmin/__init__.py, you can debug in your " - "extension using 'import logger from loguru' and 'logger.debug()'." +logger.info("satmachineadmin v2 loaded") + + +satmachineadmin_ext: APIRouter = APIRouter( + prefix="/satmachineadmin", tags=["DCA Admin"] ) - - -satmachineadmin_ext: APIRouter = APIRouter(prefix="/satmachineadmin", tags=["DCA Admin"]) satmachineadmin_ext.include_router(satmachineadmin_generic_router) satmachineadmin_ext.include_router(satmachineadmin_api_router) @@ -38,19 +37,17 @@ def satmachineadmin_stop(): def satmachineadmin_start(): - # Start invoice listener task - invoice_task = create_permanent_unique_task("ext_satmachineadmin", wait_for_paid_invoices) + # bitSpire invoice listener — replaces the v1 SSH/PostgreSQL poller. + invoice_task = create_permanent_unique_task( + "ext_satmachineadmin", wait_for_paid_invoices + ) scheduled_tasks.append(invoice_task) - - # Start hourly transaction polling task - polling_task = create_permanent_unique_task("ext_satmachineadmin_polling", hourly_transaction_polling) - scheduled_tasks.append(polling_task) __all__ = [ "db", "satmachineadmin_ext", - "satmachineadmin_static_files", "satmachineadmin_start", + "satmachineadmin_static_files", "satmachineadmin_stop", ] diff --git a/tasks.py b/tasks.py index 68bbd30..1e6cf48 100644 --- a/tasks.py +++ b/tasks.py @@ -95,9 +95,3 @@ async def _handle_payment(payment: Payment) -> None: task.add_done_callback(_inflight_distributions.discard) -async def hourly_transaction_polling() -> None: - """No-op placeholder. The v1 Lamassu PostgreSQL poller is gone — bitSpire - settlements arrive push-based via Nostr kind-21000 in v2.""" - logger.debug("satmachineadmin v2: legacy polling stub (no-op).") - while True: - await asyncio.sleep(3600) diff --git a/transaction_processor.py b/transaction_processor.py deleted file mode 100644 index 661e5ab..0000000 --- a/transaction_processor.py +++ /dev/null @@ -1,1274 +0,0 @@ -# Transaction processing and polling service for Lamassu ATM integration - -import asyncio -import asyncpg -from datetime import datetime, timedelta, timezone -from typing import List, Optional, Dict, Any -from loguru import logger -import socket -import threading -import time - -try: - import asyncssh - SSH_AVAILABLE = True -except ImportError: - try: - # Fallback to subprocess-based SSH tunnel - import subprocess - SSH_AVAILABLE = True - except ImportError: - SSH_AVAILABLE = False - logger.warning("SSH tunnel support not available") - -from lnbits.core.services import create_invoice, pay_invoice -from lnbits.core.crud.wallets import get_wallet -from lnbits.core.services import update_wallet_balance -from lnbits.settings import settings - -from .calculations import calculate_commission, calculate_distribution, calculate_exchange_rate -from .crud import ( - get_flow_mode_clients, - get_payments_by_lamassu_transaction, - create_dca_payment, - get_client_balance_summary, - get_active_lamassu_config, - update_config_test_result, - update_poll_start_time, - update_poll_success_time, - update_dca_payment_status, - create_lamassu_transaction, - update_lamassu_transaction_distribution_stats -) -from .models import CreateDcaPaymentData, LamassuTransaction, DcaClient, CreateLamassuTransactionData - - -class LamassuTransactionProcessor: - """Handles polling Lamassu database and processing transactions for DCA distribution""" - - def __init__(self): - self.last_check_time = None - self.processed_transaction_ids = set() - self.ssh_process = None - self.ssh_key_path = None - self.ssh_config_path = None - - async def get_db_config(self) -> Optional[Dict[str, Any]]: - """Get database configuration from the database""" - try: - config = await get_active_lamassu_config() - if not config: - logger.error("No active Lamassu database configuration found") - return None - - return { - "host": config.host, - "port": config.port, - "database": config.database_name, - "user": config.username, - "password": config.password, - "config_id": config.id, - "use_ssh_tunnel": config.use_ssh_tunnel, - "ssh_host": config.ssh_host, - "ssh_port": config.ssh_port, - "ssh_username": config.ssh_username, - "ssh_password": config.ssh_password, - "ssh_private_key": config.ssh_private_key - } - except Exception as e: - logger.error(f"Error getting database configuration: {e}") - return None - - def setup_ssh_tunnel(self, db_config: Dict[str, Any]) -> Optional[Dict[str, Any]]: - """Setup SSH tunnel if required and return modified connection config""" - if not db_config.get("use_ssh_tunnel"): - return db_config - - if not SSH_AVAILABLE: - logger.error("SSH tunnel requested but SSH libraries not available") - return None - - try: - # Close existing tunnel if any - self.close_ssh_tunnel() - - # Use subprocess-based SSH tunnel as fallback - return self._setup_subprocess_ssh_tunnel(db_config) - - except Exception as e: - logger.error(f"Failed to setup SSH tunnel: {e}") - self.close_ssh_tunnel() - return None - - def _setup_subprocess_ssh_tunnel(self, db_config: Dict[str, Any]) -> Optional[Dict[str, Any]]: - """Setup SSH tunnel using subprocess (compatible with all environments)""" - import subprocess - import socket - - # Find an available local port - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - local_port = s.getsockname()[1] - - # Build SSH command - ssh_cmd = [ - "ssh", - "-N", # Don't execute remote command - "-L", f"{local_port}:{db_config['host']}:{db_config['port']}", - f"{db_config['ssh_username']}@{db_config['ssh_host']}", - "-p", str(db_config['ssh_port']), - "-o", "StrictHostKeyChecking=no", - "-o", "UserKnownHostsFile=/dev/null", - "-o", "LogLevel=ERROR", - "-o", "ConnectTimeout=10", - "-o", "ServerAliveInterval=60" - ] - - # Add authentication method - if db_config.get("ssh_password"): - # Check if sshpass is available for password authentication - try: - import subprocess - subprocess.run(["which", "sshpass"], check=True, capture_output=True) - ssh_cmd = ["sshpass", "-p", db_config["ssh_password"]] + ssh_cmd - except subprocess.CalledProcessError: - logger.error("Password authentication requires 'sshpass' tool which is not installed. Please use SSH key authentication instead.") - return None - elif db_config.get("ssh_private_key"): - # Write private key and SSH config to temporary files - import tempfile - import os - key_fd, key_path = tempfile.mkstemp(suffix='.pem') - config_fd, config_path = tempfile.mkstemp(suffix='.ssh_config') - try: - # Prepare key content with proper line endings and final newline - key_data = db_config["ssh_private_key"] - key_data = key_data.replace('\r\n', '\n').replace('\r', '\n') # Normalize line endings - if not key_data.endswith('\n'): - key_data += '\n' # Ensure newline at end of file - - with os.fdopen(key_fd, 'w', encoding='utf-8') as f: - f.write(key_data) - - os.chmod(key_path, 0o600) - - # Create temporary SSH config file with strict settings - ssh_config = f"""Host {db_config['ssh_host']} - HostName {db_config['ssh_host']} - Port {db_config['ssh_port']} - User {db_config['ssh_username']} - IdentityFile {key_path} - IdentitiesOnly yes - PasswordAuthentication no - PubkeyAuthentication yes - PreferredAuthentications publickey - NumberOfPasswordPrompts 0 - IdentityAgent none - ControlMaster no - ControlPath none - StrictHostKeyChecking no - UserKnownHostsFile /dev/null - LogLevel ERROR - ConnectTimeout 10 - ServerAliveInterval 60 -""" - - with os.fdopen(config_fd, 'w', encoding='utf-8') as f: - f.write(ssh_config) - - os.chmod(config_path, 0o600) - - # Use the custom config file - ssh_cmd.extend([ - "-F", config_path, - db_config['ssh_host'] - ]) - print(ssh_cmd) - - self.ssh_key_path = key_path # Store for cleanup - self.ssh_config_path = config_path # Store for cleanup - except Exception as e: - os.unlink(key_path) - if 'config_path' in locals(): - os.unlink(config_path) - raise e - else: - logger.error("SSH tunnel requires either private key or password") - return None - - # Start SSH tunnel process - try: - self.ssh_process = subprocess.Popen( - ssh_cmd, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - stdin=subprocess.DEVNULL - ) - - # Wait a moment for tunnel to establish - import time - time.sleep(2) - - # Check if process is still running - if self.ssh_process.poll() is not None: - raise Exception("SSH tunnel process terminated immediately") - - logger.info(f"SSH tunnel established: localhost:{local_port} -> {db_config['ssh_host']}:{db_config['ssh_port']} -> {db_config['host']}:{db_config['port']}") - - # Return modified config to connect through tunnel - tunnel_config = db_config.copy() - tunnel_config["host"] = "127.0.0.1" - tunnel_config["port"] = local_port - - return tunnel_config - - except FileNotFoundError: - logger.error("SSH command not found. SSH tunneling requires ssh (and sshpass for password auth) to be installed on the system.") - return None - except Exception as e: - logger.error(f"Failed to establish SSH tunnel: {e}") - return None - - def close_ssh_tunnel(self): - """Close SSH tunnel if active""" - # Close subprocess-based tunnel - if hasattr(self, 'ssh_process') and self.ssh_process: - try: - self.ssh_process.terminate() - self.ssh_process.wait(timeout=5) - logger.info("SSH tunnel process closed") - except Exception as e: - logger.warning(f"Error closing SSH tunnel process: {e}") - try: - self.ssh_process.kill() - except: - pass - finally: - self.ssh_process = None - - # Clean up temporary key file if exists - if hasattr(self, 'ssh_key_path') and self.ssh_key_path: - try: - import os - os.unlink(self.ssh_key_path) - logger.info("SSH key file cleaned up") - except Exception as e: - logger.warning(f"Error cleaning up SSH key file: {e}") - finally: - self.ssh_key_path = None - - # Clean up temporary SSH config file if exists - if hasattr(self, 'ssh_config_path') and self.ssh_config_path: - try: - import os - os.unlink(self.ssh_config_path) - logger.info("SSH config file cleaned up") - except Exception as e: - logger.warning(f"Error cleaning up SSH config file: {e}") - finally: - self.ssh_config_path = None - - async def test_connection_detailed(self) -> Dict[str, Any]: - """Test connection with detailed step-by-step reporting""" - result = { - "success": False, - "message": "", - "steps": [], - "ssh_tunnel_used": False, - "ssh_tunnel_success": False, - "database_connection_success": False, - "config_id": None - } - - try: - # Step 1: Get configuration - result["steps"].append("Retrieving database configuration...") - db_config = await self.get_db_config() - if not db_config: - result["message"] = "No active Lamassu database configuration found" - result["steps"].append("❌ No configuration found") - return result - - result["config_id"] = db_config["config_id"] - result["steps"].append("✅ Configuration retrieved") - - # Step 2: SSH Tunnel setup (if required) - if db_config.get("use_ssh_tunnel"): - result["ssh_tunnel_used"] = True - result["steps"].append("Setting up SSH tunnel...") - - if not SSH_AVAILABLE: - result["message"] = "SSH tunnel required but SSH support not available" - result["steps"].append("❌ SSH support missing (requires ssh command line tool)") - return result - - connection_config = self.setup_ssh_tunnel(db_config) - if not connection_config: - result["message"] = "Failed to establish SSH tunnel" - result["steps"].append("❌ SSH tunnel failed - check SSH credentials and server accessibility") - return result - - result["ssh_tunnel_success"] = True - result["steps"].append(f"✅ SSH tunnel established to {db_config['ssh_host']}:{db_config['ssh_port']}") - else: - connection_config = db_config - result["steps"].append("ℹ️ Direct database connection (no SSH tunnel)") - - # Step 3: Test SSH-based database query - result["steps"].append("Testing database query via SSH...") - test_query = "SELECT 1 as test" - test_results = await self.execute_ssh_query(db_config, test_query) - - if not test_results: - result["message"] = "SSH connection succeeded but database query failed" - result["steps"].append("❌ Database query test failed") - return result - - result["database_connection_success"] = True - result["steps"].append("✅ Database query test successful") - - # Step 4: Test actual table access and check timezone - result["steps"].append("Testing access to cash_out_txs table...") - table_query = "SELECT COUNT(*) FROM cash_out_txs" - table_results = await self.execute_ssh_query(db_config, table_query) - - if not table_results: - result["message"] = "Connected but cash_out_txs table not accessible" - result["steps"].append("❌ Table access failed") - return result - - count = table_results[0].get('count', 0) - result["steps"].append(f"✅ Table access successful (found {count} transactions)") - - # Step 5: Check database timezone - result["steps"].append("Checking database timezone...") - timezone_query = "SELECT NOW() as db_time, EXTRACT(timezone FROM NOW()) as timezone_offset" - timezone_results = await self.execute_ssh_query(db_config, timezone_query) - - if timezone_results: - db_time = timezone_results[0].get('db_time', 'unknown') - timezone_offset = timezone_results[0].get('timezone_offset', 'unknown') - result["steps"].append(f"✅ Database time: {db_time} (offset: {timezone_offset})") - else: - result["steps"].append("⚠️ Could not determine database timezone") - - result["success"] = True - result["message"] = "All connection tests passed successfully" - - except Exception as e: - error_msg = str(e) - if "cash_out_txs" in error_msg: - result["message"] = "Connected to database but cash_out_txs table not found" - result["steps"].append("❌ Lamassu transaction table missing") - elif "ssh" in error_msg.lower() or "connection" in error_msg.lower(): - result["message"] = f"SSH connection error: {error_msg}" - result["steps"].append(f"❌ SSH error: {error_msg}") - elif "permission denied" in error_msg.lower() or "authentication" in error_msg.lower(): - result["message"] = f"SSH authentication failed: {error_msg}" - result["steps"].append(f"❌ SSH authentication error: {error_msg}") - else: - result["message"] = f"Connection test failed: {error_msg}" - result["steps"].append(f"❌ Unexpected error: {error_msg}") - - # Update test result in database - if result["config_id"]: - try: - await update_config_test_result(result["config_id"], result["success"]) - except Exception as e: - logger.warning(f"Could not update config test result: {e}") - - return result - - async def connect_to_lamassu_db(self) -> Optional[Dict[str, Any]]: - """Get database configuration (returns config dict instead of connection)""" - try: - db_config = await self.get_db_config() - if not db_config: - return None - - # Update test result on successful config retrieval - try: - await update_config_test_result(db_config["config_id"], True) - except Exception as e: - logger.warning(f"Could not update config test result: {e}") - - return db_config - except Exception as e: - logger.error(f"Failed to get database configuration: {e}") - return None - - async def execute_ssh_query(self, db_config: Dict[str, Any], query: str) -> List[Dict[str, Any]]: - """Execute a query via SSH connection""" - import subprocess - import json - import asyncio - - try: - # Build SSH command to execute the query - ssh_cmd = [ - "ssh", - f"{db_config['ssh_username']}@{db_config['ssh_host']}", - "-p", str(db_config['ssh_port']), - "-o", "StrictHostKeyChecking=no", - "-o", "UserKnownHostsFile=/dev/null", - "-o", "LogLevel=ERROR" - ] - - # Add key authentication if provided - if db_config.get("ssh_private_key"): - import tempfile - import os - key_fd, key_path = tempfile.mkstemp(suffix='.pem') - config_fd, config_path = tempfile.mkstemp(suffix='.ssh_config') - try: - # Prepare key content with proper line endings and final newline - key_data = db_config["ssh_private_key"] - key_data = key_data.replace('\r\n', '\n').replace('\r', '\n') # Normalize line endings - if not key_data.endswith('\n'): - key_data += '\n' # Ensure newline at end of file - - with os.fdopen(key_fd, 'w', encoding='utf-8') as f: - f.write(key_data) - os.chmod(key_path, 0o600) - - # Create temporary SSH config file with strict settings - ssh_config = f"""Host {db_config['ssh_host']} - HostName {db_config['ssh_host']} - Port {db_config['ssh_port']} - User {db_config['ssh_username']} - IdentityFile {key_path} - IdentitiesOnly yes - PasswordAuthentication no - PubkeyAuthentication yes - PreferredAuthentications publickey - NumberOfPasswordPrompts 0 - IdentityAgent none - ControlMaster no - ControlPath none - StrictHostKeyChecking no - UserKnownHostsFile /dev/null - LogLevel ERROR - ConnectTimeout 10 - ServerAliveInterval 60 -""" - - with os.fdopen(config_fd, 'w', encoding='utf-8') as f: - f.write(ssh_config) - os.chmod(config_path, 0o600) - - # Use the custom config file - ssh_cmd = [ - "ssh", - "-F", config_path, - db_config['ssh_host'] - ] - - # Build the psql command to return JSON - psql_cmd = f"psql {db_config['database']} -t -c \"COPY ({query}) TO STDOUT WITH CSV HEADER\"" - ssh_cmd.append(psql_cmd) - - # Execute the command - process = await asyncio.create_subprocess_exec( - *ssh_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) - - stdout, stderr = await process.communicate() - - if process.returncode != 0: - logger.error(f"SSH query failed: {stderr.decode()}") - return [] - - # Parse CSV output - import csv - import io - - csv_data = stdout.decode() - if not csv_data.strip(): - return [] - - reader = csv.DictReader(io.StringIO(csv_data)) - results = [] - for row in reader: - # Convert string values to appropriate types - processed_row = {} - for key, value in row.items(): - # Handle None/empty values consistently at data ingestion boundary - if value == '' or value is None: - if key in ['fiat_amount', 'crypto_amount']: - processed_row[key] = 0 # Default numeric fields to 0 - elif key in ['commission_percentage', 'discount']: - processed_row[key] = 0.0 # Default percentage fields to 0.0 - else: - processed_row[key] = None # Keep None for non-numeric fields - elif key in ['transaction_id', 'device_id', 'crypto_code', 'fiat_code']: - processed_row[key] = str(value) - elif key in ['fiat_amount', 'crypto_amount']: - try: - processed_row[key] = int(float(value)) - except (ValueError, TypeError): - processed_row[key] = 0 # Fallback to 0 for invalid values - elif key in ['commission_percentage', 'discount']: - try: - processed_row[key] = float(value) - except (ValueError, TypeError): - processed_row[key] = 0.0 # Fallback to 0.0 for invalid values - elif key == 'transaction_time': - from datetime import datetime - # Parse PostgreSQL timestamp format and ensure it's in UTC for consistency - # Handle formats like: '2025-07-04 23:12:42.627+00' (PostgreSQL format) - timestamp_str = value - - # Fix PostgreSQL timezone format: +00 -> +00:00 - if timestamp_str.endswith('+00'): - timestamp_str = timestamp_str + ':00' - elif timestamp_str.endswith('Z'): - timestamp_str = timestamp_str.replace('Z', '+00:00') - - try: - dt = datetime.fromisoformat(timestamp_str) - except ValueError as e: - logger.error(f"Failed to parse timestamp '{value}': {e}") - # Fallback to current time with warning - dt = datetime.now(timezone.utc) - logger.warning(f"Using current UTC time as fallback for invalid timestamp: {dt}") - - # Convert to UTC if not already - if dt.tzinfo is None: - # Assume UTC if no timezone info - dt = dt.replace(tzinfo=timezone.utc) - elif dt.tzinfo != timezone.utc: - # Convert to UTC - dt = dt.astimezone(timezone.utc) - processed_row[key] = dt - else: - processed_row[key] = value - results.append(processed_row) - - return results - - finally: - os.unlink(key_path) - if 'config_path' in locals(): - os.unlink(config_path) - - else: - logger.error("SSH private key required for database queries") - return [] - - except Exception as e: - logger.error(f"Error executing SSH query: {e}") - return [] - - async def fetch_transaction_by_id(self, db_config: Dict[str, Any], transaction_id: str) -> Optional[Dict[str, Any]]: - """Fetch a specific transaction by ID from Lamassu database, bypassing all status filters""" - try: - logger.info(f"Fetching transaction {transaction_id} from Lamassu database (bypass all filters)") - - # Query for specific transaction ID without any status/dispense filters - lamassu_query = f""" - SELECT - co.id as transaction_id, - co.fiat as fiat_amount, - co.crypto_atoms as crypto_amount, - co.confirmed_at as transaction_time, - co.device_id, - co.status, - co.commission_percentage, - co.discount, - co.crypto_code, - co.fiat_code, - co.dispense, - co.dispense_confirmed - FROM cash_out_txs co - WHERE co.id = '{transaction_id}' - """ - - results = await self.execute_ssh_query(db_config, lamassu_query) - - if not results: - logger.warning(f"Transaction {transaction_id} not found in Lamassu database") - return None - - transaction = results[0] - logger.info(f"Found transaction {transaction_id}: status={transaction.get('status')}, dispense={transaction.get('dispense')}, dispense_confirmed={transaction.get('dispense_confirmed')}") - return transaction - - except Exception as e: - logger.error(f"Error fetching transaction {transaction_id} from Lamassu database: {e}") - return None - - async def fetch_new_transactions(self, db_config: Dict[str, Any]) -> List[Dict[str, Any]]: - """Fetch new successful transactions from Lamassu database since last poll""" - try: - # Determine the time threshold based on last successful poll - config = await get_active_lamassu_config() - if config and config.last_successful_poll: - # Use last successful poll time - time_threshold = config.last_successful_poll - logger.info(f"Checking for transactions since last successful poll: {time_threshold}") - else: - # Fallback to last 24 hours for first run or if no previous poll - time_threshold = datetime.now(timezone.utc) - timedelta(hours=24) - logger.info(f"No previous poll found, checking last 24 hours since: {time_threshold}") - - # Convert to UTC if not already timezone-aware - if time_threshold.tzinfo is None: - time_threshold = time_threshold.replace(tzinfo=timezone.utc) - elif time_threshold.tzinfo != timezone.utc: - time_threshold = time_threshold.astimezone(timezone.utc) - - # Format as UTC for database query - time_threshold_str = time_threshold.strftime('%Y-%m-%d %H:%M:%S UTC') - - # First, get all transactions since the threshold from Lamassu database - # Filter out unconfirmed dispenses - # TODO: review - lamassu_query = f""" - SELECT - co.id as transaction_id, - co.fiat as fiat_amount, - co.crypto_atoms as crypto_amount, - co.confirmed_at as transaction_time, - co.device_id, - co.status, - co.commission_percentage, - co.discount, - co.crypto_code, - co.fiat_code - FROM cash_out_txs co - WHERE co.confirmed_at > '{time_threshold_str}' - AND co.status IN ('confirmed', 'authorized') - AND co.dispense = 't' - AND co.dispense_confirmed = 't' - ORDER BY co.confirmed_at ASC - """ - - all_transactions = await self.execute_ssh_query(db_config, lamassu_query) - - # Then filter out already processed transactions using our local database - from .crud import get_all_payments - processed_payments = await get_all_payments() - processed_transaction_ids = { - payment.lamassu_transaction_id - for payment in processed_payments - if payment.lamassu_transaction_id - } - - # Filter out already processed transactions - new_transactions = [ - tx for tx in all_transactions - if tx['transaction_id'] not in processed_transaction_ids - ] - - logger.info(f"Found {len(all_transactions)} total transactions since {time_threshold}, {len(new_transactions)} are new") - return new_transactions - - except Exception as e: - logger.error(f"Error fetching transactions from Lamassu database: {e}") - return [] - - async def calculate_distribution_amounts(self, transaction: Dict[str, Any]) -> tuple[Dict[str, Any], int]: - """Calculate how much each Flow Mode client should receive. - - Returns: - tuple: (distributions dict, orphan_sats int) - - distributions: {client_id: {fiat_amount, sats_amount, exchange_rate}} - - orphan_sats: sats that couldn't be distributed due to sync mismatch - """ - try: - # Get all active Flow Mode clients - flow_clients = await get_flow_mode_clients() - - if not flow_clients: - logger.info("No Flow Mode clients found - skipping distribution") - return {}, 0 - - # Extract transaction details - guaranteed clean from data ingestion - crypto_atoms = transaction.get("crypto_amount", 0) # Total sats with commission baked in - fiat_amount = transaction.get("fiat_amount", 0) # Actual fiat dispensed (principal only) - commission_percentage = transaction.get("commission_percentage", 0.0) # Already stored as decimal (e.g., 0.045) - discount = transaction.get("discount", 0.0) # Discount percentage - transaction_time = transaction.get("transaction_time") # ATM transaction timestamp for temporal accuracy - - # Normalize transaction_time to UTC if present - if transaction_time is not None: - if transaction_time.tzinfo is None: - # Assume UTC if no timezone info - transaction_time = transaction_time.replace(tzinfo=timezone.utc) - logger.warning("Transaction time was timezone-naive, assuming UTC") - elif transaction_time.tzinfo != timezone.utc: - # Convert to UTC - original_tz = transaction_time.tzinfo - transaction_time = transaction_time.astimezone(timezone.utc) - logger.info(f"Converted transaction time from {original_tz} to UTC") - - # Validate required fields - if crypto_atoms is None: - logger.error(f"Missing crypto_amount in transaction: {transaction}") - return {}, 0 - if fiat_amount is None: - logger.error(f"Missing fiat_amount in transaction: {transaction}") - return {}, 0 - if commission_percentage is None: - logger.warning(f"Missing commission_percentage in transaction: {transaction}, defaulting to 0") - commission_percentage = 0.0 - if discount is None: - logger.info(f"Missing discount in transaction: {transaction}, defaulting to 0") - discount = 0.0 - if transaction_time is None: - logger.warning(f"Missing transaction_time in transaction: {transaction}") - # Could use current time as fallback, but this indicates a data issue - # transaction_time = datetime.now(timezone.utc) - - # Calculate commission split using the extracted pure function - base_crypto_atoms, commission_amount_sats, effective_commission = calculate_commission( - crypto_atoms, commission_percentage, discount - ) - - # Calculate exchange rate based on base amounts - exchange_rate = calculate_exchange_rate(base_crypto_atoms, fiat_amount) - - logger.info(f"Transaction - Total crypto: {crypto_atoms} sats") - logger.info(f"Commission: {commission_percentage*100:.1f}% - {discount:.1f}% discount = {effective_commission*100:.1f}% effective ({commission_amount_sats} sats)") - logger.info(f"Base for DCA: {base_crypto_atoms} sats, Fiat dispensed: {fiat_amount}, Exchange rate: {exchange_rate:.2f} sats/fiat_unit") - if transaction_time: - logger.info(f"Calculating balances as of transaction time: {transaction_time}") - else: - logger.warning("No transaction time available - using current balances (may be inaccurate)") - - # Get balance summaries for all clients to calculate proportions - client_balances = {} - total_confirmed_deposits = 0 - - for client in flow_clients: - # Get balance as of the transaction time for temporal accuracy - balance = await get_client_balance_summary(client.id, as_of_time=transaction_time) - # Only include clients with positive remaining balance - # NOTE: This works for fiat amounts that use cents - if balance.remaining_balance >= 0.01: - client_balances[client.id] = balance.remaining_balance - total_confirmed_deposits += balance.remaining_balance - logger.debug(f"Client {client.id[:8]}... included with balance: {balance.remaining_balance:.2f} GTQ") - else: - logger.info(f"Client {client.id[:8]}... excluded - zero/negative balance: {balance.remaining_balance:.2f} GTQ") - - if total_confirmed_deposits == 0: - logger.info("No clients with remaining DCA balance - skipping distribution") - return {}, 0 - - # Detect sync mismatch: more money in ATM than tracked client balances - sync_mismatch = total_confirmed_deposits < fiat_amount - if sync_mismatch: - orphan_fiat = fiat_amount - total_confirmed_deposits - logger.warning( - f"Sync mismatch detected: tracked balances ({total_confirmed_deposits:.2f} GTQ) " - f"< transaction ({fiat_amount} GTQ). Orphan amount: {orphan_fiat:.2f} GTQ" - ) - - # Calculate distribution amounts - distributions = {} - - if sync_mismatch: - # SYNC MISMATCH MODE: Cap each client's allocation to their remaining fiat balance - # Each client gets sats equivalent to their full remaining balance - for client_id, client_balance in client_balances.items(): - # Calculate sats equivalent to this client's remaining fiat balance - client_sats_amount = round(client_balance * exchange_rate) - proportion = client_balance / total_confirmed_deposits - - # Calculate equivalent fiat value in GTQ for tracking purposes - client_fiat_amount = round(client_sats_amount / exchange_rate, 2) if exchange_rate > 0 else 0.0 - - distributions[client_id] = { - "fiat_amount": client_fiat_amount, - "sats_amount": client_sats_amount, - "exchange_rate": exchange_rate - } - - logger.info(f"Client {client_id[:8]}... gets {client_sats_amount} sats (≈{client_fiat_amount:.2f} GTQ, {proportion:.2%} share)") - - # Calculate orphan sats (difference between base amount and distributed) - total_distributed = sum(dist["sats_amount"] for dist in distributions.values()) - orphan_sats = base_crypto_atoms - total_distributed - logger.info( - f"Sync mismatch distribution: {total_distributed} sats to clients, " - f"{orphan_sats} sats orphaned (staying in source wallet)" - ) - else: - # NORMAL MODE: Proportional distribution based on transaction amount - sat_allocations = calculate_distribution(base_crypto_atoms, client_balances) - - if not sat_allocations: - logger.info("No allocations calculated - skipping distribution") - return {}, 0 - - # Build final distributions dict with additional tracking fields - for client_id, client_sats_amount in sat_allocations.items(): - # Calculate proportion for logging - proportion = client_balances[client_id] / total_confirmed_deposits - - # Calculate equivalent fiat value in GTQ for tracking purposes - client_fiat_amount = round(client_sats_amount / exchange_rate, 2) if exchange_rate > 0 else 0.0 - - distributions[client_id] = { - "fiat_amount": client_fiat_amount, - "sats_amount": client_sats_amount, - "exchange_rate": exchange_rate - } - - logger.info(f"Client {client_id[:8]}... gets {client_sats_amount} sats (≈{client_fiat_amount:.2f} GTQ, {proportion:.2%} share)") - - # Verification: ensure total distribution equals base amount - total_distributed = sum(dist["sats_amount"] for dist in distributions.values()) - if total_distributed != base_crypto_atoms: - logger.error(f"Distribution mismatch! Expected: {base_crypto_atoms} sats, Distributed: {total_distributed} sats") - raise ValueError(f"Satoshi distribution calculation error: {base_crypto_atoms} != {total_distributed}") - orphan_sats = 0 - - # Safety check: Re-verify all clients still have positive balances before finalizing distributions - # This prevents race conditions where balances changed during calculation - final_distributions = {} - for client_id, distribution in distributions.items(): - # Re-check current balance (without temporal filtering to get most recent state) - current_balance = await get_client_balance_summary(client_id) - if current_balance.remaining_balance > 0: - final_distributions[client_id] = distribution - logger.info(f"Client {client_id[:8]}... final balance check: {current_balance.remaining_balance:.2f} GTQ - APPROVED for {distribution['sats_amount']} sats") - else: - logger.warning(f"Client {client_id[:8]}... final balance check: {current_balance.remaining_balance:.2f} GTQ - REJECTED (negative balance)") - - if len(final_distributions) != len(distributions): - logger.warning(f"Rejected {len(distributions) - len(final_distributions)} clients due to negative balances during final check") - - # Recalculate proportions if some clients were rejected - if len(final_distributions) == 0: - logger.info("All clients rejected due to negative balances - no distributions") - return {}, orphan_sats - - # For simplicity, we'll still return the original distributions but log the warning - # In a production system, you might want to recalculate the entire distribution - logger.warning("Proceeding with original distribution despite balance warnings - manual review recommended") - - logger.info(f"Distribution verified: {total_distributed} sats distributed across {len(distributions)} clients (clients with positive allocations only)") - return distributions, orphan_sats - - except Exception as e: - logger.error(f"Error calculating distribution amounts: {e}") - return {}, 0 - - async def distribute_to_clients(self, transaction: Dict[str, Any], distributions: Dict[str, Dict[str, int]]) -> None: - """Send Bitcoin payments to DCA clients""" - try: - transaction_id = transaction["transaction_id"] - transaction_time = transaction.get("transaction_time") - - # Normalize transaction_time to UTC if present - if transaction_time is not None: - if transaction_time.tzinfo is None: - transaction_time = transaction_time.replace(tzinfo=timezone.utc) - elif transaction_time.tzinfo != timezone.utc: - transaction_time = transaction_time.astimezone(timezone.utc) - - for client_id, distribution in distributions.items(): - try: - # Get client info - flow_clients = await get_flow_mode_clients() - client = next((c for c in flow_clients if c.id == client_id), None) - - if not client: - logger.error(f"Client {client_id} not found") - continue - - # Final safety check: Verify client still has positive balance before payment - current_balance = await get_client_balance_summary(client_id) - if current_balance.remaining_balance <= 0: - logger.error(f"CRITICAL: Client {client_id[:8]}... has negative balance ({current_balance.remaining_balance:.2f} GTQ) - REFUSING payment of {distribution['sats_amount']} sats") - continue - - # Verify balance is sufficient for this distribution (round to 2 decimal places to match DECIMAL(10,2) precision) - fiat_equivalent = distribution["fiat_amount"] # Amount in GTQ - # Round both values to 2 decimal places to match database precision and avoid floating point comparison issues - balance_rounded = round(current_balance.remaining_balance, 2) - amount_rounded = round(fiat_equivalent, 2) - if balance_rounded < amount_rounded: - logger.error(f"CRITICAL: Client {client_id[:8]}... insufficient balance ({balance_rounded:.2f} < {amount_rounded:.2f} GTQ) - REFUSING payment") - continue - - logger.info(f"Client {client_id[:8]}... pre-payment balance check: {current_balance.remaining_balance:.2f} GTQ - SUFFICIENT for {fiat_equivalent:.2f} GTQ payment") - - # Create DCA payment record - payment_data = CreateDcaPaymentData( - client_id=client_id, - amount_sats=distribution["sats_amount"], - amount_fiat=distribution["fiat_amount"], # Amount in GTQ - exchange_rate=distribution["exchange_rate"], - transaction_type="flow", - lamassu_transaction_id=transaction_id, - transaction_time=transaction_time # Normalized UTC timestamp - ) - - # Record the payment in our database - dca_payment = await create_dca_payment(payment_data) - - # Send Bitcoin to client's wallet - success = await self.send_dca_payment(client, distribution, transaction_id) - if success: - # Update payment status to confirmed after successful payment - await self.update_payment_status(dca_payment.id, "confirmed") - logger.info(f"DCA payment sent to client {client_id[:8]}...: {distribution['sats_amount']} sats") - else: - # Update payment status to failed if payment failed - await self.update_payment_status(dca_payment.id, "failed") - logger.error(f"Failed to send DCA payment to client {client_id[:8]}...") - - except Exception as e: - logger.error(f"Error processing distribution for client {client_id}: {e}") - continue - - except Exception as e: - logger.error(f"Error distributing to clients: {e}") - - async def send_dca_payment(self, client: DcaClient, distribution: Dict[str, Any], lamassu_transaction_id: str) -> bool: - """Send Bitcoin payment to a DCA client's wallet""" - try: - # For now, we only support wallet_id payments (internal LNBits transfers) - target_wallet_id = client.wallet_id - amount_sats = distribution["sats_amount"] - amount_msat = amount_sats * 1000 # Convert sats to millisats - - # Validate the target wallet exists - target_wallet = await get_wallet(target_wallet_id) - if not target_wallet: - logger.error(f"Target wallet {target_wallet_id} not found for client {client.username or client.user_id}") - return False - - # Create descriptive memo with DCA metrics - fiat_amount_gtq = distribution.get("fiat_amount", 0.0) - exchange_rate = distribution.get("exchange_rate", 0) - - # Calculate cost basis (fiat per BTC) - if exchange_rate > 0: - # exchange_rate is sats per fiat unit, so convert to fiat per BTC - cost_basis_per_btc = 100_000_000 / exchange_rate # 100M sats = 1 BTC - memo = f"DCA: {amount_sats:,} sats • {fiat_amount_gtq:.2f} GTQ • Cost basis: {cost_basis_per_btc:,.2f} GTQ/BTC" - else: - memo = f"DCA: {amount_sats:,} sats • {fiat_amount_gtq:.2f} GTQ" - - # Create invoice in target wallet - extra={ - "tag": "dca_distribution", - "client_id": client.id, - "lamassu_transaction_id": lamassu_transaction_id, - "distribution_amount": amount_sats - } - new_payment = await create_invoice( - wallet_id=target_wallet.id, - amount=float(amount_sats), # LNBits create_invoice expects float - internal=True, # Internal transfer within LNBits - memo=memo, - extra=extra - ) - - if not new_payment: - logger.error(f"Failed to create invoice for client {client.username or client.user_id}") - return False - - # Pay the invoice from the DCA admin wallet (this extension's wallet) - # Get the admin wallet that manages DCA funds - admin_config = await get_active_lamassu_config() - if not admin_config: - logger.error("No active Lamassu config found - cannot determine source wallet") - return False - - if not admin_config.source_wallet_id: - logger.warning("DCA source wallet not configured - payment creation successful but not sent") - logger.info(f"Created invoice for {amount_sats} sats to client {client.username or client.user_id}") - logger.info(f"Invoice: {new_payment.bolt11}") - return True - - # Pay the invoice from the configured source wallet - try: - await pay_invoice( - payment_request=new_payment.bolt11, - wallet_id=admin_config.source_wallet_id, - description=memo, - extra=extra - ) - logger.info(f"DCA payment completed: {amount_sats} sats sent to {client.username or client.user_id}") - return True - except Exception as e: - logger.error(f"Failed to pay invoice for client {client.username or client.user_id}: {e}") - return False - - except Exception as e: - logger.error(f"Error sending DCA payment to client {client.username or client.user_id}: {e}") - return False - - async def credit_source_wallet(self, transaction: Dict[str, Any]) -> bool: - """Credit the source wallet with the full crypto_atoms amount from Lamassu transaction""" - try: - # Get the configuration to find source wallet - admin_config = await get_active_lamassu_config() - if not admin_config or not admin_config.source_wallet_id: - logger.error("No source wallet configured - cannot credit wallet") - return False - - crypto_atoms = transaction["crypto_amount"] # Full amount including commission - transaction_id = transaction["transaction_id"] - - # Get the source wallet object - source_wallet = await get_wallet(admin_config.source_wallet_id) - if not source_wallet: - logger.error(f"Source wallet {admin_config.source_wallet_id} not found") - return False - - # Credit the source wallet with the full crypto_atoms amount - await update_wallet_balance( - wallet=source_wallet, - amount=crypto_atoms # Function expects sats, not millisats - ) - - logger.info(f"Credited source wallet with {crypto_atoms} sats from transaction {transaction_id}") - return True - - except Exception as e: - logger.error(f"Error crediting source wallet for transaction {transaction.get('transaction_id', 'unknown')}: {e}") - return False - - async def update_payment_status(self, payment_id: str, status: str) -> None: - """Update the status of a DCA payment""" - try: - await update_dca_payment_status(payment_id, status) - logger.info(f"Updated payment {payment_id[:8]}... status to {status}") - except Exception as e: - logger.error(f"Error updating payment status for {payment_id}: {e}") - - async def store_lamassu_transaction(self, transaction: Dict[str, Any]) -> Optional[str]: - """Store the Lamassu transaction in our database for audit and UI""" - try: - # Extract transaction data - guaranteed clean from data ingestion boundary - crypto_atoms = transaction.get("crypto_amount", 0) - fiat_amount = transaction.get("fiat_amount", 0) - commission_percentage = transaction.get("commission_percentage", 0.0) - discount = transaction.get("discount", 0.0) - transaction_time = transaction.get("transaction_time") - - # Normalize transaction_time to UTC if present - if transaction_time is not None: - if transaction_time.tzinfo is None: - transaction_time = transaction_time.replace(tzinfo=timezone.utc) - elif transaction_time.tzinfo != timezone.utc: - transaction_time = transaction_time.astimezone(timezone.utc) - - # Calculate commission metrics using the extracted pure function - base_crypto_atoms, commission_amount_sats, effective_commission = calculate_commission( - crypto_atoms, commission_percentage, discount - ) - - # Calculate exchange rate - exchange_rate = calculate_exchange_rate(base_crypto_atoms, fiat_amount) - - # Create transaction data with GTQ amounts - transaction_data = CreateLamassuTransactionData( - lamassu_transaction_id=transaction["transaction_id"], - fiat_amount=round(fiat_amount, 2), # Store GTQ with 2 decimal places - crypto_amount=crypto_atoms, - commission_percentage=commission_percentage, - discount=discount, - effective_commission=effective_commission, - commission_amount_sats=commission_amount_sats, - base_amount_sats=base_crypto_atoms, - exchange_rate=exchange_rate, - crypto_code=transaction.get("crypto_code", "BTC"), - fiat_code=transaction.get("fiat_code", "GTQ"), - device_id=transaction.get("device_id"), - transaction_time=transaction_time # Normalized UTC timestamp - ) - - # Store in database - stored_transaction = await create_lamassu_transaction(transaction_data) - logger.info(f"Stored Lamassu transaction {transaction['transaction_id']} in database") - return stored_transaction.id - - except Exception as e: - logger.error(f"Error storing Lamassu transaction {transaction.get('transaction_id', 'unknown')}: {e}") - return None - - async def send_commission_payment(self, transaction: Dict[str, Any], commission_amount_sats: int) -> bool: - """Send commission to the configured commission wallet""" - try: - # Get the configuration to find commission wallet - admin_config = await get_active_lamassu_config() - if not admin_config or not admin_config.commission_wallet_id: - logger.info("No commission wallet configured - commission remains in source wallet") - return True # Not an error, just no transfer needed - - if not admin_config.source_wallet_id: - logger.error("No source wallet configured - cannot send commission") - return False - - transaction_id = transaction["transaction_id"] - - # Create invoice in commission wallet with DCA metrics - fiat_amount = transaction.get("fiat_amount", 0) - commission_percentage = transaction.get("commission_percentage", 0) * 100 # Convert to percentage - discount = transaction.get("discount", 0.0) # Discount percentage - - # Calculate effective commission for display - if commission_percentage > 0: - effective_commission_percentage = commission_percentage * (100 - discount) / 100 - else: - effective_commission_percentage = 0.0 - - # Create detailed memo showing discount if applied - if discount > 0: - commission_memo = f"DCA Commission: {commission_amount_sats:,} sats • {commission_percentage:.1f}% - {discount:.1f}% discount = {effective_commission_percentage:.1f}% effective • {fiat_amount:,} GTQ transaction" - else: - commission_memo = f"DCA Commission: {commission_amount_sats:,} sats • {commission_percentage:.1f}% • {fiat_amount:,} GTQ transaction" - - commission_payment = await create_invoice( - wallet_id=admin_config.commission_wallet_id, - amount=float(commission_amount_sats), # LNbits create_invoice expects float - internal=True, - memo=commission_memo, - extra={ - "tag": "dca_commission", - "lamassu_transaction_id": transaction_id, - "commission_amount": commission_amount_sats - } - ) - - if not commission_payment: - logger.error(f"Failed to create commission invoice for transaction {transaction_id}") - return False - - # Pay the commission invoice from source wallet - await pay_invoice( - payment_request=commission_payment.bolt11, - wallet_id=admin_config.source_wallet_id, - description=commission_memo, - extra={ - "tag": "dca_commission_payment", - "lamassu_transaction_id": transaction_id - } - ) - - logger.info(f"Commission payment completed: {commission_amount_sats} sats sent to commission wallet for transaction {transaction_id}") - return True - - except Exception as e: - logger.error(f"Error sending commission payment for transaction {transaction.get('transaction_id', 'unknown')}: {e}") - return False - - async def process_transaction(self, transaction: Dict[str, Any]) -> None: - """Process a single transaction - calculate and distribute DCA payments""" - try: - transaction_id = transaction["transaction_id"] - - # Check if transaction already processed - existing_payments = await get_payments_by_lamassu_transaction(transaction_id) - if existing_payments: - logger.info(f"Transaction {transaction_id} already processed - skipping") - return - - logger.info(f"Processing new transaction: {transaction_id}") - - # First, credit the source wallet with the full transaction amount - credit_success = await self.credit_source_wallet(transaction) - if not credit_success: - logger.error(f"Failed to credit source wallet for transaction {transaction_id} - skipping distribution") - return - - # Store the transaction in our database for audit and UI - stored_transaction = await self.store_lamassu_transaction(transaction) - - # Calculate distribution amounts - distributions, orphan_sats = await self.calculate_distribution_amounts(transaction) - - if not distributions: - if orphan_sats > 0: - logger.warning( - f"No client distributions for transaction {transaction_id}, " - f"but {orphan_sats} orphan sats remain in source wallet" - ) - else: - logger.info(f"No distributions calculated for transaction {transaction_id}") - return - - # Calculate commission amount for sending to commission wallet - crypto_atoms = transaction.get("crypto_amount", 0) - commission_percentage = transaction.get("commission_percentage", 0.0) - discount = transaction.get("discount", 0.0) - - # Calculate commission amount using the extracted pure function - _, commission_amount_sats, _ = calculate_commission( - crypto_atoms, commission_percentage, discount - ) - - # Distribute to clients - await self.distribute_to_clients(transaction, distributions) - - # Send commission to commission wallet (if configured) - if commission_amount_sats > 0: - await self.send_commission_payment(transaction, commission_amount_sats) - - # Update distribution statistics in stored transaction - if stored_transaction: - clients_count = len(distributions) - distributions_total_sats = sum(dist["sats_amount"] for dist in distributions.values()) - await update_lamassu_transaction_distribution_stats( - stored_transaction, - clients_count, - distributions_total_sats - ) - - logger.info(f"Successfully processed transaction {transaction_id}") - - except Exception as e: - logger.error(f"Error processing transaction {transaction.get('transaction_id', 'unknown')}: {e}") - - async def poll_and_process(self) -> None: - """Main polling function - checks for new transactions and processes them""" - config_id = None - try: - logger.info("Starting Lamassu transaction polling...") - - # Get database configuration - db_config = await self.connect_to_lamassu_db() - if not db_config: - logger.error("Could not get Lamassu database configuration - skipping this poll") - return - - config_id = db_config["config_id"] - - # Record poll start time - await update_poll_start_time(config_id) - logger.info("Poll start time recorded") - - # Fetch new transactions via SSH - new_transactions = await self.fetch_new_transactions(db_config) - - # Process each transaction - transactions_processed = 0 - for transaction in new_transactions: - await self.process_transaction(transaction) - transactions_processed += 1 - - # Record successful poll completion - await update_poll_success_time(config_id) - logger.info(f"Completed processing {transactions_processed} transactions. Poll success time recorded.") - - except Exception as e: - logger.error(f"Error in polling cycle: {e}") - # Don't update success time on error, but poll start time remains as attempted - - -# Global processor instance -transaction_processor = LamassuTransactionProcessor() - - -async def poll_lamassu_transactions() -> None: - """Entry point for the polling task""" - await transaction_processor.poll_and_process() diff --git a/views_api.py b/views_api.py index d60fe15..b1c9326 100644 --- a/views_api.py +++ b/views_api.py @@ -733,21 +733,3 @@ async def api_update_super_config( return config -# ============================================================================= -# Catch-all stub for endpoints not yet implemented (clients, deposits, -# commission splits, partial-tx, balance-settle, super-config write). Each -# lands in a follow-up commit. The catch-all comes LAST so specific routes -# above take precedence. -# ============================================================================= - - -@satmachineadmin_api_router.api_route( - "/api/v1/dca/{full_path:path}", - methods=["GET", "POST", "PUT", "DELETE", "PATCH"], -) -async def v2_in_progress_stub(full_path: str) -> None: - raise HTTPException( - HTTPStatus.SERVICE_UNAVAILABLE, - f"satmachineadmin v2: /api/v1/dca/{full_path} not yet implemented " - "(landing in P2+). See aiolabs/satmachineadmin#9.", - )