From ed6036ce1ec416d895cd1978d55680e987b680d1 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Wed, 14 May 2025 18:59:00 +0000 Subject: [PATCH] add alerter endpoints --- proto/autogenerated/client.md | 32 ++++++++++ proto/autogenerated/go/http_client.go | 49 +++++++++++++++ proto/autogenerated/go/types.go | 8 +++ proto/autogenerated/ts/express_server.ts | 38 +++++++++++ proto/autogenerated/ts/http_client.ts | 25 ++++++++ proto/autogenerated/ts/nostr_client.ts | 25 ++++++++ proto/autogenerated/ts/nostr_transport.ts | 26 ++++++++ proto/autogenerated/ts/types.ts | 63 ++++++++++++++++++- proto/service/methods.proto | 13 ++++ proto/service/structs.proto | 14 ++++- src/index.ts | 3 +- src/nostrMiddleware.ts | 4 +- src/services/main/index.ts | 32 ++++++++++ src/services/metrics/index.ts | 12 ++++ src/services/nostr/handler.ts | 16 ++++- src/services/nostr/index.ts | 12 ++++ src/services/storage/db/storageInterface.ts | 7 +++ src/services/storage/db/storageProcessor.ts | 19 +++++- .../storage/tlv/tlvFilesStorageFactory.ts | 8 ++- .../storage/tlv/tlvFilesStorageProcessor.ts | 19 +++++- 20 files changed, 414 insertions(+), 11 deletions(-) diff --git a/proto/autogenerated/client.md b/proto/autogenerated/client.md index 93fb4efc..2596a38e 100644 --- a/proto/autogenerated/client.md +++ b/proto/autogenerated/client.md @@ -158,6 +158,11 @@ The nostr server will send back a message response, and inside the body there wi - input: [GetPaymentStateRequest](#GetPaymentStateRequest) - output: [PaymentState](#PaymentState) +- GetProvidersDisruption + - auth type: __Metrics__ + - This methods has an __empty__ __request__ body + - output: [ProvidersDisruption](#ProvidersDisruption) + - GetSeed - auth type: __Admin__ - This methods has an __empty__ __request__ body @@ -250,6 +255,11 @@ The nostr server will send back a message response, and inside the body there wi - input: [PayInvoiceRequest](#PayInvoiceRequest) - output: [PayInvoiceResponse](#PayInvoiceResponse) +- PingSubProcesses + - auth type: __Metrics__ + - This methods has an __empty__ __request__ body + - This methods has an __empty__ __response__ body + - ResetDebit - auth type: __User__ - input: [DebitOperation](#DebitOperation) @@ -617,6 +627,13 @@ The nostr server will send back a message response, and inside the body there wi - input: [GetPaymentStateRequest](#GetPaymentStateRequest) - output: [PaymentState](#PaymentState) +- GetProvidersDisruption + - auth type: __Metrics__ + - http method: __post__ + - http route: __/api/metrics/providers/disruption__ + - This methods has an __empty__ __request__ body + - output: [ProvidersDisruption](#ProvidersDisruption) + - GetSeed - auth type: __Admin__ - http method: __get__ @@ -790,6 +807,13 @@ The nostr server will send back a message response, and inside the body there wi - input: [PayInvoiceRequest](#PayInvoiceRequest) - output: [PayInvoiceResponse](#PayInvoiceResponse) +- PingSubProcesses + - auth type: __Metrics__ + - http method: __post__ + - http route: __/api/metrics/ping__ + - This methods has an __empty__ __request__ body + - This methods has an __empty__ __response__ body + - RequestNPubLinkingToken - auth type: __App__ - http method: __post__ @@ -1352,6 +1376,14 @@ The nostr server will send back a message response, and inside the body there wi - __noffer__: _string_ - __price_sats__: _number_ +### ProviderDisruption + - __provider_pubkey__: _string_ + - __provider_type__: _string_ + - __since_unix__: _number_ + +### ProvidersDisruption + - __disruptions__: ARRAY of: _[ProviderDisruption](#ProviderDisruption)_ + ### RelaysMigration - __relays__: ARRAY of: _string_ diff --git a/proto/autogenerated/go/http_client.go b/proto/autogenerated/go/http_client.go index 97f6ff04..1f29f872 100644 --- a/proto/autogenerated/go/http_client.go +++ b/proto/autogenerated/go/http_client.go @@ -93,6 +93,7 @@ type Client struct { GetMigrationUpdate func() (*MigrationUpdate, error) GetNPubLinkingState func(req GetNPubLinking) (*NPubLinking, error) GetPaymentState func(req GetPaymentStateRequest) (*PaymentState, error) + GetProvidersDisruption func() (*ProvidersDisruption, error) GetSeed func() (*LndSeed, error) GetSingleBundleMetrics func(req SingleMetricReq) (*BundleData, error) GetSingleUsageMetrics func(req SingleMetricReq) (*UsageMetricTlv, error) @@ -116,6 +117,7 @@ type Client struct { PayAddress func(req PayAddressRequest) (*PayAddressResponse, error) PayAppUserInvoice func(req PayAppUserInvoiceRequest) (*PayInvoiceResponse, error) PayInvoice func(req PayInvoiceRequest) (*PayInvoiceResponse, error) + PingSubProcesses func() error RequestNPubLinkingToken func(req RequestNPubLinkingTokenRequest) (*RequestNPubLinkingTokenResponse, error) ResetDebit func(req DebitOperation) error ResetMetricsStorages func() error @@ -1096,6 +1098,32 @@ func NewClient(params ClientParams) *Client { } return &res, nil }, + GetProvidersDisruption: func() (*ProvidersDisruption, error) { + auth, err := params.RetrieveMetricsAuth() + if err != nil { + return nil, err + } + finalRoute := "/api/metrics/providers/disruption" + body := []byte{} + resBody, err := doPostRequest(params.BaseURL+finalRoute, body, auth) + if err != nil { + return nil, err + } + result := ResultError{} + err = json.Unmarshal(resBody, &result) + if err != nil { + return nil, err + } + if result.Status == "ERROR" { + return nil, fmt.Errorf(result.Reason) + } + res := ProvidersDisruption{} + err = json.Unmarshal(resBody, &res) + if err != nil { + return nil, err + } + return &res, nil + }, GetSeed: func() (*LndSeed, error) { auth, err := params.RetrieveAdminAuth() if err != nil { @@ -1726,6 +1754,27 @@ func NewClient(params ClientParams) *Client { } return &res, nil }, + PingSubProcesses: func() error { + auth, err := params.RetrieveMetricsAuth() + if err != nil { + return err + } + finalRoute := "/api/metrics/ping" + body := []byte{} + resBody, err := doPostRequest(params.BaseURL+finalRoute, body, auth) + if err != nil { + return err + } + result := ResultError{} + err = json.Unmarshal(resBody, &result) + if err != nil { + return err + } + if result.Status == "ERROR" { + return fmt.Errorf(result.Reason) + } + return nil + }, RequestNPubLinkingToken: func(req RequestNPubLinkingTokenRequest) (*RequestNPubLinkingTokenResponse, error) { auth, err := params.RetrieveAppAuth() if err != nil { diff --git a/proto/autogenerated/go/types.go b/proto/autogenerated/go/types.go index 2fa9b31d..68d35a25 100644 --- a/proto/autogenerated/go/types.go +++ b/proto/autogenerated/go/types.go @@ -527,6 +527,14 @@ type Product struct { Noffer string `json:"noffer"` Price_sats int64 `json:"price_sats"` } +type ProviderDisruption struct { + Provider_pubkey string `json:"provider_pubkey"` + Provider_type string `json:"provider_type"` + Since_unix int64 `json:"since_unix"` +} +type ProvidersDisruption struct { + Disruptions []ProviderDisruption `json:"disruptions"` +} type RelaysMigration struct { Relays []string `json:"relays"` } diff --git a/proto/autogenerated/ts/express_server.ts b/proto/autogenerated/ts/express_server.ts index 9be3037d..7a3116ef 100644 --- a/proto/autogenerated/ts/express_server.ts +++ b/proto/autogenerated/ts/express_server.ts @@ -1138,6 +1138,25 @@ export default (methods: Types.ServerMethods, opts: ServerOptions) => { opts.metricsCallback([{ ...info, ...stats, ...authContext }]) } catch (ex) { const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } }) + if (!opts.allowNotImplementedMethods && !methods.GetProvidersDisruption) throw new Error('method: GetProvidersDisruption is not implemented') + app.post('/api/metrics/providers/disruption', async (req, res) => { + const info: Types.RequestInfo = { rpcName: 'GetProvidersDisruption', batch: false, nostr: false, batchSize: 0} + const stats: Types.RequestStats = { startMs:req.startTimeMs || 0, start:req.startTime || 0n, parse: process.hrtime.bigint(), guard: 0n, validate: 0n, handle: 0n } + let authCtx: Types.AuthContext = {} + try { + if (!methods.GetProvidersDisruption) throw new Error('method: GetProvidersDisruption is not implemented') + const authContext = await opts.MetricsAuthGuard(req.headers['authorization']) + authCtx = authContext + stats.guard = process.hrtime.bigint() + stats.validate = stats.guard + const query = req.query + const params = req.params + const response = await methods.GetProvidersDisruption({rpcName:'GetProvidersDisruption', ctx:authContext }) + stats.handle = process.hrtime.bigint() + res.json({status: 'OK', ...response}) + opts.metricsCallback([{ ...info, ...stats, ...authContext }]) + } catch (ex) { const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } + }) if (!opts.allowNotImplementedMethods && !methods.GetSeed) throw new Error('method: GetSeed is not implemented') app.get('/api/admin/seed', async (req, res) => { const info: Types.RequestInfo = { rpcName: 'GetSeed', batch: false, nostr: false, batchSize: 0} @@ -1617,6 +1636,25 @@ export default (methods: Types.ServerMethods, opts: ServerOptions) => { opts.metricsCallback([{ ...info, ...stats, ...authContext }]) } catch (ex) { const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } }) + if (!opts.allowNotImplementedMethods && !methods.PingSubProcesses) throw new Error('method: PingSubProcesses is not implemented') + app.post('/api/metrics/ping', async (req, res) => { + const info: Types.RequestInfo = { rpcName: 'PingSubProcesses', batch: false, nostr: false, batchSize: 0} + const stats: Types.RequestStats = { startMs:req.startTimeMs || 0, start:req.startTime || 0n, parse: process.hrtime.bigint(), guard: 0n, validate: 0n, handle: 0n } + let authCtx: Types.AuthContext = {} + try { + if (!methods.PingSubProcesses) throw new Error('method: PingSubProcesses is not implemented') + const authContext = await opts.MetricsAuthGuard(req.headers['authorization']) + authCtx = authContext + stats.guard = process.hrtime.bigint() + stats.validate = stats.guard + const query = req.query + const params = req.params + await methods.PingSubProcesses({rpcName:'PingSubProcesses', ctx:authContext }) + stats.handle = process.hrtime.bigint() + res.json({status: 'OK'}) + opts.metricsCallback([{ ...info, ...stats, ...authContext }]) + } catch (ex) { const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } + }) if (!opts.allowNotImplementedMethods && !methods.RequestNPubLinkingToken) throw new Error('method: RequestNPubLinkingToken is not implemented') app.post('/api/app/user/npub/token', async (req, res) => { const info: Types.RequestInfo = { rpcName: 'RequestNPubLinkingToken', batch: false, nostr: false, batchSize: 0} diff --git a/proto/autogenerated/ts/http_client.ts b/proto/autogenerated/ts/http_client.ts index 9fa96276..0afa72aa 100644 --- a/proto/autogenerated/ts/http_client.ts +++ b/proto/autogenerated/ts/http_client.ts @@ -507,6 +507,20 @@ export default (params: ClientParams) => ({ } return { status: 'ERROR', reason: 'invalid response' } }, + GetProvidersDisruption: async (): Promise => { + const auth = await params.retrieveMetricsAuth() + if (auth === null) throw new Error('retrieveMetricsAuth() returned null') + let finalRoute = '/api/metrics/providers/disruption' + const { data } = await axios.post(params.baseUrl + finalRoute, {}, { headers: { 'authorization': auth } }) + if (data.status === 'ERROR' && typeof data.reason === 'string') return data + if (data.status === 'OK') { + const result = data + if(!params.checkResult) return { status: 'OK', ...result } + const error = Types.ProvidersDisruptionValidate(result) + if (error === null) { return { status: 'OK', ...result } } else return { status: 'ERROR', reason: error.message } + } + return { status: 'ERROR', reason: 'invalid response' } + }, GetSeed: async (): Promise => { const auth = await params.retrieveAdminAuth() if (auth === null) throw new Error('retrieveAdminAuth() returned null') @@ -827,6 +841,17 @@ export default (params: ClientParams) => ({ } return { status: 'ERROR', reason: 'invalid response' } }, + PingSubProcesses: async (): Promise => { + const auth = await params.retrieveMetricsAuth() + if (auth === null) throw new Error('retrieveMetricsAuth() returned null') + let finalRoute = '/api/metrics/ping' + const { data } = await axios.post(params.baseUrl + finalRoute, {}, { headers: { 'authorization': auth } }) + if (data.status === 'ERROR' && typeof data.reason === 'string') return data + if (data.status === 'OK') { + return data + } + return { status: 'ERROR', reason: 'invalid response' } + }, RequestNPubLinkingToken: async (request: Types.RequestNPubLinkingTokenRequest): Promise => { const auth = await params.retrieveAppAuth() if (auth === null) throw new Error('retrieveAppAuth() returned null') diff --git a/proto/autogenerated/ts/nostr_client.ts b/proto/autogenerated/ts/nostr_client.ts index f6dcf1dd..6813815d 100644 --- a/proto/autogenerated/ts/nostr_client.ts +++ b/proto/autogenerated/ts/nostr_client.ts @@ -422,6 +422,20 @@ export default (params: NostrClientParams, send: (to:string, message: NostrRequ } return { status: 'ERROR', reason: 'invalid response' } }, + GetProvidersDisruption: async (): Promise => { + const auth = await params.retrieveNostrMetricsAuth() + if (auth === null) throw new Error('retrieveNostrMetricsAuth() returned null') + const nostrRequest: NostrRequest = {} + const data = await send(params.pubDestination, {rpcName:'GetProvidersDisruption',authIdentifier:auth, ...nostrRequest }) + if (data.status === 'ERROR' && typeof data.reason === 'string') return data + if (data.status === 'OK') { + const result = data + if(!params.checkResult) return { status: 'OK', ...result } + const error = Types.ProvidersDisruptionValidate(result) + if (error === null) { return { status: 'OK', ...result } } else return { status: 'ERROR', reason: error.message } + } + return { status: 'ERROR', reason: 'invalid response' } + }, GetSeed: async (): Promise => { const auth = await params.retrieveNostrAdminAuth() if (auth === null) throw new Error('retrieveNostrAdminAuth() returned null') @@ -685,6 +699,17 @@ export default (params: NostrClientParams, send: (to:string, message: NostrRequ } return { status: 'ERROR', reason: 'invalid response' } }, + PingSubProcesses: async (): Promise => { + const auth = await params.retrieveNostrMetricsAuth() + if (auth === null) throw new Error('retrieveNostrMetricsAuth() returned null') + const nostrRequest: NostrRequest = {} + const data = await send(params.pubDestination, {rpcName:'PingSubProcesses',authIdentifier:auth, ...nostrRequest }) + if (data.status === 'ERROR' && typeof data.reason === 'string') return data + if (data.status === 'OK') { + return data + } + return { status: 'ERROR', reason: 'invalid response' } + }, ResetDebit: async (request: Types.DebitOperation): Promise => { const auth = await params.retrieveNostrUserAuth() if (auth === null) throw new Error('retrieveNostrUserAuth() returned null') diff --git a/proto/autogenerated/ts/nostr_transport.ts b/proto/autogenerated/ts/nostr_transport.ts index 4f48a701..55553eb6 100644 --- a/proto/autogenerated/ts/nostr_transport.ts +++ b/proto/autogenerated/ts/nostr_transport.ts @@ -812,6 +812,19 @@ export default (methods: Types.ServerMethods, opts: NostrOptions) => { opts.metricsCallback([{ ...info, ...stats, ...authContext }]) }catch(ex){ const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } break + case 'GetProvidersDisruption': + try { + if (!methods.GetProvidersDisruption) throw new Error('method: GetProvidersDisruption is not implemented') + const authContext = await opts.NostrMetricsAuthGuard(req.appId, req.authIdentifier) + stats.guard = process.hrtime.bigint() + authCtx = authContext + stats.validate = stats.guard + const response = await methods.GetProvidersDisruption({rpcName:'GetProvidersDisruption', ctx:authContext }) + stats.handle = process.hrtime.bigint() + res({status: 'OK', ...response}) + opts.metricsCallback([{ ...info, ...stats, ...authContext }]) + }catch(ex){ const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } + break case 'GetSeed': try { if (!methods.GetSeed) throw new Error('method: GetSeed is not implemented') @@ -1085,6 +1098,19 @@ export default (methods: Types.ServerMethods, opts: NostrOptions) => { opts.metricsCallback([{ ...info, ...stats, ...authContext }]) }catch(ex){ const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } break + case 'PingSubProcesses': + try { + if (!methods.PingSubProcesses) throw new Error('method: PingSubProcesses is not implemented') + const authContext = await opts.NostrMetricsAuthGuard(req.appId, req.authIdentifier) + stats.guard = process.hrtime.bigint() + authCtx = authContext + stats.validate = stats.guard + await methods.PingSubProcesses({rpcName:'PingSubProcesses', ctx:authContext }) + stats.handle = process.hrtime.bigint() + res({status: 'OK'}) + opts.metricsCallback([{ ...info, ...stats, ...authContext }]) + }catch(ex){ const e = ex as any; logErrorAndReturnResponse(e, e.message || e, res, logger, { ...info, ...stats, ...authCtx }, opts.metricsCallback); if (opts.throwErrors) throw e } + break case 'ResetDebit': try { if (!methods.ResetDebit) throw new Error('method: ResetDebit is not implemented') diff --git a/proto/autogenerated/ts/types.ts b/proto/autogenerated/ts/types.ts index ae494f22..dff4ae99 100644 --- a/proto/autogenerated/ts/types.ts +++ b/proto/autogenerated/ts/types.ts @@ -28,8 +28,8 @@ export type MetricsContext = { app_id: string operator_id: string } -export type MetricsMethodInputs = GetAppsMetrics_Input | GetBundleMetrics_Input | GetErrorStats_Input | GetLndMetrics_Input | GetSingleBundleMetrics_Input | GetSingleUsageMetrics_Input | GetUsageMetrics_Input | ResetMetricsStorages_Input | SubmitWebRtcMessage_Input | ZipMetricsStorages_Input -export type MetricsMethodOutputs = GetAppsMetrics_Output | GetBundleMetrics_Output | GetErrorStats_Output | GetLndMetrics_Output | GetSingleBundleMetrics_Output | GetSingleUsageMetrics_Output | GetUsageMetrics_Output | ResetMetricsStorages_Output | SubmitWebRtcMessage_Output | ZipMetricsStorages_Output +export type MetricsMethodInputs = GetAppsMetrics_Input | GetBundleMetrics_Input | GetErrorStats_Input | GetLndMetrics_Input | GetProvidersDisruption_Input | GetSingleBundleMetrics_Input | GetSingleUsageMetrics_Input | GetUsageMetrics_Input | PingSubProcesses_Input | ResetMetricsStorages_Input | SubmitWebRtcMessage_Input | ZipMetricsStorages_Input +export type MetricsMethodOutputs = GetAppsMetrics_Output | GetBundleMetrics_Output | GetErrorStats_Output | GetLndMetrics_Output | GetProvidersDisruption_Output | GetSingleBundleMetrics_Output | GetSingleUsageMetrics_Output | GetUsageMetrics_Output | PingSubProcesses_Output | ResetMetricsStorages_Output | SubmitWebRtcMessage_Output | ZipMetricsStorages_Output export type UserContext = { app_id: string app_user_id: string @@ -162,6 +162,9 @@ export type GetNPubLinkingState_Output = ResultError | ({ status: 'OK' } & NPubL export type GetPaymentState_Input = {rpcName:'GetPaymentState', req: GetPaymentStateRequest} export type GetPaymentState_Output = ResultError | ({ status: 'OK' } & PaymentState) +export type GetProvidersDisruption_Input = {rpcName:'GetProvidersDisruption'} +export type GetProvidersDisruption_Output = ResultError | ({ status: 'OK' } & ProvidersDisruption) + export type GetSeed_Input = {rpcName:'GetSeed'} export type GetSeed_Output = ResultError | ({ status: 'OK' } & LndSeed) @@ -247,6 +250,9 @@ export type PayAppUserInvoice_Output = ResultError | ({ status: 'OK' } & PayInvo export type PayInvoice_Input = {rpcName:'PayInvoice', req: PayInvoiceRequest} export type PayInvoice_Output = ResultError | ({ status: 'OK' } & PayInvoiceResponse) +export type PingSubProcesses_Input = {rpcName:'PingSubProcesses'} +export type PingSubProcesses_Output = ResultError | { status: 'OK' } + export type RequestNPubLinkingToken_Input = {rpcName:'RequestNPubLinkingToken', req: RequestNPubLinkingTokenRequest} export type RequestNPubLinkingToken_Output = ResultError | ({ status: 'OK' } & RequestNPubLinkingTokenResponse) @@ -340,6 +346,7 @@ export type ServerMethods = { GetMigrationUpdate?: (req: GetMigrationUpdate_Input & {ctx: UserContext }) => Promise GetNPubLinkingState?: (req: GetNPubLinkingState_Input & {ctx: AppContext }) => Promise GetPaymentState?: (req: GetPaymentState_Input & {ctx: UserContext }) => Promise + GetProvidersDisruption?: (req: GetProvidersDisruption_Input & {ctx: MetricsContext }) => Promise GetSeed?: (req: GetSeed_Input & {ctx: AdminContext }) => Promise GetSingleBundleMetrics?: (req: GetSingleBundleMetrics_Input & {ctx: MetricsContext }) => Promise GetSingleUsageMetrics?: (req: GetSingleUsageMetrics_Input & {ctx: MetricsContext }) => Promise @@ -363,6 +370,7 @@ export type ServerMethods = { PayAddress?: (req: PayAddress_Input & {ctx: UserContext }) => Promise PayAppUserInvoice?: (req: PayAppUserInvoice_Input & {ctx: AppContext }) => Promise PayInvoice?: (req: PayInvoice_Input & {ctx: UserContext }) => Promise + PingSubProcesses?: (req: PingSubProcesses_Input & {ctx: MetricsContext }) => Promise RequestNPubLinkingToken?: (req: RequestNPubLinkingToken_Input & {ctx: AppContext }) => Promise ResetDebit?: (req: ResetDebit_Input & {ctx: UserContext }) => Promise ResetMetricsStorages?: (req: ResetMetricsStorages_Input & {ctx: MetricsContext }) => Promise @@ -3030,6 +3038,57 @@ export const ProductValidate = (o?: Product, opts: ProductOptions = {}, path: st return null } +export type ProviderDisruption = { + provider_pubkey: string + provider_type: string + since_unix: number +} +export const ProviderDisruptionOptionalFields: [] = [] +export type ProviderDisruptionOptions = OptionsBaseMessage & { + checkOptionalsAreSet?: [] + provider_pubkey_CustomCheck?: (v: string) => boolean + provider_type_CustomCheck?: (v: string) => boolean + since_unix_CustomCheck?: (v: number) => boolean +} +export const ProviderDisruptionValidate = (o?: ProviderDisruption, opts: ProviderDisruptionOptions = {}, path: string = 'ProviderDisruption::root.'): Error | null => { + if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message') + if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null') + + if (typeof o.provider_pubkey !== 'string') return new Error(`${path}.provider_pubkey: is not a string`) + if (opts.provider_pubkey_CustomCheck && !opts.provider_pubkey_CustomCheck(o.provider_pubkey)) return new Error(`${path}.provider_pubkey: custom check failed`) + + if (typeof o.provider_type !== 'string') return new Error(`${path}.provider_type: is not a string`) + if (opts.provider_type_CustomCheck && !opts.provider_type_CustomCheck(o.provider_type)) return new Error(`${path}.provider_type: custom check failed`) + + if (typeof o.since_unix !== 'number') return new Error(`${path}.since_unix: is not a number`) + if (opts.since_unix_CustomCheck && !opts.since_unix_CustomCheck(o.since_unix)) return new Error(`${path}.since_unix: custom check failed`) + + return null +} + +export type ProvidersDisruption = { + disruptions: ProviderDisruption[] +} +export const ProvidersDisruptionOptionalFields: [] = [] +export type ProvidersDisruptionOptions = OptionsBaseMessage & { + checkOptionalsAreSet?: [] + disruptions_ItemOptions?: ProviderDisruptionOptions + disruptions_CustomCheck?: (v: ProviderDisruption[]) => boolean +} +export const ProvidersDisruptionValidate = (o?: ProvidersDisruption, opts: ProvidersDisruptionOptions = {}, path: string = 'ProvidersDisruption::root.'): Error | null => { + if (opts.checkOptionalsAreSet && opts.allOptionalsAreSet) return new Error(path + ': only one of checkOptionalsAreSet or allOptionalNonDefault can be set for each message') + if (typeof o !== 'object' || o === null) return new Error(path + ': object is not an instance of an object or is null') + + if (!Array.isArray(o.disruptions)) return new Error(`${path}.disruptions: is not an array`) + for (let index = 0; index < o.disruptions.length; index++) { + const disruptionsErr = ProviderDisruptionValidate(o.disruptions[index], opts.disruptions_ItemOptions, `${path}.disruptions[${index}]`) + if (disruptionsErr !== null) return disruptionsErr + } + if (opts.disruptions_CustomCheck && !opts.disruptions_CustomCheck(o.disruptions)) return new Error(`${path}.disruptions: custom check failed`) + + return null +} + export type RelaysMigration = { relays: string[] } diff --git a/proto/service/methods.proto b/proto/service/methods.proto index 81a9165e..7ebe175f 100644 --- a/proto/service/methods.proto +++ b/proto/service/methods.proto @@ -240,6 +240,19 @@ service LightningPub { option (http_route) = "/api/metrics/zip"; option (nostr) = true; } + rpc PingSubProcesses(structs.Empty) returns (structs.Empty) { + option (auth_type) = "Metrics"; + option (http_method) = "post"; + option (http_route) = "/api/metrics/ping"; + option (nostr) = true; + } + + rpc GetProvidersDisruption(structs.Empty) returns (structs.ProvidersDisruption) { + option (auth_type) = "Metrics"; + option (http_method) = "post"; + option (http_route) = "/api/metrics/providers/disruption"; + option (nostr) = true; + } rpc ResetMetricsStorages(structs.Empty) returns (structs.Empty) { option (auth_type) = "Metrics"; diff --git a/proto/service/structs.proto b/proto/service/structs.proto index f7693ce7..068576c5 100644 --- a/proto/service/structs.proto +++ b/proto/service/structs.proto @@ -749,4 +749,16 @@ message OfferInvoice { int64 paid_at_unix = 3; int64 amount = 4; map data = 5; -} \ No newline at end of file +} + +message ProviderDisruption { + string provider_pubkey = 1; + string provider_type = 2; + int64 since_unix = 3; +} + +message ProvidersDisruption { + repeated ProviderDisruption disruptions = 1; +} + + diff --git a/src/index.ts b/src/index.ts index 19464c04..3dd3db76 100644 --- a/src/index.ts +++ b/src/index.ts @@ -24,13 +24,14 @@ const start = async () => { const serverMethods = GetServerMethods(mainHandler) const nostrSettings = LoadNosrtSettingsFromEnv() log("initializing nostr middleware") - const { Send, Stop } = nostrMiddleware(serverMethods, mainHandler, + const { Send, Stop, Ping } = nostrMiddleware(serverMethods, mainHandler, { ...nostrSettings, apps, clients: [liquidityProviderInfo] }, (e, p) => mainHandler.liquidityProvider.onEvent(e, p) ) exitHandler(() => { Stop() }) log("starting server") mainHandler.attachNostrSend(Send) + mainHandler.attachNostrProcessPing(Ping) mainHandler.StartBeacons() const appNprofile = nprofileEncode({ pubkey: liquidityProviderInfo.publicKey, relays: nostrSettings.relays }) if (wizard) { diff --git a/src/nostrMiddleware.ts b/src/nostrMiddleware.ts index 4b6f6cef..ec8c6726 100644 --- a/src/nostrMiddleware.ts +++ b/src/nostrMiddleware.ts @@ -7,7 +7,7 @@ import { ERROR, getLogger } from "./services/helpers/logger.js"; import { NdebitData } from "nostr-tools/lib/types/nip68.js"; import { NofferData } from "nostr-tools/lib/types/nip69.js"; -export default (serverMethods: Types.ServerMethods, mainHandler: Main, nostrSettings: NostrSettings, onClientEvent: (e: { requestId: string }, fromPub: string) => void): { Stop: () => void, Send: NostrSend } => { +export default (serverMethods: Types.ServerMethods, mainHandler: Main, nostrSettings: NostrSettings, onClientEvent: (e: { requestId: string }, fromPub: string) => void): { Stop: () => void, Send: NostrSend, Ping: () => Promise } => { const log = getLogger({}) const nostrTransport = NewNostrTransport(serverMethods, { NostrUserAuthGuard: async (appId, pub) => { @@ -68,7 +68,7 @@ export default (serverMethods: Types.ServerMethods, mainHandler: Main, nostrSett nostr.Send({ type: 'app', appId: event.appId }, { type: 'content', pub: event.pub, content: JSON.stringify({ ...res, requestId: j.requestId }) }) }, event.startAtNano, event.startAtMs) }) - return { Stop: () => nostr.Stop, Send: (...args) => nostr.Send(...args) } + return { Stop: () => nostr.Stop, Send: (...args) => nostr.Send(...args), Ping: () => nostr.Ping() } } diff --git a/src/services/main/index.ts b/src/services/main/index.ts index 39eb235f..2ee3754e 100644 --- a/src/services/main/index.ts +++ b/src/services/main/index.ts @@ -57,6 +57,7 @@ export default class { unlocker: Unlocker //webRTC: webRTC nostrSend: NostrSend = () => { getLogger({})("nostr send not initialized yet") } + nostrProcessPing: (() => Promise) | null = null constructor(settings: MainSettings, storage: Storage, adminManager: AdminManager, utils: Utils, unlocker: Unlocker) { this.settings = settings this.storage = storage @@ -102,6 +103,37 @@ export default class { //this.webRTC.attachNostrSend(f) } + attachNostrProcessPing(f: () => Promise) { + this.nostrProcessPing = f + } + + async pingSubProcesses() { + if (!this.nostrProcessPing) { + throw new Error("nostr process ping not initialized") + } + const storageP = this.storage.dbs.Ping() + const metricsP = this.storage.metricsStorage.dbs.Ping() + const tlvP = this.utils.tlvStorageFactory.Ping() + const nostrP = this.nostrProcessPing() + const timeout = new Promise(res => setTimeout(() => res(true), 2 * 1000)) + const storageFail = await Promise.race([storageP, timeout]) + const metricsFail = await Promise.race([metricsP, timeout]) + const tlvFail = await Promise.race([tlvP, timeout]) + const nostrFail = await Promise.race([nostrP, timeout]) + if (storageFail) { + throw new Error("storage ping failed") + } + if (metricsFail) { + throw new Error("metrics ping failed") + } + if (tlvFail) { + throw new Error("tlv ping failed") + } + if (nostrFail) { + throw new Error("nostr ping failed") + } + } + htlcCb: HtlcCb = (e) => { this.metricsManager.HtlcCb(e) } diff --git a/src/services/metrics/index.ts b/src/services/metrics/index.ts index 8321c2dc..8396a556 100644 --- a/src/services/metrics/index.ts +++ b/src/services/metrics/index.ts @@ -27,6 +27,18 @@ export default class Handler { } + async GetProvidersDisruption(): Promise { + const providers = await this.storage.liquidityStorage.GetTrackedProviders() + const disruptions = providers.filter(p => p.latest_distruption_at_unix > 0) + return { + disruptions: disruptions.map(d => ({ + provider_pubkey: d.provider_pubkey, + provider_type: d.provider_type, + since_unix: d.latest_distruption_at_unix + })) + } + } + async HtlcCb(htlc: HtlcEvent) { diff --git a/src/services/nostr/handler.ts b/src/services/nostr/handler.ts index 46b10145..6e8c8ce5 100644 --- a/src/services/nostr/handler.ts +++ b/src/services/nostr/handler.ts @@ -38,12 +38,21 @@ type SettingsRequest = { settings: NostrSettings } +type PingRequest = { + type: 'ping' +} + type SendRequest = { type: 'send' initiator: SendInitiator data: SendData relays?: string[] } + +type PingResponse = { + type: 'pong' +} + type ReadyResponse = { type: 'ready' } @@ -56,8 +65,8 @@ type ProcessMetricsResponse = { metrics: ProcessMetrics } -export type ChildProcessRequest = SettingsRequest | SendRequest -export type ChildProcessResponse = ReadyResponse | EventResponse | ProcessMetricsResponse +export type ChildProcessRequest = SettingsRequest | SendRequest | PingRequest +export type ChildProcessResponse = ReadyResponse | EventResponse | ProcessMetricsResponse | PingResponse const send = (message: ChildProcessResponse) => { if (process.send) { process.send(message, undefined, undefined, err => { @@ -77,6 +86,9 @@ process.on("message", (message: ChildProcessRequest) => { case 'send': sendToNostr(message.initiator, message.data, message.relays) break + case 'ping': + send({ type: 'pong' }) + break default: getLogger({ component: "nostrMiddleware" })(ERROR, "unknown nostr request", message) break diff --git a/src/services/nostr/index.ts b/src/services/nostr/index.ts index cbe55034..9b9745e3 100644 --- a/src/services/nostr/index.ts +++ b/src/services/nostr/index.ts @@ -19,6 +19,7 @@ export default class NostrSubprocess { settings: NostrSettings childProcess: ChildProcess utils: Utils + awaitingPongs: (() => void)[] = [] constructor(settings: NostrSettings, utils: Utils, eventCallback: EventCallback) { this.utils = utils this.childProcess = fork("./build/src/services/nostr/handler") @@ -34,6 +35,10 @@ export default class NostrSubprocess { case 'processMetrics': this.utils.tlvStorageFactory.ProcessMetrics(message.metrics, 'nostr') break + case 'pong': + this.awaitingPongs.forEach(resolve => resolve()) + this.awaitingPongs = [] + break default: console.error("unknown nostr event response", message) break; @@ -44,6 +49,13 @@ export default class NostrSubprocess { this.childProcess.send(message) } + Ping() { + this.sendToChildProcess({ type: 'ping' }) + return new Promise((resolve) => { + this.awaitingPongs.push(resolve) + }) + } + Send(initiator: SendInitiator, data: SendData, relays?: string[]) { this.sendToChildProcess({ type: 'send', data, initiator, relays }) } diff --git a/src/services/storage/db/storageInterface.ts b/src/services/storage/db/storageInterface.ts index 00794f08..81da2270 100644 --- a/src/services/storage/db/storageInterface.ts +++ b/src/services/storage/db/storageInterface.ts @@ -12,6 +12,7 @@ import { SumOperation, DBNames, SuccessOperationResponse, + PingOperation, } from './storageProcessor.js'; import { PickKeysByType } from 'typeorm/common/PickKeysByType.js'; import { serializeRequest, WhereCondition } from './serializationHelpers.js'; @@ -67,6 +68,12 @@ export class StorageInterface extends EventEmitter { this.isConnected = true; } + Ping(): Promise { + const opId = Math.random().toString() + const pingOp: PingOperation = { type: 'ping', opId } + return this.handleOp(pingOp) + } + Connect(settings: DbSettings, dbType: 'main' | 'metrics'): Promise { const opId = Math.random().toString() this.dbType = dbType diff --git a/src/services/storage/db/storageProcessor.ts b/src/services/storage/db/storageProcessor.ts index 20c2cc07..05940629 100644 --- a/src/services/storage/db/storageProcessor.ts +++ b/src/services/storage/db/storageProcessor.ts @@ -22,6 +22,12 @@ export type ConnectOperation = { debug?: boolean } +export type PingOperation = { + type: 'ping' + opId: string + debug?: boolean +} + export type StartTxOperation = { type: 'startTx' opId: string @@ -133,7 +139,7 @@ export interface IStorageOperation { } export type StorageOperation = ConnectOperation | StartTxOperation | EndTxOperation | DeleteOperation | RemoveOperation | UpdateOperation | - FindOneOperation | FindOperation | CreateAndSaveOperation | IncrementOperation | DecrementOperation | SumOperation + FindOneOperation | FindOperation | CreateAndSaveOperation | IncrementOperation | DecrementOperation | SumOperation | PingOperation export type SuccessOperationResponse = { success: true, type: string, data: T, opId: string } export type OperationResponse = SuccessOperationResponse | ErrorOperationResponse @@ -222,6 +228,9 @@ class StorageProcessor { case 'createAndSave': await this.handleCreateAndSave(operation); break; + case 'ping': + await this.handlePing(operation); + break; default: this.sendResponse({ success: false, @@ -239,6 +248,14 @@ class StorageProcessor { } } + private async handlePing(operation: PingOperation) { + this.sendResponse({ + success: true, + type: 'ping', + data: null, + opId: operation.opId + }); + } private async handleConnect(operation: ConnectOperation) { let migrationsExecuted = 0 if (this.mode !== '') { diff --git a/src/services/storage/tlv/tlvFilesStorageFactory.ts b/src/services/storage/tlv/tlvFilesStorageFactory.ts index 9840c6ad..6a7c9bfd 100644 --- a/src/services/storage/tlv/tlvFilesStorageFactory.ts +++ b/src/services/storage/tlv/tlvFilesStorageFactory.ts @@ -1,6 +1,6 @@ import { ChildProcess, fork } from 'child_process'; import { EventEmitter } from 'events'; -import { AddTlvOperation, ITlvStorageOperation, SuccessTlvOperationResponse, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings, WebRtcMessageOperation, ProcessMetricsTlvOperation, ZipStoragesOperation, ResetTlvStorageOperation } from './tlvFilesStorageProcessor'; +import { AddTlvOperation, ITlvStorageOperation, SuccessTlvOperationResponse, LoadLatestTlvOperation, LoadTlvFileOperation, NewTlvStorageOperation, SerializableLatestData, SerializableTlvFile, TlvOperationResponse, TlvStorageSettings, WebRtcMessageOperation, ProcessMetricsTlvOperation, ZipStoragesOperation, ResetTlvStorageOperation, PingTlvOperation } from './tlvFilesStorageProcessor'; import { LatestData, TlvFile } from './tlvFilesStorage'; import { NostrSend, SendData, SendInitiator } from '../../nostr/handler'; import { WebRtcUserInfo } from '../../webRTC'; @@ -63,6 +63,12 @@ export class TlvStorageFactory extends EventEmitter { this.isConnected = true; } + Ping(): Promise { + const opId = Math.random().toString() + const op: PingTlvOperation = { type: 'ping', opId } + return this.handleOp(op) + } + ZipStorages(): Promise { const opId = Math.random().toString() const op: ZipStoragesOperation = { type: 'zipStorages', opId } diff --git a/src/services/storage/tlv/tlvFilesStorageProcessor.ts b/src/services/storage/tlv/tlvFilesStorageProcessor.ts index 1beeeddc..1d725e37 100644 --- a/src/services/storage/tlv/tlvFilesStorageProcessor.ts +++ b/src/services/storage/tlv/tlvFilesStorageProcessor.ts @@ -80,6 +80,12 @@ export type ProcessMetricsTlvOperation = { debug?: boolean } +export type PingTlvOperation = { + type: 'ping' + opId: string + debug?: boolean +} + export type ErrorTlvOperationResponse = { success: false, error: string, opId: string } export interface ITlvStorageOperation { @@ -88,7 +94,7 @@ export interface ITlvStorageOperation { debug?: boolean } -export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation | WebRtcMessageOperation | ProcessMetricsTlvOperation | ResetTlvStorageOperation | ZipStoragesOperation +export type TlvStorageOperation = NewTlvStorageOperation | AddTlvOperation | LoadLatestTlvOperation | LoadTlvFileOperation | WebRtcMessageOperation | ProcessMetricsTlvOperation | ResetTlvStorageOperation | ZipStoragesOperation | PingTlvOperation export type SuccessTlvOperationResponse = { success: true, type: string, data: T, opId: string } export type TlvOperationResponse = SuccessTlvOperationResponse | ErrorTlvOperationResponse @@ -186,6 +192,9 @@ class TlvFilesStorageProcessor { case 'zipStorages': await this.handleZipStorages(operation); break; + case 'ping': + await this.handlePing(operation); + break; default: this.sendResponse({ success: false, @@ -203,6 +212,14 @@ class TlvFilesStorageProcessor { } } + private async handlePing(operation: PingTlvOperation) { + this.sendResponse({ + success: true, + type: 'ping', + data: null, + opId: operation.opId + }); + } private async handleResetStorage(operation: ResetTlvStorageOperation) { for (const storageName in this.storages) { this.storages[storageName].Reset()