collect mem data for all processes

This commit is contained in:
boufni95 2025-04-01 20:51:17 +00:00
parent 98ad3be9ef
commit a328d2a3db
15 changed files with 171 additions and 30 deletions

View file

@ -0,0 +1 @@
[]

View file

@ -38,7 +38,7 @@ export default (serverMethods: Types.ServerMethods, mainHandler: Main, nostrSett
},
logger: { log: console.log, error: err => log(ERROR, err) },
})
const nostr = new Nostr(nostrSettings, event => {
const nostr = new Nostr(nostrSettings, mainHandler.utils, event => {
let j: NostrRequest
try {
j = JSON.parse(event.content)

View file

@ -2,15 +2,24 @@ import { MainSettings } from "../main/settings.js";
import { StateBundler } from "../storage/tlv/stateBundler.js";
import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js";
import { NostrSend } from "../nostr/handler.js";
import { ProcessMetricsCollector } from "../storage/tlv/processMetricsCollector.js";
type UtilsSettings = {
noCollector?: boolean
dataDir: string
}
export class Utils {
tlvStorageFactory: TlvStorageFactory
stateBundler: StateBundler
settings: MainSettings
_nostrSend: NostrSend = () => { throw new Error('nostr send not initialized yet') }
constructor(settings: MainSettings) {
this.settings = settings
constructor({ noCollector, dataDir }: UtilsSettings) {
this.tlvStorageFactory = new TlvStorageFactory()
this.stateBundler = new StateBundler(settings.storageSettings, this.tlvStorageFactory)
this.stateBundler = new StateBundler(dataDir, this.tlvStorageFactory)
if (!noCollector) {
new ProcessMetricsCollector((metrics) => {
this.tlvStorageFactory.ProcessMetrics(metrics, '')
})
}
}
attachNostrSend(f: NostrSend) {

View file

@ -17,9 +17,9 @@ export type AppData = {
name: string;
}
export const initMainHandler = async (log: PubLogger, mainSettings: MainSettings) => {
const utils = new Utils(mainSettings)
const storageManager = new Storage(mainSettings.storageSettings)
await storageManager.Connect(log, utils.tlvStorageFactory)
const utils = new Utils({ dataDir: mainSettings.storageSettings.dataDir })
const storageManager = new Storage(mainSettings.storageSettings, utils)
await storageManager.Connect(log)
/* const manualMigration = await TypeOrmMigrationRunner(log, storageManager, mainSettings.storageSettings.dbSettings, process.argv[2])
if (manualMigration) {
return

View file

@ -6,6 +6,7 @@ import { SimplePool, Event, UnsignedEvent, getEventHash, finalizeEvent, Relay, n
import { ERROR, getLogger } from '../helpers/logger.js'
import { nip19 } from 'nostr-tools'
import { encrypt as encryptV1, decrypt as decryptV1, getSharedSecret as getConversationKeyV1 } from './nip44v1.js'
import { ProcessMetrics, ProcessMetricsCollector } from '../storage/tlv/processMetricsCollector.js'
const { nprofileEncode } = nip19
const { v2 } = nip44
const { encrypt: encryptV2, decrypt: decryptV2, utils } = v2
@ -50,9 +51,13 @@ type EventResponse = {
type: 'event'
event: NostrEvent
}
type ProcessMetricsResponse = {
type: 'processMetrics'
metrics: ProcessMetrics
}
export type ChildProcessRequest = SettingsRequest | SendRequest
export type ChildProcessResponse = ReadyResponse | EventResponse
export type ChildProcessResponse = ReadyResponse | EventResponse | ProcessMetricsResponse
const send = (message: ChildProcessResponse) => {
if (process.send) {
process.send(message, undefined, undefined, err => {
@ -88,6 +93,13 @@ const initSubprocessHandler = (settings: NostrSettings) => {
event: event
})
})
new ProcessMetricsCollector((metrics) => {
send({
type: 'processMetrics',
metrics
})
})
}
const sendToNostr: NostrSend = (initiator, data, relays) => {
if (!subProcessHandler) {

View file

@ -1,6 +1,7 @@
import { ChildProcess, fork } from 'child_process'
import { EnvMustBeNonEmptyString } from "../helpers/envParser.js"
import { NostrSettings, NostrEvent, ChildProcessRequest, ChildProcessResponse, SendData, SendInitiator } from "./handler.js"
import { Utils } from '../helpers/utilsWrapper.js'
type EventCallback = (event: NostrEvent) => void
const getEnvOrDefault = (name: string, defaultValue: string): string => {
@ -17,7 +18,9 @@ export const LoadNosrtSettingsFromEnv = (test = false) => {
export default class NostrSubprocess {
settings: NostrSettings
childProcess: ChildProcess
constructor(settings: NostrSettings, eventCallback: EventCallback) {
utils: Utils
constructor(settings: NostrSettings, utils: Utils, eventCallback: EventCallback) {
this.utils = utils
this.childProcess = fork("./build/src/services/nostr/handler")
this.childProcess.on("error", console.error)
this.childProcess.on("message", (message: ChildProcessResponse) => {
@ -28,6 +31,9 @@ export default class NostrSubprocess {
case 'event':
eventCallback(message.event)
break
case 'processMetrics':
this.utils.tlvStorageFactory.ProcessMetrics(message.metrics, 'nostr')
break
default:
console.error("unknown nostr event response", message)
break;

View file

@ -11,9 +11,12 @@ import {
DecrementOperation,
SumOperation,
DBNames,
SuccessOperationResponse,
} from './storageProcessor.js';
import { PickKeysByType } from 'typeorm/common/PickKeysByType.js';
import { serializeRequest, WhereCondition } from './serializationHelpers.js';
import { Utils } from '../../helpers/utilsWrapper.js';
import { ProcessMetrics } from '../tlv/processMetricsCollector.js';
export type TX<T> = (txId: string) => Promise<T>
@ -22,21 +25,33 @@ export class StorageInterface extends EventEmitter {
private process: ChildProcess;
private isConnected: boolean = false;
private debug: boolean = false;
private utils: Utils
private dbType: 'main' | 'metrics'
constructor() {
constructor(utils: Utils) {
super();
this.initializeSubprocess();
this.utils = utils
}
setDebug(debug: boolean) {
this.debug = debug;
}
private handleCollectedProcessMetrics(metrics: SuccessOperationResponse<ProcessMetrics>) {
if (!this.dbType) return
this.utils.tlvStorageFactory.ProcessMetrics(metrics.data, this.dbType + '_storage')
}
private initializeSubprocess() {
this.process = fork('./build/src/services/storage/db/storageProcessor');
this.process.on('message', (response: OperationResponse<any>) => {
if (response.success && response.type === 'processMetrics') {
this.handleCollectedProcessMetrics(response)
} else {
this.emit(response.opId, response);
}
});
this.process.on('error', (error: Error) => {
@ -54,6 +69,7 @@ export class StorageInterface extends EventEmitter {
Connect(settings: DbSettings, dbType: 'main' | 'metrics'): Promise<number> {
const opId = Math.random().toString()
this.dbType = dbType
const connectOp: ConnectOperation = { type: 'connect', opId, settings, dbType }
return this.handleOp<number>(connectOp)
}

View file

@ -13,6 +13,7 @@ import OfferStorage from "./offerStorage.js"
import { StorageInterface, TX } from "./db/storageInterface.js";
import { PubLogger } from "../helpers/logger.js"
import { TlvStorageFactory } from './tlv/tlvFilesStorageFactory.js';
import { Utils } from '../helpers/utilsWrapper.js';
export type StorageSettings = {
dbSettings: DbSettings
eventLogPath: string
@ -36,12 +37,14 @@ export default class {
debitStorage: DebitStorage
offerStorage: OfferStorage
eventsLog: EventsLogManager
constructor(settings: StorageSettings) {
utils: Utils
constructor(settings: StorageSettings, utils: Utils) {
this.settings = settings
this.utils = utils
this.eventsLog = new EventsLogManager(settings.eventLogPath)
}
async Connect(log: PubLogger, tlvStorageFactory: TlvStorageFactory) {
this.dbs = new StorageInterface()
async Connect(log: PubLogger) {
this.dbs = new StorageInterface(this.utils)
await this.dbs.Connect(this.settings.dbSettings, 'main')
//const { source, executedMigrations } = await NewDB(this.settings.dbSettings, allMigrations)
//this.DB = source
@ -50,8 +53,8 @@ export default class {
this.productStorage = new ProductStorage(this.dbs)
this.applicationStorage = new ApplicationStorage(this.dbs, this.userStorage)
this.paymentStorage = new PaymentStorage(this.dbs, this.userStorage)
this.metricsStorage = new MetricsStorage(this.settings)
this.metricsEventStorage = new MetricsEventStorage(this.settings, tlvStorageFactory)
this.metricsStorage = new MetricsStorage(this.settings, this.utils)
this.metricsEventStorage = new MetricsEventStorage(this.settings, this.utils.tlvStorageFactory)
this.liquidityStorage = new LiquidityStorage(this.dbs)
this.debitStorage = new DebitStorage(this.dbs)
this.offerStorage = new OfferStorage(this.dbs)

View file

@ -7,19 +7,22 @@ import { newMetricsDb } from "./db/db.js";
import { ChannelRouting } from "./entity/ChannelRouting.js";
import { RootOperation } from "./entity/RootOperation.js";
import { StorageInterface } from "./db/storageInterface.js";
import { Utils } from "../helpers/utilsWrapper.js";
export default class {
//DB: DataSource | EntityManager
settings: StorageSettings
dbs: StorageInterface
utils: Utils
//txQueue: TransactionsQueue
constructor(settings: StorageSettings) {
constructor(settings: StorageSettings, utils: Utils) {
this.settings = settings;
this.utils = utils
}
async Connect() {
//const { source, executedMigrations } = await newMetricsDb(this.settings.dbSettings, metricsMigrations)
//this.DB = source;
//this.txQueue = new TransactionsQueue("metrics", this.DB)
this.dbs = new StorageInterface()
this.dbs = new StorageInterface(this.utils)
await this.dbs.Connect(this.settings.dbSettings, 'metrics')
//return executedMigrations;
}

View file

@ -0,0 +1,39 @@
import { getLogger } from "../../helpers/logger.js"
export type ProcessMetrics = {
memory_rss_kb?: number
memory_buffer_kb?: number
memory_heap_total_kb?: number
memory_heap_used_kb?: number
memory_external_kb?: number
}
export class ProcessMetricsCollector {
reportLog = getLogger({ component: 'ProcessMetricsCollector' })
prevValues: Record<string, number> = {}
interval: NodeJS.Timeout
constructor(cb: (metrics: ProcessMetrics) => void) {
this.interval = setInterval(() => {
const mem = process.memoryUsage()
const metrics: ProcessMetrics = {
memory_rss_kb: this.AddValue('memory_rss_kb', Math.ceil(mem.rss / 1000 || 0), true),
memory_buffer_kb: this.AddValue('memory_buffer_kb', Math.ceil(mem.arrayBuffers / 1000 || 0), true),
memory_heap_total_kb: this.AddValue('memory_heap_total_kb', Math.ceil(mem.heapTotal / 1000 || 0), true),
memory_heap_used_kb: this.AddValue('memory_heap_used_kb', Math.ceil(mem.heapUsed / 1000 || 0), true),
memory_external_kb: this.AddValue('memory_external_kb', Math.ceil(mem.external / 1000 || 0), true),
}
cb(metrics)
}, 60 * 1000)
}
Stop() {
clearInterval(this.interval)
}
AddValue = (key: string, v: number, updateOnly = false): number | undefined => {
if (updateOnly && this.prevValues[key] === v) {
return
}
this.prevValues[key] = v
return v
}
}

View file

@ -34,8 +34,8 @@ export class StateBundler {
reportLog = getLogger({ component: 'stateBundlerReport' })
prevValues: Record<string, number> = {}
interval: NodeJS.Timeout
constructor(settings: StorageSettings, tlvStorageFactory: TlvStorageFactory) {
const bundlerPath = [settings.dataDir, "bundler_events"].filter(s => !!s).join("/")
constructor(dataDir: string, tlvStorageFactory: TlvStorageFactory) {
const bundlerPath = [dataDir, "bundler_events"].filter(s => !!s).join("/")
this.tlvStorage = tlvStorageFactory.NewStorage({ name: "bundler", path: bundlerPath })
this.interval = setInterval(() => {
const mem = process.memoryUsage()

View file

@ -1,10 +1,11 @@
import { ChildProcess, fork } from 'child_process';
import { EventEmitter } from 'events';
import { AddTlvOperation, ITlvStorageOperation, SuccessTlvOperationResponse, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings, WebRtcMessageOperation } from './tlvFilesStorageProcessor';
import { AddTlvOperation, ITlvStorageOperation, SuccessTlvOperationResponse, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings, WebRtcMessageOperation, ProcessMetricsTlvOperation } from './tlvFilesStorageProcessor';
import { LatestData, TlvFile } from './tlvFilesStorage';
import { NostrSend, SendData, SendInitiator } from '../../nostr/handler';
import { WebRtcUserInfo } from '../../webRTC';
import * as Types from '../../../../proto/autogenerated/ts/types.js'
import { ProcessMetrics } from './processMetricsCollector';
export type TlvStorageInterface = {
AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => Promise<number>
LoadLatest: (limit?: number) => Promise<LatestData>
@ -79,7 +80,7 @@ export class TlvStorageFactory extends EventEmitter {
async LoadLatest(storageName: string, limit?: number): Promise<LatestData> {
const opId = Math.random().toString()
const op: LoadLatestTlvOperation = { type: 'loadLatest', opId, storageName, limit, debug: true }
const op: LoadLatestTlvOperation = { type: 'loadLatest', opId, storageName, limit }
const latestData = await this.handleOp<SerializableLatestData>(op)
const deserializedLatestData: LatestData = {}
for (const appId in latestData) {
@ -93,17 +94,25 @@ export class TlvStorageFactory extends EventEmitter {
async LoadFile(storageName: string, appId: string, dataName: string, chunk: number): Promise<TlvFile> {
const opId = Math.random().toString()
const op: LoadTlvFileOperation = { type: 'loadFile', opId, storageName, appId, dataName, chunk, debug: true }
const op: LoadTlvFileOperation = { type: 'loadFile', opId, storageName, appId, dataName, chunk }
const tlvFile = await this.handleOp<SerializableTlvFile>(op)
return { fileData: Buffer.from(tlvFile.base64fileData, 'base64'), chunks: tlvFile.chunks }
}
WebRtcMessage(userInfo: WebRtcUserInfo, message: Types.WebRtcMessage_message): Promise<Types.WebRtcAnswer> {
const opId = Math.random().toString()
const op: WebRtcMessageOperation = { type: 'webRtcMessage', opId, userInfo, message, debug: true }
const op: WebRtcMessageOperation = { type: 'webRtcMessage', opId, userInfo, message }
return this.handleOp<Types.WebRtcAnswer>(op)
}
ProcessMetrics(metrics: ProcessMetrics, processName: string): Promise<void> {
const opId = Math.random().toString()
const op: ProcessMetricsTlvOperation = { type: 'processMetrics', opId, metrics, processName }
return this.handleOp<void>(op)
}
private handleOp<T>(op: ITlvStorageOperation): Promise<T> {
const debug = this.debug || op.debug
if (debug) console.log('handleOp', op)

View file

@ -4,6 +4,8 @@ import { TlvFilesStorage } from './tlvFilesStorage.js';
import * as Types from '../../../../proto/autogenerated/ts/types.js'
import { SendData } from '../../nostr/handler.js';
import { SendInitiator } from '../../nostr/handler.js';
import { ProcessMetrics, ProcessMetricsCollector } from './processMetricsCollector.js';
import { integerToUint8Array } from '../../helpers/tlv.js';
export type SerializableLatestData = Record<string, Record<string, { base64tlvs: string[], current_chunk: number, available_chunks: number[] }>>
export type SerializableTlvFile = { base64fileData: string, chunks: number[] }
export type TlvStorageSettings = {
@ -54,6 +56,14 @@ export type WebRtcMessageOperation = {
debug?: boolean
}
export type ProcessMetricsTlvOperation = {
type: 'processMetrics'
opId: string
processName?: string
metrics: ProcessMetrics
debug?: boolean
}
export type ErrorTlvOperationResponse = { success: false, error: string, opId: string }
export interface ITlvStorageOperation {
@ -62,7 +72,7 @@ export interface ITlvStorageOperation {
debug?: boolean
}
export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation | WebRtcMessageOperation
export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation | WebRtcMessageOperation | ProcessMetricsTlvOperation
export type SuccessTlvOperationResponse<T> = { success: true, type: string, data: T, opId: string }
export type TlvOperationResponse<T> = SuccessTlvOperationResponse<T> | ErrorTlvOperationResponse
@ -101,6 +111,26 @@ class TlvFilesStorageProcessor {
opId: Math.random().toString()
});
})
new ProcessMetricsCollector((pMetrics) => {
this.saveProcessMetrics(pMetrics, 'tlv_processor')
})
}
private serializeNowTlv = (v: number) => {
const nowUnix = Math.floor(Date.now() / 1000)
const entry = new Uint8Array(8)
entry.set(integerToUint8Array(nowUnix), 0)
entry.set(integerToUint8Array(v), 4)
return entry
}
private saveProcessMetrics = (pMetrics: ProcessMetrics, processName = "") => {
const pName = processName ? '_' + processName : ''
if (pMetrics.memory_rss_kb) this.storages['bundle'].AddTlv('_root', 'memory_rss_kb' + pName, this.serializeNowTlv(pMetrics.memory_rss_kb))
if (pMetrics.memory_buffer_kb) this.storages['bundle'].AddTlv('_root', 'memory_buffer_kb' + pName, this.serializeNowTlv(pMetrics.memory_buffer_kb))
if (pMetrics.memory_heap_total_kb) this.storages['bundle'].AddTlv('_root', 'memory_heap_total_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_total_kb))
if (pMetrics.memory_heap_used_kb) this.storages['bundle'].AddTlv('_root', 'memory_heap_used_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_used_kb))
if (pMetrics.memory_external_kb) this.storages['bundle'].AddTlv('_root', 'memory_external_kb' + pName, this.serializeNowTlv(pMetrics.memory_external_kb))
}
private async handleOperation(operation: TlvStorageOperation) {
@ -123,6 +153,9 @@ class TlvFilesStorageProcessor {
case 'webRtcMessage':
await this.handleWebRtcMessage(operation);
break;
case 'processMetrics':
await this.handleProcessMetrics(operation);
break;
default:
this.sendResponse({
success: false,
@ -230,6 +263,16 @@ class TlvFilesStorageProcessor {
});
}
private async handleProcessMetrics(operation: ProcessMetricsTlvOperation) {
this.saveProcessMetrics(operation.metrics, operation.processName)
this.sendResponse<null>({
success: true,
type: 'processMetrics',
data: null,
opId: operation.opId
});
}
private sendResponse<T>(response: TlvOperationResponse<T>) {
if (process.send) {
process.send(response);

View file

@ -10,7 +10,7 @@ export const setupNetwork = async () => {
await core.InitAddress()
await core.Mine(1)
const tlvStorageFactory = new TlvStorageFactory()
const setupUtils = new Utils(settings)
const setupUtils = new Utils({ dataDir: settings.storageSettings.dataDir })
const alice = new LND(settings.lndSettings, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { })
const bob = new LND({ ...settings.lndSettings, mainNode: settings.lndSettings.otherNode }, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { })
await tryUntil<void>(async i => {

View file

@ -44,9 +44,9 @@ export type StorageTestBase = {
export const setupStorageTest = async (d: Describe): Promise<StorageTestBase> => {
const settings = GetTestStorageSettings()
const storageManager = new Storage(settings)
const tlvStorageFactory = new TlvStorageFactory()
await storageManager.Connect(console.log, tlvStorageFactory)
const utils = new Utils({ dataDir: settings.dataDir })
const storageManager = new Storage(settings, utils)
await storageManager.Connect(console.log)
return {
expect,
storage: storageManager,
@ -71,7 +71,7 @@ export const SetupTest = async (d: Describe): Promise<TestBase> => {
const user1 = { userId: u1.info.userId, appUserIdentifier: u1.identifier, appId: app.appId }
const user2 = { userId: u2.info.userId, appUserIdentifier: u2.identifier, appId: app.appId }
const extermnalUtils = new Utils(settings)
const extermnalUtils = new Utils({ dataDir: settings.storageSettings.dataDir })
const externalAccessToMainLnd = new LND(settings.lndSettings, new LiquidityProvider("", extermnalUtils, async () => { }, async () => { }), extermnalUtils, async () => { }, async () => { }, () => { }, () => { })
await externalAccessToMainLnd.Warmup()