From e0310f13caabe4a114659ed6d3edeb0db9f658e9 Mon Sep 17 00:00:00 2001 From: emad-salah Date: Wed, 21 Apr 2021 12:22:04 +0000 Subject: [PATCH] Bug fixes --- services/gunDB/sockets/index.js | 154 +++++++++++++-------- src/sockets.js | 228 +------------------------------- 2 files changed, 106 insertions(+), 276 deletions(-) diff --git a/services/gunDB/sockets/index.js b/services/gunDB/sockets/index.js index 0a31b554..10f715e6 100644 --- a/services/gunDB/sockets/index.js +++ b/services/gunDB/sockets/index.js @@ -18,14 +18,22 @@ const { const TipsForwarder = require('../../tipsCallback') const auth = require('../../auth/auth') -const ALLOWED_GUN_METHODS = ['map', 'map.on', 'on', 'once', 'load', 'then'] +const ALLOWED_GUN_METHODS = [ + 'map', + 'map.on', + 'on', + 'once', + 'load', + 'then', + 'open' +] /** * @typedef {import('../contact-api/SimpleGUN').ValidDataValue} ValidDataValue */ /** - * @typedef {(data: ValidDataValue, key: string, _msg: any, event: any) => (void | Promise)} GunListener + * @typedef {(data: ValidDataValue, key?: string, _msg?: any, event?: any) => (void | Promise)} GunListener * @typedef {{ reconnect: boolean, token: string }} SubscriptionOptions */ @@ -77,21 +85,34 @@ const getGunQuery = (node, path) => { } /** - * Dynamically construct a GunDB query call + * Executes a GunDB query call using the specified method * @param {any} query * @param {string} method + * @param {GunListener} listener */ -const getGunListener = (query, method) => { - const methods = method.split('.') - const listener = methods.reduce((listener, method) => { - if (typeof listener === 'function') { - return listener[method]() +const executeGunQuery = (query, method, listener) => { + if (!ALLOWED_GUN_METHODS.includes(method)) { + throw { + field: 'method', + message: `Invalid GunDB method specified (${method}). ` } + } - return listener[method]() - }, query) + if (method === 'on') { + return query.on(listener) + } - return listener + if (method === 'open') { + return query.open(listener) + } + + if (method === 'map.on') { + return query.map().on(listener) + } + + if (method === 'map.once') { + return query.map().once(listener) + } } /** @@ -113,18 +134,21 @@ const queryListenerCallback = ({ deviceId, subscriptionId }) - if (subscription && event && event.off) { - event.off() + if (subscription && !subscription.unsubscribe && event) { + Subscriptions.attachUnsubscribe({ + deviceId, + subscriptionId, + unsubscribe: () => event.off() + }) } - const eventName = `subscription:${subscriptionId}` + const eventName = `query:data` if (publicKeyForDecryption?.length > 15) { const decData = await deepDecryptIfNeeded(data, publicKeyForDecryption) - - emit(eventName, decData, key) + emit(eventName, { subscriptionId, response: { data: decData, key } }) return } - emit(eventName, data, key) + emit(eventName, { subscriptionId, response: { data, key } }) } catch (err) { logger.error(`Error for gun rpc socket: ${err.message}`) } @@ -138,8 +162,18 @@ const queryListenerCallback = ({ * @param {import('socket.io').Socket} GunSocketOptions.socket * @returns {(options: SubscriptionOptions, response: (error?: any, data?: any) => void) => Promise} */ -const wrap = ({ handler, subscriptionId, encryptionId, socket }) => { - return async ({ reconnect = false, token }, response) => { +const wrap = ({ handler, subscriptionId, encryptionId, socket }) => async ( + { reconnect, token }, + response +) => { + try { + logger.info('Subscribe function executing...') + if (!isAuthenticated()) { + logger.warn('GunDB is not yet authenticated') + socket.emit(Common.Constants.ErrorCode.NOT_AUTH) + return + } + const callback = encryptedCallback(socket, response) const emit = encryptedEmit(socket) const subscription = Subscriptions.get({ @@ -148,15 +182,21 @@ const wrap = ({ handler, subscriptionId, encryptionId, socket }) => { }) if (subscription && !reconnect) { - callback({ + const error = { field: 'subscription', message: "You're already subscribed to this event, you can re-subscribe again by setting 'reconnect' to true " - }) + } + logger.error('Duplicate subscription:', error) + callback(error) return } - if (reconnect) { + if (subscription && reconnect) { + if (subscription.unsubscribe) { + subscription.unsubscribe() + } + Subscriptions.remove({ deviceId: encryptionId, subscriptionId @@ -187,53 +227,59 @@ const wrap = ({ handler, subscriptionId, encryptionId, socket }) => { message: 'Subscribed successfully!', success: true }) + } catch (error) { + logger.error('Socket wrapper error:', error) } } /** @param {import('socket.io').Socket} socket */ const startSocket = socket => { try { - if (!isAuthenticated()) { - socket.emit(Common.Constants.ErrorCode.NOT_AUTH) - return - } - const emit = encryptedEmit(socket) const on = encryptedOn(socket) const { encryptionId } = socket.handshake.auth on('subscribe:query', ({ $shock, publicKey }, response) => { - const [root, path, method] = $shock.split('::') - const callback = encryptedCallback(socket, response) - - if (!ALLOWED_GUN_METHODS.includes(method)) { - callback(`Invalid method for gun rpc call: ${method}, query: ${$shock}`) - return - } - const subscriptionId = uuidv4() - const queryCallback = queryListenerCallback({ - emit, - publicKeyForDecryption: publicKey, - subscriptionId, - deviceId: encryptionId - }) + try { + if (!isAuthenticated()) { + socket.emit(Common.Constants.ErrorCode.NOT_AUTH) + return + } - const node = getNode(root) - const query = getGunQuery(node, path) - /** @type {(cb?: GunListener) => void} */ - const listener = getGunListener(query, method) + const [root, path, method] = $shock.split('::') + const socketCallback = encryptedCallback(socket, response) - Subscriptions.add({ - deviceId: encryptionId, - subscriptionId - }) + if (!ALLOWED_GUN_METHODS.includes(method)) { + socketCallback( + `Invalid method for gun rpc call: ${method}, query: ${$shock}` + ) + return + } - callback(null, { - subscriptionId - }) + Subscriptions.add({ + deviceId: encryptionId, + subscriptionId + }) - listener(queryCallback) + const queryCallback = queryListenerCallback({ + emit, + publicKeyForDecryption: publicKey, + subscriptionId, + deviceId: encryptionId + }) + + socketCallback(null, { + subscriptionId + }) + + const node = getNode(root) + const query = getGunQuery(node, path) + + executeGunQuery(query, method, queryCallback) + } catch (error) { + emit(`query:error`, { subscriptionId, response: { data: error } }) + } }) const onChats = () => { @@ -346,7 +392,7 @@ const startSocket = socket => { TipsForwarder.addSocket(postID, socket) }) - on('unsubscribe', (subscriptionId, response) => { + on('unsubscribe', ({ subscriptionId }, response) => { const callback = encryptedCallback(socket, response) Subscriptions.remove({ deviceId: encryptionId, subscriptionId }) callback(null, { diff --git a/src/sockets.js b/src/sockets.js index 08350b15..16838705 100644 --- a/src/sockets.js +++ b/src/sockets.js @@ -11,7 +11,6 @@ const auth = require('../services/auth/auth') const LightningServices = require('../utils/lightningServices') const { isAuthenticated } = require('../services/gunDB/Mediator') const initGunDBSocket = require('../services/gunDB/sockets') -const GunEvents = require('../services/gunDB/contact-api/events') const { encryptedEmit, encryptedOn } = require('../utils/ECC/socket') const TipsForwarder = require('../services/tipsCallback') /** @@ -23,7 +22,7 @@ module.exports = ( /** @type {import('socket.io').Server} */ io ) => { - io.on('connection', socket => { + io.on('connect', socket => { const isLNDSocket = !!socket.handshake.auth.IS_LND_SOCKET const isNotificationsSocket = !!socket.handshake.auth .IS_NOTIFICATIONS_SOCKET @@ -105,6 +104,10 @@ module.exports = ( } }) + io.of('gun').on('connect', socket => { + initGunDBSocket(socket) + }) + /** * @param {string} token * @returns {Promise} @@ -129,7 +132,7 @@ module.exports = ( /** @type {null|NodeJS.Timeout} */ let pingIntervalID = null - + // TODO: Unused? io.of('shockping').on( 'connect', // TODO: make this sync @@ -181,225 +184,6 @@ module.exports = ( } ) - // TODO: do this through rpc - - const emptyUnsub = () => {} - - let chatsUnsub = emptyUnsub - - io.of('chats').on('connect', async socket => { - const on = encryptedOn(socket) - const emit = encryptedEmit(socket) - - try { - if (!isAuthenticated()) { - logger.info( - 'not authenticated in gun for chats socket, will send NOT_AUTH' - ) - emit(Common.Constants.ErrorCode.NOT_AUTH) - - return - } - - logger.info('now checking token for chats socket') - const { token } = socket.handshake.auth - const isAuth = await isValidToken(token) - - if (!isAuth) { - logger.warn('invalid token for chats socket') - emit(Common.Constants.ErrorCode.NOT_AUTH) - return - } - - if (chatsUnsub !== emptyUnsub) { - logger.error( - 'Tried to set chats socket twice, this might be due to an app restart and the old socket not being recycled by socket.io in time, will disable the older subscription, which means the old socket wont work and data will be sent to this new socket instead' - ) - chatsUnsub() - chatsUnsub = emptyUnsub - } - - /** - * @param {Common.Schema.Chat[]} chats - */ - const onChats = chats => { - const processed = chats.map( - ({ - didDisconnect, - id, - lastSeenApp, - messages, - recipientPublicKey - }) => { - /** @type {Common.Schema.Chat} */ - const stripped = { - didDisconnect, - id, - lastSeenApp, - messages, - recipientAvatar: null, - recipientDisplayName: null, - recipientPublicKey - } - - return stripped - } - ) - - emit('$shock', processed) - } - - chatsUnsub = GunEvents.onChats(onChats) - - on('disconnect', () => { - chatsUnsub() - chatsUnsub = emptyUnsub - }) - } catch (e) { - logger.error('Error inside chats socket connect: ' + e.message) - emit('$error', e.message) - } - }) - - let sentReqsUnsub = emptyUnsub - - io.of('sentReqs').on('connect', async socket => { - const on = encryptedOn(socket) - const emit = encryptedEmit(socket) - - try { - if (!isAuthenticated()) { - logger.info( - 'not authenticated in gun for sentReqs socket, will send NOT_AUTH' - ) - emit(Common.Constants.ErrorCode.NOT_AUTH) - - return - } - - logger.info('now checking token for sentReqs socket') - const { token } = socket.handshake.auth - const isAuth = await isValidToken(token) - - if (!isAuth) { - logger.warn('invalid token for sentReqs socket') - emit(Common.Constants.ErrorCode.NOT_AUTH) - return - } - - if (sentReqsUnsub !== emptyUnsub) { - logger.error( - 'Tried to set sentReqs socket twice, this might be due to an app restart and the old socket not being recycled by io in time, will disable the older subscription, which means the old socket wont work and data will be sent to this new socket instead' - ) - sentReqsUnsub() - sentReqsUnsub = emptyUnsub - } - - /** - * @param {Common.Schema.SimpleSentRequest[]} sentReqs - */ - const onSentReqs = sentReqs => { - const processed = sentReqs.map( - ({ - id, - recipientChangedRequestAddress, - recipientPublicKey, - timestamp - }) => { - /** - * @type {Common.Schema.SimpleSentRequest} - */ - const stripped = { - id, - recipientAvatar: null, - recipientChangedRequestAddress, - recipientDisplayName: null, - recipientPublicKey, - timestamp - } - - return stripped - } - ) - emit('$shock', processed) - } - - sentReqsUnsub = GunEvents.onSimplerSentRequests(onSentReqs) - - on('disconnect', () => { - sentReqsUnsub() - sentReqsUnsub = emptyUnsub - }) - } catch (e) { - logger.error('Error inside sentReqs socket connect: ' + e.message) - emit('$error', e.message) - } - }) - - let receivedReqsUnsub = emptyUnsub - - io.of('receivedReqs').on('connect', async socket => { - const on = encryptedOn(socket) - const emit = encryptedEmit(socket) - try { - if (!isAuthenticated()) { - logger.info( - 'not authenticated in gun for receivedReqs socket, will send NOT_AUTH' - ) - emit(Common.Constants.ErrorCode.NOT_AUTH) - - return - } - - logger.info('now checking token for receivedReqs socket') - const { token } = socket.handshake.auth - const isAuth = await isValidToken(token) - - if (!isAuth) { - logger.warn('invalid token for receivedReqs socket') - emit(Common.Constants.ErrorCode.NOT_AUTH) - return - } - - if (receivedReqsUnsub !== emptyUnsub) { - logger.error( - 'Tried to set receivedReqs socket twice, this might be due to an app restart and the old socket not being recycled by socket.io in time, will disable the older subscription, which means the old socket wont work and data will be sent to this new socket instead' - ) - receivedReqsUnsub() - receivedReqsUnsub = emptyUnsub - } - - /** - * @param {ReadonlyArray} receivedReqs - */ - const onReceivedReqs = receivedReqs => { - const processed = receivedReqs.map(({ id, requestorPK, timestamp }) => { - /** @type {Common.Schema.SimpleReceivedRequest} */ - const stripped = { - id, - requestorAvatar: null, - requestorDisplayName: null, - requestorPK, - timestamp - } - - return stripped - }) - - emit('$shock', processed) - } - - receivedReqsUnsub = GunEvents.onSimplerReceivedRequests(onReceivedReqs) - - on('disconnect', () => { - receivedReqsUnsub() - receivedReqsUnsub = emptyUnsub - }) - } catch (e) { - logger.error('Error inside receivedReqs socket connect: ' + e.message) - emit('$error', e.message) - } - }) io.of('streams').on('connect', socket => { console.log('a user connected') socket.on('postID', postID => {