Merge pull request #184 from shocknet/fix/dead-sockets
close stream when socket disconnects
This commit is contained in:
commit
0100df4d73
1 changed files with 31 additions and 14 deletions
|
|
@ -111,9 +111,9 @@ module.exports = (
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const onNewInvoice = socket => {
|
const onNewInvoice = (socket, subID) => {
|
||||||
const { lightning } = LightningServices.services
|
const { lightning } = LightningServices.services
|
||||||
logger.warn('Subscribing to invoices socket...')
|
logger.warn('Subscribing to invoices socket...' + subID)
|
||||||
const stream = lightning.subscribeInvoices({})
|
const stream = lightning.subscribeInvoices({})
|
||||||
stream.on('data', data => {
|
stream.on('data', data => {
|
||||||
logger.info('[SOCKET] New invoice data:', data)
|
logger.info('[SOCKET] New invoice data:', data)
|
||||||
|
|
@ -125,46 +125,54 @@ module.exports = (
|
||||||
process.nextTick(() => onNewInvoice(socket))
|
process.nextTick(() => onNewInvoice(socket))
|
||||||
})
|
})
|
||||||
stream.on('error', err => {
|
stream.on('error', err => {
|
||||||
logger.error('New invoice stream error:', err)
|
logger.error('New invoice stream error:' + subID, err)
|
||||||
})
|
})
|
||||||
stream.on('status', status => {
|
stream.on('status', status => {
|
||||||
logger.warn('New invoice stream status:', status)
|
logger.warn('New invoice stream status:' + subID, status)
|
||||||
if (status.code === 14) {
|
if (status.code === 14) {
|
||||||
// Prevents call stack overflow exceptions
|
// Prevents call stack overflow exceptions
|
||||||
logger.error(
|
logger.error(
|
||||||
'[event:invoice:new] LND disconnected, sockets reconnecting in 30 seconds...'
|
'[event:invoice:new] LND disconnected, sockets reconnecting in 30 seconds...'
|
||||||
)
|
)
|
||||||
process.nextTick(() => setTimeout(() => onNewInvoice(socket), 30000))
|
process.nextTick(() =>
|
||||||
|
setTimeout(() => onNewInvoice(socket, subID), 30000)
|
||||||
|
)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
return () => {
|
||||||
|
stream.cancel()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const onNewTransaction = socket => {
|
const onNewTransaction = (socket, subID) => {
|
||||||
const { lightning } = LightningServices.services
|
const { lightning } = LightningServices.services
|
||||||
const stream = lightning.subscribeTransactions({})
|
const stream = lightning.subscribeTransactions({})
|
||||||
logger.warn('Subscribing to transactions socket...')
|
logger.warn('Subscribing to transactions socket...' + subID)
|
||||||
stream.on('data', data => {
|
stream.on('data', data => {
|
||||||
logger.info('[SOCKET] New transaction data:', data)
|
logger.info('[SOCKET] New transaction data:', data)
|
||||||
emitEncryptedEvent({ eventName: 'transaction:new', data, socket })
|
emitEncryptedEvent({ eventName: 'transaction:new', data, socket })
|
||||||
})
|
})
|
||||||
stream.on('end', () => {
|
stream.on('end', () => {
|
||||||
logger.info('New invoice stream ended, starting a new one...')
|
logger.info('New transactions stream ended, starting a new one...')
|
||||||
process.nextTick(() => onNewTransaction(socket))
|
process.nextTick(() => onNewTransaction(socket))
|
||||||
})
|
})
|
||||||
stream.on('error', err => {
|
stream.on('error', err => {
|
||||||
logger.error('New invoice stream error:', err)
|
logger.error('New transactions stream error:' + subID, err)
|
||||||
})
|
})
|
||||||
stream.on('status', status => {
|
stream.on('status', status => {
|
||||||
logger.error('New invoice stream status:', status)
|
logger.error('New transactions stream status:' + subID, status)
|
||||||
if (status.code === 14) {
|
if (status.code === 14) {
|
||||||
logger.error(
|
logger.error(
|
||||||
'[event:transaction:new] LND disconnected, sockets reconnecting in 30 seconds...'
|
'[event:transaction:new] LND disconnected, sockets reconnecting in 30 seconds...'
|
||||||
)
|
)
|
||||||
process.nextTick(() =>
|
process.nextTick(() =>
|
||||||
setTimeout(() => onNewTransaction(socket), 30000)
|
setTimeout(() => onNewTransaction(socket, subID), 30000)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
return () => {
|
||||||
|
stream.cancel()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
io.on('connection', socket => {
|
io.on('connection', socket => {
|
||||||
|
|
@ -174,6 +182,8 @@ module.exports = (
|
||||||
|
|
||||||
const isOneTimeUseSocket = !!socket.handshake.query.IS_GUN_AUTH
|
const isOneTimeUseSocket = !!socket.handshake.query.IS_GUN_AUTH
|
||||||
const isLNDSocket = !!socket.handshake.query.IS_LND_SOCKET
|
const isLNDSocket = !!socket.handshake.query.IS_LND_SOCKET
|
||||||
|
const isNotificationsSocket = !!socket.handshake.query
|
||||||
|
.IS_NOTIFICATIONS_SOCKET
|
||||||
if (!isLNDSocket) {
|
if (!isLNDSocket) {
|
||||||
/** printing out the client who joined */
|
/** printing out the client who joined */
|
||||||
logger.info('New socket client connected (id=' + socket.id + ').')
|
logger.info('New socket client connected (id=' + socket.id + ').')
|
||||||
|
|
@ -203,9 +213,16 @@ module.exports = (
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
if (isLNDSocket) {
|
if (isLNDSocket) {
|
||||||
logger.info('[LND] New LND Socket created')
|
const subID = Math.floor(Math.random() * 1000).toString()
|
||||||
onNewInvoice(socket)
|
const isNotifications = isNotificationsSocket ? 'notifications' : ''
|
||||||
onNewTransaction(socket)
|
logger.info('[LND] New LND Socket created:' + isNotifications + subID)
|
||||||
|
const cancelInvoiceStream = onNewInvoice(socket, subID)
|
||||||
|
const cancelTransactionStream = onNewTransaction(socket, subID)
|
||||||
|
socket.on('disconnect', () => {
|
||||||
|
logger.info('LND socket disconnected:' + isNotifications + subID)
|
||||||
|
cancelInvoiceStream()
|
||||||
|
cancelTransactionStream()
|
||||||
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger.info('New socket is NOT one time use')
|
logger.info('New socket is NOT one time use')
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue