Remove old chat code
This commit is contained in:
parent
148b42c5ed
commit
c70062b86f
5 changed files with 0 additions and 569 deletions
|
|
@ -1,56 +0,0 @@
|
|||
/** @format */
|
||||
const logger = require('winston')
|
||||
const size = require('lodash/size')
|
||||
|
||||
const Key = require('../key')
|
||||
/**
|
||||
* @typedef {Record<string, string|null|undefined>} Addresses
|
||||
*/
|
||||
|
||||
/** @type {Addresses} */
|
||||
const pubToAddress = {}
|
||||
|
||||
/** @type {Set<() => void>} */
|
||||
const listeners = new Set()
|
||||
|
||||
listeners.add(() => {
|
||||
logger.info(`pubToAddress length: ${size(pubToAddress)}`)
|
||||
})
|
||||
|
||||
const notify = () => listeners.forEach(l => l())
|
||||
|
||||
/** @type {Set<string>} */
|
||||
const subbedPublicKeys = new Set()
|
||||
|
||||
/**
|
||||
* @param {() => void} cb
|
||||
* @param {string=} pub
|
||||
*/
|
||||
const onAddresses = (cb, pub) => {
|
||||
listeners.add(cb)
|
||||
cb()
|
||||
if (pub && subbedPublicKeys.add(pub)) {
|
||||
require('../../Mediator')
|
||||
.getGun()
|
||||
.user(pub)
|
||||
.get(Key.CURRENT_HANDSHAKE_ADDRESS)
|
||||
.on(addr => {
|
||||
if (typeof addr === 'string' || addr === null) {
|
||||
pubToAddress[pub] = addr
|
||||
} else {
|
||||
pubToAddress[pub] = null
|
||||
}
|
||||
notify()
|
||||
})
|
||||
}
|
||||
return () => {
|
||||
listeners.delete(cb)
|
||||
}
|
||||
}
|
||||
|
||||
const getAddresses = () => pubToAddress
|
||||
|
||||
module.exports = {
|
||||
onAddresses,
|
||||
getAddresses
|
||||
}
|
||||
|
|
@ -1,98 +1,6 @@
|
|||
/** @format */
|
||||
const { Schema, Utils: CommonUtils } = require('shock-common')
|
||||
|
||||
const Key = require('../key')
|
||||
const Utils = require('../utils')
|
||||
|
||||
/**
|
||||
* @typedef {import('shock-common').Schema.StoredRequest} StoredRequest
|
||||
* @typedef {(reqs: StoredRequest[]) => void} StoredRequestsListener
|
||||
*/
|
||||
|
||||
/** @type {Set<StoredRequestsListener>} */
|
||||
const storedRequestsListeners = new Set()
|
||||
|
||||
/**
|
||||
* @type {StoredRequest[]}
|
||||
*/
|
||||
let encryptedStoredReqs = []
|
||||
|
||||
/**
|
||||
* @type {StoredRequest[]}
|
||||
*/
|
||||
let currentStoredReqs = []
|
||||
|
||||
const getStoredReqs = () => currentStoredReqs
|
||||
|
||||
const processStoredReqs = async () => {
|
||||
const ereqs = encryptedStoredReqs
|
||||
encryptedStoredReqs = []
|
||||
const mySecret = await Utils.mySecret()
|
||||
const SEA = require('../../Mediator').mySEA
|
||||
const finalReqs = await CommonUtils.asyncMap(ereqs, async er => {
|
||||
/** @type {StoredRequest} */
|
||||
const r = {
|
||||
handshakeAddress: await SEA.decrypt(er.handshakeAddress, mySecret),
|
||||
recipientPub: await SEA.decrypt(er.recipientPub, mySecret),
|
||||
sentReqID: await SEA.decrypt(er.sentReqID, mySecret),
|
||||
timestamp: er.timestamp
|
||||
}
|
||||
|
||||
return r
|
||||
})
|
||||
currentStoredReqs = finalReqs
|
||||
storedRequestsListeners.forEach(l => l(currentStoredReqs))
|
||||
}
|
||||
|
||||
let storedReqsSubbed = false
|
||||
|
||||
/**
|
||||
* @param {StoredRequestsListener} cb
|
||||
*/
|
||||
const onStoredReqs = cb => {
|
||||
storedRequestsListeners.add(cb)
|
||||
|
||||
if (!storedReqsSubbed) {
|
||||
require('../../Mediator')
|
||||
.getUser()
|
||||
.get(Key.STORED_REQS)
|
||||
.open(d => {
|
||||
if (typeof d === 'object' && d !== null) {
|
||||
//@ts-ignore
|
||||
encryptedStoredReqs = /** @type {StoredRequest[]} */ (Object.values(
|
||||
d
|
||||
).filter(i => Schema.isStoredRequest(i)))
|
||||
}
|
||||
|
||||
processStoredReqs()
|
||||
})
|
||||
|
||||
storedReqsSubbed = true
|
||||
}
|
||||
|
||||
cb(currentStoredReqs)
|
||||
|
||||
return () => {
|
||||
storedRequestsListeners.delete(cb)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
onPubToIncoming: require('./pubToIncoming').onPubToIncoming,
|
||||
getPubToIncoming: require('./pubToIncoming').getPubToIncoming,
|
||||
setPubToIncoming: require('./pubToIncoming').setPubToIncoming,
|
||||
|
||||
onPubToFeed: require('./pubToFeed').onPubToFeed,
|
||||
getPubToFeed: require('./pubToFeed').getPubToFeed,
|
||||
|
||||
onStoredReqs,
|
||||
getStoredReqs,
|
||||
onAddresses: require('./addresses').onAddresses,
|
||||
getAddresses: require('./addresses').getAddresses,
|
||||
onLastSentReqIDs: require('./lastSentReqID').onLastSentReqIDs,
|
||||
getSentReqIDs: require('./lastSentReqID').getSentReqIDs,
|
||||
PubToIncoming: require('./pubToIncoming'),
|
||||
|
||||
getPubToLastSeenApp: require('./pubToLastSeenApp').getPubToLastSeenApp,
|
||||
onPubToLastSeenApp: require('./pubToLastSeenApp').on
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,56 +0,0 @@
|
|||
/** @format */
|
||||
const logger = require('winston')
|
||||
const { Constants } = require('shock-common')
|
||||
|
||||
const Key = require('../key')
|
||||
|
||||
/** @type {Record<string, string|null|undefined>} */
|
||||
let pubToLastSentReqID = {}
|
||||
|
||||
/** @type {Set<() => void>} */
|
||||
const listeners = new Set()
|
||||
const notify = () => listeners.forEach(l => l())
|
||||
|
||||
let subbed = false
|
||||
|
||||
/**
|
||||
* @param {() => void} cb
|
||||
*/
|
||||
const onLastSentReqIDs = cb => {
|
||||
listeners.add(cb)
|
||||
cb()
|
||||
|
||||
if (!subbed) {
|
||||
const user = require('../../Mediator').getUser()
|
||||
if (!user.is) {
|
||||
logger.warn('lastSentReqID() -> tried to sub without authing')
|
||||
throw new Error(Constants.ErrorCode.NOT_AUTH)
|
||||
}
|
||||
|
||||
user.get(Key.USER_TO_LAST_REQUEST_SENT).open(data => {
|
||||
if (typeof data === 'object' && data !== null) {
|
||||
for (const [pub, id] of Object.entries(data)) {
|
||||
if (typeof id === 'string' || id === null) {
|
||||
pubToLastSentReqID[pub] = id
|
||||
}
|
||||
}
|
||||
} else {
|
||||
pubToLastSentReqID = {}
|
||||
}
|
||||
|
||||
notify()
|
||||
})
|
||||
subbed = true
|
||||
}
|
||||
|
||||
return () => {
|
||||
listeners.delete(cb)
|
||||
}
|
||||
}
|
||||
|
||||
const getSentReqIDs = () => pubToLastSentReqID
|
||||
|
||||
module.exports = {
|
||||
onLastSentReqIDs,
|
||||
getSentReqIDs
|
||||
}
|
||||
|
|
@ -1,260 +0,0 @@
|
|||
/** @format */
|
||||
const uuidv1 = require('uuid/v1')
|
||||
const logger = require('winston')
|
||||
const debounce = require('lodash/debounce')
|
||||
const { Schema, Utils: CommonUtils } = require('shock-common')
|
||||
const size = require('lodash/size')
|
||||
|
||||
const Key = require('../key')
|
||||
const Utils = require('../utils')
|
||||
/**
|
||||
* @typedef {import('shock-common').Schema.ChatMessage} Message
|
||||
* @typedef {import('../SimpleGUN').OpenListenerData} OpenListenerData
|
||||
*/
|
||||
|
||||
const PubToIncoming = require('./pubToIncoming')
|
||||
|
||||
/**
|
||||
* @typedef {Record<string, Message[]|null|undefined|'disconnected'>} Feeds
|
||||
* @typedef {(feeds: Feeds) => void} FeedsListener
|
||||
*/
|
||||
|
||||
/** @type {Set<FeedsListener>} */
|
||||
const feedsListeners = new Set()
|
||||
|
||||
/**
|
||||
* @type {Feeds}
|
||||
*/
|
||||
let pubToFeed = {}
|
||||
|
||||
const getPubToFeed = () => pubToFeed
|
||||
|
||||
feedsListeners.add(() => {
|
||||
logger.info(`new pubToFeed length: ${size(getPubToFeed())}`)
|
||||
})
|
||||
|
||||
/** @param {Feeds} ptf */
|
||||
const setPubToFeed = ptf => {
|
||||
pubToFeed = ptf
|
||||
feedsListeners.forEach(l => {
|
||||
l(pubToFeed)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* If at one point we subscribed to a feed, record it here. Keeps track of it
|
||||
* for unsubbing.
|
||||
*
|
||||
* Since we can't really unsub in GUN, what we do is that each listener created
|
||||
* checks the last incoming feed, if it was created for other feed that is not
|
||||
* the latest, it becomes inactive.
|
||||
* @type {Record<string, string|undefined|null>}
|
||||
*/
|
||||
const pubToLastIncoming = {}
|
||||
|
||||
/**
|
||||
* Any pub-feed pair listener will write its update id here when fired up. Avoid
|
||||
* race conditions between different listeners and between different invocations
|
||||
* of the same listener.
|
||||
* @type {Record<string, string>}
|
||||
*/
|
||||
const pubToLastUpdate = {}
|
||||
|
||||
/**
|
||||
* Performs a sub to a pub feed pair that will only emit if it is the last
|
||||
* subbed feed for that pub, according to `pubToLastIncoming`. This listener is
|
||||
* not in charge of writing to the cache.
|
||||
* @param {[ string , string ]} param0
|
||||
* @returns {(data: OpenListenerData) => void}
|
||||
*/
|
||||
const onOpenForPubFeedPair = ([pub, feed]) =>
|
||||
debounce(async data => {
|
||||
try {
|
||||
// did invalidate
|
||||
if (pubToLastIncoming[pub] !== feed) {
|
||||
return
|
||||
}
|
||||
|
||||
if (
|
||||
// did disconnect
|
||||
data === null ||
|
||||
// interpret as disconnect
|
||||
typeof data !== 'object'
|
||||
) {
|
||||
// invalidate this listener. If a reconnection happens it will be for a
|
||||
// different pub-feed pair.
|
||||
pubToLastIncoming[pub] = null
|
||||
setImmediate(() => {
|
||||
logger.info(
|
||||
`onOpenForPubFeedPair -> didDisconnect -> pub: ${pub} - feed: ${feed}`
|
||||
)
|
||||
})
|
||||
// signal disconnect to listeners listeners should rely on pubToFeed for
|
||||
// disconnect status instead of pub-to-incoming. Only the latter will
|
||||
// detect remote disconnection
|
||||
setPubToFeed({
|
||||
...getPubToFeed(),
|
||||
[pub]: /** @type {'disconnected'} */ ('disconnected')
|
||||
})
|
||||
return
|
||||
}
|
||||
//@ts-ignore
|
||||
const incoming = /** @type {import('shock-common').Schema.Outgoing} */ (data)
|
||||
|
||||
// incomplete data, let's not assume anything
|
||||
if (
|
||||
typeof incoming.with !== 'string' ||
|
||||
typeof incoming.messages !== 'object'
|
||||
) {
|
||||
return
|
||||
}
|
||||
|
||||
/** @type {import('shock-common').Schema.ChatMessage[]} */
|
||||
const newMsgs = Object.entries(incoming.messages)
|
||||
// filter out messages with incomplete data
|
||||
.filter(([_, msg]) => Schema.isMessage(msg))
|
||||
.map(([id, msg]) => {
|
||||
/** @type {import('shock-common').Schema.ChatMessage} */
|
||||
const m = {
|
||||
// we'll decrypt later
|
||||
body: msg.body,
|
||||
id,
|
||||
outgoing: false,
|
||||
timestamp: msg.timestamp
|
||||
}
|
||||
|
||||
return m
|
||||
})
|
||||
|
||||
if (newMsgs.length === 0) {
|
||||
setPubToFeed({
|
||||
...getPubToFeed(),
|
||||
[pub]: []
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
const thisUpdate = uuidv1()
|
||||
pubToLastUpdate[pub] = thisUpdate
|
||||
|
||||
const user = require('../../Mediator').getUser()
|
||||
if (!user.is) {
|
||||
logger.warn('pubToFeed -> onOpenForPubFeedPair() -> user is not auth')
|
||||
}
|
||||
const SEA = require('../../Mediator').mySEA
|
||||
|
||||
const ourSecret = await SEA.secret(await Utils.pubToEpub(pub), user._.sea)
|
||||
|
||||
const decryptedMsgs = await CommonUtils.asyncMap(newMsgs, async m => {
|
||||
/** @type {import('shock-common').Schema.ChatMessage} */
|
||||
const decryptedMsg = {
|
||||
...m,
|
||||
body: await SEA.decrypt(m.body, ourSecret)
|
||||
}
|
||||
|
||||
return decryptedMsg
|
||||
})
|
||||
|
||||
// this listener got invalidated while we were awaiting the async operations
|
||||
// above.
|
||||
if (pubToLastUpdate[pub] !== thisUpdate) {
|
||||
return
|
||||
}
|
||||
|
||||
setPubToFeed({
|
||||
...getPubToFeed(),
|
||||
[pub]: decryptedMsgs
|
||||
})
|
||||
} catch (err) {
|
||||
logger.warn(`error inside pub to pk-feed pair: ${pub} -- ${feed}`)
|
||||
logger.error(err)
|
||||
}
|
||||
}, 750)
|
||||
|
||||
const react = () => {
|
||||
const pubToIncoming = PubToIncoming.getPubToIncoming()
|
||||
|
||||
const gun = require('../../Mediator').getGun()
|
||||
|
||||
/** @type {Feeds} */
|
||||
const newPubToFeed = {}
|
||||
|
||||
for (const [pub, inc] of Object.entries(pubToIncoming)) {
|
||||
/**
|
||||
* empty string -> null
|
||||
* @type {string|null}
|
||||
*/
|
||||
const newIncoming = inc || null
|
||||
|
||||
if (
|
||||
// if disconnected, the same incoming feed will try to overwrite the
|
||||
// nulled out pubToLastIncoming[pub] entry. Making the listener for that
|
||||
// pub feed pair fire up again, etc. Now. When the user disconnects from
|
||||
// this side of things. He will overwrite the pub to incoming with null.
|
||||
// Let's allow that.
|
||||
newIncoming === pubToLastIncoming[pub] &&
|
||||
!(pubToFeed[pub] === 'disconnected' && newIncoming === null)
|
||||
) {
|
||||
// eslint-disable-next-line no-continue
|
||||
continue
|
||||
}
|
||||
|
||||
// will invalidate stale listeners (a listener for an outdated incoming feed
|
||||
// id)
|
||||
pubToLastIncoming[pub] = newIncoming
|
||||
// Invalidate pending writes from stale listener(s) for the old incoming
|
||||
// address.
|
||||
pubToLastUpdate[pub] = uuidv1()
|
||||
newPubToFeed[pub] = newIncoming ? [] : null
|
||||
|
||||
// sub to this incoming feed
|
||||
if (typeof newIncoming === 'string') {
|
||||
// perform sub to pub-incoming_feed pair
|
||||
// leave all of the sideffects from this for the next tick
|
||||
setImmediate(() => {
|
||||
gun
|
||||
.user(pub)
|
||||
.get(Key.OUTGOINGS)
|
||||
.get(newIncoming)
|
||||
.open(onOpenForPubFeedPair([pub, newIncoming]))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if (Object.keys(newPubToFeed).length > 0) {
|
||||
setPubToFeed({
|
||||
...getPubToFeed(),
|
||||
...newPubToFeed
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
let subbed = false
|
||||
|
||||
/**
|
||||
* Array.isArray(pubToFeed[pub]) means a Handshake is in place, look for
|
||||
* incoming messages here.
|
||||
* pubToIncoming[pub] === null means a disconnection took place.
|
||||
* typeof pubToIncoming[pub] === 'undefined' means none of the above.
|
||||
* @param {FeedsListener} cb
|
||||
* @returns {() => void}
|
||||
*/
|
||||
const onPubToFeed = cb => {
|
||||
feedsListeners.add(cb)
|
||||
cb(getPubToFeed())
|
||||
|
||||
if (!subbed) {
|
||||
PubToIncoming.onPubToIncoming(react)
|
||||
subbed = true
|
||||
}
|
||||
|
||||
return () => {
|
||||
feedsListeners.delete(cb)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
getPubToFeed,
|
||||
setPubToFeed,
|
||||
onPubToFeed
|
||||
}
|
||||
|
|
@ -1,105 +0,0 @@
|
|||
/** @format */
|
||||
const uuidv1 = require('uuid/v1')
|
||||
const debounce = require('lodash/debounce')
|
||||
const logger = require('winston')
|
||||
const { Utils: CommonUtils } = require('shock-common')
|
||||
const size = require('lodash/size')
|
||||
|
||||
const { USER_TO_INCOMING } = require('../key')
|
||||
/** @typedef {import('../SimpleGUN').OpenListenerData} OpenListenerData */
|
||||
|
||||
/**
|
||||
* @typedef {Record<string, string|null|undefined>} PubToIncoming
|
||||
*/
|
||||
|
||||
/** @type {Set<() => void>} */
|
||||
const listeners = new Set()
|
||||
|
||||
/** @type {PubToIncoming} */
|
||||
let pubToIncoming = {}
|
||||
|
||||
const getPubToIncoming = () => pubToIncoming
|
||||
/**
|
||||
* @param {PubToIncoming} pti
|
||||
* @returns {void}
|
||||
*/
|
||||
const setPubToIncoming = pti => {
|
||||
pubToIncoming = pti
|
||||
listeners.forEach(l => l())
|
||||
}
|
||||
|
||||
let latestUpdate = uuidv1()
|
||||
|
||||
listeners.add(() => {
|
||||
logger.info(`new pubToIncoming length: ${size(getPubToIncoming())}`)
|
||||
})
|
||||
|
||||
const onOpen = debounce(async uti => {
|
||||
const SEA = require('../../Mediator').mySEA
|
||||
const mySec = require('../../Mediator').getMySecret()
|
||||
const thisUpdate = uuidv1()
|
||||
latestUpdate = thisUpdate
|
||||
|
||||
if (typeof uti !== 'object' || uti === null) {
|
||||
setPubToIncoming({})
|
||||
return
|
||||
}
|
||||
|
||||
/** @type {PubToIncoming} */
|
||||
const newPubToIncoming = {}
|
||||
|
||||
await CommonUtils.asyncForEach(
|
||||
Object.entries(uti),
|
||||
async ([pub, encFeedID]) => {
|
||||
if (encFeedID === null) {
|
||||
newPubToIncoming[pub] = null
|
||||
return
|
||||
}
|
||||
|
||||
if (typeof encFeedID === 'string') {
|
||||
newPubToIncoming[pub] = await SEA.decrypt(encFeedID, mySec)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
// avoid old data from overwriting new data if decrypting took longer to
|
||||
// process for the older open() call than for the newer open() call
|
||||
if (latestUpdate === thisUpdate) {
|
||||
setPubToIncoming(newPubToIncoming)
|
||||
}
|
||||
}, 750)
|
||||
|
||||
let subbed = false
|
||||
|
||||
/**
|
||||
* @param {() => void} cb
|
||||
* @returns {() => void}
|
||||
*/
|
||||
const onPubToIncoming = cb => {
|
||||
if (!listeners.add(cb)) {
|
||||
throw new Error('Tried to subscribe twice')
|
||||
}
|
||||
|
||||
cb()
|
||||
|
||||
if (!subbed) {
|
||||
const user = require('../../Mediator').getUser()
|
||||
if (!user.is) {
|
||||
logger.warn(`subscribing to pubToIncoming on a unauth user`)
|
||||
}
|
||||
user.get(USER_TO_INCOMING).open(onOpen)
|
||||
subbed = true
|
||||
}
|
||||
|
||||
return () => {
|
||||
if (!listeners.delete(cb)) {
|
||||
throw new Error('Tried to unsubscribe twice')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
getPubToIncoming,
|
||||
setPubToIncoming,
|
||||
onPubToIncoming
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue