persisted metrics

This commit is contained in:
boufni95 2025-01-09 16:03:27 +00:00
parent 0bf7a37b6c
commit 6a22e3439a
9 changed files with 230 additions and 25 deletions

View file

@ -1291,7 +1291,9 @@ The nostr server will send back a message response, and inside the body there wi
- __validate_in_nano__: _number_
### UsageMetricTlv
- __available_chunks__: ARRAY of: _number_
- __base_64_tlvs__: ARRAY of: _string_
- __current_chunk__: _number_
### UsageMetrics
- __apps__: MAP with key: _string_ and value: _[AppUsageMetrics](#AppUsageMetrics)_

View file

@ -555,7 +555,9 @@ type UsageMetric struct {
Validate_in_nano int64 `json:"validate_in_nano"`
}
type UsageMetricTlv struct {
Base_64_tlvs []string `json:"base_64_tlvs"`
Available_chunks []int64 `json:"available_chunks"`
Base_64_tlvs []string `json:"base_64_tlvs"`
Current_chunk int64 `json:"current_chunk"`
}
type UsageMetrics struct {
Apps map[string]AppUsageMetrics `json:"apps"`

View file

@ -3162,23 +3162,36 @@ export const UsageMetricValidate = (o?: UsageMetric, opts: UsageMetricOptions =
}
export type UsageMetricTlv = {
available_chunks: number[]
base_64_tlvs: string[]
current_chunk: number
}
export const UsageMetricTlvOptionalFields: [] = []
export type UsageMetricTlvOptions = OptionsBaseMessage & {
checkOptionalsAreSet?: []
available_chunks_CustomCheck?: (v: number[]) => boolean
base_64_tlvs_CustomCheck?: (v: string[]) => boolean
current_chunk_CustomCheck?: (v: number) => boolean
}
export const UsageMetricTlvValidate = (o?: UsageMetricTlv, opts: UsageMetricTlvOptions = {}, path: string = 'UsageMetricTlv::root.'): Error | null => {
if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message')
if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null')
if (!Array.isArray(o.available_chunks)) return new Error(`${path}.available_chunks: is not an array`)
for (let index = 0; index < o.available_chunks.length; index++) {
if (typeof o.available_chunks[index] !== 'number') return new Error(`${path}.available_chunks[${index}]: is not a number`)
}
if (opts.available_chunks_CustomCheck && !opts.available_chunks_CustomCheck(o.available_chunks)) return new Error(`${path}.available_chunks: custom check failed`)
if (!Array.isArray(o.base_64_tlvs)) return new Error(`${path}.base_64_tlvs: is not an array`)
for (let index = 0; index < o.base_64_tlvs.length; index++) {
if (typeof o.base_64_tlvs[index] !== 'string') return new Error(`${path}.base_64_tlvs[${index}]: is not a string`)
}
if (opts.base_64_tlvs_CustomCheck && !opts.base_64_tlvs_CustomCheck(o.base_64_tlvs)) return new Error(`${path}.base_64_tlvs: custom check failed`)
if (typeof o.current_chunk !== 'number') return new Error(`${path}.current_chunk: is not a number`)
if (opts.current_chunk_CustomCheck && !opts.current_chunk_CustomCheck(o.current_chunk)) return new Error(`${path}.current_chunk: custom check failed`)
return null
}

View file

@ -35,6 +35,9 @@ message UsageMetric {
message UsageMetricTlv {
repeated string base_64_tlvs = 1;
int64 current_chunk = 2;
repeated int64 available_chunks = 3;
}
message AppUsageMetrics {

View file

@ -3,6 +3,16 @@ import * as Types from '../../../proto/autogenerated/ts/types.js'
export const utf8Decoder: TextDecoder = new TextDecoder('utf-8')
export const utf8Encoder: TextEncoder = new TextEncoder()
export const encodeListTLV = (list: Uint8Array[]): TLV => {
const tlv: TLV = {}
tlv[64] = list
return tlv
}
export const decodeListTLV = (tlv: TLV): Uint8Array[] => {
return tlv[64]
}
export const usageMetricsToTlv = (metric: Types.UsageMetric): TLV => {
const tlv: TLV = {}
tlv[2] = [integerToUint8Array(Math.ceil(metric.processed_at_ms / 1000))] // 6 -> 6

View file

@ -1,3 +1,4 @@
import fs from 'fs'
import Storage from '../storage/index.js'
import * as Types from '../../../proto/autogenerated/ts/types.js'
import { Application } from '../storage/entity/Application.js'
@ -7,22 +8,28 @@ import { BalanceEvent } from '../storage/entity/BalanceEvent.js'
import { ChannelBalanceEvent } from '../storage/entity/ChannelsBalanceEvent.js'
import LND from '../lnd/lnd.js'
import HtlcTracker from './htlcTracker.js'
import { encodeTLV, usageMetricsToTlv } from './tlv.js'
const maxEvents = 100_000
import { MainSettings } from '../main/settings.js'
import { getLogger } from '../helpers/logger.js'
import { encodeTLV, usageMetricsToTlv } from '../helpers/tlv.js'
export default class Handler {
storage: Storage
lnd: LND
htlcTracker: HtlcTracker
metrics: Record<string, Types.AppUsageMetrics> = {}
constructor(storage: Storage, lnd: LND) {
metricsPath: string
logger = getLogger({ component: "metrics" })
constructor(mainSettings: MainSettings, storage: Storage, lnd: LND) {
this.storage = storage
this.lnd = lnd
this.htlcTracker = new HtlcTracker(this.storage)
this.metricsPath = [mainSettings.storageSettings.dataDir, "metrics"].join("/")
}
async HtlcCb(htlc: HtlcEvent) {
await this.htlcTracker.onHtlcEvent(htlc)
}
@ -31,7 +38,6 @@ export default class Handler {
const providers = await this.storage.liquidityStorage.GetTrackedProviders()
const channels = await this.lnd.GetChannelBalance()
let providerTotal = 0
console.log({ providers })
providers.forEach(p => {
if (p.provider_type === 'lnPub') {
providerTotal += p.latest_balance
@ -64,6 +70,12 @@ export default class Handler {
}))
}
async GetUsageMetrics(): Promise<Types.UsageMetrics> {
return this.storage.metricsEventStorage.LoadLatestMetrics()
}
AddMetrics(newMetrics: (Types.RequestMetric & { app_id?: string })[]) {
newMetrics.forEach(m => {
const appId = m.app_id || "_root"
@ -81,25 +93,29 @@ export default class Handler {
processed_at_ms: m.startMs
}
const tlv = usageMetricsToTlv(um)
const tlvString = Buffer.from(encodeTLV(tlv)).toString("base64")
if (!this.metrics[appId]) {
this.metrics[appId] = { app_metrics: {} }
}
if (!this.metrics[appId].app_metrics[m.rpcName]) {
this.metrics[appId].app_metrics[m.rpcName] = { base_64_tlvs: [] }
}
const len = this.metrics[appId].app_metrics[m.rpcName].base_64_tlvs.push(tlvString)
if (len > maxEvents) {
this.metrics[appId].app_metrics[m.rpcName].base_64_tlvs.splice(0, len - maxEvents)
}
this.storage.metricsEventStorage.AddMetricEvent(appId, m.rpcName, encodeTLV(tlv))
})
}
async GetUsageMetrics(): Promise<Types.UsageMetrics> {
return {
apps: this.metrics
}
}
/* addTrackedMetric = (appId: string, method: string, metric: Uint8Array) => {
if (!this.metaReady) {
throw new Error("meta metrics not ready")
}
const tlvString = Buffer.from(metric).toString("base64")
if (!this.metrics[appId]) {
this.metrics[appId] = { app_metrics: {} }
}
if (!this.metrics[appId].app_metrics[method]) {
this.metrics[appId].app_metrics[method] = { base_64_tlvs: [] }
}
const len = this.metrics[appId].app_metrics[method].base_64_tlvs.push(tlvString)
if (len > maxEvents) {
this.metrics[appId].app_metrics[method].base_64_tlvs.splice(0, len - maxEvents)
}
} */
async GetAppsMetrics(req: Types.AppsMetricsRequest): Promise<Types.AppsMetrics> {
const dbApps = await this.storage.applicationStorage.GetApplications()
const apps = await Promise.all(dbApps.map(app => this.GetAppMetrics(req, app)))

View file

@ -6,6 +6,7 @@ import ApplicationStorage from './applicationStorage.js'
import UserStorage from "./userStorage.js";
import PaymentStorage from "./paymentStorage.js";
import MetricsStorage from "./metricsStorage.js";
import MetricsEventStorage from "./metricsEventStorage.js";
import TransactionsQueue, { TX } from "./transactionsQueue.js";
import EventsLogManager from "./eventsLog.js";
import { LiquidityStorage } from "./liquidityStorage.js";
@ -29,6 +30,7 @@ export default class {
userStorage: UserStorage
paymentStorage: PaymentStorage
metricsStorage: MetricsStorage
metricsEventStorage: MetricsEventStorage
liquidityStorage: LiquidityStorage
debitStorage: DebitStorage
offerStorage: OfferStorage
@ -47,6 +49,7 @@ export default class {
this.applicationStorage = new ApplicationStorage(this.DB, this.userStorage, this.txQueue)
this.paymentStorage = new PaymentStorage(this.DB, this.userStorage, this.txQueue)
this.metricsStorage = new MetricsStorage(this.settings)
this.metricsEventStorage = new MetricsEventStorage(this.settings)
this.liquidityStorage = new LiquidityStorage(this.DB, this.txQueue)
this.debitStorage = new DebitStorage(this.DB, this.txQueue)
this.offerStorage = new OfferStorage(this.DB, this.txQueue)

View file

@ -0,0 +1,156 @@
import fs from 'fs'
import * as Types from '../../../proto/autogenerated/ts/types.js'
import { StorageSettings } from "./index.js";
import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js';
const chunkSizeBytes = 128 * 1024
export default class {
settings: StorageSettings
metricsPath: string
metaReady = false
metricsMeta: Record<string, Record<string, { chunks: number[] }>> = {}
pendingMetrics: Record<string, Record<string, { tlvs: Uint8Array[] }>> = {}
last24hOk: Record<number, number> = {}
last24hFail: Record<number, number> = {}
lastPersisted: number = 0
constructor(settings: StorageSettings) {
this.settings = settings;
this.metricsPath = [settings.dataDir, "metric_events"].join("/")
this.initMetricsMeta()
setInterval(() => {
if (Date.now() - this.lastPersisted > 1000 * 60 * 5) {
this.persistMetrics()
}
}, 1000 * 60 * 5)
process.on('exit', () => {
this.persistMetrics()
});
// catch ctrl+c event and exit normally
process.on('SIGINT', () => {
console.log('Ctrl-C...');
process.exit(2);
});
//catch uncaught exceptions, trace, then exit normally
process.on('uncaughtException', (e) => {
console.log('Uncaught Exception...');
console.log(e.stack);
process.exit(99);
});
}
AddMetricEvent = (appId: string, method: string, metric: Uint8Array) => {
if (!this.metaReady) {
throw new Error("meta metrics not ready")
}
if (!this.pendingMetrics[appId]) {
this.pendingMetrics[appId] = {}
}
if (!this.pendingMetrics[appId][method]) {
this.pendingMetrics[appId][method] = { tlvs: [] }
}
this.pendingMetrics[appId][method].tlvs.push(metric)
}
LoadLatestMetrics = async (): Promise<Types.UsageMetrics> => {
this.persistMetrics()
const metrics: Types.UsageMetrics = { apps: {} }
this.foreachMetricMethodFile((app, method, tlvFiles) => {
if (tlvFiles.length === 0) { return }
const methodPath = [this.metricsPath, app, method].join("/")
const latest = tlvFiles[tlvFiles.length - 1]
const tlvFile = [methodPath, `${latest}.mtlv`].join("/")
const tlv = fs.readFileSync(tlvFile)
const decoded = decodeListTLV(parseTLV(tlv))
if (!metrics.apps[app]) {
metrics.apps[app] = { app_metrics: {} }
}
metrics.apps[app].app_metrics[method] = {
base_64_tlvs: decoded.map(d => Buffer.from(d).toString('base64')),
current_chunk: latest,
available_chunks: tlvFiles
}
})
return metrics
}
persistMetrics = () => {
if (!this.metaReady) {
throw new Error("meta metrics not ready")
}
this.lastPersisted = Date.now()
const tosync = this.pendingMetrics
this.pendingMetrics = {}
const apps = Object.keys(tosync)
apps.map(app => {
const appPath = [this.metricsPath, app].join("/")
if (!fs.existsSync(appPath)) {
fs.mkdirSync(appPath, { recursive: true });
}
const methods = Object.keys(tosync[app])
methods.map(methodName => {
const methodPath = [appPath, methodName].join("/")
if (!fs.existsSync(methodPath)) {
fs.mkdirSync(methodPath, { recursive: true });
}
const method = tosync[app][methodName]
const meta = this.getMetricsMeta(app, methodName)
const chunks = meta.chunks.length > 0 ? meta.chunks : [0]
const latest = chunks[chunks.length - 1]
const tlv = encodeTLV(encodeListTLV(method.tlvs))
const tlvFile = [methodPath, `${latest}.mtlv`].join("/")
fs.appendFileSync(tlvFile, Buffer.from(tlv))
if (fs.lstatSync(tlvFile).size > chunkSizeBytes) {
this.updateMetricsMeta(app, methodName, [...chunks, latest + 1])
}
})
})
}
initMetricsMeta = () => {
this.foreachMetricMethodFile((app, method, tlvFiles) => {
this.updateMetricsMeta(app, method, tlvFiles)
})
this.metaReady = true
}
updateMetricsMeta = (appId: string, method: string, sortedChunks: number[]) => {
if (!this.metricsMeta[appId]) {
this.metricsMeta[appId] = {}
}
this.metricsMeta[appId][method] = { chunks: sortedChunks }
}
getMetricsMeta = (appId: string, method: string) => {
if (!this.metricsMeta[appId]) {
return { chunks: [] }
}
return this.metricsMeta[appId][method] || { chunks: [] }
}
foreachMetricMethodFile = (cb: (appId: string, method: string, tlvFiles: number[]) => void) => {
if (!fs.existsSync(this.metricsPath)) {
fs.mkdirSync(this.metricsPath, { recursive: true });
}
const apps = fs.readdirSync(this.metricsPath)
apps.forEach(appDir => {
const appPath = [this.metricsPath, appDir].join("/")
if (!fs.lstatSync(appPath).isDirectory()) {
return
}
const methods = fs.readdirSync(appPath)
methods.forEach(methodDir => {
const methodPath = [appPath, methodDir].join("/")
if (!fs.lstatSync(methodPath).isDirectory()) {
return
}
const tlvFiles = fs.readdirSync(methodPath)
.filter(f => f.endsWith(".mtlv"))
.map(f => +f.slice(0, -".mtlv".length))
.filter(n => !isNaN(n))
.sort((a, b) => a - b)
cb(appDir, methodDir, tlvFiles)
})
})
}
}

View file

@ -34,7 +34,7 @@ export class StateBundler {
latestReport = Date.now()
reportLog = getLogger({ component: 'stateBundlerReport' })
constructor() {
process.on('exit', () => {
/* process.on('exit', () => {
this.Report()
});
@ -49,7 +49,7 @@ export class StateBundler {
console.log('Uncaught Exception...');
console.log(e.stack);
process.exit(99);
});
}); */
}
increment = (key: string, value: number) => {