new events and streams

This commit is contained in:
Daniel Lugo 2020-01-29 18:29:21 -04:00
parent 8d28395e77
commit c2c63f9576
6 changed files with 269 additions and 325 deletions

View file

@ -474,10 +474,7 @@ class Mediator {
})
}),
300
),
gun,
user,
mySEA
)
)
} catch (err) {
console.log(err)
@ -593,10 +590,7 @@ class Mediator {
})
}),
350
),
gun,
user,
mySEA
)
)
} catch (err) {
if (Config.SHOW_LOG) {
@ -835,24 +829,19 @@ class Mediator {
await throwOnInvalidToken(token)
API.Events.onChats(
chats => {
if (Config.SHOW_LOG) {
console.log('---chats---')
console.log(chats)
console.log('-----------------------')
}
API.Events.onChats(chats => {
if (Config.SHOW_LOG) {
console.log('---chats---')
console.log(chats)
console.log('-----------------------')
}
this.socket.emit(Event.ON_CHATS, {
msg: chats,
ok: true,
origBody: body
})
},
gun,
user,
mySEA
)
this.socket.emit(Event.ON_CHATS, {
msg: chats,
ok: true,
origBody: body
})
})
} catch (err) {
console.log(err)
this.socket.emit(Event.ON_CHATS, {
@ -936,24 +925,19 @@ class Mediator {
await throwOnInvalidToken(token)
API.Events.onSimplerReceivedRequests(
receivedRequests => {
if (Config.SHOW_LOG) {
console.log('---receivedRequests---')
console.log(receivedRequests)
console.log('-----------------------')
}
API.Events.onSimplerReceivedRequests(receivedRequests => {
if (Config.SHOW_LOG) {
console.log('---receivedRequests---')
console.log(receivedRequests)
console.log('-----------------------')
}
this.socket.emit(Event.ON_RECEIVED_REQUESTS, {
msg: receivedRequests,
ok: true,
origBody: body
})
},
gun,
user,
mySEA
)
this.socket.emit(Event.ON_RECEIVED_REQUESTS, {
msg: receivedRequests,
ok: true,
origBody: body
})
})
} catch (err) {
console.log(err)
this.socket.emit(Event.ON_RECEIVED_REQUESTS, {
@ -973,24 +957,19 @@ class Mediator {
await throwOnInvalidToken(token)
await API.Events.onSimplerSentRequests(
sentRequests => {
if (Config.SHOW_LOG) {
console.log('---sentRequests---')
console.log(sentRequests)
console.log('-----------------------')
}
await API.Events.onSimplerSentRequests(sentRequests => {
if (Config.SHOW_LOG) {
console.log('---sentRequests---')
console.log(sentRequests)
console.log('-----------------------')
}
this.socket.emit(Event.ON_SENT_REQUESTS, {
msg: sentRequests,
ok: true,
origBody: body
})
},
gun,
user,
mySEA
)
this.socket.emit(Event.ON_SENT_REQUESTS, {
msg: sentRequests,
ok: true,
origBody: body
})
})
} catch (err) {
console.log(err)
this.socket.emit(Event.ON_SENT_REQUESTS, {

View file

@ -441,6 +441,7 @@ let pubToDn = {}
/** @type {Streams.Incomings} */
let pubToIncoming = {}
/**
* @typedef {(chats: Chat[]) => void} ChatsListener
*/
@ -539,256 +540,6 @@ const onChats = cb => {
}
}
/**
* @param {(sentRequests: SimpleSentRequest[]) => void} cb
* @param {GUNNode} gun
* @param {UserGUNNode} user
* @param {ISEA} SEA
* @returns {Promise<void>}
*/
const onSimplerSentRequests = async (cb, gun, user, SEA) => {
if (!user.is) {
throw new Error(ErrorCode.NOT_AUTH)
}
/**
* @type {Record<string, Schema.StoredRequest>}
*/
const storedReqs = {}
/**
* @type {Partial<Record<string, string|null>>}
*/
const recipientToAvatar = {}
/**
* @type {Partial<Record<string, string|null>>}
*/
const recipientToDisplayName = {}
/**
* @type {Partial<Record<string, string|null>>}
*/
const recipientToCurrentHandshakeAddress = {}
/**
* @type {Record<string, SimpleSentRequest>}
*/
const simpleSentRequests = {}
/**
* Keep track of recipients that already have listeners for their avatars.
* @type {string[]}
*/
const recipientsWithAvatarListener = []
/**
* Keep track of recipients that already have listeners for their display
* name.
* @type {string[]}
*/
const recipientsWithDisplayNameListener = []
/**
* Keep track of recipients that already have listeners for their current
* handshake node.
* @type {string[]}
*/
const recipientsWithCurrentHandshakeAddressListener = []
const mySecret = await SEA.secret(user._.sea.epub, user._.sea)
if (typeof mySecret !== 'string') {
throw new TypeError("typeof mySecret !== 'string'")
}
const callCB = debounce(async () => {
try {
console.log('\n')
console.log('------simplerSentRequests: rawRequests ------')
console.log(storedReqs)
console.log('\n')
const entries = Object.entries(storedReqs)
/** @type {Promise<null|SimpleSentRequest>[]} */
const promises = entries.map(([, storedReq]) =>
(async () => {
const recipientPub = await SEA.decrypt(
storedReq.recipientPub,
mySecret
)
if (typeof recipientPub !== 'string') {
throw new TypeError()
}
const requestAddress = await SEA.decrypt(
storedReq.handshakeAddress,
mySecret
)
if (typeof requestAddress !== 'string') {
throw new TypeError()
}
const sentReqID = await SEA.decrypt(storedReq.sentReqID, mySecret)
if (typeof sentReqID !== 'string') {
throw new TypeError()
}
/** @type {Schema.HandshakeRequest} */
const sentReq = await Utils.tryAndWait(async gun => {
const data = await gun
.get(Key.HANDSHAKE_NODES)
.get(requestAddress)
.get(sentReqID)
.then()
if (Schema.isHandshakeRequest(data)) {
return data
}
throw new TypeError('sent req not a handshake request')
})
const latestReqIDForRecipient = await Utils.recipientPubToLastReqSentID(
recipientPub
)
if (
await Utils.reqWasAccepted(
sentReq.response,
recipientPub,
user,
SEA
)
) {
return null
}
if (
!recipientsWithCurrentHandshakeAddressListener.includes(
recipientPub
)
) {
recipientsWithCurrentHandshakeAddressListener.push(recipientPub)
gun
.user(recipientPub)
.get(Key.CURRENT_HANDSHAKE_ADDRESS)
.on(addr => {
if (typeof addr !== 'string') {
console.log(
"onSimplerSentRequests() -> typeof addr !== 'string'"
)
return
}
recipientToCurrentHandshakeAddress[recipientPub] = addr
callCB()
})
}
if (!recipientsWithAvatarListener.includes(recipientPub)) {
recipientsWithAvatarListener.push(recipientPub)
gun
.user(recipientPub)
.get(Key.PROFILE)
.get(Key.AVATAR)
.on(avatar => {
if (typeof avatar === 'string' || avatar === null) {
recipientToAvatar[recipientPub] = avatar
callCB()
}
})
}
if (!recipientsWithDisplayNameListener.includes(recipientPub)) {
recipientsWithDisplayNameListener.push(recipientPub)
gun
.user(recipientPub)
.get(Key.PROFILE)
.get(Key.DISPLAY_NAME)
.on(displayName => {
if (typeof displayName === 'string' || displayName === null) {
recipientToDisplayName[recipientPub] = displayName
callCB()
}
})
}
const isStaleRequest = latestReqIDForRecipient !== sentReqID
if (isStaleRequest) {
return null
}
/**
* @type {SimpleSentRequest}
*/
const res = {
id: sentReqID,
recipientAvatar: recipientToAvatar[recipientPub] || null,
recipientChangedRequestAddress: false,
recipientDisplayName:
recipientToDisplayName[recipientPub] ||
Utils.defaultName(recipientPub),
recipientPublicKey: recipientPub,
timestamp: sentReq.timestamp
}
return res
})()
)
const reqsOrNulls = await Promise.all(promises)
console.log('\n')
console.log('------simplerSentRequests: reqsOrNulls ------')
console.log(reqsOrNulls)
console.log('\n')
/** @type {SimpleSentRequest[]} */
// @ts-ignore
const reqs = reqsOrNulls.filter(item => item !== null)
for (const req of reqs) {
simpleSentRequests[req.id] = req
}
} catch (err) {
console.log(`onSimplerSentRequests() -> callCB() -> ${err.message}`)
} finally {
cb(Object.values(simpleSentRequests))
}
}, DEBOUNCE_WAIT_TIME)
callCB()
// force a refresh when a request is accepted
user.get(Key.USER_TO_INCOMING).on(() => {
callCB()
})
user
.get(Key.STORED_REQS)
.map()
.on((sentRequest, sentRequestID) => {
try {
if (!Schema.isStoredRequest(sentRequest)) {
console.log('\n')
console.log(
'------simplerSentRequests: !Schema.isHandshakeRequest(sentRequest) ------'
)
console.log(sentRequest)
console.log('\n')
return
}
storedReqs[sentRequestID] = sentRequest
callCB()
} catch (err) {
console.log(
`onSimplerSentRequests() -> sentRequestID: ${sentRequestID} -> ${err.message}`
)
}
})
}
/** @type {string|null} */
let currentBio = null
@ -854,7 +605,8 @@ module.exports = {
onOutgoing,
onChats,
onSimplerReceivedRequests: require('./onReceivedReqs'),
onSimplerSentRequests,
onSimplerSentRequests: require('./onSentReqs').onSentReqs,
getCurrentSentReqs: require('./onSentReqs').getCurrentSentReqs,
onBio,
onSeedBackup
}

View file

@ -0,0 +1,103 @@
/** @format */
const Streams = require('../streams')
/**
* @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode
* @typedef {import('../SimpleGUN').GUNNode} GUNNode
* @typedef {import('../SimpleGUN').ISEA} ISEA
* @typedef {import('../SimpleGUN').ListenerData} ListenerData
* @typedef {import('../schema').HandshakeRequest} HandshakeRequest
* @typedef {import('../schema').Message} Message
* @typedef {import('../schema').Outgoing} Outgoing
* @typedef {import('../schema').PartialOutgoing} PartialOutgoing
* @typedef {import('../schema').Chat} Chat
* @typedef {import('../schema').ChatMessage} ChatMessage
* @typedef {import('../schema').SimpleSentRequest} SimpleSentRequest
* @typedef {import('../schema').SimpleReceivedRequest} SimpleReceivedRequest
*/
/**
* @typedef {(chats: SimpleSentRequest[]) => void} Listener
*/
/** @type {Set<Listener>} */
const listeners = new Set()
/** @type {SimpleSentRequest[]} */
let currentReqs = []
const getCurrentSentReqs = () => currentReqs
const react = () => {
/** @type {SimpleSentRequest[]} */
const finalSentReqs = []
const pubToHAddr = Streams.getAddresses()
const storedReqs = Streams.getStoredReqs()
const pubToLastSentReqID = Streams.getSentReqIDs()
const pubToIncoming = Streams.getPubToIncoming()
const pubToAvatar = Streams.getPubToAvatar()
const pubToDN = Streams.getPubToDn()
for (const storedReq of storedReqs) {
const { handshakeAddress, recipientPub, sentReqID, timestamp } = storedReq
const currAddress = pubToHAddr[recipientPub]
const lastReqID = pubToLastSentReqID[recipientPub]
const isStale = typeof lastReqID !== 'undefined' && lastReqID !== sentReqID
const aHandshakeWasEstablishedAtSomePoint =
typeof pubToIncoming[recipientPub] !== 'undefined'
if (isStale && aHandshakeWasEstablishedAtSomePoint) {
// eslint-disable-next-line no-continue
continue
}
finalSentReqs.push({
id: sentReqID,
recipientAvatar: pubToAvatar[recipientPub] || null,
recipientChangedRequestAddress:
typeof currAddress && handshakeAddress !== currAddress,
recipientDisplayName: pubToDN[recipientPub] || null,
recipientPublicKey: recipientPub,
timestamp
})
}
currentReqs = finalSentReqs
listeners.forEach(l => l(currentReqs))
}
let subbed = false
/**
* Massages all of the more primitive data structures into a more manageable
* 'Chat' paradigm.
* @param {Listener} cb
* @returns {() => void}
*/
const onSentReqs = cb => {
listeners.add(cb)
if (!subbed) {
Streams.onAddresses(react)
Streams.onStoredReqs(react)
Streams.onLastSentReqIDs(react)
Streams.onIncoming(react)
Streams.onAvatar(react)
Streams.onDisplayName(react)
subbed = true
}
cb(currentReqs)
return () => {
listeners.delete(cb)
}
}
module.exports = {
onSentReqs,
getCurrentSentReqs
}

View file

@ -0,0 +1,48 @@
/** @format */
const Key = require('../key')
/**
* @typedef {Record<string, string|null|undefined>} Addresses
*/
/** @type {Addresses} */
const pubToAddress = {}
/** @type {Set<() => void>} */
const listeners = new Set()
const notify = () => listeners.forEach(l => l())
/** @type {Set<string>} */
const subbedPublicKeys = new Set()
/**
* @param {() => void} cb
* @param {string=} pub
*/
const onAddresses = (cb, pub) => {
listeners.add(cb)
cb()
if (pub && subbedPublicKeys.add(pub)) {
require('../../Mediator')
.getGun()
.user(pub)
.get(Key.CURRENT_HANDSHAKE_ADDRESS)
.on(addr => {
if (typeof addr === 'string' || addr === null) {
pubToAddress[pub] = addr
} else {
pubToAddress[pub] = null
}
notify()
})
}
return () => {
listeners.delete(cb)
}
}
const getAddresses = () => pubToAddress
module.exports = {
onAddresses,
getAddresses
}

View file

@ -1,8 +1,8 @@
/** @format */
const { INITIAL_MSG } = require('./actions')
const Key = require('./key')
const Schema = require('./schema')
const Utils = require('./utils')
const { INITIAL_MSG } = require('../actions')
const Key = require('../key')
const Schema = require('../schema')
const Utils = require('../utils')
/**
* @typedef {Record<string, string|null|undefined>} Avatars
* @typedef {(avatars: Avatars) => void} AvatarListener
@ -11,6 +11,8 @@ const Utils = require('./utils')
/** @type {Avatars} */
const pubToAvatar = {}
const getPubToAvatar = () => pubToAvatar
/** @type {Set<AvatarListener>} */
const avatarListeners = new Set()
@ -29,7 +31,7 @@ const onAvatar = (cb, pub) => {
avatarListeners.add(cb)
cb(pubToAvatar)
if (pub && pubsWithAvatarListeners.add(pub)) {
require('../Mediator')
require('../../Mediator')
.getGun()
.user(pub)
.get(Key.PROFILE)
@ -56,6 +58,8 @@ const onAvatar = (cb, pub) => {
/** @type {DisplayNames} */
const pubToDisplayName = {}
const getPubToDn = () => pubToDisplayName
/** @type {Set<DisplayNameListener>} */
const displayNameListeners = new Set()
@ -74,7 +78,7 @@ const onDisplayName = (cb, pub) => {
displayNameListeners.add(cb)
cb(pubToDisplayName)
if (pub && pubsWithDisplayNameListeners.add(pub)) {
require('../Mediator')
require('../../Mediator')
.getGun()
.user(pub)
.get(Key.PROFILE)
@ -94,7 +98,7 @@ const onDisplayName = (cb, pub) => {
}
/**
* @typedef {import('./schema').ChatMessage[]} Message
* @typedef {import('../schema').ChatMessage[]} Message
* @typedef {Record<string, Message|null>} Incomings
* @typedef {(incomings: Incomings) => void} IncomingsListener
*/
@ -104,6 +108,8 @@ const onDisplayName = (cb, pub) => {
*/
const pubToIncoming = {}
const getPubToIncoming = () => pubToIncoming
/** @type {Set<IncomingsListener>} */
const incomingsListeners = new Set()
@ -120,8 +126,8 @@ const pubFeedPairsWithIncomingListeners = new Set()
const onIncoming = cb => {
incomingsListeners.add(cb)
const user = require('../Mediator').getUser()
const SEA = require('../Mediator').mySEA
const user = require('../../Mediator').getUser()
const SEA = require('../../Mediator').mySEA
user.get(Key.USER_TO_INCOMING).open(uti => {
if (typeof uti !== 'object' || uti === null) {
@ -139,7 +145,7 @@ const onIncoming = cb => {
user._.sea
)
require('../Mediator')
require('../../Mediator')
.getGun()
.user(pub)
.get(Key.OUTGOINGS)
@ -206,7 +212,7 @@ const onIncoming = cb => {
}
/**
* @typedef {import('./schema').StoredRequest} StoredRequest
* @typedef {import('../schema').StoredRequest} StoredRequest
* @typedef {(reqs: StoredRequest[]) => void} StoredRequestsListener
*/
@ -229,7 +235,7 @@ const processStoredReqs = async () => {
const ereqs = encryptedStoredReqs
encryptedStoredReqs = []
const mySecret = await Utils.mySecret()
const SEA = require('../Mediator').mySEA
const SEA = require('../../Mediator').mySEA
const finalReqs = await Utils.asyncMap(ereqs, async er => {
/** @type {StoredRequest} */
const r = {
@ -248,14 +254,13 @@ const processStoredReqs = async () => {
let subbed = false
/**
*
* @param {StoredRequestsListener} cb
*/
const onStoredReqs = cb => {
storedRequestsListeners.add(cb)
if (!subbed) {
require('../Mediator')
require('../../Mediator')
.getUser()
.get(Key.STORED_REQS)
.open(d => {
@ -280,8 +285,15 @@ const onStoredReqs = cb => {
module.exports = {
onAvatar,
getPubToAvatar,
onDisplayName,
getPubToDn,
onIncoming,
getPubToIncoming,
onStoredReqs,
getStoredReqs
getStoredReqs,
onAddresses: require('./addresses').onAddresses,
getAddresses: require('./addresses').getAddresses,
onLastSentReqIDs: require('./lastSentReqID').onLastSentReqIDs,
getSentReqIDs: require('./lastSentReqID').getSentReqIDs
}

View file

@ -0,0 +1,50 @@
/** @format */
const Key = require('../key')
/** @type {Record<string, string|null|undefined>} */
let pubToLastSentReqID = {}
/** @type {Set<() => void>} */
const listeners = new Set()
const notify = () => listeners.forEach(l => l())
let subbed = false
/**
* @param {() => void} cb
*/
const onLastSentReqIDs = cb => {
listeners.add(cb)
cb()
if (!subbed) {
require('../../Mediator')
.getUser()
.get(Key.USER_TO_LAST_REQUEST_SENT)
.open(data => {
if (typeof data === 'object' && data !== null) {
for (const [pub, id] of Object.entries(data)) {
if (typeof id === 'string' || id === null) {
pubToLastSentReqID[pub] = id
}
}
} else {
pubToLastSentReqID = {}
}
notify()
})
subbed = true
}
return () => {
listeners.delete(cb)
}
}
const getSentReqIDs = () => pubToLastSentReqID
module.exports = {
onLastSentReqIDs,
getSentReqIDs
}