close stream when socket disconnects

This commit is contained in:
hatim boufnichel 2020-09-08 14:13:14 +02:00
parent 48c7d6d3cc
commit bb599e531a

View file

@ -111,9 +111,9 @@ module.exports = (
}
}
const onNewInvoice = socket => {
const onNewInvoice = (socket, subID) => {
const { lightning } = LightningServices.services
logger.warn('Subscribing to invoices socket...')
logger.warn('Subscribing to invoices socket...' + subID)
const stream = lightning.subscribeInvoices({})
stream.on('data', data => {
logger.info('[SOCKET] New invoice data:', data)
@ -125,46 +125,54 @@ module.exports = (
process.nextTick(() => onNewInvoice(socket))
})
stream.on('error', err => {
logger.error('New invoice stream error:', err)
logger.error('New invoice stream error:' + subID, err)
})
stream.on('status', status => {
logger.warn('New invoice stream status:', status)
logger.warn('New invoice stream status:' + subID, status)
if (status.code === 14) {
// Prevents call stack overflow exceptions
logger.error(
'[event:invoice:new] LND disconnected, sockets reconnecting in 30 seconds...'
)
process.nextTick(() => setTimeout(() => onNewInvoice(socket), 30000))
process.nextTick(() =>
setTimeout(() => onNewInvoice(socket, subID), 30000)
)
}
})
return () => {
stream.cancel()
}
}
const onNewTransaction = socket => {
const onNewTransaction = (socket, subID) => {
const { lightning } = LightningServices.services
const stream = lightning.subscribeTransactions({})
logger.warn('Subscribing to transactions socket...')
logger.warn('Subscribing to transactions socket...' + subID)
stream.on('data', data => {
logger.info('[SOCKET] New transaction data:', data)
emitEncryptedEvent({ eventName: 'transaction:new', data, socket })
})
stream.on('end', () => {
logger.info('New invoice stream ended, starting a new one...')
logger.info('New transactions stream ended, starting a new one...')
process.nextTick(() => onNewTransaction(socket))
})
stream.on('error', err => {
logger.error('New invoice stream error:', err)
logger.error('New transactions stream error:' + subID, err)
})
stream.on('status', status => {
logger.error('New invoice stream status:', status)
logger.error('New transactions stream status:' + subID, status)
if (status.code === 14) {
logger.error(
'[event:transaction:new] LND disconnected, sockets reconnecting in 30 seconds...'
)
process.nextTick(() =>
setTimeout(() => onNewTransaction(socket), 30000)
setTimeout(() => onNewTransaction(socket, subID), 30000)
)
}
})
return () => {
stream.cancel()
}
}
io.on('connection', socket => {
@ -174,6 +182,8 @@ module.exports = (
const isOneTimeUseSocket = !!socket.handshake.query.IS_GUN_AUTH
const isLNDSocket = !!socket.handshake.query.IS_LND_SOCKET
const isNotificationsSocket = !!socket.handshake.query
.IS_NOTIFICATIONS_SOCKET
if (!isLNDSocket) {
/** printing out the client who joined */
logger.info('New socket client connected (id=' + socket.id + ').')
@ -203,9 +213,16 @@ module.exports = (
})
} else {
if (isLNDSocket) {
logger.info('[LND] New LND Socket created')
onNewInvoice(socket)
onNewTransaction(socket)
const subID = Math.floor(Math.random() * 1000).toString()
const isNotifications = isNotificationsSocket ? 'notifications' : ''
logger.info('[LND] New LND Socket created:' + isNotifications + subID)
const cancelInvoiceStream = onNewInvoice(socket, subID)
const cancelTransactionStream = onNewTransaction(socket, subID)
socket.on('disconnect', () => {
logger.info('LND socket disconnected:' + isNotifications + subID)
cancelInvoiceStream()
cancelTransactionStream()
})
return
}
logger.info('New socket is NOT one time use')