Route account writes for the split Fava ledger layout #32

Merged
padreug merged 5 commits from feat/split-ledger into main 2026-06-06 18:03:26 +00:00
5 changed files with 191 additions and 35 deletions

View file

@ -320,11 +320,20 @@ async def sync_single_account_from_beancount(account_name: str) -> bool:
# Create in Libra DB # Create in Libra DB
account_type = infer_account_type_from_name(account_name) account_type = infer_account_type_from_name(account_name)
user_id = extract_user_id_from_account_name(account_name)
# Prefer the full user_id stored in Beancount metadata (libra writes it
# when crud.get_or_create_user_account calls fava.add_account). Fall
# back to the name-derived 8-char prefix for accounts imported without
# metadata. This keeps user_id consistent with what the caller will
# query for, avoiding a churn cycle through the UNIQUE-constraint
# recovery path in crud.py.
description = None description = None
meta_user_id = None
if "meta" in bc_account and isinstance(bc_account["meta"], dict): if "meta" in bc_account and isinstance(bc_account["meta"], dict):
description = bc_account["meta"].get("description") description = bc_account["meta"].get("description")
meta_user_id = bc_account["meta"].get("user_id")
user_id = meta_user_id or extract_user_id_from_account_name(account_name)
await create_account( await create_account(
CreateAccount( CreateAccount(

View file

@ -18,6 +18,7 @@ See: https://github.com/beancount/fava/blob/main/src/fava/json_api.py
""" """
import asyncio import asyncio
import re
import httpx import httpx
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from decimal import Decimal from decimal import Decimal
@ -30,6 +31,19 @@ class ChecksumConflictError(Exception):
pass pass
# Per-user account names end with :User-{user_id[:8]} (8 hex chars). Anything
# matching is routed to accounts/users.beancount; anything else goes to
# accounts/chart.beancount. See `_infer_target_file` and `add_account`.
_USER_ACCT_RE = re.compile(r":User-[0-9a-f]{8}$")
def _infer_target_file(account_name: str) -> str:
"""Pick the Beancount include file for an Open directive based on account name."""
if _USER_ACCT_RE.search(account_name):
return "accounts/users.beancount"
return "accounts/chart.beancount"
class FavaClient: class FavaClient:
""" """
Async client for Fava REST API. Async client for Fava REST API.
@ -66,6 +80,46 @@ class FavaClient:
# Per-user locks for user-specific operations (reduces contention) # Per-user locks for user-specific operations (reduces contention)
self._user_locks: Dict[str, asyncio.Lock] = {} self._user_locks: Dict[str, asyncio.Lock] = {}
# Cached absolute dirname of the root ledger file, derived from
# GET /api/options on first need. Used by `_resolve_target_file` to
# turn relative include paths (e.g. "accounts/users.beancount") into
# the absolute paths fava's /api/source endpoint requires.
self._main_dir_cache: Optional[str] = None
self._main_dir_lock = asyncio.Lock()
async def _resolve_target_file(self, target_file: str) -> str:
"""
Turn a relative include path into the absolute path fava expects.
Fava's /api/source endpoint refuses relative paths with HTTP 500
(NonSourceFileError). Resolve any non-absolute target_file by
prepending the directory of the root ledger file (cached after
the first GET /api/options).
Args:
target_file: Relative (e.g. "accounts/users.beancount") or
absolute path.
Returns:
Absolute path under fava's ledger root.
"""
import os
if os.path.isabs(target_file):
return target_file
if self._main_dir_cache is None:
async with self._main_dir_lock:
if self._main_dir_cache is None:
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.get(f"{self.base_url}/options")
resp.raise_for_status()
main_file = resp.json()["data"]["beancount_options"]["filename"]
self._main_dir_cache = os.path.dirname(main_file)
logger.debug(f"Cached fava ledger root dir: {self._main_dir_cache}")
return os.path.join(self._main_dir_cache, target_file)
def get_user_lock(self, user_id: str) -> asyncio.Lock: def get_user_lock(self, user_id: str) -> asyncio.Lock:
""" """
Get or create a lock for a specific user. Get or create a lock for a specific user.
@ -1487,13 +1541,20 @@ class FavaClient:
currencies: list[str], currencies: list[str],
opening_date: Optional[date] = None, opening_date: Optional[date] = None,
metadata: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, Any]] = None,
target_file: Optional[str] = None,
max_retries: int = 3 max_retries: int = 3
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Add an account to the Beancount ledger via an Open directive. Add an account to the Beancount ledger via an Open directive.
NOTE: Fava's /api/add_entries endpoint does NOT support Open directives. NOTE: Fava's /api/add_entries endpoint does NOT support Open directives.
This method uses /api/source to directly edit the Beancount file. This method uses /api/source to directly edit a Beancount file.
The ledger is split across multiple include files
(see modules/services/fava-seeds.nix in server-deploy). Per-user
opens go to accounts/users.beancount; admin/static chart opens go to
accounts/chart.beancount. If `target_file` is not passed, it is
inferred from the account name via `_infer_target_file`.
This method implements optimistic concurrency control with retry logic: This method implements optimistic concurrency control with retry logic:
- Acquires a global write lock before modifying the ledger - Acquires a global write lock before modifying the ledger
@ -1506,6 +1567,8 @@ class FavaClient:
currencies: List of currencies for this account (e.g., ["EUR", "SATS"]) currencies: List of currencies for this account (e.g., ["EUR", "SATS"])
opening_date: Date to open the account (defaults to today) opening_date: Date to open the account (defaults to today)
metadata: Optional metadata for the account metadata: Optional metadata for the account
target_file: Beancount file path (relative to ledger root) to append
the Open directive to. Defaults to inference from `account_name`.
max_retries: Maximum number of retry attempts on checksum conflict (default: 3) max_retries: Maximum number of retry attempts on checksum conflict (default: 3)
Returns: Returns:
@ -1515,17 +1578,18 @@ class FavaClient:
ChecksumConflictError: If all retry attempts fail due to concurrent modifications ChecksumConflictError: If all retry attempts fail due to concurrent modifications
Example: Example:
# Add a user's receivable account # User-account names route to accounts/users.beancount automatically.
result = await fava.add_account( result = await fava.add_account(
account_name="Assets:Receivable:User-abc123", account_name="Assets:Receivable:User-abc12345",
currencies=["EUR", "SATS", "USD"], currencies=["EUR", "SATS", "USD"],
metadata={"user_id": "abc123", "description": "User receivables"} metadata={"user_id": "abc12345", "description": "User receivables"}
) )
# Add a user's payable account # Static / admin-added chart entries route to accounts/chart.beancount.
result = await fava.add_account( result = await fava.add_account(
account_name="Liabilities:Payable:User-abc123", account_name="Expenses:NewCategory",
currencies=["EUR", "SATS"] currencies=["EUR"],
target_file="accounts/chart.beancount",
) )
""" """
from datetime import date as date_type from datetime import date as date_type
@ -1533,6 +1597,12 @@ class FavaClient:
if opening_date is None: if opening_date is None:
opening_date = date_type.today() opening_date = date_type.today()
if target_file is None:
target_file = _infer_target_file(account_name)
# Fava's /api/source requires absolute paths; convert if needed.
target_file = await self._resolve_target_file(target_file)
last_error = None last_error = None
for attempt in range(max_retries): for attempt in range(max_retries):
@ -1540,18 +1610,10 @@ class FavaClient:
async with self._write_lock: async with self._write_lock:
try: try:
async with httpx.AsyncClient(timeout=self.timeout) as client: async with httpx.AsyncClient(timeout=self.timeout) as client:
# Step 1: Get the main Beancount file path from Fava # Step 1: Get current source file (fresh read on each attempt)
options_response = await client.get(f"{self.base_url}/options")
options_response.raise_for_status()
options_data = options_response.json()["data"]
file_path = options_data["beancount_options"]["filename"]
logger.debug(f"Fava main file: {file_path}")
# Step 2: Get current source file (fresh read on each attempt)
response = await client.get( response = await client.get(
f"{self.base_url}/source", f"{self.base_url}/source",
params={"filename": file_path} params={"filename": target_file}
) )
response.raise_for_status() response.raise_for_status()
source_data = response.json()["data"] source_data = response.json()["data"]
@ -1559,23 +1621,22 @@ class FavaClient:
sha256sum = source_data["sha256sum"] sha256sum = source_data["sha256sum"]
source = source_data["source"] source = source_data["source"]
# Step 3: Check if account already exists (may have been created by concurrent request) # Step 2: Check if account already exists (may have been created by concurrent request)
if f"open {account_name}" in source: if f"open {account_name}" in source:
logger.info(f"Account {account_name} already exists in Beancount file") logger.info(f"Account {account_name} already exists in {target_file}")
return {"data": sha256sum, "mtime": source_data.get("mtime", "")} return {"data": sha256sum, "mtime": source_data.get("mtime", "")}
# Step 4: Find insertion point (after last Open directive AND its metadata) # Step 3: Always append at end of file.
# Post-split layout, each include file has one mutation
# profile (only Open directives in chart/users, only
# Transactions in transactions.beancount), so there's no
# reason to slot new entries mid-file. Append-only also
# keeps the seed header comments at the top and makes
# the file's evolution trivially readable.
lines = source.split('\n') lines = source.split('\n')
insert_index = 0 insert_index = len(lines)
for i, line in enumerate(lines):
if line.strip().startswith(('open ', f'{opening_date.year}-')) and 'open' in line:
# Found an Open directive, now skip over any metadata lines
insert_index = i + 1
# Skip metadata lines (lines starting with whitespace)
while insert_index < len(lines) and lines[insert_index].startswith((' ', '\t')) and lines[insert_index].strip():
insert_index += 1
# Step 5: Format Open directive as Beancount text # Step 4: Format Open directive as Beancount text
currencies_str = ", ".join(currencies) currencies_str = ", ".join(currencies)
open_lines = [ open_lines = [
"", "",
@ -1591,15 +1652,15 @@ class FavaClient:
else: else:
open_lines.append(f' {key}: {value}') open_lines.append(f' {key}: {value}')
# Step 6: Insert into source # Step 5: Insert into source
for i, line in enumerate(open_lines): for i, line in enumerate(open_lines):
lines.insert(insert_index + i, line) lines.insert(insert_index + i, line)
new_source = '\n'.join(lines) new_source = '\n'.join(lines)
# Step 7: Update source file via PUT /api/source # Step 6: Update source file via PUT /api/source
update_payload = { update_payload = {
"file_path": file_path, "file_path": target_file,
"source": new_source, "source": new_source,
"sha256sum": sha256sum "sha256sum": sha256sum
} }
@ -1612,7 +1673,7 @@ class FavaClient:
response.raise_for_status() response.raise_for_status()
result = response.json() result = response.json()
logger.info(f"Added account {account_name} to Beancount file with currencies {currencies}") logger.info(f"Added account {account_name} to {target_file} with currencies {currencies}")
return result return result
except httpx.HTTPStatusError as e: except httpx.HTTPStatusError as e:
@ -1927,6 +1988,10 @@ class FavaClient:
# Singleton instance (configured from settings) # Singleton instance (configured from settings)
_fava_client: Optional[FavaClient] = None _fava_client: Optional[FavaClient] = None
# Set by init_fava_client; await for background tasks that must not run
# before the client exists (otherwise they raise "Fava client not initialized"
# during the first ~500ms of startup).
_fava_client_ready: asyncio.Event = asyncio.Event()
def init_fava_client(fava_url: str, ledger_slug: str, timeout: float = 10.0): def init_fava_client(fava_url: str, ledger_slug: str, timeout: float = 10.0):
@ -1940,9 +2005,21 @@ def init_fava_client(fava_url: str, ledger_slug: str, timeout: float = 10.0):
""" """
global _fava_client global _fava_client
_fava_client = FavaClient(fava_url, ledger_slug, timeout) _fava_client = FavaClient(fava_url, ledger_slug, timeout)
_fava_client_ready.set()
logger.info(f"Fava client initialized: {fava_url}/{ledger_slug}") logger.info(f"Fava client initialized: {fava_url}/{ledger_slug}")
async def wait_for_fava_client() -> FavaClient:
"""Block until init_fava_client() has been called, then return the client.
Use this from background tasks started in libra_start() they otherwise
race the fire-and-forget _init_fava() coroutine and crash with
"Fava client not initialized" on first iteration.
"""
await _fava_client_ready.wait()
return get_fava_client()
def get_fava_client() -> FavaClient: def get_fava_client() -> FavaClient:
""" """
Get the configured Fava client. Get the configured Fava client.

View file

@ -48,6 +48,13 @@ class CreateAccount(BaseModel):
is_virtual: bool = False # Set to True to create virtual parent account is_virtual: bool = False # Set to True to create virtual parent account
class CreateChartAccount(BaseModel):
"""Admin-created chart-of-accounts entry written to accounts/chart.beancount."""
name: str # Full hierarchical account name, e.g. "Expenses:Services:Domain"
currencies: list[str] = ["EUR", "SATS", "USD"]
description: Optional[str] = None
class EntryLine(BaseModel): class EntryLine(BaseModel):
id: str id: str
journal_entry_id: str journal_entry_id: str

View file

@ -134,8 +134,15 @@ async def wait_for_account_sync():
Background task that periodically syncs accounts from Beancount to Libra DB. Background task that periodically syncs accounts from Beancount to Libra DB.
Runs hourly to ensure Libra DB stays in sync with Beancount. Runs hourly to ensure Libra DB stays in sync with Beancount.
Blocks on `wait_for_fava_client()` before the first iteration so we don't
race the fire-and-forget `_init_fava()` started in `libra_start()` and
fail the first sync with "Fava client not initialized".
""" """
from .fava_client import wait_for_fava_client
logger.info("[LIBRA] Account sync background task started") logger.info("[LIBRA] Account sync background task started")
await wait_for_fava_client()
while True: while True:
try: try:

View file

@ -52,6 +52,7 @@ from .models import (
LibraSettings, LibraSettings,
CreateAccount, CreateAccount,
CreateAccountPermission, CreateAccountPermission,
CreateChartAccount,
CreateBalanceAssertion, CreateBalanceAssertion,
CreateEntryLine, CreateEntryLine,
CreateJournalEntry, CreateJournalEntry,
@ -3565,6 +3566,61 @@ async def api_get_account_hierarchy(
# ===== ACCOUNT SYNC ENDPOINTS ===== # ===== ACCOUNT SYNC ENDPOINTS =====
_VALID_ACCOUNT_PREFIXES = ("Assets:", "Liabilities:", "Equity:", "Income:", "Expenses:")
@libra_api_router.post("/api/v1/admin/accounts", status_code=HTTPStatus.CREATED)
async def api_admin_add_chart_account(
payload: CreateChartAccount,
auth: AuthContext = Depends(require_super_user),
) -> dict:
"""
Add a chart-of-accounts entry (super-user only).
Writes an Open directive to accounts/chart.beancount via Fava's /api/source,
then syncs the account into Libra's DB so permissions can be granted on it.
Per-user accounts (matching :User-xxxxxxxx) take a different code path via
crud.get_or_create_user_account and are not created through this endpoint.
"""
from .fava_client import get_fava_client
if not payload.name.startswith(_VALID_ACCOUNT_PREFIXES):
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail=(
f"Account name must start with one of "
f"{', '.join(_VALID_ACCOUNT_PREFIXES)} (got {payload.name!r})"
),
)
logger.info(
f"Admin {auth.user_id[:8]} adding chart account {payload.name} "
f"with currencies {payload.currencies}"
)
fava = get_fava_client()
metadata: dict = {"added_by": auth.user_id[:8], "source": "admin-ui"}
if payload.description:
metadata["description"] = payload.description
await fava.add_account(
account_name=payload.name,
currencies=payload.currencies,
target_file="accounts/chart.beancount",
metadata=metadata,
)
# Mirror into libra DB so permissions / metadata layer sees it.
from .account_sync import sync_single_account_from_beancount
synced = await sync_single_account_from_beancount(payload.name)
return {
"success": True,
"account_name": payload.name,
"synced_to_libra_db": synced,
}
@libra_api_router.post("/api/v1/admin/accounts/sync") @libra_api_router.post("/api/v1/admin/accounts/sync")
async def api_sync_all_accounts( async def api_sync_all_accounts(
force_full_sync: bool = False, force_full_sync: bool = False,