castle/fava_client.py
padreug 7dabe8700d Add BQL-based report endpoints for expenses and contributions
New endpoints:
- GET /api/v1/reports/expenses - Expense summary by account or month
- GET /api/v1/reports/contributions - User contribution totals

New FavaClient methods:
- get_expense_summary_bql() - Aggregates expenses with date filtering
- get_user_contributions_bql() - Aggregates user expense submissions

Both use sum(weight) for efficient SATS aggregation from price notation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-15 01:15:29 +01:00

1654 lines
64 KiB
Python

"""
Fava API client for Castle.
This module provides an async HTTP client for interacting with Fava's JSON API.
All accounting logic is delegated to Fava/Beancount.
Fava provides a REST API for:
- Adding transactions (PUT /api/add_entries)
- Adding accounts via Open directives (PUT /api/add_entries)
- Querying balances (GET /api/query)
- Balance sheets (GET /api/balance_sheet)
- Account reports (GET /api/account_report)
- Updating/deleting entries (PUT/DELETE /api/source_slice)
See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py
"""
import httpx
from typing import Any, Dict, List, Optional
from decimal import Decimal
from datetime import date, datetime
from loguru import logger
class FavaClient:
"""
Async client for Fava REST API.
Fava runs as a separate web service and provides a JSON API
for adding entries and querying ledger data.
All accounting calculations are performed by Beancount via Fava.
"""
def __init__(self, fava_url: str, ledger_slug: str, timeout: float = 10.0):
"""
Initialize Fava client.
Args:
fava_url: Base URL of Fava server (e.g., http://localhost:3333)
ledger_slug: URL-safe ledger identifier (e.g., castle-accounting)
timeout: Request timeout in seconds
"""
self.fava_url = fava_url.rstrip('/')
self.ledger_slug = ledger_slug
self.base_url = f"{self.fava_url}/{self.ledger_slug}/api"
self.timeout = timeout
async def add_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]:
    """
    Send one journal entry to Fava's add_entries endpoint.

    The entry is wrapped in a single-element "entries" list and PUT as JSON.

    Args:
        entry: Beancount entry dict (format per Fava API spec). Expected keys:
            - t: "Transaction"
            - date: "YYYY-MM-DD"
            - flag: "*" (cleared) or "!" (pending)
            - payee: str (empty string, not None)
            - narration: str
            - postings: list of posting dicts, e.g.
              {"account": "Expenses:Food", "amount": "50.00 EUR"}
            - tags: list of str
            - links: list of str
            - meta: dict, e.g. {"user_id": "abc123"}

    Returns:
        Decoded JSON response from Fava
        (e.g. {"data": "Stored 1 entries.", "mtime": "..."})

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status (logged, then re-raised)
        httpx.RequestError: If the request itself fails (logged, then re-raised)

    Example:
        result = await fava_client.add_entry(entry)
    """
    endpoint = f"{self.base_url}/add_entries"
    payload = {"entries": [entry]}
    json_headers = {"Content-Type": "application/json"}
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as http:
            reply = await http.put(endpoint, json=payload, headers=json_headers)
            reply.raise_for_status()
            stored = reply.json()
            logger.info(f"Added entry to Fava: {stored.get('data', 'Unknown')}")
            return stored
    except httpx.HTTPStatusError as status_err:
        logger.error(f"Fava HTTP error: {status_err.response.status_code} - {status_err.response.text}")
        raise
    except httpx.RequestError as request_err:
        logger.error(f"Fava connection error: {request_err}")
        raise
async def get_account_balance(self, account_name: str) -> Dict[str, Any]:
    """
    Get the balance of one account, counting only postings flagged '*'.

    Runs a BQL query selecting sum(number) and sum(weight) for the account
    and reads the "SATS" entry of the summed weight as the satoshi balance.

    Args:
        account_name: Full account name (e.g., "Assets:Receivable:User-abc123")

    Returns:
        Dict with:
        - sats: int (from the "SATS" key of the weight sum, 0 if absent)
        - fiat: Decimal (from the number sum, 0 if empty)
        - fiat_currency: str (always "EUR"; the currency is not detected)

    Raises:
        ValueError: If account_name contains a single quote
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the request itself fails

    Example:
        balance = await fava_client.get_account_balance("Assets:Receivable:User-abc")
        # Returns: {"sats": 200000, "fiat": Decimal("150.00"), "fiat_currency": "EUR"}
    """
    # The name is placed inside a single-quoted BQL literal; a quote character
    # would end that literal and let the caller rewrite the query, so refuse it.
    if "'" in account_name:
        raise ValueError(f"Invalid account name for BQL query: {account_name!r}")

    # flag = '*' keeps only cleared postings (pending '!' ones are left out)
    query = f"SELECT sum(number), sum(weight) WHERE account = '{account_name}' AND flag = '*'"
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f"{self.base_url}/query",
                params={"query_string": query}
            )
            response.raise_for_status()
            data = response.json()

            rows = data['data']['rows']
            if not rows or not rows[0]:
                return {"sats": 0, "fiat": Decimal(0), "fiat_currency": "EUR"}

            row = rows[0]
            fiat_sum = row[0] if len(row) > 0 else 0
            weight_sum = row[1] if len(row) > 1 else {}

            fiat_amount = Decimal(str(fiat_sum)) if fiat_sum else Decimal(0)

            # The weight sum is only used when it is a dict carrying a "SATS" key
            total_sats = 0
            if isinstance(weight_sum, dict) and "SATS" in weight_sum:
                total_sats = int(Decimal(str(weight_sum["SATS"])))

            return {
                "sats": total_sats,
                "fiat": fiat_amount,
                "fiat_currency": "EUR"  # Default, could be extended to detect currency
            }
    except httpx.HTTPStatusError as e:
        logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
