storage subprocess
This commit is contained in:
parent
8f6f5bf209
commit
e244dc058a
17 changed files with 972 additions and 479 deletions
|
|
@ -136,8 +136,8 @@ export default class {
|
|||
log(ERROR, "an address was paid, that has no linked application")
|
||||
return
|
||||
}
|
||||
const updateResult = await this.storage.paymentStorage.UpdateAddressReceivingTransaction(serialId, { confs: c.confs }, tx)
|
||||
if (!updateResult.affected) {
|
||||
const affected = await this.storage.paymentStorage.UpdateAddressReceivingTransaction(serialId, { confs: c.confs }, tx)
|
||||
if (!affected) {
|
||||
throw new Error("unable to flag chain transaction as paid")
|
||||
}
|
||||
const addressData = `${userAddress.address}:${tx_hash}`
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import { PubLogger, getLogger } from "../helpers/logger.js"
|
|||
import { LiquidityProvider } from "./liquidityProvider.js"
|
||||
import { Unlocker } from "./unlocker.js"
|
||||
import Storage from "../storage/index.js"
|
||||
import { TypeOrmMigrationRunner } from "../storage/migrations/runner.js"
|
||||
/* import { TypeOrmMigrationRunner } from "../storage/migrations/runner.js" */
|
||||
import Main from "./index.js"
|
||||
import SanityChecker from "./sanityChecker.js"
|
||||
import { LoadMainSettingsFromEnv, MainSettings } from "./settings.js"
|
||||
|
|
@ -18,10 +18,11 @@ export type AppData = {
|
|||
export const initMainHandler = async (log: PubLogger, mainSettings: MainSettings) => {
|
||||
const utils = new Utils(mainSettings)
|
||||
const storageManager = new Storage(mainSettings.storageSettings)
|
||||
const manualMigration = await TypeOrmMigrationRunner(log, storageManager, mainSettings.storageSettings.dbSettings, process.argv[2])
|
||||
if (manualMigration) {
|
||||
return
|
||||
}
|
||||
await storageManager.Connect(log)
|
||||
/* const manualMigration = await TypeOrmMigrationRunner(log, storageManager, mainSettings.storageSettings.dbSettings, process.argv[2])
|
||||
if (manualMigration) {
|
||||
return
|
||||
} */
|
||||
const unlocker = new Unlocker(mainSettings, storageManager)
|
||||
await unlocker.Unlock()
|
||||
const adminManager = new AdminManager(mainSettings, storageManager)
|
||||
|
|
|
|||
|
|
@ -89,27 +89,23 @@ export default class {
|
|||
if (state.paid_at_unix < 0) {
|
||||
const fullAmount = p.paid_amount + p.service_fees + p.routing_fees
|
||||
log("found a failed provider payment, refunding", fullAmount, "sats to user", p.user.user_id)
|
||||
await this.storage.txQueue.PushToQueue({
|
||||
dbTx: true, description: "refund failed provider payment", exec: async tx => {
|
||||
await this.storage.userStorage.IncrementUserBalance(p.user.user_id, fullAmount, "payment_refund:" + p.invoice, tx)
|
||||
await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, 0, 0, false, undefined, tx)
|
||||
}
|
||||
})
|
||||
await this.storage.StartTransaction(async tx => {
|
||||
await this.storage.userStorage.IncrementUserBalance(p.user.user_id, fullAmount, "payment_refund:" + p.invoice, tx)
|
||||
await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, 0, 0, false, undefined, tx)
|
||||
}, "refund failed provider payment")
|
||||
return
|
||||
} else if (state.paid_at_unix > 0) {
|
||||
log("provider payment succeeded", p.serial_id, "updating payment info")
|
||||
const routingFeeLimit = p.routing_fees
|
||||
const serviceFee = p.service_fees
|
||||
const actualFee = state.network_fee + state.service_fee
|
||||
await this.storage.txQueue.PushToQueue({
|
||||
dbTx: true, description: "pending provider payment success after restart", exec: async tx => {
|
||||
if (routingFeeLimit - actualFee > 0) {
|
||||
this.log("refund pending provider payment routing fee", routingFeeLimit, actualFee, "sats")
|
||||
await this.storage.userStorage.IncrementUserBalance(p.user.user_id, routingFeeLimit - actualFee, "routing_fee_refund:" + p.invoice, tx)
|
||||
}
|
||||
await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, actualFee, p.service_fees, true, undefined, tx)
|
||||
await this.storage.StartTransaction(async tx => {
|
||||
if (routingFeeLimit - actualFee > 0) {
|
||||
this.log("refund pending provider payment routing fee", routingFeeLimit, actualFee, "sats")
|
||||
await this.storage.userStorage.IncrementUserBalance(p.user.user_id, routingFeeLimit - actualFee, "routing_fee_refund:" + p.invoice, tx)
|
||||
}
|
||||
})
|
||||
await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, actualFee, p.service_fees, true, undefined, tx)
|
||||
}, "pending provider payment success after restart")
|
||||
if (p.linkedApplication && p.user.user_id !== p.linkedApplication.owner.user_id && serviceFee > 0) {
|
||||
await this.storage.userStorage.IncrementUserBalance(p.linkedApplication.owner.user_id, serviceFee, "fees")
|
||||
}
|
||||
|
|
@ -140,15 +136,13 @@ export default class {
|
|||
const routingFeeLimit = p.routing_fees
|
||||
const serviceFee = p.service_fees
|
||||
const actualFee = Number(payment.feeSat)
|
||||
await this.storage.txQueue.PushToQueue({
|
||||
dbTx: true, description: "pending payment success after restart", exec: async tx => {
|
||||
if (routingFeeLimit - actualFee > 0) {
|
||||
this.log("refund pending payment routing fee", routingFeeLimit, actualFee, "sats")
|
||||
await this.storage.userStorage.IncrementUserBalance(p.user.user_id, routingFeeLimit - actualFee, "routing_fee_refund:" + p.invoice, tx)
|
||||
}
|
||||
await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, actualFee, p.service_fees, true, undefined, tx)
|
||||
await this.storage.StartTransaction(async tx => {
|
||||
if (routingFeeLimit - actualFee > 0) {
|
||||
this.log("refund pending payment routing fee", routingFeeLimit, actualFee, "sats")
|
||||
await this.storage.userStorage.IncrementUserBalance(p.user.user_id, routingFeeLimit - actualFee, "routing_fee_refund:" + p.invoice, tx)
|
||||
}
|
||||
})
|
||||
await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, actualFee, p.service_fees, true, undefined, tx)
|
||||
}, "pending payment success after restart")
|
||||
if (p.linkedApplication && p.user.user_id !== p.linkedApplication.owner.user_id && serviceFee > 0) {
|
||||
await this.storage.userStorage.IncrementUserBalance(p.linkedApplication.owner.user_id, serviceFee, "fees")
|
||||
}
|
||||
|
|
@ -158,12 +152,11 @@ export default class {
|
|||
case Payment_PaymentStatus.FAILED:
|
||||
const fullAmount = p.paid_amount + p.service_fees + p.routing_fees
|
||||
log("found a failed pending payment, refunding", fullAmount, "sats to user", p.user.user_id)
|
||||
await this.storage.txQueue.PushToQueue({
|
||||
dbTx: true, description: "refund failed pending payment", exec: async tx => {
|
||||
await this.storage.userStorage.IncrementUserBalance(p.user.user_id, fullAmount, "payment_refund:" + p.invoice, tx)
|
||||
await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, 0, 0, false, undefined, tx)
|
||||
}
|
||||
})
|
||||
await this.storage.StartTransaction(async tx => {
|
||||
await this.storage.userStorage.IncrementUserBalance(p.user.user_id, fullAmount, "payment_refund:" + p.invoice, tx)
|
||||
await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, 0, 0, false, undefined, tx)
|
||||
}, "refund failed pending payment")
|
||||
return
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
@ -319,12 +312,10 @@ export default class {
|
|||
const routingFeeLimit = this.lnd.GetFeeLimitAmount(payAmount)
|
||||
const use = await this.liquidityManager.beforeOutInvoicePayment(payAmount)
|
||||
const provider = use === 'provider' ? this.lnd.liquidProvider.GetProviderDestination() : undefined
|
||||
const pendingPayment = await this.storage.txQueue.PushToQueue({
|
||||
dbTx: true, description: "payment started", exec: async tx => {
|
||||
await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement + routingFeeLimit, invoice, tx)
|
||||
return await this.storage.paymentStorage.AddPendingExternalPayment(userId, invoice, { payAmount, serviceFee, networkFee: routingFeeLimit }, linkedApplication, provider, tx, debitNpub)
|
||||
}
|
||||
})
|
||||
const pendingPayment = await this.storage.StartTransaction(async tx => {
|
||||
await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement + routingFeeLimit, invoice, tx)
|
||||
return await this.storage.paymentStorage.AddPendingExternalPayment(userId, invoice, { payAmount, serviceFee, networkFee: routingFeeLimit }, linkedApplication, provider, tx, debitNpub)
|
||||
}, "payment started")
|
||||
this.log("ready to pay")
|
||||
try {
|
||||
const payment = await this.lnd.PayInvoice(invoice, amountForLnd, routingFeeLimit, payAmount, { useProvider: use === 'provider', from: 'user' }, index => {
|
||||
|
|
|
|||
|
|
@ -1,69 +1,59 @@
|
|||
import crypto from 'crypto';
|
||||
import { Between, DataSource, EntityManager, FindOperator, IsNull, LessThanOrEqual, MoreThanOrEqual } from "typeorm"
|
||||
import { Between, FindOperator, IsNull, LessThanOrEqual, MoreThanOrEqual } from "typeorm"
|
||||
import { generateSecretKey, getPublicKey } from 'nostr-tools';
|
||||
import { Application } from "./entity/Application.js"
|
||||
import UserStorage from './userStorage.js';
|
||||
import { ApplicationUser } from './entity/ApplicationUser.js';
|
||||
import { getLogger } from '../helpers/logger.js';
|
||||
import TransactionsQueue, { TX } from "./transactionsQueue.js";
|
||||
import { User } from './entity/User.js';
|
||||
import { InviteToken } from './entity/InviteToken.js';
|
||||
import { StorageInterface } from './storageInterface.js';
|
||||
export default class {
|
||||
DB: DataSource | EntityManager
|
||||
dbs: StorageInterface
|
||||
userStorage: UserStorage
|
||||
txQueue: TransactionsQueue
|
||||
constructor(DB: DataSource | EntityManager, userStorage: UserStorage, txQueue: TransactionsQueue) {
|
||||
this.DB = DB
|
||||
constructor(dbs: StorageInterface, userStorage: UserStorage) {
|
||||
this.dbs = dbs
|
||||
this.userStorage = userStorage
|
||||
this.txQueue = txQueue
|
||||
}
|
||||
|
||||
async AddApplication(name: string, allowUserCreation: boolean): Promise<Application> {
|
||||
return this.DB.transaction(async tx => {
|
||||
const owner = await this.userStorage.AddUser(0, tx)
|
||||
const repo = this.DB.getRepository(Application)
|
||||
const newApplication = repo.create({
|
||||
return this.dbs.Tx(async txId => {
|
||||
const owner = await this.userStorage.AddUser(0, txId)
|
||||
return this.dbs.CreateAndSave<Application>('Application', {
|
||||
app_id: crypto.randomBytes(32).toString('hex'),
|
||||
name,
|
||||
owner,
|
||||
allow_user_creation: allowUserCreation
|
||||
})
|
||||
return tx.getRepository(Application).save(newApplication)
|
||||
}, txId)
|
||||
})
|
||||
}
|
||||
|
||||
async GetApplicationByName(name: string, entityManager = this.DB) {
|
||||
const found = await entityManager.getRepository(Application).findOne({
|
||||
where: {
|
||||
name
|
||||
}
|
||||
})
|
||||
async GetApplicationByName(name: string, txId?: string) {
|
||||
const found = await this.dbs.FindOne<Application>('Application', { where: { name } }, txId)
|
||||
if (!found) {
|
||||
throw new Error(`application ${name} not found`)
|
||||
}
|
||||
return found
|
||||
}
|
||||
|
||||
async GetApplications(entityManager = this.DB): Promise<Application[]> {
|
||||
return entityManager.getRepository(Application).find()
|
||||
async GetApplications(txId?: string): Promise<Application[]> {
|
||||
return this.dbs.Find<Application>('Application', {}, txId)
|
||||
}
|
||||
async GetApplication(appId: string, entityManager = this.DB): Promise<Application> {
|
||||
async GetApplication(appId: string, txId?: string): Promise<Application> {
|
||||
if (!appId) {
|
||||
throw new Error("invalid app id provided")
|
||||
}
|
||||
const found = await entityManager.getRepository(Application).findOne({
|
||||
where: {
|
||||
app_id: appId
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
const found = await this.dbs.FindOne<Application>('Application', { where: { app_id: appId } }, txId)
|
||||
if (!found) {
|
||||
throw new Error(`application ${appId} not found`)
|
||||
}
|
||||
return found
|
||||
}
|
||||
|
||||
async UpdateApplication(app: Application, update: Partial<Application>, entityManager = this.DB) {
|
||||
await entityManager.getRepository(Application).update(app.serial_id, update)
|
||||
async UpdateApplication(app: Application, update: Partial<Application>, txId?: string) {
|
||||
await this.dbs.Update<Application>('Application', app.serial_id, update, txId)
|
||||
}
|
||||
|
||||
async GenerateApplicationKeys(app: Application) {
|
||||
|
|
@ -75,32 +65,27 @@ export default class {
|
|||
}
|
||||
|
||||
async AddApplicationUser(application: Application, userIdentifier: string, balance: number, nostrPub?: string) {
|
||||
return this.DB.transaction(async tx => {
|
||||
const user = await this.userStorage.AddUser(balance, tx)
|
||||
const repo = tx.getRepository(ApplicationUser)
|
||||
const appUser = repo.create({
|
||||
return this.dbs.Tx(async txId => {
|
||||
const user = await this.userStorage.AddUser(balance, txId)
|
||||
return this.dbs.CreateAndSave<ApplicationUser>('ApplicationUser', {
|
||||
user: user,
|
||||
application,
|
||||
identifier: userIdentifier,
|
||||
nostr_public_key: nostrPub
|
||||
})
|
||||
return repo.save(appUser)
|
||||
}, txId)
|
||||
})
|
||||
}
|
||||
|
||||
async GetApplicationUserIfExists(application: Application, userIdentifier: string, entityManager = this.DB): Promise<ApplicationUser | null> {
|
||||
return entityManager.getRepository(ApplicationUser).findOne({ where: { identifier: userIdentifier, application: { serial_id: application.serial_id } } })
|
||||
async GetApplicationUserIfExists(application: Application, userIdentifier: string, txId?: string): Promise<ApplicationUser | null> {
|
||||
return this.dbs.FindOne<ApplicationUser>('ApplicationUser', { where: { identifier: userIdentifier, application: { serial_id: application.serial_id } } }, txId)
|
||||
}
|
||||
|
||||
async GetOrCreateNostrAppUser(application: Application, nostrPub: string, entityManager = this.DB): Promise<ApplicationUser> {
|
||||
async GetOrCreateNostrAppUser(application: Application, nostrPub: string, txId?: string): Promise<ApplicationUser> {
|
||||
if (!nostrPub) {
|
||||
throw new Error("no nostrPub provided")
|
||||
}
|
||||
const user = await entityManager.getRepository(ApplicationUser).findOne({ where: { nostr_public_key: nostrPub } })
|
||||
const user = await this.dbs.FindOne<ApplicationUser>('ApplicationUser', { where: { nostr_public_key: nostrPub } }, txId)
|
||||
if (user) {
|
||||
//if (user.application.app_id !== application.app_id) {
|
||||
// throw new Error("tried to access a user of application:" + user.application.app_id + "from application:" + application.app_id)
|
||||
//}
|
||||
return user
|
||||
}
|
||||
if (!application.allow_user_creation) {
|
||||
|
|
@ -109,20 +94,20 @@ export default class {
|
|||
return this.AddApplicationUser(application, crypto.randomBytes(32).toString('hex'), 0, nostrPub)
|
||||
}
|
||||
|
||||
async FindNostrAppUser(nostrPub: string, entityManager = this.DB) {
|
||||
return entityManager.getRepository(ApplicationUser).findOne({ where: { nostr_public_key: nostrPub } })
|
||||
async FindNostrAppUser(nostrPub: string, txId?: string) {
|
||||
return this.dbs.FindOne<ApplicationUser>('ApplicationUser', { where: { nostr_public_key: nostrPub } }, txId)
|
||||
}
|
||||
|
||||
async GetOrCreateApplicationUser(application: Application, userIdentifier: string, balance: number, entityManager = this.DB): Promise<{ user: ApplicationUser, created: boolean }> {
|
||||
const user = await this.GetApplicationUserIfExists(application, userIdentifier, entityManager)
|
||||
async GetOrCreateApplicationUser(application: Application, userIdentifier: string, balance: number): Promise<{ user: ApplicationUser, created: boolean }> {
|
||||
const user = await this.GetApplicationUserIfExists(application, userIdentifier)
|
||||
if (user) {
|
||||
return { user, created: false }
|
||||
}
|
||||
return { user: await this.AddApplicationUser(application, userIdentifier, balance), created: true }
|
||||
}
|
||||
|
||||
async GetApplicationUser(application: Application, userIdentifier: string, entityManager = this.DB): Promise<ApplicationUser> {
|
||||
const found = await this.GetApplicationUserIfExists(application, userIdentifier, entityManager)
|
||||
async GetApplicationUser(application: Application, userIdentifier: string, txId?: string): Promise<ApplicationUser> {
|
||||
const found = await this.GetApplicationUserIfExists(application, userIdentifier, txId)
|
||||
if (!found) {
|
||||
getLogger({ appName: application.name })("user", userIdentifier, "not found", application.name)
|
||||
throw new Error(`application user not found`)
|
||||
|
|
@ -134,7 +119,7 @@ export default class {
|
|||
return found
|
||||
}
|
||||
|
||||
async GetApplicationUsers(application: Application | null, { from, to }: { from?: number, to?: number }, entityManager = this.DB) {
|
||||
async GetApplicationUsers(application: Application | null, { from, to }: { from?: number, to?: number }, txId?: string) {
|
||||
const q = application ? { app_id: application.app_id } : IsNull()
|
||||
let time: { created_at?: FindOperator<Date> } = {}
|
||||
if (!!from && !!to) {
|
||||
|
|
@ -144,61 +129,53 @@ export default class {
|
|||
} else if (!!to) {
|
||||
time.created_at = LessThanOrEqual<Date>(new Date(to * 1000))
|
||||
}
|
||||
return entityManager.getRepository(ApplicationUser).find({ where: { application: q, ...time } })
|
||||
return this.dbs.Find<ApplicationUser>('ApplicationUser', { where: { application: q, ...time } }, txId)
|
||||
}
|
||||
|
||||
async GetAppUserFromUser(application: Application, userId: string, entityManager = this.DB): Promise<ApplicationUser | null> {
|
||||
return entityManager.getRepository(ApplicationUser).findOne({ where: { user: { user_id: userId }, application: { app_id: application.app_id } } })
|
||||
async GetAppUserFromUser(application: Application, userId: string, txId?: string): Promise<ApplicationUser | null> {
|
||||
return this.dbs.FindOne<ApplicationUser>('ApplicationUser', { where: { user: { user_id: userId }, application: { app_id: application.app_id } } }, txId)
|
||||
}
|
||||
|
||||
async GetAllAppUsersFromUser(userId: string, entityManager = this.DB): Promise<ApplicationUser[]> {
|
||||
return entityManager.getRepository(ApplicationUser).find({ where: { user: { user_id: userId } } })
|
||||
async GetAllAppUsersFromUser(userId: string, txId?: string): Promise<ApplicationUser[]> {
|
||||
return this.dbs.Find<ApplicationUser>('ApplicationUser', { where: { user: { user_id: userId } } }, txId)
|
||||
}
|
||||
|
||||
async IsApplicationOwner(userId: string, entityManager = this.DB) {
|
||||
return entityManager.getRepository(Application).findOne({ where: { owner: { user_id: userId } } })
|
||||
async IsApplicationOwner(userId: string, txId?: string) {
|
||||
return this.dbs.FindOne<Application>('Application', { where: { owner: { user_id: userId } } }, txId)
|
||||
}
|
||||
|
||||
|
||||
async AddNPubToApplicationUser(serialId: number, nPub: string, entityManager = this.DB) {
|
||||
return entityManager.getRepository(ApplicationUser).update(serialId, { nostr_public_key: nPub })
|
||||
async AddNPubToApplicationUser(serialId: number, nPub: string, txId?: string) {
|
||||
return this.dbs.Update<ApplicationUser>('ApplicationUser', serialId, { nostr_public_key: nPub }, txId)
|
||||
}
|
||||
|
||||
async UpdateUserCallbackUrl(application: Application, userIdentifier: string, callbackUrl: string, entityManager = this.DB) {
|
||||
return entityManager.getRepository(ApplicationUser).update({ application: { app_id: application.app_id }, identifier: userIdentifier }, { callback_url: callbackUrl })
|
||||
async UpdateUserCallbackUrl(application: Application, userIdentifier: string, callbackUrl: string, txId?: string) {
|
||||
return this.dbs.Update<ApplicationUser>('ApplicationUser', { application: { app_id: application.app_id }, identifier: userIdentifier }, { callback_url: callbackUrl }, txId)
|
||||
}
|
||||
|
||||
async RemoveApplicationUserAndBaseUser(appUser: ApplicationUser, entityManager = this.DB) {
|
||||
async RemoveApplicationUserAndBaseUser(appUser: ApplicationUser, txId?: string) {
|
||||
const baseUser = appUser.user;
|
||||
await entityManager.getRepository(ApplicationUser).remove(appUser);
|
||||
await entityManager.getRepository(User).remove(baseUser);
|
||||
this.dbs.Remove<ApplicationUser>('ApplicationUser', appUser, txId)
|
||||
this.dbs.Remove<User>('User', baseUser, txId)
|
||||
}
|
||||
|
||||
|
||||
async AddInviteToken(app: Application, sats?: number) {
|
||||
return this.txQueue.PushToQueue({
|
||||
dbTx: false,
|
||||
exec: async tx => {
|
||||
const inviteRepo = tx.getRepository(InviteToken);
|
||||
const newInviteToken = inviteRepo.create({
|
||||
inviteToken: crypto.randomBytes(32).toString('hex'),
|
||||
used: false,
|
||||
sats: sats,
|
||||
application: app
|
||||
});
|
||||
|
||||
return inviteRepo.save(newInviteToken)
|
||||
}
|
||||
return this.dbs.CreateAndSave<InviteToken>('InviteToken', {
|
||||
inviteToken: crypto.randomBytes(32).toString('hex'),
|
||||
used: false,
|
||||
sats: sats,
|
||||
application: app
|
||||
})
|
||||
}
|
||||
|
||||
async FindInviteToken(token: string) {
|
||||
return this.DB.getRepository(InviteToken).findOne({ where: { inviteToken: token } })
|
||||
return this.dbs.FindOne<InviteToken>('InviteToken', { where: { inviteToken: token } })
|
||||
}
|
||||
|
||||
|
||||
async SetInviteTokenAsUsed(inviteToken: InviteToken) {
|
||||
return this.DB.getRepository(InviteToken).update(inviteToken, { used: true });
|
||||
return this.dbs.Update<InviteToken>('InviteToken', inviteToken, { used: true })
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -39,11 +39,51 @@ export const LoadDbSettingsFromEnv = (): DbSettings => {
|
|||
}
|
||||
}
|
||||
|
||||
/* const MainDbEntitiesNames = ['User', 'UserReceivingInvoice', 'UserReceivingAddress', 'AddressReceivingTransaction', 'UserInvoicePayment', 'UserTransactionPayment',
|
||||
'UserBasicAuth', 'UserEphemeralKey', 'Product', 'UserToUserPayment', 'Application', 'ApplicationUser', 'UserToUserPayment', 'LspOrder', 'LndNodeInfo', 'TrackedProvider',
|
||||
'InviteToken', 'DebitAccess', 'UserOffer'] as const
|
||||
type MainDbEntitiesName = typeof MainDbEntitiesNames[number]
|
||||
|
||||
const MetricsDbEntitiesNames = ['BalanceEvent', 'ChannelBalanceEvent', 'ChannelRouting', 'RootOperation'] as const
|
||||
type MetricsDbEntitiesName = typeof MetricsDbEntitiesNames[number] */
|
||||
|
||||
export const MainDbEntities = {
|
||||
'AddressReceivingTransaction': AddressReceivingTransaction,
|
||||
'Application': Application,
|
||||
'ApplicationUser': ApplicationUser,
|
||||
'User': User,
|
||||
'UserReceivingAddress': UserReceivingAddress,
|
||||
'UserReceivingInvoice': UserReceivingInvoice,
|
||||
'UserInvoicePayment': UserInvoicePayment,
|
||||
'UserTransactionPayment': UserTransactionPayment,
|
||||
'UserBasicAuth': UserBasicAuth,
|
||||
'UserEphemeralKey': UserEphemeralKey,
|
||||
'UserToUserPayment': UserToUserPayment,
|
||||
'LspOrder': LspOrder,
|
||||
'LndNodeInfo': LndNodeInfo,
|
||||
'TrackedProvider': TrackedProvider,
|
||||
'InviteToken': InviteToken,
|
||||
'DebitAccess': DebitAccess,
|
||||
'UserOffer': UserOffer,
|
||||
'Product': Product
|
||||
}
|
||||
export type MainDbNames = keyof typeof MainDbEntities
|
||||
export const MainDbEntitiesNames = Object.keys(MainDbEntities)
|
||||
|
||||
const MetricsDbEntities = {
|
||||
'BalanceEvent': BalanceEvent,
|
||||
'ChannelBalanceEvent': ChannelBalanceEvent,
|
||||
'ChannelRouting': ChannelRouting,
|
||||
'RootOperation': RootOperation
|
||||
}
|
||||
export type MetricsDbNames = keyof typeof MetricsDbEntities
|
||||
export const MetricsDbEntitiesNames = Object.keys(MetricsDbEntities)
|
||||
|
||||
export const newMetricsDb = async (settings: DbSettings, metricsMigrations: Function[]): Promise<{ source: DataSource, executedMigrations: Migration[] }> => {
|
||||
const source = await new DataSource({
|
||||
type: "sqlite",
|
||||
database: settings.metricsDatabaseFile,
|
||||
entities: [BalanceEvent, ChannelBalanceEvent, ChannelRouting, RootOperation],
|
||||
entities: Object.values(MetricsDbEntities),
|
||||
migrations: metricsMigrations
|
||||
}).initialize();
|
||||
const log = getLogger({});
|
||||
|
|
@ -62,10 +102,7 @@ export default async (settings: DbSettings, migrations: Function[]): Promise<{ s
|
|||
type: "sqlite",
|
||||
database: settings.databaseFile,
|
||||
// logging: true,
|
||||
entities: [User, UserReceivingInvoice, UserReceivingAddress, AddressReceivingTransaction, UserInvoicePayment, UserTransactionPayment,
|
||||
UserBasicAuth, UserEphemeralKey, Product, UserToUserPayment, Application, ApplicationUser, UserToUserPayment, LspOrder, LndNodeInfo, TrackedProvider,
|
||||
InviteToken, DebitAccess, UserOffer
|
||||
],
|
||||
entities: Object.values(MainDbEntities),
|
||||
//synchronize: true,
|
||||
migrations
|
||||
}).initialize()
|
||||
|
|
@ -79,7 +116,7 @@ export default async (settings: DbSettings, migrations: Function[]): Promise<{ s
|
|||
return { source, executedMigrations: [] }
|
||||
}
|
||||
|
||||
export const runFakeMigration = async (databaseFile: string, migrations: Function[]) => {
|
||||
/* export const runFakeMigration = async (databaseFile: string, migrations: Function[]) => {
|
||||
const source = await new DataSource({
|
||||
type: "sqlite",
|
||||
database: databaseFile,
|
||||
|
|
@ -90,4 +127,4 @@ export const runFakeMigration = async (databaseFile: string, migrations: Functio
|
|||
migrations
|
||||
}).initialize()
|
||||
return source.runMigrations({ fake: true })
|
||||
}
|
||||
} */
|
||||
|
|
@ -1,47 +1,42 @@
|
|||
import { DataSource, EntityManager } from "typeorm"
|
||||
import UserStorage from './userStorage.js';
|
||||
import TransactionsQueue from "./transactionsQueue.js";
|
||||
import { DebitAccess, DebitAccessRules } from "./entity/DebitAccess.js";
|
||||
import { StorageInterface } from "./storageInterface.js";
|
||||
type AccessToAdd = {
|
||||
npub: string
|
||||
rules?: DebitAccessRules
|
||||
authorize: boolean
|
||||
}
|
||||
export default class {
|
||||
DB: DataSource | EntityManager
|
||||
txQueue: TransactionsQueue
|
||||
constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) {
|
||||
this.DB = DB
|
||||
this.txQueue = txQueue
|
||||
dbs: StorageInterface
|
||||
constructor(dbs: StorageInterface) {
|
||||
this.dbs = dbs
|
||||
}
|
||||
|
||||
async AddDebitAccess(appUserId: string, access: AccessToAdd, entityManager = this.DB) {
|
||||
const entry = entityManager.getRepository(DebitAccess).create({
|
||||
async AddDebitAccess(appUserId: string, access: AccessToAdd) {
|
||||
return this.dbs.CreateAndSave<DebitAccess>('DebitAccess', {
|
||||
app_user_id: appUserId,
|
||||
npub: access.npub,
|
||||
authorized: access.authorize,
|
||||
rules: access.rules,
|
||||
})
|
||||
return this.txQueue.PushToQueue<DebitAccess>({ exec: async db => db.getRepository(DebitAccess).save(entry), dbTx: false })
|
||||
}
|
||||
|
||||
async GetAllUserDebitAccess(appUserId: string) {
|
||||
return this.DB.getRepository(DebitAccess).find({ where: { app_user_id: appUserId } })
|
||||
async GetAllUserDebitAccess(appUserId: string, txId?: string) {
|
||||
return this.dbs.Find<DebitAccess>('DebitAccess', { where: { app_user_id: appUserId } }, txId)
|
||||
}
|
||||
|
||||
async GetDebitAccess(appUserId: string, authorizedPub: string) {
|
||||
return this.DB.getRepository(DebitAccess).findOne({ where: { app_user_id: appUserId, npub: authorizedPub } })
|
||||
async GetDebitAccess(appUserId: string, authorizedPub: string, txId?: string) {
|
||||
return this.dbs.FindOne<DebitAccess>('DebitAccess', { where: { app_user_id: appUserId, npub: authorizedPub } }, txId)
|
||||
}
|
||||
|
||||
async IncrementDebitAccess(appUserId: string, authorizedPub: string, amount: number) {
|
||||
return this.DB.getRepository(DebitAccess).increment({ app_user_id: appUserId, npub: authorizedPub }, 'total_debits', amount)
|
||||
async IncrementDebitAccess(appUserId: string, authorizedPub: string, amount: number, txId?: string) {
|
||||
return this.dbs.Increment<DebitAccess>('DebitAccess', { app_user_id: appUserId, npub: authorizedPub }, 'total_debits', amount, txId)
|
||||
}
|
||||
|
||||
async UpdateDebitAccess(appUserId: string, authorizedPub: string, authorized: boolean) {
|
||||
return this.DB.getRepository(DebitAccess).update({ app_user_id: appUserId, npub: authorizedPub }, { authorized })
|
||||
async UpdateDebitAccess(appUserId: string, authorizedPub: string, authorized: boolean, txId?: string) {
|
||||
return this.dbs.Update<DebitAccess>('DebitAccess', { app_user_id: appUserId, npub: authorizedPub }, { authorized }, txId)
|
||||
}
|
||||
async UpdateDebitAccessRules(appUserId: string, authorizedPub: string, rules?: DebitAccessRules) {
|
||||
return this.DB.getRepository(DebitAccess).update({ app_user_id: appUserId, npub: authorizedPub }, { rules: rules || null })
|
||||
async UpdateDebitAccessRules(appUserId: string, authorizedPub: string, rules?: DebitAccessRules, txId?: string) {
|
||||
return this.dbs.Update<DebitAccess>('DebitAccess', { app_user_id: appUserId, npub: authorizedPub }, { rules: rules || null }, txId)
|
||||
}
|
||||
|
||||
async DenyDebitAccess(appUserId: string, pub: string) {
|
||||
|
|
@ -52,7 +47,7 @@ export default class {
|
|||
await this.UpdateDebitAccess(appUserId, pub, false)
|
||||
}
|
||||
|
||||
async RemoveDebitAccess(appUserId: string, authorizedPub: string) {
|
||||
return this.DB.getRepository(DebitAccess).delete({ app_user_id: appUserId, npub: authorizedPub })
|
||||
async RemoveDebitAccess(appUserId: string, authorizedPub: string, txId?: string) {
|
||||
return this.dbs.Delete<DebitAccess>('DebitAccess', { app_user_id: appUserId, npub: authorizedPub }, txId)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,4 +1,3 @@
|
|||
import { DataSource, EntityManager } from "typeorm"
|
||||
import fs from 'fs'
|
||||
import NewDB, { DbSettings, LoadDbSettingsFromEnv } from "./db.js"
|
||||
import ProductStorage from './productStorage.js'
|
||||
|
|
@ -7,11 +6,13 @@ import UserStorage from "./userStorage.js";
|
|||
import PaymentStorage from "./paymentStorage.js";
|
||||
import MetricsStorage from "./metricsStorage.js";
|
||||
import MetricsEventStorage from "./metricsEventStorage.js";
|
||||
import TransactionsQueue, { TX } from "./transactionsQueue.js";
|
||||
import EventsLogManager from "./eventsLog.js";
|
||||
import { LiquidityStorage } from "./liquidityStorage.js";
|
||||
import DebitStorage from "./debitStorage.js"
|
||||
import OfferStorage from "./offerStorage.js"
|
||||
import { StorageInterface, TX } from "./storageInterface.js";
|
||||
import { allMetricsMigrations, allMigrations } from "./migrations/runner.js"
|
||||
import { PubLogger } from "../helpers/logger.js"
|
||||
export type StorageSettings = {
|
||||
dbSettings: DbSettings
|
||||
eventLogPath: string
|
||||
|
|
@ -21,9 +22,10 @@ export const LoadStorageSettingsFromEnv = (): StorageSettings => {
|
|||
return { dbSettings: LoadDbSettingsFromEnv(), eventLogPath: "logs/eventLogV3.csv", dataDir: process.env.DATA_DIR || "" }
|
||||
}
|
||||
export default class {
|
||||
DB: DataSource | EntityManager
|
||||
//DB: DataSource | EntityManager
|
||||
settings: StorageSettings
|
||||
txQueue: TransactionsQueue
|
||||
//txQueue: TransactionsQueue
|
||||
dbs: StorageInterface
|
||||
productStorage: ProductStorage
|
||||
applicationStorage: ApplicationStorage
|
||||
userStorage: UserStorage
|
||||
|
|
@ -38,25 +40,35 @@ export default class {
|
|||
this.settings = settings
|
||||
this.eventsLog = new EventsLogManager(settings.eventLogPath)
|
||||
}
|
||||
async Connect(migrations: Function[], metricsMigrations: Function[]) {
|
||||
const { source, executedMigrations } = await NewDB(this.settings.dbSettings, migrations)
|
||||
this.DB = source
|
||||
this.txQueue = new TransactionsQueue("main", this.DB)
|
||||
this.userStorage = new UserStorage(this.DB, this.txQueue, this.eventsLog)
|
||||
this.productStorage = new ProductStorage(this.DB, this.txQueue)
|
||||
this.applicationStorage = new ApplicationStorage(this.DB, this.userStorage, this.txQueue)
|
||||
this.paymentStorage = new PaymentStorage(this.DB, this.userStorage, this.txQueue)
|
||||
async Connect(log: PubLogger) {
|
||||
this.dbs = new StorageInterface()
|
||||
await this.dbs.Connect(this.settings.dbSettings)
|
||||
//const { source, executedMigrations } = await NewDB(this.settings.dbSettings, allMigrations)
|
||||
//this.DB = source
|
||||
//this.txQueue = new TransactionsQueue("main", this.DB)
|
||||
this.userStorage = new UserStorage(this.dbs, this.eventsLog)
|
||||
this.productStorage = new ProductStorage(this.dbs)
|
||||
this.applicationStorage = new ApplicationStorage(this.dbs, this.userStorage)
|
||||
this.paymentStorage = new PaymentStorage(this.dbs, this.userStorage)
|
||||
this.metricsStorage = new MetricsStorage(this.settings)
|
||||
this.metricsEventStorage = new MetricsEventStorage(this.settings)
|
||||
this.liquidityStorage = new LiquidityStorage(this.DB, this.txQueue)
|
||||
this.debitStorage = new DebitStorage(this.DB, this.txQueue)
|
||||
this.offerStorage = new OfferStorage(this.DB, this.txQueue)
|
||||
this.liquidityStorage = new LiquidityStorage(this.dbs)
|
||||
this.debitStorage = new DebitStorage(this.dbs)
|
||||
this.offerStorage = new OfferStorage(this.dbs)
|
||||
try { if (this.settings.dataDir) fs.mkdirSync(this.settings.dataDir) } catch (e) { }
|
||||
const executedMetricsMigrations = await this.metricsStorage.Connect(metricsMigrations)
|
||||
return { executedMigrations, executedMetricsMigrations };
|
||||
const executedMetricsMigrations = await this.metricsStorage.Connect(allMetricsMigrations)
|
||||
/* if (executedMigrations.length > 0) {
|
||||
log(executedMigrations.length, "new migrations executed")
|
||||
log("-------------------")
|
||||
|
||||
} */
|
||||
if (executedMetricsMigrations.length > 0) {
|
||||
log(executedMetricsMigrations.length, "new metrics migrations executed")
|
||||
log("-------------------")
|
||||
}
|
||||
}
|
||||
|
||||
StartTransaction<T>(exec: TX<T>, description?: string) {
|
||||
return this.txQueue.PushToQueue({ exec, dbTx: true, description })
|
||||
return this.dbs.Tx(tx => exec(tx), description)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,72 +1,66 @@
|
|||
import { DataSource, EntityManager, IsNull, MoreThan, Not } from "typeorm"
|
||||
import { IsNull, MoreThan, Not } from "typeorm"
|
||||
import { LspOrder } from "./entity/LspOrder.js";
|
||||
import TransactionsQueue, { TX } from "./transactionsQueue.js";
|
||||
import { LndNodeInfo } from "./entity/LndNodeInfo.js";
|
||||
import { TrackedProvider } from "./entity/TrackedProvider.js";
|
||||
import { StorageInterface } from "./storageInterface.js";
|
||||
export class LiquidityStorage {
|
||||
DB: DataSource | EntityManager
|
||||
txQueue: TransactionsQueue
|
||||
constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) {
|
||||
this.DB = DB
|
||||
this.txQueue = txQueue
|
||||
dbs: StorageInterface
|
||||
constructor(dbs: StorageInterface) {
|
||||
this.dbs = dbs
|
||||
}
|
||||
|
||||
GetLatestLspOrder() {
|
||||
return this.DB.getRepository(LspOrder).findOne({ where: { serial_id: MoreThan(0) }, order: { serial_id: "DESC" } })
|
||||
return this.dbs.FindOne<LspOrder>('LspOrder', { where: { serial_id: MoreThan(0) }, order: { serial_id: "DESC" } })
|
||||
}
|
||||
|
||||
SaveLspOrder(order: Partial<LspOrder>) {
|
||||
const entry = this.DB.getRepository(LspOrder).create(order)
|
||||
return this.txQueue.PushToQueue<LspOrder>({ exec: async db => db.getRepository(LspOrder).save(entry), dbTx: false })
|
||||
return this.dbs.CreateAndSave<LspOrder>('LspOrder', order)
|
||||
}
|
||||
|
||||
async GetNoodeSeed(pubkey: string) {
|
||||
return this.DB.getRepository(LndNodeInfo).findOne({ where: { pubkey, seed: Not(IsNull()) } })
|
||||
return this.dbs.FindOne<LndNodeInfo>('LndNodeInfo', { where: { pubkey, seed: Not(IsNull()) } })
|
||||
}
|
||||
|
||||
async SaveNodeSeed(pubkey: string, seed: string) {
|
||||
const existing = await this.DB.getRepository(LndNodeInfo).findOne({ where: { pubkey } })
|
||||
const existing = await this.dbs.FindOne<LndNodeInfo>('LndNodeInfo', { where: { pubkey } })
|
||||
if (existing) {
|
||||
throw new Error("A seed already exists for this pub key")
|
||||
}
|
||||
const entry = this.DB.getRepository(LndNodeInfo).create({ pubkey, seed })
|
||||
return this.txQueue.PushToQueue<LndNodeInfo>({ exec: async db => db.getRepository(LndNodeInfo).save(entry), dbTx: false })
|
||||
return this.dbs.CreateAndSave<LndNodeInfo>('LndNodeInfo', { pubkey, seed })
|
||||
}
|
||||
|
||||
async SaveNodeBackup(pubkey: string, backup: string) {
|
||||
const existing = await this.DB.getRepository(LndNodeInfo).findOne({ where: { pubkey } })
|
||||
const existing = await this.dbs.FindOne<LndNodeInfo>('LndNodeInfo', { where: { pubkey } })
|
||||
if (existing) {
|
||||
await this.DB.getRepository(LndNodeInfo).update(existing.serial_id, { backup })
|
||||
await this.dbs.Update<LndNodeInfo>('LndNodeInfo', existing.serial_id, { backup })
|
||||
return
|
||||
}
|
||||
const entry = this.DB.getRepository(LndNodeInfo).create({ pubkey, backup })
|
||||
await this.txQueue.PushToQueue<LndNodeInfo>({ exec: async db => db.getRepository(LndNodeInfo).save(entry), dbTx: false })
|
||||
return this.dbs.CreateAndSave<LndNodeInfo>('LndNodeInfo', { pubkey, backup })
|
||||
}
|
||||
|
||||
async GetTrackedProviders() {
|
||||
return this.DB.getRepository(TrackedProvider).find({})
|
||||
return this.dbs.Find<TrackedProvider>('TrackedProvider', {})
|
||||
}
|
||||
|
||||
async GetTrackedProvider(providerType: 'lnd' | 'lnPub', pub: string) {
|
||||
return this.DB.getRepository(TrackedProvider).findOne({ where: { provider_pubkey: pub, provider_type: providerType } })
|
||||
return this.dbs.FindOne<TrackedProvider>('TrackedProvider', { where: { provider_pubkey: pub, provider_type: providerType } })
|
||||
}
|
||||
async CreateTrackedProvider(providerType: 'lnd' | 'lnPub', pub: string, latestBalance = 0) {
|
||||
const entry = this.DB.getRepository(TrackedProvider).create({ provider_pubkey: pub, provider_type: providerType, latest_balance: latestBalance })
|
||||
return this.txQueue.PushToQueue<TrackedProvider>({ exec: async db => db.getRepository(TrackedProvider).save(entry), dbTx: false })
|
||||
return this.dbs.CreateAndSave<TrackedProvider>('TrackedProvider', { provider_pubkey: pub, provider_type: providerType, latest_balance: latestBalance })
|
||||
}
|
||||
async UpdateTrackedProviderBalance(providerType: 'lnd' | 'lnPub', pub: string, latestBalance: number) {
|
||||
console.log("updating tracked balance:", latestBalance)
|
||||
return this.DB.getRepository(TrackedProvider).update({ provider_pubkey: pub, provider_type: providerType }, { latest_balance: latestBalance })
|
||||
return this.dbs.Update<TrackedProvider>('TrackedProvider', { provider_pubkey: pub, provider_type: providerType }, { latest_balance: latestBalance })
|
||||
}
|
||||
async IncrementTrackedProviderBalance(providerType: 'lnd' | 'lnPub', pub: string, amount: number) {
|
||||
if (amount < 0) {
|
||||
return this.DB.getRepository(TrackedProvider).increment({ provider_pubkey: pub, provider_type: providerType }, "latest_balance", amount)
|
||||
return this.dbs.Increment<TrackedProvider>('TrackedProvider', { provider_pubkey: pub, provider_type: providerType }, "latest_balance", amount)
|
||||
} else {
|
||||
return this.DB.getRepository(TrackedProvider).decrement({ provider_pubkey: pub, provider_type: providerType }, "latest_balance", -amount)
|
||||
return this.dbs.Decrement<TrackedProvider>('TrackedProvider', { provider_pubkey: pub, provider_type: providerType }, "latest_balance", -amount)
|
||||
}
|
||||
|
||||
}
|
||||
async UpdateTrackedProviderDisruption(providerType: 'lnd' | 'lnPub', pub: string, latestDisruptionAtUnix: number) {
|
||||
return this.DB.getRepository(TrackedProvider).update({ provider_pubkey: pub, provider_type: providerType }, { latest_distruption_at_unix: latestDisruptionAtUnix })
|
||||
return this.dbs.Update<TrackedProvider>('TrackedProvider', { provider_pubkey: pub, provider_type: providerType }, { latest_distruption_at_unix: latestDisruptionAtUnix })
|
||||
}
|
||||
}
|
||||
|
|
@ -1,14 +1,12 @@
|
|||
import { Between, DataSource, EntityManager, FindManyOptions, FindOperator, LessThanOrEqual, MoreThanOrEqual } from "typeorm"
|
||||
import { BalanceEvent } from "./entity/BalanceEvent.js"
|
||||
import { ChannelBalanceEvent } from "./entity/ChannelsBalanceEvent.js"
|
||||
import TransactionsQueue, { TX } from "./transactionsQueue.js";
|
||||
import TransactionsQueue from "./transactionsQueue.js";
|
||||
import { StorageSettings } from "./index.js";
|
||||
import { newMetricsDb } from "./db.js";
|
||||
import { ChannelRouting } from "./entity/ChannelRouting.js";
|
||||
import { RootOperation } from "./entity/RootOperation.js";
|
||||
export default class {
|
||||
|
||||
|
||||
DB: DataSource | EntityManager
|
||||
settings: StorageSettings
|
||||
txQueue: TransactionsQueue
|
||||
|
|
|
|||
|
|
@ -1,6 +1,3 @@
|
|||
import { PubLogger } from '../../helpers/logger.js'
|
||||
import { DbSettings, runFakeMigration } from '../db.js'
|
||||
import Storage, { StorageSettings } from '../index.js'
|
||||
import { Initial1703170309875 } from './1703170309875-initial.js'
|
||||
import { LndMetrics1703170330183 } from './1703170330183-lnd_metrics.js'
|
||||
import { ChannelRouting1709316653538 } from './1709316653538-channel_routing.js'
|
||||
|
|
@ -18,26 +15,21 @@ import { DebitToPub1727105758354 } from './1727105758354-debit_to_pub.js'
|
|||
import { UserCbUrl1727112281043 } from './1727112281043-user_cb_url.js'
|
||||
import { RootOps1732566440447 } from './1732566440447-root_ops.js'
|
||||
import { UserOffer1733502626042 } from './1733502626042-user_offer.js'
|
||||
const allMigrations = [Initial1703170309875, LspOrder1718387847693, LiquidityProvider1719335699480, LndNodeInfo1720187506189, TrackedProvider1720814323679, CreateInviteTokenTable1721751414878, PaymentIndex1721760297610, DebitAccess1726496225078, DebitAccessFixes1726685229264, DebitToPub1727105758354, UserCbUrl1727112281043, UserOffer1733502626042]
|
||||
const allMetricsMigrations = [LndMetrics1703170330183, ChannelRouting1709316653538, HtlcCount1724266887195, BalanceEvents1724860966825, RootOps1732566440447]
|
||||
export const TypeOrmMigrationRunner = async (log: PubLogger, storageManager: Storage, settings: DbSettings, arg: string | undefined): Promise<boolean> => {
|
||||
if (arg === 'fake_initial_migration') {
|
||||
runFakeMigration(settings.databaseFile, [Initial1703170309875])
|
||||
return true
|
||||
}
|
||||
export const allMigrations = [Initial1703170309875, LspOrder1718387847693, LiquidityProvider1719335699480, LndNodeInfo1720187506189, TrackedProvider1720814323679, CreateInviteTokenTable1721751414878, PaymentIndex1721760297610, DebitAccess1726496225078, DebitAccessFixes1726685229264, DebitToPub1727105758354, UserCbUrl1727112281043, UserOffer1733502626042]
|
||||
export const allMetricsMigrations = [LndMetrics1703170330183, ChannelRouting1709316653538, HtlcCount1724266887195, BalanceEvents1724860966825, RootOps1732566440447]
|
||||
/* export const TypeOrmMigrationRunner = async (log: PubLogger, storageManager: Storage, settings: DbSettings, arg: string | undefined): Promise<boolean> => {
|
||||
await connectAndMigrate(log, storageManager, allMigrations, allMetricsMigrations)
|
||||
return false
|
||||
}
|
||||
|
||||
const connectAndMigrate = async (log: PubLogger, storageManager: Storage, migrations: Function[], metricsMigrations: Function[]) => {
|
||||
const { executedMigrations, executedMetricsMigrations } = await storageManager.Connect(migrations, metricsMigrations)
|
||||
if (migrations.length > 0) {
|
||||
log(executedMigrations.length, "of", migrations.length, "migrations were executed correctly")
|
||||
log(executedMigrations)
|
||||
if (executedMigrations.length > 0) {
|
||||
log(executedMigrations.length, "new migrations executed")
|
||||
log("-------------------")
|
||||
|
||||
} if (metricsMigrations.length > 0) {
|
||||
log(executedMetricsMigrations.length, "of", metricsMigrations.length, "metrics migrations were executed correctly")
|
||||
log(executedMetricsMigrations)
|
||||
} if (executedMetricsMigrations.length > 0) {
|
||||
log(executedMetricsMigrations.length, "new metrics migrations executed")
|
||||
log("-------------------")
|
||||
}
|
||||
}
|
||||
} */
|
||||
|
|
@ -1,49 +1,43 @@
|
|||
import { DataSource, EntityManager } from "typeorm"
|
||||
import crypto from 'crypto';
|
||||
import UserStorage from './userStorage.js';
|
||||
import TransactionsQueue from "./transactionsQueue.js";
|
||||
import { DebitAccess, DebitAccessRules } from "./entity/DebitAccess.js";
|
||||
import { UserOffer } from "./entity/UserOffer.js";
|
||||
import { StorageInterface } from "./storageInterface.js";
|
||||
export default class {
|
||||
|
||||
|
||||
DB: DataSource | EntityManager
|
||||
txQueue: TransactionsQueue
|
||||
constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) {
|
||||
this.DB = DB
|
||||
this.txQueue = txQueue
|
||||
dbs: StorageInterface
|
||||
constructor(dbs: StorageInterface) {
|
||||
this.dbs = dbs
|
||||
}
|
||||
async AddDefaultUserOffer(appUserId: string): Promise<UserOffer> {
|
||||
const newUserOffer = this.DB.getRepository(UserOffer).create({
|
||||
return this.dbs.CreateAndSave<UserOffer>('UserOffer', {
|
||||
app_user_id: appUserId,
|
||||
offer_id: appUserId,
|
||||
label: 'Default NIP-69 Offer',
|
||||
})
|
||||
return this.txQueue.PushToQueue<UserOffer>({ exec: async db => db.getRepository(UserOffer).save(newUserOffer), dbTx: false, description: `add default offer for ${appUserId}` })
|
||||
}
|
||||
async AddUserOffer(appUserId: string, req: Partial<UserOffer>): Promise<UserOffer> {
|
||||
const newUserOffer = this.DB.getRepository(UserOffer).create({
|
||||
const offer = await this.dbs.CreateAndSave<UserOffer>('UserOffer', {
|
||||
...req,
|
||||
app_user_id: appUserId,
|
||||
offer_id: crypto.randomBytes(34).toString('hex')
|
||||
})
|
||||
return this.txQueue.PushToQueue<UserOffer>({ exec: async db => db.getRepository(UserOffer).save(newUserOffer), dbTx: false, description: `add offer for ${appUserId}: ${req.label} ` })
|
||||
return offer
|
||||
}
|
||||
|
||||
async DeleteUserOffer(appUserId: string, offerId: string, entityManager = this.DB) {
|
||||
await entityManager.getRepository(UserOffer).delete({ app_user_id: appUserId, offer_id: offerId })
|
||||
async DeleteUserOffer(appUserId: string, offerId: string, txId?: string) {
|
||||
await this.dbs.Delete<UserOffer>('UserOffer', { app_user_id: appUserId, offer_id: offerId }, txId)
|
||||
}
|
||||
async UpdateUserOffer(app_user_id: string, offerId: string, req: Partial<UserOffer>) {
|
||||
return this.DB.getRepository(UserOffer).update({ app_user_id, offer_id: offerId }, req)
|
||||
async UpdateUserOffer(app_user_id: string, offerId: string, req: Partial<UserOffer>, txId?: string) {
|
||||
return this.dbs.Update<UserOffer>('UserOffer', { app_user_id, offer_id: offerId }, req, txId)
|
||||
}
|
||||
|
||||
async GetUserOffers(app_user_id: string): Promise<UserOffer[]> {
|
||||
return this.DB.getRepository(UserOffer).find({ where: { app_user_id } })
|
||||
return this.dbs.Find<UserOffer>('UserOffer', { where: { app_user_id } })
|
||||
}
|
||||
async GetUserOffer(app_user_id: string, offer_id: string): Promise<UserOffer | null> {
|
||||
return this.DB.getRepository(UserOffer).findOne({ where: { app_user_id, offer_id } })
|
||||
return this.dbs.FindOne<UserOffer>('UserOffer', { where: { app_user_id, offer_id } })
|
||||
}
|
||||
async GetOffer(offer_id: string): Promise<UserOffer | null> {
|
||||
return this.DB.getRepository(UserOffer).findOne({ where: { offer_id } })
|
||||
return this.dbs.FindOne<UserOffer>('UserOffer', { where: { offer_id } })
|
||||
}
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
import crypto from 'crypto';
|
||||
import { Between, DataSource, EntityManager, FindOperator, IsNull, LessThanOrEqual, MoreThan, MoreThanOrEqual, Not } from "typeorm"
|
||||
import { Between, FindOperator, IsNull, LessThanOrEqual, MoreThan, MoreThanOrEqual, Not } from "typeorm"
|
||||
import { User } from './entity/User.js';
|
||||
import { UserTransactionPayment } from './entity/UserTransactionPayment.js';
|
||||
import { EphemeralKeyType, UserEphemeralKey } from './entity/UserEphemeralKey.js';
|
||||
|
|
@ -13,20 +13,19 @@ import { UserToUserPayment } from './entity/UserToUserPayment.js';
|
|||
import { Application } from './entity/Application.js';
|
||||
import TransactionsQueue from "./transactionsQueue.js";
|
||||
import { LoggedEvent } from './eventsLog.js';
|
||||
import { StorageInterface } from './storageInterface.js';
|
||||
export type InboundOptionals = { product?: Product, callbackUrl?: string, expiry: number, expectedPayer?: User, linkedApplication?: Application, zapInfo?: ZapInfo, offerId?: string, payerData?: Record<string, string> }
|
||||
export const defaultInvoiceExpiry = 60 * 60
|
||||
export default class {
|
||||
DB: DataSource | EntityManager
|
||||
dbs: StorageInterface
|
||||
userStorage: UserStorage
|
||||
txQueue: TransactionsQueue
|
||||
constructor(DB: DataSource | EntityManager, userStorage: UserStorage, txQueue: TransactionsQueue) {
|
||||
this.DB = DB
|
||||
constructor(dbs: StorageInterface, userStorage: UserStorage) {
|
||||
this.dbs = dbs
|
||||
this.userStorage = userStorage
|
||||
this.txQueue = txQueue
|
||||
}
|
||||
|
||||
async AddAddressReceivingTransaction(address: UserReceivingAddress, txHash: string, outputIndex: number, amount: number, serviceFee: number, internal: boolean, height: number, dbTx: EntityManager | DataSource) {
|
||||
const newAddressTransaction = dbTx.getRepository(AddressReceivingTransaction).create({
|
||||
async AddAddressReceivingTransaction(address: UserReceivingAddress, txHash: string, outputIndex: number, amount: number, serviceFee: number, internal: boolean, height: number, txId: string) {
|
||||
return this.dbs.CreateAndSave<AddressReceivingTransaction>('AddressReceivingTransaction', {
|
||||
user_address: address,
|
||||
tx_hash: txHash,
|
||||
output_index: outputIndex,
|
||||
|
|
@ -36,12 +35,11 @@ export default class {
|
|||
internal,
|
||||
broadcast_height: height,
|
||||
confs: internal ? 10 : 0
|
||||
})
|
||||
return dbTx.getRepository(AddressReceivingTransaction).save(newAddressTransaction)
|
||||
}, txId)
|
||||
}
|
||||
|
||||
GetUserReceivingTransactions(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise<AddressReceivingTransaction[]> {
|
||||
return entityManager.getRepository(AddressReceivingTransaction).find({
|
||||
GetUserReceivingTransactions(userId: string, fromIndex: number, take = 50, txId?: string): Promise<AddressReceivingTransaction[]> {
|
||||
return this.dbs.Find<AddressReceivingTransaction>('AddressReceivingTransaction', {
|
||||
where: {
|
||||
user_address: { user: { user_id: userId } },
|
||||
serial_id: MoreThanOrEqual(fromIndex),
|
||||
|
|
@ -51,33 +49,32 @@ export default class {
|
|||
paid_at_unix: 'DESC'
|
||||
},
|
||||
take
|
||||
})
|
||||
}, txId)
|
||||
}
|
||||
|
||||
async GetExistingUserAddress(userId: string, linkedApplication: Application, entityManager = this.DB) {
|
||||
return entityManager.getRepository(UserReceivingAddress).findOne({ where: { user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } })
|
||||
async GetExistingUserAddress(userId: string, linkedApplication: Application, txId?: string) {
|
||||
return this.dbs.FindOne<UserReceivingAddress>('UserReceivingAddress', { where: { user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } }, txId)
|
||||
}
|
||||
|
||||
async AddUserAddress(user: User, address: string, opts: { callbackUrl?: string, linkedApplication?: Application } = {}): Promise<UserReceivingAddress> {
|
||||
const newUserAddress = this.DB.getRepository(UserReceivingAddress).create({
|
||||
async AddUserAddress(user: User, address: string, opts: { callbackUrl?: string, linkedApplication?: Application } = {}, txId?: string): Promise<UserReceivingAddress> {
|
||||
return this.dbs.CreateAndSave<UserReceivingAddress>('UserReceivingAddress', {
|
||||
address,
|
||||
callbackUrl: opts.callbackUrl || "",
|
||||
linkedApplication: opts.linkedApplication,
|
||||
user
|
||||
})
|
||||
return this.txQueue.PushToQueue<UserReceivingAddress>({ exec: async db => db.getRepository(UserReceivingAddress).save(newUserAddress), dbTx: false, description: `add address for ${user.user_id} linked to ${opts.linkedApplication?.app_id}: ${address} ` })
|
||||
}, txId)
|
||||
}
|
||||
|
||||
async FlagInvoiceAsPaid(invoice: UserReceivingInvoice, amount: number, serviceFee: number, internal: boolean, dbTx: EntityManager | DataSource) {
|
||||
async FlagInvoiceAsPaid(invoice: UserReceivingInvoice, amount: number, serviceFee: number, internal: boolean, txId: string) {
|
||||
const i: Partial<UserReceivingInvoice> = { paid_at_unix: Math.floor(Date.now() / 1000), paid_amount: amount, service_fee: serviceFee, internal }
|
||||
if (!internal) {
|
||||
i.paidByLnd = true
|
||||
}
|
||||
return dbTx.getRepository(UserReceivingInvoice).update(invoice.serial_id, i)
|
||||
return this.dbs.Update<UserReceivingInvoice>('UserReceivingInvoice', invoice.serial_id, i, txId)
|
||||
}
|
||||
|
||||
GetUserInvoicesFlaggedAsPaid(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise<UserReceivingInvoice[]> {
|
||||
return entityManager.getRepository(UserReceivingInvoice).find({
|
||||
GetUserInvoicesFlaggedAsPaid(userId: string, fromIndex: number, take = 50, txId?: string): Promise<UserReceivingInvoice[]> {
|
||||
return this.dbs.Find<UserReceivingInvoice>('UserReceivingInvoice', {
|
||||
where: {
|
||||
user: {
|
||||
user_id: userId
|
||||
|
|
@ -89,11 +86,11 @@ export default class {
|
|||
paid_at_unix: 'DESC'
|
||||
},
|
||||
take
|
||||
})
|
||||
}, txId)
|
||||
}
|
||||
|
||||
async AddUserInvoice(user: User, invoice: string, options: InboundOptionals = { expiry: defaultInvoiceExpiry }, providerDestination?: string): Promise<UserReceivingInvoice> {
|
||||
const newUserInvoice = this.DB.getRepository(UserReceivingInvoice).create({
|
||||
async AddUserInvoice(user: User, invoice: string, options: InboundOptionals = { expiry: defaultInvoiceExpiry }, providerDestination?: string, txId?: string): Promise<UserReceivingInvoice> {
|
||||
return this.dbs.CreateAndSave<UserReceivingInvoice>('UserReceivingInvoice', {
|
||||
invoice: invoice,
|
||||
callbackUrl: options.callbackUrl,
|
||||
user: user,
|
||||
|
|
@ -105,60 +102,34 @@ export default class {
|
|||
liquidityProvider: providerDestination,
|
||||
offer_id: options.offerId,
|
||||
payer_data: options.payerData,
|
||||
})
|
||||
return this.txQueue.PushToQueue<UserReceivingInvoice>({ exec: async db => db.getRepository(UserReceivingInvoice).save(newUserInvoice), dbTx: false, description: `add invoice for ${user.user_id} linked to ${options.linkedApplication?.app_id}: ${invoice} ` })
|
||||
}, txId)
|
||||
}
|
||||
|
||||
async GetAddressOwner(address: string, entityManager = this.DB): Promise<UserReceivingAddress | null> {
|
||||
return entityManager.getRepository(UserReceivingAddress).findOne({
|
||||
where: {
|
||||
address
|
||||
}
|
||||
})
|
||||
async GetAddressOwner(address: string, txId?: string): Promise<UserReceivingAddress | null> {
|
||||
return this.dbs.FindOne<UserReceivingAddress>('UserReceivingAddress', { where: { address } }, txId)
|
||||
}
|
||||
|
||||
async GetAddressReceivingTransactionOwner(address: string, txHash: string, entityManager = this.DB): Promise<AddressReceivingTransaction | null> {
|
||||
return entityManager.getRepository(AddressReceivingTransaction).findOne({
|
||||
where: {
|
||||
user_address: { address },
|
||||
tx_hash: txHash
|
||||
}
|
||||
})
|
||||
async GetAddressReceivingTransactionOwner(address: string, txHash: string, txId?: string): Promise<AddressReceivingTransaction | null> {
|
||||
return this.dbs.FindOne<AddressReceivingTransaction>('AddressReceivingTransaction', { where: { user_address: { address }, tx_hash: txHash } }, txId)
|
||||
}
|
||||
async GetUserTransactionPaymentOwner(address: string, txHash: string, entityManager = this.DB): Promise<UserTransactionPayment | null> {
|
||||
return entityManager.getRepository(UserTransactionPayment).findOne({
|
||||
where: {
|
||||
address,
|
||||
tx_hash: txHash
|
||||
}
|
||||
})
|
||||
async GetUserTransactionPaymentOwner(address: string, txHash: string, txId?: string): Promise<UserTransactionPayment | null> {
|
||||
return this.dbs.FindOne<UserTransactionPayment>('UserTransactionPayment', { where: { address, tx_hash: txHash } }, txId)
|
||||
}
|
||||
|
||||
async GetInvoiceOwner(paymentRequest: string, entityManager = this.DB): Promise<UserReceivingInvoice | null> {
|
||||
return entityManager.getRepository(UserReceivingInvoice).findOne({
|
||||
where: {
|
||||
invoice: paymentRequest
|
||||
}
|
||||
})
|
||||
async GetInvoiceOwner(paymentRequest: string, txId?: string): Promise<UserReceivingInvoice | null> {
|
||||
return this.dbs.FindOne<UserReceivingInvoice>('UserReceivingInvoice', { where: { invoice: paymentRequest } }, txId)
|
||||
}
|
||||
async GetPaymentOwner(paymentRequest: string, entityManager = this.DB): Promise<UserInvoicePayment | null> {
|
||||
return entityManager.getRepository(UserInvoicePayment).findOne({
|
||||
where: {
|
||||
invoice: paymentRequest
|
||||
}
|
||||
})
|
||||
async GetPaymentOwner(paymentRequest: string, txId?: string): Promise<UserInvoicePayment | null> {
|
||||
return this.dbs.FindOne<UserInvoicePayment>('UserInvoicePayment', { where: { invoice: paymentRequest } }, txId)
|
||||
}
|
||||
async GetUser2UserPayment(serialId: number, entityManager = this.DB): Promise<UserToUserPayment | null> {
|
||||
return entityManager.getRepository(UserToUserPayment).findOne({
|
||||
where: {
|
||||
serial_id: serialId
|
||||
}
|
||||
})
|
||||
async GetUser2UserPayment(serialId: number, txId?: string): Promise<UserToUserPayment | null> {
|
||||
return this.dbs.FindOne<UserToUserPayment>('UserToUserPayment', { where: { serial_id: serialId } }, txId)
|
||||
}
|
||||
|
||||
async AddPendingExternalPayment(userId: string, invoice: string, amounts: { payAmount: number, serviceFee: number, networkFee: number }, linkedApplication: Application, liquidityProvider: string | undefined, dbTx: DataSource | EntityManager, debitNpub?: string): Promise<UserInvoicePayment> {
|
||||
const newPayment = dbTx.getRepository(UserInvoicePayment).create({
|
||||
user: await this.userStorage.GetUser(userId, dbTx),
|
||||
async AddPendingExternalPayment(userId: string, invoice: string, amounts: { payAmount: number, serviceFee: number, networkFee: number }, linkedApplication: Application, liquidityProvider: string | undefined, txId: string, debitNpub?: string): Promise<UserInvoicePayment> {
|
||||
const user = await this.userStorage.GetUser(userId)
|
||||
return this.dbs.CreateAndSave<UserInvoicePayment>('UserInvoicePayment', {
|
||||
user,
|
||||
paid_amount: amounts.payAmount,
|
||||
invoice,
|
||||
routing_fees: amounts.networkFee,
|
||||
|
|
@ -168,18 +139,17 @@ export default class {
|
|||
linkedApplication,
|
||||
liquidityProvider,
|
||||
debit_to_pub: debitNpub
|
||||
})
|
||||
return dbTx.getRepository(UserInvoicePayment).save(newPayment)
|
||||
}, txId)
|
||||
}
|
||||
|
||||
async GetMaxPaymentIndex(entityManager = this.DB) {
|
||||
return entityManager.getRepository(UserInvoicePayment).find({ order: { paymentIndex: 'DESC' }, take: 1 })
|
||||
async GetMaxPaymentIndex(txId?: string) {
|
||||
return this.dbs.Find<UserInvoicePayment>('UserInvoicePayment', { order: { paymentIndex: 'DESC' }, take: 1 }, txId)
|
||||
}
|
||||
|
||||
async SetExternalPaymentIndex(invoicePaymentSerialId: number, index: number, entityManager = this.DB) {
|
||||
return entityManager.getRepository(UserInvoicePayment).update(invoicePaymentSerialId, { paymentIndex: index })
|
||||
async SetExternalPaymentIndex(invoicePaymentSerialId: number, index: number, txId?: string) {
|
||||
return this.dbs.Update<UserInvoicePayment>('UserInvoicePayment', invoicePaymentSerialId, { paymentIndex: index }, txId)
|
||||
}
|
||||
async UpdateExternalPayment(invoicePaymentSerialId: number, routingFees: number, serviceFees: number, success: boolean, providerDestination?: string, entityManager = this.DB) {
|
||||
async UpdateExternalPayment(invoicePaymentSerialId: number, routingFees: number, serviceFees: number, success: boolean, providerDestination?: string, txId?: string) {
|
||||
const up: Partial<UserInvoicePayment> = {
|
||||
routing_fees: routingFees,
|
||||
service_fees: serviceFees,
|
||||
|
|
@ -188,11 +158,12 @@ export default class {
|
|||
if (providerDestination) {
|
||||
up.liquidityProvider = providerDestination
|
||||
}
|
||||
return entityManager.getRepository(UserInvoicePayment).update(invoicePaymentSerialId, up)
|
||||
return this.dbs.Update<UserInvoicePayment>('UserInvoicePayment', invoicePaymentSerialId, up, txId)
|
||||
}
|
||||
|
||||
async AddInternalPayment(userId: string, invoice: string, amount: number, serviceFees: number, linkedApplication: Application, debitNpub?: string): Promise<UserInvoicePayment> {
|
||||
const newPayment = this.DB.getRepository(UserInvoicePayment).create({
|
||||
const user = await this.userStorage.GetUser(userId)
|
||||
return this.dbs.CreateAndSave<UserInvoicePayment>('UserInvoicePayment', {
|
||||
user: await this.userStorage.GetUser(userId),
|
||||
paid_amount: amount,
|
||||
invoice,
|
||||
|
|
@ -203,11 +174,10 @@ export default class {
|
|||
linkedApplication,
|
||||
debit_to_pub: debitNpub
|
||||
})
|
||||
return this.txQueue.PushToQueue<UserInvoicePayment>({ exec: async db => db.getRepository(UserInvoicePayment).save(newPayment), dbTx: false, description: `add internal invoice payment for ${userId} linked to ${linkedApplication.app_id}: ${invoice}, amt: ${amount} ` })
|
||||
}
|
||||
|
||||
GetUserInvoicePayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise<UserInvoicePayment[]> {
|
||||
return entityManager.getRepository(UserInvoicePayment).find({
|
||||
GetUserInvoicePayments(userId: string, fromIndex: number, take = 50, txId?: string): Promise<UserInvoicePayment[]> {
|
||||
return this.dbs.Find<UserInvoicePayment>('UserInvoicePayment', {
|
||||
where: {
|
||||
user: {
|
||||
user_id: userId
|
||||
|
|
@ -219,10 +189,10 @@ export default class {
|
|||
paid_at_unix: 'DESC'
|
||||
},
|
||||
take
|
||||
})
|
||||
}, txId)
|
||||
}
|
||||
|
||||
GetUserDebitPayments(userId: string, sinceUnix: number, debitToNpub: string, entityManager = this.DB): Promise<UserInvoicePayment[]> {
|
||||
GetUserDebitPayments(userId: string, sinceUnix: number, debitToNpub: string, txId?: string): Promise<UserInvoicePayment[]> {
|
||||
const pending = {
|
||||
user: { user_id: userId },
|
||||
debit_to_pub: debitToNpub,
|
||||
|
|
@ -233,16 +203,12 @@ export default class {
|
|||
debit_to_pub: debitToNpub,
|
||||
paid_at_unix: MoreThan(sinceUnix),
|
||||
}
|
||||
return entityManager.getRepository(UserInvoicePayment).find({
|
||||
where: [pending, paid],
|
||||
order: {
|
||||
paid_at_unix: 'DESC'
|
||||
}
|
||||
})
|
||||
return this.dbs.Find<UserInvoicePayment>('UserInvoicePayment', { where: [pending, paid], order: { paid_at_unix: 'DESC' } }, txId)
|
||||
}
|
||||
|
||||
async AddUserTransactionPayment(userId: string, address: string, txHash: string, txOutput: number, amount: number, chainFees: number, serviceFees: number, internal: boolean, height: number, linkedApplication: Application): Promise<UserTransactionPayment> {
|
||||
const newTx = this.DB.getRepository(UserTransactionPayment).create({
|
||||
const user = await this.userStorage.GetUser(userId)
|
||||
return this.dbs.CreateAndSave<UserTransactionPayment>('UserTransactionPayment', {
|
||||
user: await this.userStorage.GetUser(userId),
|
||||
address,
|
||||
paid_amount: amount,
|
||||
|
|
@ -256,11 +222,10 @@ export default class {
|
|||
confs: internal ? 10 : 0,
|
||||
linkedApplication
|
||||
})
|
||||
return this.txQueue.PushToQueue<UserTransactionPayment>({ exec: async db => db.getRepository(UserTransactionPayment).save(newTx), dbTx: false, description: `add tx payment for ${userId} linked to ${linkedApplication.app_id}: ${address}, amt: ${amount} ` })
|
||||
}
|
||||
|
||||
GetUserTransactionPayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise<UserTransactionPayment[]> {
|
||||
return entityManager.getRepository(UserTransactionPayment).find({
|
||||
GetUserTransactionPayments(userId: string, fromIndex: number, take = 50, txId?: string): Promise<UserTransactionPayment[]> {
|
||||
return this.dbs.Find<UserTransactionPayment>('UserTransactionPayment', {
|
||||
where: {
|
||||
user: {
|
||||
user_id: userId
|
||||
|
|
@ -272,70 +237,64 @@ export default class {
|
|||
paid_at_unix: 'DESC'
|
||||
},
|
||||
take
|
||||
})
|
||||
}, txId)
|
||||
}
|
||||
|
||||
async GetPendingTransactions(entityManager = this.DB) {
|
||||
const incoming = await entityManager.getRepository(AddressReceivingTransaction).find({ where: { confs: 0 } })
|
||||
const outgoing = await entityManager.getRepository(UserTransactionPayment).find({ where: { confs: 0 } })
|
||||
async GetPendingTransactions(txId?: string) {
|
||||
const incoming = await this.dbs.Find<AddressReceivingTransaction>('AddressReceivingTransaction', { where: { confs: 0 } }, txId)
|
||||
const outgoing = await this.dbs.Find<UserTransactionPayment>('UserTransactionPayment', { where: { confs: 0 } }, txId)
|
||||
return { incoming, outgoing }
|
||||
}
|
||||
|
||||
async UpdateAddressReceivingTransaction(serialId: number, update: Partial<AddressReceivingTransaction>, entityManager = this.DB) {
|
||||
return entityManager.getRepository(AddressReceivingTransaction).update(serialId, update)
|
||||
async UpdateAddressReceivingTransaction(serialId: number, update: Partial<AddressReceivingTransaction>, txId?: string) {
|
||||
return this.dbs.Update<AddressReceivingTransaction>('AddressReceivingTransaction', serialId, update, txId)
|
||||
}
|
||||
async UpdateUserTransactionPayment(serialId: number, update: Partial<UserTransactionPayment>, entityManager = this.DB) {
|
||||
await entityManager.getRepository(UserTransactionPayment).update(serialId, update)
|
||||
async UpdateUserTransactionPayment(serialId: number, update: Partial<UserTransactionPayment>, txId?: string) {
|
||||
return this.dbs.Update<UserTransactionPayment>('UserTransactionPayment', serialId, update, txId)
|
||||
}
|
||||
|
||||
|
||||
async AddUserEphemeralKey(userId: string, keyType: EphemeralKeyType, linkedApplication: Application): Promise<UserEphemeralKey> {
|
||||
const found = await this.DB.getRepository(UserEphemeralKey).findOne({ where: { type: keyType, user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } })
|
||||
const found = await this.dbs.FindOne<UserEphemeralKey>('UserEphemeralKey', { where: { type: keyType, user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } })
|
||||
if (found) {
|
||||
return found
|
||||
}
|
||||
const newKey = this.DB.getRepository(UserEphemeralKey).create({
|
||||
|
||||
return this.dbs.CreateAndSave<UserEphemeralKey>('UserEphemeralKey', {
|
||||
user: await this.userStorage.GetUser(userId),
|
||||
key: crypto.randomBytes(31).toString('hex'),
|
||||
type: keyType,
|
||||
linkedApplication
|
||||
})
|
||||
return this.txQueue.PushToQueue<UserEphemeralKey>({ exec: async db => db.getRepository(UserEphemeralKey).save(newKey), dbTx: false })
|
||||
}
|
||||
|
||||
async UseUserEphemeralKey(key: string, keyType: EphemeralKeyType, persist = false, entityManager = this.DB): Promise<UserEphemeralKey> {
|
||||
const found = await entityManager.getRepository(UserEphemeralKey).findOne({
|
||||
where: {
|
||||
key: key,
|
||||
type: keyType
|
||||
}
|
||||
})
|
||||
async UseUserEphemeralKey(key: string, keyType: EphemeralKeyType, persist = false, txId?: string): Promise<UserEphemeralKey> {
|
||||
const found = await this.dbs.FindOne<UserEphemeralKey>('UserEphemeralKey', { where: { key: key, type: keyType } })
|
||||
if (!found) {
|
||||
throw new Error("the provided ephemeral key is invalid")
|
||||
}
|
||||
if (!persist) {
|
||||
await entityManager.getRepository(UserEphemeralKey).delete(found.serial_id)
|
||||
await this.dbs.Delete<UserEphemeralKey>('UserEphemeralKey', found.serial_id, txId)
|
||||
}
|
||||
return found
|
||||
}
|
||||
|
||||
async AddPendingUserToUserPayment(fromUserId: string, toUserId: string, amount: number, fee: number, linkedApplication: Application, dbTx: DataSource | EntityManager) {
|
||||
const entry = dbTx.getRepository(UserToUserPayment).create({
|
||||
from_user: await this.userStorage.GetUser(fromUserId, dbTx),
|
||||
to_user: await this.userStorage.GetUser(toUserId, dbTx),
|
||||
async AddPendingUserToUserPayment(fromUserId: string, toUserId: string, amount: number, fee: number, linkedApplication: Application, txId: string) {
|
||||
return this.dbs.CreateAndSave<UserToUserPayment>('UserToUserPayment', {
|
||||
from_user: await this.userStorage.GetUser(fromUserId, txId),
|
||||
to_user: await this.userStorage.GetUser(toUserId, txId),
|
||||
paid_at_unix: 0,
|
||||
paid_amount: amount,
|
||||
service_fees: fee,
|
||||
linkedApplication
|
||||
})
|
||||
return dbTx.getRepository(UserToUserPayment).save(entry)
|
||||
}, txId)
|
||||
}
|
||||
async SetPendingUserToUserPaymentAsPaid(serialId: number, dbTx: DataSource | EntityManager) {
|
||||
dbTx.getRepository(UserToUserPayment).update(serialId, { paid_at_unix: Math.floor(Date.now() / 1000) })
|
||||
async SetPendingUserToUserPaymentAsPaid(serialId: number, txId: string) {
|
||||
return this.dbs.Update<UserToUserPayment>('UserToUserPayment', serialId, { paid_at_unix: Math.floor(Date.now() / 1000) }, txId)
|
||||
}
|
||||
|
||||
GetUserToUserReceivedPayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB) {
|
||||
return entityManager.getRepository(UserToUserPayment).find({
|
||||
GetUserToUserReceivedPayments(userId: string, fromIndex: number, take = 50, txId?: string) {
|
||||
return this.dbs.Find<UserToUserPayment>('UserToUserPayment', {
|
||||
where: {
|
||||
to_user: {
|
||||
user_id: userId
|
||||
|
|
@ -347,11 +306,12 @@ export default class {
|
|||
paid_at_unix: 'DESC'
|
||||
},
|
||||
take
|
||||
})
|
||||
}, txId)
|
||||
}
|
||||
|
||||
GetUserToUserSentPayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB) {
|
||||
return entityManager.getRepository(UserToUserPayment).find({
|
||||
GetUserToUserSentPayments(userId: string, fromIndex: number, take = 50, txId?: string) {
|
||||
|
||||
return this.dbs.Find<UserToUserPayment>('UserToUserPayment', {
|
||||
where: {
|
||||
from_user: {
|
||||
user_id: userId
|
||||
|
|
@ -363,19 +323,19 @@ export default class {
|
|||
paid_at_unix: 'DESC'
|
||||
},
|
||||
take
|
||||
})
|
||||
}, txId)
|
||||
}
|
||||
|
||||
async GetTotalFeesPaidInApp(app: Application | null, entityManager = this.DB) {
|
||||
async GetTotalFeesPaidInApp(app: Application | null, txId?: string) {
|
||||
if (!app) {
|
||||
return 0
|
||||
}
|
||||
const entries = await Promise.all([
|
||||
entityManager.getRepository(UserReceivingInvoice).sum("service_fee", { linkedApplication: { app_id: app.app_id } }),
|
||||
entityManager.getRepository(AddressReceivingTransaction).sum("service_fee", { user_address: { linkedApplication: { app_id: app.app_id } } }),
|
||||
entityManager.getRepository(UserInvoicePayment).sum("service_fees", { linkedApplication: { app_id: app.app_id } }),
|
||||
entityManager.getRepository(UserTransactionPayment).sum("service_fees", { linkedApplication: { app_id: app.app_id } }),
|
||||
entityManager.getRepository(UserToUserPayment).sum("service_fees", { linkedApplication: { app_id: app.app_id } })
|
||||
this.dbs.Sum<UserReceivingInvoice>('UserReceivingInvoice', "service_fee", { linkedApplication: { app_id: app.app_id } }, txId),
|
||||
this.dbs.Sum<AddressReceivingTransaction>('AddressReceivingTransaction', "service_fee", { user_address: { linkedApplication: { app_id: app.app_id } } }, txId),
|
||||
this.dbs.Sum<UserInvoicePayment>('UserInvoicePayment', "service_fees", { linkedApplication: { app_id: app.app_id } }, txId),
|
||||
this.dbs.Sum<UserTransactionPayment>('UserTransactionPayment', "service_fees", { linkedApplication: { app_id: app.app_id } }, txId),
|
||||
this.dbs.Sum<UserToUserPayment>('UserToUserPayment', "service_fees", { linkedApplication: { app_id: app.app_id } }, txId)
|
||||
])
|
||||
let total = 0
|
||||
entries.forEach(e => {
|
||||
|
|
@ -386,7 +346,7 @@ export default class {
|
|||
return total
|
||||
}
|
||||
|
||||
async GetAppOperations(application: Application | null, { from, to }: { from?: number, to?: number }, entityManager = this.DB) {
|
||||
async GetAppOperations(application: Application | null, { from, to }: { from?: number, to?: number }) {
|
||||
const q = application ? { app_id: application.app_id } : IsNull()
|
||||
let time: { created_at?: FindOperator<Date> } = {}
|
||||
if (!!from && !!to) {
|
||||
|
|
@ -398,13 +358,14 @@ export default class {
|
|||
}
|
||||
|
||||
const [receivingInvoices, receivingAddresses, outgoingInvoices, outgoingTransactions, userToUser] = await Promise.all([
|
||||
entityManager.getRepository(UserReceivingInvoice).find({ where: { linkedApplication: q, ...time } }),
|
||||
entityManager.getRepository(UserReceivingAddress).find({ where: { linkedApplication: q, ...time } }),
|
||||
entityManager.getRepository(UserInvoicePayment).find({ where: { linkedApplication: q, ...time } }),
|
||||
entityManager.getRepository(UserTransactionPayment).find({ where: { linkedApplication: q, ...time } }),
|
||||
entityManager.getRepository(UserToUserPayment).find({ where: { linkedApplication: q, ...time } })
|
||||
this.dbs.Find<UserReceivingInvoice>('UserReceivingInvoice', { where: { linkedApplication: q, ...time } }),
|
||||
this.dbs.Find<UserReceivingAddress>('UserReceivingAddress', { where: { linkedApplication: q, ...time } }),
|
||||
this.dbs.Find<UserInvoicePayment>('UserInvoicePayment', { where: { linkedApplication: q, ...time } }),
|
||||
this.dbs.Find<UserTransactionPayment>('UserTransactionPayment', { where: { linkedApplication: q, ...time } }),
|
||||
this.dbs.Find<UserToUserPayment>('UserToUserPayment', { where: { linkedApplication: q, ...time } })
|
||||
])
|
||||
const receivingTransactions = await Promise.all(receivingAddresses.map(addr => entityManager.getRepository(AddressReceivingTransaction).find({ where: { user_address: { serial_id: addr.serial_id }, ...time } })))
|
||||
const receivingTransactions = await Promise.all(receivingAddresses.map(addr =>
|
||||
this.dbs.Find<AddressReceivingTransaction>('AddressReceivingTransaction', { where: { user_address: { serial_id: addr.serial_id }, ...time } })))
|
||||
return {
|
||||
receivingInvoices, receivingAddresses, receivingTransactions,
|
||||
outgoingInvoices, outgoingTransactions,
|
||||
|
|
@ -412,11 +373,11 @@ export default class {
|
|||
}
|
||||
}
|
||||
|
||||
async UserHasOutgoingOperation(userId: string, entityManager = this.DB) {
|
||||
async UserHasOutgoingOperation(userId: string) {
|
||||
const [i, tx, u2u] = await Promise.all([
|
||||
entityManager.getRepository(UserInvoicePayment).findOne({ where: { user: { user_id: userId } } }),
|
||||
entityManager.getRepository(UserTransactionPayment).findOne({ where: { user: { user_id: userId } } }),
|
||||
entityManager.getRepository(UserToUserPayment).findOne({ where: { from_user: { user_id: userId } } }),
|
||||
this.dbs.FindOne<UserInvoicePayment>('UserInvoicePayment', { where: { user: { user_id: userId } } }),
|
||||
this.dbs.FindOne<UserTransactionPayment>('UserTransactionPayment', { where: { user: { user_id: userId } } }),
|
||||
this.dbs.FindOne<UserToUserPayment>('UserToUserPayment', { where: { from_user: { user_id: userId } } }),
|
||||
])
|
||||
return !!i || !!tx || !!u2u
|
||||
}
|
||||
|
|
@ -424,42 +385,50 @@ export default class {
|
|||
async VerifyDbEvent(e: LoggedEvent) {
|
||||
switch (e.type) {
|
||||
case "new_invoice":
|
||||
return this.DB.getRepository(UserReceivingInvoice).findOneOrFail({ where: { invoice: e.data, user: { user_id: e.userId } } })
|
||||
return orFail(this.dbs.FindOne<UserReceivingInvoice>('UserReceivingInvoice', { where: { invoice: e.data, user: { user_id: e.userId } } }))
|
||||
case 'new_address':
|
||||
return this.DB.getRepository(UserReceivingAddress).findOneOrFail({ where: { address: e.data, user: { user_id: e.userId } } })
|
||||
return orFail(this.dbs.FindOne<UserReceivingAddress>('UserReceivingAddress', { where: { address: e.data, user: { user_id: e.userId } } }))
|
||||
case 'invoice_paid':
|
||||
return this.DB.getRepository(UserReceivingInvoice).findOneOrFail({ where: { invoice: e.data, user: { user_id: e.userId }, paid_at_unix: MoreThan(0) } })
|
||||
return orFail(this.dbs.FindOne<UserReceivingInvoice>('UserReceivingInvoice', { where: { invoice: e.data, user: { user_id: e.userId }, paid_at_unix: MoreThan(0) } }))
|
||||
case 'invoice_payment':
|
||||
return this.DB.getRepository(UserInvoicePayment).findOneOrFail({ where: { invoice: e.data, user: { user_id: e.userId } } })
|
||||
return orFail(this.dbs.FindOne<UserInvoicePayment>('UserInvoicePayment', { where: { invoice: e.data, user: { user_id: e.userId } } }))
|
||||
case 'address_paid':
|
||||
const [receivingAddress, receivedHash] = e.data.split(":")
|
||||
return this.DB.getRepository(AddressReceivingTransaction).findOneOrFail({ where: { user_address: { address: receivingAddress }, tx_hash: receivedHash, confs: MoreThan(0) } })
|
||||
return orFail(this.dbs.FindOne<AddressReceivingTransaction>('AddressReceivingTransaction', { where: { user_address: { address: receivingAddress }, tx_hash: receivedHash, confs: MoreThan(0) } }))
|
||||
case 'address_payment':
|
||||
const [sentAddress, sentHash] = e.data.split(":")
|
||||
return this.DB.getRepository(UserTransactionPayment).findOneOrFail({ where: { address: sentAddress, tx_hash: sentHash, user: { user_id: e.userId } } })
|
||||
return orFail(this.dbs.FindOne<UserTransactionPayment>('UserTransactionPayment', { where: { address: sentAddress, tx_hash: sentHash, user: { user_id: e.userId } } }))
|
||||
case 'u2u_receiver':
|
||||
return this.DB.getRepository(UserToUserPayment).findOneOrFail({ where: { from_user: { user_id: e.data }, to_user: { user_id: e.userId } } })
|
||||
return orFail(this.dbs.FindOne<UserToUserPayment>('UserToUserPayment', { where: { from_user: { user_id: e.data }, to_user: { user_id: e.userId } } }))
|
||||
case 'u2u_sender':
|
||||
return this.DB.getRepository(UserToUserPayment).findOneOrFail({ where: { to_user: { user_id: e.data }, from_user: { user_id: e.userId } } })
|
||||
return orFail(this.dbs.FindOne<UserToUserPayment>('UserToUserPayment', { where: { to_user: { user_id: e.data }, from_user: { user_id: e.userId } } }))
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
async GetTotalUsersBalance(entityManager = this.DB) {
|
||||
const total = await entityManager.getRepository(User).sum("balance_sats")
|
||||
async GetTotalUsersBalance(txId?: string) {
|
||||
const total = await this.dbs.Sum<User>('User', "balance_sats", {})
|
||||
return total || 0
|
||||
}
|
||||
|
||||
async GetPendingPayments(entityManager = this.DB) {
|
||||
return entityManager.getRepository(UserInvoicePayment).find({ where: { paid_at_unix: 0 } })
|
||||
async GetPendingPayments(txId?: string) {
|
||||
return this.dbs.Find<UserInvoicePayment>('UserInvoicePayment', { where: { paid_at_unix: 0 } })
|
||||
}
|
||||
|
||||
async GetOfferInvoices(offerId: string, includeUnpaid: boolean, entityManager = this.DB) {
|
||||
async GetOfferInvoices(offerId: string, includeUnpaid: boolean, txId?: string) {
|
||||
const where: { offer_id: string, paid_at_unix?: FindOperator<number> } = { offer_id: offerId }
|
||||
if (!includeUnpaid) {
|
||||
where.paid_at_unix = MoreThan(0)
|
||||
}
|
||||
return entityManager.getRepository(UserReceivingInvoice).find({ where })
|
||||
return this.dbs.Find<UserReceivingInvoice>('UserReceivingInvoice', { where })
|
||||
}
|
||||
}
|
||||
|
||||
const orFail = async <T>(resultPromise: Promise<T | null>) => {
|
||||
const result = await resultPromise
|
||||
if (!result) {
|
||||
throw new Error("the requested value was not found")
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
|
@ -1,23 +1,19 @@
|
|||
import { DataSource, EntityManager } from "typeorm"
|
||||
import { Product } from "./entity/Product.js"
|
||||
import { User } from "./entity/User.js"
|
||||
import TransactionsQueue, { TX } from "./transactionsQueue.js";
|
||||
import { StorageInterface } from "./storageInterface.js";
|
||||
export default class {
|
||||
DB: DataSource | EntityManager
|
||||
txQueue: TransactionsQueue
|
||||
constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) {
|
||||
this.DB = DB
|
||||
this.txQueue = txQueue
|
||||
dbs: StorageInterface
|
||||
constructor(dbs: StorageInterface) {
|
||||
this.dbs = dbs
|
||||
}
|
||||
async AddProduct(name: string, priceSats: number, user: User): Promise<Product> {
|
||||
const newProduct = this.DB.getRepository(Product).create({
|
||||
return this.dbs.CreateAndSave<Product>('Product', {
|
||||
name: name, price_sats: priceSats, owner: user
|
||||
})
|
||||
return this.txQueue.PushToQueue<Product>({ exec: async db => db.getRepository(Product).save(newProduct), dbTx: false })
|
||||
}
|
||||
|
||||
async GetProduct(id: string, entityManager = this.DB): Promise<Product> {
|
||||
const product = await entityManager.getRepository(Product).findOne({ where: { product_id: id } })
|
||||
async GetProduct(id: string, txId?: string): Promise<Product> {
|
||||
const product = await this.dbs.FindOne<Product>('Product', { where: { product_id: id } }, txId)
|
||||
if (!product) {
|
||||
throw new Error("product not found")
|
||||
}
|
||||
|
|
|
|||
164
src/services/storage/storageInterface.ts
Normal file
164
src/services/storage/storageInterface.ts
Normal file
|
|
@ -0,0 +1,164 @@
|
|||
import { fork } from 'child_process';
|
||||
import { EventEmitter } from 'events';
|
||||
import { DbSettings, MainDbNames } from './db.js';
|
||||
import { DeepPartial, FindOptionsWhere } from 'typeorm';
|
||||
import {
|
||||
ConnectOperation, DeleteOperation, RemoveOperation, FindOneOperation,
|
||||
FindOperation, UpdateOperation, CreateAndSaveOperation, StartTxOperation,
|
||||
EndTxOperation, QueryOptions, OperationResponse,
|
||||
IStorageOperation,
|
||||
IncrementOperation,
|
||||
DecrementOperation,
|
||||
SumOperation,
|
||||
WhereCondition
|
||||
} from './storageProcessor.js';
|
||||
import { PickKeysByType } from 'typeorm/common/PickKeysByType.js';
|
||||
|
||||
export type TX<T> = (txId: string) => Promise<T>
|
||||
|
||||
export class StorageInterface extends EventEmitter {
|
||||
private process: any;
|
||||
private isConnected: boolean = false;
|
||||
|
||||
constructor() {
|
||||
super();
|
||||
this.initializeSubprocess();
|
||||
}
|
||||
|
||||
private initializeSubprocess() {
|
||||
this.process = fork('./build/src/services/storage/storageProcessor');
|
||||
|
||||
this.process.on('message', (response: OperationResponse<any>) => {
|
||||
this.emit(response.opId, response);
|
||||
});
|
||||
|
||||
this.process.on('error', (error: Error) => {
|
||||
console.error('Storage processor error:', error);
|
||||
this.isConnected = false;
|
||||
});
|
||||
|
||||
this.process.on('exit', (code: number) => {
|
||||
console.log(`Storage processor exited with code ${code}`);
|
||||
this.isConnected = false;
|
||||
});
|
||||
|
||||
this.isConnected = true;
|
||||
}
|
||||
|
||||
Connect(settings: DbSettings): Promise<number> {
|
||||
const opId = Math.random().toString()
|
||||
const connectOp: ConnectOperation = { type: 'connect', opId, settings }
|
||||
return this.handleOp<number>(connectOp)
|
||||
}
|
||||
|
||||
Delete<T>(entity: MainDbNames, q: number | FindOptionsWhere<T>, txId?: string): Promise<number> {
|
||||
const opId = Math.random().toString()
|
||||
const deleteOp: DeleteOperation<T> = { type: 'delete', entity, opId, q, txId }
|
||||
return this.handleOp<number>(deleteOp)
|
||||
}
|
||||
|
||||
Remove<T>(entity: MainDbNames, q: T, txId?: string): Promise<T> {
|
||||
const opId = Math.random().toString()
|
||||
const removeOp: RemoveOperation<T> = { type: 'remove', entity, opId, q, txId }
|
||||
return this.handleOp<T>(removeOp)
|
||||
}
|
||||
|
||||
FindOne<T>(entity: MainDbNames, q: QueryOptions<T>, txId?: string): Promise<T | null> {
|
||||
const opId = Math.random().toString()
|
||||
const findOp: FindOneOperation<T> = { type: 'findOne', entity, opId, q, txId }
|
||||
return this.handleOp<T | null>(findOp)
|
||||
}
|
||||
|
||||
Find<T>(entity: MainDbNames, q: QueryOptions<T>, txId?: string): Promise<T[]> {
|
||||
const opId = Math.random().toString()
|
||||
const findOp: FindOperation<T> = { type: 'find', entity, opId, q, txId }
|
||||
return this.handleOp<T[]>(findOp)
|
||||
}
|
||||
|
||||
Sum<T>(entity: MainDbNames, columnName: PickKeysByType<T, number>, q: WhereCondition<T>, txId?: string): Promise<number> {
|
||||
const opId = Math.random().toString()
|
||||
const sumOp: SumOperation<T> = { type: 'sum', entity, opId, columnName, q, txId }
|
||||
return this.handleOp<number>(sumOp)
|
||||
}
|
||||
|
||||
Update<T>(entity: MainDbNames, q: number | FindOptionsWhere<T>, toUpdate: DeepPartial<T>, txId?: string): Promise<number> {
|
||||
const opId = Math.random().toString()
|
||||
const updateOp: UpdateOperation<T> = { type: 'update', entity, opId, toUpdate, q, txId }
|
||||
return this.handleOp<number>(updateOp)
|
||||
}
|
||||
|
||||
Increment<T>(entity: MainDbNames, q: FindOptionsWhere<T>, propertyPath: string, value: number | string, txId?: string): Promise<number> {
|
||||
const opId = Math.random().toString()
|
||||
const incrementOp: IncrementOperation<T> = { type: 'increment', entity, opId, q, propertyPath, value, txId }
|
||||
return this.handleOp<number>(incrementOp)
|
||||
}
|
||||
|
||||
Decrement<T>(entity: MainDbNames, q: FindOptionsWhere<T>, propertyPath: string, value: number | string, txId?: string): Promise<number> {
|
||||
const opId = Math.random().toString()
|
||||
const decrementOp: DecrementOperation<T> = { type: 'decrement', entity, opId, q, propertyPath, value, txId }
|
||||
return this.handleOp<number>(decrementOp)
|
||||
}
|
||||
|
||||
CreateAndSave<T>(entity: MainDbNames, toSave: DeepPartial<T>, txId?: string): Promise<T> {
|
||||
const opId = Math.random().toString()
|
||||
const createAndSaveOp: CreateAndSaveOperation<T> = { type: 'createAndSave', entity, opId, toSave, txId }
|
||||
return this.handleOp<T>(createAndSaveOp)
|
||||
}
|
||||
|
||||
async StartTx(description?: string): Promise<string> {
|
||||
const opId = Math.random().toString()
|
||||
const startTxOp: StartTxOperation = { type: 'startTx', opId, description }
|
||||
await this.handleOp<void>(startTxOp)
|
||||
return opId
|
||||
}
|
||||
|
||||
async EndTx<T>(txId: string, success: boolean, data: T): Promise<T> {
|
||||
const opId = Math.random().toString()
|
||||
const endTxOp: EndTxOperation<T> = success ? { type: 'endTx', opId, txId, success, data } : { type: 'endTx', opId, txId, success }
|
||||
return this.handleOp<T>(endTxOp)
|
||||
}
|
||||
|
||||
async Tx<T>(exec: TX<T>, description?: string): Promise<T> {
|
||||
const txId = await this.StartTx()
|
||||
try {
|
||||
const res = await exec(txId)
|
||||
await this.EndTx(txId, true, res)
|
||||
return res
|
||||
} catch (err: any) {
|
||||
await this.EndTx(txId, false, err.message)
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
private handleOp<T>(op: IStorageOperation): Promise<T> {
|
||||
this.checkConnected()
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
const responseHandler = (response: OperationResponse<T>) => {
|
||||
if (!response.success) {
|
||||
reject(new Error(response.error));
|
||||
return
|
||||
}
|
||||
if (response.type !== op.type) {
|
||||
reject(new Error('Invalid response type'));
|
||||
return
|
||||
}
|
||||
resolve(response.data);
|
||||
}
|
||||
this.once(op.opId, responseHandler)
|
||||
this.process.send(op)
|
||||
})
|
||||
}
|
||||
|
||||
private checkConnected() {
|
||||
if (!this.isConnected) {
|
||||
throw new Error('Storage processor is not connected');
|
||||
}
|
||||
}
|
||||
|
||||
public disconnect() {
|
||||
if (this.process) {
|
||||
this.process.kill();
|
||||
this.isConnected = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
394
src/services/storage/storageProcessor.ts
Normal file
394
src/services/storage/storageProcessor.ts
Normal file
|
|
@ -0,0 +1,394 @@
|
|||
import { DataSource, EntityManager, DeepPartial, FindOptionsWhere, FindOptionsOrder } from 'typeorm';
|
||||
import NewDB, { DbSettings, MainDbEntities, MainDbNames, newMetricsDb } from './db.js';
|
||||
import { PubLogger, getLogger } from '../helpers/logger.js';
|
||||
import { allMetricsMigrations, allMigrations } from './migrations/runner.js';
|
||||
import transactionsQueue from './transactionsQueue.js';
|
||||
import { PickKeysByType } from 'typeorm/common/PickKeysByType';
|
||||
|
||||
export type WhereCondition<T> = FindOptionsWhere<T> | FindOptionsWhere<T>[]
|
||||
export type QueryOptions<T> = {
|
||||
where?: WhereCondition<T>
|
||||
order?: FindOptionsOrder<T>
|
||||
take?: number
|
||||
skip?: number
|
||||
}
|
||||
export type ConnectOperation = {
|
||||
type: 'connect'
|
||||
opId: string
|
||||
settings: DbSettings
|
||||
}
|
||||
|
||||
export type StartTxOperation = {
|
||||
type: 'startTx'
|
||||
opId: string
|
||||
description?: string
|
||||
}
|
||||
|
||||
export type EndTxOperation<T> = {
|
||||
type: 'endTx'
|
||||
txId: string
|
||||
opId: string
|
||||
} & ({ success: true, data: T } | { success: false })
|
||||
|
||||
export type DeleteOperation<T> = {
|
||||
type: 'delete'
|
||||
entity: MainDbNames
|
||||
opId: string
|
||||
q: number | FindOptionsWhere<T>
|
||||
txId?: string
|
||||
}
|
||||
|
||||
export type RemoveOperation<T> = {
|
||||
type: 'remove'
|
||||
entity: MainDbNames
|
||||
opId: string
|
||||
q: T
|
||||
txId?: string
|
||||
}
|
||||
|
||||
export type UpdateOperation<T> = {
|
||||
type: 'update'
|
||||
entity: MainDbNames
|
||||
opId: string
|
||||
toUpdate: DeepPartial<T>
|
||||
q: number | FindOptionsWhere<T>
|
||||
txId?: string
|
||||
}
|
||||
|
||||
export type IncrementOperation<T> = {
|
||||
type: 'increment'
|
||||
entity: MainDbNames
|
||||
opId: string
|
||||
q: FindOptionsWhere<T>
|
||||
propertyPath: string,
|
||||
value: number | string
|
||||
txId?: string
|
||||
}
|
||||
|
||||
export type DecrementOperation<T> = {
|
||||
type: 'decrement'
|
||||
entity: MainDbNames
|
||||
opId: string
|
||||
q: FindOptionsWhere<T>
|
||||
propertyPath: string,
|
||||
value: number | string
|
||||
txId?: string
|
||||
}
|
||||
|
||||
export type FindOneOperation<T> = {
|
||||
type: 'findOne'
|
||||
entity: MainDbNames
|
||||
opId: string
|
||||
q: QueryOptions<T>
|
||||
txId?: string
|
||||
}
|
||||
|
||||
export type FindOperation<T> = {
|
||||
type: 'find'
|
||||
entity: MainDbNames
|
||||
opId: string
|
||||
q: QueryOptions<T>
|
||||
txId?: string
|
||||
}
|
||||
|
||||
export type SumOperation<T> = {
|
||||
type: 'sum'
|
||||
entity: MainDbNames
|
||||
opId: string
|
||||
columnName: PickKeysByType<T, number>
|
||||
q: WhereCondition<T>
|
||||
txId?: string
|
||||
}
|
||||
|
||||
export type CreateAndSaveOperation<T> = {
|
||||
type: 'createAndSave'
|
||||
entity: MainDbNames
|
||||
opId: string
|
||||
toSave: DeepPartial<T>
|
||||
txId?: string
|
||||
description?: string
|
||||
}
|
||||
|
||||
export type ErrorOperationResponse = { success: false, error: string, opId: string }
|
||||
|
||||
export interface IStorageOperation {
|
||||
opId: string
|
||||
type: string
|
||||
}
|
||||
|
||||
export type StorageOperation<T> = ConnectOperation | StartTxOperation | EndTxOperation<T> | DeleteOperation<T> | RemoveOperation<T> | UpdateOperation<T> |
|
||||
FindOneOperation<T> | FindOperation<T> | CreateAndSaveOperation<T> | IncrementOperation<T> | DecrementOperation<T> | SumOperation<T>
|
||||
|
||||
export type SuccessOperationResponse<T> = { success: true, type: string, data: T, opId: string }
|
||||
export type OperationResponse<T> = SuccessOperationResponse<T> | ErrorOperationResponse
|
||||
|
||||
type ActiveTransaction = {
|
||||
txId: string
|
||||
manager: EntityManager | DataSource
|
||||
resolve: (value: any) => void
|
||||
reject: (reason?: any) => void
|
||||
}
|
||||
|
||||
class StorageProcessor {
|
||||
private log: PubLogger = console.log
|
||||
private DB: DataSource
|
||||
private txQueue: transactionsQueue
|
||||
//private locked: boolean = false
|
||||
private activeTransaction: ActiveTransaction | null = null
|
||||
//private queue: StartTxOperation[] = []
|
||||
|
||||
constructor() {
|
||||
if (!process.send) {
|
||||
throw new Error('This process must be spawned as a child process');
|
||||
}
|
||||
this.log = getLogger({ component: 'StorageProcessor' })
|
||||
process.on('message', (operation: StorageOperation<any>) => {
|
||||
this.handleOperation(operation);
|
||||
});
|
||||
|
||||
process.on('error', (error: Error) => {
|
||||
console.error('Error in storage processor:', error);
|
||||
});
|
||||
}
|
||||
|
||||
private async handleOperation(operation: StorageOperation<any>) {
|
||||
try {
|
||||
const opId = operation.opId;
|
||||
switch (operation.type) {
|
||||
case 'connect':
|
||||
return this.handleConnect(operation);
|
||||
case 'startTx':
|
||||
return this.handleStartTx(operation);
|
||||
case 'endTx':
|
||||
return this.handleEndTx(operation);
|
||||
case 'delete':
|
||||
return this.handleDelete(operation);
|
||||
case 'remove':
|
||||
return this.handleRemove(operation);
|
||||
case 'update':
|
||||
return this.handleUpdate(operation);
|
||||
case 'increment':
|
||||
return this.handleIncrement(operation);
|
||||
case 'decrement':
|
||||
return this.handleDecrement(operation);
|
||||
case 'findOne':
|
||||
return this.handleFindOne(operation);
|
||||
case 'find':
|
||||
return this.handleFind(operation);
|
||||
case 'sum':
|
||||
return this.handleSum(operation);
|
||||
case 'createAndSave':
|
||||
return this.handleCreateAndSave(operation);
|
||||
default:
|
||||
this.sendResponse({
|
||||
success: false,
|
||||
error: `Unknown operation type: ${(operation as any).type}`,
|
||||
opId
|
||||
})
|
||||
return
|
||||
}
|
||||
} catch (error) {
|
||||
this.sendResponse({
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Unknown error occurred',
|
||||
opId: operation.opId
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async handleConnect(operation: ConnectOperation) {
|
||||
const { source, executedMigrations } = await NewDB(operation.settings, allMigrations)
|
||||
this.DB = source
|
||||
this.txQueue = new transactionsQueue('StorageProcessorQueue', this.DB)
|
||||
if (executedMigrations.length > 0) {
|
||||
this.log(executedMigrations.length, "new migrations executed")
|
||||
this.log("-------------------")
|
||||
}
|
||||
this.sendResponse({
|
||||
success: true,
|
||||
type: 'connect',
|
||||
data: executedMigrations.length,
|
||||
opId: operation.opId
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
private async handleStartTx(operation: StartTxOperation) {
|
||||
const res = await this.txQueue.PushToQueue({
|
||||
dbTx: false,
|
||||
description: operation.description || "startTx",
|
||||
exec: tx => new Promise((resolve, reject) => {
|
||||
this.activeTransaction = {
|
||||
txId: operation.opId,
|
||||
manager: tx,
|
||||
resolve,
|
||||
reject
|
||||
}
|
||||
})
|
||||
})
|
||||
this.sendResponse({
|
||||
success: true,
|
||||
type: 'startTx',
|
||||
data: res,
|
||||
opId: operation.opId
|
||||
});
|
||||
}
|
||||
|
||||
private async handleEndTx(operation: EndTxOperation<any>) {
|
||||
const activeTx = this.activeTransaction
|
||||
if (!activeTx || activeTx.txId !== operation.txId) {
|
||||
throw new Error('Transaction to end not found');
|
||||
}
|
||||
if (operation.success) {
|
||||
activeTx.resolve(true)
|
||||
} else {
|
||||
activeTx.reject(new Error('Transaction failed'))
|
||||
}
|
||||
this.activeTransaction = null
|
||||
this.sendResponse({
|
||||
success: true,
|
||||
type: 'endTx',
|
||||
data: operation.success,
|
||||
opId: operation.opId
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private getManager(txId?: string): DataSource | EntityManager {
|
||||
if (txId) {
|
||||
if (!this.activeTransaction || this.activeTransaction.txId !== txId) {
|
||||
throw new Error('Transaction not found');
|
||||
}
|
||||
return this.activeTransaction.manager
|
||||
}
|
||||
return this.DB
|
||||
}
|
||||
|
||||
private async handleDelete(operation: DeleteOperation<any>) {
|
||||
const manager = this.getManager(operation.txId);
|
||||
const res = await manager.getRepository(MainDbEntities[operation.entity]).delete(operation.q)
|
||||
this.sendResponse({
|
||||
success: true,
|
||||
type: 'delete',
|
||||
data: res.affected || 0,
|
||||
opId: operation.opId
|
||||
});
|
||||
}
|
||||
|
||||
private async handleRemove(operation: RemoveOperation<any>) {
|
||||
const manager = this.getManager(operation.txId);
|
||||
const res = await manager.getRepository(MainDbEntities[operation.entity]).remove(operation.q)
|
||||
|
||||
this.sendResponse({
|
||||
success: true,
|
||||
type: 'remove',
|
||||
data: res,
|
||||
opId: operation.opId
|
||||
});
|
||||
}
|
||||
|
||||
private async handleUpdate(operation: UpdateOperation<any>) {
|
||||
const manager = this.getManager(operation.txId);
|
||||
const res = await manager.getRepository(MainDbEntities[operation.entity]).update(operation.q, operation.toUpdate)
|
||||
|
||||
this.sendResponse({
|
||||
success: true,
|
||||
type: 'update',
|
||||
data: res.affected || 0,
|
||||
opId: operation.opId
|
||||
});
|
||||
}
|
||||
|
||||
private async handleIncrement(operation: IncrementOperation<any>) {
|
||||
const manager = this.getManager(operation.txId);
|
||||
const res = await manager.getRepository(MainDbEntities[operation.entity]).increment(operation.q, operation.propertyPath, operation.value)
|
||||
this.sendResponse({
|
||||
success: true,
|
||||
type: 'increment',
|
||||
data: res.affected || 0,
|
||||
opId: operation.opId
|
||||
});
|
||||
}
|
||||
|
||||
private async handleDecrement(operation: DecrementOperation<any>) {
|
||||
const manager = this.getManager(operation.txId);
|
||||
const res = await manager.getRepository(MainDbEntities[operation.entity]).decrement(operation.q, operation.propertyPath, operation.value)
|
||||
this.sendResponse({
|
||||
success: true,
|
||||
type: 'decrement',
|
||||
data: res.affected || 0,
|
||||
opId: operation.opId
|
||||
});
|
||||
}
|
||||
|
||||
private async handleFindOne(operation: FindOneOperation<any>) {
|
||||
const manager = this.getManager(operation.txId);
|
||||
const res = await manager.getRepository(MainDbEntities[operation.entity]).findOne(operation.q)
|
||||
|
||||
this.sendResponse({
|
||||
success: true,
|
||||
type: 'findOne',
|
||||
data: res,
|
||||
opId: operation.opId
|
||||
});
|
||||
}
|
||||
|
||||
private async handleFind(operation: FindOperation<any>) {
|
||||
const manager = this.getManager(operation.txId);
|
||||
const res = await manager.getRepository(MainDbEntities[operation.entity]).find(operation.q)
|
||||
|
||||
this.sendResponse({
|
||||
success: true,
|
||||
type: 'find',
|
||||
data: res,
|
||||
opId: operation.opId
|
||||
});
|
||||
}
|
||||
|
||||
private async handleSum(operation: SumOperation<object>) {
|
||||
const manager = this.getManager(operation.txId);
|
||||
const res = await manager.getRepository(MainDbEntities[operation.entity]).sum(operation.columnName, operation.q)
|
||||
this.sendResponse({
|
||||
success: true,
|
||||
type: 'sum',
|
||||
data: res || 0,
|
||||
opId: operation.opId
|
||||
});
|
||||
}
|
||||
|
||||
private async handleCreateAndSave(operation: CreateAndSaveOperation<any>) {
|
||||
const saved = await this.createAndSave(operation)
|
||||
|
||||
this.sendResponse({
|
||||
success: true,
|
||||
type: 'createAndSave',
|
||||
data: saved,
|
||||
opId: operation.opId
|
||||
});
|
||||
}
|
||||
|
||||
private async createAndSave(operation: CreateAndSaveOperation<any>) {
|
||||
if (operation.txId) {
|
||||
const manager = this.getManager(operation.txId);
|
||||
const res = manager.getRepository(MainDbEntities[operation.entity]).create(operation.toSave)
|
||||
return manager.getRepository(MainDbEntities[operation.entity]).save(res)
|
||||
}
|
||||
return this.txQueue.PushToQueue({
|
||||
dbTx: false,
|
||||
description: operation.description || "createAndSave",
|
||||
exec: async tx => {
|
||||
const res = tx.getRepository(MainDbEntities[operation.entity]).create(operation.toSave)
|
||||
return tx.getRepository(MainDbEntities[operation.entity]).save(res)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private sendResponse(response: OperationResponse<any>) {
|
||||
if (process.send) {
|
||||
process.send(response);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Start the storage processor
|
||||
new StorageProcessor();
|
||||
|
|
@ -1,8 +1,8 @@
|
|||
import { DataSource, EntityManager, EntityTarget } from "typeorm"
|
||||
import { PubLogger, getLogger } from "../helpers/logger.js"
|
||||
|
||||
export type TX<T> = (entityManager: EntityManager | DataSource) => Promise<T>
|
||||
export type TxOperation<T> = {
|
||||
type TX<T> = (entityManager: EntityManager | DataSource) => Promise<T>
|
||||
type TxOperation<T> = {
|
||||
exec: TX<T>
|
||||
dbTx: boolean
|
||||
description?: string
|
||||
|
|
|
|||
|
|
@ -1,74 +1,61 @@
|
|||
import crypto from 'crypto';
|
||||
import { DataSource, EntityManager } from "typeorm"
|
||||
import { User } from './entity/User.js';
|
||||
import { UserBasicAuth } from './entity/UserBasicAuth.js';
|
||||
import { getLogger } from '../helpers/logger.js';
|
||||
import TransactionsQueue from "./transactionsQueue.js";
|
||||
import EventsLogManager from './eventsLog.js';
|
||||
import { StorageInterface } from './storageInterface.js';
|
||||
export default class {
|
||||
DB: DataSource | EntityManager
|
||||
txQueue: TransactionsQueue
|
||||
dbs: StorageInterface
|
||||
eventsLog: EventsLogManager
|
||||
constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue, eventsLog: EventsLogManager) {
|
||||
this.DB = DB
|
||||
this.txQueue = txQueue
|
||||
constructor(dbs: StorageInterface, eventsLog: EventsLogManager) {
|
||||
this.dbs = dbs
|
||||
this.eventsLog = eventsLog
|
||||
}
|
||||
|
||||
async AddUser(balance: number, dbTx: DataSource | EntityManager): Promise<User> {
|
||||
async AddUser(balance: number, txId: string): Promise<User> {
|
||||
if (balance && process.env.ALLOW_BALANCE_MIGRATION !== 'true') {
|
||||
throw new Error("balance migration is not allowed")
|
||||
}
|
||||
getLogger({})("Adding user with balance", balance)
|
||||
const newUser = dbTx.getRepository(User).create({
|
||||
return this.dbs.CreateAndSave<User>('User', {
|
||||
user_id: crypto.randomBytes(32).toString('hex'),
|
||||
balance_sats: balance
|
||||
})
|
||||
return dbTx.getRepository(User).save(newUser)
|
||||
}, txId)
|
||||
}
|
||||
|
||||
async AddBasicUser(name: string, secret: string): Promise<UserBasicAuth> {
|
||||
return this.DB.transaction(async tx => {
|
||||
const user = await this.AddUser(0, tx)
|
||||
const newUserAuth = tx.getRepository(UserBasicAuth).create({
|
||||
user: user,
|
||||
name: name,
|
||||
secret_sha256: crypto.createHash('sha256').update(secret).digest('base64')
|
||||
/* async AddBasicUser(name: string, secret: string): Promise<UserBasicAuth> {
|
||||
return this.DB.transaction(async tx => {
|
||||
const user = await this.AddUser(0, tx)
|
||||
const newUserAuth = tx.getRepository(UserBasicAuth).create({
|
||||
user: user,
|
||||
name: name,
|
||||
secret_sha256: crypto.createHash('sha256').update(secret).digest('base64')
|
||||
})
|
||||
return tx.getRepository(UserBasicAuth).save(newUserAuth)
|
||||
})
|
||||
return tx.getRepository(UserBasicAuth).save(newUserAuth)
|
||||
})
|
||||
|
||||
} */
|
||||
FindUser(userId: string, txId?: string) {
|
||||
return this.dbs.FindOne<User>('User', { where: { user_id: userId } }, txId)
|
||||
}
|
||||
FindUser(userId: string, entityManager = this.DB) {
|
||||
return entityManager.getRepository(User).findOne({
|
||||
where: {
|
||||
user_id: userId
|
||||
}
|
||||
})
|
||||
}
|
||||
async GetUser(userId: string, entityManager = this.DB): Promise<User> {
|
||||
const user = await this.FindUser(userId, entityManager)
|
||||
async GetUser(userId: string, txId?: string): Promise<User> {
|
||||
const user = await this.FindUser(userId, txId)
|
||||
if (!user) {
|
||||
throw new Error(`user ${userId} not found`) // TODO: fix logs doxing
|
||||
}
|
||||
return user
|
||||
}
|
||||
|
||||
async UnbanUser(userId: string, entityManager = this.DB) {
|
||||
const res = await entityManager.getRepository(User).update({
|
||||
user_id: userId
|
||||
}, { locked: false })
|
||||
if (!res.affected) {
|
||||
async UnbanUser(userId: string, txId?: string) {
|
||||
const affected = await this.dbs.Update<User>('User', { user_id: userId }, { locked: false }, txId)
|
||||
if (!affected) {
|
||||
throw new Error("unaffected user unlock for " + userId) // TODO: fix logs doxing
|
||||
}
|
||||
}
|
||||
|
||||
async BanUser(userId: string, entityManager = this.DB) {
|
||||
const user = await this.GetUser(userId, entityManager)
|
||||
const res = await entityManager.getRepository(User).update({
|
||||
user_id: userId
|
||||
}, { balance_sats: 0, locked: true })
|
||||
if (!res.affected) {
|
||||
async BanUser(userId: string, txId?: string) {
|
||||
const user = await this.GetUser(userId, txId)
|
||||
const affected = await this.dbs.Update<User>('User', { user_id: userId }, { balance_sats: 0, locked: true }, txId)
|
||||
if (!affected) {
|
||||
throw new Error("unaffected ban user for " + userId) // TODO: fix logs doxing
|
||||
}
|
||||
if (user.balance_sats > 0) {
|
||||
|
|
@ -76,53 +63,45 @@ export default class {
|
|||
}
|
||||
return user
|
||||
}
|
||||
async IncrementUserBalance(userId: string, increment: number, reason: string, entityManager?: DataSource | EntityManager) {
|
||||
if (entityManager) {
|
||||
return this.IncrementUserBalanceInTx(userId, increment, reason, entityManager)
|
||||
async IncrementUserBalance(userId: string, increment: number, reason: string, txId?: string) {
|
||||
if (txId) {
|
||||
return this.IncrementUserBalanceInTx(userId, increment, reason, txId)
|
||||
}
|
||||
await this.txQueue.PushToQueue({
|
||||
dbTx: true,
|
||||
description: `incrementing user ${userId} balance by ${increment}`,
|
||||
exec: async tx => {
|
||||
await this.IncrementUserBalanceInTx(userId, increment, reason, tx)
|
||||
}
|
||||
})
|
||||
|
||||
await this.dbs.Tx(async tx => {
|
||||
await this.IncrementUserBalanceInTx(userId, increment, reason, tx)
|
||||
}, `incrementing user ${userId} balance by ${increment}`)
|
||||
}
|
||||
async IncrementUserBalanceInTx(userId: string, increment: number, reason: string, dbTx: DataSource | EntityManager) {
|
||||
const user = await this.GetUser(userId, dbTx)
|
||||
const res = await dbTx.getRepository(User).increment({
|
||||
user_id: userId,
|
||||
}, "balance_sats", increment)
|
||||
if (!res.affected) {
|
||||
|
||||
async IncrementUserBalanceInTx(userId: string, increment: number, reason: string, txId: string) {
|
||||
const user = await this.GetUser(userId, txId)
|
||||
const affected = await this.dbs.Increment<User>('User', { user_id: userId }, "balance_sats", increment, txId)
|
||||
if (!affected) {
|
||||
getLogger({ userId: userId, component: "balanceUpdates" })("user unaffected by increment")
|
||||
throw new Error("unaffected balance increment for " + userId) // TODO: fix logs doxing
|
||||
}
|
||||
getLogger({ userId: userId, component: "balanceUpdates" })("incremented balance from", user.balance_sats, "sats, by", increment, "sats")
|
||||
this.eventsLog.LogEvent({ type: 'balance_increment', userId, appId: "", appUserId: "", balance: user.balance_sats, data: reason, amount: increment })
|
||||
}
|
||||
async DecrementUserBalance(userId: string, decrement: number, reason: string, entityManager?: DataSource | EntityManager) {
|
||||
if (entityManager) {
|
||||
return this.DecrementUserBalanceInTx(userId, decrement, reason, entityManager)
|
||||
|
||||
async DecrementUserBalance(userId: string, decrement: number, reason: string, txId?: string) {
|
||||
if (txId) {
|
||||
return this.DecrementUserBalanceInTx(userId, decrement, reason, txId)
|
||||
}
|
||||
await this.txQueue.PushToQueue({
|
||||
dbTx: true,
|
||||
description: `decrementing user ${userId} balance by ${decrement}`,
|
||||
exec: async tx => {
|
||||
await this.DecrementUserBalanceInTx(userId, decrement, reason, tx)
|
||||
}
|
||||
})
|
||||
|
||||
await this.dbs.Tx(async tx => {
|
||||
await this.DecrementUserBalanceInTx(userId, decrement, reason, tx)
|
||||
}, `decrementing user ${userId} balance by ${decrement}`)
|
||||
}
|
||||
|
||||
async DecrementUserBalanceInTx(userId: string, decrement: number, reason: string, dbTx: DataSource | EntityManager) {
|
||||
const user = await this.GetUser(userId, dbTx)
|
||||
async DecrementUserBalanceInTx(userId: string, decrement: number, reason: string, txId: string) {
|
||||
const user = await this.GetUser(userId, txId)
|
||||
if (!user || user.balance_sats < decrement) {
|
||||
getLogger({ userId: userId, component: "balanceUpdates" })("not enough balance to decrement")
|
||||
throw new Error("not enough balance to decrement")
|
||||
}
|
||||
const res = await dbTx.getRepository(User).decrement({
|
||||
user_id: userId,
|
||||
}, "balance_sats", decrement)
|
||||
if (!res.affected) {
|
||||
const affected = await this.dbs.Decrement<User>('User', { user_id: userId }, "balance_sats", decrement, txId)
|
||||
if (!affected) {
|
||||
getLogger({ userId: userId, component: "balanceUpdates" })("user unaffected by decrement")
|
||||
throw new Error("unaffected balance decrement for " + userId) // TODO: fix logs doxing
|
||||
}
|
||||
|
|
@ -130,8 +109,8 @@ export default class {
|
|||
this.eventsLog.LogEvent({ type: 'balance_decrement', userId, appId: "", appUserId: "", balance: user.balance_sats, data: reason, amount: decrement })
|
||||
}
|
||||
|
||||
async UpdateUser(userId: string, update: Partial<User>, entityManager = this.DB) {
|
||||
const user = await this.GetUser(userId, entityManager)
|
||||
await entityManager.getRepository(User).update(user.serial_id, update)
|
||||
async UpdateUser(userId: string, update: Partial<User>, txId?: string) {
|
||||
const user = await this.GetUser(userId, txId)
|
||||
await this.dbs.Update<User>('User', user.serial_id, update, txId)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue