commit
9bf6f8dff8
4 changed files with 12 additions and 20 deletions
|
|
@ -38,7 +38,7 @@ export default class {
|
|||
return { executedMigrations, executedMetricsMigrations };
|
||||
}
|
||||
|
||||
StartTransaction(exec: TX<void>) {
|
||||
return this.txQueue.PushToQueue({ exec, dbTx: true })
|
||||
StartTransaction(exec: TX<void>, description?: string) {
|
||||
return this.txQueue.PushToQueue({ exec, dbTx: true, description })
|
||||
}
|
||||
}
|
||||
|
|
@ -63,7 +63,7 @@ export default class {
|
|||
linkedApplication: opts.linkedApplication,
|
||||
user: await this.userStorage.GetUser(userId)
|
||||
})
|
||||
return this.txQueue.PushToQueue<UserReceivingAddress>({ exec: async db => db.getRepository(UserReceivingAddress).save(newUserAddress), dbTx: false })
|
||||
return this.txQueue.PushToQueue<UserReceivingAddress>({ exec: async db => db.getRepository(UserReceivingAddress).save(newUserAddress), dbTx: false, description: `add address for ${userId} linked to ${opts.linkedApplication?.app_id}: ${address} ` })
|
||||
}
|
||||
|
||||
async FlagInvoiceAsPaid(invoice: UserReceivingInvoice, amount: number, serviceFee: number, internal: boolean, entityManager = this.DB) {
|
||||
|
|
@ -101,7 +101,7 @@ export default class {
|
|||
linkedApplication: options.linkedApplication,
|
||||
zap_info: options.zapInfo
|
||||
})
|
||||
return this.txQueue.PushToQueue<UserReceivingInvoice>({ exec: async db => db.getRepository(UserReceivingInvoice).save(newUserInvoice), dbTx: false })
|
||||
return this.txQueue.PushToQueue<UserReceivingInvoice>({ exec: async db => db.getRepository(UserReceivingInvoice).save(newUserInvoice), dbTx: false, description: `add invoice for ${user.user_id} linked to ${options.linkedApplication?.app_id}: ${invoice} ` })
|
||||
}
|
||||
|
||||
async GetAddressOwner(address: string, entityManager = this.DB): Promise<UserReceivingAddress | null> {
|
||||
|
|
@ -131,7 +131,7 @@ export default class {
|
|||
internal,
|
||||
linkedApplication
|
||||
})
|
||||
return this.txQueue.PushToQueue<UserInvoicePayment>({ exec: async db => db.getRepository(UserInvoicePayment).save(newPayment), dbTx: false })
|
||||
return this.txQueue.PushToQueue<UserInvoicePayment>({ exec: async db => db.getRepository(UserInvoicePayment).save(newPayment), dbTx: false, description: `add invoice payment for ${userId} linked to ${linkedApplication.app_id}: ${invoice}, amt: ${amount} ` })
|
||||
}
|
||||
|
||||
GetUserInvoicePayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise<UserInvoicePayment[]> {
|
||||
|
|
@ -165,7 +165,7 @@ export default class {
|
|||
confs: internal ? 10 : 0,
|
||||
linkedApplication
|
||||
})
|
||||
return this.txQueue.PushToQueue<UserTransactionPayment>({ exec: async db => db.getRepository(UserTransactionPayment).save(newTx), dbTx: false })
|
||||
return this.txQueue.PushToQueue<UserTransactionPayment>({ exec: async db => db.getRepository(UserTransactionPayment).save(newTx), dbTx: false, description: `add tx payment for ${userId} linked to ${linkedApplication.app_id}: ${address}, amt: ${amount} ` })
|
||||
}
|
||||
|
||||
GetUserTransactionPayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise<UserTransactionPayment[]> {
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ export type TX<T> = (entityManager: EntityManager | DataSource) => Promise<T>
|
|||
export type TxOperation<T> = {
|
||||
exec: TX<T>
|
||||
dbTx: boolean
|
||||
description?: string
|
||||
}
|
||||
|
||||
export default class {
|
||||
|
|
@ -19,10 +20,9 @@ export default class {
|
|||
|
||||
PushToQueue<T>(op: TxOperation<T>) {
|
||||
if (!this.pendingTx) {
|
||||
this.log("executing item immediately")
|
||||
return this.execQueueItem(op)
|
||||
}
|
||||
this.log("holding item in queue")
|
||||
this.log("queue not empty, possibly stuck")
|
||||
return new Promise<T>((res, rej) => {
|
||||
this.transactionsQueue.push({ op, res, rej })
|
||||
})
|
||||
|
|
@ -32,16 +32,14 @@ export default class {
|
|||
this.pendingTx = false
|
||||
const next = this.transactionsQueue.pop()
|
||||
if (!next) {
|
||||
this.log("no more items in queue")
|
||||
this.log("queue is clear")
|
||||
return
|
||||
}
|
||||
try {
|
||||
this.log("executing next item in queue")
|
||||
const res = await this.execQueueItem(next.op)
|
||||
this.log("resolving next item in queue")
|
||||
if (next.op.description) this.log("done", next.op.description)
|
||||
next.res(res)
|
||||
} catch (err: any) {
|
||||
this.log("rejecting next item in queue")
|
||||
next.rej(err.message)
|
||||
}
|
||||
}
|
||||
|
|
@ -52,21 +50,17 @@ export default class {
|
|||
}
|
||||
this.pendingTx = true
|
||||
if (op.dbTx) {
|
||||
this.log("executing item transaction")
|
||||
return this.doTransaction(op.exec)
|
||||
}
|
||||
this.log("executing item operation")
|
||||
return this.doOperation(op.exec)
|
||||
}
|
||||
|
||||
async doOperation<T>(exec: TX<T>) {
|
||||
try {
|
||||
const res = await exec(this.DB)
|
||||
this.log("executing item operation done")
|
||||
this.execNextInQueue()
|
||||
return res
|
||||
} catch (err) {
|
||||
this.log("executing item operation failed")
|
||||
this.execNextInQueue()
|
||||
throw err
|
||||
}
|
||||
|
|
@ -77,11 +71,9 @@ export default class {
|
|||
return this.DB.transaction(async tx => {
|
||||
try {
|
||||
const res = await exec(tx)
|
||||
this.log("executing item transaction done")
|
||||
this.execNextInQueue()
|
||||
return res
|
||||
} catch (err) {
|
||||
this.log("executing item transaction failed")
|
||||
this.execNextInQueue()
|
||||
throw err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ export default class {
|
|||
if (!res.affected) {
|
||||
throw new Error("unaffected balance increment for " + userId) // TODO: fix logs doxing
|
||||
}
|
||||
getLogger({ userId: userId })("incremented balance from", user.balance_sats, "sats, by", increment, "sats")
|
||||
getLogger({ userId: userId, appName: "balanceUpdates" })("incremented balance from", user.balance_sats, "sats, by", increment, "sats")
|
||||
}
|
||||
async DecrementUserBalance(userId: string, decrement: number, entityManager = this.DB) {
|
||||
const user = await this.GetUser(userId, entityManager)
|
||||
|
|
@ -87,7 +87,7 @@ export default class {
|
|||
if (!res.affected) {
|
||||
throw new Error("unaffected balance decrement for " + userId) // TODO: fix logs doxing
|
||||
}
|
||||
getLogger({ userId: userId })("decremented balance from", user.balance_sats, "sats, by", decrement, "sats")
|
||||
getLogger({ userId: userId, appName: "balanceUpdates" })("decremented balance from", user.balance_sats, "sats, by", decrement, "sats")
|
||||
}
|
||||
|
||||
async UpdateUser(userId: string, update: Partial<User>, entityManager = this.DB) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue