Merge pull request #795 from shocknet/metrics-subp

Metrics subp
This commit is contained in:
Justin (shocknet) 2025-04-05 15:13:22 -04:00 committed by GitHub
commit 11d10d4476
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 813 additions and 225 deletions

3
.gitignore vendored
View file

@ -22,3 +22,6 @@ app.nprofile
admin.connect
debug.txt
proto/autogenerated/debug.txt
metrics_cache/
metrics_events/
bundler_events/

View file

@ -38,7 +38,7 @@ export default (serverMethods: Types.ServerMethods, mainHandler: Main, nostrSett
},
logger: { log: console.log, error: err => log(ERROR, err) },
})
const nostr = new Nostr(nostrSettings, event => {
const nostr = new Nostr(nostrSettings, mainHandler.utils, event => {
let j: NostrRequest
try {
j = JSON.parse(event.content)

View file

@ -1,16 +1,34 @@
import { MainSettings } from "../main/settings.js";
import { StateBundler } from "../storage/stateBundler.js";
import { StateBundler } from "../storage/tlv/stateBundler.js";
import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js";
import { NostrSend } from "../nostr/handler.js";
import { ProcessMetricsCollector } from "../storage/tlv/processMetricsCollector.js";
type UtilsSettings = {
noCollector?: boolean
dataDir: string
}
export class Utils {
tlvStorageFactory: TlvStorageFactory
stateBundler: StateBundler
settings: MainSettings
constructor(settings: MainSettings) {
this.settings = settings
this.stateBundler = new StateBundler(settings.storageSettings)
_nostrSend: NostrSend = () => { throw new Error('nostr send not initialized yet') }
constructor({ noCollector, dataDir }: UtilsSettings) {
this.tlvStorageFactory = new TlvStorageFactory()
this.stateBundler = new StateBundler(dataDir, this.tlvStorageFactory)
if (!noCollector) {
new ProcessMetricsCollector((metrics) => {
this.tlvStorageFactory.ProcessMetrics(metrics, '')
})
}
}
attachNostrSend(f: NostrSend) {
this._nostrSend = f
this.tlvStorageFactory.attachNostrSend(f)
}
Stop() {
this.stateBundler.Stop()
this.tlvStorageFactory.disconnect()
}
}

View file

@ -18,7 +18,7 @@ import { ERROR, getLogger } from '../helpers/logger.js';
import { HtlcEvent_EventType } from '../../../proto/lnd/router.js';
import { LiquidityProvider, LiquidityRequest } from '../main/liquidityProvider.js';
import { Utils } from '../helpers/utilsWrapper.js';
import { TxPointSettings } from '../storage/stateBundler.js';
import { TxPointSettings } from '../storage/tlv/stateBundler.js';
import { WalletKitClient } from '../../../proto/lnd/walletkit.client.js';
const DeadLineMetadata = (deadline = 10 * 1000) => ({ deadline: Date.now() + deadline })
const deadLndRetrySeconds = 5

View file

@ -186,7 +186,7 @@ export class DebitManager {
this.notifyPaymentSuccess(appUser, debitRes, op, { appId: ctx.app_id, pub: req.npub, id: req.request_id })
return
default:
throw new Error("invalid response type")
throw new Error("invalid debit response type")
}
}

View file

@ -55,7 +55,7 @@ export default class {
utils: Utils
rugPullTracker: RugPullTracker
unlocker: Unlocker
webRTC: webRTC
//webRTC: webRTC
nostrSend: NostrSend = () => { getLogger({})("nostr send not initialized yet") }
constructor(settings: MainSettings, storage: Storage, adminManager: AdminManager, utils: Utils, unlocker: Unlocker) {
this.settings = settings
@ -76,7 +76,7 @@ export default class {
this.appUserManager = new AppUserManager(this.storage, this.settings, this.applicationManager)
this.debitManager = new DebitManager(this.storage, this.lnd, this.applicationManager)
this.offerManager = new OfferManager(this.storage, this.lnd, this.applicationManager, this.productManager)
this.webRTC = new webRTC(this.storage, this.utils)
//this.webRTC = new webRTC(this.storage, this.utils)
}
@ -99,7 +99,8 @@ export default class {
this.liquidityProvider.attachNostrSend(f)
this.debitManager.attachNostrSend(f)
this.offerManager.attachNostrSend(f)
this.webRTC.attachNostrSend(f)
this.utils.attachNostrSend(f)
//this.webRTC.attachNostrSend(f)
}
htlcCb: HtlcCb = (e) => {

View file

@ -9,6 +9,7 @@ import { LoadMainSettingsFromEnv, MainSettings } from "./settings.js"
import { Utils } from "../helpers/utilsWrapper.js"
import { Wizard } from "../wizard/index.js"
import { AdminManager } from "./adminManager.js"
import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js"
export type AppData = {
privateKey: string;
publicKey: string;
@ -16,8 +17,8 @@ export type AppData = {
name: string;
}
export const initMainHandler = async (log: PubLogger, mainSettings: MainSettings) => {
const utils = new Utils(mainSettings)
const storageManager = new Storage(mainSettings.storageSettings)
const utils = new Utils({ dataDir: mainSettings.storageSettings.dataDir })
const storageManager = new Storage(mainSettings.storageSettings, utils)
await storageManager.Connect(log)
/* const manualMigration = await TypeOrmMigrationRunner(log, storageManager, mainSettings.storageSettings.dbSettings, process.argv[2])
if (manualMigration) {

View file

@ -6,6 +6,7 @@ import { SimplePool, Event, UnsignedEvent, getEventHash, finalizeEvent, Relay, n
import { ERROR, getLogger } from '../helpers/logger.js'
import { nip19 } from 'nostr-tools'
import { encrypt as encryptV1, decrypt as decryptV1, getSharedSecret as getConversationKeyV1 } from './nip44v1.js'
import { ProcessMetrics, ProcessMetricsCollector } from '../storage/tlv/processMetricsCollector.js'
const { nprofileEncode } = nip19
const { v2 } = nip44
const { encrypt: encryptV2, decrypt: decryptV2, utils } = v2
@ -50,9 +51,13 @@ type EventResponse = {
type: 'event'
event: NostrEvent
}
type ProcessMetricsResponse = {
type: 'processMetrics'
metrics: ProcessMetrics
}
export type ChildProcessRequest = SettingsRequest | SendRequest
export type ChildProcessResponse = ReadyResponse | EventResponse
export type ChildProcessResponse = ReadyResponse | EventResponse | ProcessMetricsResponse
const send = (message: ChildProcessResponse) => {
if (process.send) {
process.send(message, undefined, undefined, err => {
@ -88,6 +93,13 @@ const initSubprocessHandler = (settings: NostrSettings) => {
event: event
})
})
new ProcessMetricsCollector((metrics) => {
send({
type: 'processMetrics',
metrics
})
})
}
const sendToNostr: NostrSend = (initiator, data, relays) => {
if (!subProcessHandler) {

View file

@ -1,6 +1,7 @@
import { ChildProcess, fork } from 'child_process'
import { EnvMustBeNonEmptyString } from "../helpers/envParser.js"
import { NostrSettings, NostrEvent, ChildProcessRequest, ChildProcessResponse, SendData, SendInitiator } from "./handler.js"
import { Utils } from '../helpers/utilsWrapper.js'
type EventCallback = (event: NostrEvent) => void
const getEnvOrDefault = (name: string, defaultValue: string): string => {
@ -17,7 +18,9 @@ export const LoadNosrtSettingsFromEnv = (test = false) => {
export default class NostrSubprocess {
settings: NostrSettings
childProcess: ChildProcess
constructor(settings: NostrSettings, eventCallback: EventCallback) {
utils: Utils
constructor(settings: NostrSettings, utils: Utils, eventCallback: EventCallback) {
this.utils = utils
this.childProcess = fork("./build/src/services/nostr/handler")
this.childProcess.on("error", console.error)
this.childProcess.on("message", (message: ChildProcessResponse) => {
@ -28,6 +31,9 @@ export default class NostrSubprocess {
case 'event':
eventCallback(message.event)
break
case 'processMetrics':
this.utils.tlvStorageFactory.ProcessMetrics(message.metrics, 'nostr')
break
default:
console.error("unknown nostr event response", message)
break;

View file

@ -11,7 +11,8 @@ export default (mainHandler: Main): Types.ServerMethods => {
offer_CustomCheck: offer => offer !== '',
}
})
return mainHandler.webRTC.OnMessage({ userPub: ctx.operator_id, appId: ctx.app_id }, req.message)
if (err != null) throw new Error(err.message)
return mainHandler.utils.tlvStorageFactory.WebRtcMessage({ userPub: ctx.operator_id, appId: ctx.app_id }, req.message)
},
SubToWebRtcCandidates: async ({ ctx }) => { },
GetUsageMetrics: async ({ ctx, req }) => {

View file

@ -7,7 +7,7 @@ import { ApplicationUser } from './entity/ApplicationUser.js';
import { getLogger } from '../helpers/logger.js';
import { User } from './entity/User.js';
import { InviteToken } from './entity/InviteToken.js';
import { StorageInterface } from './storageInterface.js';
import { StorageInterface } from './db/storageInterface.js';
export default class {
dbs: StorageInterface
userStorage: UserStorage

View file

@ -1,29 +1,29 @@
import "reflect-metadata"
import { DataSource, Migration } from "typeorm"
import { AddressReceivingTransaction } from "./entity/AddressReceivingTransaction.js"
import { User } from "./entity/User.js"
import { UserReceivingAddress } from "./entity/UserReceivingAddress.js"
import { UserReceivingInvoice } from "./entity/UserReceivingInvoice.js"
import { UserInvoicePayment } from "./entity/UserInvoicePayment.js"
import { EnvMustBeNonEmptyString } from "../helpers/envParser.js"
import { UserTransactionPayment } from "./entity/UserTransactionPayment.js"
import { UserBasicAuth } from "./entity/UserBasicAuth.js"
import { UserEphemeralKey } from "./entity/UserEphemeralKey.js"
import { UserToUserPayment } from "./entity/UserToUserPayment.js"
import { Application } from "./entity/Application.js"
import { ApplicationUser } from "./entity/ApplicationUser.js"
import { BalanceEvent } from "./entity/BalanceEvent.js"
import { ChannelBalanceEvent } from "./entity/ChannelsBalanceEvent.js"
import { getLogger } from "../helpers/logger.js"
import { ChannelRouting } from "./entity/ChannelRouting.js"
import { LspOrder } from "./entity/LspOrder.js"
import { Product } from "./entity/Product.js"
import { LndNodeInfo } from "./entity/LndNodeInfo.js"
import { TrackedProvider } from "./entity/TrackedProvider.js"
import { InviteToken } from "./entity/InviteToken.js"
import { DebitAccess } from "./entity/DebitAccess.js"
import { RootOperation } from "./entity/RootOperation.js"
import { UserOffer } from "./entity/UserOffer.js"
import { AddressReceivingTransaction } from "../entity/AddressReceivingTransaction.js"
import { User } from "../entity/User.js"
import { UserReceivingAddress } from "../entity/UserReceivingAddress.js"
import { UserReceivingInvoice } from "../entity/UserReceivingInvoice.js"
import { UserInvoicePayment } from "../entity/UserInvoicePayment.js"
import { EnvMustBeNonEmptyString } from "../../helpers/envParser.js"
import { UserTransactionPayment } from "../entity/UserTransactionPayment.js"
import { UserBasicAuth } from "../entity/UserBasicAuth.js"
import { UserEphemeralKey } from "../entity/UserEphemeralKey.js"
import { UserToUserPayment } from "../entity/UserToUserPayment.js"
import { Application } from "../entity/Application.js"
import { ApplicationUser } from "../entity/ApplicationUser.js"
import { BalanceEvent } from "../entity/BalanceEvent.js"
import { ChannelBalanceEvent } from "../entity/ChannelsBalanceEvent.js"
import { getLogger } from "../../helpers/logger.js"
import { ChannelRouting } from "../entity/ChannelRouting.js"
import { LspOrder } from "../entity/LspOrder.js"
import { Product } from "../entity/Product.js"
import { LndNodeInfo } from "../entity/LndNodeInfo.js"
import { TrackedProvider } from "../entity/TrackedProvider.js"
import { InviteToken } from "../entity/InviteToken.js"
import { DebitAccess } from "../entity/DebitAccess.js"
import { RootOperation } from "../entity/RootOperation.js"
import { UserOffer } from "../entity/UserOffer.js"
export type DbSettings = {
@ -70,7 +70,7 @@ export const MainDbEntities = {
export type MainDbNames = keyof typeof MainDbEntities
export const MainDbEntitiesNames = Object.keys(MainDbEntities)
const MetricsDbEntities = {
export const MetricsDbEntities = {
'BalanceEvent': BalanceEvent,
'ChannelBalanceEvent': ChannelBalanceEvent,
'ChannelRouting': ChannelRouting,

View file

@ -1,4 +1,4 @@
import { fork } from 'child_process';
import { ChildProcess, fork } from 'child_process';
import { EventEmitter } from 'events';
import { DbSettings, MainDbNames } from './db.js';
import { DeepPartial, FindOptionsWhere } from 'typeorm';
@ -10,32 +10,48 @@ import {
IncrementOperation,
DecrementOperation,
SumOperation,
DBNames,
SuccessOperationResponse,
} from './storageProcessor.js';
import { PickKeysByType } from 'typeorm/common/PickKeysByType.js';
import { serializeRequest, WhereCondition } from './serializationHelpers.js';
import { Utils } from '../../helpers/utilsWrapper.js';
import { ProcessMetrics } from '../tlv/processMetricsCollector.js';
export type TX<T> = (txId: string) => Promise<T>
export class StorageInterface extends EventEmitter {
private process: any;
private process: ChildProcess;
private isConnected: boolean = false;
private debug: boolean = false;
private utils: Utils
private dbType: 'main' | 'metrics'
constructor() {
constructor(utils: Utils) {
super();
this.initializeSubprocess();
this.utils = utils
}
setDebug(debug: boolean) {
this.debug = debug;
}
private handleCollectedProcessMetrics(metrics: SuccessOperationResponse<ProcessMetrics>) {
if (!this.dbType) return
this.utils.tlvStorageFactory.ProcessMetrics(metrics.data, this.dbType + '_storage')
}
private initializeSubprocess() {
this.process = fork('./build/src/services/storage/storageProcessor');
this.process = fork('./build/src/services/storage/db/storageProcessor');
this.process.on('message', (response: OperationResponse<any>) => {
this.emit(response.opId, response);
if (response.success && response.type === 'processMetrics') {
this.handleCollectedProcessMetrics(response)
} else {
this.emit(response.opId, response);
}
});
this.process.on('error', (error: Error) => {
@ -51,61 +67,62 @@ export class StorageInterface extends EventEmitter {
this.isConnected = true;
}
Connect(settings: DbSettings): Promise<number> {
Connect(settings: DbSettings, dbType: 'main' | 'metrics'): Promise<number> {
const opId = Math.random().toString()
const connectOp: ConnectOperation = { type: 'connect', opId, settings }
this.dbType = dbType
const connectOp: ConnectOperation = { type: 'connect', opId, settings, dbType }
return this.handleOp<number>(connectOp)
}
Delete<T>(entity: MainDbNames, q: number | FindOptionsWhere<T>, txId?: string): Promise<number> {
Delete<T>(entity: DBNames, q: number | FindOptionsWhere<T>, txId?: string): Promise<number> {
const opId = Math.random().toString()
const deleteOp: DeleteOperation<T> = { type: 'delete', entity, opId, q, txId }
return this.handleOp<number>(deleteOp)
}
Remove<T>(entity: MainDbNames, q: T, txId?: string): Promise<T> {
Remove<T>(entity: DBNames, q: T, txId?: string): Promise<T> {
const opId = Math.random().toString()
const removeOp: RemoveOperation<T> = { type: 'remove', entity, opId, q, txId }
return this.handleOp<T>(removeOp)
}
FindOne<T>(entity: MainDbNames, q: QueryOptions<T>, txId?: string): Promise<T | null> {
FindOne<T>(entity: DBNames, q: QueryOptions<T>, txId?: string): Promise<T | null> {
const opId = Math.random().toString()
const findOp: FindOneOperation<T> = { type: 'findOne', entity, opId, q, txId }
return this.handleOp<T | null>(findOp)
}
Find<T>(entity: MainDbNames, q: QueryOptions<T>, txId?: string): Promise<T[]> {
Find<T>(entity: DBNames, q: QueryOptions<T>, txId?: string): Promise<T[]> {
const opId = Math.random().toString()
const findOp: FindOperation<T> = { type: 'find', entity, opId, q, txId }
return this.handleOp<T[]>(findOp)
}
Sum<T>(entity: MainDbNames, columnName: PickKeysByType<T, number>, q: WhereCondition<T>, txId?: string): Promise<number> {
Sum<T>(entity: DBNames, columnName: PickKeysByType<T, number>, q: WhereCondition<T>, txId?: string): Promise<number> {
const opId = Math.random().toString()
const sumOp: SumOperation<T> = { type: 'sum', entity, opId, columnName, q, txId }
return this.handleOp<number>(sumOp)
}
Update<T>(entity: MainDbNames, q: number | FindOptionsWhere<T>, toUpdate: DeepPartial<T>, txId?: string): Promise<number> {
Update<T>(entity: DBNames, q: number | FindOptionsWhere<T>, toUpdate: DeepPartial<T>, txId?: string): Promise<number> {
const opId = Math.random().toString()
const updateOp: UpdateOperation<T> = { type: 'update', entity, opId, toUpdate, q, txId }
return this.handleOp<number>(updateOp)
}
Increment<T>(entity: MainDbNames, q: FindOptionsWhere<T>, propertyPath: string, value: number | string, txId?: string): Promise<number> {
Increment<T>(entity: DBNames, q: FindOptionsWhere<T>, propertyPath: string, value: number | string, txId?: string): Promise<number> {
const opId = Math.random().toString()
const incrementOp: IncrementOperation<T> = { type: 'increment', entity, opId, q, propertyPath, value, txId }
return this.handleOp<number>(incrementOp)
}
Decrement<T>(entity: MainDbNames, q: FindOptionsWhere<T>, propertyPath: string, value: number | string, txId?: string): Promise<number> {
Decrement<T>(entity: DBNames, q: FindOptionsWhere<T>, propertyPath: string, value: number | string, txId?: string): Promise<number> {
const opId = Math.random().toString()
const decrementOp: DecrementOperation<T> = { type: 'decrement', entity, opId, q, propertyPath, value, txId }
return this.handleOp<number>(decrementOp)
}
CreateAndSave<T>(entity: MainDbNames, toSave: DeepPartial<T>, txId?: string): Promise<T> {
CreateAndSave<T>(entity: DBNames, toSave: DeepPartial<T>, txId?: string): Promise<T> {
const opId = Math.random().toString()
const createAndSaveOp: CreateAndSaveOperation<T> = { type: 'createAndSave', entity, opId, toSave, txId }
return this.handleOp<T>(createAndSaveOp)
@ -147,7 +164,7 @@ export class StorageInterface extends EventEmitter {
return
}
if (response.type !== op.type) {
reject(new Error('Invalid response type'));
reject(new Error('Invalid storage response type'));
return
}
resolve(response.data);

View file

@ -1,12 +1,13 @@
import { DataSource, EntityManager, DeepPartial, FindOptionsWhere, FindOptionsOrder, FindOperator } from 'typeorm';
import NewDB, { DbSettings, MainDbEntities, MainDbNames, newMetricsDb } from './db.js';
import { PubLogger, getLogger } from '../helpers/logger.js';
import { allMetricsMigrations, allMigrations } from './migrations/runner.js';
import NewDB, { DbSettings, MainDbEntities, MainDbNames, MetricsDbEntities, MetricsDbNames, newMetricsDb } from './db.js';
import { PubLogger, getLogger } from '../../helpers/logger.js';
import { allMetricsMigrations, allMigrations } from '../migrations/runner.js';
import transactionsQueue from './transactionsQueue.js';
import { PickKeysByType } from 'typeorm/common/PickKeysByType';
import { deserializeRequest, WhereCondition } from './serializationHelpers.js';
import { ProcessMetricsCollector } from '../tlv/processMetricsCollector.js';
export type DBNames = MainDbNames | MetricsDbNames
export type QueryOptions<T> = {
where?: WhereCondition<T>
order?: FindOptionsOrder<T>
@ -17,6 +18,7 @@ export type ConnectOperation = {
type: 'connect'
opId: string
settings: DbSettings
dbType: 'main' | 'metrics'
debug?: boolean
}
@ -36,7 +38,7 @@ export type EndTxOperation<T> = {
export type DeleteOperation<T> = {
type: 'delete'
entity: MainDbNames
entity: DBNames
opId: string
q: number | FindOptionsWhere<T>
txId?: string
@ -45,7 +47,7 @@ export type DeleteOperation<T> = {
export type RemoveOperation<T> = {
type: 'remove'
entity: MainDbNames
entity: DBNames
opId: string
q: T
txId?: string
@ -54,7 +56,7 @@ export type RemoveOperation<T> = {
export type UpdateOperation<T> = {
type: 'update'
entity: MainDbNames
entity: DBNames
opId: string
toUpdate: DeepPartial<T>
q: number | FindOptionsWhere<T>
@ -64,7 +66,7 @@ export type UpdateOperation<T> = {
export type IncrementOperation<T> = {
type: 'increment'
entity: MainDbNames
entity: DBNames
opId: string
q: FindOptionsWhere<T>
propertyPath: string,
@ -75,7 +77,7 @@ export type IncrementOperation<T> = {
export type DecrementOperation<T> = {
type: 'decrement'
entity: MainDbNames
entity: DBNames
opId: string
q: FindOptionsWhere<T>
propertyPath: string,
@ -86,7 +88,7 @@ export type DecrementOperation<T> = {
export type FindOneOperation<T> = {
type: 'findOne'
entity: MainDbNames
entity: DBNames
opId: string
q: QueryOptions<T>
txId?: string
@ -95,7 +97,7 @@ export type FindOneOperation<T> = {
export type FindOperation<T> = {
type: 'find'
entity: MainDbNames
entity: DBNames
opId: string
q: QueryOptions<T>
txId?: string
@ -104,7 +106,7 @@ export type FindOperation<T> = {
export type SumOperation<T> = {
type: 'sum'
entity: MainDbNames
entity: DBNames
opId: string
columnName: PickKeysByType<T, number>
q: WhereCondition<T>
@ -114,7 +116,7 @@ export type SumOperation<T> = {
export type CreateAndSaveOperation<T> = {
type: 'createAndSave'
entity: MainDbNames
entity: DBNames
opId: string
toSave: DeepPartial<T>
txId?: string
@ -150,12 +152,12 @@ class StorageProcessor {
//private locked: boolean = false
private activeTransaction: ActiveTransaction | null = null
//private queue: StartTxOperation[] = []
private mode: 'main' | 'metrics' | '' = ''
constructor() {
if (!process.send) {
throw new Error('This process must be spawned as a child process');
}
this.log = getLogger({ component: 'StorageProcessor' })
process.on('message', (operation: StorageOperation<any>) => {
this.handleOperation(operation);
});
@ -163,6 +165,15 @@ class StorageProcessor {
process.on('error', (error: Error) => {
console.error('Error in storage processor:', error);
});
new ProcessMetricsCollector((pMetrics) => {
this.sendResponse({
success: true,
type: 'processMetrics',
data: pMetrics,
opId: Math.random().toString()
})
})
}
private async handleOperation(operation: StorageOperation<any>) {
@ -229,17 +240,38 @@ class StorageProcessor {
}
private async handleConnect(operation: ConnectOperation) {
const { source, executedMigrations } = await NewDB(operation.settings, allMigrations)
this.DB = source
this.txQueue = new transactionsQueue('StorageProcessorQueue', this.DB)
if (executedMigrations.length > 0) {
this.log(executedMigrations.length, "new migrations executed")
this.log("-------------------")
let migrationsExecuted = 0
if (this.mode !== '') {
throw new Error('Already connected to a database')
}
this.log = getLogger({ component: 'StorageProcessor_' + operation.dbType })
if (operation.dbType === 'main') {
const { source, executedMigrations } = await NewDB(operation.settings, allMigrations)
this.DB = source
this.txQueue = new transactionsQueue('StorageProcessorQueue', this.DB)
migrationsExecuted = executedMigrations.length
if (executedMigrations.length > 0) {
this.log(executedMigrations.length, "new migrations executed")
this.log("-------------------")
}
this.mode = 'main'
} else if (operation.dbType === 'metrics') {
const { source, executedMigrations } = await newMetricsDb(operation.settings, allMetricsMigrations)
this.DB = source
this.txQueue = new transactionsQueue('MetricsStorageProcessorQueue', this.DB)
migrationsExecuted = executedMigrations.length
if (executedMigrations.length > 0) {
this.log(executedMigrations.length, "new metrics migrations executed")
this.log("-------------------")
}
this.mode = 'metrics'
} else {
throw new Error('Unknown database type')
}
this.sendResponse({
success: true,
type: 'connect',
data: executedMigrations.length,
data: migrationsExecuted,
opId: operation.opId
});
}
@ -303,17 +335,28 @@ class StorageProcessor {
return this.activeTransaction.manager
}
private getManager(txId?: string): DataSource | EntityManager {
if (txId) {
return this.getTx(txId)
private getEntity(entityName: DBNames) {
if (this.mode === 'main') {
const e = MainDbEntities[entityName as MainDbNames]
if (!e) {
throw new Error(`Unknown entity type for main db: ${entityName}`)
}
return e
} else if (this.mode === 'metrics') {
const e = MetricsDbEntities[entityName as MetricsDbNames]
if (!e) {
throw new Error(`Unknown entity type for metrics db: ${entityName}`)
}
return e
} else {
throw new Error('Unknown database mode')
}
return this.DB
}
private async handleDelete(operation: DeleteOperation<any>) {
const res = await this.handleWrite(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).delete(operation.q)
return eM.getRepository(this.getEntity(operation.entity)).delete(operation.q)
})
this.sendResponse({
success: true,
@ -325,7 +368,7 @@ class StorageProcessor {
private async handleRemove(operation: RemoveOperation<any>) {
const res = await this.handleWrite(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).remove(operation.q)
return eM.getRepository(this.getEntity(operation.entity)).remove(operation.q)
})
this.sendResponse({
@ -338,7 +381,7 @@ class StorageProcessor {
private async handleUpdate(operation: UpdateOperation<any>) {
const res = await this.handleWrite(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).update(operation.q, operation.toUpdate)
return eM.getRepository(this.getEntity(operation.entity)).update(operation.q, operation.toUpdate)
})
this.sendResponse({
@ -351,7 +394,7 @@ class StorageProcessor {
private async handleIncrement(operation: IncrementOperation<any>) {
const res = await this.handleWrite(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).increment(operation.q, operation.propertyPath, operation.value)
return eM.getRepository(this.getEntity(operation.entity)).increment(operation.q, operation.propertyPath, operation.value)
})
this.sendResponse({
@ -364,7 +407,7 @@ class StorageProcessor {
private async handleDecrement(operation: DecrementOperation<any>) {
const res = await this.handleWrite(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).decrement(operation.q, operation.propertyPath, operation.value)
return eM.getRepository(this.getEntity(operation.entity)).decrement(operation.q, operation.propertyPath, operation.value)
})
this.sendResponse({
@ -377,7 +420,7 @@ class StorageProcessor {
private async handleFindOne(operation: FindOneOperation<any>) {
const res = await this.handleRead(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).findOne(operation.q)
return eM.getRepository(this.getEntity(operation.entity)).findOne(operation.q)
})
this.sendResponse({
@ -390,7 +433,7 @@ class StorageProcessor {
private async handleFind(operation: FindOperation<any>) {
const res = await this.handleRead(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).find(operation.q)
return eM.getRepository(this.getEntity(operation.entity)).find(operation.q)
})
this.sendResponse({
@ -403,7 +446,7 @@ class StorageProcessor {
private async handleSum(operation: SumOperation<object>) {
const res = await this.handleRead(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).sum(operation.columnName, operation.q)
return eM.getRepository(this.getEntity(operation.entity)).sum(operation.columnName, operation.q)
})
this.sendResponse({
success: true,
@ -415,8 +458,8 @@ class StorageProcessor {
private async handleCreateAndSave(operation: CreateAndSaveOperation<any>) {
const saved = await this.handleWrite(operation.txId, async eM => {
const res = eM.getRepository(MainDbEntities[operation.entity]).create(operation.toSave)
return eM.getRepository(MainDbEntities[operation.entity]).save(res)
const res = eM.getRepository(this.getEntity(operation.entity)).create(operation.toSave)
return eM.getRepository(this.getEntity(operation.entity)).save(res)
})
this.sendResponse({

View file

@ -1,5 +1,5 @@
import { DataSource, EntityManager, EntityTarget } from "typeorm"
import { PubLogger, getLogger } from "../helpers/logger.js"
import { PubLogger, getLogger } from "../../helpers/logger.js"
type TX<T> = (entityManager: EntityManager | DataSource) => Promise<T>
type TxOperation<T> = {

View file

@ -1,5 +1,5 @@
import { DebitAccess, DebitAccessRules } from "./entity/DebitAccess.js";
import { StorageInterface } from "./storageInterface.js";
import { StorageInterface } from "./db/storageInterface.js";
type AccessToAdd = {
npub: string
rules?: DebitAccessRules

View file

@ -1,18 +1,19 @@
import fs from 'fs'
import NewDB, { DbSettings, LoadDbSettingsFromEnv } from "./db.js"
import NewDB, { DbSettings, LoadDbSettingsFromEnv } from "./db/db.js"
import ProductStorage from './productStorage.js'
import ApplicationStorage from './applicationStorage.js'
import UserStorage from "./userStorage.js";
import PaymentStorage from "./paymentStorage.js";
import MetricsStorage from "./metricsStorage.js";
import MetricsEventStorage from "./metricsEventStorage.js";
import MetricsEventStorage from "./tlv/metricsEventStorage.js";
import EventsLogManager from "./eventsLog.js";
import { LiquidityStorage } from "./liquidityStorage.js";
import DebitStorage from "./debitStorage.js"
import OfferStorage from "./offerStorage.js"
import { StorageInterface, TX } from "./storageInterface.js";
import { allMetricsMigrations, allMigrations } from "./migrations/runner.js"
import { StorageInterface, TX } from "./db/storageInterface.js";
import { PubLogger } from "../helpers/logger.js"
import { TlvStorageFactory } from './tlv/tlvFilesStorageFactory.js';
import { Utils } from '../helpers/utilsWrapper.js';
export type StorageSettings = {
dbSettings: DbSettings
eventLogPath: string
@ -36,13 +37,15 @@ export default class {
debitStorage: DebitStorage
offerStorage: OfferStorage
eventsLog: EventsLogManager
constructor(settings: StorageSettings) {
utils: Utils
constructor(settings: StorageSettings, utils: Utils) {
this.settings = settings
this.utils = utils
this.eventsLog = new EventsLogManager(settings.eventLogPath)
}
async Connect(log: PubLogger) {
this.dbs = new StorageInterface()
await this.dbs.Connect(this.settings.dbSettings)
this.dbs = new StorageInterface(this.utils)
await this.dbs.Connect(this.settings.dbSettings, 'main')
//const { source, executedMigrations } = await NewDB(this.settings.dbSettings, allMigrations)
//this.DB = source
//this.txQueue = new TransactionsQueue("main", this.DB)
@ -50,22 +53,22 @@ export default class {
this.productStorage = new ProductStorage(this.dbs)
this.applicationStorage = new ApplicationStorage(this.dbs, this.userStorage)
this.paymentStorage = new PaymentStorage(this.dbs, this.userStorage)
this.metricsStorage = new MetricsStorage(this.settings)
this.metricsEventStorage = new MetricsEventStorage(this.settings)
this.metricsStorage = new MetricsStorage(this.settings, this.utils)
this.metricsEventStorage = new MetricsEventStorage(this.settings, this.utils.tlvStorageFactory)
this.liquidityStorage = new LiquidityStorage(this.dbs)
this.debitStorage = new DebitStorage(this.dbs)
this.offerStorage = new OfferStorage(this.dbs)
try { if (this.settings.dataDir) fs.mkdirSync(this.settings.dataDir) } catch (e) { }
const executedMetricsMigrations = await this.metricsStorage.Connect(allMetricsMigrations)
/* const executedMetricsMigrations = */ await this.metricsStorage.Connect()
/* if (executedMigrations.length > 0) {
log(executedMigrations.length, "new migrations executed")
log("-------------------")
} */
if (executedMetricsMigrations.length > 0) {
log(executedMetricsMigrations.length, "new metrics migrations executed")
log("-------------------")
}
/* if (executedMetricsMigrations.length > 0) {
log(executedMetricsMigrations.length, "new metrics migrations executed")
log("-------------------")
} */
}
Stop() {

View file

@ -2,7 +2,7 @@ import { IsNull, MoreThan, Not } from "typeorm"
import { LspOrder } from "./entity/LspOrder.js";
import { LndNodeInfo } from "./entity/LndNodeInfo.js";
import { TrackedProvider } from "./entity/TrackedProvider.js";
import { StorageInterface } from "./storageInterface.js";
import { StorageInterface } from "./db/storageInterface.js";
export class LiquidityStorage {
dbs: StorageInterface
constructor(dbs: StorageInterface) {

View file

@ -1,59 +1,68 @@
import { Between, DataSource, EntityManager, FindManyOptions, FindOperator, LessThanOrEqual, MoreThanOrEqual } from "typeorm"
import { BalanceEvent } from "./entity/BalanceEvent.js"
import { ChannelBalanceEvent } from "./entity/ChannelsBalanceEvent.js"
import TransactionsQueue from "./transactionsQueue.js";
import TransactionsQueue from "./db/transactionsQueue.js";
import { StorageSettings } from "./index.js";
import { newMetricsDb } from "./db.js";
import { newMetricsDb } from "./db/db.js";
import { ChannelRouting } from "./entity/ChannelRouting.js";
import { RootOperation } from "./entity/RootOperation.js";
import { StorageInterface } from "./db/storageInterface.js";
import { Utils } from "../helpers/utilsWrapper.js";
export default class {
DB: DataSource | EntityManager
//DB: DataSource | EntityManager
settings: StorageSettings
txQueue: TransactionsQueue
constructor(settings: StorageSettings) {
dbs: StorageInterface
utils: Utils
//txQueue: TransactionsQueue
constructor(settings: StorageSettings, utils: Utils) {
this.settings = settings;
this.utils = utils
}
async Connect(metricsMigrations: Function[]) {
const { source, executedMigrations } = await newMetricsDb(this.settings.dbSettings, metricsMigrations)
this.DB = source;
this.txQueue = new TransactionsQueue("metrics", this.DB)
return executedMigrations;
async Connect() {
//const { source, executedMigrations } = await newMetricsDb(this.settings.dbSettings, metricsMigrations)
//this.DB = source;
//this.txQueue = new TransactionsQueue("metrics", this.DB)
this.dbs = new StorageInterface(this.utils)
await this.dbs.Connect(this.settings.dbSettings, 'metrics')
//return executedMigrations;
}
async SaveBalanceEvents(balanceEvent: Partial<BalanceEvent>, channelBalanceEvents: Partial<ChannelBalanceEvent>[]) {
const blanceEventEntry = this.DB.getRepository(BalanceEvent).create(balanceEvent)
const balanceEntry = await this.txQueue.PushToQueue<BalanceEvent>({ exec: async db => db.getRepository(BalanceEvent).save(blanceEventEntry), dbTx: false })
//const blanceEventEntry = this.DB.getRepository(BalanceEvent).create(balanceEvent)
//const balanceEntry = await this.txQueue.PushToQueue<BalanceEvent>({ exec: async db => db.getRepository(BalanceEvent).save(blanceEventEntry), dbTx: false })
const balanceEntry = await this.dbs.CreateAndSave<BalanceEvent>('BalanceEvent', balanceEvent)
//const channelsEntry = this.DB.getRepository(ChannelBalanceEvent).create(channelBalanceEvents.map(e => ({ ...e, balance_event: balanceEntry })))
//const channelsEntries = await this.txQueue.PushToQueue<ChannelBalanceEvent[]>({ exec: async db => db.getRepository(ChannelBalanceEvent).save(channelsEntry), dbTx: false })
const channelsEntries = await this.dbs.CreateAndSave<ChannelBalanceEvent[]>('ChannelBalanceEvent', channelBalanceEvents.map(e => ({ ...e, balance_event: balanceEntry })))
const channelsEntry = this.DB.getRepository(ChannelBalanceEvent).create(channelBalanceEvents.map(e => ({ ...e, balance_event: balanceEntry })))
const channelsEntries = await this.txQueue.PushToQueue<ChannelBalanceEvent[]>({ exec: async db => db.getRepository(ChannelBalanceEvent).save(channelsEntry), dbTx: false })
return { balanceEntry, channelsEntries }
}
async GetBalanceEvents({ from, to }: { from?: number, to?: number }, entityManager = this.DB) {
async GetBalanceEvents({ from, to }: { from?: number, to?: number }, txId?: string) {
const q = getTimeQuery({ from, to })
const [chainBalanceEvents] = await Promise.all([
entityManager.getRepository(BalanceEvent).find(q),
])
const chainBalanceEvents = await this.dbs.Find<BalanceEvent>('BalanceEvent', q, txId)
return { chainBalanceEvents }
}
async initChannelRoutingEvent(dayUnix: number, channelId: string) {
const existing = await this.DB.getRepository(ChannelRouting).findOne({ where: { day_unix: dayUnix, channel_id: channelId } })
const existing = await this.dbs.FindOne<ChannelRouting>('ChannelRouting', { where: { day_unix: dayUnix, channel_id: channelId } })
if (!existing) {
const entry = this.DB.getRepository(ChannelRouting).create({ day_unix: dayUnix, channel_id: channelId })
return this.txQueue.PushToQueue<ChannelRouting>({ exec: async db => db.getRepository(ChannelRouting).save(entry), dbTx: false })
return this.dbs.CreateAndSave<ChannelRouting>('ChannelRouting', { day_unix: dayUnix, channel_id: channelId })
}
return existing
}
GetChannelRouting({ from, to }: { from?: number, to?: number }, entityManager = this.DB) {
GetChannelRouting({ from, to }: { from?: number, to?: number }, txId?: string) {
const q = getTimeQuery({ from, to })
return entityManager.getRepository(ChannelRouting).find(q)
return this.dbs.Find<ChannelRouting>('ChannelRouting', q, txId)
}
async GetLatestForwardingIndexOffset() {
const latestIndex = await this.DB.getRepository(ChannelRouting).find({ order: { latest_index_offset: "DESC" }, take: 1 })
const latestIndex = await this.dbs.Find<ChannelRouting>('ChannelRouting', { order: { latest_index_offset: "DESC" }, take: 1 })
if (latestIndex.length > 0) {
return latestIndex[0].latest_index_offset
}
@ -63,50 +72,49 @@ export default class {
async IncrementChannelRouting(channelId: string, event: Partial<ChannelRouting>) {
const dayUnix = getTodayUnix()
const existing = await this.initChannelRoutingEvent(dayUnix, channelId)
const repo = this.DB.getRepository(ChannelRouting)
//const repo = this.DB.getRepository(ChannelRouting)
if (event.send_errors) {
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "send_errors", event.send_errors)
await this.dbs.Increment<ChannelRouting>('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "send_errors", event.send_errors)
}
if (event.receive_errors) {
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "receive_errors", event.receive_errors)
await this.dbs.Increment<ChannelRouting>('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "receive_errors", event.receive_errors)
}
if (event.forward_errors_as_input) {
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_input", event.forward_errors_as_input)
await this.dbs.Increment<ChannelRouting>('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_input", event.forward_errors_as_input)
}
if (event.forward_errors_as_output) {
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_output", event.forward_errors_as_output)
await this.dbs.Increment<ChannelRouting>('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_output", event.forward_errors_as_output)
}
if (event.missed_forward_fee_as_input) {
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_input", event.missed_forward_fee_as_input)
await this.dbs.Increment<ChannelRouting>('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_input", event.missed_forward_fee_as_input)
}
if (event.missed_forward_fee_as_output) {
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_output", event.missed_forward_fee_as_output)
await this.dbs.Increment<ChannelRouting>('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_output", event.missed_forward_fee_as_output)
}
if (event.forward_fee_as_input) {
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_input", event.forward_fee_as_input)
await this.dbs.Increment<ChannelRouting>('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_input", event.forward_fee_as_input)
}
if (event.forward_fee_as_output) {
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_output", event.forward_fee_as_output)
await this.dbs.Increment<ChannelRouting>('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "forward_fee_as_output", event.forward_fee_as_output)
}
if (event.events_as_input) {
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "events_as_input", event.events_as_input)
await this.dbs.Increment<ChannelRouting>('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "events_as_input", event.events_as_input)
}
if (event.events_as_output) {
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "events_as_output", event.events_as_output)
await this.dbs.Increment<ChannelRouting>('ChannelRouting', { day_unix: dayUnix, channel_id: channelId }, "events_as_output", event.events_as_output)
}
if (event.latest_index_offset) {
await repo.update(existing.serial_id, { latest_index_offset: event.latest_index_offset })
await this.dbs.Update<ChannelRouting>('ChannelRouting', existing.serial_id, { latest_index_offset: event.latest_index_offset })
}
}
async AddRootOperation(opType: string, id: string, amount: number, entityManager = this.DB) {
const newOp = entityManager.getRepository(RootOperation).create({ operation_type: opType, operation_amount: amount, operation_identifier: id })
return this.txQueue.PushToQueue<RootOperation>({ exec: async db => db.getRepository(RootOperation).save(newOp), dbTx: false })
async AddRootOperation(opType: string, id: string, amount: number, txId?: string) {
return this.dbs.CreateAndSave<RootOperation>('RootOperation', { operation_type: opType, operation_amount: amount, operation_identifier: id }, txId)
}
async GetRootOperations({ from, to }: { from?: number, to?: number }, entityManager = this.DB) {
async GetRootOperations({ from, to }: { from?: number, to?: number }, txId?: string) {
const q = getTimeQuery({ from, to })
return entityManager.getRepository(RootOperation).find(q)
return this.dbs.Find<RootOperation>('RootOperation', q, txId)
}
}

View file

@ -1,6 +1,6 @@
import crypto from 'crypto';
import { UserOffer } from "./entity/UserOffer.js";
import { StorageInterface } from "./storageInterface.js";
import { StorageInterface } from "./db/storageInterface.js";
export default class {

View file

@ -11,9 +11,9 @@ import { AddressReceivingTransaction } from './entity/AddressReceivingTransactio
import { UserInvoicePayment } from './entity/UserInvoicePayment.js';
import { UserToUserPayment } from './entity/UserToUserPayment.js';
import { Application } from './entity/Application.js';
import TransactionsQueue from "./transactionsQueue.js";
import TransactionsQueue from "./db/transactionsQueue.js";
import { LoggedEvent } from './eventsLog.js';
import { StorageInterface } from './storageInterface.js';
import { StorageInterface } from './db/storageInterface.js';
export type InboundOptionals = { product?: Product, callbackUrl?: string, expiry: number, expectedPayer?: User, linkedApplication?: Application, zapInfo?: ZapInfo, offerId?: string, payerData?: Record<string, string> }
export const defaultInvoiceExpiry = 60 * 60
export default class {

View file

@ -1,6 +1,6 @@
import { Product } from "./entity/Product.js"
import { User } from "./entity/User.js"
import { StorageInterface } from "./storageInterface.js";
import { StorageInterface } from "./db/storageInterface.js";
export default class {
dbs: StorageInterface
constructor(dbs: StorageInterface) {

View file

@ -1,21 +1,20 @@
import fs from 'fs'
import * as Types from '../../../proto/autogenerated/ts/types.js'
import { StorageSettings } from "./index.js";
import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js';
import { TlvFilesStorage } from './tlvFilesStorage.js';
import * as Types from '../../../../proto/autogenerated/ts/types.js'
import { StorageSettings } from "../index.js";
import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../../helpers/tlv.js';
import { TlvStorageFactory, TlvStorageInterface } from './tlvFilesStorageFactory.js';
export default class {
tlvStorage: TlvFilesStorage
tlvStorage: TlvStorageInterface
cachePath: string
last24hCache: { ts: number, ok: number, fail: number }[] = []
lastPersistedCache: number = 0
constructor(settings: StorageSettings) {
constructor(settings: StorageSettings, tlvStorageFactory: TlvStorageFactory) {
const metricsPath = [settings.dataDir, "metric_events"].filter(s => !!s).join("/")
this.tlvStorage = new TlvFilesStorage(metricsPath)
this.tlvStorage = tlvStorageFactory.NewStorage({ name: 'usage', path: metricsPath })
this.cachePath = [settings.dataDir, "metric_cache"].filter(s => !!s).join("/")
if (!fs.existsSync(this.cachePath)) {
fs.mkdirSync(this.cachePath, { recursive: true });
}
this.tlvStorage.initMeta()
this.loadCache()
setInterval(() => {
if (Date.now() - this.lastPersistedCache > 1000 * 60 * 4) {
@ -77,7 +76,7 @@ export default class {
}
LoadLatestMetrics = async (limit = 30): Promise<Types.UsageMetrics> => {
const raw = this.tlvStorage.LoadLatest(limit)
const raw = await this.tlvStorage.LoadLatest(limit)
const metrics: Types.UsageMetrics = { apps: {} }
Object.keys(raw).forEach(app => {
metrics.apps[app] = { app_metrics: {} }
@ -93,7 +92,7 @@ export default class {
return metrics
}
LoadMetricsFile = async (app: string, method: string, chunk: number): Promise<Types.UsageMetricTlv> => {
const { fileData, chunks } = this.tlvStorage.LoadFile(app, method, chunk)
const { fileData, chunks } = await this.tlvStorage.LoadFile(app, method, chunk)
//const tlv = await this.LoadRawMetricsFile(app, method, chunk)
const decoded = decodeListTLV(parseTLV(fileData))
return {

View file

@ -0,0 +1,39 @@
import { getLogger } from "../../helpers/logger.js"
export type ProcessMetrics = {
memory_rss_kb?: number
memory_buffer_kb?: number
memory_heap_total_kb?: number
memory_heap_used_kb?: number
memory_external_kb?: number
}
export class ProcessMetricsCollector {
reportLog = getLogger({ component: 'ProcessMetricsCollector' })
prevValues: Record<string, number> = {}
interval: NodeJS.Timeout
constructor(cb: (metrics: ProcessMetrics) => void) {
this.interval = setInterval(() => {
const mem = process.memoryUsage()
const metrics: ProcessMetrics = {
memory_rss_kb: this.AddValue('memory_rss_kb', Math.ceil(mem.rss / 1000 || 0), true),
memory_buffer_kb: this.AddValue('memory_buffer_kb', Math.ceil(mem.arrayBuffers / 1000 || 0), true),
memory_heap_total_kb: this.AddValue('memory_heap_total_kb', Math.ceil(mem.heapTotal / 1000 || 0), true),
memory_heap_used_kb: this.AddValue('memory_heap_used_kb', Math.ceil(mem.heapUsed / 1000 || 0), true),
memory_external_kb: this.AddValue('memory_external_kb', Math.ceil(mem.external / 1000 || 0), true),
}
cb(metrics)
}, 60 * 1000)
}
Stop() {
clearInterval(this.interval)
}
AddValue = (key: string, v: number, updateOnly = false): number | undefined => {
if (updateOnly && this.prevValues[key] === v) {
return
}
this.prevValues[key] = v
return v
}
}

View file

@ -1,9 +1,9 @@
import { LatestBundleMetricReq } from "../../../proto/autogenerated/ts/types.js"
import { getLogger } from "../helpers/logger.js"
import { decodeListTLV, integerToUint8Array, parseTLV } from "../helpers/tlv.js"
import { StorageSettings } from "./index.js"
import { TlvFilesStorage } from "./tlvFilesStorage.js"
import * as Types from "../../../proto/autogenerated/ts/types.js"
import { LatestBundleMetricReq } from "../../../../proto/autogenerated/ts/types.js"
import { getLogger } from "../../helpers/logger.js"
import { decodeListTLV, integerToUint8Array, parseTLV } from "../../helpers/tlv.js"
import { StorageSettings } from "../index.js"
import * as Types from "../../../../proto/autogenerated/ts/types.js"
import { TlvStorageFactory, TlvStorageInterface } from "./tlvFilesStorageFactory.js"
const transactionStatePointTypes = ['addedInvoice', 'invoiceWasPaid', 'paidAnInvoice', 'addedAddress', 'addressWasPaid', 'paidAnAddress', 'user2user'] as const
const balanceStatePointTypes = ['providerBalance', 'providerMaxWithdrawable', 'walletBalance', 'channelBalance', 'usersBalance', 'feesPaidForLiquidity', 'totalLndBalance', 'accumulatedHtlcFees', 'deltaUsers', 'deltaExternal'] as const
const maxStatePointTypes = ['maxProviderRespTime'] as const
@ -30,14 +30,13 @@ export type TxPointSettings = {
}
export class StateBundler {
tlvStorage: TlvFilesStorage
tlvStorage: TlvStorageInterface
reportLog = getLogger({ component: 'stateBundlerReport' })
prevValues: Record<string, number> = {}
interval: NodeJS.Timeout
constructor(settings: StorageSettings) {
const bundlerPath = [settings.dataDir, "bundler_events"].filter(s => !!s).join("/")
this.tlvStorage = new TlvFilesStorage(bundlerPath)
this.tlvStorage.initMeta()
constructor(dataDir: string, tlvStorageFactory: TlvStorageFactory) {
const bundlerPath = [dataDir, "bundler_events"].filter(s => !!s).join("/")
this.tlvStorage = tlvStorageFactory.NewStorage({ name: 'bundler', path: bundlerPath })
this.interval = setInterval(() => {
const mem = process.memoryUsage()
this.AddValue('_root', 'memory_rss_kb', Math.ceil(mem.rss / 1000 || 0), true)
@ -53,7 +52,7 @@ export class StateBundler {
}
async GetBundleMetrics(req: Types.LatestBundleMetricReq): Promise<Types.BundleMetrics> {
const latest = this.tlvStorage.LoadLatest(req.limit)
const latest = await this.tlvStorage.LoadLatest(req.limit)
const metrics: Types.BundleMetrics = { apps: {} }
Object.keys(latest).forEach(app => {
metrics.apps[app] = { app_bundles: {} }
@ -70,7 +69,7 @@ export class StateBundler {
}
async GetSingleBundleMetrics(req: Types.SingleMetricReq): Promise<Types.BundleData> {
const { fileData, chunks } = this.tlvStorage.LoadFile(req.app_id, req.metrics_name, req.page)
const { fileData, chunks } = await this.tlvStorage.LoadFile(req.app_id, req.metrics_name, req.page)
const decoded = decodeListTLV(parseTLV(fileData))
return {
current_chunk: req.page,

View file

@ -1,13 +1,14 @@
import fs from 'fs'
import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js'
import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../../helpers/tlv.js'
const chunkSizeBytes = 128 * 1024
export type LatestData = Record<string, Record<string, { tlvs: Uint8Array[], current_chunk: number, available_chunks: number[] }>>
export type TlvFile = { fileData: Buffer, chunks: number[] }
export class TlvFilesStorage {
storagePath: string
lastPersisted: number = 0
meta: Record<string, Record<string, { chunks: number[] }>> = {}
pending: Record<string, Record<string, { tlvs: Uint8Array[] }>> = {}
metaReady = false
private storagePath: string
private lastPersisted: number = 0
private meta: Record<string, Record<string, { chunks: number[] }>> = {}
private pending: Record<string, Record<string, { tlvs: Uint8Array[] }>> = {}
private metaReady = false
constructor(storagePath: string) {
this.storagePath = storagePath
if (!fs.existsSync(this.storagePath)) {
@ -24,9 +25,9 @@ export class TlvFilesStorage {
});
}
LoadFile = (app: string, dataName: string, chunk: number): { fileData: Buffer, chunks: number[] } => {
LoadFile = (app: string, dataName: string, chunk: number): TlvFile => {
if (!this.metaReady || !this.meta[app] || !this.meta[app][dataName] || !this.meta[app][dataName].chunks.includes(chunk)) {
throw new Error("metrics not found")
throw new Error(`tlv file for ${app} ${dataName} chunk ${chunk} not found`)
}
const fullPath = [this.storagePath, app, dataName, `${chunk}.mtlv`].filter(s => !!s).join("/")
const fileData = fs.readFileSync(fullPath)
@ -71,7 +72,7 @@ export class TlvFilesStorage {
return data
}
persist = () => {
private persist = () => {
if (!this.metaReady) {
throw new Error("meta metrics not ready")
}
@ -104,28 +105,28 @@ export class TlvFilesStorage {
})
}
getMeta = (appId: string, dataName: string) => {
private getMeta = (appId: string, dataName: string) => {
if (!this.meta[appId]) {
return { chunks: [] }
}
return this.meta[appId][dataName] || { chunks: [] }
}
initMeta = () => {
private initMeta = () => {
this.foreachFile((app, dataName, tlvFiles) => {
this.updateMeta(app, dataName, tlvFiles)
})
this.metaReady = true
}
updateMeta = (appId: string, dataName: string, sortedChunks: number[]) => {
private updateMeta = (appId: string, dataName: string, sortedChunks: number[]) => {
if (!this.meta[appId]) {
this.meta[appId] = {}
}
this.meta[appId][dataName] = { chunks: sortedChunks }
}
foreachFile = (cb: (appId: string, dataName: string, tlvFiles: number[]) => void) => {
private foreachFile = (cb: (appId: string, dataName: string, tlvFiles: number[]) => void) => {
if (!fs.existsSync(this.storagePath)) {
fs.mkdirSync(this.storagePath, { recursive: true });
}

View file

@ -0,0 +1,151 @@
import { ChildProcess, fork } from 'child_process';
import { EventEmitter } from 'events';
import { AddTlvOperation, ITlvStorageOperation, SuccessTlvOperationResponse, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings, WebRtcMessageOperation, ProcessMetricsTlvOperation } from './tlvFilesStorageProcessor';
import { LatestData, TlvFile } from './tlvFilesStorage';
import { NostrSend, SendData, SendInitiator } from '../../nostr/handler';
import { WebRtcUserInfo } from '../../webRTC';
import * as Types from '../../../../proto/autogenerated/ts/types.js'
import { ProcessMetrics } from './processMetricsCollector';
export type TlvStorageInterface = {
AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => Promise<number>
LoadLatest: (limit?: number) => Promise<LatestData>
LoadFile: (appId: string, dataName: string, chunk: number) => Promise<TlvFile>
}
export class TlvStorageFactory extends EventEmitter {
private process: ChildProcess;
private isConnected: boolean = false;
private debug: boolean = false;
private _nostrSend: NostrSend = () => { throw new Error('nostr send not initialized yet') }
constructor() {
super();
this.initializeSubprocess();
}
setDebug(debug: boolean) {
this.debug = debug;
}
attachNostrSend(f: NostrSend) {
this._nostrSend = f
}
private nostrSend = (opResponse: SuccessTlvOperationResponse<{ initiator: SendInitiator, data: SendData, relays?: string[] }>) => {
if (!this._nostrSend) {
throw new Error("No nostrSend attached")
}
this._nostrSend(opResponse.data.initiator, opResponse.data.data, opResponse.data.relays)
}
private initializeSubprocess() {
this.process = fork('./build/src/services/storage/tlv/tlvFilesStorageProcessor');
this.process.on('message', (response: TlvOperationResponse<any>) => {
if (response.success && response.type === 'nostrSend') {
this.nostrSend(response)
} else {
this.emit(response.opId, response);
}
});
this.process.on('error', (error: Error) => {
console.error('Tlv Storage processor error:', error);
this.isConnected = false;
});
this.process.on('exit', (code: number) => {
console.log(`Tlv Storage processor exited with code ${code}`);
this.isConnected = false;
});
this.isConnected = true;
}
NewStorage(settings: TlvStorageSettings): TlvStorageInterface {
const opId = Math.random().toString()
const op: NewTlvStorageOperation = { type: 'newStorage', opId, settings }
this.handleOp<void>(op)
return {
AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => this.AddTlv(settings.name, appId, dataName, tlv),
LoadLatest: (limit?: number) => this.LoadLatest(settings.name, limit),
LoadFile: (appId: string, dataName: string, chunk: number) => this.LoadFile(settings.name, appId, dataName, chunk)
}
}
AddTlv(storageName: string, appId: string, dataName: string, tlv: Uint8Array): Promise<number> {
const opId = Math.random().toString()
const op: AddTlvOperation = { type: 'addTlv', opId, storageName, appId, dataName, base64Tlv: Buffer.from(tlv).toString('base64') }
return this.handleOp<number>(op)
}
async LoadLatest(storageName: string, limit?: number): Promise<LatestData> {
const opId = Math.random().toString()
const op: LoadLatestTlvOperation = { type: 'loadLatest', opId, storageName, limit }
const latestData = await this.handleOp<SerializableLatestData>(op)
const deserializedLatestData: LatestData = {}
for (const appId in latestData) {
deserializedLatestData[appId] = {}
for (const dataName in latestData[appId]) {
deserializedLatestData[appId][dataName] = { tlvs: latestData[appId][dataName].base64tlvs.map(tlv => new Uint8Array(Buffer.from(tlv, 'base64'))), current_chunk: latestData[appId][dataName].current_chunk, available_chunks: latestData[appId][dataName].available_chunks }
}
}
return deserializedLatestData
}
async LoadFile(storageName: string, appId: string, dataName: string, chunk: number): Promise<TlvFile> {
const opId = Math.random().toString()
const op: LoadTlvFileOperation = { type: 'loadFile', opId, storageName, appId, dataName, chunk }
const tlvFile = await this.handleOp<SerializableTlvFile>(op)
return { fileData: Buffer.from(tlvFile.base64fileData, 'base64'), chunks: tlvFile.chunks }
}
WebRtcMessage(userInfo: WebRtcUserInfo, message: Types.WebRtcMessage_message): Promise<Types.WebRtcAnswer> {
const opId = Math.random().toString()
const op: WebRtcMessageOperation = { type: 'webRtcMessage', opId, userInfo, message }
return this.handleOp<Types.WebRtcAnswer>(op)
}
ProcessMetrics(metrics: ProcessMetrics, processName: string): Promise<void> {
const opId = Math.random().toString()
const op: ProcessMetricsTlvOperation = { type: 'processMetrics', opId, metrics, processName }
return this.handleOp<void>(op)
}
private handleOp<T>(op: ITlvStorageOperation): Promise<T> {
const debug = this.debug || op.debug
if (debug) console.log('handleOp', op)
this.checkConnected()
return new Promise<T>((resolve, reject) => {
const responseHandler = (response: TlvOperationResponse<T>) => {
if (debug) console.log('tlv responseHandler', response)
if (!response.success) {
reject(new Error(response.error));
return
}
if (response.type !== op.type) {
reject(new Error('Invalid tlv storage response type: ' + response.type + ' expected: ' + op.type));
return
}
resolve(response.data);
}
this.once(op.opId, responseHandler)
this.process.send({ ...op, debug })
})
}
private checkConnected() {
if (!this.isConnected) {
throw new Error('Tlv Storage processor is not connected');
}
}
public disconnect() {
if (this.process) {
this.process.kill();
this.isConnected = false;
this.debug = false;
}
}
}

View file

@ -0,0 +1,290 @@
import { PubLogger, getLogger } from '../../helpers/logger.js';
import webRTC, { WebRtcUserInfo } from '../../webRTC/index.js';
import { TlvFilesStorage } from './tlvFilesStorage.js';
import * as Types from '../../../../proto/autogenerated/ts/types.js'
import { SendData } from '../../nostr/handler.js';
import { SendInitiator } from '../../nostr/handler.js';
import { ProcessMetrics, ProcessMetricsCollector } from './processMetricsCollector.js';
import { integerToUint8Array } from '../../helpers/tlv.js';
export type SerializableLatestData = Record<string, Record<string, { base64tlvs: string[], current_chunk: number, available_chunks: number[] }>>
export type SerializableTlvFile = { base64fileData: string, chunks: number[] }
export const usageStorageName = 'usage'
export const bundlerStorageName = 'bundler'
export type TlvStorageSettings = {
path: string
name: typeof usageStorageName | typeof bundlerStorageName
}
export type NewTlvStorageOperation = {
type: 'newStorage'
opId: string
settings: TlvStorageSettings
debug?: boolean
}
export type AddTlvOperation = {
type: 'addTlv'
opId: string
storageName: string
appId: string
dataName: string
base64Tlv: string
debug?: boolean
}
export type LoadLatestTlvOperation = {
type: 'loadLatest'
opId: string
storageName: string
limit?: number
debug?: boolean
}
export type LoadTlvFileOperation = {
type: 'loadFile'
opId: string
storageName: string
appId: string
dataName: string
chunk: number
debug?: boolean
}
export type WebRtcMessageOperation = {
type: 'webRtcMessage'
opId: string
userInfo: WebRtcUserInfo
message: Types.WebRtcMessage_message
debug?: boolean
}
export type ProcessMetricsTlvOperation = {
type: 'processMetrics'
opId: string
processName?: string
metrics: ProcessMetrics
debug?: boolean
}
export type ErrorTlvOperationResponse = { success: false, error: string, opId: string }
export interface ITlvStorageOperation {
opId: string
type: string
debug?: boolean
}
export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation | WebRtcMessageOperation | ProcessMetricsTlvOperation
export type SuccessTlvOperationResponse<T> = { success: true, type: string, data: T, opId: string }
export type TlvOperationResponse<T> = SuccessTlvOperationResponse<T> | ErrorTlvOperationResponse
class TlvFilesStorageProcessor {
private log: PubLogger = console.log
private storages: Record<string, TlvFilesStorage> = {}
private wrtc: webRTC
constructor() {
if (!process.send) {
throw new Error('This process must be spawned as a child process');
}
process.on('message', (operation: TlvStorageOperation) => {
this.handleOperation(operation);
});
process.on('error', (error: Error) => {
console.error('Error in storage processor:', error);
});
this.wrtc = new webRTC(t => {
switch (t) {
case Types.SingleMetricType.USAGE_METRIC:
return this.storages[usageStorageName]
case Types.SingleMetricType.BUNDLE_METRIC:
return this.storages[bundlerStorageName]
default:
throw new Error('Unknown metric type: ' + t)
}
})
this.wrtc.attachNostrSend((initiator: SendInitiator, data: SendData, relays?: string[] | undefined) => {
this.sendResponse({
success: true,
type: 'nostrSend',
data: { initiator, data, relays },
opId: Math.random().toString()
});
})
new ProcessMetricsCollector((pMetrics) => {
this.saveProcessMetrics(pMetrics, 'tlv_processor')
})
}
private serializeNowTlv = (v: number) => {
const nowUnix = Math.floor(Date.now() / 1000)
const entry = new Uint8Array(8)
entry.set(integerToUint8Array(nowUnix), 0)
entry.set(integerToUint8Array(v), 4)
return entry
}
private saveProcessMetrics = (pMetrics: ProcessMetrics, processName = "") => {
const pName = processName ? '_' + processName : ''
if (!this.storages[bundlerStorageName]) {
console.log('no bundle storage yet')
return
}
if (pMetrics.memory_rss_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_rss_kb' + pName, this.serializeNowTlv(pMetrics.memory_rss_kb))
if (pMetrics.memory_buffer_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_buffer_kb' + pName, this.serializeNowTlv(pMetrics.memory_buffer_kb))
if (pMetrics.memory_heap_total_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_heap_total_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_total_kb))
if (pMetrics.memory_heap_used_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_heap_used_kb' + pName, this.serializeNowTlv(pMetrics.memory_heap_used_kb))
if (pMetrics.memory_external_kb) this.storages[bundlerStorageName].AddTlv('_root', 'memory_external_kb' + pName, this.serializeNowTlv(pMetrics.memory_external_kb))
}
private async handleOperation(operation: TlvStorageOperation) {
try {
const opId = operation.opId;
if (operation.debug) console.log('handleOperation', operation)
switch (operation.type) {
case 'newStorage':
await this.handleNewStorage(operation);
break;
case 'addTlv':
await this.handleAddTlv(operation);
break;
case 'loadLatest':
await this.handleLoadLatestTlv(operation);
break;
case 'loadFile':
await this.handleLoadTlvFile(operation);
break;
case 'webRtcMessage':
await this.handleWebRtcMessage(operation);
break;
case 'processMetrics':
await this.handleProcessMetrics(operation);
break;
default:
this.sendResponse({
success: false,
error: `Unknown operation type: ${(operation as any).type}`,
opId
})
return
}
} catch (error) {
this.sendResponse({
success: false,
error: error instanceof Error ? error.message : 'Unknown error occurred',
opId: operation.opId
});
}
}
private async handleNewStorage(operation: NewTlvStorageOperation) {
if (this.storages[operation.settings.name]) {
this.sendResponse({
success: false,
error: `Storage ${operation.settings.name} already exists`,
opId: operation.opId
})
return
}
this.storages[operation.settings.name] = new TlvFilesStorage(operation.settings.path)
this.sendResponse({
success: true,
type: 'newStorage',
data: null,
opId: operation.opId
});
}
private async handleAddTlv(operation: AddTlvOperation) {
if (!this.storages[operation.storageName]) {
this.sendResponse({
success: false,
error: `Storage ${operation.storageName} does not exist`,
opId: operation.opId
})
return
}
const tlv = new Uint8Array(Buffer.from(operation.base64Tlv, 'base64'))
this.storages[operation.storageName].AddTlv(operation.appId, operation.dataName, tlv)
this.sendResponse<null>({
success: true,
type: 'addTlv',
data: null,
opId: operation.opId
});
}
private async handleLoadLatestTlv(operation: LoadLatestTlvOperation) {
if (!this.storages[operation.storageName]) {
this.sendResponse({
success: false,
error: `Storage ${operation.storageName} does not exist`,
opId: operation.opId
})
return
}
const data = this.storages[operation.storageName].LoadLatest(operation.limit)
const serializableData: SerializableLatestData = {}
for (const appId in data) {
serializableData[appId] = {}
for (const dataName in data[appId]) {
serializableData[appId][dataName] = { base64tlvs: data[appId][dataName].tlvs.map(tlv => Buffer.from(tlv).toString('base64')), current_chunk: data[appId][dataName].current_chunk, available_chunks: data[appId][dataName].available_chunks }
}
}
this.sendResponse<SerializableLatestData>({
success: true,
type: 'loadLatest',
data: serializableData,
opId: operation.opId
});
}
private async handleLoadTlvFile(operation: LoadTlvFileOperation) {
if (!this.storages[operation.storageName]) {
this.sendResponse({
success: false,
error: `Storage ${operation.storageName} does not exist`,
opId: operation.opId
})
return
}
const data = this.storages[operation.storageName].LoadFile(operation.appId, operation.dataName, operation.chunk)
this.sendResponse<SerializableTlvFile>({
success: true,
type: 'loadFile',
data: { base64fileData: Buffer.from(data.fileData).toString('base64'), chunks: data.chunks },
opId: operation.opId
});
}
private async handleWebRtcMessage(operation: WebRtcMessageOperation) {
const answer = await this.wrtc.OnMessage(operation.userInfo, operation.message)
this.sendResponse<Types.WebRtcAnswer>({
success: true,
type: 'webRtcMessage',
data: answer,
opId: operation.opId
});
}
private async handleProcessMetrics(operation: ProcessMetricsTlvOperation) {
this.saveProcessMetrics(operation.metrics, operation.processName)
this.sendResponse<null>({
success: true,
type: 'processMetrics',
data: null,
opId: operation.opId
});
}
private sendResponse<T>(response: TlvOperationResponse<T>) {
if (process.send) {
process.send(response);
}
}
}
// Start the storage processor
new TlvFilesStorageProcessor();

View file

@ -3,7 +3,7 @@ import { User } from './entity/User.js';
import { UserBasicAuth } from './entity/UserBasicAuth.js';
import { getLogger } from '../helpers/logger.js';
import EventsLogManager from './eventsLog.js';
import { StorageInterface } from './storageInterface.js';
import { StorageInterface } from './db/storageInterface.js';
export default class {
dbs: StorageInterface
eventsLog: EventsLogManager

View file

@ -6,20 +6,23 @@ import * as Types from '../../../proto/autogenerated/ts/types.js'
import { NostrSend, SendData, SendInitiator } from "../nostr/handler.js"
import { encodeTLbV, encodeTLV, encodeTLVDataPacket } from '../helpers/tlv.js'
import { Utils } from '../helpers/utilsWrapper.js'
import { TlvFilesStorage } from '../storage/tlvFilesStorage.js'
import { TlvFilesStorage } from '../storage/tlv/tlvFilesStorage.js'
import { TlvStorageInterface } from '../storage/tlv/tlvFilesStorageFactory.js'
type IceCandidate = { type: string, candidate?: string, sdpMid?: string, sdpMLineIndex?: number }
const configuration = { 'iceServers': [{ 'urls': 'stun:relay.webwormhole.io' }] }
type UserInfo = { userPub: string, appId: string }
export default class webRTC {
export type WebRtcUserInfo = { userPub: string, appId: string }
private storage: Storage
export type TlvStorageGetter = (t: Types.SingleMetricType) => TlvFilesStorage
export default class webRTC {
//private storage: Storage
private log = getLogger({ component: 'webRTC' })
private connections: Record<string, RTCPeerConnection> = {}
private _nostrSend: NostrSend
private utils: Utils
constructor(storage: Storage, utils: Utils) {
this.storage = storage
this.utils = utils
private tlvStorageGetter: TlvStorageGetter
//private utils: Utils
constructor(tlvStorageGetter: TlvStorageGetter) {
this.tlvStorageGetter = tlvStorageGetter
}
attachNostrSend(f: NostrSend) {
this._nostrSend = f
@ -31,12 +34,12 @@ export default class webRTC {
this._nostrSend(initiator, data, relays)
}
private sendCandidate = (u: UserInfo, candidate: string) => {
private sendCandidate = (u: WebRtcUserInfo, candidate: string) => {
const message: Types.WebRtcCandidate & { requestId: string, status: 'OK' } = { candidate, requestId: "SubToWebRtcCandidates", status: 'OK' }
this.nostrSend({ type: 'app', appId: u.appId }, { type: 'content', content: JSON.stringify(message), pub: u.userPub })
}
OnMessage = async (u: UserInfo, message: Types.WebRtcMessage_message): Promise<Types.WebRtcAnswer> => {
OnMessage = async (u: WebRtcUserInfo, message: Types.WebRtcMessage_message): Promise<Types.WebRtcAnswer> => {
if (message.type === Types.WebRtcMessage_message_type.OFFER) {
return this.connect(u, message.offer)
} else if (message.type === Types.WebRtcMessage_message_type.CANDIDATE) {
@ -45,7 +48,7 @@ export default class webRTC {
return {}
}
private onCandidate = async (u: UserInfo, candidate: string): Promise<Types.WebRtcAnswer> => {
private onCandidate = async (u: WebRtcUserInfo, candidate: string): Promise<Types.WebRtcAnswer> => {
const key = this.getConnectionsKey(u)
if (!this.connections[key]) {
throw new Error('Connection not found')
@ -57,7 +60,7 @@ export default class webRTC {
}
return {}
}
private connect = async (u: UserInfo, offer: string): Promise<Types.WebRtcAnswer> => {
private connect = async (u: WebRtcUserInfo, offer: string): Promise<Types.WebRtcAnswer> => {
const key = this.getConnectionsKey(u)
this.log("connect", key)
if (this.connections[key]) {
@ -87,25 +90,15 @@ export default class webRTC {
try {
const j = JSON.parse(event.data) as Types.SingleMetricReq
const err = Types.SingleMetricReqValidate(j, {
app_id_CustomCheck: id => id === u.appId,
app_id_CustomCheck: id => id !== "",
metrics_name_CustomCheck: name => name !== ""
})
if (err) {
this.log(ERROR, 'SingleUsageMetricReqValidate', err)
this.log(ERROR, 'SingleUsageMetricReqValidate', err.message || err)
return
}
let tlvStorage: TlvFilesStorage
switch (j.metric_type) {
case Types.SingleMetricType.USAGE_METRIC:
tlvStorage = this.storage.metricsEventStorage.tlvStorage
break
case Types.SingleMetricType.BUNDLE_METRIC:
tlvStorage = this.utils.stateBundler.tlvStorage
break
default:
throw new Error("Unknown metric type")
}
const { fileData } = tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page)
const tlvStorage = this.tlvStorageGetter(j.metric_type)
const { fileData } = await tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page)
const id = j.request_id || Math.floor(Math.random() * 100_000_000)
let i = 0
const packets: Buffer[] = []
@ -132,7 +125,7 @@ export default class webRTC {
return { answer: JSON.stringify(answer) }
}
getConnectionsKey = (u: UserInfo) => {
getConnectionsKey = (u: WebRtcUserInfo) => {
return u.appId + ":" + u.userPub
}
}

View file

@ -3,13 +3,14 @@ import { BitcoinCoreWrapper } from "./bitcoinCore.js"
import LND from '../services/lnd/lnd.js'
import { LiquidityProvider } from "../services/main/liquidityProvider.js"
import { Utils } from "../services/helpers/utilsWrapper.js"
import { TlvStorageFactory } from "../services/storage/tlv/tlvFilesStorageFactory.js"
export const setupNetwork = async () => {
const settings = LoadTestSettingsFromEnv()
const core = new BitcoinCoreWrapper(settings)
await core.InitAddress()
await core.Mine(1)
const setupUtils = new Utils(settings)
const tlvStorageFactory = new TlvStorageFactory()
const setupUtils = new Utils({ dataDir: settings.storageSettings.dataDir })
const alice = new LND(settings.lndSettings, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { })
const bob = new LND({ ...settings.lndSettings, mainNode: settings.lndSettings.otherNode }, new LiquidityProvider("", setupUtils, async () => { }, async () => { }), setupUtils, async () => { }, async () => { }, () => { }, () => { })
await tryUntil<void>(async i => {

View file

@ -13,6 +13,7 @@ import { getLogger, resetDisabledLoggers } from '../services/helpers/logger.js'
import { LiquidityProvider } from '../services/main/liquidityProvider.js'
import { Utils } from '../services/helpers/utilsWrapper.js'
import { AdminManager } from '../services/main/adminManager.js'
import { TlvStorageFactory } from '../services/storage/tlv/tlvFilesStorageFactory.js'
chai.use(chaiString)
export const expect = chai.expect
export type Describe = (message: string, failure?: boolean) => void
@ -43,7 +44,8 @@ export type StorageTestBase = {
export const setupStorageTest = async (d: Describe): Promise<StorageTestBase> => {
const settings = GetTestStorageSettings()
const storageManager = new Storage(settings)
const utils = new Utils({ dataDir: settings.dataDir })
const storageManager = new Storage(settings, utils)
await storageManager.Connect(console.log)
return {
expect,
@ -69,7 +71,7 @@ export const SetupTest = async (d: Describe): Promise<TestBase> => {
const user1 = { userId: u1.info.userId, appUserIdentifier: u1.identifier, appId: app.appId }
const user2 = { userId: u2.info.userId, appUserIdentifier: u2.identifier, appId: app.appId }
const extermnalUtils = new Utils(settings)
const extermnalUtils = new Utils({ dataDir: settings.storageSettings.dataDir })
const externalAccessToMainLnd = new LND(settings.lndSettings, new LiquidityProvider("", extermnalUtils, async () => { }, async () => { }), extermnalUtils, async () => { }, async () => { }, () => { }, () => { })
await externalAccessToMainLnd.Warmup()