diff --git a/src/services/helpers/utilsWrapper.ts b/src/services/helpers/utilsWrapper.ts index fd873b43..7c83e186 100644 --- a/src/services/helpers/utilsWrapper.ts +++ b/src/services/helpers/utilsWrapper.ts @@ -1,17 +1,23 @@ import { MainSettings } from "../main/settings.js"; import { StateBundler } from "../storage/tlv/stateBundler.js"; import { TlvStorageFactory } from "../storage/tlv/tlvFilesStorageFactory.js"; - +import { NostrSend } from "../nostr/handler.js"; export class Utils { tlvStorageFactory: TlvStorageFactory stateBundler: StateBundler settings: MainSettings + _nostrSend: NostrSend = () => { throw new Error('nostr send not initialized yet') } constructor(settings: MainSettings) { this.settings = settings this.tlvStorageFactory = new TlvStorageFactory() this.stateBundler = new StateBundler(settings.storageSettings, this.tlvStorageFactory) } + attachNostrSend(f: NostrSend) { + this._nostrSend = f + this.tlvStorageFactory.attachNostrSend(f) + } + Stop() { this.stateBundler.Stop() this.tlvStorageFactory.disconnect() diff --git a/src/services/main/index.ts b/src/services/main/index.ts index 18860b99..0aaf5c4e 100644 --- a/src/services/main/index.ts +++ b/src/services/main/index.ts @@ -55,7 +55,7 @@ export default class { utils: Utils rugPullTracker: RugPullTracker unlocker: Unlocker - webRTC: webRTC + //webRTC: webRTC nostrSend: NostrSend = () => { getLogger({})("nostr send not initialized yet") } constructor(settings: MainSettings, storage: Storage, adminManager: AdminManager, utils: Utils, unlocker: Unlocker) { this.settings = settings @@ -76,7 +76,7 @@ export default class { this.appUserManager = new AppUserManager(this.storage, this.settings, this.applicationManager) this.debitManager = new DebitManager(this.storage, this.lnd, this.applicationManager) this.offerManager = new OfferManager(this.storage, this.lnd, this.applicationManager, this.productManager) - this.webRTC = new webRTC(this.storage, this.utils) + //this.webRTC = new webRTC(this.storage, this.utils) } @@ -99,7 +99,8 @@ export default class { this.liquidityProvider.attachNostrSend(f) this.debitManager.attachNostrSend(f) this.offerManager.attachNostrSend(f) - this.webRTC.attachNostrSend(f) + this.utils.attachNostrSend(f) + //this.webRTC.attachNostrSend(f) } htlcCb: HtlcCb = (e) => { diff --git a/src/services/serverMethods/index.ts b/src/services/serverMethods/index.ts index 1b57deec..678c9cea 100644 --- a/src/services/serverMethods/index.ts +++ b/src/services/serverMethods/index.ts @@ -11,7 +11,8 @@ export default (mainHandler: Main): Types.ServerMethods => { offer_CustomCheck: offer => offer !== '', } }) - return mainHandler.webRTC.OnMessage({ userPub: ctx.operator_id, appId: ctx.app_id }, req.message) + if (err != null) throw new Error(err.message) + return mainHandler.utils.tlvStorageFactory.WebRtcMessage({ userPub: ctx.operator_id, appId: ctx.app_id }, req.message) }, SubToWebRtcCandidates: async ({ ctx }) => { }, GetUsageMetrics: async ({ ctx, req }) => { diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts index cab84a2d..f6eb6b06 100644 --- a/src/services/storage/tlv/tlvFilesStorageFactory.ts +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -1,8 +1,10 @@ import { ChildProcess, fork } from 'child_process'; import { EventEmitter } from 'events'; -import { AddTlvOperation, ITlvStorageOperation, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings } from './tlvFilesStorageProcessor'; +import { AddTlvOperation, ITlvStorageOperation, SuccessTlvOperationResponse, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings, WebRtcMessageOperation } from './tlvFilesStorageProcessor'; import { LatestData, TlvFile } from './tlvFilesStorage'; - +import { NostrSend, SendData, SendInitiator } from '../../nostr/handler'; +import { WebRtcUserInfo } from '../../webRTC'; +import * as Types from '../../../../proto/autogenerated/ts/types.js' export type TlvStorageInterface = { AddTlv: (appId: string, dataName: string, tlv: Uint8Array) => Promise LoadLatest: (limit?: number) => Promise @@ -13,7 +15,7 @@ export class TlvStorageFactory extends EventEmitter { private process: ChildProcess; private isConnected: boolean = false; private debug: boolean = false; - + private _nostrSend: NostrSend = () => { throw new Error('nostr send not initialized yet') } constructor() { super(); this.initializeSubprocess(); @@ -23,11 +25,26 @@ export class TlvStorageFactory extends EventEmitter { this.debug = debug; } + attachNostrSend(f: NostrSend) { + this._nostrSend = f + } + + private nostrSend = (opResponse: SuccessTlvOperationResponse<{ initiator: SendInitiator, data: SendData, relays?: string[] }>) => { + if (!this._nostrSend) { + throw new Error("No nostrSend attached") + } + this._nostrSend(opResponse.data.initiator, opResponse.data.data, opResponse.data.relays) + } + private initializeSubprocess() { this.process = fork('./build/src/services/storage/tlv/tlvFilesStorageProcessor'); this.process.on('message', (response: TlvOperationResponse) => { - this.emit(response.opId, response); + if (response.success && response.type === 'nostrSend') { + this.nostrSend(response) + } else { + this.emit(response.opId, response); + } }); this.process.on('error', (error: Error) => { @@ -81,6 +98,12 @@ export class TlvStorageFactory extends EventEmitter { return { fileData: Buffer.from(tlvFile.base64fileData, 'base64'), chunks: tlvFile.chunks } } + WebRtcMessage(userInfo: WebRtcUserInfo, message: Types.WebRtcMessage_message): Promise { + const opId = Math.random().toString() + const op: WebRtcMessageOperation = { type: 'webRtcMessage', opId, userInfo, message, debug: true } + return this.handleOp(op) + } + private handleOp(op: ITlvStorageOperation): Promise { const debug = this.debug || op.debug if (debug) console.log('handleOp', op) diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts index 780e4bef..dada201f 100644 --- a/src/services/storage/tlv/tlvFilesStorageProcessor.ts +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -1,5 +1,9 @@ import { PubLogger, getLogger } from '../../helpers/logger.js'; +import webRTC, { WebRtcUserInfo } from '../../webRTC/index.js'; import { TlvFilesStorage } from './tlvFilesStorage.js'; +import * as Types from '../../../../proto/autogenerated/ts/types.js' +import { SendData } from '../../nostr/handler.js'; +import { SendInitiator } from '../../nostr/handler.js'; export type SerializableLatestData = Record> export type SerializableTlvFile = { base64fileData: string, chunks: number[] } export type TlvStorageSettings = { @@ -42,6 +46,14 @@ export type LoadTlvFileOperation = { debug?: boolean } +export type WebRtcMessageOperation = { + type: 'webRtcMessage' + opId: string + userInfo: WebRtcUserInfo + message: Types.WebRtcMessage_message + debug?: boolean +} + export type ErrorTlvOperationResponse = { success: false, error: string, opId: string } export interface ITlvStorageOperation { @@ -50,7 +62,7 @@ export interface ITlvStorageOperation { debug?: boolean } -export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation +export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation | WebRtcMessageOperation export type SuccessTlvOperationResponse = { success: true, type: string, data: T, opId: string } export type TlvOperationResponse = SuccessTlvOperationResponse | ErrorTlvOperationResponse @@ -58,6 +70,7 @@ export type TlvOperationResponse = SuccessTlvOperationResponse | ErrorTlvO class TlvFilesStorageProcessor { private log: PubLogger = console.log private storages: Record = {} + private wrtc: webRTC constructor() { if (!process.send) { throw new Error('This process must be spawned as a child process'); @@ -69,6 +82,17 @@ class TlvFilesStorageProcessor { process.on('error', (error: Error) => { console.error('Error in storage processor:', error); }); + + this.wrtc = new webRTC(t => { + switch (t) { + case Types.SingleMetricType.USAGE_METRIC: + return this.storages['usage'] + case Types.SingleMetricType.BUNDLE_METRIC: + return this.storages['bundle'] + default: + throw new Error('Unknown metric type: ' + t) + } + }) } private async handleOperation(operation: TlvStorageOperation) { @@ -88,6 +112,9 @@ class TlvFilesStorageProcessor { case 'loadFile': await this.handleLoadTlvFile(operation); break; + case 'webRtcMessage': + await this.handleWebRtcMessage(operation); + break; default: this.sendResponse({ success: false, @@ -185,6 +212,16 @@ class TlvFilesStorageProcessor { }); } + private async handleWebRtcMessage(operation: WebRtcMessageOperation) { + const answer = await this.wrtc.OnMessage(operation.userInfo, operation.message) + this.sendResponse({ + success: true, + type: 'webRtcMessage', + data: answer, + opId: operation.opId + }); + } + private sendResponse(response: TlvOperationResponse) { if (process.send) { process.send(response); diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index ef23a772..678ba236 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -10,16 +10,19 @@ import { TlvFilesStorage } from '../storage/tlv/tlvFilesStorage.js' import { TlvStorageInterface } from '../storage/tlv/tlvFilesStorageFactory.js' type IceCandidate = { type: string, candidate?: string, sdpMid?: string, sdpMLineIndex?: number } const configuration = { 'iceServers': [{ 'urls': 'stun:relay.webwormhole.io' }] } -type UserInfo = { userPub: string, appId: string } +export type WebRtcUserInfo = { userPub: string, appId: string } + +export type TlvStorageGetter = (t: Types.SingleMetricType) => TlvFilesStorage + export default class webRTC { - private storage: Storage + //private storage: Storage private log = getLogger({ component: 'webRTC' }) private connections: Record = {} private _nostrSend: NostrSend - private utils: Utils - constructor(storage: Storage, utils: Utils) { - this.storage = storage - this.utils = utils + private tlvStorageGetter: TlvStorageGetter + //private utils: Utils + constructor(tlvStorageGetter: TlvStorageGetter) { + this.tlvStorageGetter = tlvStorageGetter } attachNostrSend(f: NostrSend) { this._nostrSend = f @@ -31,12 +34,12 @@ export default class webRTC { this._nostrSend(initiator, data, relays) } - private sendCandidate = (u: UserInfo, candidate: string) => { + private sendCandidate = (u: WebRtcUserInfo, candidate: string) => { const message: Types.WebRtcCandidate & { requestId: string, status: 'OK' } = { candidate, requestId: "SubToWebRtcCandidates", status: 'OK' } this.nostrSend({ type: 'app', appId: u.appId }, { type: 'content', content: JSON.stringify(message), pub: u.userPub }) } - OnMessage = async (u: UserInfo, message: Types.WebRtcMessage_message): Promise => { + OnMessage = async (u: WebRtcUserInfo, message: Types.WebRtcMessage_message): Promise => { if (message.type === Types.WebRtcMessage_message_type.OFFER) { return this.connect(u, message.offer) } else if (message.type === Types.WebRtcMessage_message_type.CANDIDATE) { @@ -45,7 +48,7 @@ export default class webRTC { return {} } - private onCandidate = async (u: UserInfo, candidate: string): Promise => { + private onCandidate = async (u: WebRtcUserInfo, candidate: string): Promise => { const key = this.getConnectionsKey(u) if (!this.connections[key]) { throw new Error('Connection not found') @@ -57,7 +60,7 @@ export default class webRTC { } return {} } - private connect = async (u: UserInfo, offer: string): Promise => { + private connect = async (u: WebRtcUserInfo, offer: string): Promise => { const key = this.getConnectionsKey(u) this.log("connect", key) if (this.connections[key]) { @@ -94,17 +97,7 @@ export default class webRTC { this.log(ERROR, 'SingleUsageMetricReqValidate', err) return } - let tlvStorage: TlvStorageInterface - switch (j.metric_type) { - case Types.SingleMetricType.USAGE_METRIC: - tlvStorage = this.storage.metricsEventStorage.tlvStorage - break - case Types.SingleMetricType.BUNDLE_METRIC: - tlvStorage = this.utils.stateBundler.tlvStorage - break - default: - throw new Error("Unknown metric type") - } + const tlvStorage = this.tlvStorageGetter(j.metric_type) const { fileData } = await tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page) const id = j.request_id || Math.floor(Math.random() * 100_000_000) let i = 0 @@ -132,7 +125,7 @@ export default class webRTC { return { answer: JSON.stringify(answer) } } - getConnectionsKey = (u: UserInfo) => { + getConnectionsKey = (u: WebRtcUserInfo) => { return u.appId + ":" + u.userPub } } \ No newline at end of file