libra/tasks.py
Padreug 09a5d6ed55 Polish account-creation flow: insertion point, user_id consistency, startup race
Three small fixes shaken out by live testing on aio-demo:

1. fava_client.add_account: when the target file has no Open directives
   yet (e.g. the empty accounts/users.beancount seed), append at end of
   file instead of inserting at index 0. Keeps the seed header comments
   at the top where they belong.

2. account_sync.sync_single_account_from_beancount: read the full user_id
   from Beancount metadata when present, fall back to the name-derived
   8-char prefix otherwise. crud.get_or_create_user_account writes the
   full 32-char user_id into Beancount metadata when creating per-user
   accounts; the sync function was only looking at the account name and
   returning the prefix, so the post-sync `WHERE user_id=:user_id` query
   in crud.py missed the row and fell through the UNIQUE-constraint
   recovery path. Three lines of warning noise per user-account creation.

3. tasks.wait_for_account_sync: await `wait_for_fava_client()` (new
   helper backed by an asyncio.Event in fava_client.py) before the first
   sync iteration. Previously the sync task started in libra_start()
   raced the fire-and-forget `_init_fava()` coroutine and reliably
   crashed the first run with "Fava client not initialized".

Refs: aiolabs/libra#28
2026-06-06 19:36:39 +02:00

329 lines
12 KiB
Python

"""
Background tasks for Libra accounting extension.
These tasks handle automated reconciliation checks and maintenance.
"""
import asyncio
from asyncio import Queue
from datetime import datetime
from typing import Optional
from lnbits.core.models import Payment
from lnbits.tasks import register_invoice_listener
from loguru import logger
from .crud import check_balance_assertion, get_balance_assertions
from .models import AssertionStatus
async def check_all_balance_assertions() -> dict:
    """
    Check all balance assertions and return results.

    This can be called manually or scheduled to run daily. At most 1000
    assertions are fetched; each one is re-checked individually, and an
    exception while checking one assertion is counted and logged without
    aborting the remaining checks.

    Returns:
        dict: Summary of check results with the keys ``task_id``,
        ``timestamp``, ``total``, ``checked``, ``passed``, ``failed``,
        ``errors`` and ``failed_assertions`` (one detail dict per failed
        assertion).
    """
    from lnbits.helpers import urlsafe_short_hash

    # Get all assertions
    all_assertions = await get_balance_assertions(limit=1000)

    results = {
        "task_id": urlsafe_short_hash(),
        "timestamp": datetime.now().isoformat(),
        "total": len(all_assertions),
        "checked": 0,
        "passed": 0,
        "failed": 0,
        "errors": 0,
        "failed_assertions": [],
    }

    for assertion in all_assertions:
        try:
            checked = await check_balance_assertion(assertion.id)
            results["checked"] += 1
            if checked.status == AssertionStatus.PASSED:
                results["passed"] += 1
            elif checked.status == AssertionStatus.FAILED:
                results["failed"] += 1
                results["failed_assertions"].append({
                    "id": assertion.id,
                    "account_id": assertion.account_id,
                    "expected_sats": assertion.expected_balance_sats,
                    "actual_sats": checked.checked_balance_sats,
                    "difference_sats": checked.difference_sats,
                })
        except Exception as e:
            # Best effort: one broken assertion must not stop the whole run.
            results["errors"] += 1
            logger.error(f"Error checking assertion {assertion.id}: {e}")

    # Log results through the module logger (not print) so they carry a
    # level and reach the same sinks as the other tasks in this module.
    if results["errors"] > 0:
        logger.warning(
            f"[LIBRA] Daily reconciliation check: {results['errors']} assertions could not be checked"
        )
    if results["failed"] > 0:
        logger.error(f"[LIBRA] Daily reconciliation check: {results['failed']} FAILED assertions!")
        for failed in results["failed_assertions"]:
            logger.error(f" - Account {failed['account_id']}: expected {failed['expected_sats']}, got {failed['actual_sats']}")
    elif results["errors"] == 0:
        # Only claim a clean run when nothing failed AND nothing errored.
        logger.info(f"[LIBRA] Daily reconciliation check: All {results['passed']} assertions passed ✓")

    return results
async def scheduled_daily_reconciliation():
    """
    Scheduled task that runs daily to check all balance assertions.

    This function is meant to be called by a scheduler (cron, systemd timer,
    etc.) or by LNbits background task system.

    Returns:
        dict: The summary produced by ``check_all_balance_assertions()``.

    Raises:
        Exception: Any error raised by the check is logged and re-raised.
    """
    logger.info(f"[LIBRA] Running scheduled daily reconciliation check at {datetime.now()}")
    try:
        results = await check_all_balance_assertions()

        # TODO: Send notifications if there are failures
        # This could send email, webhook, or in-app notification
        if results["failed"] > 0:
            logger.warning(f"[LIBRA] WARNING: {results['failed']} balance assertions failed!")
            # Future: Send alert notification

        return results
    except Exception as e:
        logger.error(f"[LIBRA] Error in scheduled reconciliation: {e}")
        raise
async def scheduled_account_sync():
    """
    Run one incremental account sync from Beancount into the Libra DB.

    Beancount is treated as the source of truth; accounts that were created
    there are added to Libra's metadata database so permissions can be
    tracked for them. Intended to be invoked hourly.

    Returns:
        The stats mapping returned by ``sync_accounts_from_beancount``; this
        function reads its ``accounts_added`` and ``errors`` entries.

    Raises:
        Exception: Any failure is logged and then re-raised to the caller.
    """
    from .account_sync import sync_accounts_from_beancount

    logger.info(f"[LIBRA] Running scheduled account sync at {datetime.now()}")
    try:
        sync_stats = await sync_accounts_from_beancount(force_full_sync=False)

        added = sync_stats["accounts_added"]
        if added > 0:
            logger.info(
                f"[LIBRA] Account sync: Added {added} new accounts"
            )

        sync_errors = sync_stats["errors"]
        if sync_errors:
            logger.warning(
                f"[LIBRA] Account sync: {len(sync_errors)} errors encountered"
            )
            # Only the first 5 errors are written out individually.
            for message in sync_errors[:5]:
                logger.error(f" - {message}")

        return sync_stats
    except Exception as exc:
        logger.error(f"[LIBRA] Error in scheduled account sync: {exc}")
        raise
async def wait_for_account_sync():
    """
    Long-running background task: sync accounts from Beancount once an hour.

    The task first awaits `wait_for_fava_client()` so that the initial sync
    does not race the fire-and-forget `_init_fava()` started in
    `libra_start()` and fail with "Fava client not initialized".

    A failing sync is logged and does not end the loop; the next attempt
    happens after the regular one-hour pause.
    """
    from .fava_client import wait_for_fava_client

    logger.info("[LIBRA] Account sync background task started")
    await wait_for_fava_client()

    pause_seconds = 3600  # 1 hour between sync attempts
    while True:
        try:
            await scheduled_account_sync()
        except Exception as exc:
            logger.error(f"[LIBRA] Account sync error: {exc}")

        await asyncio.sleep(pause_seconds)
def start_daily_reconciliation_task():
    """
    Initialize the daily reconciliation task.

    This can be called from the extension's __init__.py or configured
    to run via external cron job. At present it only logs that the task is
    registered; no scheduler is set up here.

    For cron setup:
        # Run daily at 2 AM
        0 2 * * * curl -X POST http://localhost:5000/libra/api/v1/tasks/daily-reconciliation -H "X-Api-Key: YOUR_ADMIN_KEY"
    """
    # Use the module logger (not print) so this message shares the level and
    # sinks of the other task messages in this module.
    logger.info("[LIBRA] Daily reconciliation task registered")
    # In a production system, you would register this with LNbits task scheduler
    # For now, it can be triggered manually via API endpoint
async def wait_for_paid_invoices():
    """
    Background task that listens for paid invoices and automatically
    records them in the accounting system.

    This ensures payments are recorded even if the user closes their browser
    before the payment is detected by client-side polling.

    A failure while handling one payment is logged and the loop moves on to
    the next payment, so a single bad invoice does not stop the listener.
    This matches how `wait_for_account_sync()` guards its loop body.
    """
    invoice_queue = Queue()
    register_invoice_listener(invoice_queue, "ext_libra")

    while True:
        payment = await invoice_queue.get()
        try:
            await on_invoice_paid(payment)
        except Exception as e:
            # `except Exception` does not catch task cancellation on
            # Python 3.8+, so the task can still be cancelled normally.
            logger.error(
                f"[LIBRA] Error handling paid invoice {payment.payment_hash}: {e}"
            )
