From 565da33c5ad28a8f304652f6aee2edadfce2840f Mon Sep 17 00:00:00 2001 From: boufni95 Date: Wed, 29 Jan 2025 15:13:21 +0000 Subject: [PATCH] send metric events --- src/services/helpers/tlv.ts | 19 ++++++++ src/services/storage/metricsEventStorage.ts | 7 ++- src/services/webRTC/index.ts | 53 ++++++++++++++++----- 3 files changed, 64 insertions(+), 15 deletions(-) diff --git a/src/services/helpers/tlv.ts b/src/services/helpers/tlv.ts index 7e8d6b08..df215090 100644 --- a/src/services/helpers/tlv.ts +++ b/src/services/helpers/tlv.ts @@ -2,6 +2,25 @@ import { bytesToHex, concatBytes } from '@noble/hashes/utils' import * as Types from '../../../proto/autogenerated/ts/types.js' export const utf8Decoder: TextDecoder = new TextDecoder('utf-8') export const utf8Encoder: TextEncoder = new TextEncoder() +export type DataPacket = { dataId: number, packetNum: number, totalPackets: number, data: Uint8Array } +export const encodeTLVDataPacket = (packInfo: DataPacket): TLV => { + const { data, dataId, packetNum, totalPackets } = packInfo + const tlv: TLV = {} + tlv[2] = [integerToUint8Array(dataId)] + tlv[3] = [integerToUint8Array(packetNum)] + tlv[4] = [integerToUint8Array(totalPackets)] + tlv[5] = [data] + return tlv +} + +export const decodeTLVDataPacket = (tlv: TLV): DataPacket => { + return { + dataId: parseInt(bytesToHex(tlv[2][0]), 16), + packetNum: parseInt(bytesToHex(tlv[3][0]), 16), + totalPackets: parseInt(bytesToHex(tlv[4][0]), 16), + data: tlv[5][0] + } +} export const encodeListTLV = (list: Uint8Array[]): TLV => { const tlv: TLV = {} diff --git a/src/services/storage/metricsEventStorage.ts b/src/services/storage/metricsEventStorage.ts index a1447883..619f903c 100644 --- a/src/services/storage/metricsEventStorage.ts +++ b/src/services/storage/metricsEventStorage.ts @@ -132,12 +132,15 @@ export default class { return metrics } - LoadMetricsFile = async (app: string, method: string, chunk: number): Promise => { + LoadRawMetricsFile = async (app: string, method: string, chunk: number): Promise => { if (!this.metaReady || !this.metricsMeta[app] || !this.metricsMeta[app][method] || !this.metricsMeta[app][method].chunks.includes(chunk)) { throw new Error("metrics not found") } const fullPath = [this.metricsPath, app, method, `${chunk}.mtlv`].join("/") - const tlv = fs.readFileSync(fullPath) + return fs.readFileSync(fullPath) + } + LoadMetricsFile = async (app: string, method: string, chunk: number): Promise => { + const tlv = await this.LoadRawMetricsFile(app, method, chunk) const decoded = decodeListTLV(parseTLV(tlv)) return { base_64_tlvs: decoded.map(d => Buffer.from(d).toString('base64')), diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index 419444ec..03b472fc 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -1,16 +1,21 @@ //@ts-ignore import wrtc from 'wrtc' -import { getLogger } from "../helpers/logger.js" +import Storage from '../storage/index.js' +import { ERROR, getLogger } from "../helpers/logger.js" import * as Types from '../../../proto/autogenerated/ts/types.js' import { NostrSend, SendData, SendInitiator } from "../nostr/handler.js" +import { encodeTLV, encodeTLVDataPacket } from '../helpers/tlv.js' type IceCandidate = { type: string, candidate?: string, sdpMid?: string, sdpMLineIndex?: number } -const configuration = { 'iceServers': [{ 'urls': 'stun:stun.l.google.com:19302' }] } +const configuration = { 'iceServers': [{ 'urls': 'stun.stunprotocol.org:3478' }] } type UserInfo = { userPub: string, appId: string } export default class webRTC { - + private storage: Storage private log = getLogger({ component: 'webRTC' }) private connections: Record = {} private _nostrSend: NostrSend + constructor(storage: Storage) { + this.storage = storage + } attachNostrSend(f: NostrSend) { this._nostrSend = f } @@ -67,22 +72,44 @@ export default class webRTC { this.sendCandidate(u, JSON.stringify(message)) } conn.onconnectionstatechange = (event) => { - console.log('onconnectionstatechange', event, conn.connectionState) + console.log('onconnectionstatechange', conn.connectionState) } conn.ondatachannel = (event) => { console.log('ondatachannel', event) const channel = event.channel - channel.addEventListener('message', (event) => { - console.log('message', event) - channel.send(event.data + " to you!") + channel.addEventListener('message', async (event) => { + const j = JSON.parse(event.data) as Types.SingleUsageMetricReq + const err = Types.SingleUsageMetricReqValidate(j, { + app_id_CustomCheck: id => id === u.appId, + metrics_name_CustomCheck: name => name !== "" + }) + if (err) { + this.log(ERROR, 'SingleUsageMetricReqValidate', err) + return + } + const res = await this.storage.metricsEventStorage.LoadRawMetricsFile(j.app_id, j.metrics_name, j.page) + const id = Math.floor(Math.random() * 2_000_000_000) + let i = 0 + const packets: Buffer[] = [] + while (i < res.length) { + const chunk = res.slice(i, Math.min(i + 15_000, res.length)) + packets.push(chunk) + i += 15_000 + } + for (let i = 0; i < packets.length; i++) { + const packet = packets[i] + const tlv = encodeTLVDataPacket({ dataId: id, packetNum: i + 1, totalPackets: packets.length, data: packet }) + const bytes = encodeTLV(tlv) + channel.send(bytes) + } }) } - conn.oniceconnectionstatechange = (event) => { - console.log('oniceconnectionstatechange', event) - } - conn.onicegatheringstatechange = (event) => { - console.log('onicegatheringstatechange', event) - } + /* conn.oniceconnectionstatechange = (event) => { + console.log('oniceconnectionstatechange', event) + } + conn.onicegatheringstatechange = (event) => { + console.log('onicegatheringstatechange', event) + } */ await conn.setRemoteDescription(JSON.parse(offer)) const answer = await conn.createAnswer() await conn.setLocalDescription(answer)