Merge branch 'master' into bug/bad-mac-logs

This commit is contained in:
emad-salah 2021-07-22 15:26:39 +01:00
commit 77ac1ce8f4
32 changed files with 2888 additions and 5682 deletions

View file

@ -16,3 +16,5 @@ LOCAL_TUNNEL_SERVER=https://tunnel.rip
TORRENT_SEED_URL=https://webtorrent.shock.network
# Admin token for your own seed server
TORRENT_SEED_TOKEN=jibberish
# "default" or "hosting"
DEPLOYMENT_TYPE=hosting

View file

@ -4,9 +4,5 @@
"debug.node.autoAttach": "on",
"editor.formatOnSave": true,
"editor.defaultFormatter": "esbenp.prettier-vscode",
"cSpell.words": [
"Epub",
"ISEA",
"Reqs"
]
"cSpell.words": ["Epub", "GUNRPC", "ISEA", "PUBKEY", "Reqs", "uuidv"]
}

View file

@ -57,14 +57,19 @@ yarn install
To run ShockAPI in a fully isolated environment you can use the Docker image
provided on the Docker Hub and easily interact with API's CLI interface and flags.
#### Prerequisites
To interact with ShockAPI's Docker image you need an instance of LND running and
also if your configs, LND related files and certificates are located on a local file system you'll need to mount **Docker Volumes** pointed to them while starting the container.
Example of listing available configuration flags:
```
docker run --rm shockwallet/api:latest --help
```
Example of running an local instance:
Example of running an local instance with mounted volumes:
```
docker run shockwallet/api:latest -h 0.0.0.0 -c
docker run -v /home/$USER/.lnd:/root/.lnd --network host shockwallet/api:latest
```
<!---
### Docker for Raspberry Pi

View file

@ -1,58 +1,58 @@
// config/log.js
/** @prettier */
const winston = require("winston");
const util = require("util")
require("winston-daily-rotate-file");
const { createLogger, transports, format } = require('winston')
const util = require('util')
require('winston-daily-rotate-file')
const winstonAttached = new Map();
const transform = (info) => {
const args = info[Symbol.for('splat')];
// @ts-ignore
const transform = info => {
const args = info[Symbol.for('splat')]
if (args) {
return {...info, message: util.format(info.message, ...args)};
return { ...info, message: util.format(info.message, ...args) }
}
return info;
return info
}
const logFormatter = () => ({ transform })
/**
* @param {string} logFileName
* @param {string} logLevel
* @returns {import("winston").Logger}
*/
module.exports = (logFileName, logLevel) => {
if (!winstonAttached.has(logFileName)) {
winston.add(new (winston.transports.DailyRotateFile)({
filename: logFileName,
datePattern: "yyyy-MM-DD",
const formatter = format.combine(
format.colorize(),
format.errors({ stack: true }),
logFormatter(),
format.prettyPrint(),
format.timestamp(),
format.simple(),
format.align(),
format.printf(info => {
const { timestamp, level, message, stack, exception } = info
const ts = timestamp.slice(0, 19).replace('T', ' ')
const isObject = typeof message === 'object'
const formattedJson = isObject ? JSON.stringify(message, null, 2) : message
const formattedException = exception ? exception.stack : ''
const errorMessage = stack || formattedException
const formattedMessage = errorMessage ? errorMessage : formattedJson
return `${ts} [${level}]: ${formattedMessage}`
})
)
const Logger = createLogger({
format: formatter,
transports: [
new transports.DailyRotateFile({
filename: 'shockapi.log',
datePattern: 'yyyy-MM-DD',
// https://github.com/winstonjs/winston-daily-rotate-file/issues/188
json: true,
json: false,
maxSize: 1000000,
maxFiles: 7,
level: logLevel
}))
winston.add(new winston.transports.Console({
format: winston.format.combine(
winston.format.colorize(),
logFormatter(),
winston.format.prettyPrint(),
winston.format.timestamp(),
winston.format.simple(),
winston.format.align(),
winston.format.printf((info) => {
const {
timestamp, level, message
} = info;
handleExceptions: true
}),
new transports.Console({
handleExceptions: true
})
]
})
const ts = timestamp.slice(0, 19).replace('T', ' ');
return `${ts} [${level}]: ${typeof message === "object" ? JSON.stringify(message, null, 2) : message}`;
}),
)
}))
winston.level = logLevel
winstonAttached.set(logFileName, winston)
}
return winstonAttached.get(logFileName)
}
module.exports = Logger

View file

@ -1,6 +1,6 @@
{
"name": "shockapi",
"version": "2021.07.10",
"version": "2021.7.21",
"description": "",
"main": "src/server.js",
"scripts": {

View file

@ -6,7 +6,7 @@ const jwt = require('jsonwebtoken')
const uuidv1 = require('uuid/v1')
const jsonfile = require('jsonfile')
const path = require('path')
const logger = require('winston')
const logger = require('../../config/log')
const Storage = require('node-persist')
const FS = require('../../utils/fs')

View file

@ -5,7 +5,7 @@ const Common = require('shock-common')
const Gun = require('gun')
// @ts-ignore
require('gun/nts')
const logger = require('winston')
const logger = require('../../../config/log')
// @ts-ignore
Gun.log = () => {}
// @ts-ignore
@ -335,7 +335,15 @@ const authenticate = async (alias, pass, __user) => {
},
ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(`Error initializing follows: ${ack.err}`))
rej(
new Error(
`Error initializing follows: ${JSON.stringify(
ack.err,
null,
4
)}`
)
)
} else {
res()
}
@ -366,7 +374,15 @@ const authenticate = async (alias, pass, __user) => {
},
ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(`Error initializing follows: ${ack.err}`))
rej(
new Error(
`Error initializing follows: ${JSON.stringify(
ack.err,
null,
4
)}`
)
)
} else {
res()
}
@ -375,16 +391,10 @@ const authenticate = async (alias, pass, __user) => {
}))
// move this to a subscription; implement off() ? todo
API.Jobs.onAcceptedRequests(_user, mySEA)
API.Jobs.onOrders(_user, gun, mySEA)
API.Jobs.lastSeenNode(_user)
API.Events.onChats(() => {})()
API.Events.onCurrentHandshakeAddress(() => {}, user)()
API.Events.onOutgoing(() => {})()
API.Events.onSeedBackup(() => {}, user, mySEA)
API.Events.onSimplerReceivedRequests(() => {})()
API.Events.onSimplerSentRequests(() => {})()
return _user._.sea.pub
}
@ -421,7 +431,15 @@ const authenticate = async (alias, pass, __user) => {
},
ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(`Error initializing follows: ${ack.err}`))
rej(
new Error(
`Error initializing follows: ${JSON.stringify(
ack.err,
null,
4
)}`
)
)
} else {
res()
}
@ -429,17 +447,10 @@ const authenticate = async (alias, pass, __user) => {
)
}))
API.Jobs.onAcceptedRequests(_user, mySEA)
API.Jobs.onOrders(_user, gun, mySEA)
API.Jobs.lastSeenNode(_user)
API.Events.onChats(() => {})()
API.Events.onCurrentHandshakeAddress(() => {}, user)()
API.Events.onOutgoing(() => {})()
API.Events.onSeedBackup(() => {}, user, mySEA)
API.Events.onSimplerReceivedRequests(() => {})()
API.Events.onSimplerSentRequests(() => {})()
return ack.sea.pub
} else {

View file

@ -2,7 +2,7 @@
* @format
*/
const uuidv1 = require('uuid/v1')
const logger = require('winston')
const logger = require('../../../config/log')
const Common = require('shock-common')
const { Constants, Schema } = Common
const Gun = require('gun')
@ -26,262 +26,10 @@ const LNDHealthMananger = require('../../../utils/lightningServices/errors')
const { enrollContentTokens, selfContentToken } = require('../../seed')
/**
* @typedef {import('./SimpleGUN').GUNNode} GUNNode
* @typedef {import('./SimpleGUN').ISEA} ISEA
* @typedef {import('./SimpleGUN').UserGUNNode} UserGUNNode
* @typedef {import('shock-common').Schema.HandshakeRequest} HandshakeRequest
* @typedef {import('shock-common').Schema.StoredRequest} StoredReq
* @typedef {import('shock-common').Schema.Message} Message
* @typedef {import('shock-common').Schema.Outgoing} Outgoing
* @typedef {import('shock-common').Schema.PartialOutgoing} PartialOutgoing
* @typedef {import('shock-common').Schema.Order} Order
* @typedef {import('./SimpleGUN').Ack} Ack
*/
/**
* Create a an outgoing feed. The feed will have an initial special acceptance
* message. Returns a promise that resolves to the id of the newly-created
* outgoing feed.
*
* If an outgoing feed is already created for the recipient, then returns the id
* of that one.
* @param {string} withPublicKey Public key of the intended recipient of the
* outgoing feed that will be created.
* @throws {Error} If the outgoing feed cannot be created or if the initial
* message for it also cannot be created. These errors aren't coded as they are
* not meant to be caught outside of this module.
* @param {UserGUNNode} user
* @param {ISEA} SEA
* @returns {Promise<string>}
*/
const __createOutgoingFeed = async (withPublicKey, user, SEA) => {
if (!user.is) {
throw new Error(ErrorCode.NOT_AUTH)
}
const mySecret = require('../Mediator').getMySecret()
const encryptedForMeRecipientPub = await SEA.encrypt(withPublicKey, mySecret)
const ourSecret = await SEA.secret(
await Utils.pubToEpub(withPublicKey),
user._.sea
)
const maybeOutgoingID = await Utils.recipientToOutgoingID(withPublicKey)
let outgoingFeedID = ''
// if there was no stored outgoing, create an outgoing feed
if (typeof maybeOutgoingID !== 'string') {
/** @type {PartialOutgoing} */
const newPartialOutgoingFeed = {
with: encryptedForMeRecipientPub
}
/** @type {string} */
const newOutgoingFeedID = await new Promise((res, rej) => {
const _outFeedNode = user
.get(Key.OUTGOINGS)
//@ts-ignore
.set(newPartialOutgoingFeed, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(ack.err))
} else {
res(_outFeedNode._.get)
}
})
})
if (typeof newOutgoingFeedID !== 'string') {
throw new TypeError('typeof newOutgoingFeedID !== "string"')
}
/** @type {Message} */
const initialMsg = {
body: await SEA.encrypt(Constants.Misc.INITIAL_MSG, ourSecret),
timestamp: Date.now()
}
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
user
.get(Key.OUTGOINGS)
.get(newOutgoingFeedID)
.get(Key.MESSAGES)
//@ts-ignore
.set(initialMsg, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(ack.err))
} else {
res()
}
})
}))
const encryptedForMeNewOutgoingFeedID = await SEA.encrypt(
newOutgoingFeedID,
mySecret
)
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
user
.get(Key.RECIPIENT_TO_OUTGOING)
.get(withPublicKey)
.put(encryptedForMeNewOutgoingFeedID, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(Error(ack.err))
} else {
res()
}
})
}))
outgoingFeedID = newOutgoingFeedID
}
// otherwise decrypt stored outgoing
else {
outgoingFeedID = maybeOutgoingID
}
if (typeof outgoingFeedID === 'undefined') {
throw new TypeError(
'__createOutgoingFeed() -> typeof outgoingFeedID === "undefined"'
)
}
if (typeof outgoingFeedID !== 'string') {
throw new TypeError(
'__createOutgoingFeed() -> expected outgoingFeedID to be an string'
)
}
if (outgoingFeedID.length === 0) {
throw new TypeError(
'__createOutgoingFeed() -> expected outgoingFeedID to be a populated string.'
)
}
return outgoingFeedID
}
/**
* Given a request's ID, that should be found on the user's current handshake
* node, accept the request by creating an outgoing feed intended for the
* requestor, then encrypting and putting the id of this newly created outgoing
* feed on the response prop of the request.
* @param {string} requestID The id for the request to accept.
* @param {GUNNode} gun
* @param {UserGUNNode} user Pass only for testing purposes.
* @param {ISEA} SEA
* @param {typeof __createOutgoingFeed} outgoingFeedCreator Pass only
* for testing. purposes.
* @throws {Error} Throws if trying to accept an invalid request, or an error on
* gun's part.
* @returns {Promise<void>}
*/
const acceptRequest = async (
requestID,
gun,
user,
SEA,
outgoingFeedCreator = __createOutgoingFeed
) => {
if (!user.is) {
throw new Error(ErrorCode.NOT_AUTH)
}
const handshakeAddress = await Utils.tryAndWait(async (_, user) => {
const addr = await user.get(Key.CURRENT_HANDSHAKE_ADDRESS).then()
if (typeof addr !== 'string') {
throw new TypeError("typeof addr !== 'string'")
}
return addr
})
const {
response: encryptedForUsIncomingID,
from: senderPublicKey
} = await Utils.tryAndWait(async gun => {
const hr = await gun
.get(Key.HANDSHAKE_NODES)
.get(handshakeAddress)
.get(requestID)
.then()
if (!Schema.isHandshakeRequest(hr)) {
throw new Error(ErrorCode.TRIED_TO_ACCEPT_AN_INVALID_REQUEST)
}
return hr
})
/** @type {string} */
const requestorEpub = await Utils.pubToEpub(senderPublicKey)
const ourSecret = await SEA.secret(requestorEpub, user._.sea)
if (typeof ourSecret !== 'string') {
throw new TypeError("typeof ourSecret !== 'string'")
}
const incomingID = await SEA.decrypt(encryptedForUsIncomingID, ourSecret)
if (typeof incomingID !== 'string') {
throw new TypeError("typeof incomingID !== 'string'")
}
const newlyCreatedOutgoingFeedID = await outgoingFeedCreator(
senderPublicKey,
user,
SEA
)
const mySecret = require('../Mediator').getMySecret()
const encryptedForMeIncomingID = await SEA.encrypt(incomingID, mySecret)
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
user
.get(Key.USER_TO_INCOMING)
.get(senderPublicKey)
.put(encryptedForMeIncomingID, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(ack.err))
} else {
res()
}
})
}))
////////////////////////////////////////////////////////////////////////////
// NOTE: perform non-reversable actions before destructive actions
// In case any of the non-reversable actions reject.
// In this case, writing to the response is the non-revesarble op.
////////////////////////////////////////////////////////////////////////////
const encryptedForUsOutgoingID = await SEA.encrypt(
newlyCreatedOutgoingFeedID,
ourSecret
)
//why await if you dont need the response?
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
gun
.get(Key.HANDSHAKE_NODES)
.get(handshakeAddress)
.get(requestID)
.put(
{
response: encryptedForUsOutgoingID
},
ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(ack.err))
} else {
res()
}
}
)
}))
}
/**
* @param {string} user
* @param {string} pass
@ -374,438 +122,6 @@ const generateHandshakeAddress = async () => {
}))
}
/**
*
* @param {string} pub
* @throws {Error}
* @returns {Promise<void>}
*/
const cleanup = async pub => {
const user = require('../Mediator').getUser()
const outGoingID = await Utils.recipientToOutgoingID(pub)
const promises = []
promises.push(
/** @type {Promise<void>} */ (new Promise((res, rej) => {
user
.get(Key.USER_TO_INCOMING)
.get(pub)
.put(null, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(ack.err))
} else {
res()
}
})
}))
)
promises.push(
/** @type {Promise<void>} */ (new Promise((res, rej) => {
user
.get(Key.RECIPIENT_TO_OUTGOING)
.get(pub)
.put(null, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(ack.err))
} else {
res()
}
})
}))
)
promises.push(
/** @type {Promise<void>} */ (new Promise((res, rej) => {
user
.get(Key.USER_TO_LAST_REQUEST_SENT)
.get(pub)
.put(null, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(ack.err))
} else {
res()
}
})
}))
)
if (outGoingID) {
promises.push(
/** @type {Promise<void>} */ (new Promise((res, rej) => {
user
.get(Key.OUTGOINGS)
.get(outGoingID)
.put(null, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(ack.err))
} else {
res()
}
})
}))
)
}
await Promise.all(promises)
}
/**
* @param {string} recipientPublicKey
* @param {GUNNode} gun
* @param {UserGUNNode} user
* @param {ISEA} SEA
* @throws {Error|TypeError}
* @returns {Promise<void>}
*/
const sendHandshakeRequest = async (recipientPublicKey, gun, user, SEA) => {
if (!user.is) {
throw new Error(ErrorCode.NOT_AUTH)
}
await cleanup(recipientPublicKey)
if (typeof recipientPublicKey !== 'string') {
throw new TypeError(
`recipientPublicKey is not string, got: ${typeof recipientPublicKey}`
)
}
if (recipientPublicKey.length === 0) {
throw new TypeError('recipientPublicKey is an string of length 0')
}
if (recipientPublicKey === user.is.pub) {
throw new Error('Do not send a request to yourself')
}
logger.info('sendHR() -> before recipientEpub')
/** @type {string} */
const recipientEpub = await Utils.pubToEpub(recipientPublicKey)
logger.info('sendHR() -> before mySecret')
const mySecret = require('../Mediator').getMySecret()
logger.info('sendHR() -> before ourSecret')
const ourSecret = await SEA.secret(recipientEpub, user._.sea)
// check if successful handshake is present
logger.info('sendHR() -> before alreadyHandshaked')
/** @type {boolean} */
const alreadyHandshaked = await Utils.successfulHandshakeAlreadyExists(
recipientPublicKey
)
if (alreadyHandshaked) {
throw new Error(ErrorCode.ALREADY_HANDSHAKED)
}
logger.info('sendHR() -> before maybeLastRequestIDSentToUser')
// check that we have already sent a request to this user, on his current
// handshake node
const maybeLastRequestIDSentToUser = await Utils.tryAndWait((_, user) =>
user
.get(Key.USER_TO_LAST_REQUEST_SENT)
.get(recipientPublicKey)
.then()
)
logger.info('sendHR() -> before currentHandshakeAddress')
const currentHandshakeAddress = await Utils.tryAndWait(
gun =>
Common.Utils.makePromise(res => {
gun
.user(recipientPublicKey)
.get(Key.CURRENT_HANDSHAKE_ADDRESS)
.once(
data => {
res(data)
},
{ wait: 1000 }
)
}),
data => typeof data !== 'string'
)
if (typeof currentHandshakeAddress !== 'string') {
throw new TypeError(
'expected current handshake address found on recipients user node to be an string'
)
}
if (typeof maybeLastRequestIDSentToUser === 'string') {
if (maybeLastRequestIDSentToUser.length < 5) {
throw new TypeError(
'sendHandshakeRequest() -> maybeLastRequestIDSentToUser.length < 5'
)
}
const lastRequestIDSentToUser = maybeLastRequestIDSentToUser
logger.info('sendHR() -> before alreadyContactedOnCurrHandshakeNode')
const hrInHandshakeNode = await Utils.tryAndWait(
gun =>
new Promise(res => {
gun
.get(Key.HANDSHAKE_NODES)
.get(currentHandshakeAddress)
.get(lastRequestIDSentToUser)
.once(data => {
res(data)
})
}),
// force retry on undefined in case the undefined was a false negative
v => typeof v === 'undefined'
)
const alreadyContactedOnCurrHandshakeNode =
typeof hrInHandshakeNode !== 'undefined'
if (alreadyContactedOnCurrHandshakeNode) {
throw new Error(ErrorCode.ALREADY_REQUESTED_HANDSHAKE)
}
}
logger.info('sendHR() -> before __createOutgoingFeed')
const outgoingFeedID = await __createOutgoingFeed(
recipientPublicKey,
user,
SEA
)
logger.info('sendHR() -> before encryptedForUsOutgoingFeedID')
const encryptedForUsOutgoingFeedID = await SEA.encrypt(
outgoingFeedID,
ourSecret
)
const timestamp = Date.now()
/** @type {HandshakeRequest} */
const handshakeRequestData = {
from: user.is.pub,
response: encryptedForUsOutgoingFeedID,
timestamp
}
const encryptedForMeRecipientPublicKey = await SEA.encrypt(
recipientPublicKey,
mySecret
)
logger.info('sendHR() -> before newHandshakeRequestID')
/** @type {string} */
const newHandshakeRequestID = await new Promise((res, rej) => {
const hr = gun
.get(Key.HANDSHAKE_NODES)
.get(currentHandshakeAddress)
//@ts-ignore
.set(handshakeRequestData, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(`Error trying to create request: ${ack.err}`))
} else {
res(hr._.get)
}
})
})
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
user
.get(Key.USER_TO_LAST_REQUEST_SENT)
.get(recipientPublicKey)
.put(newHandshakeRequestID, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(ack.err))
} else {
res()
}
})
}))
// This needs to come before the write to sent requests. Because that write
// triggers Jobs.onAcceptedRequests and it in turn reads from request-to-user
/**
* @type {StoredReq}
*/
const storedReq = {
sentReqID: await SEA.encrypt(newHandshakeRequestID, mySecret),
recipientPub: encryptedForMeRecipientPublicKey,
handshakeAddress: await SEA.encrypt(currentHandshakeAddress, mySecret),
timestamp
}
//why await if you dont need the response?
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
//@ts-ignore
user.get(Key.STORED_REQS).set(storedReq, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(
new Error(
`Error saving newly created request to sent requests: ${ack.err}`
)
)
} else {
res()
}
})
}))
}
/**
* Returns the message id.
* @param {string} recipientPublicKey
* @param {string} body
* @param {UserGUNNode} user
* @param {ISEA} SEA
* @returns {Promise<import('shock-common').Schema.ChatMessage>} The message id.
*/
const sendMessageNew = async (recipientPublicKey, body, user, SEA) => {
if (!user.is) {
throw new Error(ErrorCode.NOT_AUTH)
}
if (typeof recipientPublicKey !== 'string') {
throw new TypeError(
`expected recipientPublicKey to be an string, but instead got: ${typeof recipientPublicKey}`
)
}
if (recipientPublicKey.length === 0) {
throw new TypeError(
'expected recipientPublicKey to be an string of length greater than zero'
)
}
if (typeof body !== 'string') {
throw new TypeError(
`expected message to be an string, instead got: ${typeof body}`
)
}
if (body.length === 0) {
throw new TypeError(
'expected message to be an string of length greater than zero'
)
}
const outgoingID = await Utils.recipientToOutgoingID(recipientPublicKey)
if (outgoingID === null) {
throw new Error(
`Could not fetch an outgoing id for user: ${recipientPublicKey}`
)
}
const recipientEpub = await Utils.pubToEpub(recipientPublicKey)
const ourSecret = await SEA.secret(recipientEpub, user._.sea)
if (typeof ourSecret !== 'string') {
throw new TypeError("sendMessage() -> typeof ourSecret !== 'string'")
}
const encryptedBody = await SEA.encrypt(body, ourSecret)
const newMessage = {
body: encryptedBody,
timestamp: Date.now()
}
return new Promise((res, rej) => {
const msgNode = user
.get(Key.OUTGOINGS)
.get(outgoingID)
.get(Key.MESSAGES)
.set(newMessage, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(ack.err))
} else {
res({
body,
id: msgNode._.get,
outgoing: true,
timestamp: newMessage.timestamp
})
}
})
})
}
/**
* Returns the message id.
* @param {string} recipientPublicKey
* @param {string} body
* @param {UserGUNNode} user
* @param {ISEA} SEA
* @returns {Promise<string>} The message id.
*/
const sendMessage = async (recipientPublicKey, body, user, SEA) =>
(await sendMessageNew(recipientPublicKey, body, user, SEA)).id
/**
* @param {string} recipientPub
* @param {string} msgID
* @param {UserGUNNode} user
* @returns {Promise<void>}
*/
const deleteMessage = async (recipientPub, msgID, user) => {
if (!user.is) {
throw new Error(ErrorCode.NOT_AUTH)
}
if (typeof recipientPub !== 'string') {
throw new TypeError(
`expected recipientPublicKey to be an string, but instead got: ${typeof recipientPub}`
)
}
if (recipientPub.length === 0) {
throw new TypeError(
'expected recipientPublicKey to be an string of length greater than zero'
)
}
if (typeof msgID !== 'string') {
throw new TypeError(
`expected msgID to be an string, instead got: ${typeof msgID}`
)
}
if (msgID.length === 0) {
throw new TypeError(
'expected msgID to be an string of length greater than zero'
)
}
const outgoingID = await Utils.recipientToOutgoingID(recipientPub)
if (outgoingID === null) {
throw new Error(`Could not fetch an outgoing id for user: ${recipientPub}`)
}
return new Promise((res, rej) => {
user
.get(Key.OUTGOINGS)
.get(outgoingID)
.get(Key.MESSAGES)
.get(msgID)
.put(null, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(ack.err))
} else {
res()
}
})
})
}
/**
* @param {string|null} avatar
* @param {UserGUNNode} user
@ -924,52 +240,6 @@ const setSeedServiceData = (encryptedSeedServiceData, user) =>
})
})
/**
* @param {string} initialMsg
* @param {string} recipientPublicKey
* @param {GUNNode} gun
* @param {UserGUNNode} user
* @param {ISEA} SEA
* @throws {Error|TypeError}
* @returns {Promise<void>}
*/
const sendHRWithInitialMsg = async (
initialMsg,
recipientPublicKey,
gun,
user,
SEA
) => {
/** @type {boolean} */
const alreadyHandshaked = await Utils.tryAndWait(
(_, user) =>
new Promise((res, rej) => {
user
.get(Key.USER_TO_INCOMING)
.get(recipientPublicKey)
.once(inc => {
if (typeof inc !== 'string') {
res(false)
} else if (inc.length === 0) {
rej(
new Error(
`sendHRWithInitialMsg()-> obtained encryptedIncomingId from user-to-incoming an string but of length 0`
)
)
} else {
res(true)
}
})
})
)
if (!alreadyHandshaked) {
await sendHandshakeRequest(recipientPublicKey, gun, user, SEA)
}
await sendMessage(recipientPublicKey, initialMsg, user, SEA)
}
/**
* @typedef {object} SpontPaymentOptions
* @prop {Common.Schema.OrderTargetType} type
@ -1035,7 +305,7 @@ const sendSpontaneousPayment = async (
logger.info('sendPayment() -> will now create order:')
/** @type {Order} */
/** @type {import('shock-common').Schema.Order} */
const order = {
amount: amount.toString(),
from: getUser()._.sea.pub,
@ -1395,18 +665,6 @@ const saveChannelsBackup = async (backups, user, SEA) => {
})
}
/**
* @param {string} pub
* @returns {Promise<void>}
*/
const disconnect = async pub => {
if (!(await Utils.successfulHandshakeAlreadyExists(pub))) {
throw new Error('No handshake exists for this pub')
}
await Promise.all([cleanup(pub), generateHandshakeAddress()])
}
/**
* @returns {Promise<void>}
*/
@ -1789,15 +1047,9 @@ const initWall = async () => {
}
module.exports = {
__createOutgoingFeed,
acceptRequest,
authenticate,
blacklist,
generateHandshakeAddress,
sendHandshakeRequest,
deleteMessage,
sendMessage,
sendHRWithInitialMsg,
setAvatar,
setDisplayName,
sendPayment,
@ -1805,14 +1057,12 @@ module.exports = {
setBio,
saveSeedBackup,
saveChannelsBackup,
disconnect,
setLastSeenApp,
createPost,
deletePost,
follow,
unfollow,
initWall,
sendMessageNew,
sendSpontaneousPayment,
createPostNew,
setDefaultSeedProvider,

View file

@ -2,427 +2,22 @@
* @prettier
*/
const debounce = require('lodash/debounce')
const logger = require('winston')
const {
Constants: { ErrorCode },
Schema,
Utils: CommonUtils
Constants: { ErrorCode }
} = require('shock-common')
const Key = require('../key')
const Utils = require('../utils')
/**
* @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode
* @typedef {import('../SimpleGUN').GUNNode} GUNNode
* @typedef {import('../SimpleGUN').ISEA} ISEA
* @typedef {import('../SimpleGUN').ListenerData} ListenerData
* @typedef {import('shock-common').Schema.HandshakeRequest} HandshakeRequest
* @typedef {import('shock-common').Schema.Message} Message
* @typedef {import('shock-common').Schema.Outgoing} Outgoing
* @typedef {import('shock-common').Schema.PartialOutgoing} PartialOutgoing
* @typedef {import('shock-common').Schema.Chat} Chat
* @typedef {import('shock-common').Schema.ChatMessage} ChatMessage
* @typedef {import('shock-common').Schema.SimpleSentRequest} SimpleSentRequest
* @typedef {import('shock-common').Schema.SimpleReceivedRequest} SimpleReceivedRequest
*/
const DEBOUNCE_WAIT_TIME = 500
/**
* @param {(userToIncoming: Record<string, string>) => void} cb
* @param {UserGUNNode} user Pass only for testing purposes.
* @param {ISEA} SEA
* @returns {void}
*/
const __onUserToIncoming = (cb, user, SEA) => {
if (!user.is) {
throw new Error(ErrorCode.NOT_AUTH)
}
const callb = debounce(cb, DEBOUNCE_WAIT_TIME)
/** @type {Record<string, string>} */
const userToIncoming = {}
const mySecret = require('../../Mediator').getMySecret()
user
.get(Key.USER_TO_INCOMING)
.map()
.on(async (encryptedIncomingID, userPub) => {
if (typeof encryptedIncomingID !== 'string') {
if (encryptedIncomingID === null) {
// on disconnect
delete userToIncoming[userPub]
} else {
logger.error(
'got a non string non null value inside user to incoming'
)
}
return
}
if (encryptedIncomingID.length === 0) {
logger.error('got an empty string value')
return
}
const incomingID = await SEA.decrypt(encryptedIncomingID, mySecret)
if (typeof incomingID === 'undefined') {
logger.warn('could not decrypt incomingID inside __onUserToIncoming')
return
}
userToIncoming[userPub] = incomingID
callb(userToIncoming)
})
}
/** @type {Set<(addr: string|null) => void>} */
const addressListeners = new Set()
/** @type {string|null} */
let currentAddress = null
const getHandshakeAddress = () => currentAddress
/** @param {string|null} addr */
const setAddress = addr => {
currentAddress = addr
addressListeners.forEach(l => l(currentAddress))
}
let addrSubbed = false
/**
* @param {(currentHandshakeAddress: string|null) => void} cb
* @param {UserGUNNode} user
* @returns {() => void}
*/
const onCurrentHandshakeAddress = (cb, user) => {
if (!user.is) {
throw new Error(ErrorCode.NOT_AUTH)
}
addressListeners.add(cb)
cb(currentAddress)
if (!addrSubbed) {
addrSubbed = true
user.get(Key.CURRENT_HANDSHAKE_ADDRESS).on(addr => {
if (typeof addr !== 'string') {
logger.error('expected handshake address to be an string')
setAddress(null)
return
}
setAddress(addr)
})
}
return () => {
addressListeners.delete(cb)
}
}
/**
* @param {(messages: Record<string, Message>) => void} cb
* @param {string} userPK Public key of the user from whom the incoming
* messages will be obtained.
* @param {string} incomingFeedID ID of the outgoing feed from which the
* incoming messages will be obtained.
* @param {GUNNode} gun (Pass only for testing purposes)
* @param {UserGUNNode} user
* @param {ISEA} SEA
* @returns {void}
*/
const onIncomingMessages = (cb, userPK, incomingFeedID, gun, user, SEA) => {
if (!user.is) {
throw new Error(ErrorCode.NOT_AUTH)
}
const callb = debounce(cb, DEBOUNCE_WAIT_TIME)
const otherUser = gun.user(userPK)
/**
* @type {Record<string, Message>}
*/
const messages = {}
callb(messages)
otherUser
.get(Key.OUTGOINGS)
.get(incomingFeedID)
.get(Key.MESSAGES)
.map()
.on(async (data, key) => {
if (!Schema.isMessage(data)) {
logger.warn('non-message received')
return
}
/** @type {string} */
const recipientEpub = await Utils.pubToEpub(userPK)
const secret = await SEA.secret(recipientEpub, user._.sea)
let { body } = data
body = await SEA.decrypt(body, secret)
messages[key] = {
body,
timestamp: data.timestamp
}
callb(messages)
})
}
/**
* @typedef {Record<string, Outgoing|null>} Outgoings
* @typedef {(outgoings: Outgoings) => void} OutgoingsListener
*/
/**
* @type {Outgoings}
*/
let currentOutgoings = {}
const getCurrentOutgoings = () => currentOutgoings
/** @type {Set<OutgoingsListener>} */
const outgoingsListeners = new Set()
outgoingsListeners.add(o => {
const values = Object.values(o)
const nulls = values.filter(x => x === null).length
const nonNulls = values.length - nulls
logger.info(`new outgoings, ${nulls} nulls and ${nonNulls} nonNulls`)
})
const notifyOutgoingsListeners = () => {
outgoingsListeners.forEach(l => l(currentOutgoings))
}
let outSubbed = false
/**
* @param {OutgoingsListener} cb
* @returns {() => void}
*/
const onOutgoing = cb => {
outgoingsListeners.add(cb)
cb(currentOutgoings)
if (!outSubbed) {
const user = require('../../Mediator').getUser()
user.get(Key.OUTGOINGS).open(
debounce(async data => {
try {
if (typeof data !== 'object' || data === null) {
currentOutgoings = {}
notifyOutgoingsListeners()
return
}
/** @type {Record<string, Outgoing|null>} */
const newOuts = {}
const SEA = require('../../Mediator').mySEA
const mySecret = await Utils.mySecret()
await CommonUtils.asyncForEach(
Object.entries(data),
async ([id, out]) => {
if (typeof out !== 'object') {
return
}
if (out === null) {
newOuts[id] = null
return
}
const { with: encPub, messages } = out
if (typeof encPub !== 'string') {
return
}
const pub = await SEA.decrypt(encPub, mySecret)
if (!newOuts[id]) {
newOuts[id] = {
with: pub,
messages: {}
}
}
const ourSec = await SEA.secret(
await Utils.pubToEpub(pub),
user._.sea
)
if (typeof messages === 'object' && messages !== null) {
await CommonUtils.asyncForEach(
Object.entries(messages),
async ([mid, msg]) => {
if (typeof msg === 'object' && msg !== null) {
if (
typeof msg.body === 'string' &&
typeof msg.timestamp === 'number'
) {
const newOut = newOuts[id]
if (!newOut) {
return
}
newOut.messages[mid] = {
body: await SEA.decrypt(msg.body, ourSec),
timestamp: msg.timestamp
}
}
}
}
)
}
}
)
currentOutgoings = newOuts
notifyOutgoingsListeners()
} catch (e) {
logger.info('--------------------------')
logger.info('Events -> onOutgoing')
logger.info(e)
logger.info('--------------------------')
}
}, 400)
)
outSubbed = true
}
return () => {
outgoingsListeners.delete(cb)
}
}
////////////////////////////////////////////////////////////////////////////////
/**
* @typedef {(chats: Chat[]) => void} ChatsListener
*/
/** @type {Chat[]} */
let currentChats = []
const getChats = () => currentChats
/** @type {Set<ChatsListener>} */
const chatsListeners = new Set()
chatsListeners.add(c => {
logger.info(`Chats: ${c.length}`)
})
const notifyChatsListeners = () => {
chatsListeners.forEach(l => l(currentChats))
}
const processChats = debounce(() => {
const Streams = require('../streams')
const currentOutgoings = getCurrentOutgoings()
const existingOutgoings = /** @type {[string, Outgoing][]} */ (Object.entries(
currentOutgoings
).filter(([_, o]) => o !== null))
const pubToFeed = Streams.getPubToFeed()
/** @type {Chat[]} */
const newChats = []
for (const [outID, out] of existingOutgoings) {
/** @type {ChatMessage[]} */
let msgs = Object.entries(out.messages)
.map(([mid, m]) => ({
id: mid,
outgoing: true,
body: m.body,
timestamp: m.timestamp
}))
// filter out null messages
.filter(m => typeof m.body === 'string')
const incoming = pubToFeed[out.with]
if (Array.isArray(incoming)) {
msgs = [...msgs, ...incoming]
}
/** @type {Chat} */
const chat = {
recipientPublicKey: out.with,
didDisconnect: pubToFeed[out.with] === 'disconnected',
id: out.with + outID,
messages: msgs,
recipientAvatar: null,
recipientDisplayName: null,
lastSeenApp: null
}
newChats.push(chat)
}
currentChats = newChats.filter(
c =>
Array.isArray(pubToFeed[c.recipientPublicKey]) ||
pubToFeed[c.recipientPublicKey] === 'disconnected'
)
notifyChatsListeners()
}, 750)
let onChatsSubbed = false
/**
* Massages all of the more primitive data structures into a more manageable
* 'Chat' paradigm.
* @param {ChatsListener} cb
* @returns {() => void}
*/
const onChats = cb => {
if (!chatsListeners.add(cb)) {
throw new Error('Tried to subscribe twice')
}
cb(currentChats)
if (!onChatsSubbed) {
const Streams = require('../streams')
onOutgoing(processChats)
Streams.onPubToFeed(processChats)
onChatsSubbed = true
}
return () => {
if (!chatsListeners.delete(cb)) {
throw new Error('Tried to unsubscribe twice')
}
}
}
/** @type {string|null} */
let currentSeedBackup = null
/**
* @param {(seedBackup: string|null) => void} cb
* @param {UserGUNNode} user
* @param {ISEA} SEA
* @param {import('../SimpleGUN').UserGUNNode} user
* @param {import('../SimpleGUN').ISEA} SEA
* @throws {Error} If user hasn't been auth.
* @returns {void}
*/
@ -445,17 +40,5 @@ const onSeedBackup = (cb, user, SEA) => {
}
module.exports = {
__onUserToIncoming,
onCurrentHandshakeAddress,
onIncomingMessages,
onOutgoing,
getCurrentOutgoings,
onSimplerReceivedRequests: require('./onReceivedReqs').onReceivedReqs,
onSimplerSentRequests: require('./onSentReqs').onSentReqs,
getCurrentSentReqs: require('./onSentReqs').getCurrentSentReqs,
getCurrentReceivedReqs: require('./onReceivedReqs').getReceivedReqs,
onSeedBackup,
onChats,
getHandshakeAddress,
getChats
onSeedBackup
}

View file

@ -1,138 +0,0 @@
/** @format */
const debounce = require('lodash/debounce')
const logger = require('winston')
const { Schema } = require('shock-common')
const size = require('lodash/size')
const Key = require('../key')
const Streams = require('../streams')
/**
* @typedef {Readonly<import('shock-common').Schema.SimpleReceivedRequest>} SimpleReceivedRequest
* @typedef {(reqs: ReadonlyArray<SimpleReceivedRequest>) => void} Listener
*/
/** @type {Set<Listener>} */
const listeners = new Set()
/** @type {string|null} */
let currentAddress = null
/** @type {Record<string, SimpleReceivedRequest>} */
let currReceivedReqsMap = {}
/**
* Unprocessed requests in current handshake node.
* @type {Record<string, import('shock-common').Schema.HandshakeRequest>}
*/
let currAddressData = {}
/** @returns {SimpleReceivedRequest[]} */
const getReceivedReqs = () => Object.values(currReceivedReqsMap)
/** @param {Record<string, SimpleReceivedRequest>} reqs */
const setReceivedReqsMap = reqs => {
currReceivedReqsMap = reqs
listeners.forEach(l => l(getReceivedReqs()))
}
listeners.add(() => {
logger.info(`new received reqs: ${size(getReceivedReqs())}`)
})
const react = debounce(() => {
/** @type {Record<string, SimpleReceivedRequest>} */
const newReceivedReqsMap = {}
const pubToFeed = Streams.getPubToFeed()
for (const [id, req] of Object.entries(currAddressData)) {
const inContact = Array.isArray(pubToFeed[req.from])
const isDisconnected = pubToFeed[req.from] === 'disconnected'
if (!inContact && !isDisconnected) {
newReceivedReqsMap[req.from] = {
id,
requestorAvatar: null,
requestorDisplayName: null,
requestorPK: req.from,
timestamp: req.timestamp
}
}
}
setReceivedReqsMap(newReceivedReqsMap)
}, 750)
/**
* @param {string} addr
* @returns {(data: import('../SimpleGUN').OpenListenerData) => void}
*/
const listenerForAddr = addr => data => {
// did invalidate
if (addr !== currentAddress) {
return
}
if (typeof data !== 'object' || data === null) {
currAddressData = {}
} else {
for (const [id, req] of Object.entries(data)) {
// no need to update them just write them once
if (Schema.isHandshakeRequest(req) && !currAddressData[id]) {
currAddressData[id] = req
}
}
}
logger.info('data for address length: ' + size(currAddressData))
react()
}
let subbed = false
/**
* @param {Listener} cb
* @returns {() => void}
*/
const onReceivedReqs = cb => {
listeners.add(cb)
cb(getReceivedReqs())
if (!subbed) {
const user = require('../../Mediator').getUser()
if (!user.is) {
logger.warn('Tried subscribing to onReceivedReqs without authing')
}
require('./index').onCurrentHandshakeAddress(addr => {
if (currentAddress === addr) {
return
}
currentAddress = addr
currAddressData = {}
setReceivedReqsMap({})
if (typeof addr === 'string') {
require('../../Mediator')
.getGun()
.get(Key.HANDSHAKE_NODES)
.get(addr)
.open(listenerForAddr(addr))
}
}, user)
Streams.onPubToFeed(react)
subbed = true
}
return () => {
listeners.delete(cb)
}
}
module.exports = {
getReceivedReqs,
onReceivedReqs
}

View file

@ -1,128 +0,0 @@
/** @format */
const debounce = require('lodash/debounce')
const logger = require('winston')
const size = require('lodash/size')
const Streams = require('../streams')
/**
* @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode
* @typedef {import('../SimpleGUN').GUNNode} GUNNode
* @typedef {import('../SimpleGUN').ISEA} ISEA
* @typedef {import('../SimpleGUN').ListenerData} ListenerData
* @typedef {import('shock-common').Schema.HandshakeRequest} HandshakeRequest
* @typedef {import('shock-common').Schema.Message} Message
* @typedef {import('shock-common').Schema.Outgoing} Outgoing
* @typedef {import('shock-common').Schema.PartialOutgoing} PartialOutgoing
* @typedef {import('shock-common').Schema.Chat} Chat
* @typedef {import('shock-common').Schema.ChatMessage} ChatMessage
* @typedef {import('shock-common').Schema.SimpleSentRequest} SimpleSentRequest
* @typedef {import('shock-common').Schema.SimpleReceivedRequest} SimpleReceivedRequest
*/
/**
* @typedef {(chats: SimpleSentRequest[]) => void} Listener
*/
/** @type {Set<Listener>} */
const listeners = new Set()
/** @type {SimpleSentRequest[]} */
let currentReqs = []
listeners.add(() => {
logger.info(`new sent reqs length: ${size(currentReqs)}`)
})
const getCurrentSentReqs = () => currentReqs
// any time any of the streams we use notifies us that it changed, we fire up
// react()
const react = debounce(() => {
/** @type {SimpleSentRequest[]} */
const newReqs = []
// reactive streams
// maps a pk to its current handshake address
const pubToHAddr = Streams.getAddresses()
// a set or list containing copies of sent requests
const storedReqs = Streams.getStoredReqs()
// maps a pk to the last request sent to it (so old stored reqs are invalidated)
const pubToLastSentReqID = Streams.getSentReqIDs()
// maps a pk to a feed, messages if subbed and pk is pubbing, null /
// 'disconnected' otherwise
const pubToFeed = Streams.getPubToFeed()
logger.info(`pubToLastSentREqID length: ${size(pubToLastSentReqID)}`)
for (const storedReq of storedReqs) {
const { handshakeAddress, recipientPub, sentReqID, timestamp } = storedReq
const currAddress = pubToHAddr[recipientPub]
const lastReqID = pubToLastSentReqID[recipientPub]
// invalidate if this stored request is not the last one sent to this
// particular pk
const isStale = typeof lastReqID !== 'undefined' && lastReqID !== sentReqID
// invalidate if we are in a pub/sub state to this pk (handshake in place)
const isConnected = Array.isArray(pubToFeed[recipientPub])
if (isStale || isConnected) {
// eslint-disable-next-line no-continue
continue
}
// no address for this pk? let's ask the corresponding stream to sub to
// gun.user(pk).get('currentAddr')
if (typeof currAddress === 'undefined') {
// eslint-disable-next-line no-empty-function
Streams.onAddresses(() => {}, recipientPub)()
}
newReqs.push({
id: sentReqID,
recipientAvatar: null,
recipientChangedRequestAddress:
// if we haven't received the other's user current handshake address,
// let's assume he hasn't changed it and that this request is still
// valid
typeof currAddress !== 'undefined' && handshakeAddress !== currAddress,
recipientDisplayName: null,
recipientPublicKey: recipientPub,
timestamp
})
}
currentReqs = newReqs
listeners.forEach(l => l(currentReqs))
}, 750)
let subbed = false
/**
* Massages all of the more primitive data structures into a more manageable
* 'Chat' paradigm.
* @param {Listener} cb
* @returns {() => void}
*/
const onSentReqs = cb => {
listeners.add(cb)
cb(currentReqs)
if (!subbed) {
Streams.onAddresses(react)
Streams.onStoredReqs(react)
Streams.onLastSentReqIDs(react)
Streams.onPubToFeed(react)
subbed = true
}
return () => {
listeners.delete(cb)
}
}
module.exports = {
onSentReqs,
getCurrentSentReqs
}

View file

@ -2,7 +2,7 @@
* @format
*/
const Common = require('shock-common')
const Logger = require('winston')
const Logger = require('../../../../config/log')
const size = require('lodash/size')
const Utils = require('../utils')

View file

@ -9,12 +9,10 @@
* tasks accept factories that are homonymous to the events on this same module.
*/
const onAcceptedRequests = require('./onAcceptedRequests')
const onOrders = require('./onOrders')
const lastSeenNode = require('./lastSeenNode')
module.exports = {
onAcceptedRequests,
onOrders,
lastSeenNode
}

View file

@ -2,7 +2,7 @@
* @format
*/
const logger = require('winston')
const logger = require('../../../../config/log')
const {
Constants: {

View file

@ -1,191 +0,0 @@
/**
* @format
*/
const logger = require('winston')
const {
Constants: { ErrorCode },
Schema
} = require('shock-common')
const size = require('lodash/size')
const Key = require('../key')
const Utils = require('../utils')
/**
* @typedef {import('../SimpleGUN').GUNNode} GUNNode
* @typedef {import('../SimpleGUN').ISEA} ISEA
* @typedef {import('../SimpleGUN').UserGUNNode} UserGUNNode
*/
let procid = 0
/**
* @throws {Error} NOT_AUTH
* @param {UserGUNNode} user
* @param {ISEA} SEA
* @returns {void}
*/
const onAcceptedRequests = (user, SEA) => {
if (!user.is) {
logger.warn('onAcceptedRequests() -> tried to sub without authing')
throw new Error(ErrorCode.NOT_AUTH)
}
procid++
user
.get(Key.STORED_REQS)
.map()
.on(async (storedReq, id) => {
logger.info(
`------------------------------------\nPROCID:${procid} (used for debugging memory leaks in jobs)\n---------------------------------------`
)
const mySecret = require('../../Mediator').getMySecret()
try {
if (!Schema.isStoredRequest(storedReq)) {
throw new Error(
'Stored request not an StoredRequest, instead got: ' +
JSON.stringify(storedReq) +
' this can be due to nulling out an old request (if null) or something else happened (please look at the output)'
)
}
// get the recipient pub from the stored request to avoid an attacker
// overwriting the handshake request in the root graph
const recipientPub = await SEA.decrypt(storedReq.recipientPub, mySecret)
if (typeof recipientPub !== 'string') {
throw new TypeError(
`Expected storedReq.recipientPub to be an string, instead got: ${recipientPub}`
)
}
if (await Utils.successfulHandshakeAlreadyExists(recipientPub)) {
return
}
const requestAddress = await SEA.decrypt(
storedReq.handshakeAddress,
mySecret
)
if (typeof requestAddress !== 'string') {
throw new TypeError()
}
const sentReqID = await SEA.decrypt(storedReq.sentReqID, mySecret)
if (typeof sentReqID !== 'string') {
throw new TypeError()
}
const latestReqSentID = await Utils.recipientPubToLastReqSentID(
recipientPub
)
const isStaleRequest = latestReqSentID !== sentReqID
if (isStaleRequest) {
return
}
const gun = require('../../Mediator').getGun()
const user = require('../../Mediator').getUser()
const recipientEpub = await Utils.pubToEpub(recipientPub)
const ourSecret = await SEA.secret(recipientEpub, user._.sea)
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
gun
.get(Key.HANDSHAKE_NODES)
.get(requestAddress)
.get(sentReqID)
.on(async sentReq => {
if (!Schema.isHandshakeRequest(sentReq)) {
rej(
new Error(
'sent request found in handshake node not a handshake request'
)
)
return
}
// The response can be decrypted with the same secret regardless
// of who wrote to it last (see HandshakeRequest definition). This
// could be our feed ID for the recipient, or the recipient's feed
// id if he accepted the request.
const feedID = await SEA.decrypt(sentReq.response, ourSecret)
if (typeof feedID !== 'string') {
throw new TypeError("typeof feedID !== 'string'")
}
logger.info(`onAcceptedRequests -> decrypted feed ID: ${feedID}`)
logger.info(
'Will now try to access the other users outgoing feed'
)
const maybeFeedOnRecipientsOutgoings = await Utils.tryAndWait(
gun =>
new Promise(res => {
gun
.user(recipientPub)
.get(Key.OUTGOINGS)
.get(feedID)
.once(feed => {
res(feed)
})
}),
// @ts-ignore
v => size(v) === 0
)
const feedIDExistsOnRecipientsOutgoings =
typeof maybeFeedOnRecipientsOutgoings === 'object' &&
maybeFeedOnRecipientsOutgoings !== null
if (!feedIDExistsOnRecipientsOutgoings) {
return
}
const encryptedForMeIncomingID = await SEA.encrypt(
feedID,
mySecret
)
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
user
.get(Key.USER_TO_INCOMING)
.get(recipientPub)
.put(encryptedForMeIncomingID, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(ack.err))
} else {
res()
}
})
}))
await /** @type {Promise<void>} */ (new Promise((res, rej) => {
user
.get(Key.STORED_REQS)
.get(id)
.put(null, ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(ack.err))
} else {
res()
}
})
}))
// ensure this listeners gets called at least once
res()
})
}))
} catch (err) {
logger.warn(`Jobs.onAcceptedRequests() -> ${err.message}`)
logger.error(err)
}
})
}
module.exports = onAcceptedRequests

View file

@ -2,7 +2,7 @@
* @format
*/
// @ts-check
const logger = require('winston')
const logger = require('../../../../config/log')
const isFinite = require('lodash/isFinite')
const isNumber = require('lodash/isNumber')
const isNaN = require('lodash/isNaN')
@ -110,19 +110,12 @@ const listenerForAddr = (addr, SEA) => async (order, orderID) => {
ordersProcessed.add(orderID)
logger.info(
`onOrders() -> processing order: ${orderID} -- ${JSON.stringify(
order
)} -- addr: ${addr}`
)
const alreadyAnswered = await getUser()
.get(Key.ORDER_TO_RESPONSE)
.get(orderID)
.then()
if (alreadyAnswered) {
logger.info('this order is already answered, quitting')
return
}
@ -209,10 +202,6 @@ const listenerForAddr = (addr, SEA) => async (order, orderID) => {
private: true
}
logger.info(
`onOrders() -> Will now create an invoice : ${JSON.stringify(invoiceReq)}`
)
const invoice = await _addInvoice(invoiceReq)
logger.info(

View file

@ -1,56 +0,0 @@
/** @format */
const logger = require('winston')
const size = require('lodash/size')
const Key = require('../key')
/**
* @typedef {Record<string, string|null|undefined>} Addresses
*/
/** @type {Addresses} */
const pubToAddress = {}
/** @type {Set<() => void>} */
const listeners = new Set()
listeners.add(() => {
logger.info(`pubToAddress length: ${size(pubToAddress)}`)
})
const notify = () => listeners.forEach(l => l())
/** @type {Set<string>} */
const subbedPublicKeys = new Set()
/**
* @param {() => void} cb
* @param {string=} pub
*/
const onAddresses = (cb, pub) => {
listeners.add(cb)
cb()
if (pub && subbedPublicKeys.add(pub)) {
require('../../Mediator')
.getGun()
.user(pub)
.get(Key.CURRENT_HANDSHAKE_ADDRESS)
.on(addr => {
if (typeof addr === 'string' || addr === null) {
pubToAddress[pub] = addr
} else {
pubToAddress[pub] = null
}
notify()
})
}
return () => {
listeners.delete(cb)
}
}
const getAddresses = () => pubToAddress
module.exports = {
onAddresses,
getAddresses
}

View file

@ -1,98 +1,6 @@
/** @format */
const { Schema, Utils: CommonUtils } = require('shock-common')
const Key = require('../key')
const Utils = require('../utils')
/**
* @typedef {import('shock-common').Schema.StoredRequest} StoredRequest
* @typedef {(reqs: StoredRequest[]) => void} StoredRequestsListener
*/
/** @type {Set<StoredRequestsListener>} */
const storedRequestsListeners = new Set()
/**
* @type {StoredRequest[]}
*/
let encryptedStoredReqs = []
/**
* @type {StoredRequest[]}
*/
let currentStoredReqs = []
const getStoredReqs = () => currentStoredReqs
const processStoredReqs = async () => {
const ereqs = encryptedStoredReqs
encryptedStoredReqs = []
const mySecret = await Utils.mySecret()
const SEA = require('../../Mediator').mySEA
const finalReqs = await CommonUtils.asyncMap(ereqs, async er => {
/** @type {StoredRequest} */
const r = {
handshakeAddress: await SEA.decrypt(er.handshakeAddress, mySecret),
recipientPub: await SEA.decrypt(er.recipientPub, mySecret),
sentReqID: await SEA.decrypt(er.sentReqID, mySecret),
timestamp: er.timestamp
}
return r
})
currentStoredReqs = finalReqs
storedRequestsListeners.forEach(l => l(currentStoredReqs))
}
let storedReqsSubbed = false
/**
* @param {StoredRequestsListener} cb
*/
const onStoredReqs = cb => {
storedRequestsListeners.add(cb)
if (!storedReqsSubbed) {
require('../../Mediator')
.getUser()
.get(Key.STORED_REQS)
.open(d => {
if (typeof d === 'object' && d !== null) {
//@ts-ignore
encryptedStoredReqs = /** @type {StoredRequest[]} */ (Object.values(
d
).filter(i => Schema.isStoredRequest(i)))
}
processStoredReqs()
})
storedReqsSubbed = true
}
cb(currentStoredReqs)
return () => {
storedRequestsListeners.delete(cb)
}
}
module.exports = {
onPubToIncoming: require('./pubToIncoming').onPubToIncoming,
getPubToIncoming: require('./pubToIncoming').getPubToIncoming,
setPubToIncoming: require('./pubToIncoming').setPubToIncoming,
onPubToFeed: require('./pubToFeed').onPubToFeed,
getPubToFeed: require('./pubToFeed').getPubToFeed,
onStoredReqs,
getStoredReqs,
onAddresses: require('./addresses').onAddresses,
getAddresses: require('./addresses').getAddresses,
onLastSentReqIDs: require('./lastSentReqID').onLastSentReqIDs,
getSentReqIDs: require('./lastSentReqID').getSentReqIDs,
PubToIncoming: require('./pubToIncoming'),
getPubToLastSeenApp: require('./pubToLastSeenApp').getPubToLastSeenApp,
onPubToLastSeenApp: require('./pubToLastSeenApp').on
}

View file

@ -1,56 +0,0 @@
/** @format */
const logger = require('winston')
const { Constants } = require('shock-common')
const Key = require('../key')
/** @type {Record<string, string|null|undefined>} */
let pubToLastSentReqID = {}
/** @type {Set<() => void>} */
const listeners = new Set()
const notify = () => listeners.forEach(l => l())
let subbed = false
/**
* @param {() => void} cb
*/
const onLastSentReqIDs = cb => {
listeners.add(cb)
cb()
if (!subbed) {
const user = require('../../Mediator').getUser()
if (!user.is) {
logger.warn('lastSentReqID() -> tried to sub without authing')
throw new Error(Constants.ErrorCode.NOT_AUTH)
}
user.get(Key.USER_TO_LAST_REQUEST_SENT).open(data => {
if (typeof data === 'object' && data !== null) {
for (const [pub, id] of Object.entries(data)) {
if (typeof id === 'string' || id === null) {
pubToLastSentReqID[pub] = id
}
}
} else {
pubToLastSentReqID = {}
}
notify()
})
subbed = true
}
return () => {
listeners.delete(cb)
}
}
const getSentReqIDs = () => pubToLastSentReqID
module.exports = {
onLastSentReqIDs,
getSentReqIDs
}

View file

@ -1,260 +0,0 @@
/** @format */
const uuidv1 = require('uuid/v1')
const logger = require('winston')
const debounce = require('lodash/debounce')
const { Schema, Utils: CommonUtils } = require('shock-common')
const size = require('lodash/size')
const Key = require('../key')
const Utils = require('../utils')
/**
* @typedef {import('shock-common').Schema.ChatMessage} Message
* @typedef {import('../SimpleGUN').OpenListenerData} OpenListenerData
*/
const PubToIncoming = require('./pubToIncoming')
/**
* @typedef {Record<string, Message[]|null|undefined|'disconnected'>} Feeds
* @typedef {(feeds: Feeds) => void} FeedsListener
*/
/** @type {Set<FeedsListener>} */
const feedsListeners = new Set()
/**
* @type {Feeds}
*/
let pubToFeed = {}
const getPubToFeed = () => pubToFeed
feedsListeners.add(() => {
logger.info(`new pubToFeed length: ${size(getPubToFeed())}`)
})
/** @param {Feeds} ptf */
const setPubToFeed = ptf => {
pubToFeed = ptf
feedsListeners.forEach(l => {
l(pubToFeed)
})
}
/**
* If at one point we subscribed to a feed, record it here. Keeps track of it
* for unsubbing.
*
* Since we can't really unsub in GUN, what we do is that each listener created
* checks the last incoming feed, if it was created for other feed that is not
* the latest, it becomes inactive.
* @type {Record<string, string|undefined|null>}
*/
const pubToLastIncoming = {}
/**
* Any pub-feed pair listener will write its update id here when fired up. Avoid
* race conditions between different listeners and between different invocations
* of the same listener.
* @type {Record<string, string>}
*/
const pubToLastUpdate = {}
/**
* Performs a sub to a pub feed pair that will only emit if it is the last
* subbed feed for that pub, according to `pubToLastIncoming`. This listener is
* not in charge of writing to the cache.
* @param {[ string , string ]} param0
* @returns {(data: OpenListenerData) => void}
*/
const onOpenForPubFeedPair = ([pub, feed]) =>
debounce(async data => {
try {
// did invalidate
if (pubToLastIncoming[pub] !== feed) {
return
}
if (
// did disconnect
data === null ||
// interpret as disconnect
typeof data !== 'object'
) {
// invalidate this listener. If a reconnection happens it will be for a
// different pub-feed pair.
pubToLastIncoming[pub] = null
setImmediate(() => {
logger.info(
`onOpenForPubFeedPair -> didDisconnect -> pub: ${pub} - feed: ${feed}`
)
})
// signal disconnect to listeners listeners should rely on pubToFeed for
// disconnect status instead of pub-to-incoming. Only the latter will
// detect remote disconnection
setPubToFeed({
...getPubToFeed(),
[pub]: /** @type {'disconnected'} */ ('disconnected')
})
return
}
//@ts-ignore
const incoming = /** @type {import('shock-common').Schema.Outgoing} */ (data)
// incomplete data, let's not assume anything
if (
typeof incoming.with !== 'string' ||
typeof incoming.messages !== 'object'
) {
return
}
/** @type {import('shock-common').Schema.ChatMessage[]} */
const newMsgs = Object.entries(incoming.messages)
// filter out messages with incomplete data
.filter(([_, msg]) => Schema.isMessage(msg))
.map(([id, msg]) => {
/** @type {import('shock-common').Schema.ChatMessage} */
const m = {
// we'll decrypt later
body: msg.body,
id,
outgoing: false,
timestamp: msg.timestamp
}
return m
})
if (newMsgs.length === 0) {
setPubToFeed({
...getPubToFeed(),
[pub]: []
})
return
}
const thisUpdate = uuidv1()
pubToLastUpdate[pub] = thisUpdate
const user = require('../../Mediator').getUser()
if (!user.is) {
logger.warn('pubToFeed -> onOpenForPubFeedPair() -> user is not auth')
}
const SEA = require('../../Mediator').mySEA
const ourSecret = await SEA.secret(await Utils.pubToEpub(pub), user._.sea)
const decryptedMsgs = await CommonUtils.asyncMap(newMsgs, async m => {
/** @type {import('shock-common').Schema.ChatMessage} */
const decryptedMsg = {
...m,
body: await SEA.decrypt(m.body, ourSecret)
}
return decryptedMsg
})
// this listener got invalidated while we were awaiting the async operations
// above.
if (pubToLastUpdate[pub] !== thisUpdate) {
return
}
setPubToFeed({
...getPubToFeed(),
[pub]: decryptedMsgs
})
} catch (err) {
logger.warn(`error inside pub to pk-feed pair: ${pub} -- ${feed}`)
logger.error(err)
}
}, 750)
const react = () => {
const pubToIncoming = PubToIncoming.getPubToIncoming()
const gun = require('../../Mediator').getGun()
/** @type {Feeds} */
const newPubToFeed = {}
for (const [pub, inc] of Object.entries(pubToIncoming)) {
/**
* empty string -> null
* @type {string|null}
*/
const newIncoming = inc || null
if (
// if disconnected, the same incoming feed will try to overwrite the
// nulled out pubToLastIncoming[pub] entry. Making the listener for that
// pub feed pair fire up again, etc. Now. When the user disconnects from
// this side of things. He will overwrite the pub to incoming with null.
// Let's allow that.
newIncoming === pubToLastIncoming[pub] &&
!(pubToFeed[pub] === 'disconnected' && newIncoming === null)
) {
// eslint-disable-next-line no-continue
continue
}
// will invalidate stale listeners (a listener for an outdated incoming feed
// id)
pubToLastIncoming[pub] = newIncoming
// Invalidate pending writes from stale listener(s) for the old incoming
// address.
pubToLastUpdate[pub] = uuidv1()
newPubToFeed[pub] = newIncoming ? [] : null
// sub to this incoming feed
if (typeof newIncoming === 'string') {
// perform sub to pub-incoming_feed pair
// leave all of the sideffects from this for the next tick
setImmediate(() => {
gun
.user(pub)
.get(Key.OUTGOINGS)
.get(newIncoming)
.open(onOpenForPubFeedPair([pub, newIncoming]))
})
}
}
if (Object.keys(newPubToFeed).length > 0) {
setPubToFeed({
...getPubToFeed(),
...newPubToFeed
})
}
}
let subbed = false
/**
* Array.isArray(pubToFeed[pub]) means a Handshake is in place, look for
* incoming messages here.
* pubToIncoming[pub] === null means a disconnection took place.
* typeof pubToIncoming[pub] === 'undefined' means none of the above.
* @param {FeedsListener} cb
* @returns {() => void}
*/
const onPubToFeed = cb => {
feedsListeners.add(cb)
cb(getPubToFeed())
if (!subbed) {
PubToIncoming.onPubToIncoming(react)
subbed = true
}
return () => {
feedsListeners.delete(cb)
}
}
module.exports = {
getPubToFeed,
setPubToFeed,
onPubToFeed
}

View file

@ -1,105 +0,0 @@
/** @format */
const uuidv1 = require('uuid/v1')
const debounce = require('lodash/debounce')
const logger = require('winston')
const { Utils: CommonUtils } = require('shock-common')
const size = require('lodash/size')
const { USER_TO_INCOMING } = require('../key')
/** @typedef {import('../SimpleGUN').OpenListenerData} OpenListenerData */
/**
* @typedef {Record<string, string|null|undefined>} PubToIncoming
*/
/** @type {Set<() => void>} */
const listeners = new Set()
/** @type {PubToIncoming} */
let pubToIncoming = {}
const getPubToIncoming = () => pubToIncoming
/**
* @param {PubToIncoming} pti
* @returns {void}
*/
const setPubToIncoming = pti => {
pubToIncoming = pti
listeners.forEach(l => l())
}
let latestUpdate = uuidv1()
listeners.add(() => {
logger.info(`new pubToIncoming length: ${size(getPubToIncoming())}`)
})
const onOpen = debounce(async uti => {
const SEA = require('../../Mediator').mySEA
const mySec = require('../../Mediator').getMySecret()
const thisUpdate = uuidv1()
latestUpdate = thisUpdate
if (typeof uti !== 'object' || uti === null) {
setPubToIncoming({})
return
}
/** @type {PubToIncoming} */
const newPubToIncoming = {}
await CommonUtils.asyncForEach(
Object.entries(uti),
async ([pub, encFeedID]) => {
if (encFeedID === null) {
newPubToIncoming[pub] = null
return
}
if (typeof encFeedID === 'string') {
newPubToIncoming[pub] = await SEA.decrypt(encFeedID, mySec)
}
}
)
// avoid old data from overwriting new data if decrypting took longer to
// process for the older open() call than for the newer open() call
if (latestUpdate === thisUpdate) {
setPubToIncoming(newPubToIncoming)
}
}, 750)
let subbed = false
/**
* @param {() => void} cb
* @returns {() => void}
*/
const onPubToIncoming = cb => {
if (!listeners.add(cb)) {
throw new Error('Tried to subscribe twice')
}
cb()
if (!subbed) {
const user = require('../../Mediator').getUser()
if (!user.is) {
logger.warn(`subscribing to pubToIncoming on a unauth user`)
}
user.get(USER_TO_INCOMING).open(onOpen)
subbed = true
}
return () => {
if (!listeners.delete(cb)) {
throw new Error('Tried to unsubscribe twice')
}
}
}
module.exports = {
getPubToIncoming,
setPubToIncoming,
onPubToIncoming
}

View file

@ -2,7 +2,7 @@
* @format
*/
/* eslint-disable init-declarations */
const logger = require('winston')
const logger = require('../../../../config/log')
const { Constants, Utils: CommonUtils } = require('shock-common')
const Key = require('../key')
@ -128,8 +128,7 @@ const tryAndWait = async (promGen, shouldRetry = () => false) => {
return resolvedValue
}
} catch (e) {
logger.error(e)
logger.info(JSON.stringify(e))
console.log(e)
if (e.message === Constants.ErrorCode.NOT_AUTH) {
throw e
}
@ -161,7 +160,7 @@ const tryAndWait = async (promGen, shouldRetry = () => false) => {
return resolvedValue
}
} catch (e) {
logger.error(e)
console.log(e)
if (e.message === Constants.ErrorCode.NOT_AUTH) {
throw e
}
@ -193,7 +192,7 @@ const tryAndWait = async (promGen, shouldRetry = () => false) => {
return resolvedValue
}
} catch (e) {
logger.error(e)
console.log(e)
if (e.message === Constants.ErrorCode.NOT_AUTH) {
throw e
}
@ -239,78 +238,6 @@ const pubToEpub = async pub => {
}
}
/**
* Should only be called with a recipient pub that has already been contacted.
* If returns null, a disconnect happened.
* @param {string} recipientPub
* @returns {Promise<string|null>}
*/
const recipientPubToLastReqSentID = async recipientPub => {
const maybeLastReqSentID = await tryAndWait(
(_, user) => {
const userToLastReqSent = user.get(Key.USER_TO_LAST_REQUEST_SENT)
return userToLastReqSent.get(recipientPub).then()
},
// retry on undefined, in case it is a false negative
v => typeof v === 'undefined'
)
if (typeof maybeLastReqSentID !== 'string') {
return null
}
return maybeLastReqSentID
}
/**
* @param {string} recipientPub
* @returns {Promise<boolean>}
*/
const successfulHandshakeAlreadyExists = async recipientPub => {
const maybeIncomingID = await tryAndWait((_, user) => {
const userToIncoming = user.get(Key.USER_TO_INCOMING)
return userToIncoming.get(recipientPub).then()
})
const maybeOutgoingID = await tryAndWait((_, user) => {
const recipientToOutgoing = user.get(Key.RECIPIENT_TO_OUTGOING)
return recipientToOutgoing.get(recipientPub).then()
})
return (
typeof maybeIncomingID === 'string' && typeof maybeOutgoingID === 'string'
)
}
/**
* @param {string} recipientPub
* @returns {Promise<string|null>}
*/
const recipientToOutgoingID = async recipientPub => {
const maybeEncryptedOutgoingID = await tryAndWait(
(_, user) =>
user
.get(Key.RECIPIENT_TO_OUTGOING)
.get(recipientPub)
.then(),
// force retry in case undefined is a false negative
v => typeof v === 'undefined'
)
if (typeof maybeEncryptedOutgoingID === 'string') {
const outgoingID = await require('../../Mediator/index').mySEA.decrypt(
maybeEncryptedOutgoingID,
await mySecret()
)
return outgoingID || null
}
return null
}
/**
* @param {import('../SimpleGUN').ListenerData} listenerData
* @returns {listenerData is import('../SimpleGUN').ListenerObj}
@ -351,9 +278,6 @@ module.exports = {
dataHasSoul,
delay,
pubToEpub,
recipientPubToLastReqSentID,
successfulHandshakeAlreadyExists,
recipientToOutgoingID,
tryAndWait,
mySecret,
promisifyGunNode: require('./promisifygun'),

View file

@ -27,12 +27,15 @@ const PATH_SEPARATOR = '>'
/**
* @param {ValidDataValue} value
* @param {string} publicKey
* @param {string=} epubForDecryption
* @returns {Promise<ValidDataValue>}
*/
const deepDecryptIfNeeded = async (value, publicKey) => {
const deepDecryptIfNeeded = async (value, publicKey, epubForDecryption) => {
if (Schema.isObj(value)) {
return Bluebird.props(
mapValues(value, o => deepDecryptIfNeeded(o, publicKey))
mapValues(value, o =>
deepDecryptIfNeeded(o, publicKey, epubForDecryption)
)
)
}
@ -49,7 +52,15 @@ const deepDecryptIfNeeded = async (value, publicKey) => {
if (user.is.pub === publicKey || 'me' === publicKey) {
sec = getMySecret()
} else {
sec = await SEA.secret(await pubToEpub(publicKey), user._.sea)
sec = await SEA.secret(
await (() => {
if (epubForDecryption) {
return epubForDecryption
}
return pubToEpub(publicKey)
})(),
user._.sea
)
}
const decrypted = SEA.decrypt(value, sec)
@ -81,6 +92,7 @@ async function deepEncryptIfNeeded(value) {
}
const pk = /** @type {string|undefined} */ (value.$$__ENCRYPT__FOR)
const epub = /** @type {string|undefined} */ (value.$$__EPUB__FOR)
if (!pk) {
return Bluebird.props(mapValues(value, deepEncryptIfNeeded))
@ -93,7 +105,15 @@ async function deepEncryptIfNeeded(value) {
if (pk === u.is.pub || pk === 'me') {
encryptedValue = await SEA.encrypt(actualValue, getMySecret())
} else {
const sec = await SEA.secret(await pubToEpub(pk), u._.sea)
const sec = await SEA.secret(
await (() => {
if (epub) {
return epub
}
return pubToEpub(pk)
})(),
u._.sea
)
encryptedValue = await SEA.encrypt(actualValue, sec)
}
@ -187,7 +207,13 @@ const put = async (rawPath, value) => {
await makePromise((res, rej) => {
node.put(/** @type {ValidDataValue} */ (theValue), ack => {
if (ack.err && typeof ack.err !== 'number') {
rej(new Error(ack.err))
if (typeof ack.err === 'string') {
rej(new Error(ack.err))
} else {
console.log(`NON STANDARD GUN ERROR:`)
console.log(ack)
rej(new Error(JSON.stringify(ack.err, null, 4)))
}
} else {
res()
}

View file

@ -2,20 +2,19 @@
* @format
*/
const logger = require('winston')
const logger = require('../../../config/log')
const Common = require('shock-common')
const uuidv4 = require('uuid/v4')
const { getGun, getUser, isAuthenticated } = require('../Mediator')
const { deepDecryptIfNeeded } = require('../rpc')
const Subscriptions = require('./subscriptions')
const GunEvents = require('../contact-api/events')
const GunActions = require('../../gunDB/contact-api/actions')
const {
encryptedEmit,
encryptedOn,
encryptedCallback
} = require('../../../utils/ECC/socket')
const auth = require('../../auth/auth')
const ALLOWED_GUN_METHODS = [
'map',
@ -36,28 +35,6 @@ const ALLOWED_GUN_METHODS = [
* @typedef {{ reconnect: boolean, token: string }} SubscriptionOptions
*/
/**
* @param {string} token
* @returns {Promise<boolean>}
*/
const isValidToken = async token => {
const validation = await auth.validateToken(token)
if (typeof validation !== 'object') {
return false
}
if (validation === null) {
return false
}
if (typeof validation.valid !== 'boolean') {
return false
}
return validation.valid
}
/**
* @param {string} root
*/
@ -120,13 +97,18 @@ const executeGunQuery = (query, method, listener) => {
* @param {string} queryData.publicKeyForDecryption
* @param {string} queryData.subscriptionId
* @param {string} queryData.deviceId
* @param {string=} queryData.epubForDecryption
* @param {string=} queryData.epubField If the epub is included in the received
* data itself. Handshake requests for example, have an epub field.
* @returns {GunListener}
*/
const queryListenerCallback = ({
emit,
publicKeyForDecryption,
subscriptionId,
deviceId
deviceId,
epubForDecryption,
epubField
}) => async (data, key, _msg, event) => {
try {
const subscription = Subscriptions.get({
@ -141,8 +123,38 @@ const queryListenerCallback = ({
})
}
const eventName = `query:data`
if (publicKeyForDecryption) {
const decData = await deepDecryptIfNeeded(data, publicKeyForDecryption)
if (publicKeyForDecryption?.length > 15 || epubForDecryption || epubField) {
const decData = await deepDecryptIfNeeded(
data,
publicKeyForDecryption,
(() => {
if (epubField) {
if (Common.isObj(data)) {
const epub = data[epubField]
if (Common.isPopulatedString(epub)) {
return epub
}
logger.error(
`Got epubField in a rifle query, but the resulting value obtained is not an string -> `,
{
data,
epub
}
)
} else {
logger.warn(
`Got epubField in a rifle query for a non-object data -> `,
{
epubField,
data
}
)
}
}
return epubForDecryption
})()
)
emit(eventName, { subscriptionId, response: { data: decData, key } })
return
}
@ -153,84 +165,6 @@ const queryListenerCallback = ({
}
}
/**
* @param {Object} GunSocketOptions
* @param {() => (import('./subscriptions').Unsubscribe | void)} GunSocketOptions.handler
* @param {string} GunSocketOptions.subscriptionId
* @param {string} GunSocketOptions.encryptionId
* @param {import('socket.io').Socket} GunSocketOptions.socket
* @returns {(options: SubscriptionOptions, response: (error?: any, data?: any) => void) => Promise<void>}
*/
const wrap = ({ handler, subscriptionId, encryptionId, socket }) => async (
{ reconnect, token },
response
) => {
try {
logger.info('Subscribe function executing...')
if (!isAuthenticated()) {
logger.warn('GunDB is not yet authenticated')
socket.emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
const callback = encryptedCallback(socket, response)
const emit = encryptedEmit(socket)
const subscription = Subscriptions.get({
deviceId: encryptionId,
subscriptionId
})
if (subscription && !reconnect) {
const error = {
field: 'subscription',
message:
"You're already subscribed to this event, you can re-subscribe again by setting 'reconnect' to true "
}
logger.error('Duplicate subscription:', error)
callback(error)
return
}
if (subscription && reconnect) {
if (subscription.unsubscribe) {
subscription.unsubscribe()
}
Subscriptions.remove({
deviceId: encryptionId,
subscriptionId
})
}
if (!subscription || reconnect) {
const isAuth = await isValidToken(token)
if (!isAuth) {
logger.warn('invalid token specified')
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
const unsubscribe = handler()
if (unsubscribe) {
Subscriptions.attachUnsubscribe({
deviceId: encryptionId,
subscriptionId,
unsubscribe
})
}
}
callback(null, {
message: 'Subscribed successfully!',
success: true
})
} catch (error) {
logger.error('Socket wrapper error:', error)
}
}
/** @param {import('socket.io').Socket} socket */
const startSocket = socket => {
try {
@ -243,7 +177,16 @@ const startSocket = socket => {
socket.emit(Common.Constants.ErrorCode.NOT_AUTH)
}
on('subscribe:query', ({ $shock, publicKey }, response) => {
if (isAuthenticated()) {
socket.onAny(() => {
GunActions.setLastSeenApp().catch(e =>
console.log('error setting last seen app', e)
)
})
}
on('subscribe:query', (query, response) => {
const { $shock, publicKey, epubForDecryption, epubField } = query
const subscriptionId = uuidv4()
try {
if (!isAuthenticated()) {
@ -270,7 +213,9 @@ const startSocket = socket => {
emit,
publicKeyForDecryption: publicKey,
subscriptionId,
deviceId: encryptionId
deviceId: encryptionId,
epubForDecryption,
epubField
})
socketCallback(null, {
@ -286,112 +231,6 @@ const startSocket = socket => {
}
})
const onChats = () => {
return GunEvents.onChats(chats => {
const processed = chats.map(
({
didDisconnect,
id,
lastSeenApp,
messages,
recipientPublicKey
}) => {
/** @type {Common.Schema.Chat} */
const stripped = {
didDisconnect,
id,
lastSeenApp,
messages,
recipientAvatar: null,
recipientDisplayName: null,
recipientPublicKey
}
return stripped
}
)
emit('chats', processed)
})
}
on(
'subscribe:chats',
wrap({
handler: onChats,
encryptionId,
subscriptionId: 'chats',
socket
})
)
const onSentRequests = () => {
return GunEvents.onSimplerSentRequests(sentReqs => {
const processed = sentReqs.map(
({
id,
recipientChangedRequestAddress,
recipientPublicKey,
timestamp
}) => {
/**
* @type {Common.Schema.SimpleSentRequest}
*/
const stripped = {
id,
recipientAvatar: null,
recipientChangedRequestAddress,
recipientDisplayName: null,
recipientPublicKey,
timestamp
}
return stripped
}
)
emit('sentRequests', processed)
})
}
on(
'subscribe:sentRequests',
wrap({
handler: onSentRequests,
encryptionId,
subscriptionId: 'sentRequests',
socket
})
)
const onReceivedRequests = () => {
return GunEvents.onSimplerReceivedRequests(receivedReqs => {
const processed = receivedReqs.map(({ id, requestorPK, timestamp }) => {
/** @type {Common.Schema.SimpleReceivedRequest} */
const stripped = {
id,
requestorAvatar: null,
requestorDisplayName: null,
requestorPK,
timestamp
}
return stripped
})
emit('receivedRequests', processed)
})
}
on(
'subscribe:receivedRequests',
wrap({
handler: onReceivedRequests,
encryptionId,
subscriptionId: 'receivedRequests',
socket
})
)
on('unsubscribe', ({ subscriptionId }, response) => {
const callback = encryptedCallback(socket, response)
Subscriptions.remove({ deviceId: encryptionId, subscriptionId })

View file

@ -1,5 +1,5 @@
const Crypto = require('crypto')
const logger = require('winston')
const logger = require('../../config/log')
const Common = require('shock-common')
const getGunUser = () => require('../gunDB/Mediator').getUser()
const isAuthenticated = () => require('../gunDB/Mediator').isAuthenticated()

File diff suppressed because it is too large Load diff

View file

@ -47,10 +47,7 @@ const server = program => {
const tunnelHost = process.env.LOCAL_TUNNEL_SERVER || defaults.localtunnelHost
// setup winston logging ==========
const logger = require('../config/log')(
program.logfile || defaults.logfile,
program.loglevel || defaults.loglevel
)
const logger = require('../config/log')
CommonLogger.setLogger(logger)

View file

@ -3,7 +3,7 @@
*/
// @ts-check
const logger = require('winston')
const logger = require('../config/log')
const Common = require('shock-common')
const mapValues = require('lodash/mapValues')

View file

@ -2,7 +2,7 @@
* @format
*/
const Common = require('shock-common')
const logger = require('winston')
const logger = require('../../config/log')
const { safeParseJSON } = require('../JSON')
const ECC = require('./index')

View file

@ -3,7 +3,7 @@
*/
const Crypto = require('crypto')
const { Buffer } = require('buffer')
const logger = require('winston')
const logger = require('../config/log')
const APIKeyPair = new Map()
const authorizedDevices = new Map()

View file

@ -1,44 +1,37 @@
const logger = require('winston')
const logger = require('../../config/log')
const fetch = require('node-fetch')
const Storage = require('node-persist')
const { listPeers, connectPeer,getInfo } = require('./v2')
const handlerBaseUrl = "https://channels.shock.network:4444"
/**
*
* @param {string} inviteFromAuth
*/
module.exports = async (inviteFromAuth) => {
module.exports = async () => {
console.log("DOING CHANNEL INVITE THING: START")
/**
* @type string | undefined
*/
const invite = inviteFromAuth || process.env.HOSTING_INVITE
const invite = process.env.HOSTING_INVITE
if(!invite) {
console.log("DOING CHANNEL INVITE THING: NVM NO INVITE")
return
}
try{
await Storage.getItem('processedInvites')
} catch(e) {
await Storage.setItem('processedInvites',[])
}
try {
/**
* @type string[]
*/
const invites = await Storage.getItem('processedInvites')
const invites = await Storage.getItem('processedInvites') || []
if(invites.includes(invite)){
console.log("DOING CHANNEL INVITE THING: INVITE PROCESSED ALREADY")
return
}
const me = await getInfo()
const {identity_pubkey} = me
//@ts-expect-error
const connectReq = await fetch(`${handlerBaseUrl}/connect`)
if(connectReq.status !== 200 ){
console.log("DOING CHANNEL INVITE THING: CONNECT FAILED")
return
}
const connJson = await connectReq.json()
const [uri] = connJson.uris
const [pub,host] = uri.split("@")
@ -46,22 +39,29 @@ module.exports = async (inviteFromAuth) => {
if(peers.findIndex(peer => peer.pub_key === pub) === -1){
await connectPeer(pub,host)
}
const channelReq = {
userPubKey:identity_pubkey,
invite,
lndTo:pub,
}
//@ts-expect-error
const res = await fetch(`${handlerBaseUrl}/channel`,{
method:'POST',
body:JSON.stringify({
userPubKey:identity_pubkey,
invite,
lndTo:pub,
})
headers: {
'Content-Type': 'application/json'
},
body:JSON.stringify(channelReq)
})
if(res.status !== 200 ){
console.log("DOING CHANNEL INVITE THING: FAILED ")
return
}
invites.push(invite)
await Storage.setItem('processedInvites',invites)
console.log("DOING CHANNEL INVITE THING: DONE!")
} catch(e){
logger.error("error sending invite to channels handler")
console.log("DOING CHANNEL INVITE THING: :(")
console.error(e)
}

View file

@ -2,7 +2,7 @@
* @format
*/
const Crypto = require('crypto')
const logger = require('winston')
const logger = require('../../config/log')
const Common = require('shock-common')
const Ramda = require('ramda')