Merge branch 'master' into tests
This commit is contained in:
commit
dbe664d0b3
14 changed files with 207 additions and 80 deletions
|
|
@ -43,3 +43,4 @@ MIGRATE_DB=false
|
|||
#METRICS
|
||||
RECORD_PERFORMANCE=true
|
||||
SKIP_SANITY_CHECK=false
|
||||
DISABLE_EXTERNAL_PAYMENTS=false
|
||||
|
|
|
|||
|
|
@ -3,22 +3,26 @@ import Nostr from "./services/nostr/index.js"
|
|||
import { NostrSend, NostrSettings } from "./services/nostr/handler.js"
|
||||
import * as Types from '../proto/autogenerated/ts/types.js'
|
||||
import NewNostrTransport, { NostrRequest } from '../proto/autogenerated/ts/nostr_transport.js';
|
||||
import { getLogger } from "./services/helpers/logger.js";
|
||||
|
||||
export default (serverMethods: Types.ServerMethods, mainHandler: Main, nostrSettings: NostrSettings): { Stop: () => void, Send: NostrSend } => {
|
||||
const log = getLogger({})
|
||||
const nostrTransport = NewNostrTransport(serverMethods, {
|
||||
NostrUserAuthGuard: async (appId, pub) => {
|
||||
const app = await mainHandler.storage.applicationStorage.GetApplication(appId || "")
|
||||
let nostrUser = await mainHandler.storage.applicationStorage.GetOrCreateNostrAppUser(app, pub || "")
|
||||
return { user_id: nostrUser.user.user_id, app_user_id: nostrUser.identifier, app_id: appId || "" }
|
||||
},
|
||||
metricsCallback: metrics => mainHandler.settings.recordPerformance ? mainHandler.metricsManager.AddMetrics(metrics) : null
|
||||
metricsCallback: metrics => mainHandler.settings.recordPerformance ? mainHandler.metricsManager.AddMetrics(metrics) : null,
|
||||
logger: { log, error: err => log("ERROR", err) }
|
||||
})
|
||||
const nostr = new Nostr(nostrSettings, event => {
|
||||
let j: NostrRequest
|
||||
try {
|
||||
j = JSON.parse(event.content)
|
||||
log("nostr event", j.rpcName || 'no rpc name')
|
||||
} catch {
|
||||
console.error("invalid json event received", event.content)
|
||||
log("ERROR", "invalid json event received", event.content)
|
||||
return
|
||||
}
|
||||
nostrTransport({ ...j, appId: event.appId }, res => {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import * as Types from '../../../proto/autogenerated/ts/types.js'
|
||||
import { GetInfoResponse, NewAddressResponse, AddInvoiceResponse, PayReq, Payment, SendCoinsResponse, EstimateFeeResponse, TransactionDetails, ClosedChannelsResponse, ListChannelsResponse, PendingChannelsResponse } from '../../../proto/lnd/lightning.js'
|
||||
import { GetInfoResponse, NewAddressResponse, AddInvoiceResponse, PayReq, Payment, SendCoinsResponse, EstimateFeeResponse, TransactionDetails, ClosedChannelsResponse, ListChannelsResponse, PendingChannelsResponse, ListInvoiceResponse, ListPaymentsResponse } from '../../../proto/lnd/lightning.js'
|
||||
import { EnvMustBeNonEmptyString, EnvMustBeInteger, EnvCanBeBoolean } from '../helpers/envParser.js'
|
||||
import { AddressPaidCb, BalanceInfo, DecodedInvoice, HtlcCb, Invoice, InvoicePaidCb, LndSettings, NewBlockCb, NodeInfo, PaidInvoice } from './settings.js'
|
||||
import LND from './lnd.js'
|
||||
|
|
@ -36,6 +36,8 @@ export interface LightningHandler {
|
|||
ListChannels(): Promise<ListChannelsResponse>
|
||||
ListPendingChannels(): Promise<PendingChannelsResponse>
|
||||
GetForwardingHistory(indexOffset: number): Promise<{ fee: number, chanIdIn: string, chanIdOut: string, timestampNs: number, offset: number }[]>
|
||||
GetAllPaidInvoices(max: number): Promise<ListInvoiceResponse>
|
||||
GetAllPayments(max: number): Promise<ListPaymentsResponse>
|
||||
}
|
||||
|
||||
export default (settings: LndSettings, addressPaidCb: AddressPaidCb, invoicePaidCb: InvoicePaidCb, newBlockCb: NewBlockCb, htlcCb: HtlcCb): LightningHandler => {
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ export default class {
|
|||
invoicePaidCb: InvoicePaidCb
|
||||
newBlockCb: NewBlockCb
|
||||
htlcCb: HtlcCb
|
||||
log = getLogger({})
|
||||
log = getLogger({ appName: 'lndManager' })
|
||||
constructor(settings: LndSettings, addressPaidCb: AddressPaidCb, invoicePaidCb: InvoicePaidCb, newBlockCb: NewBlockCb, htlcCb: HtlcCb) {
|
||||
this.settings = settings
|
||||
this.addressPaidCb = addressPaidCb
|
||||
|
|
@ -167,7 +167,7 @@ export default class {
|
|||
if (tx.numConfirmations === 0) { // only process pending transactions, confirmed transaction are processed by the newBlock CB
|
||||
tx.outputDetails.forEach(output => {
|
||||
if (output.isOurAddress) {
|
||||
this.log("received chan TX", Number(output.amount), "sats")
|
||||
this.log("received chan TX", Number(output.amount), "sats", "for", output.address)
|
||||
this.addressPaidCb({ hash: tx.txHash, index: Number(output.outputIndex) }, output.address, Number(output.amount), false)
|
||||
}
|
||||
})
|
||||
|
|
@ -203,6 +203,7 @@ export default class {
|
|||
}
|
||||
|
||||
async NewAddress(addressType: Types.AddressType): Promise<NewAddressResponse> {
|
||||
this.log("generating new address")
|
||||
await this.Health()
|
||||
let lndAddressType: AddressType
|
||||
switch (addressType) {
|
||||
|
|
@ -219,21 +220,21 @@ export default class {
|
|||
throw new Error("unknown address type " + addressType)
|
||||
}
|
||||
const res = await this.lightning.newAddress({ account: "", type: lndAddressType }, DeadLineMetadata())
|
||||
this.log("new address", res.response.address)
|
||||
return res.response
|
||||
}
|
||||
|
||||
async NewInvoice(value: number, memo: string, expiry: number): Promise<Invoice> {
|
||||
getLogger({})("adding invoice...")
|
||||
this.log("generating new invoice for", value, "sats")
|
||||
await this.Health()
|
||||
getLogger({})("lnd healthy")
|
||||
const res = await this.lightning.addInvoice(AddInvoiceReq(value, expiry, false, memo), DeadLineMetadata())
|
||||
getLogger({})("got the invoice")
|
||||
this.log("new invoice", res.response.paymentRequest)
|
||||
return { payRequest: res.response.paymentRequest }
|
||||
}
|
||||
|
||||
async DecodeInvoice(paymentRequest: string): Promise<DecodedInvoice> {
|
||||
const res = await this.lightning.decodePayReq({ payReq: paymentRequest }, DeadLineMetadata())
|
||||
return { numSatoshis: Number(res.response.numSatoshis) }
|
||||
return { numSatoshis: Number(res.response.numSatoshis), paymentHash: res.response.paymentHash }
|
||||
}
|
||||
|
||||
GetFeeLimitAmount(amount: number): number {
|
||||
|
|
@ -251,11 +252,13 @@ export default class {
|
|||
}
|
||||
async PayInvoice(invoice: string, amount: number, feeLimit: number): Promise<PaidInvoice> {
|
||||
await this.Health()
|
||||
this.log("paying invoice", invoice, "for", amount, "sats")
|
||||
const abortController = new AbortController()
|
||||
const req = PayInvoiceReq(invoice, amount, feeLimit)
|
||||
const stream = this.router.sendPaymentV2(req, { abort: abortController.signal })
|
||||
return new Promise((res, rej) => {
|
||||
stream.responses.onError(error => {
|
||||
this.log("invoice payment failed", error)
|
||||
rej(error)
|
||||
})
|
||||
stream.responses.onMessage(payment => {
|
||||
|
|
@ -285,8 +288,9 @@ export default class {
|
|||
|
||||
async PayAddress(address: string, amount: number, satPerVByte: number, label = ""): Promise<SendCoinsResponse> {
|
||||
await this.Health()
|
||||
this.log("sending chain TX for", amount, "sats", "to", address)
|
||||
const res = await this.lightning.sendCoins(SendCoinsReq(address, amount, satPerVByte, label), DeadLineMetadata())
|
||||
this.log("sent chain TX for", amount, "sats")
|
||||
this.log("sent chain TX for", amount, "sats", "to", address)
|
||||
return res.response
|
||||
}
|
||||
|
||||
|
|
@ -315,6 +319,15 @@ export default class {
|
|||
return response.forwardingEvents.map(e => ({ fee: Number(e.fee), chanIdIn: e.chanIdIn, chanIdOut: e.chanIdOut, timestampNs: Number(e.timestampNs), offset: response.lastOffsetIndex }))
|
||||
}
|
||||
|
||||
async GetAllPaidInvoices(max: number) {
|
||||
const res = await this.lightning.listInvoices({ indexOffset: 0n, numMaxInvoices: BigInt(max), pendingOnly: false, reversed: true }, DeadLineMetadata())
|
||||
return res.response
|
||||
}
|
||||
async GetAllPayments(max: number) {
|
||||
const res = await this.lightning.listPayments({ countTotalPayments: false, includeIncomplete: false, indexOffset: 0n, maxPayments: BigInt(max), reversed: true })
|
||||
return res.response
|
||||
}
|
||||
|
||||
async OpenChannel(destination: string, closeAddress: string, fundingAmount: number, pushSats: number): Promise<string> {
|
||||
await this.Health()
|
||||
const abortController = new AbortController()
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import * as Types from '../../../proto/autogenerated/ts/types.js'
|
|||
import { LightningClient } from '../../../proto/lnd/lightning.client.js'
|
||||
import { InvoicesClient } from '../../../proto/lnd/invoices.client.js'
|
||||
import { RouterClient } from '../../../proto/lnd/router.client.js'
|
||||
import { GetInfoResponse, AddressType, NewAddressResponse, AddInvoiceResponse, Invoice_InvoiceState, PayReq, Payment_PaymentStatus, Payment, PaymentFailureReason, SendCoinsResponse, EstimateFeeResponse, TransactionDetails, ClosedChannelsResponse, ListChannelsResponse, PendingChannelsResponse } from '../../../proto/lnd/lightning.js'
|
||||
import { GetInfoResponse, AddressType, NewAddressResponse, AddInvoiceResponse, Invoice_InvoiceState, PayReq, Payment_PaymentStatus, Payment, PaymentFailureReason, SendCoinsResponse, EstimateFeeResponse, TransactionDetails, ClosedChannelsResponse, ListChannelsResponse, PendingChannelsResponse, ListInvoiceResponse, ListPaymentsResponse } from '../../../proto/lnd/lightning.js'
|
||||
import { OpenChannelReq } from './openChannelReq.js';
|
||||
import { AddInvoiceReq } from './addInvoiceReq.js';
|
||||
import { PayInvoiceReq } from './payInvoiceReq.js';
|
||||
|
|
@ -63,13 +63,13 @@ export default class {
|
|||
async DecodeInvoice(paymentRequest: string): Promise<DecodedInvoice> {
|
||||
if (paymentRequest.startsWith('lnbcrtmockout')) {
|
||||
const amt = this.decodeOutboundInvoice(paymentRequest)
|
||||
return { numSatoshis: amt }
|
||||
return { numSatoshis: amt, paymentHash: paymentRequest }
|
||||
}
|
||||
const i = this.invoicesAwaiting[paymentRequest]
|
||||
if (!i) {
|
||||
throw new Error("invoice not found")
|
||||
}
|
||||
return { numSatoshis: i.value }
|
||||
return { numSatoshis: i.value, paymentHash: paymentRequest }
|
||||
}
|
||||
|
||||
GetFeeLimitAmount(amount: number): number {
|
||||
|
|
@ -124,6 +124,13 @@ export default class {
|
|||
GetBalance(): Promise<BalanceInfo> {
|
||||
throw new Error("GetBalance disabled in mock mode")
|
||||
}
|
||||
|
||||
async GetAllPaidInvoices(max: number): Promise<ListInvoiceResponse> {
|
||||
throw new Error("not implemented")
|
||||
}
|
||||
async GetAllPayments(max: number): Promise<ListPaymentsResponse> {
|
||||
throw new Error("not implemented")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ export type Invoice = {
|
|||
}
|
||||
export type DecodedInvoice = {
|
||||
numSatoshis: number
|
||||
paymentHash: string
|
||||
}
|
||||
export type PaidInvoice = {
|
||||
feeSat: number
|
||||
|
|
|
|||
|
|
@ -217,4 +217,85 @@ export default class {
|
|||
log({ unsigned: event })
|
||||
this.nostrSend(invoice.linkedApplication.app_id, { type: 'event', event })
|
||||
}
|
||||
|
||||
async VerifyEventsLog() {
|
||||
const events = await this.storage.eventsLog.GetAllLogs()
|
||||
const invoices = await this.lnd.GetAllPaidInvoices(1000)
|
||||
const payments = await this.lnd.GetAllPayments(1000)
|
||||
const incrementSources: Record<string, boolean> = {}
|
||||
const decrementSources: Record<string, boolean> = {}
|
||||
|
||||
const users: Record<string, { ts: number, updatedBalance: number }> = {}
|
||||
for (let i = 0; i < events.length; i++) {
|
||||
const e = events[i]
|
||||
if (e.type === 'balance_decrement') {
|
||||
users[e.userId] = this.checkUserEntry(e, users[e.userId])
|
||||
if (LN_INVOICE_REGEX.test(e.data)) {
|
||||
if (decrementSources[e.data]) {
|
||||
throw new Error("payment decremented more that once " + e.data)
|
||||
}
|
||||
decrementSources[e.data] = true
|
||||
const paymentEntry = await this.storage.paymentStorage.GetPaymentOwner(e.data)
|
||||
if (!paymentEntry) {
|
||||
throw new Error("payment entry not found for " + e.data)
|
||||
}
|
||||
if (paymentEntry.paid_at_unix === 0) {
|
||||
throw new Error("payment was never paid " + e.data)
|
||||
}
|
||||
if (!paymentEntry.internal) {
|
||||
const entry = payments.payments.find(i => i.paymentRequest === e.data)
|
||||
if (!entry) {
|
||||
throw new Error("payment not found in lnd " + e.data)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (e.type === 'balance_increment') {
|
||||
users[e.userId] = this.checkUserEntry(e, users[e.userId])
|
||||
if (LN_INVOICE_REGEX.test(e.data)) {
|
||||
if (incrementSources[e.data]) {
|
||||
throw new Error("invoice incremented more that once " + e.data)
|
||||
}
|
||||
incrementSources[e.data] = true
|
||||
const invoiceEntry = await this.storage.paymentStorage.GetInvoiceOwner(e.data)
|
||||
if (!invoiceEntry) {
|
||||
throw new Error("invoice entry not found for " + e.data)
|
||||
}
|
||||
if (invoiceEntry.paid_at_unix === 0) {
|
||||
throw new Error("invoice was never paid " + e.data)
|
||||
}
|
||||
if (!invoiceEntry.internal) {
|
||||
const entry = invoices.invoices.find(i => i.paymentRequest === e.data)
|
||||
if (!entry) {
|
||||
throw new Error("invoice not found in lnd " + e.data)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
} else {
|
||||
await this.storage.paymentStorage.VerifyDbEvent(e)
|
||||
}
|
||||
}
|
||||
await Promise.all(Object.entries(users).map(async ([userId, u]) => {
|
||||
const user = await this.storage.userStorage.GetUser(userId)
|
||||
if (user.balance_sats !== u.updatedBalance) {
|
||||
throw new Error("sanity check on balance failed, expected: " + u.updatedBalance + " found: " + user.balance_sats)
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
checkUserEntry(e: LoggedEvent, u: { ts: number, updatedBalance: number } | undefined) {
|
||||
const newEntry = { ts: e.timestampMs, updatedBalance: e.balance + e.amount * (e.type === 'balance_decrement' ? -1 : 1) }
|
||||
if (!u) {
|
||||
return newEntry
|
||||
}
|
||||
if (e.timestampMs < u.ts) {
|
||||
throw new Error("entry out of order " + e.timestampMs + " " + u.ts)
|
||||
}
|
||||
if (e.balance !== u.updatedBalance) {
|
||||
throw new Error("inconsistent balance update got: " + e.balance + " expected " + u.updatedBalance)
|
||||
}
|
||||
return newEntry
|
||||
}
|
||||
}
|
||||
|
||||
const LN_INVOICE_REGEX = /^(lightning:)?(lnbc|lntb)[0-9a-zA-Z]+$/;
|
||||
|
|
@ -44,6 +44,7 @@ export default class {
|
|||
lnd: LightningHandler
|
||||
addressPaidCb: AddressPaidCb
|
||||
invoicePaidCb: InvoicePaidCb
|
||||
log = getLogger({ appName: "PaymentManager" })
|
||||
constructor(storage: Storage, lnd: LightningHandler, settings: MainSettings, addressPaidCb: AddressPaidCb, invoicePaidCb: InvoicePaidCb) {
|
||||
this.storage = storage
|
||||
this.settings = settings
|
||||
|
|
@ -117,20 +118,6 @@ export default class {
|
|||
}
|
||||
}
|
||||
|
||||
async lockUserWithMinBalance(userId: string, minBalance: number) {
|
||||
return this.storage.StartTransaction(async tx => {
|
||||
const user = await this.storage.userStorage.GetUser(userId, tx)
|
||||
if (user.locked) {
|
||||
throw new Error("user is already withdrawing")
|
||||
}
|
||||
if (user.balance_sats < minBalance) {
|
||||
throw new Error("insufficient balance")
|
||||
}
|
||||
// this call will fail if the user is already locked
|
||||
await this.storage.userStorage.LockUser(userId, tx)
|
||||
})
|
||||
}
|
||||
|
||||
GetMaxPayableInvoice(balance: number, appUser: boolean): number {
|
||||
let maxWithinServiceFee = 0
|
||||
if (appUser) {
|
||||
|
|
@ -148,6 +135,7 @@ export default class {
|
|||
}
|
||||
|
||||
async PayInvoice(userId: string, req: Types.PayInvoiceRequest, linkedApplication: Application): Promise<Types.PayInvoiceResponse> {
|
||||
this.log("paying invoice", req.invoice, "for user", userId, "with amount", req.amount)
|
||||
const decoded = await this.lnd.DecodeInvoice(req.invoice)
|
||||
if (decoded.numSatoshis !== 0 && req.amount !== 0) {
|
||||
throw new Error("invoice has value, do not provide amount the the request")
|
||||
|
|
@ -162,17 +150,23 @@ export default class {
|
|||
const internalInvoice = await this.storage.paymentStorage.GetInvoiceOwner(req.invoice)
|
||||
let payment: PaidInvoice | null = null
|
||||
if (!internalInvoice) {
|
||||
if (this.settings.disableExternalPayments) {
|
||||
throw new Error("something went wrong sending payment, please try again later")
|
||||
}
|
||||
this.log("paying external invoice", req.invoice)
|
||||
const routingFeeLimit = this.lnd.GetFeeLimitAmount(payAmount)
|
||||
await this.lockUserWithMinBalance(userId, totalAmountToDecrement + routingFeeLimit)
|
||||
await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement + routingFeeLimit, req.invoice)
|
||||
try {
|
||||
payment = await this.lnd.PayInvoice(req.invoice, req.amount, routingFeeLimit)
|
||||
await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement + payment.feeSat, req.invoice)
|
||||
await this.storage.userStorage.UnlockUser(userId)
|
||||
if (routingFeeLimit - payment.feeSat > 0) {
|
||||
await this.storage.userStorage.IncrementUserBalance(userId, routingFeeLimit - payment.feeSat, "routing_fee_refund")
|
||||
}
|
||||
} catch (err) {
|
||||
await this.storage.userStorage.UnlockUser(userId)
|
||||
await this.storage.userStorage.IncrementUserBalance(userId, totalAmountToDecrement + routingFeeLimit, "payment_refund")
|
||||
throw err
|
||||
}
|
||||
} else {
|
||||
this.log("paying internal invoice", req.invoice)
|
||||
if (internalInvoice.paid_at_unix > 0) {
|
||||
throw new Error("this invoice was already paid")
|
||||
}
|
||||
|
|
@ -197,6 +191,7 @@ export default class {
|
|||
|
||||
|
||||
async PayAddress(ctx: Types.UserContext, req: Types.PayAddressRequest): Promise<Types.PayAddressResponse> {
|
||||
throw new Error("address payment currently disabled, use Lightning instead")
|
||||
const { blockHeight } = await this.lnd.GetInfo()
|
||||
const app = await this.storage.applicationStorage.GetApplication(ctx.app_id)
|
||||
const serviceFee = this.getServiceFee(Types.UserOperationType.OUTGOING_TX, req.amoutSats, false)
|
||||
|
|
@ -205,21 +200,21 @@ export default class {
|
|||
let txId = ""
|
||||
let chainFees = 0
|
||||
if (!internalAddress) {
|
||||
this.log("paying external address")
|
||||
const estimate = await this.lnd.EstimateChainFees(req.address, req.amoutSats, 1)
|
||||
const vBytes = Math.ceil(Number(estimate.feeSat / estimate.satPerVbyte))
|
||||
chainFees = vBytes * req.satsPerVByte
|
||||
const total = req.amoutSats + chainFees
|
||||
await this.lockUserWithMinBalance(ctx.user_id, total + serviceFee)
|
||||
this.storage.userStorage.DecrementUserBalance(ctx.user_id, total + serviceFee, req.address)
|
||||
try {
|
||||
const payment = await this.lnd.PayAddress(req.address, req.amoutSats, req.satsPerVByte)
|
||||
txId = payment.txid
|
||||
await this.storage.userStorage.DecrementUserBalance(ctx.user_id, total + serviceFee, req.address)
|
||||
await this.storage.userStorage.UnlockUser(ctx.user_id)
|
||||
} catch (err) {
|
||||
await this.storage.userStorage.UnlockUser(ctx.user_id)
|
||||
await this.storage.userStorage.IncrementUserBalance(ctx.user_id, total + serviceFee, req.address)
|
||||
throw err
|
||||
}
|
||||
} else {
|
||||
this.log("paying internal address")
|
||||
await this.storage.userStorage.DecrementUserBalance(ctx.user_id, req.amoutSats + serviceFee, req.address)
|
||||
txId = crypto.randomBytes(32).toString("hex")
|
||||
this.addressPaidCb({ hash: txId, index: 0 }, req.address, req.amoutSats, true)
|
||||
|
|
|
|||
|
|
@ -18,8 +18,8 @@ export type MainSettings = {
|
|||
servicePort: number
|
||||
recordPerformance: boolean
|
||||
skipSanityCheck: boolean
|
||||
disableExternalPayments: boolean
|
||||
}
|
||||
|
||||
export const LoadMainSettingsFromEnv = (): MainSettings => {
|
||||
return {
|
||||
lndSettings: LoadLndSettingsFromEnv(),
|
||||
|
|
@ -37,6 +37,7 @@ export const LoadMainSettingsFromEnv = (): MainSettings => {
|
|||
servicePort: EnvMustBeInteger("PORT"),
|
||||
recordPerformance: process.env.RECORD_PERFORMANCE === 'true' || false,
|
||||
skipSanityCheck: process.env.SKIP_SANITY_CHECK === 'true' || false,
|
||||
disableExternalPayments: process.env.DISABLE_EXTERNAL_PAYMENTS === 'true' || false
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -89,7 +89,7 @@ export default class HtlcTracker {
|
|||
return this.incrementReceiveFailures(incomingChannelId)
|
||||
}
|
||||
}
|
||||
this.log("unknown htlc event type for failure event")
|
||||
this.log("unknown htlc event type for failure event", eventType)
|
||||
}
|
||||
|
||||
handleSuccess = ({ eventType, outgoingHtlcId, incomingHtlcId }: EventInfo) => {
|
||||
|
|
@ -104,7 +104,7 @@ export default class HtlcTracker {
|
|||
if (this.deleteMapEntry(incomingHtlcId, this.pendingReceiveHtlcs) !== null) return
|
||||
if (this.deleteMapEntry(outgoingHtlcId, this.pendingForwardHtlcs) !== null) return
|
||||
} else {
|
||||
this.log("unknown htlc event type for success event")
|
||||
this.log("unknown htlc event type for success event", eventType)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -13,6 +13,12 @@ export type LoggedEvent = {
|
|||
data: string
|
||||
amount: number
|
||||
}
|
||||
type TimeEntry = {
|
||||
timestamp: number
|
||||
amount: number
|
||||
balance: number
|
||||
userId: string
|
||||
}
|
||||
const columns = ["timestampMs", "userId", "appUserId", "appId", "balance", "type", "data", "amount"]
|
||||
type StringerWrite = (chunk: any, cb: (error: Error | null | undefined) => void) => boolean
|
||||
export default class EventsLogManager {
|
||||
|
|
@ -38,20 +44,21 @@ export default class EventsLogManager {
|
|||
this.write([Date.now(), e.userId, e.appUserId, e.appId, e.balance, e.type, e.data, e.amount])
|
||||
}
|
||||
|
||||
GetAllLogs = async (): Promise<LoggedEvent[]> => {
|
||||
const logs = await this.Read()
|
||||
GetAllLogs = async (path?: string): Promise<LoggedEvent[]> => {
|
||||
const logs = await this.Read(path)
|
||||
this.log("found", logs.length, "event logs")
|
||||
return logs
|
||||
}
|
||||
|
||||
Read = async (): Promise<LoggedEvent[]> => {
|
||||
const exists = fs.existsSync(eventLogPath)
|
||||
Read = async (path?: string): Promise<LoggedEvent[]> => {
|
||||
const filePath = path ? path : eventLogPath
|
||||
const exists = fs.existsSync(filePath)
|
||||
if (!exists) {
|
||||
return []
|
||||
}
|
||||
return new Promise<LoggedEvent[]>((res, rej) => {
|
||||
const result: LoggedEvent[] = []
|
||||
fs.createReadStream(eventLogPath)
|
||||
fs.createReadStream(filePath)
|
||||
.pipe(parse({ delimiter: ",", from_line: 2 }))
|
||||
.on('data', data => { result.push(this.parseEvent(data)) })
|
||||
.on('error', err => { rej(err) })
|
||||
|
|
@ -73,4 +80,44 @@ export default class EventsLogManager {
|
|||
})
|
||||
})
|
||||
}
|
||||
|
||||
ignoredKeys = ['fees', "bc1qkafgye62h2zhzlwtrga6jytz2p7af4lg8fwqt6", "6eb1d279f95377b8514aad3b79ff1cddbe9f5d3b95653b55719850df9df63821", "b11585413bfa7bf65a5f1263e3100e53b4c9afe6b5d8c94c6b85017dfcbf3d49"]
|
||||
createTimeSeries = (events: LoggedEvent[]) => {
|
||||
const dataAppIds: Record<string, string> = {}
|
||||
const order: { timestamp: number, data: string, type: 'inc' | 'dec' }[] = []
|
||||
const incrementEntries: Record<string, TimeEntry> = {}
|
||||
const decrementEntries: Record<string, TimeEntry> = {}
|
||||
events.forEach(e => {
|
||||
if (this.ignoredKeys.includes(e.data)) {
|
||||
return
|
||||
}
|
||||
if (e.type === 'balance_increment') {
|
||||
if (incrementEntries[e.data]) {
|
||||
throw new Error("increment duplicate! " + e.data)
|
||||
}
|
||||
incrementEntries[e.data] = { timestamp: e.timestampMs, balance: e.balance, amount: e.amount, userId: e.userId }
|
||||
order.push({ timestamp: e.timestampMs, data: e.data, type: 'inc' })
|
||||
} else if (e.type === 'balance_decrement') {
|
||||
if (decrementEntries[e.data]) {
|
||||
throw new Error("decrement duplicate! " + e.data)
|
||||
}
|
||||
decrementEntries[e.data] = { timestamp: e.timestampMs, balance: e.balance, amount: e.amount, userId: e.userId }
|
||||
order.push({ timestamp: e.timestampMs, data: e.data, type: 'dec' })
|
||||
} else if (e.appId) {
|
||||
dataAppIds[e.data] = e.appId
|
||||
}
|
||||
})
|
||||
const full = order.map(o => {
|
||||
const { type } = o
|
||||
if (type === 'inc') {
|
||||
const entry = incrementEntries[o.data]
|
||||
return { timestamp: entry.timestamp, amount: entry.amount, balance: entry.balance, userId: entry.userId, appId: dataAppIds[o.data], internal: !!decrementEntries[o.data] }
|
||||
} else {
|
||||
const entry = decrementEntries[o.data]
|
||||
return { timestamp: entry.timestamp, amount: -entry.amount, balance: entry.balance, userId: entry.userId, appId: dataAppIds[o.data], internal: !!incrementEntries[o.data] }
|
||||
}
|
||||
})
|
||||
full.sort((a, b) => a.timestamp - b.timestamp)
|
||||
fs.writeFileSync("timeSeries.json", JSON.stringify(full, null, 2))
|
||||
}
|
||||
}
|
||||
|
|
@ -6,14 +6,13 @@ import UserStorage from "./userStorage.js";
|
|||
import PaymentStorage from "./paymentStorage.js";
|
||||
import MetricsStorage from "./metricsStorage.js";
|
||||
import TransactionsQueue, { TX } from "./transactionsQueue.js";
|
||||
import EventsLogManager, { LoggedEvent } from "./eventsLog.js";
|
||||
import EventsLogManager from "./eventsLog.js";
|
||||
export type StorageSettings = {
|
||||
dbSettings: DbSettings
|
||||
}
|
||||
export const LoadStorageSettingsFromEnv = (): StorageSettings => {
|
||||
return { dbSettings: LoadDbSettingsFromEnv() }
|
||||
}
|
||||
|
||||
export default class {
|
||||
DB: DataSource | EntityManager
|
||||
settings: StorageSettings
|
||||
|
|
@ -41,40 +40,6 @@ export default class {
|
|||
return { executedMigrations, executedMetricsMigrations };
|
||||
}
|
||||
|
||||
async VerifyEventsLog() {
|
||||
const events = await this.eventsLog.GetAllLogs()
|
||||
|
||||
const users: Record<string, { ts: number, updatedBalance: number }> = {}
|
||||
for (let i = 0; i < events.length; i++) {
|
||||
const e = events[i]
|
||||
if (e.type === 'balance_decrement' || e.type === 'balance_increment') {
|
||||
users[e.userId] = this.checkUserEntry(e, users[e.userId])
|
||||
} else {
|
||||
await this.paymentStorage.VerifyDbEvent(e)
|
||||
}
|
||||
}
|
||||
await Promise.all(Object.entries(users).map(async ([userId, u]) => {
|
||||
const user = await this.userStorage.GetUser(userId)
|
||||
if (user.balance_sats !== u.updatedBalance) {
|
||||
throw new Error("sanity check on balance failed, expected: " + u.updatedBalance + " found: " + user.balance_sats)
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
checkUserEntry(e: LoggedEvent, u: { ts: number, updatedBalance: number } | undefined) {
|
||||
const newEntry = { ts: e.timestampMs, updatedBalance: e.balance + e.amount * (e.type === 'balance_decrement' ? -1 : 1) }
|
||||
if (!u) {
|
||||
return newEntry
|
||||
}
|
||||
if (e.timestampMs < u.ts) {
|
||||
throw new Error("entry out of order " + e.timestampMs + " " + u.ts)
|
||||
}
|
||||
if (e.balance !== u.updatedBalance) {
|
||||
throw new Error("inconsistent balance update got: " + e.balance + " expected " + u.updatedBalance)
|
||||
}
|
||||
return newEntry
|
||||
}
|
||||
|
||||
StartTransaction(exec: TX<void>, description?: string) {
|
||||
return this.txQueue.PushToQueue({ exec, dbTx: true, description })
|
||||
}
|
||||
|
|
|
|||
|
|
@ -121,6 +121,13 @@ export default class {
|
|||
}
|
||||
})
|
||||
}
|
||||
async GetPaymentOwner(paymentRequest: string, entityManager = this.DB): Promise<UserInvoicePayment | null> {
|
||||
return entityManager.getRepository(UserInvoicePayment).findOne({
|
||||
where: {
|
||||
invoice: paymentRequest
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async AddUserInvoicePayment(userId: string, invoice: string, amount: number, routingFees: number, serviceFees: number, internal: boolean, linkedApplication: Application): Promise<UserInvoicePayment> {
|
||||
const newPayment = this.DB.getRepository(UserInvoicePayment).create({
|
||||
|
|
|
|||
|
|
@ -75,6 +75,7 @@ export default class {
|
|||
user_id: userId,
|
||||
}, "balance_sats", increment)
|
||||
if (!res.affected) {
|
||||
getLogger({ userId: userId, appName: "balanceUpdates" })("user unaffected by increment")
|
||||
throw new Error("unaffected balance increment for " + userId) // TODO: fix logs doxing
|
||||
}
|
||||
getLogger({ userId: userId, appName: "balanceUpdates" })("incremented balance from", user.balance_sats, "sats, by", increment, "sats")
|
||||
|
|
@ -83,12 +84,14 @@ export default class {
|
|||
async DecrementUserBalance(userId: string, decrement: number, reason: string, entityManager = this.DB) {
|
||||
const user = await this.GetUser(userId, entityManager)
|
||||
if (!user || user.balance_sats < decrement) {
|
||||
getLogger({ userId: userId, appName: "balanceUpdates" })("user to decrement not found")
|
||||
throw new Error("not enough balance to decrement")
|
||||
}
|
||||
const res = await entityManager.getRepository(User).decrement({
|
||||
user_id: userId,
|
||||
}, "balance_sats", decrement)
|
||||
if (!res.affected) {
|
||||
getLogger({ userId: userId, appName: "balanceUpdates" })("user unaffected by decrement")
|
||||
throw new Error("unaffected balance decrement for " + userId) // TODO: fix logs doxing
|
||||
}
|
||||
getLogger({ userId: userId, appName: "balanceUpdates" })("decremented balance from", user.balance_sats, "sats, by", decrement, "sats")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue