diff --git a/src/services/storage/applicationStorage.ts b/src/services/storage/applicationStorage.ts index 173f2662..5b660b9f 100644 --- a/src/services/storage/applicationStorage.ts +++ b/src/services/storage/applicationStorage.ts @@ -5,24 +5,29 @@ import { Application } from "./entity/Application.js" import UserStorage from './userStorage.js'; import { ApplicationUser } from './entity/ApplicationUser.js'; import { getLogger } from '../helpers/logger.js'; +import TransactionsQueue, { TX } from "./transactionsQueue.js"; export default class { DB: DataSource | EntityManager userStorage: UserStorage - constructor(DB: DataSource | EntityManager, userStorage: UserStorage) { + txQueue: TransactionsQueue + constructor(DB: DataSource | EntityManager, userStorage: UserStorage, txQueue: TransactionsQueue) { this.DB = DB this.userStorage = userStorage + this.txQueue = txQueue } - async AddApplication(name: string, allowUserCreation: boolean, entityManager = this.DB): Promise { - const owner = await this.userStorage.AddUser(0, entityManager) - const repo = entityManager.getRepository(Application) - const newApplication = repo.create({ - app_id: crypto.randomBytes(32).toString('hex'), - name, - owner, - allow_user_creation: allowUserCreation + async AddApplication(name: string, allowUserCreation: boolean): Promise { + return this.DB.transaction(async tx => { + const owner = await this.userStorage.AddUser(0, tx) + const repo = this.DB.getRepository(Application) + const newApplication = repo.create({ + app_id: crypto.randomBytes(32).toString('hex'), + name, + owner, + allow_user_creation: allowUserCreation + }) + return tx.getRepository(Application).save(newApplication) }) - return repo.save(newApplication) } async GetApplicationByName(name: string, entityManager = this.DB) { diff --git a/src/services/storage/index.ts b/src/services/storage/index.ts index 3df3573b..61c10645 100644 --- a/src/services/storage/index.ts +++ b/src/services/storage/index.ts @@ -5,75 +5,39 @@ import ApplicationStorage from './applicationStorage.js' import UserStorage from "./userStorage.js"; import PaymentStorage from "./paymentStorage.js"; import MetricsStorage from "./metricsStorage.js"; +import TransactionsQueue, { TX } from "./transactionsQueue.js"; export type StorageSettings = { dbSettings: DbSettings } export const LoadStorageSettingsFromEnv = (test = false): StorageSettings => { return { dbSettings: LoadDbSettingsFromEnv(test) } } -type TX = (entityManager: EntityManager) => Promise export default class { DB: DataSource | EntityManager settings: StorageSettings + txQueue: TransactionsQueue productStorage: ProductStorage applicationStorage: ApplicationStorage userStorage: UserStorage paymentStorage: PaymentStorage metricsStorage: MetricsStorage - pendingTx: boolean - transactionsQueue: { exec: TX, res: () => void, rej: (message: string) => void }[] = [] constructor(settings: StorageSettings) { this.settings = settings } async Connect(migrations: Function[]) { const { source, executedMigrations } = await NewDB(this.settings.dbSettings, migrations) this.DB = source - this.userStorage = new UserStorage(this.DB) - this.productStorage = new ProductStorage(this.DB) - this.applicationStorage = new ApplicationStorage(this.DB, this.userStorage) - this.paymentStorage = new PaymentStorage(this.DB, this.userStorage) - this.metricsStorage = new MetricsStorage(this.DB) + this.txQueue = new TransactionsQueue(this.DB) + this.userStorage = new UserStorage(this.DB, this.txQueue) + this.productStorage = new ProductStorage(this.DB, this.txQueue) + this.applicationStorage = new ApplicationStorage(this.DB, this.userStorage, this.txQueue) + this.paymentStorage = new PaymentStorage(this.DB, this.userStorage, this.txQueue) + this.metricsStorage = new MetricsStorage(this.DB, this.txQueue) return executedMigrations } - StartTransaction(exec: TX) { - if (!this.pendingTx) { - return this.doTransaction(exec) - } - - return new Promise((res, rej) => { - this.transactionsQueue.push({ exec, res, rej }) - }) - } - - async ExecNextInQueue() { - this.pendingTx = false - const next = this.transactionsQueue.pop() - if (!next) { - return - } - try { - await this.doTransaction(next.exec) - next.res() - } catch (err: any) { - next.rej(err.message) - } - } - - doTransaction(exec: TX) { - if (this.pendingTx) { - throw new Error("cannot start DB transaction") - } - this.pendingTx = true - return this.DB.transaction(async tx => { - try { - await exec(tx) - this.ExecNextInQueue() - } catch (err) { - this.ExecNextInQueue() - throw err - } - }) + StartTransaction(exec: TX) { + return this.txQueue.PushToQueue({ exec, dbTx: true }) } } \ No newline at end of file diff --git a/src/services/storage/metricsStorage.ts b/src/services/storage/metricsStorage.ts index 84cdacde..186e910c 100644 --- a/src/services/storage/metricsStorage.ts +++ b/src/services/storage/metricsStorage.ts @@ -2,10 +2,13 @@ import { Between, DataSource, EntityManager, FindOperator, LessThanOrEqual, More import { RoutingEvent } from "./entity/RoutingEvent.js" import { BalanceEvent } from "./entity/BalanceEvent.js" import { ChannelBalanceEvent } from "./entity/ChannelsBalanceEvent.js" +import TransactionsQueue, { TX } from "./transactionsQueue.js"; export default class { DB: DataSource | EntityManager - constructor(DB: DataSource | EntityManager) { + txQueue: TransactionsQueue + constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) { this.DB = DB + this.txQueue = txQueue } async SaveRoutingEvent(event: Partial, entityManager = this.DB) { const entry = entityManager.getRepository(RoutingEvent).create(event) diff --git a/src/services/storage/paymentStorage.ts b/src/services/storage/paymentStorage.ts index e5c61dfc..c8af90e9 100644 --- a/src/services/storage/paymentStorage.ts +++ b/src/services/storage/paymentStorage.ts @@ -11,17 +11,20 @@ import { AddressReceivingTransaction } from './entity/AddressReceivingTransactio import { UserInvoicePayment } from './entity/UserInvoicePayment.js'; import { UserToUserPayment } from './entity/UserToUserPayment.js'; import { Application } from './entity/Application.js'; +import TransactionsQueue, { TX } from "./transactionsQueue.js"; export type InboundOptionals = { product?: Product, callbackUrl?: string, expiry: number, expectedPayer?: User, linkedApplication?: Application, zapInfo?: ZapInfo } export const defaultInvoiceExpiry = 60 * 60 export default class { DB: DataSource | EntityManager userStorage: UserStorage - constructor(DB: DataSource | EntityManager, userStorage: UserStorage) { + txQueue: TransactionsQueue + constructor(DB: DataSource | EntityManager, userStorage: UserStorage, txQueue: TransactionsQueue) { this.DB = DB this.userStorage = userStorage + this.txQueue = txQueue } - async AddAddressReceivingTransaction(address: UserReceivingAddress, txHash: string, outputIndex: number, amount: number, serviceFee: number, internal: boolean, height: number, entityManager = this.DB) { - const newAddressTransaction = entityManager.getRepository(AddressReceivingTransaction).create({ + async AddAddressReceivingTransaction(address: UserReceivingAddress, txHash: string, outputIndex: number, amount: number, serviceFee: number, internal: boolean, height: number, dbTx: EntityManager | DataSource) { + const newAddressTransaction = dbTx.getRepository(AddressReceivingTransaction).create({ user_address: address, tx_hash: txHash, output_index: outputIndex, @@ -32,7 +35,7 @@ export default class { broadcast_height: height, confs: internal ? 10 : 0 }) - return entityManager.getRepository(AddressReceivingTransaction).save(newAddressTransaction) + return dbTx.getRepository(AddressReceivingTransaction).save(newAddressTransaction) } GetUserReceivingTransactions(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise { @@ -53,14 +56,14 @@ export default class { return entityManager.getRepository(UserReceivingAddress).findOne({ where: { user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } }) } - async AddUserAddress(userId: string, address: string, opts: { callbackUrl?: string, linkedApplication?: Application } = {}, entityManager = this.DB): Promise { - const newUserAddress = entityManager.getRepository(UserReceivingAddress).create({ + async AddUserAddress(userId: string, address: string, opts: { callbackUrl?: string, linkedApplication?: Application } = {}): Promise { + const newUserAddress = this.DB.getRepository(UserReceivingAddress).create({ address, callbackUrl: opts.callbackUrl || "", linkedApplication: opts.linkedApplication, - user: await this.userStorage.GetUser(userId, entityManager) + user: await this.userStorage.GetUser(userId) }) - return entityManager.getRepository(UserReceivingAddress).save(newUserAddress) + return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserReceivingAddress).save(newUserAddress), dbTx: false }) } async FlagInvoiceAsPaid(invoice: UserReceivingInvoice, amount: number, serviceFee: number, internal: boolean, entityManager = this.DB) { @@ -87,8 +90,8 @@ export default class { }) } - async AddUserInvoice(user: User, invoice: string, options: InboundOptionals = { expiry: defaultInvoiceExpiry }, entityManager = this.DB): Promise { - const newUserInvoice = entityManager.getRepository(UserReceivingInvoice).create({ + async AddUserInvoice(user: User, invoice: string, options: InboundOptionals = { expiry: defaultInvoiceExpiry }): Promise { + const newUserInvoice = this.DB.getRepository(UserReceivingInvoice).create({ invoice: invoice, callbackUrl: options.callbackUrl, user: user, @@ -98,7 +101,7 @@ export default class { linkedApplication: options.linkedApplication, zap_info: options.zapInfo }) - return entityManager.getRepository(UserReceivingInvoice).save(newUserInvoice) + return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserReceivingInvoice).save(newUserInvoice), dbTx: false }) } async GetAddressOwner(address: string, entityManager = this.DB): Promise { @@ -117,8 +120,8 @@ export default class { }) } - async AddUserInvoicePayment(userId: string, invoice: string, amount: number, routingFees: number, serviceFees: number, internal: boolean, linkedApplication: Application, entityManager = this.DB): Promise { - const newPayment = entityManager.getRepository(UserInvoicePayment).create({ + async AddUserInvoicePayment(userId: string, invoice: string, amount: number, routingFees: number, serviceFees: number, internal: boolean, linkedApplication: Application): Promise { + const newPayment = this.DB.getRepository(UserInvoicePayment).create({ user: await this.userStorage.GetUser(userId), paid_amount: amount, invoice, @@ -128,7 +131,7 @@ export default class { internal, linkedApplication }) - return entityManager.getRepository(UserInvoicePayment).save(newPayment) + return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserInvoicePayment).save(newPayment), dbTx: false }) } GetUserInvoicePayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise { @@ -147,8 +150,8 @@ export default class { }) } - async AddUserTransactionPayment(userId: string, address: string, txHash: string, txOutput: number, amount: number, chainFees: number, serviceFees: number, internal: boolean, height: number, linkedApplication: Application, entityManager = this.DB): Promise { - const newTx = entityManager.getRepository(UserTransactionPayment).create({ + async AddUserTransactionPayment(userId: string, address: string, txHash: string, txOutput: number, amount: number, chainFees: number, serviceFees: number, internal: boolean, height: number, linkedApplication: Application): Promise { + const newTx = this.DB.getRepository(UserTransactionPayment).create({ user: await this.userStorage.GetUser(userId), address, paid_amount: amount, @@ -162,7 +165,7 @@ export default class { confs: internal ? 10 : 0, linkedApplication }) - return entityManager.getRepository(UserTransactionPayment).save(newTx) + return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserTransactionPayment).save(newTx), dbTx: false }) } GetUserTransactionPayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise { @@ -195,18 +198,18 @@ export default class { } - async AddUserEphemeralKey(userId: string, keyType: EphemeralKeyType, linkedApplication: Application, entityManager = this.DB): Promise { - const found = await entityManager.getRepository(UserEphemeralKey).findOne({ where: { type: keyType, user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } }) + async AddUserEphemeralKey(userId: string, keyType: EphemeralKeyType, linkedApplication: Application): Promise { + const found = await this.DB.getRepository(UserEphemeralKey).findOne({ where: { type: keyType, user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } }) if (found) { return found } - const newKey = entityManager.getRepository(UserEphemeralKey).create({ - user: await this.userStorage.GetUser(userId, entityManager), + const newKey = this.DB.getRepository(UserEphemeralKey).create({ + user: await this.userStorage.GetUser(userId), key: crypto.randomBytes(31).toString('hex'), type: keyType, linkedApplication }) - return entityManager.getRepository(UserEphemeralKey).save(newKey) + return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserEphemeralKey).save(newKey), dbTx: false }) } async UseUserEphemeralKey(key: string, keyType: EphemeralKeyType, persist = false, entityManager = this.DB): Promise { @@ -225,16 +228,16 @@ export default class { return found } - async AddUserToUserPayment(fromUserId: string, toUserId: string, amount: number, fee: number, linkedApplication: Application, entityManager = this.DB) { - const newKey = entityManager.getRepository(UserToUserPayment).create({ - from_user: await this.userStorage.GetUser(fromUserId, entityManager), - to_user: await this.userStorage.GetUser(toUserId, entityManager), + async AddUserToUserPayment(fromUserId: string, toUserId: string, amount: number, fee: number, linkedApplication: Application) { + const newKey = this.DB.getRepository(UserToUserPayment).create({ + from_user: await this.userStorage.GetUser(fromUserId), + to_user: await this.userStorage.GetUser(toUserId), paid_at_unix: Math.floor(Date.now() / 1000), paid_amount: amount, service_fees: fee, linkedApplication }) - return entityManager.getRepository(UserToUserPayment).save(newKey) + return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserToUserPayment).save(newKey), dbTx: false }) } GetUserToUserReceivedPayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB) { diff --git a/src/services/storage/productStorage.ts b/src/services/storage/productStorage.ts index deeeb7c1..abc9f39f 100644 --- a/src/services/storage/productStorage.ts +++ b/src/services/storage/productStorage.ts @@ -1,16 +1,19 @@ import { DataSource, EntityManager } from "typeorm" import { Product } from "./entity/Product.js" import { User } from "./entity/User.js" +import TransactionsQueue, { TX } from "./transactionsQueue.js"; export default class { DB: DataSource | EntityManager - constructor(DB: DataSource | EntityManager) { + txQueue: TransactionsQueue + constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) { this.DB = DB + this.txQueue = txQueue } - async AddProduct(name: string, priceSats: number, user: User, entityManager = this.DB): Promise { - const newProduct = entityManager.getRepository(Product).create({ + async AddProduct(name: string, priceSats: number, user: User): Promise { + const newProduct = this.DB.getRepository(Product).create({ name: name, price_sats: priceSats, owner: user }) - return entityManager.getRepository(Product).save(newProduct) + return this.txQueue.PushToQueue({ exec: async db => db.getRepository(Product).save(newProduct), dbTx: false }) } async GetProduct(id: string, entityManager = this.DB): Promise { diff --git a/src/services/storage/transactionsQueue.ts b/src/services/storage/transactionsQueue.ts new file mode 100644 index 00000000..aad4e3ce --- /dev/null +++ b/src/services/storage/transactionsQueue.ts @@ -0,0 +1,76 @@ +import { DataSource, EntityManager, EntityTarget } from "typeorm" + +export type TX = (entityManager: EntityManager | DataSource) => Promise +export type TxOperation = { + exec: TX + dbTx: boolean +} + +export default class { + DB: DataSource | EntityManager + pendingTx: boolean + transactionsQueue: { op: TxOperation, res: (v: any) => void, rej: (message: string) => void }[] = [] + constructor(DB: DataSource | EntityManager) { + this.DB = DB + } + + PushToQueue(op: TxOperation) { + if (!this.pendingTx) { + return this.execQueueItem(op) + } + + return new Promise((res, rej) => { + this.transactionsQueue.push({ op, res, rej }) + }) + } + + async execNextInQueue() { + this.pendingTx = false + const next = this.transactionsQueue.pop() + if (!next) { + return + } + try { + const res = await this.execQueueItem(next.op) + next.res(res) + } catch (err: any) { + next.rej(err.message) + } + } + + execQueueItem(op: TxOperation) { + if (this.pendingTx) { + throw new Error("cannot start DB transaction") + } + this.pendingTx = true + if (op.dbTx) { + return this.doTransaction(op.exec) + } + return this.doOperation(op.exec) + } + + async doOperation(exec: TX) { + try { + const res = await exec(this.DB) + this.execNextInQueue() + return res + } catch (err) { + this.execNextInQueue() + throw err + } + } + + + doTransaction(exec: TX) { + return this.DB.transaction(async tx => { + try { + const res = await exec(tx) + this.execNextInQueue() + return res + } catch (err) { + this.execNextInQueue() + throw err + } + }) + } +} \ No newline at end of file diff --git a/src/services/storage/userStorage.ts b/src/services/storage/userStorage.ts index 0f146c0a..9f45f6e8 100644 --- a/src/services/storage/userStorage.ts +++ b/src/services/storage/userStorage.ts @@ -3,21 +3,24 @@ import { DataSource, EntityManager } from "typeorm" import { User } from './entity/User.js'; import { UserBasicAuth } from './entity/UserBasicAuth.js'; import { getLogger } from '../helpers/logger.js'; +import TransactionsQueue from "./transactionsQueue.js"; export default class { DB: DataSource | EntityManager - constructor(DB: DataSource | EntityManager) { + txQueue: TransactionsQueue + constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) { this.DB = DB + this.txQueue = txQueue } - async AddUser(balance: number, entityManager = this.DB): Promise { + async AddUser(balance: number, dbTx: DataSource | EntityManager): Promise { if (balance && process.env.ALLOW_BALANCE_MIGRATION !== 'true') { throw new Error("balance migration is not allowed") } getLogger({})("Adding user with balance", balance) - const newUser = entityManager.getRepository(User).create({ + const newUser = dbTx.getRepository(User).create({ user_id: crypto.randomBytes(32).toString('hex'), balance_sats: balance }) - return entityManager.getRepository(User).save(newUser) + return dbTx.getRepository(User).save(newUser) } async AddBasicUser(name: string, secret: string): Promise {