Merge branch 'master' of github.com:shocknet/Lightning.Pub

This commit is contained in:
boufni95 2024-03-06 15:18:05 +01:00
commit a165559f14
8 changed files with 36 additions and 48 deletions

View file

@ -215,7 +215,8 @@ export default class {
}
} else {
await this.storage.userStorage.DecrementUserBalance(ctx.user_id, req.amoutSats + serviceFee)
this.addressPaidCb({ hash: crypto.randomBytes(32).toString("hex"), index: 0 }, req.address, req.amoutSats, true)
txId = crypto.randomBytes(32).toString("hex")
this.addressPaidCb({ hash: txId, index: 0 }, req.address, req.amoutSats, true)
}
if (isAppUserPayment && serviceFee > 0) {

View file

@ -68,4 +68,17 @@ export default async (settings: DbSettings, migrations: Function[]): Promise<{ s
return { source, executedMigrations }
}
return { source, executedMigrations: [] }
}
export const runFakeMigration = async (databaseFile: string, migrations: Function[]) => {
const source = await new DataSource({
type: "sqlite",
database: databaseFile,
// logging: true,
entities: [User, UserReceivingInvoice, UserReceivingAddress, AddressReceivingTransaction, UserInvoicePayment, UserTransactionPayment,
UserBasicAuth, UserEphemeralKey, Product, UserToUserPayment, Application, ApplicationUser, UserToUserPayment],
//synchronize: true,
migrations
}).initialize()
return source.runMigrations({ fake: true })
}

View file

@ -38,7 +38,7 @@ export default class {
return { executedMigrations, executedMetricsMigrations };
}
StartTransaction(exec: TX<void>) {
return this.txQueue.PushToQueue({ exec, dbTx: true })
StartTransaction(exec: TX<void>, description?: string) {
return this.txQueue.PushToQueue({ exec, dbTx: true, description })
}
}

View file

