refactor(v2): hoist LP state (wallet, mode, autoforward) into dca_lp table

LP-level preferences were denormalised across every `dca_clients` row
of a given user. Every LP enrolment carried its own wallet_id /
dca_mode / fixed_mode_daily_limit / autoforward_ln_address /
autoforward_enabled — and satmachineclient's `update_lp_autoforward`
did a multi-row UPDATE to keep them in sync. That sync dance was the
smell: user-level intent stored at machine-enrolment granularity.

New shape:

  dca_lp  (user_id PK, dca_wallet_id, default_dca_mode,
           fixed_mode_daily_limit, autoforward_ln_address,
           autoforward_enabled, ...)
  dca_clients  (id, machine_id, user_id, username, status, ...)
        // pure (machine, LP) enrolment — wallet/mode/autoforward gone

Authority split:
  - LP writes dca_lp via satmachineclient (Phase 2, separate commit).
  - Operator writes dca_clients via satmachineadmin. They cannot
    choose the LP's destination wallet — it's resolved from dca_lp
    at distribution time. Better trust hygiene.

Onboarding gate:
  - `api_create_deposit` refuses (HTTP 422) when the target LP has
    no dca_lp row. Forces every LP through a "yes, I am here and
    this is where I want my sats" gesture via satmachineclient
    before any fiat starts accumulating against them.

Schema:
  - m001 canonical schema updated: slim `dca_clients`, new `dca_lp`.
    Fresh installs land here directly.
  - m004 idempotent migration for installs that already have the
    legacy `dca_clients.wallet_id` column: creates dca_lp,
    backfills from the latest dca_clients row per user (window
    function), then DROP COLUMN on the moved fields. Greg's live
    test data survives the upgrade.

Distribution:
  - `get_flow_mode_clients_for_machine` INNER JOINs dca_lp so
    un-onboarded LPs are filtered out (no destination wallet).
  - `_pay_one_dca_leg`, `_attempt_autoforward`, `settle_lp_balance`
    all fetch `dca_lp` via the new `get_dca_lp(user_id)` helper.
    Wallet + autoforward read from prefs, not from client.

Models:
  - `DcaClient` loses 5 fields. `CreateDcaClientData` reduces to
    (machine_id, user_id, username). `UpdateDcaClientData` keeps
    only operator-controlled fields (username, status).
  - New `DcaLpPreferences` + `UpsertDcaLpData` models for the
    per-user surface (satmachineclient writes these in Phase 2).

