send metric events

This commit is contained in:
boufni95 2025-01-29 15:13:21 +00:00
parent 78a2bdc4b7
commit 565da33c5a
3 changed files with 64 additions and 15 deletions

View file

@ -2,6 +2,25 @@ import { bytesToHex, concatBytes } from '@noble/hashes/utils'
import * as Types from '../../../proto/autogenerated/ts/types.js'
export const utf8Decoder: TextDecoder = new TextDecoder('utf-8')
export const utf8Encoder: TextEncoder = new TextEncoder()
export type DataPacket = { dataId: number, packetNum: number, totalPackets: number, data: Uint8Array }
export const encodeTLVDataPacket = (packInfo: DataPacket): TLV => {
const { data, dataId, packetNum, totalPackets } = packInfo
const tlv: TLV = {}
tlv[2] = [integerToUint8Array(dataId)]
tlv[3] = [integerToUint8Array(packetNum)]
tlv[4] = [integerToUint8Array(totalPackets)]
tlv[5] = [data]
return tlv
}
export const decodeTLVDataPacket = (tlv: TLV): DataPacket => {
return {
dataId: parseInt(bytesToHex(tlv[2][0]), 16),
packetNum: parseInt(bytesToHex(tlv[3][0]), 16),
totalPackets: parseInt(bytesToHex(tlv[4][0]), 16),
data: tlv[5][0]
}
}
export const encodeListTLV = (list: Uint8Array[]): TLV => {
const tlv: TLV = {}

View file

@ -132,12 +132,15 @@ export default class {
return metrics
}
LoadMetricsFile = async (app: string, method: string, chunk: number): Promise<Types.UsageMetricTlv> => {
LoadRawMetricsFile = async (app: string, method: string, chunk: number): Promise<Buffer> => {
if (!this.metaReady || !this.metricsMeta[app] || !this.metricsMeta[app][method] || !this.metricsMeta[app][method].chunks.includes(chunk)) {
throw new Error("metrics not found")
}
const fullPath = [this.metricsPath, app, method, `${chunk}.mtlv`].join("/")
const tlv = fs.readFileSync(fullPath)
return fs.readFileSync(fullPath)
}
LoadMetricsFile = async (app: string, method: string, chunk: number): Promise<Types.UsageMetricTlv> => {
const tlv = await this.LoadRawMetricsFile(app, method, chunk)
const decoded = decodeListTLV(parseTLV(tlv))
return {
base_64_tlvs: decoded.map(d => Buffer.from(d).toString('base64')),

View file

@ -1,16 +1,21 @@
//@ts-ignore
import wrtc from 'wrtc'
import { getLogger } from "../helpers/logger.js"
import Storage from '../storage/index.js'
import { ERROR, getLogger } from "../helpers/logger.js"
import * as Types from '../../../proto/autogenerated/ts/types.js'
import { NostrSend, SendData, SendInitiator } from "../nostr/handler.js"
import { encodeTLV, encodeTLVDataPacket } from '../helpers/tlv.js'
type IceCandidate = { type: string, candidate?: string, sdpMid?: string, sdpMLineIndex?: number }
const configuration = { 'iceServers': [{ 'urls': 'stun:stun.l.google.com:19302' }] }
const configuration = { 'iceServers': [{ 'urls': 'stun.stunprotocol.org:3478' }] }
type UserInfo = { userPub: string, appId: string }
export default class webRTC {
private storage: Storage
private log = getLogger({ component: 'webRTC' })
private connections: Record<string, RTCPeerConnection> = {}
private _nostrSend: NostrSend
constructor(storage: Storage) {
this.storage = storage
}
attachNostrSend(f: NostrSend) {
this._nostrSend = f
}
@ -67,22 +72,44 @@ export default class webRTC {
this.sendCandidate(u, JSON.stringify(message))
}
conn.onconnectionstatechange = (event) => {
console.log('onconnectionstatechange', event, conn.connectionState)
console.log('onconnectionstatechange', conn.connectionState)
}
conn.ondatachannel = (event) => {
console.log('ondatachannel', event)
const channel = event.channel
channel.addEventListener('message', (event) => {
console.log('message', event)
channel.send(event.data + " to you!")
channel.addEventListener('message', async (event) => {
const j = JSON.parse(event.data) as Types.SingleUsageMetricReq
const err = Types.SingleUsageMetricReqValidate(j, {
app_id_CustomCheck: id => id === u.appId,
metrics_name_CustomCheck: name => name !== ""
})
if (err) {
this.log(ERROR, 'SingleUsageMetricReqValidate', err)
return
}
const res = await this.storage.metricsEventStorage.LoadRawMetricsFile(j.app_id, j.metrics_name, j.page)
const id = Math.floor(Math.random() * 2_000_000_000)
let i = 0
const packets: Buffer[] = []
while (i < res.length) {
const chunk = res.slice(i, Math.min(i + 15_000, res.length))
packets.push(chunk)
i += 15_000
}
for (let i = 0; i < packets.length; i++) {
const packet = packets[i]
const tlv = encodeTLVDataPacket({ dataId: id, packetNum: i + 1, totalPackets: packets.length, data: packet })
const bytes = encodeTLV(tlv)
channel.send(bytes)
}
})
}
conn.oniceconnectionstatechange = (event) => {
/* conn.oniceconnectionstatechange = (event) => {
console.log('oniceconnectionstatechange', event)
}
conn.onicegatheringstatechange = (event) => {
console.log('onicegatheringstatechange', event)
}
} */
await conn.setRemoteDescription(JSON.parse(offer))
const answer = await conn.createAnswer()
await conn.setLocalDescription(answer)