From 22a1c10b4eac17f533e00412f95cd81fe5f44e15 Mon Sep 17 00:00:00 2001 From: boufni95 Date: Thu, 13 Mar 2025 22:50:12 +0000 Subject: [PATCH] RWMutex --- src/services/storage/storageProcessor.ts | 84 ++++++++++++++--------- src/services/storage/transactionsQueue.ts | 59 +++++++++++++++- src/tests/testRunner.ts | 1 + src/tests/testStorage.spec.ts | 6 +- 4 files changed, 114 insertions(+), 36 deletions(-) diff --git a/src/services/storage/storageProcessor.ts b/src/services/storage/storageProcessor.ts index 067d3079..f4e67454 100644 --- a/src/services/storage/storageProcessor.ts +++ b/src/services/storage/storageProcessor.ts @@ -268,19 +268,25 @@ class StorageProcessor { } + private getTx(txId: string) { + if (!this.activeTransaction || this.activeTransaction.txId !== txId) { + throw new Error('Transaction not found'); + } + return this.activeTransaction.manager + } + private getManager(txId?: string): DataSource | EntityManager { if (txId) { - if (!this.activeTransaction || this.activeTransaction.txId !== txId) { - throw new Error('Transaction not found'); - } - return this.activeTransaction.manager + return this.getTx(txId) } return this.DB } private async handleDelete(operation: DeleteOperation) { - const manager = this.getManager(operation.txId); - const res = await manager.getRepository(MainDbEntities[operation.entity]).delete(operation.q) + + const res = await this.handleWrite(operation.txId, eM => { + return eM.getRepository(MainDbEntities[operation.entity]).delete(operation.q) + }) this.sendResponse({ success: true, type: 'delete', @@ -290,8 +296,9 @@ class StorageProcessor { } private async handleRemove(operation: RemoveOperation) { - const manager = this.getManager(operation.txId); - const res = await manager.getRepository(MainDbEntities[operation.entity]).remove(operation.q) + const res = await this.handleWrite(operation.txId, eM => { + return eM.getRepository(MainDbEntities[operation.entity]).remove(operation.q) + }) this.sendResponse({ success: true, @@ -302,8 +309,9 @@ class StorageProcessor { } private async handleUpdate(operation: UpdateOperation) { - const manager = this.getManager(operation.txId); - const res = await manager.getRepository(MainDbEntities[operation.entity]).update(operation.q, operation.toUpdate) + const res = await this.handleWrite(operation.txId, eM => { + return eM.getRepository(MainDbEntities[operation.entity]).update(operation.q, operation.toUpdate) + }) this.sendResponse({ success: true, @@ -314,8 +322,10 @@ class StorageProcessor { } private async handleIncrement(operation: IncrementOperation) { - const manager = this.getManager(operation.txId); - const res = await manager.getRepository(MainDbEntities[operation.entity]).increment(operation.q, operation.propertyPath, operation.value) + const res = await this.handleWrite(operation.txId, eM => { + return eM.getRepository(MainDbEntities[operation.entity]).increment(operation.q, operation.propertyPath, operation.value) + }) + this.sendResponse({ success: true, type: 'increment', @@ -325,8 +335,10 @@ class StorageProcessor { } private async handleDecrement(operation: DecrementOperation) { - const manager = this.getManager(operation.txId); - const res = await manager.getRepository(MainDbEntities[operation.entity]).decrement(operation.q, operation.propertyPath, operation.value) + const res = await this.handleWrite(operation.txId, eM => { + return eM.getRepository(MainDbEntities[operation.entity]).decrement(operation.q, operation.propertyPath, operation.value) + }) + this.sendResponse({ success: true, type: 'decrement', @@ -336,8 +348,9 @@ class StorageProcessor { } private async handleFindOne(operation: FindOneOperation) { - const manager = this.getManager(operation.txId); - const res = await manager.getRepository(MainDbEntities[operation.entity]).findOne(operation.q) + const res = await this.handleRead(operation.txId, eM => { + return eM.getRepository(MainDbEntities[operation.entity]).findOne(operation.q) + }) this.sendResponse({ success: true, @@ -348,8 +361,9 @@ class StorageProcessor { } private async handleFind(operation: FindOperation) { - const manager = this.getManager(operation.txId); - const res = await manager.getRepository(MainDbEntities[operation.entity]).find(operation.q) + const res = await this.handleRead(operation.txId, eM => { + return eM.getRepository(MainDbEntities[operation.entity]).find(operation.q) + }) this.sendResponse({ success: true, @@ -360,8 +374,9 @@ class StorageProcessor { } private async handleSum(operation: SumOperation) { - const manager = this.getManager(operation.txId); - const res = await manager.getRepository(MainDbEntities[operation.entity]).sum(operation.columnName, operation.q) + const res = await this.handleRead(operation.txId, eM => { + return eM.getRepository(MainDbEntities[operation.entity]).sum(operation.columnName, operation.q) + }) this.sendResponse({ success: true, type: 'sum', @@ -371,7 +386,10 @@ class StorageProcessor { } private async handleCreateAndSave(operation: CreateAndSaveOperation) { - const saved = await this.createAndSave(operation) + const saved = await this.handleWrite(operation.txId, async eM => { + const res = eM.getRepository(MainDbEntities[operation.entity]).create(operation.toSave) + return eM.getRepository(MainDbEntities[operation.entity]).save(res) + }) this.sendResponse({ success: true, @@ -381,19 +399,23 @@ class StorageProcessor { }); } - private async createAndSave(operation: CreateAndSaveOperation) { - if (operation.txId) { - const manager = this.getManager(operation.txId); - const res = manager.getRepository(MainDbEntities[operation.entity]).create(operation.toSave) - return manager.getRepository(MainDbEntities[operation.entity]).save(res) + private async handleRead(txId: string | undefined, read: (tx: DataSource | EntityManager) => Promise) { + if (txId) { + const tx = this.getTx(txId) + return read(tx) + } + return this.txQueue.Read(read) + } + + private async handleWrite(txId: string | undefined, write: (tx: DataSource | EntityManager) => Promise) { + if (txId) { + const tx = this.getTx(txId) + return write(tx) } return this.txQueue.PushToQueue({ dbTx: false, - description: operation.description || "createAndSave", - exec: async tx => { - const res = tx.getRepository(MainDbEntities[operation.entity]).create(operation.toSave) - return tx.getRepository(MainDbEntities[operation.entity]).save(res) - } + description: "write", + exec: write }) } diff --git a/src/services/storage/transactionsQueue.ts b/src/services/storage/transactionsQueue.ts index c151211a..4a1d4ee8 100644 --- a/src/services/storage/transactionsQueue.ts +++ b/src/services/storage/transactionsQueue.ts @@ -7,19 +7,73 @@ type TxOperation = { dbTx: boolean description?: string } - +/* type Locks = { + beforeQueue: () => Promise + afterQueue: () => void +} */ export default class { DB: DataSource | EntityManager pendingTx: boolean transactionsQueue: { op: TxOperation, res: (v: any) => void, rej: (message: string) => void }[] = [] + readersQueue: { res: () => void, rej: (message: string) => void }[] = [] + activeReaders = 0 + writeRequested = false log: PubLogger + constructor(name: string, DB: DataSource | EntityManager) { this.DB = DB this.log = getLogger({ component: name }) + + } + + private async executeRead(read: (tx: DataSource | EntityManager) => Promise) { + try { + this.activeReaders++ + const res = await read(this.DB) + this.doneReading() + return res + } catch (err) { + this.doneReading() + throw err + } + } + async Read(read: (tx: DataSource | EntityManager) => Promise) { + console.log("Read", this.activeReaders, this.pendingTx, this.writeRequested) + if (!this.writeRequested) { + return this.executeRead(read) + } + await this.waitWritingDone() + return this.executeRead(read) + } + + async waitWritingDone() { + if (!this.writeRequested) { + return + } + return new Promise((res, rej) => { + this.readersQueue.push({ res, rej }) + }) + } + + doneWriting() { + this.writeRequested = false + this.readersQueue.forEach(r => { + r.res() + }) + this.readersQueue = [] + } + + doneReading() { + this.activeReaders-- + if (this.activeReaders === 0 && !this.pendingTx) { + this.execNextInQueue() + } } PushToQueue(op: TxOperation) { - if (!this.pendingTx) { + console.log("PushToQueue", this.activeReaders, this.pendingTx, this.writeRequested) + this.writeRequested = true + if (!this.pendingTx && this.activeReaders === 0) { return this.execQueueItem(op) } this.log("pushing to queue", this.transactionsQueue.length) @@ -32,6 +86,7 @@ export default class { this.pendingTx = false const next = this.transactionsQueue.pop() if (!next) { + this.doneWriting() return } try { diff --git a/src/tests/testRunner.ts b/src/tests/testRunner.ts index d12f0cbc..d283cc43 100644 --- a/src/tests/testRunner.ts +++ b/src/tests/testRunner.ts @@ -46,6 +46,7 @@ const start = async () => { await runTestFile(file, module) } else { console.log("running all tests") + await setupNetwork() for (const { file, module } of modules) { await runTestFile(file, module) } diff --git a/src/tests/testStorage.spec.ts b/src/tests/testStorage.spec.ts index 35715f95..f34412a6 100644 --- a/src/tests/testStorage.spec.ts +++ b/src/tests/testStorage.spec.ts @@ -3,16 +3,16 @@ import { defaultInvoiceExpiry } from '../services/storage/paymentStorage.js' import { runSanityCheck, safelySetUserBalance, StorageTestBase, TestBase } from './testBase.js' import { FindOptionsWhere } from 'typeorm' export const ignore = false -export const dev = true +export const dev = false export const requires = 'storage' export default async (T: StorageTestBase) => { const u = await testCanCreateUser(T) await testCanReadUser(T, u) await testConcurrentReads(T, u) - T.storage.dbs.setDebug(true) + //T.storage.dbs.setDebug(true) await testTransactionIsolation(T, u) - T.storage.dbs.setDebug(false) + //T.storage.dbs.setDebug(false) await testUserCRUD(T) await testErrorHandling(T, u) }