sanity check

This commit is contained in:
hatim boufnichel 2024-03-08 17:31:33 +01:00
parent f85b5bc5e7
commit 99fa8fb21b
11 changed files with 282 additions and 32 deletions

View file

@ -42,3 +42,4 @@ MIGRATE_DB=false
#METRICS
RECORD_PERFORMANCE=true
SKIP_SANITY_CHECK=false

66
package-lock.json generated
View file

@ -22,6 +22,7 @@
"chai": "^4.3.7",
"copyfiles": "^2.4.1",
"cors": "^2.8.5",
"csv": "^6.3.8",
"dotenv": "^16.0.0",
"eccrypto": "^1.1.6",
"express": "^4.18.1",
@ -1378,6 +1379,35 @@
"integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==",
"devOptional": true
},
"node_modules/csv": {
"version": "6.3.8",
"resolved": "https://registry.npmjs.org/csv/-/csv-6.3.8.tgz",
"integrity": "sha512-gRh3yiT9bHBA5ka2yOpyFqAVu/ZpwWzajMUR/es0ljevAE88WyHBuMUy7jzd2o5j6LYQesEO/AyhbQ9BhbDXUA==",
"dependencies": {
"csv-generate": "^4.4.0",
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6",
"stream-transform": "^3.3.1"
},
"engines": {
"node": ">= 0.1.90"
}
},
"node_modules/csv-generate": {
"version": "4.4.0",
"resolved": "https://registry.npmjs.org/csv-generate/-/csv-generate-4.4.0.tgz",
"integrity": "sha512-geM01acNPZ0wr4/9sKev5fCzFG/tsc/NbuFWrhLc47M1zQyUdEJH65+cxTLIVafEwhBjIYwQ7fdOL9roBqVltQ=="
},
"node_modules/csv-parse": {
"version": "5.5.5",
"resolved": "https://registry.npmjs.org/csv-parse/-/csv-parse-5.5.5.tgz",
"integrity": "sha512-erCk7tyU3yLWAhk6wvKxnyPtftuy/6Ak622gOO7BCJ05+TYffnPCJF905wmOQm+BpkX54OdAl8pveJwUdpnCXQ=="
},
"node_modules/csv-stringify": {
"version": "6.4.6",
"resolved": "https://registry.npmjs.org/csv-stringify/-/csv-stringify-6.4.6.tgz",
"integrity": "sha512-h2V2XZ3uOTLilF5dPIptgUfN/o2ia/80Ie0Lly18LAnw5s8Eb7kt8rfxSUy24AztJZas9f6DPZpVlzDUtFt/ag=="
},
"node_modules/d": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/d/-/d-1.0.1.tgz",
@ -3735,6 +3765,11 @@
"node": ">= 0.8"
}
},
"node_modules/stream-transform": {
"version": "3.3.1",
"resolved": "https://registry.npmjs.org/stream-transform/-/stream-transform-3.3.1.tgz",
"integrity": "sha512-BL8pv9QL8Ikd11oZwlRDp1qYMhGR0i50zI9ltoijKGc4ubQWal/Rc4p6SYJp1TBOGpE0uAGchwbxOZ1ycwTuqQ=="
},
"node_modules/string_decoder": {
"version": "0.10.31",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz",
@ -5614,6 +5649,32 @@
"integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==",
"devOptional": true
},
"csv": {
"version": "6.3.8",
"resolved": "https://registry.npmjs.org/csv/-/csv-6.3.8.tgz",
"integrity": "sha512-gRh3yiT9bHBA5ka2yOpyFqAVu/ZpwWzajMUR/es0ljevAE88WyHBuMUy7jzd2o5j6LYQesEO/AyhbQ9BhbDXUA==",
"requires": {
"csv-generate": "^4.4.0",
"csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6",
"stream-transform": "^3.3.1"
}
},
"csv-generate": {
"version": "4.4.0",
"resolved": "https://registry.npmjs.org/csv-generate/-/csv-generate-4.4.0.tgz",
"integrity": "sha512-geM01acNPZ0wr4/9sKev5fCzFG/tsc/NbuFWrhLc47M1zQyUdEJH65+cxTLIVafEwhBjIYwQ7fdOL9roBqVltQ=="
},
"csv-parse": {
"version": "5.5.5",
"resolved": "https://registry.npmjs.org/csv-parse/-/csv-parse-5.5.5.tgz",
"integrity": "sha512-erCk7tyU3yLWAhk6wvKxnyPtftuy/6Ak622gOO7BCJ05+TYffnPCJF905wmOQm+BpkX54OdAl8pveJwUdpnCXQ=="
},
"csv-stringify": {
"version": "6.4.6",
"resolved": "https://registry.npmjs.org/csv-stringify/-/csv-stringify-6.4.6.tgz",
"integrity": "sha512-h2V2XZ3uOTLilF5dPIptgUfN/o2ia/80Ie0Lly18LAnw5s8Eb7kt8rfxSUy24AztJZas9f6DPZpVlzDUtFt/ag=="
},
"d": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/d/-/d-1.0.1.tgz",
@ -7415,6 +7476,11 @@
"resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz",
"integrity": "sha512-RwNA9Z/7PrK06rYLIzFMlaF+l73iwpzsqRIFgbMLbTcLD6cOao82TaWefPXQvB2fOC4AjuYSEndS7N/mTCbkdQ=="
},
"stream-transform": {
"version": "3.3.1",
"resolved": "https://registry.npmjs.org/stream-transform/-/stream-transform-3.3.1.tgz",
"integrity": "sha512-BL8pv9QL8Ikd11oZwlRDp1qYMhGR0i50zI9ltoijKGc4ubQWal/Rc4p6SYJp1TBOGpE0uAGchwbxOZ1ycwTuqQ=="
},
"string_decoder": {
"version": "0.10.31",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz",

View file

@ -38,6 +38,7 @@
"chai": "^4.3.7",
"copyfiles": "^2.4.1",
"cors": "^2.8.5",
"csv": "^6.3.8",
"dotenv": "^16.0.0",
"eccrypto": "^1.1.6",
"express": "^4.18.1",
@ -74,4 +75,4 @@
"ts-node": "10.7.0",
"typescript": "4.5.2"
}
}
}

