diff --git a/services/gunDB/contact-api/streams/index.js b/services/gunDB/contact-api/streams/index.js index b3df5683..63201275 100644 --- a/services/gunDB/contact-api/streams/index.js +++ b/services/gunDB/contact-api/streams/index.js @@ -180,5 +180,6 @@ module.exports = { onAddresses: require('./addresses').onAddresses, getAddresses: require('./addresses').getAddresses, onLastSentReqIDs: require('./lastSentReqID').onLastSentReqIDs, - getSentReqIDs: require('./lastSentReqID').getSentReqIDs + getSentReqIDs: require('./lastSentReqID').getSentReqIDs, + PubToIncoming: require('./pubToIncoming') } diff --git a/services/gunDB/contact-api/streams/pubToIncoming.js b/services/gunDB/contact-api/streams/pubToIncoming.js index a8d54621..ba2b156e 100644 --- a/services/gunDB/contact-api/streams/pubToIncoming.js +++ b/services/gunDB/contact-api/streams/pubToIncoming.js @@ -1,146 +1,93 @@ /** @format */ -const { INITIAL_MSG } = require('../actions') -const Schema = require('../schema') -const Key = require('../key') -const Utils = require('../utils') +const uuidv1 = require('uuid/v1') +const debounce = require('lodash/debounce') + +const { USER_TO_INCOMING } = require('../key') +const { asyncForEach } = require('../utils') +/** @typedef {import('../SimpleGUN').OpenListenerData} OpenListenerData */ /** - * @typedef {import('../schema').ChatMessage} Message - * @typedef {Record} Incomings - * @typedef {(incomings: Incomings) => void} IncomingsListener + * @typedef {Record} PubToIncoming */ +/** @type {Set<() => void>} */ +const listeners = new Set() + +/** @type {PubToIncoming} */ +let pubToIncoming = {} + +const getPubToIncoming = () => pubToIncoming /** - * @type {Incomings} + * @param {PubToIncoming} pti + * @returns {void} */ -const currentPubToIncoming = {} - -const getPubToIncoming = () => currentPubToIncoming - -/** @type {Set} */ -const incomingsListeners = new Set() - -const notifyIncomingsListeners = () => { - incomingsListeners.forEach(l => l(currentPubToIncoming)) +const setPubToIncoming = pti => { + pubToIncoming = pti + listeners.forEach(l => l()) } -/** @type {Set} */ -const pubFeedPairsWithIncomingListeners = new Set() +let latestUpdate = uuidv1() + +const onOpen = debounce(async uti => { + const SEA = require('../../Mediator').mySEA + const mySec = require('../../Mediator').getMySecret() + const thisUpdate = uuidv1() + latestUpdate = thisUpdate + + if (typeof uti !== 'object' || uti === null) { + setPubToIncoming({}) + return + } + + /** @type {PubToIncoming} */ + const newPubToIncoming = {} + + await asyncForEach(Object.entries(uti), async ([pub, encFeedID]) => { + if (encFeedID === null) { + newPubToIncoming[pub] = null + return + } + + if (typeof encFeedID === 'string') { + newPubToIncoming[pub] = await SEA.decrypt(encFeedID, mySec) + } + }) + + // avoid old data from overwriting new data if decrypting took longer to + // process for the older open() call than for the newer open() call + if (latestUpdate === thisUpdate) { + setPubToIncoming(newPubToIncoming) + } +}, 750) let subbed = false /** - * @param {IncomingsListener} cb + * @param {() => void} cb + * @returns {() => void} */ -const onIncoming = cb => { - incomingsListeners.add(cb) +const onPubToIncoming = cb => { + if (!listeners.add(cb)) { + throw new Error('Tried to subscribe twice') + } - const user = require('../../Mediator').getUser() - const SEA = require('../../Mediator').mySEA + cb() if (!subbed) { - user.get(Key.USER_TO_INCOMING).open(uti => { - if (typeof uti !== 'object' || uti === null) { - return - } - - Object.entries(uti).forEach(async ([pub, encFeed]) => { - if (typeof encFeed !== 'string') { - return - } - const ourSecret = await SEA.secret( - await Utils.pubToEpub(pub), - user._.sea - ) - const mySecret = await Utils.mySecret() - - const feed = await SEA.decrypt(encFeed, mySecret) - - if (pubFeedPairsWithIncomingListeners.add(pub + '--' + feed)) { - require('../../Mediator') - .getGun() - .user(pub) - .get(Key.OUTGOINGS) - .get(feed) - .open(async data => { - if (data === null) { - currentPubToIncoming[pub] = null - return - } - - if (typeof data !== 'object') { - return - } - - if (typeof data.with !== 'string') { - return - } - - if (typeof data.messages !== 'object') { - return - } - - if (data.messages === null) { - return - } - - if (!Array.isArray(currentPubToIncoming[pub])) { - currentPubToIncoming[pub] = [ - { - body: INITIAL_MSG, - // hack one year - timestamp: Date.now() - 31556952, - id: Math.random().toString(), - outgoing: false - } - ] - } - - const msgs = /** @type {[string, Schema.Message][]} */ (Object.entries( - data.messages - ).filter(([_, msg]) => Schema.isMessage(msg))) - - // eslint-disable-next-line require-atomic-updates - currentPubToIncoming[pub] = await Utils.asyncMap( - msgs, - async ([msgid, msg]) => { - let decryptedBody = '' - - if (msg.body === INITIAL_MSG) { - decryptedBody = INITIAL_MSG - } else { - decryptedBody = await SEA.decrypt(msg.body, ourSecret) - } - - /** @type {Schema.ChatMessage} */ - const finalMsg = { - body: decryptedBody, - id: msgid, - outgoing: false, - timestamp: msg.timestamp - } - - return finalMsg - } - ) - - notifyIncomingsListeners() - }) - } - }) - }) - + const user = require('../../Mediator').getUser() + user.get(USER_TO_INCOMING).open(onOpen) subbed = true } - cb(getPubToIncoming()) - return () => { - incomingsListeners.delete(cb) + if (!listeners.delete(cb)) { + throw new Error('Tried to unsubscribe twice') + } } } module.exports = { - onIncoming, - getPubToIncoming + getPubToIncoming, + setPubToIncoming, + onPubToIncoming }