bundle metrics
This commit is contained in:
parent
d25450e022
commit
531947a497
21 changed files with 573 additions and 252 deletions
|
|
@ -93,6 +93,11 @@ The nostr server will send back a message response, and inside the body there wi
|
||||||
- input: [AppsMetricsRequest](#AppsMetricsRequest)
|
- input: [AppsMetricsRequest](#AppsMetricsRequest)
|
||||||
- output: [AppsMetrics](#AppsMetrics)
|
- output: [AppsMetrics](#AppsMetrics)
|
||||||
|
|
||||||
|
- GetBundleMetrics
|
||||||
|
- auth type: __Metrics__
|
||||||
|
- input: [LatestBundleMetricReq](#LatestBundleMetricReq)
|
||||||
|
- output: [BundleMetrics](#BundleMetrics)
|
||||||
|
|
||||||
- GetDebitAuthorizations
|
- GetDebitAuthorizations
|
||||||
- auth type: __User__
|
- auth type: __User__
|
||||||
- This methods has an __empty__ __request__ body
|
- This methods has an __empty__ __request__ body
|
||||||
|
|
@ -481,6 +486,13 @@ The nostr server will send back a message response, and inside the body there wi
|
||||||
- input: [AppsMetricsRequest](#AppsMetricsRequest)
|
- input: [AppsMetricsRequest](#AppsMetricsRequest)
|
||||||
- output: [AppsMetrics](#AppsMetrics)
|
- output: [AppsMetrics](#AppsMetrics)
|
||||||
|
|
||||||
|
- GetBundleMetrics
|
||||||
|
- auth type: __Metrics__
|
||||||
|
- http method: __post__
|
||||||
|
- http route: __/api/reports/bundle__
|
||||||
|
- input: [LatestBundleMetricReq](#LatestBundleMetricReq)
|
||||||
|
- output: [BundleMetrics](#BundleMetrics)
|
||||||
|
|
||||||
- GetDebitAuthorizations
|
- GetDebitAuthorizations
|
||||||
- auth type: __User__
|
- auth type: __User__
|
||||||
- http method: __get__
|
- http method: __get__
|
||||||
|
|
@ -958,6 +970,17 @@ The nostr server will send back a message response, and inside the body there wi
|
||||||
- __nostr_pub__: _string_
|
- __nostr_pub__: _string_
|
||||||
- __user_identifier__: _string_
|
- __user_identifier__: _string_
|
||||||
|
|
||||||
|
### BundleData
|
||||||
|
- __available_chunks__: ARRAY of: _number_
|
||||||
|
- __base_64_data__: ARRAY of: _string_
|
||||||
|
- __current_chunk__: _number_
|
||||||
|
|
||||||
|
### BundleMetric
|
||||||
|
- __app_bundles__: MAP with key: _string_ and value: _[BundleData](#BundleData)_
|
||||||
|
|
||||||
|
### BundleMetrics
|
||||||
|
- __apps__: MAP with key: _string_ and value: _[BundleMetric](#BundleMetric)_
|
||||||
|
|
||||||
### CallbackUrl
|
### CallbackUrl
|
||||||
- __url__: _string_
|
- __url__: _string_
|
||||||
|
|
||||||
|
|
@ -1107,6 +1130,9 @@ The nostr server will send back a message response, and inside the body there wi
|
||||||
- __token__: _string_
|
- __token__: _string_
|
||||||
- __url__: _string_
|
- __url__: _string_
|
||||||
|
|
||||||
|
### LatestBundleMetricReq
|
||||||
|
- __limit__: _number_ *this field is optional
|
||||||
|
|
||||||
### LatestUsageMetricReq
|
### LatestUsageMetricReq
|
||||||
- __limit__: _number_ *this field is optional
|
- __limit__: _number_ *this field is optional
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -77,6 +77,7 @@ type Client struct {
|
||||||
GetAppUser func(req GetAppUserRequest) (*AppUser, error)
|
GetAppUser func(req GetAppUserRequest) (*AppUser, error)
|
||||||
GetAppUserLNURLInfo func(req GetAppUserLNURLInfoRequest) (*LnurlPayInfoResponse, error)
|
GetAppUserLNURLInfo func(req GetAppUserLNURLInfoRequest) (*LnurlPayInfoResponse, error)
|
||||||
GetAppsMetrics func(req AppsMetricsRequest) (*AppsMetrics, error)
|
GetAppsMetrics func(req AppsMetricsRequest) (*AppsMetrics, error)
|
||||||
|
GetBundleMetrics func(req LatestBundleMetricReq) (*BundleMetrics, error)
|
||||||
GetDebitAuthorizations func() (*DebitAuthorizations, error)
|
GetDebitAuthorizations func() (*DebitAuthorizations, error)
|
||||||
GetErrorStats func() (*ErrorStats, error)
|
GetErrorStats func() (*ErrorStats, error)
|
||||||
GetHttpCreds func() (*HttpCreds, error)
|
GetHttpCreds func() (*HttpCreds, error)
|
||||||
|
|
@ -740,6 +741,35 @@ func NewClient(params ClientParams) *Client {
|
||||||
}
|
}
|
||||||
return &res, nil
|
return &res, nil
|
||||||
},
|
},
|
||||||
|
GetBundleMetrics: func(req LatestBundleMetricReq) (*BundleMetrics, error) {
|
||||||
|
auth, err := params.RetrieveMetricsAuth()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
finalRoute := "/api/reports/bundle"
|
||||||
|
body, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resBody, err := doPostRequest(params.BaseURL+finalRoute, body, auth)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result := ResultError{}
|
||||||
|
err = json.Unmarshal(resBody, &result)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if result.Status == "ERROR" {
|
||||||
|
return nil, fmt.Errorf(result.Reason)
|
||||||
|
}
|
||||||
|
res := BundleMetrics{}
|
||||||
|
err = json.Unmarshal(resBody, &res)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &res, nil
|
||||||
|
},
|
||||||
GetDebitAuthorizations: func() (*DebitAuthorizations, error) {
|
GetDebitAuthorizations: func() (*DebitAuthorizations, error) {
|
||||||
auth, err := params.RetrieveUserAuth()
|
auth, err := params.RetrieveUserAuth()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -174,6 +174,17 @@ type BannedAppUser struct {
|
||||||
Nostr_pub string `json:"nostr_pub"`
|
Nostr_pub string `json:"nostr_pub"`
|
||||||
User_identifier string `json:"user_identifier"`
|
User_identifier string `json:"user_identifier"`
|
||||||
}
|
}
|
||||||
|
type BundleData struct {
|
||||||
|
Available_chunks []int64 `json:"available_chunks"`
|
||||||
|
Base_64_data []string `json:"base_64_data"`
|
||||||
|
Current_chunk int64 `json:"current_chunk"`
|
||||||
|
}
|
||||||
|
type BundleMetric struct {
|
||||||
|
App_bundles map[string]BundleData `json:"app_bundles"`
|
||||||
|
}
|
||||||
|
type BundleMetrics struct {
|
||||||
|
Apps map[string]BundleMetric `json:"apps"`
|
||||||
|
}
|
||||||
type CallbackUrl struct {
|
type CallbackUrl struct {
|
||||||
Url string `json:"url"`
|
Url string `json:"url"`
|
||||||
}
|
}
|
||||||
|
|
@ -323,6 +334,9 @@ type HttpCreds struct {
|
||||||
Token string `json:"token"`
|
Token string `json:"token"`
|
||||||
Url string `json:"url"`
|
Url string `json:"url"`
|
||||||
}
|
}
|
||||||
|
type LatestBundleMetricReq struct {
|
||||||
|
Limit int64 `json:"limit"`
|
||||||
|
}
|
||||||
type LatestUsageMetricReq struct {
|
type LatestUsageMetricReq struct {
|
||||||
Limit int64 `json:"limit"`
|
Limit int64 `json:"limit"`
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -866,6 +866,28 @@ export default (methods: Types.ServerMethods, opts: ServerOptions) => {
|
||||||
opts.metricsCallback([{ ...info, ...stats, ...authContext }])
|
opts.metricsCallback([{ ...info, ...stats, ...authContext }])
|
||||||
} catch (ex) { const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e }
|
} catch (ex) { const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e }
|
||||||
})
|
})
|
||||||
|
if (!opts.allowNotImplementedMethods && !methods.GetBundleMetrics) throw new Error('method: GetBundleMetrics is not implemented')
|
||||||
|
app.post('/api/reports/bundle', async (req, res) => {
|
||||||
|
const info: Types.RequestInfo = { rpcName: 'GetBundleMetrics', batch: false, nostr: false, batchSize: 0}
|
||||||
|
const stats: Types.RequestStats = { startMs:req.startTimeMs || 0, start:req.startTime || 0n, parse: process.hrtime.bigint(), guard: 0n, validate: 0n, handle: 0n }
|
||||||
|
let authCtx: Types.AuthContext = {}
|
||||||
|
try {
|
||||||
|
if (!methods.GetBundleMetrics) throw new Error('method: GetBundleMetrics is not implemented')
|
||||||
|
const authContext = await opts.MetricsAuthGuard(req.headers['authorization'])
|
||||||
|
authCtx = authContext
|
||||||
|
stats.guard = process.hrtime.bigint()
|
||||||
|
const request = req.body
|
||||||
|
const error = Types.LatestBundleMetricReqValidate(request)
|
||||||
|
stats.validate = process.hrtime.bigint()
|
||||||
|
if (error !== null) return logErrorAndReturnResponse(error, 'invalid request body', res, logger, { ...info, ...stats, ...authContext }, opts.metricsCallback)
|
||||||
|
const query = req.query
|
||||||
|
const params = req.params
|
||||||
|
const response = await methods.GetBundleMetrics({rpcName:'GetBundleMetrics', ctx:authContext , req: request})
|
||||||
|
stats.handle = process.hrtime.bigint()
|
||||||
|
res.json({status: 'OK', ...response})
|
||||||
|
opts.metricsCallback([{ ...info, ...stats, ...authContext }])
|
||||||
|
} catch (ex) { const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e }
|
||||||
|
})
|
||||||
if (!opts.allowNotImplementedMethods && !methods.GetDebitAuthorizations) throw new Error('method: GetDebitAuthorizations is not implemented')
|
if (!opts.allowNotImplementedMethods && !methods.GetDebitAuthorizations) throw new Error('method: GetDebitAuthorizations is not implemented')
|
||||||
app.get('/api/user/debit/get', async (req, res) => {
|
app.get('/api/user/debit/get', async (req, res) => {
|
||||||
const info: Types.RequestInfo = { rpcName: 'GetDebitAuthorizations', batch: false, nostr: false, batchSize: 0}
|
const info: Types.RequestInfo = { rpcName: 'GetDebitAuthorizations', batch: false, nostr: false, batchSize: 0}
|
||||||
|
|
|
||||||
|
|
@ -318,6 +318,20 @@ export default (params: ClientParams) => ({
|
||||||
}
|
}
|
||||||
return { status: 'ERROR', reason: 'invalid response' }
|
return { status: 'ERROR', reason: 'invalid response' }
|
||||||
},
|
},
|
||||||
|
GetBundleMetrics: async (request: Types.LatestBundleMetricReq): Promise<ResultError | ({ status: 'OK' }& Types.BundleMetrics)> => {
|
||||||
|
const auth = await params.retrieveMetricsAuth()
|
||||||
|
if (auth === null) throw new Error('retrieveMetricsAuth() returned null')
|
||||||
|
let finalRoute = '/api/reports/bundle'
|
||||||
|
const { data } = await axios.post(params.baseUrl + finalRoute, request, { headers: { 'authorization': auth } })
|
||||||
|
if (data.status === 'ERROR' && typeof data.reason === 'string') return data
|
||||||
|
if (data.status === 'OK') {
|
||||||
|
const result = data
|
||||||
|
if(!params.checkResult) return { status: 'OK', ...result }
|
||||||
|
const error = Types.BundleMetricsValidate(result)
|
||||||
|
if (error === null) { return { status: 'OK', ...result } } else return { status: 'ERROR', reason: error.message }
|
||||||
|
}
|
||||||
|
return { status: 'ERROR', reason: 'invalid response' }
|
||||||
|
},
|
||||||
GetDebitAuthorizations: async (): Promise<ResultError | ({ status: 'OK' }& Types.DebitAuthorizations)> => {
|
GetDebitAuthorizations: async (): Promise<ResultError | ({ status: 'OK' }& Types.DebitAuthorizations)> => {
|
||||||
const auth = await params.retrieveUserAuth()
|
const auth = await params.retrieveUserAuth()
|
||||||
if (auth === null) throw new Error('retrieveUserAuth() returned null')
|
if (auth === null) throw new Error('retrieveUserAuth() returned null')
|
||||||
|
|
|
||||||
|
|
@ -233,6 +233,21 @@ export default (params: NostrClientParams, send: (to:string, message: NostrRequ
|
||||||
}
|
}
|
||||||
return { status: 'ERROR', reason: 'invalid response' }
|
return { status: 'ERROR', reason: 'invalid response' }
|
||||||
},
|
},
|
||||||
|
GetBundleMetrics: async (request: Types.LatestBundleMetricReq): Promise<ResultError | ({ status: 'OK' }& Types.BundleMetrics)> => {
|
||||||
|
const auth = await params.retrieveNostrMetricsAuth()
|
||||||
|
if (auth === null) throw new Error('retrieveNostrMetricsAuth() returned null')
|
||||||
|
const nostrRequest: NostrRequest = {}
|
||||||
|
nostrRequest.body = request
|
||||||
|
const data = await send(params.pubDestination, {rpcName:'GetBundleMetrics',authIdentifier:auth, ...nostrRequest })
|
||||||
|
if (data.status === 'ERROR' && typeof data.reason === 'string') return data
|
||||||
|
if (data.status === 'OK') {
|
||||||
|
const result = data
|
||||||
|
if(!params.checkResult) return { status: 'OK', ...result }
|
||||||
|
const error = Types.BundleMetricsValidate(result)
|
||||||
|
if (error === null) { return { status: 'OK', ...result } } else return { status: 'ERROR', reason: error.message }
|
||||||
|
}
|
||||||
|
return { status: 'ERROR', reason: 'invalid response' }
|
||||||
|
},
|
||||||
GetDebitAuthorizations: async (): Promise<ResultError | ({ status: 'OK' }& Types.DebitAuthorizations)> => {
|
GetDebitAuthorizations: async (): Promise<ResultError | ({ status: 'OK' }& Types.DebitAuthorizations)> => {
|
||||||
const auth = await params.retrieveNostrUserAuth()
|
const auth = await params.retrieveNostrUserAuth()
|
||||||
if (auth === null) throw new Error('retrieveNostrUserAuth() returned null')
|
if (auth === null) throw new Error('retrieveNostrUserAuth() returned null')
|
||||||
|
|
|
||||||
|
|
@ -621,6 +621,22 @@ export default (methods: Types.ServerMethods, opts: NostrOptions) => {
|
||||||
opts.metricsCallback([{ ...info, ...stats, ...authContext }])
|
opts.metricsCallback([{ ...info, ...stats, ...authContext }])
|
||||||
}catch(ex){ const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e }
|
}catch(ex){ const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e }
|
||||||
break
|
break
|
||||||
|
case 'GetBundleMetrics':
|
||||||
|
try {
|
||||||
|
if (!methods.GetBundleMetrics) throw new Error('method: GetBundleMetrics is not implemented')
|
||||||
|
const authContext = await opts.NostrMetricsAuthGuard(req.appId, req.authIdentifier)
|
||||||
|
stats.guard = process.hrtime.bigint()
|
||||||
|
authCtx = authContext
|
||||||
|
const request = req.body
|
||||||
|
const error = Types.LatestBundleMetricReqValidate(request)
|
||||||
|
stats.validate = process.hrtime.bigint()
|
||||||
|
if (error !== null) return logErrorAndReturnResponse(error, 'invalid request body', res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback)
|
||||||
|
const response = await methods.GetBundleMetrics({rpcName:'GetBundleMetrics', ctx:authContext , req: request})
|
||||||
|
stats.handle = process.hrtime.bigint()
|
||||||
|
res({status: 'OK', ...response})
|
||||||
|
opts.metricsCallback([{ ...info, ...stats, ...authContext }])
|
||||||
|
}catch(ex){ const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e }
|
||||||
|
break
|
||||||
case 'GetDebitAuthorizations':
|
case 'GetDebitAuthorizations':
|
||||||
try {
|
try {
|
||||||
if (!methods.GetDebitAuthorizations) throw new Error('method: GetDebitAuthorizations is not implemented')
|
if (!methods.GetDebitAuthorizations) throw new Error('method: GetDebitAuthorizations is not implemented')
|
||||||
|
|
|
||||||
|
|
@ -28,8 +28,8 @@ export type MetricsContext = {
|
||||||
app_id: string
|
app_id: string
|
||||||
operator_id: string
|
operator_id: string
|
||||||
}
|
}
|
||||||
export type MetricsMethodInputs = GetAppsMetrics_Input | GetErrorStats_Input | GetLndMetrics_Input | GetSingleUsageMetrics_Input | GetUsageMetrics_Input | SubmitWebRtcMessage_Input
|
export type MetricsMethodInputs = GetAppsMetrics_Input | GetBundleMetrics_Input | GetErrorStats_Input | GetLndMetrics_Input | GetSingleUsageMetrics_Input | GetUsageMetrics_Input | SubmitWebRtcMessage_Input
|
||||||
export type MetricsMethodOutputs = GetAppsMetrics_Output | GetErrorStats_Output | GetLndMetrics_Output | GetSingleUsageMetrics_Output | GetUsageMetrics_Output | SubmitWebRtcMessage_Output
|
export type MetricsMethodOutputs = GetAppsMetrics_Output | GetBundleMetrics_Output | GetErrorStats_Output | GetLndMetrics_Output | GetSingleUsageMetrics_Output | GetUsageMetrics_Output | SubmitWebRtcMessage_Output
|
||||||
export type UserContext = {
|
export type UserContext = {
|
||||||
app_id: string
|
app_id: string
|
||||||
app_user_id: string
|
app_user_id: string
|
||||||
|
|
@ -108,6 +108,9 @@ export type GetAppUserLNURLInfo_Output = ResultError | ({ status: 'OK' } & Lnurl
|
||||||
export type GetAppsMetrics_Input = {rpcName:'GetAppsMetrics', req: AppsMetricsRequest}
|
export type GetAppsMetrics_Input = {rpcName:'GetAppsMetrics', req: AppsMetricsRequest}
|
||||||
export type GetAppsMetrics_Output = ResultError | ({ status: 'OK' } & AppsMetrics)
|
export type GetAppsMetrics_Output = ResultError | ({ status: 'OK' } & AppsMetrics)
|
||||||
|
|
||||||
|
export type GetBundleMetrics_Input = {rpcName:'GetBundleMetrics', req: LatestBundleMetricReq}
|
||||||
|
export type GetBundleMetrics_Output = ResultError | ({ status: 'OK' } & BundleMetrics)
|
||||||
|
|
||||||
export type GetDebitAuthorizations_Input = {rpcName:'GetDebitAuthorizations'}
|
export type GetDebitAuthorizations_Input = {rpcName:'GetDebitAuthorizations'}
|
||||||
export type GetDebitAuthorizations_Output = ResultError | ({ status: 'OK' } & DebitAuthorizations)
|
export type GetDebitAuthorizations_Output = ResultError | ({ status: 'OK' } & DebitAuthorizations)
|
||||||
|
|
||||||
|
|
@ -312,6 +315,7 @@ export type ServerMethods = {
|
||||||
GetAppUser?: (req: GetAppUser_Input & {ctx: AppContext }) => Promise<AppUser>
|
GetAppUser?: (req: GetAppUser_Input & {ctx: AppContext }) => Promise<AppUser>
|
||||||
GetAppUserLNURLInfo?: (req: GetAppUserLNURLInfo_Input & {ctx: AppContext }) => Promise<LnurlPayInfoResponse>
|
GetAppUserLNURLInfo?: (req: GetAppUserLNURLInfo_Input & {ctx: AppContext }) => Promise<LnurlPayInfoResponse>
|
||||||
GetAppsMetrics?: (req: GetAppsMetrics_Input & {ctx: MetricsContext }) => Promise<AppsMetrics>
|
GetAppsMetrics?: (req: GetAppsMetrics_Input & {ctx: MetricsContext }) => Promise<AppsMetrics>
|
||||||
|
GetBundleMetrics?: (req: GetBundleMetrics_Input & {ctx: MetricsContext }) => Promise<BundleMetrics>
|
||||||
GetDebitAuthorizations?: (req: GetDebitAuthorizations_Input & {ctx: UserContext }) => Promise<DebitAuthorizations>
|
GetDebitAuthorizations?: (req: GetDebitAuthorizations_Input & {ctx: UserContext }) => Promise<DebitAuthorizations>
|
||||||
GetErrorStats?: (req: GetErrorStats_Input & {ctx: MetricsContext }) => Promise<ErrorStats>
|
GetErrorStats?: (req: GetErrorStats_Input & {ctx: MetricsContext }) => Promise<ErrorStats>
|
||||||
GetHttpCreds?: (req: GetHttpCreds_Input & {ctx: UserContext }) => Promise<void>
|
GetHttpCreds?: (req: GetHttpCreds_Input & {ctx: UserContext }) => Promise<void>
|
||||||
|
|
@ -924,6 +928,84 @@ export const BannedAppUserValidate = (o?: BannedAppUser, opts: BannedAppUserOpti
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type BundleData = {
|
||||||
|
available_chunks: number[]
|
||||||
|
base_64_data: string[]
|
||||||
|
current_chunk: number
|
||||||
|
}
|
||||||
|
export const BundleDataOptionalFields: [] = []
|
||||||
|
export type BundleDataOptions = OptionsBaseMessage & {
|
||||||
|
checkOptionalsAreSet?: []
|
||||||
|
available_chunks_CustomCheck?: (v: number[]) => boolean
|
||||||
|
base_64_data_CustomCheck?: (v: string[]) => boolean
|
||||||
|
current_chunk_CustomCheck?: (v: number) => boolean
|
||||||
|
}
|
||||||
|
export const BundleDataValidate = (o?: BundleData, opts: BundleDataOptions = {}, path: string = 'BundleData::root.'): Error | null => {
|
||||||
|
if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message')
|
||||||
|
if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null')
|
||||||
|
|
||||||
|
if (!Array.isArray(o.available_chunks)) return new Error(`${path}.available_chunks: is not an array`)
|
||||||
|
for (let index = 0; index < o.available_chunks.length; index++) {
|
||||||
|
if (typeof o.available_chunks[index] !== 'number') return new Error(`${path}.available_chunks[${index}]: is not a number`)
|
||||||
|
}
|
||||||
|
if (opts.available_chunks_CustomCheck && !opts.available_chunks_CustomCheck(o.available_chunks)) return new Error(`${path}.available_chunks: custom check failed`)
|
||||||
|
|
||||||
|
if (!Array.isArray(o.base_64_data)) return new Error(`${path}.base_64_data: is not an array`)
|
||||||
|
for (let index = 0; index < o.base_64_data.length; index++) {
|
||||||
|
if (typeof o.base_64_data[index] !== 'string') return new Error(`${path}.base_64_data[${index}]: is not a string`)
|
||||||
|
}
|
||||||
|
if (opts.base_64_data_CustomCheck && !opts.base_64_data_CustomCheck(o.base_64_data)) return new Error(`${path}.base_64_data: custom check failed`)
|
||||||
|
|
||||||
|
if (typeof o.current_chunk !== 'number') return new Error(`${path}.current_chunk: is not a number`)
|
||||||
|
if (opts.current_chunk_CustomCheck && !opts.current_chunk_CustomCheck(o.current_chunk)) return new Error(`${path}.current_chunk: custom check failed`)
|
||||||
|
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
|
||||||
|
export type BundleMetric = {
|
||||||
|
app_bundles: Record<string, BundleData>
|
||||||
|
}
|
||||||
|
export const BundleMetricOptionalFields: [] = []
|
||||||
|
export type BundleMetricOptions = OptionsBaseMessage & {
|
||||||
|
checkOptionalsAreSet?: []
|
||||||
|
app_bundles_EntryOptions?: BundleDataOptions
|
||||||
|
app_bundles_CustomCheck?: (v: Record<string, BundleData>) => boolean
|
||||||
|
}
|
||||||
|
export const BundleMetricValidate = (o?: BundleMetric, opts: BundleMetricOptions = {}, path: string = 'BundleMetric::root.'): Error | null => {
|
||||||
|
if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message')
|
||||||
|
if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null')
|
||||||
|
|
||||||
|
if (typeof o.app_bundles !== 'object' || o.app_bundles === null) return new Error(`${path}.app_bundles: is not an object or is null`)
|
||||||
|
for (const key in o.app_bundles) {
|
||||||
|
const app_bundlesErr = BundleDataValidate(o.app_bundles[key], opts.app_bundles_EntryOptions, `${path}.app_bundles['${key}']`)
|
||||||
|
if (app_bundlesErr !== null) return app_bundlesErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
|
||||||
|
export type BundleMetrics = {
|
||||||
|
apps: Record<string, BundleMetric>
|
||||||
|
}
|
||||||
|
export const BundleMetricsOptionalFields: [] = []
|
||||||
|
export type BundleMetricsOptions = OptionsBaseMessage & {
|
||||||
|
checkOptionalsAreSet?: []
|
||||||
|
apps_EntryOptions?: BundleMetricOptions
|
||||||
|
apps_CustomCheck?: (v: Record<string, BundleMetric>) => boolean
|
||||||
|
}
|
||||||
|
export const BundleMetricsValidate = (o?: BundleMetrics, opts: BundleMetricsOptions = {}, path: string = 'BundleMetrics::root.'): Error | null => {
|
||||||
|
if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message')
|
||||||
|
if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null')
|
||||||
|
|
||||||
|
if (typeof o.apps !== 'object' || o.apps === null) return new Error(`${path}.apps: is not an object or is null`)
|
||||||
|
for (const key in o.apps) {
|
||||||
|
const appsErr = BundleMetricValidate(o.apps[key], opts.apps_EntryOptions, `${path}.apps['${key}']`)
|
||||||
|
if (appsErr !== null) return appsErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
|
||||||
export type CallbackUrl = {
|
export type CallbackUrl = {
|
||||||
url: string
|
url: string
|
||||||
}
|
}
|
||||||
|
|
@ -1812,6 +1894,25 @@ export const HttpCredsValidate = (o?: HttpCreds, opts: HttpCredsOptions = {}, pa
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type LatestBundleMetricReq = {
|
||||||
|
limit?: number
|
||||||
|
}
|
||||||
|
export type LatestBundleMetricReqOptionalField = 'limit'
|
||||||
|
export const LatestBundleMetricReqOptionalFields: LatestBundleMetricReqOptionalField[] = ['limit']
|
||||||
|
export type LatestBundleMetricReqOptions = OptionsBaseMessage & {
|
||||||
|
checkOptionalsAreSet?: LatestBundleMetricReqOptionalField[]
|
||||||
|
limit_CustomCheck?: (v?: number) => boolean
|
||||||
|
}
|
||||||
|
export const LatestBundleMetricReqValidate = (o?: LatestBundleMetricReq, opts: LatestBundleMetricReqOptions = {}, path: string = 'LatestBundleMetricReq::root.'): Error | null => {
|
||||||
|
if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message')
|
||||||
|
if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null')
|
||||||
|
|
||||||
|
if ((o.limit || opts.allOptionalsAreSet || opts.checkOptionalsAreSet?.includes('limit')) && typeof o.limit !== 'number') return new Error(`${path}.limit: is not a number`)
|
||||||
|
if (opts.limit_CustomCheck && !opts.limit_CustomCheck(o.limit)) return new Error(`${path}.limit: custom check failed`)
|
||||||
|
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
|
||||||
export type LatestUsageMetricReq = {
|
export type LatestUsageMetricReq = {
|
||||||
limit?: number
|
limit?: number
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -181,6 +181,14 @@ service LightningPub {
|
||||||
option (http_route) = "/api/reports/usage";
|
option (http_route) = "/api/reports/usage";
|
||||||
option (nostr) = true;
|
option (nostr) = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rpc GetBundleMetrics(structs.LatestBundleMetricReq) returns (structs.BundleMetrics) {
|
||||||
|
option (auth_type) = "Metrics";
|
||||||
|
option (http_method) = "post";
|
||||||
|
option (http_route) = "/api/reports/bundle";
|
||||||
|
option (nostr) = true;
|
||||||
|
}
|
||||||
|
|
||||||
rpc GetSingleUsageMetrics(structs.SingleUsageMetricReq) returns (structs.UsageMetricTlv) {
|
rpc GetSingleUsageMetrics(structs.SingleUsageMetricReq) returns (structs.UsageMetricTlv) {
|
||||||
option (auth_type) = "Metrics";
|
option (auth_type) = "Metrics";
|
||||||
option (http_method) = "post";
|
option (http_method) = "post";
|
||||||
|
|
|
||||||
|
|
@ -62,6 +62,8 @@ message UsageMetric {
|
||||||
optional string app_id = 11;
|
optional string app_id = 11;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
message UsageMetricTlv {
|
message UsageMetricTlv {
|
||||||
repeated string base_64_tlvs = 1;
|
repeated string base_64_tlvs = 1;
|
||||||
int64 current_chunk = 2;
|
int64 current_chunk = 2;
|
||||||
|
|
@ -77,6 +79,24 @@ message UsageMetrics {
|
||||||
map<string,AppUsageMetrics> apps = 1;
|
map<string,AppUsageMetrics> apps = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message LatestBundleMetricReq {
|
||||||
|
optional int64 limit = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message BundleData {
|
||||||
|
repeated string base_64_data = 1;
|
||||||
|
int64 current_chunk = 2;
|
||||||
|
repeated int64 available_chunks = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message BundleMetric {
|
||||||
|
map<string, BundleData> app_bundles = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message BundleMetrics {
|
||||||
|
map<string, BundleMetric> apps = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message AppsMetricsRequest {
|
message AppsMetricsRequest {
|
||||||
optional int64 from_unix = 1;
|
optional int64 from_unix = 1;
|
||||||
optional int64 to_unix = 2;
|
optional int64 to_unix = 2;
|
||||||
|
|
|
||||||
19
src/index.ts
19
src/index.ts
|
|
@ -24,10 +24,11 @@ const start = async () => {
|
||||||
const serverMethods = GetServerMethods(mainHandler)
|
const serverMethods = GetServerMethods(mainHandler)
|
||||||
const nostrSettings = LoadNosrtSettingsFromEnv()
|
const nostrSettings = LoadNosrtSettingsFromEnv()
|
||||||
log("initializing nostr middleware")
|
log("initializing nostr middleware")
|
||||||
const { Send } = nostrMiddleware(serverMethods, mainHandler,
|
const { Send, Stop } = nostrMiddleware(serverMethods, mainHandler,
|
||||||
{ ...nostrSettings, apps, clients: [liquidityProviderInfo] },
|
{ ...nostrSettings, apps, clients: [liquidityProviderInfo] },
|
||||||
(e, p) => mainHandler.liquidityProvider.onEvent(e, p)
|
(e, p) => mainHandler.liquidityProvider.onEvent(e, p)
|
||||||
)
|
)
|
||||||
|
exitHandler(() => { Stop() })
|
||||||
log("starting server")
|
log("starting server")
|
||||||
mainHandler.attachNostrSend(Send)
|
mainHandler.attachNostrSend(Send)
|
||||||
mainHandler.StartBeacons()
|
mainHandler.StartBeacons()
|
||||||
|
|
@ -40,3 +41,19 @@ const start = async () => {
|
||||||
Server.Listen(mainSettings.servicePort)
|
Server.Listen(mainSettings.servicePort)
|
||||||
}
|
}
|
||||||
start()
|
start()
|
||||||
|
|
||||||
|
const exitHandler = async (kill: () => void) => {
|
||||||
|
// catch ctrl+c event and exit normally
|
||||||
|
process.on('SIGINT', () => {
|
||||||
|
console.log('Ctrl-C detected, exiting safely...');
|
||||||
|
process.exit(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
//catch uncaught exceptions, trace, then exit normally
|
||||||
|
process.on('uncaughtException', (e) => {
|
||||||
|
console.log('Uncaught Exception detected, exiting safely, and killing all child processes...');
|
||||||
|
console.log(e.stack);
|
||||||
|
kill();
|
||||||
|
process.exit(99);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
@ -32,6 +32,8 @@ export const decodeListTLV = (tlv: TLV): Uint8Array[] => {
|
||||||
return tlv[64]
|
return tlv[64]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
export const usageMetricsToTlv = (metric: Types.UsageMetric): TLV => {
|
export const usageMetricsToTlv = (metric: Types.UsageMetric): TLV => {
|
||||||
const tlv: TLV = {}
|
const tlv: TLV = {}
|
||||||
tlv[2] = [integerToUint8Array(Math.ceil(metric.processed_at_ms / 1000))] // 6 -> 6
|
tlv[2] = [integerToUint8Array(Math.ceil(metric.processed_at_ms / 1000))] // 6 -> 6
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,6 @@ export class Utils {
|
||||||
settings: MainSettings
|
settings: MainSettings
|
||||||
constructor(settings: MainSettings) {
|
constructor(settings: MainSettings) {
|
||||||
this.settings = settings
|
this.settings = settings
|
||||||
this.stateBundler = new StateBundler()
|
this.stateBundler = new StateBundler(settings.storageSettings)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -188,9 +188,9 @@ export default class {
|
||||||
const operationId = `${Types.UserOperationType.INCOMING_TX}-${addedTx.serial_id}`
|
const operationId = `${Types.UserOperationType.INCOMING_TX}-${addedTx.serial_id}`
|
||||||
const op = { amount, paidAtUnix: Date.now() / 1000, inbound: true, type: Types.UserOperationType.INCOMING_TX, identifier: userAddress.address, operationId, network_fee: 0, service_fee: fee, confirmed: internal, tx_hash: txOutput.hash, internal: false }
|
const op = { amount, paidAtUnix: Date.now() / 1000, inbound: true, type: Types.UserOperationType.INCOMING_TX, identifier: userAddress.address, operationId, network_fee: 0, service_fee: fee, confirmed: internal, tx_hash: txOutput.hash, internal: false }
|
||||||
this.sendOperationToNostr(userAddress.linkedApplication, userAddress.user.user_id, op)
|
this.sendOperationToNostr(userAddress.linkedApplication, userAddress.user.user_id, op)
|
||||||
this.utils.stateBundler.AddTxPoint('addressWasPaid', amount, { used, from: 'system', timeDiscount: true })
|
this.utils.stateBundler.AddTxPoint('addressWasPaid', amount, { used, from: 'system', timeDiscount: true }, userAddress.linkedApplication.app_id)
|
||||||
} catch (err: any) {
|
} catch (err: any) {
|
||||||
this.utils.stateBundler.AddTxPointFailed('addressWasPaid', amount, { used, from: 'system' })
|
this.utils.stateBundler.AddTxPointFailed('addressWasPaid', amount, { used, from: 'system' }, userAddress.linkedApplication.app_id)
|
||||||
log(ERROR, "cannot process address paid transaction, already registered")
|
log(ERROR, "cannot process address paid transaction, already registered")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
@ -234,9 +234,9 @@ export default class {
|
||||||
log(ERROR, "cannot create zap receipt", err.message || "")
|
log(ERROR, "cannot create zap receipt", err.message || "")
|
||||||
}
|
}
|
||||||
this.liquidityManager.afterInInvoicePaid()
|
this.liquidityManager.afterInInvoicePaid()
|
||||||
this.utils.stateBundler.AddTxPoint('invoiceWasPaid', amount, { used, from: 'system', timeDiscount: true })
|
this.utils.stateBundler.AddTxPoint('invoiceWasPaid', amount, { used, from: 'system', timeDiscount: true }, userInvoice.linkedApplication.app_id)
|
||||||
} catch (err: any) {
|
} catch (err: any) {
|
||||||
this.utils.stateBundler.AddTxPointFailed('invoiceWasPaid', amount, { used, from: 'system' })
|
this.utils.stateBundler.AddTxPointFailed('invoiceWasPaid', amount, { used, from: 'system' }, userInvoice.linkedApplication.app_id)
|
||||||
log(ERROR, "cannot process paid invoice", err.message || "")
|
log(ERROR, "cannot process paid invoice", err.message || "")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -355,11 +355,11 @@ export default class {
|
||||||
try {
|
try {
|
||||||
await this.invoicePaidCb(internalInvoice.invoice, payAmount, 'internal')
|
await this.invoicePaidCb(internalInvoice.invoice, payAmount, 'internal')
|
||||||
const newPayment = await this.storage.paymentStorage.AddInternalPayment(userId, internalInvoice.invoice, payAmount, serviceFee, linkedApplication, debitNpub)
|
const newPayment = await this.storage.paymentStorage.AddInternalPayment(userId, internalInvoice.invoice, payAmount, serviceFee, linkedApplication, debitNpub)
|
||||||
this.utils.stateBundler.AddTxPoint('paidAnInvoice', payAmount, { used: 'internal', from: 'user' })
|
this.utils.stateBundler.AddTxPoint('paidAnInvoice', payAmount, { used: 'internal', from: 'user' }, linkedApplication.app_id)
|
||||||
return { preimage: "", amtPaid: payAmount, networkFee: 0, serialId: newPayment.serial_id }
|
return { preimage: "", amtPaid: payAmount, networkFee: 0, serialId: newPayment.serial_id }
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
await this.storage.userStorage.IncrementUserBalance(userId, totalAmountToDecrement, "internal_payment_refund:" + internalInvoice.invoice)
|
await this.storage.userStorage.IncrementUserBalance(userId, totalAmountToDecrement, "internal_payment_refund:" + internalInvoice.invoice)
|
||||||
this.utils.stateBundler.AddTxPointFailed('paidAnInvoice', payAmount, { used: 'internal', from: 'user' })
|
this.utils.stateBundler.AddTxPointFailed('paidAnInvoice', payAmount, { used: 'internal', from: 'user' }, linkedApplication.app_id)
|
||||||
|
|
||||||
throw err
|
throw err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,9 @@ export default (mainHandler: Main): Types.ServerMethods => {
|
||||||
GetUsageMetrics: async ({ ctx, req }) => {
|
GetUsageMetrics: async ({ ctx, req }) => {
|
||||||
return mainHandler.metricsManager.GetUsageMetrics(req)
|
return mainHandler.metricsManager.GetUsageMetrics(req)
|
||||||
},
|
},
|
||||||
|
GetBundleMetrics: async ({ ctx, req }) => {
|
||||||
|
return mainHandler.utils.stateBundler.GetBundleMetrics(req)
|
||||||
|
},
|
||||||
GetSingleUsageMetrics: async ({ ctx, req }) => {
|
GetSingleUsageMetrics: async ({ ctx, req }) => {
|
||||||
return mainHandler.metricsManager.GetSingleUsageMetrics(req)
|
return mainHandler.metricsManager.GetSingleUsageMetrics(req)
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,6 @@ import MetricsEventStorage from "./metricsEventStorage.js";
|
||||||
import TransactionsQueue, { TX } from "./transactionsQueue.js";
|
import TransactionsQueue, { TX } from "./transactionsQueue.js";
|
||||||
import EventsLogManager from "./eventsLog.js";
|
import EventsLogManager from "./eventsLog.js";
|
||||||
import { LiquidityStorage } from "./liquidityStorage.js";
|
import { LiquidityStorage } from "./liquidityStorage.js";
|
||||||
import { StateBundler } from "./stateBundler.js";
|
|
||||||
import DebitStorage from "./debitStorage.js"
|
import DebitStorage from "./debitStorage.js"
|
||||||
import OfferStorage from "./offerStorage.js"
|
import OfferStorage from "./offerStorage.js"
|
||||||
export type StorageSettings = {
|
export type StorageSettings = {
|
||||||
|
|
@ -35,7 +34,6 @@ export default class {
|
||||||
debitStorage: DebitStorage
|
debitStorage: DebitStorage
|
||||||
offerStorage: OfferStorage
|
offerStorage: OfferStorage
|
||||||
eventsLog: EventsLogManager
|
eventsLog: EventsLogManager
|
||||||
stateBundler: StateBundler
|
|
||||||
constructor(settings: StorageSettings) {
|
constructor(settings: StorageSettings) {
|
||||||
this.settings = settings
|
this.settings = settings
|
||||||
this.eventsLog = new EventsLogManager(settings.eventLogPath)
|
this.eventsLog = new EventsLogManager(settings.eventLogPath)
|
||||||
|
|
|
||||||
|
|
@ -2,51 +2,31 @@ import fs from 'fs'
|
||||||
import * as Types from '../../../proto/autogenerated/ts/types.js'
|
import * as Types from '../../../proto/autogenerated/ts/types.js'
|
||||||
import { StorageSettings } from "./index.js";
|
import { StorageSettings } from "./index.js";
|
||||||
import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js';
|
import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js';
|
||||||
const chunkSizeBytes = 128 * 1024
|
import { TlvFilesStorage } from './tlvFilesStorage.js';
|
||||||
export default class {
|
export default class {
|
||||||
settings: StorageSettings
|
tlvStorage: TlvFilesStorage
|
||||||
metricsPath: string
|
|
||||||
cachePath: string
|
cachePath: string
|
||||||
metaReady = false
|
|
||||||
metricsMeta: Record<string, Record<string, { chunks: number[] }>> = {}
|
|
||||||
pendingMetrics: Record<string, Record<string, { tlvs: Uint8Array[] }>> = {}
|
|
||||||
last24hCache: { ts: number, ok: number, fail: number }[] = []
|
last24hCache: { ts: number, ok: number, fail: number }[] = []
|
||||||
lastPersistedMetrics: number = 0
|
|
||||||
lastPersistedCache: number = 0
|
lastPersistedCache: number = 0
|
||||||
constructor(settings: StorageSettings) {
|
constructor(settings: StorageSettings) {
|
||||||
this.settings = settings;
|
const metricsPath = [settings.dataDir, "metric_events"].join("/")
|
||||||
this.metricsPath = [settings.dataDir, "metric_events"].join("/")
|
this.tlvStorage = new TlvFilesStorage(metricsPath)
|
||||||
this.cachePath = [settings.dataDir, "metric_cache"].join("/")
|
this.cachePath = [settings.dataDir, "metric_cache"].join("/")
|
||||||
if (!fs.existsSync(this.cachePath)) {
|
if (!fs.existsSync(this.cachePath)) {
|
||||||
fs.mkdirSync(this.cachePath, { recursive: true });
|
fs.mkdirSync(this.cachePath, { recursive: true });
|
||||||
}
|
}
|
||||||
this.initMetricsMeta()
|
this.tlvStorage.initMeta()
|
||||||
this.loadCache()
|
this.loadCache()
|
||||||
setInterval(() => {
|
setInterval(() => {
|
||||||
if (Date.now() - this.lastPersistedMetrics > 1000 * 60 * 4) {
|
|
||||||
this.persistMetrics()
|
|
||||||
}
|
|
||||||
if (Date.now() - this.lastPersistedCache > 1000 * 60 * 4) {
|
if (Date.now() - this.lastPersistedCache > 1000 * 60 * 4) {
|
||||||
this.persistCache()
|
this.persistCache()
|
||||||
}
|
}
|
||||||
}, 1000 * 60 * 5)
|
}, 1000 * 60 * 5)
|
||||||
process.on('exit', () => {
|
process.on('exit', () => {
|
||||||
this.persistMetrics()
|
|
||||||
this.persistCache()
|
this.persistCache()
|
||||||
});
|
});
|
||||||
|
|
||||||
// catch ctrl+c event and exit normally
|
|
||||||
process.on('SIGINT', () => {
|
|
||||||
console.log('Ctrl-C...');
|
|
||||||
process.exit(2);
|
|
||||||
});
|
|
||||||
|
|
||||||
//catch uncaught exceptions, trace, then exit normally
|
|
||||||
process.on('uncaughtException', (e) => {
|
|
||||||
console.log('Uncaught Exception...');
|
|
||||||
console.log(e.stack);
|
|
||||||
process.exit(99);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
getlast24hCache = () => { return this.last24hCache }
|
getlast24hCache = () => { return this.last24hCache }
|
||||||
|
|
@ -93,139 +73,35 @@ export default class {
|
||||||
}
|
}
|
||||||
|
|
||||||
AddMetricEvent = (appId: string, method: string, metric: Uint8Array, success: boolean) => {
|
AddMetricEvent = (appId: string, method: string, metric: Uint8Array, success: boolean) => {
|
||||||
if (!this.metaReady) {
|
this.tlvStorage.AddTlv(appId, method, metric)
|
||||||
throw new Error("meta metrics not ready")
|
|
||||||
}
|
|
||||||
if (!this.pendingMetrics[appId]) {
|
|
||||||
this.pendingMetrics[appId] = {}
|
|
||||||
}
|
|
||||||
if (!this.pendingMetrics[appId][method]) {
|
|
||||||
this.pendingMetrics[appId][method] = { tlvs: [] }
|
|
||||||
}
|
|
||||||
this.pendingMetrics[appId][method].tlvs.push(metric)
|
|
||||||
this.pushToCache(success)
|
this.pushToCache(success)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LoadLatestMetrics = async (limit = 100): Promise<Types.UsageMetrics> => {
|
LoadLatestMetrics = async (limit = 100): Promise<Types.UsageMetrics> => {
|
||||||
this.persistMetrics()
|
const raw = this.tlvStorage.LoadLatest(limit)
|
||||||
const metrics: Types.UsageMetrics = { apps: {} }
|
const metrics: Types.UsageMetrics = { apps: {} }
|
||||||
this.foreachMetricMethodFile((app, method, tlvFiles) => {
|
Object.keys(raw).forEach(app => {
|
||||||
if (tlvFiles.length === 0) { return }
|
|
||||||
const methodPath = [this.metricsPath, app, method].join("/")
|
|
||||||
const latest = tlvFiles[tlvFiles.length - 1]
|
|
||||||
const tlvFile = [methodPath, `${latest}.mtlv`].join("/")
|
|
||||||
const tlv = fs.readFileSync(tlvFile)
|
|
||||||
const decoded = decodeListTLV(parseTLV(tlv))
|
|
||||||
if (!metrics.apps[app]) {
|
|
||||||
metrics.apps[app] = { app_metrics: {} }
|
metrics.apps[app] = { app_metrics: {} }
|
||||||
}
|
Object.keys(raw[app]).forEach(method => {
|
||||||
if (decoded.length > limit) {
|
const data = raw[app][method]
|
||||||
decoded.splice(0, decoded.length - limit)
|
|
||||||
}
|
|
||||||
metrics.apps[app].app_metrics[method] = {
|
metrics.apps[app].app_metrics[method] = {
|
||||||
base_64_tlvs: decoded.map(d => Buffer.from(d).toString('base64')),
|
base_64_tlvs: data.tlvs.map(d => Buffer.from(d).toString('base64')),
|
||||||
current_chunk: latest,
|
current_chunk: data.current_chunk,
|
||||||
available_chunks: tlvFiles
|
available_chunks: data.available_chunks
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
})
|
||||||
return metrics
|
return metrics
|
||||||
}
|
}
|
||||||
|
|
||||||
LoadRawMetricsFile = async (app: string, method: string, chunk: number): Promise<Buffer> => {
|
|
||||||
if (!this.metaReady || !this.metricsMeta[app] || !this.metricsMeta[app][method] || !this.metricsMeta[app][method].chunks.includes(chunk)) {
|
|
||||||
throw new Error("metrics not found")
|
|
||||||
}
|
|
||||||
const fullPath = [this.metricsPath, app, method, `${chunk}.mtlv`].join("/")
|
|
||||||
return fs.readFileSync(fullPath)
|
|
||||||
}
|
|
||||||
LoadMetricsFile = async (app: string, method: string, chunk: number): Promise<Types.UsageMetricTlv> => {
|
LoadMetricsFile = async (app: string, method: string, chunk: number): Promise<Types.UsageMetricTlv> => {
|
||||||
const tlv = await this.LoadRawMetricsFile(app, method, chunk)
|
const { fileData, chunks } = this.tlvStorage.LoadFile(app, method, chunk)
|
||||||
const decoded = decodeListTLV(parseTLV(tlv))
|
//const tlv = await this.LoadRawMetricsFile(app, method, chunk)
|
||||||
|
const decoded = decodeListTLV(parseTLV(fileData))
|
||||||
return {
|
return {
|
||||||
base_64_tlvs: decoded.map(d => Buffer.from(d).toString('base64')),
|
base_64_tlvs: decoded.map(d => Buffer.from(d).toString('base64')),
|
||||||
current_chunk: chunk,
|
current_chunk: chunk,
|
||||||
available_chunks: this.metricsMeta[app][method].chunks
|
available_chunks: chunks
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
persistMetrics = () => {
|
|
||||||
if (!this.metaReady) {
|
|
||||||
throw new Error("meta metrics not ready")
|
|
||||||
}
|
|
||||||
this.lastPersistedMetrics = Date.now()
|
|
||||||
const tosync = this.pendingMetrics
|
|
||||||
this.pendingMetrics = {}
|
|
||||||
const apps = Object.keys(tosync)
|
|
||||||
apps.map(app => {
|
|
||||||
const appPath = [this.metricsPath, app].join("/")
|
|
||||||
if (!fs.existsSync(appPath)) {
|
|
||||||
fs.mkdirSync(appPath, { recursive: true });
|
|
||||||
}
|
|
||||||
const methods = Object.keys(tosync[app])
|
|
||||||
methods.map(methodName => {
|
|
||||||
const methodPath = [appPath, methodName].join("/")
|
|
||||||
if (!fs.existsSync(methodPath)) {
|
|
||||||
fs.mkdirSync(methodPath, { recursive: true });
|
|
||||||
}
|
|
||||||
const method = tosync[app][methodName]
|
|
||||||
const meta = this.getMetricsMeta(app, methodName)
|
|
||||||
const chunks = meta.chunks.length > 0 ? meta.chunks : [0]
|
|
||||||
const latest = chunks[chunks.length - 1]
|
|
||||||
const tlv = encodeTLV(encodeListTLV(method.tlvs))
|
|
||||||
const tlvFile = [methodPath, `${latest}.mtlv`].join("/")
|
|
||||||
fs.appendFileSync(tlvFile, Buffer.from(tlv))
|
|
||||||
if (fs.lstatSync(tlvFile).size > chunkSizeBytes) {
|
|
||||||
this.updateMetricsMeta(app, methodName, [...chunks, latest + 1])
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
initMetricsMeta = () => {
|
|
||||||
this.foreachMetricMethodFile((app, method, tlvFiles) => {
|
|
||||||
this.updateMetricsMeta(app, method, tlvFiles)
|
|
||||||
})
|
|
||||||
this.metaReady = true
|
|
||||||
}
|
|
||||||
|
|
||||||
updateMetricsMeta = (appId: string, method: string, sortedChunks: number[]) => {
|
|
||||||
if (!this.metricsMeta[appId]) {
|
|
||||||
this.metricsMeta[appId] = {}
|
|
||||||
}
|
|
||||||
this.metricsMeta[appId][method] = { chunks: sortedChunks }
|
|
||||||
}
|
|
||||||
|
|
||||||
getMetricsMeta = (appId: string, method: string) => {
|
|
||||||
if (!this.metricsMeta[appId]) {
|
|
||||||
return { chunks: [] }
|
|
||||||
}
|
|
||||||
return this.metricsMeta[appId][method] || { chunks: [] }
|
|
||||||
}
|
|
||||||
|
|
||||||
foreachMetricMethodFile = (cb: (appId: string, method: string, tlvFiles: number[]) => void) => {
|
|
||||||
if (!fs.existsSync(this.metricsPath)) {
|
|
||||||
fs.mkdirSync(this.metricsPath, { recursive: true });
|
|
||||||
}
|
|
||||||
const apps = fs.readdirSync(this.metricsPath)
|
|
||||||
apps.forEach(appDir => {
|
|
||||||
const appPath = [this.metricsPath, appDir].join("/")
|
|
||||||
if (!fs.lstatSync(appPath).isDirectory()) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
const methods = fs.readdirSync(appPath)
|
|
||||||
methods.forEach(methodDir => {
|
|
||||||
const methodPath = [appPath, methodDir].join("/")
|
|
||||||
if (!fs.lstatSync(methodPath).isDirectory()) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
const tlvFiles = fs.readdirSync(methodPath)
|
|
||||||
.filter(f => f.endsWith(".mtlv"))
|
|
||||||
.map(f => +f.slice(0, -".mtlv".length))
|
|
||||||
.filter(n => !isNaN(n))
|
|
||||||
.sort((a, b) => a - b)
|
|
||||||
cb(appDir, methodDir, tlvFiles)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
@ -1,5 +1,9 @@
|
||||||
|
import { LatestBundleMetricReq } from "../../../proto/autogenerated/ts/types.js"
|
||||||
import { getLogger } from "../helpers/logger.js"
|
import { getLogger } from "../helpers/logger.js"
|
||||||
|
import { integerToUint8Array } from "../helpers/tlv.js"
|
||||||
|
import { StorageSettings } from "./index.js"
|
||||||
|
import { TlvFilesStorage } from "./tlvFilesStorage.js"
|
||||||
|
import * as Types from "../../../proto/autogenerated/ts/types.js"
|
||||||
const transactionStatePointTypes = ['addedInvoice', 'invoiceWasPaid', 'paidAnInvoice', 'addedAddress', 'addressWasPaid', 'paidAnAddress', 'user2user'] as const
|
const transactionStatePointTypes = ['addedInvoice', 'invoiceWasPaid', 'paidAnInvoice', 'addedAddress', 'addressWasPaid', 'paidAnAddress', 'user2user'] as const
|
||||||
const balanceStatePointTypes = ['providerBalance', 'providerMaxWithdrawable', 'walletBalance', 'channelBalance', 'usersBalance', 'feesPaidForLiquidity', 'totalLndBalance', 'accumulatedHtlcFees', 'deltaUsers', 'deltaExternal'] as const
|
const balanceStatePointTypes = ['providerBalance', 'providerMaxWithdrawable', 'walletBalance', 'channelBalance', 'usersBalance', 'feesPaidForLiquidity', 'totalLndBalance', 'accumulatedHtlcFees', 'deltaUsers', 'deltaExternal'] as const
|
||||||
const maxStatePointTypes = ['maxProviderRespTime'] as const
|
const maxStatePointTypes = ['maxProviderRespTime'] as const
|
||||||
|
|
@ -25,34 +29,31 @@ export type TxPointSettings = {
|
||||||
timeDiscount?: true
|
timeDiscount?: true
|
||||||
}
|
}
|
||||||
export class StateBundler {
|
export class StateBundler {
|
||||||
sinceStart: StateBundle = {}
|
tlvStorage: TlvFilesStorage
|
||||||
lastReport: StateBundle = {}
|
|
||||||
sinceLatestReport: StateBundle = {}
|
|
||||||
reportPeriod = 1000 * 60 * 60 * 12 //12h
|
|
||||||
satsPer1SecondDiscount = 1
|
|
||||||
totalSatsForDiscount = 0
|
|
||||||
latestReport = Date.now()
|
|
||||||
reportLog = getLogger({ component: 'stateBundlerReport' })
|
reportLog = getLogger({ component: 'stateBundlerReport' })
|
||||||
constructor() {
|
constructor(settings: StorageSettings) {
|
||||||
/* process.on('exit', () => {
|
const bundlerPath = [settings.dataDir, "bundler_events"].join("/")
|
||||||
this.Report()
|
this.tlvStorage = new TlvFilesStorage(bundlerPath)
|
||||||
});
|
this.tlvStorage.initMeta()
|
||||||
|
|
||||||
// catch ctrl+c event and exit normally
|
|
||||||
process.on('SIGINT', () => {
|
|
||||||
console.log('Ctrl-C...');
|
|
||||||
process.exit(2);
|
|
||||||
});
|
|
||||||
|
|
||||||
//catch uncaught exceptions, trace, then exit normally
|
|
||||||
process.on('uncaughtException', (e) => {
|
|
||||||
console.log('Uncaught Exception...');
|
|
||||||
console.log(e.stack);
|
|
||||||
process.exit(99);
|
|
||||||
}); */
|
|
||||||
}
|
}
|
||||||
|
|
||||||
increment = (key: string, value: number) => {
|
async GetBundleMetrics(req: Types.LatestBundleMetricReq): Promise<Types.BundleMetrics> {
|
||||||
|
const latest = this.tlvStorage.LoadLatest(req.limit)
|
||||||
|
const metrics: Types.BundleMetrics = { apps: {} }
|
||||||
|
Object.keys(latest).forEach(app => {
|
||||||
|
metrics.apps[app] = { app_bundles: {} }
|
||||||
|
Object.keys(latest[app]).forEach(dataName => {
|
||||||
|
const data = latest[app][dataName]
|
||||||
|
metrics.apps[app].app_bundles[dataName] = {
|
||||||
|
current_chunk: data.current_chunk,
|
||||||
|
available_chunks: data.available_chunks,
|
||||||
|
base_64_data: data.tlvs.map(d => Buffer.from(d).toString('base64'))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
return metrics
|
||||||
|
}
|
||||||
|
/* increment = (key: string, value: number) => {
|
||||||
this.sinceStart[key] = (this.sinceStart[key] || 0) + value
|
this.sinceStart[key] = (this.sinceStart[key] || 0) + value
|
||||||
this.sinceLatestReport[key] = (this.sinceLatestReport[key] || 0) + value
|
this.sinceLatestReport[key] = (this.sinceLatestReport[key] || 0) + value
|
||||||
this.triggerReportCheck()
|
this.triggerReportCheck()
|
||||||
|
|
@ -66,46 +67,51 @@ export class StateBundler {
|
||||||
this.sinceStart[key] = Math.max(this.sinceStart[key] || 0, value)
|
this.sinceStart[key] = Math.max(this.sinceStart[key] || 0, value)
|
||||||
this.sinceLatestReport[key] = Math.max(this.sinceLatestReport[key] || 0, value)
|
this.sinceLatestReport[key] = Math.max(this.sinceLatestReport[key] || 0, value)
|
||||||
this.triggerReportCheck()
|
this.triggerReportCheck()
|
||||||
}
|
} */
|
||||||
|
|
||||||
AddTxPoint = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings) => {
|
AddTxPoint = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings, appId = '__root') => {
|
||||||
const { used, from, timeDiscount } = settings
|
const { used, from } = settings
|
||||||
const meta = settings.meta || []
|
const meta = settings.meta || []
|
||||||
const key = [actionName, from, used, ...meta].join('_')
|
const key = [actionName, from, used, ...meta].join('_')
|
||||||
if (timeDiscount) {
|
|
||||||
this.totalSatsForDiscount += v
|
this.tlvStorage.AddTlv(appId, key, this.serializeNow(v))
|
||||||
}
|
|
||||||
this.increment(key, v)
|
|
||||||
//this.smallLogEvent(actionName, from)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
AddTxPointFailed = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings) => {
|
AddTxPointFailed = (actionName: TransactionStatePointType, v: number, settings: TxPointSettings, appId = '__root') => {
|
||||||
const { used, from } = settings
|
const { used, from } = settings
|
||||||
const meta = settings.meta || []
|
const meta = settings.meta || []
|
||||||
const key = [actionName, from, used, ...meta, 'failed'].join('_')
|
const key = [actionName, from, used, ...meta, 'failed'].join('_')
|
||||||
this.increment(key, v)
|
this.tlvStorage.AddTlv(appId, key, this.serializeNow(v))
|
||||||
}
|
}
|
||||||
|
|
||||||
AddBalancePoint = (actionName: BalanceStatePointType, v: number, meta = []) => {
|
AddBalancePoint = (actionName: BalanceStatePointType, v: number, meta = [], appId = '__root') => {
|
||||||
const key = [actionName, ...meta].join('_')
|
const key = [actionName, ...meta].join('_')
|
||||||
this.set(key, v)
|
this.tlvStorage.AddTlv(appId, key, this.serializeNow(v))
|
||||||
}
|
}
|
||||||
|
|
||||||
AddMaxPoint = (actionName: MaxStatePointType, v: number, meta = []) => {
|
AddMaxPoint = (actionName: MaxStatePointType, v: number, meta = [], appId = '__root') => {
|
||||||
const key = [actionName, ...meta].join('_')
|
const key = [actionName, ...meta].join('_')
|
||||||
this.max(key, v)
|
this.tlvStorage.AddTlv(appId, key, this.serializeNow(v))
|
||||||
}
|
}
|
||||||
|
|
||||||
triggerReportCheck = () => {
|
serializeNow = (v: number) => {
|
||||||
|
const nowUnix = Math.floor(Date.now() / 1000)
|
||||||
|
const entry = new Uint8Array(8)
|
||||||
|
entry.set(integerToUint8Array(nowUnix), 0)
|
||||||
|
entry.set(integerToUint8Array(v), 4)
|
||||||
|
return entry
|
||||||
|
}
|
||||||
|
|
||||||
|
/* triggerReportCheck = () => {
|
||||||
const discountSeconds = Math.floor(this.totalSatsForDiscount / this.satsPer1SecondDiscount)
|
const discountSeconds = Math.floor(this.totalSatsForDiscount / this.satsPer1SecondDiscount)
|
||||||
const totalElapsed = Date.now() - this.latestReport
|
const totalElapsed = Date.now() - this.latestReport
|
||||||
const elapsedWithDiscount = totalElapsed + discountSeconds * 1000
|
const elapsedWithDiscount = totalElapsed + discountSeconds * 1000
|
||||||
if (elapsedWithDiscount > this.reportPeriod) {
|
if (elapsedWithDiscount > this.reportPeriod) {
|
||||||
this.Report()
|
this.Report()
|
||||||
}
|
}
|
||||||
}
|
} */
|
||||||
|
|
||||||
smallLogEvent(event: TransactionStatePointType, from: 'user' | 'system') {
|
/* smallLogEvent(event: TransactionStatePointType, from: 'user' | 'system') {
|
||||||
const char = from === 'user' ? 'U' : 'S'
|
const char = from === 'user' ? 'U' : 'S'
|
||||||
switch (event) {
|
switch (event) {
|
||||||
case 'addedAddress':
|
case 'addedAddress':
|
||||||
|
|
@ -123,9 +129,9 @@ export class StateBundler {
|
||||||
case 'user2user':
|
case 'user2user':
|
||||||
process.stdout.write(`UU`)
|
process.stdout.write(`UU`)
|
||||||
}
|
}
|
||||||
}
|
} */
|
||||||
|
|
||||||
Report = () => {
|
/* Report = () => {
|
||||||
this.totalSatsForDiscount = 0
|
this.totalSatsForDiscount = 0
|
||||||
this.latestReport = Date.now()
|
this.latestReport = Date.now()
|
||||||
this.reportLog("+++++ since last report:")
|
this.reportLog("+++++ since last report:")
|
||||||
|
|
@ -138,5 +144,5 @@ export class StateBundler {
|
||||||
})
|
})
|
||||||
this.lastReport = { ...this.sinceLatestReport }
|
this.lastReport = { ...this.sinceLatestReport }
|
||||||
this.sinceLatestReport = {}
|
this.sinceLatestReport = {}
|
||||||
}
|
} */
|
||||||
}
|
}
|
||||||
153
src/services/storage/tlvFilesStorage.ts
Normal file
153
src/services/storage/tlvFilesStorage.ts
Normal file
|
|
@ -0,0 +1,153 @@
|
||||||
|
import fs from 'fs'
|
||||||
|
import { decodeListTLV, encodeListTLV, encodeTLV, parseTLV } from '../helpers/tlv.js'
|
||||||
|
const chunkSizeBytes = 128 * 1024
|
||||||
|
export type LatestData = Record<string, Record<string, { tlvs: Uint8Array[], current_chunk: number, available_chunks: number[] }>>
|
||||||
|
export class TlvFilesStorage {
|
||||||
|
storagePath: string
|
||||||
|
lastPersisted: number = 0
|
||||||
|
meta: Record<string, Record<string, { chunks: number[] }>> = {}
|
||||||
|
pending: Record<string, Record<string, { tlvs: Uint8Array[] }>> = {}
|
||||||
|
metaReady = false
|
||||||
|
constructor(storagePath: string) {
|
||||||
|
this.storagePath = storagePath
|
||||||
|
if (!fs.existsSync(this.storagePath)) {
|
||||||
|
fs.mkdirSync(this.storagePath, { recursive: true });
|
||||||
|
}
|
||||||
|
this.initMeta()
|
||||||
|
setInterval(() => {
|
||||||
|
if (Date.now() - this.lastPersisted > 1000 * 60 * 4) {
|
||||||
|
this.persist()
|
||||||
|
}
|
||||||
|
}, 1000 * 60 * 5)
|
||||||
|
process.on('exit', () => {
|
||||||
|
this.persist()
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
LoadFile = (app: string, dataName: string, chunk: number): { fileData: Buffer, chunks: number[] } => {
|
||||||
|
if (!this.metaReady || !this.meta[app] || !this.meta[app][dataName] || !this.meta[app][dataName].chunks.includes(chunk)) {
|
||||||
|
throw new Error("metrics not found")
|
||||||
|
}
|
||||||
|
const fullPath = [this.storagePath, app, dataName, `${chunk}.mtlv`].join("/")
|
||||||
|
const fileData = fs.readFileSync(fullPath)
|
||||||
|
return { fileData, chunks: this.meta[app][dataName].chunks }
|
||||||
|
}
|
||||||
|
|
||||||
|
AddTlv = (appId: string, dataName: string, tlv: Uint8Array) => {
|
||||||
|
if (!this.metaReady) {
|
||||||
|
throw new Error("meta metrics not ready")
|
||||||
|
}
|
||||||
|
if (!this.pending[appId]) {
|
||||||
|
this.pending[appId] = {}
|
||||||
|
}
|
||||||
|
if (!this.pending[appId][dataName]) {
|
||||||
|
this.pending[appId][dataName] = { tlvs: [] }
|
||||||
|
}
|
||||||
|
this.pending[appId][dataName].tlvs.push(tlv)
|
||||||
|
}
|
||||||
|
|
||||||
|
LoadLatest = (limit = 100): LatestData => {
|
||||||
|
this.persist()
|
||||||
|
const data: LatestData = {}
|
||||||
|
this.foreachFile((app, dataName, tlvFiles) => {
|
||||||
|
if (tlvFiles.length === 0) { return }
|
||||||
|
const methodPath = [this.storagePath, app, dataName].join("/")
|
||||||
|
const latest = tlvFiles[tlvFiles.length - 1]
|
||||||
|
const tlvFile = [methodPath, `${latest}.mtlv`].join("/")
|
||||||
|
const tlv = fs.readFileSync(tlvFile)
|
||||||
|
const decoded = decodeListTLV(parseTLV(tlv))
|
||||||
|
if (!data[app]) {
|
||||||
|
data[app] = {}
|
||||||
|
}
|
||||||
|
if (decoded.length > limit) {
|
||||||
|
decoded.splice(0, decoded.length - limit)
|
||||||
|
}
|
||||||
|
data[app][dataName] = {
|
||||||
|
tlvs: decoded,
|
||||||
|
current_chunk: latest,
|
||||||
|
available_chunks: tlvFiles
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
persist = () => {
|
||||||
|
if (!this.metaReady) {
|
||||||
|
throw new Error("meta metrics not ready")
|
||||||
|
}
|
||||||
|
this.lastPersisted = Date.now()
|
||||||
|
const tosync = this.pending
|
||||||
|
this.pending = {}
|
||||||
|
const apps = Object.keys(tosync)
|
||||||
|
apps.map(app => {
|
||||||
|
const appPath = [this.storagePath, app].join("/")
|
||||||
|
if (!fs.existsSync(appPath)) {
|
||||||
|
fs.mkdirSync(appPath, { recursive: true });
|
||||||
|
}
|
||||||
|
const dataNames = Object.keys(tosync[app])
|
||||||
|
dataNames.map(dataName => {
|
||||||
|
const dataPath = [appPath, dataName].join("/")
|
||||||
|
if (!fs.existsSync(dataPath)) {
|
||||||
|
fs.mkdirSync(dataPath, { recursive: true });
|
||||||
|
}
|
||||||
|
const data = tosync[app][dataName]
|
||||||
|
const meta = this.getMeta(app, dataName)
|
||||||
|
const chunks = meta.chunks.length > 0 ? meta.chunks : [0]
|
||||||
|
const latest = chunks[chunks.length - 1]
|
||||||
|
const tlv = encodeTLV(encodeListTLV(data.tlvs))
|
||||||
|
const tlvFile = [dataPath, `${latest}.mtlv`].join("/")
|
||||||
|
fs.appendFileSync(tlvFile, Buffer.from(tlv))
|
||||||
|
if (fs.lstatSync(tlvFile).size > chunkSizeBytes) {
|
||||||
|
this.updateMeta(app, dataName, [...chunks, latest + 1])
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
getMeta = (appId: string, dataName: string) => {
|
||||||
|
if (!this.meta[appId]) {
|
||||||
|
return { chunks: [] }
|
||||||
|
}
|
||||||
|
return this.meta[appId][dataName] || { chunks: [] }
|
||||||
|
}
|
||||||
|
|
||||||
|
initMeta = () => {
|
||||||
|
this.foreachFile((app, dataName, tlvFiles) => {
|
||||||
|
this.updateMeta(app, dataName, tlvFiles)
|
||||||
|
})
|
||||||
|
this.metaReady = true
|
||||||
|
}
|
||||||
|
|
||||||
|
updateMeta = (appId: string, dataName: string, sortedChunks: number[]) => {
|
||||||
|
if (!this.meta[appId]) {
|
||||||
|
this.meta[appId] = {}
|
||||||
|
}
|
||||||
|
this.meta[appId][dataName] = { chunks: sortedChunks }
|
||||||
|
}
|
||||||
|
|
||||||
|
foreachFile = (cb: (appId: string, dataName: string, tlvFiles: number[]) => void) => {
|
||||||
|
if (!fs.existsSync(this.storagePath)) {
|
||||||
|
fs.mkdirSync(this.storagePath, { recursive: true });
|
||||||
|
}
|
||||||
|
const apps = fs.readdirSync(this.storagePath)
|
||||||
|
apps.forEach(appDir => {
|
||||||
|
const appPath = [this.storagePath, appDir].join("/")
|
||||||
|
if (!fs.lstatSync(appPath).isDirectory()) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
const dataNames = fs.readdirSync(appPath)
|
||||||
|
dataNames.forEach(dataName => {
|
||||||
|
const dataPath = [appPath, dataName].join("/")
|
||||||
|
if (!fs.lstatSync(dataPath).isDirectory()) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
const tlvFiles = fs.readdirSync(dataPath)
|
||||||
|
.filter(f => f.endsWith(".mtlv"))
|
||||||
|
.map(f => +f.slice(0, -".mtlv".length))
|
||||||
|
.filter(n => !isNaN(n))
|
||||||
|
.sort((a, b) => a - b)
|
||||||
|
cb(appDir, dataName, tlvFiles)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -90,12 +90,12 @@ export default class webRTC {
|
||||||
this.log(ERROR, 'SingleUsageMetricReqValidate', err)
|
this.log(ERROR, 'SingleUsageMetricReqValidate', err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
const res = await this.storage.metricsEventStorage.LoadRawMetricsFile(j.app_id, j.metrics_name, j.page)
|
const { fileData } = this.storage.metricsEventStorage.tlvStorage.LoadFile(j.app_id, j.metrics_name, j.page)
|
||||||
const id = j.request_id || Math.floor(Math.random() * 100_000_000)
|
const id = j.request_id || Math.floor(Math.random() * 100_000_000)
|
||||||
let i = 0
|
let i = 0
|
||||||
const packets: Buffer[] = []
|
const packets: Buffer[] = []
|
||||||
while (i < res.length) {
|
while (i < fileData.length) {
|
||||||
const chunk = res.slice(i, Math.min(i + 15_000, res.length))
|
const chunk = fileData.slice(i, Math.min(i + 15_000, fileData.length))
|
||||||
packets.push(chunk)
|
packets.push(chunk)
|
||||||
i += 15_000
|
i += 15_000
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue