Remove outdated gun events
This commit is contained in:
parent
b87bcb6bcb
commit
0681464817
4 changed files with 5 additions and 691 deletions
|
|
@ -394,8 +394,6 @@ const authenticate = async (alias, pass, __user) => {
|
||||||
API.Jobs.onOrders(_user, gun, mySEA)
|
API.Jobs.onOrders(_user, gun, mySEA)
|
||||||
API.Jobs.lastSeenNode(_user)
|
API.Jobs.lastSeenNode(_user)
|
||||||
|
|
||||||
API.Events.onCurrentHandshakeAddress(() => {}, user)()
|
|
||||||
// API.Events.onOutgoing(() => {})()
|
|
||||||
API.Events.onSeedBackup(() => {}, user, mySEA)
|
API.Events.onSeedBackup(() => {}, user, mySEA)
|
||||||
|
|
||||||
return _user._.sea.pub
|
return _user._.sea.pub
|
||||||
|
|
@ -452,7 +450,6 @@ const authenticate = async (alias, pass, __user) => {
|
||||||
API.Jobs.onOrders(_user, gun, mySEA)
|
API.Jobs.onOrders(_user, gun, mySEA)
|
||||||
API.Jobs.lastSeenNode(_user)
|
API.Jobs.lastSeenNode(_user)
|
||||||
|
|
||||||
API.Events.onCurrentHandshakeAddress(() => {}, user)()
|
|
||||||
API.Events.onSeedBackup(() => {}, user, mySEA)
|
API.Events.onSeedBackup(() => {}, user, mySEA)
|
||||||
|
|
||||||
return ack.sea.pub
|
return ack.sea.pub
|
||||||
|
|
|
||||||
|
|
@ -2,427 +2,22 @@
|
||||||
* @prettier
|
* @prettier
|
||||||
*/
|
*/
|
||||||
const debounce = require('lodash/debounce')
|
const debounce = require('lodash/debounce')
|
||||||
const logger = require('winston')
|
|
||||||
const {
|
const {
|
||||||
Constants: { ErrorCode },
|
Constants: { ErrorCode }
|
||||||
Schema,
|
|
||||||
Utils: CommonUtils
|
|
||||||
} = require('shock-common')
|
} = require('shock-common')
|
||||||
|
|
||||||
const Key = require('../key')
|
const Key = require('../key')
|
||||||
const Utils = require('../utils')
|
|
||||||
/**
|
|
||||||
* @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode
|
|
||||||
* @typedef {import('../SimpleGUN').GUNNode} GUNNode
|
|
||||||
* @typedef {import('../SimpleGUN').ISEA} ISEA
|
|
||||||
* @typedef {import('../SimpleGUN').ListenerData} ListenerData
|
|
||||||
* @typedef {import('shock-common').Schema.HandshakeRequest} HandshakeRequest
|
|
||||||
* @typedef {import('shock-common').Schema.Message} Message
|
|
||||||
* @typedef {import('shock-common').Schema.Outgoing} Outgoing
|
|
||||||
* @typedef {import('shock-common').Schema.PartialOutgoing} PartialOutgoing
|
|
||||||
* @typedef {import('shock-common').Schema.Chat} Chat
|
|
||||||
* @typedef {import('shock-common').Schema.ChatMessage} ChatMessage
|
|
||||||
* @typedef {import('shock-common').Schema.SimpleSentRequest} SimpleSentRequest
|
|
||||||
* @typedef {import('shock-common').Schema.SimpleReceivedRequest} SimpleReceivedRequest
|
|
||||||
*/
|
|
||||||
|
|
||||||
const DEBOUNCE_WAIT_TIME = 500
|
const DEBOUNCE_WAIT_TIME = 500
|
||||||
|
|
||||||
/**
|
|
||||||
* @param {(userToIncoming: Record<string, string>) => void} cb
|
|
||||||
* @param {UserGUNNode} user Pass only for testing purposes.
|
|
||||||
* @param {ISEA} SEA
|
|
||||||
* @returns {void}
|
|
||||||
*/
|
|
||||||
const __onUserToIncoming = (cb, user, SEA) => {
|
|
||||||
if (!user.is) {
|
|
||||||
throw new Error(ErrorCode.NOT_AUTH)
|
|
||||||
}
|
|
||||||
|
|
||||||
const callb = debounce(cb, DEBOUNCE_WAIT_TIME)
|
|
||||||
|
|
||||||
/** @type {Record<string, string>} */
|
|
||||||
const userToIncoming = {}
|
|
||||||
|
|
||||||
const mySecret = require('../../Mediator').getMySecret()
|
|
||||||
|
|
||||||
user
|
|
||||||
.get(Key.USER_TO_INCOMING)
|
|
||||||
.map()
|
|
||||||
.on(async (encryptedIncomingID, userPub) => {
|
|
||||||
if (typeof encryptedIncomingID !== 'string') {
|
|
||||||
if (encryptedIncomingID === null) {
|
|
||||||
// on disconnect
|
|
||||||
delete userToIncoming[userPub]
|
|
||||||
} else {
|
|
||||||
logger.error(
|
|
||||||
'got a non string non null value inside user to incoming'
|
|
||||||
)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (encryptedIncomingID.length === 0) {
|
|
||||||
logger.error('got an empty string value')
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
const incomingID = await SEA.decrypt(encryptedIncomingID, mySecret)
|
|
||||||
|
|
||||||
if (typeof incomingID === 'undefined') {
|
|
||||||
logger.warn('could not decrypt incomingID inside __onUserToIncoming')
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
userToIncoming[userPub] = incomingID
|
|
||||||
|
|
||||||
callb(userToIncoming)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/** @type {Set<(addr: string|null) => void>} */
|
|
||||||
const addressListeners = new Set()
|
|
||||||
|
|
||||||
/** @type {string|null} */
|
|
||||||
let currentAddress = null
|
|
||||||
|
|
||||||
const getHandshakeAddress = () => currentAddress
|
|
||||||
|
|
||||||
/** @param {string|null} addr */
|
|
||||||
const setAddress = addr => {
|
|
||||||
currentAddress = addr
|
|
||||||
addressListeners.forEach(l => l(currentAddress))
|
|
||||||
}
|
|
||||||
|
|
||||||
let addrSubbed = false
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param {(currentHandshakeAddress: string|null) => void} cb
|
|
||||||
* @param {UserGUNNode} user
|
|
||||||
* @returns {() => void}
|
|
||||||
*/
|
|
||||||
const onCurrentHandshakeAddress = (cb, user) => {
|
|
||||||
if (!user.is) {
|
|
||||||
throw new Error(ErrorCode.NOT_AUTH)
|
|
||||||
}
|
|
||||||
|
|
||||||
addressListeners.add(cb)
|
|
||||||
|
|
||||||
cb(currentAddress)
|
|
||||||
|
|
||||||
if (!addrSubbed) {
|
|
||||||
addrSubbed = true
|
|
||||||
|
|
||||||
user.get(Key.CURRENT_HANDSHAKE_ADDRESS).on(addr => {
|
|
||||||
if (typeof addr !== 'string') {
|
|
||||||
logger.error('expected handshake address to be an string')
|
|
||||||
|
|
||||||
setAddress(null)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
setAddress(addr)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return () => {
|
|
||||||
addressListeners.delete(cb)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param {(messages: Record<string, Message>) => void} cb
|
|
||||||
* @param {string} userPK Public key of the user from whom the incoming
|
|
||||||
* messages will be obtained.
|
|
||||||
* @param {string} incomingFeedID ID of the outgoing feed from which the
|
|
||||||
* incoming messages will be obtained.
|
|
||||||
* @param {GUNNode} gun (Pass only for testing purposes)
|
|
||||||
* @param {UserGUNNode} user
|
|
||||||
* @param {ISEA} SEA
|
|
||||||
* @returns {void}
|
|
||||||
*/
|
|
||||||
const onIncomingMessages = (cb, userPK, incomingFeedID, gun, user, SEA) => {
|
|
||||||
if (!user.is) {
|
|
||||||
throw new Error(ErrorCode.NOT_AUTH)
|
|
||||||
}
|
|
||||||
|
|
||||||
const callb = debounce(cb, DEBOUNCE_WAIT_TIME)
|
|
||||||
|
|
||||||
const otherUser = gun.user(userPK)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @type {Record<string, Message>}
|
|
||||||
*/
|
|
||||||
const messages = {}
|
|
||||||
|
|
||||||
callb(messages)
|
|
||||||
|
|
||||||
otherUser
|
|
||||||
.get(Key.OUTGOINGS)
|
|
||||||
.get(incomingFeedID)
|
|
||||||
.get(Key.MESSAGES)
|
|
||||||
.map()
|
|
||||||
.on(async (data, key) => {
|
|
||||||
if (!Schema.isMessage(data)) {
|
|
||||||
logger.warn('non-message received')
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
/** @type {string} */
|
|
||||||
const recipientEpub = await Utils.pubToEpub(userPK)
|
|
||||||
|
|
||||||
const secret = await SEA.secret(recipientEpub, user._.sea)
|
|
||||||
|
|
||||||
let { body } = data
|
|
||||||
body = await SEA.decrypt(body, secret)
|
|
||||||
|
|
||||||
messages[key] = {
|
|
||||||
body,
|
|
||||||
timestamp: data.timestamp
|
|
||||||
}
|
|
||||||
|
|
||||||
callb(messages)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @typedef {Record<string, Outgoing|null>} Outgoings
|
|
||||||
* @typedef {(outgoings: Outgoings) => void} OutgoingsListener
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @type {Outgoings}
|
|
||||||
*/
|
|
||||||
let currentOutgoings = {}
|
|
||||||
|
|
||||||
const getCurrentOutgoings = () => currentOutgoings
|
|
||||||
|
|
||||||
/** @type {Set<OutgoingsListener>} */
|
|
||||||
const outgoingsListeners = new Set()
|
|
||||||
|
|
||||||
outgoingsListeners.add(o => {
|
|
||||||
const values = Object.values(o)
|
|
||||||
const nulls = values.filter(x => x === null).length
|
|
||||||
const nonNulls = values.length - nulls
|
|
||||||
|
|
||||||
logger.info(`new outgoings, ${nulls} nulls and ${nonNulls} nonNulls`)
|
|
||||||
})
|
|
||||||
|
|
||||||
const notifyOutgoingsListeners = () => {
|
|
||||||
outgoingsListeners.forEach(l => l(currentOutgoings))
|
|
||||||
}
|
|
||||||
|
|
||||||
let outSubbed = false
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param {OutgoingsListener} cb
|
|
||||||
* @returns {() => void}
|
|
||||||
*/
|
|
||||||
const onOutgoing = cb => {
|
|
||||||
outgoingsListeners.add(cb)
|
|
||||||
cb(currentOutgoings)
|
|
||||||
|
|
||||||
if (!outSubbed) {
|
|
||||||
const user = require('../../Mediator').getUser()
|
|
||||||
user.get(Key.OUTGOINGS).open(
|
|
||||||
debounce(async data => {
|
|
||||||
try {
|
|
||||||
if (typeof data !== 'object' || data === null) {
|
|
||||||
currentOutgoings = {}
|
|
||||||
notifyOutgoingsListeners()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
/** @type {Record<string, Outgoing|null>} */
|
|
||||||
const newOuts = {}
|
|
||||||
|
|
||||||
const SEA = require('../../Mediator').mySEA
|
|
||||||
const mySecret = await Utils.mySecret()
|
|
||||||
|
|
||||||
await CommonUtils.asyncForEach(
|
|
||||||
Object.entries(data),
|
|
||||||
async ([id, out]) => {
|
|
||||||
if (typeof out !== 'object') {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (out === null) {
|
|
||||||
newOuts[id] = null
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
const { with: encPub, messages } = out
|
|
||||||
|
|
||||||
if (typeof encPub !== 'string') {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
const pub = await SEA.decrypt(encPub, mySecret)
|
|
||||||
|
|
||||||
if (!newOuts[id]) {
|
|
||||||
newOuts[id] = {
|
|
||||||
with: pub,
|
|
||||||
messages: {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const ourSec = await SEA.secret(
|
|
||||||
await Utils.pubToEpub(pub),
|
|
||||||
user._.sea
|
|
||||||
)
|
|
||||||
|
|
||||||
if (typeof messages === 'object' && messages !== null) {
|
|
||||||
await CommonUtils.asyncForEach(
|
|
||||||
Object.entries(messages),
|
|
||||||
async ([mid, msg]) => {
|
|
||||||
if (typeof msg === 'object' && msg !== null) {
|
|
||||||
if (
|
|
||||||
typeof msg.body === 'string' &&
|
|
||||||
typeof msg.timestamp === 'number'
|
|
||||||
) {
|
|
||||||
const newOut = newOuts[id]
|
|
||||||
if (!newOut) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
newOut.messages[mid] = {
|
|
||||||
body: await SEA.decrypt(msg.body, ourSec),
|
|
||||||
timestamp: msg.timestamp
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
currentOutgoings = newOuts
|
|
||||||
notifyOutgoingsListeners()
|
|
||||||
} catch (e) {
|
|
||||||
logger.info('--------------------------')
|
|
||||||
logger.info('Events -> onOutgoing')
|
|
||||||
logger.info(e)
|
|
||||||
logger.info('--------------------------')
|
|
||||||
}
|
|
||||||
}, 400)
|
|
||||||
)
|
|
||||||
|
|
||||||
outSubbed = true
|
|
||||||
}
|
|
||||||
|
|
||||||
return () => {
|
|
||||||
outgoingsListeners.delete(cb)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/**
|
|
||||||
* @typedef {(chats: Chat[]) => void} ChatsListener
|
|
||||||
*/
|
|
||||||
|
|
||||||
/** @type {Chat[]} */
|
|
||||||
let currentChats = []
|
|
||||||
|
|
||||||
const getChats = () => currentChats
|
|
||||||
|
|
||||||
/** @type {Set<ChatsListener>} */
|
|
||||||
const chatsListeners = new Set()
|
|
||||||
|
|
||||||
chatsListeners.add(c => {
|
|
||||||
logger.info(`Chats: ${c.length}`)
|
|
||||||
})
|
|
||||||
|
|
||||||
const notifyChatsListeners = () => {
|
|
||||||
chatsListeners.forEach(l => l(currentChats))
|
|
||||||
}
|
|
||||||
|
|
||||||
const processChats = debounce(() => {
|
|
||||||
const Streams = require('../streams')
|
|
||||||
const currentOutgoings = getCurrentOutgoings()
|
|
||||||
const existingOutgoings = /** @type {[string, Outgoing][]} */ (Object.entries(
|
|
||||||
currentOutgoings
|
|
||||||
).filter(([_, o]) => o !== null))
|
|
||||||
const pubToFeed = Streams.getPubToFeed()
|
|
||||||
|
|
||||||
/** @type {Chat[]} */
|
|
||||||
const newChats = []
|
|
||||||
|
|
||||||
for (const [outID, out] of existingOutgoings) {
|
|
||||||
/** @type {ChatMessage[]} */
|
|
||||||
let msgs = Object.entries(out.messages)
|
|
||||||
.map(([mid, m]) => ({
|
|
||||||
id: mid,
|
|
||||||
outgoing: true,
|
|
||||||
body: m.body,
|
|
||||||
timestamp: m.timestamp
|
|
||||||
}))
|
|
||||||
// filter out null messages
|
|
||||||
.filter(m => typeof m.body === 'string')
|
|
||||||
|
|
||||||
const incoming = pubToFeed[out.with]
|
|
||||||
|
|
||||||
if (Array.isArray(incoming)) {
|
|
||||||
msgs = [...msgs, ...incoming]
|
|
||||||
}
|
|
||||||
|
|
||||||
/** @type {Chat} */
|
|
||||||
const chat = {
|
|
||||||
recipientPublicKey: out.with,
|
|
||||||
didDisconnect: pubToFeed[out.with] === 'disconnected',
|
|
||||||
id: out.with + outID,
|
|
||||||
messages: msgs,
|
|
||||||
recipientAvatar: null,
|
|
||||||
recipientDisplayName: null,
|
|
||||||
lastSeenApp: null
|
|
||||||
}
|
|
||||||
|
|
||||||
newChats.push(chat)
|
|
||||||
}
|
|
||||||
|
|
||||||
currentChats = newChats.filter(
|
|
||||||
c =>
|
|
||||||
Array.isArray(pubToFeed[c.recipientPublicKey]) ||
|
|
||||||
pubToFeed[c.recipientPublicKey] === 'disconnected'
|
|
||||||
)
|
|
||||||
|
|
||||||
notifyChatsListeners()
|
|
||||||
}, 750)
|
|
||||||
|
|
||||||
let onChatsSubbed = false
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Massages all of the more primitive data structures into a more manageable
|
|
||||||
* 'Chat' paradigm.
|
|
||||||
* @param {ChatsListener} cb
|
|
||||||
* @returns {() => void}
|
|
||||||
*/
|
|
||||||
const onChats = cb => {
|
|
||||||
if (!chatsListeners.add(cb)) {
|
|
||||||
throw new Error('Tried to subscribe twice')
|
|
||||||
}
|
|
||||||
cb(currentChats)
|
|
||||||
|
|
||||||
if (!onChatsSubbed) {
|
|
||||||
const Streams = require('../streams')
|
|
||||||
// onOutgoing(processChats)
|
|
||||||
Streams.onPubToFeed(processChats)
|
|
||||||
onChatsSubbed = true
|
|
||||||
}
|
|
||||||
|
|
||||||
return () => {
|
|
||||||
if (!chatsListeners.delete(cb)) {
|
|
||||||
throw new Error('Tried to unsubscribe twice')
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** @type {string|null} */
|
/** @type {string|null} */
|
||||||
let currentSeedBackup = null
|
let currentSeedBackup = null
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {(seedBackup: string|null) => void} cb
|
* @param {(seedBackup: string|null) => void} cb
|
||||||
* @param {UserGUNNode} user
|
* @param {import('../SimpleGUN').UserGUNNode} user
|
||||||
* @param {ISEA} SEA
|
* @param {import('../SimpleGUN').ISEA} SEA
|
||||||
* @throws {Error} If user hasn't been auth.
|
* @throws {Error} If user hasn't been auth.
|
||||||
* @returns {void}
|
* @returns {void}
|
||||||
*/
|
*/
|
||||||
|
|
@ -445,17 +40,5 @@ const onSeedBackup = (cb, user, SEA) => {
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
__onUserToIncoming,
|
onSeedBackup
|
||||||
onCurrentHandshakeAddress,
|
|
||||||
onIncomingMessages,
|
|
||||||
onOutgoing,
|
|
||||||
getCurrentOutgoings,
|
|
||||||
onSimplerReceivedRequests: require('./onReceivedReqs').onReceivedReqs,
|
|
||||||
onSimplerSentRequests: require('./onSentReqs').onSentReqs,
|
|
||||||
getCurrentSentReqs: require('./onSentReqs').getCurrentSentReqs,
|
|
||||||
getCurrentReceivedReqs: require('./onReceivedReqs').getReceivedReqs,
|
|
||||||
onSeedBackup,
|
|
||||||
onChats,
|
|
||||||
getHandshakeAddress,
|
|
||||||
getChats
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,138 +0,0 @@
|
||||||
/** @format */
|
|
||||||
const debounce = require('lodash/debounce')
|
|
||||||
const logger = require('winston')
|
|
||||||
const { Schema } = require('shock-common')
|
|
||||||
const size = require('lodash/size')
|
|
||||||
|
|
||||||
const Key = require('../key')
|
|
||||||
const Streams = require('../streams')
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @typedef {Readonly<import('shock-common').Schema.SimpleReceivedRequest>} SimpleReceivedRequest
|
|
||||||
* @typedef {(reqs: ReadonlyArray<SimpleReceivedRequest>) => void} Listener
|
|
||||||
*/
|
|
||||||
|
|
||||||
/** @type {Set<Listener>} */
|
|
||||||
const listeners = new Set()
|
|
||||||
|
|
||||||
/** @type {string|null} */
|
|
||||||
let currentAddress = null
|
|
||||||
|
|
||||||
/** @type {Record<string, SimpleReceivedRequest>} */
|
|
||||||
let currReceivedReqsMap = {}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Unprocessed requests in current handshake node.
|
|
||||||
* @type {Record<string, import('shock-common').Schema.HandshakeRequest>}
|
|
||||||
*/
|
|
||||||
let currAddressData = {}
|
|
||||||
|
|
||||||
/** @returns {SimpleReceivedRequest[]} */
|
|
||||||
const getReceivedReqs = () => Object.values(currReceivedReqsMap)
|
|
||||||
/** @param {Record<string, SimpleReceivedRequest>} reqs */
|
|
||||||
const setReceivedReqsMap = reqs => {
|
|
||||||
currReceivedReqsMap = reqs
|
|
||||||
listeners.forEach(l => l(getReceivedReqs()))
|
|
||||||
}
|
|
||||||
|
|
||||||
listeners.add(() => {
|
|
||||||
logger.info(`new received reqs: ${size(getReceivedReqs())}`)
|
|
||||||
})
|
|
||||||
|
|
||||||
const react = debounce(() => {
|
|
||||||
/** @type {Record<string, SimpleReceivedRequest>} */
|
|
||||||
const newReceivedReqsMap = {}
|
|
||||||
|
|
||||||
const pubToFeed = Streams.getPubToFeed()
|
|
||||||
|
|
||||||
for (const [id, req] of Object.entries(currAddressData)) {
|
|
||||||
const inContact = Array.isArray(pubToFeed[req.from])
|
|
||||||
const isDisconnected = pubToFeed[req.from] === 'disconnected'
|
|
||||||
|
|
||||||
if (!inContact && !isDisconnected) {
|
|
||||||
newReceivedReqsMap[req.from] = {
|
|
||||||
id,
|
|
||||||
requestorAvatar: null,
|
|
||||||
requestorDisplayName: null,
|
|
||||||
requestorPK: req.from,
|
|
||||||
timestamp: req.timestamp
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
setReceivedReqsMap(newReceivedReqsMap)
|
|
||||||
}, 750)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param {string} addr
|
|
||||||
* @returns {(data: import('../SimpleGUN').OpenListenerData) => void}
|
|
||||||
*/
|
|
||||||
const listenerForAddr = addr => data => {
|
|
||||||
// did invalidate
|
|
||||||
if (addr !== currentAddress) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (typeof data !== 'object' || data === null) {
|
|
||||||
currAddressData = {}
|
|
||||||
} else {
|
|
||||||
for (const [id, req] of Object.entries(data)) {
|
|
||||||
// no need to update them just write them once
|
|
||||||
if (Schema.isHandshakeRequest(req) && !currAddressData[id]) {
|
|
||||||
currAddressData[id] = req
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info('data for address length: ' + size(currAddressData))
|
|
||||||
|
|
||||||
react()
|
|
||||||
}
|
|
||||||
|
|
||||||
let subbed = false
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param {Listener} cb
|
|
||||||
* @returns {() => void}
|
|
||||||
*/
|
|
||||||
const onReceivedReqs = cb => {
|
|
||||||
listeners.add(cb)
|
|
||||||
cb(getReceivedReqs())
|
|
||||||
|
|
||||||
if (!subbed) {
|
|
||||||
const user = require('../../Mediator').getUser()
|
|
||||||
if (!user.is) {
|
|
||||||
logger.warn('Tried subscribing to onReceivedReqs without authing')
|
|
||||||
}
|
|
||||||
require('./index').onCurrentHandshakeAddress(addr => {
|
|
||||||
if (currentAddress === addr) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
currentAddress = addr
|
|
||||||
currAddressData = {}
|
|
||||||
setReceivedReqsMap({})
|
|
||||||
|
|
||||||
if (typeof addr === 'string') {
|
|
||||||
require('../../Mediator')
|
|
||||||
.getGun()
|
|
||||||
.get(Key.HANDSHAKE_NODES)
|
|
||||||
.get(addr)
|
|
||||||
.open(listenerForAddr(addr))
|
|
||||||
}
|
|
||||||
}, user)
|
|
||||||
|
|
||||||
Streams.onPubToFeed(react)
|
|
||||||
|
|
||||||
subbed = true
|
|
||||||
}
|
|
||||||
|
|
||||||
return () => {
|
|
||||||
listeners.delete(cb)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = {
|
|
||||||
getReceivedReqs,
|
|
||||||
onReceivedReqs
|
|
||||||
}
|
|
||||||
|
|
@ -1,128 +0,0 @@
|
||||||
/** @format */
|
|
||||||
const debounce = require('lodash/debounce')
|
|
||||||
const logger = require('winston')
|
|
||||||
const size = require('lodash/size')
|
|
||||||
|
|
||||||
const Streams = require('../streams')
|
|
||||||
/**
|
|
||||||
* @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode
|
|
||||||
* @typedef {import('../SimpleGUN').GUNNode} GUNNode
|
|
||||||
* @typedef {import('../SimpleGUN').ISEA} ISEA
|
|
||||||
* @typedef {import('../SimpleGUN').ListenerData} ListenerData
|
|
||||||
* @typedef {import('shock-common').Schema.HandshakeRequest} HandshakeRequest
|
|
||||||
* @typedef {import('shock-common').Schema.Message} Message
|
|
||||||
* @typedef {import('shock-common').Schema.Outgoing} Outgoing
|
|
||||||
* @typedef {import('shock-common').Schema.PartialOutgoing} PartialOutgoing
|
|
||||||
* @typedef {import('shock-common').Schema.Chat} Chat
|
|
||||||
* @typedef {import('shock-common').Schema.ChatMessage} ChatMessage
|
|
||||||
* @typedef {import('shock-common').Schema.SimpleSentRequest} SimpleSentRequest
|
|
||||||
* @typedef {import('shock-common').Schema.SimpleReceivedRequest} SimpleReceivedRequest
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @typedef {(chats: SimpleSentRequest[]) => void} Listener
|
|
||||||
*/
|
|
||||||
|
|
||||||
/** @type {Set<Listener>} */
|
|
||||||
const listeners = new Set()
|
|
||||||
|
|
||||||
/** @type {SimpleSentRequest[]} */
|
|
||||||
let currentReqs = []
|
|
||||||
|
|
||||||
listeners.add(() => {
|
|
||||||
logger.info(`new sent reqs length: ${size(currentReqs)}`)
|
|
||||||
})
|
|
||||||
|
|
||||||
const getCurrentSentReqs = () => currentReqs
|
|
||||||
|
|
||||||
// any time any of the streams we use notifies us that it changed, we fire up
|
|
||||||
// react()
|
|
||||||
const react = debounce(() => {
|
|
||||||
/** @type {SimpleSentRequest[]} */
|
|
||||||
const newReqs = []
|
|
||||||
|
|
||||||
// reactive streams
|
|
||||||
// maps a pk to its current handshake address
|
|
||||||
const pubToHAddr = Streams.getAddresses()
|
|
||||||
// a set or list containing copies of sent requests
|
|
||||||
const storedReqs = Streams.getStoredReqs()
|
|
||||||
// maps a pk to the last request sent to it (so old stored reqs are invalidated)
|
|
||||||
const pubToLastSentReqID = Streams.getSentReqIDs()
|
|
||||||
// maps a pk to a feed, messages if subbed and pk is pubbing, null /
|
|
||||||
// 'disconnected' otherwise
|
|
||||||
const pubToFeed = Streams.getPubToFeed()
|
|
||||||
|
|
||||||
logger.info(`pubToLastSentREqID length: ${size(pubToLastSentReqID)}`)
|
|
||||||
|
|
||||||
for (const storedReq of storedReqs) {
|
|
||||||
const { handshakeAddress, recipientPub, sentReqID, timestamp } = storedReq
|
|
||||||
const currAddress = pubToHAddr[recipientPub]
|
|
||||||
|
|
||||||
const lastReqID = pubToLastSentReqID[recipientPub]
|
|
||||||
// invalidate if this stored request is not the last one sent to this
|
|
||||||
// particular pk
|
|
||||||
const isStale = typeof lastReqID !== 'undefined' && lastReqID !== sentReqID
|
|
||||||
// invalidate if we are in a pub/sub state to this pk (handshake in place)
|
|
||||||
const isConnected = Array.isArray(pubToFeed[recipientPub])
|
|
||||||
|
|
||||||
if (isStale || isConnected) {
|
|
||||||
// eslint-disable-next-line no-continue
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// no address for this pk? let's ask the corresponding stream to sub to
|
|
||||||
// gun.user(pk).get('currentAddr')
|
|
||||||
if (typeof currAddress === 'undefined') {
|
|
||||||
// eslint-disable-next-line no-empty-function
|
|
||||||
Streams.onAddresses(() => {}, recipientPub)()
|
|
||||||
}
|
|
||||||
|
|
||||||
newReqs.push({
|
|
||||||
id: sentReqID,
|
|
||||||
recipientAvatar: null,
|
|
||||||
recipientChangedRequestAddress:
|
|
||||||
// if we haven't received the other's user current handshake address,
|
|
||||||
// let's assume he hasn't changed it and that this request is still
|
|
||||||
// valid
|
|
||||||
typeof currAddress !== 'undefined' && handshakeAddress !== currAddress,
|
|
||||||
recipientDisplayName: null,
|
|
||||||
recipientPublicKey: recipientPub,
|
|
||||||
timestamp
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
currentReqs = newReqs
|
|
||||||
|
|
||||||
listeners.forEach(l => l(currentReqs))
|
|
||||||
}, 750)
|
|
||||||
|
|
||||||
let subbed = false
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Massages all of the more primitive data structures into a more manageable
|
|
||||||
* 'Chat' paradigm.
|
|
||||||
* @param {Listener} cb
|
|
||||||
* @returns {() => void}
|
|
||||||
*/
|
|
||||||
const onSentReqs = cb => {
|
|
||||||
listeners.add(cb)
|
|
||||||
cb(currentReqs)
|
|
||||||
|
|
||||||
if (!subbed) {
|
|
||||||
Streams.onAddresses(react)
|
|
||||||
Streams.onStoredReqs(react)
|
|
||||||
Streams.onLastSentReqIDs(react)
|
|
||||||
Streams.onPubToFeed(react)
|
|
||||||
|
|
||||||
subbed = true
|
|
||||||
}
|
|
||||||
|
|
||||||
return () => {
|
|
||||||
listeners.delete(cb)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = {
|
|
||||||
onSentReqs,
|
|
||||||
getCurrentSentReqs
|
|
||||||
}
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue