This commit addresses critical race conditions when multiple requests try to write to the ledger file simultaneously. Changes: - Add global asyncio.Lock to FavaClient to serialize all write operations - Add per-user locks for finer-grained concurrency control - Wrap add_entry(), update_entry_source(), delete_entry() with write lock - Add retry logic with exponential backoff to add_account() for checksum conflicts - Add new add_entry_idempotent() method to prevent duplicate entries - Add ChecksumConflictError exception for conflict handling - Update on_invoice_paid() to use per-user locking and idempotent entry creation Fixes #4 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1817 lines
72 KiB
Python
1817 lines
72 KiB
Python
"""
|
|
Fava API client for Castle.
|
|
|
|
This module provides an async HTTP client for interacting with Fava's JSON API.
|
|
All accounting logic is delegated to Fava/Beancount.
|
|
|
|
Fava provides a REST API for:
|
|
- Adding transactions (PUT /api/add_entries)
|
|
- Adding accounts via Open directives (PUT /api/add_entries)
|
|
- Querying balances (GET /api/query)
|
|
- Balance sheets (GET /api/balance_sheet)
|
|
- Account reports (GET /api/account_report)
|
|
- Getting entry context (GET /api/context)
|
|
- Updating entries (PUT /api/source_slice)
|
|
- Deleting entries (DELETE /api/source_slice)
|
|
|
|
See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py
|
|
"""
|
|
|
|
import asyncio
|
|
import httpx
|
|
from typing import Any, Dict, List, Optional
|
|
from decimal import Decimal
|
|
from datetime import date, datetime
|
|
from loguru import logger
|
|
|
|
|
|
class ChecksumConflictError(Exception):
    """Signals that a Fava write failed because the ledger checksum was stale.

    A stale checksum means another writer modified the ledger file between
    our read and our write (concurrent modification).
    """
|
|
|
|
|
|
class FavaClient:
|
|
"""
|
|
Async client for Fava REST API.
|
|
|
|
Fava runs as a separate web service and provides a JSON API
|
|
for adding entries and querying ledger data.
|
|
|
|
All accounting calculations are performed by Beancount via Fava.
|
|
"""
|
|
|
|
def __init__(self, fava_url: str, ledger_slug: str, timeout: float = 10.0):
|
|
"""
|
|
Initialize Fava client.
|
|
|
|
Args:
|
|
fava_url: Base URL of Fava server (e.g., http://localhost:3333)
|
|
ledger_slug: URL-safe ledger identifier (e.g., castle-accounting)
|
|
timeout: Request timeout in seconds
|
|
"""
|
|
self.fava_url = fava_url.rstrip('/')
|
|
self.ledger_slug = ledger_slug
|
|
self.base_url = f"{self.fava_url}/{self.ledger_slug}/api"
|
|
self.timeout = timeout
|
|
|
|
# Concurrency control: Global write lock to serialize all ledger modifications.
|
|
# This prevents race conditions when multiple requests try to write to the
|
|
# Beancount ledger file simultaneously. Without this lock, concurrent writes
|
|
# can cause data loss, duplicate entries, or file corruption.
|
|
#
|
|
# Note: This serializes ALL writes which may become a bottleneck at scale.
|
|
# For higher throughput, consider per-user locking or distributed locking.
|
|
self._write_lock = asyncio.Lock()
|
|
|
|
# Per-user locks for user-specific operations (reduces contention)
|
|
self._user_locks: Dict[str, asyncio.Lock] = {}
|
|
|
|
def get_user_lock(self, user_id: str) -> asyncio.Lock:
|
|
"""
|
|
Get or create a lock for a specific user.
|
|
|
|
This enables per-user locking to reduce contention when multiple users
|
|
are making concurrent requests. User-specific operations should acquire
|
|
this lock in addition to (or instead of) the global write lock.
|
|
|
|
Args:
|
|
user_id: User ID (uses first 8 characters for consistency)
|
|
|
|
Returns:
|
|
asyncio.Lock for this user
|
|
"""
|
|
user_key = user_id[:8]
|
|
if user_key not in self._user_locks:
|
|
self._user_locks[user_key] = asyncio.Lock()
|
|
return self._user_locks[user_key]
|
|
|
|
async def add_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]:
    """
    Store a single journal entry via Fava's add_entries endpoint.

    Args:
        entry: Entry dict in Fava's JSON format. For a transaction this
            must include:
            - t: "Transaction" (required by Fava)
            - date: "YYYY-MM-DD"
            - flag: "*" (cleared) or "!" (pending)
            - narration: str
            - postings: list of posting dicts
            - payee: str (empty string, not None)
            - tags / links: lists of str
            - meta: dict

    Returns:
        Fava's response, e.g. {"data": "Stored 1 entries.", "mtime": "..."}

    Raises:
        httpx.HTTPStatusError: Fava answered with an error status.
        httpx.RequestError: The HTTP request itself failed.

    Note:
        The global write lock is held for the whole request so ledger
        modifications are fully serialized.
    """
    async with self._write_lock:
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.put(
                    f"{self.base_url}/add_entries",
                    json={"entries": [entry]},
                    headers={"Content-Type": "application/json"},
                )
                response.raise_for_status()
                result = response.json()
                logger.info(f"Added entry to Fava: {result.get('data', 'Unknown')}")
                return result
        except httpx.HTTPStatusError as e:
            logger.error(f"Fava HTTP error: {e.response.status_code} - {e.response.text}")
            raise
        except httpx.RequestError as e:
            logger.error(f"Fava connection error: {e}")
            raise
|
|
|
|
async def add_entry_idempotent(
    self,
    entry: Dict[str, Any],
    idempotency_key: str
) -> Dict[str, Any]:
    """
    Submit a journal entry with idempotency protection.

    Checks whether an entry carrying the given idempotency key (stored as a
    Beancount link) already exists before inserting. This prevents duplicate
    entries when the same operation is retried (network issues, concurrent
    requests). Links are part of an entry's identity and are indexed by
    Beancount, making the lookup efficient.

    Args:
        entry: Beancount entry dict (same format as add_entry). NOTE: this
            dict is mutated in place — the sanitized key is appended to its
            "links" list.
        idempotency_key: Unique key for this operation
            (e.g., "castle-{uuid}" or "ln-{payment_hash}")

    Returns:
        Fava's response with "existing": False if the entry was created, or
        a synthetic {"data": ..., "existing": True, "entry": ...} dict if an
        entry with this key already exists.

    Example:
        result = await fava.add_entry_idempotent(
            entry=settlement_entry,
            idempotency_key=f"ln-{payment_hash[:16]}"
        )
    """
    # Local import avoids a potential circular import at module load time.
    from .beancount_format import sanitize_link

    # Sanitize the idempotency key to ensure it's a valid Beancount link.
    safe_key = sanitize_link(idempotency_key)

    # Existence check.
    # NOTE(review): this check runs *before* add_entry() acquires the global
    # write lock, so two concurrent calls with the same key can both pass
    # the check and insert duplicates (TOCTOU window). Callers that need
    # strict per-key idempotence should serialize externally (e.g. via
    # get_user_lock) — confirm against call sites.
    try:
        # Only recent entries are scanned; an older duplicate (>30 days)
        # would not be detected here.
        entries = await self.get_journal_entries(days=30)  # Check recent entries

        for existing_entry in entries:
            existing_links = existing_entry.get("links", [])
            if safe_key in existing_links:
                logger.info(f"Entry with idempotency key '{safe_key}' already exists, skipping insert")
                return {
                    "data": "Entry already exists (idempotent)",
                    "existing": True,
                    "entry": existing_entry
                }
    except Exception as e:
        # Deliberate best-effort: if the lookup fails we still attempt the
        # insert rather than losing the entry.
        logger.warning(f"Could not check for existing entry with key '{safe_key}': {e}")
        # Continue anyway - Beancount will error if there's a true duplicate

    # Attach the idempotency key as a link if not already present
    # (mutates the caller's dict — see Args note above).
    if "links" not in entry:
        entry["links"] = []
    if safe_key not in entry["links"]:
        entry["links"].append(safe_key)

    # Now add the entry (this will acquire the write lock)
    result = await self.add_entry(entry)
    result["existing"] = False
    return result
