move wrtc to tlv metrics sub process

This commit is contained in:
boufni95 2025-04-01 19:41:29 +00:00
parent 8bca7cb7db
commit bd10b02d88
6 changed files with 93 additions and 32 deletions

View file

@ -1,17 +1,23 @@
import { MainSettings } from "../main/settings.js";
import { StateBundler } from "../storage/tlv/stateBundler.js";
import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js";
import { NostrSend } from "../nostr/handler.js";
export class Utils {
tlvStorageFactory: TlvStorageFactory
stateBundler: StateBundler
settings: MainSettings
_nostrSend: NostrSend = () => { throw new Error('nostr send not initialized yet') }
constructor(settings: MainSettings) {
this.settings = settings
this.tlvStorageFactory = new TlvStorageFactory()
this.stateBundler = new StateBundler(settings.storageSettings, this.tlvStorageFactory)
}
attachNostrSend(f: NostrSend) {
this._nostrSend = f
this.tlvStorageFactory.attachNostrSend(f)
}
Stop() {
this.stateBundler.Stop()
this.tlvStorageFactory.disconnect()

View file

@ -55,7 +55,7 @@ export default class {
utils: Utils
rugPullTracker: RugPullTracker
unlocker: Unlocker
webRTC: webRTC
//webRTC: webRTC
nostrSend: NostrSend = () => { getLogger({})("nostr send not initialized yet") }
constructor(settings: MainSettings, storage: Storage, adminManager: AdminManager, utils: Utils, unlocker: Unlocker) {
this.settings = settings
@ -76,7 +76,7 @@ export default class {
this.appUserManager = new AppUserManager(this.storage, this.settings, this.applicationManager)
this.debitManager = new DebitManager(this.storage, this.lnd, this.applicationManager)
this.offerManager = new OfferManager(this.storage, this.lnd, this.applicationManager, this.productManager)
this.webRTC = new webRTC(this.storage, this.utils)
//this.webRTC = new webRTC(this.storage, this.utils)
}
@ -99,7 +99,8 @@ export default class {
this.liquidityProvider.attachNostrSend(f)
this.debitManager.attachNostrSend(f)
this.offerManager.attachNostrSend(f)
this.webRTC.attachNostrSend(f)
this.utils.attachNostrSend(f)
//this.webRTC.attachNostrSend(f)
}
htlcCb: HtlcCb = (e) => {

View file

@ -11,7 +11,8 @@ export default (mainHandler: Main): Types.ServerMethods => {
offer_CustomCheck: offer => offer !== '',
}
})
return mainHandler.webRTC.OnMessage({ userPub: ctx.operator_id, appId: ctx.app_id }, req.message)
if (err != null) throw new Error(err.message)
return mainHandler.utils.tlvStorageFactory.WebRtcMessage({ userPub: ctx.operator_id, appId: ctx.app_id }, req.message)
},
SubToWebRtcCandidates: async ({ ctx }) => { },
GetUsageMetrics: async ({ ctx, req }) => {

View file

@ -1,8 +1,10 @@
import { ChildProcess, fork } from 'child_process';
import { EventEmitter } from 'events';
import { AddTlvOperation, ITlvStorageOperation, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings } from './tlvFilesStorageProcessor';
import { AddTlvOperation, ITlvStorageOperation, SuccessTlvOperationResponse, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings, WebRtcMessageOperation } from './tlvFilesStorageProcessor';
import { LatestData, TlvFile } from './tlvFilesStorage';
import { NostrSend, SendData, SendInitiator } from '../../nostr/handler';
import { WebRtcUserInfo } from '../../webRTC';
import * as Types from '../../../../proto/autogenerated/ts/types.js'
export type TlvStorageInterface = {
AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => Promise<number>
LoadLatest: (limit?: number) => Promise<LatestData>
@ -13,7 +15,7 @@ export class TlvStorageFactory extends EventEmitter {
private process: ChildProcess;
private isConnected: boolean = false;
private debug: boolean = false;
private _nostrSend: NostrSend = () => { throw new Error('nostr send not initialized yet') }
constructor() {
super();
this.initializeSubprocess();
@ -23,11 +25,26 @@ export class TlvStorageFactory extends EventEmitter {
this.debug = debug;
}
attachNostrSend(f: NostrSend) {
this._nostrSend = f
}
private nostrSend = (opResponse: SuccessTlvOperationResponse<{ initiator: SendInitiator, data: SendData, relays?: string[] }>) => {
if (!this._nostrSend) {
throw new Error("No nostrSend attached")
}
this._nostrSend(opResponse.data.initiator, opResponse.data.data, opResponse.data.relays)
}
private initializeSubprocess() {
this.process = fork('./build/src/services/storage/tlv/tlvFilesStorageProcessor');
this.process.on('message', (response: TlvOperationResponse<any>) => {
if (response.success && response.type === 'nostrSend') {
this.nostrSend(response)
} else {
this.emit(response.opId, response);
}
});
this.process.on('error', (error: Error) => {
@ -81,6 +98,12 @@ export class TlvStorageFactory extends EventEmitter {
return { fileData: Buffer.from(tlvFile.base64fileData, 'base64'), chunks: tlvFile.chunks }
}
WebRtcMessage(userInfo: WebRtcUserInfo, message: Types.WebRtcMessage_message): Promise<Types.WebRtcAnswer> {
const opId = Math.random().toString()
const op: WebRtcMessageOperation = { type: 'webRtcMessage', opId, userInfo, message, debug: true }
return this.handleOp<Types.WebRtcAnswer>(op)
}
private handleOp<T>(op: ITlvStorageOperation): Promise<T> {
const debug = this.debug || op.debug
if (debug) console.log('handleOp', op)

View file

@ -1,5 +1,9 @@
import { PubLogger, getLogger } from '../../helpers/logger.js';
import webRTC, { WebRtcUserInfo } from '../../webRTC/index.js';
import { TlvFilesStorage } from './tlvFilesStorage.js';
import * as Types from '../../../../proto/autogenerated/ts/types.js'
import { SendData } from '../../nostr/handler.js';
import { SendInitiator } from '../../nostr/handler.js';
export type SerializableLatestData = Record<string, Record<string, { base64tlvs: string[], current_chunk: number, available_chunks: number[] }>>
export type SerializableTlvFile = { base64fileData: string, chunks: number[] }
export type TlvStorageSettings = {
@ -42,6 +46,14 @@ export type LoadTlvFileOperation = {
debug?: boolean
}
export type WebRtcMessageOperation = {
type: 'webRtcMessage'
opId: string
userInfo: WebRtcUserInfo
message: Types.WebRtcMessage_message
debug?: boolean
}
export type ErrorTlvOperationResponse = { success: false, error: string, opId: string }
export interface ITlvStorageOperation {
@ -50,7 +62,7 @@ export interface ITlvStorageOperation {
debug?: boolean
}
export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation
export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation | WebRtcMessageOperation
export type SuccessTlvOperationResponse<T> = { success: true, type: string, data: T, opId: string }
export type TlvOperationResponse<T> = SuccessTlvOperationResponse<T> | ErrorTlvOperationResponse
@ -58,6 +70,7 @@ export type TlvOperationResponse<T> = SuccessTlvOperationResponse<T> | ErrorTlvO
class TlvFilesStorageProcessor {
private log: PubLogger = console.log
private storages: Record<string, TlvFilesStorage> = {}
private wrtc: webRTC
constructor() {
if (!process.send) {
throw new Error('This process must be spawned as a child process');
@ -69,6 +82,17 @@ class TlvFilesStorageProcessor {
process.on('error', (error: Error) => {
console.error('Error in storage processor:', error);
});
this.wrtc = new webRTC(t => {
switch (t) {
case Types.SingleMetricType.USAGE_METRIC:
return this.storages['usage']
case Types.SingleMetricType.BUNDLE_METRIC:
return this.storages['bundle']
default:
throw new Error('Unknown metric type: ' + t)
}
})
}
private async handleOperation(operation: TlvStorageOperation) {
@ -88,6 +112,9 @@ class TlvFilesStorageProcessor {
case 'loadFile':
await this.handleLoadTlvFile(operation);
break;
case 'webRtcMessage':
await this.handleWebRtcMessage(operation);
break;
default:
this.sendResponse({
success: false,
@ -185,6 +212,16 @@ class TlvFilesStorageProcessor {
});
}
private async handleWebRtcMessage(operation: WebRtcMessageOperation) {
const answer = await this.wrtc.OnMessage(operation.userInfo, operation.message)
this.sendResponse<Types.WebRtcAnswer>({
success: true,
type: 'webRtcMessage',
data: answer,
opId: operation.opId
});
}
private sendResponse<T>(response: TlvOperationResponse<T>) {
if (process.send) {
process.send(response);

View file

@ -10,16 +10,19 @@ import { TlvFilesStorage } from '../storage/tlv/tlvFilesStorage.js'
import { TlvStorageInterface } from '../storage/tlv/tlvFilesStorageFactory.js'
type IceCandidate = { type: string, candidate?: string, sdpMid?: string, sdpMLineIndex?: number }
const configuration = { 'iceServers': [{ 'urls': 'stun:relay.webwormhole.io' }] }
type UserInfo = { userPub: string, appId: string }
export type WebRtcUserInfo = { userPub: string, appId: string }
export type TlvStorageGetter = (t: Types.SingleMetricType) => TlvFilesStorage
export default class webRTC {
private storage: Storage
//private storage: Storage
private log = getLogger({ component: 'webRTC' })
private connections: Record<string, RTCPeerConnection> = {}
private _nostrSend: NostrSend
private utils: Utils
constructor(storage: Storage, utils: Utils) {
this.storage = storage
this.utils = utils
private tlvStorageGetter: TlvStorageGetter
//private utils: Utils
constructor(tlvStorageGetter: TlvStorageGetter) {
this.tlvStorageGetter = tlvStorageGetter
}
attachNostrSend(f: NostrSend) {
this._nostrSend = f
@ -31,12 +34,12 @@ export default class webRTC {
this._nostrSend(initiator, data, relays)
}
private sendCandidate = (u: UserInfo, candidate: string) => {
private sendCandidate = (u: WebRtcUserInfo, candidate: string) => {
const message: Types.WebRtcCandidate & { requestId: string, status: 'OK' } = { candidate, requestId: "SubToWebRtcCandidates", status: 'OK' }
this.nostrSend({ type: 'app', appId: u.appId }, { type: 'content', content: JSON.stringify(message), pub: u.userPub })
}
OnMessage = async (u: UserInfo, message: Types.WebRtcMessage_message): Promise<Types.WebRtcAnswer> => {
OnMessage = async (u: WebRtcUserInfo, message: Types.WebRtcMessage_message): Promise<Types.WebRtcAnswer> => {
if (message.type === Types.WebRtcMessage_message_type.OFFER) {
return this.connect(u, message.offer)
} else if (message.type === Types.WebRtcMessage_message_type.CANDIDATE) {
@ -45,7 +48,7 @@ export default class webRTC {
return {}
}
private onCandidate = async (u: UserInfo, candidate: string): Promise<Types.WebRtcAnswer> => {
private onCandidate = async (u: WebRtcUserInfo, candidate: string): Promise<Types.WebRtcAnswer> => {
const key = this.getConnectionsKey(u)
if (!this.connections[key]) {
throw new Error('Connection not found')
@ -57,7 +60,7 @@ export default class webRTC {
}
return {}
}
private connect = async (u: UserInfo, offer: string): Promise<Types.WebRtcAnswer> => {
private connect = async (u: WebRtcUserInfo, offer: string): Promise<Types.WebRtcAnswer> => {
const key = this.getConnectionsKey(u)
this.log("connect", key)
if (this.connections[key]) {
@ -94,17 +97,7 @@ export default class webRTC {
this.log(ERROR, 'SingleUsageMetricReqValidate', err)
return
}
let tlvStorage: TlvStorageInterface
switch (j.metric_type) {
case Types.SingleMetricType.USAGE_METRIC:
tlvStorage = this.storage.metricsEventStorage.tlvStorage
break
case Types.SingleMetricType.BUNDLE_METRIC:
tlvStorage = this.utils.stateBundler.tlvStorage
break
default:
throw new Error("Unknown metric type")
}
const tlvStorage = this.tlvStorageGetter(j.metric_type)
const { fileData } = await tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page)
const id = j.request_id || Math.floor(Math.random() * 100_000_000)
let i = 0
@ -132,7 +125,7 @@ export default class webRTC {
return { answer: JSON.stringify(answer) }
}
getConnectionsKey = (u: UserInfo) => {
getConnectionsKey = (u: WebRtcUserInfo) => {
return u.appId + ":" + u.userPub
}
}