Merge pull request #680 from shocknet/increment-initial
increment initial balance
This commit is contained in:
commit
119827697f
4 changed files with 77 additions and 16 deletions
34
src/services/helpers/functionQueue.ts
Normal file
34
src/services/helpers/functionQueue.ts
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
import { PubLogger, getLogger } from "../helpers/logger.js"
|
||||
|
||||
type Item<T> = { res: (v: T) => void, rej: (message: string) => void }
|
||||
export default class FunctionQueue<T> {
|
||||
log: PubLogger
|
||||
queue: Item<T>[] = []
|
||||
running: boolean = false
|
||||
f: () => Promise<T>
|
||||
constructor(name: string, f: () => Promise<T>) {
|
||||
this.log = getLogger({ appName: name })
|
||||
this.f = f
|
||||
}
|
||||
|
||||
Run = (item: Item<T>) => {
|
||||
this.queue.push(item)
|
||||
if (!this.running) {
|
||||
this.execF()
|
||||
}
|
||||
}
|
||||
|
||||
execF = async () => {
|
||||
this.running = true
|
||||
try {
|
||||
const res = await this.f()
|
||||
this.queue.forEach(q => q.res(res))
|
||||
} catch (err) {
|
||||
this.queue.forEach(q => q.rej((err as any).message))
|
||||
}
|
||||
this.queue = []
|
||||
this.running = false
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -8,7 +8,7 @@ import { LightningClient } from '../../../proto/lnd/lightning.client.js'
|
|||
import { InvoicesClient } from '../../../proto/lnd/invoices.client.js'
|
||||
import { RouterClient } from '../../../proto/lnd/router.client.js'
|
||||
import { ChainNotifierClient } from '../../../proto/lnd/chainnotifier.client.js'
|
||||
import { GetInfoResponse, AddressType, NewAddressResponse, AddInvoiceResponse, Invoice_InvoiceState, PayReq, Payment_PaymentStatus, Payment, PaymentFailureReason, SendCoinsResponse, EstimateFeeResponse, ChannelBalanceResponse, TransactionDetails, ListChannelsResponse, ClosedChannelsResponse, PendingChannelsResponse } from '../../../proto/lnd/lightning.js'
|
||||
import { GetInfoResponse, AddressType, NewAddressResponse, AddInvoiceResponse, Invoice_InvoiceState, PayReq, Payment_PaymentStatus, Payment, PaymentFailureReason, SendCoinsResponse, EstimateFeeResponse, ChannelBalanceResponse, TransactionDetails, ListChannelsResponse, ClosedChannelsResponse, PendingChannelsResponse, ForwardingHistoryResponse } from '../../../proto/lnd/lightning.js'
|
||||
import { OpenChannelReq } from './openChannelReq.js';
|
||||
import { AddInvoiceReq } from './addInvoiceReq.js';
|
||||
import { PayInvoiceReq } from './payInvoiceReq.js';
|
||||
|
|
@ -358,9 +358,9 @@ export default class {
|
|||
return { confirmedBalance: Number(confirmedBalance), unconfirmedBalance: Number(unconfirmedBalance), totalBalance: Number(totalBalance), channelsBalance }
|
||||
}
|
||||
|
||||
async GetForwardingHistory(indexOffset: number): Promise<{ fee: number, chanIdIn: string, chanIdOut: string, timestampNs: number, offset: number }[]> {
|
||||
const { response } = await this.lightning.forwardingHistory({ indexOffset, numMaxEvents: 0, startTime: 0n, endTime: 0n, peerAliasLookup: false }, DeadLineMetadata())
|
||||
return response.forwardingEvents.map(e => ({ fee: Number(e.fee), chanIdIn: e.chanIdIn, chanIdOut: e.chanIdOut, timestampNs: Number(e.timestampNs), offset: response.lastOffsetIndex }))
|
||||
async GetForwardingHistory(indexOffset: number, startTime = 0): Promise<ForwardingHistoryResponse> {
|
||||
const { response } = await this.lightning.forwardingHistory({ indexOffset, numMaxEvents: 0, startTime: BigInt(startTime), endTime: 0n, peerAliasLookup: false }, DeadLineMetadata())
|
||||
return response
|
||||
}
|
||||
|
||||
async GetAllPaidInvoices(max: number) {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import { EnvCanBeInteger } from "../helpers/envParser.js";
|
||||
import FunctionQueue from "../helpers/functionQueue.js";
|
||||
import { getLogger } from "../helpers/logger.js";
|
||||
import LND from "../lnd/lnd.js";
|
||||
import { ChannelBalance } from "../lnd/settings.js";
|
||||
|
|
@ -12,20 +13,24 @@ export const LoadWatchdogSettingsFromEnv = (test = false): WatchdogSettings => {
|
|||
}
|
||||
}
|
||||
export class Watchdog {
|
||||
|
||||
queue: FunctionQueue<void>
|
||||
initialLndBalance: number;
|
||||
initialUsersBalance: number;
|
||||
startedAtUnix: number;
|
||||
latestIndexOffset: number;
|
||||
accumulatedHtlcFees: number;
|
||||
lnd: LND;
|
||||
settings: WatchdogSettings;
|
||||
storage: Storage;
|
||||
latestCheckStart = 0
|
||||
log = getLogger({ appName: "watchdog" })
|
||||
enabled = false
|
||||
ready = false
|
||||
interval: NodeJS.Timer;
|
||||
constructor(settings: WatchdogSettings, lnd: LND, storage: Storage) {
|
||||
this.lnd = lnd;
|
||||
this.settings = settings;
|
||||
this.storage = storage;
|
||||
this.queue = new FunctionQueue("watchdog::queue", () => this.StartCheck())
|
||||
}
|
||||
|
||||
Stop() {
|
||||
|
|
@ -35,10 +40,13 @@ export class Watchdog {
|
|||
}
|
||||
|
||||
Start = async () => {
|
||||
this.startedAtUnix = Math.floor(Date.now() / 1000)
|
||||
const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance()
|
||||
this.initialLndBalance = await this.getTotalLndBalance(totalUsersBalance)
|
||||
this.initialUsersBalance = totalUsersBalance
|
||||
this.enabled = true
|
||||
const fwEvents = await this.lnd.GetForwardingHistory(0, this.startedAtUnix)
|
||||
this.latestIndexOffset = fwEvents.lastOffsetIndex
|
||||
this.accumulatedHtlcFees = 0
|
||||
|
||||
this.interval = setInterval(() => {
|
||||
if (this.latestCheckStart + (1000 * 60) < Date.now()) {
|
||||
|
|
@ -46,6 +54,17 @@ export class Watchdog {
|
|||
this.PaymentRequested()
|
||||
}
|
||||
}, 1000 * 60)
|
||||
|
||||
this.ready = true
|
||||
}
|
||||
|
||||
updateAccumulatedHtlcFees = async () => {
|
||||
const fwEvents = await this.lnd.GetForwardingHistory(this.latestIndexOffset, this.startedAtUnix)
|
||||
this.latestIndexOffset = fwEvents.lastOffsetIndex
|
||||
fwEvents.forwardingEvents.forEach((event) => {
|
||||
this.accumulatedHtlcFees += Number(event.fee)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -54,7 +73,8 @@ export class Watchdog {
|
|||
const walletBalance = await this.lnd.GetWalletBalance()
|
||||
this.log(Number(walletBalance.confirmedBalance), "sats in chain wallet")
|
||||
const channelsBalance = await this.lnd.GetChannelBalance()
|
||||
getLogger({ appName: "debugLndBalancev3" })({ w: walletBalance, c: channelsBalance, u: usersTotal })
|
||||
getLogger({ appName: "debugLndBalancev3" })({ w: walletBalance, c: channelsBalance, u: usersTotal, f: this.accumulatedHtlcFees })
|
||||
|
||||
const localChannelsBalance = Number(channelsBalance.localBalance?.sat || 0)
|
||||
const unsettledLocalBalance = Number(channelsBalance.unsettledLocalBalance?.sat || 0)
|
||||
return Number(walletBalance.confirmedBalance) + localChannelsBalance + unsettledLocalBalance
|
||||
|
|
@ -111,16 +131,12 @@ export class Watchdog {
|
|||
return false
|
||||
}
|
||||
|
||||
PaymentRequested = async () => {
|
||||
this.log("Payment requested, checking balance")
|
||||
if (!this.enabled) {
|
||||
this.log("WARNING! Watchdog not enabled, skipping balance check")
|
||||
return
|
||||
}
|
||||
StartCheck = async () => {
|
||||
this.latestCheckStart = Date.now()
|
||||
await this.updateAccumulatedHtlcFees()
|
||||
const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance()
|
||||
const totalLndBalance = await this.getTotalLndBalance(totalUsersBalance)
|
||||
const deltaLnd = totalLndBalance - this.initialLndBalance
|
||||
const deltaLnd = totalLndBalance - (this.initialLndBalance + this.accumulatedHtlcFees)
|
||||
const deltaUsers = totalUsersBalance - this.initialUsersBalance
|
||||
const deny = this.checkBalanceUpdate(deltaLnd, deltaUsers)
|
||||
if (deny) {
|
||||
|
|
@ -131,6 +147,16 @@ export class Watchdog {
|
|||
this.lnd.UnlockOutgoingOperations()
|
||||
}
|
||||
|
||||
PaymentRequested = async () => {
|
||||
this.log("Payment requested, checking balance")
|
||||
if (!this.ready) {
|
||||
throw new Error("Watchdog not ready")
|
||||
}
|
||||
return new Promise<void>((res, rej) => {
|
||||
this.queue.Run({ res, rej })
|
||||
})
|
||||
}
|
||||
|
||||
checkDeltas = (deltaLnd: number, deltaUsers: number): DeltaCheckResult => {
|
||||
if (deltaLnd < 0) {
|
||||
if (deltaUsers < 0) {
|
||||
|
|
|
|||
|
|
@ -40,7 +40,8 @@ export default class Handler {
|
|||
|
||||
async FetchLatestForwardingEvents() {
|
||||
const latestIndex = await this.storage.metricsStorage.GetLatestForwardingIndexOffset()
|
||||
const forwards = await this.lnd.GetForwardingHistory(latestIndex)
|
||||
const res = await this.lnd.GetForwardingHistory(latestIndex)
|
||||
const forwards = res.forwardingEvents.map(e => ({ fee: Number(e.fee), chanIdIn: e.chanIdIn, chanIdOut: e.chanIdOut, timestampNs: e.timestampNs.toString(), offset: res.lastOffsetIndex }))
|
||||
await Promise.all(forwards.map(async f => {
|
||||
await this.storage.metricsStorage.IncrementChannelRouting(f.chanIdIn, { forward_fee_as_input: f.fee, latest_index_offset: f.offset })
|
||||
await this.storage.metricsStorage.IncrementChannelRouting(f.chanIdOut, { forward_fee_as_output: f.fee, latest_index_offset: f.offset })
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue