From 527072f538f480e8819893cd964a8ba2fc7bbf81 Mon Sep 17 00:00:00 2001 From: Daniel Lugo Date: Wed, 29 Jan 2020 11:34:14 -0400 Subject: [PATCH] new onoutgoing --- services/gunDB/contact-api/events.js | 328 ++++++++++----------------- 1 file changed, 116 insertions(+), 212 deletions(-) diff --git a/services/gunDB/contact-api/events.js b/services/gunDB/contact-api/events.js index 332a00a5..70c222ca 100644 --- a/services/gunDB/contact-api/events.js +++ b/services/gunDB/contact-api/events.js @@ -27,105 +27,6 @@ const Config = require('../config') const DEBOUNCE_WAIT_TIME = 500 -/** - * @param {string} outgoingKey - * @param {(message: Message, key: string) => void} cb - * @param {UserGUNNode} user - * @param {ISEA} SEA - * @returns {Promise} - */ -const __onOutgoingMessage = async (outgoingKey, cb, user, SEA) => { - if (!user.is) { - throw new Error(ErrorCode.NOT_AUTH) - } - - const mySecret = await SEA.secret(user._.sea.epub, user._.sea) - if (typeof mySecret !== 'string') { - throw new TypeError("typeof mySecret !== 'string'") - } - - const callb = debounce(cb, DEBOUNCE_WAIT_TIME) - /** @type {string} */ - const encryptedForMeRecipientPublicKey = await Utils.tryAndWait( - (_, user) => - new Promise((res, rej) => { - user - .get(Key.OUTGOINGS) - .get(outgoingKey) - .get('with') - .once(erpk => { - if (typeof erpk !== 'string') { - rej( - new TypeError("Expected outgoing.get('with') to be an string.") - ) - } else if (erpk.length === 0) { - rej( - new TypeError( - "Expected outgoing.get('with') to be a populated." - ) - ) - } else { - res(erpk) - } - }) - }) - ) - - const recipientPublicKey = await SEA.decrypt( - encryptedForMeRecipientPublicKey, - mySecret - ) - - if (typeof recipientPublicKey !== 'string') { - throw new TypeError( - "__onOutgoingMessage() -> typeof recipientPublicKey !== 'string'" - ) - } - - /** @type {string} */ - const recipientEpub = await Utils.pubToEpub(recipientPublicKey) - - const ourSecret = await SEA.secret(recipientEpub, user._.sea) - - if (typeof ourSecret !== 'string') { - throw new TypeError( - "__onOutgoingMessage() -> typeof ourSecret !== 'string'" - ) - } - - user - .get(Key.OUTGOINGS) - .get(outgoingKey) - .get(Key.MESSAGES) - .map() - .on(async (msg, key) => { - if (!Schema.isMessage(msg)) { - console.warn('non message received: ' + JSON.stringify(msg)) - return - } - - let { body } = msg - - if (body !== Actions.INITIAL_MSG) { - const decrypted = await SEA.decrypt(body, ourSecret) - - if (typeof decrypted !== 'string') { - console.log("__onOutgoingMessage() -> typeof decrypted !== 'string'") - } else { - body = decrypted - } - } - - callb( - { - body, - timestamp: msg.timestamp - }, - key - ) - }) -} - /** * Maps a sent request ID to the public key of the user it was sent to. * @param {(requestToUser: Record) => void} cb @@ -407,101 +308,105 @@ const onIncomingMessages = (cb, userPK, incomingFeedID, gun, user, SEA) => { } /** - * - * @param {(outgoings: Record) => void} cb - * @param {UserGUNNode} user - * @param {ISEA} SEA - * @param {typeof __onOutgoingMessage} onOutgoingMessage + * @typedef {Record} Outgoings + * @typedef {(outgoings: Outgoings) => void} OutgoingsListener */ -const onOutgoing = async ( - cb, - user, - SEA, - onOutgoingMessage = __onOutgoingMessage -) => { - if (!user.is) { - throw new Error(ErrorCode.NOT_AUTH) - } - const callb = debounce(cb, DEBOUNCE_WAIT_TIME) +/** + * @type {Outgoings} + */ +export let currentOutgoings = {} - const mySecret = await SEA.secret(user._.sea.epub, user._.sea) - if (typeof mySecret !== 'string') { - throw new TypeError("onOutgoing() -> typeof mySecret !== 'string'") - } +/** + * @type {Outgoings} + */ +export let encryptedOutgoings = {} - /** - * @type {Record} - */ - const outgoings = {} +/** @type {Set} */ +const outgoingsListeners = new Set() - callb(outgoings) +export const notifyOutgoingsListeners = () => { + outgoingsListeners.forEach(l => l(currentOutgoings)) +} - /** - * @type {string[]} - */ - const outgoingsWithMessageListeners = [] +/** @type {UserGUNNode|null} */ +let lastUserWithListener = null - /** @type {Set} */ - const outgoingsDisconnected = new Set() +const processOutgoings = async () => { + const outs = encryptedOutgoings + encryptedOutgoings = {} + const mySecret = await Utils.mySecret() + const SEA = require('../Mediator').mySEA + await Utils.asyncForEach(Object.entries(outs), async ([id, out]) => { + if (out === null) { + currentOutgoings[id] = null + return + } - user - .get(Key.OUTGOINGS) - .map() - .on(async (data, key) => { - if (!Schema.isPartialOutgoing(data)) { - // if user disconnected - if (data === null) { - delete outgoings[key] - outgoingsDisconnected.add(key) - } else { - console.warn('not partial outgoing') - console.warn(JSON.stringify(data)) + if (typeof currentOutgoings[id] === 'undefined') { + // We disable this rule because we are awaiting the result of the whole + // for each AND each callback looks only at one single ID + // eslint-disable-next-line require-atomic-updates + currentOutgoings[id] = { + messages: {}, + with: await SEA.decrypt(out.with, mySecret) + } + } + + const currentOut = currentOutgoings[id] + if (currentOut === null) { + return + } + + // on each open() only "messages" should change, not "with" + // also messages are non-nullable and non-editable + + await Utils.asyncForEach( + Object.entries(out.messages), + async ([msgID, msg]) => { + if (!currentOut.messages[msgID]) { + // each callback only looks at one particular msgID + // eslint-disable-next-line require-atomic-updates + currentOut.messages[msgID] = { + body: await SEA.decrypt(msg.body, mySecret), + timestamp: msg.timestamp + } } + } + ) + }) + notifyOutgoingsListeners() +} - callb(outgoings) +/** + * @param {OutgoingsListener} cb + * @returns {() => void} + */ +const onOutgoing = cb => { + cb(currentOutgoings) + const currentUser = require('../Mediator').getUser() + + if (lastUserWithListener !== currentUser) { + // in case user changed gun alias + currentOutgoings = {} + encryptedOutgoings = {} + lastUserWithListener = currentUser + + currentUser.get(Key.OUTGOINGS).open(data => { + // deactivate this listener when user changes + if (lastUserWithListener !== require('../Mediator').getUser()) { return } - - const decryptedRecipientPublicKey = await SEA.decrypt(data.with, mySecret) - - if (typeof decryptedRecipientPublicKey !== 'string') { - console.log( - "onOutgoing() -> typeof decryptedRecipientPublicKey !== 'string'" - ) - return - } - - outgoings[key] = { - messages: outgoings[key] ? outgoings[key].messages : {}, - with: decryptedRecipientPublicKey - } - - if (!outgoingsWithMessageListeners.includes(key)) { - outgoingsWithMessageListeners.push(key) - - onOutgoingMessage( - key, - (msg, msgKey) => { - if (outgoingsDisconnected.has(key)) { - return - } - - outgoings[key].messages = { - ...outgoings[key].messages, - [msgKey]: msg - } - - callb(outgoings) - }, - user, - SEA - ) - } - - callb(outgoings) + // @ts-ignore Let's skip schema checks for perf reasons + encryptedOutgoings = data + processOutgoings() }) + } + + return () => { + outgoingsListeners.delete(cb) + } } /** @@ -570,44 +475,43 @@ const onChats = (cb, gun, user, SEA) => { callCB() - onOutgoing( - async outgoings => { - await Utils.asyncForEach(Object.values(outgoings), async outgoing => { - const recipientPK = outgoing.with - const incomingID = await Getters.userToIncomingID(recipientPK) - const didDisconnect = - !!incomingID && (await Utils.didDisconnect(recipientPK, incomingID)) + onOutgoing(async outgoings => { + await Utils.asyncForEach(Object.values(outgoings), async outgoing => { + if (outgoing === null) { + return + } + const recipientPK = outgoing.with + const incomingID = await Getters.userToIncomingID(recipientPK) + const didDisconnect = + !!incomingID && (await Utils.didDisconnect(recipientPK, incomingID)) - if (!recipientPKToChat[recipientPK]) { - recipientPKToChat[recipientPK] = { - messages: [], - recipientAvatar: null, - recipientDisplayName: Utils.defaultName(recipientPK), - recipientPublicKey: recipientPK, - didDisconnect, - id: recipientPK + incomingID - } + if (!recipientPKToChat[recipientPK]) { + recipientPKToChat[recipientPK] = { + messages: [], + recipientAvatar: null, + recipientDisplayName: Utils.defaultName(recipientPK), + recipientPublicKey: recipientPK, + didDisconnect, + id: recipientPK + incomingID } + } - const { messages } = recipientPKToChat[recipientPK] + const { messages } = recipientPKToChat[recipientPK] - for (const [msgK, msg] of Object.entries(outgoing.messages)) { - if (!messages.find(_msg => _msg.id === msgK)) { - messages.push({ - body: msg.body, - id: msgK, - outgoing: true, - timestamp: msg.timestamp - }) - } + for (const [msgK, msg] of Object.entries(outgoing.messages)) { + if (!messages.find(_msg => _msg.id === msgK)) { + messages.push({ + body: msg.body, + id: msgK, + outgoing: true, + timestamp: msg.timestamp + }) } - }) + } + }) - callCB() - }, - user, - SEA - ) + callCB() + }) __onUserToIncoming( async uti => {