CRUD:
  - New: `get_dca_lp`, `lp_is_onboarded`, `upsert_dca_lp` (the
    latter takes a `fallback_wallet_id` for first-onboarding when
    satmachineclient auto-seeds from the LP's default LNbits wallet).
  - `create_dca_client` insert reduces to the new column set.

Tests: 86 unit tests still green.

Next:
  - Phase 1c (this repo): UI simplification for operator's
    Add/Edit LP dialogs + deposit-gating UX.
  - Phase 2 (satmachineclient): own dca_lp writes + auto-init with
    the LP's default LNbits wallet on first dashboard visit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Padreug 2026-05-16 10:05:54 +02:00
commit 80b5a6d785
5 changed files with 307 additions and 49 deletions

124
crud.py
View file

@ -22,6 +22,7 @@ from .models import (
CreateMachineData,
DcaClient,
DcaDeposit,
DcaLpPreferences,
DcaPayment,
DcaSettlement,
Machine,
@ -32,6 +33,7 @@ from .models import (
UpdateDepositStatusData,
UpdateMachineData,
UpdateSuperConfigData,
UpsertDcaLpData,
)
db = Database("ext_satoshimachine")
@ -168,28 +170,27 @@ async def delete_machine(machine_id: str) -> None:
async def create_dca_client(data: CreateDcaClientData) -> DcaClient:
"""Operator enrols an LP at one of their machines.
Pure (machine, LP) record. Wallet / mode / autoforward live on
dca_lp (per-user) populated by the LP via satmachineclient.
Enrolment doesn't require the LP to be onboarded yet, but deposits
do (see `create_deposit`).
"""
client_id = urlsafe_short_hash()
now = datetime.now()
await db.execute(
"""
INSERT INTO satoshimachine.dca_clients
(id, machine_id, user_id, wallet_id, username, dca_mode,
fixed_mode_daily_limit, autoforward_ln_address, autoforward_enabled,
status, created_at, updated_at)
VALUES (:id, :machine_id, :user_id, :wallet_id, :username, :dca_mode,
:fixed_mode_daily_limit, :autoforward_ln_address,
:autoforward_enabled, :status, :created_at, :updated_at)
(id, machine_id, user_id, username, status, created_at, updated_at)
VALUES (:id, :machine_id, :user_id, :username, :status,
:created_at, :updated_at)
""",
{
"id": client_id,
"machine_id": data.machine_id,
"user_id": data.user_id,
"wallet_id": data.wallet_id,
"username": data.username,
"dca_mode": data.dca_mode,
"fixed_mode_daily_limit": data.fixed_mode_daily_limit,
"autoforward_ln_address": data.autoforward_ln_address,
"autoforward_enabled": data.autoforward_enabled,
"status": "active",
"created_at": now,
"updated_at": now,
@ -262,20 +263,109 @@ async def get_dca_clients_for_user(user_id: str) -> List[DcaClient]:
async def get_flow_mode_clients_for_machine(machine_id: str) -> List[DcaClient]:
"""Active flow-mode clients used by the distribution algorithm."""
"""Active LPs enrolled at this machine whose per-user `dca_lp` row
has `default_dca_mode = 'flow'`. Used by the distribution algorithm.
An LP enrolment without a matching `dca_lp` row (i.e., the LP hasn't
onboarded via satmachineclient yet) is filtered out by the INNER
JOIN there's no destination wallet to pay to.
"""
return await db.fetchall(
"""
SELECT * FROM satoshimachine.dca_clients
WHERE machine_id = :machine_id
AND dca_mode = 'flow'
AND status = 'active'
ORDER BY created_at ASC
SELECT c.*
FROM satoshimachine.dca_clients c
JOIN satoshimachine.dca_lp lp ON lp.user_id = c.user_id
WHERE c.machine_id = :machine_id
AND lp.default_dca_mode = 'flow'
AND c.status = 'active'
ORDER BY c.created_at ASC
""",
{"machine_id": machine_id},
DcaClient,
)
# =============================================================================
# DCA LP preferences (per-user) — wallet + mode + autoforward
# =============================================================================
async def get_dca_lp(user_id: str) -> Optional[DcaLpPreferences]:
"""Return the LP's preferences row, or None if they haven't onboarded
via satmachineclient yet."""
return await db.fetchone(
"SELECT * FROM satoshimachine.dca_lp WHERE user_id = :uid",
{"uid": user_id},
DcaLpPreferences,
)
async def lp_is_onboarded(user_id: str) -> bool:
"""Cheap existence check used by the deposit-creation gate."""
row = await db.fetchone(
"SELECT user_id FROM satoshimachine.dca_lp WHERE user_id = :uid",
{"uid": user_id},
)
return row is not None
async def upsert_dca_lp(
user_id: str,
data: UpsertDcaLpData,
*,
fallback_wallet_id: Optional[str] = None,
) -> DcaLpPreferences:
"""Create or update the LP's preferences row.
First call (no row yet): `data.dca_wallet_id` must be set OR
`fallback_wallet_id` must be provided (satmachineclient passes the
LP's default LNbits wallet here when auto-seeding on first dashboard
visit). Subsequent calls update only the fields in `data` that are
non-None.
"""
existing = await get_dca_lp(user_id)
now = datetime.now()
if existing is None:
wallet_id = data.dca_wallet_id or fallback_wallet_id
if not wallet_id:
raise ValueError(
"first upsert requires dca_wallet_id (or fallback_wallet_id)"
)
await db.execute(
"""
INSERT INTO satoshimachine.dca_lp
(user_id, dca_wallet_id, default_dca_mode, fixed_mode_daily_limit,
autoforward_ln_address, autoforward_enabled,
created_at, updated_at)
VALUES (:uid, :wallet, :mode, :limit, :ln_addr, :auto,
:now, :now)
""",
{
"uid": user_id,
"wallet": wallet_id,
"mode": data.default_dca_mode or "flow",
"limit": data.fixed_mode_daily_limit,
"ln_addr": data.autoforward_ln_address,
"auto": data.autoforward_enabled or False,
"now": now,
},
)
else:
update_data: dict = {k: v for k, v in data.dict().items() if v is not None}
if not update_data:
return existing
update_data["updated_at"] = now
set_clause = ", ".join(f"{k} = :{k}" for k in update_data)
update_data["uid"] = user_id
await db.execute(
f"UPDATE satoshimachine.dca_lp SET {set_clause} WHERE user_id = :uid",
update_data,
)
refreshed = await get_dca_lp(user_id)
assert refreshed is not None
return refreshed
async def update_dca_client(
client_id: str, data: UpdateDcaClientData
) -> Optional[DcaClient]:

View file

@ -41,6 +41,7 @@ from .crud import (
count_completed_legs_for_settlement,
create_dca_payment,
get_client_balance_summary,
get_dca_lp,
get_effective_commission_splits,
get_flow_mode_clients_for_machine,
get_machine,
@ -53,6 +54,7 @@ from .crud import (
from .models import (
CreateDcaPaymentData,
DcaClient,
DcaLpPreferences,
DcaPayment,
DcaSettlement,
Machine,
@ -172,7 +174,17 @@ async def settle_lp_balance(
machine and the funding wallet (API endpoint does this). The amount_fiat
is capped at the LP's remaining balance — operators cannot accidentally
over-pay via this path.
The destination wallet is the LP's own `dca_lp.dca_wallet_id` — the
operator can't redirect this; if the LP hasn't onboarded yet there's
no destination and we refuse.
"""
prefs = await get_dca_lp(client.user_id)
if prefs is None:
raise ValueError(
f"client {client.id} (user {client.user_id[:8]}...) has not "
f"onboarded via satmachineclient — no DCA wallet configured"
)
summary = await get_client_balance_summary(client.id)
if summary is None:
raise ValueError(f"client {client.id} balance not available")
@ -208,7 +220,7 @@ async def settle_lp_balance(
machine_id=machine.id,
operator_user_id=machine.operator_user_id,
leg_type="settlement",
destination_wallet_id=client.wallet_id,
destination_wallet_id=prefs.dca_wallet_id,
destination_ln_address=None,
amount_sats=amount_sats,
amount_fiat=amount_fiat,
@ -225,7 +237,7 @@ async def settle_lp_balance(
}
try:
new_invoice = await create_invoice(
wallet_id=client.wallet_id,
wallet_id=prefs.dca_wallet_id,
amount=float(amount_sats),
internal=True,
memo=memo,
@ -557,9 +569,20 @@ async def _pay_one_dca_leg(
amount_sats: int,
errors: List[str],
) -> None:
"""Pay a single DCA leg + best-effort autoforward."""
"""Pay a single DCA leg + best-effort autoforward.
Reads the LP's destination wallet + autoforward config from `dca_lp`.
Callers reach this through `get_flow_mode_clients_for_machine` which
INNER JOINs on `dca_lp`, so a `prefs is None` here would indicate a
race (LP deleted their dca_lp row between query and pay) we
defensively skip.
"""
if amount_sats <= 0:
return
prefs = await get_dca_lp(client.user_id)
if prefs is None:
errors.append(f"client {client.id}: dca_lp row disappeared mid-distribution")
return
amount_fiat = round(amount_sats / float(settlement.exchange_rate), 2)
memo = f"DCA: {amount_sats} sats • {amount_fiat:.2f} {settlement.fiat_code}"
dca_leg = await _pay_internal(
@ -567,7 +590,7 @@ async def _pay_one_dca_leg(
machine=machine,
leg_type="dca",
client_id=client.id,
destination_wallet_id=client.wallet_id,
destination_wallet_id=prefs.dca_wallet_id,
amount_sats=amount_sats,
amount_fiat=amount_fiat,
exchange_rate=float(settlement.exchange_rate),
@ -581,10 +604,10 @@ async def _pay_one_dca_leg(
if (
dca_leg is not None
and dca_leg.status == "completed"
and client.autoforward_enabled
and client.autoforward_ln_address
and prefs.autoforward_enabled
and prefs.autoforward_ln_address
):
await _attempt_autoforward(client, machine, settlement, amount_sats)
await _attempt_autoforward(client, prefs, machine, settlement, amount_sats)
# =============================================================================
@ -594,6 +617,7 @@ async def _pay_one_dca_leg(
async def _attempt_autoforward(
client: DcaClient,
prefs: DcaLpPreferences,
machine: Machine,
settlement: DcaSettlement,
amount_sats: int,
@ -610,7 +634,7 @@ async def _attempt_autoforward(
LNbits wallet. The LP can move them manually via the LNbits UI. We
never re-raise; failed forwarding must not block subsequent legs.
"""
address = client.autoforward_ln_address
address = prefs.autoforward_ln_address
if not address:
return
leg = await create_dca_payment(
@ -637,7 +661,7 @@ async def _attempt_autoforward(
comment=f"satmachine autoforward — {machine.machine_npub[:12]}",
)
paid = await pay_invoice(
wallet_id=client.wallet_id,
wallet_id=prefs.dca_wallet_id,
payment_request=bolt11,
description=f"satmachine autoforward → {address}",
tag=_payment_tag(machine),

View file

@ -102,20 +102,17 @@ async def m001_satmachine_v2_initial(db):
"ON dca_machines (wallet_id)"
)
# 4. dca_clients — LP registrations scoped per (machine, user). An LP
# can hold positions at many machines (and many operators) on the
# same LNbits instance.
# 4. dca_clients — per-(machine, LP) registrations. Pure machine
# enrolment record: no wallet, no mode, no autoforward — those are
# LP-controlled at the user level via dca_lp (see below). Operator
# just decides "this LP is enrolled at my machine"; everything
# delivery-related is the LP's own preference.
await db.execute(f"""
CREATE TABLE IF NOT EXISTS satoshimachine.dca_clients (
id TEXT PRIMARY KEY,
machine_id TEXT NOT NULL,
user_id TEXT NOT NULL,
wallet_id TEXT NOT NULL,
username TEXT,
dca_mode TEXT NOT NULL DEFAULT 'flow',
fixed_mode_daily_limit DECIMAL(10,2),
autoforward_ln_address TEXT,
autoforward_enabled BOOLEAN NOT NULL DEFAULT false,
status TEXT NOT NULL DEFAULT 'active',
created_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now},
updated_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now}
@ -126,9 +123,35 @@ async def m001_satmachine_v2_initial(db):
"ON dca_clients (machine_id, user_id)"
)
await db.execute(
"CREATE INDEX IF NOT EXISTS dca_clients_user_idx " "ON dca_clients (user_id)"
"CREATE INDEX IF NOT EXISTS dca_clients_user_idx ON dca_clients (user_id)"
)
# 4a. dca_lp — LP-level (per-user) DCA preferences. ONE row per LNbits
# user that has onboarded as a Liquidity Provider, regardless of
# how many machines they're enrolled at. Owned by the LP (writes
# come from the satmachineclient extension under the LP's session),
# read by satmachineadmin during distribution to resolve "where do
# DCA payouts for this LP go?"
#
# Gating: satmachineadmin refuses to create deposits for an LP who
# doesn't have a dca_lp row yet. The LP must onboard via
# satmachineclient first (which auto-creates the row with their
# default LNbits wallet on first dashboard visit). Forces every
# LP through a "yes, I am here and this is where I want my sats"
# gesture before any fiat starts accumulating against them.
await db.execute(f"""
CREATE TABLE IF NOT EXISTS satoshimachine.dca_lp (
user_id TEXT PRIMARY KEY,
dca_wallet_id TEXT NOT NULL,
default_dca_mode TEXT NOT NULL DEFAULT 'flow',
fixed_mode_daily_limit DECIMAL(10,2),
autoforward_ln_address TEXT,
autoforward_enabled BOOLEAN NOT NULL DEFAULT false,
created_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now},
updated_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now}
);
""")
# 5. dca_deposits — fiat the operator (or super) records against an LP
# at a machine. creator_user_id preserves audit trail.
await db.execute(f"""
@ -336,3 +359,81 @@ async def m003_rename_settlements_net_sats_to_principal_sats(db):
"ALTER TABLE satoshimachine.dca_settlements "
"RENAME COLUMN net_sats TO principal_sats"
)
async def m004_introduce_dca_lp_table(db):
"""Hoist LP-level state (wallet, mode, autoforward) out of dca_clients
into a per-user dca_lp table. dca_clients becomes a pure (machine, LP)
enrolment record; everything delivery-related becomes the LP's own
preference, owned and written by satmachineclient.
Why: the per-row state on dca_clients was a denormalised duplicate of
user-level intent ("which wallet should my DCA land in?" + "should it
forward to my LN address?" — same answer regardless of which machine
paid). Today's update_lp_autoforward already does a multi-row UPDATE
to keep the rows in sync a smell of state belonging one level up.
Fresh installs from m001 onward land on the new schema directly.
Existing installs (pre-m004 test data) get migrated here:
1. Create dca_lp table (no-op if already present from m001 path).
2. Backfill dca_lp from existing dca_clients rows, picking the
most-recently-updated row per user_id when an LP is enrolled at
multiple machines.
3. Drop the moved columns from dca_clients.
Idempotent: probes for the legacy `dca_clients.wallet_id` column. If
absent the install already on the new shape; no-op.
"""
try:
await db.fetchone("SELECT wallet_id FROM satoshimachine.dca_clients LIMIT 1")
except Exception:
return
# Step 1: create dca_lp if it doesn't exist yet. m001 on a fresh install
# already created it; on a pre-m004 install we're creating it here.
await db.execute(f"""
CREATE TABLE IF NOT EXISTS satoshimachine.dca_lp (
user_id TEXT PRIMARY KEY,
dca_wallet_id TEXT NOT NULL,
default_dca_mode TEXT NOT NULL DEFAULT 'flow',
fixed_mode_daily_limit DECIMAL(10,2),
autoforward_ln_address TEXT,
autoforward_enabled BOOLEAN NOT NULL DEFAULT false,
created_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now},
updated_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now}
);
""")
# Step 2: backfill dca_lp from dca_clients. Pick the latest row per
# user (by updated_at, falling back to created_at) when the LP is
# enrolled at multiple machines — that row reflects their most
# recent intent. ROW_NUMBER() OVER (...) requires SQLite 3.25+ (2018).
await db.execute("""
INSERT OR IGNORE INTO satoshimachine.dca_lp
(user_id, dca_wallet_id, default_dca_mode, fixed_mode_daily_limit,
autoforward_ln_address, autoforward_enabled,
created_at, updated_at)
SELECT user_id, wallet_id, dca_mode, fixed_mode_daily_limit,
autoforward_ln_address, autoforward_enabled,
created_at, updated_at
FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY user_id
ORDER BY updated_at DESC, created_at DESC
) AS rn
FROM satoshimachine.dca_clients
) ranked
WHERE rn = 1
""")
# Step 3: drop the moved columns from dca_clients. ALTER TABLE DROP
# COLUMN needs SQLite 3.35+ (2021). One column per ALTER (SQLite
# doesn't support multi-column DROP).
for col in (
"wallet_id",
"dca_mode",
"fixed_mode_daily_limit",
"autoforward_ln_address",
"autoforward_enabled",
):
await db.execute(f"ALTER TABLE satoshimachine.dca_clients DROP COLUMN {col}")

View file

@ -80,40 +80,70 @@ class UpdateMachineData(BaseModel):
class CreateDcaClientData(BaseModel):
"""Operator enrols an LP at one of their machines.
Pure (machine, LP) tuple no wallet, no mode, no autoforward. Those
live on the per-user `dca_lp` row, written by the LP themselves via
satmachineclient. An LP must have onboarded (have a `dca_lp` row)
before deposits can be recorded against this enrolment; enrolment
itself works either way.
"""
machine_id: str
user_id: str
wallet_id: str
username: Optional[str] = None
dca_mode: str = "flow" # 'flow' | 'fixed'
fixed_mode_daily_limit: Optional[float] = None
# Auto-forward DCA distributions to an external LN address (best-effort;
# sats stay in LNbits wallet on forward failure — see satmachineadmin#8).
autoforward_ln_address: Optional[str] = None
autoforward_enabled: bool = False
class DcaClient(BaseModel):
id: str
machine_id: str
user_id: str
wallet_id: str
username: Optional[str]
dca_mode: str
fixed_mode_daily_limit: Optional[float]
autoforward_ln_address: Optional[str]
autoforward_enabled: bool
status: str
created_at: datetime
updated_at: datetime
class UpdateDcaClientData(BaseModel):
"""Operator-side updates to an enrolment. The operator can only edit
fields that aren't LP-controlled (username display, status). Wallet
/ mode / autoforward changes go through satmachineclient against
`dca_lp` instead."""
username: Optional[str] = None
dca_mode: Optional[str] = None
status: Optional[str] = None
class DcaLpPreferences(BaseModel):
"""Per-user DCA preferences, owned by the LP.
Created on first satmachineclient dashboard access (the extension
auto-seeds `dca_wallet_id` with the LP's first/default LNbits wallet
they can change it from the dashboard). All distribution decisions
(where do the sats go, do we forward to an LN address, what's the
default mode) read from here, joined onto `dca_clients` by user_id.
"""
user_id: str
dca_wallet_id: str
default_dca_mode: str # 'flow' | 'fixed'
fixed_mode_daily_limit: Optional[float]
autoforward_ln_address: Optional[str]
autoforward_enabled: bool
created_at: datetime
updated_at: datetime
class UpsertDcaLpData(BaseModel):
"""satmachineclient writes this on first onboarding / when the LP
edits their preferences. All fields optional on update pass only
the ones being changed."""
dca_wallet_id: Optional[str] = None
default_dca_mode: Optional[str] = None
fixed_mode_daily_limit: Optional[float] = None
autoforward_ln_address: Optional[str] = None
autoforward_enabled: Optional[bool] = None
status: Optional[str] = None
class ClientBalanceSummary(BaseModel):

View file

@ -39,6 +39,7 @@ from .crud import (
get_settlements_for_operator,
get_stuck_settlements_for_operator,
get_super_config,
lp_is_onboarded,
replace_commission_splits,
reset_settlement_for_retry,
update_dca_client,
@ -311,6 +312,18 @@ async def api_create_deposit(
HTTPStatus.BAD_REQUEST,
"client_id and machine_id refer to different machines",
)
# Gate: refuse deposits for an LP who hasn't onboarded via
# satmachineclient. Without a dca_lp row we don't know where to
# send their DCA distributions, so accepting fiat against them
# would just queue up sats with nowhere to go. Forces the LP to
# actively register before any economic activity accrues.
if not await lp_is_onboarded(client.user_id):
raise HTTPException(
HTTPStatus.UNPROCESSABLE_ENTITY,
"LP has not onboarded yet — they must register via "
"satmachineclient and select a DCA wallet before deposits "
"can be recorded against them.",
)
return await create_deposit(user.id, data)