Merge pull request #882 from shocknet/watchdog-root-ops

account for root ops and change
This commit is contained in:
Justin (shocknet) 2026-02-19 13:54:26 -05:00 committed by GitHub
commit 791a5756a7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 108 additions and 17 deletions

View file

@ -1,3 +1,14 @@
/**
* TypeORM DataSource used only by the TypeORM CLI (e.g. migration:generate).
*
* Migrations at runtime are run from src/services/storage/migrations/runner.ts (allMigrations),
* not from this file. The app never uses this DataSource to run migrations.
*
* Workflow: update the migrations array in this file *before* running
* migration:generate, so TypeORM knows the current schema (entities + existing migrations).
* We do not update this file immediately after adding a new migration; update it when you
* are about to generate the next migration.
*/
import { DataSource } from "typeorm" import { DataSource } from "typeorm"
import { BalanceEvent } from "./build/src/services/storage/entity/BalanceEvent.js" import { BalanceEvent } from "./build/src/services/storage/entity/BalanceEvent.js"
import { ChannelBalanceEvent } from "./build/src/services/storage/entity/ChannelsBalanceEvent.js" import { ChannelBalanceEvent } from "./build/src/services/storage/entity/ChannelsBalanceEvent.js"
@ -8,12 +19,16 @@ import { LndMetrics1703170330183 } from './build/src/services/storage/migrations
import { ChannelRouting1709316653538 } from './build/src/services/storage/migrations/1709316653538-channel_routing.js' import { ChannelRouting1709316653538 } from './build/src/services/storage/migrations/1709316653538-channel_routing.js'
import { HtlcCount1724266887195 } from './build/src/services/storage/migrations/1724266887195-htlc_count.js' import { HtlcCount1724266887195 } from './build/src/services/storage/migrations/1724266887195-htlc_count.js'
import { BalanceEvents1724860966825 } from './build/src/services/storage/migrations/1724860966825-balance_events.js' import { BalanceEvents1724860966825 } from './build/src/services/storage/migrations/1724860966825-balance_events.js'
import { RootOps1732566440447 } from './build/src/services/storage/migrations/1732566440447-root_ops.js'
import { RootOpsTime1745428134124 } from './build/src/services/storage/migrations/1745428134124-root_ops_time.js'
import { ChannelEvents1750777346411 } from './build/src/services/storage/migrations/1750777346411-channel_events.js'
import { RootOpPending1771524665409 } from './build/src/services/storage/migrations/1771524665409-root_op_pending.js'
export default new DataSource({ export default new DataSource({
type: "better-sqlite3", type: "better-sqlite3",
database: "metrics.sqlite", database: "metrics.sqlite",
entities: [BalanceEvent, ChannelBalanceEvent, ChannelRouting, RootOperation, ChannelEvent], entities: [BalanceEvent, ChannelBalanceEvent, ChannelRouting, RootOperation, ChannelEvent],
migrations: [LndMetrics1703170330183, ChannelRouting1709316653538, HtlcCount1724266887195, BalanceEvents1724860966825] migrations: [LndMetrics1703170330183, ChannelRouting1709316653538, HtlcCount1724266887195, BalanceEvents1724860966825,
RootOps1732566440447, RootOpsTime1745428134124, ChannelEvents1750777346411, RootOpPending1771524665409]
}); });
//npx typeorm migration:generate ./src/services/storage/migrations/channel_events -d ./metricsDatasource.js //npx typeorm migration:generate ./src/services/storage/migrations/root_op_pending -d ./metricsDatasource.js

View file

@ -276,7 +276,7 @@ export class AdminManager {
this.swaps.PayInvoiceSwap("admin", req.swap_operation_id, req.sat_per_v_byte, async (addr, amt) => { this.swaps.PayInvoiceSwap("admin", req.swap_operation_id, req.sat_per_v_byte, async (addr, amt) => {
const tx = await this.lnd.PayAddress(addr, amt, req.sat_per_v_byte, "", { useProvider: false, from: 'system' }) const tx = await this.lnd.PayAddress(addr, amt, req.sat_per_v_byte, "", { useProvider: false, from: 'system' })
this.log("paid admin invoice swap", { swapOpId: req.swap_operation_id, txId: tx.txid }) this.log("paid admin invoice swap", { swapOpId: req.swap_operation_id, txId: tx.txid })
await this.storage.metricsStorage.AddRootOperation("chain_payment", tx.txid, amt) await this.storage.metricsStorage.AddRootOperation("chain_payment", tx.txid, amt, true)
// Fetch the full transaction hex for potential refunds // Fetch the full transaction hex for potential refunds
let lockupTxHex: string | undefined let lockupTxHex: string | undefined

View file

@ -29,6 +29,7 @@ export class Watchdog {
ready = false ready = false
interval: NodeJS.Timer; interval: NodeJS.Timer;
lndPubKey: string; lndPubKey: string;
lastHandlerRootOpsAtUnix = 0
constructor(settings: SettingsManager, liquidityManager: LiquidityManager, lnd: LND, storage: Storage, utils: Utils, rugPullTracker: RugPullTracker) { constructor(settings: SettingsManager, liquidityManager: LiquidityManager, lnd: LND, storage: Storage, utils: Utils, rugPullTracker: RugPullTracker) {
this.lnd = lnd; this.lnd = lnd;
this.settings = settings; this.settings = settings;
@ -67,7 +68,7 @@ export class Watchdog {
await this.getTracker() await this.getTracker()
const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance() const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance()
this.utils.stateBundler.AddBalancePoint('usersBalance', totalUsersBalance) this.utils.stateBundler.AddBalancePoint('usersBalance', totalUsersBalance)
const { totalExternal, otherExternal } = await this.getAggregatedExternalBalance() const { totalExternal } = await this.getAggregatedExternalBalance()
this.initialLndBalance = totalExternal this.initialLndBalance = totalExternal
this.initialUsersBalance = totalUsersBalance this.initialUsersBalance = totalUsersBalance
const fwEvents = await this.lnd.GetForwardingHistory(0, this.startedAtUnix) const fwEvents = await this.lnd.GetForwardingHistory(0, this.startedAtUnix)
@ -76,8 +77,6 @@ export class Watchdog {
const paymentFound = await this.storage.paymentStorage.GetMaxPaymentIndex() const paymentFound = await this.storage.paymentStorage.GetMaxPaymentIndex()
const knownMaxIndex = paymentFound.length > 0 ? Math.max(paymentFound[0].paymentIndex, 0) : 0 const knownMaxIndex = paymentFound.length > 0 ? Math.max(paymentFound[0].paymentIndex, 0) : 0
this.latestPaymentIndexOffset = await this.lnd.GetLatestPaymentIndex(knownMaxIndex) this.latestPaymentIndexOffset = await this.lnd.GetLatestPaymentIndex(knownMaxIndex)
const other = { ilnd: this.initialLndBalance, hf: this.accumulatedHtlcFees, iu: this.initialUsersBalance, tu: totalUsersBalance, oext: otherExternal }
//getLogger({ component: 'watchdog_debug2' })(JSON.stringify({ deltaLnd: 0, deltaUsers: 0, totalExternal, latestIndex: this.latestPaymentIndexOffset, other }))
this.interval = setInterval(() => { this.interval = setInterval(() => {
if (this.latestCheckStart + (1000 * 58) < Date.now()) { if (this.latestCheckStart + (1000 * 58) < Date.now()) {
this.PaymentRequested() this.PaymentRequested()
@ -93,7 +92,49 @@ export class Watchdog {
fwEvents.forwardingEvents.forEach((event) => { fwEvents.forwardingEvents.forEach((event) => {
this.accumulatedHtlcFees += Number(event.fee) this.accumulatedHtlcFees += Number(event.fee)
}) })
}
handleRootOperations = async () => {
let pendingChange = 0
const pendingChainPayments = await this.storage.metricsStorage.GetPendingChainPayments()
for (const payment of pendingChainPayments) {
try {
const tx = await this.lnd.GetTx(payment.operation_identifier)
if (tx.numConfirmations > 0) {
await this.storage.metricsStorage.SetRootOpConfirmed(payment.serial_id)
continue
}
tx.outputDetails.forEach(o => pendingChange += o.isOurAddress ? Number(o.amount) : 0)
} catch (err: any) {
this.log("Error getting tx for root operation", err.message || err)
}
}
let newReceived = 0
let newSpent = 0
if (this.lastHandlerRootOpsAtUnix === 0) {
this.lastHandlerRootOpsAtUnix = Math.floor(Date.now() / 1000)
return { newReceived, newSpent, pendingChange }
}
const newOps = await this.storage.metricsStorage.GetRootOperations({ from: this.lastHandlerRootOpsAtUnix })
newOps.forEach(o => {
switch (o.operation_type) {
case 'chain_payment':
newSpent += Number(o.operation_amount)
break
case 'invoice_payment':
newSpent += Number(o.operation_amount)
break
case 'chain':
newReceived += Number(o.operation_amount)
break
case 'invoice':
newReceived += Number(o.operation_amount)
break
}
})
return { newReceived, newSpent, pendingChange }
} }
getAggregatedExternalBalance = async () => { getAggregatedExternalBalance = async () => {
@ -101,8 +142,9 @@ export class Watchdog {
const feesPaidForLiquidity = this.liquidityManager.GetPaidFees() const feesPaidForLiquidity = this.liquidityManager.GetPaidFees()
const pb = await this.rugPullTracker.CheckProviderBalance() const pb = await this.rugPullTracker.CheckProviderBalance()
const providerBalance = pb.prevBalance || pb.balance const providerBalance = pb.prevBalance || pb.balance
const otherExternal = { pb: providerBalance, f: feesPaidForLiquidity, lnd: totalLndBalance, olnd: othersFromLnd } const { newReceived, newSpent, pendingChange } = await this.handleRootOperations()
return { totalExternal: totalLndBalance + providerBalance + feesPaidForLiquidity, otherExternal } const opsTotal = newReceived + pendingChange - newSpent
return { totalExternal: totalLndBalance + providerBalance + feesPaidForLiquidity + opsTotal }
} }
checkBalanceUpdate = async (deltaLnd: number, deltaUsers: number) => { checkBalanceUpdate = async (deltaLnd: number, deltaUsers: number) => {
@ -187,7 +229,7 @@ export class Watchdog {
} }
const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance() const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance()
this.utils.stateBundler.AddBalancePoint('usersBalance', totalUsersBalance) this.utils.stateBundler.AddBalancePoint('usersBalance', totalUsersBalance)
const { totalExternal, otherExternal } = await this.getAggregatedExternalBalance() const { totalExternal } = await this.getAggregatedExternalBalance()
this.utils.stateBundler.AddBalancePoint('accumulatedHtlcFees', this.accumulatedHtlcFees) this.utils.stateBundler.AddBalancePoint('accumulatedHtlcFees', this.accumulatedHtlcFees)
const deltaLnd = totalExternal - (this.initialLndBalance + this.accumulatedHtlcFees) const deltaLnd = totalExternal - (this.initialLndBalance + this.accumulatedHtlcFees)
const deltaUsers = totalUsersBalance - this.initialUsersBalance const deltaUsers = totalUsersBalance - this.initialUsersBalance
@ -196,8 +238,6 @@ export class Watchdog {
const knownMaxIndex = Math.max(maxFromDb, this.latestPaymentIndexOffset) const knownMaxIndex = Math.max(maxFromDb, this.latestPaymentIndexOffset)
const newLatest = await this.lnd.GetLatestPaymentIndex(knownMaxIndex) const newLatest = await this.lnd.GetLatestPaymentIndex(knownMaxIndex)
const historyMismatch = newLatest > knownMaxIndex const historyMismatch = newLatest > knownMaxIndex
const other = { ilnd: this.initialLndBalance, hf: this.accumulatedHtlcFees, iu: this.initialUsersBalance, tu: totalUsersBalance, km: knownMaxIndex, nl: newLatest, oext: otherExternal }
//getLogger({ component: 'watchdog_debug2' })(JSON.stringify({ deltaLnd, deltaUsers, totalExternal, other }))
const deny = await this.checkBalanceUpdate(deltaLnd, deltaUsers) const deny = await this.checkBalanceUpdate(deltaLnd, deltaUsers)
if (historyMismatch) { if (historyMismatch) {
getLogger({ component: 'bark' })("History mismatch detected in absolute update, locking outgoing operations") getLogger({ component: 'bark' })("History mismatch detected in absolute update, locking outgoing operations")

View file

@ -17,6 +17,9 @@ export class RootOperation {
@Column({ default: 0 }) @Column({ default: 0 })
at_unix: number at_unix: number
@Column({ default: false })
pending: boolean
@CreateDateColumn() @CreateDateColumn()
created_at: Date created_at: Date

View file

@ -10,6 +10,7 @@ import { StorageInterface } from "./db/storageInterface.js";
import { Utils } from "../helpers/utilsWrapper.js"; import { Utils } from "../helpers/utilsWrapper.js";
import { Channel, ChannelEventUpdate } from "../../../proto/lnd/lightning.js"; import { Channel, ChannelEventUpdate } from "../../../proto/lnd/lightning.js";
import { ChannelEvent } from "./entity/ChannelEvent.js"; import { ChannelEvent } from "./entity/ChannelEvent.js";
export type RootOperationType = 'chain' | 'invoice' | 'chain_payment' | 'invoice_payment'
export default class { export default class {
//DB: DataSource | EntityManager //DB: DataSource | EntityManager
settings: StorageSettings settings: StorageSettings
@ -145,14 +146,25 @@ export default class {
} }
} }
async AddRootOperation(opType: string, id: string, amount: number, txId?: string) { async AddRootOperation(opType: RootOperationType, id: string, amount: number, pending = false, dbTxId?: string) {
return this.dbs.CreateAndSave<RootOperation>('RootOperation', { operation_type: opType, operation_amount: amount, operation_identifier: id, at_unix: Math.floor(Date.now() / 1000) }, txId) return this.dbs.CreateAndSave<RootOperation>('RootOperation', {
operation_type: opType, operation_amount: amount,
operation_identifier: id, at_unix: Math.floor(Date.now() / 1000), pending
}, dbTxId)
} }
async GetRootOperation(opType: string, id: string, txId?: string) { async GetRootOperation(opType: RootOperationType, id: string, txId?: string) {
return this.dbs.FindOne<RootOperation>('RootOperation', { where: { operation_type: opType, operation_identifier: id } }, txId) return this.dbs.FindOne<RootOperation>('RootOperation', { where: { operation_type: opType, operation_identifier: id } }, txId)
} }
async GetPendingChainPayments() {
return this.dbs.Find<RootOperation>('RootOperation', { where: { operation_type: 'chain_payment', pending: true } })
}
async SetRootOpConfirmed(serialId: number) {
return this.dbs.Update<RootOperation>('RootOperation', serialId, { pending: false })
}
async GetRootOperations({ from, to }: { from?: number, to?: number }, txId?: string) { async GetRootOperations({ from, to }: { from?: number, to?: number }, txId?: string) {
const q = getTimeQuery({ from, to }) const q = getTimeQuery({ from, to })
return this.dbs.Find<RootOperation>('RootOperation', q, txId) return this.dbs.Find<RootOperation>('RootOperation', q, txId)

View file

@ -0,0 +1,20 @@
import { MigrationInterface, QueryRunner } from "typeorm";
export class RootOpPending1771524665409 implements MigrationInterface {
name = 'RootOpPending1771524665409'
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`CREATE TABLE "temporary_root_operation" ("serial_id" integer PRIMARY KEY AUTOINCREMENT NOT NULL, "operation_type" varchar NOT NULL, "operation_amount" integer NOT NULL, "operation_identifier" varchar NOT NULL, "created_at" datetime NOT NULL DEFAULT (datetime('now')), "updated_at" datetime NOT NULL DEFAULT (datetime('now')), "at_unix" integer NOT NULL DEFAULT (0), "pending" boolean NOT NULL DEFAULT (0))`);
await queryRunner.query(`INSERT INTO "temporary_root_operation"("serial_id", "operation_type", "operation_amount", "operation_identifier", "created_at", "updated_at", "at_unix") SELECT "serial_id", "operation_type", "operation_amount", "operation_identifier", "created_at", "updated_at", "at_unix" FROM "root_operation"`);
await queryRunner.query(`DROP TABLE "root_operation"`);
await queryRunner.query(`ALTER TABLE "temporary_root_operation" RENAME TO "root_operation"`);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "root_operation" RENAME TO "temporary_root_operation"`);
await queryRunner.query(`CREATE TABLE "root_operation" ("serial_id" integer PRIMARY KEY AUTOINCREMENT NOT NULL, "operation_type" varchar NOT NULL, "operation_amount" integer NOT NULL, "operation_identifier" varchar NOT NULL, "created_at" datetime NOT NULL DEFAULT (datetime('now')), "updated_at" datetime NOT NULL DEFAULT (datetime('now')), "at_unix" integer NOT NULL DEFAULT (0))`);
await queryRunner.query(`INSERT INTO "root_operation"("serial_id", "operation_type", "operation_amount", "operation_identifier", "created_at", "updated_at", "at_unix") SELECT "serial_id", "operation_type", "operation_amount", "operation_identifier", "created_at", "updated_at", "at_unix" FROM "temporary_root_operation"`);
await queryRunner.query(`DROP TABLE "temporary_root_operation"`);
}
}

View file

@ -35,7 +35,7 @@ import { SwapsServiceUrl1768413055036 } from './1768413055036-swaps_service_url.
import { InvoiceSwaps1769529793283 } from './1769529793283-invoice_swaps.js' import { InvoiceSwaps1769529793283 } from './1769529793283-invoice_swaps.js'
import { InvoiceSwapsFixes1769805357459 } from './1769805357459-invoice_swaps_fixes.js' import { InvoiceSwapsFixes1769805357459 } from './1769805357459-invoice_swaps_fixes.js'
import { SwapTimestamps1771347307798 } from './1771347307798-swap_timestamps.js' import { SwapTimestamps1771347307798 } from './1771347307798-swap_timestamps.js'
import { RootOpPending1771524665409 } from './1771524665409-root_op_pending.js'
export const allMigrations = [Initial1703170309875, LspOrder1718387847693, LiquidityProvider1719335699480, LndNodeInfo1720187506189, export const allMigrations = [Initial1703170309875, LspOrder1718387847693, LiquidityProvider1719335699480, LndNodeInfo1720187506189,
TrackedProvider1720814323679, CreateInviteTokenTable1721751414878, PaymentIndex1721760297610, DebitAccess1726496225078, DebitAccessFixes1726685229264, TrackedProvider1720814323679, CreateInviteTokenTable1721751414878, PaymentIndex1721760297610, DebitAccess1726496225078, DebitAccessFixes1726685229264,
DebitToPub1727105758354, UserCbUrl1727112281043, UserOffer1733502626042, ManagementGrant1751307732346, ManagementGrantBanned1751989251513, DebitToPub1727105758354, UserCbUrl1727112281043, UserOffer1733502626042, ManagementGrant1751307732346, ManagementGrantBanned1751989251513,
@ -44,7 +44,8 @@ export const allMigrations = [Initial1703170309875, LspOrder1718387847693, Liqui
TxSwapAddress1764779178945, ClinkRequester1765497600000, TrackedProviderHeight1766504040000, SwapsServiceUrl1768413055036, TxSwapAddress1764779178945, ClinkRequester1765497600000, TrackedProviderHeight1766504040000, SwapsServiceUrl1768413055036,
InvoiceSwaps1769529793283, InvoiceSwapsFixes1769805357459, SwapTimestamps1771347307798] InvoiceSwaps1769529793283, InvoiceSwapsFixes1769805357459, SwapTimestamps1771347307798]
export const allMetricsMigrations = [LndMetrics1703170330183, ChannelRouting1709316653538, HtlcCount1724266887195, BalanceEvents1724860966825, RootOps1732566440447, RootOpsTime1745428134124, ChannelEvents1750777346411] export const allMetricsMigrations = [LndMetrics1703170330183, ChannelRouting1709316653538, HtlcCount1724266887195, BalanceEvents1724860966825,
RootOps1732566440447, RootOpsTime1745428134124, ChannelEvents1750777346411, RootOpPending1771524665409]
/* export const TypeOrmMigrationRunner = async (log: PubLogger, storageManager: Storage, settings: DbSettings, arg: string | undefined): Promise<boolean> => { /* export const TypeOrmMigrationRunner = async (log: PubLogger, storageManager: Storage, settings: DbSettings, arg: string | undefined): Promise<boolean> => {
await connectAndMigrate(log, storageManager, allMigrations, allMetricsMigrations) await connectAndMigrate(log, storageManager, allMigrations, allMetricsMigrations)
return false return false