From 9a95baf03c3eae8b21e9bfb1084105364f5aa719 Mon Sep 17 00:00:00 2001 From: emad-salah Date: Sun, 18 Apr 2021 21:15:48 +0000 Subject: [PATCH 1/4] GunDB Socket.io functionality rewritten --- services/gunDB/Mediator/index.js | 2 +- services/gunDB/sockets/index.js | 366 ++++++++++++++++++++++++ services/gunDB/sockets/subscriptions.js | 127 ++++++++ src/sockets.js | 290 +------------------ utils/ECC/socket.js | 63 +++- 5 files changed, 554 insertions(+), 294 deletions(-) create mode 100644 services/gunDB/sockets/index.js create mode 100644 services/gunDB/sockets/subscriptions.js diff --git a/services/gunDB/Mediator/index.js b/services/gunDB/Mediator/index.js index cbcaf748..990d23ce 100644 --- a/services/gunDB/Mediator/index.js +++ b/services/gunDB/Mediator/index.js @@ -257,7 +257,7 @@ const Config = require('../config') /** * @typedef {object} SimpleSocket * @prop {(eventName: string, data?: Emission|EncryptedEmissionLegacy|EncryptedEmission|ValidDataValue) => void} emit - * @prop {(eventName: string, handler: (data: any) => void) => void} on + * @prop {(eventName: string, handler: (data: any, callback: (err?: any, data?: any) => void) => void) => void} on * @prop {{ auth: { [key: string]: any } }} handshake */ diff --git a/services/gunDB/sockets/index.js b/services/gunDB/sockets/index.js new file mode 100644 index 00000000..0a31b554 --- /dev/null +++ b/services/gunDB/sockets/index.js @@ -0,0 +1,366 @@ +/** + * @format + */ + +const logger = require('winston') +const Common = require('shock-common') +const uuidv4 = require('uuid/v4') + +const { getGun, getUser, isAuthenticated } = require('../Mediator') +const { deepDecryptIfNeeded } = require('../rpc') +const Subscriptions = require('./subscriptions') +const GunEvents = require('../contact-api/events') +const { + encryptedEmit, + encryptedOn, + encryptedCallback +} = require('../../../utils/ECC/socket') +const TipsForwarder = require('../../tipsCallback') +const auth = require('../../auth/auth') + +const ALLOWED_GUN_METHODS = ['map', 'map.on', 'on', 'once', 'load', 'then'] + +/** + * @typedef {import('../contact-api/SimpleGUN').ValidDataValue} ValidDataValue + */ + +/** + * @typedef {(data: ValidDataValue, key: string, _msg: any, event: any) => (void | Promise)} GunListener + * @typedef {{ reconnect: boolean, token: string }} SubscriptionOptions + */ + +/** + * @param {string} token + * @returns {Promise} + */ +const isValidToken = async token => { + const validation = await auth.validateToken(token) + + if (typeof validation !== 'object') { + return false + } + + if (validation === null) { + return false + } + + if (typeof validation.valid !== 'boolean') { + return false + } + + return validation.valid +} + +/** + * @param {string} root + */ +const getNode = root => { + if (root === '$gun') { + return getGun() + } + + if (root === '$user') { + return getUser() + } + + return getGun().user(root) +} + +/** + * @param {import("../contact-api/SimpleGUN").GUNNode} node + * @param {string} path + */ +const getGunQuery = (node, path) => { + const bits = path.split('>') + const query = bits.reduce((gunQuery, bit) => gunQuery.get(bit), node) + return query +} + +/** + * Dynamically construct a GunDB query call + * @param {any} query + * @param {string} method + */ +const getGunListener = (query, method) => { + const methods = method.split('.') + const listener = methods.reduce((listener, method) => { + if (typeof listener === 'function') { + return listener[method]() + } + + return listener[method]() + }, query) + + return listener +} + +/** + * @param {Object} queryData + * @param {(eventName: string, ...args: any[]) => Promise} queryData.emit + * @param {string} queryData.publicKeyForDecryption + * @param {string} queryData.subscriptionId + * @param {string} queryData.deviceId + * @returns {GunListener} + */ +const queryListenerCallback = ({ + emit, + publicKeyForDecryption, + subscriptionId, + deviceId +}) => async (data, key, _msg, event) => { + try { + const subscription = Subscriptions.get({ + deviceId, + subscriptionId + }) + if (subscription && event && event.off) { + event.off() + } + const eventName = `subscription:${subscriptionId}` + if (publicKeyForDecryption?.length > 15) { + const decData = await deepDecryptIfNeeded(data, publicKeyForDecryption) + + emit(eventName, decData, key) + return + } + + emit(eventName, data, key) + } catch (err) { + logger.error(`Error for gun rpc socket: ${err.message}`) + } +} + +/** + * @param {Object} GunSocketOptions + * @param {() => (import('./subscriptions').Unsubscribe | void)} GunSocketOptions.handler + * @param {string} GunSocketOptions.subscriptionId + * @param {string} GunSocketOptions.encryptionId + * @param {import('socket.io').Socket} GunSocketOptions.socket + * @returns {(options: SubscriptionOptions, response: (error?: any, data?: any) => void) => Promise} + */ +const wrap = ({ handler, subscriptionId, encryptionId, socket }) => { + return async ({ reconnect = false, token }, response) => { + const callback = encryptedCallback(socket, response) + const emit = encryptedEmit(socket) + const subscription = Subscriptions.get({ + deviceId: encryptionId, + subscriptionId + }) + + if (subscription && !reconnect) { + callback({ + field: 'subscription', + message: + "You're already subscribed to this event, you can re-subscribe again by setting 'reconnect' to true " + }) + return + } + + if (reconnect) { + Subscriptions.remove({ + deviceId: encryptionId, + subscriptionId + }) + } + + if (!subscription || reconnect) { + const isAuth = await isValidToken(token) + + if (!isAuth) { + logger.warn('invalid token specified') + emit(Common.Constants.ErrorCode.NOT_AUTH) + return + } + + const unsubscribe = handler() + + if (unsubscribe) { + Subscriptions.attachUnsubscribe({ + deviceId: encryptionId, + subscriptionId, + unsubscribe + }) + } + } + + callback(null, { + message: 'Subscribed successfully!', + success: true + }) + } +} + +/** @param {import('socket.io').Socket} socket */ +const startSocket = socket => { + try { + if (!isAuthenticated()) { + socket.emit(Common.Constants.ErrorCode.NOT_AUTH) + return + } + + const emit = encryptedEmit(socket) + const on = encryptedOn(socket) + const { encryptionId } = socket.handshake.auth + + on('subscribe:query', ({ $shock, publicKey }, response) => { + const [root, path, method] = $shock.split('::') + const callback = encryptedCallback(socket, response) + + if (!ALLOWED_GUN_METHODS.includes(method)) { + callback(`Invalid method for gun rpc call: ${method}, query: ${$shock}`) + return + } + + const subscriptionId = uuidv4() + const queryCallback = queryListenerCallback({ + emit, + publicKeyForDecryption: publicKey, + subscriptionId, + deviceId: encryptionId + }) + + const node = getNode(root) + const query = getGunQuery(node, path) + /** @type {(cb?: GunListener) => void} */ + const listener = getGunListener(query, method) + + Subscriptions.add({ + deviceId: encryptionId, + subscriptionId + }) + + callback(null, { + subscriptionId + }) + + listener(queryCallback) + }) + + const onChats = () => { + return GunEvents.onChats(chats => { + const processed = chats.map( + ({ + didDisconnect, + id, + lastSeenApp, + messages, + recipientPublicKey + }) => { + /** @type {Common.Schema.Chat} */ + const stripped = { + didDisconnect, + id, + lastSeenApp, + messages, + recipientAvatar: null, + recipientDisplayName: null, + recipientPublicKey + } + + return stripped + } + ) + + emit('chats', processed) + }) + } + + on( + 'subscribe:chats', + wrap({ + handler: onChats, + encryptionId, + subscriptionId: 'chats', + socket + }) + ) + + const onSentRequests = () => { + return GunEvents.onSimplerSentRequests(sentReqs => { + const processed = sentReqs.map( + ({ + id, + recipientChangedRequestAddress, + recipientPublicKey, + timestamp + }) => { + /** + * @type {Common.Schema.SimpleSentRequest} + */ + const stripped = { + id, + recipientAvatar: null, + recipientChangedRequestAddress, + recipientDisplayName: null, + recipientPublicKey, + timestamp + } + + return stripped + } + ) + emit('sentRequests', processed) + }) + } + + on( + 'subscribe:sentRequests', + wrap({ + handler: onSentRequests, + encryptionId, + subscriptionId: 'sentRequests', + socket + }) + ) + + const onReceivedRequests = () => { + return GunEvents.onSimplerReceivedRequests(receivedReqs => { + const processed = receivedReqs.map(({ id, requestorPK, timestamp }) => { + /** @type {Common.Schema.SimpleReceivedRequest} */ + const stripped = { + id, + requestorAvatar: null, + requestorDisplayName: null, + requestorPK, + timestamp + } + + return stripped + }) + + emit('receivedRequests', processed) + }) + } + + on( + 'subscribe:receivedRequests', + wrap({ + handler: onReceivedRequests, + encryptionId, + subscriptionId: 'receivedRequests', + socket + }) + ) + + on('streams:postID', postID => { + TipsForwarder.addSocket(postID, socket) + }) + + on('unsubscribe', (subscriptionId, response) => { + const callback = encryptedCallback(socket, response) + Subscriptions.remove({ deviceId: encryptionId, subscriptionId }) + callback(null, { + message: 'Unsubscribed successfully!', + success: true + }) + }) + + socket.on('disconnect', () => { + Subscriptions.removeDevice({ deviceId: encryptionId }) + }) + } catch (err) { + logger.error('GUNRPC: ' + err.message) + } +} + +module.exports = startSocket diff --git a/services/gunDB/sockets/subscriptions.js b/services/gunDB/sockets/subscriptions.js new file mode 100644 index 00000000..353c00f8 --- /dev/null +++ b/services/gunDB/sockets/subscriptions.js @@ -0,0 +1,127 @@ +/** + * @typedef {() => void} Unsubscribe + */ + +/** @type {Map void, metadata?: object }>>} */ +const userSubscriptions = new Map() + +/** + * Adds a new Subscription + * @param {Object} subscription + * @param {string} subscription.deviceId + * @param {string} subscription.subscriptionId + * @param {(Unsubscribe)=} subscription.unsubscribe + * @param {(object)=} subscription.metadata + */ +const add = ({ deviceId, subscriptionId, unsubscribe, metadata }) => { + const deviceSubscriptions = userSubscriptions.get(deviceId) + + const subscriptions = deviceSubscriptions ?? new Map() + subscriptions.set(subscriptionId, { + subscriptionId, + unsubscribe, + metadata + }) + userSubscriptions.set(deviceId, subscriptions) +} + +/** + * Adds a new Subscription + * @param {Object} subscription + * @param {string} subscription.deviceId + * @param {string} subscription.subscriptionId + * @param {Unsubscribe} subscription.unsubscribe + */ +const attachUnsubscribe = ({ + deviceId, + subscriptionId, + unsubscribe +}) => { + const deviceSubscriptions = userSubscriptions.get(deviceId) + + const subscriptions = deviceSubscriptions + + if (!subscriptions) { + return + } + + const subscription = subscriptions.get(subscriptionId) + + if (!subscription) { + return + } + + subscriptions.set(subscriptionId, { + ...subscription, + unsubscribe + }) + userSubscriptions.set(deviceId, subscriptions) +} + +/** + * Unsubscribes from a GunDB query + * @param {Object} subscription + * @param {string} subscription.deviceId + * @param {string} subscription.subscriptionId + */ +const remove = ({ deviceId, subscriptionId }) => { + const deviceSubscriptions = userSubscriptions.get(deviceId) + + const subscriptions = deviceSubscriptions ?? new Map() + const subscription = subscriptions.get(subscriptionId); + + if (subscription?.unsubscribe) { + subscription.unsubscribe() + } + + subscriptions.delete(subscriptionId) + userSubscriptions.set(deviceId, subscriptions) +} + +/** + * Unsubscribes from all GunDB queries for a specific device + * @param {Object} subscription + * @param {string} subscription.deviceId + */ +const removeDevice = ({ deviceId }) => { + const deviceSubscriptions = userSubscriptions.get(deviceId); + + if (!deviceSubscriptions) { + return + } + + Array.from(deviceSubscriptions.values()).map(subscription => { + if (subscription && subscription.unsubscribe) { + subscription.unsubscribe() + } + + return subscription + }) + + userSubscriptions.set(deviceId, new Map()); +} + +/** + * Retrieves the specified subscription's info if it exists + * @param {Object} subscription + * @param {string} subscription.deviceId + * @param {string} subscription.subscriptionId + */ +const get = ({ deviceId, subscriptionId }) => { + const deviceSubscriptions = userSubscriptions.get(deviceId) + + if (!deviceSubscriptions) { + return false + } + + const subscription = deviceSubscriptions.get(subscriptionId) + return subscription +} + +module.exports = { + add, + attachUnsubscribe, + get, + remove, + removeDevice +} \ No newline at end of file diff --git a/src/sockets.js b/src/sockets.js index fcc42b7a..08350b15 100644 --- a/src/sockets.js +++ b/src/sockets.js @@ -8,16 +8,10 @@ const Common = require('shock-common') const mapValues = require('lodash/mapValues') const auth = require('../services/auth/auth') -const Encryption = require('../utils/encryptionStore') const LightningServices = require('../utils/lightningServices') -const { - getGun, - getUser, - isAuthenticated -} = require('../services/gunDB/Mediator') -const { deepDecryptIfNeeded } = require('../services/gunDB/rpc') +const { isAuthenticated } = require('../services/gunDB/Mediator') +const initGunDBSocket = require('../services/gunDB/sockets') const GunEvents = require('../services/gunDB/contact-api/events') -const SchemaManager = require('../services/schema') const { encryptedEmit, encryptedOn } = require('../utils/ECC/socket') const TipsForwarder = require('../services/tipsCallback') /** @@ -29,204 +23,7 @@ module.exports = ( /** @type {import('socket.io').Server} */ io ) => { - // This should be used for encrypting and emitting your data - const encryptedEmitLegacy = ({ eventName, data, socket }) => { - try { - if (Encryption.isNonEncrypted(eventName)) { - return socket.emit(eventName, data) - } - - const deviceId = socket.handshake.auth['x-shockwallet-device-id'] - const authorized = Encryption.isAuthorizedDevice({ deviceId }) - - if (!deviceId) { - throw { - field: 'deviceId', - message: 'Please specify a device ID' - } - } - - if (!authorized) { - throw { - field: 'deviceId', - message: 'Please exchange keys with the API before using the socket' - } - } - - const encryptedMessage = Encryption.encryptMessage({ - message: data, - deviceId - }) - - return socket.emit(eventName, encryptedMessage) - } catch (err) { - logger.error( - `[SOCKET] An error has occurred while encrypting an event (${eventName}):`, - err - ) - - return socket.emit('encryption:error', err) - } - } - - const onNewInvoice = (socket, subID) => { - const { lightning } = LightningServices.services - logger.warn('Subscribing to invoices socket...' + subID) - const stream = lightning.subscribeInvoices({}) - stream.on('data', data => { - logger.info('[SOCKET] New invoice data:', data) - encryptedEmitLegacy({ eventName: 'invoice:new', data, socket }) - if (!data.settled) { - return - } - SchemaManager.AddOrder({ - type: 'invoice', - amount: parseInt(data.amt_paid_sat, 10), - coordinateHash: data.r_hash.toString('hex'), - coordinateIndex: parseInt(data.add_index, 10), - inbound: true, - toLndPub: data.payment_addr - }) - }) - stream.on('end', () => { - logger.info('New invoice stream ended, starting a new one...') - // Prevents call stack overflow exceptions - //process.nextTick(() => onNewInvoice(socket)) - }) - stream.on('error', err => { - logger.error('New invoice stream error:' + subID, err) - }) - stream.on('status', status => { - logger.warn('New invoice stream status:' + subID, status) - switch (status.code) { - case 0: { - logger.info('[event:invoice:new] stream ok') - break - } - case 1: { - logger.info( - '[event:invoice:new] stream canceled, probably socket disconnected' - ) - break - } - case 2: { - logger.warn('[event:invoice:new] got UNKNOWN error status') - break - } - case 12: { - logger.warn( - '[event:invoice:new] LND locked, new registration in 60 seconds' - ) - process.nextTick(() => - setTimeout(() => onNewInvoice(socket, subID), 60000) - ) - break - } - case 13: { - //https://grpc.github.io/grpc/core/md_doc_statuscodes.html - logger.error('[event:invoice:new] INTERNAL LND error') - break - } - case 14: { - logger.error( - '[event:invoice:new] LND disconnected, sockets reconnecting in 30 seconds...' - ) - process.nextTick(() => - setTimeout(() => onNewInvoice(socket, subID), 30000) - ) - break - } - default: { - logger.error('[event:invoice:new] UNKNOWN LND error') - } - } - }) - return () => { - stream.cancel() - } - } - - const onNewTransaction = (socket, subID) => { - const { lightning } = LightningServices.services - const stream = lightning.subscribeTransactions({}) - logger.warn('Subscribing to transactions socket...' + subID) - stream.on('data', data => { - logger.info('[SOCKET] New transaction data:', data) - - Promise.all(data.dest_addresses.map(SchemaManager.isTmpChainOrder)).then( - responses => { - const hasOrder = responses.some(res => res !== false) - if (hasOrder && data.num_confirmations > 0) { - //buddy needs to manage this - } else { - //business as usual - encryptedEmitLegacy({ eventName: 'transaction:new', data, socket }) - } - } - ) - }) - stream.on('end', () => { - logger.info('New transactions stream ended, starting a new one...') - //process.nextTick(() => onNewTransaction(socket)) - }) - stream.on('error', err => { - logger.error('New transactions stream error:' + subID, err) - }) - stream.on('status', status => { - logger.info('New transactions stream status:' + subID, status) - switch (status.code) { - case 0: { - logger.info('[event:transaction:new] stream ok') - break - } - case 1: { - logger.info( - '[event:transaction:new] stream canceled, probably socket disconnected' - ) - break - } - case 2: { - //Happens to fire when the grpc client lose access to macaroon file - logger.warn('[event:transaction:new] got UNKNOWN error status') - break - } - case 12: { - logger.warn( - '[event:transaction:new] LND locked, new registration in 60 seconds' - ) - process.nextTick(() => - setTimeout(() => onNewTransaction(socket, subID), 60000) - ) - break - } - case 13: { - //https://grpc.github.io/grpc/core/md_doc_statuscodes.html - logger.error('[event:transaction:new] INTERNAL LND error') - break - } - case 14: { - logger.error( - '[event:transaction:new] LND disconnected, sockets reconnecting in 30 seconds...' - ) - process.nextTick(() => - setTimeout(() => onNewTransaction(socket, subID), 30000) - ) - break - } - default: { - logger.error('[event:transaction:new] UNKNOWN LND error') - } - } - }) - return () => { - stream.cancel() - } - } - io.on('connection', socket => { - logger.info(`io.onconnection`) - logger.info('socket.handshake', socket.handshake) - const isLNDSocket = !!socket.handshake.auth.IS_LND_SOCKET const isNotificationsSocket = !!socket.handshake.auth .IS_NOTIFICATIONS_SOCKET @@ -240,89 +37,6 @@ module.exports = ( const subID = Math.floor(Math.random() * 1000).toString() const isNotifications = isNotificationsSocket ? 'notifications' : '' logger.info('[LND] New LND Socket created:' + isNotifications + subID) - /* not used by wallet anymore - const cancelInvoiceStream = onNewInvoice(socket, subID) - const cancelTransactionStream = onNewTransaction(socket, subID) - socket.on('disconnect', () => { - logger.info('LND socket disconnected:' + isNotifications + subID) - cancelInvoiceStream() - cancelTransactionStream() - })*/ - } - }) - - io.of('gun').on('connect', socket => { - // TODO: off() - - try { - if (!isAuthenticated()) { - socket.emit(Common.Constants.ErrorCode.NOT_AUTH) - return - } - - const emit = encryptedEmit(socket) - - const { $shock, publicKeyForDecryption } = socket.handshake.auth - - const [root, path, method] = $shock.split('::') - - // eslint-disable-next-line init-declarations - let node - - if (root === '$gun') { - node = getGun() - } else if (root === '$user') { - node = getUser() - } else { - node = getGun().user(root) - } - - for (const bit of path.split('>')) { - node = node.get(bit) - } - - /** - * @param {ValidDataValue} data - * @param {string} key - */ - const listener = async (data, key) => { - try { - if ( - typeof publicKeyForDecryption === 'string' && - publicKeyForDecryption !== 'undefined' && - publicKeyForDecryption.length > 15 - ) { - const decData = await deepDecryptIfNeeded( - data, - publicKeyForDecryption - ) - - emit('$shock', decData, key) - } else { - emit('$shock', data, key) - } - } catch (err) { - logger.error( - `Error for gun rpc socket, query ${$shock} -> ${err.message}` - ) - } - } - - if (method === 'on') { - node.on(listener) - } else if (method === 'open') { - node.open(listener) - } else if (method === 'map.on') { - node.map().on(listener) - } else if (method === 'map.once') { - node.map().once(listener) - } else { - throw new TypeError( - `Invalid method for gun rpc call : ${method}, query: ${$shock}` - ) - } - } catch (err) { - logger.error('GUNRPC: ' + err.message) } }) diff --git a/utils/ECC/socket.js b/utils/ECC/socket.js index 771aee6d..8068bb27 100644 --- a/utils/ECC/socket.js +++ b/utils/ECC/socket.js @@ -20,6 +20,7 @@ const nonEncryptedEvents = [ * @typedef {import('../../services/gunDB/Mediator').EncryptedEmission} EncryptedEmission * @typedef {import('../../services/gunDB/Mediator').EncryptedEmissionLegacy} EncryptedEmissionLegacy * @typedef {import('../../services/gunDB/contact-api/SimpleGUN').ValidDataValue} ValidDataValue + * @typedef {(data: any, callback: (error?: any, data?: any) => void) => void} SocketOnListener */ /** @@ -83,7 +84,7 @@ const encryptedEmit = socket => async (eventName, ...args) => { /** * @param {SimpleSocket} socket - * @returns {(eventName: string, callback: (data: any) => void) => void} + * @returns {(eventName: string, callback: SocketOnListener) => void} */ const encryptedOn = socket => (eventName, callback) => { try { @@ -110,9 +111,9 @@ const encryptedOn = socket => (eventName, callback) => { } } - socket.on(eventName, async data => { + socket.on(eventName, async (data, response) => { if (isNonEncrypted(eventName)) { - callback(data) + callback(data, response) return } @@ -122,7 +123,7 @@ const encryptedOn = socket => (eventName, callback) => { encryptedMessage: data }) - callback(safeParseJSON(decryptedMessage)) + callback(safeParseJSON(decryptedMessage), response) } }) } catch (err) { @@ -135,8 +136,60 @@ const encryptedOn = socket => (eventName, callback) => { } } +/** + * @param {SimpleSocket} socket + * @param {(error?: any, data?: any) => void} callback + * @returns {(...args: any[]) => Promise} + */ +const encryptedCallback = (socket, callback) => async (...args) => { + try { + const deviceId = socket.handshake.auth.encryptionId + + if (!deviceId) { + throw { + field: 'deviceId', + message: 'Please specify a device ID' + } + } + + const authorized = ECC.isAuthorizedDevice({ deviceId }) + + if (!authorized) { + throw { + field: 'deviceId', + message: 'Please exchange keys with the API before using the socket' + } + } + + const encryptedArgs = await Promise.all( + args.map(async data => { + if (!data) { + return data + } + + const encryptedMessage = await ECC.encryptMessage({ + message: typeof data === 'object' ? JSON.stringify(data) : data, + deviceId + }) + + return encryptedMessage + }) + ) + + return callback(...encryptedArgs) + } catch (err) { + logger.error( + `[SOCKET] An error has occurred while emitting an event response:`, + err + ) + + return socket.emit('encryption:error', err) + } +} + module.exports = { isNonEncrypted, encryptedOn, - encryptedEmit + encryptedEmit, + encryptedCallback } From 0bb25f97eaad5afdc198e8822b23ec48a6ad4300 Mon Sep 17 00:00:00 2001 From: emad-salah Date: Sun, 18 Apr 2021 21:17:01 +0000 Subject: [PATCH 2/4] Minor callback fix --- utils/ECC/socket.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/utils/ECC/socket.js b/utils/ECC/socket.js index 8068bb27..ea50d906 100644 --- a/utils/ECC/socket.js +++ b/utils/ECC/socket.js @@ -124,7 +124,10 @@ const encryptedOn = socket => (eventName, callback) => { }) callback(safeParseJSON(decryptedMessage), response) + return } + + callback(data, response) }) } catch (err) { logger.error( From e0310f13caabe4a114659ed6d3edeb0db9f658e9 Mon Sep 17 00:00:00 2001 From: emad-salah Date: Wed, 21 Apr 2021 12:22:04 +0000 Subject: [PATCH 3/4] Bug fixes --- services/gunDB/sockets/index.js | 154 +++++++++++++-------- src/sockets.js | 228 +------------------------------- 2 files changed, 106 insertions(+), 276 deletions(-) diff --git a/services/gunDB/sockets/index.js b/services/gunDB/sockets/index.js index 0a31b554..10f715e6 100644 --- a/services/gunDB/sockets/index.js +++ b/services/gunDB/sockets/index.js @@ -18,14 +18,22 @@ const { const TipsForwarder = require('../../tipsCallback') const auth = require('../../auth/auth') -const ALLOWED_GUN_METHODS = ['map', 'map.on', 'on', 'once', 'load', 'then'] +const ALLOWED_GUN_METHODS = [ + 'map', + 'map.on', + 'on', + 'once', + 'load', + 'then', + 'open' +] /** * @typedef {import('../contact-api/SimpleGUN').ValidDataValue} ValidDataValue */ /** - * @typedef {(data: ValidDataValue, key: string, _msg: any, event: any) => (void | Promise)} GunListener + * @typedef {(data: ValidDataValue, key?: string, _msg?: any, event?: any) => (void | Promise)} GunListener * @typedef {{ reconnect: boolean, token: string }} SubscriptionOptions */ @@ -77,21 +85,34 @@ const getGunQuery = (node, path) => { } /** - * Dynamically construct a GunDB query call + * Executes a GunDB query call using the specified method * @param {any} query * @param {string} method + * @param {GunListener} listener */ -const getGunListener = (query, method) => { - const methods = method.split('.') - const listener = methods.reduce((listener, method) => { - if (typeof listener === 'function') { - return listener[method]() +const executeGunQuery = (query, method, listener) => { + if (!ALLOWED_GUN_METHODS.includes(method)) { + throw { + field: 'method', + message: `Invalid GunDB method specified (${method}). ` } + } - return listener[method]() - }, query) + if (method === 'on') { + return query.on(listener) + } - return listener + if (method === 'open') { + return query.open(listener) + } + + if (method === 'map.on') { + return query.map().on(listener) + } + + if (method === 'map.once') { + return query.map().once(listener) + } } /** @@ -113,18 +134,21 @@ const queryListenerCallback = ({ deviceId, subscriptionId }) - if (subscription && event && event.off) { - event.off() + if (subscription && !subscription.unsubscribe && event) { + Subscriptions.attachUnsubscribe({ + deviceId, + subscriptionId, + unsubscribe: () => event.off() + }) } - const eventName = `subscription:${subscriptionId}` + const eventName = `query:data` if (publicKeyForDecryption?.length > 15) { const decData = await deepDecryptIfNeeded(data, publicKeyForDecryption) - - emit(eventName, decData, key) + emit(eventName, { subscriptionId, response: { data: decData, key } }) return } - emit(eventName, data, key) + emit(eventName, { subscriptionId, response: { data, key } }) } catch (err) { logger.error(`Error for gun rpc socket: ${err.message}`) } @@ -138,8 +162,18 @@ const queryListenerCallback = ({ * @param {import('socket.io').Socket} GunSocketOptions.socket * @returns {(options: SubscriptionOptions, response: (error?: any, data?: any) => void) => Promise} */ -const wrap = ({ handler, subscriptionId, encryptionId, socket }) => { - return async ({ reconnect = false, token }, response) => { +const wrap = ({ handler, subscriptionId, encryptionId, socket }) => async ( + { reconnect, token }, + response +) => { + try { + logger.info('Subscribe function executing...') + if (!isAuthenticated()) { + logger.warn('GunDB is not yet authenticated') + socket.emit(Common.Constants.ErrorCode.NOT_AUTH) + return + } + const callback = encryptedCallback(socket, response) const emit = encryptedEmit(socket) const subscription = Subscriptions.get({ @@ -148,15 +182,21 @@ const wrap = ({ handler, subscriptionId, encryptionId, socket }) => { }) if (subscription && !reconnect) { - callback({ + const error = { field: 'subscription', message: "You're already subscribed to this event, you can re-subscribe again by setting 'reconnect' to true " - }) + } + logger.error('Duplicate subscription:', error) + callback(error) return } - if (reconnect) { + if (subscription && reconnect) { + if (subscription.unsubscribe) { + subscription.unsubscribe() + } + Subscriptions.remove({ deviceId: encryptionId, subscriptionId @@ -187,53 +227,59 @@ const wrap = ({ handler, subscriptionId, encryptionId, socket }) => { message: 'Subscribed successfully!', success: true }) + } catch (error) { + logger.error('Socket wrapper error:', error) } } /** @param {import('socket.io').Socket} socket */ const startSocket = socket => { try { - if (!isAuthenticated()) { - socket.emit(Common.Constants.ErrorCode.NOT_AUTH) - return - } - const emit = encryptedEmit(socket) const on = encryptedOn(socket) const { encryptionId } = socket.handshake.auth on('subscribe:query', ({ $shock, publicKey }, response) => { - const [root, path, method] = $shock.split('::') - const callback = encryptedCallback(socket, response) - - if (!ALLOWED_GUN_METHODS.includes(method)) { - callback(`Invalid method for gun rpc call: ${method}, query: ${$shock}`) - return - } - const subscriptionId = uuidv4() - const queryCallback = queryListenerCallback({ - emit, - publicKeyForDecryption: publicKey, - subscriptionId, - deviceId: encryptionId - }) + try { + if (!isAuthenticated()) { + socket.emit(Common.Constants.ErrorCode.NOT_AUTH) + return + } - const node = getNode(root) - const query = getGunQuery(node, path) - /** @type {(cb?: GunListener) => void} */ - const listener = getGunListener(query, method) + const [root, path, method] = $shock.split('::') + const socketCallback = encryptedCallback(socket, response) - Subscriptions.add({ - deviceId: encryptionId, - subscriptionId - }) + if (!ALLOWED_GUN_METHODS.includes(method)) { + socketCallback( + `Invalid method for gun rpc call: ${method}, query: ${$shock}` + ) + return + } - callback(null, { - subscriptionId - }) + Subscriptions.add({ + deviceId: encryptionId, + subscriptionId + }) - listener(queryCallback) + const queryCallback = queryListenerCallback({ + emit, + publicKeyForDecryption: publicKey, + subscriptionId, + deviceId: encryptionId + }) + + socketCallback(null, { + subscriptionId + }) + + const node = getNode(root) + const query = getGunQuery(node, path) + + executeGunQuery(query, method, queryCallback) + } catch (error) { + emit(`query:error`, { subscriptionId, response: { data: error } }) + } }) const onChats = () => { @@ -346,7 +392,7 @@ const startSocket = socket => { TipsForwarder.addSocket(postID, socket) }) - on('unsubscribe', (subscriptionId, response) => { + on('unsubscribe', ({ subscriptionId }, response) => { const callback = encryptedCallback(socket, response) Subscriptions.remove({ deviceId: encryptionId, subscriptionId }) callback(null, { diff --git a/src/sockets.js b/src/sockets.js index 08350b15..16838705 100644 --- a/src/sockets.js +++ b/src/sockets.js @@ -11,7 +11,6 @@ const auth = require('../services/auth/auth') const LightningServices = require('../utils/lightningServices') const { isAuthenticated } = require('../services/gunDB/Mediator') const initGunDBSocket = require('../services/gunDB/sockets') -const GunEvents = require('../services/gunDB/contact-api/events') const { encryptedEmit, encryptedOn } = require('../utils/ECC/socket') const TipsForwarder = require('../services/tipsCallback') /** @@ -23,7 +22,7 @@ module.exports = ( /** @type {import('socket.io').Server} */ io ) => { - io.on('connection', socket => { + io.on('connect', socket => { const isLNDSocket = !!socket.handshake.auth.IS_LND_SOCKET const isNotificationsSocket = !!socket.handshake.auth .IS_NOTIFICATIONS_SOCKET @@ -105,6 +104,10 @@ module.exports = ( } }) + io.of('gun').on('connect', socket => { + initGunDBSocket(socket) + }) + /** * @param {string} token * @returns {Promise} @@ -129,7 +132,7 @@ module.exports = ( /** @type {null|NodeJS.Timeout} */ let pingIntervalID = null - + // TODO: Unused? io.of('shockping').on( 'connect', // TODO: make this sync @@ -181,225 +184,6 @@ module.exports = ( } ) - // TODO: do this through rpc - - const emptyUnsub = () => {} - - let chatsUnsub = emptyUnsub - - io.of('chats').on('connect', async socket => { - const on = encryptedOn(socket) - const emit = encryptedEmit(socket) - - try { - if (!isAuthenticated()) { - logger.info( - 'not authenticated in gun for chats socket, will send NOT_AUTH' - ) - emit(Common.Constants.ErrorCode.NOT_AUTH) - - return - } - - logger.info('now checking token for chats socket') - const { token } = socket.handshake.auth - const isAuth = await isValidToken(token) - - if (!isAuth) { - logger.warn('invalid token for chats socket') - emit(Common.Constants.ErrorCode.NOT_AUTH) - return - } - - if (chatsUnsub !== emptyUnsub) { - logger.error( - 'Tried to set chats socket twice, this might be due to an app restart and the old socket not being recycled by socket.io in time, will disable the older subscription, which means the old socket wont work and data will be sent to this new socket instead' - ) - chatsUnsub() - chatsUnsub = emptyUnsub - } - - /** - * @param {Common.Schema.Chat[]} chats - */ - const onChats = chats => { - const processed = chats.map( - ({ - didDisconnect, - id, - lastSeenApp, - messages, - recipientPublicKey - }) => { - /** @type {Common.Schema.Chat} */ - const stripped = { - didDisconnect, - id, - lastSeenApp, - messages, - recipientAvatar: null, - recipientDisplayName: null, - recipientPublicKey - } - - return stripped - } - ) - - emit('$shock', processed) - } - - chatsUnsub = GunEvents.onChats(onChats) - - on('disconnect', () => { - chatsUnsub() - chatsUnsub = emptyUnsub - }) - } catch (e) { - logger.error('Error inside chats socket connect: ' + e.message) - emit('$error', e.message) - } - }) - - let sentReqsUnsub = emptyUnsub - - io.of('sentReqs').on('connect', async socket => { - const on = encryptedOn(socket) - const emit = encryptedEmit(socket) - - try { - if (!isAuthenticated()) { - logger.info( - 'not authenticated in gun for sentReqs socket, will send NOT_AUTH' - ) - emit(Common.Constants.ErrorCode.NOT_AUTH) - - return - } - - logger.info('now checking token for sentReqs socket') - const { token } = socket.handshake.auth - const isAuth = await isValidToken(token) - - if (!isAuth) { - logger.warn('invalid token for sentReqs socket') - emit(Common.Constants.ErrorCode.NOT_AUTH) - return - } - - if (sentReqsUnsub !== emptyUnsub) { - logger.error( - 'Tried to set sentReqs socket twice, this might be due to an app restart and the old socket not being recycled by io in time, will disable the older subscription, which means the old socket wont work and data will be sent to this new socket instead' - ) - sentReqsUnsub() - sentReqsUnsub = emptyUnsub - } - - /** - * @param {Common.Schema.SimpleSentRequest[]} sentReqs - */ - const onSentReqs = sentReqs => { - const processed = sentReqs.map( - ({ - id, - recipientChangedRequestAddress, - recipientPublicKey, - timestamp - }) => { - /** - * @type {Common.Schema.SimpleSentRequest} - */ - const stripped = { - id, - recipientAvatar: null, - recipientChangedRequestAddress, - recipientDisplayName: null, - recipientPublicKey, - timestamp - } - - return stripped - } - ) - emit('$shock', processed) - } - - sentReqsUnsub = GunEvents.onSimplerSentRequests(onSentReqs) - - on('disconnect', () => { - sentReqsUnsub() - sentReqsUnsub = emptyUnsub - }) - } catch (e) { - logger.error('Error inside sentReqs socket connect: ' + e.message) - emit('$error', e.message) - } - }) - - let receivedReqsUnsub = emptyUnsub - - io.of('receivedReqs').on('connect', async socket => { - const on = encryptedOn(socket) - const emit = encryptedEmit(socket) - try { - if (!isAuthenticated()) { - logger.info( - 'not authenticated in gun for receivedReqs socket, will send NOT_AUTH' - ) - emit(Common.Constants.ErrorCode.NOT_AUTH) - - return - } - - logger.info('now checking token for receivedReqs socket') - const { token } = socket.handshake.auth - const isAuth = await isValidToken(token) - - if (!isAuth) { - logger.warn('invalid token for receivedReqs socket') - emit(Common.Constants.ErrorCode.NOT_AUTH) - return - } - - if (receivedReqsUnsub !== emptyUnsub) { - logger.error( - 'Tried to set receivedReqs socket twice, this might be due to an app restart and the old socket not being recycled by socket.io in time, will disable the older subscription, which means the old socket wont work and data will be sent to this new socket instead' - ) - receivedReqsUnsub() - receivedReqsUnsub = emptyUnsub - } - - /** - * @param {ReadonlyArray} receivedReqs - */ - const onReceivedReqs = receivedReqs => { - const processed = receivedReqs.map(({ id, requestorPK, timestamp }) => { - /** @type {Common.Schema.SimpleReceivedRequest} */ - const stripped = { - id, - requestorAvatar: null, - requestorDisplayName: null, - requestorPK, - timestamp - } - - return stripped - }) - - emit('$shock', processed) - } - - receivedReqsUnsub = GunEvents.onSimplerReceivedRequests(onReceivedReqs) - - on('disconnect', () => { - receivedReqsUnsub() - receivedReqsUnsub = emptyUnsub - }) - } catch (e) { - logger.error('Error inside receivedReqs socket connect: ' + e.message) - emit('$error', e.message) - } - }) io.of('streams').on('connect', socket => { console.log('a user connected') socket.on('postID', postID => { From 45ccd627b5d8627b0e9cf01d67ea63b1794dcec7 Mon Sep 17 00:00:00 2001 From: emad-salah Date: Wed, 21 Apr 2021 21:23:22 +0000 Subject: [PATCH 4/4] Bug fixes --- services/gunDB/sockets/index.js | 5 ----- 1 file changed, 5 deletions(-) diff --git a/services/gunDB/sockets/index.js b/services/gunDB/sockets/index.js index 10f715e6..f1d24708 100644 --- a/services/gunDB/sockets/index.js +++ b/services/gunDB/sockets/index.js @@ -15,7 +15,6 @@ const { encryptedOn, encryptedCallback } = require('../../../utils/ECC/socket') -const TipsForwarder = require('../../tipsCallback') const auth = require('../../auth/auth') const ALLOWED_GUN_METHODS = [ @@ -388,10 +387,6 @@ const startSocket = socket => { }) ) - on('streams:postID', postID => { - TipsForwarder.addSocket(postID, socket) - }) - on('unsubscribe', ({ subscriptionId }, response) => { const callback = encryptedCallback(socket, response) Subscriptions.remove({ deviceId: encryptionId, subscriptionId })