diff --git a/services/gunDB/contact-api/streams/addresses.js b/services/gunDB/contact-api/streams/addresses.js deleted file mode 100644 index 705e8b4c..00000000 --- a/services/gunDB/contact-api/streams/addresses.js +++ /dev/null @@ -1,56 +0,0 @@ -/** @format */ -const logger = require('winston') -const size = require('lodash/size') - -const Key = require('../key') -/** - * @typedef {Record} Addresses - */ - -/** @type {Addresses} */ -const pubToAddress = {} - -/** @type {Set<() => void>} */ -const listeners = new Set() - -listeners.add(() => { - logger.info(`pubToAddress length: ${size(pubToAddress)}`) -}) - -const notify = () => listeners.forEach(l => l()) - -/** @type {Set} */ -const subbedPublicKeys = new Set() - -/** - * @param {() => void} cb - * @param {string=} pub - */ -const onAddresses = (cb, pub) => { - listeners.add(cb) - cb() - if (pub && subbedPublicKeys.add(pub)) { - require('../../Mediator') - .getGun() - .user(pub) - .get(Key.CURRENT_HANDSHAKE_ADDRESS) - .on(addr => { - if (typeof addr === 'string' || addr === null) { - pubToAddress[pub] = addr - } else { - pubToAddress[pub] = null - } - notify() - }) - } - return () => { - listeners.delete(cb) - } -} - -const getAddresses = () => pubToAddress - -module.exports = { - onAddresses, - getAddresses -} diff --git a/services/gunDB/contact-api/streams/index.js b/services/gunDB/contact-api/streams/index.js index 3f586954..7ff4d7be 100644 --- a/services/gunDB/contact-api/streams/index.js +++ b/services/gunDB/contact-api/streams/index.js @@ -1,98 +1,6 @@ /** @format */ -const { Schema, Utils: CommonUtils } = require('shock-common') - -const Key = require('../key') -const Utils = require('../utils') - -/** - * @typedef {import('shock-common').Schema.StoredRequest} StoredRequest - * @typedef {(reqs: StoredRequest[]) => void} StoredRequestsListener - */ - -/** @type {Set} */ -const storedRequestsListeners = new Set() - -/** - * @type {StoredRequest[]} - */ -let encryptedStoredReqs = [] - -/** - * @type {StoredRequest[]} - */ -let currentStoredReqs = [] - -const getStoredReqs = () => currentStoredReqs - -const processStoredReqs = async () => { - const ereqs = encryptedStoredReqs - encryptedStoredReqs = [] - const mySecret = await Utils.mySecret() - const SEA = require('../../Mediator').mySEA - const finalReqs = await CommonUtils.asyncMap(ereqs, async er => { - /** @type {StoredRequest} */ - const r = { - handshakeAddress: await SEA.decrypt(er.handshakeAddress, mySecret), - recipientPub: await SEA.decrypt(er.recipientPub, mySecret), - sentReqID: await SEA.decrypt(er.sentReqID, mySecret), - timestamp: er.timestamp - } - - return r - }) - currentStoredReqs = finalReqs - storedRequestsListeners.forEach(l => l(currentStoredReqs)) -} - -let storedReqsSubbed = false - -/** - * @param {StoredRequestsListener} cb - */ -const onStoredReqs = cb => { - storedRequestsListeners.add(cb) - - if (!storedReqsSubbed) { - require('../../Mediator') - .getUser() - .get(Key.STORED_REQS) - .open(d => { - if (typeof d === 'object' && d !== null) { - //@ts-ignore - encryptedStoredReqs = /** @type {StoredRequest[]} */ (Object.values( - d - ).filter(i => Schema.isStoredRequest(i))) - } - - processStoredReqs() - }) - - storedReqsSubbed = true - } - - cb(currentStoredReqs) - - return () => { - storedRequestsListeners.delete(cb) - } -} module.exports = { - onPubToIncoming: require('./pubToIncoming').onPubToIncoming, - getPubToIncoming: require('./pubToIncoming').getPubToIncoming, - setPubToIncoming: require('./pubToIncoming').setPubToIncoming, - - onPubToFeed: require('./pubToFeed').onPubToFeed, - getPubToFeed: require('./pubToFeed').getPubToFeed, - - onStoredReqs, - getStoredReqs, - onAddresses: require('./addresses').onAddresses, - getAddresses: require('./addresses').getAddresses, - onLastSentReqIDs: require('./lastSentReqID').onLastSentReqIDs, - getSentReqIDs: require('./lastSentReqID').getSentReqIDs, - PubToIncoming: require('./pubToIncoming'), - getPubToLastSeenApp: require('./pubToLastSeenApp').getPubToLastSeenApp, onPubToLastSeenApp: require('./pubToLastSeenApp').on } diff --git a/services/gunDB/contact-api/streams/lastSentReqID.js b/services/gunDB/contact-api/streams/lastSentReqID.js deleted file mode 100644 index 431adb58..00000000 --- a/services/gunDB/contact-api/streams/lastSentReqID.js +++ /dev/null @@ -1,56 +0,0 @@ -/** @format */ -const logger = require('winston') -const { Constants } = require('shock-common') - -const Key = require('../key') - -/** @type {Record} */ -let pubToLastSentReqID = {} - -/** @type {Set<() => void>} */ -const listeners = new Set() -const notify = () => listeners.forEach(l => l()) - -let subbed = false - -/** - * @param {() => void} cb - */ -const onLastSentReqIDs = cb => { - listeners.add(cb) - cb() - - if (!subbed) { - const user = require('../../Mediator').getUser() - if (!user.is) { - logger.warn('lastSentReqID() -> tried to sub without authing') - throw new Error(Constants.ErrorCode.NOT_AUTH) - } - - user.get(Key.USER_TO_LAST_REQUEST_SENT).open(data => { - if (typeof data === 'object' && data !== null) { - for (const [pub, id] of Object.entries(data)) { - if (typeof id === 'string' || id === null) { - pubToLastSentReqID[pub] = id - } - } - } else { - pubToLastSentReqID = {} - } - - notify() - }) - subbed = true - } - - return () => { - listeners.delete(cb) - } -} - -const getSentReqIDs = () => pubToLastSentReqID - -module.exports = { - onLastSentReqIDs, - getSentReqIDs -} diff --git a/services/gunDB/contact-api/streams/pubToFeed.js b/services/gunDB/contact-api/streams/pubToFeed.js deleted file mode 100644 index f140aaea..00000000 --- a/services/gunDB/contact-api/streams/pubToFeed.js +++ /dev/null @@ -1,260 +0,0 @@ -/** @format */ -const uuidv1 = require('uuid/v1') -const logger = require('winston') -const debounce = require('lodash/debounce') -const { Schema, Utils: CommonUtils } = require('shock-common') -const size = require('lodash/size') - -const Key = require('../key') -const Utils = require('../utils') -/** - * @typedef {import('shock-common').Schema.ChatMessage} Message - * @typedef {import('../SimpleGUN').OpenListenerData} OpenListenerData - */ - -const PubToIncoming = require('./pubToIncoming') - -/** - * @typedef {Record} Feeds - * @typedef {(feeds: Feeds) => void} FeedsListener - */ - -/** @type {Set} */ -const feedsListeners = new Set() - -/** - * @type {Feeds} - */ -let pubToFeed = {} - -const getPubToFeed = () => pubToFeed - -feedsListeners.add(() => { - logger.info(`new pubToFeed length: ${size(getPubToFeed())}`) -}) - -/** @param {Feeds} ptf */ -const setPubToFeed = ptf => { - pubToFeed = ptf - feedsListeners.forEach(l => { - l(pubToFeed) - }) -} - -/** - * If at one point we subscribed to a feed, record it here. Keeps track of it - * for unsubbing. - * - * Since we can't really unsub in GUN, what we do is that each listener created - * checks the last incoming feed, if it was created for other feed that is not - * the latest, it becomes inactive. - * @type {Record} - */ -const pubToLastIncoming = {} - -/** - * Any pub-feed pair listener will write its update id here when fired up. Avoid - * race conditions between different listeners and between different invocations - * of the same listener. - * @type {Record} - */ -const pubToLastUpdate = {} - -/** - * Performs a sub to a pub feed pair that will only emit if it is the last - * subbed feed for that pub, according to `pubToLastIncoming`. This listener is - * not in charge of writing to the cache. - * @param {[ string , string ]} param0 - * @returns {(data: OpenListenerData) => void} - */ -const onOpenForPubFeedPair = ([pub, feed]) => - debounce(async data => { - try { - // did invalidate - if (pubToLastIncoming[pub] !== feed) { - return - } - - if ( - // did disconnect - data === null || - // interpret as disconnect - typeof data !== 'object' - ) { - // invalidate this listener. If a reconnection happens it will be for a - // different pub-feed pair. - pubToLastIncoming[pub] = null - setImmediate(() => { - logger.info( - `onOpenForPubFeedPair -> didDisconnect -> pub: ${pub} - feed: ${feed}` - ) - }) - // signal disconnect to listeners listeners should rely on pubToFeed for - // disconnect status instead of pub-to-incoming. Only the latter will - // detect remote disconnection - setPubToFeed({ - ...getPubToFeed(), - [pub]: /** @type {'disconnected'} */ ('disconnected') - }) - return - } - //@ts-ignore - const incoming = /** @type {import('shock-common').Schema.Outgoing} */ (data) - - // incomplete data, let's not assume anything - if ( - typeof incoming.with !== 'string' || - typeof incoming.messages !== 'object' - ) { - return - } - - /** @type {import('shock-common').Schema.ChatMessage[]} */ - const newMsgs = Object.entries(incoming.messages) - // filter out messages with incomplete data - .filter(([_, msg]) => Schema.isMessage(msg)) - .map(([id, msg]) => { - /** @type {import('shock-common').Schema.ChatMessage} */ - const m = { - // we'll decrypt later - body: msg.body, - id, - outgoing: false, - timestamp: msg.timestamp - } - - return m - }) - - if (newMsgs.length === 0) { - setPubToFeed({ - ...getPubToFeed(), - [pub]: [] - }) - return - } - - const thisUpdate = uuidv1() - pubToLastUpdate[pub] = thisUpdate - - const user = require('../../Mediator').getUser() - if (!user.is) { - logger.warn('pubToFeed -> onOpenForPubFeedPair() -> user is not auth') - } - const SEA = require('../../Mediator').mySEA - - const ourSecret = await SEA.secret(await Utils.pubToEpub(pub), user._.sea) - - const decryptedMsgs = await CommonUtils.asyncMap(newMsgs, async m => { - /** @type {import('shock-common').Schema.ChatMessage} */ - const decryptedMsg = { - ...m, - body: await SEA.decrypt(m.body, ourSecret) - } - - return decryptedMsg - }) - - // this listener got invalidated while we were awaiting the async operations - // above. - if (pubToLastUpdate[pub] !== thisUpdate) { - return - } - - setPubToFeed({ - ...getPubToFeed(), - [pub]: decryptedMsgs - }) - } catch (err) { - logger.warn(`error inside pub to pk-feed pair: ${pub} -- ${feed}`) - logger.error(err) - } - }, 750) - -const react = () => { - const pubToIncoming = PubToIncoming.getPubToIncoming() - - const gun = require('../../Mediator').getGun() - - /** @type {Feeds} */ - const newPubToFeed = {} - - for (const [pub, inc] of Object.entries(pubToIncoming)) { - /** - * empty string -> null - * @type {string|null} - */ - const newIncoming = inc || null - - if ( - // if disconnected, the same incoming feed will try to overwrite the - // nulled out pubToLastIncoming[pub] entry. Making the listener for that - // pub feed pair fire up again, etc. Now. When the user disconnects from - // this side of things. He will overwrite the pub to incoming with null. - // Let's allow that. - newIncoming === pubToLastIncoming[pub] && - !(pubToFeed[pub] === 'disconnected' && newIncoming === null) - ) { - // eslint-disable-next-line no-continue - continue - } - - // will invalidate stale listeners (a listener for an outdated incoming feed - // id) - pubToLastIncoming[pub] = newIncoming - // Invalidate pending writes from stale listener(s) for the old incoming - // address. - pubToLastUpdate[pub] = uuidv1() - newPubToFeed[pub] = newIncoming ? [] : null - - // sub to this incoming feed - if (typeof newIncoming === 'string') { - // perform sub to pub-incoming_feed pair - // leave all of the sideffects from this for the next tick - setImmediate(() => { - gun - .user(pub) - .get(Key.OUTGOINGS) - .get(newIncoming) - .open(onOpenForPubFeedPair([pub, newIncoming])) - }) - } - } - - if (Object.keys(newPubToFeed).length > 0) { - setPubToFeed({ - ...getPubToFeed(), - ...newPubToFeed - }) - } -} - -let subbed = false - -/** - * Array.isArray(pubToFeed[pub]) means a Handshake is in place, look for - * incoming messages here. - * pubToIncoming[pub] === null means a disconnection took place. - * typeof pubToIncoming[pub] === 'undefined' means none of the above. - * @param {FeedsListener} cb - * @returns {() => void} - */ -const onPubToFeed = cb => { - feedsListeners.add(cb) - cb(getPubToFeed()) - - if (!subbed) { - PubToIncoming.onPubToIncoming(react) - subbed = true - } - - return () => { - feedsListeners.delete(cb) - } -} - -module.exports = { - getPubToFeed, - setPubToFeed, - onPubToFeed -} diff --git a/services/gunDB/contact-api/streams/pubToIncoming.js b/services/gunDB/contact-api/streams/pubToIncoming.js deleted file mode 100644 index 2834ca79..00000000 --- a/services/gunDB/contact-api/streams/pubToIncoming.js +++ /dev/null @@ -1,105 +0,0 @@ -/** @format */ -const uuidv1 = require('uuid/v1') -const debounce = require('lodash/debounce') -const logger = require('winston') -const { Utils: CommonUtils } = require('shock-common') -const size = require('lodash/size') - -const { USER_TO_INCOMING } = require('../key') -/** @typedef {import('../SimpleGUN').OpenListenerData} OpenListenerData */ - -/** - * @typedef {Record} PubToIncoming - */ - -/** @type {Set<() => void>} */ -const listeners = new Set() - -/** @type {PubToIncoming} */ -let pubToIncoming = {} - -const getPubToIncoming = () => pubToIncoming -/** - * @param {PubToIncoming} pti - * @returns {void} - */ -const setPubToIncoming = pti => { - pubToIncoming = pti - listeners.forEach(l => l()) -} - -let latestUpdate = uuidv1() - -listeners.add(() => { - logger.info(`new pubToIncoming length: ${size(getPubToIncoming())}`) -}) - -const onOpen = debounce(async uti => { - const SEA = require('../../Mediator').mySEA - const mySec = require('../../Mediator').getMySecret() - const thisUpdate = uuidv1() - latestUpdate = thisUpdate - - if (typeof uti !== 'object' || uti === null) { - setPubToIncoming({}) - return - } - - /** @type {PubToIncoming} */ - const newPubToIncoming = {} - - await CommonUtils.asyncForEach( - Object.entries(uti), - async ([pub, encFeedID]) => { - if (encFeedID === null) { - newPubToIncoming[pub] = null - return - } - - if (typeof encFeedID === 'string') { - newPubToIncoming[pub] = await SEA.decrypt(encFeedID, mySec) - } - } - ) - - // avoid old data from overwriting new data if decrypting took longer to - // process for the older open() call than for the newer open() call - if (latestUpdate === thisUpdate) { - setPubToIncoming(newPubToIncoming) - } -}, 750) - -let subbed = false - -/** - * @param {() => void} cb - * @returns {() => void} - */ -const onPubToIncoming = cb => { - if (!listeners.add(cb)) { - throw new Error('Tried to subscribe twice') - } - - cb() - - if (!subbed) { - const user = require('../../Mediator').getUser() - if (!user.is) { - logger.warn(`subscribing to pubToIncoming on a unauth user`) - } - user.get(USER_TO_INCOMING).open(onOpen) - subbed = true - } - - return () => { - if (!listeners.delete(cb)) { - throw new Error('Tried to unsubscribe twice') - } - } -} - -module.exports = { - getPubToIncoming, - setPubToIncoming, - onPubToIncoming -}