save tx queue

This commit is contained in:
boufni95 2024-02-09 17:22:41 +01:00
parent 4b1249f6a2
commit 55769eaa19
7 changed files with 149 additions and 92 deletions

View file

@ -5,24 +5,29 @@ import { Application } from "./entity/Application.js"
import UserStorage from './userStorage.js';
import { ApplicationUser } from './entity/ApplicationUser.js';
import { getLogger } from '../helpers/logger.js';
import TransactionsQueue, { TX } from "./transactionsQueue.js";
export default class {
DB: DataSource | EntityManager
userStorage: UserStorage
constructor(DB: DataSource | EntityManager, userStorage: UserStorage) {
txQueue: TransactionsQueue
constructor(DB: DataSource | EntityManager, userStorage: UserStorage, txQueue: TransactionsQueue) {
this.DB = DB
this.userStorage = userStorage
this.txQueue = txQueue
}
async AddApplication(name: string, allowUserCreation: boolean, entityManager = this.DB): Promise<Application> {
const owner = await this.userStorage.AddUser(0, entityManager)
const repo = entityManager.getRepository(Application)
async AddApplication(name: string, allowUserCreation: boolean): Promise<Application> {
return this.DB.transaction(async tx => {
const owner = await this.userStorage.AddUser(0, tx)
const repo = this.DB.getRepository(Application)
const newApplication = repo.create({
app_id: crypto.randomBytes(32).toString('hex'),
name,
owner,
allow_user_creation: allowUserCreation
})
return repo.save(newApplication)
return tx.getRepository(Application).save(newApplication)
})
}
async GetApplicationByName(name: string, entityManager = this.DB) {

View file

@ -5,75 +5,39 @@ import ApplicationStorage from './applicationStorage.js'
import UserStorage from "./userStorage.js";
import PaymentStorage from "./paymentStorage.js";
import MetricsStorage from "./metricsStorage.js";
import TransactionsQueue, { TX } from "./transactionsQueue.js";
export type StorageSettings = {
dbSettings: DbSettings
}
export const LoadStorageSettingsFromEnv = (test = false): StorageSettings => {
return { dbSettings: LoadDbSettingsFromEnv(test) }
}
type TX = (entityManager: EntityManager) => Promise<void>
export default class {
DB: DataSource | EntityManager
settings: StorageSettings
txQueue: TransactionsQueue
productStorage: ProductStorage
applicationStorage: ApplicationStorage
userStorage: UserStorage
paymentStorage: PaymentStorage
metricsStorage: MetricsStorage
pendingTx: boolean
transactionsQueue: { exec: TX, res: () => void, rej: (message: string) => void }[] = []
constructor(settings: StorageSettings) {
this.settings = settings
}
async Connect(migrations: Function[]) {
const { source, executedMigrations } = await NewDB(this.settings.dbSettings, migrations)
this.DB = source
this.userStorage = new UserStorage(this.DB)
this.productStorage = new ProductStorage(this.DB)
this.applicationStorage = new ApplicationStorage(this.DB, this.userStorage)
this.paymentStorage = new PaymentStorage(this.DB, this.userStorage)
this.metricsStorage = new MetricsStorage(this.DB)
this.txQueue = new TransactionsQueue(this.DB)
this.userStorage = new UserStorage(this.DB, this.txQueue)
this.productStorage = new ProductStorage(this.DB, this.txQueue)
this.applicationStorage = new ApplicationStorage(this.DB, this.userStorage, this.txQueue)
this.paymentStorage = new PaymentStorage(this.DB, this.userStorage, this.txQueue)
this.metricsStorage = new MetricsStorage(this.DB, this.txQueue)
return executedMigrations
}
StartTransaction(exec: TX) {
if (!this.pendingTx) {
return this.doTransaction(exec)
}
return new Promise<void>((res, rej) => {
this.transactionsQueue.push({ exec, res, rej })
})
}
async ExecNextInQueue() {
this.pendingTx = false
const next = this.transactionsQueue.pop()
if (!next) {
return
}
try {
await this.doTransaction(next.exec)
next.res()
} catch (err: any) {
next.rej(err.message)
}
}
doTransaction(exec: TX) {
if (this.pendingTx) {
throw new Error("cannot start DB transaction")
}
this.pendingTx = true
return this.DB.transaction(async tx => {
try {
await exec(tx)
this.ExecNextInQueue()
} catch (err) {
this.ExecNextInQueue()
throw err
}
})
StartTransaction(exec: TX<void>) {
return this.txQueue.PushToQueue({ exec, dbTx: true })
}
}

View file

@ -2,10 +2,13 @@ import { Between, DataSource, EntityManager, FindOperator, LessThanOrEqual, More
import { RoutingEvent } from "./entity/RoutingEvent.js"
import { BalanceEvent } from "./entity/BalanceEvent.js"
import { ChannelBalanceEvent } from "./entity/ChannelsBalanceEvent.js"
import TransactionsQueue, { TX } from "./transactionsQueue.js";
export default class {
DB: DataSource | EntityManager
constructor(DB: DataSource | EntityManager) {
txQueue: TransactionsQueue
constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) {
this.DB = DB
this.txQueue = txQueue
}
async SaveRoutingEvent(event: Partial<RoutingEvent>, entityManager = this.DB) {
const entry = entityManager.getRepository(RoutingEvent).create(event)

View file

@ -11,17 +11,20 @@ import { AddressReceivingTransaction } from './entity/AddressReceivingTransactio
import { UserInvoicePayment } from './entity/UserInvoicePayment.js';
import { UserToUserPayment } from './entity/UserToUserPayment.js';
import { Application } from './entity/Application.js';
import TransactionsQueue, { TX } from "./transactionsQueue.js";
export type InboundOptionals = { product?: Product, callbackUrl?: string, expiry: number, expectedPayer?: User, linkedApplication?: Application, zapInfo?: ZapInfo }
export const defaultInvoiceExpiry = 60 * 60
export default class {
DB: DataSource | EntityManager
userStorage: UserStorage
constructor(DB: DataSource | EntityManager, userStorage: UserStorage) {
txQueue: TransactionsQueue
constructor(DB: DataSource | EntityManager, userStorage: UserStorage, txQueue: TransactionsQueue) {
this.DB = DB
this.userStorage = userStorage
this.txQueue = txQueue
}
async AddAddressReceivingTransaction(address: UserReceivingAddress, txHash: string, outputIndex: number, amount: number, serviceFee: number, internal: boolean, height: number, entityManager = this.DB) {
const newAddressTransaction = entityManager.getRepository(AddressReceivingTransaction).create({
async AddAddressReceivingTransaction(address: UserReceivingAddress, txHash: string, outputIndex: number, amount: number, serviceFee: number, internal: boolean, height: number, dbTx: EntityManager | DataSource) {
const newAddressTransaction = dbTx.getRepository(AddressReceivingTransaction).create({
user_address: address,
tx_hash: txHash,
output_index: outputIndex,
@ -32,7 +35,7 @@ export default class {
broadcast_height: height,
confs: internal ? 10 : 0
})
return entityManager.getRepository(AddressReceivingTransaction).save(newAddressTransaction)
return dbTx.getRepository(AddressReceivingTransaction).save(newAddressTransaction)
}
GetUserReceivingTransactions(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise<AddressReceivingTransaction[]> {
@ -53,14 +56,14 @@ export default class {
return entityManager.getRepository(UserReceivingAddress).findOne({ where: { user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } })
}
async AddUserAddress(userId: string, address: string, opts: { callbackUrl?: string, linkedApplication?: Application } = {}, entityManager = this.DB): Promise<UserReceivingAddress> {
const newUserAddress = entityManager.getRepository(UserReceivingAddress).create({
async AddUserAddress(userId: string, address: string, opts: { callbackUrl?: string, linkedApplication?: Application } = {}): Promise<UserReceivingAddress> {
const newUserAddress = this.DB.getRepository(UserReceivingAddress).create({
address,
callbackUrl: opts.callbackUrl || "",
linkedApplication: opts.linkedApplication,
user: await this.userStorage.GetUser(userId, entityManager)
user: await this.userStorage.GetUser(userId)
})
return entityManager.getRepository(UserReceivingAddress).save(newUserAddress)
return this.txQueue.PushToQueue<UserReceivingAddress>({ exec: async db => db.getRepository(UserReceivingAddress).save(newUserAddress), dbTx: false })
}
async FlagInvoiceAsPaid(invoice: UserReceivingInvoice, amount: number, serviceFee: number, internal: boolean, entityManager = this.DB) {
@ -87,8 +90,8 @@ export default class {
})
}
async AddUserInvoice(user: User, invoice: string, options: InboundOptionals = { expiry: defaultInvoiceExpiry }, entityManager = this.DB): Promise<UserReceivingInvoice> {
const newUserInvoice = entityManager.getRepository(UserReceivingInvoice).create({
async AddUserInvoice(user: User, invoice: string, options: InboundOptionals = { expiry: defaultInvoiceExpiry }): Promise<UserReceivingInvoice> {
const newUserInvoice = this.DB.getRepository(UserReceivingInvoice).create({
invoice: invoice,
callbackUrl: options.callbackUrl,
user: user,
@ -98,7 +101,7 @@ export default class {
linkedApplication: options.linkedApplication,
zap_info: options.zapInfo
})
return entityManager.getRepository(UserReceivingInvoice).save(newUserInvoice)
return this.txQueue.PushToQueue<UserReceivingInvoice>({ exec: async db => db.getRepository(UserReceivingInvoice).save(newUserInvoice), dbTx: false })
}
async GetAddressOwner(address: string, entityManager = this.DB): Promise<UserReceivingAddress | null> {
@ -117,8 +120,8 @@ export default class {
})
}
async AddUserInvoicePayment(userId: string, invoice: string, amount: number, routingFees: number, serviceFees: number, internal: boolean, linkedApplication: Application, entityManager = this.DB): Promise<UserInvoicePayment> {
const newPayment = entityManager.getRepository(UserInvoicePayment).create({
async AddUserInvoicePayment(userId: string, invoice: string, amount: number, routingFees: number, serviceFees: number, internal: boolean, linkedApplication: Application): Promise<UserInvoicePayment> {
const newPayment = this.DB.getRepository(UserInvoicePayment).create({
user: await this.userStorage.GetUser(userId),
paid_amount: amount,
invoice,
@ -128,7 +131,7 @@ export default class {
internal,
linkedApplication
})
return entityManager.getRepository(UserInvoicePayment).save(newPayment)
return this.txQueue.PushToQueue<UserInvoicePayment>({ exec: async db => db.getRepository(UserInvoicePayment).save(newPayment), dbTx: false })
}
GetUserInvoicePayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise<UserInvoicePayment[]> {
@ -147,8 +150,8 @@ export default class {
})
}
async AddUserTransactionPayment(userId: string, address: string, txHash: string, txOutput: number, amount: number, chainFees: number, serviceFees: number, internal: boolean, height: number, linkedApplication: Application, entityManager = this.DB): Promise<UserTransactionPayment> {
const newTx = entityManager.getRepository(UserTransactionPayment).create({
async AddUserTransactionPayment(userId: string, address: string, txHash: string, txOutput: number, amount: number, chainFees: number, serviceFees: number, internal: boolean, height: number, linkedApplication: Application): Promise<UserTransactionPayment> {
const newTx = this.DB.getRepository(UserTransactionPayment).create({
user: await this.userStorage.GetUser(userId),
address,
paid_amount: amount,
@ -162,7 +165,7 @@ export default class {
confs: internal ? 10 : 0,
linkedApplication
})
return entityManager.getRepository(UserTransactionPayment).save(newTx)
return this.txQueue.PushToQueue<UserTransactionPayment>({ exec: async db => db.getRepository(UserTransactionPayment).save(newTx), dbTx: false })
}
GetUserTransactionPayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise<UserTransactionPayment[]> {
@ -195,18 +198,18 @@ export default class {
}
async AddUserEphemeralKey(userId: string, keyType: EphemeralKeyType, linkedApplication: Application, entityManager = this.DB): Promise<UserEphemeralKey> {
const found = await entityManager.getRepository(UserEphemeralKey).findOne({ where: { type: keyType, user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } })
async AddUserEphemeralKey(userId: string, keyType: EphemeralKeyType, linkedApplication: Application): Promise<UserEphemeralKey> {
const found = await this.DB.getRepository(UserEphemeralKey).findOne({ where: { type: keyType, user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } })
if (found) {
return found
}
const newKey = entityManager.getRepository(UserEphemeralKey).create({
user: await this.userStorage.GetUser(userId, entityManager),
const newKey = this.DB.getRepository(UserEphemeralKey).create({
user: await this.userStorage.GetUser(userId),
key: crypto.randomBytes(31).toString('hex'),
type: keyType,
linkedApplication
})
return entityManager.getRepository(UserEphemeralKey).save(newKey)
return this.txQueue.PushToQueue<UserEphemeralKey>({ exec: async db => db.getRepository(UserEphemeralKey).save(newKey), dbTx: false })
}
async UseUserEphemeralKey(key: string, keyType: EphemeralKeyType, persist = false, entityManager = this.DB): Promise<UserEphemeralKey> {
@ -225,16 +228,16 @@ export default class {
return found
}
async AddUserToUserPayment(fromUserId: string, toUserId: string, amount: number, fee: number, linkedApplication: Application, entityManager = this.DB) {
const newKey = entityManager.getRepository(UserToUserPayment).create({
from_user: await this.userStorage.GetUser(fromUserId, entityManager),
to_user: await this.userStorage.GetUser(toUserId, entityManager),
async AddUserToUserPayment(fromUserId: string, toUserId: string, amount: number, fee: number, linkedApplication: Application) {
const newKey = this.DB.getRepository(UserToUserPayment).create({
from_user: await this.userStorage.GetUser(fromUserId),
to_user: await this.userStorage.GetUser(toUserId),
paid_at_unix: Math.floor(Date.now() / 1000),
paid_amount: amount,
service_fees: fee,
linkedApplication
})
return entityManager.getRepository(UserToUserPayment).save(newKey)
return this.txQueue.PushToQueue<UserToUserPayment>({ exec: async db => db.getRepository(UserToUserPayment).save(newKey), dbTx: false })
}
GetUserToUserReceivedPayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB) {

View file

@ -1,16 +1,19 @@
import { DataSource, EntityManager } from "typeorm"
import { Product } from "./entity/Product.js"
import { User } from "./entity/User.js"
import TransactionsQueue, { TX } from "./transactionsQueue.js";
export default class {
DB: DataSource | EntityManager
constructor(DB: DataSource | EntityManager) {
txQueue: TransactionsQueue
constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) {
this.DB = DB
this.txQueue = txQueue
}
async AddProduct(name: string, priceSats: number, user: User, entityManager = this.DB): Promise<Product> {
const newProduct = entityManager.getRepository(Product).create({
async AddProduct(name: string, priceSats: number, user: User): Promise<Product> {
const newProduct = this.DB.getRepository(Product).create({
name: name, price_sats: priceSats, owner: user
})
return entityManager.getRepository(Product).save(newProduct)
return this.txQueue.PushToQueue<Product>({ exec: async db => db.getRepository(Product).save(newProduct), dbTx: false })
}
async GetProduct(id: string, entityManager = this.DB): Promise<Product> {

View file

@ -0,0 +1,76 @@
import { DataSource, EntityManager, EntityTarget } from "typeorm"
export type TX<T> = (entityManager: EntityManager | DataSource) => Promise<T>
export type TxOperation<T> = {
exec: TX<T>
dbTx: boolean
}
export default class {
DB: DataSource | EntityManager
pendingTx: boolean
transactionsQueue: { op: TxOperation<any>, res: (v: any) => void, rej: (message: string) => void }[] = []
constructor(DB: DataSource | EntityManager) {
this.DB = DB
}
PushToQueue<T>(op: TxOperation<T>) {
if (!this.pendingTx) {
return this.execQueueItem(op)
}
return new Promise<T>((res, rej) => {
this.transactionsQueue.push({ op, res, rej })
})
}
async execNextInQueue() {
this.pendingTx = false
const next = this.transactionsQueue.pop()
if (!next) {
return
}
try {
const res = await this.execQueueItem(next.op)
next.res(res)
} catch (err: any) {
next.rej(err.message)
}
}
execQueueItem<T>(op: TxOperation<T>) {
if (this.pendingTx) {
throw new Error("cannot start DB transaction")
}
this.pendingTx = true
if (op.dbTx) {
return this.doTransaction(op.exec)
}
return this.doOperation(op.exec)
}
async doOperation<T>(exec: TX<T>) {
try {
const res = await exec(this.DB)
this.execNextInQueue()
return res
} catch (err) {
this.execNextInQueue()
throw err
}
}
doTransaction<T>(exec: TX<T>) {
return this.DB.transaction(async tx => {
try {
const res = await exec(tx)
this.execNextInQueue()
return res
} catch (err) {
this.execNextInQueue()
throw err
}
})
}
}

View file

@ -3,21 +3,24 @@ import { DataSource, EntityManager } from "typeorm"
import { User } from './entity/User.js';
import { UserBasicAuth } from './entity/UserBasicAuth.js';
import { getLogger } from '../helpers/logger.js';
import TransactionsQueue from "./transactionsQueue.js";
export default class {
DB: DataSource | EntityManager
constructor(DB: DataSource | EntityManager) {
txQueue: TransactionsQueue
constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) {
this.DB = DB
this.txQueue = txQueue
}
async AddUser(balance: number, entityManager = this.DB): Promise<User> {
async AddUser(balance: number, dbTx: DataSource | EntityManager): Promise<User> {
if (balance && process.env.ALLOW_BALANCE_MIGRATION !== 'true') {
throw new Error("balance migration is not allowed")
}
getLogger({})("Adding user with balance", balance)
const newUser = entityManager.getRepository(User).create({
const newUser = dbTx.getRepository(User).create({
user_id: crypto.randomBytes(32).toString('hex'),
balance_sats: balance
})
return entityManager.getRepository(User).save(newUser)
return dbTx.getRepository(User).save(newUser)
}
async AddBasicUser(name: string, secret: string): Promise<UserBasicAuth> {