diff --git a/crud.py b/crud.py index 53b3fe3..581b053 100644 --- a/crud.py +++ b/crud.py @@ -22,6 +22,7 @@ from .models import ( CreateMachineData, DcaClient, DcaDeposit, + DcaLpPreferences, DcaPayment, DcaSettlement, Machine, @@ -32,6 +33,7 @@ from .models import ( UpdateDepositStatusData, UpdateMachineData, UpdateSuperConfigData, + UpsertDcaLpData, ) db = Database("ext_satoshimachine") @@ -168,28 +170,27 @@ async def delete_machine(machine_id: str) -> None: async def create_dca_client(data: CreateDcaClientData) -> DcaClient: + """Operator enrols an LP at one of their machines. + + Pure (machine, LP) record. Wallet / mode / autoforward live on + dca_lp (per-user) — populated by the LP via satmachineclient. + Enrolment doesn't require the LP to be onboarded yet, but deposits + do (see `create_deposit`). + """ client_id = urlsafe_short_hash() now = datetime.now() await db.execute( """ INSERT INTO satoshimachine.dca_clients - (id, machine_id, user_id, wallet_id, username, dca_mode, - fixed_mode_daily_limit, autoforward_ln_address, autoforward_enabled, - status, created_at, updated_at) - VALUES (:id, :machine_id, :user_id, :wallet_id, :username, :dca_mode, - :fixed_mode_daily_limit, :autoforward_ln_address, - :autoforward_enabled, :status, :created_at, :updated_at) + (id, machine_id, user_id, username, status, created_at, updated_at) + VALUES (:id, :machine_id, :user_id, :username, :status, + :created_at, :updated_at) """, { "id": client_id, "machine_id": data.machine_id, "user_id": data.user_id, - "wallet_id": data.wallet_id, "username": data.username, - "dca_mode": data.dca_mode, - "fixed_mode_daily_limit": data.fixed_mode_daily_limit, - "autoforward_ln_address": data.autoforward_ln_address, - "autoforward_enabled": data.autoforward_enabled, "status": "active", "created_at": now, "updated_at": now, @@ -262,20 +263,109 @@ async def get_dca_clients_for_user(user_id: str) -> List[DcaClient]: async def get_flow_mode_clients_for_machine(machine_id: str) -> List[DcaClient]: - """Active flow-mode clients used by the distribution algorithm.""" + """Active LPs enrolled at this machine whose per-user `dca_lp` row + has `default_dca_mode = 'flow'`. Used by the distribution algorithm. + + An LP enrolment without a matching `dca_lp` row (i.e., the LP hasn't + onboarded via satmachineclient yet) is filtered out by the INNER + JOIN — there's no destination wallet to pay to. + """ return await db.fetchall( """ - SELECT * FROM satoshimachine.dca_clients - WHERE machine_id = :machine_id - AND dca_mode = 'flow' - AND status = 'active' - ORDER BY created_at ASC + SELECT c.* + FROM satoshimachine.dca_clients c + JOIN satoshimachine.dca_lp lp ON lp.user_id = c.user_id + WHERE c.machine_id = :machine_id + AND lp.default_dca_mode = 'flow' + AND c.status = 'active' + ORDER BY c.created_at ASC """, {"machine_id": machine_id}, DcaClient, ) +# ============================================================================= +# DCA LP preferences (per-user) — wallet + mode + autoforward +# ============================================================================= + + +async def get_dca_lp(user_id: str) -> Optional[DcaLpPreferences]: + """Return the LP's preferences row, or None if they haven't onboarded + via satmachineclient yet.""" + return await db.fetchone( + "SELECT * FROM satoshimachine.dca_lp WHERE user_id = :uid", + {"uid": user_id}, + DcaLpPreferences, + ) + + +async def lp_is_onboarded(user_id: str) -> bool: + """Cheap existence check used by the deposit-creation gate.""" + row = await db.fetchone( + "SELECT user_id FROM satoshimachine.dca_lp WHERE user_id = :uid", + {"uid": user_id}, + ) + return row is not None + + +async def upsert_dca_lp( + user_id: str, + data: UpsertDcaLpData, + *, + fallback_wallet_id: Optional[str] = None, +) -> DcaLpPreferences: + """Create or update the LP's preferences row. + + First call (no row yet): `data.dca_wallet_id` must be set OR + `fallback_wallet_id` must be provided (satmachineclient passes the + LP's default LNbits wallet here when auto-seeding on first dashboard + visit). Subsequent calls update only the fields in `data` that are + non-None. + """ + existing = await get_dca_lp(user_id) + now = datetime.now() + if existing is None: + wallet_id = data.dca_wallet_id or fallback_wallet_id + if not wallet_id: + raise ValueError( + "first upsert requires dca_wallet_id (or fallback_wallet_id)" + ) + await db.execute( + """ + INSERT INTO satoshimachine.dca_lp + (user_id, dca_wallet_id, default_dca_mode, fixed_mode_daily_limit, + autoforward_ln_address, autoforward_enabled, + created_at, updated_at) + VALUES (:uid, :wallet, :mode, :limit, :ln_addr, :auto, + :now, :now) + """, + { + "uid": user_id, + "wallet": wallet_id, + "mode": data.default_dca_mode or "flow", + "limit": data.fixed_mode_daily_limit, + "ln_addr": data.autoforward_ln_address, + "auto": data.autoforward_enabled or False, + "now": now, + }, + ) + else: + update_data: dict = {k: v for k, v in data.dict().items() if v is not None} + if not update_data: + return existing + update_data["updated_at"] = now + set_clause = ", ".join(f"{k} = :{k}" for k in update_data) + update_data["uid"] = user_id + await db.execute( + f"UPDATE satoshimachine.dca_lp SET {set_clause} WHERE user_id = :uid", + update_data, + ) + refreshed = await get_dca_lp(user_id) + assert refreshed is not None + return refreshed + + async def update_dca_client( client_id: str, data: UpdateDcaClientData ) -> Optional[DcaClient]: diff --git a/distribution.py b/distribution.py index 8e953a4..d1e5fee 100644 --- a/distribution.py +++ b/distribution.py @@ -41,6 +41,7 @@ from .crud import ( count_completed_legs_for_settlement, create_dca_payment, get_client_balance_summary, + get_dca_lp, get_effective_commission_splits, get_flow_mode_clients_for_machine, get_machine, @@ -53,6 +54,7 @@ from .crud import ( from .models import ( CreateDcaPaymentData, DcaClient, + DcaLpPreferences, DcaPayment, DcaSettlement, Machine, @@ -172,7 +174,17 @@ async def settle_lp_balance( machine and the funding wallet (API endpoint does this). The amount_fiat is capped at the LP's remaining balance — operators cannot accidentally over-pay via this path. + + The destination wallet is the LP's own `dca_lp.dca_wallet_id` — the + operator can't redirect this; if the LP hasn't onboarded yet there's + no destination and we refuse. """ + prefs = await get_dca_lp(client.user_id) + if prefs is None: + raise ValueError( + f"client {client.id} (user {client.user_id[:8]}...) has not " + f"onboarded via satmachineclient — no DCA wallet configured" + ) summary = await get_client_balance_summary(client.id) if summary is None: raise ValueError(f"client {client.id} balance not available") @@ -208,7 +220,7 @@ async def settle_lp_balance( machine_id=machine.id, operator_user_id=machine.operator_user_id, leg_type="settlement", - destination_wallet_id=client.wallet_id, + destination_wallet_id=prefs.dca_wallet_id, destination_ln_address=None, amount_sats=amount_sats, amount_fiat=amount_fiat, @@ -225,7 +237,7 @@ async def settle_lp_balance( } try: new_invoice = await create_invoice( - wallet_id=client.wallet_id, + wallet_id=prefs.dca_wallet_id, amount=float(amount_sats), internal=True, memo=memo, @@ -557,9 +569,20 @@ async def _pay_one_dca_leg( amount_sats: int, errors: List[str], ) -> None: - """Pay a single DCA leg + best-effort autoforward.""" + """Pay a single DCA leg + best-effort autoforward. + + Reads the LP's destination wallet + autoforward config from `dca_lp`. + Callers reach this through `get_flow_mode_clients_for_machine` which + INNER JOINs on `dca_lp`, so a `prefs is None` here would indicate a + race (LP deleted their dca_lp row between query and pay) — we + defensively skip. + """ if amount_sats <= 0: return + prefs = await get_dca_lp(client.user_id) + if prefs is None: + errors.append(f"client {client.id}: dca_lp row disappeared mid-distribution") + return amount_fiat = round(amount_sats / float(settlement.exchange_rate), 2) memo = f"DCA: {amount_sats} sats • {amount_fiat:.2f} {settlement.fiat_code}" dca_leg = await _pay_internal( @@ -567,7 +590,7 @@ async def _pay_one_dca_leg( machine=machine, leg_type="dca", client_id=client.id, - destination_wallet_id=client.wallet_id, + destination_wallet_id=prefs.dca_wallet_id, amount_sats=amount_sats, amount_fiat=amount_fiat, exchange_rate=float(settlement.exchange_rate), @@ -581,10 +604,10 @@ async def _pay_one_dca_leg( if ( dca_leg is not None and dca_leg.status == "completed" - and client.autoforward_enabled - and client.autoforward_ln_address + and prefs.autoforward_enabled + and prefs.autoforward_ln_address ): - await _attempt_autoforward(client, machine, settlement, amount_sats) + await _attempt_autoforward(client, prefs, machine, settlement, amount_sats) # ============================================================================= @@ -594,6 +617,7 @@ async def _pay_one_dca_leg( async def _attempt_autoforward( client: DcaClient, + prefs: DcaLpPreferences, machine: Machine, settlement: DcaSettlement, amount_sats: int, @@ -610,7 +634,7 @@ async def _attempt_autoforward( LNbits wallet. The LP can move them manually via the LNbits UI. We never re-raise; failed forwarding must not block subsequent legs. """ - address = client.autoforward_ln_address + address = prefs.autoforward_ln_address if not address: return leg = await create_dca_payment( @@ -637,7 +661,7 @@ async def _attempt_autoforward( comment=f"satmachine autoforward — {machine.machine_npub[:12]}", ) paid = await pay_invoice( - wallet_id=client.wallet_id, + wallet_id=prefs.dca_wallet_id, payment_request=bolt11, description=f"satmachine autoforward → {address}", tag=_payment_tag(machine), diff --git a/migrations.py b/migrations.py index 2a69b7e..0d9bc7d 100644 --- a/migrations.py +++ b/migrations.py @@ -102,20 +102,17 @@ async def m001_satmachine_v2_initial(db): "ON dca_machines (wallet_id)" ) - # 4. dca_clients — LP registrations scoped per (machine, user). An LP - # can hold positions at many machines (and many operators) on the - # same LNbits instance. + # 4. dca_clients — per-(machine, LP) registrations. Pure machine + # enrolment record: no wallet, no mode, no autoforward — those are + # LP-controlled at the user level via dca_lp (see below). Operator + # just decides "this LP is enrolled at my machine"; everything + # delivery-related is the LP's own preference. await db.execute(f""" CREATE TABLE IF NOT EXISTS satoshimachine.dca_clients ( id TEXT PRIMARY KEY, machine_id TEXT NOT NULL, user_id TEXT NOT NULL, - wallet_id TEXT NOT NULL, username TEXT, - dca_mode TEXT NOT NULL DEFAULT 'flow', - fixed_mode_daily_limit DECIMAL(10,2), - autoforward_ln_address TEXT, - autoforward_enabled BOOLEAN NOT NULL DEFAULT false, status TEXT NOT NULL DEFAULT 'active', created_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now}, updated_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now} @@ -126,9 +123,35 @@ async def m001_satmachine_v2_initial(db): "ON dca_clients (machine_id, user_id)" ) await db.execute( - "CREATE INDEX IF NOT EXISTS dca_clients_user_idx " "ON dca_clients (user_id)" + "CREATE INDEX IF NOT EXISTS dca_clients_user_idx ON dca_clients (user_id)" ) + # 4a. dca_lp — LP-level (per-user) DCA preferences. ONE row per LNbits + # user that has onboarded as a Liquidity Provider, regardless of + # how many machines they're enrolled at. Owned by the LP (writes + # come from the satmachineclient extension under the LP's session), + # read by satmachineadmin during distribution to resolve "where do + # DCA payouts for this LP go?" + # + # Gating: satmachineadmin refuses to create deposits for an LP who + # doesn't have a dca_lp row yet. The LP must onboard via + # satmachineclient first (which auto-creates the row with their + # default LNbits wallet on first dashboard visit). Forces every + # LP through a "yes, I am here and this is where I want my sats" + # gesture before any fiat starts accumulating against them. + await db.execute(f""" + CREATE TABLE IF NOT EXISTS satoshimachine.dca_lp ( + user_id TEXT PRIMARY KEY, + dca_wallet_id TEXT NOT NULL, + default_dca_mode TEXT NOT NULL DEFAULT 'flow', + fixed_mode_daily_limit DECIMAL(10,2), + autoforward_ln_address TEXT, + autoforward_enabled BOOLEAN NOT NULL DEFAULT false, + created_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now}, + updated_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now} + ); + """) + # 5. dca_deposits — fiat the operator (or super) records against an LP # at a machine. creator_user_id preserves audit trail. await db.execute(f""" @@ -336,3 +359,81 @@ async def m003_rename_settlements_net_sats_to_principal_sats(db): "ALTER TABLE satoshimachine.dca_settlements " "RENAME COLUMN net_sats TO principal_sats" ) + + +async def m004_introduce_dca_lp_table(db): + """Hoist LP-level state (wallet, mode, autoforward) out of dca_clients + into a per-user dca_lp table. dca_clients becomes a pure (machine, LP) + enrolment record; everything delivery-related becomes the LP's own + preference, owned and written by satmachineclient. + + Why: the per-row state on dca_clients was a denormalised duplicate of + user-level intent ("which wallet should my DCA land in?" + "should it + forward to my LN address?" — same answer regardless of which machine + paid). Today's update_lp_autoforward already does a multi-row UPDATE + to keep the rows in sync — a smell of state belonging one level up. + + Fresh installs from m001 onward land on the new schema directly. + Existing installs (pre-m004 test data) get migrated here: + 1. Create dca_lp table (no-op if already present from m001 path). + 2. Backfill dca_lp from existing dca_clients rows, picking the + most-recently-updated row per user_id when an LP is enrolled at + multiple machines. + 3. Drop the moved columns from dca_clients. + + Idempotent: probes for the legacy `dca_clients.wallet_id` column. If + absent the install already on the new shape; no-op. + """ + try: + await db.fetchone("SELECT wallet_id FROM satoshimachine.dca_clients LIMIT 1") + except Exception: + return + + # Step 1: create dca_lp if it doesn't exist yet. m001 on a fresh install + # already created it; on a pre-m004 install we're creating it here. + await db.execute(f""" + CREATE TABLE IF NOT EXISTS satoshimachine.dca_lp ( + user_id TEXT PRIMARY KEY, + dca_wallet_id TEXT NOT NULL, + default_dca_mode TEXT NOT NULL DEFAULT 'flow', + fixed_mode_daily_limit DECIMAL(10,2), + autoforward_ln_address TEXT, + autoforward_enabled BOOLEAN NOT NULL DEFAULT false, + created_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now}, + updated_at TIMESTAMP NOT NULL DEFAULT {db.timestamp_now} + ); + """) + + # Step 2: backfill dca_lp from dca_clients. Pick the latest row per + # user (by updated_at, falling back to created_at) when the LP is + # enrolled at multiple machines — that row reflects their most + # recent intent. ROW_NUMBER() OVER (...) requires SQLite 3.25+ (2018). + await db.execute(""" + INSERT OR IGNORE INTO satoshimachine.dca_lp + (user_id, dca_wallet_id, default_dca_mode, fixed_mode_daily_limit, + autoforward_ln_address, autoforward_enabled, + created_at, updated_at) + SELECT user_id, wallet_id, dca_mode, fixed_mode_daily_limit, + autoforward_ln_address, autoforward_enabled, + created_at, updated_at + FROM ( + SELECT *, ROW_NUMBER() OVER ( + PARTITION BY user_id + ORDER BY updated_at DESC, created_at DESC + ) AS rn + FROM satoshimachine.dca_clients + ) ranked + WHERE rn = 1 + """) + + # Step 3: drop the moved columns from dca_clients. ALTER TABLE DROP + # COLUMN needs SQLite 3.35+ (2021). One column per ALTER (SQLite + # doesn't support multi-column DROP). + for col in ( + "wallet_id", + "dca_mode", + "fixed_mode_daily_limit", + "autoforward_ln_address", + "autoforward_enabled", + ): + await db.execute(f"ALTER TABLE satoshimachine.dca_clients DROP COLUMN {col}") diff --git a/models.py b/models.py index aeb07fb..f63b262 100644 --- a/models.py +++ b/models.py @@ -80,40 +80,70 @@ class UpdateMachineData(BaseModel): class CreateDcaClientData(BaseModel): + """Operator enrols an LP at one of their machines. + + Pure (machine, LP) tuple — no wallet, no mode, no autoforward. Those + live on the per-user `dca_lp` row, written by the LP themselves via + satmachineclient. An LP must have onboarded (have a `dca_lp` row) + before deposits can be recorded against this enrolment; enrolment + itself works either way. + """ + machine_id: str user_id: str - wallet_id: str username: Optional[str] = None - dca_mode: str = "flow" # 'flow' | 'fixed' - fixed_mode_daily_limit: Optional[float] = None - # Auto-forward DCA distributions to an external LN address (best-effort; - # sats stay in LNbits wallet on forward failure — see satmachineadmin#8). - autoforward_ln_address: Optional[str] = None - autoforward_enabled: bool = False class DcaClient(BaseModel): id: str machine_id: str user_id: str - wallet_id: str username: Optional[str] - dca_mode: str - fixed_mode_daily_limit: Optional[float] - autoforward_ln_address: Optional[str] - autoforward_enabled: bool status: str created_at: datetime updated_at: datetime class UpdateDcaClientData(BaseModel): + """Operator-side updates to an enrolment. The operator can only edit + fields that aren't LP-controlled (username display, status). Wallet + / mode / autoforward changes go through satmachineclient against + `dca_lp` instead.""" + username: Optional[str] = None - dca_mode: Optional[str] = None + status: Optional[str] = None + + +class DcaLpPreferences(BaseModel): + """Per-user DCA preferences, owned by the LP. + + Created on first satmachineclient dashboard access (the extension + auto-seeds `dca_wallet_id` with the LP's first/default LNbits wallet + — they can change it from the dashboard). All distribution decisions + (where do the sats go, do we forward to an LN address, what's the + default mode) read from here, joined onto `dca_clients` by user_id. + """ + + user_id: str + dca_wallet_id: str + default_dca_mode: str # 'flow' | 'fixed' + fixed_mode_daily_limit: Optional[float] + autoforward_ln_address: Optional[str] + autoforward_enabled: bool + created_at: datetime + updated_at: datetime + + +class UpsertDcaLpData(BaseModel): + """satmachineclient writes this on first onboarding / when the LP + edits their preferences. All fields optional on update — pass only + the ones being changed.""" + + dca_wallet_id: Optional[str] = None + default_dca_mode: Optional[str] = None fixed_mode_daily_limit: Optional[float] = None autoforward_ln_address: Optional[str] = None autoforward_enabled: Optional[bool] = None - status: Optional[str] = None class ClientBalanceSummary(BaseModel): diff --git a/views_api.py b/views_api.py index 7cdd4db..8b6db83 100644 --- a/views_api.py +++ b/views_api.py @@ -39,6 +39,7 @@ from .crud import ( get_settlements_for_operator, get_stuck_settlements_for_operator, get_super_config, + lp_is_onboarded, replace_commission_splits, reset_settlement_for_retry, update_dca_client, @@ -311,6 +312,18 @@ async def api_create_deposit( HTTPStatus.BAD_REQUEST, "client_id and machine_id refer to different machines", ) + # Gate: refuse deposits for an LP who hasn't onboarded via + # satmachineclient. Without a dca_lp row we don't know where to + # send their DCA distributions, so accepting fiat against them + # would just queue up sats with nowhere to go. Forces the LP to + # actively register before any economic activity accrues. + if not await lp_is_onboarded(client.user_id): + raise HTTPException( + HTTPStatus.UNPROCESSABLE_ENTITY, + "LP has not onboarded yet — they must register via " + "satmachineclient and select a DCA wallet before deposits " + "can be recorded against them.", + ) return await create_deposit(user.id, data)