diff --git a/crud.py b/crud.py index cd610b1..0976e72 100644 --- a/crud.py +++ b/crud.py @@ -424,26 +424,6 @@ async def get_user_wallet_settings(user_id: str) -> Optional[UserWalletSettings] ) -async def get_user_wallet_settings_by_prefix( - user_id_prefix: str, -) -> Optional[StoredUserWalletSettings]: - """ - Get user wallet settings by user ID prefix (for truncated 8-char IDs from Beancount). - - Beancount accounts use truncated user IDs (first 8 chars), but the database - stores full UUIDs. This function looks up by prefix to bridge the gap. - """ - return await db.fetchone( - """ - SELECT * FROM user_wallet_settings - WHERE id LIKE :prefix || '%' - LIMIT 1 - """, - {"prefix": user_id_prefix}, - StoredUserWalletSettings, - ) - - async def update_user_wallet_settings( user_id: str, data: UserWalletSettings ) -> UserWalletSettings: diff --git a/fava_client.py b/fava_client.py index 6880d11..38e932a 100644 --- a/fava_client.py +++ b/fava_client.py @@ -17,7 +17,6 @@ Fava provides a REST API for: See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py """ -import asyncio import httpx from typing import Any, Dict, List, Optional from decimal import Decimal @@ -25,11 +24,6 @@ from datetime import date, datetime from loguru import logger -class ChecksumConflictError(Exception): - """Raised when a Fava write operation fails due to stale checksum (concurrent modification).""" - pass - - class FavaClient: """ Async client for Fava REST API. @@ -54,37 +48,6 @@ class FavaClient: self.base_url = f"{self.fava_url}/{self.ledger_slug}/api" self.timeout = timeout - # Concurrency control: Global write lock to serialize all ledger modifications. - # This prevents race conditions when multiple requests try to write to the - # Beancount ledger file simultaneously. Without this lock, concurrent writes - # can cause data loss, duplicate entries, or file corruption. - # - # Note: This serializes ALL writes which may become a bottleneck at scale. - # For higher throughput, consider per-user locking or distributed locking. - self._write_lock = asyncio.Lock() - - # Per-user locks for user-specific operations (reduces contention) - self._user_locks: Dict[str, asyncio.Lock] = {} - - def get_user_lock(self, user_id: str) -> asyncio.Lock: - """ - Get or create a lock for a specific user. - - This enables per-user locking to reduce contention when multiple users - are making concurrent requests. User-specific operations should acquire - this lock in addition to (or instead of) the global write lock. - - Args: - user_id: User ID (uses first 8 characters for consistency) - - Returns: - asyncio.Lock for this user - """ - user_key = user_id[:8] - if user_key not in self._user_locks: - self._user_locks[user_key] = asyncio.Lock() - return self._user_locks[user_key] - async def add_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]: """ Submit a new journal entry to Fava. @@ -125,100 +88,26 @@ class FavaClient: "meta": {"user_id": "abc123"} } result = await fava_client.add_entry(entry) - - Note: - This method acquires a global write lock to prevent concurrent - modifications to the ledger file. All writes are serialized. """ - # Acquire global write lock to serialize ledger modifications - async with self._write_lock: - try: - async with httpx.AsyncClient(timeout=self.timeout) as client: - response = await client.put( - f"{self.base_url}/add_entries", - json={"entries": [entry]}, - headers={"Content-Type": "application/json"} - ) - response.raise_for_status() - result = response.json() - - logger.info(f"Added entry to Fava: {result.get('data', 'Unknown')}") - return result - - except httpx.HTTPStatusError as e: - logger.error(f"Fava HTTP error: {e.response.status_code} - {e.response.text}") - raise - except httpx.RequestError as e: - logger.error(f"Fava connection error: {e}") - raise - - async def add_entry_idempotent( - self, - entry: Dict[str, Any], - idempotency_key: str - ) -> Dict[str, Any]: - """ - Submit a journal entry with idempotency protection. - - This method checks if an entry with the given idempotency key (as a Beancount link) - already exists before inserting. This prevents duplicate entries when the same - operation is retried (e.g., due to network issues or concurrent requests). - - The idempotency key is stored as a Beancount link on the entry. Links are part - of the entry's identity and are indexed by Beancount, making lookup efficient. - - Args: - entry: Beancount entry dict (same format as add_entry) - idempotency_key: Unique key for this operation (e.g., "castle-{uuid}" or "ln-{payment_hash}") - - Returns: - Response from Fava if entry was created, or existing entry data if already exists - - Example: - # Use payment hash as idempotency key for Lightning payments - result = await fava.add_entry_idempotent( - entry=settlement_entry, - idempotency_key=f"ln-{payment_hash[:16]}" - ) - - # Use expense ID for expense entries - result = await fava.add_entry_idempotent( - entry=expense_entry, - idempotency_key=f"exp-{expense_id}" - ) - """ - from .beancount_format import sanitize_link - - # Sanitize the idempotency key to ensure it's a valid Beancount link - safe_key = sanitize_link(idempotency_key) - - # Check if entry with this link already exists try: - entries = await self.get_journal_entries(days=30) # Check recent entries + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.put( + f"{self.base_url}/add_entries", + json={"entries": [entry]}, + headers={"Content-Type": "application/json"} + ) + response.raise_for_status() + result = response.json() - for existing_entry in entries: - existing_links = existing_entry.get("links", []) - if safe_key in existing_links: - logger.info(f"Entry with idempotency key '{safe_key}' already exists, skipping insert") - return { - "data": "Entry already exists (idempotent)", - "existing": True, - "entry": existing_entry - } - except Exception as e: - logger.warning(f"Could not check for existing entry with key '{safe_key}': {e}") - # Continue anyway - Beancount will error if there's a true duplicate + logger.info(f"Added entry to Fava: {result.get('data', 'Unknown')}") + return result - # Add the idempotency key as a link if not already present - if "links" not in entry: - entry["links"] = [] - if safe_key not in entry["links"]: - entry["links"].append(safe_key) - - # Now add the entry (this will acquire the write lock) - result = await self.add_entry(entry) - result["existing"] = False - return result + except httpx.HTTPStatusError as e: + logger.error(f"Fava HTTP error: {e.response.status_code} - {e.response.text}") + raise + except httpx.RequestError as e: + logger.error(f"Fava connection error: {e}") + raise async def get_account_balance(self, account_name: str) -> Dict[str, Any]: """ @@ -1257,10 +1146,6 @@ class FavaClient: Returns: New sha256sum after update - Note: - This method acquires a global write lock to prevent concurrent - modifications to the ledger file. All writes are serialized. - Example: # Get context context = await fava.get_entry_context("abc123") @@ -1273,28 +1158,26 @@ class FavaClient: # Update new_sha256 = await fava.update_entry_source("abc123", new_source, sha256) """ - # Acquire global write lock to serialize ledger modifications - async with self._write_lock: - try: - async with httpx.AsyncClient(timeout=self.timeout) as client: - response = await client.put( - f"{self.base_url}/source_slice", - json={ - "entry_hash": entry_hash, - "source": new_source, - "sha256sum": sha256sum - } - ) - response.raise_for_status() - result = response.json() - return result.get("data", "") + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.put( + f"{self.base_url}/source_slice", + json={ + "entry_hash": entry_hash, + "source": new_source, + "sha256sum": sha256sum + } + ) + response.raise_for_status() + result = response.json() + return result.get("data", "") - except httpx.HTTPStatusError as e: - logger.error(f"Fava update error: {e.response.status_code} - {e.response.text}") - raise - except httpx.RequestError as e: - logger.error(f"Fava connection error: {e}") - raise + except httpx.HTTPStatusError as e: + logger.error(f"Fava update error: {e.response.status_code} - {e.response.text}") + raise + except httpx.RequestError as e: + logger.error(f"Fava connection error: {e}") + raise async def delete_entry(self, entry_hash: str, sha256sum: str) -> str: """ @@ -1307,43 +1190,36 @@ class FavaClient: Returns: Success message - Note: - This method acquires a global write lock to prevent concurrent - modifications to the ledger file. All writes are serialized. - Example: context = await fava.get_entry_context("abc123") await fava.delete_entry("abc123", context["sha256sum"]) """ - # Acquire global write lock to serialize ledger modifications - async with self._write_lock: - try: - async with httpx.AsyncClient(timeout=self.timeout) as client: - response = await client.delete( - f"{self.base_url}/source_slice", - params={ - "entry_hash": entry_hash, - "sha256sum": sha256sum - } - ) - response.raise_for_status() - result = response.json() - return result.get("data", "") + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.delete( + f"{self.base_url}/source_slice", + params={ + "entry_hash": entry_hash, + "sha256sum": sha256sum + } + ) + response.raise_for_status() + result = response.json() + return result.get("data", "") - except httpx.HTTPStatusError as e: - logger.error(f"Fava delete error: {e.response.status_code} - {e.response.text}") - raise - except httpx.RequestError as e: - logger.error(f"Fava connection error: {e}") - raise + except httpx.HTTPStatusError as e: + logger.error(f"Fava delete error: {e.response.status_code} - {e.response.text}") + raise + except httpx.RequestError as e: + logger.error(f"Fava connection error: {e}") + raise async def add_account( self, account_name: str, currencies: list[str], opening_date: Optional[date] = None, - metadata: Optional[Dict[str, Any]] = None, - max_retries: int = 3 + metadata: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Add an account to the Beancount ledger via an Open directive. @@ -1351,25 +1227,15 @@ class FavaClient: NOTE: Fava's /api/add_entries endpoint does NOT support Open directives. This method uses /api/source to directly edit the Beancount file. - This method implements optimistic concurrency control with retry logic: - - Acquires a global write lock before modifying the ledger - - Uses SHA256 checksum to detect concurrent modifications - - Retries with exponential backoff on checksum conflicts - - Re-checks if account was created by concurrent request before retrying - Args: account_name: Full account name (e.g., "Assets:Receivable:User-abc123") currencies: List of currencies for this account (e.g., ["EUR", "SATS"]) opening_date: Date to open the account (defaults to today) metadata: Optional metadata for the account - max_retries: Maximum number of retry attempts on checksum conflict (default: 3) Returns: Response from Fava ({"data": "new_sha256sum", "mtime": "..."}) - Raises: - ChecksumConflictError: If all retry attempts fail due to concurrent modifications - Example: # Add a user's receivable account result = await fava.add_account( @@ -1389,115 +1255,89 @@ class FavaClient: if opening_date is None: opening_date = date_type.today() - last_error = None + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + # Step 1: Get the main Beancount file path from Fava + options_response = await client.get(f"{self.base_url}/options") + options_response.raise_for_status() + options_data = options_response.json()["data"] + file_path = options_data["beancount_options"]["filename"] - for attempt in range(max_retries): - # Acquire global write lock to serialize ledger modifications - async with self._write_lock: - try: - async with httpx.AsyncClient(timeout=self.timeout) as client: - # Step 1: Get the main Beancount file path from Fava - options_response = await client.get(f"{self.base_url}/options") - options_response.raise_for_status() - options_data = options_response.json()["data"] - file_path = options_data["beancount_options"]["filename"] + logger.debug(f"Fava main file: {file_path}") - logger.debug(f"Fava main file: {file_path}") + # Step 2: Get current source file + response = await client.get( + f"{self.base_url}/source", + params={"filename": file_path} + ) + response.raise_for_status() + source_data = response.json()["data"] - # Step 2: Get current source file (fresh read on each attempt) - response = await client.get( - f"{self.base_url}/source", - params={"filename": file_path} - ) - response.raise_for_status() - source_data = response.json()["data"] + sha256sum = source_data["sha256sum"] + source = source_data["source"] - sha256sum = source_data["sha256sum"] - source = source_data["source"] + # Step 2: Check if account already exists + if f"open {account_name}" in source: + logger.info(f"Account {account_name} already exists in Beancount file") + return {"data": sha256sum, "mtime": source_data.get("mtime", "")} - # Step 3: Check if account already exists (may have been created by concurrent request) - if f"open {account_name}" in source: - logger.info(f"Account {account_name} already exists in Beancount file") - return {"data": sha256sum, "mtime": source_data.get("mtime", "")} + # Step 3: Find insertion point (after last Open directive AND its metadata) + lines = source.split('\n') + insert_index = 0 + for i, line in enumerate(lines): + if line.strip().startswith(('open ', f'{opening_date.year}-')) and 'open' in line: + # Found an Open directive, now skip over any metadata lines + insert_index = i + 1 + # Skip metadata lines (lines starting with whitespace) + while insert_index < len(lines) and lines[insert_index].startswith((' ', '\t')) and lines[insert_index].strip(): + insert_index += 1 - # Step 4: Find insertion point (after last Open directive AND its metadata) - lines = source.split('\n') - insert_index = 0 - for i, line in enumerate(lines): - if line.strip().startswith(('open ', f'{opening_date.year}-')) and 'open' in line: - # Found an Open directive, now skip over any metadata lines - insert_index = i + 1 - # Skip metadata lines (lines starting with whitespace) - while insert_index < len(lines) and lines[insert_index].startswith((' ', '\t')) and lines[insert_index].strip(): - insert_index += 1 + # Step 4: Format Open directive as Beancount text + currencies_str = ", ".join(currencies) + open_lines = [ + "", + f"{opening_date.isoformat()} open {account_name} {currencies_str}" + ] - # Step 5: Format Open directive as Beancount text - currencies_str = ", ".join(currencies) - open_lines = [ - "", - f"{opening_date.isoformat()} open {account_name} {currencies_str}" - ] + # Add metadata if provided + if metadata: + for key, value in metadata.items(): + # Format metadata with proper indentation + if isinstance(value, str): + open_lines.append(f' {key}: "{value}"') + else: + open_lines.append(f' {key}: {value}') - # Add metadata if provided - if metadata: - for key, value in metadata.items(): - # Format metadata with proper indentation - if isinstance(value, str): - open_lines.append(f' {key}: "{value}"') - else: - open_lines.append(f' {key}: {value}') + # Step 5: Insert into source + for i, line in enumerate(open_lines): + lines.insert(insert_index + i, line) - # Step 6: Insert into source - for i, line in enumerate(open_lines): - lines.insert(insert_index + i, line) + new_source = '\n'.join(lines) - new_source = '\n'.join(lines) + # Step 6: Update source file via PUT /api/source + update_payload = { + "file_path": file_path, + "source": new_source, + "sha256sum": sha256sum + } - # Step 7: Update source file via PUT /api/source - update_payload = { - "file_path": file_path, - "source": new_source, - "sha256sum": sha256sum - } + response = await client.put( + f"{self.base_url}/source", + json=update_payload, + headers={"Content-Type": "application/json"} + ) + response.raise_for_status() + result = response.json() - response = await client.put( - f"{self.base_url}/source", - json=update_payload, - headers={"Content-Type": "application/json"} - ) - response.raise_for_status() - result = response.json() + logger.info(f"Added account {account_name} to Beancount file with currencies {currencies}") + return result - logger.info(f"Added account {account_name} to Beancount file with currencies {currencies}") - return result - - except httpx.HTTPStatusError as e: - # Check for checksum conflict (HTTP 412 Precondition Failed or similar) - if e.response.status_code in (409, 412): - last_error = ChecksumConflictError( - f"Checksum conflict on attempt {attempt + 1}/{max_retries}: {e.response.text}" - ) - logger.warning( - f"Checksum conflict adding account {account_name} " - f"(attempt {attempt + 1}/{max_retries}), retrying..." - ) - # Continue to retry logic below - else: - logger.error(f"Fava HTTP error adding account: {e.response.status_code} - {e.response.text}") - raise - except httpx.RequestError as e: - logger.error(f"Fava connection error: {e}") - raise - - # If we get here due to checksum conflict, wait with exponential backoff before retry - if attempt < max_retries - 1: - backoff_time = 0.1 * (2 ** attempt) # 0.1s, 0.2s, 0.4s - logger.info(f"Waiting {backoff_time}s before retry...") - await asyncio.sleep(backoff_time) - - # All retries exhausted - logger.error(f"Failed to add account {account_name} after {max_retries} attempts due to concurrent modifications") - raise last_error or ChecksumConflictError(f"Failed to add account after {max_retries} attempts") + except httpx.HTTPStatusError as e: + logger.error(f"Fava HTTP error adding account: {e.response.status_code} - {e.response.text}") + raise + except httpx.RequestError as e: + logger.error(f"Fava connection error: {e}") + raise async def get_unsettled_entries_bql( self, diff --git a/services.py b/services.py index 51c4bd8..1f9d826 100644 --- a/services.py +++ b/services.py @@ -18,29 +18,12 @@ async def get_settings(user_id: str) -> CastleSettings: async def update_settings(user_id: str, data: CastleSettings) -> CastleSettings: - from loguru import logger - - from .fava_client import init_fava_client - settings = await get_castle_settings(user_id) if not settings: settings = await create_castle_settings(user_id, data) else: settings = await update_castle_settings(user_id, data) - # Reinitialize Fava client with new settings - try: - init_fava_client( - fava_url=settings.fava_url, - ledger_slug=settings.fava_ledger_slug, - timeout=settings.fava_timeout, - ) - logger.info( - f"Fava client reinitialized: {settings.fava_url}/{settings.fava_ledger_slug}" - ) - except Exception as e: - logger.error(f"Failed to reinitialize Fava client: {e}") - return settings diff --git a/static/js/index.js b/static/js/index.js index bd52e39..3fc736e 100644 --- a/static/js/index.js +++ b/static/js/index.js @@ -31,7 +31,6 @@ window.app = Vue.createApp({ userInfo: null, // User information including equity eligibility isAdmin: false, isSuperUser: false, - settingsLoaded: false, // Flag to prevent race conditions on toolbar buttons castleWalletConfigured: false, userWalletConfigured: false, currentExchangeRate: null, // BTC/EUR rate (sats per EUR) @@ -58,9 +57,6 @@ window.app = Vue.createApp({ settingsDialog: { show: false, castleWalletId: '', - favaUrl: 'http://localhost:3333', - favaLedgerSlug: 'castle-ledger', - favaTimeout: 10.0, loading: false }, userWalletDialog: { @@ -521,9 +517,6 @@ window.app = Vue.createApp({ } catch (error) { // Settings not available this.castleWalletConfigured = false - } finally { - // Mark settings as loaded to enable toolbar buttons - this.settingsLoaded = true } }, async loadUserWallet() { @@ -541,9 +534,6 @@ window.app = Vue.createApp({ }, showSettingsDialog() { this.settingsDialog.castleWalletId = this.settings?.castle_wallet_id || '' - this.settingsDialog.favaUrl = this.settings?.fava_url || 'http://localhost:3333' - this.settingsDialog.favaLedgerSlug = this.settings?.fava_ledger_slug || 'castle-ledger' - this.settingsDialog.favaTimeout = this.settings?.fava_timeout || 10.0 this.settingsDialog.show = true }, showUserWalletDialog() { @@ -559,14 +549,6 @@ window.app = Vue.createApp({ return } - if (!this.settingsDialog.favaUrl) { - this.$q.notify({ - type: 'warning', - message: 'Fava URL is required' - }) - return - } - this.settingsDialog.loading = true try { await LNbits.api.request( @@ -574,10 +556,7 @@ window.app = Vue.createApp({ '/castle/api/v1/settings', this.g.user.wallets[0].adminkey, { - castle_wallet_id: this.settingsDialog.castleWalletId, - fava_url: this.settingsDialog.favaUrl, - fava_ledger_slug: this.settingsDialog.favaLedgerSlug || 'castle-ledger', - fava_timeout: parseFloat(this.settingsDialog.favaTimeout) || 10.0 + castle_wallet_id: this.settingsDialog.castleWalletId } ) this.$q.notify({ @@ -1424,7 +1403,7 @@ window.app = Vue.createApp({ maxAmount: maxAmountSats, // Positive sats amount castle owes maxAmountFiat: maxAmountFiat, // EUR or other fiat amount (positive) fiatCurrency: fiatCurrency, - amount: maxAmountSats, // Default to sats since lightning is the default payment method + amount: fiatCurrency ? maxAmountFiat : maxAmountSats, // Default to fiat if available payment_method: 'lightning', // Default to lightning for paying description: '', reference: '', @@ -1456,9 +1435,8 @@ window.app = Vue.createApp({ memo: `Payment from Castle to ${this.payUserDialog.username}` } ) - console.log(invoiceResponse) - const paymentRequest = invoiceResponse.data.bolt11 + const paymentRequest = invoiceResponse.data.payment_request // Pay the invoice from Castle's wallet const paymentResponse = await LNbits.api.request( diff --git a/static/js/permissions.js b/static/js/permissions.js index 4cc54f0..0de3569 100644 --- a/static/js/permissions.js +++ b/static/js/permissions.js @@ -1118,3 +1118,5 @@ window.app = Vue.createApp({ } } }) + +window.app.mount('#vue') diff --git a/tasks.py b/tasks.py index 8ec83b9..b44c883 100644 --- a/tasks.py +++ b/tasks.py @@ -187,12 +187,6 @@ async def on_invoice_paid(payment: Payment) -> None: This function is called automatically when any invoice on the Castle wallet is paid. It checks if the invoice is a Castle payment and records it in Beancount via Fava. - - Concurrency Protection: - - Uses per-user locking to prevent race conditions when multiple payments - for the same user are processed simultaneously - - Uses idempotent entry creation to prevent duplicate entries even if - the same payment is processed multiple times """ # Only process Castle-specific payments if not payment.extra or payment.extra.get("tag") != "castle": @@ -203,120 +197,134 @@ async def on_invoice_paid(payment: Payment) -> None: logger.warning(f"Castle invoice {payment.payment_hash} missing user_id in metadata") return + # Check if payment already recorded (idempotency) + # Query Fava for existing entry with this payment hash link from .fava_client import get_fava_client + import httpx fava = get_fava_client() - # Use idempotency key based on payment hash - this ensures duplicate - # processing of the same payment won't create duplicate entries - idempotency_key = f"ln-{payment.payment_hash[:16]}" + try: + # Check if payment already recorded by fetching recent entries + # Note: We can't use BQL query with `links ~ 'pattern'` because links is a set type + # and BQL doesn't support regex matching on sets. Instead, fetch entries and filter in Python. + link_to_find = f"ln-{payment.payment_hash[:16]}" - # Acquire per-user lock to serialize processing for this user - # This prevents race conditions when a user has multiple payments being processed - user_lock = fava.get_user_lock(user_id) + async with httpx.AsyncClient(timeout=5.0) as client: + # Get recent entries from Fava's journal endpoint + response = await client.get( + f"{fava.base_url}/api/journal", + params={"time": ""} # Get all entries + ) - async with user_lock: - logger.info(f"Recording Castle payment {payment.payment_hash} for user {user_id[:8]} to Fava") + if response.status_code == 200: + data = response.json() + entries = data.get('entries', []) + # Check if any entry has our payment link + for entry in entries: + entry_links = entry.get('links', []) + if link_to_find in entry_links: + logger.info(f"Payment {payment.payment_hash} already recorded in Fava, skipping") + return + + except Exception as e: + logger.warning(f"Could not check Fava for duplicate payment: {e}") + # Continue anyway - Fava/Beancount will catch duplicate if it exists + + logger.info(f"Recording Castle payment {payment.payment_hash} for user {user_id[:8]} to Fava") + + try: + from decimal import Decimal + from .crud import get_account_by_name, get_or_create_user_account + from .models import AccountType + from .beancount_format import format_net_settlement_entry + + # Convert amount from millisatoshis to satoshis + amount_sats = payment.amount // 1000 + + # Extract fiat metadata from invoice (if present) + fiat_currency = None + fiat_amount = None + if payment.extra: + fiat_currency = payment.extra.get("fiat_currency") + fiat_amount_str = payment.extra.get("fiat_amount") + if fiat_amount_str: + fiat_amount = Decimal(str(fiat_amount_str)) + + if not fiat_currency or not fiat_amount: + logger.error(f"Payment {payment.payment_hash} missing fiat currency/amount metadata") + return + + # Get user's current balance to determine receivables and payables + balance = await fava.get_user_balance(user_id) + fiat_balances = balance.get("fiat_balances", {}) + total_fiat_balance = fiat_balances.get(fiat_currency, Decimal(0)) + + # Determine receivables and payables based on balance + # Positive balance = user owes castle (receivable) + # Negative balance = castle owes user (payable) + if total_fiat_balance > 0: + # User owes castle + total_receivable = total_fiat_balance + total_payable = Decimal(0) + else: + # Castle owes user + total_receivable = Decimal(0) + total_payable = abs(total_fiat_balance) + + logger.info(f"Settlement: {fiat_amount} {fiat_currency} (Receivable: {total_receivable}, Payable: {total_payable})") + + # Get account names + user_receivable = await get_or_create_user_account( + user_id, AccountType.ASSET, "Accounts Receivable" + ) + user_payable = await get_or_create_user_account( + user_id, AccountType.LIABILITY, "Accounts Payable" + ) + lightning_account = await get_account_by_name("Assets:Bitcoin:Lightning") + if not lightning_account: + logger.error("Lightning account 'Assets:Bitcoin:Lightning' not found") + return + + # Query for unsettled entries to link this settlement back to them + # Net settlement can settle both expenses and receivables + settled_links = [] try: - from decimal import Decimal - from .crud import get_account_by_name, get_or_create_user_account - from .models import AccountType - from .beancount_format import format_net_settlement_entry - - # Convert amount from millisatoshis to satoshis - amount_sats = payment.amount // 1000 - - # Extract fiat metadata from invoice (if present) - fiat_currency = None - fiat_amount = None - if payment.extra: - fiat_currency = payment.extra.get("fiat_currency") - fiat_amount_str = payment.extra.get("fiat_amount") - if fiat_amount_str: - fiat_amount = Decimal(str(fiat_amount_str)) - - if not fiat_currency or not fiat_amount: - logger.error(f"Payment {payment.payment_hash} missing fiat currency/amount metadata") - return - - # Get user's current balance to determine receivables and payables - balance = await fava.get_user_balance(user_id) - fiat_balances = balance.get("fiat_balances", {}) - total_fiat_balance = fiat_balances.get(fiat_currency, Decimal(0)) - - # Determine receivables and payables based on balance - # Positive balance = user owes castle (receivable) - # Negative balance = castle owes user (payable) - if total_fiat_balance > 0: - # User owes castle - total_receivable = total_fiat_balance - total_payable = Decimal(0) - else: - # Castle owes user - total_receivable = Decimal(0) - total_payable = abs(total_fiat_balance) - - logger.info(f"Settlement: {fiat_amount} {fiat_currency} (Receivable: {total_receivable}, Payable: {total_payable})") - - # Get account names - user_receivable = await get_or_create_user_account( - user_id, AccountType.ASSET, "Accounts Receivable" - ) - user_payable = await get_or_create_user_account( - user_id, AccountType.LIABILITY, "Accounts Payable" - ) - lightning_account = await get_account_by_name("Assets:Bitcoin:Lightning") - if not lightning_account: - logger.error("Lightning account 'Assets:Bitcoin:Lightning' not found") - return - - # Query for unsettled entries to link this settlement back to them - # Net settlement can settle both expenses and receivables - settled_links = [] - try: - unsettled_expenses = await fava.get_unsettled_entries_bql(user_id, "expense") - settled_links.extend([e["link"] for e in unsettled_expenses if e.get("link")]) - unsettled_receivables = await fava.get_unsettled_entries_bql(user_id, "receivable") - settled_links.extend([e["link"] for e in unsettled_receivables if e.get("link")]) - except Exception as e: - logger.warning(f"Could not query unsettled entries for settlement links: {e}") - # Continue without links - settlement will still be recorded - - # Format as net settlement transaction - entry = format_net_settlement_entry( - user_id=user_id, - payment_account=lightning_account.name, - receivable_account=user_receivable.name, - payable_account=user_payable.name, - amount_sats=amount_sats, - net_fiat_amount=fiat_amount, - total_receivable_fiat=total_receivable, - total_payable_fiat=total_payable, - fiat_currency=fiat_currency, - description=f"Lightning payment settlement from user {user_id[:8]}", - entry_date=datetime.now().date(), - payment_hash=payment.payment_hash, - reference=payment.payment_hash, - settled_entry_links=settled_links if settled_links else None - ) - - # Submit to Fava using idempotent method to prevent duplicates - # The idempotency key is based on the payment hash, so even if this - # function is called multiple times for the same payment, only one - # entry will be created - result = await fava.add_entry_idempotent(entry, idempotency_key) - - if result.get("existing"): - logger.info( - f"Payment {payment.payment_hash} was already recorded in Fava (idempotent)" - ) - else: - logger.info( - f"Successfully recorded payment {payment.payment_hash} to Fava: " - f"{result.get('data', 'Unknown')}" - ) - + unsettled_expenses = await fava.get_unsettled_entries_bql(user_id, "expense") + settled_links.extend([e["link"] for e in unsettled_expenses if e.get("link")]) + unsettled_receivables = await fava.get_unsettled_entries_bql(user_id, "receivable") + settled_links.extend([e["link"] for e in unsettled_receivables if e.get("link")]) except Exception as e: - logger.error(f"Error recording Castle payment {payment.payment_hash}: {e}") - raise + logger.warning(f"Could not query unsettled entries for settlement links: {e}") + # Continue without links - settlement will still be recorded + + # Format as net settlement transaction + entry = format_net_settlement_entry( + user_id=user_id, + payment_account=lightning_account.name, + receivable_account=user_receivable.name, + payable_account=user_payable.name, + amount_sats=amount_sats, + net_fiat_amount=fiat_amount, + total_receivable_fiat=total_receivable, + total_payable_fiat=total_payable, + fiat_currency=fiat_currency, + description=f"Lightning payment settlement from user {user_id[:8]}", + entry_date=datetime.now().date(), + payment_hash=payment.payment_hash, + reference=payment.payment_hash, + settled_entry_links=settled_links if settled_links else None + ) + + # Submit to Fava + result = await fava.add_entry(entry) + + logger.info( + f"Successfully recorded payment {payment.payment_hash} to Fava: " + f"{result.get('data', 'Unknown')}" + ) + + except Exception as e: + logger.error(f"Error recording Castle payment {payment.payment_hash}: {e}") + raise diff --git a/templates/castle/index.html b/templates/castle/index.html index 2a1e665..6648e6c 100644 --- a/templates/castle/index.html +++ b/templates/castle/index.html @@ -17,14 +17,13 @@

Track expenses, receivables, and balances for the collective

- - + Configure Your Wallet - + Manage Permissions (Admin) - + Castle Settings (Super User Only)
@@ -33,7 +32,7 @@ - + @@ -45,7 +44,7 @@ - + @@ -54,7 +53,7 @@ - + @@ -1123,46 +1122,10 @@ :disable="!isSuperUser" > -
+
Select the wallet that will be used for Castle operations and transactions.
- - -
Fava/Beancount Integration
- - - - - - -
dict: - """Get user's wallet settings (super user only) + """Get user's wallet settings (super user only)""" - Supports both full UUIDs and truncated 8-char IDs (from Beancount accounts). - """ - from .crud import get_user_wallet_settings_by_prefix - - # First try exact match user_wallet = await get_user_wallet(user_id) - - # If not found and user_id looks like a truncated ID (8 chars), try prefix match - if not user_wallet or not user_wallet.user_wallet_id: - if len(user_id) <= 8: - stored_wallet = await get_user_wallet_settings_by_prefix(user_id) - if stored_wallet and stored_wallet.user_wallet_id: - user_wallet = stored_wallet - user_id = stored_wallet.id # Use the full ID - - if not user_wallet or not user_wallet.user_wallet_id: + if not user_wallet: return {"user_id": user_id, "user_wallet_id": None} # Get invoice key for the user's wallet (needed to generate invoices) @@ -2650,7 +2636,6 @@ async def api_approve_expense_entry( This updates the transaction in the Beancount file via Fava API. """ - import httpx from .fava_client import get_fava_client fava = get_fava_client() @@ -2659,6 +2644,7 @@ async def api_approve_expense_entry( all_entries = await fava.get_journal_entries() # 2. Find the entry with matching castle ID in links + target_entry_hash = None target_entry = None for entry in all_entries: @@ -2670,86 +2656,51 @@ async def api_approve_expense_entry( link_clean = link.lstrip('^') # Check if this entry has our castle ID if link_clean == f"castle-{entry_id}" or link_clean.endswith(f"-{entry_id}"): + target_entry_hash = entry.get("entry_hash") target_entry = entry break - if target_entry: + if target_entry_hash: break - if not target_entry: + if not target_entry_hash: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Pending entry {entry_id} not found in Beancount ledger" ) - # Get entry metadata for file location - meta = target_entry.get("meta", {}) - filename = meta.get("filename") - lineno = meta.get("lineno") - date_str = target_entry.get("date", "") + # 3. Get the entry context (source text + sha256sum) + context = await fava.get_entry_context(target_entry_hash) + source = context.get("slice", "") + sha256sum = context.get("sha256sum", "") - if not filename or not lineno: + if not source: raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, - detail="Entry metadata missing filename or lineno" + detail="Could not retrieve entry source from Fava" ) - # 3. Get the source file from Fava - async with httpx.AsyncClient(timeout=fava.timeout) as client: - response = await client.get( - f"{fava.base_url}/source", - params={"filename": filename} + # 4. Change flag from ! to * + # Replace the first occurrence of the date + ! pattern + import re + date_str = target_entry.get("date", "") + old_pattern = f"{date_str} !" + new_pattern = f"{date_str} *" + + if old_pattern not in source: + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR, + detail=f"Could not find pending flag pattern '{old_pattern}' in entry source" ) - response.raise_for_status() - source_data = response.json()["data"] - sha256sum = source_data["sha256sum"] - source = source_data["source"] - lines = source.split('\n') + new_source = source.replace(old_pattern, new_pattern, 1) - # 4. Find and modify the entry at the specified line - # Line numbers are 1-indexed, list is 0-indexed - entry_line_idx = lineno - 1 - - if entry_line_idx >= len(lines): - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR, - detail=f"Line {lineno} not found in source file" - ) - - entry_line = lines[entry_line_idx] - - # Check if the line contains the pending flag pattern - old_pattern = f"{date_str} !" - if old_pattern not in entry_line: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR, - detail=f"Line {lineno} does not contain expected pattern '{old_pattern}'. Found: {entry_line}" - ) - - # Replace the flag - new_pattern = f"{date_str} *" - new_line = entry_line.replace(old_pattern, new_pattern, 1) - lines[entry_line_idx] = new_line - - # 5. Write back the modified source - new_source = '\n'.join(lines) - - update_response = await client.put( - f"{fava.base_url}/source", - json={ - "file_path": filename, - "source": new_source, - "sha256sum": sha256sum - }, - headers={"Content-Type": "application/json"} - ) - update_response.raise_for_status() - - logger.info(f"Entry {entry_id} approved (flag changed to *)") + # 5. Update the entry via Fava API + await fava.update_entry_source(target_entry_hash, new_source, sha256sum) return { "message": f"Entry {entry_id} approved successfully", "entry_id": entry_id, + "entry_hash": target_entry_hash, "date": date_str, "description": target_entry.get("narration", "") } @@ -2766,7 +2717,6 @@ async def api_reject_expense_entry( Adds #voided tag for audit trail while keeping the '!' flag. Voided transactions are excluded from balances but preserved in the ledger. """ - import httpx from .fava_client import get_fava_client fava = get_fava_client() @@ -2775,6 +2725,7 @@ async def api_reject_expense_entry( all_entries = await fava.get_journal_entries() # 2. Find the entry with matching castle ID in links + target_entry_hash = None target_entry = None for entry in all_entries: @@ -2786,77 +2737,58 @@ async def api_reject_expense_entry( link_clean = link.lstrip('^') # Check if this entry has our castle ID if link_clean == f"castle-{entry_id}" or link_clean.endswith(f"-{entry_id}"): + target_entry_hash = entry.get("entry_hash") target_entry = entry break - if target_entry: + if target_entry_hash: break - if not target_entry: + if not target_entry_hash: raise HTTPException( status_code=HTTPStatus.NOT_FOUND, detail=f"Pending entry {entry_id} not found in Beancount ledger" ) - # Get entry metadata for file location - meta = target_entry.get("meta", {}) - filename = meta.get("filename") - lineno = meta.get("lineno") - date_str = target_entry.get("date", "") + # 3. Get the entry context (source text + sha256sum) + context = await fava.get_entry_context(target_entry_hash) + source = context.get("slice", "") + sha256sum = context.get("sha256sum", "") - if not filename or not lineno: + if not source: raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, - detail="Entry metadata missing filename or lineno" + detail="Could not retrieve entry source from Fava" ) - # 3. Get the source file from Fava - async with httpx.AsyncClient(timeout=fava.timeout) as client: - response = await client.get( - f"{fava.base_url}/source", - params={"filename": filename} - ) - response.raise_for_status() - source_data = response.json()["data"] + # 4. Add #voided tag (keep ! flag as per convention) + date_str = target_entry.get("date", "") - sha256sum = source_data["sha256sum"] - source = source_data["source"] + # Add #voided tag if not already present + if "#voided" not in source: + # Find the transaction line and add #voided to the tags + # Pattern: date ! "narration" #existing-tags lines = source.split('\n') + for i, line in enumerate(lines): + if date_str in line and '"' in line and '!' in line: + # Add #voided tag to the transaction line + if '#' in line: + # Already has tags, append voided + lines[i] = line.rstrip() + ' #voided' + else: + # No tags yet, add after narration + lines[i] = line.rstrip() + ' #voided' + break + new_source = '\n'.join(lines) + else: + new_source = source - # 4. Find and modify the entry at the specified line - add #voided tag - entry_line_idx = lineno - 1 - - if entry_line_idx >= len(lines): - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR, - detail=f"Line {lineno} not found in source file" - ) - - entry_line = lines[entry_line_idx] - - # Add #voided tag if not already present - if "#voided" not in entry_line: - # Add #voided tag to the transaction line - new_line = entry_line.rstrip() + ' #voided' - lines[entry_line_idx] = new_line - - # 5. Write back the modified source - new_source = '\n'.join(lines) - - update_response = await client.put( - f"{fava.base_url}/source", - json={ - "file_path": filename, - "source": new_source, - "sha256sum": sha256sum - }, - headers={"Content-Type": "application/json"} - ) - update_response.raise_for_status() - logger.info(f"Entry {entry_id} rejected (added #voided tag)") + # 5. Update the entry via Fava API + await fava.update_entry_source(target_entry_hash, new_source, sha256sum) return { "message": f"Entry {entry_id} rejected (marked as voided)", "entry_id": entry_id, + "entry_hash": target_entry_hash, "date": date_str, "description": target_entry.get("narration", "") }