diff --git a/metricsDatasource.js b/metricsDatasource.js index 226b9dac..c1c3f493 100644 --- a/metricsDatasource.js +++ b/metricsDatasource.js @@ -8,12 +8,16 @@ import { LndMetrics1703170330183 } from './build/src/services/storage/migrations import { ChannelRouting1709316653538 } from './build/src/services/storage/migrations/1709316653538-channel_routing.js' import { HtlcCount1724266887195 } from './build/src/services/storage/migrations/1724266887195-htlc_count.js' import { BalanceEvents1724860966825 } from './build/src/services/storage/migrations/1724860966825-balance_events.js' +import { RootOps1732566440447 } from './build/src/services/storage/migrations/1732566440447-root_ops.js' +import { RootOpsTime1745428134124 } from './build/src/services/storage/migrations/1745428134124-root_ops_time.js' +import { ChannelEvents1750777346411 } from './build/src/services/storage/migrations/1750777346411-channel_events.js' export default new DataSource({ type: "better-sqlite3", database: "metrics.sqlite", entities: [BalanceEvent, ChannelBalanceEvent, ChannelRouting, RootOperation, ChannelEvent], - migrations: [LndMetrics1703170330183, ChannelRouting1709316653538, HtlcCount1724266887195, BalanceEvents1724860966825] + migrations: [LndMetrics1703170330183, ChannelRouting1709316653538, HtlcCount1724266887195, BalanceEvents1724860966825, + RootOps1732566440447, RootOpsTime1745428134124, ChannelEvents1750777346411] }); -//npx typeorm migration:generate ./src/services/storage/migrations/channel_events -d ./metricsDatasource.js \ No newline at end of file +//npx typeorm migration:generate ./src/services/storage/migrations/root_op_pending -d ./metricsDatasource.js \ No newline at end of file diff --git a/src/services/main/adminManager.ts b/src/services/main/adminManager.ts index 4449e9ca..e8147aa6 100644 --- a/src/services/main/adminManager.ts +++ b/src/services/main/adminManager.ts @@ -276,7 +276,7 @@ export class AdminManager { this.swaps.PayInvoiceSwap("admin", req.swap_operation_id, req.sat_per_v_byte, async (addr, amt) => { const tx = await this.lnd.PayAddress(addr, amt, req.sat_per_v_byte, "", { useProvider: false, from: 'system' }) this.log("paid admin invoice swap", { swapOpId: req.swap_operation_id, txId: tx.txid }) - await this.storage.metricsStorage.AddRootOperation("chain_payment", tx.txid, amt) + await this.storage.metricsStorage.AddRootOperation("chain_payment", tx.txid, amt, true) // Fetch the full transaction hex for potential refunds let lockupTxHex: string | undefined diff --git a/src/services/main/watchdog.ts b/src/services/main/watchdog.ts index 778e09ce..b02b9678 100644 --- a/src/services/main/watchdog.ts +++ b/src/services/main/watchdog.ts @@ -29,6 +29,7 @@ export class Watchdog { ready = false interval: NodeJS.Timer; lndPubKey: string; + lastHandlerRootOpsAtUnix = 0 constructor(settings: SettingsManager, liquidityManager: LiquidityManager, lnd: LND, storage: Storage, utils: Utils, rugPullTracker: RugPullTracker) { this.lnd = lnd; this.settings = settings; @@ -67,7 +68,7 @@ export class Watchdog { await this.getTracker() const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance() this.utils.stateBundler.AddBalancePoint('usersBalance', totalUsersBalance) - const { totalExternal, otherExternal } = await this.getAggregatedExternalBalance() + const { totalExternal } = await this.getAggregatedExternalBalance() this.initialLndBalance = totalExternal this.initialUsersBalance = totalUsersBalance const fwEvents = await this.lnd.GetForwardingHistory(0, this.startedAtUnix) @@ -76,8 +77,6 @@ export class Watchdog { const paymentFound = await this.storage.paymentStorage.GetMaxPaymentIndex() const knownMaxIndex = paymentFound.length > 0 ? Math.max(paymentFound[0].paymentIndex, 0) : 0 this.latestPaymentIndexOffset = await this.lnd.GetLatestPaymentIndex(knownMaxIndex) - const other = { ilnd: this.initialLndBalance, hf: this.accumulatedHtlcFees, iu: this.initialUsersBalance, tu: totalUsersBalance, oext: otherExternal } - //getLogger({ component: 'watchdog_debug2' })(JSON.stringify({ deltaLnd: 0, deltaUsers: 0, totalExternal, latestIndex: this.latestPaymentIndexOffset, other })) this.interval = setInterval(() => { if (this.latestCheckStart + (1000 * 58) < Date.now()) { this.PaymentRequested() @@ -93,7 +92,44 @@ export class Watchdog { fwEvents.forwardingEvents.forEach((event) => { this.accumulatedHtlcFees += Number(event.fee) }) + } + handleRootOperations = async () => { + let pendingChange = 0 + const pendingChainPayments = await this.storage.metricsStorage.GetPendingChainPayments() + for (const payment of pendingChainPayments) { + const tx = await this.lnd.GetTx(payment.operation_identifier) + if (tx.numConfirmations > 0) { + await this.storage.metricsStorage.SetRootOpConfirmed(payment.serial_id) + continue + } + tx.outputDetails.forEach(o => pendingChange += o.isOurAddress ? Number(o.amount) : 0) + } + let newReceived = 0 + let newSpent = 0 + if (this.lastHandlerRootOpsAtUnix === 0) { + this.lastHandlerRootOpsAtUnix = Math.floor(Date.now() / 1000) + return { newReceived, newSpent, pendingChange } + } + + const newOps = await this.storage.metricsStorage.GetRootOperations({ from: this.lastHandlerRootOpsAtUnix }) + newOps.forEach(o => { + switch (o.operation_type) { + case 'chain_payment': + newSpent += Number(o.operation_amount) + break + case 'invoice_payment': + newSpent += Number(o.operation_amount) + break + case 'chain': + newReceived += Number(o.operation_amount) + break + case 'invoice': + newReceived += Number(o.operation_amount) + break + } + }) + return { newReceived, newSpent, pendingChange } } getAggregatedExternalBalance = async () => { @@ -101,8 +137,9 @@ export class Watchdog { const feesPaidForLiquidity = this.liquidityManager.GetPaidFees() const pb = await this.rugPullTracker.CheckProviderBalance() const providerBalance = pb.prevBalance || pb.balance - const otherExternal = { pb: providerBalance, f: feesPaidForLiquidity, lnd: totalLndBalance, olnd: othersFromLnd } - return { totalExternal: totalLndBalance + providerBalance + feesPaidForLiquidity, otherExternal } + const { newReceived, newSpent, pendingChange } = await this.handleRootOperations() + const opsTotal = newReceived + pendingChange - newSpent + return { totalExternal: totalLndBalance + providerBalance + feesPaidForLiquidity + opsTotal } } checkBalanceUpdate = async (deltaLnd: number, deltaUsers: number) => { @@ -187,7 +224,7 @@ export class Watchdog { } const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance() this.utils.stateBundler.AddBalancePoint('usersBalance', totalUsersBalance) - const { totalExternal, otherExternal } = await this.getAggregatedExternalBalance() + const { totalExternal } = await this.getAggregatedExternalBalance() this.utils.stateBundler.AddBalancePoint('accumulatedHtlcFees', this.accumulatedHtlcFees) const deltaLnd = totalExternal - (this.initialLndBalance + this.accumulatedHtlcFees) const deltaUsers = totalUsersBalance - this.initialUsersBalance @@ -196,8 +233,6 @@ export class Watchdog { const knownMaxIndex = Math.max(maxFromDb, this.latestPaymentIndexOffset) const newLatest = await this.lnd.GetLatestPaymentIndex(knownMaxIndex) const historyMismatch = newLatest > knownMaxIndex - const other = { ilnd: this.initialLndBalance, hf: this.accumulatedHtlcFees, iu: this.initialUsersBalance, tu: totalUsersBalance, km: knownMaxIndex, nl: newLatest, oext: otherExternal } - //getLogger({ component: 'watchdog_debug2' })(JSON.stringify({ deltaLnd, deltaUsers, totalExternal, other })) const deny = await this.checkBalanceUpdate(deltaLnd, deltaUsers) if (historyMismatch) { getLogger({ component: 'bark' })("History mismatch detected in absolute update, locking outgoing operations") diff --git a/src/services/storage/entity/RootOperation.ts b/src/services/storage/entity/RootOperation.ts index 3a2bd1f9..ec4914bc 100644 --- a/src/services/storage/entity/RootOperation.ts +++ b/src/services/storage/entity/RootOperation.ts @@ -17,6 +17,9 @@ export class RootOperation { @Column({ default: 0 }) at_unix: number + @Column({ default: false }) + pending: boolean + @CreateDateColumn() created_at: Date diff --git a/src/services/storage/metricsStorage.ts b/src/services/storage/metricsStorage.ts index 44da304c..b388a587 100644 --- a/src/services/storage/metricsStorage.ts +++ b/src/services/storage/metricsStorage.ts @@ -10,6 +10,7 @@ import { StorageInterface } from "./db/storageInterface.js"; import { Utils } from "../helpers/utilsWrapper.js"; import { Channel, ChannelEventUpdate } from "../../../proto/lnd/lightning.js"; import { ChannelEvent } from "./entity/ChannelEvent.js"; +export type RootOperationType = 'chain' | 'invoice' | 'chain_payment' | 'invoice_payment' export default class { //DB: DataSource | EntityManager settings: StorageSettings @@ -145,14 +146,25 @@ export default class { } } - async AddRootOperation(opType: string, id: string, amount: number, txId?: string) { - return this.dbs.CreateAndSave('RootOperation', { operation_type: opType, operation_amount: amount, operation_identifier: id, at_unix: Math.floor(Date.now() / 1000) }, txId) + async AddRootOperation(opType: RootOperationType, id: string, amount: number, pending = false, dbTxId?: string) { + return this.dbs.CreateAndSave('RootOperation', { + operation_type: opType, operation_amount: amount, + operation_identifier: id, at_unix: Math.floor(Date.now() / 1000), pending + }, dbTxId) } - async GetRootOperation(opType: string, id: string, txId?: string) { + async GetRootOperation(opType: RootOperationType, id: string, txId?: string) { return this.dbs.FindOne('RootOperation', { where: { operation_type: opType, operation_identifier: id } }, txId) } + async GetPendingChainPayments() { + return this.dbs.Find('RootOperation', { where: { operation_type: 'chain_payment', pending: true } }) + } + + async SetRootOpConfirmed(serialId: number) { + return this.dbs.Update('RootOperation', serialId, { pending: false }) + } + async GetRootOperations({ from, to }: { from?: number, to?: number }, txId?: string) { const q = getTimeQuery({ from, to }) return this.dbs.Find('RootOperation', q, txId) diff --git a/src/services/storage/migrations/1771524665409-root_op_pending.ts b/src/services/storage/migrations/1771524665409-root_op_pending.ts new file mode 100644 index 00000000..e65461f3 --- /dev/null +++ b/src/services/storage/migrations/1771524665409-root_op_pending.ts @@ -0,0 +1,20 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class RootOpPending1771524665409 implements MigrationInterface { + name = 'RootOpPending1771524665409' + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`CREATE TABLE "temporary_root_operation" ("serial_id" integer PRIMARY KEY AUTOINCREMENT NOT NULL, "operation_type" varchar NOT NULL, "operation_amount" integer NOT NULL, "operation_identifier" varchar NOT NULL, "created_at" datetime NOT NULL DEFAULT (datetime('now')), "updated_at" datetime NOT NULL DEFAULT (datetime('now')), "at_unix" integer NOT NULL DEFAULT (0), "pending" boolean NOT NULL DEFAULT (0))`); + await queryRunner.query(`INSERT INTO "temporary_root_operation"("serial_id", "operation_type", "operation_amount", "operation_identifier", "created_at", "updated_at", "at_unix") SELECT "serial_id", "operation_type", "operation_amount", "operation_identifier", "created_at", "updated_at", "at_unix" FROM "root_operation"`); + await queryRunner.query(`DROP TABLE "root_operation"`); + await queryRunner.query(`ALTER TABLE "temporary_root_operation" RENAME TO "root_operation"`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "root_operation" RENAME TO "temporary_root_operation"`); + await queryRunner.query(`CREATE TABLE "root_operation" ("serial_id" integer PRIMARY KEY AUTOINCREMENT NOT NULL, "operation_type" varchar NOT NULL, "operation_amount" integer NOT NULL, "operation_identifier" varchar NOT NULL, "created_at" datetime NOT NULL DEFAULT (datetime('now')), "updated_at" datetime NOT NULL DEFAULT (datetime('now')), "at_unix" integer NOT NULL DEFAULT (0))`); + await queryRunner.query(`INSERT INTO "root_operation"("serial_id", "operation_type", "operation_amount", "operation_identifier", "created_at", "updated_at", "at_unix") SELECT "serial_id", "operation_type", "operation_amount", "operation_identifier", "created_at", "updated_at", "at_unix" FROM "temporary_root_operation"`); + await queryRunner.query(`DROP TABLE "temporary_root_operation"`); + } + +} diff --git a/src/services/storage/migrations/runner.ts b/src/services/storage/migrations/runner.ts index eb5d7c19..d68e3698 100644 --- a/src/services/storage/migrations/runner.ts +++ b/src/services/storage/migrations/runner.ts @@ -35,7 +35,7 @@ import { SwapsServiceUrl1768413055036 } from './1768413055036-swaps_service_url. import { InvoiceSwaps1769529793283 } from './1769529793283-invoice_swaps.js' import { InvoiceSwapsFixes1769805357459 } from './1769805357459-invoice_swaps_fixes.js' import { SwapTimestamps1771347307798 } from './1771347307798-swap_timestamps.js' - +import { RootOpPending1771524665409 } from './1771524665409-root_op_pending.js' export const allMigrations = [Initial1703170309875, LspOrder1718387847693, LiquidityProvider1719335699480, LndNodeInfo1720187506189, TrackedProvider1720814323679, CreateInviteTokenTable1721751414878, PaymentIndex1721760297610, DebitAccess1726496225078, DebitAccessFixes1726685229264, DebitToPub1727105758354, UserCbUrl1727112281043, UserOffer1733502626042, ManagementGrant1751307732346, ManagementGrantBanned1751989251513, @@ -44,7 +44,8 @@ export const allMigrations = [Initial1703170309875, LspOrder1718387847693, Liqui TxSwapAddress1764779178945, ClinkRequester1765497600000, TrackedProviderHeight1766504040000, SwapsServiceUrl1768413055036, InvoiceSwaps1769529793283, InvoiceSwapsFixes1769805357459, SwapTimestamps1771347307798] -export const allMetricsMigrations = [LndMetrics1703170330183, ChannelRouting1709316653538, HtlcCount1724266887195, BalanceEvents1724860966825, RootOps1732566440447, RootOpsTime1745428134124, ChannelEvents1750777346411] +export const allMetricsMigrations = [LndMetrics1703170330183, ChannelRouting1709316653538, HtlcCount1724266887195, BalanceEvents1724860966825, + RootOps1732566440447, RootOpsTime1745428134124, ChannelEvents1750777346411, RootOpPending1771524665409] /* export const TypeOrmMigrationRunner = async (log: PubLogger, storageManager: Storage, settings: DbSettings, arg: string | undefined): Promise => { await connectAndMigrate(log, storageManager, allMigrations, allMetricsMigrations) return false