better received reqs

This commit is contained in:
Daniel Lugo 2020-01-29 16:34:40 -04:00
parent 64cff4b64b
commit 8513493176
2 changed files with 167 additions and 328 deletions

View file

@ -3,26 +3,25 @@
*/ */
const debounce = require('lodash/debounce') const debounce = require('lodash/debounce')
const Actions = require('./actions') const Actions = require('../actions')
const ErrorCode = require('./errorCode') const ErrorCode = require('../errorCode')
const Key = require('./key') const Key = require('../key')
const Schema = require('./schema') const Schema = require('../schema')
const Streams = require('./streams') const Streams = require('../streams')
const Utils = require('./utils') const Utils = require('../utils')
const Config = require('../config')
/** /**
* @typedef {import('./SimpleGUN').UserGUNNode} UserGUNNode * @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode
* @typedef {import('./SimpleGUN').GUNNode} GUNNode * @typedef {import('../SimpleGUN').GUNNode} GUNNode
* @typedef {import('./SimpleGUN').ISEA} ISEA * @typedef {import('../SimpleGUN').ISEA} ISEA
* @typedef {import('./SimpleGUN').ListenerData} ListenerData * @typedef {import('../SimpleGUN').ListenerData} ListenerData
* @typedef {import('./schema').HandshakeRequest} HandshakeRequest * @typedef {import('../schema').HandshakeRequest} HandshakeRequest
* @typedef {import('./schema').Message} Message * @typedef {import('../schema').Message} Message
* @typedef {import('./schema').Outgoing} Outgoing * @typedef {import('../schema').Outgoing} Outgoing
* @typedef {import('./schema').PartialOutgoing} PartialOutgoing * @typedef {import('../schema').PartialOutgoing} PartialOutgoing
* @typedef {import('./schema').Chat} Chat * @typedef {import('../schema').Chat} Chat
* @typedef {import('./schema').ChatMessage} ChatMessage * @typedef {import('../schema').ChatMessage} ChatMessage
* @typedef {import('./schema').SimpleSentRequest} SimpleSentRequest * @typedef {import('../schema').SimpleSentRequest} SimpleSentRequest
* @typedef {import('./schema').SimpleReceivedRequest} SimpleReceivedRequest * @typedef {import('../schema').SimpleReceivedRequest} SimpleReceivedRequest
*/ */
const DEBOUNCE_WAIT_TIME = 500 const DEBOUNCE_WAIT_TIME = 500
@ -336,8 +335,8 @@ const processOutgoings = async () => {
const outs = encryptedOutgoings const outs = encryptedOutgoings
encryptedOutgoings = {} encryptedOutgoings = {}
const mySecret = await Utils.mySecret() const mySecret = await Utils.mySecret()
const SEA = require('../Mediator').mySEA const SEA = require('../../Mediator').mySEA
const user = require('../Mediator').getUser() const user = require('../../Mediator').getUser()
await Utils.asyncForEach(Object.entries(outs), async ([id, out]) => { await Utils.asyncForEach(Object.entries(outs), async ([id, out]) => {
if (out === null) { if (out === null) {
currentOutgoings[id] = null currentOutgoings[id] = null
@ -407,7 +406,7 @@ const onOutgoing = cb => {
outgoingsListeners.add(cb) outgoingsListeners.add(cb)
cb(currentOutgoings) cb(currentOutgoings)
const currentUser = require('../Mediator').getUser() const currentUser = require('../../Mediator').getUser()
if (lastUserWithListener !== currentUser) { if (lastUserWithListener !== currentUser) {
// in case user changed gun alias // in case user changed gun alias
@ -417,7 +416,7 @@ const onOutgoing = cb => {
currentUser.get(Key.OUTGOINGS).open(data => { currentUser.get(Key.OUTGOINGS).open(data => {
// deactivate this listener when user changes // deactivate this listener when user changes
if (lastUserWithListener !== require('../Mediator').getUser()) { if (lastUserWithListener !== require('../../Mediator').getUser()) {
return return
} }
// @ts-ignore Let's skip schema checks for perf reasons // @ts-ignore Let's skip schema checks for perf reasons
@ -540,310 +539,6 @@ const onChats = cb => {
} }
} }
/**
*
* @param {(simpleReceivedRequests: SimpleReceivedRequest[]) => void} cb
* @param {GUNNode} gun
* @param {UserGUNNode} user
* @param {ISEA} SEA
* @returns {void}
*/
const onSimplerReceivedRequests = (cb, gun, user, SEA) => {
try {
if (!user.is) {
throw new Error(ErrorCode.NOT_AUTH)
}
/** @type {Record<string, HandshakeRequest>} */
const idToRequest = {}
/** @type {string[]} */
const requestorsWithAvatarListeners = []
/** @type {string[]} */
const requestorsWithDisplayNameListeners = []
/**
* @type {Partial<Record<string, string|null>>}
*/
const requestorToAvatar = {}
/**
* @type {Partial<Record<string, string|null>>}
*/
const requestorToDisplayName = {}
/** @type {Set<string>} */
const requestorsAlreadyAccepted = new Set()
/**
* We cannot call gun.off(), so keep track of the current handshake addres.
* And only run the listeners for the handshake nodes if they are for the
* current handshake address node.
*/
let currentHandshakeAddress = ''
////////////////////////////////////////////////////////////////////////////
const _callCB = async () => {
try {
const requestEntries = Object.entries(idToRequest)
if (Config.SHOW_LOG) {
console.log('requestorsAlreadyAccepted')
console.log(requestorsAlreadyAccepted)
console.log('/requestorsAlreadyAccepted')
}
if (Config.SHOW_LOG) {
console.log('raw requests:')
console.log(idToRequest)
console.log('/raw requests')
}
// avoid race conditions due to gun's reactive nature.
const onlyInCurrentHandshakeNode = await Utils.asyncFilter(
requestEntries,
async ([id]) => {
try {
const HNAddr = await Utils.tryAndWait(async (_, user) => {
const data = await user
.get(Key.CURRENT_HANDSHAKE_ADDRESS)
.then()
if (typeof data !== 'string') {
throw new Error('handshake address not an string')
}
return data
})
const maybeHreq = await Utils.tryAndWait(gun =>
gun
.get(Key.HANDSHAKE_NODES)
.get(HNAddr)
.get(id)
.then()
)
return Schema.isHandshakeRequest(maybeHreq)
} catch (err) {
console.log(`error for request ID: ${id}`)
throw err
}
}
)
if (Config.SHOW_LOG) {
console.log('onlyInCurrentHandshakeNode')
console.log(onlyInCurrentHandshakeNode)
console.log('/onlyInCurrentHandshakeNode')
}
// USER-TO-INCOMING (which indicates acceptance of this request) write
// might not be in there by the time we are looking at these requests.
// Let's account for this.
const notAccepted = await Utils.asyncFilter(
onlyInCurrentHandshakeNode,
async ([reqID, req]) => {
try {
if (requestorsAlreadyAccepted.has(req.from)) {
return false
}
const requestorEpub = await Utils.pubToEpub(req.from)
const ourSecret = await SEA.secret(requestorEpub, user._.sea)
if (typeof ourSecret !== 'string') {
throw new TypeError('typeof ourSecret !== "string"')
}
const decryptedResponse = await SEA.decrypt(
req.response,
ourSecret
)
if (typeof decryptedResponse !== 'string') {
throw new TypeError('typeof decryptedResponse !== "string"')
}
const outfeedID = decryptedResponse
if (Config.SHOW_LOG) {
console.log('\n')
console.log('--------outfeedID----------')
console.log(outfeedID)
console.log('------------------')
console.log('\n')
}
const maybeOutfeed = await Utils.tryAndWait(gun =>
gun
.user(req.from)
.get(Key.OUTGOINGS)
.get(outfeedID)
.then()
)
if (Config.SHOW_LOG) {
console.log('\n')
console.log('--------maybeOutfeed----------')
console.log(maybeOutfeed)
console.log('------------------')
console.log('\n')
}
const wasAccepted = Schema.isHandshakeRequest(maybeOutfeed)
return !wasAccepted
} catch (err) {
console.log(`error for request ID: ${reqID}`)
throw err
}
}
)
if (Config.SHOW_LOG) {
console.log('notAccepted')
console.log(notAccepted)
console.log('/notAccepted')
}
const simpleReceivedReqs = notAccepted.map(([reqID, req]) => {
try {
const { from: requestorPub } = req
/** @type {SimpleReceivedRequest} */
const simpleReceivedReq = {
id: reqID,
requestorAvatar: requestorToAvatar[requestorPub] || null,
requestorDisplayName:
requestorToDisplayName[requestorPub] ||
Utils.defaultName(requestorPub),
requestorPK: requestorPub,
response: req.response,
timestamp: req.timestamp
}
return simpleReceivedReq
} catch (err) {
console.log(`error for request ID: ${reqID}`)
throw err
}
})
cb(simpleReceivedReqs)
} catch (err) {
console.error(err)
}
}
const callCB = debounce(_callCB, DEBOUNCE_WAIT_TIME)
callCB()
user
.get(Key.USER_TO_INCOMING)
.map()
.on((incomingID, userPK) => {
const disconnected = incomingID === null
if (disconnected) {
requestorsAlreadyAccepted.delete(userPK)
} else {
requestorsAlreadyAccepted.add(userPK)
}
callCB()
})
////////////////////////////////////////////////////////////////////////////
/**
* @param {string} addr
* @returns {(req: ListenerData, reqID: string) => void}
*/
const listenerForAddr = addr => (req, reqID) => {
try {
if (addr !== currentHandshakeAddress) {
console.log(
'onSimplerReceivedRequests() -> listenerForAddr() -> stale handshake address, quitting'
)
return
}
if (!Schema.isHandshakeRequest(req)) {
console.log(
'onSimplerReceivedRequests() -> listenerForAddr() -> bad handshake request, quitting'
)
console.log(req)
return
}
idToRequest[reqID] = req
callCB()
if (!requestorsWithAvatarListeners.includes(req.from)) {
requestorsWithAvatarListeners.push(req.from)
gun
.user(req.from)
.get(Key.PROFILE)
.get(Key.AVATAR)
.on(avatar => {
if (typeof avatar === 'string' || avatar === null) {
// || handles empty strings
requestorToAvatar[req.from] = avatar || null
callCB()
}
})
}
if (!requestorsWithDisplayNameListeners.includes(req.from)) {
requestorsWithDisplayNameListeners.push(req.from)
gun
.user(req.from)
.get(Key.PROFILE)
.get(Key.DISPLAY_NAME)
.on(displayName => {
if (typeof displayName === 'string' || displayName === null) {
// || handles empty strings
requestorToDisplayName[req.from] = displayName || null
callCB()
}
})
}
} catch (err) {
console.log('onSimplerReceivedRequests() -> listenerForAddr() ->')
console.log(err)
}
callCB()
}
////////////////////////////////////////////////////////////////////////////
user.get(Key.CURRENT_HANDSHAKE_ADDRESS).on(addr => {
if (typeof addr !== 'string') {
throw new TypeError('current handshake address not an string')
}
console.log(
`onSimplerReceivedRequests() -> setting current address to ${addr}`
)
currentHandshakeAddress = addr
gun
.get(Key.HANDSHAKE_NODES)
.get(addr)
.map()
.on(listenerForAddr(addr))
callCB()
})
} catch (err) {
console.log(`onSimplerReceivedRequests() -> ${err.message}`)
}
}
/** /**
* @param {(sentRequests: SimpleSentRequest[]) => void} cb * @param {(sentRequests: SimpleSentRequest[]) => void} cb
* @param {GUNNode} gun * @param {GUNNode} gun
@ -1158,7 +853,7 @@ module.exports = {
onIncomingMessages, onIncomingMessages,
onOutgoing, onOutgoing,
onChats, onChats,
onSimplerReceivedRequests, onSimplerReceivedRequests: require('./onReceivedReqs'),
onSimplerSentRequests, onSimplerSentRequests,
onBio, onBio,
onSeedBackup onSeedBackup

View file

@ -0,0 +1,144 @@
/** @format */
const Events = require('./index')
const Key = require('../key')
const Schema = require('../schema')
const Streams = require('../streams')
/**
* @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode
* @typedef {import('../SimpleGUN').GUNNode} GUNNode
* @typedef {import('../SimpleGUN').ISEA} ISEA
* @typedef {import('../SimpleGUN').ListenerData} ListenerData
* @typedef {import('../schema').HandshakeRequest} HandshakeRequest
* @typedef {import('../schema').Message} Message
* @typedef {import('../schema').Outgoing} Outgoing
* @typedef {import('../schema').PartialOutgoing} PartialOutgoing
* @typedef {import('../schema').Chat} Chat
* @typedef {import('../schema').ChatMessage} ChatMessage
* @typedef {import('../schema').SimpleSentRequest} SimpleSentRequest
* @typedef {import('../schema').SimpleReceivedRequest} SimpleReceivedRequest
*/
/**
* @typedef {(chats: SimpleReceivedRequest[]) => void} Listener
*/
const listeners = new Set()
/** @type {Streams.Avatars} */
let pubToAvatar = {}
/** @type {Streams.DisplayNames} */
let pubToDn = {}
/** @type {Streams.Incomings} */
let pubToIncoming = {}
/** @type {SimpleReceivedRequest[]} */
let currentReqs = []
/** @type {string|null} */
let currentAddress = null
/** @type {Record<string, HandshakeRequest>} */
let currentNode = {}
const react = () => {
/** @type {SimpleReceivedRequest[]} */
const finalReqs = []
for (const [id, req] of Object.entries(currentNode)) {
const notAccepted = typeof pubToIncoming[req.from] === 'undefined'
if (notAccepted) {
finalReqs.push({
id,
requestorAvatar: pubToAvatar[req.from] || null,
requestorDisplayName: pubToDn[req.from] || null,
requestorPK: req.from,
response: req.response,
timestamp: req.timestamp
})
}
}
currentReqs = finalReqs
listeners.forEach(l => l(currentReqs))
}
/**
*
* @param {string} addr
* @returns {(data: import('../SimpleGUN').OpenListenerData) => void}
*/
const listenerForAddr = addr => data => {
if (addr !== currentAddress) {
return
}
if (typeof data === 'object' && data !== null) {
for (const [id, req] of Object.entries(data)) {
if (!Schema.isHandshakeRequest(req)) {
return
}
currentNode[id] = req
}
react()
}
}
let subbed = false
/**
* Massages all of the more primitive data structures into a more manageable
* 'Chat' paradigm.
* @param {Listener} cb
* @returns {() => void}
*/
const onReceivedReqs = cb => {
listeners.add(cb)
if (!subbed) {
Events.onCurrentHandshakeAddress(addr => {
if (currentAddress !== addr) {
currentAddress = addr
currentNode = {}
if (typeof addr === 'string') {
require('../../Mediator')
.getGun()
.get(Key.HANDSHAKE_NODES)
.get(addr)
.open(listenerForAddr(addr))
}
react()
}
}, require('../../Mediator').getUser())
Streams.onAvatar(pta => {
pubToAvatar = pta
react()
})
Streams.onDisplayName(ptd => {
pubToDn = ptd
react()
})
Streams.onIncoming(pti => {
pubToIncoming = pti
react()
})
subbed = true
}
cb(currentReqs)
return () => {
listeners.delete(cb)
}
}
module.exports = onReceivedReqs