htlc tracker

This commit is contained in:
boufni95 2024-02-29 18:37:35 +01:00
parent 368f9aee4f
commit 5b6a558048
8 changed files with 242 additions and 31 deletions

View file

@ -1,12 +1,10 @@
import { DataSource } from "typeorm"
import { BalanceEvent } from "./build/src/services/storage/entity/BalanceEvent.js"
import { ChannelBalanceEvent } from "./build/src/services/storage/entity/ChannelsBalanceEvent.js"
import { RoutingEvent } from "./build/src/services/storage/entity/RoutingEvent.js"
import { HtlcFailures } from "./build/src/services/storage/entity/HtlcFailures.js"
export default new DataSource({
type: "sqlite",
database: "metrics.sqlite",
entities: [ RoutingEvent, BalanceEvent, ChannelBalanceEvent],
entities: [HtlcFailures],
});

View file

@ -0,0 +1,153 @@
import Storage from '../storage/index.js'
import { ForwardEvent, HtlcEvent, HtlcEvent_EventType } from "../../../proto/lnd/router.js";
import { getLogger } from "../helpers/logger.js";
type EventInfo = {
eventType: HtlcEvent_EventType
outgoingHtlcId: number
incomingHtlcId: number
outgoingChannelId: number
incomingChannelId: number
}
export default class HtlcTracker {
storage: Storage
pendingSendHtlcs: Map<number, number> = new Map()
pendingReceiveHtlcs: Map<number, number> = new Map()
pendingForwardHtlcs: Map<number, number> = new Map()
constructor(storage: Storage) {
this.storage = storage
}
log = getLogger({ appName: 'htlcTracker' })
onHtlcEvent = async (htlc: HtlcEvent) => {
const htlcEvent = htlc.event
if (htlcEvent.oneofKind === 'subscribedEvent') {
this.log("htlc subscribed")
return
}
const outgoingHtlcId = Number(htlc.outgoingHtlcId)
const incomingHtlcId = Number(htlc.incomingHtlcId)
const outgoingChannelId = Number(htlc.outgoingChannelId)
const incomingChannelId = Number(htlc.incomingChannelId)
const info: EventInfo = { eventType: htlc.eventType, outgoingChannelId, incomingChannelId, outgoingHtlcId, incomingHtlcId }
switch (htlcEvent.oneofKind) {
case 'forwardEvent':
return this.handleForward(htlcEvent.forwardEvent, info)
case 'forwardFailEvent':
return this.handleFailure({ ...info, failureReason: 'forwardFailEvent' })
case 'linkFailEvent':
return this.handleFailure({ ...info, failureReason: htlcEvent.linkFailEvent.failureString || 'linkFailEvent' })
case 'finalHtlcEvent':
if (!htlcEvent.finalHtlcEvent.settled) {
return this.handleFailure({ ...info, failureReason: 'finalHtlcEvent' })
} else {
return this.handleSuccess(info)
}
case 'settleEvent':
return this.handleSuccess(info)
default:
this.log("unknown htlc event type")
}
}
handleForward = (fwe: ForwardEvent, { eventType, outgoingHtlcId, incomingHtlcId }: EventInfo) => {
this.log("new forward event, currently tracked htlcs: (s,r,f)", this.pendingSendHtlcs.size, this.pendingReceiveHtlcs.size, this.pendingForwardHtlcs.size)
const { info } = fwe
const incomingAmtMsat = info ? Number(info.incomingAmtMsat) : 0
const outgoingAmtMsat = info ? Number(info.outgoingAmtMsat) : 0
if (eventType === HtlcEvent_EventType.SEND) {
this.pendingSendHtlcs.set(outgoingHtlcId, outgoingAmtMsat - incomingAmtMsat)
} else if (eventType === HtlcEvent_EventType.RECEIVE) {
this.pendingReceiveHtlcs.set(incomingHtlcId, incomingAmtMsat - outgoingAmtMsat)
} else if (eventType === HtlcEvent_EventType.FORWARD) {
this.pendingForwardHtlcs.set(outgoingHtlcId, outgoingAmtMsat - incomingAmtMsat)
} else {
this.log("unknown htlc event type for forward event")
}
}
handleFailure = ({ eventType, outgoingHtlcId, incomingHtlcId, incomingChannelId, outgoingChannelId, failureReason }: EventInfo & { failureReason: string }) => {
if (eventType === HtlcEvent_EventType.SEND && this.deleteMapEntry(outgoingHtlcId, this.pendingSendHtlcs) !== null) {
return this.incrementSendFailures(outgoingChannelId, failureReason)
}
if (eventType === HtlcEvent_EventType.RECEIVE && this.deleteMapEntry(incomingHtlcId, this.pendingReceiveHtlcs) !== null) {
return this.incrementReceiveFailures(incomingChannelId, failureReason)
}
if (eventType === HtlcEvent_EventType.FORWARD) {
const amt = this.deleteMapEntry(outgoingHtlcId, this.pendingForwardHtlcs)
if (amt !== null) {
return this.incrementForwardFailures(incomingChannelId, outgoingChannelId, amt, failureReason)
}
}
if (eventType === HtlcEvent_EventType.UNKNOWN) {
const fwdAmt = this.deleteMapEntry(outgoingHtlcId, this.pendingForwardHtlcs)
if (fwdAmt !== null) {
return this.incrementForwardFailures(incomingChannelId, outgoingChannelId, fwdAmt, failureReason)
}
if (this.deleteMapEntry(outgoingHtlcId, this.pendingSendHtlcs) !== null) {
return this.incrementSendFailures(outgoingChannelId, failureReason)
}
if (this.deleteMapEntry(incomingHtlcId, this.pendingReceiveHtlcs) !== null) {
return this.incrementReceiveFailures(incomingChannelId, failureReason)
}
}
this.log("unknown htlc event type for failure event")
}
handleSuccess = ({ eventType, outgoingHtlcId, incomingHtlcId }: EventInfo) => {
if (eventType === HtlcEvent_EventType.SEND) {
this.deleteMapEntry(outgoingHtlcId, this.pendingSendHtlcs)
} else if (eventType === HtlcEvent_EventType.RECEIVE) {
this.deleteMapEntry(incomingHtlcId, this.pendingReceiveHtlcs)
} else if (eventType === HtlcEvent_EventType.FORWARD) {
this.deleteMapEntry(outgoingHtlcId, this.pendingForwardHtlcs)
} else if (eventType === HtlcEvent_EventType.UNKNOWN) {
if (this.deleteMapEntry(outgoingHtlcId, this.pendingSendHtlcs) !== null) return
if (this.deleteMapEntry(incomingHtlcId, this.pendingReceiveHtlcs) !== null) return
if (this.deleteMapEntry(outgoingHtlcId, this.pendingForwardHtlcs) !== null) return
} else {
this.log("unknown htlc event type for success event")
}
}
deleteMapEntry = (key: number, map: Map<number, number>) => {
if (!map.has(key)) {
return null
}
const v = map.get(key)
map.delete(key)
return v || null
}
incrementSendFailures = async (outgoingChannelId: number, reason: string) => {
await this.storage.metricsStorage.updateHtlcErrors(getToday(), d => {
d.send_failures++
d.failed_destinations[outgoingChannelId] = (d.failed_destinations[outgoingChannelId] || 0) + 1
d.errors[reason] = (d.errors[reason] || 0) + 1
return d
})
}
incrementReceiveFailures = async (incomingChannelId: number, reason: string) => {
await this.storage.metricsStorage.updateHtlcErrors(getToday(), d => {
d.receive_failures++
d.failed_sources[incomingChannelId] = (d.failed_sources[incomingChannelId] || 0) + 1
d.errors[reason] = (d.errors[reason] || 0) + 1
return d
})
}
incrementForwardFailures = async (incomingChannelId: number, outgoingChannelId: number, amt: number, reason: string) => {
await this.storage.metricsStorage.updateHtlcErrors(getToday(), d => {
d.forward_failures++
d.forward_failures_amt += amt
d.failed_sources[incomingChannelId] = (d.failed_sources[incomingChannelId] || 0) + 1
d.failed_destinations[outgoingChannelId] = (d.failed_destinations[outgoingChannelId] || 0) + 1
d.errors[reason] = (d.errors[reason] || 0) + 1
return d
})
}
}
const getToday = () => {
const now = new Date()
return `${now.getFullYear()}-${z(now.getMonth() + 1)}-${z(now.getDate())}`
}
const z = (n: number) => n < 10 ? `0${n}` : `${n}`

