From 676853351594ba83fb5eaaefce434eba25be5a4f Mon Sep 17 00:00:00 2001 From: Daniel Lugo Date: Wed, 29 Jan 2020 19:38:19 -0400 Subject: [PATCH] pubtoincoming fixes --- services/gunDB/contact-api/streams/index.js | 126 +-------------- .../contact-api/streams/pubToIncoming.js | 144 ++++++++++++++++++ 2 files changed, 146 insertions(+), 124 deletions(-) create mode 100644 services/gunDB/contact-api/streams/pubToIncoming.js diff --git a/services/gunDB/contact-api/streams/index.js b/services/gunDB/contact-api/streams/index.js index 08d0d120..b620cd10 100644 --- a/services/gunDB/contact-api/streams/index.js +++ b/services/gunDB/contact-api/streams/index.js @@ -97,128 +97,6 @@ const onDisplayName = (cb, pub) => { } } -/** - * @typedef {import('../schema').ChatMessage[]} Message - * @typedef {Record} Incomings - * @typedef {(incomings: Incomings) => void} IncomingsListener - */ - -/** - * @type {Incomings} - */ -const pubToIncoming = {} - -const getPubToIncoming = () => pubToIncoming - -/** @type {Set} */ -const incomingsListeners = new Set() - -const notifyIncomingsListeners = () => { - incomingsListeners.forEach(l => l(pubToIncoming)) -} - -/** @type {Set} */ -const pubFeedPairsWithIncomingListeners = new Set() - -let subbed = false - -/** - * @param {IncomingsListener} cb - */ -const onIncoming = cb => { - incomingsListeners.add(cb) - - const user = require('../../Mediator').getUser() - const SEA = require('../../Mediator').mySEA - - if (!subbed) { - user.get(Key.USER_TO_INCOMING).open(uti => { - if (typeof uti !== 'object' || uti === null) { - return - } - - Object.entries(uti).forEach(async ([pub, encFeed]) => { - if (typeof encFeed !== 'string') { - return - } - const ourSecret = await SEA.secret( - await Utils.pubToEpub(pub), - user._.sea - ) - const mySecret = await Utils.mySecret() - - const feed = await SEA.decrypt(encFeed, mySecret) - - if (pubFeedPairsWithIncomingListeners.add(pub + '--' + feed)) { - require('../../Mediator') - .getGun() - .user(pub) - .get(Key.OUTGOINGS) - .get(feed) - .open(async data => { - if (data === null) { - pubToIncoming[pub] = null - return - } - - if (typeof data !== 'object') { - return - } - - if (typeof data.with !== 'string') { - return - } - - if (typeof data.messages !== 'object') { - return - } - - if (data.messages === null) { - return - } - - const msgs = /** @type {[string, Schema.Message][]} */ (Object.entries( - data.messages - ).filter(([_, msg]) => Schema.isMessage(msg))) - - // eslint-disable-next-line require-atomic-updates - pubToIncoming[pub] = await Utils.asyncMap( - msgs, - async ([msgid, msg]) => { - let decryptedBody = '' - - if (msg.body === INITIAL_MSG) { - decryptedBody = INITIAL_MSG - } else { - decryptedBody = await SEA.decrypt(msg.body, ourSecret) - } - - /** @type {Schema.ChatMessage} */ - const finalMsg = { - body: decryptedBody, - id: msgid, - outgoing: false, - timestamp: msg.timestamp - } - - return finalMsg - } - ) - - notifyIncomingsListeners() - }) - } - }) - }) - - subbed = true - } - - return () => { - incomingsListeners.delete(cb) - } -} - /** * @typedef {import('../schema').StoredRequest} StoredRequest * @typedef {(reqs: StoredRequest[]) => void} StoredRequestsListener @@ -296,8 +174,8 @@ module.exports = { getPubToAvatar, onDisplayName, getPubToDn, - onIncoming, - getPubToIncoming, + onIncoming: require('./pubToIncoming').onIncoming, + getPubToIncoming: require('./pubToIncoming').getPubToIncoming, onStoredReqs, getStoredReqs, onAddresses: require('./addresses').onAddresses, diff --git a/services/gunDB/contact-api/streams/pubToIncoming.js b/services/gunDB/contact-api/streams/pubToIncoming.js new file mode 100644 index 00000000..36eb3dff --- /dev/null +++ b/services/gunDB/contact-api/streams/pubToIncoming.js @@ -0,0 +1,144 @@ +/** @format */ +const { INITIAL_MSG } = require('../actions') +const Schema = require('../schema') +const Key = require('../key') +const Utils = require('../utils') + +/** + * @typedef {import('../schema').ChatMessage} Message + * @typedef {Record} Incomings + * @typedef {(incomings: Incomings) => void} IncomingsListener + */ + +/** + * @type {Incomings} + */ +const currentPubToIncoming = {} + +const getPubToIncoming = () => currentPubToIncoming + +/** @type {Set} */ +const incomingsListeners = new Set() + +const notifyIncomingsListeners = () => { + incomingsListeners.forEach(l => l(currentPubToIncoming)) +} + +/** @type {Set} */ +const pubFeedPairsWithIncomingListeners = new Set() + +let subbed = false + +/** + * @param {IncomingsListener} cb + */ +const onIncoming = cb => { + incomingsListeners.add(cb) + + const user = require('../../Mediator').getUser() + const SEA = require('../../Mediator').mySEA + + if (!subbed) { + user.get(Key.USER_TO_INCOMING).open(uti => { + if (typeof uti !== 'object' || uti === null) { + return + } + + Object.entries(uti).forEach(async ([pub, encFeed]) => { + if (typeof encFeed !== 'string') { + return + } + const ourSecret = await SEA.secret( + await Utils.pubToEpub(pub), + user._.sea + ) + const mySecret = await Utils.mySecret() + + const feed = await SEA.decrypt(encFeed, mySecret) + + if (pubFeedPairsWithIncomingListeners.add(pub + '--' + feed)) { + require('../../Mediator') + .getGun() + .user(pub) + .get(Key.OUTGOINGS) + .get(feed) + .open(async data => { + if (data === null) { + currentPubToIncoming[pub] = null + return + } + + if (typeof data !== 'object') { + return + } + + if (typeof data.with !== 'string') { + return + } + + if (typeof data.messages !== 'object') { + return + } + + if (data.messages === null) { + return + } + + if (!Array.isArray(currentPubToIncoming[pub])) { + currentPubToIncoming[pub] = [ + { + body: INITIAL_MSG, + // hack one year + timestamp: Date.now() - 31556952, + id: Math.random().toString(), + outgoing: false + } + ] + } + + const msgs = /** @type {[string, Schema.Message][]} */ (Object.entries( + data.messages + ).filter(([_, msg]) => Schema.isMessage(msg))) + + // eslint-disable-next-line require-atomic-updates + currentPubToIncoming[pub] = await Utils.asyncMap( + msgs, + async ([msgid, msg]) => { + let decryptedBody = '' + + if (msg.body === INITIAL_MSG) { + decryptedBody = INITIAL_MSG + } else { + decryptedBody = await SEA.decrypt(msg.body, ourSecret) + } + + /** @type {Schema.ChatMessage} */ + const finalMsg = { + body: decryptedBody, + id: msgid, + outgoing: false, + timestamp: msg.timestamp + } + + return finalMsg + } + ) + + notifyIncomingsListeners() + }) + } + }) + }) + + subbed = true + } + + return () => { + incomingsListeners.delete(cb) + } +} + +module.exports = { + onIncoming, + getPubToIncoming +}