From 164b153c2ee0035c618a1a3b1748095059bd9200 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Mon, 31 Mar 2025 16:21:12 +0000 Subject: [PATCH 01/23] metrics sub processes --- src/services/helpers/utilsWrapper.ts | 7 +- src/services/lnd/lnd.ts | 2 +- src/services/main/init.ts | 6 +- src/services/storage/applicationStorage.ts | 2 +- src/services/storage/{ => db}/db.ts | 50 ++--- .../storage/{ => db}/serializationHelpers.ts | 0 .../storage/{ => db}/storageInterface.ts | 25 +-- .../storage/{ => db}/storageProcessor.ts | 103 ++++++---- .../storage/{ => db}/transactionsQueue.ts | 2 +- src/services/storage/debitStorage.ts | 2 +- src/services/storage/index.ts | 24 +-- src/services/storage/liquidityStorage.ts | 2 +- src/services/storage/metricsStorage.ts | 85 ++++---- src/services/storage/offerStorage.ts | 2 +- src/services/storage/paymentStorage.ts | 4 +- src/services/storage/productStorage.ts | 2 +- .../storage/{ => tlv}/metricsEventStorage.ts | 19 +- .../storage/{ => tlv}/stateBundler.ts | 23 ++- .../storage/{ => tlv}/tlvFilesStorage.ts | 25 +-- .../storage/tlv/tlvFilesStorageFactory.ts | 109 ++++++++++ .../storage/tlv/tlvFilesStorageProcessor.ts | 186 ++++++++++++++++++ src/services/storage/userStorage.ts | 2 +- src/services/webRTC/index.ts | 8 +- src/tests/networkSetup.ts | 5 +- src/tests/testBase.ts | 7 +- 25 files changed, 521 insertions(+), 181 deletions(-) rename src/services/storage/{ => db}/db.ts (74%) rename src/services/storage/{ => db}/serializationHelpers.ts (100%) rename src/services/storage/{ => db}/storageInterface.ts (83%) rename src/services/storage/{ => db}/storageProcessor.ts (76%) rename src/services/storage/{ => db}/transactionsQueue.ts (95%) rename src/services/storage/{ => tlv}/metricsEventStorage.ts (84%) rename src/services/storage/{ => tlv}/stateBundler.ts (89%) rename src/services/storage/{ => tlv}/tlvFilesStorage.ts (88%) create mode 100644 src/services/storage/tlv/tlvFilesStorageFactory.ts create mode 100644 src/services/storage/tlv/tlvFilesStorageProcessor.ts diff --git a/src/services/helpers/utilsWrapper.ts b/src/services/helpers/utilsWrapper.ts index 0dc61d63..35698712 100644 --- a/src/services/helpers/utilsWrapper.ts +++ b/src/services/helpers/utilsWrapper.ts @@ -1,13 +1,14 @@ import { MainSettings } from "../main/settings.js"; -import { StateBundler } from "../storage/stateBundler.js"; +import { StateBundler } from "../storage/tlv/stateBundler.js"; +import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js"; export class Utils { stateBundler: StateBundler settings: MainSettings - constructor(settings: MainSettings) { + constructor(settings: MainSettings, tlvStorageFactory: TlvStorageFactory) { this.settings = settings - this.stateBundler = new StateBundler(settings.storageSettings) + this.stateBundler = new StateBundler(settings.storageSettings, tlvStorageFactory) } Stop() { diff --git a/src/services/lnd/lnd.ts b/src/services/lnd/lnd.ts index 3df13920..c638bb66 100644 --- a/src/services/lnd/lnd.ts +++ b/src/services/lnd/lnd.ts @@ -18,7 +18,7 @@ import { ERROR, getLogger } from '../helpers/logger.js'; import { HtlcEvent_EventType } from '../../../proto/lnd/router.js'; import { LiquidityProvider, LiquidityRequest } from '../main/liquidityProvider.js'; import { Utils } from '../helpers/utilsWrapper.js'; -import { TxPointSettings } from '../storage/stateBundler.js'; +import { TxPointSettings } from '../storage/tlv/stateBundler.js'; import { WalletKitClient } from '../../../proto/lnd/walletkit.client.js'; const DeadLineMetadata = (deadline = 10 * 1000) => ({ deadline: Date.now() + deadline }) const deadLndRetrySeconds = 5 diff --git a/src/services/main/init.ts b/src/services/main/init.ts index 2389510b..cc284c88 100644 --- a/src/services/main/init.ts +++ b/src/services/main/init.ts @@ -9,6 +9,7 @@ import { LoadMainSettingsFromEnv, MainSettings } from "./settings.js" import { Utils } from "../helpers/utilsWrapper.js" import { Wizard } from "../wizard/index.js" import { AdminManager } from "./adminManager.js" +import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js" export type AppData = { privateKey: string; publicKey: string; @@ -16,9 +17,10 @@ export type AppData = { name: string; } export const initMainHandler = async (log: PubLogger, mainSettings: MainSettings) => { - const utils = new Utils(mainSettings) + const tlvStorageFactory = new TlvStorageFactory() + const utils = new Utils(mainSettings, tlvStorageFactory) const storageManager = new Storage(mainSettings.storageSettings) - await storageManager.Connect(log) + await storageManager.Connect(log, tlvStorageFactory) /* const manualMigration = await TypeOrmMigrationRunner(log, storageManager, mainSettings.storageSettings.dbSettings, process.argv[2]) if (manualMigration) { return diff --git a/src/services/storage/applicationStorage.ts b/src/services/storage/applicationStorage.ts index 9bd8c6ba..c5b87bb4 100644 --- a/src/services/storage/applicationStorage.ts +++ b/src/services/storage/applicationStorage.ts @@ -7,7 +7,7 @@ import { ApplicationUser } from './entity/ApplicationUser.js'; import { getLogger } from '../helpers/logger.js'; import { User } from './entity/User.js'; import { InviteToken } from './entity/InviteToken.js'; -import { StorageInterface } from './storageInterface.js'; +import { StorageInterface } from './db/storageInterface.js'; export default class { dbs: StorageInterface userStorage: UserStorage diff --git a/src/services/storage/db.ts b/src/services/storage/db/db.ts similarity index 74% rename from src/services/storage/db.ts rename to src/services/storage/db/db.ts index 39b67ac1..12e8b396 100644 --- a/src/services/storage/db.ts +++ b/src/services/storage/db/db.ts @@ -1,29 +1,29 @@ import "reflect-metadata" import { DataSource, Migration } from "typeorm" -import { AddressReceivingTransaction } from "./entity/AddressReceivingTransaction.js" -import { User } from "./entity/User.js" -import { UserReceivingAddress } from "./entity/UserReceivingAddress.js" -import { UserReceivingInvoice } from "./entity/UserReceivingInvoice.js" -import { UserInvoicePayment } from "./entity/UserInvoicePayment.js" -import { EnvMustBeNonEmptyString } from "../helpers/envParser.js" -import { UserTransactionPayment } from "./entity/UserTransactionPayment.js" -import { UserBasicAuth } from "./entity/UserBasicAuth.js" -import { UserEphemeralKey } from "./entity/UserEphemeralKey.js" -import { UserToUserPayment } from "./entity/UserToUserPayment.js" -import { Application } from "./entity/Application.js" -import { ApplicationUser } from "./entity/ApplicationUser.js" -import { BalanceEvent } from "./entity/BalanceEvent.js" -import { ChannelBalanceEvent } from "./entity/ChannelsBalanceEvent.js" -import { getLogger } from "../helpers/logger.js" -import { ChannelRouting } from "./entity/ChannelRouting.js" -import { LspOrder } from "./entity/LspOrder.js" -import { Product } from "./entity/Product.js" -import { LndNodeInfo } from "./entity/LndNodeInfo.js" -import { TrackedProvider } from "./entity/TrackedProvider.js" -import { InviteToken } from "./entity/InviteToken.js" -import { DebitAccess } from "./entity/DebitAccess.js" -import { RootOperation } from "./entity/RootOperation.js" -import { UserOffer } from "./entity/UserOffer.js" +import { AddressReceivingTransaction } from "../entity/AddressReceivingTransaction.js" +import { User } from "../entity/User.js" +import { UserReceivingAddress } from "../entity/UserReceivingAddress.js" +import { UserReceivingInvoice } from "../entity/UserReceivingInvoice.js" +import { UserInvoicePayment } from "../entity/UserInvoicePayment.js" +import { EnvMustBeNonEmptyString } from "../../helpers/envParser.js" +import { UserTransactionPayment } from "../entity/UserTransactionPayment.js" +import { UserBasicAuth } from "../entity/UserBasicAuth.js" +import { UserEphemeralKey } from "../entity/UserEphemeralKey.js" +import { UserToUserPayment } from "../entity/UserToUserPayment.js" +import { Application } from "../entity/Application.js" +import { ApplicationUser } from "../entity/ApplicationUser.js" +import { BalanceEvent } from "../entity/BalanceEvent.js" +import { ChannelBalanceEvent } from "../entity/ChannelsBalanceEvent.js" +import { getLogger } from "../../helpers/logger.js" +import { ChannelRouting } from "../entity/ChannelRouting.js" +import { LspOrder } from "../entity/LspOrder.js" +import { Product } from "../entity/Product.js" +import { LndNodeInfo } from "../entity/LndNodeInfo.js" +import { TrackedProvider } from "../entity/TrackedProvider.js" +import { InviteToken } from "../entity/InviteToken.js" +import { DebitAccess } from "../entity/DebitAccess.js" +import { RootOperation } from "../entity/RootOperation.js" +import { UserOffer } from "../entity/UserOffer.js" export type DbSettings = { @@ -70,7 +70,7 @@ export const MainDbEntities = { export type MainDbNames = keyof typeof MainDbEntities export const MainDbEntitiesNames = Object.keys(MainDbEntities) -const MetricsDbEntities = { +export const MetricsDbEntities = { 'BalanceEvent': BalanceEvent, 'ChannelBalanceEvent': ChannelBalanceEvent, 'ChannelRouting': ChannelRouting, diff --git a/src/services/storage/serializationHelpers.ts b/src/services/storage/db/serializationHelpers.ts similarity index 100% rename from src/services/storage/serializationHelpers.ts rename to src/services/storage/db/serializationHelpers.ts diff --git a/src/services/storage/storageInterface.ts b/src/services/storage/db/storageInterface.ts similarity index 83% rename from src/services/storage/storageInterface.ts rename to src/services/storage/db/storageInterface.ts index ca0cfdfd..a7a04f78 100644 --- a/src/services/storage/storageInterface.ts +++ b/src/services/storage/db/storageInterface.ts @@ -10,6 +10,7 @@ import { IncrementOperation, DecrementOperation, SumOperation, + DBNames, } from './storageProcessor.js'; import { PickKeysByType } from 'typeorm/common/PickKeysByType.js'; import { serializeRequest, WhereCondition } from './serializationHelpers.js'; @@ -32,7 +33,7 @@ export class StorageInterface extends EventEmitter { } private initializeSubprocess() { - this.process = fork('./build/src/services/storage/storageProcessor'); + this.process = fork('./build/src/services/storage/db/storageProcessor'); this.process.on('message', (response: OperationResponse) => { this.emit(response.opId, response); @@ -51,61 +52,61 @@ export class StorageInterface extends EventEmitter { this.isConnected = true; } - Connect(settings: DbSettings): Promise { + Connect(settings: DbSettings, dbType: 'main' | 'metrics'): Promise { const opId = Math.random().toString() - const connectOp: ConnectOperation = { type: 'connect', opId, settings } + const connectOp: ConnectOperation = { type: 'connect', opId, settings, dbType } return this.handleOp(connectOp) } - Delete(entity: MainDbNames, q: number | FindOptionsWhere, txId?: string): Promise { + Delete(entity: DBNames, q: number | FindOptionsWhere, txId?: string): Promise { const opId = Math.random().toString() const deleteOp: DeleteOperation = { type: 'delete', entity, opId, q, txId } return this.handleOp(deleteOp) } - Remove(entity: MainDbNames, q: T, txId?: string): Promise { + Remove(entity: DBNames, q: T, txId?: string): Promise { const opId = Math.random().toString() const removeOp: RemoveOperation = { type: 'remove', entity, opId, q, txId } return this.handleOp(removeOp) } - FindOne(entity: MainDbNames, q: QueryOptions, txId?: string): Promise { + FindOne(entity: DBNames, q: QueryOptions, txId?: string): Promise { const opId = Math.random().toString() const findOp: FindOneOperation = { type: 'findOne', entity, opId, q, txId } return this.handleOp(findOp) } - Find(entity: MainDbNames, q: QueryOptions, txId?: string): Promise { + Find(entity: DBNames, q: QueryOptions, txId?: string): Promise { const opId = Math.random().toString() const findOp: FindOperation = { type: 'find', entity, opId, q, txId } return this.handleOp(findOp) } - Sum(entity: MainDbNames, columnName: PickKeysByType, q: WhereCondition, txId?: string): Promise { + Sum(entity: DBNames, columnName: PickKeysByType, q: WhereCondition, txId?: string): Promise { const opId = Math.random().toString() const sumOp: SumOperation = { type: 'sum', entity, opId, columnName, q, txId } return this.handleOp(sumOp) } - Update(entity: MainDbNames, q: number | FindOptionsWhere, toUpdate: DeepPartial, txId?: string): Promise { + Update(entity: DBNames, q: number | FindOptionsWhere, toUpdate: DeepPartial, txId?: string): Promise { const opId = Math.random().toString() const updateOp: UpdateOperation = { type: 'update', entity, opId, toUpdate, q, txId } return this.handleOp(updateOp) } - Increment(entity: MainDbNames, q: FindOptionsWhere, propertyPath: string, value: number | string, txId?: string): Promise { + Increment(entity: DBNames, q: FindOptionsWhere, propertyPath: string, value: number | string, txId?: string): Promise { const opId = Math.random().toString() const incrementOp: IncrementOperation = { type: 'increment', entity, opId, q, propertyPath, value, txId } return this.handleOp(incrementOp) } - Decrement(entity: MainDbNames, q: FindOptionsWhere, propertyPath: string, value: number | string, txId?: string): Promise { + Decrement(entity: DBNames, q: FindOptionsWhere, propertyPath: string, value: number | string, txId?: string): Promise { const opId = Math.random().toString() const decrementOp: DecrementOperation = { type: 'decrement', entity, opId, q, propertyPath, value, txId } return this.handleOp(decrementOp) } - CreateAndSave(entity: MainDbNames, toSave: DeepPartial, txId?: string): Promise { + CreateAndSave(entity: DBNames, toSave: DeepPartial, txId?: string): Promise { const opId = Math.random().toString() const createAndSaveOp: CreateAndSaveOperation = { type: 'createAndSave', entity, opId, toSave, txId } return this.handleOp(createAndSaveOp) diff --git a/src/services/storage/storageProcessor.ts b/src/services/storage/db/storageProcessor.ts similarity index 76% rename from src/services/storage/storageProcessor.ts rename to src/services/storage/db/storageProcessor.ts index e20252d1..f42cec42 100644 --- a/src/services/storage/storageProcessor.ts +++ b/src/services/storage/db/storageProcessor.ts @@ -1,12 +1,12 @@ import { DataSource, EntityManager, DeepPartial, FindOptionsWhere, FindOptionsOrder, FindOperator } from 'typeorm'; -import NewDB, { DbSettings, MainDbEntities, MainDbNames, newMetricsDb } from './db.js'; -import { PubLogger, getLogger } from '../helpers/logger.js'; -import { allMetricsMigrations, allMigrations } from './migrations/runner.js'; +import NewDB, { DbSettings, MainDbEntities, MainDbNames, MetricsDbEntities, MetricsDbNames, newMetricsDb } from './db.js'; +import { PubLogger, getLogger } from '../../helpers/logger.js'; +import { allMetricsMigrations, allMigrations } from '../migrations/runner.js'; import transactionsQueue from './transactionsQueue.js'; import { PickKeysByType } from 'typeorm/common/PickKeysByType'; import { deserializeRequest, WhereCondition } from './serializationHelpers.js'; - +export type DBNames = MainDbNames | MetricsDbNames export type QueryOptions = { where?: WhereCondition order?: FindOptionsOrder @@ -17,6 +17,7 @@ export type ConnectOperation = { type: 'connect' opId: string settings: DbSettings + dbType: 'main' | 'metrics' debug?: boolean } @@ -36,7 +37,7 @@ export type EndTxOperation = { export type DeleteOperation = { type: 'delete' - entity: MainDbNames + entity: DBNames opId: string q: number | FindOptionsWhere txId?: string @@ -45,7 +46,7 @@ export type DeleteOperation = { export type RemoveOperation = { type: 'remove' - entity: MainDbNames + entity: DBNames opId: string q: T txId?: string @@ -54,7 +55,7 @@ export type RemoveOperation = { export type UpdateOperation = { type: 'update' - entity: MainDbNames + entity: DBNames opId: string toUpdate: DeepPartial q: number | FindOptionsWhere @@ -64,7 +65,7 @@ export type UpdateOperation = { export type IncrementOperation = { type: 'increment' - entity: MainDbNames + entity: DBNames opId: string q: FindOptionsWhere propertyPath: string, @@ -75,7 +76,7 @@ export type IncrementOperation = { export type DecrementOperation = { type: 'decrement' - entity: MainDbNames + entity: DBNames opId: string q: FindOptionsWhere propertyPath: string, @@ -86,7 +87,7 @@ export type DecrementOperation = { export type FindOneOperation = { type: 'findOne' - entity: MainDbNames + entity: DBNames opId: string q: QueryOptions txId?: string @@ -95,7 +96,7 @@ export type FindOneOperation = { export type FindOperation = { type: 'find' - entity: MainDbNames + entity: DBNames opId: string q: QueryOptions txId?: string @@ -104,7 +105,7 @@ export type FindOperation = { export type SumOperation = { type: 'sum' - entity: MainDbNames + entity: DBNames opId: string columnName: PickKeysByType q: WhereCondition @@ -114,7 +115,7 @@ export type SumOperation = { export type CreateAndSaveOperation = { type: 'createAndSave' - entity: MainDbNames + entity: DBNames opId: string toSave: DeepPartial txId?: string @@ -150,12 +151,12 @@ class StorageProcessor { //private locked: boolean = false private activeTransaction: ActiveTransaction | null = null //private queue: StartTxOperation[] = [] + private mode: 'main' | 'metrics' | '' = '' constructor() { if (!process.send) { throw new Error('This process must be spawned as a child process'); } - this.log = getLogger({ component: 'StorageProcessor' }) process.on('message', (operation: StorageOperation) => { this.handleOperation(operation); }); @@ -229,17 +230,38 @@ class StorageProcessor { } private async handleConnect(operation: ConnectOperation) { - const { source, executedMigrations } = await NewDB(operation.settings, allMigrations) - this.DB = source - this.txQueue = new transactionsQueue('StorageProcessorQueue', this.DB) - if (executedMigrations.length > 0) { - this.log(executedMigrations.length, "new migrations executed") - this.log("-------------------") + let migrationsExecuted = 0 + if (this.mode !== '') { + throw new Error('Already connected to a database') + } + this.log = getLogger({ component: 'StorageProcessor_' + operation.dbType }) + if (operation.dbType === 'main') { + const { source, executedMigrations } = await NewDB(operation.settings, allMigrations) + this.DB = source + this.txQueue = new transactionsQueue('StorageProcessorQueue', this.DB) + migrationsExecuted = executedMigrations.length + if (executedMigrations.length > 0) { + this.log(executedMigrations.length, "new migrations executed") + this.log("-------------------") + } + this.mode = 'main' + } else if (operation.dbType === 'metrics') { + const { source, executedMigrations } = await newMetricsDb(operation.settings, allMetricsMigrations) + this.DB = source + this.txQueue = new transactionsQueue('MetricsStorageProcessorQueue', this.DB) + migrationsExecuted = executedMigrations.length + if (executedMigrations.length > 0) { + this.log(executedMigrations.length, "new metrics migrations executed") + this.log("-------------------") + } + this.mode = 'metrics' + } else { + throw new Error('Unknown database type') } this.sendResponse({ success: true, type: 'connect', - data: executedMigrations.length, + data: migrationsExecuted, opId: operation.opId }); } @@ -303,17 +325,28 @@ class StorageProcessor { return this.activeTransaction.manager } - private getManager(txId?: string): DataSource | EntityManager { - if (txId) { - return this.getTx(txId) + private getEntity(entityName: DBNames) { + if (this.mode === 'main') { + const e = MainDbEntities[entityName as MainDbNames] + if (!e) { + throw new Error(`Unknown entity type for main db: ${entityName}`) + } + return e + } else if (this.mode === 'metrics') { + const e = MetricsDbEntities[entityName as MetricsDbNames] + if (!e) { + throw new Error(`Unknown entity type for metrics db: ${entityName}`) + } + return e + } else { + throw new Error('Unknown database mode') } - return this.DB } private async handleDelete(operation: DeleteOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).delete(operation.q) + return eM.getRepository(this.getEntity(operation.entity)).delete(operation.q) }) this.sendResponse({ success: true, @@ -325,7 +358,7 @@ class StorageProcessor { private async handleRemove(operation: RemoveOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).remove(operation.q) + return eM.getRepository(this.getEntity(operation.entity)).remove(operation.q) }) this.sendResponse({ @@ -338,7 +371,7 @@ class StorageProcessor { private async handleUpdate(operation: UpdateOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).update(operation.q, operation.toUpdate) + return eM.getRepository(this.getEntity(operation.entity)).update(operation.q, operation.toUpdate) }) this.sendResponse({ @@ -351,7 +384,7 @@ class StorageProcessor { private async handleIncrement(operation: IncrementOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).increment(operation.q, operation.propertyPath, operation.value) + return eM.getRepository(this.getEntity(operation.entity)).increment(operation.q, operation.propertyPath, operation.value) }) this.sendResponse({ @@ -364,7 +397,7 @@ class StorageProcessor { private async handleDecrement(operation: DecrementOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).decrement(operation.q, operation.propertyPath, operation.value) + return eM.getRepository(this.getEntity(operation.entity)).decrement(operation.q, operation.propertyPath, operation.value) }) this.sendResponse({ @@ -377,7 +410,7 @@ class StorageProcessor { private async handleFindOne(operation: FindOneOperation) { const res = await this.handleRead(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).findOne(operation.q) + return eM.getRepository(this.getEntity(operation.entity)).findOne(operation.q) }) this.sendResponse({ @@ -390,7 +423,7 @@ class StorageProcessor { private async handleFind(operation: FindOperation) { const res = await this.handleRead(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).find(operation.q) + return eM.getRepository(this.getEntity(operation.entity)).find(operation.q) }) this.sendResponse({ @@ -403,7 +436,7 @@ class StorageProcessor { private async handleSum(operation: SumOperation) { const res = await this.handleRead(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).sum(operation.columnName, operation.q) + return eM.getRepository(this.getEntity(operation.entity)).sum(operation.columnName, operation.q) }) this.sendResponse({ success: true, @@ -415,8 +448,8 @@ class StorageProcessor { private async handleCreateAndSave(operation: CreateAndSaveOperation) { const saved = await this.handleWrite(operation.txId, async eM => { - const res = eM.getRepository(MainDbEntities[operation.entity]).create(operation.toSave) - return eM.getRepository(MainDbEntities[operation.entity]).save(res) + const res = eM.getRepository(this.getEntity(operation.entity)).create(operation.toSave) + return eM.getRepository(this.getEntity(operation.entity)).save(res) }) this.sendResponse({ diff --git a/src/services/storage/transactionsQueue.ts b/src/services/storage/db/transactionsQueue.ts similarity index 95% rename from src/services/storage/transactionsQueue.ts rename to src/services/storage/db/transactionsQueue.ts index e8275119..94c31fcd 100644 --- a/src/services/storage/transactionsQueue.ts +++ b/src/services/storage/db/transactionsQueue.ts @@ -1,5 +1,5 @@ import { DataSource, EntityManager, EntityTarget } from "typeorm" -import { PubLogger, getLogger } from "../helpers/logger.js" +import { PubLogger, getLogger } from "../../helpers/logger.js" type TX = (entityManager: EntityManager | DataSource) => Promise type TxOperation = { diff --git a/src/services/storage/debitStorage.ts b/src/services/storage/debitStorage.ts index 30fff55b..f1ed7357 100644 --- a/src/services/storage/debitStorage.ts +++ b/src/services/storage/debitStorage.ts @@ -1,5 +1,5 @@ import { DebitAccess, DebitAccessRules } from "./entity/DebitAccess.js"; -import { StorageInterface } from "./storageInterface.js"; +import { StorageInterface } from "./db/storageInterface.js"; type AccessToAdd = { npub: string rules?: DebitAccessRules diff --git a/src/services/storage/index.ts b/src/services/storage/index.ts index ad214f5c..e832290c 100644 --- a/src/services/storage/index.ts +++ b/src/services/storage/index.ts @@ -1,18 +1,18 @@ import fs from 'fs' -import NewDB, { DbSettings, LoadDbSettingsFromEnv } from "./db.js" +import NewDB, { DbSettings, LoadDbSettingsFromEnv } from "./db/db.js" import ProductStorage from './productStorage.js' import ApplicationStorage from './applicationStorage.js' import UserStorage from "./userStorage.js"; import PaymentStorage from "./paymentStorage.js"; import MetricsStorage from "./metricsStorage.js"; -import MetricsEventStorage from "./metricsEventStorage.js"; +import MetricsEventStorage from "./tlv/metricsEventStorage.js"; import EventsLogManager from "./eventsLog.js"; import { LiquidityStorage } from "./liquidityStorage.js"; import DebitStorage from "./debitStorage.js" import OfferStorage from "./offerStorage.js" -import { StorageInterface, TX } from "./storageInterface.js"; -import { allMetricsMigrations, allMigrations } from "./migrations/runner.js" +import { StorageInterface, TX } from "./db/storageInterface.js"; import { PubLogger } from "../helpers/logger.js" +import { TlvStorageFactory } from './tlv/tlvFilesStorageFactory.js'; export type StorageSettings = { dbSettings: DbSettings eventLogPath: string @@ -40,9 +40,9 @@ export default class { this.settings = settings this.eventsLog = new EventsLogManager(settings.eventLogPath) } - async Connect(log: PubLogger) { + async Connect(log: PubLogger, tlvStorageFactory: TlvStorageFactory) { this.dbs = new StorageInterface() - await this.dbs.Connect(this.settings.dbSettings) + await this.dbs.Connect(this.settings.dbSettings, 'main') //const { source, executedMigrations } = await NewDB(this.settings.dbSettings, allMigrations) //this.DB = source //this.txQueue = new TransactionsQueue("main", this.DB) @@ -51,21 +51,21 @@ export default class { this.applicationStorage = new ApplicationStorage(this.dbs, this.userStorage) this.paymentStorage = new PaymentStorage(this.dbs, this.userStorage) this.metricsStorage = new MetricsStorage(this.settings) - this.metricsEventStorage = new MetricsEventStorage(this.settings) + this.metricsEventStorage = new MetricsEventStorage(this.settings, tlvStorageFactory) this.liquidityStorage = new LiquidityStorage(this.dbs) this.debitStorage = new DebitStorage(this.dbs) this.offerStorage = new OfferStorage(this.dbs) try { if (this.settings.dataDir) fs.mkdirSync(this.settings.dataDir) } catch (e) { } - const executedMetricsMigrations = await this.metricsStorage.Connect(allMetricsMigrations) + /* const executedMetricsMigrations = */ await this.metricsStorage.Connect() /* if (executedMigrations.length > 0) { log(executedMigrations.length, "new migrations executed") log("-------------------") } */ - if (executedMetricsMigrations.length > 0) { - log(executedMetricsMigrations.length, "new metrics migrations executed") - log("-------------------") - } + /* if (executedMetricsMigrations.length > 0) { + log(executedMetricsMigrations.length, "new metrics migrations executed") + log("-------------------") + } */ } Stop() { diff --git a/src/services/storage/liquidityStorage.ts b/src/services/storage/liquidityStorage.ts index 2c405a41..0a609ed5 100644 --- a/src/services/storage/liquidityStorage.ts +++ b/src/services/storage/liquidityStorage.ts @@ -2,7 +2,7 @@ import { IsNull, MoreThan, Not } from "typeorm" import { LspOrder } from "./entity/LspOrder.js"; import { LndNodeInfo } from "./entity/LndNodeInfo.js"; import { TrackedProvider } from "./entity/TrackedProvider.js"; -import { StorageInterface } from "./storageInterface.js"; +import { StorageInterface } from "./db/storageInterface.js"; export class LiquidityStorage { dbs: StorageInterface constructor(dbs: StorageInterface) { diff --git a/src/services/storage/metricsStorage.ts b/src/services/storage/metricsStorage.ts index 839ef357..a47b20b3 100644 --- a/src/services/storage/metricsStorage.ts +++ b/src/services/storage/metricsStorage.ts @@ -1,59 +1,65 @@ import { Between, DataSource, EntityManager, FindManyOptions, FindOperator, LessThanOrEqual, MoreThanOrEqual } from "typeorm" import { BalanceEvent } from "./entity/BalanceEvent.js" import { ChannelBalanceEvent } from "./entity/ChannelsBalanceEvent.js" -import TransactionsQueue from "./transactionsQueue.js"; +import TransactionsQueue from "./db/transactionsQueue.js"; import { StorageSettings } from "./index.js"; -import { newMetricsDb } from "./db.js"; +import { newMetricsDb } from "./db/db.js"; import { ChannelRouting } from "./entity/ChannelRouting.js"; import { RootOperation } from "./entity/RootOperation.js"; +import { StorageInterface } from "./db/storageInterface.js"; export default class { - DB: DataSource | EntityManager + //DB: DataSource | EntityManager settings: StorageSettings - txQueue: TransactionsQueue + dbs: StorageInterface + //txQueue: TransactionsQueue constructor(settings: StorageSettings) { this.settings = settings; } - async Connect(metricsMigrations: Function[]) { - const { source, executedMigrations } = await newMetricsDb(this.settings.dbSettings, metricsMigrations) - this.DB = source; - this.txQueue = new TransactionsQueue("metrics", this.DB) - return executedMigrations; + async Connect() { + //const { source, executedMigrations } = await newMetricsDb(this.settings.dbSettings, metricsMigrations) + //this.DB = source; + //this.txQueue = new TransactionsQueue("metrics", this.DB) + this.dbs = new StorageInterface() + await this.dbs.Connect(this.settings.dbSettings, 'metrics') + //return executedMigrations; } async SaveBalanceEvents(balanceEvent: Partial, channelBalanceEvents: Partial[]) { - const blanceEventEntry = this.DB.getRepository(BalanceEvent).create(balanceEvent) - const balanceEntry = await this.txQueue.PushToQueue({ exec: async db => db.getRepository(BalanceEvent).save(blanceEventEntry), dbTx: false }) + //const blanceEventEntry = this.DB.getRepository(BalanceEvent).create(balanceEvent) + //const balanceEntry = await this.txQueue.PushToQueue({ exec: async db => db.getRepository(BalanceEvent).save(blanceEventEntry), dbTx: false }) + + const balanceEntry = await this.dbs.CreateAndSave('BalanceEvent', balanceEvent) + + //const channelsEntry = this.DB.getRepository(ChannelBalanceEvent).create(channelBalanceEvents.map(e => ({ ...e, balance_event: balanceEntry }))) + //const channelsEntries = await this.txQueue.PushToQueue({ exec: async db => db.getRepository(ChannelBalanceEvent).save(channelsEntry), dbTx: false }) + + const channelsEntries = await this.dbs.CreateAndSave('ChannelBalanceEvent', channelBalanceEvents.map(e => ({ ...e, balance_event: balanceEntry }))) - const channelsEntry = this.DB.getRepository(ChannelBalanceEvent).create(channelBalanceEvents.map(e => ({ ...e, balance_event: balanceEntry }))) - const channelsEntries = await this.txQueue.PushToQueue({ exec: async db => db.getRepository(ChannelBalanceEvent).save(channelsEntry), dbTx: false }) return { balanceEntry, channelsEntries } } - async GetBalanceEvents({ from, to }: { from?: number, to?: number }, entityManager = this.DB) { + async GetBalanceEvents({ from, to }: { from?: number, to?: number }, txId?: string) { const q = getTimeQuery({ from, to }) - const [chainBalanceEvents] = await Promise.all([ - entityManager.getRepository(BalanceEvent).find(q), - ]) + const chainBalanceEvents = await this.dbs.Find('BalanceEvent', q, txId) return { chainBalanceEvents } } async initChannelRoutingEvent(dayUnix: number, channelId: string) { - const existing = await this.DB.getRepository(ChannelRouting).findOne({ where: { day_unix: dayUnix, channel_id: channelId } }) + const existing = await this.dbs.FindOne('ChannelRouting', { where: { day_unix: dayUnix, channel_id: channelId } }) if (!existing) { - const entry = this.DB.getRepository(ChannelRouting).create({ day_unix: dayUnix, channel_id: channelId }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(ChannelRouting).save(entry), dbTx: false }) + return this.dbs.CreateAndSave('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }) } return existing } - GetChannelRouting({ from, to }: { from?: number, to?: number }, entityManager = this.DB) { + GetChannelRouting({ from, to }: { from?: number, to?: number }, txId?: string) { const q = getTimeQuery({ from, to }) - return entityManager.getRepository(ChannelRouting).find(q) + return this.dbs.Find('ChannelRouting', q, txId) } async GetLatestForwardingIndexOffset() { - const latestIndex = await this.DB.getRepository(ChannelRouting).find({ order: { latest_index_offset: "DESC" }, take: 1 }) + const latestIndex = await this.dbs.Find('ChannelRouting', { order: { latest_index_offset: "DESC" }, take: 1 }) if (latestIndex.length > 0) { return latestIndex[0].latest_index_offset } @@ -63,50 +69,49 @@ export default class { async IncrementChannelRouting(channelId: string, event: Partial) { const dayUnix = getTodayUnix() const existing = await this.initChannelRoutingEvent(dayUnix, channelId) - const repo = this.DB.getRepository(ChannelRouting) + //const repo = this.DB.getRepository(ChannelRouting) if (event.send_errors) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "send_errors", event.send_errors) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "send_errors", event.send_errors) } if (event.receive_errors) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "receive_errors", event.receive_errors) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "receive_errors", event.receive_errors) } if (event.forward_errors_as_input) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_input", event.forward_errors_as_input) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_input", event.forward_errors_as_input) } if (event.forward_errors_as_output) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_output", event.forward_errors_as_output) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_output", event.forward_errors_as_output) } if (event.missed_forward_fee_as_input) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_input", event.missed_forward_fee_as_input) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_input", event.missed_forward_fee_as_input) } if (event.missed_forward_fee_as_output) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_output", event.missed_forward_fee_as_output) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_output", event.missed_forward_fee_as_output) } if (event.forward_fee_as_input) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_input", event.forward_fee_as_input) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_input", event.forward_fee_as_input) } if (event.forward_fee_as_output) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_output", event.forward_fee_as_output) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_output", event.forward_fee_as_output) } if (event.events_as_input) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "events_as_input", event.events_as_input) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "events_as_input", event.events_as_input) } if (event.events_as_output) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "events_as_output", event.events_as_output) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "events_as_output", event.events_as_output) } if (event.latest_index_offset) { - await repo.update(existing.serial_id, { latest_index_offset: event.latest_index_offset }) + await this.dbs.Update('ChannelRouting', existing.serial_id, { latest_index_offset: event.latest_index_offset }) } } - async AddRootOperation(opType: string, id: string, amount: number, entityManager = this.DB) { - const newOp = entityManager.getRepository(RootOperation).create({ operation_type: opType, operation_amount: amount, operation_identifier: id }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(RootOperation).save(newOp), dbTx: false }) + async AddRootOperation(opType: string, id: string, amount: number, txId?: string) { + return this.dbs.CreateAndSave('RootOperation', { operation_type: opType, operation_amount: amount, operation_identifier: id }, txId) } - async GetRootOperations({ from, to }: { from?: number, to?: number }, entityManager = this.DB) { + async GetRootOperations({ from, to }: { from?: number, to?: number }, txId?: string) { const q = getTimeQuery({ from, to }) - return entityManager.getRepository(RootOperation).find(q) + return this.dbs.Find('RootOperation', q, txId) } } diff --git a/src/services/storage/offerStorage.ts b/src/services/storage/offerStorage.ts index 41712d92..2a921c57 100644 --- a/src/services/storage/offerStorage.ts +++ b/src/services/storage/offerStorage.ts @@ -1,6 +1,6 @@ import crypto from 'crypto'; import { UserOffer } from "./entity/UserOffer.js"; -import { StorageInterface } from "./storageInterface.js"; +import { StorageInterface } from "./db/storageInterface.js"; export default class { diff --git a/src/services/storage/paymentStorage.ts b/src/services/storage/paymentStorage.ts index f3a7ce27..465983fb 100644 --- a/src/services/storage/paymentStorage.ts +++ b/src/services/storage/paymentStorage.ts @@ -11,9 +11,9 @@ import { AddressReceivingTransaction } from './entity/AddressReceivingTransactio import { UserInvoicePayment } from './entity/UserInvoicePayment.js'; import { UserToUserPayment } from './entity/UserToUserPayment.js'; import { Application } from './entity/Application.js'; -import TransactionsQueue from "./transactionsQueue.js"; +import TransactionsQueue from "./db/transactionsQueue.js"; import { LoggedEvent } from './eventsLog.js'; -import { StorageInterface } from './storageInterface.js'; +import { StorageInterface } from './db/storageInterface.js'; export type InboundOptionals = { product?: Product, callbackUrl?: string, expiry: number, expectedPayer?: User, linkedApplication?: Application, zapInfo?: ZapInfo, offerId?: string, payerData?: Record } export const defaultInvoiceExpiry = 60 * 60 export default class { diff --git a/src/services/storage/productStorage.ts b/src/services/storage/productStorage.ts index c93be22a..26c386c5 100644 --- a/src/services/storage/productStorage.ts +++ b/src/services/storage/productStorage.ts @@ -1,6 +1,6 @@ import { Product } from "./entity/Product.js" import { User } from "./entity/User.js" -import { StorageInterface } from "./storageInterface.js"; +import { StorageInterface } from "./db/storageInterface.js"; export default class { dbs: StorageInterface constructor(dbs: StorageInterface) { diff --git a/src/services/storage/metricsEventStorage.ts b/src/services/storage/tlv/metricsEventStorage.ts similarity index 84% rename from src/services/storage/metricsEventStorage.ts rename to src/services/storage/tlv/metricsEventStorage.ts index 0fe8db0a..24a8c683 100644 --- a/src/services/storage/metricsEventStorage.ts +++ b/src/services/storage/tlv/metricsEventStorage.ts @@ -1,21 +1,20 @@ import fs from 'fs' -import * as Types from '../../../proto/autogenerated/ts/types.js' -import { StorageSettings } from "./index.js"; -import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js'; -import { TlvFilesStorage } from './tlvFilesStorage.js'; +import * as Types from '../../../../proto/autogenerated/ts/types.js' +import { StorageSettings } from "../index.js"; +import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../../helpers/tlv.js'; +import { TlvStorageFactory, TlvStorageInterface } from './tlvFilesStorageFactory.js'; export default class { - tlvStorage: TlvFilesStorage + tlvStorage: TlvStorageInterface cachePath: string last24hCache: { ts: number, ok: number, fail: number }[] = [] lastPersistedCache: number = 0 - constructor(settings: StorageSettings) { + constructor(settings: StorageSettings, tlvStorageFactory: TlvStorageFactory) { const metricsPath = [settings.dataDir, "metric_events"].filter(s => !!s).join("/") - this.tlvStorage = new TlvFilesStorage(metricsPath) + this.tlvStorage = tlvStorageFactory.NewStorage({ name: "metrics", path: metricsPath }) this.cachePath = [settings.dataDir, "metric_cache"].filter(s => !!s).join("/") if (!fs.existsSync(this.cachePath)) { fs.mkdirSync(this.cachePath, { recursive: true }); } - this.tlvStorage.initMeta() this.loadCache() setInterval(() => { if (Date.now() - this.lastPersistedCache > 1000 * 60 * 4) { @@ -77,7 +76,7 @@ export default class { } LoadLatestMetrics = async (limit = 30): Promise => { - const raw = this.tlvStorage.LoadLatest(limit) + const raw = await this.tlvStorage.LoadLatest(limit) const metrics: Types.UsageMetrics = { apps: {} } Object.keys(raw).forEach(app => { metrics.apps[app] = { app_metrics: {} } @@ -93,7 +92,7 @@ export default class { return metrics } LoadMetricsFile = async (app: string, method: string, chunk: number): Promise => { - const { fileData, chunks } = this.tlvStorage.LoadFile(app, method, chunk) + const { fileData, chunks } = await this.tlvStorage.LoadFile(app, method, chunk) //const tlv = await this.LoadRawMetricsFile(app, method, chunk) const decoded = decodeListTLV(parseTLV(fileData)) return { diff --git a/src/services/storage/stateBundler.ts b/src/services/storage/tlv/stateBundler.ts similarity index 89% rename from src/services/storage/stateBundler.ts rename to src/services/storage/tlv/stateBundler.ts index 99737833..a8b634b4 100644 --- a/src/services/storage/stateBundler.ts +++ b/src/services/storage/tlv/stateBundler.ts @@ -1,9 +1,9 @@ -import { LatestBundleMetricReq } from "../../../proto/autogenerated/ts/types.js" -import { getLogger } from "../helpers/logger.js" -import { decodeListTLV, integerToUint8Array, parseTLV } from "../helpers/tlv.js" -import { StorageSettings } from "./index.js" -import { TlvFilesStorage } from "./tlvFilesStorage.js" -import * as Types from "../../../proto/autogenerated/ts/types.js" +import { LatestBundleMetricReq } from "../../../../proto/autogenerated/ts/types.js" +import { getLogger } from "../../helpers/logger.js" +import { decodeListTLV, integerToUint8Array, parseTLV } from "../../helpers/tlv.js" +import { StorageSettings } from "../index.js" +import * as Types from "../../../../proto/autogenerated/ts/types.js" +import { TlvStorageFactory, TlvStorageInterface } from "./tlvFilesStorageFactory.js" const transactionStatePointTypes = ['addedInvoice', 'invoiceWasPaid', 'paidAnInvoice', 'addedAddress', 'addressWasPaid', 'paidAnAddress', 'user2user'] as const const balanceStatePointTypes = ['providerBalance', 'providerMaxWithdrawable', 'walletBalance', 'channelBalance', 'usersBalance', 'feesPaidForLiquidity', 'totalLndBalance', 'accumulatedHtlcFees', 'deltaUsers', 'deltaExternal'] as const const maxStatePointTypes = ['maxProviderRespTime'] as const @@ -30,14 +30,13 @@ export type TxPointSettings = { } export class StateBundler { - tlvStorage: TlvFilesStorage + tlvStorage: TlvStorageInterface reportLog = getLogger({ component: 'stateBundlerReport' }) prevValues: Record = {} interval: NodeJS.Timeout - constructor(settings: StorageSettings) { + constructor(settings: StorageSettings, tlvStorageFactory: TlvStorageFactory) { const bundlerPath = [settings.dataDir, "bundler_events"].filter(s => !!s).join("/") - this.tlvStorage = new TlvFilesStorage(bundlerPath) - this.tlvStorage.initMeta() + this.tlvStorage = tlvStorageFactory.NewStorage({ name: "bundler", path: bundlerPath }) this.interval = setInterval(() => { const mem = process.memoryUsage() this.AddValue('_root', 'memory_rss_kb', Math.ceil(mem.rss / 1000 || 0), true) @@ -53,7 +52,7 @@ export class StateBundler { } async GetBundleMetrics(req: Types.LatestBundleMetricReq): Promise { - const latest = this.tlvStorage.LoadLatest(req.limit) + const latest = await this.tlvStorage.LoadLatest(req.limit) const metrics: Types.BundleMetrics = { apps: {} } Object.keys(latest).forEach(app => { metrics.apps[app] = { app_bundles: {} } @@ -70,7 +69,7 @@ export class StateBundler { } async GetSingleBundleMetrics(req: Types.SingleMetricReq): Promise { - const { fileData, chunks } = this.tlvStorage.LoadFile(req.app_id, req.metrics_name, req.page) + const { fileData, chunks } = await this.tlvStorage.LoadFile(req.app_id, req.metrics_name, req.page) const decoded = decodeListTLV(parseTLV(fileData)) return { current_chunk: req.page, diff --git a/src/services/storage/tlvFilesStorage.ts b/src/services/storage/tlv/tlvFilesStorage.ts similarity index 88% rename from src/services/storage/tlvFilesStorage.ts rename to src/services/storage/tlv/tlvFilesStorage.ts index c4ad58d2..801fcc40 100644 --- a/src/services/storage/tlvFilesStorage.ts +++ b/src/services/storage/tlv/tlvFilesStorage.ts @@ -1,13 +1,14 @@ import fs from 'fs' -import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js' +import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../../helpers/tlv.js' const chunkSizeBytes = 128 * 1024 export type LatestData = Record> +export type TlvFile = { fileData: Buffer, chunks: number[] } export class TlvFilesStorage { - storagePath: string - lastPersisted: number = 0 - meta: Record> = {} - pending: Record> = {} - metaReady = false + private storagePath: string + private lastPersisted: number = 0 + private meta: Record> = {} + private pending: Record> = {} + private metaReady = false constructor(storagePath: string) { this.storagePath = storagePath if (!fs.existsSync(this.storagePath)) { @@ -24,7 +25,7 @@ export class TlvFilesStorage { }); } - LoadFile = (app: string, dataName: string, chunk: number): { fileData: Buffer, chunks: number[] } => { + LoadFile = (app: string, dataName: string, chunk: number): TlvFile => { if (!this.metaReady || !this.meta[app] || !this.meta[app][dataName] || !this.meta[app][dataName].chunks.includes(chunk)) { throw new Error("metrics not found") } @@ -71,7 +72,7 @@ export class TlvFilesStorage { return data } - persist = () => { + private persist = () => { if (!this.metaReady) { throw new Error("meta metrics not ready") } @@ -104,28 +105,28 @@ export class TlvFilesStorage { }) } - getMeta = (appId: string, dataName: string) => { + private getMeta = (appId: string, dataName: string) => { if (!this.meta[appId]) { return { chunks: [] } } return this.meta[appId][dataName] || { chunks: [] } } - initMeta = () => { + private initMeta = () => { this.foreachFile((app, dataName, tlvFiles) => { this.updateMeta(app, dataName, tlvFiles) }) this.metaReady = true } - updateMeta = (appId: string, dataName: string, sortedChunks: number[]) => { + private updateMeta = (appId: string, dataName: string, sortedChunks: number[]) => { if (!this.meta[appId]) { this.meta[appId] = {} } this.meta[appId][dataName] = { chunks: sortedChunks } } - foreachFile = (cb: (appId: string, dataName: string, tlvFiles: number[]) => void) => { + private foreachFile = (cb: (appId: string, dataName: string, tlvFiles: number[]) => void) => { if (!fs.existsSync(this.storagePath)) { fs.mkdirSync(this.storagePath, { recursive: true }); } diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts new file mode 100644 index 00000000..810996a9 --- /dev/null +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -0,0 +1,109 @@ +import { fork } from 'child_process'; +import { EventEmitter } from 'events'; +import { AddTlvOperation, ITlvStorageOperation, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, TlvOperationResponse, TlvStorageSettings } from './tlvFilesStorageProcessor'; +import { LatestData, TlvFile } from './tlvFilesStorage'; + +export type TlvStorageInterface = { + AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => Promise + LoadLatest: (limit?: number) => Promise + LoadFile: (appId: string, dataName: string, chunk: number) => Promise +} + +export class TlvStorageFactory extends EventEmitter { + private process: any; + private isConnected: boolean = false; + private debug: boolean = false; + + constructor() { + super(); + this.initializeSubprocess(); + } + + setDebug(debug: boolean) { + this.debug = debug; + } + + private initializeSubprocess() { + this.process = fork('./build/src/services/storage/tlv/tlvFilesStorageProcessor'); + + this.process.on('message', (response: TlvOperationResponse) => { + this.emit(response.opId, response); + }); + + this.process.on('error', (error: Error) => { + console.error('Tlv Storage processor error:', error); + this.isConnected = false; + }); + + this.process.on('exit', (code: number) => { + console.log(`Tlv Storage processor exited with code ${code}`); + this.isConnected = false; + }); + + this.isConnected = true; + } + + NewStorage(settings: TlvStorageSettings): TlvStorageInterface { + const opId = Math.random().toString() + const connectOp: NewTlvStorageOperation = { type: 'newStorage', opId, settings } + this.handleOp(connectOp) + return { + AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => this.AddTlv(settings.name, appId, dataName, tlv), + LoadLatest: (limit?: number) => this.LoadLatest(settings.name, limit), + LoadFile: (appId: string, dataName: string, chunk: number) => this.LoadFile(settings.name, appId, dataName, chunk) + } + } + + AddTlv(storageName: string, appId: string, dataName: string, tlv: Uint8Array): Promise { + const opId = Math.random().toString() + const connectOp: AddTlvOperation = { type: 'addTlv', opId, storageName, appId, dataName, tlv } + return this.handleOp(connectOp) + } + + LoadLatest(storageName: string, limit?: number): Promise { + const opId = Math.random().toString() + const connectOp: LoadLatestTlvOperation = { type: 'loadLatestTlv', opId, storageName, limit } + return this.handleOp(connectOp) + } + + LoadFile(storageName: string, appId: string, dataName: string, chunk: number): Promise { + const opId = Math.random().toString() + const connectOp: LoadTlvFileOperation = { type: 'loadTlvFile', opId, storageName, appId, dataName, chunk } + return this.handleOp(connectOp) + } + + private handleOp(op: ITlvStorageOperation): Promise { + if (this.debug) console.log('handleOp', op) + this.checkConnected() + return new Promise((resolve, reject) => { + const responseHandler = (response: TlvOperationResponse) => { + if (this.debug) console.log('tlv responseHandler', response) + if (!response.success) { + reject(new Error(response.error)); + return + } + if (response.type !== op.type) { + reject(new Error('Invalid response type')); + return + } + resolve(response.data); + } + this.once(op.opId, responseHandler) + this.process.send(op) + }) + } + + private checkConnected() { + if (!this.isConnected) { + throw new Error('Tlv Storage processor is not connected'); + } + } + + public disconnect() { + if (this.process) { + this.process.kill(); + this.isConnected = false; + this.debug = false; + } + } +} \ No newline at end of file diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts new file mode 100644 index 00000000..ef1d3f28 --- /dev/null +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -0,0 +1,186 @@ +import { PubLogger, getLogger } from '../../helpers/logger.js'; +import { TlvFilesStorage } from './tlvFilesStorage.js'; + +export type TlvStorageSettings = { + path: string + name: string +} + +export type NewTlvStorageOperation = { + type: 'newStorage' + opId: string + settings: TlvStorageSettings + debug?: boolean +} + +export type AddTlvOperation = { + type: 'addTlv' + opId: string + storageName: string + appId: string + dataName: string + tlv: Uint8Array + debug?: boolean +} + +export type LoadLatestTlvOperation = { + type: 'loadLatestTlv' + opId: string + storageName: string + limit?: number + debug?: boolean +} + +export type LoadTlvFileOperation = { + type: 'loadTlvFile' + opId: string + storageName: string + appId: string + dataName: string + chunk: number + debug?: boolean +} + +export type ErrorTlvOperationResponse = { success: false, error: string, opId: string } + +export interface ITlvStorageOperation { + opId: string + type: string + debug?: boolean +} + +export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation + +export type SuccessTlvOperationResponse = { success: true, type: string, data: T, opId: string } +export type TlvOperationResponse = SuccessTlvOperationResponse | ErrorTlvOperationResponse + +class TlvFilesStorageProcessor { + private log: PubLogger = console.log + private storages: Record = {} + constructor() { + if (!process.send) { + throw new Error('This process must be spawned as a child process'); + } + process.on('message', (operation: TlvStorageOperation) => { + this.handleOperation(operation); + }); + + process.on('error', (error: Error) => { + console.error('Error in storage processor:', error); + }); + } + + private async handleOperation(operation: TlvStorageOperation) { + try { + const opId = operation.opId; + switch (operation.type) { + case 'newStorage': + await this.handleNewStorage(operation); + break; + case 'addTlv': + await this.handleAddTlv(operation); + break; + case 'loadLatestTlv': + await this.handleLoadLatestTlv(operation); + break; + case 'loadTlvFile': + await this.handleLoadTlvFile(operation); + break; + default: + this.sendResponse({ + success: false, + error: `Unknown operation type: ${(operation as any).type}`, + opId + }) + return + } + } catch (error) { + this.sendResponse({ + success: false, + error: error instanceof Error ? error.message : 'Unknown error occurred', + opId: operation.opId + }); + } + } + + private async handleNewStorage(operation: NewTlvStorageOperation) { + if (this.storages[operation.settings.name]) { + this.sendResponse({ + success: false, + error: `Storage ${operation.settings.name} already exists`, + opId: operation.opId + }) + return + } + this.storages[operation.settings.name] = new TlvFilesStorage(operation.settings.path) + this.sendResponse({ + success: true, + type: 'newStorage', + data: null, + opId: operation.opId + }); + } + + private async handleAddTlv(operation: AddTlvOperation) { + if (!this.storages[operation.storageName]) { + this.sendResponse({ + success: false, + error: `Storage ${operation.storageName} does not exist`, + opId: operation.opId + }) + return + } + this.storages[operation.storageName].AddTlv(operation.appId, operation.dataName, operation.tlv) + this.sendResponse({ + success: true, + type: 'addTlv', + data: null, + opId: operation.opId + }); + } + + private async handleLoadLatestTlv(operation: LoadLatestTlvOperation) { + if (!this.storages[operation.storageName]) { + this.sendResponse({ + success: false, + error: `Storage ${operation.storageName} does not exist`, + opId: operation.opId + }) + return + } + const data = this.storages[operation.storageName].LoadLatest(operation.limit) + this.sendResponse({ + success: true, + type: 'loadLatest', + data: data, + opId: operation.opId + }); + } + + private async handleLoadTlvFile(operation: LoadTlvFileOperation) { + if (!this.storages[operation.storageName]) { + this.sendResponse({ + success: false, + error: `Storage ${operation.storageName} does not exist`, + opId: operation.opId + }) + return + } + const data = this.storages[operation.storageName].LoadFile(operation.appId, operation.dataName, operation.chunk) + this.sendResponse({ + success: true, + type: 'loadFile', + data: data, + opId: operation.opId + }); + } + + private sendResponse(response: TlvOperationResponse) { + if (process.send) { + process.send(response); + } + } +} + +// Start the storage processor +new TlvFilesStorageProcessor(); diff --git a/src/services/storage/userStorage.ts b/src/services/storage/userStorage.ts index b35d40ae..f4ed6fda 100644 --- a/src/services/storage/userStorage.ts +++ b/src/services/storage/userStorage.ts @@ -3,7 +3,7 @@ import { User } from './entity/User.js'; import { UserBasicAuth } from './entity/UserBasicAuth.js'; import { getLogger } from '../helpers/logger.js'; import EventsLogManager from './eventsLog.js'; -import { StorageInterface } from './storageInterface.js'; +import { StorageInterface } from './db/storageInterface.js'; export default class { dbs: StorageInterface eventsLog: EventsLogManager diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index 6aab22f1..ef23a772 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -6,12 +6,12 @@ import * as Types from '../../../proto/autogenerated/ts/types.js' import { NostrSend, SendData, SendInitiator } from "../nostr/handler.js" import { encodeTLbV, encodeTLV, encodeTLVDataPacket } from '../helpers/tlv.js' import { Utils } from '../helpers/utilsWrapper.js' -import { TlvFilesStorage } from '../storage/tlvFilesStorage.js' +import { TlvFilesStorage } from '../storage/tlv/tlvFilesStorage.js' +import { TlvStorageInterface } from '../storage/tlv/tlvFilesStorageFactory.js' type IceCandidate = { type: string, candidate?: string, sdpMid?: string, sdpMLineIndex?: number } const configuration = { 'iceServers': [{ 'urls': 'stun:relay.webwormhole.io' }] } type UserInfo = { userPub: string, appId: string } export default class webRTC { - private storage: Storage private log = getLogger({ component: 'webRTC' }) private connections: Record = {} @@ -94,7 +94,7 @@ export default class webRTC { this.log(ERROR, 'SingleUsageMetricReqValidate', err) return } - let tlvStorage: TlvFilesStorage + let tlvStorage: TlvStorageInterface switch (j.metric_type) { case Types.SingleMetricType.USAGE_METRIC: tlvStorage = this.storage.metricsEventStorage.tlvStorage @@ -105,7 +105,7 @@ export default class webRTC { default: throw new Error("Unknown metric type") } - const { fileData } = tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page) + const { fileData } = await tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page) const id = j.request_id || Math.floor(Math.random() * 100_000_000) let i = 0 const packets: Buffer[] = [] diff --git a/src/tests/networkSetup.ts b/src/tests/networkSetup.ts index 487450a1..ea38131f 100644 --- a/src/tests/networkSetup.ts +++ b/src/tests/networkSetup.ts @@ -3,13 +3,14 @@ import { BitcoinCoreWrapper } from "./bitcoinCore.js" import LND from '../services/lnd/lnd.js' import { LiquidityProvider } from "../services/main/liquidityProvider.js" import { Utils } from "../services/helpers/utilsWrapper.js" - +import { TlvStorageFactory } from "../services/storage/tlv/tlvFilesStorageFactory.js" export const setupNetwork = async () => { const settings = LoadTestSettingsFromEnv() const core = new BitcoinCoreWrapper(settings) await core.InitAddress() await core.Mine(1) - const setupUtils = new Utils(settings) + const tlvStorageFactory = new TlvStorageFactory() + const setupUtils = new Utils(settings, tlvStorageFactory) const alice = new LND(settings.lndSettings, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { }) const bob = new LND({ ...settings.lndSettings, mainNode: settings.lndSettings.otherNode }, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { }) await tryUntil(async i => { diff --git a/src/tests/testBase.ts b/src/tests/testBase.ts index a3b716b7..d60889a9 100644 --- a/src/tests/testBase.ts +++ b/src/tests/testBase.ts @@ -13,6 +13,7 @@ import { getLogger, resetDisabledLoggers } from '../services/helpers/logger.js' import { LiquidityProvider } from '../services/main/liquidityProvider.js' import { Utils } from '../services/helpers/utilsWrapper.js' import { AdminManager } from '../services/main/adminManager.js' +import { TlvStorageFactory } from '../services/storage/tlv/tlvFilesStorageFactory.js' chai.use(chaiString) export const expect = chai.expect export type Describe = (message: string, failure?: boolean) => void @@ -44,7 +45,8 @@ export type StorageTestBase = { export const setupStorageTest = async (d: Describe): Promise => { const settings = GetTestStorageSettings() const storageManager = new Storage(settings) - await storageManager.Connect(console.log) + const tlvStorageFactory = new TlvStorageFactory() + await storageManager.Connect(console.log, tlvStorageFactory) return { expect, storage: storageManager, @@ -69,7 +71,8 @@ export const SetupTest = async (d: Describe): Promise => { const user1 = { userId: u1.info.userId, appUserIdentifier: u1.identifier, appId: app.appId } const user2 = { userId: u2.info.userId, appUserIdentifier: u2.identifier, appId: app.appId } - const extermnalUtils = new Utils(settings) + const tlvStorageFactory = new TlvStorageFactory() + const extermnalUtils = new Utils(settings, tlvStorageFactory) const externalAccessToMainLnd = new LND(settings.lndSettings, new LiquidityProvider("", extermnalUtils, async () => { }, async () => { }), extermnalUtils, async () => { }, async () => { }, () => { }, () => { }) await externalAccessToMainLnd.Warmup() From 7e77e3c7c17d6628986adf44d8f69bdbc60ccd05 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Mon, 31 Mar 2025 16:24:36 +0000 Subject: [PATCH 02/23] wire stoppers --- src/services/helpers/utilsWrapper.ts | 8 +++++--- src/services/main/init.ts | 5 ++--- src/tests/networkSetup.ts | 2 +- src/tests/testBase.ts | 3 +-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/services/helpers/utilsWrapper.ts b/src/services/helpers/utilsWrapper.ts index 35698712..fd873b43 100644 --- a/src/services/helpers/utilsWrapper.ts +++ b/src/services/helpers/utilsWrapper.ts @@ -3,15 +3,17 @@ import { StateBundler } from "../storage/tlv/stateBundler.js"; import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js"; export class Utils { - + tlvStorageFactory: TlvStorageFactory stateBundler: StateBundler settings: MainSettings - constructor(settings: MainSettings, tlvStorageFactory: TlvStorageFactory) { + constructor(settings: MainSettings) { this.settings = settings - this.stateBundler = new StateBundler(settings.storageSettings, tlvStorageFactory) + this.tlvStorageFactory = new TlvStorageFactory() + this.stateBundler = new StateBundler(settings.storageSettings, this.tlvStorageFactory) } Stop() { this.stateBundler.Stop() + this.tlvStorageFactory.disconnect() } } \ No newline at end of file diff --git a/src/services/main/init.ts b/src/services/main/init.ts index cc284c88..5f2d4034 100644 --- a/src/services/main/init.ts +++ b/src/services/main/init.ts @@ -17,10 +17,9 @@ export type AppData = { name: string; } export const initMainHandler = async (log: PubLogger, mainSettings: MainSettings) => { - const tlvStorageFactory = new TlvStorageFactory() - const utils = new Utils(mainSettings, tlvStorageFactory) + const utils = new Utils(mainSettings) const storageManager = new Storage(mainSettings.storageSettings) - await storageManager.Connect(log, tlvStorageFactory) + await storageManager.Connect(log, utils.tlvStorageFactory) /* const manualMigration = await TypeOrmMigrationRunner(log, storageManager, mainSettings.storageSettings.dbSettings, process.argv[2]) if (manualMigration) { return diff --git a/src/tests/networkSetup.ts b/src/tests/networkSetup.ts index ea38131f..3de2a23b 100644 --- a/src/tests/networkSetup.ts +++ b/src/tests/networkSetup.ts @@ -10,7 +10,7 @@ export const setupNetwork = async () => { await core.InitAddress() await core.Mine(1) const tlvStorageFactory = new TlvStorageFactory() - const setupUtils = new Utils(settings, tlvStorageFactory) + const setupUtils = new Utils(settings) const alice = new LND(settings.lndSettings, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { }) const bob = new LND({ ...settings.lndSettings, mainNode: settings.lndSettings.otherNode }, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { }) await tryUntil(async i => { diff --git a/src/tests/testBase.ts b/src/tests/testBase.ts index d60889a9..2fff9c05 100644 --- a/src/tests/testBase.ts +++ b/src/tests/testBase.ts @@ -71,8 +71,7 @@ export const SetupTest = async (d: Describe): Promise => { const user1 = { userId: u1.info.userId, appUserIdentifier: u1.identifier, appId: app.appId } const user2 = { userId: u2.info.userId, appUserIdentifier: u2.identifier, appId: app.appId } - const tlvStorageFactory = new TlvStorageFactory() - const extermnalUtils = new Utils(settings, tlvStorageFactory) + const extermnalUtils = new Utils(settings) const externalAccessToMainLnd = new LND(settings.lndSettings, new LiquidityProvider("", extermnalUtils, async () => { }, async () => { }), extermnalUtils, async () => { }, async () => { }, () => { }, () => { }) await externalAccessToMainLnd.Warmup() From 0cd8137d2c439236b5c47d8d58899f53d97ade7d Mon Sep 17 00:00:00 2001 From: boufni95 Date: Mon, 31 Mar 2025 16:29:09 +0000 Subject: [PATCH 03/23] up --- src/services/storage/db/storageInterface.ts | 4 ++-- src/services/storage/tlv/tlvFilesStorage.ts | 2 +- src/services/storage/tlv/tlvFilesStorageFactory.ts | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/services/storage/db/storageInterface.ts b/src/services/storage/db/storageInterface.ts index a7a04f78..94a52fb0 100644 --- a/src/services/storage/db/storageInterface.ts +++ b/src/services/storage/db/storageInterface.ts @@ -1,4 +1,4 @@ -import { fork } from 'child_process'; +import { ChildProcess, fork } from 'child_process'; import { EventEmitter } from 'events'; import { DbSettings, MainDbNames } from './db.js'; import { DeepPartial, FindOptionsWhere } from 'typeorm'; @@ -19,7 +19,7 @@ import { serializeRequest, WhereCondition } from './serializationHelpers.js'; export type TX = (txId: string) => Promise export class StorageInterface extends EventEmitter { - private process: any; + private process: ChildProcess; private isConnected: boolean = false; private debug: boolean = false; diff --git a/src/services/storage/tlv/tlvFilesStorage.ts b/src/services/storage/tlv/tlvFilesStorage.ts index 801fcc40..22b93e14 100644 --- a/src/services/storage/tlv/tlvFilesStorage.ts +++ b/src/services/storage/tlv/tlvFilesStorage.ts @@ -27,7 +27,7 @@ export class TlvFilesStorage { LoadFile = (app: string, dataName: string, chunk: number): TlvFile => { if (!this.metaReady || !this.meta[app] || !this.meta[app][dataName] || !this.meta[app][dataName].chunks.includes(chunk)) { - throw new Error("metrics not found") + throw new Error(`tlv file for ${app} ${dataName} chunk ${chunk} not found`) } const fullPath = [this.storagePath, app, dataName, `${chunk}.mtlv`].filter(s => !!s).join("/") const fileData = fs.readFileSync(fullPath) diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts index 810996a9..edf48bb3 100644 --- a/src/services/storage/tlv/tlvFilesStorageFactory.ts +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -1,4 +1,4 @@ -import { fork } from 'child_process'; +import { ChildProcess, fork } from 'child_process'; import { EventEmitter } from 'events'; import { AddTlvOperation, ITlvStorageOperation, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, TlvOperationResponse, TlvStorageSettings } from './tlvFilesStorageProcessor'; import { LatestData, TlvFile } from './tlvFilesStorage'; @@ -10,7 +10,7 @@ export type TlvStorageInterface = { } export class TlvStorageFactory extends EventEmitter { - private process: any; + private process: ChildProcess; private isConnected: boolean = false; private debug: boolean = false; From 4dffaeeb0953e259be5c0e851b114f2d72fde325 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Mon, 31 Mar 2025 17:44:37 +0000 Subject: [PATCH 04/23] deb --- src/services/storage/tlv/tlvFilesStorageFactory.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts index edf48bb3..633b8be5 100644 --- a/src/services/storage/tlv/tlvFilesStorageFactory.ts +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -12,7 +12,7 @@ export type TlvStorageInterface = { export class TlvStorageFactory extends EventEmitter { private process: ChildProcess; private isConnected: boolean = false; - private debug: boolean = false; + private debug: boolean = true; constructor() { super(); From 8a35487cdd6712a401c4f68b4618f0c521dc49b7 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Mon, 31 Mar 2025 17:52:56 +0000 Subject: [PATCH 05/23] more deb --- .../storage/tlv/tlvFilesStorageFactory.ts | 16 ++++++++-------- .../storage/tlv/tlvFilesStorageProcessor.ts | 1 + 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts index 633b8be5..8f78fcb8 100644 --- a/src/services/storage/tlv/tlvFilesStorageFactory.ts +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -45,8 +45,8 @@ export class TlvStorageFactory extends EventEmitter { NewStorage(settings: TlvStorageSettings): TlvStorageInterface { const opId = Math.random().toString() - const connectOp: NewTlvStorageOperation = { type: 'newStorage', opId, settings } - this.handleOp(connectOp) + const op: NewTlvStorageOperation = { type: 'newStorage', opId, settings } + this.handleOp(op) return { AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => this.AddTlv(settings.name, appId, dataName, tlv), LoadLatest: (limit?: number) => this.LoadLatest(settings.name, limit), @@ -56,20 +56,20 @@ export class TlvStorageFactory extends EventEmitter { AddTlv(storageName: string, appId: string, dataName: string, tlv: Uint8Array): Promise { const opId = Math.random().toString() - const connectOp: AddTlvOperation = { type: 'addTlv', opId, storageName, appId, dataName, tlv } - return this.handleOp(connectOp) + const op: AddTlvOperation = { type: 'addTlv', opId, storageName, appId, dataName, tlv } + return this.handleOp(op) } LoadLatest(storageName: string, limit?: number): Promise { const opId = Math.random().toString() - const connectOp: LoadLatestTlvOperation = { type: 'loadLatestTlv', opId, storageName, limit } - return this.handleOp(connectOp) + const op: LoadLatestTlvOperation = { type: 'loadLatestTlv', opId, storageName, limit } + return this.handleOp(op) } LoadFile(storageName: string, appId: string, dataName: string, chunk: number): Promise { const opId = Math.random().toString() - const connectOp: LoadTlvFileOperation = { type: 'loadTlvFile', opId, storageName, appId, dataName, chunk } - return this.handleOp(connectOp) + const op: LoadTlvFileOperation = { type: 'loadTlvFile', opId, storageName, appId, dataName, chunk } + return this.handleOp(op) } private handleOp(op: ITlvStorageOperation): Promise { diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts index ef1d3f28..b5417bf2 100644 --- a/src/services/storage/tlv/tlvFilesStorageProcessor.ts +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -73,6 +73,7 @@ class TlvFilesStorageProcessor { private async handleOperation(operation: TlvStorageOperation) { try { const opId = operation.opId; + if (operation.debug) console.log('handleOperation', operation) switch (operation.type) { case 'newStorage': await this.handleNewStorage(operation); From 92aa96c876ab6c35a3cf5bd3982d90887f31537b Mon Sep 17 00:00:00 2001 From: boufni95 Date: Mon, 31 Mar 2025 18:14:23 +0000 Subject: [PATCH 06/23] deb --- src/services/storage/tlv/tlvFilesStorageFactory.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts index 8f78fcb8..e1a05fcd 100644 --- a/src/services/storage/tlv/tlvFilesStorageFactory.ts +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -89,7 +89,7 @@ export class TlvStorageFactory extends EventEmitter { resolve(response.data); } this.once(op.opId, responseHandler) - this.process.send(op) + this.process.send({ ...op, debug: this.debug || op.debug }) }) } From af126ee90079ed1a4444e29d5867256a4cf160b1 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Mon, 31 Mar 2025 18:17:12 +0000 Subject: [PATCH 07/23] tlv fix --- src/services/storage/tlv/tlvFilesStorageProcessor.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts index b5417bf2..dbb266c2 100644 --- a/src/services/storage/tlv/tlvFilesStorageProcessor.ts +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -73,6 +73,7 @@ class TlvFilesStorageProcessor { private async handleOperation(operation: TlvStorageOperation) { try { const opId = operation.opId; + if (operation.type === 'addTlv') operation.tlv = new Uint8Array(operation.tlv) if (operation.debug) console.log('handleOperation', operation) switch (operation.type) { case 'newStorage': From 0ba919cc71f1e6403e12322fdcb56dbaa2edd2df Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 1 Apr 2025 15:53:42 +0000 Subject: [PATCH 08/23] cannot send buffer over IPC :( --- .../storage/tlv/tlvFilesStorageFactory.ts | 21 ++++++++++---- .../storage/tlv/tlvFilesStorageProcessor.ts | 28 ++++++++++++------- 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts index e1a05fcd..8d5ba554 100644 --- a/src/services/storage/tlv/tlvFilesStorageFactory.ts +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -1,6 +1,6 @@ import { ChildProcess, fork } from 'child_process'; import { EventEmitter } from 'events'; -import { AddTlvOperation, ITlvStorageOperation, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, TlvOperationResponse, TlvStorageSettings } from './tlvFilesStorageProcessor'; +import { AddTlvOperation, ITlvStorageOperation, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings } from './tlvFilesStorageProcessor'; import { LatestData, TlvFile } from './tlvFilesStorage'; export type TlvStorageInterface = { @@ -56,20 +56,29 @@ export class TlvStorageFactory extends EventEmitter { AddTlv(storageName: string, appId: string, dataName: string, tlv: Uint8Array): Promise { const opId = Math.random().toString() - const op: AddTlvOperation = { type: 'addTlv', opId, storageName, appId, dataName, tlv } + const op: AddTlvOperation = { type: 'addTlv', opId, storageName, appId, dataName, base64Tlv: Buffer.from(tlv).toString('base64') } return this.handleOp(op) } - LoadLatest(storageName: string, limit?: number): Promise { + async LoadLatest(storageName: string, limit?: number): Promise { const opId = Math.random().toString() const op: LoadLatestTlvOperation = { type: 'loadLatestTlv', opId, storageName, limit } - return this.handleOp(op) + const latestData = await this.handleOp(op) + const deserializedLatestData: LatestData = {} + for (const appId in latestData) { + deserializedLatestData[appId] = {} + for (const dataName in latestData[appId]) { + deserializedLatestData[appId][dataName] = { tlvs: latestData[appId][dataName].base64tlvs.map(tlv => new Uint8Array(Buffer.from(tlv, 'base64'))), current_chunk: latestData[appId][dataName].current_chunk, available_chunks: latestData[appId][dataName].available_chunks } + } + } + return deserializedLatestData } - LoadFile(storageName: string, appId: string, dataName: string, chunk: number): Promise { + async LoadFile(storageName: string, appId: string, dataName: string, chunk: number): Promise { const opId = Math.random().toString() const op: LoadTlvFileOperation = { type: 'loadTlvFile', opId, storageName, appId, dataName, chunk } - return this.handleOp(op) + const tlvFile = await this.handleOp(op) + return { fileData: Buffer.from(tlvFile.base64fileData, 'base64'), chunks: tlvFile.chunks } } private handleOp(op: ITlvStorageOperation): Promise { diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts index dbb266c2..bef1d3e7 100644 --- a/src/services/storage/tlv/tlvFilesStorageProcessor.ts +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -1,6 +1,7 @@ import { PubLogger, getLogger } from '../../helpers/logger.js'; import { TlvFilesStorage } from './tlvFilesStorage.js'; - +export type SerializableLatestData = Record> +export type SerializableTlvFile = { base64fileData: string, chunks: number[] } export type TlvStorageSettings = { path: string name: string @@ -19,7 +20,7 @@ export type AddTlvOperation = { storageName: string appId: string dataName: string - tlv: Uint8Array + base64Tlv: string debug?: boolean } @@ -73,7 +74,6 @@ class TlvFilesStorageProcessor { private async handleOperation(operation: TlvStorageOperation) { try { const opId = operation.opId; - if (operation.type === 'addTlv') operation.tlv = new Uint8Array(operation.tlv) if (operation.debug) console.log('handleOperation', operation) switch (operation.type) { case 'newStorage': @@ -132,8 +132,9 @@ class TlvFilesStorageProcessor { }) return } - this.storages[operation.storageName].AddTlv(operation.appId, operation.dataName, operation.tlv) - this.sendResponse({ + const tlv = new Uint8Array(Buffer.from(operation.base64Tlv, 'base64')) + this.storages[operation.storageName].AddTlv(operation.appId, operation.dataName, tlv) + this.sendResponse({ success: true, type: 'addTlv', data: null, @@ -151,10 +152,17 @@ class TlvFilesStorageProcessor { return } const data = this.storages[operation.storageName].LoadLatest(operation.limit) - this.sendResponse({ + const serializableData: SerializableLatestData = {} + for (const appId in data) { + serializableData[appId] = {} + for (const dataName in data[appId]) { + serializableData[appId][dataName] = { base64tlvs: data[appId][dataName].tlvs.map(tlv => Buffer.from(tlv).toString('base64')), current_chunk: data[appId][dataName].current_chunk, available_chunks: data[appId][dataName].available_chunks } + } + } + this.sendResponse({ success: true, type: 'loadLatest', - data: data, + data: serializableData, opId: operation.opId }); } @@ -169,15 +177,15 @@ class TlvFilesStorageProcessor { return } const data = this.storages[operation.storageName].LoadFile(operation.appId, operation.dataName, operation.chunk) - this.sendResponse({ + this.sendResponse({ success: true, type: 'loadFile', - data: data, + data: { base64fileData: Buffer.from(data.fileData).toString('base64'), chunks: data.chunks }, opId: operation.opId }); } - private sendResponse(response: TlvOperationResponse) { + private sendResponse(response: TlvOperationResponse) { if (process.send) { process.send(response); } From 14ca9b669c1a3cf8d053681d0fbaa42fe2882b2c Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 1 Apr 2025 18:37:12 +0000 Subject: [PATCH 09/23] more deb --- src/services/main/debitManager.ts | 2 +- src/services/storage/db/storageInterface.ts | 2 +- src/services/storage/tlv/tlvFilesStorageFactory.ts | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/services/main/debitManager.ts b/src/services/main/debitManager.ts index 23fd8b2c..f16c2a57 100644 --- a/src/services/main/debitManager.ts +++ b/src/services/main/debitManager.ts @@ -186,7 +186,7 @@ export class DebitManager { this.notifyPaymentSuccess(appUser, debitRes, op, { appId: ctx.app_id, pub: req.npub, id: req.request_id }) return default: - throw new Error("invalid response type") + throw new Error("invalid debit response type") } } diff --git a/src/services/storage/db/storageInterface.ts b/src/services/storage/db/storageInterface.ts index 94a52fb0..ae97409a 100644 --- a/src/services/storage/db/storageInterface.ts +++ b/src/services/storage/db/storageInterface.ts @@ -148,7 +148,7 @@ export class StorageInterface extends EventEmitter { return } if (response.type !== op.type) { - reject(new Error('Invalid response type')); + reject(new Error('Invalid storage response type')); return } resolve(response.data); diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts index 8d5ba554..d9329cad 100644 --- a/src/services/storage/tlv/tlvFilesStorageFactory.ts +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -12,7 +12,7 @@ export type TlvStorageInterface = { export class TlvStorageFactory extends EventEmitter { private process: ChildProcess; private isConnected: boolean = false; - private debug: boolean = true; + private debug: boolean = false; constructor() { super(); @@ -62,7 +62,7 @@ export class TlvStorageFactory extends EventEmitter { async LoadLatest(storageName: string, limit?: number): Promise { const opId = Math.random().toString() - const op: LoadLatestTlvOperation = { type: 'loadLatestTlv', opId, storageName, limit } + const op: LoadLatestTlvOperation = { type: 'loadLatestTlv', opId, storageName, limit, debug: true } const latestData = await this.handleOp(op) const deserializedLatestData: LatestData = {} for (const appId in latestData) { @@ -76,7 +76,7 @@ export class TlvStorageFactory extends EventEmitter { async LoadFile(storageName: string, appId: string, dataName: string, chunk: number): Promise { const opId = Math.random().toString() - const op: LoadTlvFileOperation = { type: 'loadTlvFile', opId, storageName, appId, dataName, chunk } + const op: LoadTlvFileOperation = { type: 'loadTlvFile', opId, storageName, appId, dataName, chunk, debug: true } const tlvFile = await this.handleOp(op) return { fileData: Buffer.from(tlvFile.base64fileData, 'base64'), chunks: tlvFile.chunks } } @@ -92,7 +92,7 @@ export class TlvStorageFactory extends EventEmitter { return } if (response.type !== op.type) { - reject(new Error('Invalid response type')); + reject(new Error('Invalid tlv storage response type')); return } resolve(response.data); From 5881fa4c9d90ed7b089807f68e78e01b067331f9 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 1 Apr 2025 18:39:42 +0000 Subject: [PATCH 10/23] fix deb flag --- src/services/storage/tlv/tlvFilesStorageFactory.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts index d9329cad..84a716db 100644 --- a/src/services/storage/tlv/tlvFilesStorageFactory.ts +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -82,11 +82,12 @@ export class TlvStorageFactory extends EventEmitter { } private handleOp(op: ITlvStorageOperation): Promise { - if (this.debug) console.log('handleOp', op) + const debug = this.debug || op.debug + if (debug) console.log('handleOp', op) this.checkConnected() return new Promise((resolve, reject) => { const responseHandler = (response: TlvOperationResponse) => { - if (this.debug) console.log('tlv responseHandler', response) + if (debug) console.log('tlv responseHandler', response) if (!response.success) { reject(new Error(response.error)); return @@ -98,7 +99,7 @@ export class TlvStorageFactory extends EventEmitter { resolve(response.data); } this.once(op.opId, responseHandler) - this.process.send({ ...op, debug: this.debug || op.debug }) + this.process.send({ ...op, debug }) }) } From 84ee279b359f6e90e842bf41a1770dfa5a0363f1 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 1 Apr 2025 18:41:24 +0000 Subject: [PATCH 11/23] deb --- src/services/storage/tlv/tlvFilesStorageFactory.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts index 84a716db..f75e5bb9 100644 --- a/src/services/storage/tlv/tlvFilesStorageFactory.ts +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -93,7 +93,7 @@ export class TlvStorageFactory extends EventEmitter { return } if (response.type !== op.type) { - reject(new Error('Invalid tlv storage response type')); + reject(new Error('Invalid tlv storage response type: ' + response.type + ' expected: ' + op.type)); return } resolve(response.data); From 8bca7cb7db9d286f71be25682b016897c62df54f Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 1 Apr 2025 18:43:23 +0000 Subject: [PATCH 12/23] fix --- src/services/storage/tlv/tlvFilesStorageFactory.ts | 4 ++-- src/services/storage/tlv/tlvFilesStorageProcessor.ts | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts index f75e5bb9..cab84a2d 100644 --- a/src/services/storage/tlv/tlvFilesStorageFactory.ts +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -62,7 +62,7 @@ export class TlvStorageFactory extends EventEmitter { async LoadLatest(storageName: string, limit?: number): Promise { const opId = Math.random().toString() - const op: LoadLatestTlvOperation = { type: 'loadLatestTlv', opId, storageName, limit, debug: true } + const op: LoadLatestTlvOperation = { type: 'loadLatest', opId, storageName, limit, debug: true } const latestData = await this.handleOp(op) const deserializedLatestData: LatestData = {} for (const appId in latestData) { @@ -76,7 +76,7 @@ export class TlvStorageFactory extends EventEmitter { async LoadFile(storageName: string, appId: string, dataName: string, chunk: number): Promise { const opId = Math.random().toString() - const op: LoadTlvFileOperation = { type: 'loadTlvFile', opId, storageName, appId, dataName, chunk, debug: true } + const op: LoadTlvFileOperation = { type: 'loadFile', opId, storageName, appId, dataName, chunk, debug: true } const tlvFile = await this.handleOp(op) return { fileData: Buffer.from(tlvFile.base64fileData, 'base64'), chunks: tlvFile.chunks } } diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts index bef1d3e7..780e4bef 100644 --- a/src/services/storage/tlv/tlvFilesStorageProcessor.ts +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -25,7 +25,7 @@ export type AddTlvOperation = { } export type LoadLatestTlvOperation = { - type: 'loadLatestTlv' + type: 'loadLatest' opId: string storageName: string limit?: number @@ -33,7 +33,7 @@ export type LoadLatestTlvOperation = { } export type LoadTlvFileOperation = { - type: 'loadTlvFile' + type: 'loadFile' opId: string storageName: string appId: string @@ -82,10 +82,10 @@ class TlvFilesStorageProcessor { case 'addTlv': await this.handleAddTlv(operation); break; - case 'loadLatestTlv': + case 'loadLatest': await this.handleLoadLatestTlv(operation); break; - case 'loadTlvFile': + case 'loadFile': await this.handleLoadTlvFile(operation); break; default: From bd10b02d8806267cfb47e605627ed6ebda41662f Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 1 Apr 2025 19:41:29 +0000 Subject: [PATCH 13/23] move wrtc to tlv metrics sub process --- src/services/helpers/utilsWrapper.ts | 8 +++- src/services/main/index.ts | 7 ++-- src/services/serverMethods/index.ts | 3 +- .../storage/tlv/tlvFilesStorageFactory.ts | 31 +++++++++++++-- .../storage/tlv/tlvFilesStorageProcessor.ts | 39 ++++++++++++++++++- src/services/webRTC/index.ts | 37 +++++++----------- 6 files changed, 93 insertions(+), 32 deletions(-) diff --git a/src/services/helpers/utilsWrapper.ts b/src/services/helpers/utilsWrapper.ts index fd873b43..7c83e186 100644 --- a/src/services/helpers/utilsWrapper.ts +++ b/src/services/helpers/utilsWrapper.ts @@ -1,17 +1,23 @@ import { MainSettings } from "../main/settings.js"; import { StateBundler } from "../storage/tlv/stateBundler.js"; import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js"; - +import { NostrSend } from "../nostr/handler.js"; export class Utils { tlvStorageFactory: TlvStorageFactory stateBundler: StateBundler settings: MainSettings + _nostrSend: NostrSend = () => { throw new Error('nostr send not initialized yet') } constructor(settings: MainSettings) { this.settings = settings this.tlvStorageFactory = new TlvStorageFactory() this.stateBundler = new StateBundler(settings.storageSettings, this.tlvStorageFactory) } + attachNostrSend(f: NostrSend) { + this._nostrSend = f + this.tlvStorageFactory.attachNostrSend(f) + } + Stop() { this.stateBundler.Stop() this.tlvStorageFactory.disconnect() diff --git a/src/services/main/index.ts b/src/services/main/index.ts index 18860b99..0aaf5c4e 100644 --- a/src/services/main/index.ts +++ b/src/services/main/index.ts @@ -55,7 +55,7 @@ export default class { utils: Utils rugPullTracker: RugPullTracker unlocker: Unlocker - webRTC: webRTC + //webRTC: webRTC nostrSend: NostrSend = () => { getLogger({})("nostr send not initialized yet") } constructor(settings: MainSettings, storage: Storage, adminManager: AdminManager, utils: Utils, unlocker: Unlocker) { this.settings = settings @@ -76,7 +76,7 @@ export default class { this.appUserManager = new AppUserManager(this.storage, this.settings, this.applicationManager) this.debitManager = new DebitManager(this.storage, this.lnd, this.applicationManager) this.offerManager = new OfferManager(this.storage, this.lnd, this.applicationManager, this.productManager) - this.webRTC = new webRTC(this.storage, this.utils) + //this.webRTC = new webRTC(this.storage, this.utils) } @@ -99,7 +99,8 @@ export default class { this.liquidityProvider.attachNostrSend(f) this.debitManager.attachNostrSend(f) this.offerManager.attachNostrSend(f) - this.webRTC.attachNostrSend(f) + this.utils.attachNostrSend(f) + //this.webRTC.attachNostrSend(f) } htlcCb: HtlcCb = (e) => { diff --git a/src/services/serverMethods/index.ts b/src/services/serverMethods/index.ts index 1b57deec..678c9cea 100644 --- a/src/services/serverMethods/index.ts +++ b/src/services/serverMethods/index.ts @@ -11,7 +11,8 @@ export default (mainHandler: Main): Types.ServerMethods => { offer_CustomCheck: offer => offer !== '', } }) - return mainHandler.webRTC.OnMessage({ userPub: ctx.operator_id, appId: ctx.app_id }, req.message) + if (err != null) throw new Error(err.message) + return mainHandler.utils.tlvStorageFactory.WebRtcMessage({ userPub: ctx.operator_id, appId: ctx.app_id }, req.message) }, SubToWebRtcCandidates: async ({ ctx }) => { }, GetUsageMetrics: async ({ ctx, req }) => { diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts index cab84a2d..f6eb6b06 100644 --- a/src/services/storage/tlv/tlvFilesStorageFactory.ts +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -1,8 +1,10 @@ import { ChildProcess, fork } from 'child_process'; import { EventEmitter } from 'events'; -import { AddTlvOperation, ITlvStorageOperation, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings } from './tlvFilesStorageProcessor'; +import { AddTlvOperation, ITlvStorageOperation, SuccessTlvOperationResponse, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings, WebRtcMessageOperation } from './tlvFilesStorageProcessor'; import { LatestData, TlvFile } from './tlvFilesStorage'; - +import { NostrSend, SendData, SendInitiator } from '../../nostr/handler'; +import { WebRtcUserInfo } from '../../webRTC'; +import * as Types from '../../../../proto/autogenerated/ts/types.js' export type TlvStorageInterface = { AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => Promise LoadLatest: (limit?: number) => Promise @@ -13,7 +15,7 @@ export class TlvStorageFactory extends EventEmitter { private process: ChildProcess; private isConnected: boolean = false; private debug: boolean = false; - + private _nostrSend: NostrSend = () => { throw new Error('nostr send not initialized yet') } constructor() { super(); this.initializeSubprocess(); @@ -23,11 +25,26 @@ export class TlvStorageFactory extends EventEmitter { this.debug = debug; } + attachNostrSend(f: NostrSend) { + this._nostrSend = f + } + + private nostrSend = (opResponse: SuccessTlvOperationResponse<{ initiator: SendInitiator, data: SendData, relays?: string[] }>) => { + if (!this._nostrSend) { + throw new Error("No nostrSend attached") + } + this._nostrSend(opResponse.data.initiator, opResponse.data.data, opResponse.data.relays) + } + private initializeSubprocess() { this.process = fork('./build/src/services/storage/tlv/tlvFilesStorageProcessor'); this.process.on('message', (response: TlvOperationResponse) => { - this.emit(response.opId, response); + if (response.success && response.type === 'nostrSend') { + this.nostrSend(response) + } else { + this.emit(response.opId, response); + } }); this.process.on('error', (error: Error) => { @@ -81,6 +98,12 @@ export class TlvStorageFactory extends EventEmitter { return { fileData: Buffer.from(tlvFile.base64fileData, 'base64'), chunks: tlvFile.chunks } } + WebRtcMessage(userInfo: WebRtcUserInfo, message: Types.WebRtcMessage_message): Promise { + const opId = Math.random().toString() + const op: WebRtcMessageOperation = { type: 'webRtcMessage', opId, userInfo, message, debug: true } + return this.handleOp(op) + } + private handleOp(op: ITlvStorageOperation): Promise { const debug = this.debug || op.debug if (debug) console.log('handleOp', op) diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts index 780e4bef..dada201f 100644 --- a/src/services/storage/tlv/tlvFilesStorageProcessor.ts +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -1,5 +1,9 @@ import { PubLogger, getLogger } from '../../helpers/logger.js'; +import webRTC, { WebRtcUserInfo } from '../../webRTC/index.js'; import { TlvFilesStorage } from './tlvFilesStorage.js'; +import * as Types from '../../../../proto/autogenerated/ts/types.js' +import { SendData } from '../../nostr/handler.js'; +import { SendInitiator } from '../../nostr/handler.js'; export type SerializableLatestData = Record> export type SerializableTlvFile = { base64fileData: string, chunks: number[] } export type TlvStorageSettings = { @@ -42,6 +46,14 @@ export type LoadTlvFileOperation = { debug?: boolean } +export type WebRtcMessageOperation = { + type: 'webRtcMessage' + opId: string + userInfo: WebRtcUserInfo + message: Types.WebRtcMessage_message + debug?: boolean +} + export type ErrorTlvOperationResponse = { success: false, error: string, opId: string } export interface ITlvStorageOperation { @@ -50,7 +62,7 @@ export interface ITlvStorageOperation { debug?: boolean } -export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation +export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation | WebRtcMessageOperation export type SuccessTlvOperationResponse = { success: true, type: string, data: T, opId: string } export type TlvOperationResponse = SuccessTlvOperationResponse | ErrorTlvOperationResponse @@ -58,6 +70,7 @@ export type TlvOperationResponse = SuccessTlvOperationResponse | ErrorTlvO class TlvFilesStorageProcessor { private log: PubLogger = console.log private storages: Record = {} + private wrtc: webRTC constructor() { if (!process.send) { throw new Error('This process must be spawned as a child process'); @@ -69,6 +82,17 @@ class TlvFilesStorageProcessor { process.on('error', (error: Error) => { console.error('Error in storage processor:', error); }); + + this.wrtc = new webRTC(t => { + switch (t) { + case Types.SingleMetricType.USAGE_METRIC: + return this.storages['usage'] + case Types.SingleMetricType.BUNDLE_METRIC: + return this.storages['bundle'] + default: + throw new Error('Unknown metric type: ' + t) + } + }) } private async handleOperation(operation: TlvStorageOperation) { @@ -88,6 +112,9 @@ class TlvFilesStorageProcessor { case 'loadFile': await this.handleLoadTlvFile(operation); break; + case 'webRtcMessage': + await this.handleWebRtcMessage(operation); + break; default: this.sendResponse({ success: false, @@ -185,6 +212,16 @@ class TlvFilesStorageProcessor { }); } + private async handleWebRtcMessage(operation: WebRtcMessageOperation) { + const answer = await this.wrtc.OnMessage(operation.userInfo, operation.message) + this.sendResponse({ + success: true, + type: 'webRtcMessage', + data: answer, + opId: operation.opId + }); + } + private sendResponse(response: TlvOperationResponse) { if (process.send) { process.send(response); diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index ef23a772..678ba236 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -10,16 +10,19 @@ import { TlvFilesStorage } from '../storage/tlv/tlvFilesStorage.js' import { TlvStorageInterface } from '../storage/tlv/tlvFilesStorageFactory.js' type IceCandidate = { type: string, candidate?: string, sdpMid?: string, sdpMLineIndex?: number } const configuration = { 'iceServers': [{ 'urls': 'stun:relay.webwormhole.io' }] } -type UserInfo = { userPub: string, appId: string } +export type WebRtcUserInfo = { userPub: string, appId: string } + +export type TlvStorageGetter = (t: Types.SingleMetricType) => TlvFilesStorage + export default class webRTC { - private storage: Storage + //private storage: Storage private log = getLogger({ component: 'webRTC' }) private connections: Record = {} private _nostrSend: NostrSend - private utils: Utils - constructor(storage: Storage, utils: Utils) { - this.storage = storage - this.utils = utils + private tlvStorageGetter: TlvStorageGetter + //private utils: Utils + constructor(tlvStorageGetter: TlvStorageGetter) { + this.tlvStorageGetter = tlvStorageGetter } attachNostrSend(f: NostrSend) { this._nostrSend = f @@ -31,12 +34,12 @@ export default class webRTC { this._nostrSend(initiator, data, relays) } - private sendCandidate = (u: UserInfo, candidate: string) => { + private sendCandidate = (u: WebRtcUserInfo, candidate: string) => { const message: Types.WebRtcCandidate & { requestId: string, status: 'OK' } = { candidate, requestId: "SubToWebRtcCandidates", status: 'OK' } this.nostrSend({ type: 'app', appId: u.appId }, { type: 'content', content: JSON.stringify(message), pub: u.userPub }) } - OnMessage = async (u: UserInfo, message: Types.WebRtcMessage_message): Promise => { + OnMessage = async (u: WebRtcUserInfo, message: Types.WebRtcMessage_message): Promise => { if (message.type === Types.WebRtcMessage_message_type.OFFER) { return this.connect(u, message.offer) } else if (message.type === Types.WebRtcMessage_message_type.CANDIDATE) { @@ -45,7 +48,7 @@ export default class webRTC { return {} } - private onCandidate = async (u: UserInfo, candidate: string): Promise => { + private onCandidate = async (u: WebRtcUserInfo, candidate: string): Promise => { const key = this.getConnectionsKey(u) if (!this.connections[key]) { throw new Error('Connection not found') @@ -57,7 +60,7 @@ export default class webRTC { } return {} } - private connect = async (u: UserInfo, offer: string): Promise => { + private connect = async (u: WebRtcUserInfo, offer: string): Promise => { const key = this.getConnectionsKey(u) this.log("connect", key) if (this.connections[key]) { @@ -94,17 +97,7 @@ export default class webRTC { this.log(ERROR, 'SingleUsageMetricReqValidate', err) return } - let tlvStorage: TlvStorageInterface - switch (j.metric_type) { - case Types.SingleMetricType.USAGE_METRIC: - tlvStorage = this.storage.metricsEventStorage.tlvStorage - break - case Types.SingleMetricType.BUNDLE_METRIC: - tlvStorage = this.utils.stateBundler.tlvStorage - break - default: - throw new Error("Unknown metric type") - } + const tlvStorage = this.tlvStorageGetter(j.metric_type) const { fileData } = await tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page) const id = j.request_id || Math.floor(Math.random() * 100_000_000) let i = 0 @@ -132,7 +125,7 @@ export default class webRTC { return { answer: JSON.stringify(answer) } } - getConnectionsKey = (u: UserInfo) => { + getConnectionsKey = (u: WebRtcUserInfo) => { return u.appId + ":" + u.userPub } } \ No newline at end of file From 98ad3be9efc2b43aa79b808938e1b8bc11433d70 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 1 Apr 2025 19:45:09 +0000 Subject: [PATCH 14/23] fix nostr send --- src/services/storage/tlv/tlvFilesStorageProcessor.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts index dada201f..940f9fc2 100644 --- a/src/services/storage/tlv/tlvFilesStorageProcessor.ts +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -93,6 +93,14 @@ class TlvFilesStorageProcessor { throw new Error('Unknown metric type: ' + t) } }) + this.wrtc.attachNostrSend((initiator: SendInitiator, data: SendData, relays?: string[] | undefined) => { + this.sendResponse({ + success: true, + type: 'nostrSend', + data: { initiator, data, relays }, + opId: Math.random().toString() + }); + }) } private async handleOperation(operation: TlvStorageOperation) { From a328d2a3dbe3d5fb2d4e2bd0267e9d32d558cc72 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 1 Apr 2025 20:51:17 +0000 Subject: [PATCH 15/23] collect mem data for all processes --- metric_cache/last24hSF.json | 1 + src/nostrMiddleware.ts | 2 +- src/services/helpers/utilsWrapper.ts | 15 +++++-- src/services/main/init.ts | 6 +-- src/services/nostr/handler.ts | 14 +++++- src/services/nostr/index.ts | 8 +++- src/services/storage/db/storageInterface.ts | 20 ++++++++- src/services/storage/index.ts | 13 +++--- src/services/storage/metricsStorage.ts | 7 ++- .../storage/tlv/processMetricsCollector.ts | 39 ++++++++++++++++ src/services/storage/tlv/stateBundler.ts | 4 +- .../storage/tlv/tlvFilesStorageFactory.ts | 17 +++++-- .../storage/tlv/tlvFilesStorageProcessor.ts | 45 ++++++++++++++++++- src/tests/networkSetup.ts | 2 +- src/tests/testBase.ts | 8 ++-- 15 files changed, 171 insertions(+), 30 deletions(-) create mode 100644 metric_cache/last24hSF.json create mode 100644 src/services/storage/tlv/processMetricsCollector.ts diff --git a/metric_cache/last24hSF.json b/metric_cache/last24hSF.json new file mode 100644 index 00000000..0637a088 --- /dev/null +++ b/metric_cache/last24hSF.json @@ -0,0 +1 @@ +[] \ No newline at end of file diff --git a/src/nostrMiddleware.ts b/src/nostrMiddleware.ts index 39d79605..4b6f6cef 100644 --- a/src/nostrMiddleware.ts +++ b/src/nostrMiddleware.ts @@ -38,7 +38,7 @@ export default (serverMethods: Types.ServerMethods, mainHandler: Main, nostrSett }, logger: { log: console.log, error: err => log(ERROR, err) }, }) - const nostr = new Nostr(nostrSettings, event => { + const nostr = new Nostr(nostrSettings, mainHandler.utils, event => { let j: NostrRequest try { j = JSON.parse(event.content) diff --git a/src/services/helpers/utilsWrapper.ts b/src/services/helpers/utilsWrapper.ts index 7c83e186..71a02bdd 100644 --- a/src/services/helpers/utilsWrapper.ts +++ b/src/services/helpers/utilsWrapper.ts @@ -2,15 +2,24 @@ import { MainSettings } from "../main/settings.js"; import { StateBundler } from "../storage/tlv/stateBundler.js"; import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js"; import { NostrSend } from "../nostr/handler.js"; +import { ProcessMetricsCollector } from "../storage/tlv/processMetricsCollector.js"; +type UtilsSettings = { + noCollector?: boolean + dataDir: string +} export class Utils { tlvStorageFactory: TlvStorageFactory stateBundler: StateBundler settings: MainSettings _nostrSend: NostrSend = () => { throw new Error('nostr send not initialized yet') } - constructor(settings: MainSettings) { - this.settings = settings + constructor({ noCollector, dataDir }: UtilsSettings) { this.tlvStorageFactory = new TlvStorageFactory() - this.stateBundler = new StateBundler(settings.storageSettings, this.tlvStorageFactory) + this.stateBundler = new StateBundler(dataDir, this.tlvStorageFactory) + if (!noCollector) { + new ProcessMetricsCollector((metrics) => { + this.tlvStorageFactory.ProcessMetrics(metrics, '') + }) + } } attachNostrSend(f: NostrSend) { diff --git a/src/services/main/init.ts b/src/services/main/init.ts index 5f2d4034..35a5e37d 100644 --- a/src/services/main/init.ts +++ b/src/services/main/init.ts @@ -17,9 +17,9 @@ export type AppData = { name: string; } export const initMainHandler = async (log: PubLogger, mainSettings: MainSettings) => { - const utils = new Utils(mainSettings) - const storageManager = new Storage(mainSettings.storageSettings) - await storageManager.Connect(log, utils.tlvStorageFactory) + const utils = new Utils({ dataDir: mainSettings.storageSettings.dataDir }) + const storageManager = new Storage(mainSettings.storageSettings, utils) + await storageManager.Connect(log) /* const manualMigration = await TypeOrmMigrationRunner(log, storageManager, mainSettings.storageSettings.dbSettings, process.argv[2]) if (manualMigration) { return diff --git a/src/services/nostr/handler.ts b/src/services/nostr/handler.ts index c84a2c5f..34050167 100644 --- a/src/services/nostr/handler.ts +++ b/src/services/nostr/handler.ts @@ -6,6 +6,7 @@ import { SimplePool, Event, UnsignedEvent, getEventHash, finalizeEvent, Relay, n import { ERROR, getLogger } from '../helpers/logger.js' import { nip19 } from 'nostr-tools' import { encrypt as encryptV1, decrypt as decryptV1, getSharedSecret as getConversationKeyV1 } from './nip44v1.js' +import { ProcessMetrics, ProcessMetricsCollector } from '../storage/tlv/processMetricsCollector.js' const { nprofileEncode } = nip19 const { v2 } = nip44 const { encrypt: encryptV2, decrypt: decryptV2, utils } = v2 @@ -50,9 +51,13 @@ type EventResponse = { type: 'event' event: NostrEvent } +type ProcessMetricsResponse = { + type: 'processMetrics' + metrics: ProcessMetrics +} export type ChildProcessRequest = SettingsRequest | SendRequest -export type ChildProcessResponse = ReadyResponse | EventResponse +export type ChildProcessResponse = ReadyResponse | EventResponse | ProcessMetricsResponse const send = (message: ChildProcessResponse) => { if (process.send) { process.send(message, undefined, undefined, err => { @@ -88,6 +93,13 @@ const initSubprocessHandler = (settings: NostrSettings) => { event: event }) }) + + new ProcessMetricsCollector((metrics) => { + send({ + type: 'processMetrics', + metrics + }) + }) } const sendToNostr: NostrSend = (initiator, data, relays) => { if (!subProcessHandler) { diff --git a/src/services/nostr/index.ts b/src/services/nostr/index.ts index 845cbbd2..cbe55034 100644 --- a/src/services/nostr/index.ts +++ b/src/services/nostr/index.ts @@ -1,6 +1,7 @@ import { ChildProcess, fork } from 'child_process' import { EnvMustBeNonEmptyString } from "../helpers/envParser.js" import { NostrSettings, NostrEvent, ChildProcessRequest, ChildProcessResponse, SendData, SendInitiator } from "./handler.js" +import { Utils } from '../helpers/utilsWrapper.js' type EventCallback = (event: NostrEvent) => void const getEnvOrDefault = (name: string, defaultValue: string): string => { @@ -17,7 +18,9 @@ export const LoadNosrtSettingsFromEnv = (test = false) => { export default class NostrSubprocess { settings: NostrSettings childProcess: ChildProcess - constructor(settings: NostrSettings, eventCallback: EventCallback) { + utils: Utils + constructor(settings: NostrSettings, utils: Utils, eventCallback: EventCallback) { + this.utils = utils this.childProcess = fork("./build/src/services/nostr/handler") this.childProcess.on("error", console.error) this.childProcess.on("message", (message: ChildProcessResponse) => { @@ -28,6 +31,9 @@ export default class NostrSubprocess { case 'event': eventCallback(message.event) break + case 'processMetrics': + this.utils.tlvStorageFactory.ProcessMetrics(message.metrics, 'nostr') + break default: console.error("unknown nostr event response", message) break; diff --git a/src/services/storage/db/storageInterface.ts b/src/services/storage/db/storageInterface.ts index ae97409a..00794f08 100644 --- a/src/services/storage/db/storageInterface.ts +++ b/src/services/storage/db/storageInterface.ts @@ -11,9 +11,12 @@ import { DecrementOperation, SumOperation, DBNames, + SuccessOperationResponse, } from './storageProcessor.js'; import { PickKeysByType } from 'typeorm/common/PickKeysByType.js'; import { serializeRequest, WhereCondition } from './serializationHelpers.js'; +import { Utils } from '../../helpers/utilsWrapper.js'; +import { ProcessMetrics } from '../tlv/processMetricsCollector.js'; export type TX = (txId: string) => Promise @@ -22,21 +25,33 @@ export class StorageInterface extends EventEmitter { private process: ChildProcess; private isConnected: boolean = false; private debug: boolean = false; + private utils: Utils + private dbType: 'main' | 'metrics' - constructor() { + constructor(utils: Utils) { super(); this.initializeSubprocess(); + this.utils = utils } setDebug(debug: boolean) { this.debug = debug; } + private handleCollectedProcessMetrics(metrics: SuccessOperationResponse) { + if (!this.dbType) return + this.utils.tlvStorageFactory.ProcessMetrics(metrics.data, this.dbType + '_storage') + } + private initializeSubprocess() { this.process = fork('./build/src/services/storage/db/storageProcessor'); this.process.on('message', (response: OperationResponse) => { - this.emit(response.opId, response); + if (response.success && response.type === 'processMetrics') { + this.handleCollectedProcessMetrics(response) + } else { + this.emit(response.opId, response); + } }); this.process.on('error', (error: Error) => { @@ -54,6 +69,7 @@ export class StorageInterface extends EventEmitter { Connect(settings: DbSettings, dbType: 'main' | 'metrics'): Promise { const opId = Math.random().toString() + this.dbType = dbType const connectOp: ConnectOperation = { type: 'connect', opId, settings, dbType } return this.handleOp(connectOp) } diff --git a/src/services/storage/index.ts b/src/services/storage/index.ts index e832290c..e8f9e771 100644 --- a/src/services/storage/index.ts +++ b/src/services/storage/index.ts @@ -13,6 +13,7 @@ import OfferStorage from "./offerStorage.js" import { StorageInterface, TX } from "./db/storageInterface.js"; import { PubLogger } from "../helpers/logger.js" import { TlvStorageFactory } from './tlv/tlvFilesStorageFactory.js'; +import { Utils } from '../helpers/utilsWrapper.js'; export type StorageSettings = { dbSettings: DbSettings eventLogPath: string @@ -36,12 +37,14 @@ export default class { debitStorage: DebitStorage offerStorage: OfferStorage eventsLog: EventsLogManager - constructor(settings: StorageSettings) { + utils: Utils + constructor(settings: StorageSettings, utils: Utils) { this.settings = settings + this.utils = utils this.eventsLog = new EventsLogManager(settings.eventLogPath) } - async Connect(log: PubLogger, tlvStorageFactory: TlvStorageFactory) { - this.dbs = new StorageInterface() + async Connect(log: PubLogger) { + this.dbs = new StorageInterface(this.utils) await this.dbs.Connect(this.settings.dbSettings, 'main') //const { source, executedMigrations } = await NewDB(this.settings.dbSettings, allMigrations) //this.DB = source @@ -50,8 +53,8 @@ export default class { this.productStorage = new ProductStorage(this.dbs) this.applicationStorage = new ApplicationStorage(this.dbs, this.userStorage) this.paymentStorage = new PaymentStorage(this.dbs, this.userStorage) - this.metricsStorage = new MetricsStorage(this.settings) - this.metricsEventStorage = new MetricsEventStorage(this.settings, tlvStorageFactory) + this.metricsStorage = new MetricsStorage(this.settings, this.utils) + this.metricsEventStorage = new MetricsEventStorage(this.settings, this.utils.tlvStorageFactory) this.liquidityStorage = new LiquidityStorage(this.dbs) this.debitStorage = new DebitStorage(this.dbs) this.offerStorage = new OfferStorage(this.dbs) diff --git a/src/services/storage/metricsStorage.ts b/src/services/storage/metricsStorage.ts index a47b20b3..3dd36e2f 100644 --- a/src/services/storage/metricsStorage.ts +++ b/src/services/storage/metricsStorage.ts @@ -7,19 +7,22 @@ import { newMetricsDb } from "./db/db.js"; import { ChannelRouting } from "./entity/ChannelRouting.js"; import { RootOperation } from "./entity/RootOperation.js"; import { StorageInterface } from "./db/storageInterface.js"; +import { Utils } from "../helpers/utilsWrapper.js"; export default class { //DB: DataSource | EntityManager settings: StorageSettings dbs: StorageInterface + utils: Utils //txQueue: TransactionsQueue - constructor(settings: StorageSettings) { + constructor(settings: StorageSettings, utils: Utils) { this.settings = settings; + this.utils = utils } async Connect() { //const { source, executedMigrations } = await newMetricsDb(this.settings.dbSettings, metricsMigrations) //this.DB = source; //this.txQueue = new TransactionsQueue("metrics", this.DB) - this.dbs = new StorageInterface() + this.dbs = new StorageInterface(this.utils) await this.dbs.Connect(this.settings.dbSettings, 'metrics') //return executedMigrations; } diff --git a/src/services/storage/tlv/processMetricsCollector.ts b/src/services/storage/tlv/processMetricsCollector.ts new file mode 100644 index 00000000..52047a40 --- /dev/null +++ b/src/services/storage/tlv/processMetricsCollector.ts @@ -0,0 +1,39 @@ +import { getLogger } from "../../helpers/logger.js" +export type ProcessMetrics = { + memory_rss_kb?: number + memory_buffer_kb?: number + memory_heap_total_kb?: number + memory_heap_used_kb?: number + memory_external_kb?: number +} +export class ProcessMetricsCollector { + reportLog = getLogger({ component: 'ProcessMetricsCollector' }) + prevValues: Record = {} + interval: NodeJS.Timeout + constructor(cb: (metrics: ProcessMetrics) => void) { + this.interval = setInterval(() => { + const mem = process.memoryUsage() + const metrics: ProcessMetrics = { + memory_rss_kb: this.AddValue('memory_rss_kb', Math.ceil(mem.rss / 1000 || 0), true), + memory_buffer_kb: this.AddValue('memory_buffer_kb', Math.ceil(mem.arrayBuffers / 1000 || 0), true), + memory_heap_total_kb: this.AddValue('memory_heap_total_kb', Math.ceil(mem.heapTotal / 1000 || 0), true), + memory_heap_used_kb: this.AddValue('memory_heap_used_kb', Math.ceil(mem.heapUsed / 1000 || 0), true), + memory_external_kb: this.AddValue('memory_external_kb', Math.ceil(mem.external / 1000 || 0), true), + } + + cb(metrics) + }, 60 * 1000) + } + + Stop() { + clearInterval(this.interval) + } + + AddValue = (key: string, v: number, updateOnly = false): number | undefined => { + if (updateOnly && this.prevValues[key] === v) { + return + } + this.prevValues[key] = v + return v + } +} diff --git a/src/services/storage/tlv/stateBundler.ts b/src/services/storage/tlv/stateBundler.ts index a8b634b4..6873b8f1 100644 --- a/src/services/storage/tlv/stateBundler.ts +++ b/src/services/storage/tlv/stateBundler.ts @@ -34,8 +34,8 @@ export class StateBundler { reportLog = getLogger({ component: 'stateBundlerReport' }) prevValues: Record = {} interval: NodeJS.Timeout - constructor(settings: StorageSettings, tlvStorageFactory: TlvStorageFactory) { - const bundlerPath = [settings.dataDir, "bundler_events"].filter(s => !!s).join("/") + constructor(dataDir: string, tlvStorageFactory: TlvStorageFactory) { + const bundlerPath = [dataDir, "bundler_events"].filter(s => !!s).join("/") this.tlvStorage = tlvStorageFactory.NewStorage({ name: "bundler", path: bundlerPath }) this.interval = setInterval(() => { const mem = process.memoryUsage() diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts index f6eb6b06..fc8bfc4d 100644 --- a/src/services/storage/tlv/tlvFilesStorageFactory.ts +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -1,10 +1,11 @@ import { ChildProcess, fork } from 'child_process'; import { EventEmitter } from 'events'; -import { AddTlvOperation, ITlvStorageOperation, SuccessTlvOperationResponse, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings, WebRtcMessageOperation } from './tlvFilesStorageProcessor'; +import { AddTlvOperation, ITlvStorageOperation, SuccessTlvOperationResponse, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings, WebRtcMessageOperation, ProcessMetricsTlvOperation } from './tlvFilesStorageProcessor'; import { LatestData, TlvFile } from './tlvFilesStorage'; import { NostrSend, SendData, SendInitiator } from '../../nostr/handler'; import { WebRtcUserInfo } from '../../webRTC'; import * as Types from '../../../../proto/autogenerated/ts/types.js' +import { ProcessMetrics } from './processMetricsCollector'; export type TlvStorageInterface = { AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => Promise LoadLatest: (limit?: number) => Promise @@ -79,7 +80,7 @@ export class TlvStorageFactory extends EventEmitter { async LoadLatest(storageName: string, limit?: number): Promise { const opId = Math.random().toString() - const op: LoadLatestTlvOperation = { type: 'loadLatest', opId, storageName, limit, debug: true } + const op: LoadLatestTlvOperation = { type: 'loadLatest', opId, storageName, limit } const latestData = await this.handleOp(op) const deserializedLatestData: LatestData = {} for (const appId in latestData) { @@ -93,17 +94,25 @@ export class TlvStorageFactory extends EventEmitter { async LoadFile(storageName: string, appId: string, dataName: string, chunk: number): Promise { const opId = Math.random().toString() - const op: LoadTlvFileOperation = { type: 'loadFile', opId, storageName, appId, dataName, chunk, debug: true } + const op: LoadTlvFileOperation = { type: 'loadFile', opId, storageName, appId, dataName, chunk } const tlvFile = await this.handleOp(op) return { fileData: Buffer.from(tlvFile.base64fileData, 'base64'), chunks: tlvFile.chunks } } WebRtcMessage(userInfo: WebRtcUserInfo, message: Types.WebRtcMessage_message): Promise { const opId = Math.random().toString() - const op: WebRtcMessageOperation = { type: 'webRtcMessage', opId, userInfo, message, debug: true } + const op: WebRtcMessageOperation = { type: 'webRtcMessage', opId, userInfo, message } return this.handleOp(op) } + ProcessMetrics(metrics: ProcessMetrics, processName: string): Promise { + const opId = Math.random().toString() + const op: ProcessMetricsTlvOperation = { type: 'processMetrics', opId, metrics, processName } + return this.handleOp(op) + } + + + private handleOp(op: ITlvStorageOperation): Promise { const debug = this.debug || op.debug if (debug) console.log('handleOp', op) diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts index 940f9fc2..c9d3a229 100644 --- a/src/services/storage/tlv/tlvFilesStorageProcessor.ts +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -4,6 +4,8 @@ import { TlvFilesStorage } from './tlvFilesStorage.js'; import * as Types from '../../../../proto/autogenerated/ts/types.js' import { SendData } from '../../nostr/handler.js'; import { SendInitiator } from '../../nostr/handler.js'; +import { ProcessMetrics, ProcessMetricsCollector } from './processMetricsCollector.js'; +import { integerToUint8Array } from '../../helpers/tlv.js'; export type SerializableLatestData = Record> export type SerializableTlvFile = { base64fileData: string, chunks: number[] } export type TlvStorageSettings = { @@ -54,6 +56,14 @@ export type WebRtcMessageOperation = { debug?: boolean } +export type ProcessMetricsTlvOperation = { + type: 'processMetrics' + opId: string + processName?: string + metrics: ProcessMetrics + debug?: boolean +} + export type ErrorTlvOperationResponse = { success: false, error: string, opId: string } export interface ITlvStorageOperation { @@ -62,7 +72,7 @@ export interface ITlvStorageOperation { debug?: boolean } -export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation | WebRtcMessageOperation +export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation | WebRtcMessageOperation | ProcessMetricsTlvOperation export type SuccessTlvOperationResponse = { success: true, type: string, data: T, opId: string } export type TlvOperationResponse = SuccessTlvOperationResponse | ErrorTlvOperationResponse @@ -101,6 +111,26 @@ class TlvFilesStorageProcessor { opId: Math.random().toString() }); }) + new ProcessMetricsCollector((pMetrics) => { + this.saveProcessMetrics(pMetrics, 'tlv_processor') + }) + } + + private serializeNowTlv = (v: number) => { + const nowUnix = Math.floor(Date.now() / 1000) + const entry = new Uint8Array(8) + entry.set(integerToUint8Array(nowUnix), 0) + entry.set(integerToUint8Array(v), 4) + return entry + } + + private saveProcessMetrics = (pMetrics: ProcessMetrics, processName = "") => { + const pName = processName ? '_' + processName : '' + if (pMetrics.memory_rss_kb) this.storages['bundle'].AddTlv('_root', 'memory_rss_kb' + pName, this.serializeNowTlv(pMetrics.memory_rss_kb)) + if (pMetrics.memory_buffer_kb) this.storages['bundle'].AddTlv('_root', 'memory_buffer_kb' + pName, this.serializeNowTlv(pMetrics.memory_buffer_kb)) + if (pMetrics.memory_heap_total_kb) this.storages['bundle'].AddTlv('_root', 'memory_heap_total_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_total_kb)) + if (pMetrics.memory_heap_used_kb) this.storages['bundle'].AddTlv('_root', 'memory_heap_used_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_used_kb)) + if (pMetrics.memory_external_kb) this.storages['bundle'].AddTlv('_root', 'memory_external_kb' + pName, this.serializeNowTlv(pMetrics.memory_external_kb)) } private async handleOperation(operation: TlvStorageOperation) { @@ -123,6 +153,9 @@ class TlvFilesStorageProcessor { case 'webRtcMessage': await this.handleWebRtcMessage(operation); break; + case 'processMetrics': + await this.handleProcessMetrics(operation); + break; default: this.sendResponse({ success: false, @@ -230,6 +263,16 @@ class TlvFilesStorageProcessor { }); } + private async handleProcessMetrics(operation: ProcessMetricsTlvOperation) { + this.saveProcessMetrics(operation.metrics, operation.processName) + this.sendResponse({ + success: true, + type: 'processMetrics', + data: null, + opId: operation.opId + }); + } + private sendResponse(response: TlvOperationResponse) { if (process.send) { process.send(response); diff --git a/src/tests/networkSetup.ts b/src/tests/networkSetup.ts index 3de2a23b..9ebcaa17 100644 --- a/src/tests/networkSetup.ts +++ b/src/tests/networkSetup.ts @@ -10,7 +10,7 @@ export const setupNetwork = async () => { await core.InitAddress() await core.Mine(1) const tlvStorageFactory = new TlvStorageFactory() - const setupUtils = new Utils(settings) + const setupUtils = new Utils({ dataDir: settings.storageSettings.dataDir }) const alice = new LND(settings.lndSettings, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { }) const bob = new LND({ ...settings.lndSettings, mainNode: settings.lndSettings.otherNode }, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { }) await tryUntil(async i => { diff --git a/src/tests/testBase.ts b/src/tests/testBase.ts index 2fff9c05..cccb4982 100644 --- a/src/tests/testBase.ts +++ b/src/tests/testBase.ts @@ -44,9 +44,9 @@ export type StorageTestBase = { export const setupStorageTest = async (d: Describe): Promise => { const settings = GetTestStorageSettings() - const storageManager = new Storage(settings) - const tlvStorageFactory = new TlvStorageFactory() - await storageManager.Connect(console.log, tlvStorageFactory) + const utils = new Utils({ dataDir: settings.dataDir }) + const storageManager = new Storage(settings, utils) + await storageManager.Connect(console.log) return { expect, storage: storageManager, @@ -71,7 +71,7 @@ export const SetupTest = async (d: Describe): Promise => { const user1 = { userId: u1.info.userId, appUserIdentifier: u1.identifier, appId: app.appId } const user2 = { userId: u2.info.userId, appUserIdentifier: u2.identifier, appId: app.appId } - const extermnalUtils = new Utils(settings) + const extermnalUtils = new Utils({ dataDir: settings.storageSettings.dataDir }) const externalAccessToMainLnd = new LND(settings.lndSettings, new LiquidityProvider("", extermnalUtils, async () => { }, async () => { }), extermnalUtils, async () => { }, async () => { }, () => { }, () => { }) await externalAccessToMainLnd.Warmup() From 8b504412dfb9626c96457a3f816e19ecc9007546 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 1 Apr 2025 20:53:53 +0000 Subject: [PATCH 16/23] fix gitignore --- .gitignore | 2 ++ metric_cache/last24hSF.json | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) delete mode 100644 metric_cache/last24hSF.json diff --git a/.gitignore b/.gitignore index 0e58d9f6..44fa292e 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,5 @@ app.nprofile admin.connect debug.txt proto/autogenerated/debug.txt + + diff --git a/metric_cache/last24hSF.json b/metric_cache/last24hSF.json deleted file mode 100644 index 0637a088..00000000 --- a/metric_cache/last24hSF.json +++ /dev/null @@ -1 +0,0 @@ -[] \ No newline at end of file From f10874b774c1b28bb688afb7fee8e062a32f1ba5 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 1 Apr 2025 20:54:19 +0000 Subject: [PATCH 17/23] up gitingore --- .gitignore | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 44fa292e..32399854 100644 --- a/.gitignore +++ b/.gitignore @@ -22,5 +22,6 @@ app.nprofile admin.connect debug.txt proto/autogenerated/debug.txt - - +metrics_cache/ +metrics_events/ +bundler_events/ \ No newline at end of file From 0085225a9590a93d1d36dd20bf6978b19e8d987e Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 1 Apr 2025 21:00:39 +0000 Subject: [PATCH 18/23] up --- src/services/storage/tlv/tlvFilesStorageProcessor.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts index c9d3a229..4299d0ba 100644 --- a/src/services/storage/tlv/tlvFilesStorageProcessor.ts +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -126,6 +126,10 @@ class TlvFilesStorageProcessor { private saveProcessMetrics = (pMetrics: ProcessMetrics, processName = "") => { const pName = processName ? '_' + processName : '' + if (!this.storages['bundle']) { + console.log('no bundle storage yet') + return + } if (pMetrics.memory_rss_kb) this.storages['bundle'].AddTlv('_root', 'memory_rss_kb' + pName, this.serializeNowTlv(pMetrics.memory_rss_kb)) if (pMetrics.memory_buffer_kb) this.storages['bundle'].AddTlv('_root', 'memory_buffer_kb' + pName, this.serializeNowTlv(pMetrics.memory_buffer_kb)) if (pMetrics.memory_heap_total_kb) this.storages['bundle'].AddTlv('_root', 'memory_heap_total_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_total_kb)) From fe11182eec5219d0def326fb06859c760c870db2 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 1 Apr 2025 21:07:10 +0000 Subject: [PATCH 19/23] naimng fix --- .../storage/tlv/tlvFilesStorageProcessor.ts | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts index 4299d0ba..a2bf0900 100644 --- a/src/services/storage/tlv/tlvFilesStorageProcessor.ts +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -8,9 +8,11 @@ import { ProcessMetrics, ProcessMetricsCollector } from './processMetricsCollect import { integerToUint8Array } from '../../helpers/tlv.js'; export type SerializableLatestData = Record> export type SerializableTlvFile = { base64fileData: string, chunks: number[] } +const usageStorageName = 'usage' +const bundlerStorageName = 'bundler' export type TlvStorageSettings = { path: string - name: string + name: typeof usageStorageName | typeof bundlerStorageName } export type NewTlvStorageOperation = { @@ -96,9 +98,9 @@ class TlvFilesStorageProcessor { this.wrtc = new webRTC(t => { switch (t) { case Types.SingleMetricType.USAGE_METRIC: - return this.storages['usage'] + return this.storages[usageStorageName] case Types.SingleMetricType.BUNDLE_METRIC: - return this.storages['bundle'] + return this.storages[bundlerStorageName] default: throw new Error('Unknown metric type: ' + t) } @@ -126,15 +128,15 @@ class TlvFilesStorageProcessor { private saveProcessMetrics = (pMetrics: ProcessMetrics, processName = "") => { const pName = processName ? '_' + processName : '' - if (!this.storages['bundle']) { + if (!this.storages[bundlerStorageName]) { console.log('no bundle storage yet') return } - if (pMetrics.memory_rss_kb) this.storages['bundle'].AddTlv('_root', 'memory_rss_kb' + pName, this.serializeNowTlv(pMetrics.memory_rss_kb)) - if (pMetrics.memory_buffer_kb) this.storages['bundle'].AddTlv('_root', 'memory_buffer_kb' + pName, this.serializeNowTlv(pMetrics.memory_buffer_kb)) - if (pMetrics.memory_heap_total_kb) this.storages['bundle'].AddTlv('_root', 'memory_heap_total_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_total_kb)) - if (pMetrics.memory_heap_used_kb) this.storages['bundle'].AddTlv('_root', 'memory_heap_used_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_used_kb)) - if (pMetrics.memory_external_kb) this.storages['bundle'].AddTlv('_root', 'memory_external_kb' + pName, this.serializeNowTlv(pMetrics.memory_external_kb)) + if (pMetrics.memory_rss_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_rss_kb' + pName, this.serializeNowTlv(pMetrics.memory_rss_kb)) + if (pMetrics.memory_buffer_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_buffer_kb' + pName, this.serializeNowTlv(pMetrics.memory_buffer_kb)) + if (pMetrics.memory_heap_total_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_heap_total_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_total_kb)) + if (pMetrics.memory_heap_used_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_heap_used_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_used_kb)) + if (pMetrics.memory_external_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_external_kb' + pName, this.serializeNowTlv(pMetrics.memory_external_kb)) } private async handleOperation(operation: TlvStorageOperation) { From b3ac15ff632eb01e208bf866bfe2b29bdb27bcbf Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 1 Apr 2025 21:11:30 +0000 Subject: [PATCH 20/23] fix --- src/services/storage/tlv/metricsEventStorage.ts | 2 +- src/services/storage/tlv/stateBundler.ts | 2 +- src/services/storage/tlv/tlvFilesStorageProcessor.ts | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/services/storage/tlv/metricsEventStorage.ts b/src/services/storage/tlv/metricsEventStorage.ts index 24a8c683..f27805f9 100644 --- a/src/services/storage/tlv/metricsEventStorage.ts +++ b/src/services/storage/tlv/metricsEventStorage.ts @@ -10,7 +10,7 @@ export default class { lastPersistedCache: number = 0 constructor(settings: StorageSettings, tlvStorageFactory: TlvStorageFactory) { const metricsPath = [settings.dataDir, "metric_events"].filter(s => !!s).join("/") - this.tlvStorage = tlvStorageFactory.NewStorage({ name: "metrics", path: metricsPath }) + this.tlvStorage = tlvStorageFactory.NewStorage({ name: 'usage', path: metricsPath }) this.cachePath = [settings.dataDir, "metric_cache"].filter(s => !!s).join("/") if (!fs.existsSync(this.cachePath)) { fs.mkdirSync(this.cachePath, { recursive: true }); diff --git a/src/services/storage/tlv/stateBundler.ts b/src/services/storage/tlv/stateBundler.ts index 6873b8f1..fa696645 100644 --- a/src/services/storage/tlv/stateBundler.ts +++ b/src/services/storage/tlv/stateBundler.ts @@ -36,7 +36,7 @@ export class StateBundler { interval: NodeJS.Timeout constructor(dataDir: string, tlvStorageFactory: TlvStorageFactory) { const bundlerPath = [dataDir, "bundler_events"].filter(s => !!s).join("/") - this.tlvStorage = tlvStorageFactory.NewStorage({ name: "bundler", path: bundlerPath }) + this.tlvStorage = tlvStorageFactory.NewStorage({ name: 'bundler', path: bundlerPath }) this.interval = setInterval(() => { const mem = process.memoryUsage() this.AddValue('_root', 'memory_rss_kb', Math.ceil(mem.rss / 1000 || 0), true) diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts index a2bf0900..7cc97e1e 100644 --- a/src/services/storage/tlv/tlvFilesStorageProcessor.ts +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -8,8 +8,8 @@ import { ProcessMetrics, ProcessMetricsCollector } from './processMetricsCollect import { integerToUint8Array } from '../../helpers/tlv.js'; export type SerializableLatestData = Record> export type SerializableTlvFile = { base64fileData: string, chunks: number[] } -const usageStorageName = 'usage' -const bundlerStorageName = 'bundler' +export const usageStorageName = 'usage' +export const bundlerStorageName = 'bundler' export type TlvStorageSettings = { path: string name: typeof usageStorageName | typeof bundlerStorageName From 5a5306ce74d50758a77c7390786d8d25d591debe Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 1 Apr 2025 21:17:10 +0000 Subject: [PATCH 21/23] storage process metrics --- src/services/storage/db/storageProcessor.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/services/storage/db/storageProcessor.ts b/src/services/storage/db/storageProcessor.ts index f42cec42..549b9b0d 100644 --- a/src/services/storage/db/storageProcessor.ts +++ b/src/services/storage/db/storageProcessor.ts @@ -5,6 +5,7 @@ import { allMetricsMigrations, allMigrations } from '../migrations/runner.js'; import transactionsQueue from './transactionsQueue.js'; import { PickKeysByType } from 'typeorm/common/PickKeysByType'; import { deserializeRequest, WhereCondition } from './serializationHelpers.js'; +import { ProcessMetricsCollector } from '../tlv/processMetricsCollector.js'; export type DBNames = MainDbNames | MetricsDbNames export type QueryOptions = { @@ -164,6 +165,15 @@ class StorageProcessor { process.on('error', (error: Error) => { console.error('Error in storage processor:', error); }); + + new ProcessMetricsCollector((pMetrics) => { + this.sendResponse({ + success: true, + type: 'processMetrics', + data: pMetrics, + opId: Math.random().toString() + }) + }) } private async handleOperation(operation: StorageOperation) { From f162d6e7483aeee2170bb66aa271b4690e0644a2 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Wed, 2 Apr 2025 14:45:18 +0000 Subject: [PATCH 22/23] single metrics err --- src/services/webRTC/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index 678ba236..011b6d56 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -94,7 +94,7 @@ export default class webRTC { metrics_name_CustomCheck: name => name !== "" }) if (err) { - this.log(ERROR, 'SingleUsageMetricReqValidate', err) + this.log(ERROR, 'SingleUsageMetricReqValidate', err.message || err) return } const tlvStorage = this.tlvStorageGetter(j.metric_type) From f2445dd850e08408187c561839293e171c9a37cd Mon Sep 17 00:00:00 2001 From: boufni95 Date: Wed, 2 Apr 2025 14:46:59 +0000 Subject: [PATCH 23/23] fix check --- src/services/webRTC/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index 011b6d56..043af1ba 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -90,7 +90,7 @@ export default class webRTC { try { const j = JSON.parse(event.data) as Types.SingleMetricReq const err = Types.SingleMetricReqValidate(j, { - app_id_CustomCheck: id => id === u.appId, + app_id_CustomCheck: id => id !== "", metrics_name_CustomCheck: name => name !== "" }) if (err) {