View file

@ -7,42 +7,21 @@ import { BalanceInfo } from '../lnd/settings.js'
import { BalanceEvent } from '../storage/entity/BalanceEvent.js'
import { ChannelBalanceEvent } from '../storage/entity/ChannelsBalanceEvent.js'
import { LightningHandler } from '../lnd/index.js'
import HtlcTracker from './htlcTracker.js'
const maxEvents = 100_000
export default class Handler {
storage: Storage
lnd: LightningHandler
htlcTracker: HtlcTracker
metrics: Types.UsageMetric[] = []
constructor(storage: Storage, lnd: LightningHandler) {
this.storage = storage
this.lnd = lnd
this.htlcTracker = new HtlcTracker(this.storage)
}
async HtlcCb(htlc: HtlcEvent) {
const routingEvent: Partial<RoutingEvent> = {}
routingEvent.event_type = HtlcEvent_EventType[htlc.eventType]
routingEvent.incoming_channel_id = Number(htlc.incomingChannelId)
routingEvent.incoming_htlc_id = Number(htlc.incomingHtlcId)
routingEvent.outgoing_channel_id = Number(htlc.outgoingChannelId)
routingEvent.outgoing_htlc_id = Number(htlc.outgoingHtlcId)
routingEvent.timestamp_ns = Number(htlc.timestampNs)
if (htlc.event.oneofKind === 'finalHtlcEvent') {
routingEvent.offchain = htlc.event.finalHtlcEvent.offchain
routingEvent.settled = htlc.event.finalHtlcEvent.settled
} else if (htlc.event.oneofKind === 'forwardEvent') {
const { info } = htlc.event.forwardEvent
routingEvent.incoming_amt_msat = info ? Number(info.incomingAmtMsat) : undefined
routingEvent.outgoing_amt_msat = info ? Number(info.outgoingAmtMsat) : undefined
} else if (htlc.event.oneofKind === 'settleEvent') {
} else if (htlc.event.oneofKind === 'subscribedEvent') {
} else if (htlc.event.oneofKind === 'forwardFailEvent') {
routingEvent.forward_fail_event = true
} else if (htlc.event.oneofKind === 'linkFailEvent') {
routingEvent.failure_string = htlc.event.linkFailEvent.failureString
const { info } = htlc.event.linkFailEvent
routingEvent.incoming_amt_msat = info ? Number(info.incomingAmtMsat) : undefined
routingEvent.outgoing_amt_msat = info ? Number(info.outgoingAmtMsat) : undefined
}
await this.storage.metricsStorage.SaveRoutingEvent(routingEvent)
await this.htlcTracker.onHtlcEvent(htlc)
}
async NewBlockCb(height: number, balanceInfo: BalanceInfo) {

View file

@ -0,0 +1,6 @@
import { Entity, PrimaryGeneratedColumn, Column, Index, Check, CreateDateColumn, UpdateDateColumn } from "typeorm"
import { KVStorageBase } from "./KVStorageBase.js";
@Entity()
export class HtlcFailures extends KVStorageBase {
}

View file

@ -0,0 +1,22 @@
import { PrimaryGeneratedColumn, Column, Index, CreateDateColumn, UpdateDateColumn } from "typeorm"
export abstract class KVStorageBase {
@PrimaryGeneratedColumn()
serial_id: number
@Column()
@Index({ unique: true })
key: string
@Column({ type: 'simple-json' })
value: object
@Column()
version: number
@CreateDateColumn()
created_at: Date
@UpdateDateColumn()
updated_at: Date
}

View file

@ -5,6 +5,7 @@ import { ChannelBalanceEvent } from "./entity/ChannelsBalanceEvent.js"
import TransactionsQueue, { TX } from "./transactionsQueue.js";
import { StorageSettings } from "./index.js";
import { newMetricsDb } from "./db.js";
import { HtlcFailures } from "./entity/HtlcFailures.js";
export default class {
DB: DataSource | EntityManager
settings: StorageSettings
@ -60,4 +61,36 @@ export default class {
])
return { chainBalanceEvents, channelsBalanceEvents }
}
updateHtlcErrors = async (key: string, update: (d: TMPHtlcFailureData) => TMPHtlcFailureData, entityManager = this.DB) => {
const existing = await entityManager.getRepository(HtlcFailures).findOne({ where: { key } })
if (existing) {
const data = update(existing.value as TMPHtlcFailureData)
await entityManager.getRepository(HtlcFailures).update({ key }, { value: data })
return
}
const data = update(newTMPHtlcFailureData())
await entityManager.getRepository(HtlcFailures).save({ key, value: data })
}
}
const newTMPHtlcFailureData = () => {
return {
send_failures: 0,
receive_failures: 0,
forward_failures: 0,
forward_failures_amt: 0,
failed_sources: {},
failed_destinations: {},
errors: {}
}
}
type TMPHtlcFailureData = { // TODO: move to a file with versions and stuff
send_failures: number
receive_failures: number
forward_failures: number
forward_failures_amt: number
failed_sources: Record<number, number>
failed_destinations: Record<number, number>
errors: Record<string, number>
}

