From ebff8f458123495a2c7b1c973170ab0150ad272f Mon Sep 17 00:00:00 2001 From: Daniel Lugo Date: Sat, 25 Jan 2020 18:46:00 -0400 Subject: [PATCH] propagate disconnect --- services/gunDB/contact-api/events.js | 218 +++++++++++++++++---------- services/gunDB/contact-api/schema.js | 5 + 2 files changed, 143 insertions(+), 80 deletions(-) diff --git a/services/gunDB/contact-api/events.js b/services/gunDB/contact-api/events.js index a0c14d9f..58d30224 100644 --- a/services/gunDB/contact-api/events.js +++ b/services/gunDB/contact-api/events.js @@ -3,8 +3,11 @@ */ const debounce = require('lodash/debounce') +const { getGun } = require('../Mediator/index') + const Actions = require('./actions') const ErrorCode = require('./errorCode') +const Getters = require('./getters') const Key = require('./key') const Schema = require('./schema') const Utils = require('./utils') @@ -175,7 +178,7 @@ const __onSentRequestToUser = async (cb, user, SEA) => { } /** - * @param {(userToOutgoing: Record) => void} cb + * @param {(userToIncoming: Record) => void} cb * @param {UserGUNNode} user Pass only for testing purposes. * @param {ISEA} SEA * @returns {Promise} @@ -188,7 +191,7 @@ const __onUserToIncoming = async (cb, user, SEA) => { const callb = debounce(cb, DEBOUNCE_WAIT_TIME) /** @type {Record} */ - const userToOutgoing = {} + const userToIncoming = {} const mySecret = await SEA.secret(user._.sea.epub, user._.sea) if (typeof mySecret !== 'string') { @@ -200,7 +203,14 @@ const __onUserToIncoming = async (cb, user, SEA) => { .map() .on(async (encryptedIncomingID, userPub) => { if (typeof encryptedIncomingID !== 'string') { - console.error('got a non string value') + if (encryptedIncomingID === null) { + // on disconnect + delete userToIncoming[userPub] + } else { + console.error( + 'got a non string non null value inside user to incoming' + ) + } return } @@ -216,9 +226,9 @@ const __onUserToIncoming = async (cb, user, SEA) => { return } - userToOutgoing[userPub] = incomingID + userToIncoming[userPub] = incomingID - callb(userToOutgoing) + callb(userToIncoming) }) } @@ -434,13 +444,25 @@ const onOutgoing = async ( */ const outgoingsWithMessageListeners = [] + /** @type {Set} */ + const outgoingsDisconnected = new Set() + user .get(Key.OUTGOINGS) .map() .on(async (data, key) => { if (!Schema.isPartialOutgoing(data)) { - console.warn('not partial outgoing') - console.warn(JSON.stringify(data)) + // if user disconnected + if (data === null) { + delete outgoings[key] + outgoingsDisconnected.add(key) + } else { + console.warn('not partial outgoing') + console.warn(JSON.stringify(data)) + } + + callb(outgoings) + return } @@ -464,6 +486,10 @@ const onOutgoing = async ( onOutgoingMessage( key, (msg, msgKey) => { + if (outgoingsDisconnected.has(key)) { + return + } + outgoings[key].messages = { ...outgoings[key].messages, [msgKey]: msg @@ -517,6 +543,9 @@ const onChats = (cb, gun, user, SEA) => { */ const usersWithIncomingListeners = [] + /** @type {Set} */ + const userWithDisconnectionListeners = new Set() + const _callCB = () => { // Only provide chats that have incoming listeners which would be contacts // that were actually accepted / are going on @@ -544,16 +573,21 @@ const onChats = (cb, gun, user, SEA) => { callCB() onOutgoing( - outgoings => { - for (const outgoing of Object.values(outgoings)) { + async outgoings => { + await Utils.asyncForEach(Object.values(outgoings), async outgoing => { const recipientPK = outgoing.with + const incomingID = await Getters.userToIncomingID(recipientPK) if (!recipientPKToChat[recipientPK]) { + // eslint-disable-next-line require-atomic-updates recipientPKToChat[recipientPK] = { messages: [], recipientAvatar: '', recipientDisplayName: Utils.defaultName(recipientPK), - recipientPublicKey: recipientPK + recipientPublicKey: recipientPK, + didDisconnect: + !!incomingID && + (await Utils.didDisconnect(recipientPK, incomingID)) } } @@ -569,7 +603,7 @@ const onChats = (cb, gun, user, SEA) => { }) } } - } + }) callCB() }, @@ -578,77 +612,101 @@ const onChats = (cb, gun, user, SEA) => { ) __onUserToIncoming( - uti => { - for (const [recipientPK, incomingFeedID] of Object.entries(uti)) { - if (!recipientPKToChat[recipientPK]) { - recipientPKToChat[recipientPK] = { - messages: [], - recipientAvatar: '', - recipientDisplayName: Utils.defaultName(recipientPK), - recipientPublicKey: recipientPK + async uti => { + await Utils.asyncForEach( + Object.entries(uti), + async ([recipientPK, incomingFeedID]) => { + if (!recipientPKToChat[recipientPK]) { + // eslint-disable-next-line require-atomic-updates + recipientPKToChat[recipientPK] = { + messages: [], + recipientAvatar: '', + recipientDisplayName: Utils.defaultName(recipientPK), + recipientPublicKey: recipientPK, + didDisconnect: await Utils.didDisconnect( + recipientPK, + incomingFeedID + ) + } + } + + const chat = recipientPKToChat[recipientPK] + + if (!userWithDisconnectionListeners.has(recipientPK)) { + userWithDisconnectionListeners.add(recipientPK) + + getGun() + .user(recipientPK) + .get(Key.OUTGOINGS) + .get(incomingFeedID) + .on(data => { + if (data === null) { + chat.didDisconnect = true + + callCB() + } + }) + } + + if (!usersWithIncomingListeners.includes(recipientPK)) { + usersWithIncomingListeners.push(recipientPK) + + onIncomingMessages( + msgs => { + for (const [msgK, msg] of Object.entries(msgs)) { + const { messages } = chat + + if (!messages.find(_msg => _msg.id === msgK)) { + messages.push({ + body: msg.body, + id: msgK, + outgoing: false, + timestamp: msg.timestamp + }) + } + } + + callCB() + }, + recipientPK, + incomingFeedID, + gun, + user, + SEA + ) + } + + if (!usersWithAvatarListeners.includes(recipientPK)) { + usersWithAvatarListeners.push(recipientPK) + + gun + .user(recipientPK) + .get(Key.PROFILE) + .get(Key.AVATAR) + .on(avatar => { + if (typeof avatar === 'string') { + chat.recipientAvatar = avatar + callCB() + } + }) + } + + if (!usersWithDisplayNameListeners.includes(recipientPK)) { + usersWithDisplayNameListeners.push(recipientPK) + + gun + .user(recipientPK) + .get(Key.PROFILE) + .get(Key.DISPLAY_NAME) + .on(displayName => { + if (typeof displayName === 'string') { + chat.recipientDisplayName = displayName + callCB() + } + }) } } - - const chat = recipientPKToChat[recipientPK] - - if (!usersWithIncomingListeners.includes(recipientPK)) { - usersWithIncomingListeners.push(recipientPK) - - onIncomingMessages( - msgs => { - for (const [msgK, msg] of Object.entries(msgs)) { - const { messages } = chat - - if (!messages.find(_msg => _msg.id === msgK)) { - messages.push({ - body: msg.body, - id: msgK, - outgoing: false, - timestamp: msg.timestamp - }) - } - } - - callCB() - }, - recipientPK, - incomingFeedID, - gun, - user, - SEA - ) - } - - if (!usersWithAvatarListeners.includes(recipientPK)) { - usersWithAvatarListeners.push(recipientPK) - - gun - .user(recipientPK) - .get(Key.PROFILE) - .get(Key.AVATAR) - .on(avatar => { - if (typeof avatar === 'string') { - chat.recipientAvatar = avatar - callCB() - } - }) - } - - if (!usersWithDisplayNameListeners.includes(recipientPK)) { - usersWithDisplayNameListeners.push(recipientPK) - - gun - .user(recipientPK) - .get(Key.PROFILE) - .get(Key.DISPLAY_NAME) - .on(displayName => { - if (typeof displayName === 'string') { - chat.recipientDisplayName = displayName - callCB() - } - }) - } - } + ) }, user, SEA diff --git a/services/gunDB/contact-api/schema.js b/services/gunDB/contact-api/schema.js index 9a7d57bf..fceb2d11 100644 --- a/services/gunDB/contact-api/schema.js +++ b/services/gunDB/contact-api/schema.js @@ -69,6 +69,7 @@ exports.isChatMessage = item => { * @prop {string} recipientPublicKey A way to uniquely identify each chat. * @prop {ChatMessage[]} messages Sorted from most recent to least recent. * @prop {string|null} recipientDisplayName + * @prop {boolean} didDisconnect True if the recipient performed a disconnect. */ /** @@ -102,6 +103,10 @@ exports.isChat = item => { return false } + if (typeof obj.didDisconnect !== 'boolean') { + return false + } + return obj.messages.every(msg => exports.isChatMessage(msg)) }