diff --git a/src/services/helpers/utilsWrapper.ts b/src/services/helpers/utilsWrapper.ts index 0dc61d63..35698712 100644 --- a/src/services/helpers/utilsWrapper.ts +++ b/src/services/helpers/utilsWrapper.ts @@ -1,13 +1,14 @@ import { MainSettings } from "../main/settings.js"; -import { StateBundler } from "../storage/stateBundler.js"; +import { StateBundler } from "../storage/tlv/stateBundler.js"; +import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js"; export class Utils { stateBundler: StateBundler settings: MainSettings - constructor(settings: MainSettings) { + constructor(settings: MainSettings, tlvStorageFactory: TlvStorageFactory) { this.settings = settings - this.stateBundler = new StateBundler(settings.storageSettings) + this.stateBundler = new StateBundler(settings.storageSettings, tlvStorageFactory) } Stop() { diff --git a/src/services/lnd/lnd.ts b/src/services/lnd/lnd.ts index 3df13920..c638bb66 100644 --- a/src/services/lnd/lnd.ts +++ b/src/services/lnd/lnd.ts @@ -18,7 +18,7 @@ import { ERROR, getLogger } from '../helpers/logger.js'; import { HtlcEvent_EventType } from '../../../proto/lnd/router.js'; import { LiquidityProvider, LiquidityRequest } from '../main/liquidityProvider.js'; import { Utils } from '../helpers/utilsWrapper.js'; -import { TxPointSettings } from '../storage/stateBundler.js'; +import { TxPointSettings } from '../storage/tlv/stateBundler.js'; import { WalletKitClient } from '../../../proto/lnd/walletkit.client.js'; const DeadLineMetadata = (deadline = 10 * 1000) => ({ deadline: Date.now() + deadline }) const deadLndRetrySeconds = 5 diff --git a/src/services/main/init.ts b/src/services/main/init.ts index 2389510b..cc284c88 100644 --- a/src/services/main/init.ts +++ b/src/services/main/init.ts @@ -9,6 +9,7 @@ import { LoadMainSettingsFromEnv, MainSettings } from "./settings.js" import { Utils } from "../helpers/utilsWrapper.js" import { Wizard } from "../wizard/index.js" import { AdminManager } from "./adminManager.js" +import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js" export type AppData = { privateKey: string; publicKey: string; @@ -16,9 +17,10 @@ export type AppData = { name: string; } export const initMainHandler = async (log: PubLogger, mainSettings: MainSettings) => { - const utils = new Utils(mainSettings) + const tlvStorageFactory = new TlvStorageFactory() + const utils = new Utils(mainSettings, tlvStorageFactory) const storageManager = new Storage(mainSettings.storageSettings) - await storageManager.Connect(log) + await storageManager.Connect(log, tlvStorageFactory) /* const manualMigration = await TypeOrmMigrationRunner(log, storageManager, mainSettings.storageSettings.dbSettings, process.argv[2]) if (manualMigration) { return diff --git a/src/services/storage/applicationStorage.ts b/src/services/storage/applicationStorage.ts index 9bd8c6ba..c5b87bb4 100644 --- a/src/services/storage/applicationStorage.ts +++ b/src/services/storage/applicationStorage.ts @@ -7,7 +7,7 @@ import { ApplicationUser } from './entity/ApplicationUser.js'; import { getLogger } from '../helpers/logger.js'; import { User } from './entity/User.js'; import { InviteToken } from './entity/InviteToken.js'; -import { StorageInterface } from './storageInterface.js'; +import { StorageInterface } from './db/storageInterface.js'; export default class { dbs: StorageInterface userStorage: UserStorage diff --git a/src/services/storage/db.ts b/src/services/storage/db/db.ts similarity index 74% rename from src/services/storage/db.ts rename to src/services/storage/db/db.ts index 39b67ac1..12e8b396 100644 --- a/src/services/storage/db.ts +++ b/src/services/storage/db/db.ts @@ -1,29 +1,29 @@ import "reflect-metadata" import { DataSource, Migration } from "typeorm" -import { AddressReceivingTransaction } from "./entity/AddressReceivingTransaction.js" -import { User } from "./entity/User.js" -import { UserReceivingAddress } from "./entity/UserReceivingAddress.js" -import { UserReceivingInvoice } from "./entity/UserReceivingInvoice.js" -import { UserInvoicePayment } from "./entity/UserInvoicePayment.js" -import { EnvMustBeNonEmptyString } from "../helpers/envParser.js" -import { UserTransactionPayment } from "./entity/UserTransactionPayment.js" -import { UserBasicAuth } from "./entity/UserBasicAuth.js" -import { UserEphemeralKey } from "./entity/UserEphemeralKey.js" -import { UserToUserPayment } from "./entity/UserToUserPayment.js" -import { Application } from "./entity/Application.js" -import { ApplicationUser } from "./entity/ApplicationUser.js" -import { BalanceEvent } from "./entity/BalanceEvent.js" -import { ChannelBalanceEvent } from "./entity/ChannelsBalanceEvent.js" -import { getLogger } from "../helpers/logger.js" -import { ChannelRouting } from "./entity/ChannelRouting.js" -import { LspOrder } from "./entity/LspOrder.js" -import { Product } from "./entity/Product.js" -import { LndNodeInfo } from "./entity/LndNodeInfo.js" -import { TrackedProvider } from "./entity/TrackedProvider.js" -import { InviteToken } from "./entity/InviteToken.js" -import { DebitAccess } from "./entity/DebitAccess.js" -import { RootOperation } from "./entity/RootOperation.js" -import { UserOffer } from "./entity/UserOffer.js" +import { AddressReceivingTransaction } from "../entity/AddressReceivingTransaction.js" +import { User } from "../entity/User.js" +import { UserReceivingAddress } from "../entity/UserReceivingAddress.js" +import { UserReceivingInvoice } from "../entity/UserReceivingInvoice.js" +import { UserInvoicePayment } from "../entity/UserInvoicePayment.js" +import { EnvMustBeNonEmptyString } from "../../helpers/envParser.js" +import { UserTransactionPayment } from "../entity/UserTransactionPayment.js" +import { UserBasicAuth } from "../entity/UserBasicAuth.js" +import { UserEphemeralKey } from "../entity/UserEphemeralKey.js" +import { UserToUserPayment } from "../entity/UserToUserPayment.js" +import { Application } from "../entity/Application.js" +import { ApplicationUser } from "../entity/ApplicationUser.js" +import { BalanceEvent } from "../entity/BalanceEvent.js" +import { ChannelBalanceEvent } from "../entity/ChannelsBalanceEvent.js" +import { getLogger } from "../../helpers/logger.js" +import { ChannelRouting } from "../entity/ChannelRouting.js" +import { LspOrder } from "../entity/LspOrder.js" +import { Product } from "../entity/Product.js" +import { LndNodeInfo } from "../entity/LndNodeInfo.js" +import { TrackedProvider } from "../entity/TrackedProvider.js" +import { InviteToken } from "../entity/InviteToken.js" +import { DebitAccess } from "../entity/DebitAccess.js" +import { RootOperation } from "../entity/RootOperation.js" +import { UserOffer } from "../entity/UserOffer.js" export type DbSettings = { @@ -70,7 +70,7 @@ export const MainDbEntities = { export type MainDbNames = keyof typeof MainDbEntities export const MainDbEntitiesNames = Object.keys(MainDbEntities) -const MetricsDbEntities = { +export const MetricsDbEntities = { 'BalanceEvent': BalanceEvent, 'ChannelBalanceEvent': ChannelBalanceEvent, 'ChannelRouting': ChannelRouting, diff --git a/src/services/storage/serializationHelpers.ts b/src/services/storage/db/serializationHelpers.ts similarity index 100% rename from src/services/storage/serializationHelpers.ts rename to src/services/storage/db/serializationHelpers.ts diff --git a/src/services/storage/storageInterface.ts b/src/services/storage/db/storageInterface.ts similarity index 83% rename from src/services/storage/storageInterface.ts rename to src/services/storage/db/storageInterface.ts index ca0cfdfd..a7a04f78 100644 --- a/src/services/storage/storageInterface.ts +++ b/src/services/storage/db/storageInterface.ts @@ -10,6 +10,7 @@ import { IncrementOperation, DecrementOperation, SumOperation, + DBNames, } from './storageProcessor.js'; import { PickKeysByType } from 'typeorm/common/PickKeysByType.js'; import { serializeRequest, WhereCondition } from './serializationHelpers.js'; @@ -32,7 +33,7 @@ export class StorageInterface extends EventEmitter { } private initializeSubprocess() { - this.process = fork('./build/src/services/storage/storageProcessor'); + this.process = fork('./build/src/services/storage/db/storageProcessor'); this.process.on('message', (response: OperationResponse) => { this.emit(response.opId, response); @@ -51,61 +52,61 @@ export class StorageInterface extends EventEmitter { this.isConnected = true; } - Connect(settings: DbSettings): Promise { + Connect(settings: DbSettings, dbType: 'main' | 'metrics'): Promise { const opId = Math.random().toString() - const connectOp: ConnectOperation = { type: 'connect', opId, settings } + const connectOp: ConnectOperation = { type: 'connect', opId, settings, dbType } return this.handleOp(connectOp) } - Delete(entity: MainDbNames, q: number | FindOptionsWhere, txId?: string): Promise { + Delete(entity: DBNames, q: number | FindOptionsWhere, txId?: string): Promise { const opId = Math.random().toString() const deleteOp: DeleteOperation = { type: 'delete', entity, opId, q, txId } return this.handleOp(deleteOp) } - Remove(entity: MainDbNames, q: T, txId?: string): Promise { + Remove(entity: DBNames, q: T, txId?: string): Promise { const opId = Math.random().toString() const removeOp: RemoveOperation = { type: 'remove', entity, opId, q, txId } return this.handleOp(removeOp) } - FindOne(entity: MainDbNames, q: QueryOptions, txId?: string): Promise { + FindOne(entity: DBNames, q: QueryOptions, txId?: string): Promise { const opId = Math.random().toString() const findOp: FindOneOperation = { type: 'findOne', entity, opId, q, txId } return this.handleOp(findOp) } - Find(entity: MainDbNames, q: QueryOptions, txId?: string): Promise { + Find(entity: DBNames, q: QueryOptions, txId?: string): Promise { const opId = Math.random().toString() const findOp: FindOperation = { type: 'find', entity, opId, q, txId } return this.handleOp(findOp) } - Sum(entity: MainDbNames, columnName: PickKeysByType, q: WhereCondition, txId?: string): Promise { + Sum(entity: DBNames, columnName: PickKeysByType, q: WhereCondition, txId?: string): Promise { const opId = Math.random().toString() const sumOp: SumOperation = { type: 'sum', entity, opId, columnName, q, txId } return this.handleOp(sumOp) } - Update(entity: MainDbNames, q: number | FindOptionsWhere, toUpdate: DeepPartial, txId?: string): Promise { + Update(entity: DBNames, q: number | FindOptionsWhere, toUpdate: DeepPartial, txId?: string): Promise { const opId = Math.random().toString() const updateOp: UpdateOperation = { type: 'update', entity, opId, toUpdate, q, txId } return this.handleOp(updateOp) } - Increment(entity: MainDbNames, q: FindOptionsWhere, propertyPath: string, value: number | string, txId?: string): Promise { + Increment(entity: DBNames, q: FindOptionsWhere, propertyPath: string, value: number | string, txId?: string): Promise { const opId = Math.random().toString() const incrementOp: IncrementOperation = { type: 'increment', entity, opId, q, propertyPath, value, txId } return this.handleOp(incrementOp) } - Decrement(entity: MainDbNames, q: FindOptionsWhere, propertyPath: string, value: number | string, txId?: string): Promise { + Decrement(entity: DBNames, q: FindOptionsWhere, propertyPath: string, value: number | string, txId?: string): Promise { const opId = Math.random().toString() const decrementOp: DecrementOperation = { type: 'decrement', entity, opId, q, propertyPath, value, txId } return this.handleOp(decrementOp) } - CreateAndSave(entity: MainDbNames, toSave: DeepPartial, txId?: string): Promise { + CreateAndSave(entity: DBNames, toSave: DeepPartial, txId?: string): Promise { const opId = Math.random().toString() const createAndSaveOp: CreateAndSaveOperation = { type: 'createAndSave', entity, opId, toSave, txId } return this.handleOp(createAndSaveOp) diff --git a/src/services/storage/storageProcessor.ts b/src/services/storage/db/storageProcessor.ts similarity index 76% rename from src/services/storage/storageProcessor.ts rename to src/services/storage/db/storageProcessor.ts index e20252d1..f42cec42 100644 --- a/src/services/storage/storageProcessor.ts +++ b/src/services/storage/db/storageProcessor.ts @@ -1,12 +1,12 @@ import { DataSource, EntityManager, DeepPartial, FindOptionsWhere, FindOptionsOrder, FindOperator } from 'typeorm'; -import NewDB, { DbSettings, MainDbEntities, MainDbNames, newMetricsDb } from './db.js'; -import { PubLogger, getLogger } from '../helpers/logger.js'; -import { allMetricsMigrations, allMigrations } from './migrations/runner.js'; +import NewDB, { DbSettings, MainDbEntities, MainDbNames, MetricsDbEntities, MetricsDbNames, newMetricsDb } from './db.js'; +import { PubLogger, getLogger } from '../../helpers/logger.js'; +import { allMetricsMigrations, allMigrations } from '../migrations/runner.js'; import transactionsQueue from './transactionsQueue.js'; import { PickKeysByType } from 'typeorm/common/PickKeysByType'; import { deserializeRequest, WhereCondition } from './serializationHelpers.js'; - +export type DBNames = MainDbNames | MetricsDbNames export type QueryOptions = { where?: WhereCondition order?: FindOptionsOrder @@ -17,6 +17,7 @@ export type ConnectOperation = { type: 'connect' opId: string settings: DbSettings + dbType: 'main' | 'metrics' debug?: boolean } @@ -36,7 +37,7 @@ export type EndTxOperation = { export type DeleteOperation = { type: 'delete' - entity: MainDbNames + entity: DBNames opId: string q: number | FindOptionsWhere txId?: string @@ -45,7 +46,7 @@ export type DeleteOperation = { export type RemoveOperation = { type: 'remove' - entity: MainDbNames + entity: DBNames opId: string q: T txId?: string @@ -54,7 +55,7 @@ export type RemoveOperation = { export type UpdateOperation = { type: 'update' - entity: MainDbNames + entity: DBNames opId: string toUpdate: DeepPartial q: number | FindOptionsWhere @@ -64,7 +65,7 @@ export type UpdateOperation = { export type IncrementOperation = { type: 'increment' - entity: MainDbNames + entity: DBNames opId: string q: FindOptionsWhere propertyPath: string, @@ -75,7 +76,7 @@ export type IncrementOperation = { export type DecrementOperation = { type: 'decrement' - entity: MainDbNames + entity: DBNames opId: string q: FindOptionsWhere propertyPath: string, @@ -86,7 +87,7 @@ export type DecrementOperation = { export type FindOneOperation = { type: 'findOne' - entity: MainDbNames + entity: DBNames opId: string q: QueryOptions txId?: string @@ -95,7 +96,7 @@ export type FindOneOperation = { export type FindOperation = { type: 'find' - entity: MainDbNames + entity: DBNames opId: string q: QueryOptions txId?: string @@ -104,7 +105,7 @@ export type FindOperation = { export type SumOperation = { type: 'sum' - entity: MainDbNames + entity: DBNames opId: string columnName: PickKeysByType q: WhereCondition @@ -114,7 +115,7 @@ export type SumOperation = { export type CreateAndSaveOperation = { type: 'createAndSave' - entity: MainDbNames + entity: DBNames opId: string toSave: DeepPartial txId?: string @@ -150,12 +151,12 @@ class StorageProcessor { //private locked: boolean = false private activeTransaction: ActiveTransaction | null = null //private queue: StartTxOperation[] = [] + private mode: 'main' | 'metrics' | '' = '' constructor() { if (!process.send) { throw new Error('This process must be spawned as a child process'); } - this.log = getLogger({ component: 'StorageProcessor' }) process.on('message', (operation: StorageOperation) => { this.handleOperation(operation); }); @@ -229,17 +230,38 @@ class StorageProcessor { } private async handleConnect(operation: ConnectOperation) { - const { source, executedMigrations } = await NewDB(operation.settings, allMigrations) - this.DB = source - this.txQueue = new transactionsQueue('StorageProcessorQueue', this.DB) - if (executedMigrations.length > 0) { - this.log(executedMigrations.length, "new migrations executed") - this.log("-------------------") + let migrationsExecuted = 0 + if (this.mode !== '') { + throw new Error('Already connected to a database') + } + this.log = getLogger({ component: 'StorageProcessor_' + operation.dbType }) + if (operation.dbType === 'main') { + const { source, executedMigrations } = await NewDB(operation.settings, allMigrations) + this.DB = source + this.txQueue = new transactionsQueue('StorageProcessorQueue', this.DB) + migrationsExecuted = executedMigrations.length + if (executedMigrations.length > 0) { + this.log(executedMigrations.length, "new migrations executed") + this.log("-------------------") + } + this.mode = 'main' + } else if (operation.dbType === 'metrics') { + const { source, executedMigrations } = await newMetricsDb(operation.settings, allMetricsMigrations) + this.DB = source + this.txQueue = new transactionsQueue('MetricsStorageProcessorQueue', this.DB) + migrationsExecuted = executedMigrations.length + if (executedMigrations.length > 0) { + this.log(executedMigrations.length, "new metrics migrations executed") + this.log("-------------------") + } + this.mode = 'metrics' + } else { + throw new Error('Unknown database type') } this.sendResponse({ success: true, type: 'connect', - data: executedMigrations.length, + data: migrationsExecuted, opId: operation.opId }); } @@ -303,17 +325,28 @@ class StorageProcessor { return this.activeTransaction.manager } - private getManager(txId?: string): DataSource | EntityManager { - if (txId) { - return this.getTx(txId) + private getEntity(entityName: DBNames) { + if (this.mode === 'main') { + const e = MainDbEntities[entityName as MainDbNames] + if (!e) { + throw new Error(`Unknown entity type for main db: ${entityName}`) + } + return e + } else if (this.mode === 'metrics') { + const e = MetricsDbEntities[entityName as MetricsDbNames] + if (!e) { + throw new Error(`Unknown entity type for metrics db: ${entityName}`) + } + return e + } else { + throw new Error('Unknown database mode') } - return this.DB } private async handleDelete(operation: DeleteOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).delete(operation.q) + return eM.getRepository(this.getEntity(operation.entity)).delete(operation.q) }) this.sendResponse({ success: true, @@ -325,7 +358,7 @@ class StorageProcessor { private async handleRemove(operation: RemoveOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).remove(operation.q) + return eM.getRepository(this.getEntity(operation.entity)).remove(operation.q) }) this.sendResponse({ @@ -338,7 +371,7 @@ class StorageProcessor { private async handleUpdate(operation: UpdateOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).update(operation.q, operation.toUpdate) + return eM.getRepository(this.getEntity(operation.entity)).update(operation.q, operation.toUpdate) }) this.sendResponse({ @@ -351,7 +384,7 @@ class StorageProcessor { private async handleIncrement(operation: IncrementOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).increment(operation.q, operation.propertyPath, operation.value) + return eM.getRepository(this.getEntity(operation.entity)).increment(operation.q, operation.propertyPath, operation.value) }) this.sendResponse({ @@ -364,7 +397,7 @@ class StorageProcessor { private async handleDecrement(operation: DecrementOperation) { const res = await this.handleWrite(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).decrement(operation.q, operation.propertyPath, operation.value) + return eM.getRepository(this.getEntity(operation.entity)).decrement(operation.q, operation.propertyPath, operation.value) }) this.sendResponse({ @@ -377,7 +410,7 @@ class StorageProcessor { private async handleFindOne(operation: FindOneOperation) { const res = await this.handleRead(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).findOne(operation.q) + return eM.getRepository(this.getEntity(operation.entity)).findOne(operation.q) }) this.sendResponse({ @@ -390,7 +423,7 @@ class StorageProcessor { private async handleFind(operation: FindOperation) { const res = await this.handleRead(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).find(operation.q) + return eM.getRepository(this.getEntity(operation.entity)).find(operation.q) }) this.sendResponse({ @@ -403,7 +436,7 @@ class StorageProcessor { private async handleSum(operation: SumOperation) { const res = await this.handleRead(operation.txId, eM => { - return eM.getRepository(MainDbEntities[operation.entity]).sum(operation.columnName, operation.q) + return eM.getRepository(this.getEntity(operation.entity)).sum(operation.columnName, operation.q) }) this.sendResponse({ success: true, @@ -415,8 +448,8 @@ class StorageProcessor { private async handleCreateAndSave(operation: CreateAndSaveOperation) { const saved = await this.handleWrite(operation.txId, async eM => { - const res = eM.getRepository(MainDbEntities[operation.entity]).create(operation.toSave) - return eM.getRepository(MainDbEntities[operation.entity]).save(res) + const res = eM.getRepository(this.getEntity(operation.entity)).create(operation.toSave) + return eM.getRepository(this.getEntity(operation.entity)).save(res) }) this.sendResponse({ diff --git a/src/services/storage/transactionsQueue.ts b/src/services/storage/db/transactionsQueue.ts similarity index 95% rename from src/services/storage/transactionsQueue.ts rename to src/services/storage/db/transactionsQueue.ts index e8275119..94c31fcd 100644 --- a/src/services/storage/transactionsQueue.ts +++ b/src/services/storage/db/transactionsQueue.ts @@ -1,5 +1,5 @@ import { DataSource, EntityManager, EntityTarget } from "typeorm" -import { PubLogger, getLogger } from "../helpers/logger.js" +import { PubLogger, getLogger } from "../../helpers/logger.js" type TX = (entityManager: EntityManager | DataSource) => Promise type TxOperation = { diff --git a/src/services/storage/debitStorage.ts b/src/services/storage/debitStorage.ts index 30fff55b..f1ed7357 100644 --- a/src/services/storage/debitStorage.ts +++ b/src/services/storage/debitStorage.ts @@ -1,5 +1,5 @@ import { DebitAccess, DebitAccessRules } from "./entity/DebitAccess.js"; -import { StorageInterface } from "./storageInterface.js"; +import { StorageInterface } from "./db/storageInterface.js"; type AccessToAdd = { npub: string rules?: DebitAccessRules diff --git a/src/services/storage/index.ts b/src/services/storage/index.ts index ad214f5c..e832290c 100644 --- a/src/services/storage/index.ts +++ b/src/services/storage/index.ts @@ -1,18 +1,18 @@ import fs from 'fs' -import NewDB, { DbSettings, LoadDbSettingsFromEnv } from "./db.js" +import NewDB, { DbSettings, LoadDbSettingsFromEnv } from "./db/db.js" import ProductStorage from './productStorage.js' import ApplicationStorage from './applicationStorage.js' import UserStorage from "./userStorage.js"; import PaymentStorage from "./paymentStorage.js"; import MetricsStorage from "./metricsStorage.js"; -import MetricsEventStorage from "./metricsEventStorage.js"; +import MetricsEventStorage from "./tlv/metricsEventStorage.js"; import EventsLogManager from "./eventsLog.js"; import { LiquidityStorage } from "./liquidityStorage.js"; import DebitStorage from "./debitStorage.js" import OfferStorage from "./offerStorage.js" -import { StorageInterface, TX } from "./storageInterface.js"; -import { allMetricsMigrations, allMigrations } from "./migrations/runner.js" +import { StorageInterface, TX } from "./db/storageInterface.js"; import { PubLogger } from "../helpers/logger.js" +import { TlvStorageFactory } from './tlv/tlvFilesStorageFactory.js'; export type StorageSettings = { dbSettings: DbSettings eventLogPath: string @@ -40,9 +40,9 @@ export default class { this.settings = settings this.eventsLog = new EventsLogManager(settings.eventLogPath) } - async Connect(log: PubLogger) { + async Connect(log: PubLogger, tlvStorageFactory: TlvStorageFactory) { this.dbs = new StorageInterface() - await this.dbs.Connect(this.settings.dbSettings) + await this.dbs.Connect(this.settings.dbSettings, 'main') //const { source, executedMigrations } = await NewDB(this.settings.dbSettings, allMigrations) //this.DB = source //this.txQueue = new TransactionsQueue("main", this.DB) @@ -51,21 +51,21 @@ export default class { this.applicationStorage = new ApplicationStorage(this.dbs, this.userStorage) this.paymentStorage = new PaymentStorage(this.dbs, this.userStorage) this.metricsStorage = new MetricsStorage(this.settings) - this.metricsEventStorage = new MetricsEventStorage(this.settings) + this.metricsEventStorage = new MetricsEventStorage(this.settings, tlvStorageFactory) this.liquidityStorage = new LiquidityStorage(this.dbs) this.debitStorage = new DebitStorage(this.dbs) this.offerStorage = new OfferStorage(this.dbs) try { if (this.settings.dataDir) fs.mkdirSync(this.settings.dataDir) } catch (e) { } - const executedMetricsMigrations = await this.metricsStorage.Connect(allMetricsMigrations) + /* const executedMetricsMigrations = */ await this.metricsStorage.Connect() /* if (executedMigrations.length > 0) { log(executedMigrations.length, "new migrations executed") log("-------------------") } */ - if (executedMetricsMigrations.length > 0) { - log(executedMetricsMigrations.length, "new metrics migrations executed") - log("-------------------") - } + /* if (executedMetricsMigrations.length > 0) { + log(executedMetricsMigrations.length, "new metrics migrations executed") + log("-------------------") + } */ } Stop() { diff --git a/src/services/storage/liquidityStorage.ts b/src/services/storage/liquidityStorage.ts index 2c405a41..0a609ed5 100644 --- a/src/services/storage/liquidityStorage.ts +++ b/src/services/storage/liquidityStorage.ts @@ -2,7 +2,7 @@ import { IsNull, MoreThan, Not } from "typeorm" import { LspOrder } from "./entity/LspOrder.js"; import { LndNodeInfo } from "./entity/LndNodeInfo.js"; import { TrackedProvider } from "./entity/TrackedProvider.js"; -import { StorageInterface } from "./storageInterface.js"; +import { StorageInterface } from "./db/storageInterface.js"; export class LiquidityStorage { dbs: StorageInterface constructor(dbs: StorageInterface) { diff --git a/src/services/storage/metricsStorage.ts b/src/services/storage/metricsStorage.ts index 839ef357..a47b20b3 100644 --- a/src/services/storage/metricsStorage.ts +++ b/src/services/storage/metricsStorage.ts @@ -1,59 +1,65 @@ import { Between, DataSource, EntityManager, FindManyOptions, FindOperator, LessThanOrEqual, MoreThanOrEqual } from "typeorm" import { BalanceEvent } from "./entity/BalanceEvent.js" import { ChannelBalanceEvent } from "./entity/ChannelsBalanceEvent.js" -import TransactionsQueue from "./transactionsQueue.js"; +import TransactionsQueue from "./db/transactionsQueue.js"; import { StorageSettings } from "./index.js"; -import { newMetricsDb } from "./db.js"; +import { newMetricsDb } from "./db/db.js"; import { ChannelRouting } from "./entity/ChannelRouting.js"; import { RootOperation } from "./entity/RootOperation.js"; +import { StorageInterface } from "./db/storageInterface.js"; export default class { - DB: DataSource | EntityManager + //DB: DataSource | EntityManager settings: StorageSettings - txQueue: TransactionsQueue + dbs: StorageInterface + //txQueue: TransactionsQueue constructor(settings: StorageSettings) { this.settings = settings; } - async Connect(metricsMigrations: Function[]) { - const { source, executedMigrations } = await newMetricsDb(this.settings.dbSettings, metricsMigrations) - this.DB = source; - this.txQueue = new TransactionsQueue("metrics", this.DB) - return executedMigrations; + async Connect() { + //const { source, executedMigrations } = await newMetricsDb(this.settings.dbSettings, metricsMigrations) + //this.DB = source; + //this.txQueue = new TransactionsQueue("metrics", this.DB) + this.dbs = new StorageInterface() + await this.dbs.Connect(this.settings.dbSettings, 'metrics') + //return executedMigrations; } async SaveBalanceEvents(balanceEvent: Partial, channelBalanceEvents: Partial[]) { - const blanceEventEntry = this.DB.getRepository(BalanceEvent).create(balanceEvent) - const balanceEntry = await this.txQueue.PushToQueue({ exec: async db => db.getRepository(BalanceEvent).save(blanceEventEntry), dbTx: false }) + //const blanceEventEntry = this.DB.getRepository(BalanceEvent).create(balanceEvent) + //const balanceEntry = await this.txQueue.PushToQueue({ exec: async db => db.getRepository(BalanceEvent).save(blanceEventEntry), dbTx: false }) + + const balanceEntry = await this.dbs.CreateAndSave('BalanceEvent', balanceEvent) + + //const channelsEntry = this.DB.getRepository(ChannelBalanceEvent).create(channelBalanceEvents.map(e => ({ ...e, balance_event: balanceEntry }))) + //const channelsEntries = await this.txQueue.PushToQueue({ exec: async db => db.getRepository(ChannelBalanceEvent).save(channelsEntry), dbTx: false }) + + const channelsEntries = await this.dbs.CreateAndSave('ChannelBalanceEvent', channelBalanceEvents.map(e => ({ ...e, balance_event: balanceEntry }))) - const channelsEntry = this.DB.getRepository(ChannelBalanceEvent).create(channelBalanceEvents.map(e => ({ ...e, balance_event: balanceEntry }))) - const channelsEntries = await this.txQueue.PushToQueue({ exec: async db => db.getRepository(ChannelBalanceEvent).save(channelsEntry), dbTx: false }) return { balanceEntry, channelsEntries } } - async GetBalanceEvents({ from, to }: { from?: number, to?: number }, entityManager = this.DB) { + async GetBalanceEvents({ from, to }: { from?: number, to?: number }, txId?: string) { const q = getTimeQuery({ from, to }) - const [chainBalanceEvents] = await Promise.all([ - entityManager.getRepository(BalanceEvent).find(q), - ]) + const chainBalanceEvents = await this.dbs.Find('BalanceEvent', q, txId) return { chainBalanceEvents } } async initChannelRoutingEvent(dayUnix: number, channelId: string) { - const existing = await this.DB.getRepository(ChannelRouting).findOne({ where: { day_unix: dayUnix, channel_id: channelId } }) + const existing = await this.dbs.FindOne('ChannelRouting', { where: { day_unix: dayUnix, channel_id: channelId } }) if (!existing) { - const entry = this.DB.getRepository(ChannelRouting).create({ day_unix: dayUnix, channel_id: channelId }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(ChannelRouting).save(entry), dbTx: false }) + return this.dbs.CreateAndSave('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }) } return existing } - GetChannelRouting({ from, to }: { from?: number, to?: number }, entityManager = this.DB) { + GetChannelRouting({ from, to }: { from?: number, to?: number }, txId?: string) { const q = getTimeQuery({ from, to }) - return entityManager.getRepository(ChannelRouting).find(q) + return this.dbs.Find('ChannelRouting', q, txId) } async GetLatestForwardingIndexOffset() { - const latestIndex = await this.DB.getRepository(ChannelRouting).find({ order: { latest_index_offset: "DESC" }, take: 1 }) + const latestIndex = await this.dbs.Find('ChannelRouting', { order: { latest_index_offset: "DESC" }, take: 1 }) if (latestIndex.length > 0) { return latestIndex[0].latest_index_offset } @@ -63,50 +69,49 @@ export default class { async IncrementChannelRouting(channelId: string, event: Partial) { const dayUnix = getTodayUnix() const existing = await this.initChannelRoutingEvent(dayUnix, channelId) - const repo = this.DB.getRepository(ChannelRouting) + //const repo = this.DB.getRepository(ChannelRouting) if (event.send_errors) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "send_errors", event.send_errors) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "send_errors", event.send_errors) } if (event.receive_errors) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "receive_errors", event.receive_errors) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "receive_errors", event.receive_errors) } if (event.forward_errors_as_input) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_input", event.forward_errors_as_input) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_input", event.forward_errors_as_input) } if (event.forward_errors_as_output) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_output", event.forward_errors_as_output) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_output", event.forward_errors_as_output) } if (event.missed_forward_fee_as_input) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_input", event.missed_forward_fee_as_input) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_input", event.missed_forward_fee_as_input) } if (event.missed_forward_fee_as_output) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_output", event.missed_forward_fee_as_output) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_output", event.missed_forward_fee_as_output) } if (event.forward_fee_as_input) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_input", event.forward_fee_as_input) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_input", event.forward_fee_as_input) } if (event.forward_fee_as_output) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_output", event.forward_fee_as_output) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_output", event.forward_fee_as_output) } if (event.events_as_input) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "events_as_input", event.events_as_input) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "events_as_input", event.events_as_input) } if (event.events_as_output) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "events_as_output", event.events_as_output) + await this.dbs.Increment('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "events_as_output", event.events_as_output) } if (event.latest_index_offset) { - await repo.update(existing.serial_id, { latest_index_offset: event.latest_index_offset }) + await this.dbs.Update('ChannelRouting', existing.serial_id, { latest_index_offset: event.latest_index_offset }) } } - async AddRootOperation(opType: string, id: string, amount: number, entityManager = this.DB) { - const newOp = entityManager.getRepository(RootOperation).create({ operation_type: opType, operation_amount: amount, operation_identifier: id }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(RootOperation).save(newOp), dbTx: false }) + async AddRootOperation(opType: string, id: string, amount: number, txId?: string) { + return this.dbs.CreateAndSave('RootOperation', { operation_type: opType, operation_amount: amount, operation_identifier: id }, txId) } - async GetRootOperations({ from, to }: { from?: number, to?: number }, entityManager = this.DB) { + async GetRootOperations({ from, to }: { from?: number, to?: number }, txId?: string) { const q = getTimeQuery({ from, to }) - return entityManager.getRepository(RootOperation).find(q) + return this.dbs.Find('RootOperation', q, txId) } } diff --git a/src/services/storage/offerStorage.ts b/src/services/storage/offerStorage.ts index 41712d92..2a921c57 100644 --- a/src/services/storage/offerStorage.ts +++ b/src/services/storage/offerStorage.ts @@ -1,6 +1,6 @@ import crypto from 'crypto'; import { UserOffer } from "./entity/UserOffer.js"; -import { StorageInterface } from "./storageInterface.js"; +import { StorageInterface } from "./db/storageInterface.js"; export default class { diff --git a/src/services/storage/paymentStorage.ts b/src/services/storage/paymentStorage.ts index f3a7ce27..465983fb 100644 --- a/src/services/storage/paymentStorage.ts +++ b/src/services/storage/paymentStorage.ts @@ -11,9 +11,9 @@ import { AddressReceivingTransaction } from './entity/AddressReceivingTransactio import { UserInvoicePayment } from './entity/UserInvoicePayment.js'; import { UserToUserPayment } from './entity/UserToUserPayment.js'; import { Application } from './entity/Application.js'; -import TransactionsQueue from "./transactionsQueue.js"; +import TransactionsQueue from "./db/transactionsQueue.js"; import { LoggedEvent } from './eventsLog.js'; -import { StorageInterface } from './storageInterface.js'; +import { StorageInterface } from './db/storageInterface.js'; export type InboundOptionals = { product?: Product, callbackUrl?: string, expiry: number, expectedPayer?: User, linkedApplication?: Application, zapInfo?: ZapInfo, offerId?: string, payerData?: Record } export const defaultInvoiceExpiry = 60 * 60 export default class { diff --git a/src/services/storage/productStorage.ts b/src/services/storage/productStorage.ts index c93be22a..26c386c5 100644 --- a/src/services/storage/productStorage.ts +++ b/src/services/storage/productStorage.ts @@ -1,6 +1,6 @@ import { Product } from "./entity/Product.js" import { User } from "./entity/User.js" -import { StorageInterface } from "./storageInterface.js"; +import { StorageInterface } from "./db/storageInterface.js"; export default class { dbs: StorageInterface constructor(dbs: StorageInterface) { diff --git a/src/services/storage/metricsEventStorage.ts b/src/services/storage/tlv/metricsEventStorage.ts similarity index 84% rename from src/services/storage/metricsEventStorage.ts rename to src/services/storage/tlv/metricsEventStorage.ts index 0fe8db0a..24a8c683 100644 --- a/src/services/storage/metricsEventStorage.ts +++ b/src/services/storage/tlv/metricsEventStorage.ts @@ -1,21 +1,20 @@ import fs from 'fs' -import * as Types from '../../../proto/autogenerated/ts/types.js' -import { StorageSettings } from "./index.js"; -import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js'; -import { TlvFilesStorage } from './tlvFilesStorage.js'; +import * as Types from '../../../../proto/autogenerated/ts/types.js' +import { StorageSettings } from "../index.js"; +import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../../helpers/tlv.js'; +import { TlvStorageFactory, TlvStorageInterface } from './tlvFilesStorageFactory.js'; export default class { - tlvStorage: TlvFilesStorage + tlvStorage: TlvStorageInterface cachePath: string last24hCache: { ts: number, ok: number, fail: number }[] = [] lastPersistedCache: number = 0 - constructor(settings: StorageSettings) { + constructor(settings: StorageSettings, tlvStorageFactory: TlvStorageFactory) { const metricsPath = [settings.dataDir, "metric_events"].filter(s => !!s).join("/") - this.tlvStorage = new TlvFilesStorage(metricsPath) + this.tlvStorage = tlvStorageFactory.NewStorage({ name: "metrics", path: metricsPath }) this.cachePath = [settings.dataDir, "metric_cache"].filter(s => !!s).join("/") if (!fs.existsSync(this.cachePath)) { fs.mkdirSync(this.cachePath, { recursive: true }); } - this.tlvStorage.initMeta() this.loadCache() setInterval(() => { if (Date.now() - this.lastPersistedCache > 1000 * 60 * 4) { @@ -77,7 +76,7 @@ export default class { } LoadLatestMetrics = async (limit = 30): Promise => { - const raw = this.tlvStorage.LoadLatest(limit) + const raw = await this.tlvStorage.LoadLatest(limit) const metrics: Types.UsageMetrics = { apps: {} } Object.keys(raw).forEach(app => { metrics.apps[app] = { app_metrics: {} } @@ -93,7 +92,7 @@ export default class { return metrics } LoadMetricsFile = async (app: string, method: string, chunk: number): Promise => { - const { fileData, chunks } = this.tlvStorage.LoadFile(app, method, chunk) + const { fileData, chunks } = await this.tlvStorage.LoadFile(app, method, chunk) //const tlv = await this.LoadRawMetricsFile(app, method, chunk) const decoded = decodeListTLV(parseTLV(fileData)) return { diff --git a/src/services/storage/stateBundler.ts b/src/services/storage/tlv/stateBundler.ts similarity index 89% rename from src/services/storage/stateBundler.ts rename to src/services/storage/tlv/stateBundler.ts index 99737833..a8b634b4 100644 --- a/src/services/storage/stateBundler.ts +++ b/src/services/storage/tlv/stateBundler.ts @@ -1,9 +1,9 @@ -import { LatestBundleMetricReq } from "../../../proto/autogenerated/ts/types.js" -import { getLogger } from "../helpers/logger.js" -import { decodeListTLV, integerToUint8Array, parseTLV } from "../helpers/tlv.js" -import { StorageSettings } from "./index.js" -import { TlvFilesStorage } from "./tlvFilesStorage.js" -import * as Types from "../../../proto/autogenerated/ts/types.js" +import { LatestBundleMetricReq } from "../../../../proto/autogenerated/ts/types.js" +import { getLogger } from "../../helpers/logger.js" +import { decodeListTLV, integerToUint8Array, parseTLV } from "../../helpers/tlv.js" +import { StorageSettings } from "../index.js" +import * as Types from "../../../../proto/autogenerated/ts/types.js" +import { TlvStorageFactory, TlvStorageInterface } from "./tlvFilesStorageFactory.js" const transactionStatePointTypes = ['addedInvoice', 'invoiceWasPaid', 'paidAnInvoice', 'addedAddress', 'addressWasPaid', 'paidAnAddress', 'user2user'] as const const balanceStatePointTypes = ['providerBalance', 'providerMaxWithdrawable', 'walletBalance', 'channelBalance', 'usersBalance', 'feesPaidForLiquidity', 'totalLndBalance', 'accumulatedHtlcFees', 'deltaUsers', 'deltaExternal'] as const const maxStatePointTypes = ['maxProviderRespTime'] as const @@ -30,14 +30,13 @@ export type TxPointSettings = { } export class StateBundler { - tlvStorage: TlvFilesStorage + tlvStorage: TlvStorageInterface reportLog = getLogger({ component: 'stateBundlerReport' }) prevValues: Record = {} interval: NodeJS.Timeout - constructor(settings: StorageSettings) { + constructor(settings: StorageSettings, tlvStorageFactory: TlvStorageFactory) { const bundlerPath = [settings.dataDir, "bundler_events"].filter(s => !!s).join("/") - this.tlvStorage = new TlvFilesStorage(bundlerPath) - this.tlvStorage.initMeta() + this.tlvStorage = tlvStorageFactory.NewStorage({ name: "bundler", path: bundlerPath }) this.interval = setInterval(() => { const mem = process.memoryUsage() this.AddValue('_root', 'memory_rss_kb', Math.ceil(mem.rss / 1000 || 0), true) @@ -53,7 +52,7 @@ export class StateBundler { } async GetBundleMetrics(req: Types.LatestBundleMetricReq): Promise { - const latest = this.tlvStorage.LoadLatest(req.limit) + const latest = await this.tlvStorage.LoadLatest(req.limit) const metrics: Types.BundleMetrics = { apps: {} } Object.keys(latest).forEach(app => { metrics.apps[app] = { app_bundles: {} } @@ -70,7 +69,7 @@ export class StateBundler { } async GetSingleBundleMetrics(req: Types.SingleMetricReq): Promise { - const { fileData, chunks } = this.tlvStorage.LoadFile(req.app_id, req.metrics_name, req.page) + const { fileData, chunks } = await this.tlvStorage.LoadFile(req.app_id, req.metrics_name, req.page) const decoded = decodeListTLV(parseTLV(fileData)) return { current_chunk: req.page, diff --git a/src/services/storage/tlvFilesStorage.ts b/src/services/storage/tlv/tlvFilesStorage.ts similarity index 88% rename from src/services/storage/tlvFilesStorage.ts rename to src/services/storage/tlv/tlvFilesStorage.ts index c4ad58d2..801fcc40 100644 --- a/src/services/storage/tlvFilesStorage.ts +++ b/src/services/storage/tlv/tlvFilesStorage.ts @@ -1,13 +1,14 @@ import fs from 'fs' -import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js' +import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../../helpers/tlv.js' const chunkSizeBytes = 128 * 1024 export type LatestData = Record> +export type TlvFile = { fileData: Buffer, chunks: number[] } export class TlvFilesStorage { - storagePath: string - lastPersisted: number = 0 - meta: Record> = {} - pending: Record> = {} - metaReady = false + private storagePath: string + private lastPersisted: number = 0 + private meta: Record> = {} + private pending: Record> = {} + private metaReady = false constructor(storagePath: string) { this.storagePath = storagePath if (!fs.existsSync(this.storagePath)) { @@ -24,7 +25,7 @@ export class TlvFilesStorage { }); } - LoadFile = (app: string, dataName: string, chunk: number): { fileData: Buffer, chunks: number[] } => { + LoadFile = (app: string, dataName: string, chunk: number): TlvFile => { if (!this.metaReady || !this.meta[app] || !this.meta[app][dataName] || !this.meta[app][dataName].chunks.includes(chunk)) { throw new Error("metrics not found") } @@ -71,7 +72,7 @@ export class TlvFilesStorage { return data } - persist = () => { + private persist = () => { if (!this.metaReady) { throw new Error("meta metrics not ready") } @@ -104,28 +105,28 @@ export class TlvFilesStorage { }) } - getMeta = (appId: string, dataName: string) => { + private getMeta = (appId: string, dataName: string) => { if (!this.meta[appId]) { return { chunks: [] } } return this.meta[appId][dataName] || { chunks: [] } } - initMeta = () => { + private initMeta = () => { this.foreachFile((app, dataName, tlvFiles) => { this.updateMeta(app, dataName, tlvFiles) }) this.metaReady = true } - updateMeta = (appId: string, dataName: string, sortedChunks: number[]) => { + private updateMeta = (appId: string, dataName: string, sortedChunks: number[]) => { if (!this.meta[appId]) { this.meta[appId] = {} } this.meta[appId][dataName] = { chunks: sortedChunks } } - foreachFile = (cb: (appId: string, dataName: string, tlvFiles: number[]) => void) => { + private foreachFile = (cb: (appId: string, dataName: string, tlvFiles: number[]) => void) => { if (!fs.existsSync(this.storagePath)) { fs.mkdirSync(this.storagePath, { recursive: true }); } diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts new file mode 100644 index 00000000..810996a9 --- /dev/null +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -0,0 +1,109 @@ +import { fork } from 'child_process'; +import { EventEmitter } from 'events'; +import { AddTlvOperation, ITlvStorageOperation, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, TlvOperationResponse, TlvStorageSettings } from './tlvFilesStorageProcessor'; +import { LatestData, TlvFile } from './tlvFilesStorage'; + +export type TlvStorageInterface = { + AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => Promise + LoadLatest: (limit?: number) => Promise + LoadFile: (appId: string, dataName: string, chunk: number) => Promise +} + +export class TlvStorageFactory extends EventEmitter { + private process: any; + private isConnected: boolean = false; + private debug: boolean = false; + + constructor() { + super(); + this.initializeSubprocess(); + } + + setDebug(debug: boolean) { + this.debug = debug; + } + + private initializeSubprocess() { + this.process = fork('./build/src/services/storage/tlv/tlvFilesStorageProcessor'); + + this.process.on('message', (response: TlvOperationResponse) => { + this.emit(response.opId, response); + }); + + this.process.on('error', (error: Error) => { + console.error('Tlv Storage processor error:', error); + this.isConnected = false; + }); + + this.process.on('exit', (code: number) => { + console.log(`Tlv Storage processor exited with code ${code}`); + this.isConnected = false; + }); + + this.isConnected = true; + } + + NewStorage(settings: TlvStorageSettings): TlvStorageInterface { + const opId = Math.random().toString() + const connectOp: NewTlvStorageOperation = { type: 'newStorage', opId, settings } + this.handleOp(connectOp) + return { + AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => this.AddTlv(settings.name, appId, dataName, tlv), + LoadLatest: (limit?: number) => this.LoadLatest(settings.name, limit), + LoadFile: (appId: string, dataName: string, chunk: number) => this.LoadFile(settings.name, appId, dataName, chunk) + } + } + + AddTlv(storageName: string, appId: string, dataName: string, tlv: Uint8Array): Promise { + const opId = Math.random().toString() + const connectOp: AddTlvOperation = { type: 'addTlv', opId, storageName, appId, dataName, tlv } + return this.handleOp(connectOp) + } + + LoadLatest(storageName: string, limit?: number): Promise { + const opId = Math.random().toString() + const connectOp: LoadLatestTlvOperation = { type: 'loadLatestTlv', opId, storageName, limit } + return this.handleOp(connectOp) + } + + LoadFile(storageName: string, appId: string, dataName: string, chunk: number): Promise { + const opId = Math.random().toString() + const connectOp: LoadTlvFileOperation = { type: 'loadTlvFile', opId, storageName, appId, dataName, chunk } + return this.handleOp(connectOp) + } + + private handleOp(op: ITlvStorageOperation): Promise { + if (this.debug) console.log('handleOp', op) + this.checkConnected() + return new Promise((resolve, reject) => { + const responseHandler = (response: TlvOperationResponse) => { + if (this.debug) console.log('tlv responseHandler', response) + if (!response.success) { + reject(new Error(response.error)); + return + } + if (response.type !== op.type) { + reject(new Error('Invalid response type')); + return + } + resolve(response.data); + } + this.once(op.opId, responseHandler) + this.process.send(op) + }) + } + + private checkConnected() { + if (!this.isConnected) { + throw new Error('Tlv Storage processor is not connected'); + } + } + + public disconnect() { + if (this.process) { + this.process.kill(); + this.isConnected = false; + this.debug = false; + } + } +} \ No newline at end of file diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts new file mode 100644 index 00000000..ef1d3f28 --- /dev/null +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -0,0 +1,186 @@ +import { PubLogger, getLogger } from '../../helpers/logger.js'; +import { TlvFilesStorage } from './tlvFilesStorage.js'; + +export type TlvStorageSettings = { + path: string + name: string +} + +export type NewTlvStorageOperation = { + type: 'newStorage' + opId: string + settings: TlvStorageSettings + debug?: boolean +} + +export type AddTlvOperation = { + type: 'addTlv' + opId: string + storageName: string + appId: string + dataName: string + tlv: Uint8Array + debug?: boolean +} + +export type LoadLatestTlvOperation = { + type: 'loadLatestTlv' + opId: string + storageName: string + limit?: number + debug?: boolean +} + +export type LoadTlvFileOperation = { + type: 'loadTlvFile' + opId: string + storageName: string + appId: string + dataName: string + chunk: number + debug?: boolean +} + +export type ErrorTlvOperationResponse = { success: false, error: string, opId: string } + +export interface ITlvStorageOperation { + opId: string + type: string + debug?: boolean +} + +export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation + +export type SuccessTlvOperationResponse = { success: true, type: string, data: T, opId: string } +export type TlvOperationResponse = SuccessTlvOperationResponse | ErrorTlvOperationResponse + +class TlvFilesStorageProcessor { + private log: PubLogger = console.log + private storages: Record = {} + constructor() { + if (!process.send) { + throw new Error('This process must be spawned as a child process'); + } + process.on('message', (operation: TlvStorageOperation) => { + this.handleOperation(operation); + }); + + process.on('error', (error: Error) => { + console.error('Error in storage processor:', error); + }); + } + + private async handleOperation(operation: TlvStorageOperation) { + try { + const opId = operation.opId; + switch (operation.type) { + case 'newStorage': + await this.handleNewStorage(operation); + break; + case 'addTlv': + await this.handleAddTlv(operation); + break; + case 'loadLatestTlv': + await this.handleLoadLatestTlv(operation); + break; + case 'loadTlvFile': + await this.handleLoadTlvFile(operation); + break; + default: + this.sendResponse({ + success: false, + error: `Unknown operation type: ${(operation as any).type}`, + opId + }) + return + } + } catch (error) { + this.sendResponse({ + success: false, + error: error instanceof Error ? error.message : 'Unknown error occurred', + opId: operation.opId + }); + } + } + + private async handleNewStorage(operation: NewTlvStorageOperation) { + if (this.storages[operation.settings.name]) { + this.sendResponse({ + success: false, + error: `Storage ${operation.settings.name} already exists`, + opId: operation.opId + }) + return + } + this.storages[operation.settings.name] = new TlvFilesStorage(operation.settings.path) + this.sendResponse({ + success: true, + type: 'newStorage', + data: null, + opId: operation.opId + }); + } + + private async handleAddTlv(operation: AddTlvOperation) { + if (!this.storages[operation.storageName]) { + this.sendResponse({ + success: false, + error: `Storage ${operation.storageName} does not exist`, + opId: operation.opId + }) + return + } + this.storages[operation.storageName].AddTlv(operation.appId, operation.dataName, operation.tlv) + this.sendResponse({ + success: true, + type: 'addTlv', + data: null, + opId: operation.opId + }); + } + + private async handleLoadLatestTlv(operation: LoadLatestTlvOperation) { + if (!this.storages[operation.storageName]) { + this.sendResponse({ + success: false, + error: `Storage ${operation.storageName} does not exist`, + opId: operation.opId + }) + return + } + const data = this.storages[operation.storageName].LoadLatest(operation.limit) + this.sendResponse({ + success: true, + type: 'loadLatest', + data: data, + opId: operation.opId + }); + } + + private async handleLoadTlvFile(operation: LoadTlvFileOperation) { + if (!this.storages[operation.storageName]) { + this.sendResponse({ + success: false, + error: `Storage ${operation.storageName} does not exist`, + opId: operation.opId + }) + return + } + const data = this.storages[operation.storageName].LoadFile(operation.appId, operation.dataName, operation.chunk) + this.sendResponse({ + success: true, + type: 'loadFile', + data: data, + opId: operation.opId + }); + } + + private sendResponse(response: TlvOperationResponse) { + if (process.send) { + process.send(response); + } + } +} + +// Start the storage processor +new TlvFilesStorageProcessor(); diff --git a/src/services/storage/userStorage.ts b/src/services/storage/userStorage.ts index b35d40ae..f4ed6fda 100644 --- a/src/services/storage/userStorage.ts +++ b/src/services/storage/userStorage.ts @@ -3,7 +3,7 @@ import { User } from './entity/User.js'; import { UserBasicAuth } from './entity/UserBasicAuth.js'; import { getLogger } from '../helpers/logger.js'; import EventsLogManager from './eventsLog.js'; -import { StorageInterface } from './storageInterface.js'; +import { StorageInterface } from './db/storageInterface.js'; export default class { dbs: StorageInterface eventsLog: EventsLogManager diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index 6aab22f1..ef23a772 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -6,12 +6,12 @@ import * as Types from '../../../proto/autogenerated/ts/types.js' import { NostrSend, SendData, SendInitiator } from "../nostr/handler.js" import { encodeTLbV, encodeTLV, encodeTLVDataPacket } from '../helpers/tlv.js' import { Utils } from '../helpers/utilsWrapper.js' -import { TlvFilesStorage } from '../storage/tlvFilesStorage.js' +import { TlvFilesStorage } from '../storage/tlv/tlvFilesStorage.js' +import { TlvStorageInterface } from '../storage/tlv/tlvFilesStorageFactory.js' type IceCandidate = { type: string, candidate?: string, sdpMid?: string, sdpMLineIndex?: number } const configuration = { 'iceServers': [{ 'urls': 'stun:relay.webwormhole.io' }] } type UserInfo = { userPub: string, appId: string } export default class webRTC { - private storage: Storage private log = getLogger({ component: 'webRTC' }) private connections: Record = {} @@ -94,7 +94,7 @@ export default class webRTC { this.log(ERROR, 'SingleUsageMetricReqValidate', err) return } - let tlvStorage: TlvFilesStorage + let tlvStorage: TlvStorageInterface switch (j.metric_type) { case Types.SingleMetricType.USAGE_METRIC: tlvStorage = this.storage.metricsEventStorage.tlvStorage @@ -105,7 +105,7 @@ export default class webRTC { default: throw new Error("Unknown metric type") } - const { fileData } = tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page) + const { fileData } = await tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page) const id = j.request_id || Math.floor(Math.random() * 100_000_000) let i = 0 const packets: Buffer[] = [] diff --git a/src/tests/networkSetup.ts b/src/tests/networkSetup.ts index 487450a1..ea38131f 100644 --- a/src/tests/networkSetup.ts +++ b/src/tests/networkSetup.ts @@ -3,13 +3,14 @@ import { BitcoinCoreWrapper } from "./bitcoinCore.js" import LND from '../services/lnd/lnd.js' import { LiquidityProvider } from "../services/main/liquidityProvider.js" import { Utils } from "../services/helpers/utilsWrapper.js" - +import { TlvStorageFactory } from "../services/storage/tlv/tlvFilesStorageFactory.js" export const setupNetwork = async () => { const settings = LoadTestSettingsFromEnv() const core = new BitcoinCoreWrapper(settings) await core.InitAddress() await core.Mine(1) - const setupUtils = new Utils(settings) + const tlvStorageFactory = new TlvStorageFactory() + const setupUtils = new Utils(settings, tlvStorageFactory) const alice = new LND(settings.lndSettings, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { }) const bob = new LND({ ...settings.lndSettings, mainNode: settings.lndSettings.otherNode }, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { }) await tryUntil(async i => { diff --git a/src/tests/testBase.ts b/src/tests/testBase.ts index a3b716b7..d60889a9 100644 --- a/src/tests/testBase.ts +++ b/src/tests/testBase.ts @@ -13,6 +13,7 @@ import { getLogger, resetDisabledLoggers } from '../services/helpers/logger.js' import { LiquidityProvider } from '../services/main/liquidityProvider.js' import { Utils } from '../services/helpers/utilsWrapper.js' import { AdminManager } from '../services/main/adminManager.js' +import { TlvStorageFactory } from '../services/storage/tlv/tlvFilesStorageFactory.js' chai.use(chaiString) export const expect = chai.expect export type Describe = (message: string, failure?: boolean) => void @@ -44,7 +45,8 @@ export type StorageTestBase = { export const setupStorageTest = async (d: Describe): Promise => { const settings = GetTestStorageSettings() const storageManager = new Storage(settings) - await storageManager.Connect(console.log) + const tlvStorageFactory = new TlvStorageFactory() + await storageManager.Connect(console.log, tlvStorageFactory) return { expect, storage: storageManager, @@ -69,7 +71,8 @@ export const SetupTest = async (d: Describe): Promise => { const user1 = { userId: u1.info.userId, appUserIdentifier: u1.identifier, appId: app.appId } const user2 = { userId: u2.info.userId, appUserIdentifier: u2.identifier, appId: app.appId } - const extermnalUtils = new Utils(settings) + const tlvStorageFactory = new TlvStorageFactory() + const extermnalUtils = new Utils(settings, tlvStorageFactory) const externalAccessToMainLnd = new LND(settings.lndSettings, new LiquidityProvider("", extermnalUtils, async () => { }, async () => { }), extermnalUtils, async () => { }, async () => { }, () => { }, () => { }) await externalAccessToMainLnd.Warmup()