async def get_user_balance(self, user_id: str) -> Dict[str, Any]:
    """
    Get user's balance from castle's perspective by walking the journal.

    Fetches all journal entries and sums the postings whose account name
    contains ":User-<first 8 chars of user_id>" and either "Payable" or
    "Receivable".

    Recognised posting amount formats:
    - total price:    "50.00 EUR @@ 50000 SATS"
    - per-unit price: "50.00 EUR @ 1000.5 SATS" (sats = int(fiat * price))
    - plain fiat:     "50.00 EUR" (EUR/USD/GBP only; sats taken from the
                      'sats-equivalent' posting metadata when present)
    - legacy:         "<n> SATS ..." with fiat taken from the
                      'fiat-amount-total' / 'fiat-currency' posting metadata

    Args:
        user_id: User ID (only its first 8 characters are matched)

    Returns:
        {
            "balance": int (sats, positive = user owes castle, negative = castle owes user),
            "fiat_balances": {"EUR": Decimal("100.50")},
            "accounts": [{"account": "...", "sats": int}, ...]
        }

    Note:
        Entries that are not transactions, entries flagged '!' (pending) and
        entries tagged "voided" are skipped. Any other flag is included.
    """
    import re

    # Compiled once per call instead of being re-matched from source text
    # for every posting.
    total_price_re = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})\s+@@\s+(-?\d+)\s+SATS$')
    unit_price_re = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})\s+@\s+([\d.]+)\s+SATS$')
    simple_fiat_re = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})$')
    legacy_sats_re = re.compile(r'^(-?\d+)\s+SATS')

    # Account names use the first 8 chars of user_id
    user_marker = f":User-{user_id[:8]}"
    all_entries = await self.get_journal_entries()

    total_sats = 0
    fiat_balances: Dict[str, Decimal] = {}
    accounts_dict: Dict[str, Dict[str, Any]] = {}  # per-account sats totals

    def add_fiat(currency: str, amount: Decimal) -> None:
        fiat_balances[currency] = fiat_balances.get(currency, Decimal(0)) + amount

    def add_sats(account: str, sats: int) -> None:
        nonlocal total_sats
        total_sats += sats
        bucket = accounts_dict.setdefault(account, {"account": account, "sats": 0})
        bucket["sats"] += sats

    for entry in all_entries:
        if entry.get("t") != "Transaction":
            continue
        if entry.get("flag") == "!":
            continue
        # "or []" tolerates an explicit null tags field
        if "voided" in (entry.get("tags") or []):
            continue

        for posting in entry.get("postings", []):
            account_name = posting.get("account", "")
            if user_marker not in account_name:
                continue
            if "Payable" not in account_name and "Receivable" not in account_name:
                continue

            amount_str = posting.get("amount", "")
            if not isinstance(amount_str, str) or not amount_str:
                continue

            # "or {}" tolerates postings whose meta is null rather than missing
            posting_meta = posting.get("meta") or {}

            match = total_price_re.match(amount_str)
            if match:
                add_fiat(match.group(2), Decimal(match.group(1)))
                add_sats(account_name, int(match.group(3)))
                continue

            match = unit_price_re.match(amount_str)
            if match:
                fiat_amount = Decimal(match.group(1))
                add_fiat(match.group(2), fiat_amount)
                add_sats(account_name, int(fiat_amount * Decimal(match.group(3))))
                continue

            match = simple_fiat_re.match(amount_str)
            if match:
                if match.group(2) in ('EUR', 'USD', 'GBP'):
                    fiat_amount = Decimal(match.group(1))
                    add_fiat(match.group(2), fiat_amount)
                    # Legacy: SATS equivalent stored unsigned in metadata,
                    # so it takes its sign from the fiat amount
                    sats_equiv = posting_meta.get("sats-equivalent")
                    if sats_equiv:
                        signed_sats = int(sats_equiv) if fiat_amount > 0 else -int(sats_equiv)
                        add_sats(account_name, signed_sats)
                continue

            # Old format: SATS amount, with fiat carried in metadata
            sats_match = legacy_sats_re.match(amount_str)
            if sats_match:
                add_sats(account_name, int(sats_match.group(1)))

            fiat_total_raw = posting_meta.get("fiat-amount-total")
            fiat_currency = posting_meta.get("fiat-currency")
            if fiat_total_raw and fiat_currency:
                # str() avoids binary-float artefacts if the metadata is a JSON number
                fiat_total = Decimal(str(fiat_total_raw))
                # Apply the same sign as the SATS amount
                if sats_match and int(sats_match.group(1)) < 0:
                    fiat_total = -fiat_total
                add_fiat(fiat_currency, fiat_total)

    logger.info(f"User {user_id[:8]} balance: {total_sats} sats, fiat: {dict(fiat_balances)}")
    return {
        "balance": total_sats,
        "fiat_balances": fiat_balances,
        "accounts": list(accounts_dict.values())
    }
async def get_all_user_balances(self) -> List[Dict[str, Any]]:
    """
    Get balances for all users (admin view) by walking the journal.

    Fetches all journal entries and groups the postings on accounts that
    contain ":User-" and either "Payable" or "Receivable". The user id is
    whatever follows ":User-" in the account name.

    Recognised posting amount formats:
    - total price:    "50.00 EUR @@ 50000 SATS"
    - per-unit price: "50.00 EUR @ 1000.5 SATS" (sats = int(fiat * price))
    - plain fiat:     "50.00 EUR" (EUR/USD/GBP only; sats taken from the
                      'sats-equivalent' posting metadata when present)
    - legacy:         "<n> SATS ..." with fiat taken from the
                      'fiat-amount-total' / 'fiat-currency' posting metadata

    Returns:
        [
            {
                "user_id": "abc123",
                "balance": 100000,
                "fiat_balances": {"EUR": Decimal("100.50")},
                "accounts": []   # created empty; this method never fills it
            },
            ...
        ]

    Note:
        Entries that are not transactions, entries flagged '!' (pending) and
        entries tagged "voided" are skipped. Any other flag is included.
    """
    import re

    # Compiled once per call instead of being re-matched from source text
    # for every posting.
    total_price_re = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})\s+@@\s+(-?\d+)\s+SATS$')
    unit_price_re = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})\s+@\s+([\d.]+)\s+SATS$')
    simple_fiat_re = re.compile(r'^(-?[\d.]+)\s+([A-Z]{3})$')
    legacy_sats_re = re.compile(r'^(-?\d+)\s+SATS')

    all_entries = await self.get_journal_entries()
    user_data: Dict[str, Dict[str, Any]] = {}

    def add_fiat(record: Dict[str, Any], currency: str, amount: Decimal) -> None:
        fiat = record["fiat_balances"]
        fiat[currency] = fiat.get(currency, Decimal(0)) + amount

    for entry in all_entries:
        if entry.get("t") != "Transaction":
            continue
        if entry.get("flag") == "!":
            continue
        # "or []" tolerates an explicit null tags field
        if "voided" in (entry.get("tags") or []):
            continue

        for posting in entry.get("postings", []):
            account_name = posting.get("account", "")
            if ":User-" not in account_name:
                continue
            if "Payable" not in account_name and "Receivable" not in account_name:
                continue

            user_id = account_name.split(":User-")[1]
            # The user record is created before the amount is inspected, so a
            # user whose postings are all unparseable still appears with zeros.
            record = user_data.setdefault(user_id, {
                "user_id": user_id,
                "balance": 0,
                "fiat_balances": {},
                "accounts": []
            })

            amount_str = posting.get("amount", "")
            if not isinstance(amount_str, str) or not amount_str:
                continue

            # "or {}" tolerates postings whose meta is null rather than missing
            posting_meta = posting.get("meta") or {}

            match = total_price_re.match(amount_str)
            if match:
                add_fiat(record, match.group(2), Decimal(match.group(1)))
                record["balance"] += int(match.group(3))
                continue

            match = unit_price_re.match(amount_str)
            if match:
                fiat_amount = Decimal(match.group(1))
                add_fiat(record, match.group(2), fiat_amount)
                record["balance"] += int(fiat_amount * Decimal(match.group(3)))
                continue

            match = simple_fiat_re.match(amount_str)
            if match:
                if match.group(2) in ('EUR', 'USD', 'GBP'):
                    fiat_amount = Decimal(match.group(1))
                    add_fiat(record, match.group(2), fiat_amount)
                    # Legacy: SATS equivalent stored unsigned in metadata,
                    # so it takes its sign from the fiat amount
                    sats_equiv = posting_meta.get("sats-equivalent")
                    if sats_equiv:
                        record["balance"] += int(sats_equiv) if fiat_amount > 0 else -int(sats_equiv)
                continue

            # Old format: SATS amount, with fiat carried in metadata
            sats_match = legacy_sats_re.match(amount_str)
            if sats_match:
                record["balance"] += int(sats_match.group(1))

            fiat_total_raw = posting_meta.get("fiat-amount-total")
            fiat_currency = posting_meta.get("fiat-currency")
            if fiat_total_raw and fiat_currency:
                # str() avoids binary-float artefacts if the metadata is a JSON number
                fiat_total = Decimal(str(fiat_total_raw))
                # Apply the same sign as the SATS amount
                if sats_match and int(sats_match.group(1)) < 0:
                    fiat_total = -fiat_total
                add_fiat(record, fiat_currency, fiat_total)

    return list(user_data.values())
