decrement instead of lock
This commit is contained in:
parent
11879fbfb4
commit
8918703204
5 changed files with 77 additions and 30 deletions
|
|
@ -3,15 +3,18 @@ import Nostr from "./services/nostr/index.js"
|
||||||
import { NostrSend, NostrSettings } from "./services/nostr/handler.js"
|
import { NostrSend, NostrSettings } from "./services/nostr/handler.js"
|
||||||
import * as Types from '../proto/autogenerated/ts/types.js'
|
import * as Types from '../proto/autogenerated/ts/types.js'
|
||||||
import NewNostrTransport, { NostrRequest } from '../proto/autogenerated/ts/nostr_transport.js';
|
import NewNostrTransport, { NostrRequest } from '../proto/autogenerated/ts/nostr_transport.js';
|
||||||
|
import { getLogger } from "./services/helpers/logger.js";
|
||||||
|
|
||||||
export default (serverMethods: Types.ServerMethods, mainHandler: Main, nostrSettings: NostrSettings): { Stop: () => void, Send: NostrSend } => {
|
export default (serverMethods: Types.ServerMethods, mainHandler: Main, nostrSettings: NostrSettings): { Stop: () => void, Send: NostrSend } => {
|
||||||
|
const log = getLogger({})
|
||||||
const nostrTransport = NewNostrTransport(serverMethods, {
|
const nostrTransport = NewNostrTransport(serverMethods, {
|
||||||
NostrUserAuthGuard: async (appId, pub) => {
|
NostrUserAuthGuard: async (appId, pub) => {
|
||||||
const app = await mainHandler.storage.applicationStorage.GetApplication(appId || "")
|
const app = await mainHandler.storage.applicationStorage.GetApplication(appId || "")
|
||||||
let nostrUser = await mainHandler.storage.applicationStorage.GetOrCreateNostrAppUser(app, pub || "")
|
let nostrUser = await mainHandler.storage.applicationStorage.GetOrCreateNostrAppUser(app, pub || "")
|
||||||
return { user_id: nostrUser.user.user_id, app_user_id: nostrUser.identifier, app_id: appId || "" }
|
return { user_id: nostrUser.user.user_id, app_user_id: nostrUser.identifier, app_id: appId || "" }
|
||||||
},
|
},
|
||||||
metricsCallback: metrics => mainHandler.settings.recordPerformance ? mainHandler.metricsManager.AddMetrics(metrics) : null
|
metricsCallback: metrics => mainHandler.settings.recordPerformance ? mainHandler.metricsManager.AddMetrics(metrics) : null,
|
||||||
|
logger: { log, error: err => log("ERROR", err) }
|
||||||
})
|
})
|
||||||
const nostr = new Nostr(nostrSettings, event => {
|
const nostr = new Nostr(nostrSettings, event => {
|
||||||
let j: NostrRequest
|
let j: NostrRequest
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@ export default class {
|
||||||
invoicePaidCb: InvoicePaidCb
|
invoicePaidCb: InvoicePaidCb
|
||||||
newBlockCb: NewBlockCb
|
newBlockCb: NewBlockCb
|
||||||
htlcCb: HtlcCb
|
htlcCb: HtlcCb
|
||||||
log = getLogger({})
|
log = getLogger({ appName: 'lndManager' })
|
||||||
constructor(settings: LndSettings, addressPaidCb: AddressPaidCb, invoicePaidCb: InvoicePaidCb, newBlockCb: NewBlockCb, htlcCb: HtlcCb) {
|
constructor(settings: LndSettings, addressPaidCb: AddressPaidCb, invoicePaidCb: InvoicePaidCb, newBlockCb: NewBlockCb, htlcCb: HtlcCb) {
|
||||||
this.settings = settings
|
this.settings = settings
|
||||||
this.addressPaidCb = addressPaidCb
|
this.addressPaidCb = addressPaidCb
|
||||||
|
|
@ -251,11 +251,13 @@ export default class {
|
||||||
}
|
}
|
||||||
async PayInvoice(invoice: string, amount: number, feeLimit: number): Promise<PaidInvoice> {
|
async PayInvoice(invoice: string, amount: number, feeLimit: number): Promise<PaidInvoice> {
|
||||||
await this.Health()
|
await this.Health()
|
||||||
|
this.log("paying invoice", invoice, "for", amount, "sats")
|
||||||
const abortController = new AbortController()
|
const abortController = new AbortController()
|
||||||
const req = PayInvoiceReq(invoice, amount, feeLimit)
|
const req = PayInvoiceReq(invoice, amount, feeLimit)
|
||||||
const stream = this.router.sendPaymentV2(req, { abort: abortController.signal })
|
const stream = this.router.sendPaymentV2(req, { abort: abortController.signal })
|
||||||
return new Promise((res, rej) => {
|
return new Promise((res, rej) => {
|
||||||
stream.responses.onError(error => {
|
stream.responses.onError(error => {
|
||||||
|
this.log("invoice payment failed", error)
|
||||||
rej(error)
|
rej(error)
|
||||||
})
|
})
|
||||||
stream.responses.onMessage(payment => {
|
stream.responses.onMessage(payment => {
|
||||||
|
|
@ -285,8 +287,9 @@ export default class {
|
||||||
|
|
||||||
async PayAddress(address: string, amount: number, satPerVByte: number, label = ""): Promise<SendCoinsResponse> {
|
async PayAddress(address: string, amount: number, satPerVByte: number, label = ""): Promise<SendCoinsResponse> {
|
||||||
await this.Health()
|
await this.Health()
|
||||||
|
this.log("sending chain TX for", amount, "sats", "to", address)
|
||||||
const res = await this.lightning.sendCoins(SendCoinsReq(address, amount, satPerVByte, label), DeadLineMetadata())
|
const res = await this.lightning.sendCoins(SendCoinsReq(address, amount, satPerVByte, label), DeadLineMetadata())
|
||||||
this.log("sent chain TX for", amount, "sats")
|
this.log("sent chain TX for", amount, "sats", "to", address)
|
||||||
return res.response
|
return res.response
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -44,6 +44,7 @@ export default class {
|
||||||
lnd: LightningHandler
|
lnd: LightningHandler
|
||||||
addressPaidCb: AddressPaidCb
|
addressPaidCb: AddressPaidCb
|
||||||
invoicePaidCb: InvoicePaidCb
|
invoicePaidCb: InvoicePaidCb
|
||||||
|
log = getLogger({ appName: "PaymentManager" })
|
||||||
constructor(storage: Storage, lnd: LightningHandler, settings: MainSettings, addressPaidCb: AddressPaidCb, invoicePaidCb: InvoicePaidCb) {
|
constructor(storage: Storage, lnd: LightningHandler, settings: MainSettings, addressPaidCb: AddressPaidCb, invoicePaidCb: InvoicePaidCb) {
|
||||||
this.storage = storage
|
this.storage = storage
|
||||||
this.settings = settings
|
this.settings = settings
|
||||||
|
|
@ -117,20 +118,6 @@ export default class {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async lockUserWithMinBalance(userId: string, minBalance: number) {
|
|
||||||
return this.storage.StartTransaction(async tx => {
|
|
||||||
const user = await this.storage.userStorage.GetUser(userId, tx)
|
|
||||||
if (user.locked) {
|
|
||||||
throw new Error("user is already withdrawing")
|
|
||||||
}
|
|
||||||
if (user.balance_sats < minBalance) {
|
|
||||||
throw new Error("insufficient balance")
|
|
||||||
}
|
|
||||||
// this call will fail if the user is already locked
|
|
||||||
await this.storage.userStorage.LockUser(userId, tx)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
GetMaxPayableInvoice(balance: number, appUser: boolean): number {
|
GetMaxPayableInvoice(balance: number, appUser: boolean): number {
|
||||||
let maxWithinServiceFee = 0
|
let maxWithinServiceFee = 0
|
||||||
if (appUser) {
|
if (appUser) {
|
||||||
|
|
@ -162,17 +149,20 @@ export default class {
|
||||||
const internalInvoice = await this.storage.paymentStorage.GetInvoiceOwner(req.invoice)
|
const internalInvoice = await this.storage.paymentStorage.GetInvoiceOwner(req.invoice)
|
||||||
let payment: PaidInvoice | null = null
|
let payment: PaidInvoice | null = null
|
||||||
if (!internalInvoice) {
|
if (!internalInvoice) {
|
||||||
|
this.log("paying external invoice", req.invoice)
|
||||||
const routingFeeLimit = this.lnd.GetFeeLimitAmount(payAmount)
|
const routingFeeLimit = this.lnd.GetFeeLimitAmount(payAmount)
|
||||||
await this.lockUserWithMinBalance(userId, totalAmountToDecrement + routingFeeLimit)
|
await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement + routingFeeLimit, req.invoice)
|
||||||
try {
|
try {
|
||||||
payment = await this.lnd.PayInvoice(req.invoice, req.amount, routingFeeLimit)
|
payment = await this.lnd.PayInvoice(req.invoice, req.amount, routingFeeLimit)
|
||||||
await this.storage.userStorage.DecrementUserBalance(userId, totalAmountToDecrement + payment.feeSat, req.invoice)
|
if (routingFeeLimit - payment.feeSat > 0) {
|
||||||
await this.storage.userStorage.UnlockUser(userId)
|
await this.storage.userStorage.IncrementUserBalance(userId, routingFeeLimit - payment.feeSat, "routing_fee_refund")
|
||||||
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
await this.storage.userStorage.UnlockUser(userId)
|
await this.storage.userStorage.IncrementUserBalance(userId, totalAmountToDecrement + routingFeeLimit, "payment_refund")
|
||||||
throw err
|
throw err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
this.log("paying internal invoice", req.invoice)
|
||||||
if (internalInvoice.paid_at_unix > 0) {
|
if (internalInvoice.paid_at_unix > 0) {
|
||||||
throw new Error("this invoice was already paid")
|
throw new Error("this invoice was already paid")
|
||||||
}
|
}
|
||||||
|
|
@ -197,6 +187,7 @@ export default class {
|
||||||
|
|
||||||
|
|
||||||
async PayAddress(ctx: Types.UserContext, req: Types.PayAddressRequest): Promise<Types.PayAddressResponse> {
|
async PayAddress(ctx: Types.UserContext, req: Types.PayAddressRequest): Promise<Types.PayAddressResponse> {
|
||||||
|
throw new Error("address payment currently disabled, use Lightning instead")
|
||||||
const { blockHeight } = await this.lnd.GetInfo()
|
const { blockHeight } = await this.lnd.GetInfo()
|
||||||
const app = await this.storage.applicationStorage.GetApplication(ctx.app_id)
|
const app = await this.storage.applicationStorage.GetApplication(ctx.app_id)
|
||||||
const serviceFee = this.getServiceFee(Types.UserOperationType.OUTGOING_TX, req.amoutSats, false)
|
const serviceFee = this.getServiceFee(Types.UserOperationType.OUTGOING_TX, req.amoutSats, false)
|
||||||
|
|
@ -205,21 +196,21 @@ export default class {
|
||||||
let txId = ""
|
let txId = ""
|
||||||
let chainFees = 0
|
let chainFees = 0
|
||||||
if (!internalAddress) {
|
if (!internalAddress) {
|
||||||
|
this.log("paying external address")
|
||||||
const estimate = await this.lnd.EstimateChainFees(req.address, req.amoutSats, 1)
|
const estimate = await this.lnd.EstimateChainFees(req.address, req.amoutSats, 1)
|
||||||
const vBytes = Math.ceil(Number(estimate.feeSat / estimate.satPerVbyte))
|
const vBytes = Math.ceil(Number(estimate.feeSat / estimate.satPerVbyte))
|
||||||
chainFees = vBytes * req.satsPerVByte
|
chainFees = vBytes * req.satsPerVByte
|
||||||
const total = req.amoutSats + chainFees
|
const total = req.amoutSats + chainFees
|
||||||
await this.lockUserWithMinBalance(ctx.user_id, total + serviceFee)
|
this.storage.userStorage.DecrementUserBalance(ctx.user_id, total + serviceFee, req.address)
|
||||||
try {
|
try {
|
||||||
const payment = await this.lnd.PayAddress(req.address, req.amoutSats, req.satsPerVByte)
|
const payment = await this.lnd.PayAddress(req.address, req.amoutSats, req.satsPerVByte)
|
||||||
txId = payment.txid
|
txId = payment.txid
|
||||||
await this.storage.userStorage.DecrementUserBalance(ctx.user_id, total + serviceFee, req.address)
|
|
||||||
await this.storage.userStorage.UnlockUser(ctx.user_id)
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
await this.storage.userStorage.UnlockUser(ctx.user_id)
|
await this.storage.userStorage.IncrementUserBalance(ctx.user_id, total + serviceFee, req.address)
|
||||||
throw err
|
throw err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
this.log("paying internal address")
|
||||||
await this.storage.userStorage.DecrementUserBalance(ctx.user_id, req.amoutSats + serviceFee, req.address)
|
await this.storage.userStorage.DecrementUserBalance(ctx.user_id, req.amoutSats + serviceFee, req.address)
|
||||||
txId = crypto.randomBytes(32).toString("hex")
|
txId = crypto.randomBytes(32).toString("hex")
|
||||||
this.addressPaidCb({ hash: txId, index: 0 }, req.address, req.amoutSats, true)
|
this.addressPaidCb({ hash: txId, index: 0 }, req.address, req.amoutSats, true)
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,12 @@ export type LoggedEvent = {
|
||||||
data: string
|
data: string
|
||||||
amount: number
|
amount: number
|
||||||
}
|
}
|
||||||
|
type TimeEntry = {
|
||||||
|
timestamp: number
|
||||||
|
amount: number
|
||||||
|
balance: number
|
||||||
|
userId: string
|
||||||
|
}
|
||||||
const columns = ["timestampMs", "userId", "appUserId", "appId", "balance", "type", "data", "amount"]
|
const columns = ["timestampMs", "userId", "appUserId", "appId", "balance", "type", "data", "amount"]
|
||||||
type StringerWrite = (chunk: any, cb: (error: Error | null | undefined) => void) => boolean
|
type StringerWrite = (chunk: any, cb: (error: Error | null | undefined) => void) => boolean
|
||||||
export default class EventsLogManager {
|
export default class EventsLogManager {
|
||||||
|
|
@ -38,20 +44,21 @@ export default class EventsLogManager {
|
||||||
this.write([Date.now(), e.userId, e.appUserId, e.appId, e.balance, e.type, e.data, e.amount])
|
this.write([Date.now(), e.userId, e.appUserId, e.appId, e.balance, e.type, e.data, e.amount])
|
||||||
}
|
}
|
||||||
|
|
||||||
GetAllLogs = async (): Promise<LoggedEvent[]> => {
|
GetAllLogs = async (path?: string): Promise<LoggedEvent[]> => {
|
||||||
const logs = await this.Read()
|
const logs = await this.Read(path)
|
||||||
this.log("found", logs.length, "event logs")
|
this.log("found", logs.length, "event logs")
|
||||||
return logs
|
return logs
|
||||||
}
|
}
|
||||||
|
|
||||||
Read = async (): Promise<LoggedEvent[]> => {
|
Read = async (path?: string): Promise<LoggedEvent[]> => {
|
||||||
const exists = fs.existsSync(eventLogPath)
|
const filePath = path ? path : eventLogPath
|
||||||
|
const exists = fs.existsSync(filePath)
|
||||||
if (!exists) {
|
if (!exists) {
|
||||||
return []
|
return []
|
||||||
}
|
}
|
||||||
return new Promise<LoggedEvent[]>((res, rej) => {
|
return new Promise<LoggedEvent[]>((res, rej) => {
|
||||||
const result: LoggedEvent[] = []
|
const result: LoggedEvent[] = []
|
||||||
fs.createReadStream(eventLogPath)
|
fs.createReadStream(filePath)
|
||||||
.pipe(parse({ delimiter: ",", from_line: 2 }))
|
.pipe(parse({ delimiter: ",", from_line: 2 }))
|
||||||
.on('data', data => { result.push(this.parseEvent(data)) })
|
.on('data', data => { result.push(this.parseEvent(data)) })
|
||||||
.on('error', err => { rej(err) })
|
.on('error', err => { rej(err) })
|
||||||
|
|
@ -73,4 +80,44 @@ export default class EventsLogManager {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ignoredKeys = ['fees', "bc1qkafgye62h2zhzlwtrga6jytz2p7af4lg8fwqt6", "6eb1d279f95377b8514aad3b79ff1cddbe9f5d3b95653b55719850df9df63821", "b11585413bfa7bf65a5f1263e3100e53b4c9afe6b5d8c94c6b85017dfcbf3d49"]
|
||||||
|
createTimeSeries = (events: LoggedEvent[]) => {
|
||||||
|
const dataAppIds: Record<string, string> = {}
|
||||||
|
const order: { timestamp: number, data: string, type: 'inc' | 'dec' }[] = []
|
||||||
|
const incrementEntries: Record<string, TimeEntry> = {}
|
||||||
|
const decrementEntries: Record<string, TimeEntry> = {}
|
||||||
|
events.forEach(e => {
|
||||||
|
if (this.ignoredKeys.includes(e.data)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if (e.type === 'balance_increment') {
|
||||||
|
if (incrementEntries[e.data]) {
|
||||||
|
throw new Error("increment duplicate! " + e.data)
|
||||||
|
}
|
||||||
|
incrementEntries[e.data] = { timestamp: e.timestampMs, balance: e.balance, amount: e.amount, userId: e.userId }
|
||||||
|
order.push({ timestamp: e.timestampMs, data: e.data, type: 'inc' })
|
||||||
|
} else if (e.type === 'balance_decrement') {
|
||||||
|
if (decrementEntries[e.data]) {
|
||||||
|
throw new Error("decrement duplicate! " + e.data)
|
||||||
|
}
|
||||||
|
decrementEntries[e.data] = { timestamp: e.timestampMs, balance: e.balance, amount: e.amount, userId: e.userId }
|
||||||
|
order.push({ timestamp: e.timestampMs, data: e.data, type: 'dec' })
|
||||||
|
} else if (e.appId) {
|
||||||
|
dataAppIds[e.data] = e.appId
|
||||||
|
}
|
||||||
|
})
|
||||||
|
const full = order.map(o => {
|
||||||
|
const { type } = o
|
||||||
|
if (type === 'inc') {
|
||||||
|
const entry = incrementEntries[o.data]
|
||||||
|
return { timestamp: entry.timestamp, amount: entry.amount, balance: entry.balance, userId: entry.userId, appId: dataAppIds[o.data], internal: !!decrementEntries[o.data] }
|
||||||
|
} else {
|
||||||
|
const entry = decrementEntries[o.data]
|
||||||
|
return { timestamp: entry.timestamp, amount: -entry.amount, balance: entry.balance, userId: entry.userId, appId: dataAppIds[o.data], internal: !!incrementEntries[o.data] }
|
||||||
|
}
|
||||||
|
})
|
||||||
|
full.sort((a, b) => a.timestamp - b.timestamp)
|
||||||
|
fs.writeFileSync("timeSeries.json", JSON.stringify(full, null, 2))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -75,6 +75,7 @@ export default class {
|
||||||
user_id: userId,
|
user_id: userId,
|
||||||
}, "balance_sats", increment)
|
}, "balance_sats", increment)
|
||||||
if (!res.affected) {
|
if (!res.affected) {
|
||||||
|
getLogger({ userId: userId, appName: "balanceUpdates" })("user unaffected by increment")
|
||||||
throw new Error("unaffected balance increment for " + userId) // TODO: fix logs doxing
|
throw new Error("unaffected balance increment for " + userId) // TODO: fix logs doxing
|
||||||
}
|
}
|
||||||
getLogger({ userId: userId, appName: "balanceUpdates" })("incremented balance from", user.balance_sats, "sats, by", increment, "sats")
|
getLogger({ userId: userId, appName: "balanceUpdates" })("incremented balance from", user.balance_sats, "sats, by", increment, "sats")
|
||||||
|
|
@ -83,12 +84,14 @@ export default class {
|
||||||
async DecrementUserBalance(userId: string, decrement: number, reason: string, entityManager = this.DB) {
|
async DecrementUserBalance(userId: string, decrement: number, reason: string, entityManager = this.DB) {
|
||||||
const user = await this.GetUser(userId, entityManager)
|
const user = await this.GetUser(userId, entityManager)
|
||||||
if (!user || user.balance_sats < decrement) {
|
if (!user || user.balance_sats < decrement) {
|
||||||
|
getLogger({ userId: userId, appName: "balanceUpdates" })("user to decrement not found")
|
||||||
throw new Error("not enough balance to decrement")
|
throw new Error("not enough balance to decrement")
|
||||||
}
|
}
|
||||||
const res = await entityManager.getRepository(User).decrement({
|
const res = await entityManager.getRepository(User).decrement({
|
||||||
user_id: userId,
|
user_id: userId,
|
||||||
}, "balance_sats", decrement)
|
}, "balance_sats", decrement)
|
||||||
if (!res.affected) {
|
if (!res.affected) {
|
||||||
|
getLogger({ userId: userId, appName: "balanceUpdates" })("user unaffected by decrement")
|
||||||
throw new Error("unaffected balance decrement for " + userId) // TODO: fix logs doxing
|
throw new Error("unaffected balance decrement for " + userId) // TODO: fix logs doxing
|
||||||
}
|
}
|
||||||
getLogger({ userId: userId, appName: "balanceUpdates" })("decremented balance from", user.balance_sats, "sats, by", decrement, "sats")
|
getLogger({ userId: userId, appName: "balanceUpdates" })("decremented balance from", user.balance_sats, "sats, by", decrement, "sats")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue