new onoutgoing
This commit is contained in:
parent
ba99c7e400
commit
527072f538
1 changed files with 116 additions and 212 deletions
|
|
@ -27,105 +27,6 @@ const Config = require('../config')
|
|||
|
||||
const DEBOUNCE_WAIT_TIME = 500
|
||||
|
||||
/**
|
||||
* @param {string} outgoingKey
|
||||
* @param {(message: Message, key: string) => void} cb
|
||||
* @param {UserGUNNode} user
|
||||
* @param {ISEA} SEA
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
const __onOutgoingMessage = async (outgoingKey, cb, user, SEA) => {
|
||||
if (!user.is) {
|
||||
throw new Error(ErrorCode.NOT_AUTH)
|
||||
}
|
||||
|
||||
const mySecret = await SEA.secret(user._.sea.epub, user._.sea)
|
||||
if (typeof mySecret !== 'string') {
|
||||
throw new TypeError("typeof mySecret !== 'string'")
|
||||
}
|
||||
|
||||
const callb = debounce(cb, DEBOUNCE_WAIT_TIME)
|
||||
/** @type {string} */
|
||||
const encryptedForMeRecipientPublicKey = await Utils.tryAndWait(
|
||||
(_, user) =>
|
||||
new Promise((res, rej) => {
|
||||
user
|
||||
.get(Key.OUTGOINGS)
|
||||
.get(outgoingKey)
|
||||
.get('with')
|
||||
.once(erpk => {
|
||||
if (typeof erpk !== 'string') {
|
||||
rej(
|
||||
new TypeError("Expected outgoing.get('with') to be an string.")
|
||||
)
|
||||
} else if (erpk.length === 0) {
|
||||
rej(
|
||||
new TypeError(
|
||||
"Expected outgoing.get('with') to be a populated."
|
||||
)
|
||||
)
|
||||
} else {
|
||||
res(erpk)
|
||||
}
|
||||
})
|
||||
})
|
||||
)
|
||||
|
||||
const recipientPublicKey = await SEA.decrypt(
|
||||
encryptedForMeRecipientPublicKey,
|
||||
mySecret
|
||||
)
|
||||
|
||||
if (typeof recipientPublicKey !== 'string') {
|
||||
throw new TypeError(
|
||||
"__onOutgoingMessage() -> typeof recipientPublicKey !== 'string'"
|
||||
)
|
||||
}
|
||||
|
||||
/** @type {string} */
|
||||
const recipientEpub = await Utils.pubToEpub(recipientPublicKey)
|
||||
|
||||
const ourSecret = await SEA.secret(recipientEpub, user._.sea)
|
||||
|
||||
if (typeof ourSecret !== 'string') {
|
||||
throw new TypeError(
|
||||
"__onOutgoingMessage() -> typeof ourSecret !== 'string'"
|
||||
)
|
||||
}
|
||||
|
||||
user
|
||||
.get(Key.OUTGOINGS)
|
||||
.get(outgoingKey)
|
||||
.get(Key.MESSAGES)
|
||||
.map()
|
||||
.on(async (msg, key) => {
|
||||
if (!Schema.isMessage(msg)) {
|
||||
console.warn('non message received: ' + JSON.stringify(msg))
|
||||
return
|
||||
}
|
||||
|
||||
let { body } = msg
|
||||
|
||||
if (body !== Actions.INITIAL_MSG) {
|
||||
const decrypted = await SEA.decrypt(body, ourSecret)
|
||||
|
||||
if (typeof decrypted !== 'string') {
|
||||
console.log("__onOutgoingMessage() -> typeof decrypted !== 'string'")
|
||||
} else {
|
||||
body = decrypted
|
||||
}
|
||||
}
|
||||
|
||||
callb(
|
||||
{
|
||||
body,
|
||||
timestamp: msg.timestamp
|
||||
},
|
||||
key
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps a sent request ID to the public key of the user it was sent to.
|
||||
* @param {(requestToUser: Record<string, string>) => void} cb
|
||||
|
|
@ -407,101 +308,105 @@ const onIncomingMessages = (cb, userPK, incomingFeedID, gun, user, SEA) => {
|
|||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {(outgoings: Record<string, Outgoing>) => void} cb
|
||||
* @param {UserGUNNode} user
|
||||
* @param {ISEA} SEA
|
||||
* @param {typeof __onOutgoingMessage} onOutgoingMessage
|
||||
* @typedef {Record<string, Outgoing|null>} Outgoings
|
||||
* @typedef {(outgoings: Outgoings) => void} OutgoingsListener
|
||||
*/
|
||||
const onOutgoing = async (
|
||||
cb,
|
||||
user,
|
||||
SEA,
|
||||
onOutgoingMessage = __onOutgoingMessage
|
||||
) => {
|
||||
if (!user.is) {
|
||||
throw new Error(ErrorCode.NOT_AUTH)
|
||||
}
|
||||
|
||||
const callb = debounce(cb, DEBOUNCE_WAIT_TIME)
|
||||
|
||||
const mySecret = await SEA.secret(user._.sea.epub, user._.sea)
|
||||
if (typeof mySecret !== 'string') {
|
||||
throw new TypeError("onOutgoing() -> typeof mySecret !== 'string'")
|
||||
}
|
||||
|
||||
/**
|
||||
* @type {Record<string, Outgoing>}
|
||||
* @type {Outgoings}
|
||||
*/
|
||||
const outgoings = {}
|
||||
|
||||
callb(outgoings)
|
||||
export let currentOutgoings = {}
|
||||
|
||||
/**
|
||||
* @type {string[]}
|
||||
* @type {Outgoings}
|
||||
*/
|
||||
const outgoingsWithMessageListeners = []
|
||||
export let encryptedOutgoings = {}
|
||||
|
||||
/** @type {Set<string>} */
|
||||
const outgoingsDisconnected = new Set()
|
||||
/** @type {Set<OutgoingsListener>} */
|
||||
const outgoingsListeners = new Set()
|
||||
|
||||
user
|
||||
.get(Key.OUTGOINGS)
|
||||
.map()
|
||||
.on(async (data, key) => {
|
||||
if (!Schema.isPartialOutgoing(data)) {
|
||||
// if user disconnected
|
||||
if (data === null) {
|
||||
delete outgoings[key]
|
||||
outgoingsDisconnected.add(key)
|
||||
} else {
|
||||
console.warn('not partial outgoing')
|
||||
console.warn(JSON.stringify(data))
|
||||
export const notifyOutgoingsListeners = () => {
|
||||
outgoingsListeners.forEach(l => l(currentOutgoings))
|
||||
}
|
||||
|
||||
callb(outgoings)
|
||||
/** @type {UserGUNNode|null} */
|
||||
let lastUserWithListener = null
|
||||
|
||||
const processOutgoings = async () => {
|
||||
const outs = encryptedOutgoings
|
||||
encryptedOutgoings = {}
|
||||
const mySecret = await Utils.mySecret()
|
||||
const SEA = require('../Mediator').mySEA
|
||||
await Utils.asyncForEach(Object.entries(outs), async ([id, out]) => {
|
||||
if (out === null) {
|
||||
currentOutgoings[id] = null
|
||||
return
|
||||
}
|
||||
|
||||
const decryptedRecipientPublicKey = await SEA.decrypt(data.with, mySecret)
|
||||
if (typeof currentOutgoings[id] === 'undefined') {
|
||||
// We disable this rule because we are awaiting the result of the whole
|
||||
// for each AND each callback looks only at one single ID
|
||||
// eslint-disable-next-line require-atomic-updates
|
||||
currentOutgoings[id] = {
|
||||
messages: {},
|
||||
with: await SEA.decrypt(out.with, mySecret)
|
||||
}
|
||||
}
|
||||
|
||||
if (typeof decryptedRecipientPublicKey !== 'string') {
|
||||
console.log(
|
||||
"onOutgoing() -> typeof decryptedRecipientPublicKey !== 'string'"
|
||||
const currentOut = currentOutgoings[id]
|
||||
if (currentOut === null) {
|
||||
return
|
||||
}
|
||||
|
||||
// on each open() only "messages" should change, not "with"
|
||||
// also messages are non-nullable and non-editable
|
||||
|
||||
await Utils.asyncForEach(
|
||||
Object.entries(out.messages),
|
||||
async ([msgID, msg]) => {
|
||||
if (!currentOut.messages[msgID]) {
|
||||
// each callback only looks at one particular msgID
|
||||
// eslint-disable-next-line require-atomic-updates
|
||||
currentOut.messages[msgID] = {
|
||||
body: await SEA.decrypt(msg.body, mySecret),
|
||||
timestamp: msg.timestamp
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
outgoings[key] = {
|
||||
messages: outgoings[key] ? outgoings[key].messages : {},
|
||||
with: decryptedRecipientPublicKey
|
||||
}
|
||||
|
||||
if (!outgoingsWithMessageListeners.includes(key)) {
|
||||
outgoingsWithMessageListeners.push(key)
|
||||
|
||||
onOutgoingMessage(
|
||||
key,
|
||||
(msg, msgKey) => {
|
||||
if (outgoingsDisconnected.has(key)) {
|
||||
return
|
||||
}
|
||||
|
||||
outgoings[key].messages = {
|
||||
...outgoings[key].messages,
|
||||
[msgKey]: msg
|
||||
}
|
||||
|
||||
callb(outgoings)
|
||||
},
|
||||
user,
|
||||
SEA
|
||||
)
|
||||
}
|
||||
|
||||
callb(outgoings)
|
||||
})
|
||||
notifyOutgoingsListeners()
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {OutgoingsListener} cb
|
||||
* @returns {() => void}
|
||||
*/
|
||||
const onOutgoing = cb => {
|
||||
cb(currentOutgoings)
|
||||
|
||||
const currentUser = require('../Mediator').getUser()
|
||||
|
||||
if (lastUserWithListener !== currentUser) {
|
||||
// in case user changed gun alias
|
||||
currentOutgoings = {}
|
||||
encryptedOutgoings = {}
|
||||
lastUserWithListener = currentUser
|
||||
|
||||
currentUser.get(Key.OUTGOINGS).open(data => {
|
||||
// deactivate this listener when user changes
|
||||
if (lastUserWithListener !== require('../Mediator').getUser()) {
|
||||
return
|
||||
}
|
||||
// @ts-ignore Let's skip schema checks for perf reasons
|
||||
encryptedOutgoings = data
|
||||
processOutgoings()
|
||||
})
|
||||
}
|
||||
|
||||
return () => {
|
||||
outgoingsListeners.delete(cb)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -570,9 +475,11 @@ const onChats = (cb, gun, user, SEA) => {
|
|||
|
||||
callCB()
|
||||
|
||||
onOutgoing(
|
||||
async outgoings => {
|
||||
onOutgoing(async outgoings => {
|
||||
await Utils.asyncForEach(Object.values(outgoings), async outgoing => {
|
||||
if (outgoing === null) {
|
||||
return
|
||||
}
|
||||
const recipientPK = outgoing.with
|
||||
const incomingID = await Getters.userToIncomingID(recipientPK)
|
||||
const didDisconnect =
|
||||
|
|
@ -604,10 +511,7 @@ const onChats = (cb, gun, user, SEA) => {
|
|||
})
|
||||
|
||||
callCB()
|
||||
},
|
||||
user,
|
||||
SEA
|
||||
)
|
||||
})
|
||||
|
||||
__onUserToIncoming(
|
||||
async uti => {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue