diff --git a/crud.py b/crud.py index 51b07a4..fc87070 100644 --- a/crud.py +++ b/crud.py @@ -12,6 +12,7 @@ from lnbits.db import Database from lnbits.helpers import urlsafe_short_hash from .models import ( + CassetteConfig, ClientBalanceSummary, CommissionSplit, CommissionSplitLeg, @@ -26,6 +27,7 @@ from .models import ( DcaPayment, DcaSettlement, Machine, + PublishCassettesPayload, SuperConfig, TelemetrySnapshot, UpdateDcaClientData, @@ -33,6 +35,7 @@ from .models import ( UpdateDepositStatusData, UpdateMachineData, UpdateSuperConfigData, + UpsertCassetteConfigData, UpsertDcaLpData, ) @@ -1334,3 +1337,152 @@ async def upsert_fleet_snapshot( {"mid": machine_id, "json": telemetry_json, "now": now}, ) return await get_telemetry(machine_id) + + +# ============================================================================= +# Cassette configs — operator-driven ATM cassette inventory (#29). +# ============================================================================= +# Row lifecycle per #29: +# - First population for a (machine_id, denomination) pair → apply_bootstrap_state +# (consumer reading the ATM's one-shot bitspire-cassettes-state event) +# - Operator edit of count or position → update_cassette_config (refuses to +# create new rows; the denomination set is hardware-determined) +# - Row creation/deletion for a new denomination → admin only, via ATM +# re-provisioning + new bootstrap event (not exposed in v1 here) + + +def _should_apply_bootstrap_state( + existing_state_event_id: Optional[str], incoming_event_id: str +) -> bool: + """Pure-function dedup gate for apply_bootstrap_state. + + Returns False if any existing row for this machine already references + the incoming event_id (relay re-delivery after restart). True otherwise. + + Extracted as a pure function so the dedup decision is unit-testable + without a database round-trip. The actual idempotency check in + apply_bootstrap_state fetches one existing row and passes its + state_event_id here. + """ + return existing_state_event_id != incoming_event_id + + +async def get_cassette_config( + machine_id: str, denomination: int +) -> Optional[CassetteConfig]: + return await db.fetchone( + "SELECT * FROM satoshimachine.cassette_configs " + "WHERE machine_id = :mid AND denomination = :denom", + {"mid": machine_id, "denom": denomination}, + CassetteConfig, + ) + + +async def list_cassette_configs_for_machine( + machine_id: str, +) -> List[CassetteConfig]: + return await db.fetchall( + "SELECT * FROM satoshimachine.cassette_configs " + "WHERE machine_id = :mid ORDER BY position, denomination", + {"mid": machine_id}, + CassetteConfig, + ) + + +async def update_cassette_config( + machine_id: str, + denomination: int, + data: UpsertCassetteConfigData, + *, + updated_by: Optional[str] = None, +) -> Optional[CassetteConfig]: + """Operator-driven row update: change count and/or position for a single + cassette. Refuses to create new rows — those only land via + apply_bootstrap_state() consuming an ATM bootstrap event (per #29 row + lifecycle: hardware-determined denomination set, not operator-creatable). + Returns None if the (machine_id, denomination) row doesn't exist. + """ + existing = await get_cassette_config(machine_id, denomination) + if existing is None: + return None + update_data: dict = {k: v for k, v in data.dict().items() if v is not None} + if not update_data: + return existing + update_data["updated_at"] = datetime.now() + update_data["updated_by"] = updated_by + set_clause = ", ".join(f"{k} = :{k}" for k in update_data) + update_data["mid"] = machine_id + update_data["denom"] = denomination + await db.execute( + f"UPDATE satoshimachine.cassette_configs SET {set_clause} " + "WHERE machine_id = :mid AND denomination = :denom", + update_data, + ) + return await get_cassette_config(machine_id, denomination) + + +async def apply_bootstrap_state( + machine_id: str, + event_id: str, + event_created_at: datetime, + payload: PublishCassettesPayload, +) -> bool: + """Consume an ATM-published kind-30078 bitspire-cassettes-state: event + and upsert one cassette_configs row per denomination in the payload. + + Returns True if the upsert ran; False if any existing row for this + machine already references this event_id (idempotent on relay + re-delivery / restart). + + Populates both the operator-believed columns (count, position, + updated_at, updated_by='atm-bootstrap') AND the v2 reverse-channel + columns (state_count, state_at, state_event_id) so the operator's + initial view matches the ATM's reported state. v2 reconciliation UI + will diverge them when continuous reverse-channel events land. + """ + existing_first = await db.fetchone( + "SELECT state_event_id FROM satoshimachine.cassette_configs " + "WHERE machine_id = :mid LIMIT 1", + {"mid": machine_id}, + ) + existing_event_id: Optional[str] = None + if existing_first is not None: + existing_event_id = ( + existing_first.get("state_event_id") + if isinstance(existing_first, dict) + else getattr(existing_first, "state_event_id", None) + ) + if not _should_apply_bootstrap_state(existing_event_id, event_id): + return False + + now = datetime.now() + for denom, row in payload.denominations.items(): + await db.execute( + """ + INSERT INTO satoshimachine.cassette_configs + (machine_id, denomination, count, position, updated_at, + updated_by, state_count, state_at, state_event_id) + VALUES (:mid, :denom, :count, :pos, :now, :by, + :state_count, :state_at, :event_id) + ON CONFLICT (machine_id, denomination) DO UPDATE SET + count = excluded.count, + position = excluded.position, + updated_at = excluded.updated_at, + updated_by = excluded.updated_by, + state_count = excluded.state_count, + state_at = excluded.state_at, + state_event_id = excluded.state_event_id + """, + { + "mid": machine_id, + "denom": denom, + "count": row.count, + "pos": row.position, + "now": now, + "by": "atm-bootstrap", + "state_count": row.count, + "state_at": event_created_at, + "event_id": event_id, + }, + ) + return True diff --git a/tests/test_cassette_configs.py b/tests/test_cassette_configs.py new file mode 100644 index 0000000..f9d9a4a --- /dev/null +++ b/tests/test_cassette_configs.py @@ -0,0 +1,220 @@ +""" +Tests for the v1 cassette-config layer (aiolabs/satmachineadmin#29). + +Covers the pure pieces that don't need a live DB: + - Pydantic validator behaviour on PublishCassettesPayload + the row / + upsert models (denomination key coercion, integer ranges, no-duplicate- + positions, wire-format round-trip) + - _should_apply_bootstrap_state dedup helper (extracted from + apply_bootstrap_state so the relay-re-delivery decision is testable + without a database round-trip) + +DB-touching tests (apply_bootstrap_state actually upserting, list-by- +machine ordering, etc.) follow the project convention from +test_deposit_currency.py: "Layer 2 is an endpoint-level behaviour better +covered by an integration test against a running LNbits; tracked in #26 +as a follow-up." Smoke-tested manually via the dev container. +""" + +import pytest + +from ..crud import _should_apply_bootstrap_state +from ..models import ( + CassettePayloadRow, + PublishCassettesPayload, + UpsertCassetteConfigData, +) + + +# ============================================================================= +# PublishCassettesPayload — wire-shape validators +# ============================================================================= + + +class TestPublishCassettesPayload: + """The kind-30078 content payload, bidirectional (operator→ATM and + ATM→operator share the shape). String JSON keys must coerce to int; + duplicate positions must reject; per-row int constraints enforced.""" + + def test_happy_path_coerces_string_keys_to_int(self): + p = PublishCassettesPayload( + denominations={ + "20": {"position": 1, "count": 49}, + "50": {"position": 2, "count": 100}, + } + ) + assert set(p.denominations.keys()) == {20, 50} + assert p.denominations[20].position == 1 + assert p.denominations[20].count == 49 + assert p.denominations[50].count == 100 + + def test_wire_dict_round_trip_restringifies_keys(self): + """to_wire_dict() must restringify denomination keys so the + resulting JSON is parseable by clients (including the ATM-side + nostr-tools NIP-44 v2 consumer per the byte-compat cross-test).""" + original = PublishCassettesPayload( + denominations={ + "20": {"position": 1, "count": 49}, + "50": {"position": 2, "count": 100}, + } + ) + wire = original.to_wire_dict() + assert wire == { + "denominations": { + "20": {"position": 1, "count": 49}, + "50": {"position": 2, "count": 100}, + } + } + # And the wire form round-trips back through the parser cleanly. + reparsed = PublishCassettesPayload(**wire) + assert reparsed.denominations == original.denominations + + def test_rejects_non_int_key(self): + with pytest.raises(ValueError) as exc: + PublishCassettesPayload( + denominations={"abc": {"position": 1, "count": 1}} + ) + assert "is not an int" in str(exc.value) + + def test_rejects_non_positive_denomination(self): + with pytest.raises(ValueError) as exc: + PublishCassettesPayload( + denominations={"0": {"position": 1, "count": 1}} + ) + assert "denomination must be > 0" in str(exc.value) + + def test_rejects_negative_denomination(self): + with pytest.raises(ValueError) as exc: + PublishCassettesPayload( + denominations={"-20": {"position": 1, "count": 1}} + ) + assert "denomination must be > 0" in str(exc.value) + + def test_rejects_duplicate_position(self): + """Two cassettes can't occupy the same physical slot. The schema + PK is (machine_id, denomination), so duplicates land via the + payload; reject at the validator layer before the publish path + builds an event the ATM will misinterpret.""" + with pytest.raises(ValueError) as exc: + PublishCassettesPayload( + denominations={ + "20": {"position": 1, "count": 49}, + "50": {"position": 1, "count": 100}, + } + ) + assert "duplicate position" in str(exc.value) + + def test_rejects_negative_count(self): + with pytest.raises(ValueError): + PublishCassettesPayload( + denominations={"20": {"position": 1, "count": -1}} + ) + + def test_rejects_zero_position(self): + with pytest.raises(ValueError): + PublishCassettesPayload( + denominations={"20": {"position": 0, "count": 1}} + ) + + def test_allows_zero_count(self): + """An empty cassette is a legal state — operator must be able to + record `count=0` after a dispatcher pulled the cassette mid-day.""" + p = PublishCassettesPayload( + denominations={"20": {"position": 1, "count": 0}} + ) + assert p.denominations[20].count == 0 + + +# ============================================================================= +# CassettePayloadRow — per-row int constraints (single-row tests) +# ============================================================================= + + +class TestCassettePayloadRow: + def test_happy_path(self): + row = CassettePayloadRow(position=1, count=49) + assert row.position == 1 + assert row.count == 49 + + @pytest.mark.parametrize("bad_position", [0, -1, -100]) + def test_rejects_non_positive_position(self, bad_position): + with pytest.raises(ValueError): + CassettePayloadRow(position=bad_position, count=1) + + def test_rejects_negative_count(self): + with pytest.raises(ValueError): + CassettePayloadRow(position=1, count=-1) + + +# ============================================================================= +# UpsertCassetteConfigData — operator-edit form +# ============================================================================= + + +class TestUpsertCassetteConfigData: + """Operator-driven row edit. Both fields optional; same int constraints + as the wire-format row but applied independently per-edit.""" + + def test_partial_update_count_only(self): + d = UpsertCassetteConfigData(count=80) + assert d.count == 80 + assert d.position is None + + def test_partial_update_position_only(self): + d = UpsertCassetteConfigData(position=3) + assert d.position == 3 + assert d.count is None + + def test_empty_update_is_legal(self): + """An empty UpsertCassetteConfigData parses fine; the CRUD short- + circuits a no-op on empty payload (no SQL emitted).""" + d = UpsertCassetteConfigData() + assert d.count is None + assert d.position is None + + def test_rejects_negative_count(self): + with pytest.raises(ValueError): + UpsertCassetteConfigData(count=-1) + + def test_rejects_non_positive_position(self): + with pytest.raises(ValueError): + UpsertCassetteConfigData(position=0) + + +# ============================================================================= +# _should_apply_bootstrap_state — relay re-delivery dedup +# ============================================================================= + + +class TestShouldApplyBootstrapState: + """Pure-function dedup gate extracted from apply_bootstrap_state so the + decision is testable without a DB. Logic: apply if-and-only-if the + existing row's state_event_id differs from the incoming event_id. + + In v1 the ATM publishes the bootstrap event exactly once per machine, + so this is sufficient for replay protection. v2 will need a + `last_state_created_at` watermark in addition (per bitspire's + `meta.lastKnownConfigCreatedAt` on the ATM side) — flagged in #29's + v2 forward-look section. + """ + + def test_applies_when_no_existing_row(self): + assert _should_apply_bootstrap_state(None, "new-event-id") is True + + def test_applies_when_existing_event_id_differs(self): + assert ( + _should_apply_bootstrap_state("old-event-id", "new-event-id") is True + ) + + def test_skips_when_existing_event_id_matches(self): + """The same bootstrap event re-delivered after a relay reconnect + or satmachineadmin restart should no-op, not re-upsert the same + rows (which would clobber any operator edits since).""" + assert ( + _should_apply_bootstrap_state("same-event", "same-event") is False + ) + + def test_applies_when_existing_is_empty_string_and_incoming_is_id(self): + """Defensive — a sentinel empty-string existing_state_event_id + shouldn't block a real incoming event from applying.""" + assert _should_apply_bootstrap_state("", "real-event-id") is True