This commit is contained in:
boufni95 2025-03-13 22:50:12 +00:00
parent 7d693247c0
commit 22a1c10b4e
4 changed files with 114 additions and 36 deletions

View file

@ -268,19 +268,25 @@ class StorageProcessor {
}
private getTx(txId: string) {
if (!this.activeTransaction || this.activeTransaction.txId !== txId) {
throw new Error('Transaction not found');
}
return this.activeTransaction.manager
}
private getManager(txId?: string): DataSource | EntityManager {
if (txId) {
if (!this.activeTransaction || this.activeTransaction.txId !== txId) {
throw new Error('Transaction not found');
}
return this.activeTransaction.manager
return this.getTx(txId)
}
return this.DB
}
private async handleDelete(operation: DeleteOperation<any>) {
const manager = this.getManager(operation.txId);
const res = await manager.getRepository(MainDbEntities[operation.entity]).delete(operation.q)
const res = await this.handleWrite(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).delete(operation.q)
})
this.sendResponse({
success: true,
type: 'delete',
@ -290,8 +296,9 @@ class StorageProcessor {
}
private async handleRemove(operation: RemoveOperation<any>) {
const manager = this.getManager(operation.txId);
const res = await manager.getRepository(MainDbEntities[operation.entity]).remove(operation.q)
const res = await this.handleWrite(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).remove(operation.q)
})
this.sendResponse({
success: true,
@ -302,8 +309,9 @@ class StorageProcessor {
}
private async handleUpdate(operation: UpdateOperation<any>) {
const manager = this.getManager(operation.txId);
const res = await manager.getRepository(MainDbEntities[operation.entity]).update(operation.q, operation.toUpdate)
const res = await this.handleWrite(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).update(operation.q, operation.toUpdate)
})
this.sendResponse({
success: true,
@ -314,8 +322,10 @@ class StorageProcessor {
}
private async handleIncrement(operation: IncrementOperation<any>) {
const manager = this.getManager(operation.txId);
const res = await manager.getRepository(MainDbEntities[operation.entity]).increment(operation.q, operation.propertyPath, operation.value)
const res = await this.handleWrite(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).increment(operation.q, operation.propertyPath, operation.value)
})
this.sendResponse({
success: true,
type: 'increment',
@ -325,8 +335,10 @@ class StorageProcessor {
}
private async handleDecrement(operation: DecrementOperation<any>) {
const manager = this.getManager(operation.txId);
const res = await manager.getRepository(MainDbEntities[operation.entity]).decrement(operation.q, operation.propertyPath, operation.value)
const res = await this.handleWrite(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).decrement(operation.q, operation.propertyPath, operation.value)
})
this.sendResponse({
success: true,
type: 'decrement',
@ -336,8 +348,9 @@ class StorageProcessor {
}
private async handleFindOne(operation: FindOneOperation<any>) {
const manager = this.getManager(operation.txId);
const res = await manager.getRepository(MainDbEntities[operation.entity]).findOne(operation.q)
const res = await this.handleRead(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).findOne(operation.q)
})
this.sendResponse({
success: true,
@ -348,8 +361,9 @@ class StorageProcessor {
}
private async handleFind(operation: FindOperation<any>) {
const manager = this.getManager(operation.txId);
const res = await manager.getRepository(MainDbEntities[operation.entity]).find(operation.q)
const res = await this.handleRead(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).find(operation.q)
})
this.sendResponse({
success: true,
@ -360,8 +374,9 @@ class StorageProcessor {
}
private async handleSum(operation: SumOperation<object>) {
const manager = this.getManager(operation.txId);
const res = await manager.getRepository(MainDbEntities[operation.entity]).sum(operation.columnName, operation.q)
const res = await this.handleRead(operation.txId, eM => {
return eM.getRepository(MainDbEntities[operation.entity]).sum(operation.columnName, operation.q)
})
this.sendResponse({
success: true,
type: 'sum',
@ -371,7 +386,10 @@ class StorageProcessor {
}
private async handleCreateAndSave(operation: CreateAndSaveOperation<any>) {
const saved = await this.createAndSave(operation)
const saved = await this.handleWrite(operation.txId, async eM => {
const res = eM.getRepository(MainDbEntities[operation.entity]).create(operation.toSave)
return eM.getRepository(MainDbEntities[operation.entity]).save(res)
})
this.sendResponse({
success: true,
@ -381,19 +399,23 @@ class StorageProcessor {
});
}
private async createAndSave(operation: CreateAndSaveOperation<any>) {
if (operation.txId) {
const manager = this.getManager(operation.txId);
const res = manager.getRepository(MainDbEntities[operation.entity]).create(operation.toSave)
return manager.getRepository(MainDbEntities[operation.entity]).save(res)
private async handleRead(txId: string | undefined, read: (tx: DataSource | EntityManager) => Promise<any>) {
if (txId) {
const tx = this.getTx(txId)
return read(tx)
}
return this.txQueue.Read(read)
}
private async handleWrite(txId: string | undefined, write: (tx: DataSource | EntityManager) => Promise<any>) {
if (txId) {
const tx = this.getTx(txId)
return write(tx)
}
return this.txQueue.PushToQueue({
dbTx: false,
description: operation.description || "createAndSave",
exec: async tx => {
const res = tx.getRepository(MainDbEntities[operation.entity]).create(operation.toSave)
return tx.getRepository(MainDbEntities[operation.entity]).save(res)
}
description: "write",
exec: write
})
}

View file

@ -7,19 +7,73 @@ type TxOperation<T> = {
dbTx: boolean
description?: string
}
/* type Locks = {
beforeQueue: () => Promise<void>
afterQueue: () => void
} */
export default class {
DB: DataSource | EntityManager
pendingTx: boolean
transactionsQueue: { op: TxOperation<any>, res: (v: any) => void, rej: (message: string) => void }[] = []
readersQueue: { res: () => void, rej: (message: string) => void }[] = []
activeReaders = 0
writeRequested = false
log: PubLogger
constructor(name: string, DB: DataSource | EntityManager) {
this.DB = DB
this.log = getLogger({ component: name })
}
private async executeRead(read: (tx: DataSource | EntityManager) => Promise<any>) {
try {
this.activeReaders++
const res = await read(this.DB)
this.doneReading()
return res
} catch (err) {
this.doneReading()
throw err
}
}
async Read(read: (tx: DataSource | EntityManager) => Promise<any>) {
console.log("Read", this.activeReaders, this.pendingTx, this.writeRequested)
if (!this.writeRequested) {
return this.executeRead(read)
}
await this.waitWritingDone()
return this.executeRead(read)
}
async waitWritingDone() {
if (!this.writeRequested) {
return
}
return new Promise<void>((res, rej) => {
this.readersQueue.push({ res, rej })
})
}
doneWriting() {
this.writeRequested = false
this.readersQueue.forEach(r => {
r.res()
})
this.readersQueue = []
}
doneReading() {
this.activeReaders--
if (this.activeReaders === 0 && !this.pendingTx) {
this.execNextInQueue()
}
}
PushToQueue<T>(op: TxOperation<T>) {
if (!this.pendingTx) {
console.log("PushToQueue", this.activeReaders, this.pendingTx, this.writeRequested)
this.writeRequested = true
if (!this.pendingTx && this.activeReaders === 0) {
return this.execQueueItem(op)
}
this.log("pushing to queue", this.transactionsQueue.length)
@ -32,6 +86,7 @@ export default class {
this.pendingTx = false
const next = this.transactionsQueue.pop()
if (!next) {
this.doneWriting()
return
}
try {

View file

@ -46,6 +46,7 @@ const start = async () => {
await runTestFile(file, module)
} else {
console.log("running all tests")
await setupNetwork()
for (const { file, module } of modules) {
await runTestFile(file, module)
}

View file

@ -3,16 +3,16 @@ import { defaultInvoiceExpiry } from '../services/storage/paymentStorage.js'
import { runSanityCheck, safelySetUserBalance, StorageTestBase, TestBase } from './testBase.js'
import { FindOptionsWhere } from 'typeorm'
export const ignore = false
export const dev = true
export const dev = false
export const requires = 'storage'
export default async (T: StorageTestBase) => {
const u = await testCanCreateUser(T)
await testCanReadUser(T, u)
await testConcurrentReads(T, u)
T.storage.dbs.setDebug(true)
//T.storage.dbs.setDebug(true)
await testTransactionIsolation(T, u)
T.storage.dbs.setDebug(false)
//T.storage.dbs.setDebug(false)
await testUserCRUD(T)
await testErrorHandling(T, u)
}