From 531947a49747ec0617d3c7660717d023c915e1f2 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Tue, 4 Feb 2025 20:13:41 +0000 Subject: [PATCH] bundle metrics --- proto/autogenerated/client.md | 26 +++ proto/autogenerated/go/http_client.go | 30 ++++ proto/autogenerated/go/types.go | 14 ++ proto/autogenerated/ts/express_server.ts | 22 +++ proto/autogenerated/ts/http_client.ts | 14 ++ proto/autogenerated/ts/nostr_client.ts | 15 ++ proto/autogenerated/ts/nostr_transport.ts | 16 ++ proto/autogenerated/ts/types.ts | 105 ++++++++++- proto/service/methods.proto | 8 + proto/service/structs.proto | 20 +++ src/index.ts | 19 +- src/services/helpers/tlv.ts | 2 + src/services/helpers/utilsWrapper.ts | 2 +- src/services/main/index.ts | 8 +- src/services/main/paymentManager.ts | 4 +- src/services/serverMethods/index.ts | 3 + src/services/storage/index.ts | 2 - src/services/storage/metricsEventStorage.ts | 166 +++-------------- src/services/storage/stateBundler.ts | 190 ++++++++++---------- src/services/storage/tlvFilesStorage.ts | 153 ++++++++++++++++ src/services/webRTC/index.ts | 6 +- 21 files changed, 573 insertions(+), 252 deletions(-) create mode 100644 src/services/storage/tlvFilesStorage.ts diff --git a/proto/autogenerated/client.md b/proto/autogenerated/client.md index 277807ee..04f579e6 100644 --- a/proto/autogenerated/client.md +++ b/proto/autogenerated/client.md @@ -93,6 +93,11 @@ The nostr server will send back a message response, and inside the body there wi - input: [AppsMetricsRequest](#AppsMetricsRequest) - output: [AppsMetrics](#AppsMetrics) +- GetBundleMetrics + - auth type: __Metrics__ + - input: [LatestBundleMetricReq](#LatestBundleMetricReq) + - output: [BundleMetrics](#BundleMetrics) + - GetDebitAuthorizations - auth type: __User__ - This methods has an __empty__ __request__ body @@ -481,6 +486,13 @@ The nostr server will send back a message response, and inside the body there wi - input: [AppsMetricsRequest](#AppsMetricsRequest) - output: [AppsMetrics](#AppsMetrics) +- GetBundleMetrics + - auth type: __Metrics__ + - http method: __post__ + - http route: __/api/reports/bundle__ + - input: [LatestBundleMetricReq](#LatestBundleMetricReq) + - output: [BundleMetrics](#BundleMetrics) + - GetDebitAuthorizations - auth type: __User__ - http method: __get__ @@ -958,6 +970,17 @@ The nostr server will send back a message response, and inside the body there wi - __nostr_pub__: _string_ - __user_identifier__: _string_ +### BundleData + - __available_chunks__: ARRAY of: _number_ + - __base_64_data__: ARRAY of: _string_ + - __current_chunk__: _number_ + +### BundleMetric + - __app_bundles__: MAP with key: _string_ and value: _[BundleData](#BundleData)_ + +### BundleMetrics + - __apps__: MAP with key: _string_ and value: _[BundleMetric](#BundleMetric)_ + ### CallbackUrl - __url__: _string_ @@ -1107,6 +1130,9 @@ The nostr server will send back a message response, and inside the body there wi - __token__: _string_ - __url__: _string_ +### LatestBundleMetricReq + - __limit__: _number_ *this field is optional + ### LatestUsageMetricReq - __limit__: _number_ *this field is optional diff --git a/proto/autogenerated/go/http_client.go b/proto/autogenerated/go/http_client.go index 78889c60..285945da 100644 --- a/proto/autogenerated/go/http_client.go +++ b/proto/autogenerated/go/http_client.go @@ -77,6 +77,7 @@ type Client struct { GetAppUser func(req GetAppUserRequest) (*AppUser, error) GetAppUserLNURLInfo func(req GetAppUserLNURLInfoRequest) (*LnurlPayInfoResponse, error) GetAppsMetrics func(req AppsMetricsRequest) (*AppsMetrics, error) + GetBundleMetrics func(req LatestBundleMetricReq) (*BundleMetrics, error) GetDebitAuthorizations func() (*DebitAuthorizations, error) GetErrorStats func() (*ErrorStats, error) GetHttpCreds func() (*HttpCreds, error) @@ -740,6 +741,35 @@ func NewClient(params ClientParams) *Client { } return &res, nil }, + GetBundleMetrics: func(req LatestBundleMetricReq) (*BundleMetrics, error) { + auth, err := params.RetrieveMetricsAuth() + if err != nil { + return nil, err + } + finalRoute := "/api/reports/bundle" + body, err := json.Marshal(req) + if err != nil { + return nil, err + } + resBody, err := doPostRequest(params.BaseURL+finalRoute, body, auth) + if err != nil { + return nil, err + } + result := ResultError{} + err = json.Unmarshal(resBody, &result) + if err != nil { + return nil, err + } + if result.Status == "ERROR" { + return nil, fmt.Errorf(result.Reason) + } + res := BundleMetrics{} + err = json.Unmarshal(resBody, &res) + if err != nil { + return nil, err + } + return &res, nil + }, GetDebitAuthorizations: func() (*DebitAuthorizations, error) { auth, err := params.RetrieveUserAuth() if err != nil { diff --git a/proto/autogenerated/go/types.go b/proto/autogenerated/go/types.go index 11158b1b..b5d42976 100644 --- a/proto/autogenerated/go/types.go +++ b/proto/autogenerated/go/types.go @@ -174,6 +174,17 @@ type BannedAppUser struct { Nostr_pub string `json:"nostr_pub"` User_identifier string `json:"user_identifier"` } +type BundleData struct { + Available_chunks []int64 `json:"available_chunks"` + Base_64_data []string `json:"base_64_data"` + Current_chunk int64 `json:"current_chunk"` +} +type BundleMetric struct { + App_bundles map[string]BundleData `json:"app_bundles"` +} +type BundleMetrics struct { + Apps map[string]BundleMetric `json:"apps"` +} type CallbackUrl struct { Url string `json:"url"` } @@ -323,6 +334,9 @@ type HttpCreds struct { Token string `json:"token"` Url string `json:"url"` } +type LatestBundleMetricReq struct { + Limit int64 `json:"limit"` +} type LatestUsageMetricReq struct { Limit int64 `json:"limit"` } diff --git a/proto/autogenerated/ts/express_server.ts b/proto/autogenerated/ts/express_server.ts index 5c1da226..3ab582e1 100644 --- a/proto/autogenerated/ts/express_server.ts +++ b/proto/autogenerated/ts/express_server.ts @@ -866,6 +866,28 @@ export default (methods: Types.ServerMethods, opts: ServerOptions) => { opts.metricsCallback([{ ...info, ...stats, ...authContext }]) } catch (ex) { const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } }) + if (!opts.allowNotImplementedMethods && !methods.GetBundleMetrics) throw new Error('method: GetBundleMetrics is not implemented') + app.post('/api/reports/bundle', async (req, res) => { + const info: Types.RequestInfo = { rpcName: 'GetBundleMetrics', batch: false, nostr: false, batchSize: 0} + const stats: Types.RequestStats = { startMs:req.startTimeMs || 0, start:req.startTime || 0n, parse: process.hrtime.bigint(), guard: 0n, validate: 0n, handle: 0n } + let authCtx: Types.AuthContext = {} + try { + if (!methods.GetBundleMetrics) throw new Error('method: GetBundleMetrics is not implemented') + const authContext = await opts.MetricsAuthGuard(req.headers['authorization']) + authCtx = authContext + stats.guard = process.hrtime.bigint() + const request = req.body + const error = Types.LatestBundleMetricReqValidate(request) + stats.validate = process.hrtime.bigint() + if (error !== null) return logErrorAndReturnResponse(error, 'invalid request body', res, logger, { ...info, ...stats, ...authContext }, opts.metricsCallback) + const query = req.query + const params = req.params + const response = await methods.GetBundleMetrics({rpcName:'GetBundleMetrics', ctx:authContext , req: request}) + stats.handle = process.hrtime.bigint() + res.json({status: 'OK', ...response}) + opts.metricsCallback([{ ...info, ...stats, ...authContext }]) + } catch (ex) { const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } + }) if (!opts.allowNotImplementedMethods && !methods.GetDebitAuthorizations) throw new Error('method: GetDebitAuthorizations is not implemented') app.get('/api/user/debit/get', async (req, res) => { const info: Types.RequestInfo = { rpcName: 'GetDebitAuthorizations', batch: false, nostr: false, batchSize: 0} diff --git a/proto/autogenerated/ts/http_client.ts b/proto/autogenerated/ts/http_client.ts index 2c408f93..f2d79af6 100644 --- a/proto/autogenerated/ts/http_client.ts +++ b/proto/autogenerated/ts/http_client.ts @@ -318,6 +318,20 @@ export default (params: ClientParams) => ({ } return { status: 'ERROR', reason: 'invalid response' } }, + GetBundleMetrics: async (request: Types.LatestBundleMetricReq): Promise => { + const auth = await params.retrieveMetricsAuth() + if (auth === null) throw new Error('retrieveMetricsAuth() returned null') + let finalRoute = '/api/reports/bundle' + const { data } = await axios.post(params.baseUrl + finalRoute, request, { headers: { 'authorization': auth } }) + if (data.status === 'ERROR' && typeof data.reason === 'string') return data + if (data.status === 'OK') { + const result = data + if(!params.checkResult) return { status: 'OK', ...result } + const error = Types.BundleMetricsValidate(result) + if (error === null) { return { status: 'OK', ...result } } else return { status: 'ERROR', reason: error.message } + } + return { status: 'ERROR', reason: 'invalid response' } + }, GetDebitAuthorizations: async (): Promise => { const auth = await params.retrieveUserAuth() if (auth === null) throw new Error('retrieveUserAuth() returned null') diff --git a/proto/autogenerated/ts/nostr_client.ts b/proto/autogenerated/ts/nostr_client.ts index b2cc12c7..28a281f6 100644 --- a/proto/autogenerated/ts/nostr_client.ts +++ b/proto/autogenerated/ts/nostr_client.ts @@ -233,6 +233,21 @@ export default (params: NostrClientParams, send: (to:string, message: NostrRequ } return { status: 'ERROR', reason: 'invalid response' } }, + GetBundleMetrics: async (request: Types.LatestBundleMetricReq): Promise => { + const auth = await params.retrieveNostrMetricsAuth() + if (auth === null) throw new Error('retrieveNostrMetricsAuth() returned null') + const nostrRequest: NostrRequest = {} + nostrRequest.body = request + const data = await send(params.pubDestination, {rpcName:'GetBundleMetrics',authIdentifier:auth, ...nostrRequest }) + if (data.status === 'ERROR' && typeof data.reason === 'string') return data + if (data.status === 'OK') { + const result = data + if(!params.checkResult) return { status: 'OK', ...result } + const error = Types.BundleMetricsValidate(result) + if (error === null) { return { status: 'OK', ...result } } else return { status: 'ERROR', reason: error.message } + } + return { status: 'ERROR', reason: 'invalid response' } + }, GetDebitAuthorizations: async (): Promise => { const auth = await params.retrieveNostrUserAuth() if (auth === null) throw new Error('retrieveNostrUserAuth() returned null') diff --git a/proto/autogenerated/ts/nostr_transport.ts b/proto/autogenerated/ts/nostr_transport.ts index 2c4ff956..5cf473ff 100644 --- a/proto/autogenerated/ts/nostr_transport.ts +++ b/proto/autogenerated/ts/nostr_transport.ts @@ -621,6 +621,22 @@ export default (methods: Types.ServerMethods, opts: NostrOptions) => { opts.metricsCallback([{ ...info, ...stats, ...authContext }]) }catch(ex){ const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } break + case 'GetBundleMetrics': + try { + if (!methods.GetBundleMetrics) throw new Error('method: GetBundleMetrics is not implemented') + const authContext = await opts.NostrMetricsAuthGuard(req.appId, req.authIdentifier) + stats.guard = process.hrtime.bigint() + authCtx = authContext + const request = req.body + const error = Types.LatestBundleMetricReqValidate(request) + stats.validate = process.hrtime.bigint() + if (error !== null) return logErrorAndReturnResponse(error, 'invalid request body', res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback) + const response = await methods.GetBundleMetrics({rpcName:'GetBundleMetrics', ctx:authContext , req: request}) + stats.handle = process.hrtime.bigint() + res({status: 'OK', ...response}) + opts.metricsCallback([{ ...info, ...stats, ...authContext }]) + }catch(ex){ const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } + break case 'GetDebitAuthorizations': try { if (!methods.GetDebitAuthorizations) throw new Error('method: GetDebitAuthorizations is not implemented') diff --git a/proto/autogenerated/ts/types.ts b/proto/autogenerated/ts/types.ts index 76655f9a..ca4a72e9 100644 --- a/proto/autogenerated/ts/types.ts +++ b/proto/autogenerated/ts/types.ts @@ -28,8 +28,8 @@ export type MetricsContext = { app_id: string operator_id: string } -export type MetricsMethodInputs = GetAppsMetrics_Input | GetErrorStats_Input | GetLndMetrics_Input | GetSingleUsageMetrics_Input | GetUsageMetrics_Input | SubmitWebRtcMessage_Input -export type MetricsMethodOutputs = GetAppsMetrics_Output | GetErrorStats_Output | GetLndMetrics_Output | GetSingleUsageMetrics_Output | GetUsageMetrics_Output | SubmitWebRtcMessage_Output +export type MetricsMethodInputs = GetAppsMetrics_Input | GetBundleMetrics_Input | GetErrorStats_Input | GetLndMetrics_Input | GetSingleUsageMetrics_Input | GetUsageMetrics_Input | SubmitWebRtcMessage_Input +export type MetricsMethodOutputs = GetAppsMetrics_Output | GetBundleMetrics_Output | GetErrorStats_Output | GetLndMetrics_Output | GetSingleUsageMetrics_Output | GetUsageMetrics_Output | SubmitWebRtcMessage_Output export type UserContext = { app_id: string app_user_id: string @@ -108,6 +108,9 @@ export type GetAppUserLNURLInfo_Output = ResultError | ({ status: 'OK' } & Lnurl export type GetAppsMetrics_Input = {rpcName:'GetAppsMetrics', req: AppsMetricsRequest} export type GetAppsMetrics_Output = ResultError | ({ status: 'OK' } & AppsMetrics) +export type GetBundleMetrics_Input = {rpcName:'GetBundleMetrics', req: LatestBundleMetricReq} +export type GetBundleMetrics_Output = ResultError | ({ status: 'OK' } & BundleMetrics) + export type GetDebitAuthorizations_Input = {rpcName:'GetDebitAuthorizations'} export type GetDebitAuthorizations_Output = ResultError | ({ status: 'OK' } & DebitAuthorizations) @@ -312,6 +315,7 @@ export type ServerMethods = { GetAppUser?: (req: GetAppUser_Input & {ctx: AppContext }) => Promise GetAppUserLNURLInfo?: (req: GetAppUserLNURLInfo_Input & {ctx: AppContext }) => Promise GetAppsMetrics?: (req: GetAppsMetrics_Input & {ctx: MetricsContext }) => Promise + GetBundleMetrics?: (req: GetBundleMetrics_Input & {ctx: MetricsContext }) => Promise GetDebitAuthorizations?: (req: GetDebitAuthorizations_Input & {ctx: UserContext }) => Promise GetErrorStats?: (req: GetErrorStats_Input & {ctx: MetricsContext }) => Promise GetHttpCreds?: (req: GetHttpCreds_Input & {ctx: UserContext }) => Promise @@ -924,6 +928,84 @@ export const BannedAppUserValidate = (o?: BannedAppUser, opts: BannedAppUserOpti return null } +export type BundleData = { + available_chunks: number[] + base_64_data: string[] + current_chunk: number +} +export const BundleDataOptionalFields: [] = [] +export type BundleDataOptions = OptionsBaseMessage & { + checkOptionalsAreSet?: [] + available_chunks_CustomCheck?: (v: number[]) => boolean + base_64_data_CustomCheck?: (v: string[]) => boolean + current_chunk_CustomCheck?: (v: number) => boolean +} +export const BundleDataValidate = (o?: BundleData, opts: BundleDataOptions = {}, path: string = 'BundleData::root.'): Error | null => { + if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message') + if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null') + + if (!Array.isArray(o.available_chunks)) return new Error(`${path}.available_chunks: is not an array`) + for (let index = 0; index < o.available_chunks.length; index++) { + if (typeof o.available_chunks[index] !== 'number') return new Error(`${path}.available_chunks[${index}]: is not a number`) + } + if (opts.available_chunks_CustomCheck && !opts.available_chunks_CustomCheck(o.available_chunks)) return new Error(`${path}.available_chunks: custom check failed`) + + if (!Array.isArray(o.base_64_data)) return new Error(`${path}.base_64_data: is not an array`) + for (let index = 0; index < o.base_64_data.length; index++) { + if (typeof o.base_64_data[index] !== 'string') return new Error(`${path}.base_64_data[${index}]: is not a string`) + } + if (opts.base_64_data_CustomCheck && !opts.base_64_data_CustomCheck(o.base_64_data)) return new Error(`${path}.base_64_data: custom check failed`) + + if (typeof o.current_chunk !== 'number') return new Error(`${path}.current_chunk: is not a number`) + if (opts.current_chunk_CustomCheck && !opts.current_chunk_CustomCheck(o.current_chunk)) return new Error(`${path}.current_chunk: custom check failed`) + + return null +} + +export type BundleMetric = { + app_bundles: Record +} +export const BundleMetricOptionalFields: [] = [] +export type BundleMetricOptions = OptionsBaseMessage & { + checkOptionalsAreSet?: [] + app_bundles_EntryOptions?: BundleDataOptions + app_bundles_CustomCheck?: (v: Record) => boolean +} +export const BundleMetricValidate = (o?: BundleMetric, opts: BundleMetricOptions = {}, path: string = 'BundleMetric::root.'): Error | null => { + if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message') + if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null') + + if (typeof o.app_bundles !== 'object' || o.app_bundles === null) return new Error(`${path}.app_bundles: is not an object or is null`) + for (const key in o.app_bundles) { + const app_bundlesErr = BundleDataValidate(o.app_bundles[key], opts.app_bundles_EntryOptions, `${path}.app_bundles['${key}']`) + if (app_bundlesErr !== null) return app_bundlesErr + } + + return null +} + +export type BundleMetrics = { + apps: Record +} +export const BundleMetricsOptionalFields: [] = [] +export type BundleMetricsOptions = OptionsBaseMessage & { + checkOptionalsAreSet?: [] + apps_EntryOptions?: BundleMetricOptions + apps_CustomCheck?: (v: Record) => boolean +} +export const BundleMetricsValidate = (o?: BundleMetrics, opts: BundleMetricsOptions = {}, path: string = 'BundleMetrics::root.'): Error | null => { + if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message') + if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null') + + if (typeof o.apps !== 'object' || o.apps === null) return new Error(`${path}.apps: is not an object or is null`) + for (const key in o.apps) { + const appsErr = BundleMetricValidate(o.apps[key], opts.apps_EntryOptions, `${path}.apps['${key}']`) + if (appsErr !== null) return appsErr + } + + return null +} + export type CallbackUrl = { url: string } @@ -1812,6 +1894,25 @@ export const HttpCredsValidate = (o?: HttpCreds, opts: HttpCredsOptions = {}, pa return null } +export type LatestBundleMetricReq = { + limit?: number +} +export type LatestBundleMetricReqOptionalField = 'limit' +export const LatestBundleMetricReqOptionalFields: LatestBundleMetricReqOptionalField[] = ['limit'] +export type LatestBundleMetricReqOptions = OptionsBaseMessage & { + checkOptionalsAreSet?: LatestBundleMetricReqOptionalField[] + limit_CustomCheck?: (v?: number) => boolean +} +export const LatestBundleMetricReqValidate = (o?: LatestBundleMetricReq, opts: LatestBundleMetricReqOptions = {}, path: string = 'LatestBundleMetricReq::root.'): Error | null => { + if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message') + if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null') + + if ((o.limit || opts.allOptionalsAreSet || opts.checkOptionalsAreSet?.includes('limit')) && typeof o.limit !== 'number') return new Error(`${path}.limit: is not a number`) + if (opts.limit_CustomCheck && !opts.limit_CustomCheck(o.limit)) return new Error(`${path}.limit: custom check failed`) + + return null +} + export type LatestUsageMetricReq = { limit?: number } diff --git a/proto/service/methods.proto b/proto/service/methods.proto index 1b404f2b..5f20f8ac 100644 --- a/proto/service/methods.proto +++ b/proto/service/methods.proto @@ -181,6 +181,14 @@ service LightningPub { option (http_route) = "/api/reports/usage"; option (nostr) = true; } + + rpc GetBundleMetrics(structs.LatestBundleMetricReq) returns (structs.BundleMetrics) { + option (auth_type) = "Metrics"; + option (http_method) = "post"; + option (http_route) = "/api/reports/bundle"; + option (nostr) = true; + } + rpc GetSingleUsageMetrics(structs.SingleUsageMetricReq) returns (structs.UsageMetricTlv) { option (auth_type) = "Metrics"; option (http_method) = "post"; diff --git a/proto/service/structs.proto b/proto/service/structs.proto index 2e74b8fa..8079f6ea 100644 --- a/proto/service/structs.proto +++ b/proto/service/structs.proto @@ -62,6 +62,8 @@ message UsageMetric { optional string app_id = 11; } + + message UsageMetricTlv { repeated string base_64_tlvs = 1; int64 current_chunk = 2; @@ -77,6 +79,24 @@ message UsageMetrics { map apps = 1; } +message LatestBundleMetricReq { + optional int64 limit = 1; +} + +message BundleData { + repeated string base_64_data = 1; + int64 current_chunk = 2; + repeated int64 available_chunks = 3; +} + +message BundleMetric { + map app_bundles = 1; +} + +message BundleMetrics { + map apps = 1; +} + message AppsMetricsRequest { optional int64 from_unix = 1; optional int64 to_unix = 2; diff --git a/src/index.ts b/src/index.ts index 886202be..19464c04 100644 --- a/src/index.ts +++ b/src/index.ts @@ -24,10 +24,11 @@ const start = async () => { const serverMethods = GetServerMethods(mainHandler) const nostrSettings = LoadNosrtSettingsFromEnv() log("initializing nostr middleware") - const { Send } = nostrMiddleware(serverMethods, mainHandler, + const { Send, Stop } = nostrMiddleware(serverMethods, mainHandler, { ...nostrSettings, apps, clients: [liquidityProviderInfo] }, (e, p) => mainHandler.liquidityProvider.onEvent(e, p) ) + exitHandler(() => { Stop() }) log("starting server") mainHandler.attachNostrSend(Send) mainHandler.StartBeacons() @@ -40,3 +41,19 @@ const start = async () => { Server.Listen(mainSettings.servicePort) } start() + +const exitHandler = async (kill: () => void) => { + // catch ctrl+c event and exit normally + process.on('SIGINT', () => { + console.log('Ctrl-C detected, exiting safely...'); + process.exit(2); + }); + + //catch uncaught exceptions, trace, then exit normally + process.on('uncaughtException', (e) => { + console.log('Uncaught Exception detected, exiting safely, and killing all child processes...'); + console.log(e.stack); + kill(); + process.exit(99); + }); +} \ No newline at end of file diff --git a/src/services/helpers/tlv.ts b/src/services/helpers/tlv.ts index e86b9ada..7d033143 100644 --- a/src/services/helpers/tlv.ts +++ b/src/services/helpers/tlv.ts @@ -32,6 +32,8 @@ export const decodeListTLV = (tlv: TLV): Uint8Array[] => { return tlv[64] } + + export const usageMetricsToTlv = (metric: Types.UsageMetric): TLV => { const tlv: TLV = {} tlv[2] = [integerToUint8Array(Math.ceil(metric.processed_at_ms / 1000))] // 6 -> 6 diff --git a/src/services/helpers/utilsWrapper.ts b/src/services/helpers/utilsWrapper.ts index f00626e0..35cefc17 100644 --- a/src/services/helpers/utilsWrapper.ts +++ b/src/services/helpers/utilsWrapper.ts @@ -6,6 +6,6 @@ export class Utils { settings: MainSettings constructor(settings: MainSettings) { this.settings = settings - this.stateBundler = new StateBundler() + this.stateBundler = new StateBundler(settings.storageSettings) } } \ No newline at end of file diff --git a/src/services/main/index.ts b/src/services/main/index.ts index 08001081..7de52a43 100644 --- a/src/services/main/index.ts +++ b/src/services/main/index.ts @@ -188,9 +188,9 @@ export default class { const operationId = `${Types.UserOperationType.INCOMING_TX}-${addedTx.serial_id}` const op = { amount, paidAtUnix: Date.now() / 1000, inbound: true, type: Types.UserOperationType.INCOMING_TX, identifier: userAddress.address, operationId, network_fee: 0, service_fee: fee, confirmed: internal, tx_hash: txOutput.hash, internal: false } this.sendOperationToNostr(userAddress.linkedApplication, userAddress.user.user_id, op) - this.utils.stateBundler.AddTxPoint('addressWasPaid', amount, { used, from: 'system', timeDiscount: true }) + this.utils.stateBundler.AddTxPoint('addressWasPaid', amount, { used, from: 'system', timeDiscount: true }, userAddress.linkedApplication.app_id) } catch (err: any) { - this.utils.stateBundler.AddTxPointFailed('addressWasPaid', amount, { used, from: 'system' }) + this.utils.stateBundler.AddTxPointFailed('addressWasPaid', amount, { used, from: 'system' }, userAddress.linkedApplication.app_id) log(ERROR, "cannot process address paid transaction, already registered") } }) @@ -234,9 +234,9 @@ export default class { log(ERROR, "cannot create zap receipt", err.message || "") } this.liquidityManager.afterInInvoicePaid() - this.utils.stateBundler.AddTxPoint('invoiceWasPaid', amount, { used, from: 'system', timeDiscount: true }) + this.utils.stateBundler.AddTxPoint('invoiceWasPaid', amount, { used, from: 'system', timeDiscount: true }, userInvoice.linkedApplication.app_id) } catch (err: any) { - this.utils.stateBundler.AddTxPointFailed('invoiceWasPaid', amount, { used, from: 'system' }) + this.utils.stateBundler.AddTxPointFailed('invoiceWasPaid', amount, { used, from: 'system' }, userInvoice.linkedApplication.app_id) log(ERROR, "cannot process paid invoice", err.message || "") } }) diff --git a/src/services/main/paymentManager.ts b/src/services/main/paymentManager.ts index 4a18a2f8..797bfe8c 100644 --- a/src/services/main/paymentManager.ts +++ b/src/services/main/paymentManager.ts @@ -355,11 +355,11 @@ export default class { try { await this.invoicePaidCb(internalInvoice.invoice, payAmount, 'internal') const newPayment = await this.storage.paymentStorage.AddInternalPayment(userId, internalInvoice.invoice, payAmount, serviceFee, linkedApplication, debitNpub) - this.utils.stateBundler.AddTxPoint('paidAnInvoice', payAmount, { used: 'internal', from: 'user' }) + this.utils.stateBundler.AddTxPoint('paidAnInvoice', payAmount, { used: 'internal', from: 'user' }, linkedApplication.app_id) return { preimage: "", amtPaid: payAmount, networkFee: 0, serialId: newPayment.serial_id } } catch (err) { await this.storage.userStorage.IncrementUserBalance(userId, totalAmountToDecrement, "internal_payment_refund:" + internalInvoice.invoice) - this.utils.stateBundler.AddTxPointFailed('paidAnInvoice', payAmount, { used: 'internal', from: 'user' }) + this.utils.stateBundler.AddTxPointFailed('paidAnInvoice', payAmount, { used: 'internal', from: 'user' }, linkedApplication.app_id) throw err } diff --git a/src/services/serverMethods/index.ts b/src/services/serverMethods/index.ts index 246985bf..7291aec8 100644 --- a/src/services/serverMethods/index.ts +++ b/src/services/serverMethods/index.ts @@ -17,6 +17,9 @@ export default (mainHandler: Main): Types.ServerMethods => { GetUsageMetrics: async ({ ctx, req }) => { return mainHandler.metricsManager.GetUsageMetrics(req) }, + GetBundleMetrics: async ({ ctx, req }) => { + return mainHandler.utils.stateBundler.GetBundleMetrics(req) + }, GetSingleUsageMetrics: async ({ ctx, req }) => { return mainHandler.metricsManager.GetSingleUsageMetrics(req) }, diff --git a/src/services/storage/index.ts b/src/services/storage/index.ts index 7f105ce0..5f94a546 100644 --- a/src/services/storage/index.ts +++ b/src/services/storage/index.ts @@ -10,7 +10,6 @@ import MetricsEventStorage from "./metricsEventStorage.js"; import TransactionsQueue, { TX } from "./transactionsQueue.js"; import EventsLogManager from "./eventsLog.js"; import { LiquidityStorage } from "./liquidityStorage.js"; -import { StateBundler } from "./stateBundler.js"; import DebitStorage from "./debitStorage.js" import OfferStorage from "./offerStorage.js" export type StorageSettings = { @@ -35,7 +34,6 @@ export default class { debitStorage: DebitStorage offerStorage: OfferStorage eventsLog: EventsLogManager - stateBundler: StateBundler constructor(settings: StorageSettings) { this.settings = settings this.eventsLog = new EventsLogManager(settings.eventLogPath) diff --git a/src/services/storage/metricsEventStorage.ts b/src/services/storage/metricsEventStorage.ts index 619f903c..ea1a2d5e 100644 --- a/src/services/storage/metricsEventStorage.ts +++ b/src/services/storage/metricsEventStorage.ts @@ -2,51 +2,31 @@ import fs from 'fs' import * as Types from '../../../proto/autogenerated/ts/types.js' import { StorageSettings } from "./index.js"; import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js'; -const chunkSizeBytes = 128 * 1024 +import { TlvFilesStorage } from './tlvFilesStorage.js'; export default class { - settings: StorageSettings - metricsPath: string + tlvStorage: TlvFilesStorage cachePath: string - metaReady = false - metricsMeta: Record> = {} - pendingMetrics: Record> = {} last24hCache: { ts: number, ok: number, fail: number }[] = [] - lastPersistedMetrics: number = 0 lastPersistedCache: number = 0 constructor(settings: StorageSettings) { - this.settings = settings; - this.metricsPath = [settings.dataDir, "metric_events"].join("/") + const metricsPath = [settings.dataDir, "metric_events"].join("/") + this.tlvStorage = new TlvFilesStorage(metricsPath) this.cachePath = [settings.dataDir, "metric_cache"].join("/") if (!fs.existsSync(this.cachePath)) { fs.mkdirSync(this.cachePath, { recursive: true }); } - this.initMetricsMeta() + this.tlvStorage.initMeta() this.loadCache() setInterval(() => { - if (Date.now() - this.lastPersistedMetrics > 1000 * 60 * 4) { - this.persistMetrics() - } if (Date.now() - this.lastPersistedCache > 1000 * 60 * 4) { this.persistCache() } }, 1000 * 60 * 5) process.on('exit', () => { - this.persistMetrics() this.persistCache() }); - // catch ctrl+c event and exit normally - process.on('SIGINT', () => { - console.log('Ctrl-C...'); - process.exit(2); - }); - //catch uncaught exceptions, trace, then exit normally - process.on('uncaughtException', (e) => { - console.log('Uncaught Exception...'); - console.log(e.stack); - process.exit(99); - }); } getlast24hCache = () => { return this.last24hCache } @@ -93,139 +73,35 @@ export default class { } AddMetricEvent = (appId: string, method: string, metric: Uint8Array, success: boolean) => { - if (!this.metaReady) { - throw new Error("meta metrics not ready") - } - if (!this.pendingMetrics[appId]) { - this.pendingMetrics[appId] = {} - } - if (!this.pendingMetrics[appId][method]) { - this.pendingMetrics[appId][method] = { tlvs: [] } - } - this.pendingMetrics[appId][method].tlvs.push(metric) + this.tlvStorage.AddTlv(appId, method, metric) this.pushToCache(success) } LoadLatestMetrics = async (limit = 100): Promise => { - this.persistMetrics() + const raw = this.tlvStorage.LoadLatest(limit) const metrics: Types.UsageMetrics = { apps: {} } - this.foreachMetricMethodFile((app, method, tlvFiles) => { - if (tlvFiles.length === 0) { return } - const methodPath = [this.metricsPath, app, method].join("/") - const latest = tlvFiles[tlvFiles.length - 1] - const tlvFile = [methodPath, `${latest}.mtlv`].join("/") - const tlv = fs.readFileSync(tlvFile) - const decoded = decodeListTLV(parseTLV(tlv)) - if (!metrics.apps[app]) { - metrics.apps[app] = { app_metrics: {} } - } - if (decoded.length > limit) { - decoded.splice(0, decoded.length - limit) - } - metrics.apps[app].app_metrics[method] = { - base_64_tlvs: decoded.map(d => Buffer.from(d).toString('base64')), - current_chunk: latest, - available_chunks: tlvFiles - } + Object.keys(raw).forEach(app => { + metrics.apps[app] = { app_metrics: {} } + Object.keys(raw[app]).forEach(method => { + const data = raw[app][method] + metrics.apps[app].app_metrics[method] = { + base_64_tlvs: data.tlvs.map(d => Buffer.from(d).toString('base64')), + current_chunk: data.current_chunk, + available_chunks: data.available_chunks + } + }) }) return metrics } - - LoadRawMetricsFile = async (app: string, method: string, chunk: number): Promise => { - if (!this.metaReady || !this.metricsMeta[app] || !this.metricsMeta[app][method] || !this.metricsMeta[app][method].chunks.includes(chunk)) { - throw new Error("metrics not found") - } - const fullPath = [this.metricsPath, app, method, `${chunk}.mtlv`].join("/") - return fs.readFileSync(fullPath) - } LoadMetricsFile = async (app: string, method: string, chunk: number): Promise => { - const tlv = await this.LoadRawMetricsFile(app, method, chunk) - const decoded = decodeListTLV(parseTLV(tlv)) + const { fileData, chunks } = this.tlvStorage.LoadFile(app, method, chunk) + //const tlv = await this.LoadRawMetricsFile(app, method, chunk) + const decoded = decodeListTLV(parseTLV(fileData)) return { base_64_tlvs: decoded.map(d => Buffer.from(d).toString('base64')), current_chunk: chunk, - available_chunks: this.metricsMeta[app][method].chunks + available_chunks: chunks } } - - persistMetrics = () => { - if (!this.metaReady) { - throw new Error("meta metrics not ready") - } - this.lastPersistedMetrics = Date.now() - const tosync = this.pendingMetrics - this.pendingMetrics = {} - const apps = Object.keys(tosync) - apps.map(app => { - const appPath = [this.metricsPath, app].join("/") - if (!fs.existsSync(appPath)) { - fs.mkdirSync(appPath, { recursive: true }); - } - const methods = Object.keys(tosync[app]) - methods.map(methodName => { - const methodPath = [appPath, methodName].join("/") - if (!fs.existsSync(methodPath)) { - fs.mkdirSync(methodPath, { recursive: true }); - } - const method = tosync[app][methodName] - const meta = this.getMetricsMeta(app, methodName) - const chunks = meta.chunks.length > 0 ? meta.chunks : [0] - const latest = chunks[chunks.length - 1] - const tlv = encodeTLV(encodeListTLV(method.tlvs)) - const tlvFile = [methodPath, `${latest}.mtlv`].join("/") - fs.appendFileSync(tlvFile, Buffer.from(tlv)) - if (fs.lstatSync(tlvFile).size > chunkSizeBytes) { - this.updateMetricsMeta(app, methodName, [...chunks, latest + 1]) - } - }) - }) - } - - initMetricsMeta = () => { - this.foreachMetricMethodFile((app, method, tlvFiles) => { - this.updateMetricsMeta(app, method, tlvFiles) - }) - this.metaReady = true - } - - updateMetricsMeta = (appId: string, method: string, sortedChunks: number[]) => { - if (!this.metricsMeta[appId]) { - this.metricsMeta[appId] = {} - } - this.metricsMeta[appId][method] = { chunks: sortedChunks } - } - - getMetricsMeta = (appId: string, method: string) => { - if (!this.metricsMeta[appId]) { - return { chunks: [] } - } - return this.metricsMeta[appId][method] || { chunks: [] } - } - - foreachMetricMethodFile = (cb: (appId: string, method: string, tlvFiles: number[]) => void) => { - if (!fs.existsSync(this.metricsPath)) { - fs.mkdirSync(this.metricsPath, { recursive: true }); - } - const apps = fs.readdirSync(this.metricsPath) - apps.forEach(appDir => { - const appPath = [this.metricsPath, appDir].join("/") - if (!fs.lstatSync(appPath).isDirectory()) { - return - } - const methods = fs.readdirSync(appPath) - methods.forEach(methodDir => { - const methodPath = [appPath, methodDir].join("/") - if (!fs.lstatSync(methodPath).isDirectory()) { - return - } - const tlvFiles = fs.readdirSync(methodPath) - .filter(f => f.endsWith(".mtlv")) - .map(f => +f.slice(0, -".mtlv".length)) - .filter(n => !isNaN(n)) - .sort((a, b) => a - b) - cb(appDir, methodDir, tlvFiles) - }) - }) - } } \ No newline at end of file diff --git a/src/services/storage/stateBundler.ts b/src/services/storage/stateBundler.ts index 4cd000be..ccebbd2a 100644 --- a/src/services/storage/stateBundler.ts +++ b/src/services/storage/stateBundler.ts @@ -1,5 +1,9 @@ +import { LatestBundleMetricReq } from "../../../proto/autogenerated/ts/types.js" import { getLogger } from "../helpers/logger.js" - +import { integerToUint8Array } from "../helpers/tlv.js" +import { StorageSettings } from "./index.js" +import { TlvFilesStorage } from "./tlvFilesStorage.js" +import * as Types from "../../../proto/autogenerated/ts/types.js" const transactionStatePointTypes = ['addedInvoice', 'invoiceWasPaid', 'paidAnInvoice', 'addedAddress', 'addressWasPaid', 'paidAnAddress', 'user2user'] as const const balanceStatePointTypes = ['providerBalance', 'providerMaxWithdrawable', 'walletBalance', 'channelBalance', 'usersBalance', 'feesPaidForLiquidity', 'totalLndBalance', 'accumulatedHtlcFees', 'deltaUsers', 'deltaExternal'] as const const maxStatePointTypes = ['maxProviderRespTime'] as const @@ -25,118 +29,120 @@ export type TxPointSettings = { timeDiscount?: true } export class StateBundler { - sinceStart: StateBundle = {} - lastReport: StateBundle = {} - sinceLatestReport: StateBundle = {} - reportPeriod = 1000 * 60 * 60 * 12 //12h - satsPer1SecondDiscount = 1 - totalSatsForDiscount = 0 - latestReport = Date.now() + tlvStorage: TlvFilesStorage reportLog = getLogger({ component: 'stateBundlerReport' }) - constructor() { - /* process.on('exit', () => { - this.Report() - }); - - // catch ctrl+c event and exit normally - process.on('SIGINT', () => { - console.log('Ctrl-C...'); - process.exit(2); - }); - - //catch uncaught exceptions, trace, then exit normally - process.on('uncaughtException', (e) => { - console.log('Uncaught Exception...'); - console.log(e.stack); - process.exit(99); - }); */ + constructor(settings: StorageSettings) { + const bundlerPath = [settings.dataDir, "bundler_events"].join("/") + this.tlvStorage = new TlvFilesStorage(bundlerPath) + this.tlvStorage.initMeta() } - increment = (key: string, value: number) => { - this.sinceStart[key] = (this.sinceStart[key] || 0) + value - this.sinceLatestReport[key] = (this.sinceLatestReport[key] || 0) + value - this.triggerReportCheck() - } - set = (key: string, value: number) => { - this.sinceStart[key] = value - this.sinceLatestReport[key] = value - this.triggerReportCheck() - } - max = (key: string, value: number) => { - this.sinceStart[key] = Math.max(this.sinceStart[key] || 0, value) - this.sinceLatestReport[key] = Math.max(this.sinceLatestReport[key] || 0, value) - this.triggerReportCheck() + async GetBundleMetrics(req: Types.LatestBundleMetricReq): Promise { + const latest = this.tlvStorage.LoadLatest(req.limit) + const metrics: Types.BundleMetrics = { apps: {} } + Object.keys(latest).forEach(app => { + metrics.apps[app] = { app_bundles: {} } + Object.keys(latest[app]).forEach(dataName => { + const data = latest[app][dataName] + metrics.apps[app].app_bundles[dataName] = { + current_chunk: data.current_chunk, + available_chunks: data.available_chunks, + base_64_data: data.tlvs.map(d => Buffer.from(d).toString('base64')) + } + }) + }) + return metrics } + /* increment = (key: string, value: number) => { + this.sinceStart[key] = (this.sinceStart[key] || 0) + value + this.sinceLatestReport[key] = (this.sinceLatestReport[key] || 0) + value + this.triggerReportCheck() + } + set = (key: string, value: number) => { + this.sinceStart[key] = value + this.sinceLatestReport[key] = value + this.triggerReportCheck() + } + max = (key: string, value: number) => { + this.sinceStart[key] = Math.max(this.sinceStart[key] || 0, value) + this.sinceLatestReport[key] = Math.max(this.sinceLatestReport[key] || 0, value) + this.triggerReportCheck() + } */ - AddTxPoint = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings) => { - const { used, from, timeDiscount } = settings + AddTxPoint = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings, appId = '__root') => { + const { used, from } = settings const meta = settings.meta || [] const key = [actionName, from, used, ...meta].join('_') - if (timeDiscount) { - this.totalSatsForDiscount += v - } - this.increment(key, v) - //this.smallLogEvent(actionName, from) + + this.tlvStorage.AddTlv(appId, key, this.serializeNow(v)) } - AddTxPointFailed = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings) => { + AddTxPointFailed = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings, appId = '__root') => { const { used, from } = settings const meta = settings.meta || [] const key = [actionName, from, used, ...meta, 'failed'].join('_') - this.increment(key, v) + this.tlvStorage.AddTlv(appId, key, this.serializeNow(v)) } - AddBalancePoint = (actionName: BalanceStatePointType, v: number, meta = []) => { + AddBalancePoint = (actionName: BalanceStatePointType, v: number, meta = [], appId = '__root') => { const key = [actionName, ...meta].join('_') - this.set(key, v) + this.tlvStorage.AddTlv(appId, key, this.serializeNow(v)) } - AddMaxPoint = (actionName: MaxStatePointType, v: number, meta = []) => { + AddMaxPoint = (actionName: MaxStatePointType, v: number, meta = [], appId = '__root') => { const key = [actionName, ...meta].join('_') - this.max(key, v) + this.tlvStorage.AddTlv(appId, key, this.serializeNow(v)) } - triggerReportCheck = () => { - const discountSeconds = Math.floor(this.totalSatsForDiscount / this.satsPer1SecondDiscount) - const totalElapsed = Date.now() - this.latestReport - const elapsedWithDiscount = totalElapsed + discountSeconds * 1000 - if (elapsedWithDiscount > this.reportPeriod) { - this.Report() - } + serializeNow = (v: number) => { + const nowUnix = Math.floor(Date.now() / 1000) + const entry = new Uint8Array(8) + entry.set(integerToUint8Array(nowUnix), 0) + entry.set(integerToUint8Array(v), 4) + return entry } - smallLogEvent(event: TransactionStatePointType, from: 'user' | 'system') { - const char = from === 'user' ? 'U' : 'S' - switch (event) { - case 'addedAddress': - case 'addedInvoice': - process.stdout.write(`${char}+,`) - return - case 'addressWasPaid': - case 'invoiceWasPaid': - process.stdout.write(`${char}>,`) - return - case 'paidAnAddress': - case 'paidAnInvoice': - process.stdout.write(`${char}<,`) - return - case 'user2user': - process.stdout.write(`UU`) - } - } + /* triggerReportCheck = () => { + const discountSeconds = Math.floor(this.totalSatsForDiscount / this.satsPer1SecondDiscount) + const totalElapsed = Date.now() - this.latestReport + const elapsedWithDiscount = totalElapsed + discountSeconds * 1000 + if (elapsedWithDiscount > this.reportPeriod) { + this.Report() + } + } */ - Report = () => { - this.totalSatsForDiscount = 0 - this.latestReport = Date.now() - this.reportLog("+++++ since last report:") - Object.entries(this.sinceLatestReport).forEach(([key, value]) => { - this.reportLog(key, value) - }) - this.reportLog("+++++ since start:") - Object.entries(this.sinceStart).forEach(([key, value]) => { - this.reportLog(key, value) - }) - this.lastReport = { ...this.sinceLatestReport } - this.sinceLatestReport = {} - } + /* smallLogEvent(event: TransactionStatePointType, from: 'user' | 'system') { + const char = from === 'user' ? 'U' : 'S' + switch (event) { + case 'addedAddress': + case 'addedInvoice': + process.stdout.write(`${char}+,`) + return + case 'addressWasPaid': + case 'invoiceWasPaid': + process.stdout.write(`${char}>,`) + return + case 'paidAnAddress': + case 'paidAnInvoice': + process.stdout.write(`${char}<,`) + return + case 'user2user': + process.stdout.write(`UU`) + } + } */ + + /* Report = () => { + this.totalSatsForDiscount = 0 + this.latestReport = Date.now() + this.reportLog("+++++ since last report:") + Object.entries(this.sinceLatestReport).forEach(([key, value]) => { + this.reportLog(key, value) + }) + this.reportLog("+++++ since start:") + Object.entries(this.sinceStart).forEach(([key, value]) => { + this.reportLog(key, value) + }) + this.lastReport = { ...this.sinceLatestReport } + this.sinceLatestReport = {} + } */ } \ No newline at end of file diff --git a/src/services/storage/tlvFilesStorage.ts b/src/services/storage/tlvFilesStorage.ts new file mode 100644 index 00000000..ad7fe36f --- /dev/null +++ b/src/services/storage/tlvFilesStorage.ts @@ -0,0 +1,153 @@ +import fs from 'fs' +import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js' +const chunkSizeBytes = 128 * 1024 +export type LatestData = Record> +export class TlvFilesStorage { + storagePath: string + lastPersisted: number = 0 + meta: Record> = {} + pending: Record> = {} + metaReady = false + constructor(storagePath: string) { + this.storagePath = storagePath + if (!fs.existsSync(this.storagePath)) { + fs.mkdirSync(this.storagePath, { recursive: true }); + } + this.initMeta() + setInterval(() => { + if (Date.now() - this.lastPersisted > 1000 * 60 * 4) { + this.persist() + } + }, 1000 * 60 * 5) + process.on('exit', () => { + this.persist() + }); + } + + LoadFile = (app: string, dataName: string, chunk: number): { fileData: Buffer, chunks: number[] } => { + if (!this.metaReady || !this.meta[app] || !this.meta[app][dataName] || !this.meta[app][dataName].chunks.includes(chunk)) { + throw new Error("metrics not found") + } + const fullPath = [this.storagePath, app, dataName, `${chunk}.mtlv`].join("/") + const fileData = fs.readFileSync(fullPath) + return { fileData, chunks: this.meta[app][dataName].chunks } + } + + AddTlv = (appId: string, dataName: string, tlv: Uint8Array) => { + if (!this.metaReady) { + throw new Error("meta metrics not ready") + } + if (!this.pending[appId]) { + this.pending[appId] = {} + } + if (!this.pending[appId][dataName]) { + this.pending[appId][dataName] = { tlvs: [] } + } + this.pending[appId][dataName].tlvs.push(tlv) + } + + LoadLatest = (limit = 100): LatestData => { + this.persist() + const data: LatestData = {} + this.foreachFile((app, dataName, tlvFiles) => { + if (tlvFiles.length === 0) { return } + const methodPath = [this.storagePath, app, dataName].join("/") + const latest = tlvFiles[tlvFiles.length - 1] + const tlvFile = [methodPath, `${latest}.mtlv`].join("/") + const tlv = fs.readFileSync(tlvFile) + const decoded = decodeListTLV(parseTLV(tlv)) + if (!data[app]) { + data[app] = {} + } + if (decoded.length > limit) { + decoded.splice(0, decoded.length - limit) + } + data[app][dataName] = { + tlvs: decoded, + current_chunk: latest, + available_chunks: tlvFiles + } + }) + return data + } + + persist = () => { + if (!this.metaReady) { + throw new Error("meta metrics not ready") + } + this.lastPersisted = Date.now() + const tosync = this.pending + this.pending = {} + const apps = Object.keys(tosync) + apps.map(app => { + const appPath = [this.storagePath, app].join("/") + if (!fs.existsSync(appPath)) { + fs.mkdirSync(appPath, { recursive: true }); + } + const dataNames = Object.keys(tosync[app]) + dataNames.map(dataName => { + const dataPath = [appPath, dataName].join("/") + if (!fs.existsSync(dataPath)) { + fs.mkdirSync(dataPath, { recursive: true }); + } + const data = tosync[app][dataName] + const meta = this.getMeta(app, dataName) + const chunks = meta.chunks.length > 0 ? meta.chunks : [0] + const latest = chunks[chunks.length - 1] + const tlv = encodeTLV(encodeListTLV(data.tlvs)) + const tlvFile = [dataPath, `${latest}.mtlv`].join("/") + fs.appendFileSync(tlvFile, Buffer.from(tlv)) + if (fs.lstatSync(tlvFile).size > chunkSizeBytes) { + this.updateMeta(app, dataName, [...chunks, latest + 1]) + } + }) + }) + } + + getMeta = (appId: string, dataName: string) => { + if (!this.meta[appId]) { + return { chunks: [] } + } + return this.meta[appId][dataName] || { chunks: [] } + } + + initMeta = () => { + this.foreachFile((app, dataName, tlvFiles) => { + this.updateMeta(app, dataName, tlvFiles) + }) + this.metaReady = true + } + + updateMeta = (appId: string, dataName: string, sortedChunks: number[]) => { + if (!this.meta[appId]) { + this.meta[appId] = {} + } + this.meta[appId][dataName] = { chunks: sortedChunks } + } + + foreachFile = (cb: (appId: string, dataName: string, tlvFiles: number[]) => void) => { + if (!fs.existsSync(this.storagePath)) { + fs.mkdirSync(this.storagePath, { recursive: true }); + } + const apps = fs.readdirSync(this.storagePath) + apps.forEach(appDir => { + const appPath = [this.storagePath, appDir].join("/") + if (!fs.lstatSync(appPath).isDirectory()) { + return + } + const dataNames = fs.readdirSync(appPath) + dataNames.forEach(dataName => { + const dataPath = [appPath, dataName].join("/") + if (!fs.lstatSync(dataPath).isDirectory()) { + return + } + const tlvFiles = fs.readdirSync(dataPath) + .filter(f => f.endsWith(".mtlv")) + .map(f => +f.slice(0, -".mtlv".length)) + .filter(n => !isNaN(n)) + .sort((a, b) => a - b) + cb(appDir, dataName, tlvFiles) + }) + }) + } +} \ No newline at end of file diff --git a/src/services/webRTC/index.ts b/src/services/webRTC/index.ts index bc90a0a6..5f2989b5 100644 --- a/src/services/webRTC/index.ts +++ b/src/services/webRTC/index.ts @@ -90,12 +90,12 @@ export default class webRTC { this.log(ERROR, 'SingleUsageMetricReqValidate', err) return } - const res = await this.storage.metricsEventStorage.LoadRawMetricsFile(j.app_id, j.metrics_name, j.page) + const { fileData } = this.storage.metricsEventStorage.tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page) const id = j.request_id || Math.floor(Math.random() * 100_000_000) let i = 0 const packets: Buffer[] = [] - while (i < res.length) { - const chunk = res.slice(i, Math.min(i + 15_000, res.length)) + while (i < fileData.length) { + const chunk = fileData.slice(i, Math.min(i + 15_000, fileData.length)) packets.push(chunk) i += 15_000 }