diff --git a/src/services/main/index.ts b/src/services/main/index.ts index 9a83cce2..91bfd677 100644 --- a/src/services/main/index.ts +++ b/src/services/main/index.ts @@ -136,8 +136,8 @@ export default class { log(ERROR, "an address was paid, that has no linked application") return } - const updateResult = await this.storage.paymentStorage.UpdateAddressReceivingTransaction(serialId, { confs: c.confs }, tx) - if (!updateResult.affected) { + const affected = await this.storage.paymentStorage.UpdateAddressReceivingTransaction(serialId, { confs: c.confs }, tx) + if (!affected) { throw new Error("unable to flag chain transaction as paid") } const addressData = `${userAddress.address}:${tx_hash}` diff --git a/src/services/main/init.ts b/src/services/main/init.ts index fca0a012..2389510b 100644 --- a/src/services/main/init.ts +++ b/src/services/main/init.ts @@ -2,7 +2,7 @@ import { PubLogger, getLogger } from "../helpers/logger.js" import { LiquidityProvider } from "./liquidityProvider.js" import { Unlocker } from "./unlocker.js" import Storage from "../storage/index.js" -import { TypeOrmMigrationRunner } from "../storage/migrations/runner.js" +/* import { TypeOrmMigrationRunner } from "../storage/migrations/runner.js" */ import Main from "./index.js" import SanityChecker from "./sanityChecker.js" import { LoadMainSettingsFromEnv, MainSettings } from "./settings.js" @@ -18,10 +18,11 @@ export type AppData = { export const initMainHandler = async (log: PubLogger, mainSettings: MainSettings) => { const utils = new Utils(mainSettings) const storageManager = new Storage(mainSettings.storageSettings) - const manualMigration = await TypeOrmMigrationRunner(log, storageManager, mainSettings.storageSettings.dbSettings, process.argv[2]) - if (manualMigration) { - return - } + await storageManager.Connect(log) + /* const manualMigration = await TypeOrmMigrationRunner(log, storageManager, mainSettings.storageSettings.dbSettings, process.argv[2]) + if (manualMigration) { + return + } */ const unlocker = new Unlocker(mainSettings, storageManager) await unlocker.Unlock() const adminManager = new AdminManager(mainSettings, storageManager) diff --git a/src/services/main/paymentManager.ts b/src/services/main/paymentManager.ts index 797bfe8c..2a1c1c34 100644 --- a/src/services/main/paymentManager.ts +++ b/src/services/main/paymentManager.ts @@ -89,27 +89,23 @@ export default class { if (state.paid_at_unix < 0) { const fullAmount = p.paid_amount + p.service_fees + p.routing_fees log("found a failed provider payment, refunding", fullAmount, "sats to user", p.user.user_id) - await this.storage.txQueue.PushToQueue({ - dbTx: true, description: "refund failed provider payment", exec: async tx => { - await this.storage.userStorage.IncrementUserBalance(p.user.user_id, fullAmount, "payment_refund:" + p.invoice, tx) - await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, 0, 0, false, undefined, tx) - } - }) + await this.storage.StartTransaction(async tx => { + await this.storage.userStorage.IncrementUserBalance(p.user.user_id, fullAmount, "payment_refund:" + p.invoice, tx) + await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, 0, 0, false, undefined, tx) + }, "refund failed provider payment") return } else if (state.paid_at_unix > 0) { log("provider payment succeeded", p.serial_id, "updating payment info") const routingFeeLimit = p.routing_fees const serviceFee = p.service_fees const actualFee = state.network_fee + state.service_fee - await this.storage.txQueue.PushToQueue({ - dbTx: true, description: "pending provider payment success after restart", exec: async tx => { - if (routingFeeLimit - actualFee > 0) { - this.log("refund pending provider payment routing fee", routingFeeLimit, actualFee, "sats") - await this.storage.userStorage.IncrementUserBalance(p.user.user_id, routingFeeLimit - actualFee, "routing_fee_refund:" + p.invoice, tx) - } - await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, actualFee, p.service_fees, true, undefined, tx) + await this.storage.StartTransaction(async tx => { + if (routingFeeLimit - actualFee > 0) { + this.log("refund pending provider payment routing fee", routingFeeLimit, actualFee, "sats") + await this.storage.userStorage.IncrementUserBalance(p.user.user_id, routingFeeLimit - actualFee, "routing_fee_refund:" + p.invoice, tx) } - }) + await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, actualFee, p.service_fees, true, undefined, tx) + }, "pending provider payment success after restart") if (p.linkedApplication && p.user.user_id !== p.linkedApplication.owner.user_id && serviceFee > 0) { await this.storage.userStorage.IncrementUserBalance(p.linkedApplication.owner.user_id, serviceFee, "fees") } @@ -140,15 +136,13 @@ export default class { const routingFeeLimit = p.routing_fees const serviceFee = p.service_fees const actualFee = Number(payment.feeSat) - await this.storage.txQueue.PushToQueue({ - dbTx: true, description: "pending payment success after restart", exec: async tx => { - if (routingFeeLimit - actualFee > 0) { - this.log("refund pending payment routing fee", routingFeeLimit, actualFee, "sats") - await this.storage.userStorage.IncrementUserBalance(p.user.user_id, routingFeeLimit - actualFee, "routing_fee_refund:" + p.invoice, tx) - } - await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, actualFee, p.service_fees, true, undefined, tx) + await this.storage.StartTransaction(async tx => { + if (routingFeeLimit - actualFee > 0) { + this.log("refund pending payment routing fee", routingFeeLimit, actualFee, "sats") + await this.storage.userStorage.IncrementUserBalance(p.user.user_id, routingFeeLimit - actualFee, "routing_fee_refund:" + p.invoice, tx) } - }) + await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, actualFee, p.service_fees, true, undefined, tx) + }, "pending payment success after restart") if (p.linkedApplication && p.user.user_id !== p.linkedApplication.owner.user_id && serviceFee > 0) { await this.storage.userStorage.IncrementUserBalance(p.linkedApplication.owner.user_id, serviceFee, "fees") } @@ -158,12 +152,11 @@ export default class { case Payment_PaymentStatus.FAILED: const fullAmount = p.paid_amount + p.service_fees + p.routing_fees log("found a failed pending payment, refunding", fullAmount, "sats to user", p.user.user_id) - await this.storage.txQueue.PushToQueue({ - dbTx: true, description: "refund failed pending payment", exec: async tx => { - await this.storage.userStorage.IncrementUserBalance(p.user.user_id, fullAmount, "payment_refund:" + p.invoice, tx) - await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, 0, 0, false, undefined, tx) - } - }) + await this.storage.StartTransaction(async tx => { + await this.storage.userStorage.IncrementUserBalance(p.user.user_id, fullAmount, "payment_refund:" + p.invoice, tx) + await this.storage.paymentStorage.UpdateExternalPayment(p.serial_id, 0, 0, false, undefined, tx) + }, "refund failed pending payment") + return default: break; } @@ -319,12 +312,10 @@ export default class { const routingFeeLimit = this.lnd.GetFeeLimitAmount(payAmount) const use = await this.liquidityManager.beforeOutInvoicePayment(payAmount) const provider = use === 'provider' ? this.lnd.liquidProvider.GetProviderDestination() : undefined - const pendingPayment = await this.storage.txQueue.PushToQueue({ - dbTx: true, description: "payment started", exec: async tx => { - await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement + routingFeeLimit, invoice, tx) - return await this.storage.paymentStorage.AddPendingExternalPayment(userId, invoice, { payAmount, serviceFee, networkFee: routingFeeLimit }, linkedApplication, provider, tx, debitNpub) - } - }) + const pendingPayment = await this.storage.StartTransaction(async tx => { + await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement + routingFeeLimit, invoice, tx) + return await this.storage.paymentStorage.AddPendingExternalPayment(userId, invoice, { payAmount, serviceFee, networkFee: routingFeeLimit }, linkedApplication, provider, tx, debitNpub) + }, "payment started") this.log("ready to pay") try { const payment = await this.lnd.PayInvoice(invoice, amountForLnd, routingFeeLimit, payAmount, { useProvider: use === 'provider', from: 'user' }, index => { diff --git a/src/services/storage/applicationStorage.ts b/src/services/storage/applicationStorage.ts index a911e89a..9bd8c6ba 100644 --- a/src/services/storage/applicationStorage.ts +++ b/src/services/storage/applicationStorage.ts @@ -1,69 +1,59 @@ import crypto from 'crypto'; -import { Between, DataSource, EntityManager, FindOperator, IsNull, LessThanOrEqual, MoreThanOrEqual } from "typeorm" +import { Between, FindOperator, IsNull, LessThanOrEqual, MoreThanOrEqual } from "typeorm" import { generateSecretKey, getPublicKey } from 'nostr-tools'; import { Application } from "./entity/Application.js" import UserStorage from './userStorage.js'; import { ApplicationUser } from './entity/ApplicationUser.js'; import { getLogger } from '../helpers/logger.js'; -import TransactionsQueue, { TX } from "./transactionsQueue.js"; import { User } from './entity/User.js'; import { InviteToken } from './entity/InviteToken.js'; +import { StorageInterface } from './storageInterface.js'; export default class { - DB: DataSource | EntityManager + dbs: StorageInterface userStorage: UserStorage - txQueue: TransactionsQueue - constructor(DB: DataSource | EntityManager, userStorage: UserStorage, txQueue: TransactionsQueue) { - this.DB = DB + constructor(dbs: StorageInterface, userStorage: UserStorage) { + this.dbs = dbs this.userStorage = userStorage - this.txQueue = txQueue } async AddApplication(name: string, allowUserCreation: boolean): Promise { - return this.DB.transaction(async tx => { - const owner = await this.userStorage.AddUser(0, tx) - const repo = this.DB.getRepository(Application) - const newApplication = repo.create({ + return this.dbs.Tx(async txId => { + const owner = await this.userStorage.AddUser(0, txId) + return this.dbs.CreateAndSave('Application', { app_id: crypto.randomBytes(32).toString('hex'), name, owner, allow_user_creation: allowUserCreation - }) - return tx.getRepository(Application).save(newApplication) + }, txId) }) } - async GetApplicationByName(name: string, entityManager = this.DB) { - const found = await entityManager.getRepository(Application).findOne({ - where: { - name - } - }) + async GetApplicationByName(name: string, txId?: string) { + const found = await this.dbs.FindOne('Application', { where: { name } }, txId) if (!found) { throw new Error(`application ${name} not found`) } return found } - async GetApplications(entityManager = this.DB): Promise { - return entityManager.getRepository(Application).find() + async GetApplications(txId?: string): Promise { + return this.dbs.Find('Application', {}, txId) } - async GetApplication(appId: string, entityManager = this.DB): Promise { + async GetApplication(appId: string, txId?: string): Promise { if (!appId) { throw new Error("invalid app id provided") } - const found = await entityManager.getRepository(Application).findOne({ - where: { - app_id: appId - } - }) + + + const found = await this.dbs.FindOne('Application', { where: { app_id: appId } }, txId) if (!found) { throw new Error(`application ${appId} not found`) } return found } - async UpdateApplication(app: Application, update: Partial, entityManager = this.DB) { - await entityManager.getRepository(Application).update(app.serial_id, update) + async UpdateApplication(app: Application, update: Partial, txId?: string) { + await this.dbs.Update('Application', app.serial_id, update, txId) } async GenerateApplicationKeys(app: Application) { @@ -75,32 +65,27 @@ export default class { } async AddApplicationUser(application: Application, userIdentifier: string, balance: number, nostrPub?: string) { - return this.DB.transaction(async tx => { - const user = await this.userStorage.AddUser(balance, tx) - const repo = tx.getRepository(ApplicationUser) - const appUser = repo.create({ + return this.dbs.Tx(async txId => { + const user = await this.userStorage.AddUser(balance, txId) + return this.dbs.CreateAndSave('ApplicationUser', { user: user, application, identifier: userIdentifier, nostr_public_key: nostrPub - }) - return repo.save(appUser) + }, txId) }) } - async GetApplicationUserIfExists(application: Application, userIdentifier: string, entityManager = this.DB): Promise { - return entityManager.getRepository(ApplicationUser).findOne({ where: { identifier: userIdentifier, application: { serial_id: application.serial_id } } }) + async GetApplicationUserIfExists(application: Application, userIdentifier: string, txId?: string): Promise { + return this.dbs.FindOne('ApplicationUser', { where: { identifier: userIdentifier, application: { serial_id: application.serial_id } } }, txId) } - async GetOrCreateNostrAppUser(application: Application, nostrPub: string, entityManager = this.DB): Promise { + async GetOrCreateNostrAppUser(application: Application, nostrPub: string, txId?: string): Promise { if (!nostrPub) { throw new Error("no nostrPub provided") } - const user = await entityManager.getRepository(ApplicationUser).findOne({ where: { nostr_public_key: nostrPub } }) + const user = await this.dbs.FindOne('ApplicationUser', { where: { nostr_public_key: nostrPub } }, txId) if (user) { - //if (user.application.app_id !== application.app_id) { - // throw new Error("tried to access a user of application:" + user.application.app_id + "from application:" + application.app_id) - //} return user } if (!application.allow_user_creation) { @@ -109,20 +94,20 @@ export default class { return this.AddApplicationUser(application, crypto.randomBytes(32).toString('hex'), 0, nostrPub) } - async FindNostrAppUser(nostrPub: string, entityManager = this.DB) { - return entityManager.getRepository(ApplicationUser).findOne({ where: { nostr_public_key: nostrPub } }) + async FindNostrAppUser(nostrPub: string, txId?: string) { + return this.dbs.FindOne('ApplicationUser', { where: { nostr_public_key: nostrPub } }, txId) } - async GetOrCreateApplicationUser(application: Application, userIdentifier: string, balance: number, entityManager = this.DB): Promise<{ user: ApplicationUser, created: boolean }> { - const user = await this.GetApplicationUserIfExists(application, userIdentifier, entityManager) + async GetOrCreateApplicationUser(application: Application, userIdentifier: string, balance: number): Promise<{ user: ApplicationUser, created: boolean }> { + const user = await this.GetApplicationUserIfExists(application, userIdentifier) if (user) { return { user, created: false } } return { user: await this.AddApplicationUser(application, userIdentifier, balance), created: true } } - async GetApplicationUser(application: Application, userIdentifier: string, entityManager = this.DB): Promise { - const found = await this.GetApplicationUserIfExists(application, userIdentifier, entityManager) + async GetApplicationUser(application: Application, userIdentifier: string, txId?: string): Promise { + const found = await this.GetApplicationUserIfExists(application, userIdentifier, txId) if (!found) { getLogger({ appName: application.name })("user", userIdentifier, "not found", application.name) throw new Error(`application user not found`) @@ -134,7 +119,7 @@ export default class { return found } - async GetApplicationUsers(application: Application | null, { from, to }: { from?: number, to?: number }, entityManager = this.DB) { + async GetApplicationUsers(application: Application | null, { from, to }: { from?: number, to?: number }, txId?: string) { const q = application ? { app_id: application.app_id } : IsNull() let time: { created_at?: FindOperator } = {} if (!!from && !!to) { @@ -144,61 +129,53 @@ export default class { } else if (!!to) { time.created_at = LessThanOrEqual(new Date(to * 1000)) } - return entityManager.getRepository(ApplicationUser).find({ where: { application: q, ...time } }) + return this.dbs.Find('ApplicationUser', { where: { application: q, ...time } }, txId) } - async GetAppUserFromUser(application: Application, userId: string, entityManager = this.DB): Promise { - return entityManager.getRepository(ApplicationUser).findOne({ where: { user: { user_id: userId }, application: { app_id: application.app_id } } }) + async GetAppUserFromUser(application: Application, userId: string, txId?: string): Promise { + return this.dbs.FindOne('ApplicationUser', { where: { user: { user_id: userId }, application: { app_id: application.app_id } } }, txId) } - async GetAllAppUsersFromUser(userId: string, entityManager = this.DB): Promise { - return entityManager.getRepository(ApplicationUser).find({ where: { user: { user_id: userId } } }) + async GetAllAppUsersFromUser(userId: string, txId?: string): Promise { + return this.dbs.Find('ApplicationUser', { where: { user: { user_id: userId } } }, txId) } - async IsApplicationOwner(userId: string, entityManager = this.DB) { - return entityManager.getRepository(Application).findOne({ where: { owner: { user_id: userId } } }) + async IsApplicationOwner(userId: string, txId?: string) { + return this.dbs.FindOne('Application', { where: { owner: { user_id: userId } } }, txId) } - async AddNPubToApplicationUser(serialId: number, nPub: string, entityManager = this.DB) { - return entityManager.getRepository(ApplicationUser).update(serialId, { nostr_public_key: nPub }) + async AddNPubToApplicationUser(serialId: number, nPub: string, txId?: string) { + return this.dbs.Update('ApplicationUser', serialId, { nostr_public_key: nPub }, txId) } - async UpdateUserCallbackUrl(application: Application, userIdentifier: string, callbackUrl: string, entityManager = this.DB) { - return entityManager.getRepository(ApplicationUser).update({ application: { app_id: application.app_id }, identifier: userIdentifier }, { callback_url: callbackUrl }) + async UpdateUserCallbackUrl(application: Application, userIdentifier: string, callbackUrl: string, txId?: string) { + return this.dbs.Update('ApplicationUser', { application: { app_id: application.app_id }, identifier: userIdentifier }, { callback_url: callbackUrl }, txId) } - async RemoveApplicationUserAndBaseUser(appUser: ApplicationUser, entityManager = this.DB) { + async RemoveApplicationUserAndBaseUser(appUser: ApplicationUser, txId?: string) { const baseUser = appUser.user; - await entityManager.getRepository(ApplicationUser).remove(appUser); - await entityManager.getRepository(User).remove(baseUser); + this.dbs.Remove('ApplicationUser', appUser, txId) + this.dbs.Remove('User', baseUser, txId) } async AddInviteToken(app: Application, sats?: number) { - return this.txQueue.PushToQueue({ - dbTx: false, - exec: async tx => { - const inviteRepo = tx.getRepository(InviteToken); - const newInviteToken = inviteRepo.create({ - inviteToken: crypto.randomBytes(32).toString('hex'), - used: false, - sats: sats, - application: app - }); - - return inviteRepo.save(newInviteToken) - } + return this.dbs.CreateAndSave('InviteToken', { + inviteToken: crypto.randomBytes(32).toString('hex'), + used: false, + sats: sats, + application: app }) } async FindInviteToken(token: string) { - return this.DB.getRepository(InviteToken).findOne({ where: { inviteToken: token } }) + return this.dbs.FindOne('InviteToken', { where: { inviteToken: token } }) } async SetInviteTokenAsUsed(inviteToken: InviteToken) { - return this.DB.getRepository(InviteToken).update(inviteToken, { used: true }); + return this.dbs.Update('InviteToken', inviteToken, { used: true }) } } \ No newline at end of file diff --git a/src/services/storage/db.ts b/src/services/storage/db.ts index d3af00a6..39b67ac1 100644 --- a/src/services/storage/db.ts +++ b/src/services/storage/db.ts @@ -39,11 +39,51 @@ export const LoadDbSettingsFromEnv = (): DbSettings => { } } +/* const MainDbEntitiesNames = ['User', 'UserReceivingInvoice', 'UserReceivingAddress', 'AddressReceivingTransaction', 'UserInvoicePayment', 'UserTransactionPayment', + 'UserBasicAuth', 'UserEphemeralKey', 'Product', 'UserToUserPayment', 'Application', 'ApplicationUser', 'UserToUserPayment', 'LspOrder', 'LndNodeInfo', 'TrackedProvider', + 'InviteToken', 'DebitAccess', 'UserOffer'] as const +type MainDbEntitiesName = typeof MainDbEntitiesNames[number] + +const MetricsDbEntitiesNames = ['BalanceEvent', 'ChannelBalanceEvent', 'ChannelRouting', 'RootOperation'] as const +type MetricsDbEntitiesName = typeof MetricsDbEntitiesNames[number] */ + +export const MainDbEntities = { + 'AddressReceivingTransaction': AddressReceivingTransaction, + 'Application': Application, + 'ApplicationUser': ApplicationUser, + 'User': User, + 'UserReceivingAddress': UserReceivingAddress, + 'UserReceivingInvoice': UserReceivingInvoice, + 'UserInvoicePayment': UserInvoicePayment, + 'UserTransactionPayment': UserTransactionPayment, + 'UserBasicAuth': UserBasicAuth, + 'UserEphemeralKey': UserEphemeralKey, + 'UserToUserPayment': UserToUserPayment, + 'LspOrder': LspOrder, + 'LndNodeInfo': LndNodeInfo, + 'TrackedProvider': TrackedProvider, + 'InviteToken': InviteToken, + 'DebitAccess': DebitAccess, + 'UserOffer': UserOffer, + 'Product': Product +} +export type MainDbNames = keyof typeof MainDbEntities +export const MainDbEntitiesNames = Object.keys(MainDbEntities) + +const MetricsDbEntities = { + 'BalanceEvent': BalanceEvent, + 'ChannelBalanceEvent': ChannelBalanceEvent, + 'ChannelRouting': ChannelRouting, + 'RootOperation': RootOperation +} +export type MetricsDbNames = keyof typeof MetricsDbEntities +export const MetricsDbEntitiesNames = Object.keys(MetricsDbEntities) + export const newMetricsDb = async (settings: DbSettings, metricsMigrations: Function[]): Promise<{ source: DataSource, executedMigrations: Migration[] }> => { const source = await new DataSource({ type: "sqlite", database: settings.metricsDatabaseFile, - entities: [BalanceEvent, ChannelBalanceEvent, ChannelRouting, RootOperation], + entities: Object.values(MetricsDbEntities), migrations: metricsMigrations }).initialize(); const log = getLogger({}); @@ -62,10 +102,7 @@ export default async (settings: DbSettings, migrations: Function[]): Promise<{ s type: "sqlite", database: settings.databaseFile, // logging: true, - entities: [User, UserReceivingInvoice, UserReceivingAddress, AddressReceivingTransaction, UserInvoicePayment, UserTransactionPayment, - UserBasicAuth, UserEphemeralKey, Product, UserToUserPayment, Application, ApplicationUser, UserToUserPayment, LspOrder, LndNodeInfo, TrackedProvider, - InviteToken, DebitAccess, UserOffer - ], + entities: Object.values(MainDbEntities), //synchronize: true, migrations }).initialize() @@ -79,7 +116,7 @@ export default async (settings: DbSettings, migrations: Function[]): Promise<{ s return { source, executedMigrations: [] } } -export const runFakeMigration = async (databaseFile: string, migrations: Function[]) => { +/* export const runFakeMigration = async (databaseFile: string, migrations: Function[]) => { const source = await new DataSource({ type: "sqlite", database: databaseFile, @@ -90,4 +127,4 @@ export const runFakeMigration = async (databaseFile: string, migrations: Functio migrations }).initialize() return source.runMigrations({ fake: true }) -} \ No newline at end of file +} */ \ No newline at end of file diff --git a/src/services/storage/debitStorage.ts b/src/services/storage/debitStorage.ts index fcee0c20..30fff55b 100644 --- a/src/services/storage/debitStorage.ts +++ b/src/services/storage/debitStorage.ts @@ -1,47 +1,42 @@ -import { DataSource, EntityManager } from "typeorm" -import UserStorage from './userStorage.js'; -import TransactionsQueue from "./transactionsQueue.js"; import { DebitAccess, DebitAccessRules } from "./entity/DebitAccess.js"; +import { StorageInterface } from "./storageInterface.js"; type AccessToAdd = { npub: string rules?: DebitAccessRules authorize: boolean } export default class { - DB: DataSource | EntityManager - txQueue: TransactionsQueue - constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) { - this.DB = DB - this.txQueue = txQueue + dbs: StorageInterface + constructor(dbs: StorageInterface) { + this.dbs = dbs } - async AddDebitAccess(appUserId: string, access: AccessToAdd, entityManager = this.DB) { - const entry = entityManager.getRepository(DebitAccess).create({ + async AddDebitAccess(appUserId: string, access: AccessToAdd) { + return this.dbs.CreateAndSave('DebitAccess', { app_user_id: appUserId, npub: access.npub, authorized: access.authorize, rules: access.rules, }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(DebitAccess).save(entry), dbTx: false }) } - async GetAllUserDebitAccess(appUserId: string) { - return this.DB.getRepository(DebitAccess).find({ where: { app_user_id: appUserId } }) + async GetAllUserDebitAccess(appUserId: string, txId?: string) { + return this.dbs.Find('DebitAccess', { where: { app_user_id: appUserId } }, txId) } - async GetDebitAccess(appUserId: string, authorizedPub: string) { - return this.DB.getRepository(DebitAccess).findOne({ where: { app_user_id: appUserId, npub: authorizedPub } }) + async GetDebitAccess(appUserId: string, authorizedPub: string, txId?: string) { + return this.dbs.FindOne('DebitAccess', { where: { app_user_id: appUserId, npub: authorizedPub } }, txId) } - async IncrementDebitAccess(appUserId: string, authorizedPub: string, amount: number) { - return this.DB.getRepository(DebitAccess).increment({ app_user_id: appUserId, npub: authorizedPub }, 'total_debits', amount) + async IncrementDebitAccess(appUserId: string, authorizedPub: string, amount: number, txId?: string) { + return this.dbs.Increment('DebitAccess', { app_user_id: appUserId, npub: authorizedPub }, 'total_debits', amount, txId) } - async UpdateDebitAccess(appUserId: string, authorizedPub: string, authorized: boolean) { - return this.DB.getRepository(DebitAccess).update({ app_user_id: appUserId, npub: authorizedPub }, { authorized }) + async UpdateDebitAccess(appUserId: string, authorizedPub: string, authorized: boolean, txId?: string) { + return this.dbs.Update('DebitAccess', { app_user_id: appUserId, npub: authorizedPub }, { authorized }, txId) } - async UpdateDebitAccessRules(appUserId: string, authorizedPub: string, rules?: DebitAccessRules) { - return this.DB.getRepository(DebitAccess).update({ app_user_id: appUserId, npub: authorizedPub }, { rules: rules || null }) + async UpdateDebitAccessRules(appUserId: string, authorizedPub: string, rules?: DebitAccessRules, txId?: string) { + return this.dbs.Update('DebitAccess', { app_user_id: appUserId, npub: authorizedPub }, { rules: rules || null }, txId) } async DenyDebitAccess(appUserId: string, pub: string) { @@ -52,7 +47,7 @@ export default class { await this.UpdateDebitAccess(appUserId, pub, false) } - async RemoveDebitAccess(appUserId: string, authorizedPub: string) { - return this.DB.getRepository(DebitAccess).delete({ app_user_id: appUserId, npub: authorizedPub }) + async RemoveDebitAccess(appUserId: string, authorizedPub: string, txId?: string) { + return this.dbs.Delete('DebitAccess', { app_user_id: appUserId, npub: authorizedPub }, txId) } } \ No newline at end of file diff --git a/src/services/storage/index.ts b/src/services/storage/index.ts index 5f94a546..842c8b66 100644 --- a/src/services/storage/index.ts +++ b/src/services/storage/index.ts @@ -1,4 +1,3 @@ -import { DataSource, EntityManager } from "typeorm" import fs from 'fs' import NewDB, { DbSettings, LoadDbSettingsFromEnv } from "./db.js" import ProductStorage from './productStorage.js' @@ -7,11 +6,13 @@ import UserStorage from "./userStorage.js"; import PaymentStorage from "./paymentStorage.js"; import MetricsStorage from "./metricsStorage.js"; import MetricsEventStorage from "./metricsEventStorage.js"; -import TransactionsQueue, { TX } from "./transactionsQueue.js"; import EventsLogManager from "./eventsLog.js"; import { LiquidityStorage } from "./liquidityStorage.js"; import DebitStorage from "./debitStorage.js" import OfferStorage from "./offerStorage.js" +import { StorageInterface, TX } from "./storageInterface.js"; +import { allMetricsMigrations, allMigrations } from "./migrations/runner.js" +import { PubLogger } from "../helpers/logger.js" export type StorageSettings = { dbSettings: DbSettings eventLogPath: string @@ -21,9 +22,10 @@ export const LoadStorageSettingsFromEnv = (): StorageSettings => { return { dbSettings: LoadDbSettingsFromEnv(), eventLogPath: "logs/eventLogV3.csv", dataDir: process.env.DATA_DIR || "" } } export default class { - DB: DataSource | EntityManager + //DB: DataSource | EntityManager settings: StorageSettings - txQueue: TransactionsQueue + //txQueue: TransactionsQueue + dbs: StorageInterface productStorage: ProductStorage applicationStorage: ApplicationStorage userStorage: UserStorage @@ -38,25 +40,35 @@ export default class { this.settings = settings this.eventsLog = new EventsLogManager(settings.eventLogPath) } - async Connect(migrations: Function[], metricsMigrations: Function[]) { - const { source, executedMigrations } = await NewDB(this.settings.dbSettings, migrations) - this.DB = source - this.txQueue = new TransactionsQueue("main", this.DB) - this.userStorage = new UserStorage(this.DB, this.txQueue, this.eventsLog) - this.productStorage = new ProductStorage(this.DB, this.txQueue) - this.applicationStorage = new ApplicationStorage(this.DB, this.userStorage, this.txQueue) - this.paymentStorage = new PaymentStorage(this.DB, this.userStorage, this.txQueue) + async Connect(log: PubLogger) { + this.dbs = new StorageInterface() + await this.dbs.Connect(this.settings.dbSettings) + //const { source, executedMigrations } = await NewDB(this.settings.dbSettings, allMigrations) + //this.DB = source + //this.txQueue = new TransactionsQueue("main", this.DB) + this.userStorage = new UserStorage(this.dbs, this.eventsLog) + this.productStorage = new ProductStorage(this.dbs) + this.applicationStorage = new ApplicationStorage(this.dbs, this.userStorage) + this.paymentStorage = new PaymentStorage(this.dbs, this.userStorage) this.metricsStorage = new MetricsStorage(this.settings) this.metricsEventStorage = new MetricsEventStorage(this.settings) - this.liquidityStorage = new LiquidityStorage(this.DB, this.txQueue) - this.debitStorage = new DebitStorage(this.DB, this.txQueue) - this.offerStorage = new OfferStorage(this.DB, this.txQueue) + this.liquidityStorage = new LiquidityStorage(this.dbs) + this.debitStorage = new DebitStorage(this.dbs) + this.offerStorage = new OfferStorage(this.dbs) try { if (this.settings.dataDir) fs.mkdirSync(this.settings.dataDir) } catch (e) { } - const executedMetricsMigrations = await this.metricsStorage.Connect(metricsMigrations) - return { executedMigrations, executedMetricsMigrations }; + const executedMetricsMigrations = await this.metricsStorage.Connect(allMetricsMigrations) + /* if (executedMigrations.length > 0) { + log(executedMigrations.length, "new migrations executed") + log("-------------------") + + } */ + if (executedMetricsMigrations.length > 0) { + log(executedMetricsMigrations.length, "new metrics migrations executed") + log("-------------------") + } } StartTransaction(exec: TX, description?: string) { - return this.txQueue.PushToQueue({ exec, dbTx: true, description }) + return this.dbs.Tx(tx => exec(tx), description) } } \ No newline at end of file diff --git a/src/services/storage/liquidityStorage.ts b/src/services/storage/liquidityStorage.ts index f4de48d4..2c405a41 100644 --- a/src/services/storage/liquidityStorage.ts +++ b/src/services/storage/liquidityStorage.ts @@ -1,72 +1,66 @@ -import { DataSource, EntityManager, IsNull, MoreThan, Not } from "typeorm" +import { IsNull, MoreThan, Not } from "typeorm" import { LspOrder } from "./entity/LspOrder.js"; -import TransactionsQueue, { TX } from "./transactionsQueue.js"; import { LndNodeInfo } from "./entity/LndNodeInfo.js"; import { TrackedProvider } from "./entity/TrackedProvider.js"; +import { StorageInterface } from "./storageInterface.js"; export class LiquidityStorage { - DB: DataSource | EntityManager - txQueue: TransactionsQueue - constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) { - this.DB = DB - this.txQueue = txQueue + dbs: StorageInterface + constructor(dbs: StorageInterface) { + this.dbs = dbs } GetLatestLspOrder() { - return this.DB.getRepository(LspOrder).findOne({ where: { serial_id: MoreThan(0) }, order: { serial_id: "DESC" } }) + return this.dbs.FindOne('LspOrder', { where: { serial_id: MoreThan(0) }, order: { serial_id: "DESC" } }) } SaveLspOrder(order: Partial) { - const entry = this.DB.getRepository(LspOrder).create(order) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(LspOrder).save(entry), dbTx: false }) + return this.dbs.CreateAndSave('LspOrder', order) } async GetNoodeSeed(pubkey: string) { - return this.DB.getRepository(LndNodeInfo).findOne({ where: { pubkey, seed: Not(IsNull()) } }) + return this.dbs.FindOne('LndNodeInfo', { where: { pubkey, seed: Not(IsNull()) } }) } async SaveNodeSeed(pubkey: string, seed: string) { - const existing = await this.DB.getRepository(LndNodeInfo).findOne({ where: { pubkey } }) + const existing = await this.dbs.FindOne('LndNodeInfo', { where: { pubkey } }) if (existing) { throw new Error("A seed already exists for this pub key") } - const entry = this.DB.getRepository(LndNodeInfo).create({ pubkey, seed }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(LndNodeInfo).save(entry), dbTx: false }) + return this.dbs.CreateAndSave('LndNodeInfo', { pubkey, seed }) } async SaveNodeBackup(pubkey: string, backup: string) { - const existing = await this.DB.getRepository(LndNodeInfo).findOne({ where: { pubkey } }) + const existing = await this.dbs.FindOne('LndNodeInfo', { where: { pubkey } }) if (existing) { - await this.DB.getRepository(LndNodeInfo).update(existing.serial_id, { backup }) + await this.dbs.Update('LndNodeInfo', existing.serial_id, { backup }) return } - const entry = this.DB.getRepository(LndNodeInfo).create({ pubkey, backup }) - await this.txQueue.PushToQueue({ exec: async db => db.getRepository(LndNodeInfo).save(entry), dbTx: false }) + return this.dbs.CreateAndSave('LndNodeInfo', { pubkey, backup }) } async GetTrackedProviders() { - return this.DB.getRepository(TrackedProvider).find({}) + return this.dbs.Find('TrackedProvider', {}) } async GetTrackedProvider(providerType: 'lnd' | 'lnPub', pub: string) { - return this.DB.getRepository(TrackedProvider).findOne({ where: { provider_pubkey: pub, provider_type: providerType } }) + return this.dbs.FindOne('TrackedProvider', { where: { provider_pubkey: pub, provider_type: providerType } }) } async CreateTrackedProvider(providerType: 'lnd' | 'lnPub', pub: string, latestBalance = 0) { - const entry = this.DB.getRepository(TrackedProvider).create({ provider_pubkey: pub, provider_type: providerType, latest_balance: latestBalance }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(TrackedProvider).save(entry), dbTx: false }) + return this.dbs.CreateAndSave('TrackedProvider', { provider_pubkey: pub, provider_type: providerType, latest_balance: latestBalance }) } async UpdateTrackedProviderBalance(providerType: 'lnd' | 'lnPub', pub: string, latestBalance: number) { console.log("updating tracked balance:", latestBalance) - return this.DB.getRepository(TrackedProvider).update({ provider_pubkey: pub, provider_type: providerType }, { latest_balance: latestBalance }) + return this.dbs.Update('TrackedProvider', { provider_pubkey: pub, provider_type: providerType }, { latest_balance: latestBalance }) } async IncrementTrackedProviderBalance(providerType: 'lnd' | 'lnPub', pub: string, amount: number) { if (amount < 0) { - return this.DB.getRepository(TrackedProvider).increment({ provider_pubkey: pub, provider_type: providerType }, "latest_balance", amount) + return this.dbs.Increment('TrackedProvider', { provider_pubkey: pub, provider_type: providerType }, "latest_balance", amount) } else { - return this.DB.getRepository(TrackedProvider).decrement({ provider_pubkey: pub, provider_type: providerType }, "latest_balance", -amount) + return this.dbs.Decrement('TrackedProvider', { provider_pubkey: pub, provider_type: providerType }, "latest_balance", -amount) } } async UpdateTrackedProviderDisruption(providerType: 'lnd' | 'lnPub', pub: string, latestDisruptionAtUnix: number) { - return this.DB.getRepository(TrackedProvider).update({ provider_pubkey: pub, provider_type: providerType }, { latest_distruption_at_unix: latestDisruptionAtUnix }) + return this.dbs.Update('TrackedProvider', { provider_pubkey: pub, provider_type: providerType }, { latest_distruption_at_unix: latestDisruptionAtUnix }) } } \ No newline at end of file diff --git a/src/services/storage/metricsStorage.ts b/src/services/storage/metricsStorage.ts index 16e6e72d..839ef357 100644 --- a/src/services/storage/metricsStorage.ts +++ b/src/services/storage/metricsStorage.ts @@ -1,14 +1,12 @@ import { Between, DataSource, EntityManager, FindManyOptions, FindOperator, LessThanOrEqual, MoreThanOrEqual } from "typeorm" import { BalanceEvent } from "./entity/BalanceEvent.js" import { ChannelBalanceEvent } from "./entity/ChannelsBalanceEvent.js" -import TransactionsQueue, { TX } from "./transactionsQueue.js"; +import TransactionsQueue from "./transactionsQueue.js"; import { StorageSettings } from "./index.js"; import { newMetricsDb } from "./db.js"; import { ChannelRouting } from "./entity/ChannelRouting.js"; import { RootOperation } from "./entity/RootOperation.js"; export default class { - - DB: DataSource | EntityManager settings: StorageSettings txQueue: TransactionsQueue diff --git a/src/services/storage/migrations/runner.ts b/src/services/storage/migrations/runner.ts index 70e5d905..06be66d2 100644 --- a/src/services/storage/migrations/runner.ts +++ b/src/services/storage/migrations/runner.ts @@ -1,6 +1,3 @@ -import { PubLogger } from '../../helpers/logger.js' -import { DbSettings, runFakeMigration } from '../db.js' -import Storage, { StorageSettings } from '../index.js' import { Initial1703170309875 } from './1703170309875-initial.js' import { LndMetrics1703170330183 } from './1703170330183-lnd_metrics.js' import { ChannelRouting1709316653538 } from './1709316653538-channel_routing.js' @@ -18,26 +15,21 @@ import { DebitToPub1727105758354 } from './1727105758354-debit_to_pub.js' import { UserCbUrl1727112281043 } from './1727112281043-user_cb_url.js' import { RootOps1732566440447 } from './1732566440447-root_ops.js' import { UserOffer1733502626042 } from './1733502626042-user_offer.js' -const allMigrations = [Initial1703170309875, LspOrder1718387847693, LiquidityProvider1719335699480, LndNodeInfo1720187506189, TrackedProvider1720814323679, CreateInviteTokenTable1721751414878, PaymentIndex1721760297610, DebitAccess1726496225078, DebitAccessFixes1726685229264, DebitToPub1727105758354, UserCbUrl1727112281043, UserOffer1733502626042] -const allMetricsMigrations = [LndMetrics1703170330183, ChannelRouting1709316653538, HtlcCount1724266887195, BalanceEvents1724860966825, RootOps1732566440447] -export const TypeOrmMigrationRunner = async (log: PubLogger, storageManager: Storage, settings: DbSettings, arg: string | undefined): Promise => { - if (arg === 'fake_initial_migration') { - runFakeMigration(settings.databaseFile, [Initial1703170309875]) - return true - } +export const allMigrations = [Initial1703170309875, LspOrder1718387847693, LiquidityProvider1719335699480, LndNodeInfo1720187506189, TrackedProvider1720814323679, CreateInviteTokenTable1721751414878, PaymentIndex1721760297610, DebitAccess1726496225078, DebitAccessFixes1726685229264, DebitToPub1727105758354, UserCbUrl1727112281043, UserOffer1733502626042] +export const allMetricsMigrations = [LndMetrics1703170330183, ChannelRouting1709316653538, HtlcCount1724266887195, BalanceEvents1724860966825, RootOps1732566440447] +/* export const TypeOrmMigrationRunner = async (log: PubLogger, storageManager: Storage, settings: DbSettings, arg: string | undefined): Promise => { await connectAndMigrate(log, storageManager, allMigrations, allMetricsMigrations) return false } const connectAndMigrate = async (log: PubLogger, storageManager: Storage, migrations: Function[], metricsMigrations: Function[]) => { const { executedMigrations, executedMetricsMigrations } = await storageManager.Connect(migrations, metricsMigrations) - if (migrations.length > 0) { - log(executedMigrations.length, "of", migrations.length, "migrations were executed correctly") - log(executedMigrations) + if (executedMigrations.length > 0) { + log(executedMigrations.length, "new migrations executed") log("-------------------") - } if (metricsMigrations.length > 0) { - log(executedMetricsMigrations.length, "of", metricsMigrations.length, "metrics migrations were executed correctly") - log(executedMetricsMigrations) + } if (executedMetricsMigrations.length > 0) { + log(executedMetricsMigrations.length, "new metrics migrations executed") + log("-------------------") } -} \ No newline at end of file +} */ \ No newline at end of file diff --git a/src/services/storage/offerStorage.ts b/src/services/storage/offerStorage.ts index 41af3ca6..41712d92 100644 --- a/src/services/storage/offerStorage.ts +++ b/src/services/storage/offerStorage.ts @@ -1,49 +1,43 @@ -import { DataSource, EntityManager } from "typeorm" import crypto from 'crypto'; -import UserStorage from './userStorage.js'; -import TransactionsQueue from "./transactionsQueue.js"; -import { DebitAccess, DebitAccessRules } from "./entity/DebitAccess.js"; import { UserOffer } from "./entity/UserOffer.js"; +import { StorageInterface } from "./storageInterface.js"; export default class { - DB: DataSource | EntityManager - txQueue: TransactionsQueue - constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) { - this.DB = DB - this.txQueue = txQueue + dbs: StorageInterface + constructor(dbs: StorageInterface) { + this.dbs = dbs } async AddDefaultUserOffer(appUserId: string): Promise { - const newUserOffer = this.DB.getRepository(UserOffer).create({ + return this.dbs.CreateAndSave('UserOffer', { app_user_id: appUserId, offer_id: appUserId, label: 'Default NIP-69 Offer', }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserOffer).save(newUserOffer), dbTx: false, description: `add default offer for ${appUserId}` }) } async AddUserOffer(appUserId: string, req: Partial): Promise { - const newUserOffer = this.DB.getRepository(UserOffer).create({ + const offer = await this.dbs.CreateAndSave('UserOffer', { ...req, app_user_id: appUserId, offer_id: crypto.randomBytes(34).toString('hex') }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserOffer).save(newUserOffer), dbTx: false, description: `add offer for ${appUserId}: ${req.label} ` }) + return offer } - async DeleteUserOffer(appUserId: string, offerId: string, entityManager = this.DB) { - await entityManager.getRepository(UserOffer).delete({ app_user_id: appUserId, offer_id: offerId }) + async DeleteUserOffer(appUserId: string, offerId: string, txId?: string) { + await this.dbs.Delete('UserOffer', { app_user_id: appUserId, offer_id: offerId }, txId) } - async UpdateUserOffer(app_user_id: string, offerId: string, req: Partial) { - return this.DB.getRepository(UserOffer).update({ app_user_id, offer_id: offerId }, req) + async UpdateUserOffer(app_user_id: string, offerId: string, req: Partial, txId?: string) { + return this.dbs.Update('UserOffer', { app_user_id, offer_id: offerId }, req, txId) } async GetUserOffers(app_user_id: string): Promise { - return this.DB.getRepository(UserOffer).find({ where: { app_user_id } }) + return this.dbs.Find('UserOffer', { where: { app_user_id } }) } async GetUserOffer(app_user_id: string, offer_id: string): Promise { - return this.DB.getRepository(UserOffer).findOne({ where: { app_user_id, offer_id } }) + return this.dbs.FindOne('UserOffer', { where: { app_user_id, offer_id } }) } async GetOffer(offer_id: string): Promise { - return this.DB.getRepository(UserOffer).findOne({ where: { offer_id } }) + return this.dbs.FindOne('UserOffer', { where: { offer_id } }) } } \ No newline at end of file diff --git a/src/services/storage/paymentStorage.ts b/src/services/storage/paymentStorage.ts index 2964b085..821b049b 100644 --- a/src/services/storage/paymentStorage.ts +++ b/src/services/storage/paymentStorage.ts @@ -1,5 +1,5 @@ import crypto from 'crypto'; -import { Between, DataSource, EntityManager, FindOperator, IsNull, LessThanOrEqual, MoreThan, MoreThanOrEqual, Not } from "typeorm" +import { Between, FindOperator, IsNull, LessThanOrEqual, MoreThan, MoreThanOrEqual, Not } from "typeorm" import { User } from './entity/User.js'; import { UserTransactionPayment } from './entity/UserTransactionPayment.js'; import { EphemeralKeyType, UserEphemeralKey } from './entity/UserEphemeralKey.js'; @@ -13,20 +13,19 @@ import { UserToUserPayment } from './entity/UserToUserPayment.js'; import { Application } from './entity/Application.js'; import TransactionsQueue from "./transactionsQueue.js"; import { LoggedEvent } from './eventsLog.js'; +import { StorageInterface } from './storageInterface.js'; export type InboundOptionals = { product?: Product, callbackUrl?: string, expiry: number, expectedPayer?: User, linkedApplication?: Application, zapInfo?: ZapInfo, offerId?: string, payerData?: Record } export const defaultInvoiceExpiry = 60 * 60 export default class { - DB: DataSource | EntityManager + dbs: StorageInterface userStorage: UserStorage - txQueue: TransactionsQueue - constructor(DB: DataSource | EntityManager, userStorage: UserStorage, txQueue: TransactionsQueue) { - this.DB = DB + constructor(dbs: StorageInterface, userStorage: UserStorage) { + this.dbs = dbs this.userStorage = userStorage - this.txQueue = txQueue } - async AddAddressReceivingTransaction(address: UserReceivingAddress, txHash: string, outputIndex: number, amount: number, serviceFee: number, internal: boolean, height: number, dbTx: EntityManager | DataSource) { - const newAddressTransaction = dbTx.getRepository(AddressReceivingTransaction).create({ + async AddAddressReceivingTransaction(address: UserReceivingAddress, txHash: string, outputIndex: number, amount: number, serviceFee: number, internal: boolean, height: number, txId: string) { + return this.dbs.CreateAndSave('AddressReceivingTransaction', { user_address: address, tx_hash: txHash, output_index: outputIndex, @@ -36,12 +35,11 @@ export default class { internal, broadcast_height: height, confs: internal ? 10 : 0 - }) - return dbTx.getRepository(AddressReceivingTransaction).save(newAddressTransaction) + }, txId) } - GetUserReceivingTransactions(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise { - return entityManager.getRepository(AddressReceivingTransaction).find({ + GetUserReceivingTransactions(userId: string, fromIndex: number, take = 50, txId?: string): Promise { + return this.dbs.Find('AddressReceivingTransaction', { where: { user_address: { user: { user_id: userId } }, serial_id: MoreThanOrEqual(fromIndex), @@ -51,33 +49,32 @@ export default class { paid_at_unix: 'DESC' }, take - }) + }, txId) } - async GetExistingUserAddress(userId: string, linkedApplication: Application, entityManager = this.DB) { - return entityManager.getRepository(UserReceivingAddress).findOne({ where: { user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } }) + async GetExistingUserAddress(userId: string, linkedApplication: Application, txId?: string) { + return this.dbs.FindOne('UserReceivingAddress', { where: { user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } }, txId) } - async AddUserAddress(user: User, address: string, opts: { callbackUrl?: string, linkedApplication?: Application } = {}): Promise { - const newUserAddress = this.DB.getRepository(UserReceivingAddress).create({ + async AddUserAddress(user: User, address: string, opts: { callbackUrl?: string, linkedApplication?: Application } = {}, txId?: string): Promise { + return this.dbs.CreateAndSave('UserReceivingAddress', { address, callbackUrl: opts.callbackUrl || "", linkedApplication: opts.linkedApplication, user - }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserReceivingAddress).save(newUserAddress), dbTx: false, description: `add address for ${user.user_id} linked to ${opts.linkedApplication?.app_id}: ${address} ` }) + }, txId) } - async FlagInvoiceAsPaid(invoice: UserReceivingInvoice, amount: number, serviceFee: number, internal: boolean, dbTx: EntityManager | DataSource) { + async FlagInvoiceAsPaid(invoice: UserReceivingInvoice, amount: number, serviceFee: number, internal: boolean, txId: string) { const i: Partial = { paid_at_unix: Math.floor(Date.now() / 1000), paid_amount: amount, service_fee: serviceFee, internal } if (!internal) { i.paidByLnd = true } - return dbTx.getRepository(UserReceivingInvoice).update(invoice.serial_id, i) + return this.dbs.Update('UserReceivingInvoice', invoice.serial_id, i, txId) } - GetUserInvoicesFlaggedAsPaid(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise { - return entityManager.getRepository(UserReceivingInvoice).find({ + GetUserInvoicesFlaggedAsPaid(userId: string, fromIndex: number, take = 50, txId?: string): Promise { + return this.dbs.Find('UserReceivingInvoice', { where: { user: { user_id: userId @@ -89,11 +86,11 @@ export default class { paid_at_unix: 'DESC' }, take - }) + }, txId) } - async AddUserInvoice(user: User, invoice: string, options: InboundOptionals = { expiry: defaultInvoiceExpiry }, providerDestination?: string): Promise { - const newUserInvoice = this.DB.getRepository(UserReceivingInvoice).create({ + async AddUserInvoice(user: User, invoice: string, options: InboundOptionals = { expiry: defaultInvoiceExpiry }, providerDestination?: string, txId?: string): Promise { + return this.dbs.CreateAndSave('UserReceivingInvoice', { invoice: invoice, callbackUrl: options.callbackUrl, user: user, @@ -105,60 +102,34 @@ export default class { liquidityProvider: providerDestination, offer_id: options.offerId, payer_data: options.payerData, - }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserReceivingInvoice).save(newUserInvoice), dbTx: false, description: `add invoice for ${user.user_id} linked to ${options.linkedApplication?.app_id}: ${invoice} ` }) + }, txId) } - async GetAddressOwner(address: string, entityManager = this.DB): Promise { - return entityManager.getRepository(UserReceivingAddress).findOne({ - where: { - address - } - }) + async GetAddressOwner(address: string, txId?: string): Promise { + return this.dbs.FindOne('UserReceivingAddress', { where: { address } }, txId) } - async GetAddressReceivingTransactionOwner(address: string, txHash: string, entityManager = this.DB): Promise { - return entityManager.getRepository(AddressReceivingTransaction).findOne({ - where: { - user_address: { address }, - tx_hash: txHash - } - }) + async GetAddressReceivingTransactionOwner(address: string, txHash: string, txId?: string): Promise { + return this.dbs.FindOne('AddressReceivingTransaction', { where: { user_address: { address }, tx_hash: txHash } }, txId) } - async GetUserTransactionPaymentOwner(address: string, txHash: string, entityManager = this.DB): Promise { - return entityManager.getRepository(UserTransactionPayment).findOne({ - where: { - address, - tx_hash: txHash - } - }) + async GetUserTransactionPaymentOwner(address: string, txHash: string, txId?: string): Promise { + return this.dbs.FindOne('UserTransactionPayment', { where: { address, tx_hash: txHash } }, txId) } - async GetInvoiceOwner(paymentRequest: string, entityManager = this.DB): Promise { - return entityManager.getRepository(UserReceivingInvoice).findOne({ - where: { - invoice: paymentRequest - } - }) + async GetInvoiceOwner(paymentRequest: string, txId?: string): Promise { + return this.dbs.FindOne('UserReceivingInvoice', { where: { invoice: paymentRequest } }, txId) } - async GetPaymentOwner(paymentRequest: string, entityManager = this.DB): Promise { - return entityManager.getRepository(UserInvoicePayment).findOne({ - where: { - invoice: paymentRequest - } - }) + async GetPaymentOwner(paymentRequest: string, txId?: string): Promise { + return this.dbs.FindOne('UserInvoicePayment', { where: { invoice: paymentRequest } }, txId) } - async GetUser2UserPayment(serialId: number, entityManager = this.DB): Promise { - return entityManager.getRepository(UserToUserPayment).findOne({ - where: { - serial_id: serialId - } - }) + async GetUser2UserPayment(serialId: number, txId?: string): Promise { + return this.dbs.FindOne('UserToUserPayment', { where: { serial_id: serialId } }, txId) } - async AddPendingExternalPayment(userId: string, invoice: string, amounts: { payAmount: number, serviceFee: number, networkFee: number }, linkedApplication: Application, liquidityProvider: string | undefined, dbTx: DataSource | EntityManager, debitNpub?: string): Promise { - const newPayment = dbTx.getRepository(UserInvoicePayment).create({ - user: await this.userStorage.GetUser(userId, dbTx), + async AddPendingExternalPayment(userId: string, invoice: string, amounts: { payAmount: number, serviceFee: number, networkFee: number }, linkedApplication: Application, liquidityProvider: string | undefined, txId: string, debitNpub?: string): Promise { + const user = await this.userStorage.GetUser(userId) + return this.dbs.CreateAndSave('UserInvoicePayment', { + user, paid_amount: amounts.payAmount, invoice, routing_fees: amounts.networkFee, @@ -168,18 +139,17 @@ export default class { linkedApplication, liquidityProvider, debit_to_pub: debitNpub - }) - return dbTx.getRepository(UserInvoicePayment).save(newPayment) + }, txId) } - async GetMaxPaymentIndex(entityManager = this.DB) { - return entityManager.getRepository(UserInvoicePayment).find({ order: { paymentIndex: 'DESC' }, take: 1 }) + async GetMaxPaymentIndex(txId?: string) { + return this.dbs.Find('UserInvoicePayment', { order: { paymentIndex: 'DESC' }, take: 1 }, txId) } - async SetExternalPaymentIndex(invoicePaymentSerialId: number, index: number, entityManager = this.DB) { - return entityManager.getRepository(UserInvoicePayment).update(invoicePaymentSerialId, { paymentIndex: index }) + async SetExternalPaymentIndex(invoicePaymentSerialId: number, index: number, txId?: string) { + return this.dbs.Update('UserInvoicePayment', invoicePaymentSerialId, { paymentIndex: index }, txId) } - async UpdateExternalPayment(invoicePaymentSerialId: number, routingFees: number, serviceFees: number, success: boolean, providerDestination?: string, entityManager = this.DB) { + async UpdateExternalPayment(invoicePaymentSerialId: number, routingFees: number, serviceFees: number, success: boolean, providerDestination?: string, txId?: string) { const up: Partial = { routing_fees: routingFees, service_fees: serviceFees, @@ -188,11 +158,12 @@ export default class { if (providerDestination) { up.liquidityProvider = providerDestination } - return entityManager.getRepository(UserInvoicePayment).update(invoicePaymentSerialId, up) + return this.dbs.Update('UserInvoicePayment', invoicePaymentSerialId, up, txId) } async AddInternalPayment(userId: string, invoice: string, amount: number, serviceFees: number, linkedApplication: Application, debitNpub?: string): Promise { - const newPayment = this.DB.getRepository(UserInvoicePayment).create({ + const user = await this.userStorage.GetUser(userId) + return this.dbs.CreateAndSave('UserInvoicePayment', { user: await this.userStorage.GetUser(userId), paid_amount: amount, invoice, @@ -203,11 +174,10 @@ export default class { linkedApplication, debit_to_pub: debitNpub }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserInvoicePayment).save(newPayment), dbTx: false, description: `add internal invoice payment for ${userId} linked to ${linkedApplication.app_id}: ${invoice}, amt: ${amount} ` }) } - GetUserInvoicePayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise { - return entityManager.getRepository(UserInvoicePayment).find({ + GetUserInvoicePayments(userId: string, fromIndex: number, take = 50, txId?: string): Promise { + return this.dbs.Find('UserInvoicePayment', { where: { user: { user_id: userId @@ -219,10 +189,10 @@ export default class { paid_at_unix: 'DESC' }, take - }) + }, txId) } - GetUserDebitPayments(userId: string, sinceUnix: number, debitToNpub: string, entityManager = this.DB): Promise { + GetUserDebitPayments(userId: string, sinceUnix: number, debitToNpub: string, txId?: string): Promise { const pending = { user: { user_id: userId }, debit_to_pub: debitToNpub, @@ -233,16 +203,12 @@ export default class { debit_to_pub: debitToNpub, paid_at_unix: MoreThan(sinceUnix), } - return entityManager.getRepository(UserInvoicePayment).find({ - where: [pending, paid], - order: { - paid_at_unix: 'DESC' - } - }) + return this.dbs.Find('UserInvoicePayment', { where: [pending, paid], order: { paid_at_unix: 'DESC' } }, txId) } async AddUserTransactionPayment(userId: string, address: string, txHash: string, txOutput: number, amount: number, chainFees: number, serviceFees: number, internal: boolean, height: number, linkedApplication: Application): Promise { - const newTx = this.DB.getRepository(UserTransactionPayment).create({ + const user = await this.userStorage.GetUser(userId) + return this.dbs.CreateAndSave('UserTransactionPayment', { user: await this.userStorage.GetUser(userId), address, paid_amount: amount, @@ -256,11 +222,10 @@ export default class { confs: internal ? 10 : 0, linkedApplication }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserTransactionPayment).save(newTx), dbTx: false, description: `add tx payment for ${userId} linked to ${linkedApplication.app_id}: ${address}, amt: ${amount} ` }) } - GetUserTransactionPayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise { - return entityManager.getRepository(UserTransactionPayment).find({ + GetUserTransactionPayments(userId: string, fromIndex: number, take = 50, txId?: string): Promise { + return this.dbs.Find('UserTransactionPayment', { where: { user: { user_id: userId @@ -272,70 +237,64 @@ export default class { paid_at_unix: 'DESC' }, take - }) + }, txId) } - async GetPendingTransactions(entityManager = this.DB) { - const incoming = await entityManager.getRepository(AddressReceivingTransaction).find({ where: { confs: 0 } }) - const outgoing = await entityManager.getRepository(UserTransactionPayment).find({ where: { confs: 0 } }) + async GetPendingTransactions(txId?: string) { + const incoming = await this.dbs.Find('AddressReceivingTransaction', { where: { confs: 0 } }, txId) + const outgoing = await this.dbs.Find('UserTransactionPayment', { where: { confs: 0 } }, txId) return { incoming, outgoing } } - async UpdateAddressReceivingTransaction(serialId: number, update: Partial, entityManager = this.DB) { - return entityManager.getRepository(AddressReceivingTransaction).update(serialId, update) + async UpdateAddressReceivingTransaction(serialId: number, update: Partial, txId?: string) { + return this.dbs.Update('AddressReceivingTransaction', serialId, update, txId) } - async UpdateUserTransactionPayment(serialId: number, update: Partial, entityManager = this.DB) { - await entityManager.getRepository(UserTransactionPayment).update(serialId, update) + async UpdateUserTransactionPayment(serialId: number, update: Partial, txId?: string) { + return this.dbs.Update('UserTransactionPayment', serialId, update, txId) } async AddUserEphemeralKey(userId: string, keyType: EphemeralKeyType, linkedApplication: Application): Promise { - const found = await this.DB.getRepository(UserEphemeralKey).findOne({ where: { type: keyType, user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } }) + const found = await this.dbs.FindOne('UserEphemeralKey', { where: { type: keyType, user: { user_id: userId }, linkedApplication: { app_id: linkedApplication.app_id } } }) if (found) { return found } - const newKey = this.DB.getRepository(UserEphemeralKey).create({ + + return this.dbs.CreateAndSave('UserEphemeralKey', { user: await this.userStorage.GetUser(userId), key: crypto.randomBytes(31).toString('hex'), type: keyType, linkedApplication }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(UserEphemeralKey).save(newKey), dbTx: false }) } - async UseUserEphemeralKey(key: string, keyType: EphemeralKeyType, persist = false, entityManager = this.DB): Promise { - const found = await entityManager.getRepository(UserEphemeralKey).findOne({ - where: { - key: key, - type: keyType - } - }) + async UseUserEphemeralKey(key: string, keyType: EphemeralKeyType, persist = false, txId?: string): Promise { + const found = await this.dbs.FindOne('UserEphemeralKey', { where: { key: key, type: keyType } }) if (!found) { throw new Error("the provided ephemeral key is invalid") } if (!persist) { - await entityManager.getRepository(UserEphemeralKey).delete(found.serial_id) + await this.dbs.Delete('UserEphemeralKey', found.serial_id, txId) } return found } - async AddPendingUserToUserPayment(fromUserId: string, toUserId: string, amount: number, fee: number, linkedApplication: Application, dbTx: DataSource | EntityManager) { - const entry = dbTx.getRepository(UserToUserPayment).create({ - from_user: await this.userStorage.GetUser(fromUserId, dbTx), - to_user: await this.userStorage.GetUser(toUserId, dbTx), + async AddPendingUserToUserPayment(fromUserId: string, toUserId: string, amount: number, fee: number, linkedApplication: Application, txId: string) { + return this.dbs.CreateAndSave('UserToUserPayment', { + from_user: await this.userStorage.GetUser(fromUserId, txId), + to_user: await this.userStorage.GetUser(toUserId, txId), paid_at_unix: 0, paid_amount: amount, service_fees: fee, linkedApplication - }) - return dbTx.getRepository(UserToUserPayment).save(entry) + }, txId) } - async SetPendingUserToUserPaymentAsPaid(serialId: number, dbTx: DataSource | EntityManager) { - dbTx.getRepository(UserToUserPayment).update(serialId, { paid_at_unix: Math.floor(Date.now() / 1000) }) + async SetPendingUserToUserPaymentAsPaid(serialId: number, txId: string) { + return this.dbs.Update('UserToUserPayment', serialId, { paid_at_unix: Math.floor(Date.now() / 1000) }, txId) } - GetUserToUserReceivedPayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB) { - return entityManager.getRepository(UserToUserPayment).find({ + GetUserToUserReceivedPayments(userId: string, fromIndex: number, take = 50, txId?: string) { + return this.dbs.Find('UserToUserPayment', { where: { to_user: { user_id: userId @@ -347,11 +306,12 @@ export default class { paid_at_unix: 'DESC' }, take - }) + }, txId) } - GetUserToUserSentPayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB) { - return entityManager.getRepository(UserToUserPayment).find({ + GetUserToUserSentPayments(userId: string, fromIndex: number, take = 50, txId?: string) { + + return this.dbs.Find('UserToUserPayment', { where: { from_user: { user_id: userId @@ -363,19 +323,19 @@ export default class { paid_at_unix: 'DESC' }, take - }) + }, txId) } - async GetTotalFeesPaidInApp(app: Application | null, entityManager = this.DB) { + async GetTotalFeesPaidInApp(app: Application | null, txId?: string) { if (!app) { return 0 } const entries = await Promise.all([ - entityManager.getRepository(UserReceivingInvoice).sum("service_fee", { linkedApplication: { app_id: app.app_id } }), - entityManager.getRepository(AddressReceivingTransaction).sum("service_fee", { user_address: { linkedApplication: { app_id: app.app_id } } }), - entityManager.getRepository(UserInvoicePayment).sum("service_fees", { linkedApplication: { app_id: app.app_id } }), - entityManager.getRepository(UserTransactionPayment).sum("service_fees", { linkedApplication: { app_id: app.app_id } }), - entityManager.getRepository(UserToUserPayment).sum("service_fees", { linkedApplication: { app_id: app.app_id } }) + this.dbs.Sum('UserReceivingInvoice', "service_fee", { linkedApplication: { app_id: app.app_id } }, txId), + this.dbs.Sum('AddressReceivingTransaction', "service_fee", { user_address: { linkedApplication: { app_id: app.app_id } } }, txId), + this.dbs.Sum('UserInvoicePayment', "service_fees", { linkedApplication: { app_id: app.app_id } }, txId), + this.dbs.Sum('UserTransactionPayment', "service_fees", { linkedApplication: { app_id: app.app_id } }, txId), + this.dbs.Sum('UserToUserPayment', "service_fees", { linkedApplication: { app_id: app.app_id } }, txId) ]) let total = 0 entries.forEach(e => { @@ -386,7 +346,7 @@ export default class { return total } - async GetAppOperations(application: Application | null, { from, to }: { from?: number, to?: number }, entityManager = this.DB) { + async GetAppOperations(application: Application | null, { from, to }: { from?: number, to?: number }) { const q = application ? { app_id: application.app_id } : IsNull() let time: { created_at?: FindOperator } = {} if (!!from && !!to) { @@ -398,13 +358,14 @@ export default class { } const [receivingInvoices, receivingAddresses, outgoingInvoices, outgoingTransactions, userToUser] = await Promise.all([ - entityManager.getRepository(UserReceivingInvoice).find({ where: { linkedApplication: q, ...time } }), - entityManager.getRepository(UserReceivingAddress).find({ where: { linkedApplication: q, ...time } }), - entityManager.getRepository(UserInvoicePayment).find({ where: { linkedApplication: q, ...time } }), - entityManager.getRepository(UserTransactionPayment).find({ where: { linkedApplication: q, ...time } }), - entityManager.getRepository(UserToUserPayment).find({ where: { linkedApplication: q, ...time } }) + this.dbs.Find('UserReceivingInvoice', { where: { linkedApplication: q, ...time } }), + this.dbs.Find('UserReceivingAddress', { where: { linkedApplication: q, ...time } }), + this.dbs.Find('UserInvoicePayment', { where: { linkedApplication: q, ...time } }), + this.dbs.Find('UserTransactionPayment', { where: { linkedApplication: q, ...time } }), + this.dbs.Find('UserToUserPayment', { where: { linkedApplication: q, ...time } }) ]) - const receivingTransactions = await Promise.all(receivingAddresses.map(addr => entityManager.getRepository(AddressReceivingTransaction).find({ where: { user_address: { serial_id: addr.serial_id }, ...time } }))) + const receivingTransactions = await Promise.all(receivingAddresses.map(addr => + this.dbs.Find('AddressReceivingTransaction', { where: { user_address: { serial_id: addr.serial_id }, ...time } }))) return { receivingInvoices, receivingAddresses, receivingTransactions, outgoingInvoices, outgoingTransactions, @@ -412,11 +373,11 @@ export default class { } } - async UserHasOutgoingOperation(userId: string, entityManager = this.DB) { + async UserHasOutgoingOperation(userId: string) { const [i, tx, u2u] = await Promise.all([ - entityManager.getRepository(UserInvoicePayment).findOne({ where: { user: { user_id: userId } } }), - entityManager.getRepository(UserTransactionPayment).findOne({ where: { user: { user_id: userId } } }), - entityManager.getRepository(UserToUserPayment).findOne({ where: { from_user: { user_id: userId } } }), + this.dbs.FindOne('UserInvoicePayment', { where: { user: { user_id: userId } } }), + this.dbs.FindOne('UserTransactionPayment', { where: { user: { user_id: userId } } }), + this.dbs.FindOne('UserToUserPayment', { where: { from_user: { user_id: userId } } }), ]) return !!i || !!tx || !!u2u } @@ -424,42 +385,50 @@ export default class { async VerifyDbEvent(e: LoggedEvent) { switch (e.type) { case "new_invoice": - return this.DB.getRepository(UserReceivingInvoice).findOneOrFail({ where: { invoice: e.data, user: { user_id: e.userId } } }) + return orFail(this.dbs.FindOne('UserReceivingInvoice', { where: { invoice: e.data, user: { user_id: e.userId } } })) case 'new_address': - return this.DB.getRepository(UserReceivingAddress).findOneOrFail({ where: { address: e.data, user: { user_id: e.userId } } }) + return orFail(this.dbs.FindOne('UserReceivingAddress', { where: { address: e.data, user: { user_id: e.userId } } })) case 'invoice_paid': - return this.DB.getRepository(UserReceivingInvoice).findOneOrFail({ where: { invoice: e.data, user: { user_id: e.userId }, paid_at_unix: MoreThan(0) } }) + return orFail(this.dbs.FindOne('UserReceivingInvoice', { where: { invoice: e.data, user: { user_id: e.userId }, paid_at_unix: MoreThan(0) } })) case 'invoice_payment': - return this.DB.getRepository(UserInvoicePayment).findOneOrFail({ where: { invoice: e.data, user: { user_id: e.userId } } }) + return orFail(this.dbs.FindOne('UserInvoicePayment', { where: { invoice: e.data, user: { user_id: e.userId } } })) case 'address_paid': const [receivingAddress, receivedHash] = e.data.split(":") - return this.DB.getRepository(AddressReceivingTransaction).findOneOrFail({ where: { user_address: { address: receivingAddress }, tx_hash: receivedHash, confs: MoreThan(0) } }) + return orFail(this.dbs.FindOne('AddressReceivingTransaction', { where: { user_address: { address: receivingAddress }, tx_hash: receivedHash, confs: MoreThan(0) } })) case 'address_payment': const [sentAddress, sentHash] = e.data.split(":") - return this.DB.getRepository(UserTransactionPayment).findOneOrFail({ where: { address: sentAddress, tx_hash: sentHash, user: { user_id: e.userId } } }) + return orFail(this.dbs.FindOne('UserTransactionPayment', { where: { address: sentAddress, tx_hash: sentHash, user: { user_id: e.userId } } })) case 'u2u_receiver': - return this.DB.getRepository(UserToUserPayment).findOneOrFail({ where: { from_user: { user_id: e.data }, to_user: { user_id: e.userId } } }) + return orFail(this.dbs.FindOne('UserToUserPayment', { where: { from_user: { user_id: e.data }, to_user: { user_id: e.userId } } })) case 'u2u_sender': - return this.DB.getRepository(UserToUserPayment).findOneOrFail({ where: { to_user: { user_id: e.data }, from_user: { user_id: e.userId } } }) + return orFail(this.dbs.FindOne('UserToUserPayment', { where: { to_user: { user_id: e.data }, from_user: { user_id: e.userId } } })) default: break; } } - async GetTotalUsersBalance(entityManager = this.DB) { - const total = await entityManager.getRepository(User).sum("balance_sats") + async GetTotalUsersBalance(txId?: string) { + const total = await this.dbs.Sum('User', "balance_sats", {}) return total || 0 } - async GetPendingPayments(entityManager = this.DB) { - return entityManager.getRepository(UserInvoicePayment).find({ where: { paid_at_unix: 0 } }) + async GetPendingPayments(txId?: string) { + return this.dbs.Find('UserInvoicePayment', { where: { paid_at_unix: 0 } }) } - async GetOfferInvoices(offerId: string, includeUnpaid: boolean, entityManager = this.DB) { + async GetOfferInvoices(offerId: string, includeUnpaid: boolean, txId?: string) { const where: { offer_id: string, paid_at_unix?: FindOperator } = { offer_id: offerId } if (!includeUnpaid) { where.paid_at_unix = MoreThan(0) } - return entityManager.getRepository(UserReceivingInvoice).find({ where }) + return this.dbs.Find('UserReceivingInvoice', { where }) } +} + +const orFail = async (resultPromise: Promise) => { + const result = await resultPromise + if (!result) { + throw new Error("the requested value was not found") + } + return result } \ No newline at end of file diff --git a/src/services/storage/productStorage.ts b/src/services/storage/productStorage.ts index 10635afa..c93be22a 100644 --- a/src/services/storage/productStorage.ts +++ b/src/services/storage/productStorage.ts @@ -1,23 +1,19 @@ -import { DataSource, EntityManager } from "typeorm" import { Product } from "./entity/Product.js" import { User } from "./entity/User.js" -import TransactionsQueue, { TX } from "./transactionsQueue.js"; +import { StorageInterface } from "./storageInterface.js"; export default class { - DB: DataSource | EntityManager - txQueue: TransactionsQueue - constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue) { - this.DB = DB - this.txQueue = txQueue + dbs: StorageInterface + constructor(dbs: StorageInterface) { + this.dbs = dbs } async AddProduct(name: string, priceSats: number, user: User): Promise { - const newProduct = this.DB.getRepository(Product).create({ + return this.dbs.CreateAndSave('Product', { name: name, price_sats: priceSats, owner: user }) - return this.txQueue.PushToQueue({ exec: async db => db.getRepository(Product).save(newProduct), dbTx: false }) } - async GetProduct(id: string, entityManager = this.DB): Promise { - const product = await entityManager.getRepository(Product).findOne({ where: { product_id: id } }) + async GetProduct(id: string, txId?: string): Promise { + const product = await this.dbs.FindOne('Product', { where: { product_id: id } }, txId) if (!product) { throw new Error("product not found") } diff --git a/src/services/storage/storageInterface.ts b/src/services/storage/storageInterface.ts new file mode 100644 index 00000000..818ef89e --- /dev/null +++ b/src/services/storage/storageInterface.ts @@ -0,0 +1,164 @@ +import { fork } from 'child_process'; +import { EventEmitter } from 'events'; +import { DbSettings, MainDbNames } from './db.js'; +import { DeepPartial, FindOptionsWhere } from 'typeorm'; +import { + ConnectOperation, DeleteOperation, RemoveOperation, FindOneOperation, + FindOperation, UpdateOperation, CreateAndSaveOperation, StartTxOperation, + EndTxOperation, QueryOptions, OperationResponse, + IStorageOperation, + IncrementOperation, + DecrementOperation, + SumOperation, + WhereCondition +} from './storageProcessor.js'; +import { PickKeysByType } from 'typeorm/common/PickKeysByType.js'; + +export type TX = (txId: string) => Promise + +export class StorageInterface extends EventEmitter { + private process: any; + private isConnected: boolean = false; + + constructor() { + super(); + this.initializeSubprocess(); + } + + private initializeSubprocess() { + this.process = fork('./build/src/services/storage/storageProcessor'); + + this.process.on('message', (response: OperationResponse) => { + this.emit(response.opId, response); + }); + + this.process.on('error', (error: Error) => { + console.error('Storage processor error:', error); + this.isConnected = false; + }); + + this.process.on('exit', (code: number) => { + console.log(`Storage processor exited with code ${code}`); + this.isConnected = false; + }); + + this.isConnected = true; + } + + Connect(settings: DbSettings): Promise { + const opId = Math.random().toString() + const connectOp: ConnectOperation = { type: 'connect', opId, settings } + return this.handleOp(connectOp) + } + + Delete(entity: MainDbNames, q: number | FindOptionsWhere, txId?: string): Promise { + const opId = Math.random().toString() + const deleteOp: DeleteOperation = { type: 'delete', entity, opId, q, txId } + return this.handleOp(deleteOp) + } + + Remove(entity: MainDbNames, q: T, txId?: string): Promise { + const opId = Math.random().toString() + const removeOp: RemoveOperation = { type: 'remove', entity, opId, q, txId } + return this.handleOp(removeOp) + } + + FindOne(entity: MainDbNames, q: QueryOptions, txId?: string): Promise { + const opId = Math.random().toString() + const findOp: FindOneOperation = { type: 'findOne', entity, opId, q, txId } + return this.handleOp(findOp) + } + + Find(entity: MainDbNames, q: QueryOptions, txId?: string): Promise { + const opId = Math.random().toString() + const findOp: FindOperation = { type: 'find', entity, opId, q, txId } + return this.handleOp(findOp) + } + + Sum(entity: MainDbNames, columnName: PickKeysByType, q: WhereCondition, txId?: string): Promise { + const opId = Math.random().toString() + const sumOp: SumOperation = { type: 'sum', entity, opId, columnName, q, txId } + return this.handleOp(sumOp) + } + + Update(entity: MainDbNames, q: number | FindOptionsWhere, toUpdate: DeepPartial, txId?: string): Promise { + const opId = Math.random().toString() + const updateOp: UpdateOperation = { type: 'update', entity, opId, toUpdate, q, txId } + return this.handleOp(updateOp) + } + + Increment(entity: MainDbNames, q: FindOptionsWhere, propertyPath: string, value: number | string, txId?: string): Promise { + const opId = Math.random().toString() + const incrementOp: IncrementOperation = { type: 'increment', entity, opId, q, propertyPath, value, txId } + return this.handleOp(incrementOp) + } + + Decrement(entity: MainDbNames, q: FindOptionsWhere, propertyPath: string, value: number | string, txId?: string): Promise { + const opId = Math.random().toString() + const decrementOp: DecrementOperation = { type: 'decrement', entity, opId, q, propertyPath, value, txId } + return this.handleOp(decrementOp) + } + + CreateAndSave(entity: MainDbNames, toSave: DeepPartial, txId?: string): Promise { + const opId = Math.random().toString() + const createAndSaveOp: CreateAndSaveOperation = { type: 'createAndSave', entity, opId, toSave, txId } + return this.handleOp(createAndSaveOp) + } + + async StartTx(description?: string): Promise { + const opId = Math.random().toString() + const startTxOp: StartTxOperation = { type: 'startTx', opId, description } + await this.handleOp(startTxOp) + return opId + } + + async EndTx(txId: string, success: boolean, data: T): Promise { + const opId = Math.random().toString() + const endTxOp: EndTxOperation = success ? { type: 'endTx', opId, txId, success, data } : { type: 'endTx', opId, txId, success } + return this.handleOp(endTxOp) + } + + async Tx(exec: TX, description?: string): Promise { + const txId = await this.StartTx() + try { + const res = await exec(txId) + await this.EndTx(txId, true, res) + return res + } catch (err: any) { + await this.EndTx(txId, false, err.message) + throw err + } + } + + private handleOp(op: IStorageOperation): Promise { + this.checkConnected() + return new Promise((resolve, reject) => { + const responseHandler = (response: OperationResponse) => { + if (!response.success) { + reject(new Error(response.error)); + return + } + if (response.type !== op.type) { + reject(new Error('Invalid response type')); + return + } + resolve(response.data); + } + this.once(op.opId, responseHandler) + this.process.send(op) + }) + } + + private checkConnected() { + if (!this.isConnected) { + throw new Error('Storage processor is not connected'); + } + } + + public disconnect() { + if (this.process) { + this.process.kill(); + this.isConnected = false; + } + } +} \ No newline at end of file diff --git a/src/services/storage/storageProcessor.ts b/src/services/storage/storageProcessor.ts new file mode 100644 index 00000000..6ac844c7 --- /dev/null +++ b/src/services/storage/storageProcessor.ts @@ -0,0 +1,394 @@ +import { DataSource, EntityManager, DeepPartial, FindOptionsWhere, FindOptionsOrder } from 'typeorm'; +import NewDB, { DbSettings, MainDbEntities, MainDbNames, newMetricsDb } from './db.js'; +import { PubLogger, getLogger } from '../helpers/logger.js'; +import { allMetricsMigrations, allMigrations } from './migrations/runner.js'; +import transactionsQueue from './transactionsQueue.js'; +import { PickKeysByType } from 'typeorm/common/PickKeysByType'; + +export type WhereCondition = FindOptionsWhere | FindOptionsWhere[] +export type QueryOptions = { + where?: WhereCondition + order?: FindOptionsOrder + take?: number + skip?: number +} +export type ConnectOperation = { + type: 'connect' + opId: string + settings: DbSettings +} + +export type StartTxOperation = { + type: 'startTx' + opId: string + description?: string +} + +export type EndTxOperation = { + type: 'endTx' + txId: string + opId: string +} & ({ success: true, data: T } | { success: false }) + +export type DeleteOperation = { + type: 'delete' + entity: MainDbNames + opId: string + q: number | FindOptionsWhere + txId?: string +} + +export type RemoveOperation = { + type: 'remove' + entity: MainDbNames + opId: string + q: T + txId?: string +} + +export type UpdateOperation = { + type: 'update' + entity: MainDbNames + opId: string + toUpdate: DeepPartial + q: number | FindOptionsWhere + txId?: string +} + +export type IncrementOperation = { + type: 'increment' + entity: MainDbNames + opId: string + q: FindOptionsWhere + propertyPath: string, + value: number | string + txId?: string +} + +export type DecrementOperation = { + type: 'decrement' + entity: MainDbNames + opId: string + q: FindOptionsWhere + propertyPath: string, + value: number | string + txId?: string +} + +export type FindOneOperation = { + type: 'findOne' + entity: MainDbNames + opId: string + q: QueryOptions + txId?: string +} + +export type FindOperation = { + type: 'find' + entity: MainDbNames + opId: string + q: QueryOptions + txId?: string +} + +export type SumOperation = { + type: 'sum' + entity: MainDbNames + opId: string + columnName: PickKeysByType + q: WhereCondition + txId?: string +} + +export type CreateAndSaveOperation = { + type: 'createAndSave' + entity: MainDbNames + opId: string + toSave: DeepPartial + txId?: string + description?: string +} + +export type ErrorOperationResponse = { success: false, error: string, opId: string } + +export interface IStorageOperation { + opId: string + type: string +} + +export type StorageOperation = ConnectOperation | StartTxOperation | EndTxOperation | DeleteOperation | RemoveOperation | UpdateOperation | + FindOneOperation | FindOperation | CreateAndSaveOperation | IncrementOperation | DecrementOperation | SumOperation + +export type SuccessOperationResponse = { success: true, type: string, data: T, opId: string } +export type OperationResponse = SuccessOperationResponse | ErrorOperationResponse + +type ActiveTransaction = { + txId: string + manager: EntityManager | DataSource + resolve: (value: any) => void + reject: (reason?: any) => void +} + +class StorageProcessor { + private log: PubLogger = console.log + private DB: DataSource + private txQueue: transactionsQueue + //private locked: boolean = false + private activeTransaction: ActiveTransaction | null = null + //private queue: StartTxOperation[] = [] + + constructor() { + if (!process.send) { + throw new Error('This process must be spawned as a child process'); + } + this.log = getLogger({ component: 'StorageProcessor' }) + process.on('message', (operation: StorageOperation) => { + this.handleOperation(operation); + }); + + process.on('error', (error: Error) => { + console.error('Error in storage processor:', error); + }); + } + + private async handleOperation(operation: StorageOperation) { + try { + const opId = operation.opId; + switch (operation.type) { + case 'connect': + return this.handleConnect(operation); + case 'startTx': + return this.handleStartTx(operation); + case 'endTx': + return this.handleEndTx(operation); + case 'delete': + return this.handleDelete(operation); + case 'remove': + return this.handleRemove(operation); + case 'update': + return this.handleUpdate(operation); + case 'increment': + return this.handleIncrement(operation); + case 'decrement': + return this.handleDecrement(operation); + case 'findOne': + return this.handleFindOne(operation); + case 'find': + return this.handleFind(operation); + case 'sum': + return this.handleSum(operation); + case 'createAndSave': + return this.handleCreateAndSave(operation); + default: + this.sendResponse({ + success: false, + error: `Unknown operation type: ${(operation as any).type}`, + opId + }) + return + } + } catch (error) { + this.sendResponse({ + success: false, + error: error instanceof Error ? error.message : 'Unknown error occurred', + opId: operation.opId + }); + } + } + + private async handleConnect(operation: ConnectOperation) { + const { source, executedMigrations } = await NewDB(operation.settings, allMigrations) + this.DB = source + this.txQueue = new transactionsQueue('StorageProcessorQueue', this.DB) + if (executedMigrations.length > 0) { + this.log(executedMigrations.length, "new migrations executed") + this.log("-------------------") + } + this.sendResponse({ + success: true, + type: 'connect', + data: executedMigrations.length, + opId: operation.opId + }); + } + + + private async handleStartTx(operation: StartTxOperation) { + const res = await this.txQueue.PushToQueue({ + dbTx: false, + description: operation.description || "startTx", + exec: tx => new Promise((resolve, reject) => { + this.activeTransaction = { + txId: operation.opId, + manager: tx, + resolve, + reject + } + }) + }) + this.sendResponse({ + success: true, + type: 'startTx', + data: res, + opId: operation.opId + }); + } + + private async handleEndTx(operation: EndTxOperation) { + const activeTx = this.activeTransaction + if (!activeTx || activeTx.txId !== operation.txId) { + throw new Error('Transaction to end not found'); + } + if (operation.success) { + activeTx.resolve(true) + } else { + activeTx.reject(new Error('Transaction failed')) + } + this.activeTransaction = null + this.sendResponse({ + success: true, + type: 'endTx', + data: operation.success, + opId: operation.opId + }); + + } + + private getManager(txId?: string): DataSource | EntityManager { + if (txId) { + if (!this.activeTransaction || this.activeTransaction.txId !== txId) { + throw new Error('Transaction not found'); + } + return this.activeTransaction.manager + } + return this.DB + } + + private async handleDelete(operation: DeleteOperation) { + const manager = this.getManager(operation.txId); + const res = await manager.getRepository(MainDbEntities[operation.entity]).delete(operation.q) + this.sendResponse({ + success: true, + type: 'delete', + data: res.affected || 0, + opId: operation.opId + }); + } + + private async handleRemove(operation: RemoveOperation) { + const manager = this.getManager(operation.txId); + const res = await manager.getRepository(MainDbEntities[operation.entity]).remove(operation.q) + + this.sendResponse({ + success: true, + type: 'remove', + data: res, + opId: operation.opId + }); + } + + private async handleUpdate(operation: UpdateOperation) { + const manager = this.getManager(operation.txId); + const res = await manager.getRepository(MainDbEntities[operation.entity]).update(operation.q, operation.toUpdate) + + this.sendResponse({ + success: true, + type: 'update', + data: res.affected || 0, + opId: operation.opId + }); + } + + private async handleIncrement(operation: IncrementOperation) { + const manager = this.getManager(operation.txId); + const res = await manager.getRepository(MainDbEntities[operation.entity]).increment(operation.q, operation.propertyPath, operation.value) + this.sendResponse({ + success: true, + type: 'increment', + data: res.affected || 0, + opId: operation.opId + }); + } + + private async handleDecrement(operation: DecrementOperation) { + const manager = this.getManager(operation.txId); + const res = await manager.getRepository(MainDbEntities[operation.entity]).decrement(operation.q, operation.propertyPath, operation.value) + this.sendResponse({ + success: true, + type: 'decrement', + data: res.affected || 0, + opId: operation.opId + }); + } + + private async handleFindOne(operation: FindOneOperation) { + const manager = this.getManager(operation.txId); + const res = await manager.getRepository(MainDbEntities[operation.entity]).findOne(operation.q) + + this.sendResponse({ + success: true, + type: 'findOne', + data: res, + opId: operation.opId + }); + } + + private async handleFind(operation: FindOperation) { + const manager = this.getManager(operation.txId); + const res = await manager.getRepository(MainDbEntities[operation.entity]).find(operation.q) + + this.sendResponse({ + success: true, + type: 'find', + data: res, + opId: operation.opId + }); + } + + private async handleSum(operation: SumOperation) { + const manager = this.getManager(operation.txId); + const res = await manager.getRepository(MainDbEntities[operation.entity]).sum(operation.columnName, operation.q) + this.sendResponse({ + success: true, + type: 'sum', + data: res || 0, + opId: operation.opId + }); + } + + private async handleCreateAndSave(operation: CreateAndSaveOperation) { + const saved = await this.createAndSave(operation) + + this.sendResponse({ + success: true, + type: 'createAndSave', + data: saved, + opId: operation.opId + }); + } + + private async createAndSave(operation: CreateAndSaveOperation) { + if (operation.txId) { + const manager = this.getManager(operation.txId); + const res = manager.getRepository(MainDbEntities[operation.entity]).create(operation.toSave) + return manager.getRepository(MainDbEntities[operation.entity]).save(res) + } + return this.txQueue.PushToQueue({ + dbTx: false, + description: operation.description || "createAndSave", + exec: async tx => { + const res = tx.getRepository(MainDbEntities[operation.entity]).create(operation.toSave) + return tx.getRepository(MainDbEntities[operation.entity]).save(res) + } + }) + } + + private sendResponse(response: OperationResponse) { + if (process.send) { + process.send(response); + } + } +} + +// Start the storage processor +new StorageProcessor(); diff --git a/src/services/storage/transactionsQueue.ts b/src/services/storage/transactionsQueue.ts index 12dd2333..c151211a 100644 --- a/src/services/storage/transactionsQueue.ts +++ b/src/services/storage/transactionsQueue.ts @@ -1,8 +1,8 @@ import { DataSource, EntityManager, EntityTarget } from "typeorm" import { PubLogger, getLogger } from "../helpers/logger.js" -export type TX = (entityManager: EntityManager | DataSource) => Promise -export type TxOperation = { +type TX = (entityManager: EntityManager | DataSource) => Promise +type TxOperation = { exec: TX dbTx: boolean description?: string diff --git a/src/services/storage/userStorage.ts b/src/services/storage/userStorage.ts index 602d3084..b35d40ae 100644 --- a/src/services/storage/userStorage.ts +++ b/src/services/storage/userStorage.ts @@ -1,74 +1,61 @@ import crypto from 'crypto'; -import { DataSource, EntityManager } from "typeorm" import { User } from './entity/User.js'; import { UserBasicAuth } from './entity/UserBasicAuth.js'; import { getLogger } from '../helpers/logger.js'; -import TransactionsQueue from "./transactionsQueue.js"; import EventsLogManager from './eventsLog.js'; +import { StorageInterface } from './storageInterface.js'; export default class { - DB: DataSource | EntityManager - txQueue: TransactionsQueue + dbs: StorageInterface eventsLog: EventsLogManager - constructor(DB: DataSource | EntityManager, txQueue: TransactionsQueue, eventsLog: EventsLogManager) { - this.DB = DB - this.txQueue = txQueue + constructor(dbs: StorageInterface, eventsLog: EventsLogManager) { + this.dbs = dbs this.eventsLog = eventsLog } - async AddUser(balance: number, dbTx: DataSource | EntityManager): Promise { + async AddUser(balance: number, txId: string): Promise { if (balance && process.env.ALLOW_BALANCE_MIGRATION !== 'true') { throw new Error("balance migration is not allowed") } getLogger({})("Adding user with balance", balance) - const newUser = dbTx.getRepository(User).create({ + return this.dbs.CreateAndSave('User', { user_id: crypto.randomBytes(32).toString('hex'), balance_sats: balance - }) - return dbTx.getRepository(User).save(newUser) + }, txId) } - async AddBasicUser(name: string, secret: string): Promise { - return this.DB.transaction(async tx => { - const user = await this.AddUser(0, tx) - const newUserAuth = tx.getRepository(UserBasicAuth).create({ - user: user, - name: name, - secret_sha256: crypto.createHash('sha256').update(secret).digest('base64') + /* async AddBasicUser(name: string, secret: string): Promise { + return this.DB.transaction(async tx => { + const user = await this.AddUser(0, tx) + const newUserAuth = tx.getRepository(UserBasicAuth).create({ + user: user, + name: name, + secret_sha256: crypto.createHash('sha256').update(secret).digest('base64') + }) + return tx.getRepository(UserBasicAuth).save(newUserAuth) }) - return tx.getRepository(UserBasicAuth).save(newUserAuth) - }) - + } */ + FindUser(userId: string, txId?: string) { + return this.dbs.FindOne('User', { where: { user_id: userId } }, txId) } - FindUser(userId: string, entityManager = this.DB) { - return entityManager.getRepository(User).findOne({ - where: { - user_id: userId - } - }) - } - async GetUser(userId: string, entityManager = this.DB): Promise { - const user = await this.FindUser(userId, entityManager) + async GetUser(userId: string, txId?: string): Promise { + const user = await this.FindUser(userId, txId) if (!user) { throw new Error(`user ${userId} not found`) // TODO: fix logs doxing } return user } - async UnbanUser(userId: string, entityManager = this.DB) { - const res = await entityManager.getRepository(User).update({ - user_id: userId - }, { locked: false }) - if (!res.affected) { + async UnbanUser(userId: string, txId?: string) { + const affected = await this.dbs.Update('User', { user_id: userId }, { locked: false }, txId) + if (!affected) { throw new Error("unaffected user unlock for " + userId) // TODO: fix logs doxing } } - async BanUser(userId: string, entityManager = this.DB) { - const user = await this.GetUser(userId, entityManager) - const res = await entityManager.getRepository(User).update({ - user_id: userId - }, { balance_sats: 0, locked: true }) - if (!res.affected) { + async BanUser(userId: string, txId?: string) { + const user = await this.GetUser(userId, txId) + const affected = await this.dbs.Update('User', { user_id: userId }, { balance_sats: 0, locked: true }, txId) + if (!affected) { throw new Error("unaffected ban user for " + userId) // TODO: fix logs doxing } if (user.balance_sats > 0) { @@ -76,53 +63,45 @@ export default class { } return user } - async IncrementUserBalance(userId: string, increment: number, reason: string, entityManager?: DataSource | EntityManager) { - if (entityManager) { - return this.IncrementUserBalanceInTx(userId, increment, reason, entityManager) + async IncrementUserBalance(userId: string, increment: number, reason: string, txId?: string) { + if (txId) { + return this.IncrementUserBalanceInTx(userId, increment, reason, txId) } - await this.txQueue.PushToQueue({ - dbTx: true, - description: `incrementing user ${userId} balance by ${increment}`, - exec: async tx => { - await this.IncrementUserBalanceInTx(userId, increment, reason, tx) - } - }) + + await this.dbs.Tx(async tx => { + await this.IncrementUserBalanceInTx(userId, increment, reason, tx) + }, `incrementing user ${userId} balance by ${increment}`) } - async IncrementUserBalanceInTx(userId: string, increment: number, reason: string, dbTx: DataSource | EntityManager) { - const user = await this.GetUser(userId, dbTx) - const res = await dbTx.getRepository(User).increment({ - user_id: userId, - }, "balance_sats", increment) - if (!res.affected) { + + async IncrementUserBalanceInTx(userId: string, increment: number, reason: string, txId: string) { + const user = await this.GetUser(userId, txId) + const affected = await this.dbs.Increment('User', { user_id: userId }, "balance_sats", increment, txId) + if (!affected) { getLogger({ userId: userId, component: "balanceUpdates" })("user unaffected by increment") throw new Error("unaffected balance increment for " + userId) // TODO: fix logs doxing } getLogger({ userId: userId, component: "balanceUpdates" })("incremented balance from", user.balance_sats, "sats, by", increment, "sats") this.eventsLog.LogEvent({ type: 'balance_increment', userId, appId: "", appUserId: "", balance: user.balance_sats, data: reason, amount: increment }) } - async DecrementUserBalance(userId: string, decrement: number, reason: string, entityManager?: DataSource | EntityManager) { - if (entityManager) { - return this.DecrementUserBalanceInTx(userId, decrement, reason, entityManager) + + async DecrementUserBalance(userId: string, decrement: number, reason: string, txId?: string) { + if (txId) { + return this.DecrementUserBalanceInTx(userId, decrement, reason, txId) } - await this.txQueue.PushToQueue({ - dbTx: true, - description: `decrementing user ${userId} balance by ${decrement}`, - exec: async tx => { - await this.DecrementUserBalanceInTx(userId, decrement, reason, tx) - } - }) + + await this.dbs.Tx(async tx => { + await this.DecrementUserBalanceInTx(userId, decrement, reason, tx) + }, `decrementing user ${userId} balance by ${decrement}`) } - async DecrementUserBalanceInTx(userId: string, decrement: number, reason: string, dbTx: DataSource | EntityManager) { - const user = await this.GetUser(userId, dbTx) + async DecrementUserBalanceInTx(userId: string, decrement: number, reason: string, txId: string) { + const user = await this.GetUser(userId, txId) if (!user || user.balance_sats < decrement) { getLogger({ userId: userId, component: "balanceUpdates" })("not enough balance to decrement") throw new Error("not enough balance to decrement") } - const res = await dbTx.getRepository(User).decrement({ - user_id: userId, - }, "balance_sats", decrement) - if (!res.affected) { + const affected = await this.dbs.Decrement('User', { user_id: userId }, "balance_sats", decrement, txId) + if (!affected) { getLogger({ userId: userId, component: "balanceUpdates" })("user unaffected by decrement") throw new Error("unaffected balance decrement for " + userId) // TODO: fix logs doxing } @@ -130,8 +109,8 @@ export default class { this.eventsLog.LogEvent({ type: 'balance_decrement', userId, appId: "", appUserId: "", balance: user.balance_sats, data: reason, amount: decrement }) } - async UpdateUser(userId: string, update: Partial, entityManager = this.DB) { - const user = await this.GetUser(userId, entityManager) - await entityManager.getRepository(User).update(user.serial_id, update) + async UpdateUser(userId: string, update: Partial, txId?: string) { + const user = await this.GetUser(userId, txId) + await this.dbs.Update('User', user.serial_id, update, txId) } } \ No newline at end of file