|
|
|
|
async def get_account_balance(self, account_name: str) -> Dict[str, Any]:
    """
    Get balance for a specific account (excluding pending transactions).

    Uses sum(weight) for efficient SATS aggregation from price notation,
    and sum(number) for the fiat side.

    Args:
        account_name: Full account name (e.g., "Assets:Receivable:User-abc123").
            NOTE(review): interpolated directly into the BQL string — pass
            only trusted, internally-generated account names (no quoting
            or escaping is applied here).

    Returns:
        Dict with:
        - sats: int (balance in satoshis from the weight column)
        - fiat: Decimal (balance in fiat currency from the number column)
        - fiat_currency: str (currency code, defaults to EUR)

    Raises:
        httpx.HTTPStatusError: Fava answered with an error status.
        httpx.RequestError: The HTTP request itself failed.

    Note:
        Only cleared transactions (flag='*') are counted; BQL has no !=
        operator, hence the equality filter.
    """
    # (fix) the original re-imported Decimal locally; it is already
    # imported at module level.
    query = f"SELECT sum(number), sum(weight) WHERE account = '{account_name}' AND flag = '*'"

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f"{self.base_url}/query",
                params={"query_string": query}
            )
            response.raise_for_status()
            data = response.json()

            # Empty result set: the account has no cleared postings.
            if not data['data']['rows'] or not data['data']['rows'][0]:
                return {"sats": 0, "fiat": Decimal(0), "fiat_currency": "EUR"}

            row = data['data']['rows'][0]
            fiat_sum = row[0] if len(row) > 0 else 0
            weight_sum = row[1] if len(row) > 1 else {}

            # sum(number) arrives as a plain number (or None).
            fiat_amount = Decimal(str(fiat_sum)) if fiat_sum else Decimal(0)

            # sum(weight) arrives as an inventory mapping, e.g.
            # {"SATS": -1234.00}; anything else counts as zero sats.
            total_sats = 0
            if isinstance(weight_sum, dict) and "SATS" in weight_sum:
                sats_value = weight_sum["SATS"]
                total_sats = int(Decimal(str(sats_value)))

            return {
                "sats": total_sats,
                "fiat": fiat_amount,
                "fiat_currency": "EUR"  # Default, could be extended to detect currency
            }

    except httpx.HTTPStatusError as e:
        logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
|
|
|
|
async def get_user_balance(self, user_id: str) -> Dict[str, Any]:
    """
    Get a user's balance from the castle's perspective.

    Aggregates postings on:
    - Liabilities:Payable:User-{id[:8]}  (negative = castle owes user)
    - Assets:Receivable:User-{id[:8]}    (positive = user owes castle)

    Args:
        user_id: User ID (account names use the first 8 characters).

    Returns:
        {
            "balance": int (sats; positive = user owes castle),
            "fiat_balances": {"EUR": Decimal("100.50"), ...},
            "accounts": [per-account dicts with "account" and "sats"]
        }

    Note:
        Pending (flag='!') and #voided transactions are excluded; only
        cleared transactions (flag='*') are counted. Four amount formats
        are recognized: total price ("50.00 EUR @@ 50000 SATS"), per-unit
        price ("50.00 EUR @ 1000.5 SATS"), plain fiat with sats in
        posting metadata (legacy), and plain SATS with fiat in metadata
        (legacy).
    """
    # (fix) the original executed `import re` and recompiled both regexes
    # inside the per-posting loop; hoist and compile once.
    import re

    all_entries = await self.get_journal_entries()

    total_sats = 0
    fiat_balances: Dict[str, Decimal] = {}
    accounts_dict: Dict[str, Dict[str, Any]] = {}  # per-account sats tally

    def _credit_sats(account: str, sats: int) -> None:
        # Add sats to the running total and to the per-account tally.
        nonlocal total_sats
        total_sats += sats
        if account not in accounts_dict:
            accounts_dict[account] = {"account": account, "sats": 0}
        accounts_dict[account]["sats"] += sats

    def _credit_fiat(currency: str, amount: Decimal) -> None:
        # Add a fiat amount to the per-currency totals.
        if currency not in fiat_balances:
            fiat_balances[currency] = Decimal(0)
        fiat_balances[currency] += amount

    # Total price notation: "50.00 EUR @@ 50000 SATS"
    total_price_re = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})\s+@@\s+(-?\d+)\s+SATS$')
    # Per-unit price notation: "50.00 EUR @ 1000.5 SATS"
    unit_price_re = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})\s+@\s+([\d.]+)\s+SATS$')
    # Plain fiat: "50.00 EUR" (sats may live in posting metadata)
    plain_fiat_re = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})$')
    # Legacy: SATS leading, possibly with cost/price notation after
    sats_re = re.compile(r'^(-?\d+)\s+SATS')

    user_marker = f":User-{user_id[:8]}"

    for entry in all_entries:
        # Skip non-transactions, pending (!), and voided entries.
        if entry.get("t") != "Transaction":
            continue
        if entry.get("flag") == "!":
            continue
        if "voided" in entry.get("tags", []):
            continue

        for posting in entry.get("postings", []):
            account_name = posting.get("account", "")

            # Only this user's Payable/Receivable accounts.
            if user_marker not in account_name:
                continue
            if "Payable" not in account_name and "Receivable" not in account_name:
                continue

            amount_str = posting.get("amount", "")
            if not isinstance(amount_str, str) or not amount_str:
                continue

            total_price_match = total_price_re.match(amount_str)
            unit_price_match = unit_price_re.match(amount_str)

            if total_price_match:
                fiat_amount = Decimal(total_price_match.group(1))
                _credit_fiat(total_price_match.group(2), fiat_amount)
                _credit_sats(account_name, int(total_price_match.group(3)))

            elif unit_price_match:
                fiat_amount = Decimal(unit_price_match.group(1))
                sats_per_unit = Decimal(unit_price_match.group(3))
                _credit_fiat(unit_price_match.group(2), fiat_amount)
                _credit_sats(account_name, int(fiat_amount * sats_per_unit))

            elif plain_fiat_re.match(amount_str):
                fiat_match = plain_fiat_re.match(amount_str)
                if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'):
                    fiat_amount = Decimal(fiat_match.group(1))
                    _credit_fiat(fiat_match.group(2), fiat_amount)

                    # Legacy: sats equivalent stored in posting metadata;
                    # sign follows the fiat amount's sign.
                    sats_equiv = posting.get("meta", {}).get("sats-equivalent")
                    if sats_equiv:
                        sats_amount = int(sats_equiv) if fiat_amount > 0 else -int(sats_equiv)
                        _credit_sats(account_name, sats_amount)

            else:
                # Legacy: SATS with cost/price notation.
                sats_match = sats_re.match(amount_str)
                if sats_match:
                    _credit_sats(account_name, int(sats_match.group(1)))

                # Fiat total may be carried in metadata (backward compat);
                # apply the same sign as the SATS amount when known.
                posting_meta = posting.get("meta", {})
                fiat_amount_total_str = posting_meta.get("fiat-amount-total")
                fiat_currency_meta = posting_meta.get("fiat-currency")
                if fiat_amount_total_str and fiat_currency_meta:
                    fiat_total = Decimal(fiat_amount_total_str)
                    if sats_match and int(sats_match.group(1)) < 0:
                        fiat_total = -fiat_total
                    _credit_fiat(fiat_currency_meta, fiat_total)

    logger.info(f"User {user_id[:8]} balance: {total_sats} sats, fiat: {dict(fiat_balances)}")
    return {
        "balance": total_sats,
        "fiat_balances": fiat_balances,
        "accounts": list(accounts_dict.values())
    }
|
|
|
|
async def get_all_user_balances(self) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get balances for all users (admin view).
|
|
|
|
Returns:
|
|
[
|
|
{
|
|
"user_id": "abc123",
|
|
"balance": 100000,
|
|
"fiat_balances": {"EUR": Decimal("100.50")},
|
|
"accounts": [...]
|
|
},
|
|
...
|
|
]
|
|
|
|
Note:
|
|
Excludes pending transactions (flag='!') and voided (tag #voided) from balance calculation.
|
|
Only cleared/completed transactions (flag='*') are included.
|
|
"""
|
|
# Get all journal entries and calculate balances from postings
|
|
all_entries = await self.get_journal_entries()
|
|
|
|
# Group by user_id
|
|
user_data = {}
|
|
|
|
for entry in all_entries:
|
|
# Skip non-transactions, pending (!), and voided
|
|
if entry.get("t") != "Transaction":
|
|
continue
|
|
if entry.get("flag") == "!":
|
|
continue
|
|
if "voided" in entry.get("tags", []):
|
|
continue
|
|
|
|
# Process postings
|
|
for posting in entry.get("postings", []):
|
|
account_name = posting.get("account", "")
|
|
|
|
# Only process user accounts (Payable or Receivable)
|
|
if ":User-" not in account_name:
|
|
continue
|
|
if "Payable" not in account_name and "Receivable" not in account_name:
|
|
continue
|
|
|
|
# Extract user_id from account name
|
|
user_id = account_name.split(":User-")[1]
|
|
|
|
if user_id not in user_data:
|
|
user_data[user_id] = {
|
|
"user_id": user_id,
|
|
"balance": 0,
|
|
"fiat_balances": {},
|
|
"accounts": []
|
|
}
|
|
|
|
# Parse amount string: can be EUR/USD directly (new format) or "SATS {EUR}" (old format)
|
|
amount_str = posting.get("amount", "")
|
|
if not isinstance(amount_str, str) or not amount_str:
|
|
continue
|
|
|
|
import re
|
|
|
|
# Try total price notation: "50.00 EUR @@ 50000 SATS"
|
|
total_price_match = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})\s+@@\s+(-?\d+)\s+SATS$', amount_str)
|
|
# Try per-unit price notation: "50.00 EUR @ 1000.5 SATS"
|
|
unit_price_match = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})\s+@\s+([\d.]+)\s+SATS$', amount_str)
|
|
|
|
if total_price_match:
|
|
fiat_amount = Decimal(total_price_match.group(1))
|
|
fiat_currency = total_price_match.group(2)
|
|
sats_amount = int(total_price_match.group(3))
|
|
|
|
if fiat_currency not in user_data[user_id]["fiat_balances"]:
|
|
user_data[user_id]["fiat_balances"][fiat_currency] = Decimal(0)
|
|
user_data[user_id]["fiat_balances"][fiat_currency] += fiat_amount
|
|
user_data[user_id]["balance"] += sats_amount
|
|
|
|
elif unit_price_match:
|
|
fiat_amount = Decimal(unit_price_match.group(1))
|
|
fiat_currency = unit_price_match.group(2)
|
|
sats_per_unit = Decimal(unit_price_match.group(3))
|
|
sats_amount = int(fiat_amount * sats_per_unit)
|
|
|
|
if fiat_currency not in user_data[user_id]["fiat_balances"]:
|
|
user_data[user_id]["fiat_balances"][fiat_currency] = Decimal(0)
|
|
user_data[user_id]["fiat_balances"][fiat_currency] += fiat_amount
|
|
user_data[user_id]["balance"] += sats_amount
|
|
|
|
# Try simple fiat format: "50.00 EUR" (check metadata for sats)
|
|
elif re.match(r'^(-?[\d.]+)\s+([A-Z]{3})$', amount_str):
|
|
fiat_match = re.match(r'^(-?[\d.]+)\s+([A-Z]{3})$', amount_str)
|
|
if fiat_match and fiat_match.group(2) in ('EUR', 'USD', 'GBP'):
|
|
fiat_amount = Decimal(fiat_match.group(1))
|
|
fiat_currency = fiat_match.group(2)
|
|
|
|
if fiat_currency not in user_data[user_id]["fiat_balances"]:
|
|
user_data[user_id]["fiat_balances"][fiat_currency] = Decimal(0)
|
|
user_data[user_id]["fiat_balances"][fiat_currency] += fiat_amount
|
|
|
|
# Also track SATS equivalent from metadata if available (legacy)
|
|
posting_meta = posting.get("meta", {})
|
|
sats_equiv = posting_meta.get("sats-equivalent")
|
|
if sats_equiv:
|
|
sats_amount = int(sats_equiv) if fiat_amount > 0 else -int(sats_equiv)
|
|
user_data[user_id]["balance"] += sats_amount
|
|
|
|
else:
|
|
# Old format: SATS with cost/price notation
|
|
sats_match = re.match(r'^(-?\d+)\s+SATS', amount_str)
|
|
if sats_match:
|
|
sats_amount = int(sats_match.group(1))
|
|
user_data[user_id]["balance"] += sats_amount
|
|
|
|
# Extract fiat from cost syntax or metadata (backward compatibility)
|
|
posting_meta = posting.get("meta", {})
|
|
fiat_amount_total_str = posting_meta.get("fiat-amount-total")
|
|
fiat_currency_meta = posting_meta.get("fiat-currency")
|
|
|
|
if fiat_amount_total_str and fiat_currency_meta:
|
|
fiat_total = Decimal(fiat_amount_total_str)
|
|
fiat_currency = fiat_currency_meta
|
|
|
|
if fiat_currency not in user_data[user_id]["fiat_balances"]:
|
|
user_data[user_id]["fiat_balances"][fiat_currency] = Decimal(0)
|
|
|
|
# Apply the same sign as the SATS amount
|
|
if sats_match:
|
|
sats_amount_for_sign = int(sats_match.group(1))
|
|
if sats_amount_for_sign < 0:
|
|
fiat_total = -fiat_total
|
|
|
|
user_data[user_id]["fiat_balances"][fiat_currency] += fiat_total
|
|
|
|
return list(user_data.values())
|
|
|
|
async def check_fava_health(self) -> bool:
    """
    Check whether the Fava server is up and reachable.

    Returns:
        True if the /changed endpoint answers with HTTP 200, False on
        any failure (connection error, timeout, non-200 status).
    """
    try:
        # Hard-coded 2s timeout: health probes should fail fast rather
        # than wait out the full client timeout.
        async with httpx.AsyncClient(timeout=2.0) as client:
            response = await client.get(f"{self.base_url}/changed")
    except Exception as e:
        # Deliberate best-effort: any failure simply means "not healthy".
        logger.warning(f"Fava health check failed: {e}")
        return False
    return response.status_code == 200
|
|
|
|
async def query_transactions(
    self,
    account_pattern: Optional[str] = None,
    limit: int = 100,
    include_pending: bool = True
) -> List[Dict[str, Any]]:
    """
    Fetch transactions from Fava/Beancount via a BQL query.

    Args:
        account_pattern: Optional regex to filter accounts
            (e.g., "User-abc123" or "Assets:Receivable:User-abc")
        limit: Maximum number of transactions to return
        include_pending: When False, rows with flag '!' are dropped.

    Returns:
        List of transaction dicts keyed by the query's column names.

    Raises:
        httpx.HTTPStatusError: Fava answered with an error status.
        httpx.RequestError: The HTTP request itself failed.
    """
    if account_pattern:
        query = f"SELECT * WHERE account ~ '{account_pattern}' ORDER BY date DESC LIMIT {limit}"
    else:
        query = f"SELECT * ORDER BY date DESC LIMIT {limit}"

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f"{self.base_url}/query",
                params={"query_string": query}
            )
            response.raise_for_status()
            result = response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise

    # Fava's query API answers {"data": {"rows": [...], "types": [...]}};
    # each row is an array aligned with the column list in "types".
    data = result.get("data", {})
    rows = data.get("rows", [])
    column_names = [t.get("name") for t in data.get("types", [])]

    transactions: List[Dict[str, Any]] = []
    for row in rows:
        if isinstance(row, list) and len(row) == len(column_names):
            txn = dict(zip(column_names, row))
        elif isinstance(row, dict):
            # Already a dict (shouldn't happen with BQL, but handle it).
            txn = row
        else:
            # Malformed row: wrong arity or unknown shape — skip.
            continue
        if not include_pending and txn.get("flag", "*") == "!":
            continue
        transactions.append(txn)

    # Defensive cap — the query already carries LIMIT, this re-asserts it.
    return transactions[:limit]
|
|
|
|
async def query_bql(self, query_string: str) -> Dict[str, Any]:
    """
    Execute an arbitrary Beancount Query Language (BQL) query.

    General-purpose entry point for server-side aggregation, filtering
    and data retrieval against Fava/Beancount.

    ⚠️ LIMITATION: BQL can only query position amounts and
    transaction-level data; it CANNOT access posting metadata (like
    'sats-equivalent'). For ledger formats where SATS live in metadata,
    manual aggregation is required. See docs/BQL-BALANCE-QUERIES.md.

    Args:
        query_string: BQL query, e.g.
            "SELECT account, sum(position) WHERE account ~ 'User-abc'"

    Returns:
        {
            "rows": [[col1, col2, ...], ...],
            "types": [{"name": "col1", "type": "str"}, ...],
            "column_names": ["col1", "col2", ...]
        }

    Raises:
        httpx.HTTPStatusError: Fava answered with an error status
            (the failing query is logged for diagnosis).
        httpx.RequestError: The HTTP request itself failed.

    See:
        https://beancount.github.io/docs/beancount_query_language.html
    """
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f"{self.base_url}/query",
                params={"query_string": query_string}
            )
            response.raise_for_status()
            result = response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"BQL query error: {e.response.status_code} - {e.response.text}")
        logger.error(f"Query was: {query_string}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise

    # Fava wraps the result as {"data": {"rows": [...], "types": [...]}};
    # flatten it and precompute the column-name list for callers.
    data = result.get("data", {})
    types = data.get("types", [])
    return {
        "rows": data.get("rows", []),
        "types": types,
        "column_names": [t.get("name") for t in types]
    }
|
|
|
|
async def get_user_balance_bql(self, user_id: str) -> Dict[str, Any]:
    """
    Get a user's balance via BQL with server-side aggregation.

    sum(weight) aggregates the SATS leg of @@ price notation on the
    server, avoiding the per-posting parsing done by get_user_balance().

    Args:
        user_id: User ID (only the first 8 characters are matched).

    Returns:
        {
            "balance": int (sats from the weight column),
            "fiat_balances": {"EUR": Decimal("100.50"), ...},
            "accounts": [{"account": ..., "sats": ..., "eur": ...}, ...]
        }

    Example:
        balance = await fava.get_user_balance_bql("af983632")
        print(f"Balance: {balance['balance']} sats")
    """
    user_id_prefix = user_id[:8]

    # Only cleared (*) postings on this user's Payable/Receivable
    # accounts, grouped per account.
    query = f"""
    SELECT account, sum(number), sum(weight)
    WHERE account ~ ':User-{user_id_prefix}'
    AND (account ~ 'Payable' OR account ~ 'Receivable')
    AND flag = '*'
    GROUP BY account
    """

    result = await self.query_bql(query)

    total_sats = 0
    fiat_balances: Dict[str, Decimal] = {}
    accounts: List[Dict[str, Any]] = []

    for account_name, fiat_sum, weight_sum in result["rows"]:
        # sum(number): the fiat side (may be None/0).
        fiat_amount = Decimal(str(fiat_sum)) if fiat_sum else Decimal(0)

        # sum(weight): an inventory mapping like {"SATS": -10442635.00}.
        sats_amount = 0
        if isinstance(weight_sum, dict) and "SATS" in weight_sum:
            sats_amount = int(Decimal(str(weight_sum["SATS"])))

        total_sats += sats_amount

        # Fiat is assumed EUR for now (could be extended).
        if fiat_amount != 0:
            fiat_balances["EUR"] = fiat_balances.get("EUR", Decimal(0)) + fiat_amount

        accounts.append({
            "account": account_name,
            "sats": sats_amount,
            "eur": fiat_amount
        })

    logger.info(f"User {user_id[:8]} balance (BQL): {total_sats} sats, fiat: {dict(fiat_balances)}")

    return {
        "balance": total_sats,
        "fiat_balances": fiat_balances,
        "accounts": accounts
    }
|
|
|
|
async def get_all_user_balances_bql(self) -> List[Dict[str, Any]]:
    """
    Get balances for all users via a single BQL query (admin view).

    sum(weight) aggregates the SATS leg of @@ price notation on the
    server, so one round-trip covers every user.

    Returns:
        [
            {
                "user_id": "abc123",
                "balance": 100000,
                "fiat_balances": {"EUR": Decimal("100.50")},
                "accounts": [{"account": ..., "sats": ..., "eur": ...}, ...]
            },
            ...
        ]

    Example:
        all_balances = await fava.get_all_user_balances_bql()
        for user in all_balances:
            print(f"{user['user_id']}: {user['balance']} sats")
    """
    # Only cleared (*) postings on user Payable/Receivable accounts.
    query = """
    SELECT account, sum(number), sum(weight)
    WHERE (account ~ 'Payable:User-' OR account ~ 'Receivable:User-')
    AND flag = '*'
    GROUP BY account
    """

    result = await self.query_bql(query)

    # Fold the per-account rows into one record per 8-char user id.
    user_data: Dict[str, Dict[str, Any]] = {}

    for account_name, fiat_sum, weight_sum in result["rows"]:
        if ":User-" not in account_name:
            continue
        user_id = account_name.split(":User-")[1][:8]

        record = user_data.setdefault(user_id, {
            "user_id": user_id,
            "balance": 0,
            "fiat_balances": {},
            "accounts": []
        })

        # sum(number): the fiat side (may be None/0).
        fiat_amount = Decimal(str(fiat_sum)) if fiat_sum else Decimal(0)

        # sum(weight): an inventory mapping like {"SATS": -1234.00}.
        sats_amount = 0
        if isinstance(weight_sum, dict) and "SATS" in weight_sum:
            sats_amount = int(Decimal(str(weight_sum["SATS"])))

        record["balance"] += sats_amount

        # Fiat is assumed EUR for now (could be extended).
        if fiat_amount != 0:
            balances = record["fiat_balances"]
            balances["EUR"] = balances.get("EUR", Decimal(0)) + fiat_amount

        record["accounts"].append({
            "account": account_name,
            "sats": sats_amount,
            "eur": fiat_amount
        })

    logger.info(f"Fetched balances for {len(user_data)} users (BQL)")

    return list(user_data.values())
|
|
|
|
async def get_expense_summary_bql(
    self,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    group_by: str = "account"
) -> List[Dict[str, Any]]:
    """
    Get an expense summary via BQL, grouped by account or by month.

    Uses sum(weight) for efficient SATS aggregation from price notation.

    Args:
        start_date: ISO date string (YYYY-MM-DD), optional.
            (fix: annotated Optional[str] — the default is None)
        end_date: ISO date string (YYYY-MM-DD), optional.
        group_by: "account" (default) or "month".

    Returns:
        With group_by="account":
            [{"account": "Expenses:Supplies:Food", "fiat": 500.00,
              "fiat_currency": "EUR", "sats": 550000}, ...]
        With group_by="month":
            [{"month": "2025-12", "fiat": 700.00,
              "fiat_currency": "EUR", "sats": 770000}, ...]

    Raises:
        Whatever query_bql() raises; the error is logged first.
    """

    def _weight_sats(weight_sum: Any) -> int:
        # sum(weight) arrives as an inventory mapping like
        # {"SATS": -1234.00}; expenses are reported as magnitudes.
        if isinstance(weight_sum, dict) and "SATS" in weight_sum:
            return abs(int(Decimal(str(weight_sum["SATS"]))))
        return 0

    # Date bounds are interpolated unquoted, as BQL date literals.
    # NOTE(review): assumes callers pass well-formed YYYY-MM-DD strings.
    date_filter = ""
    if start_date:
        date_filter += f" AND date >= {start_date}"
    if end_date:
        date_filter += f" AND date <= {end_date}"

    if group_by == "month":
        query = f"""
        SELECT year, month, sum(number), sum(weight)
        WHERE account ~ 'Expenses:'
        AND flag = '*'
        {date_filter}
        GROUP BY year, month
        ORDER BY year DESC, month DESC
        """
    else:
        query = f"""
        SELECT account, sum(number), sum(weight)
        WHERE account ~ 'Expenses:'
        AND flag = '*'
        {date_filter}
        GROUP BY account
        ORDER BY sum(weight)
        """

    try:
        result = await self.query_bql(query)

        summaries = []
        for row in result["rows"]:
            if group_by == "month":
                year, month, fiat_sum, weight_sum = row
                summaries.append({
                    "month": f"{year}-{month:02d}",
                    "fiat": abs(float(fiat_sum)) if fiat_sum else 0.0,
                    "fiat_currency": "EUR",
                    "sats": _weight_sats(weight_sum)
                })
            else:
                account, fiat_sum, weight_sum = row
                summaries.append({
                    "account": account,
                    "fiat": abs(float(fiat_sum)) if fiat_sum else 0.0,
                    "fiat_currency": "EUR",
                    "sats": _weight_sats(weight_sum)
                })

        logger.info(f"BQL: Expense summary returned {len(summaries)} items")
        return summaries

    except Exception as e:
        logger.error(f"Error getting expense summary via BQL: {e}")
        raise
|
|
|
async def get_user_contributions_bql(self) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get total expense contributions per user using BQL.
|
|
|
|
Uses sum(weight) to aggregate all expenses each user has submitted
|
|
that created liabilities (castle owes user).
|
|
|
|
Returns:
|
|
List of user contribution summaries:
|
|
[
|
|
{
|
|
"user_id": "cfe378b3",
|
|
"total_fiat": 1500.00,
|
|
"total_sats": 1650000,
|
|
"entry_count": 25
|
|
},
|
|
...
|
|
]
|
|
"""
|
|
from decimal import Decimal
|
|
|
|
# Query all expense entries that created payables, grouped by user
|
|
query = """
|
|
SELECT account, sum(number), sum(weight), count(number)
|
|
WHERE account ~ 'Liabilities:Payable:User-'
|
|
AND 'expense-entry' IN tags
|
|
AND flag = '*'
|
|
GROUP BY account
|
|
ORDER BY sum(weight)
|
|
"""
|
|
|
|
try:
|
|
result = await self.query_bql(query)
|
|
|
|
contributions = []
|
|
for row in result["rows"]:
|
|
account, fiat_sum, weight_sum, count = row
|
|
|
|
# Extract user_id from account name
|
|
if ":User-" not in account:
|
|
continue
|
|
user_id = account.split(":User-")[1][:8]
|
|
|
|
# Parse SATS from weight (negative for liabilities)
|
|
sats_amount = 0
|
|
if isinstance(weight_sum, dict) and "SATS" in weight_sum:
|
|
sats_amount = abs(int(Decimal(str(weight_sum["SATS"]))))
|
|
|
|
contributions.append({
|
|
"user_id": user_id,
|
|
"total_fiat": abs(float(fiat_sum)) if fiat_sum else 0.0,
|
|
"fiat_currency": "EUR",
|
|
"total_sats": sats_amount,
|
|
"entry_count": int(count) if count else 0
|
|
})
|
|
|
|
# Sort by total_sats descending (highest contributors first)
|
|
contributions.sort(key=lambda x: x["total_sats"], reverse=True)
|
|
|
|
logger.info(f"BQL: Found contributions from {len(contributions)} users")
|
|
return contributions
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting user contributions via BQL: {e}")
|
|
raise
|
|
|
|
async def get_account_transactions(
|
|
self,
|
|
account_name: str,
|
|
limit: int = 100
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all transactions affecting a specific account.
|
|
|
|
Args:
|
|
account_name: Full account name (e.g., "Assets:Receivable:User-abc123")
|
|
limit: Maximum number of transactions
|
|
|
|
Returns:
|
|
List of transactions affecting this account
|
|
"""
|
|
return await self.query_transactions(
|
|
account_pattern=account_name.replace(":", "\\:"), # Escape colons for regex
|
|
limit=limit
|
|
)
|
|
|
|
async def get_user_transactions(
|
|
self,
|
|
user_id: str,
|
|
limit: int = 100
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all transactions affecting a user's accounts.
|
|
|
|
Args:
|
|
user_id: User ID
|
|
limit: Maximum number of transactions
|
|
|
|
Returns:
|
|
List of transactions affecting user's accounts
|
|
"""
|
|
return await self.query_transactions(
|
|
account_pattern=f"User-{user_id[:8]}",
|
|
limit=limit
|
|
)
|
|
|
|
async def get_all_accounts(self) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all accounts from Beancount/Fava using BQL query.
|
|
|
|
Returns:
|
|
List of account dictionaries:
|
|
[
|
|
{"account": "Assets:Cash", "meta": {...}},
|
|
{"account": "Expenses:Food", "meta": {...}},
|
|
...
|
|
]
|
|
|
|
Example:
|
|
accounts = await fava.get_all_accounts()
|
|
for acc in accounts:
|
|
print(acc["account"]) # "Assets:Cash"
|
|
"""
|
|
try:
|
|
# Use BQL to get all unique accounts
|
|
query = "SELECT DISTINCT account"
|
|
result = await self.query_bql(query)
|
|
|
|
# Convert BQL result to expected format
|
|
accounts = []
|
|
for row in result["rows"]:
|
|
account_name = row[0] if isinstance(row, list) else row.get("account")
|
|
if account_name:
|
|
accounts.append({
|
|
"account": account_name,
|
|
"meta": {} # BQL doesn't return metadata easily
|
|
})
|
|
|
|
logger.debug(f"Fava returned {len(accounts)} accounts via BQL")
|
|
return accounts
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to fetch accounts via BQL: {e}")
|
|
raise
|
|
|
|
async def get_journal_entries(
|
|
self,
|
|
days: int = None,
|
|
start_date: str = None,
|
|
end_date: str = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get journal entries from Fava (with entry hashes), optionally filtered by date.
|
|
|
|
Uses Fava's server-side 'time' parameter for efficient date filtering,
|
|
avoiding the need to fetch all entries and filter in Python.
|
|
|
|
Args:
|
|
days: If provided, only return entries from the last N days.
|
|
If None, returns all entries (default behavior).
|
|
start_date: ISO format date string (YYYY-MM-DD). If provided with end_date,
|
|
filters entries between start_date and end_date (inclusive).
|
|
end_date: ISO format date string (YYYY-MM-DD). If provided with start_date,
|
|
filters entries between start_date and end_date (inclusive).
|
|
|
|
Note:
|
|
If both days and start_date/end_date are provided, start_date/end_date takes precedence.
|
|
|
|
Returns:
|
|
List of entries (transactions, opens, closes, etc.) with entry_hash field.
|
|
|
|
Example:
|
|
# Get all entries
|
|
entries = await fava.get_journal_entries()
|
|
|
|
# Get only last 30 days
|
|
recent = await fava.get_journal_entries(days=30)
|
|
|
|
# Get entries in custom date range
|
|
custom = await fava.get_journal_entries(start_date="2024-01-01", end_date="2024-01-31")
|
|
"""
|
|
from datetime import datetime, timedelta
|
|
|
|
try:
|
|
# Build query parameters for server-side filtering
|
|
params = {}
|
|
|
|
# Use date range if both start_date and end_date are provided
|
|
if start_date and end_date:
|
|
# Fava uses "YYYY-MM-DD - YYYY-MM-DD" format for time ranges
|
|
params["time"] = f"{start_date} - {end_date}"
|
|
logger.info(f"Querying journal with date range: {start_date} to {end_date}")
|
|
|
|
# Fall back to days filter if no date range provided
|
|
elif days is not None:
|
|
cutoff_date = (datetime.now() - timedelta(days=days)).date()
|
|
today = datetime.now().date()
|
|
params["time"] = f"{cutoff_date.isoformat()} - {today.isoformat()}"
|
|
logger.info(f"Querying journal for last {days} days (from {cutoff_date})")
|
|
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.get(f"{self.base_url}/journal", params=params)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
entries = result.get("data", [])
|
|
|
|
if params:
|
|
logger.info(f"Fava /journal returned {len(entries)} entries (filtered)")
|
|
else:
|
|
logger.info(f"Fava /journal returned {len(entries)} entries (all)")
|
|
|
|
return entries
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
logger.error(f"Fava journal error: {e.response.status_code} - {e.response.text}")
|
|
raise
|
|
except httpx.RequestError as e:
|
|
logger.error(f"Fava connection error: {e}")
|
|
raise
|
|
|
|
async def get_entry_context(self, entry_hash: str) -> Dict[str, Any]:
|
|
"""
|
|
Get entry source text and sha256sum for editing.
|
|
|
|
Uses /context endpoint which returns the editable source slice.
|
|
Note: Fava's API uses get_context for reading, put_source_slice for writing.
|
|
|
|
Args:
|
|
entry_hash: Entry hash from get_journal_entries()
|
|
|
|
Returns:
|
|
{
|
|
"slice": "2025-01-15 ! \"Description\"...", # Beancount source text
|
|
"sha256sum": "abc123...", # For concurrency control
|
|
}
|
|
|
|
Example:
|
|
context = await fava.get_entry_context("abc123")
|
|
source = context["slice"]
|
|
sha256sum = context["sha256sum"]
|
|
"""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.get(
|
|
f"{self.base_url}/context",
|
|
params={"entry_hash": entry_hash}
|
|
)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
return result.get("data", {})
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
logger.error(f"Fava context error: {e.response.status_code} - {e.response.text}")
|
|
raise
|
|
except httpx.RequestError as e:
|
|
logger.error(f"Fava connection error: {e}")
|
|
raise
|
|
|
|
async def update_entry_source(self, entry_hash: str, new_source: str, sha256sum: str) -> str:
|
|
"""
|
|
Update an entry's source text (e.g., change flag from ! to *).
|
|
|
|
Args:
|
|
entry_hash: Entry hash
|
|
new_source: Modified Beancount source text
|
|
sha256sum: Current sha256sum from get_entry_context() for concurrency control
|
|
|
|
Returns:
|
|
New sha256sum after update
|
|
|
|
Note:
|
|
This method acquires a global write lock to prevent concurrent
|
|
modifications to the ledger file. All writes are serialized.
|
|
|
|
Example:
|
|
# Get context
|
|
context = await fava.get_entry_context("abc123")
|
|
source = context["slice"]
|
|
sha256 = context["sha256sum"]
|
|
|
|
# Change flag
|
|
new_source = source.replace("2025-01-15 !", "2025-01-15 *")
|
|
|
|
# Update
|
|
new_sha256 = await fava.update_entry_source("abc123", new_source, sha256)
|
|
"""
|
|
# Acquire global write lock to serialize ledger modifications
|
|
async with self._write_lock:
|
|
try:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.put(
|
|
f"{self.base_url}/source_slice",
|
|
json={
|
|
"entry_hash": entry_hash,
|
|
"source": new_source,
|
|
"sha256sum": sha256sum
|
|
}
|
|
)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
return result.get("data", "")
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
logger.error(f"Fava update error: {e.response.status_code} - {e.response.text}")
|
|
raise
|
|
except httpx.RequestError as e:
|
|
logger.error(f"Fava connection error: {e}")
|
|
raise
|
|
|
|
async def delete_entry(self, entry_hash: str, sha256sum: str) -> str:
|
|
"""
|
|
Delete an entry from the Beancount file.
|
|
|
|
Args:
|
|
entry_hash: Entry hash
|
|
sha256sum: Current sha256sum for concurrency control
|
|
|
|
Returns:
|
|
Success message
|
|
|
|
Note:
|
|
This method acquires a global write lock to prevent concurrent
|
|
modifications to the ledger file. All writes are serialized.
|
|
|
|
Example:
|
|
context = await fava.get_entry_context("abc123")
|
|
await fava.delete_entry("abc123", context["sha256sum"])
|
|
"""
|
|
# Acquire global write lock to serialize ledger modifications
|
|
async with self._write_lock:
|
|
try:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.delete(
|
|
f"{self.base_url}/source_slice",
|
|
params={
|
|
"entry_hash": entry_hash,
|
|
"sha256sum": sha256sum
|
|
}
|
|
)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
return result.get("data", "")
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
logger.error(f"Fava delete error: {e.response.status_code} - {e.response.text}")
|
|
raise
|
|
except httpx.RequestError as e:
|
|
logger.error(f"Fava connection error: {e}")
|
|
raise
|
|
|
|
async def add_account(
|
|
self,
|
|
account_name: str,
|
|
currencies: list[str],
|
|
opening_date: Optional[date] = None,
|
|
metadata: Optional[Dict[str, Any]] = None,
|
|
max_retries: int = 3
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Add an account to the Beancount ledger via an Open directive.
|
|
|
|
NOTE: Fava's /api/add_entries endpoint does NOT support Open directives.
|
|
This method uses /api/source to directly edit the Beancount file.
|
|
|
|
This method implements optimistic concurrency control with retry logic:
|
|
- Acquires a global write lock before modifying the ledger
|
|
- Uses SHA256 checksum to detect concurrent modifications
|
|
- Retries with exponential backoff on checksum conflicts
|
|
- Re-checks if account was created by concurrent request before retrying
|
|
|
|
Args:
|
|
account_name: Full account name (e.g., "Assets:Receivable:User-abc123")
|
|
currencies: List of currencies for this account (e.g., ["EUR", "SATS"])
|
|
opening_date: Date to open the account (defaults to today)
|
|
metadata: Optional metadata for the account
|
|
max_retries: Maximum number of retry attempts on checksum conflict (default: 3)
|
|
|
|
Returns:
|
|
Response from Fava ({"data": "new_sha256sum", "mtime": "..."})
|
|
|
|
Raises:
|
|
ChecksumConflictError: If all retry attempts fail due to concurrent modifications
|
|
|
|
Example:
|
|
# Add a user's receivable account
|
|
result = await fava.add_account(
|
|
account_name="Assets:Receivable:User-abc123",
|
|
currencies=["EUR", "SATS", "USD"],
|
|
metadata={"user_id": "abc123", "description": "User receivables"}
|
|
)
|
|
|
|
# Add a user's payable account
|
|
result = await fava.add_account(
|
|
account_name="Liabilities:Payable:User-abc123",
|
|
currencies=["EUR", "SATS"]
|
|
)
|
|
"""
|
|
from datetime import date as date_type
|
|
|
|
if opening_date is None:
|
|
opening_date = date_type.today()
|
|
|
|
last_error = None
|
|
|
|
for attempt in range(max_retries):
|
|
# Acquire global write lock to serialize ledger modifications
|
|
async with self._write_lock:
|
|
try:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
# Step 1: Get the main Beancount file path from Fava
|
|
options_response = await client.get(f"{self.base_url}/options")
|
|
options_response.raise_for_status()
|
|
options_data = options_response.json()["data"]
|
|
file_path = options_data["beancount_options"]["filename"]
|
|
|
|
logger.debug(f"Fava main file: {file_path}")
|
|
|
|
# Step 2: Get current source file (fresh read on each attempt)
|
|
response = await client.get(
|
|
f"{self.base_url}/source",
|
|
params={"filename": file_path}
|
|
)
|
|
response.raise_for_status()
|
|
source_data = response.json()["data"]
|
|
|
|
sha256sum = source_data["sha256sum"]
|
|
source = source_data["source"]
|
|
|
|
# Step 3: Check if account already exists (may have been created by concurrent request)
|
|
if f"open {account_name}" in source:
|
|
logger.info(f"Account {account_name} already exists in Beancount file")
|
|
return {"data": sha256sum, "mtime": source_data.get("mtime", "")}
|
|
|
|
# Step 4: Find insertion point (after last Open directive AND its metadata)
|
|
lines = source.split('\n')
|
|
insert_index = 0
|
|
for i, line in enumerate(lines):
|
|
if line.strip().startswith(('open ', f'{opening_date.year}-')) and 'open' in line:
|
|
# Found an Open directive, now skip over any metadata lines
|
|
insert_index = i + 1
|
|
# Skip metadata lines (lines starting with whitespace)
|
|
while insert_index < len(lines) and lines[insert_index].startswith((' ', '\t')) and lines[insert_index].strip():
|
|
insert_index += 1
|
|
|
|
# Step 5: Format Open directive as Beancount text
|
|
currencies_str = ", ".join(currencies)
|
|
open_lines = [
|
|
"",
|
|
f"{opening_date.isoformat()} open {account_name} {currencies_str}"
|
|
]
|
|
|
|
# Add metadata if provided
|
|
if metadata:
|
|
for key, value in metadata.items():
|
|
# Format metadata with proper indentation
|
|
if isinstance(value, str):
|
|
open_lines.append(f' {key}: "{value}"')
|
|
else:
|
|
open_lines.append(f' {key}: {value}')
|
|
|
|
# Step 6: Insert into source
|
|
for i, line in enumerate(open_lines):
|
|
lines.insert(insert_index + i, line)
|
|
|
|
new_source = '\n'.join(lines)
|
|
|
|
# Step 7: Update source file via PUT /api/source
|
|
update_payload = {
|
|
"file_path": file_path,
|
|
"source": new_source,
|
|
"sha256sum": sha256sum
|
|
}
|
|
|
|
response = await client.put(
|
|
f"{self.base_url}/source",
|
|
json=update_payload,
|
|
headers={"Content-Type": "application/json"}
|
|
)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
|
|
logger.info(f"Added account {account_name} to Beancount file with currencies {currencies}")
|
|
return result
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
# Check for checksum conflict (HTTP 412 Precondition Failed or similar)
|
|
if e.response.status_code in (409, 412):
|
|
last_error = ChecksumConflictError(
|
|
f"Checksum conflict on attempt {attempt + 1}/{max_retries}: {e.response.text}"
|
|
)
|
|
logger.warning(
|
|
f"Checksum conflict adding account {account_name} "
|
|
f"(attempt {attempt + 1}/{max_retries}), retrying..."
|
|
)
|
|
# Continue to retry logic below
|
|
else:
|
|
logger.error(f"Fava HTTP error adding account: {e.response.status_code} - {e.response.text}")
|
|
raise
|
|
except httpx.RequestError as e:
|
|
logger.error(f"Fava connection error: {e}")
|
|
raise
|
|
|
|
# If we get here due to checksum conflict, wait with exponential backoff before retry
|
|
if attempt < max_retries - 1:
|
|
backoff_time = 0.1 * (2 ** attempt) # 0.1s, 0.2s, 0.4s
|
|
logger.info(f"Waiting {backoff_time}s before retry...")
|
|
await asyncio.sleep(backoff_time)
|
|
|
|
# All retries exhausted
|
|
logger.error(f"Failed to add account {account_name} after {max_retries} attempts due to concurrent modifications")
|
|
raise last_error or ChecksumConflictError(f"Failed to add account after {max_retries} attempts")
|
|
|
|
async def get_unsettled_entries_bql(
|
|
self,
|
|
user_id: str,
|
|
entry_type: str = "expense"
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get unsettled expense or receivable entries for a user using BQL.
|
|
|
|
Uses BQL queries to efficiently find entries with exp-{id} or rcv-{id}
|
|
links that don't have a corresponding settlement entry.
|
|
|
|
This is significantly more efficient than the legacy method as it:
|
|
- Queries only relevant entries (not ALL journal entries)
|
|
- Uses weight column for SATS amounts (no string parsing)
|
|
- Filters by tags and account patterns in the database
|
|
|
|
Args:
|
|
user_id: User ID (first 8 characters used for account matching)
|
|
entry_type: "expense" (payables - castle owes user) or
|
|
"receivable" (user owes castle)
|
|
|
|
Returns:
|
|
List of unsettled entries with:
|
|
- link: The entry's unique link (exp-xxx or rcv-xxx)
|
|
- date: Entry date
|
|
- narration: Description
|
|
- fiat_amount: Amount in fiat currency (absolute value)
|
|
- fiat_currency: Currency code
|
|
- sats_amount: Amount in SATS (absolute value, from weight)
|
|
- entry_hash: For potential updates
|
|
- flag: Transaction flag
|
|
"""
|
|
from decimal import Decimal
|
|
|
|
user_short = user_id[:8]
|
|
link_prefix = "exp-" if entry_type == "expense" else "rcv-"
|
|
entry_tag = "expense-entry" if entry_type == "expense" else "receivable-entry"
|
|
|
|
# Determine account pattern based on entry type
|
|
if entry_type == "expense":
|
|
account_pattern = f"Liabilities:Payable:User-{user_short}"
|
|
else:
|
|
account_pattern = f"Assets:Receivable:User-{user_short}"
|
|
|
|
try:
|
|
# Query 1: Get all original expense/receivable entries for this user
|
|
# These are entries with the expense-entry or receivable-entry tag
|
|
original_query = f"""
|
|
SELECT date, narration, account, number, weight, links,
|
|
any_meta('entry-id') as entry_id
|
|
WHERE account ~ '{account_pattern}'
|
|
AND '{entry_tag}' IN tags
|
|
AND flag = '*'
|
|
ORDER BY date
|
|
"""
|
|
|
|
original_result = await self.query_bql(original_query)
|
|
|
|
# Query 2: Get all settlement entries for this user
|
|
# These are entries with the settlement tag
|
|
settlement_query = f"""
|
|
SELECT links
|
|
WHERE account ~ '{account_pattern}'
|
|
AND 'settlement' IN tags
|
|
AND flag = '*'
|
|
"""
|
|
|
|
settlement_result = await self.query_bql(settlement_query)
|
|
|
|
# Build set of settled links from settlement entries
|
|
settled_links: set = set()
|
|
for row in settlement_result["rows"]:
|
|
links = row[0] if row else []
|
|
if isinstance(links, list):
|
|
for link in links:
|
|
if link.startswith(link_prefix):
|
|
settled_links.add(link)
|
|
|
|
# Process original entries and find unsettled ones
|
|
entries_by_link: Dict[str, Dict[str, Any]] = {}
|
|
|
|
for row in original_result["rows"]:
|
|
date_val, narration, account, number, weight, links, entry_id = row
|
|
|
|
# Skip if no links
|
|
if not links or not isinstance(links, list):
|
|
continue
|
|
|
|
# Find the exp-/rcv- link
|
|
entry_link = None
|
|
for link in links:
|
|
if link.startswith(link_prefix):
|
|
entry_link = link
|
|
break
|
|
|
|
if not entry_link:
|
|
continue
|
|
|
|
# Skip if already settled
|
|
if entry_link in settled_links:
|
|
continue
|
|
|
|
# Skip if we already have this entry (BQL returns one row per posting)
|
|
if entry_link in entries_by_link:
|
|
continue
|
|
|
|
# Parse amounts
|
|
fiat_amount = abs(float(number)) if number else 0.0
|
|
fiat_currency = "EUR" # Default, could be extracted from posting
|
|
|
|
# Parse SATS from weight column
|
|
sats_amount = 0
|
|
if isinstance(weight, dict) and "SATS" in weight:
|
|
sats_value = weight["SATS"]
|
|
sats_amount = abs(int(Decimal(str(sats_value))))
|
|
|
|
# Format date as string
|
|
date_str = str(date_val) if date_val else ""
|
|
|
|
entries_by_link[entry_link] = {
|
|
"link": entry_link,
|
|
"date": date_str,
|
|
"narration": narration or "",
|
|
"fiat_amount": fiat_amount,
|
|
"fiat_currency": fiat_currency,
|
|
"sats_amount": sats_amount,
|
|
"entry_hash": "", # Not available from BQL, use entry_id instead
|
|
"entry_id": entry_id or "",
|
|
"flag": "*"
|
|
}
|
|
|
|
# Convert to list and sort by date
|
|
unsettled = list(entries_by_link.values())
|
|
unsettled.sort(key=lambda x: x.get("date", ""))
|
|
|
|
logger.info(
|
|
f"BQL: Found {len(unsettled)} unsettled {entry_type} entries for user {user_short} "
|
|
f"(settled: {len(settled_links)})"
|
|
)
|
|
|
|
return unsettled
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting unsettled entries via BQL: {e}")
|
|
raise
|
|
|
|
    async def get_unsettled_entries(
        self,
        user_id: str,
        entry_type: str = "expense"
    ) -> List[Dict[str, Any]]:
        """
        Get unsettled expense or receivable entries for a user.

        Finds entries with exp-{id} or rcv-{id} links that don't have
        a corresponding settlement entry with the same link.

        Legacy path: fetches ALL journal entries and filters/parses in Python.
        Prefer get_unsettled_entries_bql() where possible.

        Args:
            user_id: User ID (first 8 characters used for account matching)
            entry_type: "expense" (payables - castle owes user) or
                        "receivable" (user owes castle)

        Returns:
            List of unsettled entries with:
            - link: The entry's unique link (exp-xxx or rcv-xxx)
            - date: Entry date
            - narration: Description
            - fiat_amount: Amount in fiat currency
            - fiat_currency: Currency code
            - sats_amount: Amount in SATS (from weight)
            - entry_hash: For potential updates

        Example:
            unsettled = await fava.get_unsettled_entries("cfe378b3...", "expense")
            # Returns: [
            #   {"link": "exp-abc123", "date": "2025-12-01", "narration": "Groceries",
            #    "fiat_amount": 50.00, "fiat_currency": "EUR", "sats_amount": 47000},
            #   ...
            # ]
        """
        user_short = user_id[:8]
        link_prefix = "exp-" if entry_type == "expense" else "rcv-"

        # Determine account pattern based on entry type
        if entry_type == "expense":
            account_pattern = f"Liabilities:Payable:User-{user_short}"
        else:
            account_pattern = f"Assets:Receivable:User-{user_short}"

        try:
            # Get all journal entries (no date filter — full ledger scan)
            entries = await self.get_journal_entries()

            # Track entries by link and which links have been settled.
            # Filtering happens only after the full scan, so entry order
            # (settlement before or after its original) does not matter.
            entries_by_link: Dict[str, Dict[str, Any]] = {}
            settled_links: set = set()

            for entry in entries:
                entry_links = entry.get("links", [])
                entry_tags = entry.get("tags", [])
                postings = entry.get("postings", [])

                # Check if this entry has our user's account
                has_user_account = any(
                    account_pattern in p.get("account", "")
                    for p in postings
                )

                if not has_user_account:
                    continue

                # Process each link in the entry
                for link in entry_links:
                    if not link.startswith(link_prefix):
                        continue

                    # Check if this is a settlement (has settlement tag)
                    if "settlement" in entry_tags:
                        settled_links.add(link)
                    else:
                        # This is an original expense/receivable entry.
                        # Extract amount from the user's (first matching) posting.
                        for posting in postings:
                            if account_pattern in posting.get("account", ""):
                                amount_str = posting.get("amount", "")
                                # Parse amount like "-50.00 EUR @@ 47000 SATS"
                                fiat_amount = 0.0
                                fiat_currency = ""
                                sats_amount = 0

                                # Extract fiat part (price notation: "<fiat> @@ <sats>")
                                if " @@ " in amount_str:
                                    fiat_part, sats_part = amount_str.split(" @@ ")
                                    parts = fiat_part.strip().split()
                                    if len(parts) >= 2:
                                        fiat_amount = abs(float(parts[0]))
                                        fiat_currency = parts[1]
                                    # Extract sats
                                    sats_parts = sats_part.strip().split()
                                    if sats_parts:
                                        sats_amount = abs(int(float(sats_parts[0])))
                                else:
                                    # Legacy format without @@ - try to parse
                                    parts = amount_str.strip().split()
                                    if len(parts) >= 2:
                                        fiat_amount = abs(float(parts[0]))
                                        fiat_currency = parts[1]

                                entries_by_link[link] = {
                                    "link": link,
                                    "date": entry.get("date", ""),
                                    "narration": entry.get("narration", ""),
                                    "fiat_amount": fiat_amount,
                                    "fiat_currency": fiat_currency,
                                    "sats_amount": sats_amount,
                                    "entry_hash": entry.get("entry_hash", ""),
                                    "flag": entry.get("flag", "*")
                                }
                                break

            # Return entries whose links are NOT in settled_links
            unsettled = [
                entry_data
                for link, entry_data in entries_by_link.items()
                if link not in settled_links
            ]

            # Sort by date
            unsettled.sort(key=lambda x: x.get("date", ""))

            logger.info(
                f"Found {len(unsettled)} unsettled {entry_type} entries for user {user_short} "
                f"(total: {len(entries_by_link)}, settled: {len(settled_links)})"
            )

            return unsettled

        except Exception as e:
            logger.error(f"Error getting unsettled entries: {e}")
            raise
|
|
|
|
|
|
# Singleton instance (configured from settings via init_fava_client)
_fava_client: Optional[FavaClient] = None
|
|
|
|
|
|
def init_fava_client(fava_url: str, ledger_slug: str, timeout: float = 10.0) -> None:
    """
    Initialize the global Fava client singleton.

    Must be called once at startup before get_fava_client() is used.

    Args:
        fava_url: Base URL of Fava server
        ledger_slug: Ledger identifier
        timeout: Request timeout in seconds
    """
    global _fava_client
    _fava_client = FavaClient(fava_url, ledger_slug, timeout)
    logger.info(f"Fava client initialized: {fava_url}/{ledger_slug}")
|
|
|
|
|
|
def get_fava_client() -> FavaClient:
    """
    Return the configured global Fava client.

    Returns:
        FavaClient instance

    Raises:
        RuntimeError: If the client has not been initialized via init_fava_client()
    """
    client = _fava_client
    if client is None:
        raise RuntimeError(
            "Fava client not initialized. Call init_fava_client() first. "
            "Castle requires Fava for all accounting operations."
        )
    return client
|