View file

@ -22,6 +22,9 @@ const start = async () => {
await storageManager.userStorage.UpdateUser(process.argv[3], { balance_sats: +process.argv[4] })
log("user balance updated correctly")
}
if (!mainSettings.skipSanityCheck) {
await storageManager.VerifyEventsLog()
}
const mainHandler = new Main(mainSettings, storageManager)
await mainHandler.lnd.Warmup()
const serverMethods = GetServerMethods(mainHandler)

View file

@ -16,6 +16,7 @@ import { UserReceivingInvoice, ZapInfo } from '../storage/entity/UserReceivingIn
import { UnsignedEvent } from '../nostr/tools/event.js'
import { NostrSend } from '../nostr/handler.js'
import MetricsManager from '../metrics/index.js'
import EventsLogManager from '../storage/eventsLog.js'
export const LoadMainSettingsFromEnv = (test = false): MainSettings => {
return {
lndSettings: LoadLndSettingsFromEnv(test),
@ -31,7 +32,8 @@ export const LoadMainSettingsFromEnv = (test = false): MainSettings => {
appToUserFee: EnvMustBeInteger("TX_FEE_INTERNAL_ROOT_BPS") / 10000,
serviceUrl: EnvMustBeNonEmptyString("SERVICE_URL"),
servicePort: EnvMustBeInteger("PORT"),
recordPerformance: process.env.RECORD_PERFORMANCE === 'true' || false
recordPerformance: process.env.RECORD_PERFORMANCE === 'true' || false,
skipSanityCheck: process.env.SKIP_SANITY_CHECK === 'true' || false
}
}
@ -59,6 +61,7 @@ export default class {
constructor(settings: MainSettings, storage: Storage) {
this.settings = settings
this.storage = storage
this.lnd = NewLightningHandler(settings.lndSettings, this.addressPaidCb, this.invoicePaidCb, this.newBlockCb, this.htlcCb)
this.metricsManager = new MetricsManager(this.storage, this.lnd)
@ -66,6 +69,7 @@ export default class {
this.productManager = new ProductManager(this.storage, this.paymentManager, this.settings)
this.applicationManager = new ApplicationManager(this.storage, this.settings, this.paymentManager)
this.appUserManager = new AppUserManager(this.storage, this.settings, this.applicationManager)
}
attachNostrSend(f: NostrSend) {
@ -97,13 +101,22 @@ export default class {
await this.storage.paymentStorage.UpdateUserTransactionPayment(c.tx.serial_id, { confs: c.confs })
} else {
this.storage.StartTransaction(async tx => {
const { user_address: userAddress, paid_amount: amount, service_fee: serviceFee, serial_id: serialId } = c.tx
const { user_address: userAddress, paid_amount: amount, service_fee: serviceFee, serial_id: serialId, tx_hash } = c.tx
if (!userAddress.linkedApplication) {
log("ERROR", "an address was paid, that has no linked application")
return
}
const updateResult = await this.storage.paymentStorage.UpdateAddressReceivingTransaction(serialId, { confs: c.confs }, tx)
if (!updateResult.affected) {
throw new Error("unable to flag chain transaction as paid")
}
await this.storage.userStorage.IncrementUserBalance(userAddress.user.user_id, amount - serviceFee, tx)
const operationId = `${Types.UserOperationType.INCOMING_TX}-${userAddress.serial_id}`
await this.storage.userStorage.IncrementUserBalance(userAddress.user.user_id, amount - serviceFee, userAddress.address, tx)
if (serviceFee > 0) {
await this.storage.userStorage.IncrementUserBalance(userAddress.linkedApplication.owner.user_id, serviceFee, 'fees', tx)
}
const addressData = `${userAddress.address}:${tx_hash}`
this.storage.eventsLog.LogEvent({ type: 'address_paid', userId: userAddress.user.user_id, appId: userAddress.linkedApplication.app_id, appUserId: "", balance: userAddress.user.balance_sats, data: addressData, amount })
const operationId = `${Types.UserOperationType.INCOMING_TX}-${serialId}`
const op = { amount, paidAtUnix: Date.now() / 1000, inbound: true, type: Types.UserOperationType.INCOMING_TX, identifier: userAddress.address, operationId, network_fee: 0, service_fee: serviceFee, confirmed: true, tx_hash: c.tx.tx_hash, internal: c.tx.internal }
this.sendOperationToNostr(userAddress.linkedApplication!, userAddress.user.user_id, op)
})
@ -132,7 +145,12 @@ export default class {
// This call will fail if the transaction is already registered
const addedTx = await this.storage.paymentStorage.AddAddressReceivingTransaction(userAddress, txOutput.hash, txOutput.index, amount, fee, internal, blockHeight, tx)
if (internal) {
await this.storage.userStorage.IncrementUserBalance(userAddress.user.user_id, addedTx.paid_amount - fee, tx)
await this.storage.userStorage.IncrementUserBalance(userAddress.user.user_id, addedTx.paid_amount - fee, userAddress.address, tx)
if (fee > 0) {
await this.storage.userStorage.IncrementUserBalance(userAddress.linkedApplication.owner.user_id, fee, 'fees', tx)
}
const addressData = `${address}:${txOutput.hash}`
this.storage.eventsLog.LogEvent({ type: 'address_paid', userId: userAddress.user.user_id, appId: userAddress.linkedApplication.app_id, appUserId: "", balance: userAddress.user.balance_sats, data: addressData, amount })
}
const operationId = `${Types.UserOperationType.INCOMING_TX}-${addedTx.serial_id}`
const op = { amount, paidAtUnix: Date.now() / 1000, inbound: true, type: Types.UserOperationType.INCOMING_TX, identifier: userAddress.address, operationId, network_fee: 0, service_fee: fee, confirmed: internal, tx_hash: txOutput.hash, internal: false }
@ -161,13 +179,15 @@ export default class {
fee = 0
}
try {
await this.storage.paymentStorage.FlagInvoiceAsPaid(userInvoice, amount, fee, internal, tx)
await this.storage.userStorage.IncrementUserBalance(userInvoice.user.user_id, amount - fee, tx)
if (isAppUserPayment && fee > 0) {
await this.storage.userStorage.IncrementUserBalance(userInvoice.linkedApplication.owner.user_id, fee, tx)
const flaggingRes = await this.storage.paymentStorage.FlagInvoiceAsPaid(userInvoice, amount, fee, internal, tx)
if (flaggingRes.affected) {
throw new Error("no invoice affected by flag as paid")
}
await this.storage.userStorage.IncrementUserBalance(userInvoice.user.user_id, amount - fee, userInvoice.invoice, tx)
if (fee > 0) {
await this.storage.userStorage.IncrementUserBalance(userInvoice.linkedApplication.owner.user_id, fee, 'fees', tx)
}
this.storage.eventsLog.LogEvent({ type: 'invoice_paid', userId: userInvoice.user.user_id, appId: userInvoice.linkedApplication.app_id, appUserId: "", balance: userInvoice.user.balance_sats, data: paymentRequest, amount })
await this.triggerPaidCallback(log, userInvoice.callbackUrl)
const operationId = `${Types.UserOperationType.INCOMING_INVOICE}-${userInvoice.serial_id}`
const op = { amount, paidAtUnix: Date.now() / 1000, inbound: true, type: Types.UserOperationType.INCOMING_INVOICE, identifier: userInvoice.invoice, operationId, network_fee: 0, service_fee: fee, confirmed: true, tx_hash: "", internal }

View file

@ -95,12 +95,14 @@ export default class {
async NewAddress(ctx: Types.UserContext, req: Types.NewAddressRequest): Promise<Types.NewAddressResponse> {
const app = await this.storage.applicationStorage.GetApplication(ctx.app_id)
const user = await this.storage.userStorage.GetUser(ctx.user_id)
const existingAddress = await this.storage.paymentStorage.GetExistingUserAddress(ctx.user_id, app)
if (existingAddress) {
return { address: existingAddress.address }
}
const res = await this.lnd.NewAddress(req.addressType)
const userAddress = await this.storage.paymentStorage.AddUserAddress(ctx.user_id, res.address, { linkedApplication: app })
const userAddress = await this.storage.paymentStorage.AddUserAddress(user, res.address, { linkedApplication: app })
this.storage.eventsLog.LogEvent({ type: 'new_address', userId: user.user_id, appUserId: "", appId: app.app_id, balance: user.balance_sats, data: res.address, amount: 0 })
return { address: userAddress.address }
}
@ -108,6 +110,8 @@ export default class {
const user = await this.storage.userStorage.GetUser(userId)
const res = await this.lnd.NewInvoice(req.amountSats, req.memo, options.expiry)
const userInvoice = await this.storage.paymentStorage.AddUserInvoice(user, res.payRequest, options)
const appId = options.linkedApplication ? options.linkedApplication.app_id : ""
this.storage.eventsLog.LogEvent({ type: 'new_invoice', userId: user.user_id, appUserId: "", appId, balance: user.balance_sats, data: userInvoice.invoice, amount: req.amountSats })
return {
invoice: userInvoice.invoice
}
@ -162,7 +166,7 @@ export default class {
await this.lockUserWithMinBalance(userId, totalAmountToDecrement + routingFeeLimit)
try {
payment = await this.lnd.PayInvoice(req.invoice, req.amount, routingFeeLimit)
await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement + payment.feeSat)
await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement + payment.feeSat, req.invoice)
await this.storage.userStorage.UnlockUser(userId)
} catch (err) {
await this.storage.userStorage.UnlockUser(userId)
@ -172,14 +176,16 @@ export default class {
if (internalInvoice.paid_at_unix > 0) {
throw new Error("this invoice was already paid")
}
await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement)
await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement, req.invoice)
this.invoicePaidCb(req.invoice, payAmount, true)
}
if (isAppUserPayment && serviceFee > 0) {
await this.storage.userStorage.IncrementUserBalance(linkedApplication.owner.user_id, serviceFee)
await this.storage.userStorage.IncrementUserBalance(linkedApplication.owner.user_id, serviceFee, "fees")
}
const routingFees = payment ? payment.feeSat : 0
const newPayment = await this.storage.paymentStorage.AddUserInvoicePayment(userId, req.invoice, payAmount, routingFees, serviceFee, !!internalInvoice, linkedApplication)
const user = await this.storage.userStorage.GetUser(userId)
this.storage.eventsLog.LogEvent({ type: 'invoice_payment', userId, appId: linkedApplication.app_id, appUserId: "", balance: user.balance_sats, data: req.invoice, amount: payAmount })
return {
preimage: payment ? payment.paymentPreimage : "",
amount_paid: payment ? Number(payment.valueSat) : payAmount,
@ -207,23 +213,26 @@ export default class {
try {
const payment = await this.lnd.PayAddress(req.address, req.amoutSats, req.satsPerVByte)
txId = payment.txid
await this.storage.userStorage.DecrementUserBalance(ctx.user_id, total + serviceFee)
await this.storage.userStorage.DecrementUserBalance(ctx.user_id, total + serviceFee, req.address)
await this.storage.userStorage.UnlockUser(ctx.user_id)
} catch (err) {
await this.storage.userStorage.UnlockUser(ctx.user_id)
throw err
}
} else {
await this.storage.userStorage.DecrementUserBalance(ctx.user_id, req.amoutSats + serviceFee)
await this.storage.userStorage.DecrementUserBalance(ctx.user_id, req.amoutSats + serviceFee, req.address)
txId = crypto.randomBytes(32).toString("hex")
this.addressPaidCb({ hash: txId, index: 0 }, req.address, req.amoutSats, true)
}
if (isAppUserPayment && serviceFee > 0) {
await this.storage.userStorage.IncrementUserBalance(app.owner.user_id, serviceFee)
await this.storage.userStorage.IncrementUserBalance(app.owner.user_id, serviceFee, 'fees')
}
const newTx = await this.storage.paymentStorage.AddUserTransactionPayment(ctx.user_id, req.address, txId, 0, req.amoutSats, chainFees, serviceFee, !!internalAddress, blockHeight, app)
const user = await this.storage.userStorage.GetUser(ctx.user_id)
const txData = `${newTx.address}:${newTx.tx_hash}`
this.storage.eventsLog.LogEvent({ type: 'address_payment', userId: ctx.user_id, appId: app.app_id, appUserId: "", balance: user.balance_sats, data: txData, amount: req.amoutSats })
return {
txId: txId,
operation_id: `${Types.UserOperationType.OUTGOING_TX}-${newTx.serial_id}`,
@ -484,14 +493,18 @@ export default class {
const isAppUserPayment = fromUser.user_id !== linkedApplication.owner.user_id
let fee = this.getServiceFee(Types.UserOperationType.OUTGOING_USER_TO_USER, amount, isAppUserPayment)
const toIncrement = amount - fee
await this.storage.userStorage.DecrementUserBalance(fromUser.user_id, amount, tx)
await this.storage.userStorage.IncrementUserBalance(toUser.user_id, toIncrement, tx)
await this.storage.userStorage.DecrementUserBalance(fromUser.user_id, amount, toUserId, tx)
await this.storage.userStorage.IncrementUserBalance(toUser.user_id, toIncrement, fromUserId, tx)
await this.storage.paymentStorage.AddUserToUserPayment(fromUserId, toUserId, amount, fee, linkedApplication, tx)
if (isAppUserPayment && fee > 0) {
await this.storage.userStorage.IncrementUserBalance(linkedApplication.owner.user_id, fee, tx)
await this.storage.userStorage.IncrementUserBalance(linkedApplication.owner.user_id, fee, 'fees', tx)
}
sentAmount = toIncrement
})
const fromUser = await this.storage.userStorage.GetUser(fromUserId)
const toUser = await this.storage.userStorage.GetUser(toUserId)
this.storage.eventsLog.LogEvent({ type: 'u2u_sender', userId: fromUserId, appId: linkedApplication.app_id, appUserId: "", balance: fromUser.balance_sats, data: toUserId, amount: amount })
this.storage.eventsLog.LogEvent({ type: 'u2u_receiver', userId: toUserId, appId: linkedApplication.app_id, appUserId: "", balance: toUser.balance_sats, data: fromUserId, amount: amount })
return sentAmount
}

View file

@ -15,5 +15,6 @@ export type MainSettings = {
serviceUrl: string
servicePort: number
recordPerformance: boolean
skipSanityCheck: boolean
}

View file

@ -0,0 +1,76 @@
import fs from 'fs'
import { parse, stringify } from 'csv'
import { getLogger } from '../helpers/logger'
const eventLogPath = "logs/eventLog.csv"
type LoggedEventType = 'new_invoice' | 'new_address' | 'address_paid' | 'invoice_paid' | 'invoice_payment' | 'address_payment' | 'u2u_receiver' | 'u2u_sender' | 'balance_increment' | 'balance_decrement'
export type LoggedEvent = {
timestampMs: number
userId: string
appUserId: string
appId: string
balance: number
type: LoggedEventType
data: string
amount: number
}
const columns = ["timestampMs", "userId", "appUserId", "appId", "balance", "type", "data", "amount"]
type StringerWrite = (chunk: any, cb: (error: Error | null | undefined) => void) => boolean
export default class EventsLogManager {
log = getLogger({ appName: "EventsLogManager" })
stringerWrite: StringerWrite
constructor() {
const exists = fs.existsSync(eventLogPath)
if (!exists) {
const stringer = stringify({ header: true, columns })
stringer.pipe(fs.createWriteStream(eventLogPath, { flags: 'a' }))
this.stringerWrite = (chunk, cb) => stringer.write(chunk, cb)
} else {
const stringer = stringify({})
stringer.pipe(fs.createWriteStream(eventLogPath, { flags: 'a' }))
this.stringerWrite = (chunk, cb) => stringer.write(chunk, cb)
}
}
LogEvent = (e: Omit<LoggedEvent, 'timestampMs'>) => {
this.log(e.type, "->", e.userId, "->", e.appId, "->", e.appUserId, "->", e.balance, "->", e.data, "->", e.amount)
this.write([Date.now(), e.userId, e.appUserId, e.appId, e.balance, e.type, e.data, e.amount])
}
GetAllLogs = async (): Promise<LoggedEvent[]> => {
const logs = await this.Read()
this.log("found", logs.length, "event logs")
return logs
}
Read = async (): Promise<LoggedEvent[]> => {
const exists = fs.existsSync(eventLogPath)
if (!exists) {
return []
}
return new Promise<LoggedEvent[]>((res, rej) => {
const result: LoggedEvent[] = []
fs.createReadStream(eventLogPath)
.pipe(parse({ delimiter: ",", from_line: 2 }))
.on('data', data => { result.push(this.parseEvent(data)) })
.on('error', err => { rej(err) })
.on('end', () => { res(result) })
})
}
parseEvent = (args: string[]): LoggedEvent => {
const [timestampMs, userId, appUserId, appId, balance, type, data, amount] = args
return { timestampMs: +timestampMs, userId, appUserId, appId, balance: +balance, type: type as LoggedEventType, data, amount: +amount }
}
write = async (args: (string | number)[]) => {
return new Promise<void>((res, rej) => {
this.stringerWrite(args, err => {
if (err) {
rej(err)
} else { res() }
})
})
}
}

View file

@ -6,6 +6,7 @@ import UserStorage from "./userStorage.js";
import PaymentStorage from "./paymentStorage.js";
import MetricsStorage from "./metricsStorage.js";
import TransactionsQueue, { TX } from "./transactionsQueue.js";
import EventsLogManager, { LoggedEvent } from "./eventsLog.js";
export type StorageSettings = {
dbSettings: DbSettings
}
@ -22,14 +23,16 @@ export default class {
userStorage: UserStorage
paymentStorage: PaymentStorage
metricsStorage: MetricsStorage
eventsLog: EventsLogManager
constructor(settings: StorageSettings) {
this.settings = settings
this.eventsLog = new EventsLogManager()
}
async Connect(migrations: Function[], metricsMigrations: Function[]) {
const { source, executedMigrations } = await NewDB(this.settings.dbSettings, migrations)
this.DB = source
this.txQueue = new TransactionsQueue("main", this.DB)
this.userStorage = new UserStorage(this.DB, this.txQueue)
this.userStorage = new UserStorage(this.DB, this.txQueue, this.eventsLog)
this.productStorage = new ProductStorage(this.DB, this.txQueue)
this.applicationStorage = new ApplicationStorage(this.DB, this.userStorage, this.txQueue)
this.paymentStorage = new PaymentStorage(this.DB, this.userStorage, this.txQueue)
@ -38,6 +41,40 @@ export default class {
return { executedMigrations, executedMetricsMigrations };
}
async VerifyEventsLog() {
const events = await this.eventsLog.GetAllLogs()
const users: Record<string, { ts: number, updatedBalance: number }> = {}
for (let i = 0; i < events.length; i++) {
const e = events[i]
if (e.type === 'balance_decrement' || e.type === 'balance_increment') {
users[e.userId] = this.checkUserEntry(e, users[e.userId])
} else {
await this.paymentStorage.VerifyDbEvent(e)
}
}
await Promise.all(Object.entries(users).map(async ([userId, u]) => {
const user = await this.userStorage.GetUser(userId)
if (user.balance_sats !== u.updatedBalance) {
throw new Error("sanity check on balance failed, expected: " + u.updatedBalance + " found: " + user.balance_sats)
}
}))
}
checkUserEntry(e: LoggedEvent, u: { ts: number, updatedBalance: number } | undefined) {
const newEntry = { ts: e.timestampMs, updatedBalance: e.balance + e.amount * (e.type === 'balance_decrement' ? -1 : 1) }
if (!u) {
return newEntry
}
if (e.timestampMs < u.ts) {
throw new Error("entry out of order " + e.timestampMs + " " + u.ts)
}
if (e.balance !== u.updatedBalance) {
throw new Error("inconsistent balance update got: " + e.balance + " expected " + u.updatedBalance)
}
return newEntry
}
StartTransaction(exec: TX<void>, description?: string) {
return this.txQueue.PushToQueue({ exec, dbTx: true, description })
}

View file

@ -11,7 +11,8 @@ import { AddressReceivingTransaction } from './entity/AddressReceivingTransactio
import { UserInvoicePayment } from './entity/UserInvoicePayment.js';
import { UserToUserPayment } from './entity/UserToUserPayment.js';
import { Application } from './entity/Application.js';
import TransactionsQueue, { TX } from "./transactionsQueue.js";
import TransactionsQueue from "./transactionsQueue.js";
import { LoggedEvent } from './eventsLog.js';
export type InboundOptionals = { product?: Product, callbackUrl?: string, expiry: number, expectedPayer?: User, linkedApplication?: Application, zapInfo?: ZapInfo }
export const defaultInvoiceExpiry = 60 * 60
export default class {
@ -23,6 +24,7 @@ export default class {
this.userStorage = userStorage
this.txQueue = txQueue
}
async AddAddressReceivingTransaction(address: UserReceivingAddress, txHash: string, outputIndex: number, amount: number, serviceFee: number, internal: boolean, height: number, dbTx: EntityManager | DataSource) {
const newAddressTransaction = dbTx.getRepository(AddressReceivingTransaction).create({
user_address: address,
@ -56,22 +58,22 @@ export default class {
return entityManager.getRepository(UserReceivingAddress).findOne({ where: { user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } })
}
async AddUserAddress(userId: string, address: string, opts: { callbackUrl?: string, linkedApplication?: Application } = {}): Promise<UserReceivingAddress> {
async AddUserAddress(user: User, address: string, opts: { callbackUrl?: string, linkedApplication?: Application } = {}): Promise<UserReceivingAddress> {
const newUserAddress = this.DB.getRepository(UserReceivingAddress).create({
address,
callbackUrl: opts.callbackUrl || "",
linkedApplication: opts.linkedApplication,
user: await this.userStorage.GetUser(userId)
user
})
return this.txQueue.PushToQueue<UserReceivingAddress>({ exec: async db => db.getRepository(UserReceivingAddress).save(newUserAddress), dbTx: false, description: `add address for ${userId} linked to ${opts.linkedApplication?.app_id}: ${address} ` })
return this.txQueue.PushToQueue<UserReceivingAddress>({ exec: async db => db.getRepository(UserReceivingAddress).save(newUserAddress), dbTx: false, description: `add address for ${user.user_id} linked to ${opts.linkedApplication?.app_id}: ${address} ` })
}
async FlagInvoiceAsPaid(invoice: UserReceivingInvoice, amount: number, serviceFee: number, internal: boolean, entityManager = this.DB) {
async FlagInvoiceAsPaid(invoice: UserReceivingInvoice, amount: number, serviceFee: number, internal: boolean, dbTx: EntityManager | DataSource) {
const i: Partial<UserReceivingInvoice> = { paid_at_unix: Math.floor(Date.now() / 1000), paid_amount: amount, service_fee: serviceFee, internal }
if (!internal) {
i.paidByLnd = true
}
return entityManager.getRepository(UserReceivingInvoice).update(invoice.serial_id, i)
return dbTx.getRepository(UserReceivingInvoice).update(invoice.serial_id, i)
}
GetUserInvoicesFlaggedAsPaid(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise<UserReceivingInvoice[]> {
@ -326,4 +328,29 @@ export default class {
])
return !!i || !!tx || !!u2u
}
async VerifyDbEvent(e: LoggedEvent) {
switch (e.type) {
case "new_invoice":
return this.DB.getRepository(UserReceivingInvoice).findOneOrFail({ where: { invoice: e.data, user: { user_id: e.userId } } })
case 'new_address':
return this.DB.getRepository(UserReceivingAddress).findOneOrFail({ where: { address: e.data, user: { user_id: e.userId } } })
case 'invoice_paid':
return this.DB.getRepository(UserReceivingInvoice).findOneOrFail({ where: { invoice: e.data, user: { user_id: e.userId }, paid_at_unix: MoreThan(0) } })
case 'invoice_payment':
return this.DB.getRepository(UserInvoicePayment).findOneOrFail({ where: { invoice: e.data, user: { user_id: e.userId } } })
case 'address_paid':
const [receivingAddress, receivedHash] = e.data.split(":")
return this.DB.getRepository(AddressReceivingTransaction).findOneOrFail({ where: { user_address: { address: receivingAddress }, tx_hash: receivedHash, confs: MoreThan(0) } })
case 'address_payment':
const [sentAddress, sentHash] = e.data.split(":")
return this.DB.getRepository(UserTransactionPayment).findOneOrFail({ where: { address: sentAddress, tx_hash: sentHash, user: { user_id: e.userId } } })
case 'u2u_receiver':
return this.DB.getRepository(UserToUserPayment).findOneOrFail({ where: { from_user: { user_id: e.data }, to_user: { user_id: e.userId } } })
case 'u2u_sender':
return this.DB.getRepository(UserToUserPayment).findOneOrFail({ where: { to_user: { user_id: e.data }, from_user: { user_id: e.userId } } })
default:
break;
}
}
}

View file

@ -4,12 +4,15 @@ import { User } from './entity/User.js';
import { UserBasicAuth } from './entity/UserBasicAuth.js';
import { getLogger } from '../helpers/logger.js';
import TransactionsQueue from "./transactionsQueue.js";
import EventsLogManager from './eventsLog.js';
export default class {
DB: DataSource | EntityManager
txQueue: TransactionsQueue
constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) {
eventsLog: EventsLogManager
constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue, eventsLog: EventsLogManager) {
this.DB = DB
this.txQueue = txQueue
this.eventsLog = eventsLog
}
async AddUser(balance: number, dbTx: DataSource | EntityManager): Promise<User> {
if (balance && process.env.ALLOW_BALANCE_MIGRATION !== 'true') {
@ -66,7 +69,7 @@ export default class {
throw new Error("unaffected user unlock for " + userId) // TODO: fix logs doxing
}
}
async IncrementUserBalance(userId: string, increment: number, entityManager = this.DB) {
async IncrementUserBalance(userId: string, increment: number, reason: string, entityManager = this.DB) {
const user = await this.GetUser(userId, entityManager)
const res = await entityManager.getRepository(User).increment({
user_id: userId,
@ -75,8 +78,9 @@ export default class {
throw new Error("unaffected balance increment for " + userId) // TODO: fix logs doxing
}
getLogger({ userId: userId, appName: "balanceUpdates" })("incremented balance from", user.balance_sats, "sats, by", increment, "sats")
this.eventsLog.LogEvent({ type: 'balance_increment', userId, appId: "", appUserId: "", balance: user.balance_sats, data: reason, amount: increment })
}
async DecrementUserBalance(userId: string, decrement: number, entityManager = this.DB) {
async DecrementUserBalance(userId: string, decrement: number, reason: string, entityManager = this.DB) {
const user = await this.GetUser(userId, entityManager)
if (!user || user.balance_sats < decrement) {
throw new Error("not enough balance to decrement")
@ -88,6 +92,7 @@ export default class {
throw new Error("unaffected balance decrement for " + userId) // TODO: fix logs doxing
}
getLogger({ userId: userId, appName: "balanceUpdates" })("decremented balance from", user.balance_sats, "sats, by", decrement, "sats")
this.eventsLog.LogEvent({ type: 'balance_decrement', userId, appId: "", appUserId: "", balance: user.balance_sats, data: reason, amount: decrement })
}
async UpdateUser(userId: string, update: Partial<User>, entityManager = this.DB) {