From c2c63f957659a5d43ac239540cfe25acc08bfd73 Mon Sep 17 00:00:00 2001 From: Daniel Lugo Date: Wed, 29 Jan 2020 18:29:21 -0400 Subject: [PATCH] new events and streams --- services/gunDB/Mediator/index.js | 97 +++---- services/gunDB/contact-api/events/index.js | 254 +----------------- .../gunDB/contact-api/events/onSentReqs.js | 103 +++++++ .../gunDB/contact-api/streams/addresses.js | 48 ++++ .../{streams.js => streams/index.js} | 42 +-- .../contact-api/streams/lastSentReqID.js | 50 ++++ 6 files changed, 269 insertions(+), 325 deletions(-) create mode 100644 services/gunDB/contact-api/events/onSentReqs.js create mode 100644 services/gunDB/contact-api/streams/addresses.js rename services/gunDB/contact-api/{streams.js => streams/index.js} (86%) create mode 100644 services/gunDB/contact-api/streams/lastSentReqID.js diff --git a/services/gunDB/Mediator/index.js b/services/gunDB/Mediator/index.js index 03bd2a39..d01b9ea7 100644 --- a/services/gunDB/Mediator/index.js +++ b/services/gunDB/Mediator/index.js @@ -474,10 +474,7 @@ class Mediator { }) }), 300 - ), - gun, - user, - mySEA + ) ) } catch (err) { console.log(err) @@ -593,10 +590,7 @@ class Mediator { }) }), 350 - ), - gun, - user, - mySEA + ) ) } catch (err) { if (Config.SHOW_LOG) { @@ -835,24 +829,19 @@ class Mediator { await throwOnInvalidToken(token) - API.Events.onChats( - chats => { - if (Config.SHOW_LOG) { - console.log('---chats---') - console.log(chats) - console.log('-----------------------') - } + API.Events.onChats(chats => { + if (Config.SHOW_LOG) { + console.log('---chats---') + console.log(chats) + console.log('-----------------------') + } - this.socket.emit(Event.ON_CHATS, { - msg: chats, - ok: true, - origBody: body - }) - }, - gun, - user, - mySEA - ) + this.socket.emit(Event.ON_CHATS, { + msg: chats, + ok: true, + origBody: body + }) + }) } catch (err) { console.log(err) this.socket.emit(Event.ON_CHATS, { @@ -936,24 +925,19 @@ class Mediator { await throwOnInvalidToken(token) - API.Events.onSimplerReceivedRequests( - receivedRequests => { - if (Config.SHOW_LOG) { - console.log('---receivedRequests---') - console.log(receivedRequests) - console.log('-----------------------') - } + API.Events.onSimplerReceivedRequests(receivedRequests => { + if (Config.SHOW_LOG) { + console.log('---receivedRequests---') + console.log(receivedRequests) + console.log('-----------------------') + } - this.socket.emit(Event.ON_RECEIVED_REQUESTS, { - msg: receivedRequests, - ok: true, - origBody: body - }) - }, - gun, - user, - mySEA - ) + this.socket.emit(Event.ON_RECEIVED_REQUESTS, { + msg: receivedRequests, + ok: true, + origBody: body + }) + }) } catch (err) { console.log(err) this.socket.emit(Event.ON_RECEIVED_REQUESTS, { @@ -973,24 +957,19 @@ class Mediator { await throwOnInvalidToken(token) - await API.Events.onSimplerSentRequests( - sentRequests => { - if (Config.SHOW_LOG) { - console.log('---sentRequests---') - console.log(sentRequests) - console.log('-----------------------') - } + await API.Events.onSimplerSentRequests(sentRequests => { + if (Config.SHOW_LOG) { + console.log('---sentRequests---') + console.log(sentRequests) + console.log('-----------------------') + } - this.socket.emit(Event.ON_SENT_REQUESTS, { - msg: sentRequests, - ok: true, - origBody: body - }) - }, - gun, - user, - mySEA - ) + this.socket.emit(Event.ON_SENT_REQUESTS, { + msg: sentRequests, + ok: true, + origBody: body + }) + }) } catch (err) { console.log(err) this.socket.emit(Event.ON_SENT_REQUESTS, { diff --git a/services/gunDB/contact-api/events/index.js b/services/gunDB/contact-api/events/index.js index 9ebcaeb5..18ab7f01 100644 --- a/services/gunDB/contact-api/events/index.js +++ b/services/gunDB/contact-api/events/index.js @@ -441,6 +441,7 @@ let pubToDn = {} /** @type {Streams.Incomings} */ let pubToIncoming = {} + /** * @typedef {(chats: Chat[]) => void} ChatsListener */ @@ -539,256 +540,6 @@ const onChats = cb => { } } -/** - * @param {(sentRequests: SimpleSentRequest[]) => void} cb - * @param {GUNNode} gun - * @param {UserGUNNode} user - * @param {ISEA} SEA - * @returns {Promise} - */ -const onSimplerSentRequests = async (cb, gun, user, SEA) => { - if (!user.is) { - throw new Error(ErrorCode.NOT_AUTH) - } - /** - * @type {Record} - */ - const storedReqs = {} - /** - * @type {Partial>} - */ - const recipientToAvatar = {} - /** - * @type {Partial>} - */ - const recipientToDisplayName = {} - /** - * @type {Partial>} - */ - const recipientToCurrentHandshakeAddress = {} - /** - * @type {Record} - */ - const simpleSentRequests = {} - /** - * Keep track of recipients that already have listeners for their avatars. - * @type {string[]} - */ - const recipientsWithAvatarListener = [] - /** - * Keep track of recipients that already have listeners for their display - * name. - * @type {string[]} - */ - const recipientsWithDisplayNameListener = [] - /** - * Keep track of recipients that already have listeners for their current - * handshake node. - * @type {string[]} - */ - const recipientsWithCurrentHandshakeAddressListener = [] - - const mySecret = await SEA.secret(user._.sea.epub, user._.sea) - - if (typeof mySecret !== 'string') { - throw new TypeError("typeof mySecret !== 'string'") - } - - const callCB = debounce(async () => { - try { - console.log('\n') - console.log('------simplerSentRequests: rawRequests ------') - console.log(storedReqs) - console.log('\n') - - const entries = Object.entries(storedReqs) - - /** @type {Promise[]} */ - const promises = entries.map(([, storedReq]) => - (async () => { - const recipientPub = await SEA.decrypt( - storedReq.recipientPub, - mySecret - ) - if (typeof recipientPub !== 'string') { - throw new TypeError() - } - const requestAddress = await SEA.decrypt( - storedReq.handshakeAddress, - mySecret - ) - if (typeof requestAddress !== 'string') { - throw new TypeError() - } - const sentReqID = await SEA.decrypt(storedReq.sentReqID, mySecret) - if (typeof sentReqID !== 'string') { - throw new TypeError() - } - - /** @type {Schema.HandshakeRequest} */ - const sentReq = await Utils.tryAndWait(async gun => { - const data = await gun - .get(Key.HANDSHAKE_NODES) - .get(requestAddress) - .get(sentReqID) - .then() - - if (Schema.isHandshakeRequest(data)) { - return data - } - - throw new TypeError('sent req not a handshake request') - }) - - const latestReqIDForRecipient = await Utils.recipientPubToLastReqSentID( - recipientPub - ) - - if ( - await Utils.reqWasAccepted( - sentReq.response, - recipientPub, - user, - SEA - ) - ) { - return null - } - - if ( - !recipientsWithCurrentHandshakeAddressListener.includes( - recipientPub - ) - ) { - recipientsWithCurrentHandshakeAddressListener.push(recipientPub) - - gun - .user(recipientPub) - .get(Key.CURRENT_HANDSHAKE_ADDRESS) - .on(addr => { - if (typeof addr !== 'string') { - console.log( - "onSimplerSentRequests() -> typeof addr !== 'string'" - ) - - return - } - - recipientToCurrentHandshakeAddress[recipientPub] = addr - - callCB() - }) - } - - if (!recipientsWithAvatarListener.includes(recipientPub)) { - recipientsWithAvatarListener.push(recipientPub) - - gun - .user(recipientPub) - .get(Key.PROFILE) - .get(Key.AVATAR) - .on(avatar => { - if (typeof avatar === 'string' || avatar === null) { - recipientToAvatar[recipientPub] = avatar - callCB() - } - }) - } - - if (!recipientsWithDisplayNameListener.includes(recipientPub)) { - recipientsWithDisplayNameListener.push(recipientPub) - - gun - .user(recipientPub) - .get(Key.PROFILE) - .get(Key.DISPLAY_NAME) - .on(displayName => { - if (typeof displayName === 'string' || displayName === null) { - recipientToDisplayName[recipientPub] = displayName - callCB() - } - }) - } - - const isStaleRequest = latestReqIDForRecipient !== sentReqID - - if (isStaleRequest) { - return null - } - - /** - * @type {SimpleSentRequest} - */ - const res = { - id: sentReqID, - recipientAvatar: recipientToAvatar[recipientPub] || null, - recipientChangedRequestAddress: false, - recipientDisplayName: - recipientToDisplayName[recipientPub] || - Utils.defaultName(recipientPub), - recipientPublicKey: recipientPub, - timestamp: sentReq.timestamp - } - - return res - })() - ) - - const reqsOrNulls = await Promise.all(promises) - - console.log('\n') - console.log('------simplerSentRequests: reqsOrNulls ------') - console.log(reqsOrNulls) - console.log('\n') - - /** @type {SimpleSentRequest[]} */ - // @ts-ignore - const reqs = reqsOrNulls.filter(item => item !== null) - - for (const req of reqs) { - simpleSentRequests[req.id] = req - } - } catch (err) { - console.log(`onSimplerSentRequests() -> callCB() -> ${err.message}`) - } finally { - cb(Object.values(simpleSentRequests)) - } - }, DEBOUNCE_WAIT_TIME) - - callCB() - - // force a refresh when a request is accepted - user.get(Key.USER_TO_INCOMING).on(() => { - callCB() - }) - - user - .get(Key.STORED_REQS) - .map() - .on((sentRequest, sentRequestID) => { - try { - if (!Schema.isStoredRequest(sentRequest)) { - console.log('\n') - console.log( - '------simplerSentRequests: !Schema.isHandshakeRequest(sentRequest) ------' - ) - console.log(sentRequest) - console.log('\n') - - return - } - - storedReqs[sentRequestID] = sentRequest - - callCB() - } catch (err) { - console.log( - `onSimplerSentRequests() -> sentRequestID: ${sentRequestID} -> ${err.message}` - ) - } - }) -} - /** @type {string|null} */ let currentBio = null @@ -854,7 +605,8 @@ module.exports = { onOutgoing, onChats, onSimplerReceivedRequests: require('./onReceivedReqs'), - onSimplerSentRequests, + onSimplerSentRequests: require('./onSentReqs').onSentReqs, + getCurrentSentReqs: require('./onSentReqs').getCurrentSentReqs, onBio, onSeedBackup } diff --git a/services/gunDB/contact-api/events/onSentReqs.js b/services/gunDB/contact-api/events/onSentReqs.js new file mode 100644 index 00000000..09057ef3 --- /dev/null +++ b/services/gunDB/contact-api/events/onSentReqs.js @@ -0,0 +1,103 @@ +/** @format */ +const Streams = require('../streams') +/** + * @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode + * @typedef {import('../SimpleGUN').GUNNode} GUNNode + * @typedef {import('../SimpleGUN').ISEA} ISEA + * @typedef {import('../SimpleGUN').ListenerData} ListenerData + * @typedef {import('../schema').HandshakeRequest} HandshakeRequest + * @typedef {import('../schema').Message} Message + * @typedef {import('../schema').Outgoing} Outgoing + * @typedef {import('../schema').PartialOutgoing} PartialOutgoing + * @typedef {import('../schema').Chat} Chat + * @typedef {import('../schema').ChatMessage} ChatMessage + * @typedef {import('../schema').SimpleSentRequest} SimpleSentRequest + * @typedef {import('../schema').SimpleReceivedRequest} SimpleReceivedRequest + */ + +/** + * @typedef {(chats: SimpleSentRequest[]) => void} Listener + */ + +/** @type {Set} */ +const listeners = new Set() + +/** @type {SimpleSentRequest[]} */ +let currentReqs = [] + +const getCurrentSentReqs = () => currentReqs + +const react = () => { + /** @type {SimpleSentRequest[]} */ + const finalSentReqs = [] + + const pubToHAddr = Streams.getAddresses() + const storedReqs = Streams.getStoredReqs() + const pubToLastSentReqID = Streams.getSentReqIDs() + const pubToIncoming = Streams.getPubToIncoming() + const pubToAvatar = Streams.getPubToAvatar() + const pubToDN = Streams.getPubToDn() + + for (const storedReq of storedReqs) { + const { handshakeAddress, recipientPub, sentReqID, timestamp } = storedReq + const currAddress = pubToHAddr[recipientPub] + + const lastReqID = pubToLastSentReqID[recipientPub] + const isStale = typeof lastReqID !== 'undefined' && lastReqID !== sentReqID + const aHandshakeWasEstablishedAtSomePoint = + typeof pubToIncoming[recipientPub] !== 'undefined' + + if (isStale && aHandshakeWasEstablishedAtSomePoint) { + // eslint-disable-next-line no-continue + continue + } + + finalSentReqs.push({ + id: sentReqID, + recipientAvatar: pubToAvatar[recipientPub] || null, + recipientChangedRequestAddress: + typeof currAddress && handshakeAddress !== currAddress, + recipientDisplayName: pubToDN[recipientPub] || null, + recipientPublicKey: recipientPub, + timestamp + }) + } + + currentReqs = finalSentReqs + + listeners.forEach(l => l(currentReqs)) +} + +let subbed = false + +/** + * Massages all of the more primitive data structures into a more manageable + * 'Chat' paradigm. + * @param {Listener} cb + * @returns {() => void} + */ +const onSentReqs = cb => { + listeners.add(cb) + + if (!subbed) { + Streams.onAddresses(react) + Streams.onStoredReqs(react) + Streams.onLastSentReqIDs(react) + Streams.onIncoming(react) + Streams.onAvatar(react) + Streams.onDisplayName(react) + + subbed = true + } + + cb(currentReqs) + + return () => { + listeners.delete(cb) + } +} + +module.exports = { + onSentReqs, + getCurrentSentReqs +} diff --git a/services/gunDB/contact-api/streams/addresses.js b/services/gunDB/contact-api/streams/addresses.js new file mode 100644 index 00000000..cc6d9343 --- /dev/null +++ b/services/gunDB/contact-api/streams/addresses.js @@ -0,0 +1,48 @@ +/** @format */ +const Key = require('../key') +/** + * @typedef {Record} Addresses + */ + +/** @type {Addresses} */ +const pubToAddress = {} + +/** @type {Set<() => void>} */ +const listeners = new Set() +const notify = () => listeners.forEach(l => l()) + +/** @type {Set} */ +const subbedPublicKeys = new Set() + +/** + * @param {() => void} cb + * @param {string=} pub + */ +const onAddresses = (cb, pub) => { + listeners.add(cb) + cb() + if (pub && subbedPublicKeys.add(pub)) { + require('../../Mediator') + .getGun() + .user(pub) + .get(Key.CURRENT_HANDSHAKE_ADDRESS) + .on(addr => { + if (typeof addr === 'string' || addr === null) { + pubToAddress[pub] = addr + } else { + pubToAddress[pub] = null + } + notify() + }) + } + return () => { + listeners.delete(cb) + } +} + +const getAddresses = () => pubToAddress + +module.exports = { + onAddresses, + getAddresses +} diff --git a/services/gunDB/contact-api/streams.js b/services/gunDB/contact-api/streams/index.js similarity index 86% rename from services/gunDB/contact-api/streams.js rename to services/gunDB/contact-api/streams/index.js index 8bedcf3f..248da93d 100644 --- a/services/gunDB/contact-api/streams.js +++ b/services/gunDB/contact-api/streams/index.js @@ -1,8 +1,8 @@ /** @format */ -const { INITIAL_MSG } = require('./actions') -const Key = require('./key') -const Schema = require('./schema') -const Utils = require('./utils') +const { INITIAL_MSG } = require('../actions') +const Key = require('../key') +const Schema = require('../schema') +const Utils = require('../utils') /** * @typedef {Record} Avatars * @typedef {(avatars: Avatars) => void} AvatarListener @@ -11,6 +11,8 @@ const Utils = require('./utils') /** @type {Avatars} */ const pubToAvatar = {} +const getPubToAvatar = () => pubToAvatar + /** @type {Set} */ const avatarListeners = new Set() @@ -29,7 +31,7 @@ const onAvatar = (cb, pub) => { avatarListeners.add(cb) cb(pubToAvatar) if (pub && pubsWithAvatarListeners.add(pub)) { - require('../Mediator') + require('../../Mediator') .getGun() .user(pub) .get(Key.PROFILE) @@ -56,6 +58,8 @@ const onAvatar = (cb, pub) => { /** @type {DisplayNames} */ const pubToDisplayName = {} +const getPubToDn = () => pubToDisplayName + /** @type {Set} */ const displayNameListeners = new Set() @@ -74,7 +78,7 @@ const onDisplayName = (cb, pub) => { displayNameListeners.add(cb) cb(pubToDisplayName) if (pub && pubsWithDisplayNameListeners.add(pub)) { - require('../Mediator') + require('../../Mediator') .getGun() .user(pub) .get(Key.PROFILE) @@ -94,7 +98,7 @@ const onDisplayName = (cb, pub) => { } /** - * @typedef {import('./schema').ChatMessage[]} Message + * @typedef {import('../schema').ChatMessage[]} Message * @typedef {Record} Incomings * @typedef {(incomings: Incomings) => void} IncomingsListener */ @@ -104,6 +108,8 @@ const onDisplayName = (cb, pub) => { */ const pubToIncoming = {} +const getPubToIncoming = () => pubToIncoming + /** @type {Set} */ const incomingsListeners = new Set() @@ -120,8 +126,8 @@ const pubFeedPairsWithIncomingListeners = new Set() const onIncoming = cb => { incomingsListeners.add(cb) - const user = require('../Mediator').getUser() - const SEA = require('../Mediator').mySEA + const user = require('../../Mediator').getUser() + const SEA = require('../../Mediator').mySEA user.get(Key.USER_TO_INCOMING).open(uti => { if (typeof uti !== 'object' || uti === null) { @@ -139,7 +145,7 @@ const onIncoming = cb => { user._.sea ) - require('../Mediator') + require('../../Mediator') .getGun() .user(pub) .get(Key.OUTGOINGS) @@ -206,7 +212,7 @@ const onIncoming = cb => { } /** - * @typedef {import('./schema').StoredRequest} StoredRequest + * @typedef {import('../schema').StoredRequest} StoredRequest * @typedef {(reqs: StoredRequest[]) => void} StoredRequestsListener */ @@ -229,7 +235,7 @@ const processStoredReqs = async () => { const ereqs = encryptedStoredReqs encryptedStoredReqs = [] const mySecret = await Utils.mySecret() - const SEA = require('../Mediator').mySEA + const SEA = require('../../Mediator').mySEA const finalReqs = await Utils.asyncMap(ereqs, async er => { /** @type {StoredRequest} */ const r = { @@ -248,14 +254,13 @@ const processStoredReqs = async () => { let subbed = false /** - * * @param {StoredRequestsListener} cb */ const onStoredReqs = cb => { storedRequestsListeners.add(cb) if (!subbed) { - require('../Mediator') + require('../../Mediator') .getUser() .get(Key.STORED_REQS) .open(d => { @@ -280,8 +285,15 @@ const onStoredReqs = cb => { module.exports = { onAvatar, + getPubToAvatar, onDisplayName, + getPubToDn, onIncoming, + getPubToIncoming, onStoredReqs, - getStoredReqs + getStoredReqs, + onAddresses: require('./addresses').onAddresses, + getAddresses: require('./addresses').getAddresses, + onLastSentReqIDs: require('./lastSentReqID').onLastSentReqIDs, + getSentReqIDs: require('./lastSentReqID').getSentReqIDs } diff --git a/services/gunDB/contact-api/streams/lastSentReqID.js b/services/gunDB/contact-api/streams/lastSentReqID.js new file mode 100644 index 00000000..0c552922 --- /dev/null +++ b/services/gunDB/contact-api/streams/lastSentReqID.js @@ -0,0 +1,50 @@ +/** @format */ +const Key = require('../key') + +/** @type {Record} */ +let pubToLastSentReqID = {} + +/** @type {Set<() => void>} */ +const listeners = new Set() +const notify = () => listeners.forEach(l => l()) + +let subbed = false + +/** + * @param {() => void} cb + */ +const onLastSentReqIDs = cb => { + listeners.add(cb) + cb() + + if (!subbed) { + require('../../Mediator') + .getUser() + .get(Key.USER_TO_LAST_REQUEST_SENT) + .open(data => { + if (typeof data === 'object' && data !== null) { + for (const [pub, id] of Object.entries(data)) { + if (typeof id === 'string' || id === null) { + pubToLastSentReqID[pub] = id + } + } + } else { + pubToLastSentReqID = {} + } + + notify() + }) + subbed = true + } + + return () => { + listeners.delete(cb) + } +} + +const getSentReqIDs = () => pubToLastSentReqID + +module.exports = { + onLastSentReqIDs, + getSentReqIDs +}