diff --git a/src/services/main/index.ts b/src/services/main/index.ts index 9c7488df..3c8a3af6 100644 --- a/src/services/main/index.ts +++ b/src/services/main/index.ts @@ -1,4 +1,3 @@ -import crypto from 'crypto' import fetch from "node-fetch" import Storage, { LoadStorageSettingsFromEnv } from '../storage/index.js' import * as Types from '../../../proto/autogenerated/ts/types.js' @@ -219,85 +218,4 @@ export default class { log({ unsigned: event }) this.nostrSend(invoice.linkedApplication.app_id, { type: 'event', event }) } - - async VerifyEventsLog() { - const events = await this.storage.eventsLog.GetAllLogs() - const invoices = await this.lnd.GetAllPaidInvoices(1000) - const payments = await this.lnd.GetAllPayments(1000) - const incrementSources: Record = {} - const decrementSources: Record = {} - - const users: Record = {} - for (let i = 0; i < events.length; i++) { - const e = events[i] - if (e.type === 'balance_decrement') { - users[e.userId] = this.checkUserEntry(e, users[e.userId]) - if (LN_INVOICE_REGEX.test(e.data)) { - if (decrementSources[e.data]) { - throw new Error("payment decremented more that once " + e.data) - } - decrementSources[e.data] = true - const paymentEntry = await this.storage.paymentStorage.GetPaymentOwner(e.data) - if (!paymentEntry) { - throw new Error("payment entry not found for " + e.data) - } - if (paymentEntry.paid_at_unix === 0) { - throw new Error("payment was never paid " + e.data) - } - if (!paymentEntry.internal) { - const entry = payments.payments.find(i => i.paymentRequest === e.data) - if (!entry) { - throw new Error("payment not found in lnd " + e.data) - } - } - } - } else if (e.type === 'balance_increment') { - users[e.userId] = this.checkUserEntry(e, users[e.userId]) - if (LN_INVOICE_REGEX.test(e.data)) { - if (incrementSources[e.data]) { - throw new Error("invoice incremented more that once " + e.data) - } - incrementSources[e.data] = true - const invoiceEntry = await this.storage.paymentStorage.GetInvoiceOwner(e.data) - if (!invoiceEntry) { - throw new Error("invoice entry not found for " + e.data) - } - if (invoiceEntry.paid_at_unix === 0) { - throw new Error("invoice was never paid " + e.data) - } - if (!invoiceEntry.internal) { - const entry = invoices.invoices.find(i => i.paymentRequest === e.data) - if (!entry) { - throw new Error("invoice not found in lnd " + e.data) - } - } - - } - } else { - await this.storage.paymentStorage.VerifyDbEvent(e) - } - } - await Promise.all(Object.entries(users).map(async ([userId, u]) => { - const user = await this.storage.userStorage.GetUser(userId) - if (user.balance_sats !== u.updatedBalance) { - throw new Error("sanity check on balance failed, expected: " + u.updatedBalance + " found: " + user.balance_sats) - } - })) - } - - checkUserEntry(e: LoggedEvent, u: { ts: number, updatedBalance: number } | undefined) { - const newEntry = { ts: e.timestampMs, updatedBalance: e.balance + e.amount * (e.type === 'balance_decrement' ? -1 : 1) } - if (!u) { - return newEntry - } - if (e.timestampMs < u.ts) { - throw new Error("entry out of order " + e.timestampMs + " " + u.ts) - } - if (e.balance !== u.updatedBalance) { - throw new Error("inconsistent balance update got: " + e.balance + " expected " + u.updatedBalance) - } - return newEntry - } } - -const LN_INVOICE_REGEX = /^(lightning:)?(lnbc|lntb)[0-9a-zA-Z]+$/; \ No newline at end of file diff --git a/src/services/main/init.ts b/src/services/main/init.ts index 419a3f9f..71bcbe4f 100644 --- a/src/services/main/init.ts +++ b/src/services/main/init.ts @@ -2,6 +2,7 @@ import { PubLogger, getLogger } from "../helpers/logger.js" import Storage from "../storage/index.js" import { TypeOrmMigrationRunner } from "../storage/migrations/runner.js" import Main from "./index.js" +import SanityChecker from "./sanityChecker.js" import { MainSettings } from "./settings.js" export type AppData = { privateKey: string; @@ -18,10 +19,10 @@ export const initMainHandler = async (log: PubLogger, mainSettings: MainSettings const mainHandler = new Main(mainSettings, storageManager) await mainHandler.lnd.Warmup() if (!mainSettings.skipSanityCheck) { - await mainHandler.VerifyEventsLog() + const sanityChecker = new SanityChecker(storageManager, mainHandler.lnd) + await sanityChecker.VerifyEventsLog() } - const totalUsersBalance = await mainHandler.storage.paymentStorage.GetTotalUsersBalance() - await mainHandler.paymentManager.watchDog.SeedLndBalance(totalUsersBalance || 0) + await mainHandler.paymentManager.watchDog.Start() const appsData = await mainHandler.storage.applicationStorage.GetApplications() const existingWalletApp = await appsData.find(app => app.name === 'wallet' || app.name === 'wallet-test') if (!existingWalletApp) { diff --git a/src/services/main/paymentManager.ts b/src/services/main/paymentManager.ts index c8335368..6e29c4a9 100644 --- a/src/services/main/paymentManager.ts +++ b/src/services/main/paymentManager.ts @@ -14,7 +14,7 @@ import { SendCoinsResponse } from '../../../proto/lnd/lightning.js' import { Event, verifiedSymbol, verifySignature } from '../nostr/tools/event.js' import { AddressReceivingTransaction } from '../storage/entity/AddressReceivingTransaction.js' import { UserTransactionPayment } from '../storage/entity/UserTransactionPayment.js' -import { Watchdog } from '../lnd/watchdog.js' +import { Watchdog } from './watchdog.js' interface UserOperationInfo { serial_id: number paid_amount: number @@ -39,7 +39,6 @@ const defaultLnurlPayMetadata = `[["text/plain", "lnurl pay to Lightning.pub"]]` const confInOne = 1000 * 1000 const confInTwo = 100 * 1000 * 1000 export default class { - storage: Storage settings: MainSettings lnd: LightningHandler @@ -51,7 +50,7 @@ export default class { this.storage = storage this.settings = settings this.lnd = lnd - this.watchDog = new Watchdog(settings.watchDogSettings, lnd) + this.watchDog = new Watchdog(settings.watchDogSettings, lnd, storage) this.addressPaidCb = addressPaidCb this.invoicePaidCb = invoicePaidCb } @@ -143,14 +142,9 @@ export default class { } } - async WatchdogCheck() { - const total = await this.storage.paymentStorage.GetTotalUsersBalance() - await this.watchDog.PaymentRequested(total || 0) - } - async PayInvoice(userId: string, req: Types.PayInvoiceRequest, linkedApplication: Application): Promise { this.log("paying invoice", req.invoice, "for user", userId, "with amount", req.amount) - await this.WatchdogCheck() + await this.watchDog.PaymentRequested() const maybeBanned = await this.storage.userStorage.GetUser(userId) if (maybeBanned.locked) { throw new Error("user is banned, cannot send payment") @@ -167,51 +161,69 @@ export default class { const serviceFee = this.getServiceFee(Types.UserOperationType.OUTGOING_INVOICE, payAmount, isAppUserPayment) const totalAmountToDecrement = payAmount + serviceFee const internalInvoice = await this.storage.paymentStorage.GetInvoiceOwner(req.invoice) - let payment: PaidInvoice | null = null - if (!internalInvoice) { - if (this.settings.disableExternalPayments) { - throw new Error("something went wrong sending payment, please try again later") - } - this.log("paying external invoice", req.invoice) - const routingFeeLimit = this.lnd.GetFeeLimitAmount(payAmount) - await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement + routingFeeLimit, req.invoice) - try { - payment = await this.lnd.PayInvoice(req.invoice, req.amount, routingFeeLimit) - if (routingFeeLimit - payment.feeSat > 0) { - await this.storage.userStorage.IncrementUserBalance(userId, routingFeeLimit - payment.feeSat, "routing_fee_refund") - } - } catch (err) { - await this.storage.userStorage.IncrementUserBalance(userId, totalAmountToDecrement + routingFeeLimit, "payment_refund") - throw err - } + let paymentInfo = { preimage: "", amtPaid: 0, networkFee: 0, serialId: 0 } + if (internalInvoice) { + paymentInfo = await this.PayInternalInvoice(userId, internalInvoice, { payAmount, serviceFee }, linkedApplication) } else { - this.log("paying internal invoice", req.invoice) - if (internalInvoice.paid_at_unix > 0) { - throw new Error("this invoice was already paid") - } - await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement, req.invoice) - this.invoicePaidCb(req.invoice, payAmount, true) + paymentInfo = await this.PayExternalInvoice(userId, req.invoice, { payAmount, serviceFee, amountForLnd: req.amount }, linkedApplication) } if (isAppUserPayment && serviceFee > 0) { await this.storage.userStorage.IncrementUserBalance(linkedApplication.owner.user_id, serviceFee, "fees") } - const routingFees = payment ? payment.feeSat : 0 - const newPayment = await this.storage.paymentStorage.AddUserInvoicePayment(userId, req.invoice, payAmount, routingFees, serviceFee, !!internalInvoice, linkedApplication) const user = await this.storage.userStorage.GetUser(userId) this.storage.eventsLog.LogEvent({ type: 'invoice_payment', userId, appId: linkedApplication.app_id, appUserId: "", balance: user.balance_sats, data: req.invoice, amount: payAmount }) return { - preimage: payment ? payment.paymentPreimage : "", - amount_paid: payment ? Number(payment.valueSat) : payAmount, - operation_id: `${Types.UserOperationType.OUTGOING_INVOICE}-${newPayment.serial_id}`, - network_fee: routingFees, + preimage: paymentInfo.preimage, + amount_paid: paymentInfo.amtPaid, + operation_id: `${Types.UserOperationType.OUTGOING_INVOICE}-${paymentInfo.serialId}`, + network_fee: paymentInfo.networkFee, service_fee: serviceFee } } + async PayExternalInvoice(userId: string, invoice: string, amounts: { payAmount: number, serviceFee: number, amountForLnd: number }, linkedApplication: Application) { + if (this.settings.disableExternalPayments) { + throw new Error("something went wrong sending payment, please try again later") + } + const { amountForLnd, payAmount, serviceFee } = amounts + const totalAmountToDecrement = payAmount + serviceFee + this.log("paying external invoice", invoice) + const routingFeeLimit = this.lnd.GetFeeLimitAmount(payAmount) + await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement + routingFeeLimit, invoice) + const pendingPayment = await this.storage.paymentStorage.AddPendingExternalPayment(userId, invoice, payAmount, linkedApplication) + try { + const payment = await this.lnd.PayInvoice(invoice, amountForLnd, routingFeeLimit) + if (routingFeeLimit - payment.feeSat > 0) { + await this.storage.userStorage.IncrementUserBalance(userId, routingFeeLimit - payment.feeSat, "routing_fee_refund:" + invoice) + } + await this.storage.paymentStorage.UpdateExternalPayment(pendingPayment.serial_id, payment.feeSat, serviceFee, true) + + return { preimage: payment.paymentPreimage, amtPaid: payment.valueSat, networkFee: payment.feeSat, serialId: pendingPayment.serial_id } + + } catch (err) { + await this.storage.userStorage.IncrementUserBalance(userId, totalAmountToDecrement + routingFeeLimit, "payment_refund:" + invoice) + await this.storage.paymentStorage.UpdateExternalPayment(pendingPayment.serial_id, 0, 0, false) + throw err + } + } + + async PayInternalInvoice(userId: string, internalInvoice: UserReceivingInvoice, amounts: { payAmount: number, serviceFee: number }, linkedApplication: Application) { + this.log("paying internal invoice", internalInvoice.invoice) + if (internalInvoice.paid_at_unix > 0) { + throw new Error("this invoice was already paid") + } + const { payAmount, serviceFee } = amounts + const totalAmountToDecrement = payAmount + serviceFee + await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement, internalInvoice.invoice) + this.invoicePaidCb(internalInvoice.invoice, payAmount, true) + const newPayment = await this.storage.paymentStorage.AddInternalPayment(userId, internalInvoice.invoice, payAmount, serviceFee, linkedApplication) + return { preimage: "", amtPaid: payAmount, networkFee: 0, serialId: newPayment.serial_id } + } + async PayAddress(ctx: Types.UserContext, req: Types.PayAddressRequest): Promise { throw new Error("address payment currently disabled, use Lightning instead") - await this.WatchdogCheck() + await this.watchDog.PaymentRequested() this.log("paying address", req.address, "for user", ctx.user_id, "with amount", req.amoutSats) const maybeBanned = await this.storage.userStorage.GetUser(ctx.user_id) if (maybeBanned.locked) { @@ -230,18 +242,21 @@ export default class { const vBytes = Math.ceil(Number(estimate.feeSat / estimate.satPerVbyte)) chainFees = vBytes * req.satsPerVByte const total = req.amoutSats + chainFees + // WARNING, before re-enabling this, make sure to add the tx_hash to the DecrementUserBalance "reason"!! this.storage.userStorage.DecrementUserBalance(ctx.user_id, total + serviceFee, req.address) try { const payment = await this.lnd.PayAddress(req.address, req.amoutSats, req.satsPerVByte) txId = payment.txid } catch (err) { + // WARNING, before re-enabling this, make sure to add the tx_hash to the IncrementUserBalance "reason"!! await this.storage.userStorage.IncrementUserBalance(ctx.user_id, total + serviceFee, req.address) throw err } } else { this.log("paying internal address") - await this.storage.userStorage.DecrementUserBalance(ctx.user_id, req.amoutSats + serviceFee, req.address) txId = crypto.randomBytes(32).toString("hex") + const addressData = `${req.address}:${txId}` + await this.storage.userStorage.DecrementUserBalance(ctx.user_id, req.amoutSats + serviceFee, addressData) this.addressPaidCb({ hash: txId, index: 0 }, req.address, req.amoutSats, true) } @@ -520,9 +535,10 @@ export default class { const isAppUserPayment = fromUser.user_id !== linkedApplication.owner.user_id let fee = this.getServiceFee(Types.UserOperationType.OUTGOING_USER_TO_USER, amount, isAppUserPayment) const toIncrement = amount - fee - await this.storage.userStorage.DecrementUserBalance(fromUser.user_id, amount, toUserId, tx) - await this.storage.userStorage.IncrementUserBalance(toUser.user_id, toIncrement, fromUserId, tx) - await this.storage.paymentStorage.AddUserToUserPayment(fromUserId, toUserId, amount, fee, linkedApplication, tx) + const paymentEntry = await this.storage.paymentStorage.CreateUserToUserPayment(fromUserId, toUserId, amount, fee, linkedApplication, tx) + await this.storage.userStorage.DecrementUserBalance(fromUser.user_id, amount, `${toUserId}:${paymentEntry.serial_id}`, tx) + await this.storage.userStorage.IncrementUserBalance(toUser.user_id, toIncrement, `${fromUserId}:${paymentEntry.serial_id}`, tx) + await this.storage.paymentStorage.SaveUserToUserPayment(paymentEntry, tx) if (isAppUserPayment && fee > 0) { await this.storage.userStorage.IncrementUserBalance(linkedApplication.owner.user_id, fee, 'fees', tx) } diff --git a/src/services/main/sanityChecker.ts b/src/services/main/sanityChecker.ts new file mode 100644 index 00000000..72f3087d --- /dev/null +++ b/src/services/main/sanityChecker.ts @@ -0,0 +1,263 @@ +import Storage from '../storage/index.js' +import { LightningHandler } from "../lnd/index.js" +import { LoggedEvent } from '../storage/eventsLog.js' +import { Invoice, Payment } from '../../../proto/lnd/lightning'; +const LN_INVOICE_REGEX = /^(lightning:)?(lnbc|lntb)[0-9a-zA-Z]+$/; +const BITCOIN_ADDRESS_REGEX = /^(bitcoin:)?([13][a-km-zA-HJ-NP-Z1-9]{25,34}|bc1[a-zA-HJ-NP-Z0-9]{39,59})$/; +type UniqueDecrementReasons = 'ban' +type UniqueIncrementReasons = 'fees' | 'routing_fee_refund' | 'payment_refund' +type CommonReasons = 'invoice' | 'address' | 'u2u' +type Reason = UniqueDecrementReasons | UniqueIncrementReasons | CommonReasons +export default class SanityChecker { + storage: Storage + lnd: LightningHandler + + events: LoggedEvent[] = [] + invoices: Invoice[] = [] + payments: Payment[] = [] + incrementSources: Record = {} + decrementSources: Record = {} + decrementEvents: Record = {} + users: Record = {} + constructor(storage: Storage, lnd: LightningHandler) { + this.storage = storage + this.lnd = lnd + } + + parseDataField(data: string): { type: Reason, data: string, txHash?: string, serialId?: number } { + const parts = data.split(":") + if (parts.length === 1) { + const [fullData] = parts + if (fullData === 'fees' || fullData === 'ban') { + return { type: fullData, data: fullData } + } else if (LN_INVOICE_REGEX.test(fullData)) { + return { type: 'invoice', data: fullData } + } else if (BITCOIN_ADDRESS_REGEX.test(fullData)) { + return { type: 'address', data: fullData } + } else { + return { type: 'u2u', data: fullData } + } + } else if (parts.length === 2) { + const [prefix, data] = parts + if (prefix === 'routing_fee_refund' || prefix === 'payment_refund') { + return { type: prefix, data } + } else if (BITCOIN_ADDRESS_REGEX.test(prefix)) { + return { type: 'address', data: prefix, txHash: data } + } else { + return { type: 'u2u', data: prefix, serialId: +data } + } + } + throw new Error("unknown data format") + } + + async verifyDecrementEvent(e: LoggedEvent) { + if (this.decrementSources[e.data]) { + throw new Error("entry decremented more that once " + e.data) + } + this.decrementSources[e.data] = true + this.users[e.userId] = this.checkUserEntry(e, this.users[e.userId]) + const parsed = this.parseDataField(e.data) + switch (parsed.type) { + case 'ban': + return + case 'address': + return this.validateUserTransactionPayment({ address: parsed.data, txHash: parsed.txHash, userId: e.userId }) + case 'invoice': + return this.validateUserInvoicePayment({ invoice: parsed.data, userId: e.userId, amt: e.amount }) + case 'u2u': + return this.validateUser2UserPayment({ fromUser: e.userId, toUser: parsed.data, serialId: parsed.serialId }) + default: + throw new Error("unknown decrement type " + parsed.type) + } + } + + async validateUserTransactionPayment({ address, txHash, userId }: { userId: string, address: string, txHash?: string }) { + if (!txHash) { + throw new Error("no tx hash provided to payment for address " + address) + } + const entry = await this.storage.paymentStorage.GetUserTransactionPaymentOwner(address, txHash) + if (!entry) { + throw new Error("no payment found for tx hash " + txHash) + } + if (entry.user.user_id !== userId) { + throw new Error("payment user id mismatch for tx hash " + txHash) + } + if (entry.paid_at_unix <= 0) { + throw new Error("payment not paid for tx hash " + txHash) + } + } + + async validateUserInvoicePayment({ invoice, userId, amt }: { userId: string, invoice: string, amt: number }) { + const entry = await this.storage.paymentStorage.GetPaymentOwner(invoice) + if (!entry) { + throw new Error("no payment found for invoice " + invoice) + } + if (entry.user.user_id !== userId) { + throw new Error("payment user id mismatch for invoice " + invoice) + } + if (entry.paid_at_unix === 0) { + throw new Error("payment never settled for invoice " + invoice) // TODO: check if this is correct + } + if (entry.paid_at_unix === -1) { + const refund = amt - (entry.paid_amount + entry.routing_fees + entry.service_fees) + this.decrementEvents[invoice] = { userId, refund, falure: true } + } else { + this.decrementEvents[invoice] = { userId, refund: amt, falure: false } + } + if (!entry.internal) { + const lndEntry = this.payments.find(i => i.paymentRequest === invoice) + if (!lndEntry) { + throw new Error("payment not found in lnd for invoice " + invoice) + } + } + } + + async validateUser2UserPayment({ fromUser, toUser, serialId }: { fromUser: string, toUser: string, serialId?: number }) { + if (!serialId) { + throw new Error("no serial id provided to u2u payment") + } + const entry = await this.storage.paymentStorage.GetUser2UserPayment(serialId) + if (!entry) { + throw new Error("no payment u2u found for serial id " + serialId) + } + if (entry.from_user.user_id !== fromUser || entry.to_user.user_id !== toUser) { + throw new Error("u2u payment user id mismatch for serial id " + serialId) + } + if (entry.paid_at_unix <= 0) { + throw new Error("payment not paid for serial id " + serialId) + } + } + + async verifyIncrementEvent(e: LoggedEvent) { + if (this.incrementSources[e.data]) { + throw new Error("entry incremented more that once " + e.data) + } + this.incrementSources[e.data] = true + this.users[e.userId] = this.checkUserEntry(e, this.users[e.userId]) + const parsed = this.parseDataField(e.data) + switch (parsed.type) { + case 'fees': + return + case 'address': + return this.validateAddressReceivingTransaction({ address: parsed.data, txHash: parsed.txHash, userId: e.userId }) + case 'invoice': + return this.validateReceivingInvoice({ invoice: parsed.data, userId: e.userId }) + case 'u2u': + return this.validateUser2UserPayment({ fromUser: parsed.data, toUser: e.userId, serialId: parsed.serialId }) + case 'routing_fee_refund': + return this.validateRoutingFeeRefund({ amt: e.amount, invoice: parsed.data, userId: e.userId }) + case 'payment_refund': + return this.validatePaymentRefund({ amt: e.amount, invoice: parsed.data, userId: e.userId }) + default: + throw new Error("unknown increment type " + parsed.type) + } + } + + async validateAddressReceivingTransaction({ userId, address, txHash }: { userId: string, address: string, txHash?: string }) { + if (!txHash) { + throw new Error("no tx hash provided to address " + address) + } + const entry = await this.storage.paymentStorage.GetAddressReceivingTransactionOwner(address, txHash) + if (!entry) { + throw new Error("no tx found for tx hash " + txHash) + } + if (entry.user_address.user.user_id !== userId) { + throw new Error("tx user id mismatch for tx hash " + txHash) + } + if (entry.paid_at_unix <= 0) { + throw new Error("tx not paid for tx hash " + txHash) + } + } + + async validateReceivingInvoice({ userId, invoice }: { userId: string, invoice: string }) { + const entry = await this.storage.paymentStorage.GetInvoiceOwner(invoice) + if (!entry) { + throw new Error("no invoice found for invoice " + invoice) + } + if (entry.user.user_id !== userId) { + throw new Error("invoice user id mismatch for invoice " + invoice) + } + if (entry.paid_at_unix <= 0) { + throw new Error("invoice not paid for invoice " + invoice) + } + if (!entry.internal) { + const entry = this.invoices.find(i => i.paymentRequest === invoice) + if (!entry) { + throw new Error("invoice not found in lnd " + invoice) + } + } + } + + async validateRoutingFeeRefund({ amt, invoice, userId }: { userId: string, invoice: string, amt: number }) { + const entry = this.decrementEvents[invoice] + if (!entry) { + throw new Error("no decrement event found for invoice routing fee refound " + invoice) + } + if (entry.userId !== userId) { + throw new Error("user id mismatch for routing fee refund " + invoice) + } + if (entry.falure) { + throw new Error("payment failled, should not refund routing fees " + invoice) + } + if (entry.refund !== amt) { + throw new Error("refund amount mismatch for routing fee refund " + invoice) + } + } + + async validatePaymentRefund({ amt, invoice, userId }: { userId: string, invoice: string, amt: number }) { + const entry = this.decrementEvents[invoice] + if (!entry) { + throw new Error("no decrement event found for invoice payment refund " + invoice) + } + if (entry.userId !== userId) { + throw new Error("user id mismatch for payment refund " + invoice) + } + if (!entry.falure) { + throw new Error("payment did not fail, should not refund payment " + invoice) + } + if (entry.refund !== amt) { + throw new Error("refund amount mismatch for payment refund " + invoice) + } + } + + async VerifyEventsLog() { + this.events = await this.storage.eventsLog.GetAllLogs() + this.invoices = (await this.lnd.GetAllPaidInvoices(1000)).invoices + this.payments = (await this.lnd.GetAllPayments(1000)).payments + this.incrementSources = {} + this.decrementSources = {} + this.users = {} + this.users = {} + this.decrementEvents = {} + for (let i = 0; i < this.events.length; i++) { + const e = this.events[i] + if (e.type === 'balance_decrement') { + await this.verifyDecrementEvent(e) + } else if (e.type === 'balance_increment') { + await this.verifyIncrementEvent(e) + } else { + await this.storage.paymentStorage.VerifyDbEvent(e) + } + } + await Promise.all(Object.entries(this.users).map(async ([userId, u]) => { + const user = await this.storage.userStorage.GetUser(userId) + if (user.balance_sats !== u.updatedBalance) { + throw new Error("sanity check on balance failed, expected: " + u.updatedBalance + " found: " + user.balance_sats) + } + })) + } + + checkUserEntry(e: LoggedEvent, u: { ts: number, updatedBalance: number } | undefined) { + const newEntry = { ts: e.timestampMs, updatedBalance: e.balance + e.amount * (e.type === 'balance_decrement' ? -1 : 1) } + if (!u) { + return newEntry + } + if (e.timestampMs < u.ts) { + throw new Error("entry out of order " + e.timestampMs + " " + u.ts) + } + if (e.balance !== u.updatedBalance) { + throw new Error("inconsistent balance update got: " + e.balance + " expected " + u.updatedBalance) + } + return newEntry + } +} \ No newline at end of file diff --git a/src/services/main/settings.ts b/src/services/main/settings.ts index 0fddaf87..e2bcda27 100644 --- a/src/services/main/settings.ts +++ b/src/services/main/settings.ts @@ -1,6 +1,6 @@ import { LoadStorageSettingsFromEnv, StorageSettings } from '../storage/index.js' import { LndSettings } from '../lnd/settings.js' -import { LoadWatchdogSettingsFromEnv, WatchdogSettings } from '../lnd/watchdog.js' +import { LoadWatchdogSettingsFromEnv, WatchdogSettings } from './watchdog.js' import { LoadLndSettingsFromEnv } from '../lnd/index.js' import { EnvMustBeInteger, EnvMustBeNonEmptyString } from '../helpers/envParser.js' export type MainSettings = { diff --git a/src/services/lnd/watchdog.ts b/src/services/main/watchdog.ts similarity index 87% rename from src/services/lnd/watchdog.ts rename to src/services/main/watchdog.ts index da52eaa4..f5108f6c 100644 --- a/src/services/lnd/watchdog.ts +++ b/src/services/main/watchdog.ts @@ -1,6 +1,7 @@ import { EnvCanBeInteger } from "../helpers/envParser.js"; import { getLogger } from "../helpers/logger.js"; -import { LightningHandler } from "./index.js"; +import { LightningHandler } from "../lnd/index.js"; +import Storage from '../storage/index.js' export type WatchdogSettings = { maxDiffSats: number } @@ -14,17 +15,28 @@ export class Watchdog { initialUsersBalance: number; lnd: LightningHandler; settings: WatchdogSettings; + storage: Storage; + latestCheckStart = 0 log = getLogger({ appName: "watchdog" }) enabled = false - constructor(settings: WatchdogSettings, lnd: LightningHandler) { + constructor(settings: WatchdogSettings, lnd: LightningHandler, storage: Storage) { this.lnd = lnd; this.settings = settings; + this.storage = storage; } - SeedLndBalance = async (totalUsersBalance: number) => { + Start = async () => { + const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance() this.initialLndBalance = await this.getTotalLndBalance() this.initialUsersBalance = totalUsersBalance this.enabled = true + + setInterval(() => { + if (this.latestCheckStart + (1000 * 60) < Date.now()) { + this.log("No balance check was made in the last minute, checking now") + this.PaymentRequested() + } + }, 1000 * 60) } getTotalLndBalance = async () => { @@ -77,12 +89,14 @@ export class Watchdog { return false } - PaymentRequested = async (totalUsersBalance: number) => { + PaymentRequested = async () => { this.log("Payment requested, checking balance") if (!this.enabled) { this.log("WARNING! Watchdog not enabled, skipping balance check") return } + this.latestCheckStart = Date.now() + const totalUsersBalance = await this.storage.paymentStorage.GetTotalUsersBalance() const totalLndBalance = await this.getTotalLndBalance() const deltaLnd = totalLndBalance - this.initialLndBalance const deltaUsers = totalUsersBalance - this.initialUsersBalance diff --git a/src/services/storage/paymentStorage.ts b/src/services/storage/paymentStorage.ts index 03281232..7a0c249e 100644 --- a/src/services/storage/paymentStorage.ts +++ b/src/services/storage/paymentStorage.ts @@ -114,6 +114,23 @@ export default class { }) } + async GetAddressReceivingTransactionOwner(address: string, txHash: string, entityManager = this.DB): Promise { + return entityManager.getRepository(AddressReceivingTransaction).findOne({ + where: { + user_address: { address }, + tx_hash: txHash + } + }) + } + async GetUserTransactionPaymentOwner(address: string, txHash: string, entityManager = this.DB): Promise { + return entityManager.getRepository(UserTransactionPayment).findOne({ + where: { + address, + tx_hash: txHash + } + }) + } + async GetInvoiceOwner(paymentRequest: string, entityManager = this.DB): Promise { return entityManager.getRepository(UserReceivingInvoice).findOne({ where: { @@ -128,19 +145,48 @@ export default class { } }) } + async GetUser2UserPayment(serialId: number, entityManager = this.DB): Promise { + return entityManager.getRepository(UserToUserPayment).findOne({ + where: { + serial_id: serialId + } + }) + } - async AddUserInvoicePayment(userId: string, invoice: string, amount: number, routingFees: number, serviceFees: number, internal: boolean, linkedApplication: Application): Promise { + async AddPendingExternalPayment(userId: string, invoice: string, amount: number, linkedApplication: Application): Promise { const newPayment = this.DB.getRepository(UserInvoicePayment).create({ user: await this.userStorage.GetUser(userId), paid_amount: amount, invoice, - routing_fees: routingFees, - service_fees: serviceFees, - paid_at_unix: Math.floor(Date.now() / 1000), - internal, + routing_fees: 0, + service_fees: 0, + paid_at_unix: 0, + internal: false, linkedApplication }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserInvoicePayment).save(newPayment), dbTx: false, description: `add invoice payment for ${userId} linked to ${linkedApplication.app_id}: ${invoice}, amt: ${amount} ` }) + return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserInvoicePayment).save(newPayment), dbTx: false, description: `add pending invoice payment for ${userId} linked to ${linkedApplication.app_id}: ${invoice}, amt: ${amount} ` }) + } + + async UpdateExternalPayment(invoicePaymentSerialId: number, routingFees: number, serviceFees: number, success: boolean) { + return this.DB.getRepository(UserInvoicePayment).update(invoicePaymentSerialId, { + routing_fees: routingFees, + service_fees: serviceFees, + paid_at_unix: success ? Math.floor(Date.now() / 1000) : -1 + }) + } + + async AddInternalPayment(userId: string, invoice: string, amount: number, serviceFees: number, linkedApplication: Application): Promise { + const newPayment = this.DB.getRepository(UserInvoicePayment).create({ + user: await this.userStorage.GetUser(userId), + paid_amount: amount, + invoice, + routing_fees: 0, + service_fees: serviceFees, + paid_at_unix: Math.floor(Date.now() / 1000), + internal: true, + linkedApplication + }) + return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserInvoicePayment).save(newPayment), dbTx: false, description: `add internal invoice payment for ${userId} linked to ${linkedApplication.app_id}: ${invoice}, amt: ${amount} ` }) } GetUserInvoicePayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise { @@ -237,16 +283,18 @@ export default class { return found } - async AddUserToUserPayment(fromUserId: string, toUserId: string, amount: number, fee: number, linkedApplication: Application, dbTx: DataSource | EntityManager) { - const newKey = dbTx.getRepository(UserToUserPayment).create({ - from_user: await this.userStorage.GetUser(fromUserId), - to_user: await this.userStorage.GetUser(toUserId), + async CreateUserToUserPayment(fromUserId: string, toUserId: string, amount: number, fee: number, linkedApplication: Application, dbTx: DataSource | EntityManager) { + return dbTx.getRepository(UserToUserPayment).create({ + from_user: await this.userStorage.GetUser(fromUserId, dbTx), + to_user: await this.userStorage.GetUser(toUserId, dbTx), paid_at_unix: Math.floor(Date.now() / 1000), paid_amount: amount, service_fees: fee, linkedApplication }) - return dbTx.getRepository(UserToUserPayment).save(newKey) + } + async SaveUserToUserPayment(payment: UserToUserPayment, dbTx: DataSource | EntityManager) { + return dbTx.getRepository(UserToUserPayment).save(payment) } GetUserToUserReceivedPayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB) { @@ -362,6 +410,7 @@ export default class { } async GetTotalUsersBalance(entityManager = this.DB) { - return entityManager.getRepository(User).sum("balance_sats") + const total = await entityManager.getRepository(User).sum("balance_sats") + return total || 0 } } \ No newline at end of file