View file

@ -0,0 +1,16 @@
import { MigrationInterface, QueryRunner } from "typeorm";
export class HtlcErrors1709149327598 implements MigrationInterface {
name = 'HtlcErrors1709149327598'
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`CREATE TABLE "htlc_failures" ("serial_id" integer PRIMARY KEY AUTOINCREMENT NOT NULL, "key" varchar NOT NULL, "value" text NOT NULL, "version" integer NOT NULL, "created_at" datetime NOT NULL DEFAULT (datetime('now')), "updated_at" datetime NOT NULL DEFAULT (datetime('now')))`);
await queryRunner.query(`CREATE UNIQUE INDEX "IDX_40f71cd66bed693e826d91d438" ON "htlc_failures" ("key") `);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX "IDX_40f71cd66bed693e826d91d438"`);
await queryRunner.query(`DROP TABLE "htlc_failures"`);
}
}

View file

@ -3,7 +3,8 @@ import { DbSettings } from '../db.js'
import Storage, { StorageSettings } from '../index.js'
import { Initial1703170309875 } from './1703170309875-initial.js'
import { LndMetrics1703170330183 } from './1703170330183-lnd_metrics.js'
const allMigrations = [LndMetrics1703170330183]
import { HtlcErrors1709149327598 } from './1709149327598-htlc_errors.js'
const allMigrations = [LndMetrics1703170330183, HtlcErrors1709149327598]
export const TypeOrmMigrationRunner = async (log: PubLogger, storageManager: Storage, settings: DbSettings, arg: string | undefined): Promise<boolean> => {
if (arg === 'initial_migration') {
await connectAndMigrate(log, storageManager, true, settings, [Initial1703170309875], [])
@ -11,6 +12,9 @@ export const TypeOrmMigrationRunner = async (log: PubLogger, storageManager: Sto
} else if (arg === 'lnd_metrics_migration') {
await connectAndMigrate(log, storageManager, true, settings, [], [LndMetrics1703170330183])
return true
} else if (arg === 'htlc_errors_migration') {
await connectAndMigrate(log, storageManager, true, settings, [], [HtlcErrors1709149327598])
return true
} else if (arg === 'all_migrations') {
await connectAndMigrate(log, storageManager, true, settings, [], allMigrations)
return true