commit
5f0c778434
10 changed files with 183 additions and 8 deletions
|
|
@ -44,3 +44,9 @@ MIGRATE_DB=false
|
|||
RECORD_PERFORMANCE=true
|
||||
SKIP_SANITY_CHECK=false
|
||||
DISABLE_EXTERNAL_PAYMENTS=false
|
||||
|
||||
# Max difference between users balance and LND balance since beginning of app execution
|
||||
WATCHDOG_MAX_DIFF_SATS=10000
|
||||
|
||||
# Max difference between users balance and LND balance after each payment
|
||||
WATCHDOG_MAX_UPDATE_DIFF_SATS=1000
|
||||
|
|
@ -10,6 +10,17 @@ export const EnvMustBeInteger = (name: string): number => {
|
|||
}
|
||||
return +env
|
||||
}
|
||||
export const EnvCanBeInteger = (name: string, defaultValue = 0): number => {
|
||||
const env = process.env[name]
|
||||
if (!env) {
|
||||
return defaultValue
|
||||
}
|
||||
const envNum = +env
|
||||
if (isNaN(envNum) || !Number.isInteger(envNum)) {
|
||||
throw new Error(`${name} ENV must be an integer number or nothing`);
|
||||
}
|
||||
return envNum
|
||||
}
|
||||
export const EnvCanBeBoolean = (name: string): boolean => {
|
||||
const env = process.env[name]
|
||||
if (!env) return false
|
||||
|
|
|
|||
|
|
@ -38,6 +38,8 @@ export interface LightningHandler {
|
|||
GetForwardingHistory(indexOffset: number): Promise<{ fee: number, chanIdIn: string, chanIdOut: string, timestampNs: number, offset: number }[]>
|
||||
GetAllPaidInvoices(max: number): Promise<ListInvoiceResponse>
|
||||
GetAllPayments(max: number): Promise<ListPaymentsResponse>
|
||||
LockOutgoingOperations(): void
|
||||
UnlockOutgoingOperations(): void
|
||||
}
|
||||
|
||||
export default (settings: LndSettings, addressPaidCb: AddressPaidCb, invoicePaidCb: InvoicePaidCb, newBlockCb: NewBlockCb, htlcCb: HtlcCb): LightningHandler => {
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ export default class {
|
|||
newBlockCb: NewBlockCb
|
||||
htlcCb: HtlcCb
|
||||
log = getLogger({ appName: 'lndManager' })
|
||||
outgoingOpsLocked = false
|
||||
constructor(settings: LndSettings, addressPaidCb: AddressPaidCb, invoicePaidCb: InvoicePaidCb, newBlockCb: NewBlockCb, htlcCb: HtlcCb) {
|
||||
this.settings = settings
|
||||
this.addressPaidCb = addressPaidCb
|
||||
|
|
@ -60,6 +61,14 @@ export default class {
|
|||
this.router = new RouterClient(transport)
|
||||
this.chainNotifier = new ChainNotifierClient(transport)
|
||||
}
|
||||
|
||||
LockOutgoingOperations(): void {
|
||||
this.outgoingOpsLocked = true
|
||||
}
|
||||
UnlockOutgoingOperations(): void {
|
||||
this.outgoingOpsLocked = false
|
||||
}
|
||||
|
||||
SetMockInvoiceAsPaid(invoice: string, amount: number): Promise<void> {
|
||||
throw new Error("SetMockInvoiceAsPaid only available in mock mode")
|
||||
}
|
||||
|
|
@ -251,6 +260,10 @@ export default class {
|
|||
return { local: r.localBalance ? Number(r.localBalance.sat) : 0, remote: r.remoteBalance ? Number(r.remoteBalance.sat) : 0 }
|
||||
}
|
||||
async PayInvoice(invoice: string, amount: number, feeLimit: number): Promise<PaidInvoice> {
|
||||
if (this.outgoingOpsLocked) {
|
||||
this.log("outgoing ops locked, rejecting payment request")
|
||||
throw new Error("lnd node is currently out of sync")
|
||||
}
|
||||
await this.Health()
|
||||
this.log("paying invoice", invoice, "for", amount, "sats")
|
||||
const abortController = new AbortController()
|
||||
|
|
@ -287,6 +300,10 @@ export default class {
|
|||
}
|
||||
|
||||
async PayAddress(address: string, amount: number, satPerVByte: number, label = ""): Promise<SendCoinsResponse> {
|
||||
if (this.outgoingOpsLocked) {
|
||||
this.log("outgoing ops locked, rejecting payment request")
|
||||
throw new Error("lnd node is currently out of sync")
|
||||
}
|
||||
await this.Health()
|
||||
this.log("sending chain TX for", amount, "sats", "to", address)
|
||||
const res = await this.lightning.sendCoins(SendCoinsReq(address, amount, satPerVByte, label), DeadLineMetadata())
|
||||
|
|
|
|||
|
|
@ -131,6 +131,12 @@ export default class {
|
|||
async GetAllPayments(max: number): Promise<ListPaymentsResponse> {
|
||||
throw new Error("not implemented")
|
||||
}
|
||||
LockOutgoingOperations() {
|
||||
throw new Error("not implemented")
|
||||
}
|
||||
UnlockOutgoingOperations() {
|
||||
throw new Error("not implemented")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
111
src/services/lnd/watchdog.ts
Normal file
111
src/services/lnd/watchdog.ts
Normal file
|
|
@ -0,0 +1,111 @@
|
|||
import { EnvCanBeInteger } from "../helpers/envParser.js";
|
||||
import { getLogger } from "../helpers/logger.js";
|
||||
import { LightningHandler } from "./index.js";
|
||||
export type WatchdogSettings = {
|
||||
maxDiffSats: number
|
||||
}
|
||||
export const LoadWatchdogSettingsFromEnv = (test = false): WatchdogSettings => {
|
||||
return {
|
||||
maxDiffSats: EnvCanBeInteger("WATCHDOG_MAX_DIFF_SATS")
|
||||
}
|
||||
}
|
||||
export class Watchdog {
|
||||
initialLndBalance: number;
|
||||
initialUsersBalance: number;
|
||||
lnd: LightningHandler;
|
||||
settings: WatchdogSettings;
|
||||
log = getLogger({ appName: "watchdog" })
|
||||
constructor(settings: WatchdogSettings, lnd: LightningHandler) {
|
||||
this.lnd = lnd;
|
||||
this.settings = settings;
|
||||
}
|
||||
|
||||
SeedLndBalance = async (totalUsersBalance: number) => {
|
||||
this.initialLndBalance = await this.getTotalLndBalance()
|
||||
this.initialUsersBalance = totalUsersBalance
|
||||
}
|
||||
|
||||
getTotalLndBalance = async () => {
|
||||
const { channelsBalance, confirmedBalance } = await this.lnd.GetBalance()
|
||||
return confirmedBalance + channelsBalance.reduce((acc, { localBalanceSats }) => acc + localBalanceSats, 0)
|
||||
}
|
||||
|
||||
checkBalanceUpdate = (deltaLnd: number, deltaUsers: number) => {
|
||||
this.log("LND balance update:", deltaLnd, "sats since app startup")
|
||||
this.log("Users balance update:", deltaUsers, "sats since app startup")
|
||||
|
||||
const result = this.checkDeltas(deltaLnd, deltaUsers)
|
||||
switch (result.type) {
|
||||
case 'mismatch':
|
||||
if (deltaLnd < 0) {
|
||||
this.log("WARNING! LND balance decreased while users balance increased creating a difference of", result.absoluteDiff, "sats")
|
||||
if (result.absoluteDiff > this.settings.maxDiffSats) {
|
||||
this.log("Difference is too big for an update, locking outgoing operations")
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
this.log("LND balance increased while users balance decreased creating a difference of", result.absoluteDiff, "sats, could be caused by data loss, or liquidity injection")
|
||||
return false
|
||||
}
|
||||
break
|
||||
case 'negative':
|
||||
if (Math.abs(deltaLnd) > Math.abs(deltaUsers)) {
|
||||
this.log("WARNING! LND balance decreased more than users balance with a difference of", result.absoluteDiff, "sats")
|
||||
if (result.absoluteDiff > this.settings.maxDiffSats) {
|
||||
this.log("Difference is too big for an update, locking outgoing operations")
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
this.log("LND balance decreased less than users balance with a difference of", result.absoluteDiff, "sats, could be caused by data loss, or liquidity injection")
|
||||
return false
|
||||
}
|
||||
break
|
||||
case 'positive':
|
||||
if (deltaLnd < deltaUsers) {
|
||||
this.log("WARNING! LND balance increased less than users balance with a difference of", result.absoluteDiff, "sats")
|
||||
if (result.absoluteDiff > this.settings.maxDiffSats) {
|
||||
this.log("Difference is too big for an update, locking outgoing operations")
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
this.log("LND balance increased more than users balance with a difference of", result.absoluteDiff, "sats, could be caused by data loss, or liquidity injection")
|
||||
return false
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
PaymentRequested = async (totalUsersBalance: number) => {
|
||||
this.log("Payment requested, checking balance")
|
||||
const totalLndBalance = await this.getTotalLndBalance()
|
||||
const deltaLnd = totalLndBalance - this.initialLndBalance
|
||||
const deltaUsers = totalUsersBalance - this.initialUsersBalance
|
||||
const deny = this.checkBalanceUpdate(deltaLnd, deltaUsers)
|
||||
if (deny) {
|
||||
this.log("Balance mismatch detected in absolute update, locking outgoing operations")
|
||||
this.lnd.LockOutgoingOperations()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
checkDeltas = (deltaLnd: number, deltaUsers: number): DeltaCheckResult => {
|
||||
if (deltaLnd < 0) {
|
||||
if (deltaUsers < 0) {
|
||||
const diff = Math.abs(deltaLnd - deltaUsers)
|
||||
return { type: 'negative', absoluteDiff: diff, relativeDiff: diff / Math.max(deltaLnd, deltaUsers) }
|
||||
} else {
|
||||
const diff = Math.abs(deltaLnd) + deltaUsers
|
||||
return { type: 'mismatch', absoluteDiff: diff }
|
||||
}
|
||||
} else {
|
||||
if (deltaUsers < 0) {
|
||||
const diff = deltaLnd + Math.abs(deltaUsers)
|
||||
return { type: 'mismatch', absoluteDiff: diff }
|
||||
} else {
|
||||
const diff = Math.abs(deltaLnd - deltaUsers)
|
||||
return { type: 'positive', absoluteDiff: diff, relativeDiff: diff / Math.max(deltaLnd, deltaUsers) }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
type DeltaCheckResult = { type: 'negative' | 'positive', absoluteDiff: number, relativeDiff: number } | { type: 'mismatch', absoluteDiff: number }
|
||||
|
|
@ -90,12 +90,12 @@ export default class {
|
|||
if (!updateResult.affected) {
|
||||
throw new Error("unable to flag chain transaction as paid")
|
||||
}
|
||||
await this.storage.userStorage.IncrementUserBalance(userAddress.user.user_id, amount - serviceFee, userAddress.address, tx)
|
||||
const addressData = `${userAddress.address}:${tx_hash}`
|
||||
this.storage.eventsLog.LogEvent({ type: 'address_paid', userId: userAddress.user.user_id, appId: userAddress.linkedApplication.app_id, appUserId: "", balance: userAddress.user.balance_sats, data: addressData, amount })
|
||||
await this.storage.userStorage.IncrementUserBalance(userAddress.user.user_id, amount - serviceFee, addressData, tx)
|
||||
if (serviceFee > 0) {
|
||||
await this.storage.userStorage.IncrementUserBalance(userAddress.linkedApplication.owner.user_id, serviceFee, 'fees', tx)
|
||||
}
|
||||
const addressData = `${userAddress.address}:${tx_hash}`
|
||||
this.storage.eventsLog.LogEvent({ type: 'address_paid', userId: userAddress.user.user_id, appId: userAddress.linkedApplication.app_id, appUserId: "", balance: userAddress.user.balance_sats, data: addressData, amount })
|
||||
const operationId = `${Types.UserOperationType.INCOMING_TX}-${serialId}`
|
||||
const op = { amount, paidAtUnix: Date.now() / 1000, inbound: true, type: Types.UserOperationType.INCOMING_TX, identifier: userAddress.address, operationId, network_fee: 0, service_fee: serviceFee, confirmed: true, tx_hash: c.tx.tx_hash, internal: c.tx.internal }
|
||||
this.sendOperationToNostr(userAddress.linkedApplication!, userAddress.user.user_id, op)
|
||||
|
|
@ -125,12 +125,13 @@ export default class {
|
|||
// This call will fail if the transaction is already registered
|
||||
const addedTx = await this.storage.paymentStorage.AddAddressReceivingTransaction(userAddress, txOutput.hash, txOutput.index, amount, fee, internal, blockHeight, tx)
|
||||
if (internal) {
|
||||
await this.storage.userStorage.IncrementUserBalance(userAddress.user.user_id, addedTx.paid_amount - fee, userAddress.address, tx)
|
||||
const addressData = `${address}:${txOutput.hash}`
|
||||
this.storage.eventsLog.LogEvent({ type: 'address_paid', userId: userAddress.user.user_id, appId: userAddress.linkedApplication.app_id, appUserId: "", balance: userAddress.user.balance_sats, data: addressData, amount })
|
||||
await this.storage.userStorage.IncrementUserBalance(userAddress.user.user_id, addedTx.paid_amount - fee, addressData, tx)
|
||||
if (fee > 0) {
|
||||
await this.storage.userStorage.IncrementUserBalance(userAddress.linkedApplication.owner.user_id, fee, 'fees', tx)
|
||||
}
|
||||
const addressData = `${address}:${txOutput.hash}`
|
||||
this.storage.eventsLog.LogEvent({ type: 'address_paid', userId: userAddress.user.user_id, appId: userAddress.linkedApplication.app_id, appUserId: "", balance: userAddress.user.balance_sats, data: addressData, amount })
|
||||
|
||||
}
|
||||
const operationId = `${Types.UserOperationType.INCOMING_TX}-${addedTx.serial_id}`
|
||||
const op = { amount, paidAtUnix: Date.now() / 1000, inbound: true, type: Types.UserOperationType.INCOMING_TX, identifier: userAddress.address, operationId, network_fee: 0, service_fee: fee, confirmed: internal, tx_hash: txOutput.hash, internal: false }
|
||||
|
|
@ -160,11 +161,11 @@ export default class {
|
|||
}
|
||||
try {
|
||||
await this.storage.paymentStorage.FlagInvoiceAsPaid(userInvoice, amount, fee, internal, tx)
|
||||
this.storage.eventsLog.LogEvent({ type: 'invoice_paid', userId: userInvoice.user.user_id, appId: userInvoice.linkedApplication.app_id, appUserId: "", balance: userInvoice.user.balance_sats, data: paymentRequest, amount })
|
||||
await this.storage.userStorage.IncrementUserBalance(userInvoice.user.user_id, amount - fee, userInvoice.invoice, tx)
|
||||
if (fee > 0) {
|
||||
await this.storage.userStorage.IncrementUserBalance(userInvoice.linkedApplication.owner.user_id, fee, 'fees', tx)
|
||||
}
|
||||
this.storage.eventsLog.LogEvent({ type: 'invoice_paid', userId: userInvoice.user.user_id, appId: userInvoice.linkedApplication.app_id, appUserId: "", balance: userInvoice.user.balance_sats, data: paymentRequest, amount })
|
||||
await this.triggerPaidCallback(log, userInvoice.callbackUrl)
|
||||
const operationId = `${Types.UserOperationType.INCOMING_INVOICE}-${userInvoice.serial_id}`
|
||||
const op = { amount, paidAtUnix: Date.now() / 1000, inbound: true, type: Types.UserOperationType.INCOMING_INVOICE, identifier: userInvoice.invoice, operationId, network_fee: 0, service_fee: fee, confirmed: true, tx_hash: "", internal }
|
||||
|
|
@ -282,6 +283,9 @@ export default class {
|
|||
throw new Error("sanity check on balance failed, expected: " + u.updatedBalance + " found: " + user.balance_sats)
|
||||
}
|
||||
}))
|
||||
|
||||
const total = await this.storage.paymentStorage.GetTotalUsersBalance()
|
||||
return total || 0
|
||||
}
|
||||
|
||||
checkUserEntry(e: LoggedEvent, u: { ts: number, updatedBalance: number } | undefined) {
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ import { SendCoinsResponse } from '../../../proto/lnd/lightning.js'
|
|||
import { Event, verifiedSymbol, verifySignature } from '../nostr/tools/event.js'
|
||||
import { AddressReceivingTransaction } from '../storage/entity/AddressReceivingTransaction.js'
|
||||
import { UserTransactionPayment } from '../storage/entity/UserTransactionPayment.js'
|
||||
import { Watchdog } from '../lnd/watchdog.js'
|
||||
interface UserOperationInfo {
|
||||
serial_id: number
|
||||
paid_amount: number
|
||||
|
|
@ -45,10 +46,12 @@ export default class {
|
|||
addressPaidCb: AddressPaidCb
|
||||
invoicePaidCb: InvoicePaidCb
|
||||
log = getLogger({ appName: "PaymentManager" })
|
||||
watchDog: Watchdog
|
||||
constructor(storage: Storage, lnd: LightningHandler, settings: MainSettings, addressPaidCb: AddressPaidCb, invoicePaidCb: InvoicePaidCb) {
|
||||
this.storage = storage
|
||||
this.settings = settings
|
||||
this.lnd = lnd
|
||||
this.watchDog = new Watchdog(settings.watchDogSettings, lnd)
|
||||
this.addressPaidCb = addressPaidCb
|
||||
this.invoicePaidCb = invoicePaidCb
|
||||
}
|
||||
|
|
@ -140,8 +143,14 @@ export default class {
|
|||
}
|
||||
}
|
||||
|
||||
async WatchdogCheck() {
|
||||
const total = await this.storage.paymentStorage.GetTotalUsersBalance()
|
||||
await this.watchDog.PaymentRequested(total || 0)
|
||||
}
|
||||
|
||||
async PayInvoice(userId: string, req: Types.PayInvoiceRequest, linkedApplication: Application): Promise<Types.PayInvoiceResponse> {
|
||||
this.log("paying invoice", req.invoice, "for user", userId, "with amount", req.amount)
|
||||
await this.WatchdogCheck()
|
||||
const maybeBanned = await this.storage.userStorage.GetUser(userId)
|
||||
if (maybeBanned.locked) {
|
||||
throw new Error("user is banned, cannot send payment")
|
||||
|
|
@ -202,6 +211,7 @@ export default class {
|
|||
|
||||
async PayAddress(ctx: Types.UserContext, req: Types.PayAddressRequest): Promise<Types.PayAddressResponse> {
|
||||
throw new Error("address payment currently disabled, use Lightning instead")
|
||||
await this.WatchdogCheck()
|
||||
this.log("paying address", req.address, "for user", ctx.user_id, "with amount", req.amoutSats)
|
||||
const maybeBanned = await this.storage.userStorage.GetUser(ctx.user_id)
|
||||
if (maybeBanned.locked) {
|
||||
|
|
|
|||
|
|
@ -1,10 +1,12 @@
|
|||
import { LoadStorageSettingsFromEnv, StorageSettings } from '../storage/index.js'
|
||||
import { LndSettings } from '../lnd/settings.js'
|
||||
import { LoadWatchdogSettingsFromEnv, WatchdogSettings } from '../lnd/watchdog.js'
|
||||
import { LoadLndSettingsFromEnv } from '../lnd/index.js'
|
||||
import { EnvMustBeInteger, EnvMustBeNonEmptyString } from '../helpers/envParser.js'
|
||||
export type MainSettings = {
|
||||
storageSettings: StorageSettings,
|
||||
lndSettings: LndSettings,
|
||||
watchDogSettings: WatchdogSettings,
|
||||
jwtSecret: string
|
||||
incomingTxFee: number
|
||||
outgoingTxFee: number
|
||||
|
|
@ -22,6 +24,7 @@ export type MainSettings = {
|
|||
}
|
||||
export const LoadMainSettingsFromEnv = (): MainSettings => {
|
||||
return {
|
||||
watchDogSettings: LoadWatchdogSettingsFromEnv(),
|
||||
lndSettings: LoadLndSettingsFromEnv(),
|
||||
storageSettings: LoadStorageSettingsFromEnv(),
|
||||
jwtSecret: EnvMustBeNonEmptyString("JWT_SECRET"),
|
||||
|
|
@ -37,7 +40,8 @@ export const LoadMainSettingsFromEnv = (): MainSettings => {
|
|||
servicePort: EnvMustBeInteger("PORT"),
|
||||
recordPerformance: process.env.RECORD_PERFORMANCE === 'true' || false,
|
||||
skipSanityCheck: process.env.SKIP_SANITY_CHECK === 'true' || false,
|
||||
disableExternalPayments: process.env.DISABLE_EXTERNAL_PAYMENTS === 'true' || false
|
||||
disableExternalPayments: process.env.DISABLE_EXTERNAL_PAYMENTS === 'true' || false,
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -360,4 +360,8 @@ export default class {
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
async GetTotalUsersBalance(entityManager = this.DB) {
|
||||
return entityManager.getRepository(User).sum("balance_sats")
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue