From 0681464817dc16301ca9c45c995c2a01b5f5eff2 Mon Sep 17 00:00:00 2001 From: Daniel Lugo Date: Sun, 13 Jun 2021 12:40:45 -0400 Subject: [PATCH] Remove outdated gun events --- services/gunDB/Mediator/index.js | 3 - services/gunDB/contact-api/events/index.js | 427 +----------------- .../contact-api/events/onReceivedReqs.js | 138 ------ .../gunDB/contact-api/events/onSentReqs.js | 128 ------ 4 files changed, 5 insertions(+), 691 deletions(-) delete mode 100644 services/gunDB/contact-api/events/onReceivedReqs.js delete mode 100644 services/gunDB/contact-api/events/onSentReqs.js diff --git a/services/gunDB/Mediator/index.js b/services/gunDB/Mediator/index.js index 66b2ed24..d7e813a6 100644 --- a/services/gunDB/Mediator/index.js +++ b/services/gunDB/Mediator/index.js @@ -394,8 +394,6 @@ const authenticate = async (alias, pass, __user) => { API.Jobs.onOrders(_user, gun, mySEA) API.Jobs.lastSeenNode(_user) - API.Events.onCurrentHandshakeAddress(() => {}, user)() - // API.Events.onOutgoing(() => {})() API.Events.onSeedBackup(() => {}, user, mySEA) return _user._.sea.pub @@ -452,7 +450,6 @@ const authenticate = async (alias, pass, __user) => { API.Jobs.onOrders(_user, gun, mySEA) API.Jobs.lastSeenNode(_user) - API.Events.onCurrentHandshakeAddress(() => {}, user)() API.Events.onSeedBackup(() => {}, user, mySEA) return ack.sea.pub diff --git a/services/gunDB/contact-api/events/index.js b/services/gunDB/contact-api/events/index.js index bd488978..29a44a2b 100644 --- a/services/gunDB/contact-api/events/index.js +++ b/services/gunDB/contact-api/events/index.js @@ -2,427 +2,22 @@ * @prettier */ const debounce = require('lodash/debounce') -const logger = require('winston') + const { - Constants: { ErrorCode }, - Schema, - Utils: CommonUtils + Constants: { ErrorCode } } = require('shock-common') const Key = require('../key') -const Utils = require('../utils') -/** - * @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode - * @typedef {import('../SimpleGUN').GUNNode} GUNNode - * @typedef {import('../SimpleGUN').ISEA} ISEA - * @typedef {import('../SimpleGUN').ListenerData} ListenerData - * @typedef {import('shock-common').Schema.HandshakeRequest} HandshakeRequest - * @typedef {import('shock-common').Schema.Message} Message - * @typedef {import('shock-common').Schema.Outgoing} Outgoing - * @typedef {import('shock-common').Schema.PartialOutgoing} PartialOutgoing - * @typedef {import('shock-common').Schema.Chat} Chat - * @typedef {import('shock-common').Schema.ChatMessage} ChatMessage - * @typedef {import('shock-common').Schema.SimpleSentRequest} SimpleSentRequest - * @typedef {import('shock-common').Schema.SimpleReceivedRequest} SimpleReceivedRequest - */ const DEBOUNCE_WAIT_TIME = 500 -/** - * @param {(userToIncoming: Record) => void} cb - * @param {UserGUNNode} user Pass only for testing purposes. - * @param {ISEA} SEA - * @returns {void} - */ -const __onUserToIncoming = (cb, user, SEA) => { - if (!user.is) { - throw new Error(ErrorCode.NOT_AUTH) - } - - const callb = debounce(cb, DEBOUNCE_WAIT_TIME) - - /** @type {Record} */ - const userToIncoming = {} - - const mySecret = require('../../Mediator').getMySecret() - - user - .get(Key.USER_TO_INCOMING) - .map() - .on(async (encryptedIncomingID, userPub) => { - if (typeof encryptedIncomingID !== 'string') { - if (encryptedIncomingID === null) { - // on disconnect - delete userToIncoming[userPub] - } else { - logger.error( - 'got a non string non null value inside user to incoming' - ) - } - return - } - - if (encryptedIncomingID.length === 0) { - logger.error('got an empty string value') - return - } - - const incomingID = await SEA.decrypt(encryptedIncomingID, mySecret) - - if (typeof incomingID === 'undefined') { - logger.warn('could not decrypt incomingID inside __onUserToIncoming') - return - } - - userToIncoming[userPub] = incomingID - - callb(userToIncoming) - }) -} - -/** @type {Set<(addr: string|null) => void>} */ -const addressListeners = new Set() - -/** @type {string|null} */ -let currentAddress = null - -const getHandshakeAddress = () => currentAddress - -/** @param {string|null} addr */ -const setAddress = addr => { - currentAddress = addr - addressListeners.forEach(l => l(currentAddress)) -} - -let addrSubbed = false - -/** - * @param {(currentHandshakeAddress: string|null) => void} cb - * @param {UserGUNNode} user - * @returns {() => void} - */ -const onCurrentHandshakeAddress = (cb, user) => { - if (!user.is) { - throw new Error(ErrorCode.NOT_AUTH) - } - - addressListeners.add(cb) - - cb(currentAddress) - - if (!addrSubbed) { - addrSubbed = true - - user.get(Key.CURRENT_HANDSHAKE_ADDRESS).on(addr => { - if (typeof addr !== 'string') { - logger.error('expected handshake address to be an string') - - setAddress(null) - - return - } - - setAddress(addr) - }) - } - - return () => { - addressListeners.delete(cb) - } -} - -/** - * @param {(messages: Record) => void} cb - * @param {string} userPK Public key of the user from whom the incoming - * messages will be obtained. - * @param {string} incomingFeedID ID of the outgoing feed from which the - * incoming messages will be obtained. - * @param {GUNNode} gun (Pass only for testing purposes) - * @param {UserGUNNode} user - * @param {ISEA} SEA - * @returns {void} - */ -const onIncomingMessages = (cb, userPK, incomingFeedID, gun, user, SEA) => { - if (!user.is) { - throw new Error(ErrorCode.NOT_AUTH) - } - - const callb = debounce(cb, DEBOUNCE_WAIT_TIME) - - const otherUser = gun.user(userPK) - - /** - * @type {Record} - */ - const messages = {} - - callb(messages) - - otherUser - .get(Key.OUTGOINGS) - .get(incomingFeedID) - .get(Key.MESSAGES) - .map() - .on(async (data, key) => { - if (!Schema.isMessage(data)) { - logger.warn('non-message received') - return - } - - /** @type {string} */ - const recipientEpub = await Utils.pubToEpub(userPK) - - const secret = await SEA.secret(recipientEpub, user._.sea) - - let { body } = data - body = await SEA.decrypt(body, secret) - - messages[key] = { - body, - timestamp: data.timestamp - } - - callb(messages) - }) -} - -/** - * @typedef {Record} Outgoings - * @typedef {(outgoings: Outgoings) => void} OutgoingsListener - */ - -/** - * @type {Outgoings} - */ -let currentOutgoings = {} - -const getCurrentOutgoings = () => currentOutgoings - -/** @type {Set} */ -const outgoingsListeners = new Set() - -outgoingsListeners.add(o => { - const values = Object.values(o) - const nulls = values.filter(x => x === null).length - const nonNulls = values.length - nulls - - logger.info(`new outgoings, ${nulls} nulls and ${nonNulls} nonNulls`) -}) - -const notifyOutgoingsListeners = () => { - outgoingsListeners.forEach(l => l(currentOutgoings)) -} - -let outSubbed = false - -/** - * @param {OutgoingsListener} cb - * @returns {() => void} - */ -const onOutgoing = cb => { - outgoingsListeners.add(cb) - cb(currentOutgoings) - - if (!outSubbed) { - const user = require('../../Mediator').getUser() - user.get(Key.OUTGOINGS).open( - debounce(async data => { - try { - if (typeof data !== 'object' || data === null) { - currentOutgoings = {} - notifyOutgoingsListeners() - return - } - - /** @type {Record} */ - const newOuts = {} - - const SEA = require('../../Mediator').mySEA - const mySecret = await Utils.mySecret() - - await CommonUtils.asyncForEach( - Object.entries(data), - async ([id, out]) => { - if (typeof out !== 'object') { - return - } - - if (out === null) { - newOuts[id] = null - return - } - - const { with: encPub, messages } = out - - if (typeof encPub !== 'string') { - return - } - - const pub = await SEA.decrypt(encPub, mySecret) - - if (!newOuts[id]) { - newOuts[id] = { - with: pub, - messages: {} - } - } - - const ourSec = await SEA.secret( - await Utils.pubToEpub(pub), - user._.sea - ) - - if (typeof messages === 'object' && messages !== null) { - await CommonUtils.asyncForEach( - Object.entries(messages), - async ([mid, msg]) => { - if (typeof msg === 'object' && msg !== null) { - if ( - typeof msg.body === 'string' && - typeof msg.timestamp === 'number' - ) { - const newOut = newOuts[id] - if (!newOut) { - return - } - newOut.messages[mid] = { - body: await SEA.decrypt(msg.body, ourSec), - timestamp: msg.timestamp - } - } - } - } - ) - } - } - ) - - currentOutgoings = newOuts - notifyOutgoingsListeners() - } catch (e) { - logger.info('--------------------------') - logger.info('Events -> onOutgoing') - logger.info(e) - logger.info('--------------------------') - } - }, 400) - ) - - outSubbed = true - } - - return () => { - outgoingsListeners.delete(cb) - } -} -//////////////////////////////////////////////////////////////////////////////// -/** - * @typedef {(chats: Chat[]) => void} ChatsListener - */ - -/** @type {Chat[]} */ -let currentChats = [] - -const getChats = () => currentChats - -/** @type {Set} */ -const chatsListeners = new Set() - -chatsListeners.add(c => { - logger.info(`Chats: ${c.length}`) -}) - -const notifyChatsListeners = () => { - chatsListeners.forEach(l => l(currentChats)) -} - -const processChats = debounce(() => { - const Streams = require('../streams') - const currentOutgoings = getCurrentOutgoings() - const existingOutgoings = /** @type {[string, Outgoing][]} */ (Object.entries( - currentOutgoings - ).filter(([_, o]) => o !== null)) - const pubToFeed = Streams.getPubToFeed() - - /** @type {Chat[]} */ - const newChats = [] - - for (const [outID, out] of existingOutgoings) { - /** @type {ChatMessage[]} */ - let msgs = Object.entries(out.messages) - .map(([mid, m]) => ({ - id: mid, - outgoing: true, - body: m.body, - timestamp: m.timestamp - })) - // filter out null messages - .filter(m => typeof m.body === 'string') - - const incoming = pubToFeed[out.with] - - if (Array.isArray(incoming)) { - msgs = [...msgs, ...incoming] - } - - /** @type {Chat} */ - const chat = { - recipientPublicKey: out.with, - didDisconnect: pubToFeed[out.with] === 'disconnected', - id: out.with + outID, - messages: msgs, - recipientAvatar: null, - recipientDisplayName: null, - lastSeenApp: null - } - - newChats.push(chat) - } - - currentChats = newChats.filter( - c => - Array.isArray(pubToFeed[c.recipientPublicKey]) || - pubToFeed[c.recipientPublicKey] === 'disconnected' - ) - - notifyChatsListeners() -}, 750) - -let onChatsSubbed = false - -/** - * Massages all of the more primitive data structures into a more manageable - * 'Chat' paradigm. - * @param {ChatsListener} cb - * @returns {() => void} - */ -const onChats = cb => { - if (!chatsListeners.add(cb)) { - throw new Error('Tried to subscribe twice') - } - cb(currentChats) - - if (!onChatsSubbed) { - const Streams = require('../streams') - // onOutgoing(processChats) - Streams.onPubToFeed(processChats) - onChatsSubbed = true - } - - return () => { - if (!chatsListeners.delete(cb)) { - throw new Error('Tried to unsubscribe twice') - } - } -} - /** @type {string|null} */ let currentSeedBackup = null /** * @param {(seedBackup: string|null) => void} cb - * @param {UserGUNNode} user - * @param {ISEA} SEA + * @param {import('../SimpleGUN').UserGUNNode} user + * @param {import('../SimpleGUN').ISEA} SEA * @throws {Error} If user hasn't been auth. * @returns {void} */ @@ -445,17 +40,5 @@ const onSeedBackup = (cb, user, SEA) => { } module.exports = { - __onUserToIncoming, - onCurrentHandshakeAddress, - onIncomingMessages, - onOutgoing, - getCurrentOutgoings, - onSimplerReceivedRequests: require('./onReceivedReqs').onReceivedReqs, - onSimplerSentRequests: require('./onSentReqs').onSentReqs, - getCurrentSentReqs: require('./onSentReqs').getCurrentSentReqs, - getCurrentReceivedReqs: require('./onReceivedReqs').getReceivedReqs, - onSeedBackup, - onChats, - getHandshakeAddress, - getChats + onSeedBackup } diff --git a/services/gunDB/contact-api/events/onReceivedReqs.js b/services/gunDB/contact-api/events/onReceivedReqs.js deleted file mode 100644 index 8c5d3226..00000000 --- a/services/gunDB/contact-api/events/onReceivedReqs.js +++ /dev/null @@ -1,138 +0,0 @@ -/** @format */ -const debounce = require('lodash/debounce') -const logger = require('winston') -const { Schema } = require('shock-common') -const size = require('lodash/size') - -const Key = require('../key') -const Streams = require('../streams') - -/** - * @typedef {Readonly} SimpleReceivedRequest - * @typedef {(reqs: ReadonlyArray) => void} Listener - */ - -/** @type {Set} */ -const listeners = new Set() - -/** @type {string|null} */ -let currentAddress = null - -/** @type {Record} */ -let currReceivedReqsMap = {} - -/** - * Unprocessed requests in current handshake node. - * @type {Record} - */ -let currAddressData = {} - -/** @returns {SimpleReceivedRequest[]} */ -const getReceivedReqs = () => Object.values(currReceivedReqsMap) -/** @param {Record} reqs */ -const setReceivedReqsMap = reqs => { - currReceivedReqsMap = reqs - listeners.forEach(l => l(getReceivedReqs())) -} - -listeners.add(() => { - logger.info(`new received reqs: ${size(getReceivedReqs())}`) -}) - -const react = debounce(() => { - /** @type {Record} */ - const newReceivedReqsMap = {} - - const pubToFeed = Streams.getPubToFeed() - - for (const [id, req] of Object.entries(currAddressData)) { - const inContact = Array.isArray(pubToFeed[req.from]) - const isDisconnected = pubToFeed[req.from] === 'disconnected' - - if (!inContact && !isDisconnected) { - newReceivedReqsMap[req.from] = { - id, - requestorAvatar: null, - requestorDisplayName: null, - requestorPK: req.from, - timestamp: req.timestamp - } - } - } - - setReceivedReqsMap(newReceivedReqsMap) -}, 750) - -/** - * @param {string} addr - * @returns {(data: import('../SimpleGUN').OpenListenerData) => void} - */ -const listenerForAddr = addr => data => { - // did invalidate - if (addr !== currentAddress) { - return - } - - if (typeof data !== 'object' || data === null) { - currAddressData = {} - } else { - for (const [id, req] of Object.entries(data)) { - // no need to update them just write them once - if (Schema.isHandshakeRequest(req) && !currAddressData[id]) { - currAddressData[id] = req - } - } - } - - logger.info('data for address length: ' + size(currAddressData)) - - react() -} - -let subbed = false - -/** - * @param {Listener} cb - * @returns {() => void} - */ -const onReceivedReqs = cb => { - listeners.add(cb) - cb(getReceivedReqs()) - - if (!subbed) { - const user = require('../../Mediator').getUser() - if (!user.is) { - logger.warn('Tried subscribing to onReceivedReqs without authing') - } - require('./index').onCurrentHandshakeAddress(addr => { - if (currentAddress === addr) { - return - } - - currentAddress = addr - currAddressData = {} - setReceivedReqsMap({}) - - if (typeof addr === 'string') { - require('../../Mediator') - .getGun() - .get(Key.HANDSHAKE_NODES) - .get(addr) - .open(listenerForAddr(addr)) - } - }, user) - - Streams.onPubToFeed(react) - - subbed = true - } - - return () => { - listeners.delete(cb) - } -} - -module.exports = { - getReceivedReqs, - onReceivedReqs -} diff --git a/services/gunDB/contact-api/events/onSentReqs.js b/services/gunDB/contact-api/events/onSentReqs.js deleted file mode 100644 index 3aefb6ef..00000000 --- a/services/gunDB/contact-api/events/onSentReqs.js +++ /dev/null @@ -1,128 +0,0 @@ -/** @format */ -const debounce = require('lodash/debounce') -const logger = require('winston') -const size = require('lodash/size') - -const Streams = require('../streams') -/** - * @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode - * @typedef {import('../SimpleGUN').GUNNode} GUNNode - * @typedef {import('../SimpleGUN').ISEA} ISEA - * @typedef {import('../SimpleGUN').ListenerData} ListenerData - * @typedef {import('shock-common').Schema.HandshakeRequest} HandshakeRequest - * @typedef {import('shock-common').Schema.Message} Message - * @typedef {import('shock-common').Schema.Outgoing} Outgoing - * @typedef {import('shock-common').Schema.PartialOutgoing} PartialOutgoing - * @typedef {import('shock-common').Schema.Chat} Chat - * @typedef {import('shock-common').Schema.ChatMessage} ChatMessage - * @typedef {import('shock-common').Schema.SimpleSentRequest} SimpleSentRequest - * @typedef {import('shock-common').Schema.SimpleReceivedRequest} SimpleReceivedRequest - */ - -/** - * @typedef {(chats: SimpleSentRequest[]) => void} Listener - */ - -/** @type {Set} */ -const listeners = new Set() - -/** @type {SimpleSentRequest[]} */ -let currentReqs = [] - -listeners.add(() => { - logger.info(`new sent reqs length: ${size(currentReqs)}`) -}) - -const getCurrentSentReqs = () => currentReqs - -// any time any of the streams we use notifies us that it changed, we fire up -// react() -const react = debounce(() => { - /** @type {SimpleSentRequest[]} */ - const newReqs = [] - - // reactive streams - // maps a pk to its current handshake address - const pubToHAddr = Streams.getAddresses() - // a set or list containing copies of sent requests - const storedReqs = Streams.getStoredReqs() - // maps a pk to the last request sent to it (so old stored reqs are invalidated) - const pubToLastSentReqID = Streams.getSentReqIDs() - // maps a pk to a feed, messages if subbed and pk is pubbing, null / - // 'disconnected' otherwise - const pubToFeed = Streams.getPubToFeed() - - logger.info(`pubToLastSentREqID length: ${size(pubToLastSentReqID)}`) - - for (const storedReq of storedReqs) { - const { handshakeAddress, recipientPub, sentReqID, timestamp } = storedReq - const currAddress = pubToHAddr[recipientPub] - - const lastReqID = pubToLastSentReqID[recipientPub] - // invalidate if this stored request is not the last one sent to this - // particular pk - const isStale = typeof lastReqID !== 'undefined' && lastReqID !== sentReqID - // invalidate if we are in a pub/sub state to this pk (handshake in place) - const isConnected = Array.isArray(pubToFeed[recipientPub]) - - if (isStale || isConnected) { - // eslint-disable-next-line no-continue - continue - } - - // no address for this pk? let's ask the corresponding stream to sub to - // gun.user(pk).get('currentAddr') - if (typeof currAddress === 'undefined') { - // eslint-disable-next-line no-empty-function - Streams.onAddresses(() => {}, recipientPub)() - } - - newReqs.push({ - id: sentReqID, - recipientAvatar: null, - recipientChangedRequestAddress: - // if we haven't received the other's user current handshake address, - // let's assume he hasn't changed it and that this request is still - // valid - typeof currAddress !== 'undefined' && handshakeAddress !== currAddress, - recipientDisplayName: null, - recipientPublicKey: recipientPub, - timestamp - }) - } - - currentReqs = newReqs - - listeners.forEach(l => l(currentReqs)) -}, 750) - -let subbed = false - -/** - * Massages all of the more primitive data structures into a more manageable - * 'Chat' paradigm. - * @param {Listener} cb - * @returns {() => void} - */ -const onSentReqs = cb => { - listeners.add(cb) - cb(currentReqs) - - if (!subbed) { - Streams.onAddresses(react) - Streams.onStoredReqs(react) - Streams.onLastSentReqIDs(react) - Streams.onPubToFeed(react) - - subbed = true - } - - return () => { - listeners.delete(cb) - } -} - -module.exports = { - onSentReqs, - getCurrentSentReqs -}