From 851349317669660a4608c711e5eb41be42f09b5b Mon Sep 17 00:00:00 2001 From: Daniel Lugo Date: Wed, 29 Jan 2020 16:34:40 -0400 Subject: [PATCH] better received reqs --- .../{events.js => events/index.js} | 351 ++---------------- .../contact-api/events/onReceivedReqs.js | 144 +++++++ 2 files changed, 167 insertions(+), 328 deletions(-) rename services/gunDB/contact-api/{events.js => events/index.js} (67%) create mode 100644 services/gunDB/contact-api/events/onReceivedReqs.js diff --git a/services/gunDB/contact-api/events.js b/services/gunDB/contact-api/events/index.js similarity index 67% rename from services/gunDB/contact-api/events.js rename to services/gunDB/contact-api/events/index.js index aac6c600..9ebcaeb5 100644 --- a/services/gunDB/contact-api/events.js +++ b/services/gunDB/contact-api/events/index.js @@ -3,26 +3,25 @@ */ const debounce = require('lodash/debounce') -const Actions = require('./actions') -const ErrorCode = require('./errorCode') -const Key = require('./key') -const Schema = require('./schema') -const Streams = require('./streams') -const Utils = require('./utils') -const Config = require('../config') +const Actions = require('../actions') +const ErrorCode = require('../errorCode') +const Key = require('../key') +const Schema = require('../schema') +const Streams = require('../streams') +const Utils = require('../utils') /** - * @typedef {import('./SimpleGUN').UserGUNNode} UserGUNNode - * @typedef {import('./SimpleGUN').GUNNode} GUNNode - * @typedef {import('./SimpleGUN').ISEA} ISEA - * @typedef {import('./SimpleGUN').ListenerData} ListenerData - * @typedef {import('./schema').HandshakeRequest} HandshakeRequest - * @typedef {import('./schema').Message} Message - * @typedef {import('./schema').Outgoing} Outgoing - * @typedef {import('./schema').PartialOutgoing} PartialOutgoing - * @typedef {import('./schema').Chat} Chat - * @typedef {import('./schema').ChatMessage} ChatMessage - * @typedef {import('./schema').SimpleSentRequest} SimpleSentRequest - * @typedef {import('./schema').SimpleReceivedRequest} SimpleReceivedRequest + * @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode + * @typedef {import('../SimpleGUN').GUNNode} GUNNode + * @typedef {import('../SimpleGUN').ISEA} ISEA + * @typedef {import('../SimpleGUN').ListenerData} ListenerData + * @typedef {import('../schema').HandshakeRequest} HandshakeRequest + * @typedef {import('../schema').Message} Message + * @typedef {import('../schema').Outgoing} Outgoing + * @typedef {import('../schema').PartialOutgoing} PartialOutgoing + * @typedef {import('../schema').Chat} Chat + * @typedef {import('../schema').ChatMessage} ChatMessage + * @typedef {import('../schema').SimpleSentRequest} SimpleSentRequest + * @typedef {import('../schema').SimpleReceivedRequest} SimpleReceivedRequest */ const DEBOUNCE_WAIT_TIME = 500 @@ -336,8 +335,8 @@ const processOutgoings = async () => { const outs = encryptedOutgoings encryptedOutgoings = {} const mySecret = await Utils.mySecret() - const SEA = require('../Mediator').mySEA - const user = require('../Mediator').getUser() + const SEA = require('../../Mediator').mySEA + const user = require('../../Mediator').getUser() await Utils.asyncForEach(Object.entries(outs), async ([id, out]) => { if (out === null) { currentOutgoings[id] = null @@ -407,7 +406,7 @@ const onOutgoing = cb => { outgoingsListeners.add(cb) cb(currentOutgoings) - const currentUser = require('../Mediator').getUser() + const currentUser = require('../../Mediator').getUser() if (lastUserWithListener !== currentUser) { // in case user changed gun alias @@ -417,7 +416,7 @@ const onOutgoing = cb => { currentUser.get(Key.OUTGOINGS).open(data => { // deactivate this listener when user changes - if (lastUserWithListener !== require('../Mediator').getUser()) { + if (lastUserWithListener !== require('../../Mediator').getUser()) { return } // @ts-ignore Let's skip schema checks for perf reasons @@ -540,310 +539,6 @@ const onChats = cb => { } } -/** - * - * @param {(simpleReceivedRequests: SimpleReceivedRequest[]) => void} cb - * @param {GUNNode} gun - * @param {UserGUNNode} user - * @param {ISEA} SEA - * @returns {void} - */ -const onSimplerReceivedRequests = (cb, gun, user, SEA) => { - try { - if (!user.is) { - throw new Error(ErrorCode.NOT_AUTH) - } - - /** @type {Record} */ - const idToRequest = {} - - /** @type {string[]} */ - const requestorsWithAvatarListeners = [] - - /** @type {string[]} */ - const requestorsWithDisplayNameListeners = [] - - /** - * @type {Partial>} - */ - const requestorToAvatar = {} - - /** - * @type {Partial>} - */ - const requestorToDisplayName = {} - - /** @type {Set} */ - const requestorsAlreadyAccepted = new Set() - - /** - * We cannot call gun.off(), so keep track of the current handshake addres. - * And only run the listeners for the handshake nodes if they are for the - * current handshake address node. - */ - let currentHandshakeAddress = '' - - //////////////////////////////////////////////////////////////////////////// - - const _callCB = async () => { - try { - const requestEntries = Object.entries(idToRequest) - - if (Config.SHOW_LOG) { - console.log('requestorsAlreadyAccepted') - console.log(requestorsAlreadyAccepted) - console.log('/requestorsAlreadyAccepted') - } - - if (Config.SHOW_LOG) { - console.log('raw requests:') - console.log(idToRequest) - console.log('/raw requests') - } - - // avoid race conditions due to gun's reactive nature. - const onlyInCurrentHandshakeNode = await Utils.asyncFilter( - requestEntries, - async ([id]) => { - try { - const HNAddr = await Utils.tryAndWait(async (_, user) => { - const data = await user - .get(Key.CURRENT_HANDSHAKE_ADDRESS) - .then() - - if (typeof data !== 'string') { - throw new Error('handshake address not an string') - } - - return data - }) - - const maybeHreq = await Utils.tryAndWait(gun => - gun - .get(Key.HANDSHAKE_NODES) - .get(HNAddr) - .get(id) - .then() - ) - - return Schema.isHandshakeRequest(maybeHreq) - } catch (err) { - console.log(`error for request ID: ${id}`) - throw err - } - } - ) - - if (Config.SHOW_LOG) { - console.log('onlyInCurrentHandshakeNode') - console.log(onlyInCurrentHandshakeNode) - console.log('/onlyInCurrentHandshakeNode') - } - - // USER-TO-INCOMING (which indicates acceptance of this request) write - // might not be in there by the time we are looking at these requests. - // Let's account for this. - const notAccepted = await Utils.asyncFilter( - onlyInCurrentHandshakeNode, - async ([reqID, req]) => { - try { - if (requestorsAlreadyAccepted.has(req.from)) { - return false - } - - const requestorEpub = await Utils.pubToEpub(req.from) - - const ourSecret = await SEA.secret(requestorEpub, user._.sea) - if (typeof ourSecret !== 'string') { - throw new TypeError('typeof ourSecret !== "string"') - } - - const decryptedResponse = await SEA.decrypt( - req.response, - ourSecret - ) - - if (typeof decryptedResponse !== 'string') { - throw new TypeError('typeof decryptedResponse !== "string"') - } - - const outfeedID = decryptedResponse - - if (Config.SHOW_LOG) { - console.log('\n') - console.log('--------outfeedID----------') - console.log(outfeedID) - console.log('------------------') - console.log('\n') - } - - const maybeOutfeed = await Utils.tryAndWait(gun => - gun - .user(req.from) - .get(Key.OUTGOINGS) - .get(outfeedID) - .then() - ) - - if (Config.SHOW_LOG) { - console.log('\n') - console.log('--------maybeOutfeed----------') - console.log(maybeOutfeed) - console.log('------------------') - console.log('\n') - } - - const wasAccepted = Schema.isHandshakeRequest(maybeOutfeed) - - return !wasAccepted - } catch (err) { - console.log(`error for request ID: ${reqID}`) - throw err - } - } - ) - - if (Config.SHOW_LOG) { - console.log('notAccepted') - console.log(notAccepted) - console.log('/notAccepted') - } - - const simpleReceivedReqs = notAccepted.map(([reqID, req]) => { - try { - const { from: requestorPub } = req - - /** @type {SimpleReceivedRequest} */ - const simpleReceivedReq = { - id: reqID, - requestorAvatar: requestorToAvatar[requestorPub] || null, - requestorDisplayName: - requestorToDisplayName[requestorPub] || - Utils.defaultName(requestorPub), - requestorPK: requestorPub, - response: req.response, - timestamp: req.timestamp - } - - return simpleReceivedReq - } catch (err) { - console.log(`error for request ID: ${reqID}`) - throw err - } - }) - - cb(simpleReceivedReqs) - } catch (err) { - console.error(err) - } - } - - const callCB = debounce(_callCB, DEBOUNCE_WAIT_TIME) - callCB() - - user - .get(Key.USER_TO_INCOMING) - .map() - .on((incomingID, userPK) => { - const disconnected = incomingID === null - if (disconnected) { - requestorsAlreadyAccepted.delete(userPK) - } else { - requestorsAlreadyAccepted.add(userPK) - } - - callCB() - }) - - //////////////////////////////////////////////////////////////////////////// - /** - * @param {string} addr - * @returns {(req: ListenerData, reqID: string) => void} - */ - const listenerForAddr = addr => (req, reqID) => { - try { - if (addr !== currentHandshakeAddress) { - console.log( - 'onSimplerReceivedRequests() -> listenerForAddr() -> stale handshake address, quitting' - ) - return - } - - if (!Schema.isHandshakeRequest(req)) { - console.log( - 'onSimplerReceivedRequests() -> listenerForAddr() -> bad handshake request, quitting' - ) - console.log(req) - return - } - - idToRequest[reqID] = req - callCB() - - if (!requestorsWithAvatarListeners.includes(req.from)) { - requestorsWithAvatarListeners.push(req.from) - - gun - .user(req.from) - .get(Key.PROFILE) - .get(Key.AVATAR) - .on(avatar => { - if (typeof avatar === 'string' || avatar === null) { - // || handles empty strings - requestorToAvatar[req.from] = avatar || null - - callCB() - } - }) - } - - if (!requestorsWithDisplayNameListeners.includes(req.from)) { - requestorsWithDisplayNameListeners.push(req.from) - - gun - .user(req.from) - .get(Key.PROFILE) - .get(Key.DISPLAY_NAME) - .on(displayName => { - if (typeof displayName === 'string' || displayName === null) { - // || handles empty strings - requestorToDisplayName[req.from] = displayName || null - - callCB() - } - }) - } - } catch (err) { - console.log('onSimplerReceivedRequests() -> listenerForAddr() ->') - console.log(err) - } - - callCB() - } - //////////////////////////////////////////////////////////////////////////// - user.get(Key.CURRENT_HANDSHAKE_ADDRESS).on(addr => { - if (typeof addr !== 'string') { - throw new TypeError('current handshake address not an string') - } - - console.log( - `onSimplerReceivedRequests() -> setting current address to ${addr}` - ) - currentHandshakeAddress = addr - - gun - .get(Key.HANDSHAKE_NODES) - .get(addr) - .map() - .on(listenerForAddr(addr)) - - callCB() - }) - } catch (err) { - console.log(`onSimplerReceivedRequests() -> ${err.message}`) - } -} - /** * @param {(sentRequests: SimpleSentRequest[]) => void} cb * @param {GUNNode} gun @@ -1158,7 +853,7 @@ module.exports = { onIncomingMessages, onOutgoing, onChats, - onSimplerReceivedRequests, + onSimplerReceivedRequests: require('./onReceivedReqs'), onSimplerSentRequests, onBio, onSeedBackup diff --git a/services/gunDB/contact-api/events/onReceivedReqs.js b/services/gunDB/contact-api/events/onReceivedReqs.js new file mode 100644 index 00000000..849a3ec9 --- /dev/null +++ b/services/gunDB/contact-api/events/onReceivedReqs.js @@ -0,0 +1,144 @@ +/** @format */ +const Events = require('./index') +const Key = require('../key') +const Schema = require('../schema') +const Streams = require('../streams') +/** + * @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode + * @typedef {import('../SimpleGUN').GUNNode} GUNNode + * @typedef {import('../SimpleGUN').ISEA} ISEA + * @typedef {import('../SimpleGUN').ListenerData} ListenerData + * @typedef {import('../schema').HandshakeRequest} HandshakeRequest + * @typedef {import('../schema').Message} Message + * @typedef {import('../schema').Outgoing} Outgoing + * @typedef {import('../schema').PartialOutgoing} PartialOutgoing + * @typedef {import('../schema').Chat} Chat + * @typedef {import('../schema').ChatMessage} ChatMessage + * @typedef {import('../schema').SimpleSentRequest} SimpleSentRequest + * @typedef {import('../schema').SimpleReceivedRequest} SimpleReceivedRequest + */ + +/** + * @typedef {(chats: SimpleReceivedRequest[]) => void} Listener + */ + +const listeners = new Set() + +/** @type {Streams.Avatars} */ +let pubToAvatar = {} + +/** @type {Streams.DisplayNames} */ +let pubToDn = {} + +/** @type {Streams.Incomings} */ +let pubToIncoming = {} + +/** @type {SimpleReceivedRequest[]} */ +let currentReqs = [] + +/** @type {string|null} */ +let currentAddress = null + +/** @type {Record} */ +let currentNode = {} + +const react = () => { + /** @type {SimpleReceivedRequest[]} */ + const finalReqs = [] + + for (const [id, req] of Object.entries(currentNode)) { + const notAccepted = typeof pubToIncoming[req.from] === 'undefined' + + if (notAccepted) { + finalReqs.push({ + id, + requestorAvatar: pubToAvatar[req.from] || null, + requestorDisplayName: pubToDn[req.from] || null, + requestorPK: req.from, + response: req.response, + timestamp: req.timestamp + }) + } + } + + currentReqs = finalReqs + + listeners.forEach(l => l(currentReqs)) +} + +/** + * + * @param {string} addr + * @returns {(data: import('../SimpleGUN').OpenListenerData) => void} + */ +const listenerForAddr = addr => data => { + if (addr !== currentAddress) { + return + } + + if (typeof data === 'object' && data !== null) { + for (const [id, req] of Object.entries(data)) { + if (!Schema.isHandshakeRequest(req)) { + return + } + + currentNode[id] = req + } + + react() + } +} + +let subbed = false + +/** + * Massages all of the more primitive data structures into a more manageable + * 'Chat' paradigm. + * @param {Listener} cb + * @returns {() => void} + */ +const onReceivedReqs = cb => { + listeners.add(cb) + + if (!subbed) { + Events.onCurrentHandshakeAddress(addr => { + if (currentAddress !== addr) { + currentAddress = addr + currentNode = {} + + if (typeof addr === 'string') { + require('../../Mediator') + .getGun() + .get(Key.HANDSHAKE_NODES) + .get(addr) + .open(listenerForAddr(addr)) + } + + react() + } + }, require('../../Mediator').getUser()) + + Streams.onAvatar(pta => { + pubToAvatar = pta + react() + }) + Streams.onDisplayName(ptd => { + pubToDn = ptd + react() + }) + Streams.onIncoming(pti => { + pubToIncoming = pti + react() + }) + + subbed = true + } + + cb(currentReqs) + + return () => { + listeners.delete(cb) + } +} + +module.exports = onReceivedReqs