diff --git a/src/sockets.js b/src/sockets.js index 76d195fc..c29e7e9e 100644 --- a/src/sockets.js +++ b/src/sockets.js @@ -111,9 +111,9 @@ module.exports = ( } } - const onNewInvoice = socket => { + const onNewInvoice = (socket, subID) => { const { lightning } = LightningServices.services - logger.warn('Subscribing to invoices socket...') + logger.warn('Subscribing to invoices socket...' + subID) const stream = lightning.subscribeInvoices({}) stream.on('data', data => { logger.info('[SOCKET] New invoice data:', data) @@ -125,46 +125,54 @@ module.exports = ( process.nextTick(() => onNewInvoice(socket)) }) stream.on('error', err => { - logger.error('New invoice stream error:', err) + logger.error('New invoice stream error:' + subID, err) }) stream.on('status', status => { - logger.warn('New invoice stream status:', status) + logger.warn('New invoice stream status:' + subID, status) if (status.code === 14) { // Prevents call stack overflow exceptions logger.error( '[event:invoice:new] LND disconnected, sockets reconnecting in 30 seconds...' ) - process.nextTick(() => setTimeout(() => onNewInvoice(socket), 30000)) + process.nextTick(() => + setTimeout(() => onNewInvoice(socket, subID), 30000) + ) } }) + return () => { + stream.cancel() + } } - const onNewTransaction = socket => { + const onNewTransaction = (socket, subID) => { const { lightning } = LightningServices.services const stream = lightning.subscribeTransactions({}) - logger.warn('Subscribing to transactions socket...') + logger.warn('Subscribing to transactions socket...' + subID) stream.on('data', data => { logger.info('[SOCKET] New transaction data:', data) emitEncryptedEvent({ eventName: 'transaction:new', data, socket }) }) stream.on('end', () => { - logger.info('New invoice stream ended, starting a new one...') + logger.info('New transactions stream ended, starting a new one...') process.nextTick(() => onNewTransaction(socket)) }) stream.on('error', err => { - logger.error('New invoice stream error:', err) + logger.error('New transactions stream error:' + subID, err) }) stream.on('status', status => { - logger.error('New invoice stream status:', status) + logger.error('New transactions stream status:' + subID, status) if (status.code === 14) { logger.error( '[event:transaction:new] LND disconnected, sockets reconnecting in 30 seconds...' ) process.nextTick(() => - setTimeout(() => onNewTransaction(socket), 30000) + setTimeout(() => onNewTransaction(socket, subID), 30000) ) } }) + return () => { + stream.cancel() + } } io.on('connection', socket => { @@ -174,6 +182,8 @@ module.exports = ( const isOneTimeUseSocket = !!socket.handshake.query.IS_GUN_AUTH const isLNDSocket = !!socket.handshake.query.IS_LND_SOCKET + const isNotificationsSocket = !!socket.handshake.query + .IS_NOTIFICATIONS_SOCKET if (!isLNDSocket) { /** printing out the client who joined */ logger.info('New socket client connected (id=' + socket.id + ').') @@ -203,9 +213,16 @@ module.exports = ( }) } else { if (isLNDSocket) { - logger.info('[LND] New LND Socket created') - onNewInvoice(socket) - onNewTransaction(socket) + const subID = Math.floor(Math.random() * 1000).toString() + const isNotifications = isNotificationsSocket ? 'notifications' : '' + logger.info('[LND] New LND Socket created:' + isNotifications + subID) + const cancelInvoiceStream = onNewInvoice(socket, subID) + const cancelTransactionStream = onNewTransaction(socket, subID) + socket.on('disconnect', () => { + logger.info('LND socket disconnected:' + isNotifications + subID) + cancelInvoiceStream() + cancelTransactionStream() + }) return } logger.info('New socket is NOT one time use')