async def on_invoice_paid(payment: Payment) -> None:
    """
    Record a paid Libra invoice in Beancount by submitting it to Fava.

    Called for every paid invoice on the Libra wallet. Payments whose
    ``extra`` metadata is not tagged ``"libra"`` are ignored, as are Libra
    payments that carry no ``user_id``.

    Concurrency Protection:
    - Processing for a given user is serialized with a per-user lock obtained
      from the Fava client, to prevent race conditions when several payments
      for the same user are handled at once.
    - The entry is submitted with an idempotency key built from the payment
      hash, so processing the same payment more than once does not create
      duplicate entries.

    Raises:
        Exception: Errors raised while recording are logged and re-raised.
    """
    # Only process Libra-specific payments
    extra = payment.extra
    if not extra or extra.get("tag") != "libra":
        return

    user_id = extra.get("user_id")
    if not user_id:
        logger.warning(f"Libra invoice {payment.payment_hash} missing user_id in metadata")
        return

    from .fava_client import get_fava_client

    client = get_fava_client()

    # Idempotency key derived from the payment hash: handling the same
    # payment again will not produce a second entry.
    dedupe_key = f"ln-{payment.payment_hash[:16]}"

    # Per-user lock: serializes processing when one user has several
    # payments in flight.
    async with client.get_user_lock(user_id):
        logger.info(f"Recording Libra payment {payment.payment_hash} for user {user_id[:8]} to Fava")

        try:
            from decimal import Decimal
            from .crud import get_account_by_name, get_or_create_user_account
            from .models import AccountType
            from .beancount_format import format_net_settlement_entry

            # Convert amount from millisatoshis to satoshis
            amount_sats = payment.amount // 1000

            # Extract fiat metadata from invoice (if present)
            invoice_extra = payment.extra or {}
            fiat_currency = invoice_extra.get("fiat_currency")
            raw_fiat_amount = invoice_extra.get("fiat_amount")
            fiat_amount = Decimal(str(raw_fiat_amount)) if raw_fiat_amount else None

            if not (fiat_currency and fiat_amount):
                logger.error(f"Payment {payment.payment_hash} missing fiat currency/amount metadata")
                return

            # Get user's current balance to determine receivables and payables
            balance = await client.get_user_balance(user_id)
            total_fiat_balance = balance.get("fiat_balances", {}).get(fiat_currency, Decimal(0))

            # Positive balance = user owes libra (receivable)
            # Negative balance = libra owes user (payable)
            user_owes_libra = total_fiat_balance > 0
            total_receivable = total_fiat_balance if user_owes_libra else Decimal(0)
            total_payable = Decimal(0) if user_owes_libra else abs(total_fiat_balance)

            logger.info(f"Settlement: {fiat_amount} {fiat_currency} (Receivable: {total_receivable}, Payable: {total_payable})")

            # Get account names
            user_receivable = await get_or_create_user_account(
                user_id, AccountType.ASSET, "Accounts Receivable"
            )
            user_payable = await get_or_create_user_account(
                user_id, AccountType.LIABILITY, "Accounts Payable"
            )
            lightning_account = await get_account_by_name("Assets:Bitcoin:Lightning")
            if not lightning_account:
                logger.error("Lightning account 'Assets:Bitcoin:Lightning' not found")
                return

            # Collect links of unsettled entries so the settlement can point
            # back at them; a net settlement can cover both expenses and
            # receivables.
            settled_links = []
            try:
                for entry_kind in ("expense", "receivable"):
                    unsettled = await client.get_unsettled_entries_bql(user_id, entry_kind)
                    settled_links.extend([row["link"] for row in unsettled if row.get("link")])
            except Exception as link_exc:
                # Continue without links - settlement will still be recorded
                logger.warning(f"Could not query unsettled entries for settlement links: {link_exc}")

            # Format as net settlement transaction
            entry = format_net_settlement_entry(
                user_id=user_id,
                payment_account=lightning_account.name,
                receivable_account=user_receivable.name,
                payable_account=user_payable.name,
                amount_sats=amount_sats,
                net_fiat_amount=fiat_amount,
                total_receivable_fiat=total_receivable,
                total_payable_fiat=total_payable,
                fiat_currency=fiat_currency,
                description=f"Lightning payment settlement from user {user_id[:8]}",
                entry_date=datetime.now().date(),
                payment_hash=payment.payment_hash,
                reference=payment.payment_hash,
                settled_entry_links=settled_links or None,
            )

            # Submit through the idempotent path so repeated calls for the
            # same payment hash yield a single entry.
            result = await client.add_entry_idempotent(entry, dedupe_key)

            if not result.get("existing"):
                logger.info(
                    f"Successfully recorded payment {payment.payment_hash} to Fava: "
                    f"{result.get('data', 'Unknown')}"
                )
            else:
                logger.info(
                    f"Payment {payment.payment_hash} was already recorded in Fava (idempotent)"
                )

        except Exception as exc:
            logger.error(f"Error recording Libra payment {payment.payment_hash}: {exc}")
            raise