castle/tasks.py
padreug b5c36504fb Add concurrency protection for Fava/Beancount ledger writes
This commit addresses critical race conditions when multiple requests
try to write to the ledger file simultaneously.

Changes:
- Add global asyncio.Lock to FavaClient to serialize all write operations
- Add per-user locks for finer-grained concurrency control
- Wrap add_entry(), update_entry_source(), delete_entry() with write lock
- Add retry logic with exponential backoff to add_account() for checksum conflicts
- Add new add_entry_idempotent() method to prevent duplicate entries
- Add ChecksumConflictError exception for conflict handling
- Update on_invoice_paid() to use per-user locking and idempotent entry creation

Fixes #4

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-06 23:57:03 +01:00

322 lines
12 KiB
Python

"""
Background tasks for Castle accounting extension.
These tasks handle automated reconciliation checks and maintenance.
"""
import asyncio
from asyncio import Queue
from datetime import datetime
from typing import Optional
from lnbits.core.models import Payment
from lnbits.tasks import register_invoice_listener
from loguru import logger
from .crud import check_balance_assertion, get_balance_assertions
from .models import AssertionStatus
async def check_all_balance_assertions() -> dict:
    """
    Check all balance assertions and return a summary of the results.

    This can be called manually or scheduled to run daily. Each assertion is
    checked independently: an exception raised while checking one assertion
    is logged and counted under ``errors`` without aborting the run.

    Returns:
        dict: Summary with the keys ``task_id``, ``timestamp``, ``total``,
        ``checked``, ``passed``, ``failed``, ``errors`` and
        ``failed_assertions``. Each item of ``failed_assertions`` is a dict
        with ``id``, ``account_id``, ``expected_sats``, ``actual_sats`` and
        ``difference_sats``. Assertions whose resulting status is neither
        PASSED nor FAILED are only counted under ``checked``.
    """
    from lnbits.helpers import urlsafe_short_hash

    # NOTE: at most 1000 assertions are requested per run (limit=1000).
    all_assertions = await get_balance_assertions(limit=1000)

    results = {
        "task_id": urlsafe_short_hash(),
        "timestamp": datetime.now().isoformat(),
        "total": len(all_assertions),
        "checked": 0,
        "passed": 0,
        "failed": 0,
        "errors": 0,
        "failed_assertions": [],
    }

    for assertion in all_assertions:
        try:
            checked = await check_balance_assertion(assertion.id)
            results["checked"] += 1

            if checked.status == AssertionStatus.PASSED:
                results["passed"] += 1
            elif checked.status == AssertionStatus.FAILED:
                results["failed"] += 1
                results["failed_assertions"].append({
                    "id": assertion.id,
                    "account_id": assertion.account_id,
                    "expected_sats": assertion.expected_balance_sats,
                    "actual_sats": checked.checked_balance_sats,
                    "difference_sats": checked.difference_sats,
                })
        except Exception as e:
            # Deliberately best-effort: one broken assertion must not stop
            # the remaining assertions from being checked.
            results["errors"] += 1
            logger.error(f"Error checking assertion {assertion.id}: {e}")

    # Log results through the module logger, like the other tasks in this file.
    if results["failed"] > 0:
        logger.warning(
            f"[CASTLE] Daily reconciliation check: {results['failed']} FAILED assertions!"
        )
        for failed in results["failed_assertions"]:
            logger.warning(
                f" - Account {failed['account_id']}: "
                f"expected {failed['expected_sats']}, got {failed['actual_sats']}"
            )
    elif results["errors"] > 0:
        # Nothing failed, but some assertions could not be evaluated, so
        # reporting "all passed" would be misleading.
        logger.warning(
            f"[CASTLE] Daily reconciliation check: {results['passed']} assertions passed, "
            f"{results['errors']} could not be checked"
        )
    else:
        logger.info(
            f"[CASTLE] Daily reconciliation check: All {results['passed']} assertions passed ✓"
        )

    return results
async def scheduled_daily_reconciliation():
    """
    Scheduled task that runs the check over all balance assertions.

    This function is meant to be called by a scheduler (cron, systemd timer,
    etc.) or by the LNbits background task system.

    Returns:
        dict: The summary returned by ``check_all_balance_assertions()``.

    Raises:
        Exception: Any error raised while running the check is logged and
        then re-raised to the caller.
    """
    logger.info(
        f"[CASTLE] Running scheduled daily reconciliation check at {datetime.now()}"
    )

    try:
        results = await check_all_balance_assertions()

        # TODO: Send notifications if there are failures
        # This could send email, webhook, or in-app notification
        if results["failed"] > 0:
            logger.warning(
                f"[CASTLE] WARNING: {results['failed']} balance assertions failed!"
            )
            # Future: Send alert notification

        return results
    except Exception as e:
        logger.error(f"[CASTLE] Error in scheduled reconciliation: {e}")
        raise
