diff --git a/metric_cache/last24hSF.json b/metric_cache/last24hSF.json new file mode 100644 index 00000000..0637a088 --- /dev/null +++ b/metric_cache/last24hSF.json @@ -0,0 +1 @@ +[] \ No newline at end of file diff --git a/src/nostrMiddleware.ts b/src/nostrMiddleware.ts index 39d79605..4b6f6cef 100644 --- a/src/nostrMiddleware.ts +++ b/src/nostrMiddleware.ts @@ -38,7 +38,7 @@ export default (serverMethods: Types.ServerMethods, mainHandler: Main, nostrSett }, logger: { log: console.log, error: err => log(ERROR, err) }, }) - const nostr = new Nostr(nostrSettings, event => { + const nostr = new Nostr(nostrSettings, mainHandler.utils, event => { let j: NostrRequest try { j = JSON.parse(event.content) diff --git a/src/services/helpers/utilsWrapper.ts b/src/services/helpers/utilsWrapper.ts index 7c83e186..71a02bdd 100644 --- a/src/services/helpers/utilsWrapper.ts +++ b/src/services/helpers/utilsWrapper.ts @@ -2,15 +2,24 @@ import { MainSettings } from "../main/settings.js"; import { StateBundler } from "../storage/tlv/stateBundler.js"; import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js"; import { NostrSend } from "../nostr/handler.js"; +import { ProcessMetricsCollector } from "../storage/tlv/processMetricsCollector.js"; +type UtilsSettings = { + noCollector?: boolean + dataDir: string +} export class Utils { tlvStorageFactory: TlvStorageFactory stateBundler: StateBundler settings: MainSettings _nostrSend: NostrSend = () => { throw new Error('nostr send not initialized yet') } - constructor(settings: MainSettings) { - this.settings = settings + constructor({ noCollector, dataDir }: UtilsSettings) { this.tlvStorageFactory = new TlvStorageFactory() - this.stateBundler = new StateBundler(settings.storageSettings, this.tlvStorageFactory) + this.stateBundler = new StateBundler(dataDir, this.tlvStorageFactory) + if (!noCollector) { + new ProcessMetricsCollector((metrics) => { + this.tlvStorageFactory.ProcessMetrics(metrics, '') + }) + } } attachNostrSend(f: NostrSend) { diff --git a/src/services/main/init.ts b/src/services/main/init.ts index 5f2d4034..35a5e37d 100644 --- a/src/services/main/init.ts +++ b/src/services/main/init.ts @@ -17,9 +17,9 @@ export type AppData = { name: string; } export const initMainHandler = async (log: PubLogger, mainSettings: MainSettings) => { - const utils = new Utils(mainSettings) - const storageManager = new Storage(mainSettings.storageSettings) - await storageManager.Connect(log, utils.tlvStorageFactory) + const utils = new Utils({ dataDir: mainSettings.storageSettings.dataDir }) + const storageManager = new Storage(mainSettings.storageSettings, utils) + await storageManager.Connect(log) /* const manualMigration = await TypeOrmMigrationRunner(log, storageManager, mainSettings.storageSettings.dbSettings, process.argv[2]) if (manualMigration) { return diff --git a/src/services/nostr/handler.ts b/src/services/nostr/handler.ts index c84a2c5f..34050167 100644 --- a/src/services/nostr/handler.ts +++ b/src/services/nostr/handler.ts @@ -6,6 +6,7 @@ import { SimplePool, Event, UnsignedEvent, getEventHash, finalizeEvent, Relay, n import { ERROR, getLogger } from '../helpers/logger.js' import { nip19 } from 'nostr-tools' import { encrypt as encryptV1, decrypt as decryptV1, getSharedSecret as getConversationKeyV1 } from './nip44v1.js' +import { ProcessMetrics, ProcessMetricsCollector } from '../storage/tlv/processMetricsCollector.js' const { nprofileEncode } = nip19 const { v2 } = nip44 const { encrypt: encryptV2, decrypt: decryptV2, utils } = v2 @@ -50,9 +51,13 @@ type EventResponse = { type: 'event' event: NostrEvent } +type ProcessMetricsResponse = { + type: 'processMetrics' + metrics: ProcessMetrics +} export type ChildProcessRequest = SettingsRequest | SendRequest -export type ChildProcessResponse = ReadyResponse | EventResponse +export type ChildProcessResponse = ReadyResponse | EventResponse | ProcessMetricsResponse const send = (message: ChildProcessResponse) => { if (process.send) { process.send(message, undefined, undefined, err => { @@ -88,6 +93,13 @@ const initSubprocessHandler = (settings: NostrSettings) => { event: event }) }) + + new ProcessMetricsCollector((metrics) => { + send({ + type: 'processMetrics', + metrics + }) + }) } const sendToNostr: NostrSend = (initiator, data, relays) => { if (!subProcessHandler) { diff --git a/src/services/nostr/index.ts b/src/services/nostr/index.ts index 845cbbd2..cbe55034 100644 --- a/src/services/nostr/index.ts +++ b/src/services/nostr/index.ts @@ -1,6 +1,7 @@ import { ChildProcess, fork } from 'child_process' import { EnvMustBeNonEmptyString } from "../helpers/envParser.js" import { NostrSettings, NostrEvent, ChildProcessRequest, ChildProcessResponse, SendData, SendInitiator } from "./handler.js" +import { Utils } from '../helpers/utilsWrapper.js' type EventCallback = (event: NostrEvent) => void const getEnvOrDefault = (name: string, defaultValue: string): string => { @@ -17,7 +18,9 @@ export const LoadNosrtSettingsFromEnv = (test = false) => { export default class NostrSubprocess { settings: NostrSettings childProcess: ChildProcess - constructor(settings: NostrSettings, eventCallback: EventCallback) { + utils: Utils + constructor(settings: NostrSettings, utils: Utils, eventCallback: EventCallback) { + this.utils = utils this.childProcess = fork("./build/src/services/nostr/handler") this.childProcess.on("error", console.error) this.childProcess.on("message", (message: ChildProcessResponse) => { @@ -28,6 +31,9 @@ export default class NostrSubprocess { case 'event': eventCallback(message.event) break + case 'processMetrics': + this.utils.tlvStorageFactory.ProcessMetrics(message.metrics, 'nostr') + break default: console.error("unknown nostr event response", message) break; diff --git a/src/services/storage/db/storageInterface.ts b/src/services/storage/db/storageInterface.ts index ae97409a..00794f08 100644 --- a/src/services/storage/db/storageInterface.ts +++ b/src/services/storage/db/storageInterface.ts @@ -11,9 +11,12 @@ import { DecrementOperation, SumOperation, DBNames, + SuccessOperationResponse, } from './storageProcessor.js'; import { PickKeysByType } from 'typeorm/common/PickKeysByType.js'; import { serializeRequest, WhereCondition } from './serializationHelpers.js'; +import { Utils } from '../../helpers/utilsWrapper.js'; +import { ProcessMetrics } from '../tlv/processMetricsCollector.js'; export type TX = (txId: string) => Promise @@ -22,21 +25,33 @@ export class StorageInterface extends EventEmitter { private process: ChildProcess; private isConnected: boolean = false; private debug: boolean = false; + private utils: Utils + private dbType: 'main' | 'metrics' - constructor() { + constructor(utils: Utils) { super(); this.initializeSubprocess(); + this.utils = utils } setDebug(debug: boolean) { this.debug = debug; } + private handleCollectedProcessMetrics(metrics: SuccessOperationResponse) { + if (!this.dbType) return + this.utils.tlvStorageFactory.ProcessMetrics(metrics.data, this.dbType + '_storage') + } + private initializeSubprocess() { this.process = fork('./build/src/services/storage/db/storageProcessor'); this.process.on('message', (response: OperationResponse) => { - this.emit(response.opId, response); + if (response.success && response.type === 'processMetrics') { + this.handleCollectedProcessMetrics(response) + } else { + this.emit(response.opId, response); + } }); this.process.on('error', (error: Error) => { @@ -54,6 +69,7 @@ export class StorageInterface extends EventEmitter { Connect(settings: DbSettings, dbType: 'main' | 'metrics'): Promise { const opId = Math.random().toString() + this.dbType = dbType const connectOp: ConnectOperation = { type: 'connect', opId, settings, dbType } return this.handleOp(connectOp) } diff --git a/src/services/storage/index.ts b/src/services/storage/index.ts index e832290c..e8f9e771 100644 --- a/src/services/storage/index.ts +++ b/src/services/storage/index.ts @@ -13,6 +13,7 @@ import OfferStorage from "./offerStorage.js" import { StorageInterface, TX } from "./db/storageInterface.js"; import { PubLogger } from "../helpers/logger.js" import { TlvStorageFactory } from './tlv/tlvFilesStorageFactory.js'; +import { Utils } from '../helpers/utilsWrapper.js'; export type StorageSettings = { dbSettings: DbSettings eventLogPath: string @@ -36,12 +37,14 @@ export default class { debitStorage: DebitStorage offerStorage: OfferStorage eventsLog: EventsLogManager - constructor(settings: StorageSettings) { + utils: Utils + constructor(settings: StorageSettings, utils: Utils) { this.settings = settings + this.utils = utils this.eventsLog = new EventsLogManager(settings.eventLogPath) } - async Connect(log: PubLogger, tlvStorageFactory: TlvStorageFactory) { - this.dbs = new StorageInterface() + async Connect(log: PubLogger) { + this.dbs = new StorageInterface(this.utils) await this.dbs.Connect(this.settings.dbSettings, 'main') //const { source, executedMigrations } = await NewDB(this.settings.dbSettings, allMigrations) //this.DB = source @@ -50,8 +53,8 @@ export default class { this.productStorage = new ProductStorage(this.dbs) this.applicationStorage = new ApplicationStorage(this.dbs, this.userStorage) this.paymentStorage = new PaymentStorage(this.dbs, this.userStorage) - this.metricsStorage = new MetricsStorage(this.settings) - this.metricsEventStorage = new MetricsEventStorage(this.settings, tlvStorageFactory) + this.metricsStorage = new MetricsStorage(this.settings, this.utils) + this.metricsEventStorage = new MetricsEventStorage(this.settings, this.utils.tlvStorageFactory) this.liquidityStorage = new LiquidityStorage(this.dbs) this.debitStorage = new DebitStorage(this.dbs) this.offerStorage = new OfferStorage(this.dbs) diff --git a/src/services/storage/metricsStorage.ts b/src/services/storage/metricsStorage.ts index a47b20b3..3dd36e2f 100644 --- a/src/services/storage/metricsStorage.ts +++ b/src/services/storage/metricsStorage.ts @@ -7,19 +7,22 @@ import { newMetricsDb } from "./db/db.js"; import { ChannelRouting } from "./entity/ChannelRouting.js"; import { RootOperation } from "./entity/RootOperation.js"; import { StorageInterface } from "./db/storageInterface.js"; +import { Utils } from "../helpers/utilsWrapper.js"; export default class { //DB: DataSource | EntityManager settings: StorageSettings dbs: StorageInterface + utils: Utils //txQueue: TransactionsQueue - constructor(settings: StorageSettings) { + constructor(settings: StorageSettings, utils: Utils) { this.settings = settings; + this.utils = utils } async Connect() { //const { source, executedMigrations } = await newMetricsDb(this.settings.dbSettings, metricsMigrations) //this.DB = source; //this.txQueue = new TransactionsQueue("metrics", this.DB) - this.dbs = new StorageInterface() + this.dbs = new StorageInterface(this.utils) await this.dbs.Connect(this.settings.dbSettings, 'metrics') //return executedMigrations; } diff --git a/src/services/storage/tlv/processMetricsCollector.ts b/src/services/storage/tlv/processMetricsCollector.ts new file mode 100644 index 00000000..52047a40 --- /dev/null +++ b/src/services/storage/tlv/processMetricsCollector.ts @@ -0,0 +1,39 @@ +import { getLogger } from "../../helpers/logger.js" +export type ProcessMetrics = { + memory_rss_kb?: number + memory_buffer_kb?: number + memory_heap_total_kb?: number + memory_heap_used_kb?: number + memory_external_kb?: number +} +export class ProcessMetricsCollector { + reportLog = getLogger({ component: 'ProcessMetricsCollector' }) + prevValues: Record = {} + interval: NodeJS.Timeout + constructor(cb: (metrics: ProcessMetrics) => void) { + this.interval = setInterval(() => { + const mem = process.memoryUsage() + const metrics: ProcessMetrics = { + memory_rss_kb: this.AddValue('memory_rss_kb', Math.ceil(mem.rss / 1000 || 0), true), + memory_buffer_kb: this.AddValue('memory_buffer_kb', Math.ceil(mem.arrayBuffers / 1000 || 0), true), + memory_heap_total_kb: this.AddValue('memory_heap_total_kb', Math.ceil(mem.heapTotal / 1000 || 0), true), + memory_heap_used_kb: this.AddValue('memory_heap_used_kb', Math.ceil(mem.heapUsed / 1000 || 0), true), + memory_external_kb: this.AddValue('memory_external_kb', Math.ceil(mem.external / 1000 || 0), true), + } + + cb(metrics) + }, 60 * 1000) + } + + Stop() { + clearInterval(this.interval) + } + + AddValue = (key: string, v: number, updateOnly = false): number | undefined => { + if (updateOnly && this.prevValues[key] === v) { + return + } + this.prevValues[key] = v + return v + } +} diff --git a/src/services/storage/tlv/stateBundler.ts b/src/services/storage/tlv/stateBundler.ts index a8b634b4..6873b8f1 100644 --- a/src/services/storage/tlv/stateBundler.ts +++ b/src/services/storage/tlv/stateBundler.ts @@ -34,8 +34,8 @@ export class StateBundler { reportLog = getLogger({ component: 'stateBundlerReport' }) prevValues: Record = {} interval: NodeJS.Timeout - constructor(settings: StorageSettings, tlvStorageFactory: TlvStorageFactory) { - const bundlerPath = [settings.dataDir, "bundler_events"].filter(s => !!s).join("/") + constructor(dataDir: string, tlvStorageFactory: TlvStorageFactory) { + const bundlerPath = [dataDir, "bundler_events"].filter(s => !!s).join("/") this.tlvStorage = tlvStorageFactory.NewStorage({ name: "bundler", path: bundlerPath }) this.interval = setInterval(() => { const mem = process.memoryUsage() diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts index f6eb6b06..fc8bfc4d 100644 --- a/src/services/storage/tlv/tlvFilesStorageFactory.ts +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -1,10 +1,11 @@ import { ChildProcess, fork } from 'child_process'; import { EventEmitter } from 'events'; -import { AddTlvOperation, ITlvStorageOperation, SuccessTlvOperationResponse, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings, WebRtcMessageOperation } from './tlvFilesStorageProcessor'; +import { AddTlvOperation, ITlvStorageOperation, SuccessTlvOperationResponse, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings, WebRtcMessageOperation, ProcessMetricsTlvOperation } from './tlvFilesStorageProcessor'; import { LatestData, TlvFile } from './tlvFilesStorage'; import { NostrSend, SendData, SendInitiator } from '../../nostr/handler'; import { WebRtcUserInfo } from '../../webRTC'; import * as Types from '../../../../proto/autogenerated/ts/types.js' +import { ProcessMetrics } from './processMetricsCollector'; export type TlvStorageInterface = { AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => Promise LoadLatest: (limit?: number) => Promise @@ -79,7 +80,7 @@ export class TlvStorageFactory extends EventEmitter { async LoadLatest(storageName: string, limit?: number): Promise { const opId = Math.random().toString() - const op: LoadLatestTlvOperation = { type: 'loadLatest', opId, storageName, limit, debug: true } + const op: LoadLatestTlvOperation = { type: 'loadLatest', opId, storageName, limit } const latestData = await this.handleOp(op) const deserializedLatestData: LatestData = {} for (const appId in latestData) { @@ -93,17 +94,25 @@ export class TlvStorageFactory extends EventEmitter { async LoadFile(storageName: string, appId: string, dataName: string, chunk: number): Promise { const opId = Math.random().toString() - const op: LoadTlvFileOperation = { type: 'loadFile', opId, storageName, appId, dataName, chunk, debug: true } + const op: LoadTlvFileOperation = { type: 'loadFile', opId, storageName, appId, dataName, chunk } const tlvFile = await this.handleOp(op) return { fileData: Buffer.from(tlvFile.base64fileData, 'base64'), chunks: tlvFile.chunks } } WebRtcMessage(userInfo: WebRtcUserInfo, message: Types.WebRtcMessage_message): Promise { const opId = Math.random().toString() - const op: WebRtcMessageOperation = { type: 'webRtcMessage', opId, userInfo, message, debug: true } + const op: WebRtcMessageOperation = { type: 'webRtcMessage', opId, userInfo, message } return this.handleOp(op) } + ProcessMetrics(metrics: ProcessMetrics, processName: string): Promise { + const opId = Math.random().toString() + const op: ProcessMetricsTlvOperation = { type: 'processMetrics', opId, metrics, processName } + return this.handleOp(op) + } + + + private handleOp(op: ITlvStorageOperation): Promise { const debug = this.debug || op.debug if (debug) console.log('handleOp', op) diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts index 940f9fc2..c9d3a229 100644 --- a/src/services/storage/tlv/tlvFilesStorageProcessor.ts +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -4,6 +4,8 @@ import { TlvFilesStorage } from './tlvFilesStorage.js'; import * as Types from '../../../../proto/autogenerated/ts/types.js' import { SendData } from '../../nostr/handler.js'; import { SendInitiator } from '../../nostr/handler.js'; +import { ProcessMetrics, ProcessMetricsCollector } from './processMetricsCollector.js'; +import { integerToUint8Array } from '../../helpers/tlv.js'; export type SerializableLatestData = Record> export type SerializableTlvFile = { base64fileData: string, chunks: number[] } export type TlvStorageSettings = { @@ -54,6 +56,14 @@ export type WebRtcMessageOperation = { debug?: boolean } +export type ProcessMetricsTlvOperation = { + type: 'processMetrics' + opId: string + processName?: string + metrics: ProcessMetrics + debug?: boolean +} + export type ErrorTlvOperationResponse = { success: false, error: string, opId: string } export interface ITlvStorageOperation { @@ -62,7 +72,7 @@ export interface ITlvStorageOperation { debug?: boolean } -export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation | WebRtcMessageOperation +export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation | WebRtcMessageOperation | ProcessMetricsTlvOperation export type SuccessTlvOperationResponse = { success: true, type: string, data: T, opId: string } export type TlvOperationResponse = SuccessTlvOperationResponse | ErrorTlvOperationResponse @@ -101,6 +111,26 @@ class TlvFilesStorageProcessor { opId: Math.random().toString() }); }) + new ProcessMetricsCollector((pMetrics) => { + this.saveProcessMetrics(pMetrics, 'tlv_processor') + }) + } + + private serializeNowTlv = (v: number) => { + const nowUnix = Math.floor(Date.now() / 1000) + const entry = new Uint8Array(8) + entry.set(integerToUint8Array(nowUnix), 0) + entry.set(integerToUint8Array(v), 4) + return entry + } + + private saveProcessMetrics = (pMetrics: ProcessMetrics, processName = "") => { + const pName = processName ? '_' + processName : '' + if (pMetrics.memory_rss_kb) this.storages['bundle'].AddTlv('_root', 'memory_rss_kb' + pName, this.serializeNowTlv(pMetrics.memory_rss_kb)) + if (pMetrics.memory_buffer_kb) this.storages['bundle'].AddTlv('_root', 'memory_buffer_kb' + pName, this.serializeNowTlv(pMetrics.memory_buffer_kb)) + if (pMetrics.memory_heap_total_kb) this.storages['bundle'].AddTlv('_root', 'memory_heap_total_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_total_kb)) + if (pMetrics.memory_heap_used_kb) this.storages['bundle'].AddTlv('_root', 'memory_heap_used_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_used_kb)) + if (pMetrics.memory_external_kb) this.storages['bundle'].AddTlv('_root', 'memory_external_kb' + pName, this.serializeNowTlv(pMetrics.memory_external_kb)) } private async handleOperation(operation: TlvStorageOperation) { @@ -123,6 +153,9 @@ class TlvFilesStorageProcessor { case 'webRtcMessage': await this.handleWebRtcMessage(operation); break; + case 'processMetrics': + await this.handleProcessMetrics(operation); + break; default: this.sendResponse({ success: false, @@ -230,6 +263,16 @@ class TlvFilesStorageProcessor { }); } + private async handleProcessMetrics(operation: ProcessMetricsTlvOperation) { + this.saveProcessMetrics(operation.metrics, operation.processName) + this.sendResponse({ + success: true, + type: 'processMetrics', + data: null, + opId: operation.opId + }); + } + private sendResponse(response: TlvOperationResponse) { if (process.send) { process.send(response); diff --git a/src/tests/networkSetup.ts b/src/tests/networkSetup.ts index 3de2a23b..9ebcaa17 100644 --- a/src/tests/networkSetup.ts +++ b/src/tests/networkSetup.ts @@ -10,7 +10,7 @@ export const setupNetwork = async () => { await core.InitAddress() await core.Mine(1) const tlvStorageFactory = new TlvStorageFactory() - const setupUtils = new Utils(settings) + const setupUtils = new Utils({ dataDir: settings.storageSettings.dataDir }) const alice = new LND(settings.lndSettings, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { }) const bob = new LND({ ...settings.lndSettings, mainNode: settings.lndSettings.otherNode }, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { }) await tryUntil(async i => { diff --git a/src/tests/testBase.ts b/src/tests/testBase.ts index 2fff9c05..cccb4982 100644 --- a/src/tests/testBase.ts +++ b/src/tests/testBase.ts @@ -44,9 +44,9 @@ export type StorageTestBase = { export const setupStorageTest = async (d: Describe): Promise => { const settings = GetTestStorageSettings() - const storageManager = new Storage(settings) - const tlvStorageFactory = new TlvStorageFactory() - await storageManager.Connect(console.log, tlvStorageFactory) + const utils = new Utils({ dataDir: settings.dataDir }) + const storageManager = new Storage(settings, utils) + await storageManager.Connect(console.log) return { expect, storage: storageManager, @@ -71,7 +71,7 @@ export const SetupTest = async (d: Describe): Promise => { const user1 = { userId: u1.info.userId, appUserIdentifier: u1.identifier, appId: app.appId } const user2 = { userId: u2.info.userId, appUserIdentifier: u2.identifier, appId: app.appId } - const extermnalUtils = new Utils(settings) + const extermnalUtils = new Utils({ dataDir: settings.storageSettings.dataDir }) const externalAccessToMainLnd = new LND(settings.lndSettings, new LiquidityProvider("", extermnalUtils, async () => { }, async () => { }), extermnalUtils, async () => { }, async () => { }, () => { }, () => { }) await externalAccessToMainLnd.Warmup()