diff --git a/services/gunDB/Mediator/index.js b/services/gunDB/Mediator/index.js index cbcaf748..990d23ce 100644 --- a/services/gunDB/Mediator/index.js +++ b/services/gunDB/Mediator/index.js @@ -257,7 +257,7 @@ const Config = require('../config') /** * @typedef {object} SimpleSocket * @prop {(eventName: string, data?: Emission|EncryptedEmissionLegacy|EncryptedEmission|ValidDataValue) => void} emit - * @prop {(eventName: string, handler: (data: any) => void) => void} on + * @prop {(eventName: string, handler: (data: any, callback: (err?: any, data?: any) => void) => void) => void} on * @prop {{ auth: { [key: string]: any } }} handshake */ diff --git a/services/gunDB/sockets/index.js b/services/gunDB/sockets/index.js new file mode 100644 index 00000000..0a31b554 --- /dev/null +++ b/services/gunDB/sockets/index.js @@ -0,0 +1,366 @@ +/** + * @format + */ + +const logger = require('winston') +const Common = require('shock-common') +const uuidv4 = require('uuid/v4') + +const { getGun, getUser, isAuthenticated } = require('../Mediator') +const { deepDecryptIfNeeded } = require('../rpc') +const Subscriptions = require('./subscriptions') +const GunEvents = require('../contact-api/events') +const { + encryptedEmit, + encryptedOn, + encryptedCallback +} = require('../../../utils/ECC/socket') +const TipsForwarder = require('../../tipsCallback') +const auth = require('../../auth/auth') + +const ALLOWED_GUN_METHODS = ['map', 'map.on', 'on', 'once', 'load', 'then'] + +/** + * @typedef {import('../contact-api/SimpleGUN').ValidDataValue} ValidDataValue + */ + +/** + * @typedef {(data: ValidDataValue, key: string, _msg: any, event: any) => (void | Promise)} GunListener + * @typedef {{ reconnect: boolean, token: string }} SubscriptionOptions + */ + +/** + * @param {string} token + * @returns {Promise} + */ +const isValidToken = async token => { + const validation = await auth.validateToken(token) + + if (typeof validation !== 'object') { + return false + } + + if (validation === null) { + return false + } + + if (typeof validation.valid !== 'boolean') { + return false + } + + return validation.valid +} + +/** + * @param {string} root + */ +const getNode = root => { + if (root === '$gun') { + return getGun() + } + + if (root === '$user') { + return getUser() + } + + return getGun().user(root) +} + +/** + * @param {import("../contact-api/SimpleGUN").GUNNode} node + * @param {string} path + */ +const getGunQuery = (node, path) => { + const bits = path.split('>') + const query = bits.reduce((gunQuery, bit) => gunQuery.get(bit), node) + return query +} + +/** + * Dynamically construct a GunDB query call + * @param {any} query + * @param {string} method + */ +const getGunListener = (query, method) => { + const methods = method.split('.') + const listener = methods.reduce((listener, method) => { + if (typeof listener === 'function') { + return listener[method]() + } + + return listener[method]() + }, query) + + return listener +} + +/** + * @param {Object} queryData + * @param {(eventName: string, ...args: any[]) => Promise} queryData.emit + * @param {string} queryData.publicKeyForDecryption + * @param {string} queryData.subscriptionId + * @param {string} queryData.deviceId + * @returns {GunListener} + */ +const queryListenerCallback = ({ + emit, + publicKeyForDecryption, + subscriptionId, + deviceId +}) => async (data, key, _msg, event) => { + try { + const subscription = Subscriptions.get({ + deviceId, + subscriptionId + }) + if (subscription && event && event.off) { + event.off() + } + const eventName = `subscription:${subscriptionId}` + if (publicKeyForDecryption?.length > 15) { + const decData = await deepDecryptIfNeeded(data, publicKeyForDecryption) + + emit(eventName, decData, key) + return + } + + emit(eventName, data, key) + } catch (err) { + logger.error(`Error for gun rpc socket: ${err.message}`) + } +} + +/** + * @param {Object} GunSocketOptions + * @param {() => (import('./subscriptions').Unsubscribe | void)} GunSocketOptions.handler + * @param {string} GunSocketOptions.subscriptionId + * @param {string} GunSocketOptions.encryptionId + * @param {import('socket.io').Socket} GunSocketOptions.socket + * @returns {(options: SubscriptionOptions, response: (error?: any, data?: any) => void) => Promise} + */ +const wrap = ({ handler, subscriptionId, encryptionId, socket }) => { + return async ({ reconnect = false, token }, response) => { + const callback = encryptedCallback(socket, response) + const emit = encryptedEmit(socket) + const subscription = Subscriptions.get({ + deviceId: encryptionId, + subscriptionId + }) + + if (subscription && !reconnect) { + callback({ + field: 'subscription', + message: + "You're already subscribed to this event, you can re-subscribe again by setting 'reconnect' to true " + }) + return + } + + if (reconnect) { + Subscriptions.remove({ + deviceId: encryptionId, + subscriptionId + }) + } + + if (!subscription || reconnect) { + const isAuth = await isValidToken(token) + + if (!isAuth) { + logger.warn('invalid token specified') + emit(Common.Constants.ErrorCode.NOT_AUTH) + return + } + + const unsubscribe = handler() + + if (unsubscribe) { + Subscriptions.attachUnsubscribe({ + deviceId: encryptionId, + subscriptionId, + unsubscribe + }) + } + } + + callback(null, { + message: 'Subscribed successfully!', + success: true + }) + } +} + +/** @param {import('socket.io').Socket} socket */ +const startSocket = socket => { + try { + if (!isAuthenticated()) { + socket.emit(Common.Constants.ErrorCode.NOT_AUTH) + return + } + + const emit = encryptedEmit(socket) + const on = encryptedOn(socket) + const { encryptionId } = socket.handshake.auth + + on('subscribe:query', ({ $shock, publicKey }, response) => { + const [root, path, method] = $shock.split('::') + const callback = encryptedCallback(socket, response) + + if (!ALLOWED_GUN_METHODS.includes(method)) { + callback(`Invalid method for gun rpc call: ${method}, query: ${$shock}`) + return + } + + const subscriptionId = uuidv4() + const queryCallback = queryListenerCallback({ + emit, + publicKeyForDecryption: publicKey, + subscriptionId, + deviceId: encryptionId + }) + + const node = getNode(root) + const query = getGunQuery(node, path) + /** @type {(cb?: GunListener) => void} */ + const listener = getGunListener(query, method) + + Subscriptions.add({ + deviceId: encryptionId, + subscriptionId + }) + + callback(null, { + subscriptionId + }) + + listener(queryCallback) + }) + + const onChats = () => { + return GunEvents.onChats(chats => { + const processed = chats.map( + ({ + didDisconnect, + id, + lastSeenApp, + messages, + recipientPublicKey + }) => { + /** @type {Common.Schema.Chat} */ + const stripped = { + didDisconnect, + id, + lastSeenApp, + messages, + recipientAvatar: null, + recipientDisplayName: null, + recipientPublicKey + } + + return stripped + } + ) + + emit('chats', processed) + }) + } + + on( + 'subscribe:chats', + wrap({ + handler: onChats, + encryptionId, + subscriptionId: 'chats', + socket + }) + ) + + const onSentRequests = () => { + return GunEvents.onSimplerSentRequests(sentReqs => { + const processed = sentReqs.map( + ({ + id, + recipientChangedRequestAddress, + recipientPublicKey, + timestamp + }) => { + /** + * @type {Common.Schema.SimpleSentRequest} + */ + const stripped = { + id, + recipientAvatar: null, + recipientChangedRequestAddress, + recipientDisplayName: null, + recipientPublicKey, + timestamp + } + + return stripped + } + ) + emit('sentRequests', processed) + }) + } + + on( + 'subscribe:sentRequests', + wrap({ + handler: onSentRequests, + encryptionId, + subscriptionId: 'sentRequests', + socket + }) + ) + + const onReceivedRequests = () => { + return GunEvents.onSimplerReceivedRequests(receivedReqs => { + const processed = receivedReqs.map(({ id, requestorPK, timestamp }) => { + /** @type {Common.Schema.SimpleReceivedRequest} */ + const stripped = { + id, + requestorAvatar: null, + requestorDisplayName: null, + requestorPK, + timestamp + } + + return stripped + }) + + emit('receivedRequests', processed) + }) + } + + on( + 'subscribe:receivedRequests', + wrap({ + handler: onReceivedRequests, + encryptionId, + subscriptionId: 'receivedRequests', + socket + }) + ) + + on('streams:postID', postID => { + TipsForwarder.addSocket(postID, socket) + }) + + on('unsubscribe', (subscriptionId, response) => { + const callback = encryptedCallback(socket, response) + Subscriptions.remove({ deviceId: encryptionId, subscriptionId }) + callback(null, { + message: 'Unsubscribed successfully!', + success: true + }) + }) + + socket.on('disconnect', () => { + Subscriptions.removeDevice({ deviceId: encryptionId }) + }) + } catch (err) { + logger.error('GUNRPC: ' + err.message) + } +} + +module.exports = startSocket diff --git a/services/gunDB/sockets/subscriptions.js b/services/gunDB/sockets/subscriptions.js new file mode 100644 index 00000000..353c00f8 --- /dev/null +++ b/services/gunDB/sockets/subscriptions.js @@ -0,0 +1,127 @@ +/** + * @typedef {() => void} Unsubscribe + */ + +/** @type {Map void, metadata?: object }>>} */ +const userSubscriptions = new Map() + +/** + * Adds a new Subscription + * @param {Object} subscription + * @param {string} subscription.deviceId + * @param {string} subscription.subscriptionId + * @param {(Unsubscribe)=} subscription.unsubscribe + * @param {(object)=} subscription.metadata + */ +const add = ({ deviceId, subscriptionId, unsubscribe, metadata }) => { + const deviceSubscriptions = userSubscriptions.get(deviceId) + + const subscriptions = deviceSubscriptions ?? new Map() + subscriptions.set(subscriptionId, { + subscriptionId, + unsubscribe, + metadata + }) + userSubscriptions.set(deviceId, subscriptions) +} + +/** + * Adds a new Subscription + * @param {Object} subscription + * @param {string} subscription.deviceId + * @param {string} subscription.subscriptionId + * @param {Unsubscribe} subscription.unsubscribe + */ +const attachUnsubscribe = ({ + deviceId, + subscriptionId, + unsubscribe +}) => { + const deviceSubscriptions = userSubscriptions.get(deviceId) + + const subscriptions = deviceSubscriptions + + if (!subscriptions) { + return + } + + const subscription = subscriptions.get(subscriptionId) + + if (!subscription) { + return + } + + subscriptions.set(subscriptionId, { + ...subscription, + unsubscribe + }) + userSubscriptions.set(deviceId, subscriptions) +} + +/** + * Unsubscribes from a GunDB query + * @param {Object} subscription + * @param {string} subscription.deviceId + * @param {string} subscription.subscriptionId + */ +const remove = ({ deviceId, subscriptionId }) => { + const deviceSubscriptions = userSubscriptions.get(deviceId) + + const subscriptions = deviceSubscriptions ?? new Map() + const subscription = subscriptions.get(subscriptionId); + + if (subscription?.unsubscribe) { + subscription.unsubscribe() + } + + subscriptions.delete(subscriptionId) + userSubscriptions.set(deviceId, subscriptions) +} + +/** + * Unsubscribes from all GunDB queries for a specific device + * @param {Object} subscription + * @param {string} subscription.deviceId + */ +const removeDevice = ({ deviceId }) => { + const deviceSubscriptions = userSubscriptions.get(deviceId); + + if (!deviceSubscriptions) { + return + } + + Array.from(deviceSubscriptions.values()).map(subscription => { + if (subscription && subscription.unsubscribe) { + subscription.unsubscribe() + } + + return subscription + }) + + userSubscriptions.set(deviceId, new Map()); +} + +/** + * Retrieves the specified subscription's info if it exists + * @param {Object} subscription + * @param {string} subscription.deviceId + * @param {string} subscription.subscriptionId + */ +const get = ({ deviceId, subscriptionId }) => { + const deviceSubscriptions = userSubscriptions.get(deviceId) + + if (!deviceSubscriptions) { + return false + } + + const subscription = deviceSubscriptions.get(subscriptionId) + return subscription +} + +module.exports = { + add, + attachUnsubscribe, + get, + remove, + removeDevice +} \ No newline at end of file diff --git a/src/sockets.js b/src/sockets.js index fcc42b7a..08350b15 100644 --- a/src/sockets.js +++ b/src/sockets.js @@ -8,16 +8,10 @@ const Common = require('shock-common') const mapValues = require('lodash/mapValues') const auth = require('../services/auth/auth') -const Encryption = require('../utils/encryptionStore') const LightningServices = require('../utils/lightningServices') -const { - getGun, - getUser, - isAuthenticated -} = require('../services/gunDB/Mediator') -const { deepDecryptIfNeeded } = require('../services/gunDB/rpc') +const { isAuthenticated } = require('../services/gunDB/Mediator') +const initGunDBSocket = require('../services/gunDB/sockets') const GunEvents = require('../services/gunDB/contact-api/events') -const SchemaManager = require('../services/schema') const { encryptedEmit, encryptedOn } = require('../utils/ECC/socket') const TipsForwarder = require('../services/tipsCallback') /** @@ -29,204 +23,7 @@ module.exports = ( /** @type {import('socket.io').Server} */ io ) => { - // This should be used for encrypting and emitting your data - const encryptedEmitLegacy = ({ eventName, data, socket }) => { - try { - if (Encryption.isNonEncrypted(eventName)) { - return socket.emit(eventName, data) - } - - const deviceId = socket.handshake.auth['x-shockwallet-device-id'] - const authorized = Encryption.isAuthorizedDevice({ deviceId }) - - if (!deviceId) { - throw { - field: 'deviceId', - message: 'Please specify a device ID' - } - } - - if (!authorized) { - throw { - field: 'deviceId', - message: 'Please exchange keys with the API before using the socket' - } - } - - const encryptedMessage = Encryption.encryptMessage({ - message: data, - deviceId - }) - - return socket.emit(eventName, encryptedMessage) - } catch (err) { - logger.error( - `[SOCKET] An error has occurred while encrypting an event (${eventName}):`, - err - ) - - return socket.emit('encryption:error', err) - } - } - - const onNewInvoice = (socket, subID) => { - const { lightning } = LightningServices.services - logger.warn('Subscribing to invoices socket...' + subID) - const stream = lightning.subscribeInvoices({}) - stream.on('data', data => { - logger.info('[SOCKET] New invoice data:', data) - encryptedEmitLegacy({ eventName: 'invoice:new', data, socket }) - if (!data.settled) { - return - } - SchemaManager.AddOrder({ - type: 'invoice', - amount: parseInt(data.amt_paid_sat, 10), - coordinateHash: data.r_hash.toString('hex'), - coordinateIndex: parseInt(data.add_index, 10), - inbound: true, - toLndPub: data.payment_addr - }) - }) - stream.on('end', () => { - logger.info('New invoice stream ended, starting a new one...') - // Prevents call stack overflow exceptions - //process.nextTick(() => onNewInvoice(socket)) - }) - stream.on('error', err => { - logger.error('New invoice stream error:' + subID, err) - }) - stream.on('status', status => { - logger.warn('New invoice stream status:' + subID, status) - switch (status.code) { - case 0: { - logger.info('[event:invoice:new] stream ok') - break - } - case 1: { - logger.info( - '[event:invoice:new] stream canceled, probably socket disconnected' - ) - break - } - case 2: { - logger.warn('[event:invoice:new] got UNKNOWN error status') - break - } - case 12: { - logger.warn( - '[event:invoice:new] LND locked, new registration in 60 seconds' - ) - process.nextTick(() => - setTimeout(() => onNewInvoice(socket, subID), 60000) - ) - break - } - case 13: { - //https://grpc.github.io/grpc/core/md_doc_statuscodes.html - logger.error('[event:invoice:new] INTERNAL LND error') - break - } - case 14: { - logger.error( - '[event:invoice:new] LND disconnected, sockets reconnecting in 30 seconds...' - ) - process.nextTick(() => - setTimeout(() => onNewInvoice(socket, subID), 30000) - ) - break - } - default: { - logger.error('[event:invoice:new] UNKNOWN LND error') - } - } - }) - return () => { - stream.cancel() - } - } - - const onNewTransaction = (socket, subID) => { - const { lightning } = LightningServices.services - const stream = lightning.subscribeTransactions({}) - logger.warn('Subscribing to transactions socket...' + subID) - stream.on('data', data => { - logger.info('[SOCKET] New transaction data:', data) - - Promise.all(data.dest_addresses.map(SchemaManager.isTmpChainOrder)).then( - responses => { - const hasOrder = responses.some(res => res !== false) - if (hasOrder && data.num_confirmations > 0) { - //buddy needs to manage this - } else { - //business as usual - encryptedEmitLegacy({ eventName: 'transaction:new', data, socket }) - } - } - ) - }) - stream.on('end', () => { - logger.info('New transactions stream ended, starting a new one...') - //process.nextTick(() => onNewTransaction(socket)) - }) - stream.on('error', err => { - logger.error('New transactions stream error:' + subID, err) - }) - stream.on('status', status => { - logger.info('New transactions stream status:' + subID, status) - switch (status.code) { - case 0: { - logger.info('[event:transaction:new] stream ok') - break - } - case 1: { - logger.info( - '[event:transaction:new] stream canceled, probably socket disconnected' - ) - break - } - case 2: { - //Happens to fire when the grpc client lose access to macaroon file - logger.warn('[event:transaction:new] got UNKNOWN error status') - break - } - case 12: { - logger.warn( - '[event:transaction:new] LND locked, new registration in 60 seconds' - ) - process.nextTick(() => - setTimeout(() => onNewTransaction(socket, subID), 60000) - ) - break - } - case 13: { - //https://grpc.github.io/grpc/core/md_doc_statuscodes.html - logger.error('[event:transaction:new] INTERNAL LND error') - break - } - case 14: { - logger.error( - '[event:transaction:new] LND disconnected, sockets reconnecting in 30 seconds...' - ) - process.nextTick(() => - setTimeout(() => onNewTransaction(socket, subID), 30000) - ) - break - } - default: { - logger.error('[event:transaction:new] UNKNOWN LND error') - } - } - }) - return () => { - stream.cancel() - } - } - io.on('connection', socket => { - logger.info(`io.onconnection`) - logger.info('socket.handshake', socket.handshake) - const isLNDSocket = !!socket.handshake.auth.IS_LND_SOCKET const isNotificationsSocket = !!socket.handshake.auth .IS_NOTIFICATIONS_SOCKET @@ -240,89 +37,6 @@ module.exports = ( const subID = Math.floor(Math.random() * 1000).toString() const isNotifications = isNotificationsSocket ? 'notifications' : '' logger.info('[LND] New LND Socket created:' + isNotifications + subID) - /* not used by wallet anymore - const cancelInvoiceStream = onNewInvoice(socket, subID) - const cancelTransactionStream = onNewTransaction(socket, subID) - socket.on('disconnect', () => { - logger.info('LND socket disconnected:' + isNotifications + subID) - cancelInvoiceStream() - cancelTransactionStream() - })*/ - } - }) - - io.of('gun').on('connect', socket => { - // TODO: off() - - try { - if (!isAuthenticated()) { - socket.emit(Common.Constants.ErrorCode.NOT_AUTH) - return - } - - const emit = encryptedEmit(socket) - - const { $shock, publicKeyForDecryption } = socket.handshake.auth - - const [root, path, method] = $shock.split('::') - - // eslint-disable-next-line init-declarations - let node - - if (root === '$gun') { - node = getGun() - } else if (root === '$user') { - node = getUser() - } else { - node = getGun().user(root) - } - - for (const bit of path.split('>')) { - node = node.get(bit) - } - - /** - * @param {ValidDataValue} data - * @param {string} key - */ - const listener = async (data, key) => { - try { - if ( - typeof publicKeyForDecryption === 'string' && - publicKeyForDecryption !== 'undefined' && - publicKeyForDecryption.length > 15 - ) { - const decData = await deepDecryptIfNeeded( - data, - publicKeyForDecryption - ) - - emit('$shock', decData, key) - } else { - emit('$shock', data, key) - } - } catch (err) { - logger.error( - `Error for gun rpc socket, query ${$shock} -> ${err.message}` - ) - } - } - - if (method === 'on') { - node.on(listener) - } else if (method === 'open') { - node.open(listener) - } else if (method === 'map.on') { - node.map().on(listener) - } else if (method === 'map.once') { - node.map().once(listener) - } else { - throw new TypeError( - `Invalid method for gun rpc call : ${method}, query: ${$shock}` - ) - } - } catch (err) { - logger.error('GUNRPC: ' + err.message) } }) diff --git a/utils/ECC/socket.js b/utils/ECC/socket.js index 771aee6d..8068bb27 100644 --- a/utils/ECC/socket.js +++ b/utils/ECC/socket.js @@ -20,6 +20,7 @@ const nonEncryptedEvents = [ * @typedef {import('../../services/gunDB/Mediator').EncryptedEmission} EncryptedEmission * @typedef {import('../../services/gunDB/Mediator').EncryptedEmissionLegacy} EncryptedEmissionLegacy * @typedef {import('../../services/gunDB/contact-api/SimpleGUN').ValidDataValue} ValidDataValue + * @typedef {(data: any, callback: (error?: any, data?: any) => void) => void} SocketOnListener */ /** @@ -83,7 +84,7 @@ const encryptedEmit = socket => async (eventName, ...args) => { /** * @param {SimpleSocket} socket - * @returns {(eventName: string, callback: (data: any) => void) => void} + * @returns {(eventName: string, callback: SocketOnListener) => void} */ const encryptedOn = socket => (eventName, callback) => { try { @@ -110,9 +111,9 @@ const encryptedOn = socket => (eventName, callback) => { } } - socket.on(eventName, async data => { + socket.on(eventName, async (data, response) => { if (isNonEncrypted(eventName)) { - callback(data) + callback(data, response) return } @@ -122,7 +123,7 @@ const encryptedOn = socket => (eventName, callback) => { encryptedMessage: data }) - callback(safeParseJSON(decryptedMessage)) + callback(safeParseJSON(decryptedMessage), response) } }) } catch (err) { @@ -135,8 +136,60 @@ const encryptedOn = socket => (eventName, callback) => { } } +/** + * @param {SimpleSocket} socket + * @param {(error?: any, data?: any) => void} callback + * @returns {(...args: any[]) => Promise} + */ +const encryptedCallback = (socket, callback) => async (...args) => { + try { + const deviceId = socket.handshake.auth.encryptionId + + if (!deviceId) { + throw { + field: 'deviceId', + message: 'Please specify a device ID' + } + } + + const authorized = ECC.isAuthorizedDevice({ deviceId }) + + if (!authorized) { + throw { + field: 'deviceId', + message: 'Please exchange keys with the API before using the socket' + } + } + + const encryptedArgs = await Promise.all( + args.map(async data => { + if (!data) { + return data + } + + const encryptedMessage = await ECC.encryptMessage({ + message: typeof data === 'object' ? JSON.stringify(data) : data, + deviceId + }) + + return encryptedMessage + }) + ) + + return callback(...encryptedArgs) + } catch (err) { + logger.error( + `[SOCKET] An error has occurred while emitting an event response:`, + err + ) + + return socket.emit('encryption:error', err) + } +} + module.exports = { isNonEncrypted, encryptedOn, - encryptedEmit + encryptedEmit, + encryptedCallback }