From 531947a49747ec0617d3c7660717d023c915e1f2 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 4 Feb 2025 20:13:41 +0000 Subject: [PATCH 01/11] bundle metrics --- proto/autogenerated/client.md | 26 +++ proto/autogenerated/go/http_client.go | 30 ++++ proto/autogenerated/go/types.go | 14 ++ proto/autogenerated/ts/express_server.ts | 22 +++ proto/autogenerated/ts/http_client.ts | 14 ++ proto/autogenerated/ts/nostr_client.ts | 15 ++ proto/autogenerated/ts/nostr_transport.ts | 16 ++ proto/autogenerated/ts/types.ts | 105 ++++++++++- proto/service/methods.proto | 8 + proto/service/structs.proto | 20 +++ src/index.ts | 19 +- src/services/helpers/tlv.ts | 2 + src/services/helpers/utilsWrapper.ts | 2 +- src/services/main/index.ts | 8 +- src/services/main/paymentManager.ts | 4 +- src/services/serverMethods/index.ts | 3 + src/services/storage/index.ts | 2 - src/services/storage/metricsEventStorage.ts | 166 +++-------------- src/services/storage/stateBundler.ts | 190 ++++++++++---------- src/services/storage/tlvFilesStorage.ts | 153 ++++++++++++++++ src/services/webRTC/index.ts | 6 +- 21 files changed, 573 insertions(+), 252 deletions(-) create mode 100644 src/services/storage/tlvFilesStorage.ts diff --git a/proto/autogenerated/client.md b/proto/autogenerated/client.md index 277807ee..04f579e6 100644 --- a/proto/autogenerated/client.md +++ b/proto/autogenerated/client.md @@ -93,6 +93,11 @@ The nostr server will send back a message response, and inside the body there wi - input: [AppsMetricsRequest](#AppsMetricsRequest) - output: [AppsMetrics](#AppsMetrics) +- GetBundleMetrics + - auth type: __Metrics__ + - input: [LatestBundleMetricReq](#LatestBundleMetricReq) + - output: [BundleMetrics](#BundleMetrics) + - GetDebitAuthorizations - auth type: __User__ - This methods has an __empty__ __request__ body @@ -481,6 +486,13 @@ The nostr server will send back a message response, and inside the body there wi - input: [AppsMetricsRequest](#AppsMetricsRequest) - output: [AppsMetrics](#AppsMetrics) +- GetBundleMetrics + - auth type: __Metrics__ + - http method: __post__ + - http route: __/api/reports/bundle__ + - input: [LatestBundleMetricReq](#LatestBundleMetricReq) + - output: [BundleMetrics](#BundleMetrics) + - GetDebitAuthorizations - auth type: __User__ - http method: __get__ @@ -958,6 +970,17 @@ The nostr server will send back a message response, and inside the body there wi - __nostr_pub__: _string_ - __user_identifier__: _string_ +### BundleData + - __available_chunks__: ARRAY of: _number_ + - __base_64_data__: ARRAY of: _string_ + - __current_chunk__: _number_ + +### BundleMetric + - __app_bundles__: MAP with key: _string_ and value: _[BundleData](#BundleData)_ + +### BundleMetrics + - __apps__: MAP with key: _string_ and value: _[BundleMetric](#BundleMetric)_ + ### CallbackUrl - __url__: _string_ @@ -1107,6 +1130,9 @@ The nostr server will send back a message response, and inside the body there wi - __token__: _string_ - __url__: _string_ +### LatestBundleMetricReq + - __limit__: _number_ *this field is optional + ### LatestUsageMetricReq - __limit__: _number_ *this field is optional diff --git a/proto/autogenerated/go/http_client.go b/proto/autogenerated/go/http_client.go index 78889c60..285945da 100644 --- a/proto/autogenerated/go/http_client.go +++ b/proto/autogenerated/go/http_client.go @@ -77,6 +77,7 @@ type Client struct { GetAppUser func(req GetAppUserRequest) (*AppUser, error) GetAppUserLNURLInfo func(req GetAppUserLNURLInfoRequest) (*LnurlPayInfoResponse, error) GetAppsMetrics func(req AppsMetricsRequest) (*AppsMetrics, error) + GetBundleMetrics func(req LatestBundleMetricReq) (*BundleMetrics, error) GetDebitAuthorizations func() (*DebitAuthorizations, error) GetErrorStats func() (*ErrorStats, error) GetHttpCreds func() (*HttpCreds, error) @@ -740,6 +741,35 @@ func NewClient(params ClientParams) *Client { } return &res, nil }, + GetBundleMetrics: func(req LatestBundleMetricReq) (*BundleMetrics, error) { + auth, err := params.RetrieveMetricsAuth() + if err != nil { + return nil, err + } + finalRoute := "/api/reports/bundle" + body, err := json.Marshal(req) + if err != nil { + return nil, err + } + resBody, err := doPostRequest(params.BaseURL+finalRoute, body, auth) + if err != nil { + return nil, err + } + result := ResultError{} + err = json.Unmarshal(resBody, &result) + if err != nil { + return nil, err + } + if result.Status == "ERROR" { + return nil, fmt.Errorf(result.Reason) + } + res := BundleMetrics{} + err = json.Unmarshal(resBody, &res) + if err != nil { + return nil, err + } + return &res, nil + }, GetDebitAuthorizations: func() (*DebitAuthorizations, error) { auth, err := params.RetrieveUserAuth() if err != nil { diff --git a/proto/autogenerated/go/types.go b/proto/autogenerated/go/types.go index 11158b1b..b5d42976 100644 --- a/proto/autogenerated/go/types.go +++ b/proto/autogenerated/go/types.go @@ -174,6 +174,17 @@ type BannedAppUser struct { Nostr_pub string `json:"nostr_pub"` User_identifier string `json:"user_identifier"` } +type BundleData struct { + Available_chunks []int64 `json:"available_chunks"` + Base_64_data []string `json:"base_64_data"` + Current_chunk int64 `json:"current_chunk"` +} +type BundleMetric struct { + App_bundles map[string]BundleData `json:"app_bundles"` +} +type BundleMetrics struct { + Apps map[string]BundleMetric `json:"apps"` +} type CallbackUrl struct { Url string `json:"url"` } @@ -323,6 +334,9 @@ type HttpCreds struct { Token string `json:"token"` Url string `json:"url"` } +type LatestBundleMetricReq struct { + Limit int64 `json:"limit"` +} type LatestUsageMetricReq struct { Limit int64 `json:"limit"` } diff --git a/proto/autogenerated/ts/express_server.ts b/proto/autogenerated/ts/express_server.ts index 5c1da226..3ab582e1 100644 --- a/proto/autogenerated/ts/express_server.ts +++ b/proto/autogenerated/ts/express_server.ts @@ -866,6 +866,28 @@ export default (methods: Types.ServerMethods, opts: ServerOptions) => { opts.metricsCallback([{ ...info, ...stats, ...authContext }]) } catch (ex) { const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } }) + if (!opts.allowNotImplementedMethods && !methods.GetBundleMetrics) throw new Error('method: GetBundleMetrics is not implemented') + app.post('/api/reports/bundle', async (req, res) => { + const info: Types.RequestInfo = { rpcName: 'GetBundleMetrics', batch: false, nostr: false, batchSize: 0} + const stats: Types.RequestStats = { startMs:req.startTimeMs || 0, start:req.startTime || 0n, parse: process.hrtime.bigint(), guard: 0n, validate: 0n, handle: 0n } + let authCtx: Types.AuthContext = {} + try { + if (!methods.GetBundleMetrics) throw new Error('method: GetBundleMetrics is not implemented') + const authContext = await opts.MetricsAuthGuard(req.headers['authorization']) + authCtx = authContext + stats.guard = process.hrtime.bigint() + const request = req.body + const error = Types.LatestBundleMetricReqValidate(request) + stats.validate = process.hrtime.bigint() + if (error !== null) return logErrorAndReturnResponse(error, 'invalid request body', res, logger, { ...info, ...stats, ...authContext }, opts.metricsCallback) + const query = req.query + const params = req.params + const response = await methods.GetBundleMetrics({rpcName:'GetBundleMetrics', ctx:authContext , req: request}) + stats.handle = process.hrtime.bigint() + res.json({status: 'OK', ...response}) + opts.metricsCallback([{ ...info, ...stats, ...authContext }]) + } catch (ex) { const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } + }) if (!opts.allowNotImplementedMethods && !methods.GetDebitAuthorizations) throw new Error('method: GetDebitAuthorizations is not implemented') app.get('/api/user/debit/get', async (req, res) => { const info: Types.RequestInfo = { rpcName: 'GetDebitAuthorizations', batch: false, nostr: false, batchSize: 0} diff --git a/proto/autogenerated/ts/http_client.ts b/proto/autogenerated/ts/http_client.ts index 2c408f93..f2d79af6 100644 --- a/proto/autogenerated/ts/http_client.ts +++ b/proto/autogenerated/ts/http_client.ts @@ -318,6 +318,20 @@ export default (params: ClientParams) => ({ } return { status: 'ERROR', reason: 'invalid response' } }, + GetBundleMetrics: async (request: Types.LatestBundleMetricReq): Promise => { + const auth = await params.retrieveMetricsAuth() + if (auth === null) throw new Error('retrieveMetricsAuth() returned null') + let finalRoute = '/api/reports/bundle' + const { data } = await axios.post(params.baseUrl + finalRoute, request, { headers: { 'authorization': auth } }) + if (data.status === 'ERROR' && typeof data.reason === 'string') return data + if (data.status === 'OK') { + const result = data + if(!params.checkResult) return { status: 'OK', ...result } + const error = Types.BundleMetricsValidate(result) + if (error === null) { return { status: 'OK', ...result } } else return { status: 'ERROR', reason: error.message } + } + return { status: 'ERROR', reason: 'invalid response' } + }, GetDebitAuthorizations: async (): Promise => { const auth = await params.retrieveUserAuth() if (auth === null) throw new Error('retrieveUserAuth() returned null') diff --git a/proto/autogenerated/ts/nostr_client.ts b/proto/autogenerated/ts/nostr_client.ts index b2cc12c7..28a281f6 100644 --- a/proto/autogenerated/ts/nostr_client.ts +++ b/proto/autogenerated/ts/nostr_client.ts @@ -233,6 +233,21 @@ export default (params: NostrClientParams, send: (to:string, message: NostrRequ } return { status: 'ERROR', reason: 'invalid response' } }, + GetBundleMetrics: async (request: Types.LatestBundleMetricReq): Promise => { + const auth = await params.retrieveNostrMetricsAuth() + if (auth === null) throw new Error('retrieveNostrMetricsAuth() returned null') + const nostrRequest: NostrRequest = {} + nostrRequest.body = request + const data = await send(params.pubDestination, {rpcName:'GetBundleMetrics',authIdentifier:auth, ...nostrRequest }) + if (data.status === 'ERROR' && typeof data.reason === 'string') return data + if (data.status === 'OK') { + const result = data + if(!params.checkResult) return { status: 'OK', ...result } + const error = Types.BundleMetricsValidate(result) + if (error === null) { return { status: 'OK', ...result } } else return { status: 'ERROR', reason: error.message } + } + return { status: 'ERROR', reason: 'invalid response' } + }, GetDebitAuthorizations: async (): Promise => { const auth = await params.retrieveNostrUserAuth() if (auth === null) throw new Error('retrieveNostrUserAuth() returned null') diff --git a/proto/autogenerated/ts/nostr_transport.ts b/proto/autogenerated/ts/nostr_transport.ts index 2c4ff956..5cf473ff 100644 --- a/proto/autogenerated/ts/nostr_transport.ts +++ b/proto/autogenerated/ts/nostr_transport.ts @@ -621,6 +621,22 @@ export default (methods: Types.ServerMethods, opts: NostrOptions) => { opts.metricsCallback([{ ...info, ...stats, ...authContext }]) }catch(ex){ const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } break + case 'GetBundleMetrics': + try { + if (!methods.GetBundleMetrics) throw new Error('method: GetBundleMetrics is not implemented') + const authContext = await opts.NostrMetricsAuthGuard(req.appId, req.authIdentifier) + stats.guard = process.hrtime.bigint() + authCtx = authContext + const request = req.body + const error = Types.LatestBundleMetricReqValidate(request) + stats.validate = process.hrtime.bigint() + if (error !== null) return logErrorAndReturnResponse(error, 'invalid request body', res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback) + const response = await methods.GetBundleMetrics({rpcName:'GetBundleMetrics', ctx:authContext , req: request}) + stats.handle = process.hrtime.bigint() + res({status: 'OK', ...response}) + opts.metricsCallback([{ ...info, ...stats, ...authContext }]) + }catch(ex){ const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } + break case 'GetDebitAuthorizations': try { if (!methods.GetDebitAuthorizations) throw new Error('method: GetDebitAuthorizations is not implemented') diff --git a/proto/autogenerated/ts/types.ts b/proto/autogenerated/ts/types.ts index 76655f9a..ca4a72e9 100644 --- a/proto/autogenerated/ts/types.ts +++ b/proto/autogenerated/ts/types.ts @@ -28,8 +28,8 @@ export type MetricsContext = { app_id: string operator_id: string } -export type MetricsMethodInputs = GetAppsMetrics_Input | GetErrorStats_Input | GetLndMetrics_Input | GetSingleUsageMetrics_Input | GetUsageMetrics_Input | SubmitWebRtcMessage_Input -export type MetricsMethodOutputs = GetAppsMetrics_Output | GetErrorStats_Output | GetLndMetrics_Output | GetSingleUsageMetrics_Output | GetUsageMetrics_Output | SubmitWebRtcMessage_Output +export type MetricsMethodInputs = GetAppsMetrics_Input | GetBundleMetrics_Input | GetErrorStats_Input | GetLndMetrics_Input | GetSingleUsageMetrics_Input | GetUsageMetrics_Input | SubmitWebRtcMessage_Input +export type MetricsMethodOutputs = GetAppsMetrics_Output | GetBundleMetrics_Output | GetErrorStats_Output | GetLndMetrics_Output | GetSingleUsageMetrics_Output | GetUsageMetrics_Output | SubmitWebRtcMessage_Output export type UserContext = { app_id: string app_user_id: string @@ -108,6 +108,9 @@ export type GetAppUserLNURLInfo_Output = ResultError | ({ status: 'OK' } & Lnurl export type GetAppsMetrics_Input = {rpcName:'GetAppsMetrics', req: AppsMetricsRequest} export type GetAppsMetrics_Output = ResultError | ({ status: 'OK' } & AppsMetrics) +export type GetBundleMetrics_Input = {rpcName:'GetBundleMetrics', req: LatestBundleMetricReq} +export type GetBundleMetrics_Output = ResultError | ({ status: 'OK' } & BundleMetrics) + export type GetDebitAuthorizations_Input = {rpcName:'GetDebitAuthorizations'} export type GetDebitAuthorizations_Output = ResultError | ({ status: 'OK' } & DebitAuthorizations) @@ -312,6 +315,7 @@ export type ServerMethods = { GetAppUser?: (req: GetAppUser_Input & {ctx: AppContext }) => Promise GetAppUserLNURLInfo?: (req: GetAppUserLNURLInfo_Input & {ctx: AppContext }) => Promise GetAppsMetrics?: (req: GetAppsMetrics_Input & {ctx: MetricsContext }) => Promise + GetBundleMetrics?: (req: GetBundleMetrics_Input & {ctx: MetricsContext }) => Promise GetDebitAuthorizations?: (req: GetDebitAuthorizations_Input & {ctx: UserContext }) => Promise GetErrorStats?: (req: GetErrorStats_Input & {ctx: MetricsContext }) => Promise GetHttpCreds?: (req: GetHttpCreds_Input & {ctx: UserContext }) => Promise @@ -924,6 +928,84 @@ export const BannedAppUserValidate = (o?: BannedAppUser, opts: BannedAppUserOpti return null } +export type BundleData = { + available_chunks: number[] + base_64_data: string[] + current_chunk: number +} +export const BundleDataOptionalFields: [] = [] +export type BundleDataOptions = OptionsBaseMessage & { + checkOptionalsAreSet?: [] + available_chunks_CustomCheck?: (v: number[]) => boolean + base_64_data_CustomCheck?: (v: string[]) => boolean + current_chunk_CustomCheck?: (v: number) => boolean +} +export const BundleDataValidate = (o?: BundleData, opts: BundleDataOptions = {}, path: string = 'BundleData::root.'): Error | null => { + if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message') + if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null') + + if (!Array.isArray(o.available_chunks)) return new Error(`${path}.available_chunks: is not an array`) + for (let index = 0; index < o.available_chunks.length; index++) { + if (typeof o.available_chunks[index] !== 'number') return new Error(`${path}.available_chunks[${index}]: is not a number`) + } + if (opts.available_chunks_CustomCheck && !opts.available_chunks_CustomCheck(o.available_chunks)) return new Error(`${path}.available_chunks: custom check failed`) + + if (!Array.isArray(o.base_64_data)) return new Error(`${path}.base_64_data: is not an array`) + for (let index = 0; index < o.base_64_data.length; index++) { + if (typeof o.base_64_data[index] !== 'string') return new Error(`${path}.base_64_data[${index}]: is not a string`) + } + if (opts.base_64_data_CustomCheck && !opts.base_64_data_CustomCheck(o.base_64_data)) return new Error(`${path}.base_64_data: custom check failed`) + + if (typeof o.current_chunk !== 'number') return new Error(`${path}.current_chunk: is not a number`) + if (opts.current_chunk_CustomCheck && !opts.current_chunk_CustomCheck(o.current_chunk)) return new Error(`${path}.current_chunk: custom check failed`) + + return null +} + +export type BundleMetric = { + app_bundles: Record +} +export const BundleMetricOptionalFields: [] = [] +export type BundleMetricOptions = OptionsBaseMessage & { + checkOptionalsAreSet?: [] + app_bundles_EntryOptions?: BundleDataOptions + app_bundles_CustomCheck?: (v: Record) => boolean +} +export const BundleMetricValidate = (o?: BundleMetric, opts: BundleMetricOptions = {}, path: string = 'BundleMetric::root.'): Error | null => { + if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message') + if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null') + + if (typeof o.app_bundles !== 'object' || o.app_bundles === null) return new Error(`${path}.app_bundles: is not an object or is null`) + for (const key in o.app_bundles) { + const app_bundlesErr = BundleDataValidate(o.app_bundles[key], opts.app_bundles_EntryOptions, `${path}.app_bundles['${key}']`) + if (app_bundlesErr !== null) return app_bundlesErr + } + + return null +} + +export type BundleMetrics = { + apps: Record +} +export const BundleMetricsOptionalFields: [] = [] +export type BundleMetricsOptions = OptionsBaseMessage & { + checkOptionalsAreSet?: [] + apps_EntryOptions?: BundleMetricOptions + apps_CustomCheck?: (v: Record) => boolean +} +export const BundleMetricsValidate = (o?: BundleMetrics, opts: BundleMetricsOptions = {}, path: string = 'BundleMetrics::root.'): Error | null => { + if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message') + if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null') + + if (typeof o.apps !== 'object' || o.apps === null) return new Error(`${path}.apps: is not an object or is null`) + for (const key in o.apps) { + const appsErr = BundleMetricValidate(o.apps[key], opts.apps_EntryOptions, `${path}.apps['${key}']`) + if (appsErr !== null) return appsErr + } + + return null +} + export type CallbackUrl = { url: string } @@ -1812,6 +1894,25 @@ export const HttpCredsValidate = (o?: HttpCreds, opts: HttpCredsOptions = {}, pa return null } +export type LatestBundleMetricReq = { + limit?: number +} +export type LatestBundleMetricReqOptionalField = 'limit' +export const LatestBundleMetricReqOptionalFields: LatestBundleMetricReqOptionalField[] = ['limit'] +export type LatestBundleMetricReqOptions = OptionsBaseMessage & { + checkOptionalsAreSet?: LatestBundleMetricReqOptionalField[] + limit_CustomCheck?: (v?: number) => boolean +} +export const LatestBundleMetricReqValidate = (o?: LatestBundleMetricReq, opts: LatestBundleMetricReqOptions = {}, path: string = 'LatestBundleMetricReq::root.'): Error | null => { + if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message') + if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null') + + if ((o.limit || opts.allOptionalsAreSet || opts.checkOptionalsAreSet?.includes('limit')) && typeof o.limit !== 'number') return new Error(`${path}.limit: is not a number`) + if (opts.limit_CustomCheck && !opts.limit_CustomCheck(o.limit)) return new Error(`${path}.limit: custom check failed`) + + return null +} + export type LatestUsageMetricReq = { limit?: number } diff --git a/proto/service/methods.proto b/proto/service/methods.proto index 1b404f2b..5f20f8ac 100644 --- a/proto/service/methods.proto +++ b/proto/service/methods.proto @@ -181,6 +181,14 @@ service LightningPub { option (http_route) = "/api/reports/usage"; option (nostr) = true; } + + rpc GetBundleMetrics(structs.LatestBundleMetricReq) returns (structs.BundleMetrics) { + option (auth_type) = "Metrics"; + option (http_method) = "post"; + option (http_route) = "/api/reports/bundle"; + option (nostr) = true; + } + rpc GetSingleUsageMetrics(structs.SingleUsageMetricReq) returns (structs.UsageMetricTlv) { option (auth_type) = "Metrics"; option (http_method) = "post"; diff --git a/proto/service/structs.proto b/proto/service/structs.proto index 2e74b8fa..8079f6ea 100644 --- a/proto/service/structs.proto +++ b/proto/service/structs.proto @@ -62,6 +62,8 @@ message UsageMetric { optional string app_id = 11; } + + message UsageMetricTlv { repeated string base_64_tlvs = 1; int64 current_chunk = 2; @@ -77,6 +79,24 @@ message UsageMetrics { map apps = 1; } +message LatestBundleMetricReq { + optional int64 limit = 1; +} + +message BundleData { + repeated string base_64_data = 1; + int64 current_chunk = 2; + repeated int64 available_chunks = 3; +} + +message BundleMetric { + map app_bundles = 1; +} + +message BundleMetrics { + map apps = 1; +} + message AppsMetricsRequest { optional int64 from_unix = 1; optional int64 to_unix = 2; diff --git a/src/index.ts b/src/index.ts index 886202be..19464c04 100644 --- a/src/index.ts +++ b/src/index.ts @@ -24,10 +24,11 @@ const start = async () => { const serverMethods = GetServerMethods(mainHandler) const nostrSettings = LoadNosrtSettingsFromEnv() log("initializing nostr middleware") - const { Send } = nostrMiddleware(serverMethods, mainHandler, + const { Send, Stop } = nostrMiddleware(serverMethods, mainHandler, { ...nostrSettings, apps, clients: [liquidityProviderInfo] }, (e, p) => mainHandler.liquidityProvider.onEvent(e, p) ) + exitHandler(() => { Stop() }) log("starting server") mainHandler.attachNostrSend(Send) mainHandler.StartBeacons() @@ -40,3 +41,19 @@ const start = async () => { Server.Listen(mainSettings.servicePort) } start() + +const exitHandler = async (kill: () => void) => { + // catch ctrl+c event and exit normally + process.on('SIGINT', () => { + console.log('Ctrl-C detected, exiting safely...'); + process.exit(2); + }); + + //catch uncaught exceptions, trace, then exit normally + process.on('uncaughtException', (e) => { + console.log('Uncaught Exception detected, exiting safely, and killing all child processes...'); + console.log(e.stack); + kill(); + process.exit(99); + }); +} \ No newline at end of file diff --git a/src/services/helpers/tlv.ts b/src/services/helpers/tlv.ts index e86b9ada..7d033143 100644 --- a/src/services/helpers/tlv.ts +++ b/src/services/helpers/tlv.ts @@ -32,6 +32,8 @@ export const decodeListTLV = (tlv: TLV): Uint8Array[] => { return tlv[64] } + + export const usageMetricsToTlv = (metric: Types.UsageMetric): TLV => { const tlv: TLV = {} tlv[2] = [integerToUint8Array(Math.ceil(metric.processed_at_ms / 1000))] // 6 -> 6 diff --git a/src/services/helpers/utilsWrapper.ts b/src/services/helpers/utilsWrapper.ts index f00626e0..35cefc17 100644 --- a/src/services/helpers/utilsWrapper.ts +++ b/src/services/helpers/utilsWrapper.ts @@ -6,6 +6,6 @@ export class Utils { settings: MainSettings constructor(settings: MainSettings) { this.settings = settings - this.stateBundler = new StateBundler() + this.stateBundler = new StateBundler(settings.storageSettings) } } \ No newline at end of file diff --git a/src/services/main/index.ts b/src/services/main/index.ts index 08001081..7de52a43 100644 --- a/src/services/main/index.ts +++ b/src/services/main/index.ts @@ -188,9 +188,9 @@ export default class { const operationId = `${Types.UserOperationType.INCOMING_TX}-${addedTx.serial_id}` const op = { amount, paidAtUnix: Date.now() / 1000, inbound: true, type: Types.UserOperationType.INCOMING_TX, identifier: userAddress.address, operationId, network_fee: 0, service_fee: fee, confirmed: internal, tx_hash: txOutput.hash, internal: false } this.sendOperationToNostr(userAddress.linkedApplication, userAddress.user.user_id, op) - this.utils.stateBundler.AddTxPoint('addressWasPaid', amount, { used, from: 'system', timeDiscount: true }) + this.utils.stateBundler.AddTxPoint('addressWasPaid', amount, { used, from: 'system', timeDiscount: true }, userAddress.linkedApplication.app_id) } catch (err: any) { - this.utils.stateBundler.AddTxPointFailed('addressWasPaid', amount, { used, from: 'system' }) + this.utils.stateBundler.AddTxPointFailed('addressWasPaid', amount, { used, from: 'system' }, userAddress.linkedApplication.app_id) log(ERROR, "cannot process address paid transaction, already registered") } }) @@ -234,9 +234,9 @@ export default class { log(ERROR, "cannot create zap receipt", err.message || "") } this.liquidityManager.afterInInvoicePaid() - this.utils.stateBundler.AddTxPoint('invoiceWasPaid', amount, { used, from: 'system', timeDiscount: true }) + this.utils.stateBundler.AddTxPoint('invoiceWasPaid', amount, { used, from: 'system', timeDiscount: true }, userInvoice.linkedApplication.app_id) } catch (err: any) { - this.utils.stateBundler.AddTxPointFailed('invoiceWasPaid', amount, { used, from: 'system' }) + this.utils.stateBundler.AddTxPointFailed('invoiceWasPaid', amount, { used, from: 'system' }, userInvoice.linkedApplication.app_id) log(ERROR, "cannot process paid invoice", err.message || "") } }) diff --git a/src/services/main/paymentManager.ts b/src/services/main/paymentManager.ts index 4a18a2f8..797bfe8c 100644 --- a/src/services/main/paymentManager.ts +++ b/src/services/main/paymentManager.ts @@ -355,11 +355,11 @@ export default class { try { await this.invoicePaidCb(internalInvoice.invoice, payAmount, 'internal') const newPayment = await this.storage.paymentStorage.AddInternalPayment(userId, internalInvoice.invoice, payAmount, serviceFee, linkedApplication, debitNpub) - this.utils.stateBundler.AddTxPoint('paidAnInvoice', payAmount, { used: 'internal', from: 'user' }) + this.utils.stateBundler.AddTxPoint('paidAnInvoice', payAmount, { used: 'internal', from: 'user' }, linkedApplication.app_id) return { preimage: "", amtPaid: payAmount, networkFee: 0, serialId: newPayment.serial_id } } catch (err) { await this.storage.userStorage.IncrementUserBalance(userId, totalAmountToDecrement, "internal_payment_refund:" + internalInvoice.invoice) - this.utils.stateBundler.AddTxPointFailed('paidAnInvoice', payAmount, { used: 'internal', from: 'user' }) + this.utils.stateBundler.AddTxPointFailed('paidAnInvoice', payAmount, { used: 'internal', from: 'user' }, linkedApplication.app_id) throw err } diff --git a/src/services/serverMethods/index.ts b/src/services/serverMethods/index.ts index 246985bf..7291aec8 100644 --- a/src/services/serverMethods/index.ts +++ b/src/services/serverMethods/index.ts @@ -17,6 +17,9 @@ export default (mainHandler: Main): Types.ServerMethods => { GetUsageMetrics: async ({ ctx, req }) => { return mainHandler.metricsManager.GetUsageMetrics(req) }, + GetBundleMetrics: async ({ ctx, req }) => { + return mainHandler.utils.stateBundler.GetBundleMetrics(req) + }, GetSingleUsageMetrics: async ({ ctx, req }) => { return mainHandler.metricsManager.GetSingleUsageMetrics(req) }, diff --git a/src/services/storage/index.ts b/src/services/storage/index.ts index 7f105ce0..5f94a546 100644 --- a/src/services/storage/index.ts +++ b/src/services/storage/index.ts @@ -10,7 +10,6 @@ import MetricsEventStorage from "./metricsEventStorage.js"; import TransactionsQueue, { TX } from "./transactionsQueue.js"; import EventsLogManager from "./eventsLog.js"; import { LiquidityStorage } from "./liquidityStorage.js"; -import { StateBundler } from "./stateBundler.js"; import DebitStorage from "./debitStorage.js" import OfferStorage from "./offerStorage.js" export type StorageSettings = { @@ -35,7 +34,6 @@ export default class { debitStorage: DebitStorage offerStorage: OfferStorage eventsLog: EventsLogManager - stateBundler: StateBundler constructor(settings: StorageSettings) { this.settings = settings this.eventsLog = new EventsLogManager(settings.eventLogPath) diff --git a/src/services/storage/metricsEventStorage.ts b/src/services/storage/metricsEventStorage.ts index 619f903c..ea1a2d5e 100644 --- a/src/services/storage/metricsEventStorage.ts +++ b/src/services/storage/metricsEventStorage.ts @@ -2,51 +2,31 @@ import fs from 'fs' import * as Types from '../../../proto/autogenerated/ts/types.js' import { StorageSettings } from "./index.js"; import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js'; -const chunkSizeBytes = 128 * 1024 +import { TlvFilesStorage } from './tlvFilesStorage.js'; export default class { - settings: StorageSettings - metricsPath: string + tlvStorage: TlvFilesStorage cachePath: string - metaReady = false - metricsMeta: Record> = {} - pendingMetrics: Record> = {} last24hCache: { ts: number, ok: number, fail: number }[] = [] - lastPersistedMetrics: number = 0 lastPersistedCache: number = 0 constructor(settings: StorageSettings) { - this.settings = settings; - this.metricsPath = [settings.dataDir, "metric_events"].join("/") + const metricsPath = [settings.dataDir, "metric_events"].join("/") + this.tlvStorage = new TlvFilesStorage(metricsPath) this.cachePath = [settings.dataDir, "metric_cache"].join("/") if (!fs.existsSync(this.cachePath)) { fs.mkdirSync(this.cachePath, { recursive: true }); } - this.initMetricsMeta() + this.tlvStorage.initMeta() this.loadCache() setInterval(() => { - if (Date.now() - this.lastPersistedMetrics > 1000 * 60 * 4) { - this.persistMetrics() - } if (Date.now() - this.lastPersistedCache > 1000 * 60 * 4) { this.persistCache() } }, 1000 * 60 * 5) process.on('exit', () => { - this.persistMetrics() this.persistCache() }); - // catch ctrl+c event and exit normally - process.on('SIGINT', () => { - console.log('Ctrl-C...'); - process.exit(2); - }); - //catch uncaught exceptions, trace, then exit normally - process.on('uncaughtException', (e) => { - console.log('Uncaught Exception...'); - console.log(e.stack); - process.exit(99); - }); } getlast24hCache = () => { return this.last24hCache } @@ -93,139 +73,35 @@ export default class { } AddMetricEvent = (appId: string, method: string, metric: Uint8Array, success: boolean) => { - if (!this.metaReady) { - throw new Error("meta metrics not ready") - } - if (!this.pendingMetrics[appId]) { - this.pendingMetrics[appId] = {} - } - if (!this.pendingMetrics[appId][method]) { - this.pendingMetrics[appId][method] = { tlvs: [] } - } - this.pendingMetrics[appId][method].tlvs.push(metric) + this.tlvStorage.AddTlv(appId, method, metric) this.pushToCache(success) } LoadLatestMetrics = async (limit = 100): Promise => { - this.persistMetrics() + const raw = this.tlvStorage.LoadLatest(limit) const metrics: Types.UsageMetrics = { apps: {} } - this.foreachMetricMethodFile((app, method, tlvFiles) => { - if (tlvFiles.length === 0) { return } - const methodPath = [this.metricsPath, app, method].join("/") - const latest = tlvFiles[tlvFiles.length - 1] - const tlvFile = [methodPath, `${latest}.mtlv`].join("/") - const tlv = fs.readFileSync(tlvFile) - const decoded = decodeListTLV(parseTLV(tlv)) - if (!metrics.apps[app]) { - metrics.apps[app] = { app_metrics: {} } - } - if (decoded.length > limit) { - decoded.splice(0, decoded.length - limit) - } - metrics.apps[app].app_metrics[method] = { - base_64_tlvs: decoded.map(d => Buffer.from(d).toString('base64')), - current_chunk: latest, - available_chunks: tlvFiles - } + Object.keys(raw).forEach(app => { + metrics.apps[app] = { app_metrics: {} } + Object.keys(raw[app]).forEach(method => { + const data = raw[app][method] + metrics.apps[app].app_metrics[method] = { + base_64_tlvs: data.tlvs.map(d => Buffer.from(d).toString('base64')), + current_chunk: data.current_chunk, + available_chunks: data.available_chunks + } + }) }) return metrics } - - LoadRawMetricsFile = async (app: string, method: string, chunk: number): Promise => { - if (!this.metaReady || !this.metricsMeta[app] || !this.metricsMeta[app][method] || !this.metricsMeta[app][method].chunks.includes(chunk)) { - throw new Error("metrics not found") - } - const fullPath = [this.metricsPath, app, method, `${chunk}.mtlv`].join("/") - return fs.readFileSync(fullPath) - } LoadMetricsFile = async (app: string, method: string, chunk: number): Promise => { - const tlv = await this.LoadRawMetricsFile(app, method, chunk) - const decoded = decodeListTLV(parseTLV(tlv)) + const { fileData, chunks } = this.tlvStorage.LoadFile(app, method, chunk) + //const tlv = await this.LoadRawMetricsFile(app, method, chunk) + const decoded = decodeListTLV(parseTLV(fileData)) return { base_64_tlvs: decoded.map(d => Buffer.from(d).toString('base64')), current_chunk: chunk, - available_chunks: this.metricsMeta[app][method].chunks + available_chunks: chunks } } - - persistMetrics = () => { - if (!this.metaReady) { - throw new Error("meta metrics not ready") - } - this.lastPersistedMetrics = Date.now() - const tosync = this.pendingMetrics - this.pendingMetrics = {} - const apps = Object.keys(tosync) - apps.map(app => { - const appPath = [this.metricsPath, app].join("/") - if (!fs.existsSync(appPath)) { - fs.mkdirSync(appPath, { recursive: true }); - } - const methods = Object.keys(tosync[app]) - methods.map(methodName => { - const methodPath = [appPath, methodName].join("/") - if (!fs.existsSync(methodPath)) { - fs.mkdirSync(methodPath, { recursive: true }); - } - const method = tosync[app][methodName] - const meta = this.getMetricsMeta(app, methodName) - const chunks = meta.chunks.length > 0 ? meta.chunks : [0] - const latest = chunks[chunks.length - 1] - const tlv = encodeTLV(encodeListTLV(method.tlvs)) - const tlvFile = [methodPath, `${latest}.mtlv`].join("/") - fs.appendFileSync(tlvFile, Buffer.from(tlv)) - if (fs.lstatSync(tlvFile).size > chunkSizeBytes) { - this.updateMetricsMeta(app, methodName, [...chunks, latest + 1]) - } - }) - }) - } - - initMetricsMeta = () => { - this.foreachMetricMethodFile((app, method, tlvFiles) => { - this.updateMetricsMeta(app, method, tlvFiles) - }) - this.metaReady = true - } - - updateMetricsMeta = (appId: string, method: string, sortedChunks: number[]) => { - if (!this.metricsMeta[appId]) { - this.metricsMeta[appId] = {} - } - this.metricsMeta[appId][method] = { chunks: sortedChunks } - } - - getMetricsMeta = (appId: string, method: string) => { - if (!this.metricsMeta[appId]) { - return { chunks: [] } - } - return this.metricsMeta[appId][method] || { chunks: [] } - } - - foreachMetricMethodFile = (cb: (appId: string, method: string, tlvFiles: number[]) => void) => { - if (!fs.existsSync(this.metricsPath)) { - fs.mkdirSync(this.metricsPath, { recursive: true }); - } - const apps = fs.readdirSync(this.metricsPath) - apps.forEach(appDir => { - const appPath = [this.metricsPath, appDir].join("/") - if (!fs.lstatSync(appPath).isDirectory()) { - return - } - const methods = fs.readdirSync(appPath) - methods.forEach(methodDir => { - const methodPath = [appPath, methodDir].join("/") - if (!fs.lstatSync(methodPath).isDirectory()) { - return - } - const tlvFiles = fs.readdirSync(methodPath) - .filter(f => f.endsWith(".mtlv")) - .map(f => +f.slice(0, -".mtlv".length)) - .filter(n => !isNaN(n)) - .sort((a, b) => a - b) - cb(appDir, methodDir, tlvFiles) - }) - }) - } } \ No newline at end of file diff --git a/src/services/storage/stateBundler.ts b/src/services/storage/stateBundler.ts index 4cd000be..ccebbd2a 100644 --- a/src/services/storage/stateBundler.ts +++ b/src/services/storage/stateBundler.ts @@ -1,5 +1,9 @@ +import { LatestBundleMetricReq } from "../../../proto/autogenerated/ts/types.js" import { getLogger } from "../helpers/logger.js" - +import { integerToUint8Array } from "../helpers/tlv.js" +import { StorageSettings } from "./index.js" +import { TlvFilesStorage } from "./tlvFilesStorage.js" +import * as Types from "../../../proto/autogenerated/ts/types.js" const transactionStatePointTypes = ['addedInvoice', 'invoiceWasPaid', 'paidAnInvoice', 'addedAddress', 'addressWasPaid', 'paidAnAddress', 'user2user'] as const const balanceStatePointTypes = ['providerBalance', 'providerMaxWithdrawable', 'walletBalance', 'channelBalance', 'usersBalance', 'feesPaidForLiquidity', 'totalLndBalance', 'accumulatedHtlcFees', 'deltaUsers', 'deltaExternal'] as const const maxStatePointTypes = ['maxProviderRespTime'] as const @@ -25,118 +29,120 @@ export type TxPointSettings = { timeDiscount?: true } export class StateBundler { - sinceStart: StateBundle = {} - lastReport: StateBundle = {} - sinceLatestReport: StateBundle = {} - reportPeriod = 1000 * 60 * 60 * 12 //12h - satsPer1SecondDiscount = 1 - totalSatsForDiscount = 0 - latestReport = Date.now() + tlvStorage: TlvFilesStorage reportLog = getLogger({ component: 'stateBundlerReport' }) - constructor() { - /* process.on('exit', () => { - this.Report() - }); - - // catch ctrl+c event and exit normally - process.on('SIGINT', () => { - console.log('Ctrl-C...'); - process.exit(2); - }); - - //catch uncaught exceptions, trace, then exit normally - process.on('uncaughtException', (e) => { - console.log('Uncaught Exception...'); - console.log(e.stack); - process.exit(99); - }); */ + constructor(settings: StorageSettings) { + const bundlerPath = [settings.dataDir, "bundler_events"].join("/") + this.tlvStorage = new TlvFilesStorage(bundlerPath) + this.tlvStorage.initMeta() } - increment = (key: string, value: number) => { - this.sinceStart[key] = (this.sinceStart[key] || 0) + value - this.sinceLatestReport[key] = (this.sinceLatestReport[key] || 0) + value - this.triggerReportCheck() - } - set = (key: string, value: number) => { - this.sinceStart[key] = value - this.sinceLatestReport[key] = value - this.triggerReportCheck() - } - max = (key: string, value: number) => { - this.sinceStart[key] = Math.max(this.sinceStart[key] || 0, value) - this.sinceLatestReport[key] = Math.max(this.sinceLatestReport[key] || 0, value) - this.triggerReportCheck() + async GetBundleMetrics(req: Types.LatestBundleMetricReq): Promise { + const latest = this.tlvStorage.LoadLatest(req.limit) + const metrics: Types.BundleMetrics = { apps: {} } + Object.keys(latest).forEach(app => { + metrics.apps[app] = { app_bundles: {} } + Object.keys(latest[app]).forEach(dataName => { + const data = latest[app][dataName] + metrics.apps[app].app_bundles[dataName] = { + current_chunk: data.current_chunk, + available_chunks: data.available_chunks, + base_64_data: data.tlvs.map(d => Buffer.from(d).toString('base64')) + } + }) + }) + return metrics } + /* increment = (key: string, value: number) => { + this.sinceStart[key] = (this.sinceStart[key] || 0) + value + this.sinceLatestReport[key] = (this.sinceLatestReport[key] || 0) + value + this.triggerReportCheck() + } + set = (key: string, value: number) => { + this.sinceStart[key] = value + this.sinceLatestReport[key] = value + this.triggerReportCheck() + } + max = (key: string, value: number) => { + this.sinceStart[key] = Math.max(this.sinceStart[key] || 0, value) + this.sinceLatestReport[key] = Math.max(this.sinceLatestReport[key] || 0, value) + this.triggerReportCheck() + } */ - AddTxPoint = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings) => { - const { used, from, timeDiscount } = settings + AddTxPoint = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings, appId = '__root') => { + const { used, from } = settings const meta = settings.meta || [] const key = [actionName, from, used, ...meta].join('_') - if (timeDiscount) { - this.totalSatsForDiscount += v - } - this.increment(key, v) - //this.smallLogEvent(actionName, from) + + this.tlvStorage.AddTlv(appId, key, this.serializeNow(v)) } - AddTxPointFailed = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings) => { + AddTxPointFailed = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings, appId = '__root') => { const { used, from } = settings const meta = settings.meta || [] const key = [actionName, from, used, ...meta, 'failed'].join('_') - this.increment(key, v) + this.tlvStorage.AddTlv(appId, key, this.serializeNow(v)) } - AddBalancePoint = (actionName: BalanceStatePointType, v: number, meta = []) => { + AddBalancePoint = (actionName: BalanceStatePointType, v: number, meta = [], appId = '__root') => { const key = [actionName, ...meta].join('_') - this.set(key, v) + this.tlvStorage.AddTlv(appId, key, this.serializeNow(v)) } - AddMaxPoint = (actionName: MaxStatePointType, v: number, meta = []) => { + AddMaxPoint = (actionName: MaxStatePointType, v: number, meta = [], appId = '__root') => { const key = [actionName, ...meta].join('_') - this.max(key, v) + this.tlvStorage.AddTlv(appId, key, this.serializeNow(v)) } - triggerReportCheck = () => { - const discountSeconds = Math.floor(this.totalSatsForDiscount / this.satsPer1SecondDiscount) - const totalElapsed = Date.now() - this.latestReport - const elapsedWithDiscount = totalElapsed + discountSeconds * 1000 - if (elapsedWithDiscount > this.reportPeriod) { - this.Report() - } + serializeNow = (v: number) => { + const nowUnix = Math.floor(Date.now() / 1000) + const entry = new Uint8Array(8) + entry.set(integerToUint8Array(nowUnix), 0) + entry.set(integerToUint8Array(v), 4) + return entry } - smallLogEvent(event: TransactionStatePointType, from: 'user' | 'system') { - const char = from === 'user' ? 'U' : 'S' - switch (event) { - case 'addedAddress': - case 'addedInvoice': - process.stdout.write(`${char}+,`) - return - case 'addressWasPaid': - case 'invoiceWasPaid': - process.stdout.write(`${char}>,`) - return - case 'paidAnAddress': - case 'paidAnInvoice': - process.stdout.write(`${char}<,`) - return - case 'user2user': - process.stdout.write(`UU`) - } - } + /* triggerReportCheck = () => { + const discountSeconds = Math.floor(this.totalSatsForDiscount / this.satsPer1SecondDiscount) + const totalElapsed = Date.now() - this.latestReport + const elapsedWithDiscount = totalElapsed + discountSeconds * 1000 + if (elapsedWithDiscount > this.reportPeriod) { + this.Report() + } + } */ - Report = () => { - this.totalSatsForDiscount = 0 - this.latestReport = Date.now() - this.reportLog("+++++ since last report:") - Object.entries(this.sinceLatestReport).forEach(([key, value]) => { - this.reportLog(key, value) - }) - this.reportLog("+++++ since start:") - Object.entries(this.sinceStart).forEach(([key, value]) => { - this.reportLog(key, value) - }) - this.lastReport = { ...this.sinceLatestReport } - this.sinceLatestReport = {} - } + /* smallLogEvent(event: TransactionStatePointType, from: 'user' | 'system') { + const char = from === 'user' ? 'U' : 'S' + switch (event) { + case 'addedAddress': + case 'addedInvoice': + process.stdout.write(`${char}+,`) + return + case 'addressWasPaid': + case 'invoiceWasPaid': + process.stdout.write(`${char}>,`) + return + case 'paidAnAddress': + case 'paidAnInvoice': + process.stdout.write(`${char}<,`) + return + case 'user2user': + process.stdout.write(`UU`) + } + } */ + + /* Report = () => { + this.totalSatsForDiscount = 0 + this.latestReport = Date.now() + this.reportLog("+++++ since last report:") + Object.entries(this.sinceLatestReport).forEach(([key, value]) => { + this.reportLog(key, value) + }) + this.reportLog("+++++ since start:") + Object.entries(this.sinceStart).forEach(([key, value]) => { + this.reportLog(key, value) + }) + this.lastReport = { ...this.sinceLatestReport } + this.sinceLatestReport = {} + } */ } \ No newline at end of file diff --git a/src/services/storage/tlvFilesStorage.ts b/src/services/storage/tlvFilesStorage.ts new file mode 100644 index 00000000..ad7fe36f --- /dev/null +++ b/src/services/storage/tlvFilesStorage.ts @@ -0,0 +1,153 @@ +import fs from 'fs' +import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js' +const chunkSizeBytes = 128 * 1024 +export type LatestData = Record> +export class TlvFilesStorage { + storagePath: string + lastPersisted: number = 0 + meta: Record> = {} + pending: Record> = {} + metaReady = false + constructor(storagePath: string) { + this.storagePath = storagePath + if (!fs.existsSync(this.storagePath)) { + fs.mkdirSync(this.storagePath, { recursive: true }); + } + this.initMeta() + setInterval(() => { + if (Date.now() - this.lastPersisted > 1000 * 60 * 4) { + this.persist() + } + }, 1000 * 60 * 5) + process.on('exit', () => { + this.persist() + }); + } + + LoadFile = (app: string, dataName: string, chunk: number): { fileData: Buffer, chunks: number[] } => { + if (!this.metaReady || !this.meta[app] || !this.meta[app][dataName] || !this.meta[app][dataName].chunks.includes(chunk)) { + throw new Error("metrics not found") + } + const fullPath = [this.storagePath, app, dataName, `${chunk}.mtlv`].join("/") + const fileData = fs.readFileSync(fullPath) + return { fileData, chunks: this.meta[app][dataName].chunks } + } + + AddTlv = (appId: string, dataName: string, tlv: Uint8Array) => { + if (!this.metaReady) { + throw new Error("meta metrics not ready") + } + if (!this.pending[appId]) { + this.pending[appId] = {} + } + if (!this.pending[appId][dataName]) { + this.pending[appId][dataName] = { tlvs: [] } + } + this.pending[appId][dataName].tlvs.push(tlv) + } + + LoadLatest = (limit = 100): LatestData => { + this.persist() + const data: LatestData = {} + this.foreachFile((app, dataName, tlvFiles) => { + if (tlvFiles.length === 0) { return } + const methodPath = [this.storagePath, app, dataName].join("/") + const latest = tlvFiles[tlvFiles.length - 1] + const tlvFile = [methodPath, `${latest}.mtlv`].join("/") + const tlv = fs.readFileSync(tlvFile) + const decoded = decodeListTLV(parseTLV(tlv)) + if (!data[app]) { + data[app] = {} + } + if (decoded.length > limit) { + decoded.splice(0, decoded.length - limit) + } + data[app][dataName] = { + tlvs: decoded, + current_chunk: latest, + available_chunks: tlvFiles + } + }) + return data + } + + persist = () => { + if (!this.metaReady) { + throw new Error("meta metrics not ready") + } + this.lastPersisted = Date.now() + const tosync = this.pending + this.pending = {} + const apps = Object.keys(tosync) + apps.map(app => { + const appPath = [this.storagePath, app].join("/") + if (!fs.existsSync(appPath)) { + fs.mkdirSync(appPath, { recursive: true }); + } + const dataNames = Object.keys(tosync[app]) + dataNames.map(dataName => { + const dataPath = [appPath, dataName].join("/") + if (!fs.existsSync(dataPath)) { + fs.mkdirSync(dataPath, { recursive: true }); + } + const data = tosync[app][dataName] + const meta = this.getMeta(app, dataName) + const chunks = meta.chunks.length > 0 ? meta.chunks : [0] + const latest = chunks[chunks.length - 1] + const tlv = encodeTLV(encodeListTLV(data.tlvs)) + const tlvFile = [dataPath, `${latest}.mtlv`].join("/") + fs.appendFileSync(tlvFile, Buffer.from(tlv)) + if (fs.lstatSync(tlvFile).size > chunkSizeBytes) { + this.updateMeta(app, dataName, [...chunks, latest + 1]) + } + }) + }) + } + + getMeta = (appId: string, dataName: string) => { + if (!this.meta[appId]) { + return { chunks: [] } + } + return this.meta[appId][dataName] || { chunks: [] } + } + + initMeta = () => { + this.foreachFile((app, dataName, tlvFiles) => { + this.updateMeta(app, dataName, tlvFiles) + }) + this.metaReady = true + } + + updateMeta = (appId: string, dataName: string, sortedChunks: number[]) => { + if (!this.meta[appId]) { + this.meta[appId] = {} + } + this.meta[appId][dataName] = { chunks: sortedChunks } + } + + foreachFile = (cb: (appId: string, dataName: string, tlvFiles: number[]) => void) => { + if (!fs.existsSync(this.storagePath)) { + fs.mkdirSync(this.storagePath, { recursive: true }); + } + const apps = fs.readdirSync(this.storagePath) + apps.forEach(appDir => { + const appPath = [this.storagePath, appDir].join("/") + if (!fs.lstatSync(appPath).isDirectory()) { + return + } + const dataNames = fs.readdirSync(appPath) + dataNames.forEach(dataName => { + const dataPath = [appPath, dataName].join("/") + if (!fs.lstatSync(dataPath).isDirectory()) { + return + } + const tlvFiles = fs.readdirSync(dataPath) + .filter(f => f.endsWith(".mtlv")) + .map(f => +f.slice(0, -".mtlv".length)) + .filter(n => !isNaN(n)) + .sort((a, b) => a - b) + cb(appDir, dataName, tlvFiles) + }) + }) + } +} \ No newline at end of file diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index bc90a0a6..5f2989b5 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -90,12 +90,12 @@ export default class webRTC { this.log(ERROR, 'SingleUsageMetricReqValidate', err) return } - const res = await this.storage.metricsEventStorage.LoadRawMetricsFile(j.app_id, j.metrics_name, j.page) + const { fileData } = this.storage.metricsEventStorage.tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page) const id = j.request_id || Math.floor(Math.random() * 100_000_000) let i = 0 const packets: Buffer[] = [] - while (i < res.length) { - const chunk = res.slice(i, Math.min(i + 15_000, res.length)) + while (i < fileData.length) { + const chunk = fileData.slice(i, Math.min(i + 15_000, fileData.length)) packets.push(chunk) i += 15_000 } From ea28cbc1b5e52696372f4baf96f335cc0b03f4bb Mon Sep 17 00:00:00 2001 From: boufni95 Date: Wed, 5 Feb 2025 15:57:02 +0000 Subject: [PATCH 02/11] path fix --- src/services/storage/metricsEventStorage.ts | 8 ++++---- src/services/storage/stateBundler.ts | 2 +- src/services/storage/tlvFilesStorage.ts | 16 ++++++++-------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/services/storage/metricsEventStorage.ts b/src/services/storage/metricsEventStorage.ts index ea1a2d5e..058c4a94 100644 --- a/src/services/storage/metricsEventStorage.ts +++ b/src/services/storage/metricsEventStorage.ts @@ -9,9 +9,9 @@ export default class { last24hCache: { ts: number, ok: number, fail: number }[] = [] lastPersistedCache: number = 0 constructor(settings: StorageSettings) { - const metricsPath = [settings.dataDir, "metric_events"].join("/") + const metricsPath = [settings.dataDir, "metric_events"].filter(s => !!s).join("/") this.tlvStorage = new TlvFilesStorage(metricsPath) - this.cachePath = [settings.dataDir, "metric_cache"].join("/") + this.cachePath = [settings.dataDir, "metric_cache"].filter(s => !!s).join("/") if (!fs.existsSync(this.cachePath)) { fs.mkdirSync(this.cachePath, { recursive: true }); } @@ -60,12 +60,12 @@ export default class { } persistCache = () => { - const last24CachePath = [this.cachePath, "last24hSF.json"].join("/") + const last24CachePath = [this.cachePath, "last24hSF.json"].filter(s => !!s).filter(s => !!s).join("/") fs.writeFileSync(last24CachePath, JSON.stringify(this.last24hCache), {}) } loadCache = () => { - const last24CachePath = [this.cachePath, "last24hSF.json"].join("/") + const last24CachePath = [this.cachePath, "last24hSF.json"].filter(s => !!s).filter(s => !!s).join("/") if (fs.existsSync(last24CachePath)) { this.last24hCache = JSON.parse(fs.readFileSync(last24CachePath, 'utf-8')) this.rotateCache(Math.floor(Date.now() / 1000)) diff --git a/src/services/storage/stateBundler.ts b/src/services/storage/stateBundler.ts index ccebbd2a..a9b1bf08 100644 --- a/src/services/storage/stateBundler.ts +++ b/src/services/storage/stateBundler.ts @@ -32,7 +32,7 @@ export class StateBundler { tlvStorage: TlvFilesStorage reportLog = getLogger({ component: 'stateBundlerReport' }) constructor(settings: StorageSettings) { - const bundlerPath = [settings.dataDir, "bundler_events"].join("/") + const bundlerPath = [settings.dataDir, "bundler_events"].filter(s => !!s).join("/") this.tlvStorage = new TlvFilesStorage(bundlerPath) this.tlvStorage.initMeta() } diff --git a/src/services/storage/tlvFilesStorage.ts b/src/services/storage/tlvFilesStorage.ts index ad7fe36f..2ba66201 100644 --- a/src/services/storage/tlvFilesStorage.ts +++ b/src/services/storage/tlvFilesStorage.ts @@ -28,7 +28,7 @@ export class TlvFilesStorage { if (!this.metaReady || !this.meta[app] || !this.meta[app][dataName] || !this.meta[app][dataName].chunks.includes(chunk)) { throw new Error("metrics not found") } - const fullPath = [this.storagePath, app, dataName, `${chunk}.mtlv`].join("/") + const fullPath = [this.storagePath, app, dataName, `${chunk}.mtlv`].filter(s => !!s).join("/") const fileData = fs.readFileSync(fullPath) return { fileData, chunks: this.meta[app][dataName].chunks } } @@ -51,9 +51,9 @@ export class TlvFilesStorage { const data: LatestData = {} this.foreachFile((app, dataName, tlvFiles) => { if (tlvFiles.length === 0) { return } - const methodPath = [this.storagePath, app, dataName].join("/") + const methodPath = [this.storagePath, app, dataName].filter(s => !!s).join("/") const latest = tlvFiles[tlvFiles.length - 1] - const tlvFile = [methodPath, `${latest}.mtlv`].join("/") + const tlvFile = [methodPath, `${latest}.mtlv`].filter(s => !!s).join("/") const tlv = fs.readFileSync(tlvFile) const decoded = decodeListTLV(parseTLV(tlv)) if (!data[app]) { @@ -80,13 +80,13 @@ export class TlvFilesStorage { this.pending = {} const apps = Object.keys(tosync) apps.map(app => { - const appPath = [this.storagePath, app].join("/") + const appPath = [this.storagePath, app].filter(s => !!s).join("/") if (!fs.existsSync(appPath)) { fs.mkdirSync(appPath, { recursive: true }); } const dataNames = Object.keys(tosync[app]) dataNames.map(dataName => { - const dataPath = [appPath, dataName].join("/") + const dataPath = [appPath, dataName].filter(s => !!s).join("/") if (!fs.existsSync(dataPath)) { fs.mkdirSync(dataPath, { recursive: true }); } @@ -95,7 +95,7 @@ export class TlvFilesStorage { const chunks = meta.chunks.length > 0 ? meta.chunks : [0] const latest = chunks[chunks.length - 1] const tlv = encodeTLV(encodeListTLV(data.tlvs)) - const tlvFile = [dataPath, `${latest}.mtlv`].join("/") + const tlvFile = [dataPath, `${latest}.mtlv`].filter(s => !!s).join("/") fs.appendFileSync(tlvFile, Buffer.from(tlv)) if (fs.lstatSync(tlvFile).size > chunkSizeBytes) { this.updateMeta(app, dataName, [...chunks, latest + 1]) @@ -131,13 +131,13 @@ export class TlvFilesStorage { } const apps = fs.readdirSync(this.storagePath) apps.forEach(appDir => { - const appPath = [this.storagePath, appDir].join("/") + const appPath = [this.storagePath, appDir].filter(s => !!s).join("/") if (!fs.lstatSync(appPath).isDirectory()) { return } const dataNames = fs.readdirSync(appPath) dataNames.forEach(dataName => { - const dataPath = [appPath, dataName].join("/") + const dataPath = [appPath, dataName].filter(s => !!s).join("/") if (!fs.lstatSync(dataPath).isDirectory()) { return } From d2c3eca1f61fd8b9ba48edfd92671266166c3f2a Mon Sep 17 00:00:00 2001 From: boufni95 Date: Wed, 5 Feb 2025 18:12:23 +0000 Subject: [PATCH 03/11] path fix --- src/services/storage/stateBundler.ts | 38 ++++++++++++---------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/src/services/storage/stateBundler.ts b/src/services/storage/stateBundler.ts index a9b1bf08..44799f15 100644 --- a/src/services/storage/stateBundler.ts +++ b/src/services/storage/stateBundler.ts @@ -31,6 +31,7 @@ export type TxPointSettings = { export class StateBundler { tlvStorage: TlvFilesStorage reportLog = getLogger({ component: 'stateBundlerReport' }) + prevValues: Record = {} constructor(settings: StorageSettings) { const bundlerPath = [settings.dataDir, "bundler_events"].filter(s => !!s).join("/") this.tlvStorage = new TlvFilesStorage(bundlerPath) @@ -53,45 +54,38 @@ export class StateBundler { }) return metrics } - /* increment = (key: string, value: number) => { - this.sinceStart[key] = (this.sinceStart[key] || 0) + value - this.sinceLatestReport[key] = (this.sinceLatestReport[key] || 0) + value - this.triggerReportCheck() + AddValue = (appId: string, key: string, v: number) => { + const prevValueKey = `${appId}_${key}` + if (this.prevValues[prevValueKey] === v) { + return } - set = (key: string, value: number) => { - this.sinceStart[key] = value - this.sinceLatestReport[key] = value - this.triggerReportCheck() - } - max = (key: string, value: number) => { - this.sinceStart[key] = Math.max(this.sinceStart[key] || 0, value) - this.sinceLatestReport[key] = Math.max(this.sinceLatestReport[key] || 0, value) - this.triggerReportCheck() - } */ + this.prevValues[prevValueKey] = v + this.tlvStorage.AddTlv(appId, key, this.serializeNow(v)) + } - AddTxPoint = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings, appId = '__root') => { + AddTxPoint = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings, appId = '_root') => { const { used, from } = settings const meta = settings.meta || [] const key = [actionName, from, used, ...meta].join('_') - this.tlvStorage.AddTlv(appId, key, this.serializeNow(v)) + this.AddValue(appId, key, v) } - AddTxPointFailed = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings, appId = '__root') => { + AddTxPointFailed = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings, appId = '_root') => { const { used, from } = settings const meta = settings.meta || [] const key = [actionName, from, used, ...meta, 'failed'].join('_') - this.tlvStorage.AddTlv(appId, key, this.serializeNow(v)) + this.AddValue(appId, key, v) } - AddBalancePoint = (actionName: BalanceStatePointType, v: number, meta = [], appId = '__root') => { + AddBalancePoint = (actionName: BalanceStatePointType, v: number, meta = [], appId = '_root') => { const key = [actionName, ...meta].join('_') - this.tlvStorage.AddTlv(appId, key, this.serializeNow(v)) + this.AddValue(appId, key, v) } - AddMaxPoint = (actionName: MaxStatePointType, v: number, meta = [], appId = '__root') => { + AddMaxPoint = (actionName: MaxStatePointType, v: number, meta = [], appId = '_root') => { const key = [actionName, ...meta].join('_') - this.tlvStorage.AddTlv(appId, key, this.serializeNow(v)) + this.AddValue(appId, key, v) } serializeNow = (v: number) => { From 904cd97f388a71d2eaf946483347b9799e6a41ba Mon Sep 17 00:00:00 2001 From: boufni95 Date: Wed, 5 Feb 2025 19:14:10 +0000 Subject: [PATCH 04/11] deb --- src/services/webRTC/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index 5f2989b5..3188b3de 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -46,6 +46,7 @@ export default class webRTC { } const conn = this.connections[key] const iceCandidate: IceCandidate = JSON.parse(candidate) + console.log({ candidate }) if (!iceCandidate.candidate) { await conn.addIceCandidate(undefined); } else { From acd5c39f893042ea7f0b7f2ac262e5297d407747 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Wed, 5 Feb 2025 19:16:48 +0000 Subject: [PATCH 05/11] deb --- src/services/webRTC/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index 3188b3de..443ed5b9 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -46,7 +46,7 @@ export default class webRTC { } const conn = this.connections[key] const iceCandidate: IceCandidate = JSON.parse(candidate) - console.log({ candidate }) + console.log({ iceCandidate }) if (!iceCandidate.candidate) { await conn.addIceCandidate(undefined); } else { From 02c33d088565de5c6ef28646c83437825d9600cf Mon Sep 17 00:00:00 2001 From: boufni95 Date: Wed, 5 Feb 2025 19:18:30 +0000 Subject: [PATCH 06/11] fix --- src/services/webRTC/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index 443ed5b9..c58faed3 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -48,7 +48,7 @@ export default class webRTC { const iceCandidate: IceCandidate = JSON.parse(candidate) console.log({ iceCandidate }) if (!iceCandidate.candidate) { - await conn.addIceCandidate(undefined); + await conn.addIceCandidate(null); } else { await conn.addIceCandidate(iceCandidate); } From 4e94d8865191503b4cb584a11efb88d5125a6bef Mon Sep 17 00:00:00 2001 From: boufni95 Date: Wed, 5 Feb 2025 19:19:13 +0000 Subject: [PATCH 07/11] up --- src/services/webRTC/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index c58faed3..963f16ee 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -48,6 +48,7 @@ export default class webRTC { const iceCandidate: IceCandidate = JSON.parse(candidate) console.log({ iceCandidate }) if (!iceCandidate.candidate) { + //@ts-ignore await conn.addIceCandidate(null); } else { await conn.addIceCandidate(iceCandidate); From 4bc89254d20dfecbef0d1c1d43aa7bdb986ad80d Mon Sep 17 00:00:00 2001 From: boufni95 Date: Wed, 5 Feb 2025 19:29:25 +0000 Subject: [PATCH 08/11] deb --- src/services/webRTC/index.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index 963f16ee..879db943 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -48,8 +48,7 @@ export default class webRTC { const iceCandidate: IceCandidate = JSON.parse(candidate) console.log({ iceCandidate }) if (!iceCandidate.candidate) { - //@ts-ignore - await conn.addIceCandidate(null); + await conn.addIceCandidate(iceCandidate); } else { await conn.addIceCandidate(iceCandidate); } From cebb8559c1d5c39af88cd4459070ecd7087bca29 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Wed, 5 Feb 2025 19:33:38 +0000 Subject: [PATCH 09/11] fix --- src/services/webRTC/index.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index 879db943..667ae608 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -47,9 +47,7 @@ export default class webRTC { const conn = this.connections[key] const iceCandidate: IceCandidate = JSON.parse(candidate) console.log({ iceCandidate }) - if (!iceCandidate.candidate) { - await conn.addIceCandidate(iceCandidate); - } else { + if (iceCandidate.candidate) { await conn.addIceCandidate(iceCandidate); } return {} From c0f137d996df4e4307d86c7f81ea172e5549a1d9 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Thu, 6 Feb 2025 16:15:17 +0000 Subject: [PATCH 10/11] single bundle metrics --- proto/autogenerated/client.md | 23 +++++++++++++-- proto/autogenerated/go/http_client.go | 34 ++++++++++++++++++++-- proto/autogenerated/go/types.go | 18 ++++++++---- proto/autogenerated/ts/express_server.ts | 24 +++++++++++++++- proto/autogenerated/ts/http_client.ts | 16 ++++++++++- proto/autogenerated/ts/nostr_client.ts | 17 ++++++++++- proto/autogenerated/ts/nostr_transport.ts | 18 +++++++++++- proto/autogenerated/ts/types.ts | 35 +++++++++++++++++------ proto/service/methods.proto | 8 +++++- proto/service/structs.proto | 10 +++++-- src/services/main/index.ts | 2 +- src/services/metrics/index.ts | 2 +- src/services/serverMethods/index.ts | 13 +++++++++ src/services/storage/stateBundler.ts | 14 ++++++++- src/services/webRTC/index.ts | 26 +++++++++++++---- 15 files changed, 226 insertions(+), 34 deletions(-) diff --git a/proto/autogenerated/client.md b/proto/autogenerated/client.md index 04f579e6..29433f5c 100644 --- a/proto/autogenerated/client.md +++ b/proto/autogenerated/client.md @@ -163,9 +163,14 @@ The nostr server will send back a message response, and inside the body there wi - This methods has an __empty__ __request__ body - output: [LndSeed](#LndSeed) +- GetSingleBundleMetrics + - auth type: __Metrics__ + - input: [SingleMetricReq](#SingleMetricReq) + - output: [BundleData](#BundleData) + - GetSingleUsageMetrics - auth type: __Metrics__ - - input: [SingleUsageMetricReq](#SingleUsageMetricReq) + - input: [SingleMetricReq](#SingleMetricReq) - output: [UsageMetricTlv](#UsageMetricTlv) - GetUsageMetrics @@ -609,11 +614,18 @@ The nostr server will send back a message response, and inside the body there wi - This methods has an __empty__ __request__ body - output: [LndSeed](#LndSeed) +- GetSingleBundleMetrics + - auth type: __Metrics__ + - http method: __post__ + - http route: __/api/reports/bundle/single__ + - input: [SingleMetricReq](#SingleMetricReq) + - output: [BundleData](#BundleData) + - GetSingleUsageMetrics - auth type: __Metrics__ - http method: __post__ - http route: __/api/reports/usage/single__ - - input: [SingleUsageMetricReq](#SingleUsageMetricReq) + - input: [SingleMetricReq](#SingleMetricReq) - output: [UsageMetricTlv](#UsageMetricTlv) - GetUsageMetrics @@ -1365,8 +1377,9 @@ The nostr server will send back a message response, and inside the body there wi - __amount__: _number_ - __invoice__: _string_ -### SingleUsageMetricReq +### SingleMetricReq - __app_id__: _string_ + - __metric_type__: _[SingleMetricType](#SingleMetricType)_ - __metrics_name__: _string_ - __page__: _number_ - __request_id__: _number_ *this field is optional @@ -1472,6 +1485,10 @@ The nostr server will send back a message response, and inside the body there wi - __CHAIN_OP__ - __INVOICE_OP__ +### SingleMetricType + - __BUNDLE_METRIC__ + - __USAGE_METRIC__ + ### UserOperationType - __INCOMING_INVOICE__ - __INCOMING_TX__ diff --git a/proto/autogenerated/go/http_client.go b/proto/autogenerated/go/http_client.go index 285945da..5f6a5a35 100644 --- a/proto/autogenerated/go/http_client.go +++ b/proto/autogenerated/go/http_client.go @@ -94,7 +94,8 @@ type Client struct { GetNPubLinkingState func(req GetNPubLinking) (*NPubLinking, error) GetPaymentState func(req GetPaymentStateRequest) (*PaymentState, error) GetSeed func() (*LndSeed, error) - GetSingleUsageMetrics func(req SingleUsageMetricReq) (*UsageMetricTlv, error) + GetSingleBundleMetrics func(req SingleMetricReq) (*BundleData, error) + GetSingleUsageMetrics func(req SingleMetricReq) (*UsageMetricTlv, error) GetUsageMetrics func(req LatestUsageMetricReq) (*UsageMetrics, error) GetUserInfo func() (*UserInfo, error) GetUserOffer func(req OfferId) (*OfferConfig, error) @@ -1090,7 +1091,36 @@ func NewClient(params ClientParams) *Client { } return &res, nil }, - GetSingleUsageMetrics: func(req SingleUsageMetricReq) (*UsageMetricTlv, error) { + GetSingleBundleMetrics: func(req SingleMetricReq) (*BundleData, error) { + auth, err := params.RetrieveMetricsAuth() + if err != nil { + return nil, err + } + finalRoute := "/api/reports/bundle/single" + body, err := json.Marshal(req) + if err != nil { + return nil, err + } + resBody, err := doPostRequest(params.BaseURL+finalRoute, body, auth) + if err != nil { + return nil, err + } + result := ResultError{} + err = json.Unmarshal(resBody, &result) + if err != nil { + return nil, err + } + if result.Status == "ERROR" { + return nil, fmt.Errorf(result.Reason) + } + res := BundleData{} + err = json.Unmarshal(resBody, &res) + if err != nil { + return nil, err + } + return &res, nil + }, + GetSingleUsageMetrics: func(req SingleMetricReq) (*UsageMetricTlv, error) { auth, err := params.RetrieveMetricsAuth() if err != nil { return nil, err diff --git a/proto/autogenerated/go/types.go b/proto/autogenerated/go/types.go index b5d42976..3a6bf98b 100644 --- a/proto/autogenerated/go/types.go +++ b/proto/autogenerated/go/types.go @@ -78,6 +78,13 @@ const ( INVOICE_OP OperationType = "INVOICE_OP" ) +type SingleMetricType string + +const ( + BUNDLE_METRIC SingleMetricType = "BUNDLE_METRIC" + USAGE_METRIC SingleMetricType = "USAGE_METRIC" +) + type UserOperationType string const ( @@ -569,11 +576,12 @@ type SetMockInvoiceAsPaidRequest struct { Amount int64 `json:"amount"` Invoice string `json:"invoice"` } -type SingleUsageMetricReq struct { - App_id string `json:"app_id"` - Metrics_name string `json:"metrics_name"` - Page int64 `json:"page"` - Request_id int64 `json:"request_id"` +type SingleMetricReq struct { + App_id string `json:"app_id"` + Metric_type SingleMetricType `json:"metric_type"` + Metrics_name string `json:"metrics_name"` + Page int64 `json:"page"` + Request_id int64 `json:"request_id"` } type UpdateChannelPolicyRequest struct { Policy *ChannelPolicy `json:"policy"` diff --git a/proto/autogenerated/ts/express_server.ts b/proto/autogenerated/ts/express_server.ts index 3ab582e1..6f57f62c 100644 --- a/proto/autogenerated/ts/express_server.ts +++ b/proto/autogenerated/ts/express_server.ts @@ -1128,6 +1128,28 @@ export default (methods: Types.ServerMethods, opts: ServerOptions) => { opts.metricsCallback([{ ...info, ...stats, ...authContext }]) } catch (ex) { const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } }) + if (!opts.allowNotImplementedMethods && !methods.GetSingleBundleMetrics) throw new Error('method: GetSingleBundleMetrics is not implemented') + app.post('/api/reports/bundle/single', async (req, res) => { + const info: Types.RequestInfo = { rpcName: 'GetSingleBundleMetrics', batch: false, nostr: false, batchSize: 0} + const stats: Types.RequestStats = { startMs:req.startTimeMs || 0, start:req.startTime || 0n, parse: process.hrtime.bigint(), guard: 0n, validate: 0n, handle: 0n } + let authCtx: Types.AuthContext = {} + try { + if (!methods.GetSingleBundleMetrics) throw new Error('method: GetSingleBundleMetrics is not implemented') + const authContext = await opts.MetricsAuthGuard(req.headers['authorization']) + authCtx = authContext + stats.guard = process.hrtime.bigint() + const request = req.body + const error = Types.SingleMetricReqValidate(request) + stats.validate = process.hrtime.bigint() + if (error !== null) return logErrorAndReturnResponse(error, 'invalid request body', res, logger, { ...info, ...stats, ...authContext }, opts.metricsCallback) + const query = req.query + const params = req.params + const response = await methods.GetSingleBundleMetrics({rpcName:'GetSingleBundleMetrics', ctx:authContext , req: request}) + stats.handle = process.hrtime.bigint() + res.json({status: 'OK', ...response}) + opts.metricsCallback([{ ...info, ...stats, ...authContext }]) + } catch (ex) { const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } + }) if (!opts.allowNotImplementedMethods && !methods.GetSingleUsageMetrics) throw new Error('method: GetSingleUsageMetrics is not implemented') app.post('/api/reports/usage/single', async (req, res) => { const info: Types.RequestInfo = { rpcName: 'GetSingleUsageMetrics', batch: false, nostr: false, batchSize: 0} @@ -1139,7 +1161,7 @@ export default (methods: Types.ServerMethods, opts: ServerOptions) => { authCtx = authContext stats.guard = process.hrtime.bigint() const request = req.body - const error = Types.SingleUsageMetricReqValidate(request) + const error = Types.SingleMetricReqValidate(request) stats.validate = process.hrtime.bigint() if (error !== null) return logErrorAndReturnResponse(error, 'invalid request body', res, logger, { ...info, ...stats, ...authContext }, opts.metricsCallback) const query = req.query diff --git a/proto/autogenerated/ts/http_client.ts b/proto/autogenerated/ts/http_client.ts index f2d79af6..c5a273fa 100644 --- a/proto/autogenerated/ts/http_client.ts +++ b/proto/autogenerated/ts/http_client.ts @@ -508,7 +508,21 @@ export default (params: ClientParams) => ({ } return { status: 'ERROR', reason: 'invalid response' } }, - GetSingleUsageMetrics: async (request: Types.SingleUsageMetricReq): Promise => { + GetSingleBundleMetrics: async (request: Types.SingleMetricReq): Promise => { + const auth = await params.retrieveMetricsAuth() + if (auth === null) throw new Error('retrieveMetricsAuth() returned null') + let finalRoute = '/api/reports/bundle/single' + const { data } = await axios.post(params.baseUrl + finalRoute, request, { headers: { 'authorization': auth } }) + if (data.status === 'ERROR' && typeof data.reason === 'string') return data + if (data.status === 'OK') { + const result = data + if(!params.checkResult) return { status: 'OK', ...result } + const error = Types.BundleDataValidate(result) + if (error === null) { return { status: 'OK', ...result } } else return { status: 'ERROR', reason: error.message } + } + return { status: 'ERROR', reason: 'invalid response' } + }, + GetSingleUsageMetrics: async (request: Types.SingleMetricReq): Promise => { const auth = await params.retrieveMetricsAuth() if (auth === null) throw new Error('retrieveMetricsAuth() returned null') let finalRoute = '/api/reports/usage/single' diff --git a/proto/autogenerated/ts/nostr_client.ts b/proto/autogenerated/ts/nostr_client.ts index 28a281f6..53a9b167 100644 --- a/proto/autogenerated/ts/nostr_client.ts +++ b/proto/autogenerated/ts/nostr_client.ts @@ -437,7 +437,22 @@ export default (params: NostrClientParams, send: (to:string, message: NostrRequ } return { status: 'ERROR', reason: 'invalid response' } }, - GetSingleUsageMetrics: async (request: Types.SingleUsageMetricReq): Promise => { + GetSingleBundleMetrics: async (request: Types.SingleMetricReq): Promise => { + const auth = await params.retrieveNostrMetricsAuth() + if (auth === null) throw new Error('retrieveNostrMetricsAuth() returned null') + const nostrRequest: NostrRequest = {} + nostrRequest.body = request + const data = await send(params.pubDestination, {rpcName:'GetSingleBundleMetrics',authIdentifier:auth, ...nostrRequest }) + if (data.status === 'ERROR' && typeof data.reason === 'string') return data + if (data.status === 'OK') { + const result = data + if(!params.checkResult) return { status: 'OK', ...result } + const error = Types.BundleDataValidate(result) + if (error === null) { return { status: 'OK', ...result } } else return { status: 'ERROR', reason: error.message } + } + return { status: 'ERROR', reason: 'invalid response' } + }, + GetSingleUsageMetrics: async (request: Types.SingleMetricReq): Promise => { const auth = await params.retrieveNostrMetricsAuth() if (auth === null) throw new Error('retrieveNostrMetricsAuth() returned null') const nostrRequest: NostrRequest = {} diff --git a/proto/autogenerated/ts/nostr_transport.ts b/proto/autogenerated/ts/nostr_transport.ts index 5cf473ff..e5efd03a 100644 --- a/proto/autogenerated/ts/nostr_transport.ts +++ b/proto/autogenerated/ts/nostr_transport.ts @@ -815,6 +815,22 @@ export default (methods: Types.ServerMethods, opts: NostrOptions) => { opts.metricsCallback([{ ...info, ...stats, ...authContext }]) }catch(ex){ const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } break + case 'GetSingleBundleMetrics': + try { + if (!methods.GetSingleBundleMetrics) throw new Error('method: GetSingleBundleMetrics is not implemented') + const authContext = await opts.NostrMetricsAuthGuard(req.appId, req.authIdentifier) + stats.guard = process.hrtime.bigint() + authCtx = authContext + const request = req.body + const error = Types.SingleMetricReqValidate(request) + stats.validate = process.hrtime.bigint() + if (error !== null) return logErrorAndReturnResponse(error, 'invalid request body', res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback) + const response = await methods.GetSingleBundleMetrics({rpcName:'GetSingleBundleMetrics', ctx:authContext , req: request}) + stats.handle = process.hrtime.bigint() + res({status: 'OK', ...response}) + opts.metricsCallback([{ ...info, ...stats, ...authContext }]) + }catch(ex){ const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } + break case 'GetSingleUsageMetrics': try { if (!methods.GetSingleUsageMetrics) throw new Error('method: GetSingleUsageMetrics is not implemented') @@ -822,7 +838,7 @@ export default (methods: Types.ServerMethods, opts: NostrOptions) => { stats.guard = process.hrtime.bigint() authCtx = authContext const request = req.body - const error = Types.SingleUsageMetricReqValidate(request) + const error = Types.SingleMetricReqValidate(request) stats.validate = process.hrtime.bigint() if (error !== null) return logErrorAndReturnResponse(error, 'invalid request body', res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback) const response = await methods.GetSingleUsageMetrics({rpcName:'GetSingleUsageMetrics', ctx:authContext , req: request}) diff --git a/proto/autogenerated/ts/types.ts b/proto/autogenerated/ts/types.ts index ca4a72e9..c24874f3 100644 --- a/proto/autogenerated/ts/types.ts +++ b/proto/autogenerated/ts/types.ts @@ -28,8 +28,8 @@ export type MetricsContext = { app_id: string operator_id: string } -export type MetricsMethodInputs = GetAppsMetrics_Input | GetBundleMetrics_Input | GetErrorStats_Input | GetLndMetrics_Input | GetSingleUsageMetrics_Input | GetUsageMetrics_Input | SubmitWebRtcMessage_Input -export type MetricsMethodOutputs = GetAppsMetrics_Output | GetBundleMetrics_Output | GetErrorStats_Output | GetLndMetrics_Output | GetSingleUsageMetrics_Output | GetUsageMetrics_Output | SubmitWebRtcMessage_Output +export type MetricsMethodInputs = GetAppsMetrics_Input | GetBundleMetrics_Input | GetErrorStats_Input | GetLndMetrics_Input | GetSingleBundleMetrics_Input | GetSingleUsageMetrics_Input | GetUsageMetrics_Input | SubmitWebRtcMessage_Input +export type MetricsMethodOutputs = GetAppsMetrics_Output | GetBundleMetrics_Output | GetErrorStats_Output | GetLndMetrics_Output | GetSingleBundleMetrics_Output | GetSingleUsageMetrics_Output | GetUsageMetrics_Output | SubmitWebRtcMessage_Output export type UserContext = { app_id: string app_user_id: string @@ -165,7 +165,10 @@ export type GetPaymentState_Output = ResultError | ({ status: 'OK' } & PaymentSt export type GetSeed_Input = {rpcName:'GetSeed'} export type GetSeed_Output = ResultError | ({ status: 'OK' } & LndSeed) -export type GetSingleUsageMetrics_Input = {rpcName:'GetSingleUsageMetrics', req: SingleUsageMetricReq} +export type GetSingleBundleMetrics_Input = {rpcName:'GetSingleBundleMetrics', req: SingleMetricReq} +export type GetSingleBundleMetrics_Output = ResultError | ({ status: 'OK' } & BundleData) + +export type GetSingleUsageMetrics_Input = {rpcName:'GetSingleUsageMetrics', req: SingleMetricReq} export type GetSingleUsageMetrics_Output = ResultError | ({ status: 'OK' } & UsageMetricTlv) export type GetUsageMetrics_Input = {rpcName:'GetUsageMetrics', req: LatestUsageMetricReq} @@ -332,6 +335,7 @@ export type ServerMethods = { GetNPubLinkingState?: (req: GetNPubLinkingState_Input & {ctx: AppContext }) => Promise GetPaymentState?: (req: GetPaymentState_Input & {ctx: UserContext }) => Promise GetSeed?: (req: GetSeed_Input & {ctx: AdminContext }) => Promise + GetSingleBundleMetrics?: (req: GetSingleBundleMetrics_Input & {ctx: MetricsContext }) => Promise GetSingleUsageMetrics?: (req: GetSingleUsageMetrics_Input & {ctx: MetricsContext }) => Promise GetUsageMetrics?: (req: GetUsageMetrics_Input & {ctx: MetricsContext }) => Promise GetUserInfo?: (req: GetUserInfo_Input & {ctx: UserContext }) => Promise @@ -404,6 +408,14 @@ export const enumCheckOperationType = (e?: OperationType): boolean => { for (const v in OperationType) if (e === v) return true return false } +export enum SingleMetricType { + BUNDLE_METRIC = 'BUNDLE_METRIC', + USAGE_METRIC = 'USAGE_METRIC', +} +export const enumCheckSingleMetricType = (e?: SingleMetricType): boolean => { + for (const v in SingleMetricType) if (e === v) return true + return false +} export enum UserOperationType { INCOMING_INVOICE = 'INCOMING_INVOICE', INCOMING_TX = 'INCOMING_TX', @@ -3288,28 +3300,33 @@ export const SetMockInvoiceAsPaidRequestValidate = (o?: SetMockInvoiceAsPaidRequ return null } -export type SingleUsageMetricReq = { +export type SingleMetricReq = { app_id: string + metric_type: SingleMetricType metrics_name: string page: number request_id?: number } -export type SingleUsageMetricReqOptionalField = 'request_id' -export const SingleUsageMetricReqOptionalFields: SingleUsageMetricReqOptionalField[] = ['request_id'] -export type SingleUsageMetricReqOptions = OptionsBaseMessage & { - checkOptionalsAreSet?: SingleUsageMetricReqOptionalField[] +export type SingleMetricReqOptionalField = 'request_id' +export const SingleMetricReqOptionalFields: SingleMetricReqOptionalField[] = ['request_id'] +export type SingleMetricReqOptions = OptionsBaseMessage & { + checkOptionalsAreSet?: SingleMetricReqOptionalField[] app_id_CustomCheck?: (v: string) => boolean + metric_type_CustomCheck?: (v: SingleMetricType) => boolean metrics_name_CustomCheck?: (v: string) => boolean page_CustomCheck?: (v: number) => boolean request_id_CustomCheck?: (v?: number) => boolean } -export const SingleUsageMetricReqValidate = (o?: SingleUsageMetricReq, opts: SingleUsageMetricReqOptions = {}, path: string = 'SingleUsageMetricReq::root.'): Error | null => { +export const SingleMetricReqValidate = (o?: SingleMetricReq, opts: SingleMetricReqOptions = {}, path: string = 'SingleMetricReq::root.'): Error | null => { if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message') if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null') if (typeof o.app_id !== 'string') return new Error(`${path}.app_id: is not a string`) if (opts.app_id_CustomCheck && !opts.app_id_CustomCheck(o.app_id)) return new Error(`${path}.app_id: custom check failed`) + if (!enumCheckSingleMetricType(o.metric_type)) return new Error(`${path}.metric_type: is not a valid SingleMetricType`) + if (opts.metric_type_CustomCheck && !opts.metric_type_CustomCheck(o.metric_type)) return new Error(`${path}.metric_type: custom check failed`) + if (typeof o.metrics_name !== 'string') return new Error(`${path}.metrics_name: is not a string`) if (opts.metrics_name_CustomCheck && !opts.metrics_name_CustomCheck(o.metrics_name)) return new Error(`${path}.metrics_name: custom check failed`) diff --git a/proto/service/methods.proto b/proto/service/methods.proto index 5f20f8ac..3d6856dd 100644 --- a/proto/service/methods.proto +++ b/proto/service/methods.proto @@ -189,7 +189,13 @@ service LightningPub { option (nostr) = true; } - rpc GetSingleUsageMetrics(structs.SingleUsageMetricReq) returns (structs.UsageMetricTlv) { + rpc GetSingleBundleMetrics(structs.SingleMetricReq) returns (structs.BundleData) { + option (auth_type) = "Metrics"; + option (http_method) = "post"; + option (http_route) = "/api/reports/bundle/single"; + option (nostr) = true; + } + rpc GetSingleUsageMetrics(structs.SingleMetricReq) returns (structs.UsageMetricTlv) { option (auth_type) = "Metrics"; option (http_method) = "post"; option (http_route) = "/api/reports/usage/single"; diff --git a/proto/service/structs.proto b/proto/service/structs.proto index 8079f6ea..c8514de0 100644 --- a/proto/service/structs.proto +++ b/proto/service/structs.proto @@ -41,11 +41,17 @@ message LatestUsageMetricReq { optional int64 limit = 1; } -message SingleUsageMetricReq { +enum SingleMetricType { + USAGE_METRIC = 0; + BUNDLE_METRIC = 1; +} + +message SingleMetricReq { string app_id = 1; string metrics_name = 2; int64 page = 3; - optional int64 request_id = 4; + SingleMetricType metric_type = 4; + optional int64 request_id = 5; } message UsageMetric { diff --git a/src/services/main/index.ts b/src/services/main/index.ts index 7de52a43..27516f62 100644 --- a/src/services/main/index.ts +++ b/src/services/main/index.ts @@ -76,7 +76,7 @@ export default class { this.appUserManager = new AppUserManager(this.storage, this.settings, this.applicationManager) this.debitManager = new DebitManager(this.storage, this.lnd, this.applicationManager) this.offerManager = new OfferManager(this.storage, this.lnd, this.applicationManager, this.productManager) - this.webRTC = new webRTC(this.storage) + this.webRTC = new webRTC(this.storage, this.utils) } diff --git a/src/services/metrics/index.ts b/src/services/metrics/index.ts index 0a7a19c6..90568ffa 100644 --- a/src/services/metrics/index.ts +++ b/src/services/metrics/index.ts @@ -73,7 +73,7 @@ export default class Handler { return this.storage.metricsEventStorage.LoadLatestMetrics(req.limit) } - async GetSingleUsageMetrics(req: Types.SingleUsageMetricReq): Promise { + async GetSingleUsageMetrics(req: Types.SingleMetricReq): Promise { return this.storage.metricsEventStorage.LoadMetricsFile(req.app_id, req.metrics_name, req.page) } diff --git a/src/services/serverMethods/index.ts b/src/services/serverMethods/index.ts index 7291aec8..1b57deec 100644 --- a/src/services/serverMethods/index.ts +++ b/src/services/serverMethods/index.ts @@ -21,8 +21,21 @@ export default (mainHandler: Main): Types.ServerMethods => { return mainHandler.utils.stateBundler.GetBundleMetrics(req) }, GetSingleUsageMetrics: async ({ ctx, req }) => { + const err = Types.SingleMetricReqValidate(req, { + app_id_CustomCheck: id => id === "", + metrics_name_CustomCheck: name => name !== "" + }) + if (err != null) throw new Error(err.message) return mainHandler.metricsManager.GetSingleUsageMetrics(req) }, + GetSingleBundleMetrics: async ({ ctx, req }) => { + const err = Types.SingleMetricReqValidate(req, { + app_id_CustomCheck: id => id === "", + metrics_name_CustomCheck: name => name !== "" + }) + if (err != null) throw new Error(err.message) + return mainHandler.utils.stateBundler.GetSingleBundleMetrics(req) + }, GetErrorStats: async ({ ctx }) => { return mainHandler.metricsManager.GetErrorStats() }, diff --git a/src/services/storage/stateBundler.ts b/src/services/storage/stateBundler.ts index 44799f15..560b3c24 100644 --- a/src/services/storage/stateBundler.ts +++ b/src/services/storage/stateBundler.ts @@ -1,6 +1,6 @@ import { LatestBundleMetricReq } from "../../../proto/autogenerated/ts/types.js" import { getLogger } from "../helpers/logger.js" -import { integerToUint8Array } from "../helpers/tlv.js" +import { decodeListTLV, integerToUint8Array, parseTLV } from "../helpers/tlv.js" import { StorageSettings } from "./index.js" import { TlvFilesStorage } from "./tlvFilesStorage.js" import * as Types from "../../../proto/autogenerated/ts/types.js" @@ -29,6 +29,7 @@ export type TxPointSettings = { timeDiscount?: true } export class StateBundler { + tlvStorage: TlvFilesStorage reportLog = getLogger({ component: 'stateBundlerReport' }) prevValues: Record = {} @@ -54,6 +55,17 @@ export class StateBundler { }) return metrics } + + async GetSingleBundleMetrics(req: Types.SingleMetricReq): Promise { + const { fileData, chunks } = this.tlvStorage.LoadFile(req.app_id, req.metrics_name, req.page) + const decoded = decodeListTLV(parseTLV(fileData)) + return { + current_chunk: req.page, + available_chunks: chunks, + base_64_data: decoded.map(d => Buffer.from(d).toString('base64')) + } + } + AddValue = (appId: string, key: string, v: number) => { const prevValueKey = `${appId}_${key}` if (this.prevValues[prevValueKey] === v) { diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index 667ae608..6aab22f1 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -5,16 +5,21 @@ import { ERROR, getLogger } from "../helpers/logger.js" import * as Types from '../../../proto/autogenerated/ts/types.js' import { NostrSend, SendData, SendInitiator } from "../nostr/handler.js" import { encodeTLbV, encodeTLV, encodeTLVDataPacket } from '../helpers/tlv.js' +import { Utils } from '../helpers/utilsWrapper.js' +import { TlvFilesStorage } from '../storage/tlvFilesStorage.js' type IceCandidate = { type: string, candidate?: string, sdpMid?: string, sdpMLineIndex?: number } const configuration = { 'iceServers': [{ 'urls': 'stun:relay.webwormhole.io' }] } type UserInfo = { userPub: string, appId: string } export default class webRTC { + private storage: Storage private log = getLogger({ component: 'webRTC' }) private connections: Record = {} private _nostrSend: NostrSend - constructor(storage: Storage) { + private utils: Utils + constructor(storage: Storage, utils: Utils) { this.storage = storage + this.utils = utils } attachNostrSend(f: NostrSend) { this._nostrSend = f @@ -39,6 +44,7 @@ export default class webRTC { } return {} } + private onCandidate = async (u: UserInfo, candidate: string): Promise => { const key = this.getConnectionsKey(u) if (!this.connections[key]) { @@ -46,7 +52,6 @@ export default class webRTC { } const conn = this.connections[key] const iceCandidate: IceCandidate = JSON.parse(candidate) - console.log({ iceCandidate }) if (iceCandidate.candidate) { await conn.addIceCandidate(iceCandidate); } @@ -80,8 +85,8 @@ export default class webRTC { const channel = event.channel channel.addEventListener('message', async (event) => { try { - const j = JSON.parse(event.data) as Types.SingleUsageMetricReq - const err = Types.SingleUsageMetricReqValidate(j, { + const j = JSON.parse(event.data) as Types.SingleMetricReq + const err = Types.SingleMetricReqValidate(j, { app_id_CustomCheck: id => id === u.appId, metrics_name_CustomCheck: name => name !== "" }) @@ -89,7 +94,18 @@ export default class webRTC { this.log(ERROR, 'SingleUsageMetricReqValidate', err) return } - const { fileData } = this.storage.metricsEventStorage.tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page) + let tlvStorage: TlvFilesStorage + switch (j.metric_type) { + case Types.SingleMetricType.USAGE_METRIC: + tlvStorage = this.storage.metricsEventStorage.tlvStorage + break + case Types.SingleMetricType.BUNDLE_METRIC: + tlvStorage = this.utils.stateBundler.tlvStorage + break + default: + throw new Error("Unknown metric type") + } + const { fileData } = tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page) const id = j.request_id || Math.floor(Math.random() * 100_000_000) let i = 0 const packets: Buffer[] = [] From bb14c0c0c966e669c4d092fd1bf64f20ba288895 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Thu, 6 Feb 2025 16:46:34 +0000 Subject: [PATCH 11/11] update logic --- src/services/storage/stateBundler.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/services/storage/stateBundler.ts b/src/services/storage/stateBundler.ts index 560b3c24..3b81e15f 100644 --- a/src/services/storage/stateBundler.ts +++ b/src/services/storage/stateBundler.ts @@ -66,9 +66,9 @@ export class StateBundler { } } - AddValue = (appId: string, key: string, v: number) => { + AddValue = (appId: string, key: string, v: number, updateOnly = false) => { const prevValueKey = `${appId}_${key}` - if (this.prevValues[prevValueKey] === v) { + if (updateOnly && this.prevValues[prevValueKey] === v) { return } this.prevValues[prevValueKey] = v @@ -92,7 +92,7 @@ export class StateBundler { AddBalancePoint = (actionName: BalanceStatePointType, v: number, meta = [], appId = '_root') => { const key = [actionName, ...meta].join('_') - this.AddValue(appId, key, v) + this.AddValue(appId, key, v, true) } AddMaxPoint = (actionName: MaxStatePointType, v: number, meta = [], appId = '_root') => {