diff --git a/src/services/storage/storageProcessor.ts b/src/services/storage/storageProcessor.ts index f4e67454..2b73fe9e 100644 --- a/src/services/storage/storageProcessor.ts +++ b/src/services/storage/storageProcessor.ts @@ -160,29 +160,41 @@ class StorageProcessor { } switch (operation.type) { case 'connect': - return this.handleConnect(operation); + await this.handleConnect(operation); + break; case 'startTx': - return this.handleStartTx(operation); + await this.handleStartTx(operation); + break; case 'endTx': - return this.handleEndTx(operation); + await this.handleEndTx(operation); + break; case 'delete': - return this.handleDelete(operation); + await this.handleDelete(operation); + break; case 'remove': - return this.handleRemove(operation); + await this.handleRemove(operation); + break; case 'update': - return this.handleUpdate(operation); + await this.handleUpdate(operation); + break; case 'increment': - return this.handleIncrement(operation); + await this.handleIncrement(operation); + break; case 'decrement': - return this.handleDecrement(operation); + await this.handleDecrement(operation); + break; case 'findOne': - return this.handleFindOne(operation); + await this.handleFindOne(operation); + break; case 'find': - return this.handleFind(operation); + await this.handleFind(operation); + break; case 'sum': - return this.handleSum(operation); + await this.handleSum(operation); + break; case 'createAndSave': - return this.handleCreateAndSave(operation); + await this.handleCreateAndSave(operation); + break; default: this.sendResponse({ success: false, @@ -415,7 +427,9 @@ class StorageProcessor { return this.txQueue.PushToQueue({ dbTx: false, description: "write", - exec: write + exec: tx => { + return write(tx) + } }) } diff --git a/src/tests/testStorage.spec.ts b/src/tests/testStorage.spec.ts index 13516436..91035196 100644 --- a/src/tests/testStorage.spec.ts +++ b/src/tests/testStorage.spec.ts @@ -10,9 +10,13 @@ export default async (T: StorageTestBase) => { const u = await testCanCreateUser(T) await testCanReadUser(T, u) await testConcurrentReads(T, u) - //T.storage.dbs.setDebug(true) - //await testTransactionIsolation(T, u) - //T.storage.dbs.setDebug(false) + + // RWMutex specific tests + await testMultipleConcurrentReads(T, u) + await testWriteDuringReads(T, u) + await testSequentialWrites(T, u) + await testTransactionWithConcurrentReads(T, u) + await testUserCRUD(T) await testErrorHandling(T, u) } @@ -56,57 +60,6 @@ const testConcurrentReads = async (T: StorageTestBase, user: User) => { T.d('Finished testConcurrentReads') } -/* const testTransactionIsolation = async (T: StorageTestBase, user: User) => { - T.d('Starting testTransactionIsolation') - // Start a transaction - // Check initial balance before transaction - const userBefore = await T.storage.dbs.FindOne('User', { - where: { user_id: user.user_id } - }) - T.expect(userBefore?.balance_sats).to.not.equal(1000, 'User should not start with balance of 1000') - - const txId = await T.storage.dbs.StartTx('test-transaction') - - try { - // Update user balance in transaction - const initialBalance = 1000 - const where: FindOptionsWhere = { user_id: user.user_id } - - await T.storage.dbs.Update('User', - where, - { balance_sats: initialBalance }, - txId - ) - - // Verify balance is updated in transaction - const userInTx = await T.storage.dbs.FindOne('User', - { where }, - txId - ) - T.expect(userInTx?.balance_sats).to.be.equal(initialBalance) - - // Verify balance is not visible outside transaction - const userOutsideTx = await T.storage.dbs.FindOne('User', - { where } - ) - T.expect(userOutsideTx?.balance_sats).to.not.equal(initialBalance) - - // Commit the transaction - await T.storage.dbs.EndTx(txId, true, null) - - // Verify balance is now visible - const userAfterCommit = await T.storage.dbs.FindOne('User', - { where } - ) - T.expect(userAfterCommit?.balance_sats).to.be.equal(initialBalance) - } catch (error) { - // Rollback on error - await T.storage.dbs.EndTx(txId, false, error instanceof Error ? error.message : 'Unknown error') - throw error - } - T.d('Finished testTransactionIsolation') -} */ - const testUserCRUD = async (T: StorageTestBase) => { T.d('Starting testUserCRUD') // Create @@ -191,3 +144,118 @@ const testErrorHandling = async (T: StorageTestBase, user: User) => { } T.d('Finished testErrorHandling') } + +const testMultipleConcurrentReads = async (T: StorageTestBase, user: User) => { + T.d('Starting testMultipleConcurrentReads') + + // Create multiple concurrent read operations + const readPromises = Array(5).fill(null).map(() => + T.storage.dbs.FindOne('User', { + where: { user_id: user.user_id } + }) + ) + + // All reads should complete successfully + const results = await Promise.all(readPromises) + results.forEach(result => { + T.expect(result?.user_id).to.be.equal(user.user_id) + }) + + T.d('Finished testMultipleConcurrentReads') +} + +const testWriteDuringReads = async (T: StorageTestBase, user: User) => { + T.d('Starting testWriteDuringReads') + + // Start multiple read operations + const readPromises = Array(3).fill(null).map(() => + T.storage.dbs.FindOne('User', { + where: { user_id: user.user_id } + }) + ) + + // Start a write operation that should wait for reads to complete + const writePromise = T.storage.dbs.Update('User', + { user_id: user.user_id }, + { balance_sats: 100 } + ) + + // All operations should complete without errors + await Promise.all([...readPromises, writePromise]) + + // Verify the write completed + const finalState = await T.storage.dbs.FindOne('User', { + where: { user_id: user.user_id } + }) + T.expect(finalState?.balance_sats).to.be.equal(100) + + T.d('Finished testWriteDuringReads') +} + +const testSequentialWrites = async (T: StorageTestBase, user: User) => { + T.d('Starting testSequentialWrites') + + const initialBalance = 200 + const finalBalance = 300 + + // First write operation + await T.storage.dbs.Update('User', + { user_id: user.user_id }, + { balance_sats: initialBalance } + ) + + // Verify first write + const midResult = await T.storage.dbs.FindOne('User', { + where: { user_id: user.user_id } + }) + T.expect(midResult?.balance_sats).to.be.equal(initialBalance) + + // Second write operation + await T.storage.dbs.Update('User', + { user_id: user.user_id }, + { balance_sats: finalBalance } + ) + + // Verify second write + const finalResult = await T.storage.dbs.FindOne('User', { + where: { user_id: user.user_id } + }) + T.expect(finalResult?.balance_sats).to.be.equal(finalBalance) + + T.d('Finished testSequentialWrites') +} + +const testTransactionWithConcurrentReads = async (T: StorageTestBase, user: User) => { + T.d('Starting testTransactionWithConcurrentReads') + + const txId = await T.storage.dbs.StartTx('rwmutex-test') + try { + // Start the write operation in transaction + await T.storage.dbs.Update('User', + { user_id: user.user_id }, + { balance_sats: 400 }, + txId + ) + + // Attempt concurrent reads (should wait for transaction) + const readPromises = Array(3).fill(null).map(() => + T.storage.dbs.FindOne('User', { + where: { user_id: user.user_id } + }) + ) + + // Complete transaction + await T.storage.dbs.EndTx(txId, true, null) + + // Now reads should complete and see the updated value + const results = await Promise.all(readPromises) + results.forEach(result => { + T.expect(result?.balance_sats).to.be.equal(400) + }) + } catch (error) { + await T.storage.dbs.EndTx(txId, false, error instanceof Error ? error.message : 'Unknown error') + throw error + } + + T.d('Finished testTransactionWithConcurrentReads') +} diff --git a/test-data/metric_cache/last24hSF.json b/test-data/metric_cache/last24hSF.json new file mode 100644 index 00000000..0637a088 --- /dev/null +++ b/test-data/metric_cache/last24hSF.json @@ -0,0 +1 @@ +[] \ No newline at end of file