async def scheduled_account_sync():
    """
    Run a single account sync from Beancount into the Castle DB.

    Calls ``sync_accounts_from_beancount(force_full_sync=False)`` and logs a
    short report: the number of accounts added (when non-zero) and, when the
    sync reported errors, their count plus the first five of them.
    The hourly cadence is provided by ``wait_for_account_sync()``.

    Returns:
        The stats mapping returned by ``sync_accounts_from_beancount``.

    Raises:
        Exception: Any error raised during the sync is logged and re-raised.
    """
    from .account_sync import sync_accounts_from_beancount

    logger.info(f"[CASTLE] Running scheduled account sync at {datetime.now()}")

    try:
        sync_stats = await sync_accounts_from_beancount(force_full_sync=False)

        added_count = sync_stats["accounts_added"]
        if added_count > 0:
            logger.info(
                f"[CASTLE] Account sync: Added {added_count} new accounts"
            )

        sync_errors = sync_stats["errors"]
        if sync_errors:
            logger.warning(
                f"[CASTLE] Account sync: {len(sync_errors)} errors encountered"
            )
            # Cap the detail output at the first five errors.
            for error in sync_errors[:5]:
                logger.error(f" - {error}")

        return sync_stats
    except Exception as exc:
        logger.error(f"[CASTLE] Error in scheduled account sync: {exc}")
        raise
async def wait_for_account_sync():
    """
    Background loop that keeps the Castle DB in sync with Beancount.

    Runs ``scheduled_account_sync()`` immediately, then again after every
    one-hour pause, forever. A failing sync is logged and does not stop
    the loop.
    """
    sync_interval_seconds = 60 * 60  # one hour between sync attempts

    logger.info("[CASTLE] Account sync background task started")

    while True:
        try:
            await scheduled_account_sync()
        except Exception as sync_error:
            logger.error(f"[CASTLE] Account sync error: {sync_error}")

        # Pause before the next attempt, whether or not this one succeeded.
        await asyncio.sleep(sync_interval_seconds)
def start_daily_reconciliation_task():
    """
    Initialize the daily reconciliation task.

    This can be called from the extension's __init__.py or configured
    to run via external cron job. At present it only logs that the task is
    registered; no scheduling happens here.

    For cron setup:
        # Run daily at 2 AM
        0 2 * * * curl -X POST http://localhost:5000/castle/api/v1/tasks/daily-reconciliation -H "X-Api-Key: YOUR_ADMIN_KEY"
    """
    # Use the module logger, like the other tasks in this file, rather than
    # writing directly to stdout.
    logger.info("[CASTLE] Daily reconciliation task registered")
    # In a production system, you would register this with LNbits task scheduler
    # For now, it can be triggered manually via API endpoint
async def wait_for_paid_invoices():
    """
    Background task that listens for paid invoices and automatically
    records them in the accounting system.

    This ensures payments are recorded even if the user closes their browser
    before the payment is detected by client-side polling.

    A queue is registered as the ``"ext_castle"`` invoice listener and every
    payment taken from it is passed to ``on_invoice_paid()``. A failure while
    handling one payment is logged and the loop moves on to the next payment,
    mirroring the error handling of ``wait_for_account_sync()``.
    """
    invoice_queue = Queue()
    register_invoice_listener(invoice_queue, "ext_castle")

    while True:
        payment = await invoice_queue.get()
        try:
            await on_invoice_paid(payment)
        except Exception as e:
            # on_invoice_paid() re-raises after logging. Without this handler
            # a single bad payment would end the loop and payments queued
            # afterwards would no longer be consumed by this task.
            # Task cancellation (asyncio.CancelledError) is not an Exception
            # subclass on Python 3.8+, so shutdown still propagates.
            payment_hash = getattr(payment, "payment_hash", "unknown")
            logger.error(
                f"[CASTLE] Failed to process paid invoice {payment_hash}: {e}"
            )
async def on_invoice_paid(payment: Payment) -> None:
    """
    Record a paid Castle invoice in Beancount by submitting it to Fava.

    Payments are ignored when their ``extra`` metadata is empty or not tagged
    ``"castle"``. Castle payments without a ``user_id`` are logged and skipped,
    as are payments without fiat currency/amount metadata and the case where
    the ``Assets:Bitcoin:Lightning`` account cannot be found.

    Concurrency protection:
    - The whole recording step runs while holding the lock returned by
      ``fava.get_user_lock(user_id)``, so payments of one user are handled
      one at a time.
    - The entry is submitted with ``fava.add_entry_idempotent`` and a key
      derived from the first 16 characters of the payment hash; a result
      flagged ``existing`` is logged as already recorded.

    Raises:
        Exception: Any error raised while recording is logged and re-raised.
    """
    invoice_meta = payment.extra

    # Guard clauses: only Castle-tagged payments are handled here.
    if not invoice_meta:
        return
    if invoice_meta.get("tag") != "castle":
        return

    payment_hash = payment.payment_hash

    user_id = invoice_meta.get("user_id")
    if not user_id:
        logger.warning(f"Castle invoice {payment_hash} missing user_id in metadata")
        return

    from .fava_client import get_fava_client

    fava = get_fava_client()

    # The key is derived from the payment hash so that handling the same
    # payment again maps to the same entry.
    idempotency_key = f"ln-{payment_hash[:16]}"

    # Everything below runs under the per-user lock.
    async with fava.get_user_lock(user_id):
        short_user_id = user_id[:8]
        logger.info(f"Recording Castle payment {payment_hash} for user {short_user_id} to Fava")

        try:
            from decimal import Decimal
            from .crud import get_account_by_name, get_or_create_user_account
            from .models import AccountType
            from .beancount_format import format_net_settlement_entry

            # payment.amount is in millisatoshis; the ledger entry uses satoshis.
            amount_sats = payment.amount // 1000

            # Pull the fiat metadata off the invoice, when present.
            fiat_currency = None
            fiat_amount = None
            if invoice_meta:
                fiat_currency = invoice_meta.get("fiat_currency")
                raw_fiat_amount = invoice_meta.get("fiat_amount")
                if raw_fiat_amount:
                    fiat_amount = Decimal(str(raw_fiat_amount))

            if not fiat_currency or not fiat_amount:
                logger.error(f"Payment {payment_hash} missing fiat currency/amount metadata")
                return

            # The user's current balance decides the receivable/payable split.
            balance = await fava.get_user_balance(user_id)
            fiat_balances = balance.get("fiat_balances", {})
            total_fiat_balance = fiat_balances.get(fiat_currency, Decimal(0))

            # Positive balance = user owes castle (receivable)
            # Zero or negative balance = castle owes user (payable)
            user_owes_castle = total_fiat_balance > 0
            total_receivable = total_fiat_balance if user_owes_castle else Decimal(0)
            total_payable = Decimal(0) if user_owes_castle else abs(total_fiat_balance)

            logger.info(f"Settlement: {fiat_amount} {fiat_currency} (Receivable: {total_receivable}, Payable: {total_payable})")

            # Resolve the accounts involved in the settlement.
            user_receivable = await get_or_create_user_account(
                user_id, AccountType.ASSET, "Accounts Receivable"
            )
            user_payable = await get_or_create_user_account(
                user_id, AccountType.LIABILITY, "Accounts Payable"
            )
            lightning_account = await get_account_by_name("Assets:Bitcoin:Lightning")
            if not lightning_account:
                logger.error("Lightning account 'Assets:Bitcoin:Lightning' not found")
                return

            # Collect links of unsettled expense and receivable entries so the
            # settlement can point back at them. This is best-effort: on a
            # query failure the links gathered so far are kept and the
            # settlement is still recorded.
            settled_links = []
            try:
                for entry_kind in ("expense", "receivable"):
                    unsettled = await fava.get_unsettled_entries_bql(user_id, entry_kind)
                    settled_links.extend([item["link"] for item in unsettled if item.get("link")])
            except Exception as link_error:
                logger.warning(f"Could not query unsettled entries for settlement links: {link_error}")

            # Build the net settlement transaction.
            entry = format_net_settlement_entry(
                user_id=user_id,
                payment_account=lightning_account.name,
                receivable_account=user_receivable.name,
                payable_account=user_payable.name,
                amount_sats=amount_sats,
                net_fiat_amount=fiat_amount,
                total_receivable_fiat=total_receivable,
                total_payable_fiat=total_payable,
                fiat_currency=fiat_currency,
                description=f"Lightning payment settlement from user {short_user_id}",
                entry_date=datetime.now().date(),
                payment_hash=payment_hash,
                reference=payment_hash,
                settled_entry_links=settled_links or None,
            )

            # Submit through the idempotent method, keyed on the payment hash.
            result = await fava.add_entry_idempotent(entry, idempotency_key)

            if not result.get("existing"):
                logger.info(
                    f"Successfully recorded payment {payment_hash} to Fava: "
                    f"{result.get('data', 'Unknown')}"
                )
            else:
                logger.info(
                    f"Payment {payment_hash} was already recorded in Fava (idempotent)"
                )

        except Exception as record_error:
            logger.error(f"Error recording Castle payment {payment_hash}: {record_error}")
            raise