mutex tests
This commit is contained in:
parent
d6c97651f0
commit
c4f7a84eb3
3 changed files with 150 additions and 67 deletions
|
|
@ -160,29 +160,41 @@ class StorageProcessor {
|
|||
}
|
||||
switch (operation.type) {
|
||||
case 'connect':
|
||||
return this.handleConnect(operation);
|
||||
await this.handleConnect(operation);
|
||||
break;
|
||||
case 'startTx':
|
||||
return this.handleStartTx(operation);
|
||||
await this.handleStartTx(operation);
|
||||
break;
|
||||
case 'endTx':
|
||||
return this.handleEndTx(operation);
|
||||
await this.handleEndTx(operation);
|
||||
break;
|
||||
case 'delete':
|
||||
return this.handleDelete(operation);
|
||||
await this.handleDelete(operation);
|
||||
break;
|
||||
case 'remove':
|
||||
return this.handleRemove(operation);
|
||||
await this.handleRemove(operation);
|
||||
break;
|
||||
case 'update':
|
||||
return this.handleUpdate(operation);
|
||||
await this.handleUpdate(operation);
|
||||
break;
|
||||
case 'increment':
|
||||
return this.handleIncrement(operation);
|
||||
await this.handleIncrement(operation);
|
||||
break;
|
||||
case 'decrement':
|
||||
return this.handleDecrement(operation);
|
||||
await this.handleDecrement(operation);
|
||||
break;
|
||||
case 'findOne':
|
||||
return this.handleFindOne(operation);
|
||||
await this.handleFindOne(operation);
|
||||
break;
|
||||
case 'find':
|
||||
return this.handleFind(operation);
|
||||
await this.handleFind(operation);
|
||||
break;
|
||||
case 'sum':
|
||||
return this.handleSum(operation);
|
||||
await this.handleSum(operation);
|
||||
break;
|
||||
case 'createAndSave':
|
||||
return this.handleCreateAndSave(operation);
|
||||
await this.handleCreateAndSave(operation);
|
||||
break;
|
||||
default:
|
||||
this.sendResponse({
|
||||
success: false,
|
||||
|
|
@ -415,7 +427,9 @@ class StorageProcessor {
|
|||
return this.txQueue.PushToQueue({
|
||||
dbTx: false,
|
||||
description: "write",
|
||||
exec: write
|
||||
exec: tx => {
|
||||
return write(tx)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,9 +10,13 @@ export default async (T: StorageTestBase) => {
|
|||
const u = await testCanCreateUser(T)
|
||||
await testCanReadUser(T, u)
|
||||
await testConcurrentReads(T, u)
|
||||
//T.storage.dbs.setDebug(true)
|
||||
//await testTransactionIsolation(T, u)
|
||||
//T.storage.dbs.setDebug(false)
|
||||
|
||||
// RWMutex specific tests
|
||||
await testMultipleConcurrentReads(T, u)
|
||||
await testWriteDuringReads(T, u)
|
||||
await testSequentialWrites(T, u)
|
||||
await testTransactionWithConcurrentReads(T, u)
|
||||
|
||||
await testUserCRUD(T)
|
||||
await testErrorHandling(T, u)
|
||||
}
|
||||
|
|
@ -56,57 +60,6 @@ const testConcurrentReads = async (T: StorageTestBase, user: User) => {
|
|||
T.d('Finished testConcurrentReads')
|
||||
}
|
||||
|
||||
/* const testTransactionIsolation = async (T: StorageTestBase, user: User) => {
|
||||
T.d('Starting testTransactionIsolation')
|
||||
// Start a transaction
|
||||
// Check initial balance before transaction
|
||||
const userBefore = await T.storage.dbs.FindOne<User>('User', {
|
||||
where: { user_id: user.user_id }
|
||||
})
|
||||
T.expect(userBefore?.balance_sats).to.not.equal(1000, 'User should not start with balance of 1000')
|
||||
|
||||
const txId = await T.storage.dbs.StartTx('test-transaction')
|
||||
|
||||
try {
|
||||
// Update user balance in transaction
|
||||
const initialBalance = 1000
|
||||
const where: FindOptionsWhere<User> = { user_id: user.user_id }
|
||||
|
||||
await T.storage.dbs.Update<User>('User',
|
||||
where,
|
||||
{ balance_sats: initialBalance },
|
||||
txId
|
||||
)
|
||||
|
||||
// Verify balance is updated in transaction
|
||||
const userInTx = await T.storage.dbs.FindOne<User>('User',
|
||||
{ where },
|
||||
txId
|
||||
)
|
||||
T.expect(userInTx?.balance_sats).to.be.equal(initialBalance)
|
||||
|
||||
// Verify balance is not visible outside transaction
|
||||
const userOutsideTx = await T.storage.dbs.FindOne<User>('User',
|
||||
{ where }
|
||||
)
|
||||
T.expect(userOutsideTx?.balance_sats).to.not.equal(initialBalance)
|
||||
|
||||
// Commit the transaction
|
||||
await T.storage.dbs.EndTx(txId, true, null)
|
||||
|
||||
// Verify balance is now visible
|
||||
const userAfterCommit = await T.storage.dbs.FindOne<User>('User',
|
||||
{ where }
|
||||
)
|
||||
T.expect(userAfterCommit?.balance_sats).to.be.equal(initialBalance)
|
||||
} catch (error) {
|
||||
// Rollback on error
|
||||
await T.storage.dbs.EndTx(txId, false, error instanceof Error ? error.message : 'Unknown error')
|
||||
throw error
|
||||
}
|
||||
T.d('Finished testTransactionIsolation')
|
||||
} */
|
||||
|
||||
const testUserCRUD = async (T: StorageTestBase) => {
|
||||
T.d('Starting testUserCRUD')
|
||||
// Create
|
||||
|
|
@ -191,3 +144,118 @@ const testErrorHandling = async (T: StorageTestBase, user: User) => {
|
|||
}
|
||||
T.d('Finished testErrorHandling')
|
||||
}
|
||||
|
||||
const testMultipleConcurrentReads = async (T: StorageTestBase, user: User) => {
|
||||
T.d('Starting testMultipleConcurrentReads')
|
||||
|
||||
// Create multiple concurrent read operations
|
||||
const readPromises = Array(5).fill(null).map(() =>
|
||||
T.storage.dbs.FindOne<User>('User', {
|
||||
where: { user_id: user.user_id }
|
||||
})
|
||||
)
|
||||
|
||||
// All reads should complete successfully
|
||||
const results = await Promise.all(readPromises)
|
||||
results.forEach(result => {
|
||||
T.expect(result?.user_id).to.be.equal(user.user_id)
|
||||
})
|
||||
|
||||
T.d('Finished testMultipleConcurrentReads')
|
||||
}
|
||||
|
||||
const testWriteDuringReads = async (T: StorageTestBase, user: User) => {
|
||||
T.d('Starting testWriteDuringReads')
|
||||
|
||||
// Start multiple read operations
|
||||
const readPromises = Array(3).fill(null).map(() =>
|
||||
T.storage.dbs.FindOne<User>('User', {
|
||||
where: { user_id: user.user_id }
|
||||
})
|
||||
)
|
||||
|
||||
// Start a write operation that should wait for reads to complete
|
||||
const writePromise = T.storage.dbs.Update<User>('User',
|
||||
{ user_id: user.user_id },
|
||||
{ balance_sats: 100 }
|
||||
)
|
||||
|
||||
// All operations should complete without errors
|
||||
await Promise.all([...readPromises, writePromise])
|
||||
|
||||
// Verify the write completed
|
||||
const finalState = await T.storage.dbs.FindOne<User>('User', {
|
||||
where: { user_id: user.user_id }
|
||||
})
|
||||
T.expect(finalState?.balance_sats).to.be.equal(100)
|
||||
|
||||
T.d('Finished testWriteDuringReads')
|
||||
}
|
||||
|
||||
const testSequentialWrites = async (T: StorageTestBase, user: User) => {
|
||||
T.d('Starting testSequentialWrites')
|
||||
|
||||
const initialBalance = 200
|
||||
const finalBalance = 300
|
||||
|
||||
// First write operation
|
||||
await T.storage.dbs.Update<User>('User',
|
||||
{ user_id: user.user_id },
|
||||
{ balance_sats: initialBalance }
|
||||
)
|
||||
|
||||
// Verify first write
|
||||
const midResult = await T.storage.dbs.FindOne<User>('User', {
|
||||
where: { user_id: user.user_id }
|
||||
})
|
||||
T.expect(midResult?.balance_sats).to.be.equal(initialBalance)
|
||||
|
||||
// Second write operation
|
||||
await T.storage.dbs.Update<User>('User',
|
||||
{ user_id: user.user_id },
|
||||
{ balance_sats: finalBalance }
|
||||
)
|
||||
|
||||
// Verify second write
|
||||
const finalResult = await T.storage.dbs.FindOne<User>('User', {
|
||||
where: { user_id: user.user_id }
|
||||
})
|
||||
T.expect(finalResult?.balance_sats).to.be.equal(finalBalance)
|
||||
|
||||
T.d('Finished testSequentialWrites')
|
||||
}
|
||||
|
||||
const testTransactionWithConcurrentReads = async (T: StorageTestBase, user: User) => {
|
||||
T.d('Starting testTransactionWithConcurrentReads')
|
||||
|
||||
const txId = await T.storage.dbs.StartTx('rwmutex-test')
|
||||
try {
|
||||
// Start the write operation in transaction
|
||||
await T.storage.dbs.Update<User>('User',
|
||||
{ user_id: user.user_id },
|
||||
{ balance_sats: 400 },
|
||||
txId
|
||||
)
|
||||
|
||||
// Attempt concurrent reads (should wait for transaction)
|
||||
const readPromises = Array(3).fill(null).map(() =>
|
||||
T.storage.dbs.FindOne<User>('User', {
|
||||
where: { user_id: user.user_id }
|
||||
})
|
||||
)
|
||||
|
||||
// Complete transaction
|
||||
await T.storage.dbs.EndTx(txId, true, null)
|
||||
|
||||
// Now reads should complete and see the updated value
|
||||
const results = await Promise.all(readPromises)
|
||||
results.forEach(result => {
|
||||
T.expect(result?.balance_sats).to.be.equal(400)
|
||||
})
|
||||
} catch (error) {
|
||||
await T.storage.dbs.EndTx(txId, false, error instanceof Error ? error.message : 'Unknown error')
|
||||
throw error
|
||||
}
|
||||
|
||||
T.d('Finished testTransactionWithConcurrentReads')
|
||||
}
|
||||
|
|
|
|||
1
test-data/metric_cache/last24hSF.json
Normal file
1
test-data/metric_cache/last24hSF.json
Normal file
|
|
@ -0,0 +1 @@
|
|||
[]
|
||||
Loading…
Add table
Add a link
Reference in a new issue