Merge branch 'master' into logs/better-logs
This commit is contained in:
commit
fa39128886
11 changed files with 144 additions and 1942 deletions
6
.vscode/settings.json
vendored
6
.vscode/settings.json
vendored
|
|
@ -4,9 +4,5 @@
|
|||
"debug.node.autoAttach": "on",
|
||||
"editor.formatOnSave": true,
|
||||
"editor.defaultFormatter": "esbenp.prettier-vscode",
|
||||
"cSpell.words": [
|
||||
"Epub",
|
||||
"ISEA",
|
||||
"Reqs"
|
||||
]
|
||||
"cSpell.words": ["Epub", "GUNRPC", "ISEA", "PUBKEY", "Reqs", "uuidv"]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -335,7 +335,15 @@ const authenticate = async (alias, pass, __user) => {
|
|||
},
|
||||
ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(`Error initializing follows: ${ack.err}`))
|
||||
rej(
|
||||
new Error(
|
||||
`Error initializing follows: ${JSON.stringify(
|
||||
ack.err,
|
||||
null,
|
||||
4
|
||||
)}`
|
||||
)
|
||||
)
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
|
|
@ -366,7 +374,15 @@ const authenticate = async (alias, pass, __user) => {
|
|||
},
|
||||
ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(`Error initializing follows: ${ack.err}`))
|
||||
rej(
|
||||
new Error(
|
||||
`Error initializing follows: ${JSON.stringify(
|
||||
ack.err,
|
||||
null,
|
||||
4
|
||||
)}`
|
||||
)
|
||||
)
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
|
|
@ -375,16 +391,10 @@ const authenticate = async (alias, pass, __user) => {
|
|||
}))
|
||||
|
||||
// move this to a subscription; implement off() ? todo
|
||||
API.Jobs.onAcceptedRequests(_user, mySEA)
|
||||
API.Jobs.onOrders(_user, gun, mySEA)
|
||||
API.Jobs.lastSeenNode(_user)
|
||||
|
||||
API.Events.onChats(() => {})()
|
||||
API.Events.onCurrentHandshakeAddress(() => {}, user)()
|
||||
API.Events.onOutgoing(() => {})()
|
||||
API.Events.onSeedBackup(() => {}, user, mySEA)
|
||||
API.Events.onSimplerReceivedRequests(() => {})()
|
||||
API.Events.onSimplerSentRequests(() => {})()
|
||||
|
||||
return _user._.sea.pub
|
||||
}
|
||||
|
|
@ -421,7 +431,15 @@ const authenticate = async (alias, pass, __user) => {
|
|||
},
|
||||
ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(`Error initializing follows: ${ack.err}`))
|
||||
rej(
|
||||
new Error(
|
||||
`Error initializing follows: ${JSON.stringify(
|
||||
ack.err,
|
||||
null,
|
||||
4
|
||||
)}`
|
||||
)
|
||||
)
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
|
|
@ -429,17 +447,10 @@ const authenticate = async (alias, pass, __user) => {
|
|||
)
|
||||
}))
|
||||
|
||||
API.Jobs.onAcceptedRequests(_user, mySEA)
|
||||
API.Jobs.onOrders(_user, gun, mySEA)
|
||||
API.Jobs.lastSeenNode(_user)
|
||||
|
||||
API.Events.onChats(() => {})()
|
||||
API.Events.onCurrentHandshakeAddress(() => {}, user)()
|
||||
|
||||
API.Events.onOutgoing(() => {})()
|
||||
API.Events.onSeedBackup(() => {}, user, mySEA)
|
||||
API.Events.onSimplerReceivedRequests(() => {})()
|
||||
API.Events.onSimplerSentRequests(() => {})()
|
||||
|
||||
return ack.sea.pub
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -26,262 +26,10 @@ const LNDHealthMananger = require('../../../utils/lightningServices/errors')
|
|||
const { enrollContentTokens, selfContentToken } = require('../../seed')
|
||||
|
||||
/**
|
||||
* @typedef {import('./SimpleGUN').GUNNode} GUNNode
|
||||
* @typedef {import('./SimpleGUN').ISEA} ISEA
|
||||
* @typedef {import('./SimpleGUN').UserGUNNode} UserGUNNode
|
||||
* @typedef {import('shock-common').Schema.HandshakeRequest} HandshakeRequest
|
||||
* @typedef {import('shock-common').Schema.StoredRequest} StoredReq
|
||||
* @typedef {import('shock-common').Schema.Message} Message
|
||||
* @typedef {import('shock-common').Schema.Outgoing} Outgoing
|
||||
* @typedef {import('shock-common').Schema.PartialOutgoing} PartialOutgoing
|
||||
* @typedef {import('shock-common').Schema.Order} Order
|
||||
* @typedef {import('./SimpleGUN').Ack} Ack
|
||||
*/
|
||||
|
||||
/**
|
||||
* Create a an outgoing feed. The feed will have an initial special acceptance
|
||||
* message. Returns a promise that resolves to the id of the newly-created
|
||||
* outgoing feed.
|
||||
*
|
||||
* If an outgoing feed is already created for the recipient, then returns the id
|
||||
* of that one.
|
||||
* @param {string} withPublicKey Public key of the intended recipient of the
|
||||
* outgoing feed that will be created.
|
||||
* @throws {Error} If the outgoing feed cannot be created or if the initial
|
||||
* message for it also cannot be created. These errors aren't coded as they are
|
||||
* not meant to be caught outside of this module.
|
||||
* @param {UserGUNNode} user
|
||||
* @param {ISEA} SEA
|
||||
* @returns {Promise<string>}
|
||||
*/
|
||||
const __createOutgoingFeed = async (withPublicKey, user, SEA) => {
|
||||
if (!user.is) {
|
||||
throw new Error(ErrorCode.NOT_AUTH)
|
||||
}
|
||||
|
||||
const mySecret = require('../Mediator').getMySecret()
|
||||
const encryptedForMeRecipientPub = await SEA.encrypt(withPublicKey, mySecret)
|
||||
const ourSecret = await SEA.secret(
|
||||
await Utils.pubToEpub(withPublicKey),
|
||||
user._.sea
|
||||
)
|
||||
|
||||
const maybeOutgoingID = await Utils.recipientToOutgoingID(withPublicKey)
|
||||
|
||||
let outgoingFeedID = ''
|
||||
|
||||
// if there was no stored outgoing, create an outgoing feed
|
||||
if (typeof maybeOutgoingID !== 'string') {
|
||||
/** @type {PartialOutgoing} */
|
||||
const newPartialOutgoingFeed = {
|
||||
with: encryptedForMeRecipientPub
|
||||
}
|
||||
|
||||
/** @type {string} */
|
||||
const newOutgoingFeedID = await new Promise((res, rej) => {
|
||||
const _outFeedNode = user
|
||||
.get(Key.OUTGOINGS)
|
||||
//@ts-ignore
|
||||
.set(newPartialOutgoingFeed, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(ack.err))
|
||||
} else {
|
||||
res(_outFeedNode._.get)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
if (typeof newOutgoingFeedID !== 'string') {
|
||||
throw new TypeError('typeof newOutgoingFeedID !== "string"')
|
||||
}
|
||||
|
||||
/** @type {Message} */
|
||||
const initialMsg = {
|
||||
body: await SEA.encrypt(Constants.Misc.INITIAL_MSG, ourSecret),
|
||||
timestamp: Date.now()
|
||||
}
|
||||
|
||||
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
|
||||
user
|
||||
.get(Key.OUTGOINGS)
|
||||
.get(newOutgoingFeedID)
|
||||
.get(Key.MESSAGES)
|
||||
//@ts-ignore
|
||||
.set(initialMsg, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(ack.err))
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
})
|
||||
}))
|
||||
|
||||
const encryptedForMeNewOutgoingFeedID = await SEA.encrypt(
|
||||
newOutgoingFeedID,
|
||||
mySecret
|
||||
)
|
||||
|
||||
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
|
||||
user
|
||||
.get(Key.RECIPIENT_TO_OUTGOING)
|
||||
.get(withPublicKey)
|
||||
.put(encryptedForMeNewOutgoingFeedID, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(Error(ack.err))
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
})
|
||||
}))
|
||||
|
||||
outgoingFeedID = newOutgoingFeedID
|
||||
}
|
||||
|
||||
// otherwise decrypt stored outgoing
|
||||
else {
|
||||
outgoingFeedID = maybeOutgoingID
|
||||
}
|
||||
|
||||
if (typeof outgoingFeedID === 'undefined') {
|
||||
throw new TypeError(
|
||||
'__createOutgoingFeed() -> typeof outgoingFeedID === "undefined"'
|
||||
)
|
||||
}
|
||||
|
||||
if (typeof outgoingFeedID !== 'string') {
|
||||
throw new TypeError(
|
||||
'__createOutgoingFeed() -> expected outgoingFeedID to be an string'
|
||||
)
|
||||
}
|
||||
|
||||
if (outgoingFeedID.length === 0) {
|
||||
throw new TypeError(
|
||||
'__createOutgoingFeed() -> expected outgoingFeedID to be a populated string.'
|
||||
)
|
||||
}
|
||||
|
||||
return outgoingFeedID
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a request's ID, that should be found on the user's current handshake
|
||||
* node, accept the request by creating an outgoing feed intended for the
|
||||
* requestor, then encrypting and putting the id of this newly created outgoing
|
||||
* feed on the response prop of the request.
|
||||
* @param {string} requestID The id for the request to accept.
|
||||
* @param {GUNNode} gun
|
||||
* @param {UserGUNNode} user Pass only for testing purposes.
|
||||
* @param {ISEA} SEA
|
||||
* @param {typeof __createOutgoingFeed} outgoingFeedCreator Pass only
|
||||
* for testing. purposes.
|
||||
* @throws {Error} Throws if trying to accept an invalid request, or an error on
|
||||
* gun's part.
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
const acceptRequest = async (
|
||||
requestID,
|
||||
gun,
|
||||
user,
|
||||
SEA,
|
||||
outgoingFeedCreator = __createOutgoingFeed
|
||||
) => {
|
||||
if (!user.is) {
|
||||
throw new Error(ErrorCode.NOT_AUTH)
|
||||
}
|
||||
|
||||
const handshakeAddress = await Utils.tryAndWait(async (_, user) => {
|
||||
const addr = await user.get(Key.CURRENT_HANDSHAKE_ADDRESS).then()
|
||||
|
||||
if (typeof addr !== 'string') {
|
||||
throw new TypeError("typeof addr !== 'string'")
|
||||
}
|
||||
|
||||
return addr
|
||||
})
|
||||
|
||||
const {
|
||||
response: encryptedForUsIncomingID,
|
||||
from: senderPublicKey
|
||||
} = await Utils.tryAndWait(async gun => {
|
||||
const hr = await gun
|
||||
.get(Key.HANDSHAKE_NODES)
|
||||
.get(handshakeAddress)
|
||||
.get(requestID)
|
||||
.then()
|
||||
|
||||
if (!Schema.isHandshakeRequest(hr)) {
|
||||
throw new Error(ErrorCode.TRIED_TO_ACCEPT_AN_INVALID_REQUEST)
|
||||
}
|
||||
|
||||
return hr
|
||||
})
|
||||
|
||||
/** @type {string} */
|
||||
const requestorEpub = await Utils.pubToEpub(senderPublicKey)
|
||||
|
||||
const ourSecret = await SEA.secret(requestorEpub, user._.sea)
|
||||
if (typeof ourSecret !== 'string') {
|
||||
throw new TypeError("typeof ourSecret !== 'string'")
|
||||
}
|
||||
|
||||
const incomingID = await SEA.decrypt(encryptedForUsIncomingID, ourSecret)
|
||||
if (typeof incomingID !== 'string') {
|
||||
throw new TypeError("typeof incomingID !== 'string'")
|
||||
}
|
||||
|
||||
const newlyCreatedOutgoingFeedID = await outgoingFeedCreator(
|
||||
senderPublicKey,
|
||||
user,
|
||||
SEA
|
||||
)
|
||||
|
||||
const mySecret = require('../Mediator').getMySecret()
|
||||
const encryptedForMeIncomingID = await SEA.encrypt(incomingID, mySecret)
|
||||
|
||||
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
|
||||
user
|
||||
.get(Key.USER_TO_INCOMING)
|
||||
.get(senderPublicKey)
|
||||
.put(encryptedForMeIncomingID, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(ack.err))
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
})
|
||||
}))
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// NOTE: perform non-reversable actions before destructive actions
|
||||
// In case any of the non-reversable actions reject.
|
||||
// In this case, writing to the response is the non-revesarble op.
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
const encryptedForUsOutgoingID = await SEA.encrypt(
|
||||
newlyCreatedOutgoingFeedID,
|
||||
ourSecret
|
||||
)
|
||||
//why await if you dont need the response?
|
||||
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
|
||||
gun
|
||||
.get(Key.HANDSHAKE_NODES)
|
||||
.get(handshakeAddress)
|
||||
.get(requestID)
|
||||
.put(
|
||||
{
|
||||
response: encryptedForUsOutgoingID
|
||||
},
|
||||
ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(ack.err))
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
}
|
||||
)
|
||||
}))
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} user
|
||||
* @param {string} pass
|
||||
|
|
@ -374,438 +122,6 @@ const generateHandshakeAddress = async () => {
|
|||
}))
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {string} pub
|
||||
* @throws {Error}
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
const cleanup = async pub => {
|
||||
const user = require('../Mediator').getUser()
|
||||
|
||||
const outGoingID = await Utils.recipientToOutgoingID(pub)
|
||||
|
||||
const promises = []
|
||||
|
||||
promises.push(
|
||||
/** @type {Promise<void>} */ (new Promise((res, rej) => {
|
||||
user
|
||||
.get(Key.USER_TO_INCOMING)
|
||||
.get(pub)
|
||||
.put(null, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(ack.err))
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
})
|
||||
}))
|
||||
)
|
||||
|
||||
promises.push(
|
||||
/** @type {Promise<void>} */ (new Promise((res, rej) => {
|
||||
user
|
||||
.get(Key.RECIPIENT_TO_OUTGOING)
|
||||
.get(pub)
|
||||
.put(null, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(ack.err))
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
})
|
||||
}))
|
||||
)
|
||||
|
||||
promises.push(
|
||||
/** @type {Promise<void>} */ (new Promise((res, rej) => {
|
||||
user
|
||||
.get(Key.USER_TO_LAST_REQUEST_SENT)
|
||||
.get(pub)
|
||||
.put(null, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(ack.err))
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
})
|
||||
}))
|
||||
)
|
||||
|
||||
if (outGoingID) {
|
||||
promises.push(
|
||||
/** @type {Promise<void>} */ (new Promise((res, rej) => {
|
||||
user
|
||||
.get(Key.OUTGOINGS)
|
||||
.get(outGoingID)
|
||||
.put(null, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(ack.err))
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
})
|
||||
}))
|
||||
)
|
||||
}
|
||||
|
||||
await Promise.all(promises)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} recipientPublicKey
|
||||
* @param {GUNNode} gun
|
||||
* @param {UserGUNNode} user
|
||||
* @param {ISEA} SEA
|
||||
* @throws {Error|TypeError}
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
const sendHandshakeRequest = async (recipientPublicKey, gun, user, SEA) => {
|
||||
if (!user.is) {
|
||||
throw new Error(ErrorCode.NOT_AUTH)
|
||||
}
|
||||
|
||||
await cleanup(recipientPublicKey)
|
||||
|
||||
if (typeof recipientPublicKey !== 'string') {
|
||||
throw new TypeError(
|
||||
`recipientPublicKey is not string, got: ${typeof recipientPublicKey}`
|
||||
)
|
||||
}
|
||||
|
||||
if (recipientPublicKey.length === 0) {
|
||||
throw new TypeError('recipientPublicKey is an string of length 0')
|
||||
}
|
||||
|
||||
if (recipientPublicKey === user.is.pub) {
|
||||
throw new Error('Do not send a request to yourself')
|
||||
}
|
||||
|
||||
logger.info('sendHR() -> before recipientEpub')
|
||||
|
||||
/** @type {string} */
|
||||
const recipientEpub = await Utils.pubToEpub(recipientPublicKey)
|
||||
|
||||
logger.info('sendHR() -> before mySecret')
|
||||
|
||||
const mySecret = require('../Mediator').getMySecret()
|
||||
logger.info('sendHR() -> before ourSecret')
|
||||
const ourSecret = await SEA.secret(recipientEpub, user._.sea)
|
||||
|
||||
// check if successful handshake is present
|
||||
|
||||
logger.info('sendHR() -> before alreadyHandshaked')
|
||||
|
||||
/** @type {boolean} */
|
||||
const alreadyHandshaked = await Utils.successfulHandshakeAlreadyExists(
|
||||
recipientPublicKey
|
||||
)
|
||||
|
||||
if (alreadyHandshaked) {
|
||||
throw new Error(ErrorCode.ALREADY_HANDSHAKED)
|
||||
}
|
||||
|
||||
logger.info('sendHR() -> before maybeLastRequestIDSentToUser')
|
||||
|
||||
// check that we have already sent a request to this user, on his current
|
||||
// handshake node
|
||||
const maybeLastRequestIDSentToUser = await Utils.tryAndWait((_, user) =>
|
||||
user
|
||||
.get(Key.USER_TO_LAST_REQUEST_SENT)
|
||||
.get(recipientPublicKey)
|
||||
.then()
|
||||
)
|
||||
|
||||
logger.info('sendHR() -> before currentHandshakeAddress')
|
||||
|
||||
const currentHandshakeAddress = await Utils.tryAndWait(
|
||||
gun =>
|
||||
Common.Utils.makePromise(res => {
|
||||
gun
|
||||
.user(recipientPublicKey)
|
||||
.get(Key.CURRENT_HANDSHAKE_ADDRESS)
|
||||
.once(
|
||||
data => {
|
||||
res(data)
|
||||
},
|
||||
{ wait: 1000 }
|
||||
)
|
||||
}),
|
||||
data => typeof data !== 'string'
|
||||
)
|
||||
|
||||
if (typeof currentHandshakeAddress !== 'string') {
|
||||
throw new TypeError(
|
||||
'expected current handshake address found on recipients user node to be an string'
|
||||
)
|
||||
}
|
||||
|
||||
if (typeof maybeLastRequestIDSentToUser === 'string') {
|
||||
if (maybeLastRequestIDSentToUser.length < 5) {
|
||||
throw new TypeError(
|
||||
'sendHandshakeRequest() -> maybeLastRequestIDSentToUser.length < 5'
|
||||
)
|
||||
}
|
||||
|
||||
const lastRequestIDSentToUser = maybeLastRequestIDSentToUser
|
||||
|
||||
logger.info('sendHR() -> before alreadyContactedOnCurrHandshakeNode')
|
||||
|
||||
const hrInHandshakeNode = await Utils.tryAndWait(
|
||||
gun =>
|
||||
new Promise(res => {
|
||||
gun
|
||||
.get(Key.HANDSHAKE_NODES)
|
||||
.get(currentHandshakeAddress)
|
||||
.get(lastRequestIDSentToUser)
|
||||
.once(data => {
|
||||
res(data)
|
||||
})
|
||||
}),
|
||||
// force retry on undefined in case the undefined was a false negative
|
||||
v => typeof v === 'undefined'
|
||||
)
|
||||
|
||||
const alreadyContactedOnCurrHandshakeNode =
|
||||
typeof hrInHandshakeNode !== 'undefined'
|
||||
|
||||
if (alreadyContactedOnCurrHandshakeNode) {
|
||||
throw new Error(ErrorCode.ALREADY_REQUESTED_HANDSHAKE)
|
||||
}
|
||||
}
|
||||
|
||||
logger.info('sendHR() -> before __createOutgoingFeed')
|
||||
|
||||
const outgoingFeedID = await __createOutgoingFeed(
|
||||
recipientPublicKey,
|
||||
user,
|
||||
SEA
|
||||
)
|
||||
|
||||
logger.info('sendHR() -> before encryptedForUsOutgoingFeedID')
|
||||
|
||||
const encryptedForUsOutgoingFeedID = await SEA.encrypt(
|
||||
outgoingFeedID,
|
||||
ourSecret
|
||||
)
|
||||
|
||||
const timestamp = Date.now()
|
||||
|
||||
/** @type {HandshakeRequest} */
|
||||
const handshakeRequestData = {
|
||||
from: user.is.pub,
|
||||
response: encryptedForUsOutgoingFeedID,
|
||||
timestamp
|
||||
}
|
||||
|
||||
const encryptedForMeRecipientPublicKey = await SEA.encrypt(
|
||||
recipientPublicKey,
|
||||
mySecret
|
||||
)
|
||||
|
||||
logger.info('sendHR() -> before newHandshakeRequestID')
|
||||
/** @type {string} */
|
||||
const newHandshakeRequestID = await new Promise((res, rej) => {
|
||||
const hr = gun
|
||||
.get(Key.HANDSHAKE_NODES)
|
||||
.get(currentHandshakeAddress)
|
||||
//@ts-ignore
|
||||
.set(handshakeRequestData, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(`Error trying to create request: ${ack.err}`))
|
||||
} else {
|
||||
res(hr._.get)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
|
||||
user
|
||||
.get(Key.USER_TO_LAST_REQUEST_SENT)
|
||||
.get(recipientPublicKey)
|
||||
.put(newHandshakeRequestID, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(ack.err))
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
})
|
||||
}))
|
||||
|
||||
// This needs to come before the write to sent requests. Because that write
|
||||
// triggers Jobs.onAcceptedRequests and it in turn reads from request-to-user
|
||||
|
||||
/**
|
||||
* @type {StoredReq}
|
||||
*/
|
||||
const storedReq = {
|
||||
sentReqID: await SEA.encrypt(newHandshakeRequestID, mySecret),
|
||||
recipientPub: encryptedForMeRecipientPublicKey,
|
||||
handshakeAddress: await SEA.encrypt(currentHandshakeAddress, mySecret),
|
||||
timestamp
|
||||
}
|
||||
//why await if you dont need the response?
|
||||
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
|
||||
//@ts-ignore
|
||||
user.get(Key.STORED_REQS).set(storedReq, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(
|
||||
new Error(
|
||||
`Error saving newly created request to sent requests: ${ack.err}`
|
||||
)
|
||||
)
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the message id.
|
||||
* @param {string} recipientPublicKey
|
||||
* @param {string} body
|
||||
* @param {UserGUNNode} user
|
||||
* @param {ISEA} SEA
|
||||
* @returns {Promise<import('shock-common').Schema.ChatMessage>} The message id.
|
||||
*/
|
||||
const sendMessageNew = async (recipientPublicKey, body, user, SEA) => {
|
||||
if (!user.is) {
|
||||
throw new Error(ErrorCode.NOT_AUTH)
|
||||
}
|
||||
|
||||
if (typeof recipientPublicKey !== 'string') {
|
||||
throw new TypeError(
|
||||
`expected recipientPublicKey to be an string, but instead got: ${typeof recipientPublicKey}`
|
||||
)
|
||||
}
|
||||
|
||||
if (recipientPublicKey.length === 0) {
|
||||
throw new TypeError(
|
||||
'expected recipientPublicKey to be an string of length greater than zero'
|
||||
)
|
||||
}
|
||||
|
||||
if (typeof body !== 'string') {
|
||||
throw new TypeError(
|
||||
`expected message to be an string, instead got: ${typeof body}`
|
||||
)
|
||||
}
|
||||
|
||||
if (body.length === 0) {
|
||||
throw new TypeError(
|
||||
'expected message to be an string of length greater than zero'
|
||||
)
|
||||
}
|
||||
|
||||
const outgoingID = await Utils.recipientToOutgoingID(recipientPublicKey)
|
||||
|
||||
if (outgoingID === null) {
|
||||
throw new Error(
|
||||
`Could not fetch an outgoing id for user: ${recipientPublicKey}`
|
||||
)
|
||||
}
|
||||
|
||||
const recipientEpub = await Utils.pubToEpub(recipientPublicKey)
|
||||
const ourSecret = await SEA.secret(recipientEpub, user._.sea)
|
||||
if (typeof ourSecret !== 'string') {
|
||||
throw new TypeError("sendMessage() -> typeof ourSecret !== 'string'")
|
||||
}
|
||||
const encryptedBody = await SEA.encrypt(body, ourSecret)
|
||||
|
||||
const newMessage = {
|
||||
body: encryptedBody,
|
||||
timestamp: Date.now()
|
||||
}
|
||||
|
||||
return new Promise((res, rej) => {
|
||||
const msgNode = user
|
||||
.get(Key.OUTGOINGS)
|
||||
.get(outgoingID)
|
||||
.get(Key.MESSAGES)
|
||||
.set(newMessage, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(ack.err))
|
||||
} else {
|
||||
res({
|
||||
body,
|
||||
id: msgNode._.get,
|
||||
outgoing: true,
|
||||
timestamp: newMessage.timestamp
|
||||
})
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the message id.
|
||||
* @param {string} recipientPublicKey
|
||||
* @param {string} body
|
||||
* @param {UserGUNNode} user
|
||||
* @param {ISEA} SEA
|
||||
* @returns {Promise<string>} The message id.
|
||||
*/
|
||||
const sendMessage = async (recipientPublicKey, body, user, SEA) =>
|
||||
(await sendMessageNew(recipientPublicKey, body, user, SEA)).id
|
||||
|
||||
/**
|
||||
* @param {string} recipientPub
|
||||
* @param {string} msgID
|
||||
* @param {UserGUNNode} user
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
const deleteMessage = async (recipientPub, msgID, user) => {
|
||||
if (!user.is) {
|
||||
throw new Error(ErrorCode.NOT_AUTH)
|
||||
}
|
||||
|
||||
if (typeof recipientPub !== 'string') {
|
||||
throw new TypeError(
|
||||
`expected recipientPublicKey to be an string, but instead got: ${typeof recipientPub}`
|
||||
)
|
||||
}
|
||||
|
||||
if (recipientPub.length === 0) {
|
||||
throw new TypeError(
|
||||
'expected recipientPublicKey to be an string of length greater than zero'
|
||||
)
|
||||
}
|
||||
|
||||
if (typeof msgID !== 'string') {
|
||||
throw new TypeError(
|
||||
`expected msgID to be an string, instead got: ${typeof msgID}`
|
||||
)
|
||||
}
|
||||
|
||||
if (msgID.length === 0) {
|
||||
throw new TypeError(
|
||||
'expected msgID to be an string of length greater than zero'
|
||||
)
|
||||
}
|
||||
|
||||
const outgoingID = await Utils.recipientToOutgoingID(recipientPub)
|
||||
|
||||
if (outgoingID === null) {
|
||||
throw new Error(`Could not fetch an outgoing id for user: ${recipientPub}`)
|
||||
}
|
||||
|
||||
return new Promise((res, rej) => {
|
||||
user
|
||||
.get(Key.OUTGOINGS)
|
||||
.get(outgoingID)
|
||||
.get(Key.MESSAGES)
|
||||
.get(msgID)
|
||||
.put(null, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(ack.err))
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string|null} avatar
|
||||
* @param {UserGUNNode} user
|
||||
|
|
@ -924,52 +240,6 @@ const setSeedServiceData = (encryptedSeedServiceData, user) =>
|
|||
})
|
||||
})
|
||||
|
||||
/**
|
||||
* @param {string} initialMsg
|
||||
* @param {string} recipientPublicKey
|
||||
* @param {GUNNode} gun
|
||||
* @param {UserGUNNode} user
|
||||
* @param {ISEA} SEA
|
||||
* @throws {Error|TypeError}
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
const sendHRWithInitialMsg = async (
|
||||
initialMsg,
|
||||
recipientPublicKey,
|
||||
gun,
|
||||
user,
|
||||
SEA
|
||||
) => {
|
||||
/** @type {boolean} */
|
||||
const alreadyHandshaked = await Utils.tryAndWait(
|
||||
(_, user) =>
|
||||
new Promise((res, rej) => {
|
||||
user
|
||||
.get(Key.USER_TO_INCOMING)
|
||||
.get(recipientPublicKey)
|
||||
.once(inc => {
|
||||
if (typeof inc !== 'string') {
|
||||
res(false)
|
||||
} else if (inc.length === 0) {
|
||||
rej(
|
||||
new Error(
|
||||
`sendHRWithInitialMsg()-> obtained encryptedIncomingId from user-to-incoming an string but of length 0`
|
||||
)
|
||||
)
|
||||
} else {
|
||||
res(true)
|
||||
}
|
||||
})
|
||||
})
|
||||
)
|
||||
|
||||
if (!alreadyHandshaked) {
|
||||
await sendHandshakeRequest(recipientPublicKey, gun, user, SEA)
|
||||
}
|
||||
|
||||
await sendMessage(recipientPublicKey, initialMsg, user, SEA)
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef {object} SpontPaymentOptions
|
||||
* @prop {Common.Schema.OrderTargetType} type
|
||||
|
|
@ -1035,7 +305,7 @@ const sendSpontaneousPayment = async (
|
|||
|
||||
logger.info('sendPayment() -> will now create order:')
|
||||
|
||||
/** @type {Order} */
|
||||
/** @type {import('shock-common').Schema.Order} */
|
||||
const order = {
|
||||
amount: amount.toString(),
|
||||
from: getUser()._.sea.pub,
|
||||
|
|
@ -1395,18 +665,6 @@ const saveChannelsBackup = async (backups, user, SEA) => {
|
|||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} pub
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
const disconnect = async pub => {
|
||||
if (!(await Utils.successfulHandshakeAlreadyExists(pub))) {
|
||||
throw new Error('No handshake exists for this pub')
|
||||
}
|
||||
|
||||
await Promise.all([cleanup(pub), generateHandshakeAddress()])
|
||||
}
|
||||
|
||||
/**
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
|
|
@ -1789,15 +1047,9 @@ const initWall = async () => {
|
|||
}
|
||||
|
||||
module.exports = {
|
||||
__createOutgoingFeed,
|
||||
acceptRequest,
|
||||
authenticate,
|
||||
blacklist,
|
||||
generateHandshakeAddress,
|
||||
sendHandshakeRequest,
|
||||
deleteMessage,
|
||||
sendMessage,
|
||||
sendHRWithInitialMsg,
|
||||
setAvatar,
|
||||
setDisplayName,
|
||||
sendPayment,
|
||||
|
|
@ -1805,14 +1057,12 @@ module.exports = {
|
|||
setBio,
|
||||
saveSeedBackup,
|
||||
saveChannelsBackup,
|
||||
disconnect,
|
||||
setLastSeenApp,
|
||||
createPost,
|
||||
deletePost,
|
||||
follow,
|
||||
unfollow,
|
||||
initWall,
|
||||
sendMessageNew,
|
||||
sendSpontaneousPayment,
|
||||
createPostNew,
|
||||
setDefaultSeedProvider,
|
||||
|
|
|
|||
|
|
@ -2,427 +2,22 @@
|
|||
* @prettier
|
||||
*/
|
||||
const debounce = require('lodash/debounce')
|
||||
const logger = require('winston')
|
||||
|
||||
const {
|
||||
Constants: { ErrorCode },
|
||||
Schema,
|
||||
Utils: CommonUtils
|
||||
Constants: { ErrorCode }
|
||||
} = require('shock-common')
|
||||
|
||||
const Key = require('../key')
|
||||
const Utils = require('../utils')
|
||||
/**
|
||||
* @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode
|
||||
* @typedef {import('../SimpleGUN').GUNNode} GUNNode
|
||||
* @typedef {import('../SimpleGUN').ISEA} ISEA
|
||||
* @typedef {import('../SimpleGUN').ListenerData} ListenerData
|
||||
* @typedef {import('shock-common').Schema.HandshakeRequest} HandshakeRequest
|
||||
* @typedef {import('shock-common').Schema.Message} Message
|
||||
* @typedef {import('shock-common').Schema.Outgoing} Outgoing
|
||||
* @typedef {import('shock-common').Schema.PartialOutgoing} PartialOutgoing
|
||||
* @typedef {import('shock-common').Schema.Chat} Chat
|
||||
* @typedef {import('shock-common').Schema.ChatMessage} ChatMessage
|
||||
* @typedef {import('shock-common').Schema.SimpleSentRequest} SimpleSentRequest
|
||||
* @typedef {import('shock-common').Schema.SimpleReceivedRequest} SimpleReceivedRequest
|
||||
*/
|
||||
|
||||
const DEBOUNCE_WAIT_TIME = 500
|
||||
|
||||
/**
|
||||
* @param {(userToIncoming: Record<string, string>) => void} cb
|
||||
* @param {UserGUNNode} user Pass only for testing purposes.
|
||||
* @param {ISEA} SEA
|
||||
* @returns {void}
|
||||
*/
|
||||
const __onUserToIncoming = (cb, user, SEA) => {
|
||||
if (!user.is) {
|
||||
throw new Error(ErrorCode.NOT_AUTH)
|
||||
}
|
||||
|
||||
const callb = debounce(cb, DEBOUNCE_WAIT_TIME)
|
||||
|
||||
/** @type {Record<string, string>} */
|
||||
const userToIncoming = {}
|
||||
|
||||
const mySecret = require('../../Mediator').getMySecret()
|
||||
|
||||
user
|
||||
.get(Key.USER_TO_INCOMING)
|
||||
.map()
|
||||
.on(async (encryptedIncomingID, userPub) => {
|
||||
if (typeof encryptedIncomingID !== 'string') {
|
||||
if (encryptedIncomingID === null) {
|
||||
// on disconnect
|
||||
delete userToIncoming[userPub]
|
||||
} else {
|
||||
logger.error(
|
||||
'got a non string non null value inside user to incoming'
|
||||
)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if (encryptedIncomingID.length === 0) {
|
||||
logger.error('got an empty string value')
|
||||
return
|
||||
}
|
||||
|
||||
const incomingID = await SEA.decrypt(encryptedIncomingID, mySecret)
|
||||
|
||||
if (typeof incomingID === 'undefined') {
|
||||
logger.warn('could not decrypt incomingID inside __onUserToIncoming')
|
||||
return
|
||||
}
|
||||
|
||||
userToIncoming[userPub] = incomingID
|
||||
|
||||
callb(userToIncoming)
|
||||
})
|
||||
}
|
||||
|
||||
/** @type {Set<(addr: string|null) => void>} */
|
||||
const addressListeners = new Set()
|
||||
|
||||
/** @type {string|null} */
|
||||
let currentAddress = null
|
||||
|
||||
const getHandshakeAddress = () => currentAddress
|
||||
|
||||
/** @param {string|null} addr */
|
||||
const setAddress = addr => {
|
||||
currentAddress = addr
|
||||
addressListeners.forEach(l => l(currentAddress))
|
||||
}
|
||||
|
||||
let addrSubbed = false
|
||||
|
||||
/**
|
||||
* @param {(currentHandshakeAddress: string|null) => void} cb
|
||||
* @param {UserGUNNode} user
|
||||
* @returns {() => void}
|
||||
*/
|
||||
const onCurrentHandshakeAddress = (cb, user) => {
|
||||
if (!user.is) {
|
||||
throw new Error(ErrorCode.NOT_AUTH)
|
||||
}
|
||||
|
||||
addressListeners.add(cb)
|
||||
|
||||
cb(currentAddress)
|
||||
|
||||
if (!addrSubbed) {
|
||||
addrSubbed = true
|
||||
|
||||
user.get(Key.CURRENT_HANDSHAKE_ADDRESS).on(addr => {
|
||||
if (typeof addr !== 'string') {
|
||||
logger.error('expected handshake address to be an string')
|
||||
|
||||
setAddress(null)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
setAddress(addr)
|
||||
})
|
||||
}
|
||||
|
||||
return () => {
|
||||
addressListeners.delete(cb)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {(messages: Record<string, Message>) => void} cb
|
||||
* @param {string} userPK Public key of the user from whom the incoming
|
||||
* messages will be obtained.
|
||||
* @param {string} incomingFeedID ID of the outgoing feed from which the
|
||||
* incoming messages will be obtained.
|
||||
* @param {GUNNode} gun (Pass only for testing purposes)
|
||||
* @param {UserGUNNode} user
|
||||
* @param {ISEA} SEA
|
||||
* @returns {void}
|
||||
*/
|
||||
const onIncomingMessages = (cb, userPK, incomingFeedID, gun, user, SEA) => {
|
||||
if (!user.is) {
|
||||
throw new Error(ErrorCode.NOT_AUTH)
|
||||
}
|
||||
|
||||
const callb = debounce(cb, DEBOUNCE_WAIT_TIME)
|
||||
|
||||
const otherUser = gun.user(userPK)
|
||||
|
||||
/**
|
||||
* @type {Record<string, Message>}
|
||||
*/
|
||||
const messages = {}
|
||||
|
||||
callb(messages)
|
||||
|
||||
otherUser
|
||||
.get(Key.OUTGOINGS)
|
||||
.get(incomingFeedID)
|
||||
.get(Key.MESSAGES)
|
||||
.map()
|
||||
.on(async (data, key) => {
|
||||
if (!Schema.isMessage(data)) {
|
||||
logger.warn('non-message received')
|
||||
return
|
||||
}
|
||||
|
||||
/** @type {string} */
|
||||
const recipientEpub = await Utils.pubToEpub(userPK)
|
||||
|
||||
const secret = await SEA.secret(recipientEpub, user._.sea)
|
||||
|
||||
let { body } = data
|
||||
body = await SEA.decrypt(body, secret)
|
||||
|
||||
messages[key] = {
|
||||
body,
|
||||
timestamp: data.timestamp
|
||||
}
|
||||
|
||||
callb(messages)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef {Record<string, Outgoing|null>} Outgoings
|
||||
* @typedef {(outgoings: Outgoings) => void} OutgoingsListener
|
||||
*/
|
||||
|
||||
/**
|
||||
* @type {Outgoings}
|
||||
*/
|
||||
let currentOutgoings = {}
|
||||
|
||||
const getCurrentOutgoings = () => currentOutgoings
|
||||
|
||||
/** @type {Set<OutgoingsListener>} */
|
||||
const outgoingsListeners = new Set()
|
||||
|
||||
outgoingsListeners.add(o => {
|
||||
const values = Object.values(o)
|
||||
const nulls = values.filter(x => x === null).length
|
||||
const nonNulls = values.length - nulls
|
||||
|
||||
logger.info(`new outgoings, ${nulls} nulls and ${nonNulls} nonNulls`)
|
||||
})
|
||||
|
||||
const notifyOutgoingsListeners = () => {
|
||||
outgoingsListeners.forEach(l => l(currentOutgoings))
|
||||
}
|
||||
|
||||
let outSubbed = false
|
||||
|
||||
/**
|
||||
* @param {OutgoingsListener} cb
|
||||
* @returns {() => void}
|
||||
*/
|
||||
const onOutgoing = cb => {
|
||||
outgoingsListeners.add(cb)
|
||||
cb(currentOutgoings)
|
||||
|
||||
if (!outSubbed) {
|
||||
const user = require('../../Mediator').getUser()
|
||||
user.get(Key.OUTGOINGS).open(
|
||||
debounce(async data => {
|
||||
try {
|
||||
if (typeof data !== 'object' || data === null) {
|
||||
currentOutgoings = {}
|
||||
notifyOutgoingsListeners()
|
||||
return
|
||||
}
|
||||
|
||||
/** @type {Record<string, Outgoing|null>} */
|
||||
const newOuts = {}
|
||||
|
||||
const SEA = require('../../Mediator').mySEA
|
||||
const mySecret = await Utils.mySecret()
|
||||
|
||||
await CommonUtils.asyncForEach(
|
||||
Object.entries(data),
|
||||
async ([id, out]) => {
|
||||
if (typeof out !== 'object') {
|
||||
return
|
||||
}
|
||||
|
||||
if (out === null) {
|
||||
newOuts[id] = null
|
||||
return
|
||||
}
|
||||
|
||||
const { with: encPub, messages } = out
|
||||
|
||||
if (typeof encPub !== 'string') {
|
||||
return
|
||||
}
|
||||
|
||||
const pub = await SEA.decrypt(encPub, mySecret)
|
||||
|
||||
if (!newOuts[id]) {
|
||||
newOuts[id] = {
|
||||
with: pub,
|
||||
messages: {}
|
||||
}
|
||||
}
|
||||
|
||||
const ourSec = await SEA.secret(
|
||||
await Utils.pubToEpub(pub),
|
||||
user._.sea
|
||||
)
|
||||
|
||||
if (typeof messages === 'object' && messages !== null) {
|
||||
await CommonUtils.asyncForEach(
|
||||
Object.entries(messages),
|
||||
async ([mid, msg]) => {
|
||||
if (typeof msg === 'object' && msg !== null) {
|
||||
if (
|
||||
typeof msg.body === 'string' &&
|
||||
typeof msg.timestamp === 'number'
|
||||
) {
|
||||
const newOut = newOuts[id]
|
||||
if (!newOut) {
|
||||
return
|
||||
}
|
||||
newOut.messages[mid] = {
|
||||
body: await SEA.decrypt(msg.body, ourSec),
|
||||
timestamp: msg.timestamp
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
currentOutgoings = newOuts
|
||||
notifyOutgoingsListeners()
|
||||
} catch (e) {
|
||||
logger.info('--------------------------')
|
||||
logger.info('Events -> onOutgoing')
|
||||
logger.info(e)
|
||||
logger.info('--------------------------')
|
||||
}
|
||||
}, 400)
|
||||
)
|
||||
|
||||
outSubbed = true
|
||||
}
|
||||
|
||||
return () => {
|
||||
outgoingsListeners.delete(cb)
|
||||
}
|
||||
}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/**
|
||||
* @typedef {(chats: Chat[]) => void} ChatsListener
|
||||
*/
|
||||
|
||||
/** @type {Chat[]} */
|
||||
let currentChats = []
|
||||
|
||||
const getChats = () => currentChats
|
||||
|
||||
/** @type {Set<ChatsListener>} */
|
||||
const chatsListeners = new Set()
|
||||
|
||||
chatsListeners.add(c => {
|
||||
logger.info(`Chats: ${c.length}`)
|
||||
})
|
||||
|
||||
const notifyChatsListeners = () => {
|
||||
chatsListeners.forEach(l => l(currentChats))
|
||||
}
|
||||
|
||||
const processChats = debounce(() => {
|
||||
const Streams = require('../streams')
|
||||
const currentOutgoings = getCurrentOutgoings()
|
||||
const existingOutgoings = /** @type {[string, Outgoing][]} */ (Object.entries(
|
||||
currentOutgoings
|
||||
).filter(([_, o]) => o !== null))
|
||||
const pubToFeed = Streams.getPubToFeed()
|
||||
|
||||
/** @type {Chat[]} */
|
||||
const newChats = []
|
||||
|
||||
for (const [outID, out] of existingOutgoings) {
|
||||
/** @type {ChatMessage[]} */
|
||||
let msgs = Object.entries(out.messages)
|
||||
.map(([mid, m]) => ({
|
||||
id: mid,
|
||||
outgoing: true,
|
||||
body: m.body,
|
||||
timestamp: m.timestamp
|
||||
}))
|
||||
// filter out null messages
|
||||
.filter(m => typeof m.body === 'string')
|
||||
|
||||
const incoming = pubToFeed[out.with]
|
||||
|
||||
if (Array.isArray(incoming)) {
|
||||
msgs = [...msgs, ...incoming]
|
||||
}
|
||||
|
||||
/** @type {Chat} */
|
||||
const chat = {
|
||||
recipientPublicKey: out.with,
|
||||
didDisconnect: pubToFeed[out.with] === 'disconnected',
|
||||
id: out.with + outID,
|
||||
messages: msgs,
|
||||
recipientAvatar: null,
|
||||
recipientDisplayName: null,
|
||||
lastSeenApp: null
|
||||
}
|
||||
|
||||
newChats.push(chat)
|
||||
}
|
||||
|
||||
currentChats = newChats.filter(
|
||||
c =>
|
||||
Array.isArray(pubToFeed[c.recipientPublicKey]) ||
|
||||
pubToFeed[c.recipientPublicKey] === 'disconnected'
|
||||
)
|
||||
|
||||
notifyChatsListeners()
|
||||
}, 750)
|
||||
|
||||
let onChatsSubbed = false
|
||||
|
||||
/**
|
||||
* Massages all of the more primitive data structures into a more manageable
|
||||
* 'Chat' paradigm.
|
||||
* @param {ChatsListener} cb
|
||||
* @returns {() => void}
|
||||
*/
|
||||
const onChats = cb => {
|
||||
if (!chatsListeners.add(cb)) {
|
||||
throw new Error('Tried to subscribe twice')
|
||||
}
|
||||
cb(currentChats)
|
||||
|
||||
if (!onChatsSubbed) {
|
||||
const Streams = require('../streams')
|
||||
onOutgoing(processChats)
|
||||
Streams.onPubToFeed(processChats)
|
||||
onChatsSubbed = true
|
||||
}
|
||||
|
||||
return () => {
|
||||
if (!chatsListeners.delete(cb)) {
|
||||
throw new Error('Tried to unsubscribe twice')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** @type {string|null} */
|
||||
let currentSeedBackup = null
|
||||
|
||||
/**
|
||||
* @param {(seedBackup: string|null) => void} cb
|
||||
* @param {UserGUNNode} user
|
||||
* @param {ISEA} SEA
|
||||
* @param {import('../SimpleGUN').UserGUNNode} user
|
||||
* @param {import('../SimpleGUN').ISEA} SEA
|
||||
* @throws {Error} If user hasn't been auth.
|
||||
* @returns {void}
|
||||
*/
|
||||
|
|
@ -445,17 +40,5 @@ const onSeedBackup = (cb, user, SEA) => {
|
|||
}
|
||||
|
||||
module.exports = {
|
||||
__onUserToIncoming,
|
||||
onCurrentHandshakeAddress,
|
||||
onIncomingMessages,
|
||||
onOutgoing,
|
||||
getCurrentOutgoings,
|
||||
onSimplerReceivedRequests: require('./onReceivedReqs').onReceivedReqs,
|
||||
onSimplerSentRequests: require('./onSentReqs').onSentReqs,
|
||||
getCurrentSentReqs: require('./onSentReqs').getCurrentSentReqs,
|
||||
getCurrentReceivedReqs: require('./onReceivedReqs').getReceivedReqs,
|
||||
onSeedBackup,
|
||||
onChats,
|
||||
getHandshakeAddress,
|
||||
getChats
|
||||
onSeedBackup
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,138 +0,0 @@
|
|||
/** @format */
|
||||
const debounce = require('lodash/debounce')
|
||||
const logger = require('winston')
|
||||
const { Schema } = require('shock-common')
|
||||
const size = require('lodash/size')
|
||||
|
||||
const Key = require('../key')
|
||||
const Streams = require('../streams')
|
||||
|
||||
/**
|
||||
* @typedef {Readonly<import('shock-common').Schema.SimpleReceivedRequest>} SimpleReceivedRequest
|
||||
* @typedef {(reqs: ReadonlyArray<SimpleReceivedRequest>) => void} Listener
|
||||
*/
|
||||
|
||||
/** @type {Set<Listener>} */
|
||||
const listeners = new Set()
|
||||
|
||||
/** @type {string|null} */
|
||||
let currentAddress = null
|
||||
|
||||
/** @type {Record<string, SimpleReceivedRequest>} */
|
||||
let currReceivedReqsMap = {}
|
||||
|
||||
/**
|
||||
* Unprocessed requests in current handshake node.
|
||||
* @type {Record<string, import('shock-common').Schema.HandshakeRequest>}
|
||||
*/
|
||||
let currAddressData = {}
|
||||
|
||||
/** @returns {SimpleReceivedRequest[]} */
|
||||
const getReceivedReqs = () => Object.values(currReceivedReqsMap)
|
||||
/** @param {Record<string, SimpleReceivedRequest>} reqs */
|
||||
const setReceivedReqsMap = reqs => {
|
||||
currReceivedReqsMap = reqs
|
||||
listeners.forEach(l => l(getReceivedReqs()))
|
||||
}
|
||||
|
||||
listeners.add(() => {
|
||||
logger.info(`new received reqs: ${size(getReceivedReqs())}`)
|
||||
})
|
||||
|
||||
const react = debounce(() => {
|
||||
/** @type {Record<string, SimpleReceivedRequest>} */
|
||||
const newReceivedReqsMap = {}
|
||||
|
||||
const pubToFeed = Streams.getPubToFeed()
|
||||
|
||||
for (const [id, req] of Object.entries(currAddressData)) {
|
||||
const inContact = Array.isArray(pubToFeed[req.from])
|
||||
const isDisconnected = pubToFeed[req.from] === 'disconnected'
|
||||
|
||||
if (!inContact && !isDisconnected) {
|
||||
newReceivedReqsMap[req.from] = {
|
||||
id,
|
||||
requestorAvatar: null,
|
||||
requestorDisplayName: null,
|
||||
requestorPK: req.from,
|
||||
timestamp: req.timestamp
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
setReceivedReqsMap(newReceivedReqsMap)
|
||||
}, 750)
|
||||
|
||||
/**
|
||||
* @param {string} addr
|
||||
* @returns {(data: import('../SimpleGUN').OpenListenerData) => void}
|
||||
*/
|
||||
const listenerForAddr = addr => data => {
|
||||
// did invalidate
|
||||
if (addr !== currentAddress) {
|
||||
return
|
||||
}
|
||||
|
||||
if (typeof data !== 'object' || data === null) {
|
||||
currAddressData = {}
|
||||
} else {
|
||||
for (const [id, req] of Object.entries(data)) {
|
||||
// no need to update them just write them once
|
||||
if (Schema.isHandshakeRequest(req) && !currAddressData[id]) {
|
||||
currAddressData[id] = req
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.info('data for address length: ' + size(currAddressData))
|
||||
|
||||
react()
|
||||
}
|
||||
|
||||
let subbed = false
|
||||
|
||||
/**
|
||||
* @param {Listener} cb
|
||||
* @returns {() => void}
|
||||
*/
|
||||
const onReceivedReqs = cb => {
|
||||
listeners.add(cb)
|
||||
cb(getReceivedReqs())
|
||||
|
||||
if (!subbed) {
|
||||
const user = require('../../Mediator').getUser()
|
||||
if (!user.is) {
|
||||
logger.warn('Tried subscribing to onReceivedReqs without authing')
|
||||
}
|
||||
require('./index').onCurrentHandshakeAddress(addr => {
|
||||
if (currentAddress === addr) {
|
||||
return
|
||||
}
|
||||
|
||||
currentAddress = addr
|
||||
currAddressData = {}
|
||||
setReceivedReqsMap({})
|
||||
|
||||
if (typeof addr === 'string') {
|
||||
require('../../Mediator')
|
||||
.getGun()
|
||||
.get(Key.HANDSHAKE_NODES)
|
||||
.get(addr)
|
||||
.open(listenerForAddr(addr))
|
||||
}
|
||||
}, user)
|
||||
|
||||
Streams.onPubToFeed(react)
|
||||
|
||||
subbed = true
|
||||
}
|
||||
|
||||
return () => {
|
||||
listeners.delete(cb)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
getReceivedReqs,
|
||||
onReceivedReqs
|
||||
}
|
||||
|
|
@ -1,128 +0,0 @@
|
|||
/** @format */
|
||||
const debounce = require('lodash/debounce')
|
||||
const logger = require('winston')
|
||||
const size = require('lodash/size')
|
||||
|
||||
const Streams = require('../streams')
|
||||
/**
|
||||
* @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode
|
||||
* @typedef {import('../SimpleGUN').GUNNode} GUNNode
|
||||
* @typedef {import('../SimpleGUN').ISEA} ISEA
|
||||
* @typedef {import('../SimpleGUN').ListenerData} ListenerData
|
||||
* @typedef {import('shock-common').Schema.HandshakeRequest} HandshakeRequest
|
||||
* @typedef {import('shock-common').Schema.Message} Message
|
||||
* @typedef {import('shock-common').Schema.Outgoing} Outgoing
|
||||
* @typedef {import('shock-common').Schema.PartialOutgoing} PartialOutgoing
|
||||
* @typedef {import('shock-common').Schema.Chat} Chat
|
||||
* @typedef {import('shock-common').Schema.ChatMessage} ChatMessage
|
||||
* @typedef {import('shock-common').Schema.SimpleSentRequest} SimpleSentRequest
|
||||
* @typedef {import('shock-common').Schema.SimpleReceivedRequest} SimpleReceivedRequest
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {(chats: SimpleSentRequest[]) => void} Listener
|
||||
*/
|
||||
|
||||
/** @type {Set<Listener>} */
|
||||
const listeners = new Set()
|
||||
|
||||
/** @type {SimpleSentRequest[]} */
|
||||
let currentReqs = []
|
||||
|
||||
listeners.add(() => {
|
||||
logger.info(`new sent reqs length: ${size(currentReqs)}`)
|
||||
})
|
||||
|
||||
const getCurrentSentReqs = () => currentReqs
|
||||
|
||||
// any time any of the streams we use notifies us that it changed, we fire up
|
||||
// react()
|
||||
const react = debounce(() => {
|
||||
/** @type {SimpleSentRequest[]} */
|
||||
const newReqs = []
|
||||
|
||||
// reactive streams
|
||||
// maps a pk to its current handshake address
|
||||
const pubToHAddr = Streams.getAddresses()
|
||||
// a set or list containing copies of sent requests
|
||||
const storedReqs = Streams.getStoredReqs()
|
||||
// maps a pk to the last request sent to it (so old stored reqs are invalidated)
|
||||
const pubToLastSentReqID = Streams.getSentReqIDs()
|
||||
// maps a pk to a feed, messages if subbed and pk is pubbing, null /
|
||||
// 'disconnected' otherwise
|
||||
const pubToFeed = Streams.getPubToFeed()
|
||||
|
||||
logger.info(`pubToLastSentREqID length: ${size(pubToLastSentReqID)}`)
|
||||
|
||||
for (const storedReq of storedReqs) {
|
||||
const { handshakeAddress, recipientPub, sentReqID, timestamp } = storedReq
|
||||
const currAddress = pubToHAddr[recipientPub]
|
||||
|
||||
const lastReqID = pubToLastSentReqID[recipientPub]
|
||||
// invalidate if this stored request is not the last one sent to this
|
||||
// particular pk
|
||||
const isStale = typeof lastReqID !== 'undefined' && lastReqID !== sentReqID
|
||||
// invalidate if we are in a pub/sub state to this pk (handshake in place)
|
||||
const isConnected = Array.isArray(pubToFeed[recipientPub])
|
||||
|
||||
if (isStale || isConnected) {
|
||||
// eslint-disable-next-line no-continue
|
||||
continue
|
||||
}
|
||||
|
||||
// no address for this pk? let's ask the corresponding stream to sub to
|
||||
// gun.user(pk).get('currentAddr')
|
||||
if (typeof currAddress === 'undefined') {
|
||||
// eslint-disable-next-line no-empty-function
|
||||
Streams.onAddresses(() => {}, recipientPub)()
|
||||
}
|
||||
|
||||
newReqs.push({
|
||||
id: sentReqID,
|
||||
recipientAvatar: null,
|
||||
recipientChangedRequestAddress:
|
||||
// if we haven't received the other's user current handshake address,
|
||||
// let's assume he hasn't changed it and that this request is still
|
||||
// valid
|
||||
typeof currAddress !== 'undefined' && handshakeAddress !== currAddress,
|
||||
recipientDisplayName: null,
|
||||
recipientPublicKey: recipientPub,
|
||||
timestamp
|
||||
})
|
||||
}
|
||||
|
||||
currentReqs = newReqs
|
||||
|
||||
listeners.forEach(l => l(currentReqs))
|
||||
}, 750)
|
||||
|
||||
let subbed = false
|
||||
|
||||
/**
|
||||
* Massages all of the more primitive data structures into a more manageable
|
||||
* 'Chat' paradigm.
|
||||
* @param {Listener} cb
|
||||
* @returns {() => void}
|
||||
*/
|
||||
const onSentReqs = cb => {
|
||||
listeners.add(cb)
|
||||
cb(currentReqs)
|
||||
|
||||
if (!subbed) {
|
||||
Streams.onAddresses(react)
|
||||
Streams.onStoredReqs(react)
|
||||
Streams.onLastSentReqIDs(react)
|
||||
Streams.onPubToFeed(react)
|
||||
|
||||
subbed = true
|
||||
}
|
||||
|
||||
return () => {
|
||||
listeners.delete(cb)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
onSentReqs,
|
||||
getCurrentSentReqs
|
||||
}
|
||||
|
|
@ -9,12 +9,10 @@
|
|||
* tasks accept factories that are homonymous to the events on this same module.
|
||||
*/
|
||||
|
||||
const onAcceptedRequests = require('./onAcceptedRequests')
|
||||
const onOrders = require('./onOrders')
|
||||
const lastSeenNode = require('./lastSeenNode')
|
||||
|
||||
module.exports = {
|
||||
onAcceptedRequests,
|
||||
onOrders,
|
||||
lastSeenNode
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,191 +0,0 @@
|
|||
/**
|
||||
* @format
|
||||
*/
|
||||
const logger = require('winston')
|
||||
const {
|
||||
Constants: { ErrorCode },
|
||||
Schema
|
||||
} = require('shock-common')
|
||||
const size = require('lodash/size')
|
||||
|
||||
const Key = require('../key')
|
||||
const Utils = require('../utils')
|
||||
|
||||
/**
|
||||
* @typedef {import('../SimpleGUN').GUNNode} GUNNode
|
||||
* @typedef {import('../SimpleGUN').ISEA} ISEA
|
||||
* @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode
|
||||
*/
|
||||
|
||||
let procid = 0
|
||||
|
||||
/**
|
||||
* @throws {Error} NOT_AUTH
|
||||
* @param {UserGUNNode} user
|
||||
* @param {ISEA} SEA
|
||||
* @returns {void}
|
||||
*/
|
||||
const onAcceptedRequests = (user, SEA) => {
|
||||
if (!user.is) {
|
||||
logger.warn('onAcceptedRequests() -> tried to sub without authing')
|
||||
throw new Error(ErrorCode.NOT_AUTH)
|
||||
}
|
||||
|
||||
procid++
|
||||
|
||||
user
|
||||
.get(Key.STORED_REQS)
|
||||
.map()
|
||||
.on(async (storedReq, id) => {
|
||||
logger.info(
|
||||
`------------------------------------\nPROCID:${procid} (used for debugging memory leaks in jobs)\n---------------------------------------`
|
||||
)
|
||||
|
||||
const mySecret = require('../../Mediator').getMySecret()
|
||||
|
||||
try {
|
||||
if (!Schema.isStoredRequest(storedReq)) {
|
||||
throw new Error(
|
||||
'Stored request not an StoredRequest, instead got: ' +
|
||||
JSON.stringify(storedReq) +
|
||||
' this can be due to nulling out an old request (if null) or something else happened (please look at the output)'
|
||||
)
|
||||
}
|
||||
// get the recipient pub from the stored request to avoid an attacker
|
||||
// overwriting the handshake request in the root graph
|
||||
const recipientPub = await SEA.decrypt(storedReq.recipientPub, mySecret)
|
||||
|
||||
if (typeof recipientPub !== 'string') {
|
||||
throw new TypeError(
|
||||
`Expected storedReq.recipientPub to be an string, instead got: ${recipientPub}`
|
||||
)
|
||||
}
|
||||
|
||||
if (await Utils.successfulHandshakeAlreadyExists(recipientPub)) {
|
||||
return
|
||||
}
|
||||
|
||||
const requestAddress = await SEA.decrypt(
|
||||
storedReq.handshakeAddress,
|
||||
mySecret
|
||||
)
|
||||
if (typeof requestAddress !== 'string') {
|
||||
throw new TypeError()
|
||||
}
|
||||
const sentReqID = await SEA.decrypt(storedReq.sentReqID, mySecret)
|
||||
if (typeof sentReqID !== 'string') {
|
||||
throw new TypeError()
|
||||
}
|
||||
|
||||
const latestReqSentID = await Utils.recipientPubToLastReqSentID(
|
||||
recipientPub
|
||||
)
|
||||
|
||||
const isStaleRequest = latestReqSentID !== sentReqID
|
||||
if (isStaleRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
const gun = require('../../Mediator').getGun()
|
||||
const user = require('../../Mediator').getUser()
|
||||
|
||||
const recipientEpub = await Utils.pubToEpub(recipientPub)
|
||||
const ourSecret = await SEA.secret(recipientEpub, user._.sea)
|
||||
|
||||
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
|
||||
gun
|
||||
.get(Key.HANDSHAKE_NODES)
|
||||
.get(requestAddress)
|
||||
.get(sentReqID)
|
||||
.on(async sentReq => {
|
||||
if (!Schema.isHandshakeRequest(sentReq)) {
|
||||
rej(
|
||||
new Error(
|
||||
'sent request found in handshake node not a handshake request'
|
||||
)
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
// The response can be decrypted with the same secret regardless
|
||||
// of who wrote to it last (see HandshakeRequest definition). This
|
||||
// could be our feed ID for the recipient, or the recipient's feed
|
||||
// id if he accepted the request.
|
||||
const feedID = await SEA.decrypt(sentReq.response, ourSecret)
|
||||
|
||||
if (typeof feedID !== 'string') {
|
||||
throw new TypeError("typeof feedID !== 'string'")
|
||||
}
|
||||
|
||||
logger.info(`onAcceptedRequests -> decrypted feed ID: ${feedID}`)
|
||||
|
||||
logger.info(
|
||||
'Will now try to access the other users outgoing feed'
|
||||
)
|
||||
|
||||
const maybeFeedOnRecipientsOutgoings = await Utils.tryAndWait(
|
||||
gun =>
|
||||
new Promise(res => {
|
||||
gun
|
||||
.user(recipientPub)
|
||||
.get(Key.OUTGOINGS)
|
||||
.get(feedID)
|
||||
.once(feed => {
|
||||
res(feed)
|
||||
})
|
||||
}),
|
||||
// @ts-ignore
|
||||
v => size(v) === 0
|
||||
)
|
||||
|
||||
const feedIDExistsOnRecipientsOutgoings =
|
||||
typeof maybeFeedOnRecipientsOutgoings === 'object' &&
|
||||
maybeFeedOnRecipientsOutgoings !== null
|
||||
|
||||
if (!feedIDExistsOnRecipientsOutgoings) {
|
||||
return
|
||||
}
|
||||
|
||||
const encryptedForMeIncomingID = await SEA.encrypt(
|
||||
feedID,
|
||||
mySecret
|
||||
)
|
||||
|
||||
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
|
||||
user
|
||||
.get(Key.USER_TO_INCOMING)
|
||||
.get(recipientPub)
|
||||
.put(encryptedForMeIncomingID, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(ack.err))
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
})
|
||||
}))
|
||||
|
||||
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
|
||||
user
|
||||
.get(Key.STORED_REQS)
|
||||
.get(id)
|
||||
.put(null, ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
rej(new Error(ack.err))
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
})
|
||||
}))
|
||||
|
||||
// ensure this listeners gets called at least once
|
||||
res()
|
||||
})
|
||||
}))
|
||||
} catch (err) {
|
||||
logger.warn(`Jobs.onAcceptedRequests() -> ${err.message}`)
|
||||
logger.error(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
module.exports = onAcceptedRequests
|
||||
|
|
@ -27,12 +27,15 @@ const PATH_SEPARATOR = '>'
|
|||
/**
|
||||
* @param {ValidDataValue} value
|
||||
* @param {string} publicKey
|
||||
* @param {string=} epubForDecryption
|
||||
* @returns {Promise<ValidDataValue>}
|
||||
*/
|
||||
const deepDecryptIfNeeded = async (value, publicKey) => {
|
||||
const deepDecryptIfNeeded = async (value, publicKey, epubForDecryption) => {
|
||||
if (Schema.isObj(value)) {
|
||||
return Bluebird.props(
|
||||
mapValues(value, o => deepDecryptIfNeeded(o, publicKey))
|
||||
mapValues(value, o =>
|
||||
deepDecryptIfNeeded(o, publicKey, epubForDecryption)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
|
|
@ -49,7 +52,15 @@ const deepDecryptIfNeeded = async (value, publicKey) => {
|
|||
if (user.is.pub === publicKey || 'me' === publicKey) {
|
||||
sec = getMySecret()
|
||||
} else {
|
||||
sec = await SEA.secret(await pubToEpub(publicKey), user._.sea)
|
||||
sec = await SEA.secret(
|
||||
await (() => {
|
||||
if (epubForDecryption) {
|
||||
return epubForDecryption
|
||||
}
|
||||
return pubToEpub(publicKey)
|
||||
})(),
|
||||
user._.sea
|
||||
)
|
||||
}
|
||||
|
||||
const decrypted = SEA.decrypt(value, sec)
|
||||
|
|
@ -81,6 +92,7 @@ async function deepEncryptIfNeeded(value) {
|
|||
}
|
||||
|
||||
const pk = /** @type {string|undefined} */ (value.$$__ENCRYPT__FOR)
|
||||
const epub = /** @type {string|undefined} */ (value.$$__EPUB__FOR)
|
||||
|
||||
if (!pk) {
|
||||
return Bluebird.props(mapValues(value, deepEncryptIfNeeded))
|
||||
|
|
@ -93,7 +105,15 @@ async function deepEncryptIfNeeded(value) {
|
|||
if (pk === u.is.pub || pk === 'me') {
|
||||
encryptedValue = await SEA.encrypt(actualValue, getMySecret())
|
||||
} else {
|
||||
const sec = await SEA.secret(await pubToEpub(pk), u._.sea)
|
||||
const sec = await SEA.secret(
|
||||
await (() => {
|
||||
if (epub) {
|
||||
return epub
|
||||
}
|
||||
return pubToEpub(pk)
|
||||
})(),
|
||||
u._.sea
|
||||
)
|
||||
|
||||
encryptedValue = await SEA.encrypt(actualValue, sec)
|
||||
}
|
||||
|
|
@ -187,7 +207,13 @@ const put = async (rawPath, value) => {
|
|||
await makePromise((res, rej) => {
|
||||
node.put(/** @type {ValidDataValue} */ (theValue), ack => {
|
||||
if (ack.err && typeof ack.err !== 'number') {
|
||||
if (typeof ack.err === 'string') {
|
||||
rej(new Error(ack.err))
|
||||
} else {
|
||||
console.log(`NON STANDARD GUN ERROR:`)
|
||||
console.log(ack)
|
||||
rej(new Error(JSON.stringify(ack.err, null, 4)))
|
||||
}
|
||||
} else {
|
||||
res()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,14 +9,12 @@ const uuidv4 = require('uuid/v4')
|
|||
const { getGun, getUser, isAuthenticated } = require('../Mediator')
|
||||
const { deepDecryptIfNeeded } = require('../rpc')
|
||||
const Subscriptions = require('./subscriptions')
|
||||
const GunEvents = require('../contact-api/events')
|
||||
const GunActions = require('../../gunDB/contact-api/actions')
|
||||
const {
|
||||
encryptedEmit,
|
||||
encryptedOn,
|
||||
encryptedCallback
|
||||
} = require('../../../utils/ECC/socket')
|
||||
const auth = require('../../auth/auth')
|
||||
|
||||
const ALLOWED_GUN_METHODS = [
|
||||
'map',
|
||||
|
|
@ -37,28 +35,6 @@ const ALLOWED_GUN_METHODS = [
|
|||
* @typedef {{ reconnect: boolean, token: string }} SubscriptionOptions
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {string} token
|
||||
* @returns {Promise<boolean>}
|
||||
*/
|
||||
const isValidToken = async token => {
|
||||
const validation = await auth.validateToken(token)
|
||||
|
||||
if (typeof validation !== 'object') {
|
||||
return false
|
||||
}
|
||||
|
||||
if (validation === null) {
|
||||
return false
|
||||
}
|
||||
|
||||
if (typeof validation.valid !== 'boolean') {
|
||||
return false
|
||||
}
|
||||
|
||||
return validation.valid
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} root
|
||||
*/
|
||||
|
|
@ -121,13 +97,18 @@ const executeGunQuery = (query, method, listener) => {
|
|||
* @param {string} queryData.publicKeyForDecryption
|
||||
* @param {string} queryData.subscriptionId
|
||||
* @param {string} queryData.deviceId
|
||||
* @param {string=} queryData.epubForDecryption
|
||||
* @param {string=} queryData.epubField If the epub is included in the received
|
||||
* data itself. Handshake requests for example, have an epub field.
|
||||
* @returns {GunListener}
|
||||
*/
|
||||
const queryListenerCallback = ({
|
||||
emit,
|
||||
publicKeyForDecryption,
|
||||
subscriptionId,
|
||||
deviceId
|
||||
deviceId,
|
||||
epubForDecryption,
|
||||
epubField
|
||||
}) => async (data, key, _msg, event) => {
|
||||
try {
|
||||
const subscription = Subscriptions.get({
|
||||
|
|
@ -142,8 +123,38 @@ const queryListenerCallback = ({
|
|||
})
|
||||
}
|
||||
const eventName = `query:data`
|
||||
if (publicKeyForDecryption) {
|
||||
const decData = await deepDecryptIfNeeded(data, publicKeyForDecryption)
|
||||
if (publicKeyForDecryption?.length > 15 || epubForDecryption || epubField) {
|
||||
const decData = await deepDecryptIfNeeded(
|
||||
data,
|
||||
publicKeyForDecryption,
|
||||
(() => {
|
||||
if (epubField) {
|
||||
if (Common.isObj(data)) {
|
||||
const epub = data[epubField]
|
||||
if (Common.isPopulatedString(epub)) {
|
||||
return epub
|
||||
}
|
||||
|
||||
logger.error(
|
||||
`Got epubField in a rifle query, but the resulting value obtained is not an string -> `,
|
||||
{
|
||||
data,
|
||||
epub
|
||||
}
|
||||
)
|
||||
} else {
|
||||
logger.warn(
|
||||
`Got epubField in a rifle query for a non-object data -> `,
|
||||
{
|
||||
epubField,
|
||||
data
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
return epubForDecryption
|
||||
})()
|
||||
)
|
||||
emit(eventName, { subscriptionId, response: { data: decData, key } })
|
||||
return
|
||||
}
|
||||
|
|
@ -154,84 +165,6 @@ const queryListenerCallback = ({
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Object} GunSocketOptions
|
||||
* @param {() => (import('./subscriptions').Unsubscribe | void)} GunSocketOptions.handler
|
||||
* @param {string} GunSocketOptions.subscriptionId
|
||||
* @param {string} GunSocketOptions.encryptionId
|
||||
* @param {import('socket.io').Socket} GunSocketOptions.socket
|
||||
* @returns {(options: SubscriptionOptions, response: (error?: any, data?: any) => void) => Promise<void>}
|
||||
*/
|
||||
const wrap = ({ handler, subscriptionId, encryptionId, socket }) => async (
|
||||
{ reconnect, token },
|
||||
response
|
||||
) => {
|
||||
try {
|
||||
logger.info('Subscribe function executing...')
|
||||
if (!isAuthenticated()) {
|
||||
logger.warn('GunDB is not yet authenticated')
|
||||
socket.emit(Common.Constants.ErrorCode.NOT_AUTH)
|
||||
return
|
||||
}
|
||||
|
||||
const callback = encryptedCallback(socket, response)
|
||||
const emit = encryptedEmit(socket)
|
||||
const subscription = Subscriptions.get({
|
||||
deviceId: encryptionId,
|
||||
subscriptionId
|
||||
})
|
||||
|
||||
if (subscription && !reconnect) {
|
||||
const error = {
|
||||
field: 'subscription',
|
||||
message:
|
||||
"You're already subscribed to this event, you can re-subscribe again by setting 'reconnect' to true "
|
||||
}
|
||||
logger.error('Duplicate subscription:', error)
|
||||
callback(error)
|
||||
return
|
||||
}
|
||||
|
||||
if (subscription && reconnect) {
|
||||
if (subscription.unsubscribe) {
|
||||
subscription.unsubscribe()
|
||||
}
|
||||
|
||||
Subscriptions.remove({
|
||||
deviceId: encryptionId,
|
||||
subscriptionId
|
||||
})
|
||||
}
|
||||
|
||||
if (!subscription || reconnect) {
|
||||
const isAuth = await isValidToken(token)
|
||||
|
||||
if (!isAuth) {
|
||||
logger.warn('invalid token specified')
|
||||
emit(Common.Constants.ErrorCode.NOT_AUTH)
|
||||
return
|
||||
}
|
||||
|
||||
const unsubscribe = handler()
|
||||
|
||||
if (unsubscribe) {
|
||||
Subscriptions.attachUnsubscribe({
|
||||
deviceId: encryptionId,
|
||||
subscriptionId,
|
||||
unsubscribe
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
callback(null, {
|
||||
message: 'Subscribed successfully!',
|
||||
success: true
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error('Socket wrapper error:', error)
|
||||
}
|
||||
}
|
||||
|
||||
/** @param {import('socket.io').Socket} socket */
|
||||
const startSocket = socket => {
|
||||
try {
|
||||
|
|
@ -252,7 +185,8 @@ const startSocket = socket => {
|
|||
})
|
||||
}
|
||||
|
||||
on('subscribe:query', ({ $shock, publicKey }, response) => {
|
||||
on('subscribe:query', (query, response) => {
|
||||
const { $shock, publicKey, epubForDecryption, epubField } = query
|
||||
const subscriptionId = uuidv4()
|
||||
try {
|
||||
if (!isAuthenticated()) {
|
||||
|
|
@ -279,7 +213,9 @@ const startSocket = socket => {
|
|||
emit,
|
||||
publicKeyForDecryption: publicKey,
|
||||
subscriptionId,
|
||||
deviceId: encryptionId
|
||||
deviceId: encryptionId,
|
||||
epubForDecryption,
|
||||
epubField
|
||||
})
|
||||
|
||||
socketCallback(null, {
|
||||
|
|
@ -295,112 +231,6 @@ const startSocket = socket => {
|
|||
}
|
||||
})
|
||||
|
||||
const onChats = () => {
|
||||
return GunEvents.onChats(chats => {
|
||||
const processed = chats.map(
|
||||
({
|
||||
didDisconnect,
|
||||
id,
|
||||
lastSeenApp,
|
||||
messages,
|
||||
recipientPublicKey
|
||||
}) => {
|
||||
/** @type {Common.Schema.Chat} */
|
||||
const stripped = {
|
||||
didDisconnect,
|
||||
id,
|
||||
lastSeenApp,
|
||||
messages,
|
||||
recipientAvatar: null,
|
||||
recipientDisplayName: null,
|
||||
recipientPublicKey
|
||||
}
|
||||
|
||||
return stripped
|
||||
}
|
||||
)
|
||||
|
||||
emit('chats', processed)
|
||||
})
|
||||
}
|
||||
|
||||
on(
|
||||
'subscribe:chats',
|
||||
wrap({
|
||||
handler: onChats,
|
||||
encryptionId,
|
||||
subscriptionId: 'chats',
|
||||
socket
|
||||
})
|
||||
)
|
||||
|
||||
const onSentRequests = () => {
|
||||
return GunEvents.onSimplerSentRequests(sentReqs => {
|
||||
const processed = sentReqs.map(
|
||||
({
|
||||
id,
|
||||
recipientChangedRequestAddress,
|
||||
recipientPublicKey,
|
||||
timestamp
|
||||
}) => {
|
||||
/**
|
||||
* @type {Common.Schema.SimpleSentRequest}
|
||||
*/
|
||||
const stripped = {
|
||||
id,
|
||||
recipientAvatar: null,
|
||||
recipientChangedRequestAddress,
|
||||
recipientDisplayName: null,
|
||||
recipientPublicKey,
|
||||
timestamp
|
||||
}
|
||||
|
||||
return stripped
|
||||
}
|
||||
)
|
||||
emit('sentRequests', processed)
|
||||
})
|
||||
}
|
||||
|
||||
on(
|
||||
'subscribe:sentRequests',
|
||||
wrap({
|
||||
handler: onSentRequests,
|
||||
encryptionId,
|
||||
subscriptionId: 'sentRequests',
|
||||
socket
|
||||
})
|
||||
)
|
||||
|
||||
const onReceivedRequests = () => {
|
||||
return GunEvents.onSimplerReceivedRequests(receivedReqs => {
|
||||
const processed = receivedReqs.map(({ id, requestorPK, timestamp }) => {
|
||||
/** @type {Common.Schema.SimpleReceivedRequest} */
|
||||
const stripped = {
|
||||
id,
|
||||
requestorAvatar: null,
|
||||
requestorDisplayName: null,
|
||||
requestorPK,
|
||||
timestamp
|
||||
}
|
||||
|
||||
return stripped
|
||||
})
|
||||
|
||||
emit('receivedRequests', processed)
|
||||
})
|
||||
}
|
||||
|
||||
on(
|
||||
'subscribe:receivedRequests',
|
||||
wrap({
|
||||
handler: onReceivedRequests,
|
||||
encryptionId,
|
||||
subscriptionId: 'receivedRequests',
|
||||
socket
|
||||
})
|
||||
)
|
||||
|
||||
on('unsubscribe', ({ subscriptionId }, response) => {
|
||||
const callback = encryptedCallback(socket, response)
|
||||
Subscriptions.remove({ deviceId: encryptionId, subscriptionId })
|
||||
|
|
|
|||
|
|
@ -3096,6 +3096,7 @@ module.exports = async (
|
|||
* @prop {string} path
|
||||
* @prop {string=} publicKey
|
||||
* @prop {string=} publicKeyForDecryption
|
||||
* @prop {string=} epubForDecryption
|
||||
*/
|
||||
/**
|
||||
* @param {HandleGunFetchParams} args0
|
||||
|
|
@ -3106,7 +3107,8 @@ module.exports = async (
|
|||
startFromUserGraph,
|
||||
path,
|
||||
publicKey,
|
||||
publicKeyForDecryption
|
||||
publicKeyForDecryption,
|
||||
epubForDecryption
|
||||
}) => {
|
||||
const keys = path.split('>')
|
||||
const { tryAndWait } = require('../services/gunDB/contact-api/utils')
|
||||
|
|
@ -3125,7 +3127,8 @@ module.exports = async (
|
|||
res(
|
||||
await GunWriteRPC.deepDecryptIfNeeded(
|
||||
data,
|
||||
publicKeyForDecryption
|
||||
publicKeyForDecryption,
|
||||
epubForDecryption
|
||||
)
|
||||
)
|
||||
} else {
|
||||
|
|
@ -3143,118 +3146,79 @@ module.exports = async (
|
|||
* Used decryption of incoming data.
|
||||
*/
|
||||
const PUBKEY_FOR_DECRYPT_HEADER = 'public-key-for-decryption'
|
||||
/**
|
||||
* Used decryption of incoming data.
|
||||
*/
|
||||
const EPUB_FOR_DECRYPT_HEADER = 'epub-for-decryption'
|
||||
|
||||
ap.get('/api/gun/once/:path', async (req, res) => {
|
||||
const publicKeyForDecryption = req.header(PUBKEY_FOR_DECRYPT_HEADER)
|
||||
const epubForDecryption = req.header(EPUB_FOR_DECRYPT_HEADER)
|
||||
const { path } = req.params
|
||||
logger.info(`gun ONCE: ${path}`)
|
||||
try {
|
||||
const data = await handleGunFetch({
|
||||
res.status(200).json({
|
||||
data: await handleGunFetch({
|
||||
path,
|
||||
startFromUserGraph: false,
|
||||
type: 'once',
|
||||
publicKeyForDecryption
|
||||
publicKeyForDecryption,
|
||||
epubForDecryption
|
||||
})
|
||||
res.status(200).json({
|
||||
data
|
||||
})
|
||||
} catch (err) {
|
||||
logger.error('error in rpc once')
|
||||
logger.error(err)
|
||||
res
|
||||
.status(
|
||||
err.message === Common.Constants.ErrorCode.NOT_AUTH ? 401 : 500
|
||||
)
|
||||
.json({
|
||||
errorMessage: err.message
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
ap.get('/api/gun/load/:path', async (req, res) => {
|
||||
const publicKeyForDecryption = req.header(PUBKEY_FOR_DECRYPT_HEADER)
|
||||
const epubForDecryption = req.header(EPUB_FOR_DECRYPT_HEADER)
|
||||
const { path } = req.params
|
||||
logger.info(`gun LOAD: ${path}`)
|
||||
try {
|
||||
const data = await handleGunFetch({
|
||||
res.status(200).json({
|
||||
data: await handleGunFetch({
|
||||
path,
|
||||
startFromUserGraph: false,
|
||||
type: 'load',
|
||||
publicKeyForDecryption
|
||||
publicKeyForDecryption,
|
||||
epubForDecryption
|
||||
})
|
||||
res.status(200).json({
|
||||
data
|
||||
})
|
||||
} catch (err) {
|
||||
logger.error('error in rpc load')
|
||||
logger.error(err)
|
||||
res
|
||||
.status(
|
||||
err.message === Common.Constants.ErrorCode.NOT_AUTH ? 401 : 500
|
||||
)
|
||||
.json({
|
||||
errorMessage: err.message
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
ap.get('/api/gun/user/once/:path', async (req, res) => {
|
||||
const publicKeyForDecryption = req.header(PUBKEY_FOR_DECRYPT_HEADER)
|
||||
const epubForDecryption = req.header(EPUB_FOR_DECRYPT_HEADER)
|
||||
const { path } = req.params
|
||||
logger.info(`gun otheruser ONCE: ${path}`)
|
||||
try {
|
||||
const data = await handleGunFetch({
|
||||
res.status(200).json({
|
||||
data: await handleGunFetch({
|
||||
path,
|
||||
startFromUserGraph: true,
|
||||
type: 'once',
|
||||
publicKeyForDecryption
|
||||
publicKeyForDecryption,
|
||||
epubForDecryption
|
||||
})
|
||||
res.status(200).json({
|
||||
data
|
||||
})
|
||||
} catch (err) {
|
||||
logger.error('error in rpc once user')
|
||||
logger.error(err)
|
||||
res
|
||||
.status(
|
||||
err.message === Common.Constants.ErrorCode.NOT_AUTH ? 401 : 500
|
||||
)
|
||||
.json({
|
||||
errorMessage: err.message
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
ap.get('/api/gun/user/load/:path', async (req, res) => {
|
||||
const publicKeyForDecryption = req.header(PUBKEY_FOR_DECRYPT_HEADER)
|
||||
const epubForDecryption = req.header(EPUB_FOR_DECRYPT_HEADER)
|
||||
const { path } = req.params
|
||||
logger.info(`gun otheruser LOAD: ${path}`)
|
||||
try {
|
||||
const data = await handleGunFetch({
|
||||
res.status(200).json({
|
||||
data: await handleGunFetch({
|
||||
path,
|
||||
startFromUserGraph: true,
|
||||
type: 'load',
|
||||
publicKeyForDecryption
|
||||
publicKeyForDecryption,
|
||||
epubForDecryption
|
||||
})
|
||||
res.status(200).json({
|
||||
data
|
||||
})
|
||||
} catch (err) {
|
||||
logger.error('error in rpc load user')
|
||||
logger.error(err)
|
||||
res
|
||||
.status(
|
||||
err.message === Common.Constants.ErrorCode.NOT_AUTH ? 401 : 500
|
||||
)
|
||||
.json({
|
||||
errorMessage: err.message
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
ap.get('/api/gun/otheruser/:publicKey/:type/:path', async (req, res) => {
|
||||
const allowedTypes = ['once', 'load', 'open']
|
||||
const publicKeyForDecryption = req.header(PUBKEY_FOR_DECRYPT_HEADER)
|
||||
const epubForDecryption = req.header(EPUB_FOR_DECRYPT_HEADER)
|
||||
const { path /*:rawPath*/, publicKey, type } = req.params
|
||||
logger.info(`gun otheruser ${type}: ${path}`)
|
||||
// const path = decodeURI(rawPath)
|
||||
|
|
@ -3278,7 +3242,8 @@ module.exports = async (
|
|||
startFromUserGraph: false,
|
||||
type,
|
||||
publicKey,
|
||||
publicKeyForDecryption
|
||||
publicKeyForDecryption,
|
||||
epubForDecryption
|
||||
})
|
||||
})
|
||||
} catch (err) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue