increment initial balance
This commit is contained in:
parent
eabaff5b10
commit
acbf9c7061
4 changed files with 77 additions and 16 deletions
34
src/services/helpers/functionQueue.ts
Normal file
34
src/services/helpers/functionQueue.ts
Normal file
|
|
@ -0,0 +1,34 @@
|
||||||
|
import { PubLogger, getLogger } from "../helpers/logger.js"
|
||||||
|
|
||||||
|
type Item<T> = { res: (v: T) => void, rej: (message: string) => void }
|
||||||
|
export default class FunctionQueue<T> {
|
||||||
|
log: PubLogger
|
||||||
|
queue: Item<T>[] = []
|
||||||
|
running: boolean = false
|
||||||
|
f: () => Promise<T>
|
||||||
|
constructor(name: string, f: () => Promise<T>) {
|
||||||
|
this.log = getLogger({ appName: name })
|
||||||
|
this.f = f
|
||||||
|
}
|
||||||
|
|
||||||
|
Run = (item: Item<T>) => {
|
||||||
|
this.queue.push(item)
|
||||||
|
if (!this.running) {
|
||||||
|
this.execF()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
execF = async () => {
|
||||||
|
this.running = true
|
||||||
|
try {
|
||||||
|
const res = await this.f()
|
||||||
|
this.queue.forEach(q => q.res(res))
|
||||||
|
} catch (err) {
|
||||||
|
this.queue.forEach(q => q.rej((err as any).message))
|
||||||
|
}
|
||||||
|
this.queue = []
|
||||||
|
this.running = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -8,7 +8,7 @@ import { LightningClient } from '../../../proto/lnd/lightning.client.js'
|
||||||
import { InvoicesClient } from '../../../proto/lnd/invoices.client.js'
|
import { InvoicesClient } from '../../../proto/lnd/invoices.client.js'
|
||||||
import { RouterClient } from '../../../proto/lnd/router.client.js'
|
import { RouterClient } from '../../../proto/lnd/router.client.js'
|
||||||
import { ChainNotifierClient } from '../../../proto/lnd/chainnotifier.client.js'
|
import { ChainNotifierClient } from '../../../proto/lnd/chainnotifier.client.js'
|
||||||
import { GetInfoResponse, AddressType, NewAddressResponse, AddInvoiceResponse, Invoice_InvoiceState, PayReq, Payment_PaymentStatus, Payment, PaymentFailureReason, SendCoinsResponse, EstimateFeeResponse, ChannelBalanceResponse, TransactionDetails, ListChannelsResponse, ClosedChannelsResponse, PendingChannelsResponse } from '../../../proto/lnd/lightning.js'
|
import { GetInfoResponse, AddressType, NewAddressResponse, AddInvoiceResponse, Invoice_InvoiceState, PayReq, Payment_PaymentStatus, Payment, PaymentFailureReason, SendCoinsResponse, EstimateFeeResponse, ChannelBalanceResponse, TransactionDetails, ListChannelsResponse, ClosedChannelsResponse, PendingChannelsResponse, ForwardingHistoryResponse } from '../../../proto/lnd/lightning.js'
|
||||||
import { OpenChannelReq } from './openChannelReq.js';
|
import { OpenChannelReq } from './openChannelReq.js';
|
||||||
import { AddInvoiceReq } from './addInvoiceReq.js';
|
import { AddInvoiceReq } from './addInvoiceReq.js';
|
||||||
import { PayInvoiceReq } from './payInvoiceReq.js';
|
import { PayInvoiceReq } from './payInvoiceReq.js';
|
||||||
|
|
@ -358,9 +358,9 @@ export default class {
|
||||||
return { confirmedBalance: Number(confirmedBalance), unconfirmedBalance: Number(unconfirmedBalance), totalBalance: Number(totalBalance), channelsBalance }
|
return { confirmedBalance: Number(confirmedBalance), unconfirmedBalance: Number(unconfirmedBalance), totalBalance: Number(totalBalance), channelsBalance }
|
||||||
}
|
}
|
||||||
|
|
||||||
async GetForwardingHistory(indexOffset: number): Promise<{ fee: number, chanIdIn: string, chanIdOut: string, timestampNs: number, offset: number }[]> {
|
async GetForwardingHistory(indexOffset: number, startTime = 0): Promise<ForwardingHistoryResponse> {
|
||||||
const { response } = await this.lightning.forwardingHistory({ indexOffset, numMaxEvents: 0, startTime: 0n, endTime: 0n, peerAliasLookup: false }, DeadLineMetadata())
|
const { response } = await this.lightning.forwardingHistory({ indexOffset, numMaxEvents: 0, startTime: BigInt(startTime), endTime: 0n, peerAliasLookup: false }, DeadLineMetadata())
|
||||||
return response.forwardingEvents.map(e => ({ fee: Number(e.fee), chanIdIn: e.chanIdIn, chanIdOut: e.chanIdOut, timestampNs: Number(e.timestampNs), offset: response.lastOffsetIndex }))
|
return response
|
||||||
}
|
}
|
||||||
|
|
||||||
async GetAllPaidInvoices(max: number) {
|
async GetAllPaidInvoices(max: number) {
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
import { EnvCanBeInteger } from "../helpers/envParser.js";
|
import { EnvCanBeInteger } from "../helpers/envParser.js";
|
||||||
|
import FunctionQueue from "../helpers/functionQueue.js";
|
||||||
import { getLogger } from "../helpers/logger.js";
|
import { getLogger } from "../helpers/logger.js";
|
||||||
import LND from "../lnd/lnd.js";
|
import LND from "../lnd/lnd.js";
|
||||||
import { ChannelBalance } from "../lnd/settings.js";
|
import { ChannelBalance } from "../lnd/settings.js";
|
||||||
|
|
@ -12,20 +13,24 @@ export const LoadWatchdogSettingsFromEnv = (test = false): WatchdogSettings => {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
export class Watchdog {
|
export class Watchdog {
|
||||||
|
queue: FunctionQueue<void>
|
||||||
initialLndBalance: number;
|
initialLndBalance: number;
|
||||||
initialUsersBalance: number;
|
initialUsersBalance: number;
|
||||||
|
startedAtUnix: number;
|
||||||
|
latestIndexOffset: number;
|
||||||
|
accumulatedHtlcFees: number;
|
||||||
lnd: LND;
|
lnd: LND;
|
||||||
settings: WatchdogSettings;
|
settings: WatchdogSettings;
|
||||||
storage: Storage;
|
storage: Storage;
|
||||||
latestCheckStart = 0
|
latestCheckStart = 0
|
||||||
log = getLogger({ appName: "watchdog" })
|
log = getLogger({ appName: "watchdog" })
|
||||||
enabled = false
|
ready = false
|
||||||
interval: NodeJS.Timer;
|
interval: NodeJS.Timer;
|
||||||
constructor(settings: WatchdogSettings, lnd: LND, storage: Storage) {
|
constructor(settings: WatchdogSettings, lnd: LND, storage: Storage) {
|
||||||
this.lnd = lnd;
|
this.lnd = lnd;
|
||||||
this.settings = settings;
|
this.settings = settings;
|
||||||
this.storage = storage;
|
this.storage = storage;
|
||||||
|
this.queue = new FunctionQueue("watchdog::queue", () => this.StartCheck())
|
||||||
}
|
}
|
||||||
|
|
||||||
Stop() {
|
Stop() {
|
||||||
|
|
@ -35,10 +40,13 @@ export class Watchdog {
|
||||||
}
|
}
|
||||||
|
|
||||||
Start = async () => {
|
Start = async () => {
|
||||||
|
this.startedAtUnix = Math.floor(Date.now() / 1000)
|
||||||
const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance()
|
const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance()
|
||||||
this.initialLndBalance = await this.getTotalLndBalance(totalUsersBalance)
|
this.initialLndBalance = await this.getTotalLndBalance(totalUsersBalance)
|
||||||
this.initialUsersBalance = totalUsersBalance
|
this.initialUsersBalance = totalUsersBalance
|
||||||
this.enabled = true
|
const fwEvents = await this.lnd.GetForwardingHistory(0, this.startedAtUnix)
|
||||||
|
this.latestIndexOffset = fwEvents.lastOffsetIndex
|
||||||
|
this.accumulatedHtlcFees = 0
|
||||||
|
|
||||||
this.interval = setInterval(() => {
|
this.interval = setInterval(() => {
|
||||||
if (this.latestCheckStart + (1000 * 60) < Date.now()) {
|
if (this.latestCheckStart + (1000 * 60) < Date.now()) {
|
||||||
|
|
@ -46,6 +54,17 @@ export class Watchdog {
|
||||||
this.PaymentRequested()
|
this.PaymentRequested()
|
||||||
}
|
}
|
||||||
}, 1000 * 60)
|
}, 1000 * 60)
|
||||||
|
|
||||||
|
this.ready = true
|
||||||
|
}
|
||||||
|
|
||||||
|
updateAccumulatedHtlcFees = async () => {
|
||||||
|
const fwEvents = await this.lnd.GetForwardingHistory(this.latestIndexOffset, this.startedAtUnix)
|
||||||
|
this.latestIndexOffset = fwEvents.lastOffsetIndex
|
||||||
|
fwEvents.forwardingEvents.forEach((event) => {
|
||||||
|
this.accumulatedHtlcFees += Number(event.fee)
|
||||||
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -54,10 +73,11 @@ export class Watchdog {
|
||||||
const walletBalance = await this.lnd.GetWalletBalance()
|
const walletBalance = await this.lnd.GetWalletBalance()
|
||||||
this.log(Number(walletBalance.confirmedBalance), "sats in chain wallet")
|
this.log(Number(walletBalance.confirmedBalance), "sats in chain wallet")
|
||||||
const channelsBalance = await this.lnd.GetChannelBalance()
|
const channelsBalance = await this.lnd.GetChannelBalance()
|
||||||
getLogger({ appName: "debugLndBalancev3" })({ w: walletBalance, c: channelsBalance, u: usersTotal })
|
getLogger({ appName: "debugLndBalancev3" })({ w: walletBalance, c: channelsBalance, u: usersTotal, f: this.accumulatedHtlcFees })
|
||||||
|
|
||||||
const localChannelsBalance = Number(channelsBalance.localBalance?.sat || 0)
|
const localChannelsBalance = Number(channelsBalance.localBalance?.sat || 0)
|
||||||
const unsettledLocalBalance = Number(channelsBalance.unsettledLocalBalance?.sat || 0)
|
const unsettledLocalBalance = Number(channelsBalance.unsettledLocalBalance?.sat || 0)
|
||||||
return Number(walletBalance.confirmedBalance) + localChannelsBalance + unsettledLocalBalance
|
return Number(walletBalance.confirmedBalance) + localChannelsBalance + unsettledLocalBalance + this.accumulatedHtlcFees
|
||||||
}
|
}
|
||||||
|
|
||||||
checkBalanceUpdate = (deltaLnd: number, deltaUsers: number) => {
|
checkBalanceUpdate = (deltaLnd: number, deltaUsers: number) => {
|
||||||
|
|
@ -111,13 +131,9 @@ export class Watchdog {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
PaymentRequested = async () => {
|
StartCheck = async () => {
|
||||||
this.log("Payment requested, checking balance")
|
|
||||||
if (!this.enabled) {
|
|
||||||
this.log("WARNING! Watchdog not enabled, skipping balance check")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
this.latestCheckStart = Date.now()
|
this.latestCheckStart = Date.now()
|
||||||
|
await this.updateAccumulatedHtlcFees()
|
||||||
const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance()
|
const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance()
|
||||||
const totalLndBalance = await this.getTotalLndBalance(totalUsersBalance)
|
const totalLndBalance = await this.getTotalLndBalance(totalUsersBalance)
|
||||||
const deltaLnd = totalLndBalance - this.initialLndBalance
|
const deltaLnd = totalLndBalance - this.initialLndBalance
|
||||||
|
|
@ -131,6 +147,16 @@ export class Watchdog {
|
||||||
this.lnd.UnlockOutgoingOperations()
|
this.lnd.UnlockOutgoingOperations()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PaymentRequested = async () => {
|
||||||
|
this.log("Payment requested, checking balance")
|
||||||
|
if (!this.ready) {
|
||||||
|
throw new Error("Watchdog not ready")
|
||||||
|
}
|
||||||
|
return new Promise<void>((res, rej) => {
|
||||||
|
this.queue.Run({ res, rej })
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
checkDeltas = (deltaLnd: number, deltaUsers: number): DeltaCheckResult => {
|
checkDeltas = (deltaLnd: number, deltaUsers: number): DeltaCheckResult => {
|
||||||
if (deltaLnd < 0) {
|
if (deltaLnd < 0) {
|
||||||
if (deltaUsers < 0) {
|
if (deltaUsers < 0) {
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,8 @@ export default class Handler {
|
||||||
|
|
||||||
async FetchLatestForwardingEvents() {
|
async FetchLatestForwardingEvents() {
|
||||||
const latestIndex = await this.storage.metricsStorage.GetLatestForwardingIndexOffset()
|
const latestIndex = await this.storage.metricsStorage.GetLatestForwardingIndexOffset()
|
||||||
const forwards = await this.lnd.GetForwardingHistory(latestIndex)
|
const res = await this.lnd.GetForwardingHistory(latestIndex)
|
||||||
|
const forwards = res.forwardingEvents.map(e => ({ fee: Number(e.fee), chanIdIn: e.chanIdIn, chanIdOut: e.chanIdOut, timestampNs: e.timestampNs.toString(), offset: res.lastOffsetIndex }))
|
||||||
await Promise.all(forwards.map(async f => {
|
await Promise.all(forwards.map(async f => {
|
||||||
await this.storage.metricsStorage.IncrementChannelRouting(f.chanIdIn, { forward_fee_as_input: f.fee, latest_index_offset: f.offset })
|
await this.storage.metricsStorage.IncrementChannelRouting(f.chanIdIn, { forward_fee_as_input: f.fee, latest_index_offset: f.offset })
|
||||||
await this.storage.metricsStorage.IncrementChannelRouting(f.chanIdOut, { forward_fee_as_output: f.fee, latest_index_offset: f.offset })
|
await this.storage.metricsStorage.IncrementChannelRouting(f.chanIdOut, { forward_fee_as_output: f.fee, latest_index_offset: f.offset })
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue