diff --git a/proto/autogenerated/client.md b/proto/autogenerated/client.md index 01572a2e..9b709dda 100644 --- a/proto/autogenerated/client.md +++ b/proto/autogenerated/client.md @@ -1291,7 +1291,9 @@ The nostr server will send back a message response, and inside the body there wi - __validate_in_nano__: _number_ ### UsageMetricTlv + - __available_chunks__: ARRAY of: _number_ - __base_64_tlvs__: ARRAY of: _string_ + - __current_chunk__: _number_ ### UsageMetrics - __apps__: MAP with key: _string_ and value: _[AppUsageMetrics](#AppUsageMetrics)_ diff --git a/proto/autogenerated/go/types.go b/proto/autogenerated/go/types.go index 8d6ba2c2..a316cbdc 100644 --- a/proto/autogenerated/go/types.go +++ b/proto/autogenerated/go/types.go @@ -555,7 +555,9 @@ type UsageMetric struct { Validate_in_nano int64 `json:"validate_in_nano"` } type UsageMetricTlv struct { - Base_64_tlvs []string `json:"base_64_tlvs"` + Available_chunks []int64 `json:"available_chunks"` + Base_64_tlvs []string `json:"base_64_tlvs"` + Current_chunk int64 `json:"current_chunk"` } type UsageMetrics struct { Apps map[string]AppUsageMetrics `json:"apps"` diff --git a/proto/autogenerated/ts/types.ts b/proto/autogenerated/ts/types.ts index 824204ad..6b815ab7 100644 --- a/proto/autogenerated/ts/types.ts +++ b/proto/autogenerated/ts/types.ts @@ -3162,23 +3162,36 @@ export const UsageMetricValidate = (o?: UsageMetric, opts: UsageMetricOptions = } export type UsageMetricTlv = { + available_chunks: number[] base_64_tlvs: string[] + current_chunk: number } export const UsageMetricTlvOptionalFields: [] = [] export type UsageMetricTlvOptions = OptionsBaseMessage & { checkOptionalsAreSet?: [] + available_chunks_CustomCheck?: (v: number[]) => boolean base_64_tlvs_CustomCheck?: (v: string[]) => boolean + current_chunk_CustomCheck?: (v: number) => boolean } export const UsageMetricTlvValidate = (o?: UsageMetricTlv, opts: UsageMetricTlvOptions = {}, path: string = 'UsageMetricTlv::root.'): Error | null => { if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message') if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null') + if (!Array.isArray(o.available_chunks)) return new Error(`${path}.available_chunks: is not an array`) + for (let index = 0; index < o.available_chunks.length; index++) { + if (typeof o.available_chunks[index] !== 'number') return new Error(`${path}.available_chunks[${index}]: is not a number`) + } + if (opts.available_chunks_CustomCheck && !opts.available_chunks_CustomCheck(o.available_chunks)) return new Error(`${path}.available_chunks: custom check failed`) + if (!Array.isArray(o.base_64_tlvs)) return new Error(`${path}.base_64_tlvs: is not an array`) for (let index = 0; index < o.base_64_tlvs.length; index++) { if (typeof o.base_64_tlvs[index] !== 'string') return new Error(`${path}.base_64_tlvs[${index}]: is not a string`) } if (opts.base_64_tlvs_CustomCheck && !opts.base_64_tlvs_CustomCheck(o.base_64_tlvs)) return new Error(`${path}.base_64_tlvs: custom check failed`) + if (typeof o.current_chunk !== 'number') return new Error(`${path}.current_chunk: is not a number`) + if (opts.current_chunk_CustomCheck && !opts.current_chunk_CustomCheck(o.current_chunk)) return new Error(`${path}.current_chunk: custom check failed`) + return null } diff --git a/proto/service/structs.proto b/proto/service/structs.proto index bece972c..572f224f 100644 --- a/proto/service/structs.proto +++ b/proto/service/structs.proto @@ -35,6 +35,9 @@ message UsageMetric { message UsageMetricTlv { repeated string base_64_tlvs = 1; + int64 current_chunk = 2; + repeated int64 available_chunks = 3; + } message AppUsageMetrics { diff --git a/src/services/metrics/tlv.ts b/src/services/helpers/tlv.ts similarity index 94% rename from src/services/metrics/tlv.ts rename to src/services/helpers/tlv.ts index cf4b2a6e..7e8d6b08 100644 --- a/src/services/metrics/tlv.ts +++ b/src/services/helpers/tlv.ts @@ -3,6 +3,16 @@ import * as Types from '../../../proto/autogenerated/ts/types.js' export const utf8Decoder: TextDecoder = new TextDecoder('utf-8') export const utf8Encoder: TextEncoder = new TextEncoder() +export const encodeListTLV = (list: Uint8Array[]): TLV => { + const tlv: TLV = {} + tlv[64] = list + return tlv +} + +export const decodeListTLV = (tlv: TLV): Uint8Array[] => { + return tlv[64] +} + export const usageMetricsToTlv = (metric: Types.UsageMetric): TLV => { const tlv: TLV = {} tlv[2] = [integerToUint8Array(Math.ceil(metric.processed_at_ms / 1000))] // 6 -> 6 diff --git a/src/services/metrics/index.ts b/src/services/metrics/index.ts index d0894eef..3ee77898 100644 --- a/src/services/metrics/index.ts +++ b/src/services/metrics/index.ts @@ -1,3 +1,4 @@ +import fs from 'fs' import Storage from '../storage/index.js' import * as Types from '../../../proto/autogenerated/ts/types.js' import { Application } from '../storage/entity/Application.js' @@ -7,22 +8,28 @@ import { BalanceEvent } from '../storage/entity/BalanceEvent.js' import { ChannelBalanceEvent } from '../storage/entity/ChannelsBalanceEvent.js' import LND from '../lnd/lnd.js' import HtlcTracker from './htlcTracker.js' -import { encodeTLV, usageMetricsToTlv } from './tlv.js' -const maxEvents = 100_000 +import { MainSettings } from '../main/settings.js' +import { getLogger } from '../helpers/logger.js' +import { encodeTLV, usageMetricsToTlv } from '../helpers/tlv.js' + export default class Handler { - storage: Storage lnd: LND htlcTracker: HtlcTracker - metrics: Record = {} - constructor(storage: Storage, lnd: LND) { + metricsPath: string + logger = getLogger({ component: "metrics" }) + constructor(mainSettings: MainSettings, storage: Storage, lnd: LND) { this.storage = storage this.lnd = lnd this.htlcTracker = new HtlcTracker(this.storage) + this.metricsPath = [mainSettings.storageSettings.dataDir, "metrics"].join("/") + } + + async HtlcCb(htlc: HtlcEvent) { await this.htlcTracker.onHtlcEvent(htlc) } @@ -31,7 +38,6 @@ export default class Handler { const providers = await this.storage.liquidityStorage.GetTrackedProviders() const channels = await this.lnd.GetChannelBalance() let providerTotal = 0 - console.log({ providers }) providers.forEach(p => { if (p.provider_type === 'lnPub') { providerTotal += p.latest_balance @@ -64,6 +70,12 @@ export default class Handler { })) } + async GetUsageMetrics(): Promise { + return this.storage.metricsEventStorage.LoadLatestMetrics() + } + + + AddMetrics(newMetrics: (Types.RequestMetric & { app_id?: string })[]) { newMetrics.forEach(m => { const appId = m.app_id || "_root" @@ -81,25 +93,29 @@ export default class Handler { processed_at_ms: m.startMs } const tlv = usageMetricsToTlv(um) - const tlvString = Buffer.from(encodeTLV(tlv)).toString("base64") - if (!this.metrics[appId]) { - this.metrics[appId] = { app_metrics: {} } - } - if (!this.metrics[appId].app_metrics[m.rpcName]) { - this.metrics[appId].app_metrics[m.rpcName] = { base_64_tlvs: [] } - } - const len = this.metrics[appId].app_metrics[m.rpcName].base_64_tlvs.push(tlvString) - if (len > maxEvents) { - this.metrics[appId].app_metrics[m.rpcName].base_64_tlvs.splice(0, len - maxEvents) - } + this.storage.metricsEventStorage.AddMetricEvent(appId, m.rpcName, encodeTLV(tlv)) }) } - async GetUsageMetrics(): Promise { - return { - apps: this.metrics - } - } + + + /* addTrackedMetric = (appId: string, method: string, metric: Uint8Array) => { + if (!this.metaReady) { + throw new Error("meta metrics not ready") + } + const tlvString = Buffer.from(metric).toString("base64") + if (!this.metrics[appId]) { + this.metrics[appId] = { app_metrics: {} } + } + if (!this.metrics[appId].app_metrics[method]) { + this.metrics[appId].app_metrics[method] = { base_64_tlvs: [] } + } + const len = this.metrics[appId].app_metrics[method].base_64_tlvs.push(tlvString) + if (len > maxEvents) { + this.metrics[appId].app_metrics[method].base_64_tlvs.splice(0, len - maxEvents) + } + } */ + async GetAppsMetrics(req: Types.AppsMetricsRequest): Promise { const dbApps = await this.storage.applicationStorage.GetApplications() const apps = await Promise.all(dbApps.map(app => this.GetAppMetrics(req, app))) diff --git a/src/services/storage/index.ts b/src/services/storage/index.ts index f266fd01..7f105ce0 100644 --- a/src/services/storage/index.ts +++ b/src/services/storage/index.ts @@ -6,6 +6,7 @@ import ApplicationStorage from './applicationStorage.js' import UserStorage from "./userStorage.js"; import PaymentStorage from "./paymentStorage.js"; import MetricsStorage from "./metricsStorage.js"; +import MetricsEventStorage from "./metricsEventStorage.js"; import TransactionsQueue, { TX } from "./transactionsQueue.js"; import EventsLogManager from "./eventsLog.js"; import { LiquidityStorage } from "./liquidityStorage.js"; @@ -29,6 +30,7 @@ export default class { userStorage: UserStorage paymentStorage: PaymentStorage metricsStorage: MetricsStorage + metricsEventStorage: MetricsEventStorage liquidityStorage: LiquidityStorage debitStorage: DebitStorage offerStorage: OfferStorage @@ -47,6 +49,7 @@ export default class { this.applicationStorage = new ApplicationStorage(this.DB, this.userStorage, this.txQueue) this.paymentStorage = new PaymentStorage(this.DB, this.userStorage, this.txQueue) this.metricsStorage = new MetricsStorage(this.settings) + this.metricsEventStorage = new MetricsEventStorage(this.settings) this.liquidityStorage = new LiquidityStorage(this.DB, this.txQueue) this.debitStorage = new DebitStorage(this.DB, this.txQueue) this.offerStorage = new OfferStorage(this.DB, this.txQueue) diff --git a/src/services/storage/metricsEventStorage.ts b/src/services/storage/metricsEventStorage.ts new file mode 100644 index 00000000..6bf436e5 --- /dev/null +++ b/src/services/storage/metricsEventStorage.ts @@ -0,0 +1,156 @@ +import fs from 'fs' +import * as Types from '../../../proto/autogenerated/ts/types.js' +import { StorageSettings } from "./index.js"; +import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js'; +const chunkSizeBytes = 128 * 1024 +export default class { + settings: StorageSettings + metricsPath: string + metaReady = false + metricsMeta: Record> = {} + pendingMetrics: Record> = {} + last24hOk: Record = {} + last24hFail: Record = {} + lastPersisted: number = 0 + constructor(settings: StorageSettings) { + this.settings = settings; + this.metricsPath = [settings.dataDir, "metric_events"].join("/") + this.initMetricsMeta() + setInterval(() => { + if (Date.now() - this.lastPersisted > 1000 * 60 * 5) { + this.persistMetrics() + } + }, 1000 * 60 * 5) + process.on('exit', () => { + this.persistMetrics() + }); + + // catch ctrl+c event and exit normally + process.on('SIGINT', () => { + console.log('Ctrl-C...'); + process.exit(2); + }); + + //catch uncaught exceptions, trace, then exit normally + process.on('uncaughtException', (e) => { + console.log('Uncaught Exception...'); + console.log(e.stack); + process.exit(99); + }); + } + + AddMetricEvent = (appId: string, method: string, metric: Uint8Array) => { + if (!this.metaReady) { + throw new Error("meta metrics not ready") + } + if (!this.pendingMetrics[appId]) { + this.pendingMetrics[appId] = {} + } + if (!this.pendingMetrics[appId][method]) { + this.pendingMetrics[appId][method] = { tlvs: [] } + } + this.pendingMetrics[appId][method].tlvs.push(metric) + } + + LoadLatestMetrics = async (): Promise => { + this.persistMetrics() + const metrics: Types.UsageMetrics = { apps: {} } + this.foreachMetricMethodFile((app, method, tlvFiles) => { + if (tlvFiles.length === 0) { return } + const methodPath = [this.metricsPath, app, method].join("/") + const latest = tlvFiles[tlvFiles.length - 1] + const tlvFile = [methodPath, `${latest}.mtlv`].join("/") + const tlv = fs.readFileSync(tlvFile) + const decoded = decodeListTLV(parseTLV(tlv)) + if (!metrics.apps[app]) { + metrics.apps[app] = { app_metrics: {} } + } + metrics.apps[app].app_metrics[method] = { + base_64_tlvs: decoded.map(d => Buffer.from(d).toString('base64')), + current_chunk: latest, + available_chunks: tlvFiles + } + }) + return metrics + } + + persistMetrics = () => { + if (!this.metaReady) { + throw new Error("meta metrics not ready") + } + this.lastPersisted = Date.now() + const tosync = this.pendingMetrics + this.pendingMetrics = {} + const apps = Object.keys(tosync) + apps.map(app => { + const appPath = [this.metricsPath, app].join("/") + if (!fs.existsSync(appPath)) { + fs.mkdirSync(appPath, { recursive: true }); + } + const methods = Object.keys(tosync[app]) + methods.map(methodName => { + const methodPath = [appPath, methodName].join("/") + if (!fs.existsSync(methodPath)) { + fs.mkdirSync(methodPath, { recursive: true }); + } + const method = tosync[app][methodName] + const meta = this.getMetricsMeta(app, methodName) + const chunks = meta.chunks.length > 0 ? meta.chunks : [0] + const latest = chunks[chunks.length - 1] + const tlv = encodeTLV(encodeListTLV(method.tlvs)) + const tlvFile = [methodPath, `${latest}.mtlv`].join("/") + fs.appendFileSync(tlvFile, Buffer.from(tlv)) + if (fs.lstatSync(tlvFile).size > chunkSizeBytes) { + this.updateMetricsMeta(app, methodName, [...chunks, latest + 1]) + } + }) + }) + } + + initMetricsMeta = () => { + this.foreachMetricMethodFile((app, method, tlvFiles) => { + this.updateMetricsMeta(app, method, tlvFiles) + }) + this.metaReady = true + } + + updateMetricsMeta = (appId: string, method: string, sortedChunks: number[]) => { + if (!this.metricsMeta[appId]) { + this.metricsMeta[appId] = {} + } + this.metricsMeta[appId][method] = { chunks: sortedChunks } + } + + getMetricsMeta = (appId: string, method: string) => { + if (!this.metricsMeta[appId]) { + return { chunks: [] } + } + return this.metricsMeta[appId][method] || { chunks: [] } + } + + foreachMetricMethodFile = (cb: (appId: string, method: string, tlvFiles: number[]) => void) => { + if (!fs.existsSync(this.metricsPath)) { + fs.mkdirSync(this.metricsPath, { recursive: true }); + } + const apps = fs.readdirSync(this.metricsPath) + apps.forEach(appDir => { + const appPath = [this.metricsPath, appDir].join("/") + if (!fs.lstatSync(appPath).isDirectory()) { + return + } + const methods = fs.readdirSync(appPath) + methods.forEach(methodDir => { + const methodPath = [appPath, methodDir].join("/") + if (!fs.lstatSync(methodPath).isDirectory()) { + return + } + const tlvFiles = fs.readdirSync(methodPath) + .filter(f => f.endsWith(".mtlv")) + .map(f => +f.slice(0, -".mtlv".length)) + .filter(n => !isNaN(n)) + .sort((a, b) => a - b) + cb(appDir, methodDir, tlvFiles) + }) + }) + } +} \ No newline at end of file diff --git a/src/services/storage/stateBundler.ts b/src/services/storage/stateBundler.ts index dab2a215..4cd000be 100644 --- a/src/services/storage/stateBundler.ts +++ b/src/services/storage/stateBundler.ts @@ -34,7 +34,7 @@ export class StateBundler { latestReport = Date.now() reportLog = getLogger({ component: 'stateBundlerReport' }) constructor() { - process.on('exit', () => { + /* process.on('exit', () => { this.Report() }); @@ -49,7 +49,7 @@ export class StateBundler { console.log('Uncaught Exception...'); console.log(e.stack); process.exit(99); - }); + }); */ } increment = (key: string, value: number) => {