diff --git a/metricsDatasource.js b/metricsDatasource.js index 9801c558..6f53d3bb 100644 --- a/metricsDatasource.js +++ b/metricsDatasource.js @@ -1,12 +1,10 @@ import { DataSource } from "typeorm" -import { BalanceEvent } from "./build/src/services/storage/entity/BalanceEvent.js" -import { ChannelBalanceEvent } from "./build/src/services/storage/entity/ChannelsBalanceEvent.js" -import { RoutingEvent } from "./build/src/services/storage/entity/RoutingEvent.js" +import { HtlcFailures } from "./build/src/services/storage/entity/HtlcFailures.js" export default new DataSource({ type: "sqlite", database: "metrics.sqlite", - entities: [ RoutingEvent, BalanceEvent, ChannelBalanceEvent], + entities: [HtlcFailures], }); \ No newline at end of file diff --git a/src/services/metrics/htlcTracker.ts b/src/services/metrics/htlcTracker.ts new file mode 100644 index 00000000..4cdf993d --- /dev/null +++ b/src/services/metrics/htlcTracker.ts @@ -0,0 +1,153 @@ +import Storage from '../storage/index.js' +import { ForwardEvent, HtlcEvent, HtlcEvent_EventType } from "../../../proto/lnd/router.js"; +import { getLogger } from "../helpers/logger.js"; +type EventInfo = { + eventType: HtlcEvent_EventType + outgoingHtlcId: number + incomingHtlcId: number + outgoingChannelId: number + incomingChannelId: number +} +export default class HtlcTracker { + storage: Storage + pendingSendHtlcs: Map = new Map() + pendingReceiveHtlcs: Map = new Map() + pendingForwardHtlcs: Map = new Map() + constructor(storage: Storage) { + this.storage = storage + } + log = getLogger({ appName: 'htlcTracker' }) + onHtlcEvent = async (htlc: HtlcEvent) => { + const htlcEvent = htlc.event + if (htlcEvent.oneofKind === 'subscribedEvent') { + this.log("htlc subscribed") + return + } + const outgoingHtlcId = Number(htlc.outgoingHtlcId) + const incomingHtlcId = Number(htlc.incomingHtlcId) + const outgoingChannelId = Number(htlc.outgoingChannelId) + const incomingChannelId = Number(htlc.incomingChannelId) + const info: EventInfo = { eventType: htlc.eventType, outgoingChannelId, incomingChannelId, outgoingHtlcId, incomingHtlcId } + switch (htlcEvent.oneofKind) { + case 'forwardEvent': + return this.handleForward(htlcEvent.forwardEvent, info) + case 'forwardFailEvent': + return this.handleFailure({ ...info, failureReason: 'forwardFailEvent' }) + case 'linkFailEvent': + return this.handleFailure({ ...info, failureReason: htlcEvent.linkFailEvent.failureString || 'linkFailEvent' }) + case 'finalHtlcEvent': + if (!htlcEvent.finalHtlcEvent.settled) { + return this.handleFailure({ ...info, failureReason: 'finalHtlcEvent' }) + } else { + return this.handleSuccess(info) + } + case 'settleEvent': + return this.handleSuccess(info) + default: + this.log("unknown htlc event type") + } + } + + handleForward = (fwe: ForwardEvent, { eventType, outgoingHtlcId, incomingHtlcId }: EventInfo) => { + this.log("new forward event, currently tracked htlcs: (s,r,f)", this.pendingSendHtlcs.size, this.pendingReceiveHtlcs.size, this.pendingForwardHtlcs.size) + const { info } = fwe + const incomingAmtMsat = info ? Number(info.incomingAmtMsat) : 0 + const outgoingAmtMsat = info ? Number(info.outgoingAmtMsat) : 0 + if (eventType === HtlcEvent_EventType.SEND) { + this.pendingSendHtlcs.set(outgoingHtlcId, outgoingAmtMsat - incomingAmtMsat) + } else if (eventType === HtlcEvent_EventType.RECEIVE) { + this.pendingReceiveHtlcs.set(incomingHtlcId, incomingAmtMsat - outgoingAmtMsat) + } else if (eventType === HtlcEvent_EventType.FORWARD) { + this.pendingForwardHtlcs.set(outgoingHtlcId, outgoingAmtMsat - incomingAmtMsat) + } else { + this.log("unknown htlc event type for forward event") + } + } + + handleFailure = ({ eventType, outgoingHtlcId, incomingHtlcId, incomingChannelId, outgoingChannelId, failureReason }: EventInfo & { failureReason: string }) => { + if (eventType === HtlcEvent_EventType.SEND && this.deleteMapEntry(outgoingHtlcId, this.pendingSendHtlcs) !== null) { + return this.incrementSendFailures(outgoingChannelId, failureReason) + } + if (eventType === HtlcEvent_EventType.RECEIVE && this.deleteMapEntry(incomingHtlcId, this.pendingReceiveHtlcs) !== null) { + return this.incrementReceiveFailures(incomingChannelId, failureReason) + } + if (eventType === HtlcEvent_EventType.FORWARD) { + const amt = this.deleteMapEntry(outgoingHtlcId, this.pendingForwardHtlcs) + if (amt !== null) { + return this.incrementForwardFailures(incomingChannelId, outgoingChannelId, amt, failureReason) + } + } + if (eventType === HtlcEvent_EventType.UNKNOWN) { + const fwdAmt = this.deleteMapEntry(outgoingHtlcId, this.pendingForwardHtlcs) + if (fwdAmt !== null) { + return this.incrementForwardFailures(incomingChannelId, outgoingChannelId, fwdAmt, failureReason) + } + if (this.deleteMapEntry(outgoingHtlcId, this.pendingSendHtlcs) !== null) { + return this.incrementSendFailures(outgoingChannelId, failureReason) + } + if (this.deleteMapEntry(incomingHtlcId, this.pendingReceiveHtlcs) !== null) { + return this.incrementReceiveFailures(incomingChannelId, failureReason) + } + } + this.log("unknown htlc event type for failure event") + } + + handleSuccess = ({ eventType, outgoingHtlcId, incomingHtlcId }: EventInfo) => { + if (eventType === HtlcEvent_EventType.SEND) { + this.deleteMapEntry(outgoingHtlcId, this.pendingSendHtlcs) + } else if (eventType === HtlcEvent_EventType.RECEIVE) { + this.deleteMapEntry(incomingHtlcId, this.pendingReceiveHtlcs) + } else if (eventType === HtlcEvent_EventType.FORWARD) { + this.deleteMapEntry(outgoingHtlcId, this.pendingForwardHtlcs) + } else if (eventType === HtlcEvent_EventType.UNKNOWN) { + if (this.deleteMapEntry(outgoingHtlcId, this.pendingSendHtlcs) !== null) return + if (this.deleteMapEntry(incomingHtlcId, this.pendingReceiveHtlcs) !== null) return + if (this.deleteMapEntry(outgoingHtlcId, this.pendingForwardHtlcs) !== null) return + } else { + this.log("unknown htlc event type for success event") + } + } + + deleteMapEntry = (key: number, map: Map) => { + if (!map.has(key)) { + return null + } + const v = map.get(key) + map.delete(key) + return v || null + } + + incrementSendFailures = async (outgoingChannelId: number, reason: string) => { + await this.storage.metricsStorage.updateHtlcErrors(getToday(), d => { + d.send_failures++ + d.failed_destinations[outgoingChannelId] = (d.failed_destinations[outgoingChannelId] || 0) + 1 + d.errors[reason] = (d.errors[reason] || 0) + 1 + return d + }) + } + incrementReceiveFailures = async (incomingChannelId: number, reason: string) => { + await this.storage.metricsStorage.updateHtlcErrors(getToday(), d => { + d.receive_failures++ + d.failed_sources[incomingChannelId] = (d.failed_sources[incomingChannelId] || 0) + 1 + d.errors[reason] = (d.errors[reason] || 0) + 1 + return d + }) + } + incrementForwardFailures = async (incomingChannelId: number, outgoingChannelId: number, amt: number, reason: string) => { + await this.storage.metricsStorage.updateHtlcErrors(getToday(), d => { + d.forward_failures++ + d.forward_failures_amt += amt + d.failed_sources[incomingChannelId] = (d.failed_sources[incomingChannelId] || 0) + 1 + d.failed_destinations[outgoingChannelId] = (d.failed_destinations[outgoingChannelId] || 0) + 1 + d.errors[reason] = (d.errors[reason] || 0) + 1 + return d + }) + } +} + +const getToday = () => { + const now = new Date() + return `${now.getFullYear()}-${z(now.getMonth() + 1)}-${z(now.getDate())}` + +} +const z = (n: number) => n < 10 ? `0${n}` : `${n}` \ No newline at end of file diff --git a/src/services/metrics/index.ts b/src/services/metrics/index.ts index b46e4d5b..e67f69d0 100644 --- a/src/services/metrics/index.ts +++ b/src/services/metrics/index.ts @@ -7,42 +7,21 @@ import { BalanceInfo } from '../lnd/settings.js' import { BalanceEvent } from '../storage/entity/BalanceEvent.js' import { ChannelBalanceEvent } from '../storage/entity/ChannelsBalanceEvent.js' import { LightningHandler } from '../lnd/index.js' +import HtlcTracker from './htlcTracker.js' const maxEvents = 100_000 export default class Handler { storage: Storage lnd: LightningHandler + htlcTracker: HtlcTracker metrics: Types.UsageMetric[] = [] constructor(storage: Storage, lnd: LightningHandler) { this.storage = storage this.lnd = lnd + this.htlcTracker = new HtlcTracker(this.storage) } async HtlcCb(htlc: HtlcEvent) { - const routingEvent: Partial = {} - routingEvent.event_type = HtlcEvent_EventType[htlc.eventType] - routingEvent.incoming_channel_id = Number(htlc.incomingChannelId) - routingEvent.incoming_htlc_id = Number(htlc.incomingHtlcId) - routingEvent.outgoing_channel_id = Number(htlc.outgoingChannelId) - routingEvent.outgoing_htlc_id = Number(htlc.outgoingHtlcId) - routingEvent.timestamp_ns = Number(htlc.timestampNs) - if (htlc.event.oneofKind === 'finalHtlcEvent') { - routingEvent.offchain = htlc.event.finalHtlcEvent.offchain - routingEvent.settled = htlc.event.finalHtlcEvent.settled - } else if (htlc.event.oneofKind === 'forwardEvent') { - const { info } = htlc.event.forwardEvent - routingEvent.incoming_amt_msat = info ? Number(info.incomingAmtMsat) : undefined - routingEvent.outgoing_amt_msat = info ? Number(info.outgoingAmtMsat) : undefined - } else if (htlc.event.oneofKind === 'settleEvent') { - } else if (htlc.event.oneofKind === 'subscribedEvent') { - } else if (htlc.event.oneofKind === 'forwardFailEvent') { - routingEvent.forward_fail_event = true - } else if (htlc.event.oneofKind === 'linkFailEvent') { - routingEvent.failure_string = htlc.event.linkFailEvent.failureString - const { info } = htlc.event.linkFailEvent - routingEvent.incoming_amt_msat = info ? Number(info.incomingAmtMsat) : undefined - routingEvent.outgoing_amt_msat = info ? Number(info.outgoingAmtMsat) : undefined - } - await this.storage.metricsStorage.SaveRoutingEvent(routingEvent) + await this.htlcTracker.onHtlcEvent(htlc) } async NewBlockCb(height: number, balanceInfo: BalanceInfo) { diff --git a/src/services/storage/entity/HtlcFailures.ts b/src/services/storage/entity/HtlcFailures.ts new file mode 100644 index 00000000..edbbb4ac --- /dev/null +++ b/src/services/storage/entity/HtlcFailures.ts @@ -0,0 +1,6 @@ +import { Entity, PrimaryGeneratedColumn, Column, Index, Check, CreateDateColumn, UpdateDateColumn } from "typeorm" +import { KVStorageBase } from "./KVStorageBase.js"; + +@Entity() +export class HtlcFailures extends KVStorageBase { +} diff --git a/src/services/storage/entity/KVStorageBase.ts b/src/services/storage/entity/KVStorageBase.ts new file mode 100644 index 00000000..78e43166 --- /dev/null +++ b/src/services/storage/entity/KVStorageBase.ts @@ -0,0 +1,22 @@ +import { PrimaryGeneratedColumn, Column, Index, CreateDateColumn, UpdateDateColumn } from "typeorm" + +export abstract class KVStorageBase { + @PrimaryGeneratedColumn() + serial_id: number + + @Column() + @Index({ unique: true }) + key: string + + @Column({ type: 'simple-json' }) + value: object + + @Column() + version: number + + @CreateDateColumn() + created_at: Date + + @UpdateDateColumn() + updated_at: Date +} diff --git a/src/services/storage/metricsStorage.ts b/src/services/storage/metricsStorage.ts index 08a03715..622133c6 100644 --- a/src/services/storage/metricsStorage.ts +++ b/src/services/storage/metricsStorage.ts @@ -5,6 +5,7 @@ import { ChannelBalanceEvent } from "./entity/ChannelsBalanceEvent.js" import TransactionsQueue, { TX } from "./transactionsQueue.js"; import { StorageSettings } from "./index.js"; import { newMetricsDb } from "./db.js"; +import { HtlcFailures } from "./entity/HtlcFailures.js"; export default class { DB: DataSource | EntityManager settings: StorageSettings @@ -60,4 +61,36 @@ export default class { ]) return { chainBalanceEvents, channelsBalanceEvents } } + + updateHtlcErrors = async (key: string, update: (d: TMPHtlcFailureData) => TMPHtlcFailureData, entityManager = this.DB) => { + const existing = await entityManager.getRepository(HtlcFailures).findOne({ where: { key } }) + if (existing) { + const data = update(existing.value as TMPHtlcFailureData) + await entityManager.getRepository(HtlcFailures).update({ key }, { value: data }) + return + } + const data = update(newTMPHtlcFailureData()) + await entityManager.getRepository(HtlcFailures).save({ key, value: data }) + + } +} +const newTMPHtlcFailureData = () => { + return { + send_failures: 0, + receive_failures: 0, + forward_failures: 0, + forward_failures_amt: 0, + failed_sources: {}, + failed_destinations: {}, + errors: {} + } +} +type TMPHtlcFailureData = { // TODO: move to a file with versions and stuff + send_failures: number + receive_failures: number + forward_failures: number + forward_failures_amt: number + failed_sources: Record + failed_destinations: Record + errors: Record } \ No newline at end of file diff --git a/src/services/storage/migrations/1709149327598-htlc_errors.ts b/src/services/storage/migrations/1709149327598-htlc_errors.ts new file mode 100644 index 00000000..c506be6e --- /dev/null +++ b/src/services/storage/migrations/1709149327598-htlc_errors.ts @@ -0,0 +1,16 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class HtlcErrors1709149327598 implements MigrationInterface { + name = 'HtlcErrors1709149327598' + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`CREATE TABLE "htlc_failures" ("serial_id" integer PRIMARY KEY AUTOINCREMENT NOT NULL, "key" varchar NOT NULL, "value" text NOT NULL, "version" integer NOT NULL, "created_at" datetime NOT NULL DEFAULT (datetime('now')), "updated_at" datetime NOT NULL DEFAULT (datetime('now')))`); + await queryRunner.query(`CREATE UNIQUE INDEX "IDX_40f71cd66bed693e826d91d438" ON "htlc_failures" ("key") `); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX "IDX_40f71cd66bed693e826d91d438"`); + await queryRunner.query(`DROP TABLE "htlc_failures"`); + } + +} diff --git a/src/services/storage/migrations/runner.ts b/src/services/storage/migrations/runner.ts index 7a7780e5..c9213a81 100644 --- a/src/services/storage/migrations/runner.ts +++ b/src/services/storage/migrations/runner.ts @@ -3,7 +3,8 @@ import { DbSettings } from '../db.js' import Storage, { StorageSettings } from '../index.js' import { Initial1703170309875 } from './1703170309875-initial.js' import { LndMetrics1703170330183 } from './1703170330183-lnd_metrics.js' -const allMigrations = [LndMetrics1703170330183] +import { HtlcErrors1709149327598 } from './1709149327598-htlc_errors.js' +const allMigrations = [LndMetrics1703170330183, HtlcErrors1709149327598] export const TypeOrmMigrationRunner = async (log: PubLogger, storageManager: Storage, settings: DbSettings, arg: string | undefined): Promise => { if (arg === 'initial_migration') { await connectAndMigrate(log, storageManager, true, settings, [Initial1703170309875], []) @@ -11,6 +12,9 @@ export const TypeOrmMigrationRunner = async (log: PubLogger, storageManager: Sto } else if (arg === 'lnd_metrics_migration') { await connectAndMigrate(log, storageManager, true, settings, [], [LndMetrics1703170330183]) return true + } else if (arg === 'htlc_errors_migration') { + await connectAndMigrate(log, storageManager, true, settings, [], [HtlcErrors1709149327598]) + return true } else if (arg === 'all_migrations') { await connectAndMigrate(log, storageManager, true, settings, [], allMigrations) return true @@ -34,7 +38,7 @@ const connectAndMigrate = async (log: PubLogger, storageManager: Storage, manual log(executedMigrations.length, "of", migrations.length, "migrations were executed correctly") log(executedMigrations) log("-------------------") - + } if (metricsMigrations.length > 0) { log(executedMetricsMigrations.length, "of", migrations.length, "metrics migrations were executed correctly") log(executedMetricsMigrations)