pubtoincoming fixes
This commit is contained in:
parent
5e6142fd1b
commit
6768533515
2 changed files with 146 additions and 124 deletions
|
|
@ -97,128 +97,6 @@ const onDisplayName = (cb, pub) => {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef {import('../schema').ChatMessage[]} Message
|
||||
* @typedef {Record<string, Message|null>} Incomings
|
||||
* @typedef {(incomings: Incomings) => void} IncomingsListener
|
||||
*/
|
||||
|
||||
/**
|
||||
* @type {Incomings}
|
||||
*/
|
||||
const pubToIncoming = {}
|
||||
|
||||
const getPubToIncoming = () => pubToIncoming
|
||||
|
||||
/** @type {Set<IncomingsListener>} */
|
||||
const incomingsListeners = new Set()
|
||||
|
||||
const notifyIncomingsListeners = () => {
|
||||
incomingsListeners.forEach(l => l(pubToIncoming))
|
||||
}
|
||||
|
||||
/** @type {Set<string>} */
|
||||
const pubFeedPairsWithIncomingListeners = new Set()
|
||||
|
||||
let subbed = false
|
||||
|
||||
/**
|
||||
* @param {IncomingsListener} cb
|
||||
*/
|
||||
const onIncoming = cb => {
|
||||
incomingsListeners.add(cb)
|
||||
|
||||
const user = require('../../Mediator').getUser()
|
||||
const SEA = require('../../Mediator').mySEA
|
||||
|
||||
if (!subbed) {
|
||||
user.get(Key.USER_TO_INCOMING).open(uti => {
|
||||
if (typeof uti !== 'object' || uti === null) {
|
||||
return
|
||||
}
|
||||
|
||||
Object.entries(uti).forEach(async ([pub, encFeed]) => {
|
||||
if (typeof encFeed !== 'string') {
|
||||
return
|
||||
}
|
||||
const ourSecret = await SEA.secret(
|
||||
await Utils.pubToEpub(pub),
|
||||
user._.sea
|
||||
)
|
||||
const mySecret = await Utils.mySecret()
|
||||
|
||||
const feed = await SEA.decrypt(encFeed, mySecret)
|
||||
|
||||
if (pubFeedPairsWithIncomingListeners.add(pub + '--' + feed)) {
|
||||
require('../../Mediator')
|
||||
.getGun()
|
||||
.user(pub)
|
||||
.get(Key.OUTGOINGS)
|
||||
.get(feed)
|
||||
.open(async data => {
|
||||
if (data === null) {
|
||||
pubToIncoming[pub] = null
|
||||
return
|
||||
}
|
||||
|
||||
if (typeof data !== 'object') {
|
||||
return
|
||||
}
|
||||
|
||||
if (typeof data.with !== 'string') {
|
||||
return
|
||||
}
|
||||
|
||||
if (typeof data.messages !== 'object') {
|
||||
return
|
||||
}
|
||||
|
||||
if (data.messages === null) {
|
||||
return
|
||||
}
|
||||
|
||||
const msgs = /** @type {[string, Schema.Message][]} */ (Object.entries(
|
||||
data.messages
|
||||
).filter(([_, msg]) => Schema.isMessage(msg)))
|
||||
|
||||
// eslint-disable-next-line require-atomic-updates
|
||||
pubToIncoming[pub] = await Utils.asyncMap(
|
||||
msgs,
|
||||
async ([msgid, msg]) => {
|
||||
let decryptedBody = ''
|
||||
|
||||
if (msg.body === INITIAL_MSG) {
|
||||
decryptedBody = INITIAL_MSG
|
||||
} else {
|
||||
decryptedBody = await SEA.decrypt(msg.body, ourSecret)
|
||||
}
|
||||
|
||||
/** @type {Schema.ChatMessage} */
|
||||
const finalMsg = {
|
||||
body: decryptedBody,
|
||||
id: msgid,
|
||||
outgoing: false,
|
||||
timestamp: msg.timestamp
|
||||
}
|
||||
|
||||
return finalMsg
|
||||
}
|
||||
)
|
||||
|
||||
notifyIncomingsListeners()
|
||||
})
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
subbed = true
|
||||
}
|
||||
|
||||
return () => {
|
||||
incomingsListeners.delete(cb)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef {import('../schema').StoredRequest} StoredRequest
|
||||
* @typedef {(reqs: StoredRequest[]) => void} StoredRequestsListener
|
||||
|
|
@ -296,8 +174,8 @@ module.exports = {
|
|||
getPubToAvatar,
|
||||
onDisplayName,
|
||||
getPubToDn,
|
||||
onIncoming,
|
||||
getPubToIncoming,
|
||||
onIncoming: require('./pubToIncoming').onIncoming,
|
||||
getPubToIncoming: require('./pubToIncoming').getPubToIncoming,
|
||||
onStoredReqs,
|
||||
getStoredReqs,
|
||||
onAddresses: require('./addresses').onAddresses,
|
||||
|
|
|
|||
144
services/gunDB/contact-api/streams/pubToIncoming.js
Normal file
144
services/gunDB/contact-api/streams/pubToIncoming.js
Normal file
|
|
@ -0,0 +1,144 @@
|
|||
/** @format */
|
||||
const { INITIAL_MSG } = require('../actions')
|
||||
const Schema = require('../schema')
|
||||
const Key = require('../key')
|
||||
const Utils = require('../utils')
|
||||
|
||||
/**
|
||||
* @typedef {import('../schema').ChatMessage} Message
|
||||
* @typedef {Record<string, Message[]|null>} Incomings
|
||||
* @typedef {(incomings: Incomings) => void} IncomingsListener
|
||||
*/
|
||||
|
||||
/**
|
||||
* @type {Incomings}
|
||||
*/
|
||||
const currentPubToIncoming = {}
|
||||
|
||||
const getPubToIncoming = () => currentPubToIncoming
|
||||
|
||||
/** @type {Set<IncomingsListener>} */
|
||||
const incomingsListeners = new Set()
|
||||
|
||||
const notifyIncomingsListeners = () => {
|
||||
incomingsListeners.forEach(l => l(currentPubToIncoming))
|
||||
}
|
||||
|
||||
/** @type {Set<string>} */
|
||||
const pubFeedPairsWithIncomingListeners = new Set()
|
||||
|
||||
let subbed = false
|
||||
|
||||
/**
|
||||
* @param {IncomingsListener} cb
|
||||
*/
|
||||
const onIncoming = cb => {
|
||||
incomingsListeners.add(cb)
|
||||
|
||||
const user = require('../../Mediator').getUser()
|
||||
const SEA = require('../../Mediator').mySEA
|
||||
|
||||
if (!subbed) {
|
||||
user.get(Key.USER_TO_INCOMING).open(uti => {
|
||||
if (typeof uti !== 'object' || uti === null) {
|
||||
return
|
||||
}
|
||||
|
||||
Object.entries(uti).forEach(async ([pub, encFeed]) => {
|
||||
if (typeof encFeed !== 'string') {
|
||||
return
|
||||
}
|
||||
const ourSecret = await SEA.secret(
|
||||
await Utils.pubToEpub(pub),
|
||||
user._.sea
|
||||
)
|
||||
const mySecret = await Utils.mySecret()
|
||||
|
||||
const feed = await SEA.decrypt(encFeed, mySecret)
|
||||
|
||||
if (pubFeedPairsWithIncomingListeners.add(pub + '--' + feed)) {
|
||||
require('../../Mediator')
|
||||
.getGun()
|
||||
.user(pub)
|
||||
.get(Key.OUTGOINGS)
|
||||
.get(feed)
|
||||
.open(async data => {
|
||||
if (data === null) {
|
||||
currentPubToIncoming[pub] = null
|
||||
return
|
||||
}
|
||||
|
||||
if (typeof data !== 'object') {
|
||||
return
|
||||
}
|
||||
|
||||
if (typeof data.with !== 'string') {
|
||||
return
|
||||
}
|
||||
|
||||
if (typeof data.messages !== 'object') {
|
||||
return
|
||||
}
|
||||
|
||||
if (data.messages === null) {
|
||||
return
|
||||
}
|
||||
|
||||
if (!Array.isArray(currentPubToIncoming[pub])) {
|
||||
currentPubToIncoming[pub] = [
|
||||
{
|
||||
body: INITIAL_MSG,
|
||||
// hack one year
|
||||
timestamp: Date.now() - 31556952,
|
||||
id: Math.random().toString(),
|
||||
outgoing: false
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
const msgs = /** @type {[string, Schema.Message][]} */ (Object.entries(
|
||||
data.messages
|
||||
).filter(([_, msg]) => Schema.isMessage(msg)))
|
||||
|
||||
// eslint-disable-next-line require-atomic-updates
|
||||
currentPubToIncoming[pub] = await Utils.asyncMap(
|
||||
msgs,
|
||||
async ([msgid, msg]) => {
|
||||
let decryptedBody = ''
|
||||
|
||||
if (msg.body === INITIAL_MSG) {
|
||||
decryptedBody = INITIAL_MSG
|
||||
} else {
|
||||
decryptedBody = await SEA.decrypt(msg.body, ourSecret)
|
||||
}
|
||||
|
||||
/** @type {Schema.ChatMessage} */
|
||||
const finalMsg = {
|
||||
body: decryptedBody,
|
||||
id: msgid,
|
||||
outgoing: false,
|
||||
timestamp: msg.timestamp
|
||||
}
|
||||
|
||||
return finalMsg
|
||||
}
|
||||
)
|
||||
|
||||
notifyIncomingsListeners()
|
||||
})
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
subbed = true
|
||||
}
|
||||
|
||||
return () => {
|
||||
incomingsListeners.delete(cb)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
onIncoming,
|
||||
getPubToIncoming
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue