Add concurrency protection for Fava/Beancount ledger writes

This commit addresses critical race conditions when multiple requests
try to write to the ledger file simultaneously.

Changes:
- Add global asyncio.Lock to FavaClient to serialize all write operations
- Add per-user locks for finer-grained concurrency control
- Wrap add_entry(), update_entry_source(), delete_entry() with write lock
- Add retry logic with exponential backoff to add_account() for checksum conflicts
- Add new add_entry_idempotent() method to prevent duplicate entries
- Add ChecksumConflictError exception for conflict handling
- Update on_invoice_paid() to use per-user locking and idempotent entry creation

Fixes #4

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
padreug 2026-01-06 23:57:03 +01:00
parent e403ec223d
commit b5c36504fb
2 changed files with 397 additions and 245 deletions

View file

@ -17,6 +17,7 @@ Fava provides a REST API for:
See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py
""" """
import asyncio
import httpx import httpx
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from decimal import Decimal from decimal import Decimal
@ -24,6 +25,11 @@ from datetime import date, datetime
from loguru import logger from loguru import logger
class ChecksumConflictError(Exception):
"""Raised when a Fava write operation fails due to stale checksum (concurrent modification)."""
pass
class FavaClient: class FavaClient:
""" """
Async client for Fava REST API. Async client for Fava REST API.
@ -48,6 +54,37 @@ class FavaClient:
self.base_url = f"{self.fava_url}/{self.ledger_slug}/api" self.base_url = f"{self.fava_url}/{self.ledger_slug}/api"
self.timeout = timeout self.timeout = timeout
# Concurrency control: Global write lock to serialize all ledger modifications.
# This prevents race conditions when multiple requests try to write to the
# Beancount ledger file simultaneously. Without this lock, concurrent writes
# can cause data loss, duplicate entries, or file corruption.
#
# Note: This serializes ALL writes which may become a bottleneck at scale.
# For higher throughput, consider per-user locking or distributed locking.
self._write_lock = asyncio.Lock()
# Per-user locks for user-specific operations (reduces contention)
self._user_locks: Dict[str, asyncio.Lock] = {}
def get_user_lock(self, user_id: str) -> asyncio.Lock:
"""
Get or create a lock for a specific user.
This enables per-user locking to reduce contention when multiple users
are making concurrent requests. User-specific operations should acquire
this lock in addition to (or instead of) the global write lock.
Args:
user_id: User ID (uses first 8 characters for consistency)
Returns:
asyncio.Lock for this user
"""
user_key = user_id[:8]
if user_key not in self._user_locks:
self._user_locks[user_key] = asyncio.Lock()
return self._user_locks[user_key]
async def add_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]: async def add_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]:
""" """
Submit a new journal entry to Fava. Submit a new journal entry to Fava.
@ -88,26 +125,100 @@ class FavaClient:
"meta": {"user_id": "abc123"} "meta": {"user_id": "abc123"}
} }
result = await fava_client.add_entry(entry) result = await fava_client.add_entry(entry)
Note:
This method acquires a global write lock to prevent concurrent
modifications to the ledger file. All writes are serialized.
""" """
# Acquire global write lock to serialize ledger modifications
async with self._write_lock:
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.put(
f"{self.base_url}/add_entries",
json={"entries": [entry]},
headers={"Content-Type": "application/json"}
)
response.raise_for_status()
result = response.json()
logger.info(f"Added entry to Fava: {result.get('data', 'Unknown')}")
return result
except httpx.HTTPStatusError as e:
logger.error(f"Fava HTTP error: {e.response.status_code} - {e.response.text}")
raise
except httpx.RequestError as e:
logger.error(f"Fava connection error: {e}")
raise
async def add_entry_idempotent(
self,
entry: Dict[str, Any],
idempotency_key: str
) -> Dict[str, Any]:
"""
Submit a journal entry with idempotency protection.
This method checks if an entry with the given idempotency key (as a Beancount link)
already exists before inserting. This prevents duplicate entries when the same
operation is retried (e.g., due to network issues or concurrent requests).
The idempotency key is stored as a Beancount link on the entry. Links are part
of the entry's identity and are indexed by Beancount, making lookup efficient.
Args:
entry: Beancount entry dict (same format as add_entry)
idempotency_key: Unique key for this operation (e.g., "castle-{uuid}" or "ln-{payment_hash}")
Returns:
Response from Fava if entry was created, or existing entry data if already exists
Example:
# Use payment hash as idempotency key for Lightning payments
result = await fava.add_entry_idempotent(
entry=settlement_entry,
idempotency_key=f"ln-{payment_hash[:16]}"
)
# Use expense ID for expense entries
result = await fava.add_entry_idempotent(
entry=expense_entry,
idempotency_key=f"exp-{expense_id}"
)
"""
from .beancount_format import sanitize_link
# Sanitize the idempotency key to ensure it's a valid Beancount link
safe_key = sanitize_link(idempotency_key)
# Check if entry with this link already exists
try: try:
async with httpx.AsyncClient(timeout=self.timeout) as client: entries = await self.get_journal_entries(days=30) # Check recent entries
response = await client.put(
f"{self.base_url}/add_entries",
json={"entries": [entry]},
headers={"Content-Type": "application/json"}
)
response.raise_for_status()
result = response.json()
logger.info(f"Added entry to Fava: {result.get('data', 'Unknown')}") for existing_entry in entries:
return result existing_links = existing_entry.get("links", [])
if safe_key in existing_links:
logger.info(f"Entry with idempotency key '{safe_key}' already exists, skipping insert")
return {
"data": "Entry already exists (idempotent)",
"existing": True,
"entry": existing_entry
}
except Exception as e:
logger.warning(f"Could not check for existing entry with key '{safe_key}': {e}")
# Continue anyway - Beancount will error if there's a true duplicate
except httpx.HTTPStatusError as e: # Add the idempotency key as a link if not already present
logger.error(f"Fava HTTP error: {e.response.status_code} - {e.response.text}") if "links" not in entry:
raise entry["links"] = []
except httpx.RequestError as e: if safe_key not in entry["links"]:
logger.error(f"Fava connection error: {e}") entry["links"].append(safe_key)
raise
# Now add the entry (this will acquire the write lock)
result = await self.add_entry(entry)
result["existing"] = False
return result
async def get_account_balance(self, account_name: str) -> Dict[str, Any]: async def get_account_balance(self, account_name: str) -> Dict[str, Any]:
""" """
@ -1146,6 +1257,10 @@ class FavaClient:
Returns: Returns:
New sha256sum after update New sha256sum after update
Note:
This method acquires a global write lock to prevent concurrent
modifications to the ledger file. All writes are serialized.
Example: Example:
# Get context # Get context
context = await fava.get_entry_context("abc123") context = await fava.get_entry_context("abc123")
@ -1158,26 +1273,28 @@ class FavaClient:
# Update # Update
new_sha256 = await fava.update_entry_source("abc123", new_source, sha256) new_sha256 = await fava.update_entry_source("abc123", new_source, sha256)
""" """
try: # Acquire global write lock to serialize ledger modifications
async with httpx.AsyncClient(timeout=self.timeout) as client: async with self._write_lock:
response = await client.put( try:
f"{self.base_url}/source_slice", async with httpx.AsyncClient(timeout=self.timeout) as client:
json={ response = await client.put(
"entry_hash": entry_hash, f"{self.base_url}/source_slice",
"source": new_source, json={
"sha256sum": sha256sum "entry_hash": entry_hash,
} "source": new_source,
) "sha256sum": sha256sum
response.raise_for_status() }
result = response.json() )
return result.get("data", "") response.raise_for_status()
result = response.json()
return result.get("data", "")
except httpx.HTTPStatusError as e: except httpx.HTTPStatusError as e:
logger.error(f"Fava update error: {e.response.status_code} - {e.response.text}") logger.error(f"Fava update error: {e.response.status_code} - {e.response.text}")
raise raise
except httpx.RequestError as e: except httpx.RequestError as e:
logger.error(f"Fava connection error: {e}") logger.error(f"Fava connection error: {e}")
raise raise
async def delete_entry(self, entry_hash: str, sha256sum: str) -> str: async def delete_entry(self, entry_hash: str, sha256sum: str) -> str:
""" """
@ -1190,36 +1307,43 @@ class FavaClient:
Returns: Returns:
Success message Success message
Note:
This method acquires a global write lock to prevent concurrent
modifications to the ledger file. All writes are serialized.
Example: Example:
context = await fava.get_entry_context("abc123") context = await fava.get_entry_context("abc123")
await fava.delete_entry("abc123", context["sha256sum"]) await fava.delete_entry("abc123", context["sha256sum"])
""" """
try: # Acquire global write lock to serialize ledger modifications
async with httpx.AsyncClient(timeout=self.timeout) as client: async with self._write_lock:
response = await client.delete( try:
f"{self.base_url}/source_slice", async with httpx.AsyncClient(timeout=self.timeout) as client:
params={ response = await client.delete(
"entry_hash": entry_hash, f"{self.base_url}/source_slice",
"sha256sum": sha256sum params={
} "entry_hash": entry_hash,
) "sha256sum": sha256sum
response.raise_for_status() }
result = response.json() )
return result.get("data", "") response.raise_for_status()
result = response.json()
return result.get("data", "")
except httpx.HTTPStatusError as e: except httpx.HTTPStatusError as e:
logger.error(f"Fava delete error: {e.response.status_code} - {e.response.text}") logger.error(f"Fava delete error: {e.response.status_code} - {e.response.text}")
raise raise
except httpx.RequestError as e: except httpx.RequestError as e:
logger.error(f"Fava connection error: {e}") logger.error(f"Fava connection error: {e}")
raise raise
async def add_account( async def add_account(
self, self,
account_name: str, account_name: str,
currencies: list[str], currencies: list[str],
opening_date: Optional[date] = None, opening_date: Optional[date] = None,
metadata: Optional[Dict[str, Any]] = None metadata: Optional[Dict[str, Any]] = None,
max_retries: int = 3
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Add an account to the Beancount ledger via an Open directive. Add an account to the Beancount ledger via an Open directive.
@ -1227,15 +1351,25 @@ class FavaClient:
NOTE: Fava's /api/add_entries endpoint does NOT support Open directives. NOTE: Fava's /api/add_entries endpoint does NOT support Open directives.
This method uses /api/source to directly edit the Beancount file. This method uses /api/source to directly edit the Beancount file.
This method implements optimistic concurrency control with retry logic:
- Acquires a global write lock before modifying the ledger
- Uses SHA256 checksum to detect concurrent modifications
- Retries with exponential backoff on checksum conflicts
- Re-checks if account was created by concurrent request before retrying
Args: Args:
account_name: Full account name (e.g., "Assets:Receivable:User-abc123") account_name: Full account name (e.g., "Assets:Receivable:User-abc123")
currencies: List of currencies for this account (e.g., ["EUR", "SATS"]) currencies: List of currencies for this account (e.g., ["EUR", "SATS"])
opening_date: Date to open the account (defaults to today) opening_date: Date to open the account (defaults to today)
metadata: Optional metadata for the account metadata: Optional metadata for the account
max_retries: Maximum number of retry attempts on checksum conflict (default: 3)
Returns: Returns:
Response from Fava ({"data": "new_sha256sum", "mtime": "..."}) Response from Fava ({"data": "new_sha256sum", "mtime": "..."})
Raises:
ChecksumConflictError: If all retry attempts fail due to concurrent modifications
Example: Example:
# Add a user's receivable account # Add a user's receivable account
result = await fava.add_account( result = await fava.add_account(
@ -1255,89 +1389,115 @@ class FavaClient:
if opening_date is None: if opening_date is None:
opening_date = date_type.today() opening_date = date_type.today()
try: last_error = None
async with httpx.AsyncClient(timeout=self.timeout) as client:
# Step 1: Get the main Beancount file path from Fava
options_response = await client.get(f"{self.base_url}/options")
options_response.raise_for_status()
options_data = options_response.json()["data"]
file_path = options_data["beancount_options"]["filename"]
logger.debug(f"Fava main file: {file_path}") for attempt in range(max_retries):
# Acquire global write lock to serialize ledger modifications
async with self._write_lock:
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
# Step 1: Get the main Beancount file path from Fava
options_response = await client.get(f"{self.base_url}/options")
options_response.raise_for_status()
options_data = options_response.json()["data"]
file_path = options_data["beancount_options"]["filename"]
# Step 2: Get current source file logger.debug(f"Fava main file: {file_path}")
response = await client.get(
f"{self.base_url}/source",
params={"filename": file_path}
)
response.raise_for_status()
source_data = response.json()["data"]
sha256sum = source_data["sha256sum"] # Step 2: Get current source file (fresh read on each attempt)
source = source_data["source"] response = await client.get(
f"{self.base_url}/source",
params={"filename": file_path}
)
response.raise_for_status()
source_data = response.json()["data"]
# Step 2: Check if account already exists sha256sum = source_data["sha256sum"]
if f"open {account_name}" in source: source = source_data["source"]
logger.info(f"Account {account_name} already exists in Beancount file")
return {"data": sha256sum, "mtime": source_data.get("mtime", "")}
# Step 3: Find insertion point (after last Open directive AND its metadata) # Step 3: Check if account already exists (may have been created by concurrent request)
lines = source.split('\n') if f"open {account_name}" in source:
insert_index = 0 logger.info(f"Account {account_name} already exists in Beancount file")
for i, line in enumerate(lines): return {"data": sha256sum, "mtime": source_data.get("mtime", "")}
if line.strip().startswith(('open ', f'{opening_date.year}-')) and 'open' in line:
# Found an Open directive, now skip over any metadata lines
insert_index = i + 1
# Skip metadata lines (lines starting with whitespace)
while insert_index < len(lines) and lines[insert_index].startswith((' ', '\t')) and lines[insert_index].strip():
insert_index += 1
# Step 4: Format Open directive as Beancount text # Step 4: Find insertion point (after last Open directive AND its metadata)
currencies_str = ", ".join(currencies) lines = source.split('\n')
open_lines = [ insert_index = 0
"", for i, line in enumerate(lines):
f"{opening_date.isoformat()} open {account_name} {currencies_str}" if line.strip().startswith(('open ', f'{opening_date.year}-')) and 'open' in line:
] # Found an Open directive, now skip over any metadata lines
insert_index = i + 1
# Skip metadata lines (lines starting with whitespace)
while insert_index < len(lines) and lines[insert_index].startswith((' ', '\t')) and lines[insert_index].strip():
insert_index += 1
# Add metadata if provided # Step 5: Format Open directive as Beancount text
if metadata: currencies_str = ", ".join(currencies)
for key, value in metadata.items(): open_lines = [
# Format metadata with proper indentation "",
if isinstance(value, str): f"{opening_date.isoformat()} open {account_name} {currencies_str}"
open_lines.append(f' {key}: "{value}"') ]
else:
open_lines.append(f' {key}: {value}')
# Step 5: Insert into source # Add metadata if provided
for i, line in enumerate(open_lines): if metadata:
lines.insert(insert_index + i, line) for key, value in metadata.items():
# Format metadata with proper indentation
if isinstance(value, str):
open_lines.append(f' {key}: "{value}"')
else:
open_lines.append(f' {key}: {value}')
new_source = '\n'.join(lines) # Step 6: Insert into source
for i, line in enumerate(open_lines):
lines.insert(insert_index + i, line)
# Step 6: Update source file via PUT /api/source new_source = '\n'.join(lines)
update_payload = {
"file_path": file_path,
"source": new_source,
"sha256sum": sha256sum
}
response = await client.put( # Step 7: Update source file via PUT /api/source
f"{self.base_url}/source", update_payload = {
json=update_payload, "file_path": file_path,
headers={"Content-Type": "application/json"} "source": new_source,
) "sha256sum": sha256sum
response.raise_for_status() }
result = response.json()
logger.info(f"Added account {account_name} to Beancount file with currencies {currencies}") response = await client.put(
return result f"{self.base_url}/source",
json=update_payload,
headers={"Content-Type": "application/json"}
)
response.raise_for_status()
result = response.json()
except httpx.HTTPStatusError as e: logger.info(f"Added account {account_name} to Beancount file with currencies {currencies}")
logger.error(f"Fava HTTP error adding account: {e.response.status_code} - {e.response.text}") return result
raise
except httpx.RequestError as e: except httpx.HTTPStatusError as e:
logger.error(f"Fava connection error: {e}") # Check for checksum conflict (HTTP 412 Precondition Failed or similar)
raise if e.response.status_code in (409, 412):
last_error = ChecksumConflictError(
f"Checksum conflict on attempt {attempt + 1}/{max_retries}: {e.response.text}"
)
logger.warning(
f"Checksum conflict adding account {account_name} "
f"(attempt {attempt + 1}/{max_retries}), retrying..."
)
# Continue to retry logic below
else:
logger.error(f"Fava HTTP error adding account: {e.response.status_code} - {e.response.text}")
raise
except httpx.RequestError as e:
logger.error(f"Fava connection error: {e}")
raise
# If we get here due to checksum conflict, wait with exponential backoff before retry
if attempt < max_retries - 1:
backoff_time = 0.1 * (2 ** attempt) # 0.1s, 0.2s, 0.4s
logger.info(f"Waiting {backoff_time}s before retry...")
await asyncio.sleep(backoff_time)
# All retries exhausted
logger.error(f"Failed to add account {account_name} after {max_retries} attempts due to concurrent modifications")
raise last_error or ChecksumConflictError(f"Failed to add account after {max_retries} attempts")
async def get_unsettled_entries_bql( async def get_unsettled_entries_bql(
self, self,

234
tasks.py
View file

@ -187,6 +187,12 @@ async def on_invoice_paid(payment: Payment) -> None:
This function is called automatically when any invoice on the Castle wallet This function is called automatically when any invoice on the Castle wallet
is paid. It checks if the invoice is a Castle payment and records it in is paid. It checks if the invoice is a Castle payment and records it in
Beancount via Fava. Beancount via Fava.
Concurrency Protection:
- Uses per-user locking to prevent race conditions when multiple payments
for the same user are processed simultaneously
- Uses idempotent entry creation to prevent duplicate entries even if
the same payment is processed multiple times
""" """
# Only process Castle-specific payments # Only process Castle-specific payments
if not payment.extra or payment.extra.get("tag") != "castle": if not payment.extra or payment.extra.get("tag") != "castle":
@ -197,134 +203,120 @@ async def on_invoice_paid(payment: Payment) -> None:
logger.warning(f"Castle invoice {payment.payment_hash} missing user_id in metadata") logger.warning(f"Castle invoice {payment.payment_hash} missing user_id in metadata")
return return
# Check if payment already recorded (idempotency)
# Query Fava for existing entry with this payment hash link
from .fava_client import get_fava_client from .fava_client import get_fava_client
import httpx
fava = get_fava_client() fava = get_fava_client()
try: # Use idempotency key based on payment hash - this ensures duplicate
# Check if payment already recorded by fetching recent entries # processing of the same payment won't create duplicate entries
# Note: We can't use BQL query with `links ~ 'pattern'` because links is a set type idempotency_key = f"ln-{payment.payment_hash[:16]}"
# and BQL doesn't support regex matching on sets. Instead, fetch entries and filter in Python.
link_to_find = f"ln-{payment.payment_hash[:16]}"
async with httpx.AsyncClient(timeout=5.0) as client: # Acquire per-user lock to serialize processing for this user
# Get recent entries from Fava's journal endpoint # This prevents race conditions when a user has multiple payments being processed
response = await client.get( user_lock = fava.get_user_lock(user_id)
f"{fava.base_url}/api/journal",
params={"time": ""} # Get all entries async with user_lock:
logger.info(f"Recording Castle payment {payment.payment_hash} for user {user_id[:8]} to Fava")
try:
from decimal import Decimal
from .crud import get_account_by_name, get_or_create_user_account
from .models import AccountType
from .beancount_format import format_net_settlement_entry
# Convert amount from millisatoshis to satoshis
amount_sats = payment.amount // 1000
# Extract fiat metadata from invoice (if present)
fiat_currency = None
fiat_amount = None
if payment.extra:
fiat_currency = payment.extra.get("fiat_currency")
fiat_amount_str = payment.extra.get("fiat_amount")
if fiat_amount_str:
fiat_amount = Decimal(str(fiat_amount_str))
if not fiat_currency or not fiat_amount:
logger.error(f"Payment {payment.payment_hash} missing fiat currency/amount metadata")
return
# Get user's current balance to determine receivables and payables
balance = await fava.get_user_balance(user_id)
fiat_balances = balance.get("fiat_balances", {})
total_fiat_balance = fiat_balances.get(fiat_currency, Decimal(0))
# Determine receivables and payables based on balance
# Positive balance = user owes castle (receivable)
# Negative balance = castle owes user (payable)
if total_fiat_balance > 0:
# User owes castle
total_receivable = total_fiat_balance
total_payable = Decimal(0)
else:
# Castle owes user
total_receivable = Decimal(0)
total_payable = abs(total_fiat_balance)
logger.info(f"Settlement: {fiat_amount} {fiat_currency} (Receivable: {total_receivable}, Payable: {total_payable})")
# Get account names
user_receivable = await get_or_create_user_account(
user_id, AccountType.ASSET, "Accounts Receivable"
)
user_payable = await get_or_create_user_account(
user_id, AccountType.LIABILITY, "Accounts Payable"
)
lightning_account = await get_account_by_name("Assets:Bitcoin:Lightning")
if not lightning_account:
logger.error("Lightning account 'Assets:Bitcoin:Lightning' not found")
return
# Query for unsettled entries to link this settlement back to them
# Net settlement can settle both expenses and receivables
settled_links = []
try:
unsettled_expenses = await fava.get_unsettled_entries_bql(user_id, "expense")
settled_links.extend([e["link"] for e in unsettled_expenses if e.get("link")])
unsettled_receivables = await fava.get_unsettled_entries_bql(user_id, "receivable")
settled_links.extend([e["link"] for e in unsettled_receivables if e.get("link")])
except Exception as e:
logger.warning(f"Could not query unsettled entries for settlement links: {e}")
# Continue without links - settlement will still be recorded
# Format as net settlement transaction
entry = format_net_settlement_entry(
user_id=user_id,
payment_account=lightning_account.name,
receivable_account=user_receivable.name,
payable_account=user_payable.name,
amount_sats=amount_sats,
net_fiat_amount=fiat_amount,
total_receivable_fiat=total_receivable,
total_payable_fiat=total_payable,
fiat_currency=fiat_currency,
description=f"Lightning payment settlement from user {user_id[:8]}",
entry_date=datetime.now().date(),
payment_hash=payment.payment_hash,
reference=payment.payment_hash,
settled_entry_links=settled_links if settled_links else None
) )
if response.status_code == 200: # Submit to Fava using idempotent method to prevent duplicates
data = response.json() # The idempotency key is based on the payment hash, so even if this
entries = data.get('entries', []) # function is called multiple times for the same payment, only one
# entry will be created
result = await fava.add_entry_idempotent(entry, idempotency_key)
# Check if any entry has our payment link if result.get("existing"):
for entry in entries: logger.info(
entry_links = entry.get('links', []) f"Payment {payment.payment_hash} was already recorded in Fava (idempotent)"
if link_to_find in entry_links: )
logger.info(f"Payment {payment.payment_hash} already recorded in Fava, skipping") else:
return logger.info(
f"Successfully recorded payment {payment.payment_hash} to Fava: "
f"{result.get('data', 'Unknown')}"
)
except Exception as e:
logger.warning(f"Could not check Fava for duplicate payment: {e}")
# Continue anyway - Fava/Beancount will catch duplicate if it exists
logger.info(f"Recording Castle payment {payment.payment_hash} for user {user_id[:8]} to Fava")
try:
from decimal import Decimal
from .crud import get_account_by_name, get_or_create_user_account
from .models import AccountType
from .beancount_format import format_net_settlement_entry
# Convert amount from millisatoshis to satoshis
amount_sats = payment.amount // 1000
# Extract fiat metadata from invoice (if present)
fiat_currency = None
fiat_amount = None
if payment.extra:
fiat_currency = payment.extra.get("fiat_currency")
fiat_amount_str = payment.extra.get("fiat_amount")
if fiat_amount_str:
fiat_amount = Decimal(str(fiat_amount_str))
if not fiat_currency or not fiat_amount:
logger.error(f"Payment {payment.payment_hash} missing fiat currency/amount metadata")
return
# Get user's current balance to determine receivables and payables
balance = await fava.get_user_balance(user_id)
fiat_balances = balance.get("fiat_balances", {})
total_fiat_balance = fiat_balances.get(fiat_currency, Decimal(0))
# Determine receivables and payables based on balance
# Positive balance = user owes castle (receivable)
# Negative balance = castle owes user (payable)
if total_fiat_balance > 0:
# User owes castle
total_receivable = total_fiat_balance
total_payable = Decimal(0)
else:
# Castle owes user
total_receivable = Decimal(0)
total_payable = abs(total_fiat_balance)
logger.info(f"Settlement: {fiat_amount} {fiat_currency} (Receivable: {total_receivable}, Payable: {total_payable})")
# Get account names
user_receivable = await get_or_create_user_account(
user_id, AccountType.ASSET, "Accounts Receivable"
)
user_payable = await get_or_create_user_account(
user_id, AccountType.LIABILITY, "Accounts Payable"
)
lightning_account = await get_account_by_name("Assets:Bitcoin:Lightning")
if not lightning_account:
logger.error("Lightning account 'Assets:Bitcoin:Lightning' not found")
return
# Query for unsettled entries to link this settlement back to them
# Net settlement can settle both expenses and receivables
settled_links = []
try:
unsettled_expenses = await fava.get_unsettled_entries_bql(user_id, "expense")
settled_links.extend([e["link"] for e in unsettled_expenses if e.get("link")])
unsettled_receivables = await fava.get_unsettled_entries_bql(user_id, "receivable")
settled_links.extend([e["link"] for e in unsettled_receivables if e.get("link")])
except Exception as e: except Exception as e:
logger.warning(f"Could not query unsettled entries for settlement links: {e}") logger.error(f"Error recording Castle payment {payment.payment_hash}: {e}")
# Continue without links - settlement will still be recorded raise
# Format as net settlement transaction
entry = format_net_settlement_entry(
user_id=user_id,
payment_account=lightning_account.name,
receivable_account=user_receivable.name,
payable_account=user_payable.name,
amount_sats=amount_sats,
net_fiat_amount=fiat_amount,
total_receivable_fiat=total_receivable,
total_payable_fiat=total_payable,
fiat_currency=fiat_currency,
description=f"Lightning payment settlement from user {user_id[:8]}",
entry_date=datetime.now().date(),
payment_hash=payment.payment_hash,
reference=payment.payment_hash,
settled_entry_links=settled_links if settled_links else None
)
# Submit to Fava
result = await fava.add_entry(entry)
logger.info(
f"Successfully recorded payment {payment.payment_hash} to Fava: "
f"{result.get('data', 'Unknown')}"
)
except Exception as e:
logger.error(f"Error recording Castle payment {payment.payment_hash}: {e}")
raise