From 99fa8fb21b86080aaf501519c6b13dc56845069c Mon Sep 17 00:00:00 2001 From: hatim boufnichel Date: Fri, 8 Mar 2024 17:31:33 +0100 Subject: [PATCH] sanity check --- env.example | 1 + package-lock.json | 66 ++++++++++++++++++++++ package.json | 3 +- src/index.ts | 3 + src/services/main/index.ts | 42 ++++++++++---- src/services/main/paymentManager.ts | 33 +++++++---- src/services/main/settings.ts | 1 + src/services/storage/eventsLog.ts | 76 ++++++++++++++++++++++++++ src/services/storage/index.ts | 39 ++++++++++++- src/services/storage/paymentStorage.ts | 39 +++++++++++-- src/services/storage/userStorage.ts | 11 +++- 11 files changed, 282 insertions(+), 32 deletions(-) create mode 100644 src/services/storage/eventsLog.ts diff --git a/env.example b/env.example index b3cad64a..e783413d 100644 --- a/env.example +++ b/env.example @@ -42,3 +42,4 @@ MIGRATE_DB=false #METRICS RECORD_PERFORMANCE=true +SKIP_SANITY_CHECK=false diff --git a/package-lock.json b/package-lock.json index 58335a98..02a51b88 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,6 +22,7 @@ "chai": "^4.3.7", "copyfiles": "^2.4.1", "cors": "^2.8.5", + "csv": "^6.3.8", "dotenv": "^16.0.0", "eccrypto": "^1.1.6", "express": "^4.18.1", @@ -1378,6 +1379,35 @@ "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", "devOptional": true }, + "node_modules/csv": { + "version": "6.3.8", + "resolved": "https://registry.npmjs.org/csv/-/csv-6.3.8.tgz", + "integrity": "sha512-gRh3yiT9bHBA5ka2yOpyFqAVu/ZpwWzajMUR/es0ljevAE88WyHBuMUy7jzd2o5j6LYQesEO/AyhbQ9BhbDXUA==", + "dependencies": { + "csv-generate": "^4.4.0", + "csv-parse": "^5.5.5", + "csv-stringify": "^6.4.6", + "stream-transform": "^3.3.1" + }, + "engines": { + "node": ">= 0.1.90" + } + }, + "node_modules/csv-generate": { + "version": "4.4.0", + "resolved": "https://registry.npmjs.org/csv-generate/-/csv-generate-4.4.0.tgz", + "integrity": "sha512-geM01acNPZ0wr4/9sKev5fCzFG/tsc/NbuFWrhLc47M1zQyUdEJH65+cxTLIVafEwhBjIYwQ7fdOL9roBqVltQ==" + }, + "node_modules/csv-parse": { + "version": "5.5.5", + "resolved": "https://registry.npmjs.org/csv-parse/-/csv-parse-5.5.5.tgz", + "integrity": "sha512-erCk7tyU3yLWAhk6wvKxnyPtftuy/6Ak622gOO7BCJ05+TYffnPCJF905wmOQm+BpkX54OdAl8pveJwUdpnCXQ==" + }, + "node_modules/csv-stringify": { + "version": "6.4.6", + "resolved": "https://registry.npmjs.org/csv-stringify/-/csv-stringify-6.4.6.tgz", + "integrity": "sha512-h2V2XZ3uOTLilF5dPIptgUfN/o2ia/80Ie0Lly18LAnw5s8Eb7kt8rfxSUy24AztJZas9f6DPZpVlzDUtFt/ag==" + }, "node_modules/d": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/d/-/d-1.0.1.tgz", @@ -3735,6 +3765,11 @@ "node": ">= 0.8" } }, + "node_modules/stream-transform": { + "version": "3.3.1", + "resolved": "https://registry.npmjs.org/stream-transform/-/stream-transform-3.3.1.tgz", + "integrity": "sha512-BL8pv9QL8Ikd11oZwlRDp1qYMhGR0i50zI9ltoijKGc4ubQWal/Rc4p6SYJp1TBOGpE0uAGchwbxOZ1ycwTuqQ==" + }, "node_modules/string_decoder": { "version": "0.10.31", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", @@ -5614,6 +5649,32 @@ "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", "devOptional": true }, + "csv": { + "version": "6.3.8", + "resolved": "https://registry.npmjs.org/csv/-/csv-6.3.8.tgz", + "integrity": "sha512-gRh3yiT9bHBA5ka2yOpyFqAVu/ZpwWzajMUR/es0ljevAE88WyHBuMUy7jzd2o5j6LYQesEO/AyhbQ9BhbDXUA==", + "requires": { + "csv-generate": "^4.4.0", + "csv-parse": "^5.5.5", + "csv-stringify": "^6.4.6", + "stream-transform": "^3.3.1" + } + }, + "csv-generate": { + "version": "4.4.0", + "resolved": "https://registry.npmjs.org/csv-generate/-/csv-generate-4.4.0.tgz", + "integrity": "sha512-geM01acNPZ0wr4/9sKev5fCzFG/tsc/NbuFWrhLc47M1zQyUdEJH65+cxTLIVafEwhBjIYwQ7fdOL9roBqVltQ==" + }, + "csv-parse": { + "version": "5.5.5", + "resolved": "https://registry.npmjs.org/csv-parse/-/csv-parse-5.5.5.tgz", + "integrity": "sha512-erCk7tyU3yLWAhk6wvKxnyPtftuy/6Ak622gOO7BCJ05+TYffnPCJF905wmOQm+BpkX54OdAl8pveJwUdpnCXQ==" + }, + "csv-stringify": { + "version": "6.4.6", + "resolved": "https://registry.npmjs.org/csv-stringify/-/csv-stringify-6.4.6.tgz", + "integrity": "sha512-h2V2XZ3uOTLilF5dPIptgUfN/o2ia/80Ie0Lly18LAnw5s8Eb7kt8rfxSUy24AztJZas9f6DPZpVlzDUtFt/ag==" + }, "d": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/d/-/d-1.0.1.tgz", @@ -7415,6 +7476,11 @@ "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz", "integrity": "sha512-RwNA9Z/7PrK06rYLIzFMlaF+l73iwpzsqRIFgbMLbTcLD6cOao82TaWefPXQvB2fOC4AjuYSEndS7N/mTCbkdQ==" }, + "stream-transform": { + "version": "3.3.1", + "resolved": "https://registry.npmjs.org/stream-transform/-/stream-transform-3.3.1.tgz", + "integrity": "sha512-BL8pv9QL8Ikd11oZwlRDp1qYMhGR0i50zI9ltoijKGc4ubQWal/Rc4p6SYJp1TBOGpE0uAGchwbxOZ1ycwTuqQ==" + }, "string_decoder": { "version": "0.10.31", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", diff --git a/package.json b/package.json index a8d155b3..042c32c5 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,7 @@ "chai": "^4.3.7", "copyfiles": "^2.4.1", "cors": "^2.8.5", + "csv": "^6.3.8", "dotenv": "^16.0.0", "eccrypto": "^1.1.6", "express": "^4.18.1", @@ -74,4 +75,4 @@ "ts-node": "10.7.0", "typescript": "4.5.2" } -} \ No newline at end of file +} diff --git a/src/index.ts b/src/index.ts index 14fc294f..3f8f435e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,6 +22,9 @@ const start = async () => { await storageManager.userStorage.UpdateUser(process.argv[3], { balance_sats: +process.argv[4] }) log("user balance updated correctly") } + if (!mainSettings.skipSanityCheck) { + await storageManager.VerifyEventsLog() + } const mainHandler = new Main(mainSettings, storageManager) await mainHandler.lnd.Warmup() const serverMethods = GetServerMethods(mainHandler) diff --git a/src/services/main/index.ts b/src/services/main/index.ts index 61a8d924..4e1f116b 100644 --- a/src/services/main/index.ts +++ b/src/services/main/index.ts @@ -16,6 +16,7 @@ import { UserReceivingInvoice, ZapInfo } from '../storage/entity/UserReceivingIn import { UnsignedEvent } from '../nostr/tools/event.js' import { NostrSend } from '../nostr/handler.js' import MetricsManager from '../metrics/index.js' +import EventsLogManager from '../storage/eventsLog.js' export const LoadMainSettingsFromEnv = (test = false): MainSettings => { return { lndSettings: LoadLndSettingsFromEnv(test), @@ -31,7 +32,8 @@ export const LoadMainSettingsFromEnv = (test = false): MainSettings => { appToUserFee: EnvMustBeInteger("TX_FEE_INTERNAL_ROOT_BPS") / 10000, serviceUrl: EnvMustBeNonEmptyString("SERVICE_URL"), servicePort: EnvMustBeInteger("PORT"), - recordPerformance: process.env.RECORD_PERFORMANCE === 'true' || false + recordPerformance: process.env.RECORD_PERFORMANCE === 'true' || false, + skipSanityCheck: process.env.SKIP_SANITY_CHECK === 'true' || false } } @@ -59,6 +61,7 @@ export default class { constructor(settings: MainSettings, storage: Storage) { this.settings = settings this.storage = storage + this.lnd = NewLightningHandler(settings.lndSettings, this.addressPaidCb, this.invoicePaidCb, this.newBlockCb, this.htlcCb) this.metricsManager = new MetricsManager(this.storage, this.lnd) @@ -66,6 +69,7 @@ export default class { this.productManager = new ProductManager(this.storage, this.paymentManager, this.settings) this.applicationManager = new ApplicationManager(this.storage, this.settings, this.paymentManager) this.appUserManager = new AppUserManager(this.storage, this.settings, this.applicationManager) + } attachNostrSend(f: NostrSend) { @@ -97,13 +101,22 @@ export default class { await this.storage.paymentStorage.UpdateUserTransactionPayment(c.tx.serial_id, { confs: c.confs }) } else { this.storage.StartTransaction(async tx => { - const { user_address: userAddress, paid_amount: amount, service_fee: serviceFee, serial_id: serialId } = c.tx + const { user_address: userAddress, paid_amount: amount, service_fee: serviceFee, serial_id: serialId, tx_hash } = c.tx + if (!userAddress.linkedApplication) { + log("ERROR", "an address was paid, that has no linked application") + return + } const updateResult = await this.storage.paymentStorage.UpdateAddressReceivingTransaction(serialId, { confs: c.confs }, tx) if (!updateResult.affected) { throw new Error("unable to flag chain transaction as paid") } - await this.storage.userStorage.IncrementUserBalance(userAddress.user.user_id, amount - serviceFee, tx) - const operationId = `${Types.UserOperationType.INCOMING_TX}-${userAddress.serial_id}` + await this.storage.userStorage.IncrementUserBalance(userAddress.user.user_id, amount - serviceFee, userAddress.address, tx) + if (serviceFee > 0) { + await this.storage.userStorage.IncrementUserBalance(userAddress.linkedApplication.owner.user_id, serviceFee, 'fees', tx) + } + const addressData = `${userAddress.address}:${tx_hash}` + this.storage.eventsLog.LogEvent({ type: 'address_paid', userId: userAddress.user.user_id, appId: userAddress.linkedApplication.app_id, appUserId: "", balance: userAddress.user.balance_sats, data: addressData, amount }) + const operationId = `${Types.UserOperationType.INCOMING_TX}-${serialId}` const op = { amount, paidAtUnix: Date.now() / 1000, inbound: true, type: Types.UserOperationType.INCOMING_TX, identifier: userAddress.address, operationId, network_fee: 0, service_fee: serviceFee, confirmed: true, tx_hash: c.tx.tx_hash, internal: c.tx.internal } this.sendOperationToNostr(userAddress.linkedApplication!, userAddress.user.user_id, op) }) @@ -132,7 +145,12 @@ export default class { // This call will fail if the transaction is already registered const addedTx = await this.storage.paymentStorage.AddAddressReceivingTransaction(userAddress, txOutput.hash, txOutput.index, amount, fee, internal, blockHeight, tx) if (internal) { - await this.storage.userStorage.IncrementUserBalance(userAddress.user.user_id, addedTx.paid_amount - fee, tx) + await this.storage.userStorage.IncrementUserBalance(userAddress.user.user_id, addedTx.paid_amount - fee, userAddress.address, tx) + if (fee > 0) { + await this.storage.userStorage.IncrementUserBalance(userAddress.linkedApplication.owner.user_id, fee, 'fees', tx) + } + const addressData = `${address}:${txOutput.hash}` + this.storage.eventsLog.LogEvent({ type: 'address_paid', userId: userAddress.user.user_id, appId: userAddress.linkedApplication.app_id, appUserId: "", balance: userAddress.user.balance_sats, data: addressData, amount }) } const operationId = `${Types.UserOperationType.INCOMING_TX}-${addedTx.serial_id}` const op = { amount, paidAtUnix: Date.now() / 1000, inbound: true, type: Types.UserOperationType.INCOMING_TX, identifier: userAddress.address, operationId, network_fee: 0, service_fee: fee, confirmed: internal, tx_hash: txOutput.hash, internal: false } @@ -161,13 +179,15 @@ export default class { fee = 0 } try { - await this.storage.paymentStorage.FlagInvoiceAsPaid(userInvoice, amount, fee, internal, tx) - - await this.storage.userStorage.IncrementUserBalance(userInvoice.user.user_id, amount - fee, tx) - if (isAppUserPayment && fee > 0) { - await this.storage.userStorage.IncrementUserBalance(userInvoice.linkedApplication.owner.user_id, fee, tx) + const flaggingRes = await this.storage.paymentStorage.FlagInvoiceAsPaid(userInvoice, amount, fee, internal, tx) + if (flaggingRes.affected) { + throw new Error("no invoice affected by flag as paid") } - + await this.storage.userStorage.IncrementUserBalance(userInvoice.user.user_id, amount - fee, userInvoice.invoice, tx) + if (fee > 0) { + await this.storage.userStorage.IncrementUserBalance(userInvoice.linkedApplication.owner.user_id, fee, 'fees', tx) + } + this.storage.eventsLog.LogEvent({ type: 'invoice_paid', userId: userInvoice.user.user_id, appId: userInvoice.linkedApplication.app_id, appUserId: "", balance: userInvoice.user.balance_sats, data: paymentRequest, amount }) await this.triggerPaidCallback(log, userInvoice.callbackUrl) const operationId = `${Types.UserOperationType.INCOMING_INVOICE}-${userInvoice.serial_id}` const op = { amount, paidAtUnix: Date.now() / 1000, inbound: true, type: Types.UserOperationType.INCOMING_INVOICE, identifier: userInvoice.invoice, operationId, network_fee: 0, service_fee: fee, confirmed: true, tx_hash: "", internal } diff --git a/src/services/main/paymentManager.ts b/src/services/main/paymentManager.ts index 7448c9c3..d81b5884 100644 --- a/src/services/main/paymentManager.ts +++ b/src/services/main/paymentManager.ts @@ -95,12 +95,14 @@ export default class { async NewAddress(ctx: Types.UserContext, req: Types.NewAddressRequest): Promise { const app = await this.storage.applicationStorage.GetApplication(ctx.app_id) + const user = await this.storage.userStorage.GetUser(ctx.user_id) const existingAddress = await this.storage.paymentStorage.GetExistingUserAddress(ctx.user_id, app) if (existingAddress) { return { address: existingAddress.address } } const res = await this.lnd.NewAddress(req.addressType) - const userAddress = await this.storage.paymentStorage.AddUserAddress(ctx.user_id, res.address, { linkedApplication: app }) + const userAddress = await this.storage.paymentStorage.AddUserAddress(user, res.address, { linkedApplication: app }) + this.storage.eventsLog.LogEvent({ type: 'new_address', userId: user.user_id, appUserId: "", appId: app.app_id, balance: user.balance_sats, data: res.address, amount: 0 }) return { address: userAddress.address } } @@ -108,6 +110,8 @@ export default class { const user = await this.storage.userStorage.GetUser(userId) const res = await this.lnd.NewInvoice(req.amountSats, req.memo, options.expiry) const userInvoice = await this.storage.paymentStorage.AddUserInvoice(user, res.payRequest, options) + const appId = options.linkedApplication ? options.linkedApplication.app_id : "" + this.storage.eventsLog.LogEvent({ type: 'new_invoice', userId: user.user_id, appUserId: "", appId, balance: user.balance_sats, data: userInvoice.invoice, amount: req.amountSats }) return { invoice: userInvoice.invoice } @@ -162,7 +166,7 @@ export default class { await this.lockUserWithMinBalance(userId, totalAmountToDecrement + routingFeeLimit) try { payment = await this.lnd.PayInvoice(req.invoice, req.amount, routingFeeLimit) - await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement + payment.feeSat) + await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement + payment.feeSat, req.invoice) await this.storage.userStorage.UnlockUser(userId) } catch (err) { await this.storage.userStorage.UnlockUser(userId) @@ -172,14 +176,16 @@ export default class { if (internalInvoice.paid_at_unix > 0) { throw new Error("this invoice was already paid") } - await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement) + await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement, req.invoice) this.invoicePaidCb(req.invoice, payAmount, true) } if (isAppUserPayment && serviceFee > 0) { - await this.storage.userStorage.IncrementUserBalance(linkedApplication.owner.user_id, serviceFee) + await this.storage.userStorage.IncrementUserBalance(linkedApplication.owner.user_id, serviceFee, "fees") } const routingFees = payment ? payment.feeSat : 0 const newPayment = await this.storage.paymentStorage.AddUserInvoicePayment(userId, req.invoice, payAmount, routingFees, serviceFee, !!internalInvoice, linkedApplication) + const user = await this.storage.userStorage.GetUser(userId) + this.storage.eventsLog.LogEvent({ type: 'invoice_payment', userId, appId: linkedApplication.app_id, appUserId: "", balance: user.balance_sats, data: req.invoice, amount: payAmount }) return { preimage: payment ? payment.paymentPreimage : "", amount_paid: payment ? Number(payment.valueSat) : payAmount, @@ -207,23 +213,26 @@ export default class { try { const payment = await this.lnd.PayAddress(req.address, req.amoutSats, req.satsPerVByte) txId = payment.txid - await this.storage.userStorage.DecrementUserBalance(ctx.user_id, total + serviceFee) + await this.storage.userStorage.DecrementUserBalance(ctx.user_id, total + serviceFee, req.address) await this.storage.userStorage.UnlockUser(ctx.user_id) } catch (err) { await this.storage.userStorage.UnlockUser(ctx.user_id) throw err } } else { - await this.storage.userStorage.DecrementUserBalance(ctx.user_id, req.amoutSats + serviceFee) + await this.storage.userStorage.DecrementUserBalance(ctx.user_id, req.amoutSats + serviceFee, req.address) txId = crypto.randomBytes(32).toString("hex") this.addressPaidCb({ hash: txId, index: 0 }, req.address, req.amoutSats, true) } if (isAppUserPayment && serviceFee > 0) { - await this.storage.userStorage.IncrementUserBalance(app.owner.user_id, serviceFee) + await this.storage.userStorage.IncrementUserBalance(app.owner.user_id, serviceFee, 'fees') } const newTx = await this.storage.paymentStorage.AddUserTransactionPayment(ctx.user_id, req.address, txId, 0, req.amoutSats, chainFees, serviceFee, !!internalAddress, blockHeight, app) + const user = await this.storage.userStorage.GetUser(ctx.user_id) + const txData = `${newTx.address}:${newTx.tx_hash}` + this.storage.eventsLog.LogEvent({ type: 'address_payment', userId: ctx.user_id, appId: app.app_id, appUserId: "", balance: user.balance_sats, data: txData, amount: req.amoutSats }) return { txId: txId, operation_id: `${Types.UserOperationType.OUTGOING_TX}-${newTx.serial_id}`, @@ -484,14 +493,18 @@ export default class { const isAppUserPayment = fromUser.user_id !== linkedApplication.owner.user_id let fee = this.getServiceFee(Types.UserOperationType.OUTGOING_USER_TO_USER, amount, isAppUserPayment) const toIncrement = amount - fee - await this.storage.userStorage.DecrementUserBalance(fromUser.user_id, amount, tx) - await this.storage.userStorage.IncrementUserBalance(toUser.user_id, toIncrement, tx) + await this.storage.userStorage.DecrementUserBalance(fromUser.user_id, amount, toUserId, tx) + await this.storage.userStorage.IncrementUserBalance(toUser.user_id, toIncrement, fromUserId, tx) await this.storage.paymentStorage.AddUserToUserPayment(fromUserId, toUserId, amount, fee, linkedApplication, tx) if (isAppUserPayment && fee > 0) { - await this.storage.userStorage.IncrementUserBalance(linkedApplication.owner.user_id, fee, tx) + await this.storage.userStorage.IncrementUserBalance(linkedApplication.owner.user_id, fee, 'fees', tx) } sentAmount = toIncrement }) + const fromUser = await this.storage.userStorage.GetUser(fromUserId) + const toUser = await this.storage.userStorage.GetUser(toUserId) + this.storage.eventsLog.LogEvent({ type: 'u2u_sender', userId: fromUserId, appId: linkedApplication.app_id, appUserId: "", balance: fromUser.balance_sats, data: toUserId, amount: amount }) + this.storage.eventsLog.LogEvent({ type: 'u2u_receiver', userId: toUserId, appId: linkedApplication.app_id, appUserId: "", balance: toUser.balance_sats, data: fromUserId, amount: amount }) return sentAmount } diff --git a/src/services/main/settings.ts b/src/services/main/settings.ts index a4b705a1..1502d502 100644 --- a/src/services/main/settings.ts +++ b/src/services/main/settings.ts @@ -15,5 +15,6 @@ export type MainSettings = { serviceUrl: string servicePort: number recordPerformance: boolean + skipSanityCheck: boolean } \ No newline at end of file diff --git a/src/services/storage/eventsLog.ts b/src/services/storage/eventsLog.ts new file mode 100644 index 00000000..168f2d38 --- /dev/null +++ b/src/services/storage/eventsLog.ts @@ -0,0 +1,76 @@ +import fs from 'fs' +import { parse, stringify } from 'csv' +import { getLogger } from '../helpers/logger' +const eventLogPath = "logs/eventLog.csv" +type LoggedEventType = 'new_invoice' | 'new_address' | 'address_paid' | 'invoice_paid' | 'invoice_payment' | 'address_payment' | 'u2u_receiver' | 'u2u_sender' | 'balance_increment' | 'balance_decrement' +export type LoggedEvent = { + timestampMs: number + userId: string + appUserId: string + appId: string + balance: number + type: LoggedEventType + data: string + amount: number +} +const columns = ["timestampMs", "userId", "appUserId", "appId", "balance", "type", "data", "amount"] +type StringerWrite = (chunk: any, cb: (error: Error | null | undefined) => void) => boolean +export default class EventsLogManager { + log = getLogger({ appName: "EventsLogManager" }) + stringerWrite: StringerWrite + constructor() { + const exists = fs.existsSync(eventLogPath) + if (!exists) { + const stringer = stringify({ header: true, columns }) + stringer.pipe(fs.createWriteStream(eventLogPath, { flags: 'a' })) + this.stringerWrite = (chunk, cb) => stringer.write(chunk, cb) + } else { + const stringer = stringify({}) + stringer.pipe(fs.createWriteStream(eventLogPath, { flags: 'a' })) + this.stringerWrite = (chunk, cb) => stringer.write(chunk, cb) + } + } + + + + LogEvent = (e: Omit) => { + this.log(e.type, "->", e.userId, "->", e.appId, "->", e.appUserId, "->", e.balance, "->", e.data, "->", e.amount) + this.write([Date.now(), e.userId, e.appUserId, e.appId, e.balance, e.type, e.data, e.amount]) + } + + GetAllLogs = async (): Promise => { + const logs = await this.Read() + this.log("found", logs.length, "event logs") + return logs + } + + Read = async (): Promise => { + const exists = fs.existsSync(eventLogPath) + if (!exists) { + return [] + } + return new Promise((res, rej) => { + const result: LoggedEvent[] = [] + fs.createReadStream(eventLogPath) + .pipe(parse({ delimiter: ",", from_line: 2 })) + .on('data', data => { result.push(this.parseEvent(data)) }) + .on('error', err => { rej(err) }) + .on('end', () => { res(result) }) + }) + } + + parseEvent = (args: string[]): LoggedEvent => { + const [timestampMs, userId, appUserId, appId, balance, type, data, amount] = args + return { timestampMs: +timestampMs, userId, appUserId, appId, balance: +balance, type: type as LoggedEventType, data, amount: +amount } + } + + write = async (args: (string | number)[]) => { + return new Promise((res, rej) => { + this.stringerWrite(args, err => { + if (err) { + rej(err) + } else { res() } + }) + }) + } +} \ No newline at end of file diff --git a/src/services/storage/index.ts b/src/services/storage/index.ts index a1696cc4..512889f9 100644 --- a/src/services/storage/index.ts +++ b/src/services/storage/index.ts @@ -6,6 +6,7 @@ import UserStorage from "./userStorage.js"; import PaymentStorage from "./paymentStorage.js"; import MetricsStorage from "./metricsStorage.js"; import TransactionsQueue, { TX } from "./transactionsQueue.js"; +import EventsLogManager, { LoggedEvent } from "./eventsLog.js"; export type StorageSettings = { dbSettings: DbSettings } @@ -22,14 +23,16 @@ export default class { userStorage: UserStorage paymentStorage: PaymentStorage metricsStorage: MetricsStorage + eventsLog: EventsLogManager constructor(settings: StorageSettings) { this.settings = settings + this.eventsLog = new EventsLogManager() } async Connect(migrations: Function[], metricsMigrations: Function[]) { const { source, executedMigrations } = await NewDB(this.settings.dbSettings, migrations) this.DB = source this.txQueue = new TransactionsQueue("main", this.DB) - this.userStorage = new UserStorage(this.DB, this.txQueue) + this.userStorage = new UserStorage(this.DB, this.txQueue, this.eventsLog) this.productStorage = new ProductStorage(this.DB, this.txQueue) this.applicationStorage = new ApplicationStorage(this.DB, this.userStorage, this.txQueue) this.paymentStorage = new PaymentStorage(this.DB, this.userStorage, this.txQueue) @@ -38,6 +41,40 @@ export default class { return { executedMigrations, executedMetricsMigrations }; } + async VerifyEventsLog() { + const events = await this.eventsLog.GetAllLogs() + + const users: Record = {} + for (let i = 0; i < events.length; i++) { + const e = events[i] + if (e.type === 'balance_decrement' || e.type === 'balance_increment') { + users[e.userId] = this.checkUserEntry(e, users[e.userId]) + } else { + await this.paymentStorage.VerifyDbEvent(e) + } + } + await Promise.all(Object.entries(users).map(async ([userId, u]) => { + const user = await this.userStorage.GetUser(userId) + if (user.balance_sats !== u.updatedBalance) { + throw new Error("sanity check on balance failed, expected: " + u.updatedBalance + " found: " + user.balance_sats) + } + })) + } + + checkUserEntry(e: LoggedEvent, u: { ts: number, updatedBalance: number } | undefined) { + const newEntry = { ts: e.timestampMs, updatedBalance: e.balance + e.amount * (e.type === 'balance_decrement' ? -1 : 1) } + if (!u) { + return newEntry + } + if (e.timestampMs < u.ts) { + throw new Error("entry out of order " + e.timestampMs + " " + u.ts) + } + if (e.balance !== u.updatedBalance) { + throw new Error("inconsistent balance update got: " + e.balance + " expected " + u.updatedBalance) + } + return newEntry + } + StartTransaction(exec: TX, description?: string) { return this.txQueue.PushToQueue({ exec, dbTx: true, description }) } diff --git a/src/services/storage/paymentStorage.ts b/src/services/storage/paymentStorage.ts index c8005ec2..95309e14 100644 --- a/src/services/storage/paymentStorage.ts +++ b/src/services/storage/paymentStorage.ts @@ -11,7 +11,8 @@ import { AddressReceivingTransaction } from './entity/AddressReceivingTransactio import { UserInvoicePayment } from './entity/UserInvoicePayment.js'; import { UserToUserPayment } from './entity/UserToUserPayment.js'; import { Application } from './entity/Application.js'; -import TransactionsQueue, { TX } from "./transactionsQueue.js"; +import TransactionsQueue from "./transactionsQueue.js"; +import { LoggedEvent } from './eventsLog.js'; export type InboundOptionals = { product?: Product, callbackUrl?: string, expiry: number, expectedPayer?: User, linkedApplication?: Application, zapInfo?: ZapInfo } export const defaultInvoiceExpiry = 60 * 60 export default class { @@ -23,6 +24,7 @@ export default class { this.userStorage = userStorage this.txQueue = txQueue } + async AddAddressReceivingTransaction(address: UserReceivingAddress, txHash: string, outputIndex: number, amount: number, serviceFee: number, internal: boolean, height: number, dbTx: EntityManager | DataSource) { const newAddressTransaction = dbTx.getRepository(AddressReceivingTransaction).create({ user_address: address, @@ -56,22 +58,22 @@ export default class { return entityManager.getRepository(UserReceivingAddress).findOne({ where: { user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } }) } - async AddUserAddress(userId: string, address: string, opts: { callbackUrl?: string, linkedApplication?: Application } = {}): Promise { + async AddUserAddress(user: User, address: string, opts: { callbackUrl?: string, linkedApplication?: Application } = {}): Promise { const newUserAddress = this.DB.getRepository(UserReceivingAddress).create({ address, callbackUrl: opts.callbackUrl || "", linkedApplication: opts.linkedApplication, - user: await this.userStorage.GetUser(userId) + user }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserReceivingAddress).save(newUserAddress), dbTx: false, description: `add address for ${userId} linked to ${opts.linkedApplication?.app_id}: ${address} ` }) + return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserReceivingAddress).save(newUserAddress), dbTx: false, description: `add address for ${user.user_id} linked to ${opts.linkedApplication?.app_id}: ${address} ` }) } - async FlagInvoiceAsPaid(invoice: UserReceivingInvoice, amount: number, serviceFee: number, internal: boolean, entityManager = this.DB) { + async FlagInvoiceAsPaid(invoice: UserReceivingInvoice, amount: number, serviceFee: number, internal: boolean, dbTx: EntityManager | DataSource) { const i: Partial = { paid_at_unix: Math.floor(Date.now() / 1000), paid_amount: amount, service_fee: serviceFee, internal } if (!internal) { i.paidByLnd = true } - return entityManager.getRepository(UserReceivingInvoice).update(invoice.serial_id, i) + return dbTx.getRepository(UserReceivingInvoice).update(invoice.serial_id, i) } GetUserInvoicesFlaggedAsPaid(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise { @@ -326,4 +328,29 @@ export default class { ]) return !!i || !!tx || !!u2u } + + async VerifyDbEvent(e: LoggedEvent) { + switch (e.type) { + case "new_invoice": + return this.DB.getRepository(UserReceivingInvoice).findOneOrFail({ where: { invoice: e.data, user: { user_id: e.userId } } }) + case 'new_address': + return this.DB.getRepository(UserReceivingAddress).findOneOrFail({ where: { address: e.data, user: { user_id: e.userId } } }) + case 'invoice_paid': + return this.DB.getRepository(UserReceivingInvoice).findOneOrFail({ where: { invoice: e.data, user: { user_id: e.userId }, paid_at_unix: MoreThan(0) } }) + case 'invoice_payment': + return this.DB.getRepository(UserInvoicePayment).findOneOrFail({ where: { invoice: e.data, user: { user_id: e.userId } } }) + case 'address_paid': + const [receivingAddress, receivedHash] = e.data.split(":") + return this.DB.getRepository(AddressReceivingTransaction).findOneOrFail({ where: { user_address: { address: receivingAddress }, tx_hash: receivedHash, confs: MoreThan(0) } }) + case 'address_payment': + const [sentAddress, sentHash] = e.data.split(":") + return this.DB.getRepository(UserTransactionPayment).findOneOrFail({ where: { address: sentAddress, tx_hash: sentHash, user: { user_id: e.userId } } }) + case 'u2u_receiver': + return this.DB.getRepository(UserToUserPayment).findOneOrFail({ where: { from_user: { user_id: e.data }, to_user: { user_id: e.userId } } }) + case 'u2u_sender': + return this.DB.getRepository(UserToUserPayment).findOneOrFail({ where: { to_user: { user_id: e.data }, from_user: { user_id: e.userId } } }) + default: + break; + } + } } \ No newline at end of file diff --git a/src/services/storage/userStorage.ts b/src/services/storage/userStorage.ts index c5cd43fb..e22491d5 100644 --- a/src/services/storage/userStorage.ts +++ b/src/services/storage/userStorage.ts @@ -4,12 +4,15 @@ import { User } from './entity/User.js'; import { UserBasicAuth } from './entity/UserBasicAuth.js'; import { getLogger } from '../helpers/logger.js'; import TransactionsQueue from "./transactionsQueue.js"; +import EventsLogManager from './eventsLog.js'; export default class { DB: DataSource | EntityManager txQueue: TransactionsQueue - constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) { + eventsLog: EventsLogManager + constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue, eventsLog: EventsLogManager) { this.DB = DB this.txQueue = txQueue + this.eventsLog = eventsLog } async AddUser(balance: number, dbTx: DataSource | EntityManager): Promise { if (balance && process.env.ALLOW_BALANCE_MIGRATION !== 'true') { @@ -66,7 +69,7 @@ export default class { throw new Error("unaffected user unlock for " + userId) // TODO: fix logs doxing } } - async IncrementUserBalance(userId: string, increment: number, entityManager = this.DB) { + async IncrementUserBalance(userId: string, increment: number, reason: string, entityManager = this.DB) { const user = await this.GetUser(userId, entityManager) const res = await entityManager.getRepository(User).increment({ user_id: userId, @@ -75,8 +78,9 @@ export default class { throw new Error("unaffected balance increment for " + userId) // TODO: fix logs doxing } getLogger({ userId: userId, appName: "balanceUpdates" })("incremented balance from", user.balance_sats, "sats, by", increment, "sats") + this.eventsLog.LogEvent({ type: 'balance_increment', userId, appId: "", appUserId: "", balance: user.balance_sats, data: reason, amount: increment }) } - async DecrementUserBalance(userId: string, decrement: number, entityManager = this.DB) { + async DecrementUserBalance(userId: string, decrement: number, reason: string, entityManager = this.DB) { const user = await this.GetUser(userId, entityManager) if (!user || user.balance_sats < decrement) { throw new Error("not enough balance to decrement") @@ -88,6 +92,7 @@ export default class { throw new Error("unaffected balance decrement for " + userId) // TODO: fix logs doxing } getLogger({ userId: userId, appName: "balanceUpdates" })("decremented balance from", user.balance_sats, "sats, by", decrement, "sats") + this.eventsLog.LogEvent({ type: 'balance_decrement', userId, appId: "", appUserId: "", balance: user.balance_sats, data: reason, amount: decrement }) } async UpdateUser(userId: string, update: Partial, entityManager = this.DB) {