@ -72,10 +72,10 @@ export default class {
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "receive_errors", event.receive_errors)
}
if (event.forward_errors_as_input) {
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forwar_errors_as_input", event.forward_errors_as_input)
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_input", event.forward_errors_as_input)
}
if (event.forward_errors_as_output) {
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forwar_errors_as_output", event.forward_errors_as_output)
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "forward_errors_as_output", event.forward_errors_as_output)
}
if (event.missed_forward_fee_as_input) {
await repo.increment({ day_unix: dayUnix, channel_id: channelId }, "missed_forward_fee_as_input", event.missed_forward_fee_as_input)

View file

@ -1,38 +1,21 @@
import { PubLogger } from '../../helpers/logger.js'
import { DbSettings } from '../db.js'
import { DbSettings, runFakeMigration } from '../db.js'
import Storage, { StorageSettings } from '../index.js'
import { Initial1703170309875 } from './1703170309875-initial.js'
import { LndMetrics1703170330183 } from './1703170330183-lnd_metrics.js'
import { ChannelRouting1709316653538 } from './1709316653538-channel_routing.js'
const allMigrations = [LndMetrics1703170330183, ChannelRouting1709316653538]
const allMigrations = [Initial1703170309875]
const allMetricsMigrations = [LndMetrics1703170330183, ChannelRouting1709316653538]
export const TypeOrmMigrationRunner = async (log: PubLogger, storageManager: Storage, settings: DbSettings, arg: string | undefined): Promise<boolean> => {
if (arg === 'initial_migration') {
await connectAndMigrate(log, storageManager, true, settings, [Initial1703170309875], [])
if (arg === 'fake_initial_migration') {
runFakeMigration(settings.databaseFile, [Initial1703170309875])
return true
} else if (arg === 'lnd_metrics_migration') {
await connectAndMigrate(log, storageManager, true, settings, [], [LndMetrics1703170330183])
return true
} else if (arg === 'channel_routing_migration') {
await connectAndMigrate(log, storageManager, true, settings, [], [ChannelRouting1709316653538])
return true
} else if (arg === 'all_migrations') {
await connectAndMigrate(log, storageManager, true, settings, [], allMigrations)
return true
} else if (settings.migrate) {
await connectAndMigrate(log, storageManager, false, settings, [], allMigrations)
return false
}
await connectAndMigrate(log, storageManager, false, settings, [], [])
await connectAndMigrate(log, storageManager, allMigrations, allMetricsMigrations)
return false
}
const connectAndMigrate = async (log: PubLogger, storageManager: Storage, manual: boolean, settings: DbSettings, migrations: Function[], metricsMigrations: Function[]) => {
if (manual && settings.migrate) {
throw new Error("auto migration is enabled, no need to run manual migration")
}
if (migrations.length > 0) {
log("will add", migrations.length, "typeorm migrations...")
}
const connectAndMigrate = async (log: PubLogger, storageManager: Storage, migrations: Function[], metricsMigrations: Function[]) => {
const { executedMigrations, executedMetricsMigrations } = await storageManager.Connect(migrations, metricsMigrations)
if (migrations.length > 0) {
log(executedMigrations.length, "of", migrations.length, "migrations were executed correctly")
@ -43,5 +26,4 @@ const connectAndMigrate = async (log: PubLogger, storageManager: Storage, manual
log(executedMetricsMigrations.length, "of", metricsMigrations.length, "metrics migrations were executed correctly")
log(executedMetricsMigrations)
}
}

View file

@ -63,7 +63,7 @@ export default class {
linkedApplication: opts.linkedApplication,
user: await this.userStorage.GetUser(userId)
})
return this.txQueue.PushToQueue<UserReceivingAddress>({ exec: async db => db.getRepository(UserReceivingAddress).save(newUserAddress), dbTx: false })
return this.txQueue.PushToQueue<UserReceivingAddress>({ exec: async db => db.getRepository(UserReceivingAddress).save(newUserAddress), dbTx: false, description: `add address for ${userId} linked to ${opts.linkedApplication?.app_id}: ${address} ` })
}
async FlagInvoiceAsPaid(invoice: UserReceivingInvoice, amount: number, serviceFee: number, internal: boolean, entityManager = this.DB) {
@ -101,7 +101,7 @@ export default class {
linkedApplication: options.linkedApplication,
zap_info: options.zapInfo
})
return this.txQueue.PushToQueue<UserReceivingInvoice>({ exec: async db => db.getRepository(UserReceivingInvoice).save(newUserInvoice), dbTx: false })
return this.txQueue.PushToQueue<UserReceivingInvoice>({ exec: async db => db.getRepository(UserReceivingInvoice).save(newUserInvoice), dbTx: false, description: `add invoice for ${user.user_id} linked to ${options.linkedApplication?.app_id}: ${invoice} ` })
}
async GetAddressOwner(address: string, entityManager = this.DB): Promise<UserReceivingAddress | null> {
@ -131,7 +131,7 @@ export default class {
internal,
linkedApplication
})
return this.txQueue.PushToQueue<UserInvoicePayment>({ exec: async db => db.getRepository(UserInvoicePayment).save(newPayment), dbTx: false })
return this.txQueue.PushToQueue<UserInvoicePayment>({ exec: async db => db.getRepository(UserInvoicePayment).save(newPayment), dbTx: false, description: `add invoice payment for ${userId} linked to ${linkedApplication.app_id}: ${invoice}, amt: ${amount} ` })
}
GetUserInvoicePayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise<UserInvoicePayment[]> {
@ -165,7 +165,7 @@ export default class {
confs: internal ? 10 : 0,
linkedApplication
})
return this.txQueue.PushToQueue<UserTransactionPayment>({ exec: async db => db.getRepository(UserTransactionPayment).save(newTx), dbTx: false })
return this.txQueue.PushToQueue<UserTransactionPayment>({ exec: async db => db.getRepository(UserTransactionPayment).save(newTx), dbTx: false, description: `add tx payment for ${userId} linked to ${linkedApplication.app_id}: ${address}, amt: ${amount} ` })
}
GetUserTransactionPayments(userId: string, fromIndex: number, take = 50, entityManager = this.DB): Promise<UserTransactionPayment[]> {

View file

@ -5,6 +5,7 @@ export type TX<T> = (entityManager: EntityManager | DataSource) => Promise<T>
export type TxOperation<T> = {
exec: TX<T>
dbTx: boolean
description?: string
}
export default class {
@ -19,10 +20,9 @@ export default class {
PushToQueue<T>(op: TxOperation<T>) {
if (!this.pendingTx) {
this.log("executing item immediately")
return this.execQueueItem(op)
}
this.log("holding item in queue")
this.log("queue not empty, possibly stuck")
return new Promise<T>((res, rej) => {
this.transactionsQueue.push({ op, res, rej })
})
@ -32,16 +32,14 @@ export default class {
this.pendingTx = false
const next = this.transactionsQueue.pop()
if (!next) {
this.log("no more items in queue")
this.log("queue is clear")
return
}
try {
this.log("executing next item in queue")
const res = await this.execQueueItem(next.op)
this.log("resolving next item in queue")
if (next.op.description) this.log("done", next.op.description)
next.res(res)
} catch (err: any) {
this.log("rejecting next item in queue")
next.rej(err.message)
}
}
@ -52,21 +50,17 @@ export default class {
}
this.pendingTx = true
if (op.dbTx) {
this.log("executing item transaction")
return this.doTransaction(op.exec)
}
this.log("executing item operation")
return this.doOperation(op.exec)
}
async doOperation<T>(exec: TX<T>) {
try {
const res = await exec(this.DB)
this.log("executing item operation done")
this.execNextInQueue()
return res
} catch (err) {
this.log("executing item operation failed")
this.execNextInQueue()
throw err
}
@ -77,11 +71,9 @@ export default class {
return this.DB.transaction(async tx => {
try {
const res = await exec(tx)
this.log("executing item transaction done")
this.execNextInQueue()
return res
} catch (err) {
this.log("executing item transaction failed")
this.execNextInQueue()
throw err
}

View file

@ -74,7 +74,7 @@ export default class {
if (!res.affected) {
throw new Error("unaffected balance increment for " + userId) // TODO: fix logs doxing
}
getLogger({ userId: userId })("incremented balance from", user.balance_sats, "sats, by", increment, "sats")
getLogger({ userId: userId, appName: "balanceUpdates" })("incremented balance from", user.balance_sats, "sats, by", increment, "sats")
}
async DecrementUserBalance(userId: string, decrement: number, entityManager = this.DB) {
const user = await this.GetUser(userId, entityManager)
@ -87,7 +87,7 @@ export default class {
if (!res.affected) {
throw new Error("unaffected balance decrement for " + userId) // TODO: fix logs doxing
}
getLogger({ userId: userId })("decremented balance from", user.balance_sats, "sats, by", decrement, "sats")
getLogger({ userId: userId, appName: "balanceUpdates" })("decremented balance from", user.balance_sats, "sats, by", decrement, "sats")
}
async UpdateUser(userId: string, update: Partial<User>, entityManager = this.DB) {