From acbf9c706182aa6c3ace2bf09992553f8f603191 Mon Sep 17 00:00:00 2001 From: hatim boufnichel Date: Tue, 30 Apr 2024 19:33:49 +0200 Subject: [PATCH] increment initial balance --- src/services/helpers/functionQueue.ts | 34 +++++++++++++++++++ src/services/lnd/lnd.ts | 8 ++--- src/services/main/watchdog.ts | 48 +++++++++++++++++++++------ src/services/metrics/index.ts | 3 +- 4 files changed, 77 insertions(+), 16 deletions(-) create mode 100644 src/services/helpers/functionQueue.ts diff --git a/src/services/helpers/functionQueue.ts b/src/services/helpers/functionQueue.ts new file mode 100644 index 00000000..5d3b5f21 --- /dev/null +++ b/src/services/helpers/functionQueue.ts @@ -0,0 +1,34 @@ +import { PubLogger, getLogger } from "../helpers/logger.js" + +type Item = { res: (v: T) => void, rej: (message: string) => void } +export default class FunctionQueue { + log: PubLogger + queue: Item[] = [] + running: boolean = false + f: () => Promise + constructor(name: string, f: () => Promise) { + this.log = getLogger({ appName: name }) + this.f = f + } + + Run = (item: Item) => { + this.queue.push(item) + if (!this.running) { + this.execF() + } + } + + execF = async () => { + this.running = true + try { + const res = await this.f() + this.queue.forEach(q => q.res(res)) + } catch (err) { + this.queue.forEach(q => q.rej((err as any).message)) + } + this.queue = [] + this.running = false + } +} + + diff --git a/src/services/lnd/lnd.ts b/src/services/lnd/lnd.ts index c89676e1..655653af 100644 --- a/src/services/lnd/lnd.ts +++ b/src/services/lnd/lnd.ts @@ -8,7 +8,7 @@ import { LightningClient } from '../../../proto/lnd/lightning.client.js' import { InvoicesClient } from '../../../proto/lnd/invoices.client.js' import { RouterClient } from '../../../proto/lnd/router.client.js' import { ChainNotifierClient } from '../../../proto/lnd/chainnotifier.client.js' -import { GetInfoResponse, AddressType, NewAddressResponse, AddInvoiceResponse, Invoice_InvoiceState, PayReq, Payment_PaymentStatus, Payment, PaymentFailureReason, SendCoinsResponse, EstimateFeeResponse, ChannelBalanceResponse, TransactionDetails, ListChannelsResponse, ClosedChannelsResponse, PendingChannelsResponse } from '../../../proto/lnd/lightning.js' +import { GetInfoResponse, AddressType, NewAddressResponse, AddInvoiceResponse, Invoice_InvoiceState, PayReq, Payment_PaymentStatus, Payment, PaymentFailureReason, SendCoinsResponse, EstimateFeeResponse, ChannelBalanceResponse, TransactionDetails, ListChannelsResponse, ClosedChannelsResponse, PendingChannelsResponse, ForwardingHistoryResponse } from '../../../proto/lnd/lightning.js' import { OpenChannelReq } from './openChannelReq.js'; import { AddInvoiceReq } from './addInvoiceReq.js'; import { PayInvoiceReq } from './payInvoiceReq.js'; @@ -358,9 +358,9 @@ export default class { return { confirmedBalance: Number(confirmedBalance), unconfirmedBalance: Number(unconfirmedBalance), totalBalance: Number(totalBalance), channelsBalance } } - async GetForwardingHistory(indexOffset: number): Promise<{ fee: number, chanIdIn: string, chanIdOut: string, timestampNs: number, offset: number }[]> { - const { response } = await this.lightning.forwardingHistory({ indexOffset, numMaxEvents: 0, startTime: 0n, endTime: 0n, peerAliasLookup: false }, DeadLineMetadata()) - return response.forwardingEvents.map(e => ({ fee: Number(e.fee), chanIdIn: e.chanIdIn, chanIdOut: e.chanIdOut, timestampNs: Number(e.timestampNs), offset: response.lastOffsetIndex })) + async GetForwardingHistory(indexOffset: number, startTime = 0): Promise { + const { response } = await this.lightning.forwardingHistory({ indexOffset, numMaxEvents: 0, startTime: BigInt(startTime), endTime: 0n, peerAliasLookup: false }, DeadLineMetadata()) + return response } async GetAllPaidInvoices(max: number) { diff --git a/src/services/main/watchdog.ts b/src/services/main/watchdog.ts index a2db529f..c43252df 100644 --- a/src/services/main/watchdog.ts +++ b/src/services/main/watchdog.ts @@ -1,4 +1,5 @@ import { EnvCanBeInteger } from "../helpers/envParser.js"; +import FunctionQueue from "../helpers/functionQueue.js"; import { getLogger } from "../helpers/logger.js"; import LND from "../lnd/lnd.js"; import { ChannelBalance } from "../lnd/settings.js"; @@ -12,20 +13,24 @@ export const LoadWatchdogSettingsFromEnv = (test = false): WatchdogSettings => { } } export class Watchdog { - + queue: FunctionQueue initialLndBalance: number; initialUsersBalance: number; + startedAtUnix: number; + latestIndexOffset: number; + accumulatedHtlcFees: number; lnd: LND; settings: WatchdogSettings; storage: Storage; latestCheckStart = 0 log = getLogger({ appName: "watchdog" }) - enabled = false + ready = false interval: NodeJS.Timer; constructor(settings: WatchdogSettings, lnd: LND, storage: Storage) { this.lnd = lnd; this.settings = settings; this.storage = storage; + this.queue = new FunctionQueue("watchdog::queue", () => this.StartCheck()) } Stop() { @@ -35,10 +40,13 @@ export class Watchdog { } Start = async () => { + this.startedAtUnix = Math.floor(Date.now() / 1000) const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance() this.initialLndBalance = await this.getTotalLndBalance(totalUsersBalance) this.initialUsersBalance = totalUsersBalance - this.enabled = true + const fwEvents = await this.lnd.GetForwardingHistory(0, this.startedAtUnix) + this.latestIndexOffset = fwEvents.lastOffsetIndex + this.accumulatedHtlcFees = 0 this.interval = setInterval(() => { if (this.latestCheckStart + (1000 * 60) < Date.now()) { @@ -46,6 +54,17 @@ export class Watchdog { this.PaymentRequested() } }, 1000 * 60) + + this.ready = true + } + + updateAccumulatedHtlcFees = async () => { + const fwEvents = await this.lnd.GetForwardingHistory(this.latestIndexOffset, this.startedAtUnix) + this.latestIndexOffset = fwEvents.lastOffsetIndex + fwEvents.forwardingEvents.forEach((event) => { + this.accumulatedHtlcFees += Number(event.fee) + }) + } @@ -54,10 +73,11 @@ export class Watchdog { const walletBalance = await this.lnd.GetWalletBalance() this.log(Number(walletBalance.confirmedBalance), "sats in chain wallet") const channelsBalance = await this.lnd.GetChannelBalance() - getLogger({ appName: "debugLndBalancev3" })({ w: walletBalance, c: channelsBalance, u: usersTotal }) + getLogger({ appName: "debugLndBalancev3" })({ w: walletBalance, c: channelsBalance, u: usersTotal, f: this.accumulatedHtlcFees }) + const localChannelsBalance = Number(channelsBalance.localBalance?.sat || 0) const unsettledLocalBalance = Number(channelsBalance.unsettledLocalBalance?.sat || 0) - return Number(walletBalance.confirmedBalance) + localChannelsBalance + unsettledLocalBalance + return Number(walletBalance.confirmedBalance) + localChannelsBalance + unsettledLocalBalance + this.accumulatedHtlcFees } checkBalanceUpdate = (deltaLnd: number, deltaUsers: number) => { @@ -111,13 +131,9 @@ export class Watchdog { return false } - PaymentRequested = async () => { - this.log("Payment requested, checking balance") - if (!this.enabled) { - this.log("WARNING! Watchdog not enabled, skipping balance check") - return - } + StartCheck = async () => { this.latestCheckStart = Date.now() + await this.updateAccumulatedHtlcFees() const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance() const totalLndBalance = await this.getTotalLndBalance(totalUsersBalance) const deltaLnd = totalLndBalance - this.initialLndBalance @@ -131,6 +147,16 @@ export class Watchdog { this.lnd.UnlockOutgoingOperations() } + PaymentRequested = async () => { + this.log("Payment requested, checking balance") + if (!this.ready) { + throw new Error("Watchdog not ready") + } + return new Promise((res, rej) => { + this.queue.Run({ res, rej }) + }) + } + checkDeltas = (deltaLnd: number, deltaUsers: number): DeltaCheckResult => { if (deltaLnd < 0) { if (deltaUsers < 0) { diff --git a/src/services/metrics/index.ts b/src/services/metrics/index.ts index 0e1f9553..56d0d4d7 100644 --- a/src/services/metrics/index.ts +++ b/src/services/metrics/index.ts @@ -40,7 +40,8 @@ export default class Handler { async FetchLatestForwardingEvents() { const latestIndex = await this.storage.metricsStorage.GetLatestForwardingIndexOffset() - const forwards = await this.lnd.GetForwardingHistory(latestIndex) + const res = await this.lnd.GetForwardingHistory(latestIndex) + const forwards = res.forwardingEvents.map(e => ({ fee: Number(e.fee), chanIdIn: e.chanIdIn, chanIdOut: e.chanIdOut, timestampNs: e.timestampNs.toString(), offset: res.lastOffsetIndex })) await Promise.all(forwards.map(async f => { await this.storage.metricsStorage.IncrementChannelRouting(f.chanIdIn, { forward_fee_as_input: f.fee, latest_index_offset: f.offset }) await this.storage.metricsStorage.IncrementChannelRouting(f.chanIdOut, { forward_fee_as_output: f.fee, latest_index_offset: f.offset })