diff --git a/src/services/main/paymentManager.ts b/src/services/main/paymentManager.ts index 38f9474d..3bc32c3e 100644 --- a/src/services/main/paymentManager.ts +++ b/src/services/main/paymentManager.ts @@ -215,7 +215,8 @@ export default class { } } else { await this.storage.userStorage.DecrementUserBalance(ctx.user_id, req.amoutSats + serviceFee) - this.addressPaidCb({ hash: crypto.randomBytes(32).toString("hex"), index: 0 }, req.address, req.amoutSats, true) + txId = crypto.randomBytes(32).toString("hex") + this.addressPaidCb({ hash: txId, index: 0 }, req.address, req.amoutSats, true) } if (isAppUserPayment && serviceFee > 0) { diff --git a/src/services/storage/db.ts b/src/services/storage/db.ts index 8f5722f0..7fa86a94 100644 --- a/src/services/storage/db.ts +++ b/src/services/storage/db.ts @@ -68,4 +68,17 @@ export default async (settings: DbSettings, migrations: Function[]): Promise<{ s return { source, executedMigrations } } return { source, executedMigrations: [] } +} + +export const runFakeMigration = async (databaseFile: string, migrations: Function[]) => { + const source = await new DataSource({ + type: "sqlite", + database: databaseFile, + // logging: true, + entities: [User, UserReceivingInvoice, UserReceivingAddress, AddressReceivingTransaction, UserInvoicePayment, UserTransactionPayment, + UserBasicAuth, UserEphemeralKey, Product, UserToUserPayment, Application, ApplicationUser, UserToUserPayment], + //synchronize: true, + migrations + }).initialize() + return source.runMigrations({ fake: true }) } \ No newline at end of file diff --git a/src/services/storage/index.ts b/src/services/storage/index.ts index 558c79c1..a1696cc4 100644 --- a/src/services/storage/index.ts +++ b/src/services/storage/index.ts @@ -38,7 +38,7 @@ export default class { return { executedMigrations, executedMetricsMigrations }; } - StartTransaction(exec: TX) { - return this.txQueue.PushToQueue({ exec, dbTx: true }) + StartTransaction(exec: TX, description?: string) { + return this.txQueue.PushToQueue({ exec, dbTx: true, description }) } } \ No newline at end of file diff --git a/src/services/storage/metricsStorage.ts b/src/services/storage/metricsStorage.ts index acb1cce9..8cc5332c 100644 --- a/src/services/storage/metricsStorage.ts +++ b/src/services/storage/metricsStorage.ts @@ -72,10 +72,10 @@ export default class { await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "receive_errors", event.receive_errors) } if (event.forward_errors_as_input) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forwar_errors_as_input", event.forward_errors_as_input) + await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_input", event.forward_errors_as_input) } if (event.forward_errors_as_output) { - await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forwar_errors_as_output", event.forward_errors_as_output) + await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_output", event.forward_errors_as_output) } if (event.missed_forward_fee_as_input) { await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_input", event.missed_forward_fee_as_input) diff --git a/src/services/storage/migrations/runner.ts b/src/services/storage/migrations/runner.ts index cc35c702..3c81aa2f 100644 --- a/src/services/storage/migrations/runner.ts +++ b/src/services/storage/migrations/runner.ts @@ -1,38 +1,21 @@ import { PubLogger } from '../../helpers/logger.js' -import { DbSettings } from '../db.js' +import { DbSettings, runFakeMigration } from '../db.js' import Storage, { StorageSettings } from '../index.js' import { Initial1703170309875 } from './1703170309875-initial.js' import { LndMetrics1703170330183 } from './1703170330183-lnd_metrics.js' import { ChannelRouting1709316653538 } from './1709316653538-channel_routing.js' -const allMigrations = [LndMetrics1703170330183, ChannelRouting1709316653538] +const allMigrations = [Initial1703170309875] +const allMetricsMigrations = [LndMetrics1703170330183, ChannelRouting1709316653538] export const TypeOrmMigrationRunner = async (log: PubLogger, storageManager: Storage, settings: DbSettings, arg: string | undefined): Promise => { - if (arg === 'initial_migration') { - await connectAndMigrate(log, storageManager, true, settings, [Initial1703170309875], []) + if (arg === 'fake_initial_migration') { + runFakeMigration(settings.databaseFile, [Initial1703170309875]) return true - } else if (arg === 'lnd_metrics_migration') { - await connectAndMigrate(log, storageManager, true, settings, [], [LndMetrics1703170330183]) - return true - } else if (arg === 'channel_routing_migration') { - await connectAndMigrate(log, storageManager, true, settings, [], [ChannelRouting1709316653538]) - return true - } else if (arg === 'all_migrations') { - await connectAndMigrate(log, storageManager, true, settings, [], allMigrations) - return true - } else if (settings.migrate) { - await connectAndMigrate(log, storageManager, false, settings, [], allMigrations) - return false } - await connectAndMigrate(log, storageManager, false, settings, [], []) + await connectAndMigrate(log, storageManager, allMigrations, allMetricsMigrations) return false } -const connectAndMigrate = async (log: PubLogger, storageManager: Storage, manual: boolean, settings: DbSettings, migrations: Function[], metricsMigrations: Function[]) => { - if (manual && settings.migrate) { - throw new Error("auto migration is enabled, no need to run manual migration") - } - if (migrations.length > 0) { - log("will add", migrations.length, "typeorm migrations...") - } +const connectAndMigrate = async (log: PubLogger, storageManager: Storage, migrations: Function[], metricsMigrations: Function[]) => { const { executedMigrations, executedMetricsMigrations } = await storageManager.Connect(migrations, metricsMigrations) if (migrations.length > 0) { log(executedMigrations.length, "of", migrations.length, "migrations were executed correctly") @@ -43,5 +26,4 @@ const connectAndMigrate = async (log: PubLogger, storageManager: Storage, manual log(executedMetricsMigrations.length, "of", metricsMigrations.length, "metrics migrations were executed correctly") log(executedMetricsMigrations) } - } \ No newline at end of file diff --git a/src/services/storage/paymentStorage.ts b/src/services/storage/paymentStorage.ts index 49f73e21..07893213 100644 --- a/src/services/storage/paymentStorage.ts +++ b/src/services/storage/paymentStorage.ts @@ -63,7 +63,7 @@ export default class { linkedApplication: opts.linkedApplication, user: await this.userStorage.GetUser(userId) }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserReceivingAddress).save(newUserAddress), dbTx: false }) + return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserReceivingAddress).save(newUserAddress), dbTx: false, description: `add address for ${userId} linked to ${opts.linkedApplication?.app_id}: ${address} ` }) } async FlagInvoiceAsPaid(invoice: UserReceivingInvoice, amount: number, serviceFee: number, internal: boolean, entityManager = this.DB) { @@ -101,7 +101,7 @@ export default class { linkedApplication: options.linkedApplication, zap_info: options.zapInfo }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserReceivingInvoice).save(newUserInvoice), dbTx: false }) + return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserReceivingInvoice).save(newUserInvoice), dbTx: false, description: `add invoice for ${user.user_id} linked to ${options.linkedApplication?.app_id}: ${invoice} ` }) } async GetAddressOwner(address: string, entityManager = this.DB): Promise { @@ -131,7 +131,7 @@ export default class { internal, linkedApplication }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserInvoicePayment).save(newPayment), dbTx: false }) + return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserInvoicePayment).save(newPayment), dbTx: false, description: `add invoice payment for ${userId} linked to ${linkedApplication.app_id}: ${invoice}, amt: ${amount} ` }) } GetUserInvoicePayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise { @@ -165,7 +165,7 @@ export default class { confs: internal ? 10 : 0, linkedApplication }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserTransactionPayment).save(newTx), dbTx: false }) + return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserTransactionPayment).save(newTx), dbTx: false, description: `add tx payment for ${userId} linked to ${linkedApplication.app_id}: ${address}, amt: ${amount} ` }) } GetUserTransactionPayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise { diff --git a/src/services/storage/transactionsQueue.ts b/src/services/storage/transactionsQueue.ts index 5f2fff8e..44a74fb4 100644 --- a/src/services/storage/transactionsQueue.ts +++ b/src/services/storage/transactionsQueue.ts @@ -5,6 +5,7 @@ export type TX = (entityManager: EntityManager | DataSource) => Promise export type TxOperation = { exec: TX dbTx: boolean + description?: string } export default class { @@ -19,10 +20,9 @@ export default class { PushToQueue(op: TxOperation) { if (!this.pendingTx) { - this.log("executing item immediately") return this.execQueueItem(op) } - this.log("holding item in queue") + this.log("queue not empty, possibly stuck") return new Promise((res, rej) => { this.transactionsQueue.push({ op, res, rej }) }) @@ -32,16 +32,14 @@ export default class { this.pendingTx = false const next = this.transactionsQueue.pop() if (!next) { - this.log("no more items in queue") + this.log("queue is clear") return } try { - this.log("executing next item in queue") const res = await this.execQueueItem(next.op) - this.log("resolving next item in queue") + if (next.op.description) this.log("done", next.op.description) next.res(res) } catch (err: any) { - this.log("rejecting next item in queue") next.rej(err.message) } } @@ -52,21 +50,17 @@ export default class { } this.pendingTx = true if (op.dbTx) { - this.log("executing item transaction") return this.doTransaction(op.exec) } - this.log("executing item operation") return this.doOperation(op.exec) } async doOperation(exec: TX) { try { const res = await exec(this.DB) - this.log("executing item operation done") this.execNextInQueue() return res } catch (err) { - this.log("executing item operation failed") this.execNextInQueue() throw err } @@ -77,11 +71,9 @@ export default class { return this.DB.transaction(async tx => { try { const res = await exec(tx) - this.log("executing item transaction done") this.execNextInQueue() return res } catch (err) { - this.log("executing item transaction failed") this.execNextInQueue() throw err } diff --git a/src/services/storage/userStorage.ts b/src/services/storage/userStorage.ts index 9f45f6e8..c5cd43fb 100644 --- a/src/services/storage/userStorage.ts +++ b/src/services/storage/userStorage.ts @@ -74,7 +74,7 @@ export default class { if (!res.affected) { throw new Error("unaffected balance increment for " + userId) // TODO: fix logs doxing } - getLogger({ userId: userId })("incremented balance from", user.balance_sats, "sats, by", increment, "sats") + getLogger({ userId: userId, appName: "balanceUpdates" })("incremented balance from", user.balance_sats, "sats, by", increment, "sats") } async DecrementUserBalance(userId: string, decrement: number, entityManager = this.DB) { const user = await this.GetUser(userId, entityManager) @@ -87,7 +87,7 @@ export default class { if (!res.affected) { throw new Error("unaffected balance decrement for " + userId) // TODO: fix logs doxing } - getLogger({ userId: userId })("decremented balance from", user.balance_sats, "sats, by", decrement, "sats") + getLogger({ userId: userId, appName: "balanceUpdates" })("decremented balance from", user.balance_sats, "sats, by", decrement, "sats") } async UpdateUser(userId: string, update: Partial, entityManager = this.DB) {