No nested tryAndWait
This commit is contained in:
parent
4212ea4612
commit
19f14e6d83
1 changed files with 92 additions and 87 deletions
|
|
@ -1,6 +1,8 @@
|
||||||
/**
|
/**
|
||||||
* @format
|
* @format
|
||||||
*/
|
*/
|
||||||
|
const logger = require('winston')
|
||||||
|
|
||||||
const ErrorCode = require('../errorCode')
|
const ErrorCode = require('../errorCode')
|
||||||
const Key = require('../key')
|
const Key = require('../key')
|
||||||
const Schema = require('../schema')
|
const Schema = require('../schema')
|
||||||
|
|
@ -12,6 +14,8 @@ const Utils = require('../utils')
|
||||||
* @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode
|
* @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
let procid = 0
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws {Error} NOT_AUTH
|
* @throws {Error} NOT_AUTH
|
||||||
* @param {UserGUNNode} user
|
* @param {UserGUNNode} user
|
||||||
|
|
@ -23,17 +27,16 @@ const onAcceptedRequests = (user, SEA) => {
|
||||||
throw new Error(ErrorCode.NOT_AUTH)
|
throw new Error(ErrorCode.NOT_AUTH)
|
||||||
}
|
}
|
||||||
|
|
||||||
const mySecret = require('../../Mediator').getMySecret()
|
procid++
|
||||||
|
|
||||||
if (typeof mySecret !== 'string') {
|
|
||||||
console.log("Jobs.onAcceptedRequests() -> typeof mySecret !== 'string'")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
user
|
user
|
||||||
.get(Key.STORED_REQS)
|
.get(Key.STORED_REQS)
|
||||||
.map()
|
.map()
|
||||||
.once(async (storedReq, id) => {
|
.once(async (storedReq, id) => {
|
||||||
|
logger.info(
|
||||||
|
`------------------------------------\nPROCID:${procid}\n---------------------------------------`
|
||||||
|
)
|
||||||
|
const mySecret = require('../../Mediator').getMySecret()
|
||||||
try {
|
try {
|
||||||
if (!Schema.isStoredRequest(storedReq)) {
|
if (!Schema.isStoredRequest(storedReq)) {
|
||||||
throw new TypeError(
|
throw new TypeError(
|
||||||
|
|
@ -70,99 +73,101 @@ const onAcceptedRequests = (user, SEA) => {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const gun = require('../../Mediator').getGun()
|
||||||
|
const user = require('../../Mediator').getUser()
|
||||||
|
|
||||||
const recipientEpub = await Utils.pubToEpub(recipientPub)
|
const recipientEpub = await Utils.pubToEpub(recipientPub)
|
||||||
const ourSecret = await SEA.secret(recipientEpub, user._.sea)
|
const ourSecret = await SEA.secret(recipientEpub, user._.sea)
|
||||||
|
|
||||||
if (typeof ourSecret !== 'string') {
|
await new Promise((res, rej) => {
|
||||||
throw new TypeError("typeof ourSecret !== 'string'")
|
gun
|
||||||
}
|
.get(Key.HANDSHAKE_NODES)
|
||||||
|
.get(requestAddress)
|
||||||
await Utils.tryAndWait(
|
.get(sentReqID)
|
||||||
(gun, user) =>
|
.on(async sentReq => {
|
||||||
new Promise((res, rej) => {
|
if (!Schema.isHandshakeRequest(sentReq)) {
|
||||||
gun
|
rej(
|
||||||
.get(Key.HANDSHAKE_NODES)
|
new Error(
|
||||||
.get(requestAddress)
|
'sent request found in handshake node not a handshake request'
|
||||||
.get(sentReqID)
|
|
||||||
.on(async sentReq => {
|
|
||||||
if (!Schema.isHandshakeRequest(sentReq)) {
|
|
||||||
rej(
|
|
||||||
new Error(
|
|
||||||
'sent request found in handshake node not a handshake request'
|
|
||||||
)
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// The response can be decrypted with the same secret regardless of who
|
|
||||||
// wrote to it last (see HandshakeRequest definition).
|
|
||||||
// This could be our feed ID for the recipient, or the recipient's feed
|
|
||||||
// id if he accepted the request.
|
|
||||||
const feedID = await SEA.decrypt(sentReq.response, ourSecret)
|
|
||||||
|
|
||||||
if (typeof feedID !== 'string') {
|
|
||||||
throw new TypeError("typeof feedID !== 'string'")
|
|
||||||
}
|
|
||||||
|
|
||||||
const maybeFeedOnRecipientsOutgoings = await Utils.tryAndWait(
|
|
||||||
gun =>
|
|
||||||
new Promise(res => {
|
|
||||||
gun
|
|
||||||
.user(recipientPub)
|
|
||||||
.get(Key.OUTGOINGS)
|
|
||||||
.get(feedID)
|
|
||||||
.once(feed => {
|
|
||||||
res(feed)
|
|
||||||
})
|
|
||||||
}),
|
|
||||||
// retry on undefined, might be a false negative
|
|
||||||
v => typeof v === 'undefined'
|
|
||||||
)
|
)
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
const feedIDExistsOnRecipientsOutgoings =
|
// The response can be decrypted with the same secret regardless of who
|
||||||
typeof maybeFeedOnRecipientsOutgoings === 'object' &&
|
// wrote to it last (see HandshakeRequest definition).
|
||||||
maybeFeedOnRecipientsOutgoings !== null
|
// This could be our feed ID for the recipient, or the recipient's feed
|
||||||
|
// id if he accepted the request.
|
||||||
|
const feedID = await SEA.decrypt(sentReq.response, ourSecret)
|
||||||
|
|
||||||
if (!feedIDExistsOnRecipientsOutgoings) {
|
if (typeof feedID !== 'string') {
|
||||||
return
|
throw new TypeError("typeof feedID !== 'string'")
|
||||||
}
|
}
|
||||||
|
|
||||||
const encryptedForMeIncomingID = await SEA.encrypt(
|
logger.info(`onAcceptedRequests -> decrypted feed ID: ${feedID}`)
|
||||||
feedID,
|
|
||||||
mySecret
|
|
||||||
)
|
|
||||||
|
|
||||||
await new Promise((res, rej) => {
|
logger.info(
|
||||||
user
|
'Will now try to access the other users outgoing feed'
|
||||||
.get(Key.USER_TO_INCOMING)
|
)
|
||||||
.get(recipientPub)
|
|
||||||
.put(encryptedForMeIncomingID, ack => {
|
const maybeFeedOnRecipientsOutgoings = await Utils.tryAndWait(
|
||||||
if (ack.err) {
|
gun =>
|
||||||
rej(new Error(ack.err))
|
new Promise(res => {
|
||||||
} else {
|
gun
|
||||||
res()
|
.user(recipientPub)
|
||||||
}
|
.get(Key.OUTGOINGS)
|
||||||
|
.get(feedID)
|
||||||
|
.once(feed => {
|
||||||
|
res(feed)
|
||||||
})
|
})
|
||||||
})
|
}),
|
||||||
|
// retry on undefined, might be a false negative
|
||||||
|
v => typeof v === 'undefined'
|
||||||
|
)
|
||||||
|
|
||||||
await new Promise((res, rej) => {
|
const feedIDExistsOnRecipientsOutgoings =
|
||||||
user
|
typeof maybeFeedOnRecipientsOutgoings === 'object' &&
|
||||||
.get(Key.STORED_REQS)
|
maybeFeedOnRecipientsOutgoings !== null
|
||||||
.get(id)
|
|
||||||
.put(null, ack => {
|
|
||||||
if (ack.err) {
|
|
||||||
rej(new Error(ack.err))
|
|
||||||
} else {
|
|
||||||
res()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
// ensure this listeners gets called at least once
|
if (!feedIDExistsOnRecipientsOutgoings) {
|
||||||
res()
|
return
|
||||||
})
|
}
|
||||||
|
|
||||||
|
const encryptedForMeIncomingID = await SEA.encrypt(
|
||||||
|
feedID,
|
||||||
|
mySecret
|
||||||
|
)
|
||||||
|
|
||||||
|
await new Promise((res, rej) => {
|
||||||
|
user
|
||||||
|
.get(Key.USER_TO_INCOMING)
|
||||||
|
.get(recipientPub)
|
||||||
|
.put(encryptedForMeIncomingID, ack => {
|
||||||
|
if (ack.err) {
|
||||||
|
rej(new Error(ack.err))
|
||||||
|
} else {
|
||||||
|
res()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
await new Promise((res, rej) => {
|
||||||
|
user
|
||||||
|
.get(Key.STORED_REQS)
|
||||||
|
.get(id)
|
||||||
|
.put(null, ack => {
|
||||||
|
if (ack.err) {
|
||||||
|
rej(new Error(ack.err))
|
||||||
|
} else {
|
||||||
|
res()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// ensure this listeners gets called at least once
|
||||||
|
res()
|
||||||
})
|
})
|
||||||
)
|
})
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.warn(`Jobs.onAcceptedRequests() -> ${err.message}`)
|
console.warn(`Jobs.onAcceptedRequests() -> ${err.message}`)
|
||||||
console.log(err)
|
console.log(err)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue