diff --git a/.gitignore b/.gitignore index 0e58d9f6..32399854 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,6 @@ app.nprofile admin.connect debug.txt proto/autogenerated/debug.txt +metrics_cache/ +metrics_events/ +bundler_events/ \ No newline at end of file diff --git a/src/nostrMiddleware.ts b/src/nostrMiddleware.ts index 39d79605..4b6f6cef 100644 --- a/src/nostrMiddleware.ts +++ b/src/nostrMiddleware.ts @@ -38,7 +38,7 @@ export default (serverMethods: Types.ServerMethods, mainHandler: Main, nostrSett }, logger: { log: console.log, error: err => log(ERROR, err) }, }) - const nostr = new Nostr(nostrSettings, event => { + const nostr = new Nostr(nostrSettings, mainHandler.utils, event => { let j: NostrRequest try { j = JSON.parse(event.content) diff --git a/src/services/helpers/utilsWrapper.ts b/src/services/helpers/utilsWrapper.ts index 0dc61d63..71a02bdd 100644 --- a/src/services/helpers/utilsWrapper.ts +++ b/src/services/helpers/utilsWrapper.ts @@ -1,16 +1,34 @@ import { MainSettings } from "../main/settings.js"; -import { StateBundler } from "../storage/stateBundler.js"; - +import { StateBundler } from "../storage/tlv/stateBundler.js"; +import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js"; +import { NostrSend } from "../nostr/handler.js"; +import { ProcessMetricsCollector } from "../storage/tlv/processMetricsCollector.js"; +type UtilsSettings = { + noCollector?: boolean + dataDir: string +} export class Utils { - + tlvStorageFactory: TlvStorageFactory stateBundler: StateBundler settings: MainSettings - constructor(settings: MainSettings) { - this.settings = settings - this.stateBundler = new StateBundler(settings.storageSettings) + _nostrSend: NostrSend = () => { throw new Error('nostr send not initialized yet') } + constructor({ noCollector, dataDir }: UtilsSettings) { + this.tlvStorageFactory = new TlvStorageFactory() + this.stateBundler = new StateBundler(dataDir, this.tlvStorageFactory) + if (!noCollector) { + new ProcessMetricsCollector((metrics) => { + this.tlvStorageFactory.ProcessMetrics(metrics, '') + }) + } + } + + attachNostrSend(f: NostrSend) { + this._nostrSend = f + this.tlvStorageFactory.attachNostrSend(f) } Stop() { this.stateBundler.Stop() + this.tlvStorageFactory.disconnect() } } \ No newline at end of file diff --git a/src/services/lnd/lnd.ts b/src/services/lnd/lnd.ts index 3df13920..c638bb66 100644 --- a/src/services/lnd/lnd.ts +++ b/src/services/lnd/lnd.ts @@ -18,7 +18,7 @@ import { ERROR, getLogger } from '../helpers/logger.js'; import { HtlcEvent_EventType } from '../../../proto/lnd/router.js'; import { LiquidityProvider, LiquidityRequest } from '../main/liquidityProvider.js'; import { Utils } from '../helpers/utilsWrapper.js'; -import { TxPointSettings } from '../storage/stateBundler.js'; +import { TxPointSettings } from '../storage/tlv/stateBundler.js'; import { WalletKitClient } from '../../../proto/lnd/walletkit.client.js'; const DeadLineMetadata = (deadline = 10 * 1000) => ({ deadline: Date.now() + deadline }) const deadLndRetrySeconds = 5 diff --git a/src/services/main/debitManager.ts b/src/services/main/debitManager.ts index 23fd8b2c..f16c2a57 100644 --- a/src/services/main/debitManager.ts +++ b/src/services/main/debitManager.ts @@ -186,7 +186,7 @@ export class DebitManager { this.notifyPaymentSuccess(appUser, debitRes, op, { appId: ctx.app_id, pub: req.npub, id: req.request_id }) return default: - throw new Error("invalid response type") + throw new Error("invalid debit response type") } } diff --git a/src/services/main/index.ts b/src/services/main/index.ts index 18860b99..0aaf5c4e 100644 --- a/src/services/main/index.ts +++ b/src/services/main/index.ts @@ -55,7 +55,7 @@ export default class { utils: Utils rugPullTracker: RugPullTracker unlocker: Unlocker - webRTC: webRTC + //webRTC: webRTC nostrSend: NostrSend = () => { getLogger({})("nostr send not initialized yet") } constructor(settings: MainSettings, storage: Storage, adminManager: AdminManager, utils: Utils, unlocker: Unlocker) { this.settings = settings @@ -76,7 +76,7 @@ export default class { this.appUserManager = new AppUserManager(this.storage, this.settings, this.applicationManager) this.debitManager = new DebitManager(this.storage, this.lnd, this.applicationManager) this.offerManager = new OfferManager(this.storage, this.lnd, this.applicationManager, this.productManager) - this.webRTC = new webRTC(this.storage, this.utils) + //this.webRTC = new webRTC(this.storage, this.utils) } @@ -99,7 +99,8 @@ export default class { this.liquidityProvider.attachNostrSend(f) this.debitManager.attachNostrSend(f) this.offerManager.attachNostrSend(f) - this.webRTC.attachNostrSend(f) + this.utils.attachNostrSend(f) + //this.webRTC.attachNostrSend(f) } htlcCb: HtlcCb = (e) => { diff --git a/src/services/main/init.ts b/src/services/main/init.ts index 2389510b..35a5e37d 100644 --- a/src/services/main/init.ts +++ b/src/services/main/init.ts @@ -9,6 +9,7 @@ import { LoadMainSettingsFromEnv, MainSettings } from "./settings.js" import { Utils } from "../helpers/utilsWrapper.js" import { Wizard } from "../wizard/index.js" import { AdminManager } from "./adminManager.js" +import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js" export type AppData = { privateKey: string; publicKey: string; @@ -16,8 +17,8 @@ export type AppData = { name: string; } export const initMainHandler = async (log: PubLogger, mainSettings: MainSettings) => { - const utils = new Utils(mainSettings) - const storageManager = new Storage(mainSettings.storageSettings) + const utils = new Utils({ dataDir: mainSettings.storageSettings.dataDir }) + const storageManager = new Storage(mainSettings.storageSettings, utils) await storageManager.Connect(log) /* const manualMigration = await TypeOrmMigrationRunner(log, storageManager, mainSettings.storageSettings.dbSettings, process.argv[2]) if (manualMigration) { diff --git a/src/services/nostr/handler.ts b/src/services/nostr/handler.ts index c84a2c5f..34050167 100644 --- a/src/services/nostr/handler.ts +++ b/src/services/nostr/handler.ts @@ -6,6 +6,7 @@ import { SimplePool, Event, UnsignedEvent, getEventHash, finalizeEvent, Relay, n import { ERROR, getLogger } from '../helpers/logger.js' import { nip19 } from 'nostr-tools' import { encrypt as encryptV1, decrypt as decryptV1, getSharedSecret as getConversationKeyV1 } from './nip44v1.js' +import { ProcessMetrics, ProcessMetricsCollector } from '../storage/tlv/processMetricsCollector.js' const { nprofileEncode } = nip19 const { v2 } = nip44 const { encrypt: encryptV2, decrypt: decryptV2, utils } = v2 @@ -50,9 +51,13 @@ type EventResponse = { type: 'event' event: NostrEvent } +type ProcessMetricsResponse = { + type: 'processMetrics' + metrics: ProcessMetrics +} export type ChildProcessRequest = SettingsRequest | SendRequest -export type ChildProcessResponse = ReadyResponse | EventResponse +export type ChildProcessResponse = ReadyResponse | EventResponse | ProcessMetricsResponse const send = (message: ChildProcessResponse) => { if (process.send) { process.send(message, undefined, undefined, err => { @@ -88,6 +93,13 @@ const initSubprocessHandler = (settings: NostrSettings) => { event: event }) }) + + new ProcessMetricsCollector((metrics) => { + send({ + type: 'processMetrics', + metrics + }) + }) } const sendToNostr: NostrSend = (initiator, data, relays) => { if (!subProcessHandler) { diff --git a/src/services/nostr/index.ts b/src/services/nostr/index.ts index 845cbbd2..cbe55034 100644 --- a/src/services/nostr/index.ts +++ b/src/services/nostr/index.ts @@ -1,6 +1,7 @@ import { ChildProcess, fork } from 'child_process' import { EnvMustBeNonEmptyString } from "../helpers/envParser.js" import { NostrSettings, NostrEvent, ChildProcessRequest, ChildProcessResponse, SendData, SendInitiator } from "./handler.js" +import { Utils } from '../helpers/utilsWrapper.js' type EventCallback = (event: NostrEvent) => void const getEnvOrDefault = (name: string, defaultValue: string): string => { @@ -17,7 +18,9 @@ export const LoadNosrtSettingsFromEnv = (test = false) => { export default class NostrSubprocess { settings: NostrSettings childProcess: ChildProcess - constructor(settings: NostrSettings, eventCallback: EventCallback) { + utils: Utils + constructor(settings: NostrSettings, utils: Utils, eventCallback: EventCallback) { + this.utils = utils this.childProcess = fork("./build/src/services/nostr/handler") this.childProcess.on("error", console.error) this.childProcess.on("message", (message: ChildProcessResponse) => { @@ -28,6 +31,9 @@ export default class NostrSubprocess { case 'event': eventCallback(message.event) break + case 'processMetrics': + this.utils.tlvStorageFactory.ProcessMetrics(message.metrics, 'nostr') + break default: console.error("unknown nostr event response", message) break; diff --git a/src/services/serverMethods/index.ts b/src/services/serverMethods/index.ts index 1b57deec..678c9cea 100644 --- a/src/services/serverMethods/index.ts +++ b/src/services/serverMethods/index.ts @@ -11,7 +11,8 @@ export default (mainHandler: Main): Types.ServerMethods => { offer_CustomCheck: offer => offer !== '', } }) - return mainHandler.webRTC.OnMessage({ userPub: ctx.operator_id, appId: ctx.app_id }, req.message) + if (err != null) throw new Error(err.message) + return mainHandler.utils.tlvStorageFactory.WebRtcMessage({ userPub: ctx.operator_id, appId: ctx.app_id }, req.message) }, SubToWebRtcCandidates: async ({ ctx }) => { }, GetUsageMetrics: async ({ ctx, req }) => { diff --git a/src/services/storage/applicationStorage.ts b/src/services/storage/applicationStorage.ts index 9bd8c6ba..c5b87bb4 100644 --- a/src/services/storage/applicationStorage.ts +++ b/src/services/storage/applicationStorage.ts @@ -7,7 +7,7 @@ import { ApplicationUser } from './entity/ApplicationUser.js'; import { getLogger } from '../helpers/logger.js'; import { User } from './entity/User.js'; import { InviteToken } from './entity/InviteToken.js'; -import { StorageInterface } from './storageInterface.js'; +import { StorageInterface } from './db/storageInterface.js'; export default class { dbs: StorageInterface userStorage: UserStorage diff --git a/src/services/storage/db.ts b/src/services/storage/db/db.ts similarity index 74% rename from src/services/storage/db.ts rename to src/services/storage/db/db.ts index 39b67ac1..12e8b396 100644 --- a/src/services/storage/db.ts +++ b/src/services/storage/db/db.ts @@ -1,29 +1,29 @@ import "reflect-metadata" import { DataSource, Migration } from "typeorm" -import { AddressReceivingTransaction } from "./entity/AddressReceivingTransaction.js" -import { User } from "./entity/User.js" -import { UserReceivingAddress } from "./entity/UserReceivingAddress.js" -import { UserReceivingInvoice } from "./entity/UserReceivingInvoice.js" -import { UserInvoicePayment } from "./entity/UserInvoicePayment.js" -import { EnvMustBeNonEmptyString } from "../helpers/envParser.js" -import { UserTransactionPayment } from "./entity/UserTransactionPayment.js" -import { UserBasicAuth } from "./entity/UserBasicAuth.js" -import { UserEphemeralKey } from "./entity/UserEphemeralKey.js" -import { UserToUserPayment } from "./entity/UserToUserPayment.js" -import { Application } from "./entity/Application.js" -import { ApplicationUser } from "./entity/ApplicationUser.js" -import { BalanceEvent } from "./entity/BalanceEvent.js" -import { ChannelBalanceEvent } from "./entity/ChannelsBalanceEvent.js" -import { getLogger } from "../helpers/logger.js" -import { ChannelRouting } from "./entity/ChannelRouting.js" -import { LspOrder } from "./entity/LspOrder.js" -import { Product } from "./entity/Product.js" -import { LndNodeInfo } from "./entity/LndNodeInfo.js" -import { TrackedProvider } from "./entity/TrackedProvider.js" -import { InviteToken } from "./entity/InviteToken.js" -import { DebitAccess } from "./entity/DebitAccess.js" -import { RootOperation } from "./entity/RootOperation.js" -import { UserOffer } from "./entity/UserOffer.js" +import { AddressReceivingTransaction } from "../entity/AddressReceivingTransaction.js" +import { User } from "../entity/User.js" +import { UserReceivingAddress } from "../entity/UserReceivingAddress.js" +import { UserReceivingInvoice } from "../entity/UserReceivingInvoice.js" +import { UserInvoicePayment } from "../entity/UserInvoicePayment.js" +import { EnvMustBeNonEmptyString } from "../../helpers/envParser.js" +import { UserTransactionPayment } from "../entity/UserTransactionPayment.js" +import { UserBasicAuth } from "../entity/UserBasicAuth.js" +import { UserEphemeralKey } from "../entity/UserEphemeralKey.js" +import { UserToUserPayment } from "../entity/UserToUserPayment.js" +import { Application } from "../entity/Application.js" +import { ApplicationUser } from "../entity/ApplicationUser.js" +import { BalanceEvent } from "../entity/BalanceEvent.js" +import { ChannelBalanceEvent } from "../entity/ChannelsBalanceEvent.js" +import { getLogger } from "../../helpers/logger.js" +import { ChannelRouting } from "../entity/ChannelRouting.js" +import { LspOrder } from "../entity/LspOrder.js" +import { Product } from "../entity/Product.js" +import { LndNodeInfo } from "../entity/LndNodeInfo.js" +import { TrackedProvider } from "../entity/TrackedProvider.js" +import { InviteToken } from "../entity/InviteToken.js" +import { DebitAccess } from "../entity/DebitAccess.js" +import { RootOperation } from "../entity/RootOperation.js" +import { UserOffer } from "../entity/UserOffer.js" export type DbSettings = { @@ -70,7 +70,7 @@ export const MainDbEntities = { export type MainDbNames = keyof typeof MainDbEntities export const MainDbEntitiesNames = Object.keys(MainDbEntities) -const MetricsDbEntities = { +export const MetricsDbEntities = { 'BalanceEvent': BalanceEvent, 'ChannelBalanceEvent': ChannelBalanceEvent, 'ChannelRouting': ChannelRouting, diff --git a/src/services/storage/serializationHelpers.ts b/src/services/storage/db/serializationHelpers.ts similarity index 100% rename from src/services/storage/serializationHelpers.ts rename to src/services/storage/db/serializationHelpers.ts diff --git a/src/services/storage/storageInterface.ts b/src/services/storage/db/storageInterface.ts similarity index 73% rename from src/services/storage/storageInterface.ts rename to src/services/storage/db/storageInterface.ts index ca0cfdfd..00794f08 100644 --- a/src/services/storage/storageInterface.ts +++ b/src/services/storage/db/storageInterface.ts @@ -1,4 +1,4 @@ -import { fork } from 'child_process'; +import { ChildProcess, fork } from 'child_process'; import { EventEmitter } from 'events'; import { DbSettings, MainDbNames } from './db.js'; import { DeepPartial, FindOptionsWhere } from 'typeorm'; @@ -10,32 +10,48 @@ import { IncrementOperation, DecrementOperation, SumOperation, + DBNames, + SuccessOperationResponse, } from './storageProcessor.js'; import { PickKeysByType } from 'typeorm/common/PickKeysByType.js'; import { serializeRequest, WhereCondition } from './serializationHelpers.js'; +import { Utils } from '../../helpers/utilsWrapper.js'; +import { ProcessMetrics } from '../tlv/processMetricsCollector.js'; export type TX = (txId: string) => Promise export class StorageInterface extends EventEmitter { - private process: any; + private process: ChildProcess; private isConnected: boolean = false; private debug: boolean = false; + private utils: Utils + private dbType: 'main' | 'metrics' - constructor() { + constructor(utils: Utils) { super(); this.initializeSubprocess(); + this.utils = utils } setDebug(debug: boolean) { this.debug = debug; } + private handleCollectedProcessMetrics(metrics: SuccessOperationResponse) { + if (!this.dbType) return + this.utils.tlvStorageFactory.ProcessMetrics(metrics.data, this.dbType + '_storage') + } + private initializeSubprocess() { - this.process = fork('./build/src/services/storage/storageProcessor'); + this.process = fork('./build/src/services/storage/db/storageProcessor'); this.process.on('message', (response: OperationResponse) => { - this.emit(response.opId, response); + if (response.success && response.type === 'processMetrics') { + this.handleCollectedProcessMetrics(response) + } else { + this.emit(response.opId, response); + } }); this.process.on('error', (error: Error) => { @@ -51,61 +67,62 @@ export class StorageInterface extends EventEmitter { this.isConnected = true; } - Connect(settings: DbSettings): Promise { + Connect(settings: DbSettings, dbType: 'main' | 'metrics'): Promise { const opId = Math.random().toString() - const connectOp: ConnectOperation = { type: 'connect', opId, settings } + this.dbType = dbType + const connectOp: ConnectOperation = { type: 'connect', opId, settings, dbType } return this.handleOp(connectOp) } - Delete(entity: MainDbNames, q: number | FindOptionsWhere, txId?: string): Promise { + Delete(entity: DBNames, q: number | FindOptionsWhere, txId?: string): Promise { const opId = Math.random().toString() const deleteOp: DeleteOperation = { type: 'delete', entity, opId, q, txId } return this.handleOp(deleteOp) } - Remove(entity: MainDbNames, q: T, txId?: string): Promise { + Remove(entity: DBNames, q: T, txId?: string): Promise { const opId = Math.random().toString() const removeOp: RemoveOperation = { type: 'remove', entity, opId, q, txId } return this.handleOp(removeOp) } - FindOne(entity: MainDbNames, q: QueryOptions, txId?: string): Promise { + FindOne(entity: DBNames, q: QueryOptions, txId?: string): Promise { const opId = Math.random().toString() const findOp: FindOneOperation = { type: 'findOne', entity, opId, q, txId } return this.handleOp(findOp) } - Find(entity: MainDbNames, q: QueryOptions, txId?: string): Promise { + Find(entity: DBNames, q: QueryOptions, txId?: string): Promise { const opId = Math.random().toString() const findOp: FindOperation = { type: 'find', entity, opId, q, txId } return this.handleOp(findOp) } - Sum(entity: MainDbNames, columnName: PickKeysByType, q: WhereCondition, txId?: string): Promise { + Sum(entity: DBNames, columnName: PickKeysByType, q: WhereCondition, txId?: string): Promise { const opId = Math.random().toString() const sumOp: SumOperation = { type: 'sum', entity, opId, columnName, q, txId } return this.handleOp(sumOp) } - Update(entity: MainDbNames, q: number | FindOptionsWhere, toUpdate: DeepPartial, txId?: string): Promise { + Update(entity: DBNames, q: number | FindOptionsWhere, toUpdate: DeepPartial, txId?: string): Promise { const opId = Math.random().toString() const updateOp: UpdateOperation = { type: 'update', entity, opId, toUpdate, q, txId } return this.handleOp(updateOp) } - Increment(entity: MainDbNames, q: FindOptionsWhere, propertyPath: string, value: number | string, txId?: string): Promise { + Increment(entity: DBNames, q: FindOptionsWhere, propertyPath: string, value: number | string, txId?: string): Promise { const opId = Math.random().toString() const incrementOp: IncrementOperation = { type: 'increment', entity, opId, q, propertyPath, value, txId } return this.handleOp(incrementOp) } - Decrement(entity: MainDbNames, q: FindOptionsWhere, propertyPath: string, value: number | string, txId?: string): Promise { + Decrement(entity: DBNames, q: FindOptionsWhere, propertyPath: string, value: number | string, txId?: string): Promise { const opId = Math.random().toString() const decrementOp: DecrementOperation = { type: 'decrement', entity, opId, q, propertyPath, value, txId } return this.handleOp(decrementOp) } - CreateAndSave(entity: MainDbNames, toSave: DeepPartial, txId?: string): Promise { + CreateAndSave(entity: DBNames, toSave: DeepPartial, txId?: string): Promise { const opId = Math.random().toString() const createAndSaveOp: CreateAndSaveOperation = { type: 'createAndSave', entity, opId, toSave, txId } return this.handleOp(createAndSaveOp) @@ -147,7 +164,7 @@ export class StorageInterface extends EventEmitter { return } if (response.type !== op.type) { - reject(new Error('Invalid response type')); + reject(new Error('Invalid storage response type')); return } resolve(response.data); diff --git a/src/services/storage/storageProcessor.ts b/src/services/storage/db/storageProcessor.ts similarity index 75% rename from src/services/storage/storageProcessor.ts rename to src/services/storage/db/storageProcessor.ts index e20252d1..549b9b0d 100644 --- a/src/services/storage/storageProcessor.ts +++ b/src/services/storage/db/storageProcessor.ts @@ -1,12 +1,13 @@ import { DataSource, EntityManager, DeepPartial, FindOptionsWhere, FindOptionsOrder, FindOperator } from 'typeorm'; -import NewDB, { DbSettings, MainDbEntities, MainDbNames, newMetricsDb } from './db.js'; -import { PubLogger, getLogger } from '../helpers/logger.js'; -import { allMetricsMigrations, allMigrations } from './migrations/runner.js'; +import NewDB, { DbSettings, MainDbEntities, MainDbNames, MetricsDbEntities, MetricsDbNames, newMetricsDb } from './db.js'; +import { PubLogger, getLogger } from '../../helpers/logger.js'; +import { allMetricsMigrations, allMigrations } from '../migrations/runner.js'; import transactionsQueue from './transactionsQueue.js'; import { PickKeysByType } from 'typeorm/common/PickKeysByType'; import { deserializeRequest, WhereCondition } from './serializationHelpers.js'; +import { ProcessMetricsCollector } from '../tlv/processMetricsCollector.js'; - +export type DBNames = MainDbNames | MetricsDbNames export type QueryOptions = { where?: WhereCondition order?: FindOptionsOrder @@ -17,6 +18,7 @@ export type ConnectOperation = { type: 'connect' opId: string settings: DbSettings + dbType: 'main' | 'metrics' debug?: boolean } @@ -36,7 +38,7 @@ export type EndTxOperation = { export type DeleteOperation = { type: 'delete' - entity: MainDbNames + entity: DBNames opId: string q: number | FindOptionsWhere txId?: string @@ -45,7 +47,7 @@ export type DeleteOperation = { export type RemoveOperation = { type: 'remove' - entity: MainDbNames + entity: DBNames opId: string q: T txId?: string @@ -54,7 +56,7 @@ export type RemoveOperation = { export type UpdateOperation = { type: 'update' - entity: MainDbNames + entity: DBNames opId: string toUpdate: DeepPartial q: number | FindOptionsWhere @@ -64,7 +66,7 @@ export type UpdateOperation = { export type IncrementOperation = { type: 'increment' - entity: MainDbNames + entity: DBNames opId: string q: FindOptionsWhere propertyPath: string, @@ -75,7 +77,7 @@ export type IncrementOperation = { export type DecrementOperation = { type: 'decrement' - entity: MainDbNames + entity: DBNames opId: string q: FindOptionsWhere propertyPath: string, @@ -86,7 +88,7 @@ export type DecrementOperation = { export type FindOneOperation = { type: 'findOne' - entity: MainDbNames + entity: DBNames opId: string q: QueryOptions txId?: string @@ -95,7 +97,7 @@ export type FindOneOperation = { export type FindOperation = { type: 'find' - entity: MainDbNames + entity: DBNames opId: string q: QueryOptions txId?: string @@ -104,7 +106,7 @@ export type FindOperation = { export type SumOperation = { type: 'sum' - entity: MainDbNames + entity: DBNames opId: string columnName: PickKeysByType q: WhereCondition @@ -114,7 +116,7 @@ export type SumOperation = { export type CreateAndSaveOperation = { type: 'createAndSave' - entity: MainDbNames + entity: DBNames opId: string toSave: DeepPartial txId?: string @@ -150,12 +152,12 @@ class StorageProcessor { //private locked: boolean = false private activeTransaction: ActiveTransaction | null = null //private queue: StartTxOperation[] = [] + private mode: 'main' | 'metrics' | '' = '' constructor() { if (!process.send) { throw new Error('This process must be spawned as a child process'); } - this.log = getLogger({ component: 'StorageProcessor' }) process.on('message', (operation: StorageOperation) => { this.handleOperation(operation); }); @@ -163,6 +165,15 @@ class StorageProcessor { process.on('error', (error: Error) => { console.error('Error in storage processor:', error); }); + + new ProcessMetricsCollector((pMetrics) => { + this.sendResponse({ + success: true, + type: 'processMetrics', + data: pMetrics, + opId: Math.random().toString() + }) + }) } private async handleOperation(operation: StorageOperation) { @@ -229,17 +240,38 @@ class StorageProcessor { } private async handleConnect(operation: ConnectOperation) { - const { source, executedMigrations } = await NewDB(operation.settings, allMigrations) - this.DB = source - this.txQueue = new transactionsQueue('StorageProcessorQueue', this.DB) - if (executedMigrations.length > 0) { - this.log(executedMigrations.length, "new migrations executed") - this.log("-------------------") + let migrationsExecuted = 0 + if (this.mode !== '') { + throw new Error('Already connected to a database') + } + this.log = getLogger({ component: 'StorageProcessor_' + operation.dbType }) + if (operation.dbType === 'main') { + const { source, executedMigrations } = await NewDB(operation.settings, allMigrations) + this.DB = source + this.txQueue = new transactionsQueue('StorageProcessorQueue', this.DB) + migrationsExecuted = executedMigrations.length + if (executedMigrations.length > 0) { + this.log(executedMigrations.length, "new migrations executed") + this.log("-------------------") + } + this.mode = 'main' + } else if (operation.dbType === 'metrics') { + const { source, executedMigrations } = await newMetricsDb(operation.settings, allMetricsMigrations) + this.DB = source + this.txQueue = new transactionsQueue('MetricsStorageProcessorQueue', this.DB) + migrationsExecuted = executedMigrations.length + if (executedMigrations.length > 0) { + this.log(executedMigrations.length, "new metrics migrations executed") + this.log("-------------------") + } + this.mode = 'metrics' + } else { + throw new Error('Unknown database type') } this.sendResponse({ success: true, type: 'connect', - data: executedMigrations.length, + data: migrationsExecuted, opId: operation.opId }); } @@ -303,17 +335,28 @@ class StorageProcessor { return this.activeTransaction.manager } - private getManager(txId?: string): DataSource | EntityManager { - if (txId) { - return this.getTx(txId) + private getEntity(entityName: DBNames) { + if (this.mode === 'main') { + const e = MainDbEntities[entityName as MainDbNames] + if (!e) { + throw new Error(`Unknown entity type for main db: ${entityName}`) + } + return e + } else if (this.mode === 'metrics') { + const e = MetricsDbEntities[entityName as MetricsDbNames] + if (!e) { + throw new Error(`Unknown entity type for metrics db: ${entityName}`) + } + return e + } else { + throw new Error('Unknown database mode') } - return this.DB } private async handleDelete(operation: DeleteOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).delete(operation.q) + return eM.getRepository(this.getEntity(operation.entity)).delete(operation.q) }) this.sendResponse({ success: true, @@ -325,7 +368,7 @@ class StorageProcessor { private async handleRemove(operation: RemoveOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).remove(operation.q) + return eM.getRepository(this.getEntity(operation.entity)).remove(operation.q) }) this.sendResponse({ @@ -338,7 +381,7 @@ class StorageProcessor { private async handleUpdate(operation: UpdateOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).update(operation.q, operation.toUpdate) + return eM.getRepository(this.getEntity(operation.entity)).update(operation.q, operation.toUpdate) }) this.sendResponse({ @@ -351,7 +394,7 @@ class StorageProcessor { private async handleIncrement(operation: IncrementOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).increment(operation.q, operation.propertyPath, operation.value) + return eM.getRepository(this.getEntity(operation.entity)).increment(operation.q, operation.propertyPath, operation.value) }) this.sendResponse({ @@ -364,7 +407,7 @@ class StorageProcessor { private async handleDecrement(operation: DecrementOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).decrement(operation.q, operation.propertyPath, operation.value) + return eM.getRepository(this.getEntity(operation.entity)).decrement(operation.q, operation.propertyPath, operation.value) }) this.sendResponse({ @@ -377,7 +420,7 @@ class StorageProcessor { private async handleFindOne(operation: FindOneOperation) { const res = await this.handleRead(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).findOne(operation.q) + return eM.getRepository(this.getEntity(operation.entity)).findOne(operation.q) }) this.sendResponse({ @@ -390,7 +433,7 @@ class StorageProcessor { private async handleFind(operation: FindOperation) { const res = await this.handleRead(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).find(operation.q) + return eM.getRepository(this.getEntity(operation.entity)).find(operation.q) }) this.sendResponse({ @@ -403,7 +446,7 @@ class StorageProcessor { private async handleSum(operation: SumOperation) { const res = await this.handleRead(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).sum(operation.columnName, operation.q) + return eM.getRepository(this.getEntity(operation.entity)).sum(operation.columnName, operation.q) }) this.sendResponse({ success: true, @@ -415,8 +458,8 @@ class StorageProcessor { private async handleCreateAndSave(operation: CreateAndSaveOperation) { const saved = await this.handleWrite(operation.txId, async eM => { - const res = eM.getRepository(MainDbEntities[operation.entity]).create(operation.toSave) - return eM.getRepository(MainDbEntities[operation.entity]).save(res) + const res = eM.getRepository(this.getEntity(operation.entity)).create(operation.toSave) + return eM.getRepository(this.getEntity(operation.entity)).save(res) }) this.sendResponse({ diff --git a/src/services/storage/transactionsQueue.ts b/src/services/storage/db/transactionsQueue.ts similarity index 95% rename from src/services/storage/transactionsQueue.ts rename to src/services/storage/db/transactionsQueue.ts index e8275119..94c31fcd 100644 --- a/src/services/storage/transactionsQueue.ts +++ b/src/services/storage/db/transactionsQueue.ts @@ -1,5 +1,5 @@ import { DataSource, EntityManager, EntityTarget } from "typeorm" -import { PubLogger, getLogger } from "../helpers/logger.js" +import { PubLogger, getLogger } from "../../helpers/logger.js" type TX = (entityManager: EntityManager | DataSource) => Promise type TxOperation = { diff --git a/src/services/storage/debitStorage.ts b/src/services/storage/debitStorage.ts index 30fff55b..f1ed7357 100644 --- a/src/services/storage/debitStorage.ts +++ b/src/services/storage/debitStorage.ts @@ -1,5 +1,5 @@ import { DebitAccess, DebitAccessRules } from "./entity/DebitAccess.js"; -import { StorageInterface } from "./storageInterface.js"; +import { StorageInterface } from "./db/storageInterface.js"; type AccessToAdd = { npub: string rules?: DebitAccessRules diff --git a/src/services/storage/index.ts b/src/services/storage/index.ts index ad214f5c..e8f9e771 100644 --- a/src/services/storage/index.ts +++ b/src/services/storage/index.ts @@ -1,18 +1,19 @@ import fs from 'fs' -import NewDB, { DbSettings, LoadDbSettingsFromEnv } from "./db.js" +import NewDB, { DbSettings, LoadDbSettingsFromEnv } from "./db/db.js" import ProductStorage from './productStorage.js' import ApplicationStorage from './applicationStorage.js' import UserStorage from "./userStorage.js"; import PaymentStorage from "./paymentStorage.js"; import MetricsStorage from "./metricsStorage.js"; -import MetricsEventStorage from "./metricsEventStorage.js"; +import MetricsEventStorage from "./tlv/metricsEventStorage.js"; import EventsLogManager from "./eventsLog.js"; import { LiquidityStorage } from "./liquidityStorage.js"; import DebitStorage from "./debitStorage.js" import OfferStorage from "./offerStorage.js" -import { StorageInterface, TX } from "./storageInterface.js"; -import { allMetricsMigrations, allMigrations } from "./migrations/runner.js" +import { StorageInterface, TX } from "./db/storageInterface.js"; import { PubLogger } from "../helpers/logger.js" +import { TlvStorageFactory } from './tlv/tlvFilesStorageFactory.js'; +import { Utils } from '../helpers/utilsWrapper.js'; export type StorageSettings = { dbSettings: DbSettings eventLogPath: string @@ -36,13 +37,15 @@ export default class { debitStorage: DebitStorage offerStorage: OfferStorage eventsLog: EventsLogManager - constructor(settings: StorageSettings) { + utils: Utils + constructor(settings: StorageSettings, utils: Utils) { this.settings = settings + this.utils = utils this.eventsLog = new EventsLogManager(settings.eventLogPath) } async Connect(log: PubLogger) { - this.dbs = new StorageInterface() - await this.dbs.Connect(this.settings.dbSettings) + this.dbs = new StorageInterface(this.utils) + await this.dbs.Connect(this.settings.dbSettings, 'main') //const { source, executedMigrations } = await NewDB(this.settings.dbSettings, allMigrations) //this.DB = source //this.txQueue = new TransactionsQueue("main", this.DB) @@ -50,22 +53,22 @@ export default class { this.productStorage = new ProductStorage(this.dbs) this.applicationStorage = new ApplicationStorage(this.dbs, this.userStorage) this.paymentStorage = new PaymentStorage(this.dbs, this.userStorage) - this.metricsStorage = new MetricsStorage(this.settings) - this.metricsEventStorage = new MetricsEventStorage(this.settings) + this.metricsStorage = new MetricsStorage(this.settings, this.utils) + this.metricsEventStorage = new MetricsEventStorage(this.settings, this.utils.tlvStorageFactory) this.liquidityStorage = new LiquidityStorage(this.dbs) this.debitStorage = new DebitStorage(this.dbs) this.offerStorage = new OfferStorage(this.dbs) try { if (this.settings.dataDir) fs.mkdirSync(this.settings.dataDir) } catch (e) { } - const executedMetricsMigrations = await this.metricsStorage.Connect(allMetricsMigrations) + /* const executedMetricsMigrations = */ await this.metricsStorage.Connect() /* if (executedMigrations.length > 0) { log(executedMigrations.length, "new migrations executed") log("-------------------") } */ - if (executedMetricsMigrations.length > 0) { - log(executedMetricsMigrations.length, "new metrics migrations executed") - log("-------------------") - } + /* if (executedMetricsMigrations.length > 0) { + log(executedMetricsMigrations.length, "new metrics migrations executed") + log("-------------------") + } */ } Stop() { diff --git a/src/services/storage/liquidityStorage.ts b/src/services/storage/liquidityStorage.ts index 2c405a41..0a609ed5 100644 --- a/src/services/storage/liquidityStorage.ts +++ b/src/services/storage/liquidityStorage.ts @@ -2,7 +2,7 @@ import { IsNull, MoreThan, Not } from "typeorm" import { LspOrder } from "./entity/LspOrder.js"; import { LndNodeInfo } from "./entity/LndNodeInfo.js"; import { TrackedProvider } from "./entity/TrackedProvider.js"; -import { StorageInterface } from "./storageInterface.js"; +import { StorageInterface } from "./db/storageInterface.js"; export class LiquidityStorage { dbs: StorageInterface constructor(dbs: StorageInterface) { diff --git a/src/services/storage/metricsStorage.ts b/src/services/storage/metricsStorage.ts index 839ef357..3dd36e2f 100644 --- a/src/services/storage/metricsStorage.ts +++ b/src/services/storage/metricsStorage.ts @@ -1,59 +1,68 @@ import { Between, DataSource, EntityManager, FindManyOptions, FindOperator, LessThanOrEqual, MoreThanOrEqual } from "typeorm" import { BalanceEvent } from "./entity/BalanceEvent.js" import { ChannelBalanceEvent } from "./entity/ChannelsBalanceEvent.js" -import TransactionsQueue from "./transactionsQueue.js"; +import TransactionsQueue from "./db/transactionsQueue.js"; import { StorageSettings } from "./index.js"; -import { newMetricsDb } from "./db.js"; +import { newMetricsDb } from "./db/db.js"; import { ChannelRouting } from "./entity/ChannelRouting.js"; import { RootOperation } from "./entity/RootOperation.js"; +import { StorageInterface } from "./db/storageInterface.js"; +import { Utils } from "../helpers/utilsWrapper.js"; export default class { - DB: DataSource | EntityManager + //DB: DataSource | EntityManager settings: StorageSettings - txQueue: TransactionsQueue - constructor(settings: StorageSettings) { + dbs: StorageInterface + utils: Utils + //txQueue: TransactionsQueue + constructor(settings: StorageSettings, utils: Utils) { this.settings = settings; + this.utils = utils } - async Connect(metricsMigrations: Function[]) { - const { source, executedMigrations } = await newMetricsDb(this.settings.dbSettings, metricsMigrations) - this.DB = source; - this.txQueue = new TransactionsQueue("metrics", this.DB) - return executedMigrations; + async Connect() { + //const { source, executedMigrations } = await newMetricsDb(this.settings.dbSettings, metricsMigrations) + //this.DB = source; + //this.txQueue = new TransactionsQueue("metrics", this.DB) + this.dbs = new StorageInterface(this.utils) + await this.dbs.Connect(this.settings.dbSettings, 'metrics') + //return executedMigrations; } async SaveBalanceEvents(balanceEvent: Partial, channelBalanceEvents: Partial[]) { - const blanceEventEntry = this.DB.getRepository(BalanceEvent).create(balanceEvent) - const balanceEntry = await this.txQueue.PushToQueue({ exec: async db => db.getRepository(BalanceEvent).save(blanceEventEntry), dbTx: false }) + //const blanceEventEntry = this.DB.getRepository(BalanceEvent).create(balanceEvent) + //const balanceEntry = await this.txQueue.PushToQueue({ exec: async db => db.getRepository(BalanceEvent).save(blanceEventEntry), dbTx: false }) + + const balanceEntry = await this.dbs.CreateAndSave('BalanceEvent', balanceEvent) + + //const channelsEntry = this.DB.getRepository(ChannelBalanceEvent).create(channelBalanceEvents.map(e => ({ ...e, balance_event: balanceEntry }))) + //const channelsEntries = await this.txQueue.PushToQueue({ exec: async db => db.getRepository(ChannelBalanceEvent).save(channelsEntry), dbTx: false }) + + const channelsEntries = await this.dbs.CreateAndSave('ChannelBalanceEvent', channelBalanceEvents.map(e => ({ ...e, balance_event: balanceEntry }))) - const channelsEntry = this.DB.getRepository(ChannelBalanceEvent).create(channelBalanceEvents.map(e => ({ ...e, balance_event: balanceEntry }))) - const channelsEntries = await this.txQueue.PushToQueue({ exec: async db => db.getRepository(ChannelBalanceEvent).save(channelsEntry), dbTx: false }) return { balanceEntry, channelsEntries } } - async GetBalanceEvents({ from, to }: { from?: number, to?: number }, entityManager = this.DB) { + async GetBalanceEvents({ from, to }: { from?: number, to?: number }, txId?: string) { const q = getTimeQuery({ from, to }) - const [chainBalanceEvents] = await Promise.all([ - entityManager.getRepository(BalanceEvent).find(q), - ]) + const chainBalanceEvents = await this.dbs.Find('BalanceEvent', q, txId) return { chainBalanceEvents } } async initChannelRoutingEvent(dayUnix: number, channelId: string) { - const existing = await this.DB.getRepository(ChannelRouting).findOne({ where: { day_unix: dayUnix, channel_id: channelId } }) + const existing = await this.dbs.FindOne('ChannelRouting', { where: { day_unix: dayUnix, channel_id: channelId } }) if (!existing) { - const entry = this.DB.getRepository(ChannelRouting).create({ day_unix: dayUnix, channel_id: channelId }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(ChannelRouting).save(entry), dbTx: false }) + return this.dbs.CreateAndSave('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }) } return existing } - GetChannelRouting({ from, to }: { from?: number, to?: number }, entityManager = this.DB) { + GetChannelRouting({ from, to }: { from?: number, to?: number }, txId?: string) { const q = getTimeQuery({ from, to }) - return entityManager.getRepository(ChannelRouting).find(q) + return this.dbs.Find('ChannelRouting', q, txId) } async GetLatestForwardingIndexOffset() { - const latestIndex = await this.DB.getRepository(ChannelRouting).find({ order: { latest_index_offset: "DESC" }, take: 1 }) + const latestIndex = await this.dbs.Find('ChannelRouting', { order: { latest_index_offset: "DESC" }, take: 1 }) if (latestIndex.length > 0) { return latestIndex[0].latest_index_offset } @@ -63,50 +72,49 @@ export default class { async IncrementChannelRouting(channelId: string, event: Partial) { const dayUnix = getTodayUnix() const existing = await this.initChannelRoutingEvent(dayUnix, channelId) - const repo = this.DB.getRepository(ChannelRouting) + //const repo = this.DB.getRepository(ChannelRouting) if (event.send_errors) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "send_errors", event.send_errors) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "send_errors", event.send_errors) } if (event.receive_errors) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "receive_errors", event.receive_errors) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "receive_errors", event.receive_errors) } if (event.forward_errors_as_input) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_input", event.forward_errors_as_input) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_input", event.forward_errors_as_input) } if (event.forward_errors_as_output) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_output", event.forward_errors_as_output) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_output", event.forward_errors_as_output) } if (event.missed_forward_fee_as_input) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_input", event.missed_forward_fee_as_input) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_input", event.missed_forward_fee_as_input) } if (event.missed_forward_fee_as_output) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_output", event.missed_forward_fee_as_output) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_output", event.missed_forward_fee_as_output) } if (event.forward_fee_as_input) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_input", event.forward_fee_as_input) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_input", event.forward_fee_as_input) } if (event.forward_fee_as_output) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_output", event.forward_fee_as_output) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_output", event.forward_fee_as_output) } if (event.events_as_input) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "events_as_input", event.events_as_input) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "events_as_input", event.events_as_input) } if (event.events_as_output) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "events_as_output", event.events_as_output) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "events_as_output", event.events_as_output) } if (event.latest_index_offset) { - await repo.update(existing.serial_id, { latest_index_offset: event.latest_index_offset }) + await this.dbs.Update('ChannelRouting', existing.serial_id, { latest_index_offset: event.latest_index_offset }) } } - async AddRootOperation(opType: string, id: string, amount: number, entityManager = this.DB) { - const newOp = entityManager.getRepository(RootOperation).create({ operation_type: opType, operation_amount: amount, operation_identifier: id }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(RootOperation).save(newOp), dbTx: false }) + async AddRootOperation(opType: string, id: string, amount: number, txId?: string) { + return this.dbs.CreateAndSave('RootOperation', { operation_type: opType, operation_amount: amount, operation_identifier: id }, txId) } - async GetRootOperations({ from, to }: { from?: number, to?: number }, entityManager = this.DB) { + async GetRootOperations({ from, to }: { from?: number, to?: number }, txId?: string) { const q = getTimeQuery({ from, to }) - return entityManager.getRepository(RootOperation).find(q) + return this.dbs.Find('RootOperation', q, txId) } } diff --git a/src/services/storage/offerStorage.ts b/src/services/storage/offerStorage.ts index 41712d92..2a921c57 100644 --- a/src/services/storage/offerStorage.ts +++ b/src/services/storage/offerStorage.ts @@ -1,6 +1,6 @@ import crypto from 'crypto'; import { UserOffer } from "./entity/UserOffer.js"; -import { StorageInterface } from "./storageInterface.js"; +import { StorageInterface } from "./db/storageInterface.js"; export default class { diff --git a/src/services/storage/paymentStorage.ts b/src/services/storage/paymentStorage.ts index f3a7ce27..465983fb 100644 --- a/src/services/storage/paymentStorage.ts +++ b/src/services/storage/paymentStorage.ts @@ -11,9 +11,9 @@ import { AddressReceivingTransaction } from './entity/AddressReceivingTransactio import { UserInvoicePayment } from './entity/UserInvoicePayment.js'; import { UserToUserPayment } from './entity/UserToUserPayment.js'; import { Application } from './entity/Application.js'; -import TransactionsQueue from "./transactionsQueue.js"; +import TransactionsQueue from "./db/transactionsQueue.js"; import { LoggedEvent } from './eventsLog.js'; -import { StorageInterface } from './storageInterface.js'; +import { StorageInterface } from './db/storageInterface.js'; export type InboundOptionals = { product?: Product, callbackUrl?: string, expiry: number, expectedPayer?: User, linkedApplication?: Application, zapInfo?: ZapInfo, offerId?: string, payerData?: Record } export const defaultInvoiceExpiry = 60 * 60 export default class { diff --git a/src/services/storage/productStorage.ts b/src/services/storage/productStorage.ts index c93be22a..26c386c5 100644 --- a/src/services/storage/productStorage.ts +++ b/src/services/storage/productStorage.ts @@ -1,6 +1,6 @@ import { Product } from "./entity/Product.js" import { User } from "./entity/User.js" -import { StorageInterface } from "./storageInterface.js"; +import { StorageInterface } from "./db/storageInterface.js"; export default class { dbs: StorageInterface constructor(dbs: StorageInterface) { diff --git a/src/services/storage/metricsEventStorage.ts b/src/services/storage/tlv/metricsEventStorage.ts similarity index 84% rename from src/services/storage/metricsEventStorage.ts rename to src/services/storage/tlv/metricsEventStorage.ts index 0fe8db0a..f27805f9 100644 --- a/src/services/storage/metricsEventStorage.ts +++ b/src/services/storage/tlv/metricsEventStorage.ts @@ -1,21 +1,20 @@ import fs from 'fs' -import * as Types from '../../../proto/autogenerated/ts/types.js' -import { StorageSettings } from "./index.js"; -import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js'; -import { TlvFilesStorage } from './tlvFilesStorage.js'; +import * as Types from '../../../../proto/autogenerated/ts/types.js' +import { StorageSettings } from "../index.js"; +import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../../helpers/tlv.js'; +import { TlvStorageFactory, TlvStorageInterface } from './tlvFilesStorageFactory.js'; export default class { - tlvStorage: TlvFilesStorage + tlvStorage: TlvStorageInterface cachePath: string last24hCache: { ts: number, ok: number, fail: number }[] = [] lastPersistedCache: number = 0 - constructor(settings: StorageSettings) { + constructor(settings: StorageSettings, tlvStorageFactory: TlvStorageFactory) { const metricsPath = [settings.dataDir, "metric_events"].filter(s => !!s).join("/") - this.tlvStorage = new TlvFilesStorage(metricsPath) + this.tlvStorage = tlvStorageFactory.NewStorage({ name: 'usage', path: metricsPath }) this.cachePath = [settings.dataDir, "metric_cache"].filter(s => !!s).join("/") if (!fs.existsSync(this.cachePath)) { fs.mkdirSync(this.cachePath, { recursive: true }); } - this.tlvStorage.initMeta() this.loadCache() setInterval(() => { if (Date.now() - this.lastPersistedCache > 1000 * 60 * 4) { @@ -77,7 +76,7 @@ export default class { } LoadLatestMetrics = async (limit = 30): Promise => { - const raw = this.tlvStorage.LoadLatest(limit) + const raw = await this.tlvStorage.LoadLatest(limit) const metrics: Types.UsageMetrics = { apps: {} } Object.keys(raw).forEach(app => { metrics.apps[app] = { app_metrics: {} } @@ -93,7 +92,7 @@ export default class { return metrics } LoadMetricsFile = async (app: string, method: string, chunk: number): Promise => { - const { fileData, chunks } = this.tlvStorage.LoadFile(app, method, chunk) + const { fileData, chunks } = await this.tlvStorage.LoadFile(app, method, chunk) //const tlv = await this.LoadRawMetricsFile(app, method, chunk) const decoded = decodeListTLV(parseTLV(fileData)) return { diff --git a/src/services/storage/tlv/processMetricsCollector.ts b/src/services/storage/tlv/processMetricsCollector.ts new file mode 100644 index 00000000..52047a40 --- /dev/null +++ b/src/services/storage/tlv/processMetricsCollector.ts @@ -0,0 +1,39 @@ +import { getLogger } from "../../helpers/logger.js" +export type ProcessMetrics = { + memory_rss_kb?: number + memory_buffer_kb?: number + memory_heap_total_kb?: number + memory_heap_used_kb?: number + memory_external_kb?: number +} +export class ProcessMetricsCollector { + reportLog = getLogger({ component: 'ProcessMetricsCollector' }) + prevValues: Record = {} + interval: NodeJS.Timeout + constructor(cb: (metrics: ProcessMetrics) => void) { + this.interval = setInterval(() => { + const mem = process.memoryUsage() + const metrics: ProcessMetrics = { + memory_rss_kb: this.AddValue('memory_rss_kb', Math.ceil(mem.rss / 1000 || 0), true), + memory_buffer_kb: this.AddValue('memory_buffer_kb', Math.ceil(mem.arrayBuffers / 1000 || 0), true), + memory_heap_total_kb: this.AddValue('memory_heap_total_kb', Math.ceil(mem.heapTotal / 1000 || 0), true), + memory_heap_used_kb: this.AddValue('memory_heap_used_kb', Math.ceil(mem.heapUsed / 1000 || 0), true), + memory_external_kb: this.AddValue('memory_external_kb', Math.ceil(mem.external / 1000 || 0), true), + } + + cb(metrics) + }, 60 * 1000) + } + + Stop() { + clearInterval(this.interval) + } + + AddValue = (key: string, v: number, updateOnly = false): number | undefined => { + if (updateOnly && this.prevValues[key] === v) { + return + } + this.prevValues[key] = v + return v + } +} diff --git a/src/services/storage/stateBundler.ts b/src/services/storage/tlv/stateBundler.ts similarity index 88% rename from src/services/storage/stateBundler.ts rename to src/services/storage/tlv/stateBundler.ts index 99737833..fa696645 100644 --- a/src/services/storage/stateBundler.ts +++ b/src/services/storage/tlv/stateBundler.ts @@ -1,9 +1,9 @@ -import { LatestBundleMetricReq } from "../../../proto/autogenerated/ts/types.js" -import { getLogger } from "../helpers/logger.js" -import { decodeListTLV, integerToUint8Array, parseTLV } from "../helpers/tlv.js" -import { StorageSettings } from "./index.js" -import { TlvFilesStorage } from "./tlvFilesStorage.js" -import * as Types from "../../../proto/autogenerated/ts/types.js" +import { LatestBundleMetricReq } from "../../../../proto/autogenerated/ts/types.js" +import { getLogger } from "../../helpers/logger.js" +import { decodeListTLV, integerToUint8Array, parseTLV } from "../../helpers/tlv.js" +import { StorageSettings } from "../index.js" +import * as Types from "../../../../proto/autogenerated/ts/types.js" +import { TlvStorageFactory, TlvStorageInterface } from "./tlvFilesStorageFactory.js" const transactionStatePointTypes = ['addedInvoice', 'invoiceWasPaid', 'paidAnInvoice', 'addedAddress', 'addressWasPaid', 'paidAnAddress', 'user2user'] as const const balanceStatePointTypes = ['providerBalance', 'providerMaxWithdrawable', 'walletBalance', 'channelBalance', 'usersBalance', 'feesPaidForLiquidity', 'totalLndBalance', 'accumulatedHtlcFees', 'deltaUsers', 'deltaExternal'] as const const maxStatePointTypes = ['maxProviderRespTime'] as const @@ -30,14 +30,13 @@ export type TxPointSettings = { } export class StateBundler { - tlvStorage: TlvFilesStorage + tlvStorage: TlvStorageInterface reportLog = getLogger({ component: 'stateBundlerReport' }) prevValues: Record = {} interval: NodeJS.Timeout - constructor(settings: StorageSettings) { - const bundlerPath = [settings.dataDir, "bundler_events"].filter(s => !!s).join("/") - this.tlvStorage = new TlvFilesStorage(bundlerPath) - this.tlvStorage.initMeta() + constructor(dataDir: string, tlvStorageFactory: TlvStorageFactory) { + const bundlerPath = [dataDir, "bundler_events"].filter(s => !!s).join("/") + this.tlvStorage = tlvStorageFactory.NewStorage({ name: 'bundler', path: bundlerPath }) this.interval = setInterval(() => { const mem = process.memoryUsage() this.AddValue('_root', 'memory_rss_kb', Math.ceil(mem.rss / 1000 || 0), true) @@ -53,7 +52,7 @@ export class StateBundler { } async GetBundleMetrics(req: Types.LatestBundleMetricReq): Promise { - const latest = this.tlvStorage.LoadLatest(req.limit) + const latest = await this.tlvStorage.LoadLatest(req.limit) const metrics: Types.BundleMetrics = { apps: {} } Object.keys(latest).forEach(app => { metrics.apps[app] = { app_bundles: {} } @@ -70,7 +69,7 @@ export class StateBundler { } async GetSingleBundleMetrics(req: Types.SingleMetricReq): Promise { - const { fileData, chunks } = this.tlvStorage.LoadFile(req.app_id, req.metrics_name, req.page) + const { fileData, chunks } = await this.tlvStorage.LoadFile(req.app_id, req.metrics_name, req.page) const decoded = decodeListTLV(parseTLV(fileData)) return { current_chunk: req.page, diff --git a/src/services/storage/tlvFilesStorage.ts b/src/services/storage/tlv/tlvFilesStorage.ts similarity index 86% rename from src/services/storage/tlvFilesStorage.ts rename to src/services/storage/tlv/tlvFilesStorage.ts index c4ad58d2..22b93e14 100644 --- a/src/services/storage/tlvFilesStorage.ts +++ b/src/services/storage/tlv/tlvFilesStorage.ts @@ -1,13 +1,14 @@ import fs from 'fs' -import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js' +import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../../helpers/tlv.js' const chunkSizeBytes = 128 * 1024 export type LatestData = Record> +export type TlvFile = { fileData: Buffer, chunks: number[] } export class TlvFilesStorage { - storagePath: string - lastPersisted: number = 0 - meta: Record> = {} - pending: Record> = {} - metaReady = false + private storagePath: string + private lastPersisted: number = 0 + private meta: Record> = {} + private pending: Record> = {} + private metaReady = false constructor(storagePath: string) { this.storagePath = storagePath if (!fs.existsSync(this.storagePath)) { @@ -24,9 +25,9 @@ export class TlvFilesStorage { }); } - LoadFile = (app: string, dataName: string, chunk: number): { fileData: Buffer, chunks: number[] } => { + LoadFile = (app: string, dataName: string, chunk: number): TlvFile => { if (!this.metaReady || !this.meta[app] || !this.meta[app][dataName] || !this.meta[app][dataName].chunks.includes(chunk)) { - throw new Error("metrics not found") + throw new Error(`tlv file for ${app} ${dataName} chunk ${chunk} not found`) } const fullPath = [this.storagePath, app, dataName, `${chunk}.mtlv`].filter(s => !!s).join("/") const fileData = fs.readFileSync(fullPath) @@ -71,7 +72,7 @@ export class TlvFilesStorage { return data } - persist = () => { + private persist = () => { if (!this.metaReady) { throw new Error("meta metrics not ready") } @@ -104,28 +105,28 @@ export class TlvFilesStorage { }) } - getMeta = (appId: string, dataName: string) => { + private getMeta = (appId: string, dataName: string) => { if (!this.meta[appId]) { return { chunks: [] } } return this.meta[appId][dataName] || { chunks: [] } } - initMeta = () => { + private initMeta = () => { this.foreachFile((app, dataName, tlvFiles) => { this.updateMeta(app, dataName, tlvFiles) }) this.metaReady = true } - updateMeta = (appId: string, dataName: string, sortedChunks: number[]) => { + private updateMeta = (appId: string, dataName: string, sortedChunks: number[]) => { if (!this.meta[appId]) { this.meta[appId] = {} } this.meta[appId][dataName] = { chunks: sortedChunks } } - foreachFile = (cb: (appId: string, dataName: string, tlvFiles: number[]) => void) => { + private foreachFile = (cb: (appId: string, dataName: string, tlvFiles: number[]) => void) => { if (!fs.existsSync(this.storagePath)) { fs.mkdirSync(this.storagePath, { recursive: true }); } diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts new file mode 100644 index 00000000..fc8bfc4d --- /dev/null +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -0,0 +1,151 @@ +import { ChildProcess, fork } from 'child_process'; +import { EventEmitter } from 'events'; +import { AddTlvOperation, ITlvStorageOperation, SuccessTlvOperationResponse, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings, WebRtcMessageOperation, ProcessMetricsTlvOperation } from './tlvFilesStorageProcessor'; +import { LatestData, TlvFile } from './tlvFilesStorage'; +import { NostrSend, SendData, SendInitiator } from '../../nostr/handler'; +import { WebRtcUserInfo } from '../../webRTC'; +import * as Types from '../../../../proto/autogenerated/ts/types.js' +import { ProcessMetrics } from './processMetricsCollector'; +export type TlvStorageInterface = { + AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => Promise + LoadLatest: (limit?: number) => Promise + LoadFile: (appId: string, dataName: string, chunk: number) => Promise +} + +export class TlvStorageFactory extends EventEmitter { + private process: ChildProcess; + private isConnected: boolean = false; + private debug: boolean = false; + private _nostrSend: NostrSend = () => { throw new Error('nostr send not initialized yet') } + constructor() { + super(); + this.initializeSubprocess(); + } + + setDebug(debug: boolean) { + this.debug = debug; + } + + attachNostrSend(f: NostrSend) { + this._nostrSend = f + } + + private nostrSend = (opResponse: SuccessTlvOperationResponse<{ initiator: SendInitiator, data: SendData, relays?: string[] }>) => { + if (!this._nostrSend) { + throw new Error("No nostrSend attached") + } + this._nostrSend(opResponse.data.initiator, opResponse.data.data, opResponse.data.relays) + } + + private initializeSubprocess() { + this.process = fork('./build/src/services/storage/tlv/tlvFilesStorageProcessor'); + + this.process.on('message', (response: TlvOperationResponse) => { + if (response.success && response.type === 'nostrSend') { + this.nostrSend(response) + } else { + this.emit(response.opId, response); + } + }); + + this.process.on('error', (error: Error) => { + console.error('Tlv Storage processor error:', error); + this.isConnected = false; + }); + + this.process.on('exit', (code: number) => { + console.log(`Tlv Storage processor exited with code ${code}`); + this.isConnected = false; + }); + + this.isConnected = true; + } + + NewStorage(settings: TlvStorageSettings): TlvStorageInterface { + const opId = Math.random().toString() + const op: NewTlvStorageOperation = { type: 'newStorage', opId, settings } + this.handleOp(op) + return { + AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => this.AddTlv(settings.name, appId, dataName, tlv), + LoadLatest: (limit?: number) => this.LoadLatest(settings.name, limit), + LoadFile: (appId: string, dataName: string, chunk: number) => this.LoadFile(settings.name, appId, dataName, chunk) + } + } + + AddTlv(storageName: string, appId: string, dataName: string, tlv: Uint8Array): Promise { + const opId = Math.random().toString() + const op: AddTlvOperation = { type: 'addTlv', opId, storageName, appId, dataName, base64Tlv: Buffer.from(tlv).toString('base64') } + return this.handleOp(op) + } + + async LoadLatest(storageName: string, limit?: number): Promise { + const opId = Math.random().toString() + const op: LoadLatestTlvOperation = { type: 'loadLatest', opId, storageName, limit } + const latestData = await this.handleOp(op) + const deserializedLatestData: LatestData = {} + for (const appId in latestData) { + deserializedLatestData[appId] = {} + for (const dataName in latestData[appId]) { + deserializedLatestData[appId][dataName] = { tlvs: latestData[appId][dataName].base64tlvs.map(tlv => new Uint8Array(Buffer.from(tlv, 'base64'))), current_chunk: latestData[appId][dataName].current_chunk, available_chunks: latestData[appId][dataName].available_chunks } + } + } + return deserializedLatestData + } + + async LoadFile(storageName: string, appId: string, dataName: string, chunk: number): Promise { + const opId = Math.random().toString() + const op: LoadTlvFileOperation = { type: 'loadFile', opId, storageName, appId, dataName, chunk } + const tlvFile = await this.handleOp(op) + return { fileData: Buffer.from(tlvFile.base64fileData, 'base64'), chunks: tlvFile.chunks } + } + + WebRtcMessage(userInfo: WebRtcUserInfo, message: Types.WebRtcMessage_message): Promise { + const opId = Math.random().toString() + const op: WebRtcMessageOperation = { type: 'webRtcMessage', opId, userInfo, message } + return this.handleOp(op) + } + + ProcessMetrics(metrics: ProcessMetrics, processName: string): Promise { + const opId = Math.random().toString() + const op: ProcessMetricsTlvOperation = { type: 'processMetrics', opId, metrics, processName } + return this.handleOp(op) + } + + + + private handleOp(op: ITlvStorageOperation): Promise { + const debug = this.debug || op.debug + if (debug) console.log('handleOp', op) + this.checkConnected() + return new Promise((resolve, reject) => { + const responseHandler = (response: TlvOperationResponse) => { + if (debug) console.log('tlv responseHandler', response) + if (!response.success) { + reject(new Error(response.error)); + return + } + if (response.type !== op.type) { + reject(new Error('Invalid tlv storage response type: ' + response.type + ' expected: ' + op.type)); + return + } + resolve(response.data); + } + this.once(op.opId, responseHandler) + this.process.send({ ...op, debug }) + }) + } + + private checkConnected() { + if (!this.isConnected) { + throw new Error('Tlv Storage processor is not connected'); + } + } + + public disconnect() { + if (this.process) { + this.process.kill(); + this.isConnected = false; + this.debug = false; + } + } +} \ No newline at end of file diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts new file mode 100644 index 00000000..7cc97e1e --- /dev/null +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -0,0 +1,290 @@ +import { PubLogger, getLogger } from '../../helpers/logger.js'; +import webRTC, { WebRtcUserInfo } from '../../webRTC/index.js'; +import { TlvFilesStorage } from './tlvFilesStorage.js'; +import * as Types from '../../../../proto/autogenerated/ts/types.js' +import { SendData } from '../../nostr/handler.js'; +import { SendInitiator } from '../../nostr/handler.js'; +import { ProcessMetrics, ProcessMetricsCollector } from './processMetricsCollector.js'; +import { integerToUint8Array } from '../../helpers/tlv.js'; +export type SerializableLatestData = Record> +export type SerializableTlvFile = { base64fileData: string, chunks: number[] } +export const usageStorageName = 'usage' +export const bundlerStorageName = 'bundler' +export type TlvStorageSettings = { + path: string + name: typeof usageStorageName | typeof bundlerStorageName +} + +export type NewTlvStorageOperation = { + type: 'newStorage' + opId: string + settings: TlvStorageSettings + debug?: boolean +} + +export type AddTlvOperation = { + type: 'addTlv' + opId: string + storageName: string + appId: string + dataName: string + base64Tlv: string + debug?: boolean +} + +export type LoadLatestTlvOperation = { + type: 'loadLatest' + opId: string + storageName: string + limit?: number + debug?: boolean +} + +export type LoadTlvFileOperation = { + type: 'loadFile' + opId: string + storageName: string + appId: string + dataName: string + chunk: number + debug?: boolean +} + +export type WebRtcMessageOperation = { + type: 'webRtcMessage' + opId: string + userInfo: WebRtcUserInfo + message: Types.WebRtcMessage_message + debug?: boolean +} + +export type ProcessMetricsTlvOperation = { + type: 'processMetrics' + opId: string + processName?: string + metrics: ProcessMetrics + debug?: boolean +} + +export type ErrorTlvOperationResponse = { success: false, error: string, opId: string } + +export interface ITlvStorageOperation { + opId: string + type: string + debug?: boolean +} + +export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation | WebRtcMessageOperation | ProcessMetricsTlvOperation + +export type SuccessTlvOperationResponse = { success: true, type: string, data: T, opId: string } +export type TlvOperationResponse = SuccessTlvOperationResponse | ErrorTlvOperationResponse + +class TlvFilesStorageProcessor { + private log: PubLogger = console.log + private storages: Record = {} + private wrtc: webRTC + constructor() { + if (!process.send) { + throw new Error('This process must be spawned as a child process'); + } + process.on('message', (operation: TlvStorageOperation) => { + this.handleOperation(operation); + }); + + process.on('error', (error: Error) => { + console.error('Error in storage processor:', error); + }); + + this.wrtc = new webRTC(t => { + switch (t) { + case Types.SingleMetricType.USAGE_METRIC: + return this.storages[usageStorageName] + case Types.SingleMetricType.BUNDLE_METRIC: + return this.storages[bundlerStorageName] + default: + throw new Error('Unknown metric type: ' + t) + } + }) + this.wrtc.attachNostrSend((initiator: SendInitiator, data: SendData, relays?: string[] | undefined) => { + this.sendResponse({ + success: true, + type: 'nostrSend', + data: { initiator, data, relays }, + opId: Math.random().toString() + }); + }) + new ProcessMetricsCollector((pMetrics) => { + this.saveProcessMetrics(pMetrics, 'tlv_processor') + }) + } + + private serializeNowTlv = (v: number) => { + const nowUnix = Math.floor(Date.now() / 1000) + const entry = new Uint8Array(8) + entry.set(integerToUint8Array(nowUnix), 0) + entry.set(integerToUint8Array(v), 4) + return entry + } + + private saveProcessMetrics = (pMetrics: ProcessMetrics, processName = "") => { + const pName = processName ? '_' + processName : '' + if (!this.storages[bundlerStorageName]) { + console.log('no bundle storage yet') + return + } + if (pMetrics.memory_rss_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_rss_kb' + pName, this.serializeNowTlv(pMetrics.memory_rss_kb)) + if (pMetrics.memory_buffer_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_buffer_kb' + pName, this.serializeNowTlv(pMetrics.memory_buffer_kb)) + if (pMetrics.memory_heap_total_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_heap_total_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_total_kb)) + if (pMetrics.memory_heap_used_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_heap_used_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_used_kb)) + if (pMetrics.memory_external_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_external_kb' + pName, this.serializeNowTlv(pMetrics.memory_external_kb)) + } + + private async handleOperation(operation: TlvStorageOperation) { + try { + const opId = operation.opId; + if (operation.debug) console.log('handleOperation', operation) + switch (operation.type) { + case 'newStorage': + await this.handleNewStorage(operation); + break; + case 'addTlv': + await this.handleAddTlv(operation); + break; + case 'loadLatest': + await this.handleLoadLatestTlv(operation); + break; + case 'loadFile': + await this.handleLoadTlvFile(operation); + break; + case 'webRtcMessage': + await this.handleWebRtcMessage(operation); + break; + case 'processMetrics': + await this.handleProcessMetrics(operation); + break; + default: + this.sendResponse({ + success: false, + error: `Unknown operation type: ${(operation as any).type}`, + opId + }) + return + } + } catch (error) { + this.sendResponse({ + success: false, + error: error instanceof Error ? error.message : 'Unknown error occurred', + opId: operation.opId + }); + } + } + + private async handleNewStorage(operation: NewTlvStorageOperation) { + if (this.storages[operation.settings.name]) { + this.sendResponse({ + success: false, + error: `Storage ${operation.settings.name} already exists`, + opId: operation.opId + }) + return + } + this.storages[operation.settings.name] = new TlvFilesStorage(operation.settings.path) + this.sendResponse({ + success: true, + type: 'newStorage', + data: null, + opId: operation.opId + }); + } + + private async handleAddTlv(operation: AddTlvOperation) { + if (!this.storages[operation.storageName]) { + this.sendResponse({ + success: false, + error: `Storage ${operation.storageName} does not exist`, + opId: operation.opId + }) + return + } + const tlv = new Uint8Array(Buffer.from(operation.base64Tlv, 'base64')) + this.storages[operation.storageName].AddTlv(operation.appId, operation.dataName, tlv) + this.sendResponse({ + success: true, + type: 'addTlv', + data: null, + opId: operation.opId + }); + } + + private async handleLoadLatestTlv(operation: LoadLatestTlvOperation) { + if (!this.storages[operation.storageName]) { + this.sendResponse({ + success: false, + error: `Storage ${operation.storageName} does not exist`, + opId: operation.opId + }) + return + } + const data = this.storages[operation.storageName].LoadLatest(operation.limit) + const serializableData: SerializableLatestData = {} + for (const appId in data) { + serializableData[appId] = {} + for (const dataName in data[appId]) { + serializableData[appId][dataName] = { base64tlvs: data[appId][dataName].tlvs.map(tlv => Buffer.from(tlv).toString('base64')), current_chunk: data[appId][dataName].current_chunk, available_chunks: data[appId][dataName].available_chunks } + } + } + this.sendResponse({ + success: true, + type: 'loadLatest', + data: serializableData, + opId: operation.opId + }); + } + + private async handleLoadTlvFile(operation: LoadTlvFileOperation) { + if (!this.storages[operation.storageName]) { + this.sendResponse({ + success: false, + error: `Storage ${operation.storageName} does not exist`, + opId: operation.opId + }) + return + } + const data = this.storages[operation.storageName].LoadFile(operation.appId, operation.dataName, operation.chunk) + this.sendResponse({ + success: true, + type: 'loadFile', + data: { base64fileData: Buffer.from(data.fileData).toString('base64'), chunks: data.chunks }, + opId: operation.opId + }); + } + + private async handleWebRtcMessage(operation: WebRtcMessageOperation) { + const answer = await this.wrtc.OnMessage(operation.userInfo, operation.message) + this.sendResponse({ + success: true, + type: 'webRtcMessage', + data: answer, + opId: operation.opId + }); + } + + private async handleProcessMetrics(operation: ProcessMetricsTlvOperation) { + this.saveProcessMetrics(operation.metrics, operation.processName) + this.sendResponse({ + success: true, + type: 'processMetrics', + data: null, + opId: operation.opId + }); + } + + private sendResponse(response: TlvOperationResponse) { + if (process.send) { + process.send(response); + } + } +} + +// Start the storage processor +new TlvFilesStorageProcessor(); diff --git a/src/services/storage/userStorage.ts b/src/services/storage/userStorage.ts index b35d40ae..f4ed6fda 100644 --- a/src/services/storage/userStorage.ts +++ b/src/services/storage/userStorage.ts @@ -3,7 +3,7 @@ import { User } from './entity/User.js'; import { UserBasicAuth } from './entity/UserBasicAuth.js'; import { getLogger } from '../helpers/logger.js'; import EventsLogManager from './eventsLog.js'; -import { StorageInterface } from './storageInterface.js'; +import { StorageInterface } from './db/storageInterface.js'; export default class { dbs: StorageInterface eventsLog: EventsLogManager diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index 6aab22f1..043af1ba 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -6,20 +6,23 @@ import * as Types from '../../../proto/autogenerated/ts/types.js' import { NostrSend, SendData, SendInitiator } from "../nostr/handler.js" import { encodeTLbV, encodeTLV, encodeTLVDataPacket } from '../helpers/tlv.js' import { Utils } from '../helpers/utilsWrapper.js' -import { TlvFilesStorage } from '../storage/tlvFilesStorage.js' +import { TlvFilesStorage } from '../storage/tlv/tlvFilesStorage.js' +import { TlvStorageInterface } from '../storage/tlv/tlvFilesStorageFactory.js' type IceCandidate = { type: string, candidate?: string, sdpMid?: string, sdpMLineIndex?: number } const configuration = { 'iceServers': [{ 'urls': 'stun:relay.webwormhole.io' }] } -type UserInfo = { userPub: string, appId: string } -export default class webRTC { +export type WebRtcUserInfo = { userPub: string, appId: string } - private storage: Storage +export type TlvStorageGetter = (t: Types.SingleMetricType) => TlvFilesStorage + +export default class webRTC { + //private storage: Storage private log = getLogger({ component: 'webRTC' }) private connections: Record = {} private _nostrSend: NostrSend - private utils: Utils - constructor(storage: Storage, utils: Utils) { - this.storage = storage - this.utils = utils + private tlvStorageGetter: TlvStorageGetter + //private utils: Utils + constructor(tlvStorageGetter: TlvStorageGetter) { + this.tlvStorageGetter = tlvStorageGetter } attachNostrSend(f: NostrSend) { this._nostrSend = f @@ -31,12 +34,12 @@ export default class webRTC { this._nostrSend(initiator, data, relays) } - private sendCandidate = (u: UserInfo, candidate: string) => { + private sendCandidate = (u: WebRtcUserInfo, candidate: string) => { const message: Types.WebRtcCandidate & { requestId: string, status: 'OK' } = { candidate, requestId: "SubToWebRtcCandidates", status: 'OK' } this.nostrSend({ type: 'app', appId: u.appId }, { type: 'content', content: JSON.stringify(message), pub: u.userPub }) } - OnMessage = async (u: UserInfo, message: Types.WebRtcMessage_message): Promise => { + OnMessage = async (u: WebRtcUserInfo, message: Types.WebRtcMessage_message): Promise => { if (message.type === Types.WebRtcMessage_message_type.OFFER) { return this.connect(u, message.offer) } else if (message.type === Types.WebRtcMessage_message_type.CANDIDATE) { @@ -45,7 +48,7 @@ export default class webRTC { return {} } - private onCandidate = async (u: UserInfo, candidate: string): Promise => { + private onCandidate = async (u: WebRtcUserInfo, candidate: string): Promise => { const key = this.getConnectionsKey(u) if (!this.connections[key]) { throw new Error('Connection not found') @@ -57,7 +60,7 @@ export default class webRTC { } return {} } - private connect = async (u: UserInfo, offer: string): Promise => { + private connect = async (u: WebRtcUserInfo, offer: string): Promise => { const key = this.getConnectionsKey(u) this.log("connect", key) if (this.connections[key]) { @@ -87,25 +90,15 @@ export default class webRTC { try { const j = JSON.parse(event.data) as Types.SingleMetricReq const err = Types.SingleMetricReqValidate(j, { - app_id_CustomCheck: id => id === u.appId, + app_id_CustomCheck: id => id !== "", metrics_name_CustomCheck: name => name !== "" }) if (err) { - this.log(ERROR, 'SingleUsageMetricReqValidate', err) + this.log(ERROR, 'SingleUsageMetricReqValidate', err.message || err) return } - let tlvStorage: TlvFilesStorage - switch (j.metric_type) { - case Types.SingleMetricType.USAGE_METRIC: - tlvStorage = this.storage.metricsEventStorage.tlvStorage - break - case Types.SingleMetricType.BUNDLE_METRIC: - tlvStorage = this.utils.stateBundler.tlvStorage - break - default: - throw new Error("Unknown metric type") - } - const { fileData } = tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page) + const tlvStorage = this.tlvStorageGetter(j.metric_type) + const { fileData } = await tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page) const id = j.request_id || Math.floor(Math.random() * 100_000_000) let i = 0 const packets: Buffer[] = [] @@ -132,7 +125,7 @@ export default class webRTC { return { answer: JSON.stringify(answer) } } - getConnectionsKey = (u: UserInfo) => { + getConnectionsKey = (u: WebRtcUserInfo) => { return u.appId + ":" + u.userPub } } \ No newline at end of file diff --git a/src/tests/networkSetup.ts b/src/tests/networkSetup.ts index 487450a1..9ebcaa17 100644 --- a/src/tests/networkSetup.ts +++ b/src/tests/networkSetup.ts @@ -3,13 +3,14 @@ import { BitcoinCoreWrapper } from "./bitcoinCore.js" import LND from '../services/lnd/lnd.js' import { LiquidityProvider } from "../services/main/liquidityProvider.js" import { Utils } from "../services/helpers/utilsWrapper.js" - +import { TlvStorageFactory } from "../services/storage/tlv/tlvFilesStorageFactory.js" export const setupNetwork = async () => { const settings = LoadTestSettingsFromEnv() const core = new BitcoinCoreWrapper(settings) await core.InitAddress() await core.Mine(1) - const setupUtils = new Utils(settings) + const tlvStorageFactory = new TlvStorageFactory() + const setupUtils = new Utils({ dataDir: settings.storageSettings.dataDir }) const alice = new LND(settings.lndSettings, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { }) const bob = new LND({ ...settings.lndSettings, mainNode: settings.lndSettings.otherNode }, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { }) await tryUntil(async i => { diff --git a/src/tests/testBase.ts b/src/tests/testBase.ts index a3b716b7..cccb4982 100644 --- a/src/tests/testBase.ts +++ b/src/tests/testBase.ts @@ -13,6 +13,7 @@ import { getLogger, resetDisabledLoggers } from '../services/helpers/logger.js' import { LiquidityProvider } from '../services/main/liquidityProvider.js' import { Utils } from '../services/helpers/utilsWrapper.js' import { AdminManager } from '../services/main/adminManager.js' +import { TlvStorageFactory } from '../services/storage/tlv/tlvFilesStorageFactory.js' chai.use(chaiString) export const expect = chai.expect export type Describe = (message: string, failure?: boolean) => void @@ -43,7 +44,8 @@ export type StorageTestBase = { export const setupStorageTest = async (d: Describe): Promise => { const settings = GetTestStorageSettings() - const storageManager = new Storage(settings) + const utils = new Utils({ dataDir: settings.dataDir }) + const storageManager = new Storage(settings, utils) await storageManager.Connect(console.log) return { expect, @@ -69,7 +71,7 @@ export const SetupTest = async (d: Describe): Promise => { const user1 = { userId: u1.info.userId, appUserIdentifier: u1.identifier, appId: app.appId } const user2 = { userId: u2.info.userId, appUserIdentifier: u2.identifier, appId: app.appId } - const extermnalUtils = new Utils(settings) + const extermnalUtils = new Utils({ dataDir: settings.storageSettings.dataDir }) const externalAccessToMainLnd = new LND(settings.lndSettings, new LiquidityProvider("", extermnalUtils, async () => { }, async () => { }), extermnalUtils, async () => { }, async () => { }, () => { }, () => { }) await externalAccessToMainLnd.Warmup()