async def check_fava_health(self) -> bool:
    """
    Probe Fava's /changed endpoint to see whether the service answers.

    Uses a fixed 2 second timeout rather than the client's configured one.

    Returns:
        True if the response status is 200; False for any other status or
        if any exception occurs (the exception is logged as a warning).
    """
    try:
        async with httpx.AsyncClient(timeout=2.0) as http:
            reply = await http.get(f"{self.base_url}/changed")
            is_healthy = reply.status_code == 200
    except Exception as e:
        # Best-effort probe: any failure simply means "not healthy"
        logger.warning(f"Fava health check failed: {e}")
        return False
    else:
        return is_healthy
async def query_transactions(
    self,
    account_pattern: Optional[str] = None,
    limit: int = 100,
    include_pending: bool = True
) -> List[Dict[str, Any]]:
    """
    Query transactions from Fava/Beancount.

    Builds a "SELECT * ... ORDER BY date DESC LIMIT n" BQL query, optionally
    restricted with "account ~ '<pattern>'", and converts each result row
    into a dict keyed by the column names Fava reports.

    Args:
        account_pattern: Optional regex pattern to filter accounts (e.g., "User-abc123")
        limit: Maximum number of rows to request
        include_pending: When False, rows whose flag is '!' are dropped

    Returns:
        List of row dicts. Rows that are neither a list matching the column
        count nor a dict are ignored.

    Raises:
        ValueError: If account_pattern contains a single quote, or limit
            cannot be converted to an integer
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the request itself fails

    Note:
        The pending filter is applied in Python after the query's LIMIT, so
        with include_pending=False fewer than `limit` rows may come back even
        when more cleared rows exist.

    Example:
        # All transactions
        txns = await fava.query_transactions()
        # User's transactions
        txns = await fava.query_transactions(account_pattern="User-abc123")
    """
    # Both values are spliced into the query text. Force limit to an integer
    # and refuse a quote in the pattern so neither can alter the query shape.
    limit = int(limit)
    if account_pattern and "'" in account_pattern:
        raise ValueError(f"Invalid account pattern for BQL query: {account_pattern!r}")

    if account_pattern:
        query = f"SELECT * WHERE account ~ '{account_pattern}' ORDER BY date DESC LIMIT {limit}"
    else:
        query = f"SELECT * ORDER BY date DESC LIMIT {limit}"

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f"{self.base_url}/query",
                params={"query_string": query}
            )
            response.raise_for_status()
            result = response.json()

            # Fava query API returns: {"data": {"rows": [...], "types": [...]}}
            data = result.get("data", {})
            rows = data.get("rows", [])
            column_names = [t.get("name") for t in data.get("types", [])]

            transactions = []
            for row in rows:
                if isinstance(row, list) and len(row) == len(column_names):
                    txn = dict(zip(column_names, row))
                elif isinstance(row, dict):
                    # Already a dict (not expected from BQL, but tolerated)
                    txn = row
                else:
                    continue
                # A row without a flag column is treated as cleared
                if not include_pending and txn.get("flag", "*") == "!":
                    continue
                transactions.append(txn)
            return transactions[:limit]
    except httpx.HTTPStatusError as e:
        logger.error(f"Fava query error: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
async def query_bql(self, query_string: str) -> Dict[str, Any]:
    """
    Run a Beancount Query Language (BQL) query through Fava's /query endpoint.

    The query text is sent unchanged as the "query_string" parameter.

    Note (carried over from earlier documentation of this method): BQL was
    described as unable to read posting metadata such as 'sats-equivalent',
    so ledgers that keep SATS only in metadata need manual aggregation.
    See docs/BQL-BALANCE-QUERIES.md.

    Args:
        query_string: BQL query (e.g., "SELECT account, sum(position) WHERE account ~ 'User-abc'")

    Returns:
        {
            "rows": [[col1, col2, ...], ...],
            "types": [{"name": "col1", "type": "str"}, ...],
            "column_names": ["col1", "col2", ...]
        }
        "rows" and "types" default to empty lists when Fava's response
        lacks them.

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status (the query is logged too)
        httpx.RequestError: If the request itself fails

    Example:
        result = await fava.query_bql("SELECT account, sum(position) WHERE account ~ 'User-abc'")
        for row in result["rows"]:
            account, balance = row
            print(f"{account}: {balance}")

    See:
        https://beancount.github.io/docs/beancount_query_language.html
    """
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as http:
            reply = await http.get(
                f"{self.base_url}/query",
                params={"query_string": query_string},
            )
            reply.raise_for_status()
            body = reply.json()

            # Fava returns: {"data": {"rows": [...], "types": [...]}}
            payload = body.get("data", {})
            result_rows = payload.get("rows", [])
            column_types = payload.get("types", [])
            return {
                "rows": result_rows,
                "types": column_types,
                "column_names": [col.get("name") for col in column_types],
            }
    except httpx.HTTPStatusError as status_err:
        logger.error(f"BQL query error: {status_err.response.status_code} - {status_err.response.text}")
        logger.error(f"Query was: {query_string}")
        raise
    except httpx.RequestError as request_err:
        logger.error(f"Fava connection error: {request_err}")
        raise
async def get_user_balance_bql(self, user_id: str) -> Dict[str, Any]:
    """
    Get user balance using a single grouped BQL query.

    Selects sum(number) and sum(weight) per account for cleared (flag '*')
    postings on accounts matching ":User-<first 8 chars of user_id>" and
    either "Payable" or "Receivable". The "SATS" entry of each weight sum is
    taken as that account's satoshi balance.

    Args:
        user_id: User ID (only its first 8 characters are used)

    Returns:
        {
            "balance": int (sats, summed over the matching accounts),
            "fiat_balances": {"EUR": Decimal("100.50")},
            "accounts": [{"account": "...", "sats": 150000, "eur": Decimal("100.50")}, ...]
        }
        All fiat is reported under "EUR"; the currency is not detected.

    Raises:
        ValueError: If the user id prefix contains a single quote

    Example:
        balance = await fava.get_user_balance_bql("af983632")
        print(f"Balance: {balance['balance']} sats")
    """
    user_id_prefix = user_id[:8]
    # The prefix is placed inside a single-quoted BQL literal; a quote would
    # end that literal and let the caller rewrite the query, so refuse it.
    if "'" in user_id_prefix:
        raise ValueError(f"Invalid user id for BQL query: {user_id_prefix!r}")

    query = f"""
SELECT account, sum(number), sum(weight)
WHERE account ~ ':User-{user_id_prefix}'
AND (account ~ 'Payable' OR account ~ 'Receivable')
AND flag = '*'
GROUP BY account
"""
    result = await self.query_bql(query)

    total_sats = 0
    fiat_balances: Dict[str, Decimal] = {}
    accounts = []
    for account_name, fiat_sum, weight_sum in result["rows"]:
        fiat_amount = Decimal(str(fiat_sum)) if fiat_sum else Decimal(0)

        # weight_sum is expected to look like {"SATS": -10442635.00};
        # anything else counts as zero sats
        sats_amount = 0
        if isinstance(weight_sum, dict) and "SATS" in weight_sum:
            sats_amount = int(Decimal(str(weight_sum["SATS"])))
        total_sats += sats_amount

        # Aggregate fiat (assume EUR for now, could be extended)
        if fiat_amount != 0:
            fiat_balances["EUR"] = fiat_balances.get("EUR", Decimal(0)) + fiat_amount

        accounts.append({
            "account": account_name,
            "sats": sats_amount,
            "eur": fiat_amount
        })

    logger.info(f"User {user_id[:8]} balance (BQL): {total_sats} sats, fiat: {dict(fiat_balances)}")
    return {
        "balance": total_sats,
        "fiat_balances": fiat_balances,
        "accounts": accounts
    }
async def get_all_user_balances_bql(self) -> List[Dict[str, Any]]:
    """
    Get balances for every user with one grouped BQL query (admin view).

    Selects sum(number) and sum(weight) per account for cleared (flag '*')
    postings on "Payable:User-" / "Receivable:User-" accounts, then groups
    the rows by the first 8 characters that follow ":User-" in the account
    name.

    Returns:
        [
            {
                "user_id": "abc123",
                "balance": 100000,
                "fiat_balances": {"EUR": Decimal("100.50")},
                "accounts": [{"account": "...", "sats": 150000, "eur": Decimal("100.50")}, ...]
            },
            ...
        ]
        All fiat is reported under "EUR".

    Example:
        all_balances = await fava.get_all_user_balances_bql()
        for user in all_balances:
            print(f"{user['user_id']}: {user['balance']} sats")
    """
    from decimal import Decimal

    query = """
SELECT account, sum(number), sum(weight)
WHERE (account ~ 'Payable:User-' OR account ~ 'Receivable:User-')
AND flag = '*'
GROUP BY account
"""
    result = await self.query_bql(query)

    per_user = {}
    for row in result["rows"]:
        account_name, fiat_sum, weight_sum = row
        if ":User-" not in account_name:
            continue

        # Account suffix may be longer than the id; keep the 8-char prefix
        short_id = account_name.split(":User-")[1][:8]
        record = per_user.setdefault(short_id, {
            "user_id": short_id,
            "balance": 0,
            "fiat_balances": {},
            "accounts": []
        })

        fiat_amount = Decimal(0)
        if fiat_sum:
            fiat_amount = Decimal(str(fiat_sum))

        sats_amount = 0
        if isinstance(weight_sum, dict) and "SATS" in weight_sum:
            sats_amount = int(Decimal(str(weight_sum["SATS"])))
        record["balance"] += sats_amount

        if fiat_amount != 0:
            fiat_totals = record["fiat_balances"]
            fiat_totals["EUR"] = fiat_totals.get("EUR", Decimal(0)) + fiat_amount

        record["accounts"].append({
            "account": account_name,
            "sats": sats_amount,
            "eur": fiat_amount
        })

    logger.info(f"Fetched balances for {len(per_user)} users (BQL)")
    return list(per_user.values())
async def get_expense_summary_bql(
    self,
    start_date: str = None,
    end_date: str = None,
    group_by: str = "account"
) -> List[Dict[str, Any]]:
    """
    Get expense summary using BQL, grouped by account or by month.

    Sums number and weight over cleared (flag '*') postings on accounts
    matching 'Expenses:' and reports absolute values.

    Args:
        start_date: ISO format date string (YYYY-MM-DD), optional lower bound (inclusive)
        end_date: ISO format date string (YYYY-MM-DD), optional upper bound (inclusive)
        group_by: "month" groups by year and month; any other value groups by account

    Returns:
        List of expense summaries:
        [
            {"account": "Expenses:Supplies:Food", "fiat": 500.00, "fiat_currency": "EUR", "sats": 550000},
            ...
        ]
        Or if group_by="month":
        [
            {"month": "2025-12", "fiat": 700.00, "fiat_currency": "EUR", "sats": 770000},
            ...
        ]
        "fiat_currency" is always "EUR".

    Raises:
        ValueError: If start_date or end_date is not a valid ISO date
    """
    from datetime import date
    from decimal import Decimal

    # Dates are spliced unquoted into the query, so parse them first and
    # re-emit the canonical YYYY-MM-DD form; anything else raises ValueError
    # instead of reaching Fava as query text.
    date_filter = ""
    if start_date:
        date_filter += f" AND date >= {date.fromisoformat(start_date).isoformat()}"
    if end_date:
        date_filter += f" AND date <= {date.fromisoformat(end_date).isoformat()}"

    by_month = group_by == "month"
    if by_month:
        query = f"""
SELECT year, month, sum(number), sum(weight)
WHERE account ~ 'Expenses:'
AND flag = '*'
{date_filter}
GROUP BY year, month
ORDER BY year DESC, month DESC
"""
    else:
        query = f"""
SELECT account, sum(number), sum(weight)
WHERE account ~ 'Expenses:'
AND flag = '*'
{date_filter}
GROUP BY account
ORDER BY sum(weight)
"""

    try:
        result = await self.query_bql(query)
        summaries = []
        for row in result["rows"]:
            if by_month:
                year, month, fiat_sum, weight_sum = row
                label = {"month": f"{year}-{month:02d}"}
            else:
                account, fiat_sum, weight_sum = row
                label = {"account": account}

            # Only a dict weight carrying a "SATS" key contributes sats
            sats_amount = 0
            if isinstance(weight_sum, dict) and "SATS" in weight_sum:
                sats_amount = abs(int(Decimal(str(weight_sum["SATS"]))))

            summaries.append({
                **label,
                "fiat": abs(float(fiat_sum)) if fiat_sum else 0.0,
                "fiat_currency": "EUR",
                "sats": sats_amount
            })
        logger.info(f"BQL: Expense summary returned {len(summaries)} items")
        return summaries
    except Exception as e:
        logger.error(f"Error getting expense summary via BQL: {e}")
        raise
async def get_user_contributions_bql(self) -> List[Dict[str, Any]]:
    """
    Get total expense contributions per user using BQL.

    Sums number and weight, and counts postings, per
    'Liabilities:Payable:User-' account for cleared (flag '*') entries
    tagged 'expense-entry'. Totals are reported as absolute values.

    Returns:
        List of user contribution summaries, highest total_sats first:
        [
            {
                "user_id": "cfe378b3",
                "total_fiat": 1500.00,
                "fiat_currency": "EUR",
                "total_sats": 1650000,
                "entry_count": 25
            },
            ...
        ]
        "user_id" is the first 8 characters after ":User-" in the account
        name; "fiat_currency" is always "EUR".
    """
    from decimal import Decimal

    query = """
SELECT account, sum(number), sum(weight), count(number)
WHERE account ~ 'Liabilities:Payable:User-'
AND 'expense-entry' IN tags
AND flag = '*'
GROUP BY account
ORDER BY sum(weight)
"""
    try:
        result = await self.query_bql(query)

        collected = []
        for row in result["rows"]:
            account, fiat_sum, weight_sum, count = row
            if ":User-" not in account:
                continue

            # abs() because the summed weight of a liability is negative
            sats_total = 0
            if isinstance(weight_sum, dict) and "SATS" in weight_sum:
                sats_total = abs(int(Decimal(str(weight_sum["SATS"]))))

            fiat_total = abs(float(fiat_sum)) if fiat_sum else 0.0
            entry_count = int(count) if count else 0
            collected.append({
                "user_id": account.split(":User-")[1][:8],
                "total_fiat": fiat_total,
                "fiat_currency": "EUR",
                "total_sats": sats_total,
                "entry_count": entry_count
            })

        # Highest contributors first
        contributions = sorted(collected, key=lambda item: item["total_sats"], reverse=True)
        logger.info(f"BQL: Found contributions from {len(contributions)} users")
        return contributions
    except Exception as e:
        logger.error(f"Error getting user contributions via BQL: {e}")
        raise
async def get_account_transactions(
    self,
    account_name: str,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """
    Fetch transactions for one account via query_transactions().

    Each ":" in the account name is prefixed with a backslash before the
    name is passed on as the account pattern.

    Args:
        account_name: Full account name (e.g., "Assets:Receivable:User-abc123")
        limit: Maximum number of transactions

    Returns:
        Whatever query_transactions() returns for that pattern and limit
    """
    # Escape colons for regex
    escaped_name = "\\:".join(account_name.split(":"))
    matching = await self.query_transactions(account_pattern=escaped_name, limit=limit)
    return matching
async def get_user_transactions(
    self,
    user_id: str,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """
    Fetch transactions for a user's accounts via query_transactions().

    The account pattern is "User-" followed by the first 8 characters of
    the user id.

    Args:
        user_id: User ID
        limit: Maximum number of transactions

    Returns:
        Whatever query_transactions() returns for that pattern and limit
    """
    short_id = user_id[:8]
    user_pattern = f"User-{short_id}"
    matching = await self.query_transactions(account_pattern=user_pattern, limit=limit)
    return matching
async def get_all_accounts(self) -> List[Dict[str, Any]]:
    """
    List the distinct account names known to Beancount/Fava.

    Runs "SELECT DISTINCT account" through query_bql() and wraps each
    non-empty name in a dict with an empty "meta" placeholder.

    Returns:
        List of account dictionaries:
        [
            {"account": "Assets:Cash", "meta": {}},
            {"account": "Expenses:Food", "meta": {}},
            ...
        ]

    Raises:
        Any exception from query_bql() is logged and re-raised.

    Example:
        accounts = await fava.get_all_accounts()
        for acc in accounts:
            print(acc["account"])  # "Assets:Cash"
    """
    try:
        result = await self.query_bql("SELECT DISTINCT account")

        # Rows are normally lists; a dict row is read by its "account" key
        names = (
            row[0] if isinstance(row, list) else row.get("account")
            for row in result["rows"]
        )
        # "meta" stays empty: the query above does not select metadata
        accounts = [{"account": name, "meta": {}} for name in names if name]

        logger.debug(f"Fava returned {len(accounts)} accounts via BQL")
        return accounts
    except Exception as e:
        logger.error(f"Failed to fetch accounts via BQL: {e}")
        raise
async def get_journal_entries(
    self,
    days: int = None,
    start_date: str = None,
    end_date: str = None
) -> List[Dict[str, Any]]:
    """
    Fetch journal entries from Fava's /journal endpoint, optionally date-filtered.

    Filtering is delegated to Fava through the 'time' query parameter,
    formatted as "YYYY-MM-DD - YYYY-MM-DD".

    Args:
        days: If provided, only request entries from the last N days
              (ending today). If None, no day-based filter is applied.
        start_date: ISO format date string (YYYY-MM-DD). Used only when
                    end_date is also given.
        end_date: ISO format date string (YYYY-MM-DD). Used only when
                  start_date is also given.

    Note:
        When both start_date and end_date are given they take precedence
        over days. If only one of them is given it is ignored.

    Returns:
        The "data" list from Fava's response (empty list if absent).

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the request itself fails

    Example:
        # Get all entries
        entries = await fava.get_journal_entries()
        # Get only last 30 days
        recent = await fava.get_journal_entries(days=30)
        # Get entries in custom date range
        custom = await fava.get_journal_entries(start_date="2024-01-01", end_date="2024-01-31")
    """
    from datetime import datetime, timedelta

    try:
        time_range = None
        if start_date and end_date:
            time_range = f"{start_date} - {end_date}"
            logger.info(f"Querying journal with date range: {start_date} to {end_date}")
        elif days is not None:
            cutoff_date = (datetime.now() - timedelta(days=days)).date()
            today = datetime.now().date()
            time_range = f"{cutoff_date.isoformat()} - {today.isoformat()}"
            logger.info(f"Querying journal for last {days} days (from {cutoff_date})")

        # No 'time' parameter at all means Fava is asked for every entry
        params = {} if time_range is None else {"time": time_range}

        async with httpx.AsyncClient(timeout=self.timeout) as http:
            reply = await http.get(f"{self.base_url}/journal", params=params)
            reply.raise_for_status()
            entries = reply.json().get("data", [])
            logger.info(
                f"Fava /journal returned {len(entries)} entries (filtered)"
                if params
                else f"Fava /journal returned {len(entries)} entries (all)"
            )
            return entries
    except httpx.HTTPStatusError as status_err:
        logger.error(f"Fava journal error: {status_err.response.status_code} - {status_err.response.text}")
        raise
    except httpx.RequestError as request_err:
        logger.error(f"Fava connection error: {request_err}")
        raise
async def get_entry_context(self, entry_hash: str) -> Dict[str, Any]:
    """
    Fetch an entry's editable source via Fava's /source_slice endpoint.

    Args:
        entry_hash: Entry hash from get_journal_entries()

    Returns:
        The "data" object of Fava's response (empty dict if absent), e.g.
        {
            "slice": "2025-01-15 ! \"Description\"...",  # Beancount source text
            "sha256sum": "abc123...",  # For concurrency control
        }

    Raises:
        httpx.HTTPStatusError: If Fava returns an error status
        httpx.RequestError: If the request itself fails

    Example:
        context = await fava.get_entry_context("abc123")
        source = context["slice"]
        sha256sum = context["sha256sum"]
    """
    slice_url = f"{self.base_url}/source_slice"
    lookup = {"entry_hash": entry_hash}
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as http:
            reply = await http.get(slice_url, params=lookup)
            reply.raise_for_status()
            body = reply.json()
            return body.get("data", {})
    except httpx.HTTPStatusError as status_err:
        logger.error(f"Fava context error: {status_err.response.status_code} - {status_err.response.text}")
        raise
    except httpx.RequestError as request_err:
        logger.error(f"Fava connection error: {request_err}")
        raise
async def update_entry_source(self, entry_hash: str, new_source: str, sha256sum: str) -> str:
    """
    Replace the source text of an entry (e.g., change flag from ! to *).

    Sends PUT /source_slice with the entry hash, the replacement text and
    the checksum previously obtained for that entry.

    Args:
        entry_hash: Entry hash
        new_source: Modified Beancount source text
        sha256sum: Current sha256sum from get_entry_context() for concurrency control

    Returns:
        The "data" member of Fava's reply (the new sha256sum after the
        update), or an empty string when the reply has no "data" key.

    Raises:
        httpx.HTTPStatusError: Fava answered with an error status (logged, re-raised)
        httpx.RequestError: The request could not be completed (logged, re-raised)

    Example:
        # Get context
        context = await fava.get_entry_context("abc123")
        source = context["slice"]
        sha256 = context["sha256sum"]

        # Change flag
        new_source = source.replace("2025-01-15 !", "2025-01-15 *")

        # Update
        new_sha256 = await fava.update_entry_source("abc123", new_source, sha256)
    """
    endpoint = f"{self.base_url}/source_slice"
    body = {
        "entry_hash": entry_hash,
        "source": new_source,
        "sha256sum": sha256sum
    }
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as http:
            reply = await http.put(endpoint, json=body)
            reply.raise_for_status()
            return reply.json().get("data", "")
    except httpx.HTTPStatusError as exc:
        logger.error(f"Fava update error: {exc.response.status_code} - {exc.response.text}")
        raise
    except httpx.RequestError as exc:
        logger.error(f"Fava connection error: {exc}")
        raise
async def delete_entry(self, entry_hash: str, sha256sum: str) -> str:
    """
    Remove an entry from the Beancount file.

    Sends DELETE /source_slice with the entry hash and its current
    checksum as query parameters.

    Args:
        entry_hash: Entry hash
        sha256sum: Current sha256sum for concurrency control

    Returns:
        The "data" member of Fava's reply (a success message), or an
        empty string when the reply has no "data" key.

    Raises:
        httpx.HTTPStatusError: Fava answered with an error status (logged, re-raised)
        httpx.RequestError: The request could not be completed (logged, re-raised)

    Example:
        context = await fava.get_entry_context("abc123")
        await fava.delete_entry("abc123", context["sha256sum"])
    """
    endpoint = f"{self.base_url}/source_slice"
    query = {
        "entry_hash": entry_hash,
        "sha256sum": sha256sum
    }
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as http:
            reply = await http.delete(endpoint, params=query)
            reply.raise_for_status()
            return reply.json().get("data", "")
    except httpx.HTTPStatusError as exc:
        logger.error(f"Fava delete error: {exc.response.status_code} - {exc.response.text}")
        raise
    except httpx.RequestError as exc:
        logger.error(f"Fava connection error: {exc}")
        raise
async def add_account(
    self,
    account_name: str,
    currencies: list[str],
    opening_date: Optional[date] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Add an account to the Beancount ledger via an Open directive.

    NOTE: Fava's /api/add_entries endpoint does NOT support Open directives.
    This method uses /api/source to directly edit the Beancount file.

    The new directive is inserted after the last existing Open directive
    (and that directive's indented metadata lines). When the file has no
    Open directive yet, it is appended to the end of the file. If an Open
    directive for exactly ``account_name`` is already present, the file is
    left untouched.

    Args:
        account_name: Full account name (e.g., "Assets:Receivable:User-abc123")
        currencies: List of currencies for this account (e.g., ["EUR", "SATS"])
        opening_date: Date to open the account (defaults to today)
        metadata: Optional metadata for the account. String values are
            written as quoted strings (with quotes/backslashes escaped),
            booleans as TRUE/FALSE, anything else via str().

    Returns:
        Response from Fava ({"data": "new_sha256sum", "mtime": "..."}).
        When the account already exists, a dict of the same shape built
        from the current file's sha256sum and mtime.

    Raises:
        httpx.HTTPStatusError: Fava answered with an error status (logged, re-raised)
        httpx.RequestError: The request could not be completed (logged, re-raised)

    Example:
        # Add a user's receivable account
        result = await fava.add_account(
            account_name="Assets:Receivable:User-abc123",
            currencies=["EUR", "SATS", "USD"],
            metadata={"user_id": "abc123", "description": "User receivables"}
        )

        # Add a user's payable account
        result = await fava.add_account(
            account_name="Liabilities:Payable:User-abc123",
            currencies=["EUR", "SATS"]
        )
    """
    import re  # local import: only this method needs it

    if opening_date is None:
        opening_date = date.today()

    # A dated Open directive at the start of a line. Indented lines belong
    # to the preceding directive, so they are deliberately not matched.
    dated_open = r"^\d{4}[-/]\d{1,2}[-/]\d{1,2}\s+open\s+"
    any_open = re.compile(dated_open)
    # The lookahead rejects longer account names that merely start with
    # account_name (e.g. "User-abc" must not match "User-abc123").
    same_account = re.compile(dated_open + re.escape(account_name) + r"(?![\w:-])")

    def _meta_value(value: Any) -> str:
        """Render one metadata value as Beancount source text."""
        # bool is checked first: it must not fall through to str(), which
        # would emit Python's "True"/"False" instead of TRUE/FALSE.
        if isinstance(value, bool):
            return "TRUE" if value else "FALSE"
        if isinstance(value, str):
            escaped = value.replace("\\", "\\\\").replace('"', '\\"')
            return f'"{escaped}"'
        return str(value)

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            # Step 1: Get the main Beancount file path from Fava
            options_response = await client.get(f"{self.base_url}/options")
            options_response.raise_for_status()
            options_data = options_response.json()["data"]
            file_path = options_data["beancount_options"]["filename"]
            logger.debug(f"Fava main file: {file_path}")

            # Step 2: Get current source file
            response = await client.get(
                f"{self.base_url}/source",
                params={"filename": file_path}
            )
            response.raise_for_status()
            source_data = response.json()["data"]
            sha256sum = source_data["sha256sum"]
            source = source_data["source"]
            lines = source.split('\n')

            # Step 3: Check if account already exists (exact account match)
            if any(same_account.match(line) for line in lines):
                logger.info(f"Account {account_name} already exists in Beancount file")
                return {"data": sha256sum, "mtime": source_data.get("mtime", "")}

            # Step 4: Find insertion point (after last Open directive AND its metadata)
            insert_index: Optional[int] = None
            for i, line in enumerate(lines):
                if not any_open.match(line):
                    continue
                end = i + 1
                # Skip metadata lines (non-blank lines starting with whitespace)
                while end < len(lines) and lines[end].startswith((' ', '\t')) and lines[end].strip():
                    end += 1
                insert_index = end
            if insert_index is None:
                # No Open directive yet: append, keeping a trailing newline
                # at the end of the file if there was one.
                insert_index = len(lines)
                if lines and lines[-1] == "":
                    insert_index -= 1

            # Step 5: Format Open directive as Beancount text
            directive = f"{opening_date.isoformat()} open {account_name}"
            if currencies:
                directive += " " + ", ".join(currencies)
            open_lines = ["", directive]
            if metadata:
                open_lines.extend(
                    f'  {key}: {_meta_value(value)}'
                    for key, value in metadata.items()
                )

            # Step 6: Insert into source
            lines[insert_index:insert_index] = open_lines
            new_source = '\n'.join(lines)

            # Step 7: Update source file via PUT /api/source
            update_payload = {
                "file_path": file_path,
                "source": new_source,
                "sha256sum": sha256sum
            }
            response = await client.put(
                f"{self.base_url}/source",
                json=update_payload,
                headers={"Content-Type": "application/json"}
            )
            response.raise_for_status()
            result = response.json()
            logger.info(f"Added account {account_name} to Beancount file with currencies {currencies}")
            return result
    except httpx.HTTPStatusError as e:
        logger.error(f"Fava HTTP error adding account: {e.response.status_code} - {e.response.text}")
        raise
    except httpx.RequestError as e:
        logger.error(f"Fava connection error: {e}")
        raise
async def get_unsettled_entries_bql(
    self,
    user_id: str,
    entry_type: str = "expense"
) -> List[Dict[str, Any]]:
    """
    Get unsettled expense or receivable entries for a user using BQL.

    Runs two BQL queries against the user's payable/receivable account:
    one for the original entries (tagged expense-entry / receivable-entry)
    and one for settlement entries (tagged settlement). An original entry
    is unsettled when no settlement entry carries its exp-{id} / rcv-{id}
    link. Only entries flagged '*' are considered in either query.

    Args:
        user_id: User ID (first 8 characters used for account matching)
        entry_type: "expense" (payables - castle owes user); any other
            value is treated as "receivable" (user owes castle)

    Returns:
        List of unsettled entries, sorted by date, each with:
        - link: The entry's unique link (exp-xxx or rcv-xxx)
        - date: Entry date as a string
        - narration: Description
        - fiat_amount: Posting number (absolute value)
        - fiat_currency: Currency of the posting ("EUR" if not reported)
        - sats_amount: Amount in SATS (absolute value, from weight)
        - entry_hash: Always "" (not available from BQL; use entry_id)
        - entry_id: Value of the entry's 'entry-id' metadata, or ""
        - flag: Transaction flag (always "*")

    Raises:
        Exception: Any error from query_bql() or row processing is logged
            and re-raised unchanged.
    """
    user_short = user_id[:8]
    if entry_type == "expense":
        link_prefix = "exp-"
        entry_tag = "expense-entry"
        account_pattern = f"Liabilities:Payable:User-{user_short}"
    else:
        link_prefix = "rcv-"
        entry_tag = "receivable-entry"
        account_pattern = f"Assets:Receivable:User-{user_short}"

    try:
        # Query 1: Get all original expense/receivable entries for this user.
        # `currency` is selected so the fiat currency comes from the posting
        # instead of being assumed.
        original_query = f"""
        SELECT date, narration, account, number, currency, weight, links,
               any_meta('entry-id') as entry_id
        WHERE account ~ '{account_pattern}'
          AND '{entry_tag}' IN tags
          AND flag = '*'
        ORDER BY date
        """
        original_result = await self.query_bql(original_query)

        # Query 2: Get all settlement entries for this user
        settlement_query = f"""
        SELECT links
        WHERE account ~ '{account_pattern}'
          AND 'settlement' IN tags
          AND flag = '*'
        """
        settlement_result = await self.query_bql(settlement_query)

        # Build set of settled links from settlement entries
        settled_links: set = set()
        for row in settlement_result["rows"]:
            links = row[0] if row else []
            if isinstance(links, list):
                settled_links.update(
                    link for link in links if link.startswith(link_prefix)
                )

        # Process original entries and find unsettled ones
        entries_by_link: Dict[str, Dict[str, Any]] = {}
        for row in original_result["rows"]:
            date_val, narration, _account, number, currency, weight, links, entry_id = row

            # Skip if no links
            if not links or not isinstance(links, list):
                continue

            # Find the first exp-/rcv- link
            entry_link = next(
                (link for link in links if link.startswith(link_prefix)),
                None,
            )
            if not entry_link:
                continue

            # Skip if already settled, or already recorded
            # (BQL returns one row per posting)
            if entry_link in settled_links or entry_link in entries_by_link:
                continue

            # Parse amounts
            fiat_amount = abs(float(number)) if number else 0.0
            # Fall back to the previous default when no currency is reported
            fiat_currency = currency if isinstance(currency, str) and currency else "EUR"

            # Parse SATS from weight column.
            # NOTE(review): assumes query_bql() renders weight as a
            # {currency: number} mapping - confirm against query_bql.
            sats_amount = 0
            if isinstance(weight, dict) and "SATS" in weight:
                sats_amount = abs(int(Decimal(str(weight["SATS"]))))

            entries_by_link[entry_link] = {
                "link": entry_link,
                "date": str(date_val) if date_val else "",
                "narration": narration or "",
                "fiat_amount": fiat_amount,
                "fiat_currency": fiat_currency,
                "sats_amount": sats_amount,
                "entry_hash": "",  # Not available from BQL, use entry_id instead
                "entry_id": entry_id or "",
                "flag": "*"
            }

        # Convert to list and sort by date
        unsettled = sorted(entries_by_link.values(), key=lambda x: x.get("date", ""))
        logger.info(
            f"BQL: Found {len(unsettled)} unsettled {entry_type} entries for user {user_short} "
            f"(settled: {len(settled_links)})"
        )
        return unsettled
    except Exception as e:
        logger.error(f"Error getting unsettled entries via BQL: {e}")
        raise
async def get_unsettled_entries(
    self,
    user_id: str,
    entry_type: str = "expense"
) -> List[Dict[str, Any]]:
    """
    List a user's expense or receivable entries that have not been settled.

    Walks every journal entry returned by get_journal_entries(), keeps the
    ones that post to the user's payable/receivable account, and pairs
    exp-{id} / rcv-{id} links: a link seen on an entry tagged "settlement"
    marks that link as settled; a link seen on any other entry records it
    as an original entry. Originals whose link was never settled are
    returned.

    Args:
        user_id: User ID (first 8 characters used for account matching)
        entry_type: "expense" (payables - castle owes user); any other
            value is treated as "receivable" (user owes castle)

    Returns:
        List of unsettled entries, sorted by date, each with:
        - link: The entry's unique link (exp-xxx or rcv-xxx)
        - date: Entry date
        - narration: Description
        - fiat_amount: Amount in fiat currency (absolute value)
        - fiat_currency: Currency code
        - sats_amount: Amount in SATS (absolute value, from the "@@" part)
        - entry_hash: For potential updates
        - flag: Transaction flag

    Raises:
        Exception: Any error while fetching or parsing is logged and
            re-raised unchanged.

    Example:
        unsettled = await fava.get_unsettled_entries("cfe378b3...", "expense")
        # Returns: [
        #   {"link": "exp-abc123", "date": "2025-12-01", "narration": "Groceries",
        #    "fiat_amount": 50.00, "fiat_currency": "EUR", "sats_amount": 47000},
        #   ...
        # ]
    """
    user_short = user_id[:8]
    is_expense = entry_type == "expense"
    link_prefix = "exp-" if is_expense else "rcv-"
    account_root = "Liabilities:Payable" if is_expense else "Assets:Receivable"
    account_pattern = f"{account_root}:User-{user_short}"

    def _split_amount(amount_str):
        """Parse "-50.00 EUR @@ 47000 SATS" (or legacy "-50.00 EUR") into
        (fiat_amount, fiat_currency, sats_amount)."""
        fiat_value, fiat_code, sats_value = 0.0, "", 0
        if " @@ " in amount_str:
            fiat_text, sats_text = amount_str.split(" @@ ")
            sats_tokens = sats_text.strip().split()
        else:
            # Legacy format without @@ - fiat only
            fiat_text, sats_tokens = amount_str, []
        fiat_tokens = fiat_text.strip().split()
        if len(fiat_tokens) >= 2:
            fiat_value = abs(float(fiat_tokens[0]))
            fiat_code = fiat_tokens[1]
        if sats_tokens:
            sats_value = abs(int(float(sats_tokens[0])))
        return fiat_value, fiat_code, sats_value

    try:
        journal = await self.get_journal_entries()

        originals: Dict[str, Dict[str, Any]] = {}
        settled: set = set()

        for entry in journal:
            candidate_links = entry.get("links", [])
            tags = entry.get("tags", [])
            postings = entry.get("postings", [])

            # First posting that touches the user's account, if any
            user_posting = next(
                (p for p in postings if account_pattern in p.get("account", "")),
                None,
            )
            if user_posting is None:
                continue

            for link in candidate_links:
                if not link.startswith(link_prefix):
                    continue
                if "settlement" in tags:
                    # Settlement entry: remember which link it closes
                    settled.add(link)
                    continue
                # Original expense/receivable entry: take the amount from
                # the user's posting
                fiat_amount, fiat_currency, sats_amount = _split_amount(
                    user_posting.get("amount", "")
                )
                originals[link] = {
                    "link": link,
                    "date": entry.get("date", ""),
                    "narration": entry.get("narration", ""),
                    "fiat_amount": fiat_amount,
                    "fiat_currency": fiat_currency,
                    "sats_amount": sats_amount,
                    "entry_hash": entry.get("entry_hash", ""),
                    "flag": entry.get("flag", "*")
                }

        # Keep only originals without a matching settlement, oldest first
        unsettled = sorted(
            (data for link, data in originals.items() if link not in settled),
            key=lambda item: item.get("date", ""),
        )
        logger.info(
            f"Found {len(unsettled)} unsettled {entry_type} entries for user {user_short} "
            f"(total: {len(originals)}, settled: {len(settled)})"
        )
        return unsettled
    except Exception as exc:
        logger.error(f"Error getting unsettled entries: {exc}")
        raise
# Singleton instance (configured from settings).
# Stays None until init_fava_client() assigns a FavaClient to it;
# get_fava_client() returns it and raises RuntimeError while it is None.
_fava_client: Optional[FavaClient] = None
def init_fava_client(fava_url: str, ledger_slug: str, timeout: float = 10.0):
    """
    Create the process-wide Fava client and store it in the module global.

    Calling this again replaces the previously stored client.

    Args:
        fava_url: Base URL of Fava server
        ledger_slug: Ledger identifier
        timeout: Request timeout in seconds
    """
    global _fava_client
    client = FavaClient(fava_url, ledger_slug, timeout)
    _fava_client = client
    logger.info(f"Fava client initialized: {fava_url}/{ledger_slug}")
def get_fava_client() -> FavaClient:
    """
    Return the Fava client stored by init_fava_client().

    Returns:
        FavaClient instance

    Raises:
        RuntimeError: If client not initialized
    """
    client = _fava_client
    if client is not None:
        return client
    raise RuntimeError(
        "Fava client not initialized. Call init_fava_client() first. "
        "Castle requires Fava for all accounting operations."
    )