propagate disconnect
This commit is contained in:
parent
f2c1fa5f12
commit
ebff8f4581
2 changed files with 143 additions and 80 deletions
|
|
@ -3,8 +3,11 @@
|
||||||
*/
|
*/
|
||||||
const debounce = require('lodash/debounce')
|
const debounce = require('lodash/debounce')
|
||||||
|
|
||||||
|
const { getGun } = require('../Mediator/index')
|
||||||
|
|
||||||
const Actions = require('./actions')
|
const Actions = require('./actions')
|
||||||
const ErrorCode = require('./errorCode')
|
const ErrorCode = require('./errorCode')
|
||||||
|
const Getters = require('./getters')
|
||||||
const Key = require('./key')
|
const Key = require('./key')
|
||||||
const Schema = require('./schema')
|
const Schema = require('./schema')
|
||||||
const Utils = require('./utils')
|
const Utils = require('./utils')
|
||||||
|
|
@ -175,7 +178,7 @@ const __onSentRequestToUser = async (cb, user, SEA) => {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {(userToOutgoing: Record<string, string>) => void} cb
|
* @param {(userToIncoming: Record<string, string>) => void} cb
|
||||||
* @param {UserGUNNode} user Pass only for testing purposes.
|
* @param {UserGUNNode} user Pass only for testing purposes.
|
||||||
* @param {ISEA} SEA
|
* @param {ISEA} SEA
|
||||||
* @returns {Promise<void>}
|
* @returns {Promise<void>}
|
||||||
|
|
@ -188,7 +191,7 @@ const __onUserToIncoming = async (cb, user, SEA) => {
|
||||||
const callb = debounce(cb, DEBOUNCE_WAIT_TIME)
|
const callb = debounce(cb, DEBOUNCE_WAIT_TIME)
|
||||||
|
|
||||||
/** @type {Record<string, string>} */
|
/** @type {Record<string, string>} */
|
||||||
const userToOutgoing = {}
|
const userToIncoming = {}
|
||||||
|
|
||||||
const mySecret = await SEA.secret(user._.sea.epub, user._.sea)
|
const mySecret = await SEA.secret(user._.sea.epub, user._.sea)
|
||||||
if (typeof mySecret !== 'string') {
|
if (typeof mySecret !== 'string') {
|
||||||
|
|
@ -200,7 +203,14 @@ const __onUserToIncoming = async (cb, user, SEA) => {
|
||||||
.map()
|
.map()
|
||||||
.on(async (encryptedIncomingID, userPub) => {
|
.on(async (encryptedIncomingID, userPub) => {
|
||||||
if (typeof encryptedIncomingID !== 'string') {
|
if (typeof encryptedIncomingID !== 'string') {
|
||||||
console.error('got a non string value')
|
if (encryptedIncomingID === null) {
|
||||||
|
// on disconnect
|
||||||
|
delete userToIncoming[userPub]
|
||||||
|
} else {
|
||||||
|
console.error(
|
||||||
|
'got a non string non null value inside user to incoming'
|
||||||
|
)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -216,9 +226,9 @@ const __onUserToIncoming = async (cb, user, SEA) => {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
userToOutgoing[userPub] = incomingID
|
userToIncoming[userPub] = incomingID
|
||||||
|
|
||||||
callb(userToOutgoing)
|
callb(userToIncoming)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -434,13 +444,25 @@ const onOutgoing = async (
|
||||||
*/
|
*/
|
||||||
const outgoingsWithMessageListeners = []
|
const outgoingsWithMessageListeners = []
|
||||||
|
|
||||||
|
/** @type {Set<string>} */
|
||||||
|
const outgoingsDisconnected = new Set()
|
||||||
|
|
||||||
user
|
user
|
||||||
.get(Key.OUTGOINGS)
|
.get(Key.OUTGOINGS)
|
||||||
.map()
|
.map()
|
||||||
.on(async (data, key) => {
|
.on(async (data, key) => {
|
||||||
if (!Schema.isPartialOutgoing(data)) {
|
if (!Schema.isPartialOutgoing(data)) {
|
||||||
console.warn('not partial outgoing')
|
// if user disconnected
|
||||||
console.warn(JSON.stringify(data))
|
if (data === null) {
|
||||||
|
delete outgoings[key]
|
||||||
|
outgoingsDisconnected.add(key)
|
||||||
|
} else {
|
||||||
|
console.warn('not partial outgoing')
|
||||||
|
console.warn(JSON.stringify(data))
|
||||||
|
}
|
||||||
|
|
||||||
|
callb(outgoings)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -464,6 +486,10 @@ const onOutgoing = async (
|
||||||
onOutgoingMessage(
|
onOutgoingMessage(
|
||||||
key,
|
key,
|
||||||
(msg, msgKey) => {
|
(msg, msgKey) => {
|
||||||
|
if (outgoingsDisconnected.has(key)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
outgoings[key].messages = {
|
outgoings[key].messages = {
|
||||||
...outgoings[key].messages,
|
...outgoings[key].messages,
|
||||||
[msgKey]: msg
|
[msgKey]: msg
|
||||||
|
|
@ -517,6 +543,9 @@ const onChats = (cb, gun, user, SEA) => {
|
||||||
*/
|
*/
|
||||||
const usersWithIncomingListeners = []
|
const usersWithIncomingListeners = []
|
||||||
|
|
||||||
|
/** @type {Set<string>} */
|
||||||
|
const userWithDisconnectionListeners = new Set()
|
||||||
|
|
||||||
const _callCB = () => {
|
const _callCB = () => {
|
||||||
// Only provide chats that have incoming listeners which would be contacts
|
// Only provide chats that have incoming listeners which would be contacts
|
||||||
// that were actually accepted / are going on
|
// that were actually accepted / are going on
|
||||||
|
|
@ -544,16 +573,21 @@ const onChats = (cb, gun, user, SEA) => {
|
||||||
callCB()
|
callCB()
|
||||||
|
|
||||||
onOutgoing(
|
onOutgoing(
|
||||||
outgoings => {
|
async outgoings => {
|
||||||
for (const outgoing of Object.values(outgoings)) {
|
await Utils.asyncForEach(Object.values(outgoings), async outgoing => {
|
||||||
const recipientPK = outgoing.with
|
const recipientPK = outgoing.with
|
||||||
|
const incomingID = await Getters.userToIncomingID(recipientPK)
|
||||||
|
|
||||||
if (!recipientPKToChat[recipientPK]) {
|
if (!recipientPKToChat[recipientPK]) {
|
||||||
|
// eslint-disable-next-line require-atomic-updates
|
||||||
recipientPKToChat[recipientPK] = {
|
recipientPKToChat[recipientPK] = {
|
||||||
messages: [],
|
messages: [],
|
||||||
recipientAvatar: '',
|
recipientAvatar: '',
|
||||||
recipientDisplayName: Utils.defaultName(recipientPK),
|
recipientDisplayName: Utils.defaultName(recipientPK),
|
||||||
recipientPublicKey: recipientPK
|
recipientPublicKey: recipientPK,
|
||||||
|
didDisconnect:
|
||||||
|
!!incomingID &&
|
||||||
|
(await Utils.didDisconnect(recipientPK, incomingID))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -569,7 +603,7 @@ const onChats = (cb, gun, user, SEA) => {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
|
|
||||||
callCB()
|
callCB()
|
||||||
},
|
},
|
||||||
|
|
@ -578,77 +612,101 @@ const onChats = (cb, gun, user, SEA) => {
|
||||||
)
|
)
|
||||||
|
|
||||||
__onUserToIncoming(
|
__onUserToIncoming(
|
||||||
uti => {
|
async uti => {
|
||||||
for (const [recipientPK, incomingFeedID] of Object.entries(uti)) {
|
await Utils.asyncForEach(
|
||||||
if (!recipientPKToChat[recipientPK]) {
|
Object.entries(uti),
|
||||||
recipientPKToChat[recipientPK] = {
|
async ([recipientPK, incomingFeedID]) => {
|
||||||
messages: [],
|
if (!recipientPKToChat[recipientPK]) {
|
||||||
recipientAvatar: '',
|
// eslint-disable-next-line require-atomic-updates
|
||||||
recipientDisplayName: Utils.defaultName(recipientPK),
|
recipientPKToChat[recipientPK] = {
|
||||||
recipientPublicKey: recipientPK
|
messages: [],
|
||||||
|
recipientAvatar: '',
|
||||||
|
recipientDisplayName: Utils.defaultName(recipientPK),
|
||||||
|
recipientPublicKey: recipientPK,
|
||||||
|
didDisconnect: await Utils.didDisconnect(
|
||||||
|
recipientPK,
|
||||||
|
incomingFeedID
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const chat = recipientPKToChat[recipientPK]
|
||||||
|
|
||||||
|
if (!userWithDisconnectionListeners.has(recipientPK)) {
|
||||||
|
userWithDisconnectionListeners.add(recipientPK)
|
||||||
|
|
||||||
|
getGun()
|
||||||
|
.user(recipientPK)
|
||||||
|
.get(Key.OUTGOINGS)
|
||||||
|
.get(incomingFeedID)
|
||||||
|
.on(data => {
|
||||||
|
if (data === null) {
|
||||||
|
chat.didDisconnect = true
|
||||||
|
|
||||||
|
callCB()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!usersWithIncomingListeners.includes(recipientPK)) {
|
||||||
|
usersWithIncomingListeners.push(recipientPK)
|
||||||
|
|
||||||
|
onIncomingMessages(
|
||||||
|
msgs => {
|
||||||
|
for (const [msgK, msg] of Object.entries(msgs)) {
|
||||||
|
const { messages } = chat
|
||||||
|
|
||||||
|
if (!messages.find(_msg => _msg.id === msgK)) {
|
||||||
|
messages.push({
|
||||||
|
body: msg.body,
|
||||||
|
id: msgK,
|
||||||
|
outgoing: false,
|
||||||
|
timestamp: msg.timestamp
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
callCB()
|
||||||
|
},
|
||||||
|
recipientPK,
|
||||||
|
incomingFeedID,
|
||||||
|
gun,
|
||||||
|
user,
|
||||||
|
SEA
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!usersWithAvatarListeners.includes(recipientPK)) {
|
||||||
|
usersWithAvatarListeners.push(recipientPK)
|
||||||
|
|
||||||
|
gun
|
||||||
|
.user(recipientPK)
|
||||||
|
.get(Key.PROFILE)
|
||||||
|
.get(Key.AVATAR)
|
||||||
|
.on(avatar => {
|
||||||
|
if (typeof avatar === 'string') {
|
||||||
|
chat.recipientAvatar = avatar
|
||||||
|
callCB()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!usersWithDisplayNameListeners.includes(recipientPK)) {
|
||||||
|
usersWithDisplayNameListeners.push(recipientPK)
|
||||||
|
|
||||||
|
gun
|
||||||
|
.user(recipientPK)
|
||||||
|
.get(Key.PROFILE)
|
||||||
|
.get(Key.DISPLAY_NAME)
|
||||||
|
.on(displayName => {
|
||||||
|
if (typeof displayName === 'string') {
|
||||||
|
chat.recipientDisplayName = displayName
|
||||||
|
callCB()
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
)
|
||||||
const chat = recipientPKToChat[recipientPK]
|
|
||||||
|
|
||||||
if (!usersWithIncomingListeners.includes(recipientPK)) {
|
|
||||||
usersWithIncomingListeners.push(recipientPK)
|
|
||||||
|
|
||||||
onIncomingMessages(
|
|
||||||
msgs => {
|
|
||||||
for (const [msgK, msg] of Object.entries(msgs)) {
|
|
||||||
const { messages } = chat
|
|
||||||
|
|
||||||
if (!messages.find(_msg => _msg.id === msgK)) {
|
|
||||||
messages.push({
|
|
||||||
body: msg.body,
|
|
||||||
id: msgK,
|
|
||||||
outgoing: false,
|
|
||||||
timestamp: msg.timestamp
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
callCB()
|
|
||||||
},
|
|
||||||
recipientPK,
|
|
||||||
incomingFeedID,
|
|
||||||
gun,
|
|
||||||
user,
|
|
||||||
SEA
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!usersWithAvatarListeners.includes(recipientPK)) {
|
|
||||||
usersWithAvatarListeners.push(recipientPK)
|
|
||||||
|
|
||||||
gun
|
|
||||||
.user(recipientPK)
|
|
||||||
.get(Key.PROFILE)
|
|
||||||
.get(Key.AVATAR)
|
|
||||||
.on(avatar => {
|
|
||||||
if (typeof avatar === 'string') {
|
|
||||||
chat.recipientAvatar = avatar
|
|
||||||
callCB()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!usersWithDisplayNameListeners.includes(recipientPK)) {
|
|
||||||
usersWithDisplayNameListeners.push(recipientPK)
|
|
||||||
|
|
||||||
gun
|
|
||||||
.user(recipientPK)
|
|
||||||
.get(Key.PROFILE)
|
|
||||||
.get(Key.DISPLAY_NAME)
|
|
||||||
.on(displayName => {
|
|
||||||
if (typeof displayName === 'string') {
|
|
||||||
chat.recipientDisplayName = displayName
|
|
||||||
callCB()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
user,
|
user,
|
||||||
SEA
|
SEA
|
||||||
|
|
|
||||||
|
|
@ -69,6 +69,7 @@ exports.isChatMessage = item => {
|
||||||
* @prop {string} recipientPublicKey A way to uniquely identify each chat.
|
* @prop {string} recipientPublicKey A way to uniquely identify each chat.
|
||||||
* @prop {ChatMessage[]} messages Sorted from most recent to least recent.
|
* @prop {ChatMessage[]} messages Sorted from most recent to least recent.
|
||||||
* @prop {string|null} recipientDisplayName
|
* @prop {string|null} recipientDisplayName
|
||||||
|
* @prop {boolean} didDisconnect True if the recipient performed a disconnect.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -102,6 +103,10 @@ exports.isChat = item => {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (typeof obj.didDisconnect !== 'boolean') {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
return obj.messages.every(msg => exports.isChatMessage(msg))
|
return obj.messages.every(msg => exports.isChatMessage(msg))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue