pubToIncoming stream

This commit is contained in:
Daniel Lugo 2020-02-09 18:05:45 -04:00
parent 6774278e77
commit 12887a78b4
2 changed files with 69 additions and 121 deletions

View file

@ -180,5 +180,6 @@ module.exports = {
onAddresses: require('./addresses').onAddresses,
getAddresses: require('./addresses').getAddresses,
onLastSentReqIDs: require('./lastSentReqID').onLastSentReqIDs,
getSentReqIDs: require('./lastSentReqID').getSentReqIDs
getSentReqIDs: require('./lastSentReqID').getSentReqIDs,
PubToIncoming: require('./pubToIncoming')
}

View file

@ -1,146 +1,93 @@
/** @format */
const { INITIAL_MSG } = require('../actions')
const Schema = require('../schema')
const Key = require('../key')
const Utils = require('../utils')
const uuidv1 = require('uuid/v1')
const debounce = require('lodash/debounce')
const { USER_TO_INCOMING } = require('../key')
const { asyncForEach } = require('../utils')
/** @typedef {import('../SimpleGUN').OpenListenerData} OpenListenerData */
/**
* @typedef {import('../schema').ChatMessage} Message
* @typedef {Record<string, Message[]|null|undefined>} Incomings
* @typedef {(incomings: Incomings) => void} IncomingsListener
* @typedef {Record<string, string|null|undefined>} PubToIncoming
*/
/** @type {Set<() => void>} */
const listeners = new Set()
/** @type {PubToIncoming} */
let pubToIncoming = {}
const getPubToIncoming = () => pubToIncoming
/**
* @type {Incomings}
* @param {PubToIncoming} pti
* @returns {void}
*/
const currentPubToIncoming = {}
const getPubToIncoming = () => currentPubToIncoming
/** @type {Set<IncomingsListener>} */
const incomingsListeners = new Set()
const notifyIncomingsListeners = () => {
incomingsListeners.forEach(l => l(currentPubToIncoming))
const setPubToIncoming = pti => {
pubToIncoming = pti
listeners.forEach(l => l())
}
/** @type {Set<string>} */
const pubFeedPairsWithIncomingListeners = new Set()
let latestUpdate = uuidv1()
const onOpen = debounce(async uti => {
const SEA = require('../../Mediator').mySEA
const mySec = require('../../Mediator').getMySecret()
const thisUpdate = uuidv1()
latestUpdate = thisUpdate
if (typeof uti !== 'object' || uti === null) {
setPubToIncoming({})
return
}
/** @type {PubToIncoming} */
const newPubToIncoming = {}
await asyncForEach(Object.entries(uti), async ([pub, encFeedID]) => {
if (encFeedID === null) {
newPubToIncoming[pub] = null
return
}
if (typeof encFeedID === 'string') {
newPubToIncoming[pub] = await SEA.decrypt(encFeedID, mySec)
}
})
// avoid old data from overwriting new data if decrypting took longer to
// process for the older open() call than for the newer open() call
if (latestUpdate === thisUpdate) {
setPubToIncoming(newPubToIncoming)
}
}, 750)
let subbed = false
/**
* @param {IncomingsListener} cb
* @param {() => void} cb
* @returns {() => void}
*/
const onIncoming = cb => {
incomingsListeners.add(cb)
const onPubToIncoming = cb => {
if (!listeners.add(cb)) {
throw new Error('Tried to subscribe twice')
}
const user = require('../../Mediator').getUser()
const SEA = require('../../Mediator').mySEA
cb()
if (!subbed) {
user.get(Key.USER_TO_INCOMING).open(uti => {
if (typeof uti !== 'object' || uti === null) {
return
}
Object.entries(uti).forEach(async ([pub, encFeed]) => {
if (typeof encFeed !== 'string') {
return
}
const ourSecret = await SEA.secret(
await Utils.pubToEpub(pub),
user._.sea
)
const mySecret = await Utils.mySecret()
const feed = await SEA.decrypt(encFeed, mySecret)
if (pubFeedPairsWithIncomingListeners.add(pub + '--' + feed)) {
require('../../Mediator')
.getGun()
.user(pub)
.get(Key.OUTGOINGS)
.get(feed)
.open(async data => {
if (data === null) {
currentPubToIncoming[pub] = null
return
}
if (typeof data !== 'object') {
return
}
if (typeof data.with !== 'string') {
return
}
if (typeof data.messages !== 'object') {
return
}
if (data.messages === null) {
return
}
if (!Array.isArray(currentPubToIncoming[pub])) {
currentPubToIncoming[pub] = [
{
body: INITIAL_MSG,
// hack one year
timestamp: Date.now() - 31556952,
id: Math.random().toString(),
outgoing: false
}
]
}
const msgs = /** @type {[string, Schema.Message][]} */ (Object.entries(
data.messages
).filter(([_, msg]) => Schema.isMessage(msg)))
// eslint-disable-next-line require-atomic-updates
currentPubToIncoming[pub] = await Utils.asyncMap(
msgs,
async ([msgid, msg]) => {
let decryptedBody = ''
if (msg.body === INITIAL_MSG) {
decryptedBody = INITIAL_MSG
} else {
decryptedBody = await SEA.decrypt(msg.body, ourSecret)
}
/** @type {Schema.ChatMessage} */
const finalMsg = {
body: decryptedBody,
id: msgid,
outgoing: false,
timestamp: msg.timestamp
}
return finalMsg
}
)
notifyIncomingsListeners()
})
}
})
})
const user = require('../../Mediator').getUser()
user.get(USER_TO_INCOMING).open(onOpen)
subbed = true
}
cb(getPubToIncoming())
return () => {
incomingsListeners.delete(cb)
if (!listeners.delete(cb)) {
throw new Error('Tried to unsubscribe twice')
}
}
}
module.exports = {
onIncoming,
getPubToIncoming
getPubToIncoming,
setPubToIncoming,
onPubToIncoming
}