Merge pull request #346 from shocknet/feature/sockets-fix

Feature/sockets fix
This commit is contained in:
CapDog 2021-04-21 17:51:36 -04:00 committed by GitHub
commit 87d639cf24
5 changed files with 604 additions and 516 deletions

View file

@ -257,7 +257,7 @@ const Config = require('../config')
/**
* @typedef {object} SimpleSocket
* @prop {(eventName: string, data?: Emission|EncryptedEmissionLegacy|EncryptedEmission|ValidDataValue) => void} emit
* @prop {(eventName: string, handler: (data: any) => void) => void} on
* @prop {(eventName: string, handler: (data: any, callback: (err?: any, data?: any) => void) => void) => void} on
* @prop {{ auth: { [key: string]: any } }} handshake
*/

View file

@ -0,0 +1,407 @@
/**
* @format
*/
const logger = require('winston')
const Common = require('shock-common')
const uuidv4 = require('uuid/v4')
const { getGun, getUser, isAuthenticated } = require('../Mediator')
const { deepDecryptIfNeeded } = require('../rpc')
const Subscriptions = require('./subscriptions')
const GunEvents = require('../contact-api/events')
const {
encryptedEmit,
encryptedOn,
encryptedCallback
} = require('../../../utils/ECC/socket')
const auth = require('../../auth/auth')
const ALLOWED_GUN_METHODS = [
'map',
'map.on',
'on',
'once',
'load',
'then',
'open'
]
/**
* @typedef {import('../contact-api/SimpleGUN').ValidDataValue} ValidDataValue
*/
/**
* @typedef {(data: ValidDataValue, key?: string, _msg?: any, event?: any) => (void | Promise<void>)} GunListener
* @typedef {{ reconnect: boolean, token: string }} SubscriptionOptions
*/
/**
* @param {string} token
* @returns {Promise<boolean>}
*/
const isValidToken = async token => {
const validation = await auth.validateToken(token)
if (typeof validation !== 'object') {
return false
}
if (validation === null) {
return false
}
if (typeof validation.valid !== 'boolean') {
return false
}
return validation.valid
}
/**
* @param {string} root
*/
const getNode = root => {
if (root === '$gun') {
return getGun()
}
if (root === '$user') {
return getUser()
}
return getGun().user(root)
}
/**
* @param {import("../contact-api/SimpleGUN").GUNNode} node
* @param {string} path
*/
const getGunQuery = (node, path) => {
const bits = path.split('>')
const query = bits.reduce((gunQuery, bit) => gunQuery.get(bit), node)
return query
}
/**
* Executes a GunDB query call using the specified method
* @param {any} query
* @param {string} method
* @param {GunListener} listener
*/
const executeGunQuery = (query, method, listener) => {
if (!ALLOWED_GUN_METHODS.includes(method)) {
throw {
field: 'method',
message: `Invalid GunDB method specified (${method}). `
}
}
if (method === 'on') {
return query.on(listener)
}
if (method === 'open') {
return query.open(listener)
}
if (method === 'map.on') {
return query.map().on(listener)
}
if (method === 'map.once') {
return query.map().once(listener)
}
}
/**
* @param {Object} queryData
* @param {(eventName: string, ...args: any[]) => Promise<void>} queryData.emit
* @param {string} queryData.publicKeyForDecryption
* @param {string} queryData.subscriptionId
* @param {string} queryData.deviceId
* @returns {GunListener}
*/
const queryListenerCallback = ({
emit,
publicKeyForDecryption,
subscriptionId,
deviceId
}) => async (data, key, _msg, event) => {
try {
const subscription = Subscriptions.get({
deviceId,
subscriptionId
})
if (subscription && !subscription.unsubscribe && event) {
Subscriptions.attachUnsubscribe({
deviceId,
subscriptionId,
unsubscribe: () => event.off()
})
}
const eventName = `query:data`
if (publicKeyForDecryption?.length > 15) {
const decData = await deepDecryptIfNeeded(data, publicKeyForDecryption)
emit(eventName, { subscriptionId, response: { data: decData, key } })
return
}
emit(eventName, { subscriptionId, response: { data, key } })
} catch (err) {
logger.error(`Error for gun rpc socket: ${err.message}`)
}
}
/**
* @param {Object} GunSocketOptions
* @param {() => (import('./subscriptions').Unsubscribe | void)} GunSocketOptions.handler
* @param {string} GunSocketOptions.subscriptionId
* @param {string} GunSocketOptions.encryptionId
* @param {import('socket.io').Socket} GunSocketOptions.socket
* @returns {(options: SubscriptionOptions, response: (error?: any, data?: any) => void) => Promise<void>}
*/
const wrap = ({ handler, subscriptionId, encryptionId, socket }) => async (
{ reconnect, token },
response
) => {
try {
logger.info('Subscribe function executing...')
if (!isAuthenticated()) {
logger.warn('GunDB is not yet authenticated')
socket.emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
const callback = encryptedCallback(socket, response)
const emit = encryptedEmit(socket)
const subscription = Subscriptions.get({
deviceId: encryptionId,
subscriptionId
})
if (subscription && !reconnect) {
const error = {
field: 'subscription',
message:
"You're already subscribed to this event, you can re-subscribe again by setting 'reconnect' to true "
}
logger.error('Duplicate subscription:', error)
callback(error)
return
}
if (subscription && reconnect) {
if (subscription.unsubscribe) {
subscription.unsubscribe()
}
Subscriptions.remove({
deviceId: encryptionId,
subscriptionId
})
}
if (!subscription || reconnect) {
const isAuth = await isValidToken(token)
if (!isAuth) {
logger.warn('invalid token specified')
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
const unsubscribe = handler()
if (unsubscribe) {
Subscriptions.attachUnsubscribe({
deviceId: encryptionId,
subscriptionId,
unsubscribe
})
}
}
callback(null, {
message: 'Subscribed successfully!',
success: true
})
} catch (error) {
logger.error('Socket wrapper error:', error)
}
}
/** @param {import('socket.io').Socket} socket */
const startSocket = socket => {
try {
const emit = encryptedEmit(socket)
const on = encryptedOn(socket)
const { encryptionId } = socket.handshake.auth
on('subscribe:query', ({ $shock, publicKey }, response) => {
const subscriptionId = uuidv4()
try {
if (!isAuthenticated()) {
socket.emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
const [root, path, method] = $shock.split('::')
const socketCallback = encryptedCallback(socket, response)
if (!ALLOWED_GUN_METHODS.includes(method)) {
socketCallback(
`Invalid method for gun rpc call: ${method}, query: ${$shock}`
)
return
}
Subscriptions.add({
deviceId: encryptionId,
subscriptionId
})
const queryCallback = queryListenerCallback({
emit,
publicKeyForDecryption: publicKey,
subscriptionId,
deviceId: encryptionId
})
socketCallback(null, {
subscriptionId
})
const node = getNode(root)
const query = getGunQuery(node, path)
executeGunQuery(query, method, queryCallback)
} catch (error) {
emit(`query:error`, { subscriptionId, response: { data: error } })
}
})
const onChats = () => {
return GunEvents.onChats(chats => {
const processed = chats.map(
({
didDisconnect,
id,
lastSeenApp,
messages,
recipientPublicKey
}) => {
/** @type {Common.Schema.Chat} */
const stripped = {
didDisconnect,
id,
lastSeenApp,
messages,
recipientAvatar: null,
recipientDisplayName: null,
recipientPublicKey
}
return stripped
}
)
emit('chats', processed)
})
}
on(
'subscribe:chats',
wrap({
handler: onChats,
encryptionId,
subscriptionId: 'chats',
socket
})
)
const onSentRequests = () => {
return GunEvents.onSimplerSentRequests(sentReqs => {
const processed = sentReqs.map(
({
id,
recipientChangedRequestAddress,
recipientPublicKey,
timestamp
}) => {
/**
* @type {Common.Schema.SimpleSentRequest}
*/
const stripped = {
id,
recipientAvatar: null,
recipientChangedRequestAddress,
recipientDisplayName: null,
recipientPublicKey,
timestamp
}
return stripped
}
)
emit('sentRequests', processed)
})
}
on(
'subscribe:sentRequests',
wrap({
handler: onSentRequests,
encryptionId,
subscriptionId: 'sentRequests',
socket
})
)
const onReceivedRequests = () => {
return GunEvents.onSimplerReceivedRequests(receivedReqs => {
const processed = receivedReqs.map(({ id, requestorPK, timestamp }) => {
/** @type {Common.Schema.SimpleReceivedRequest} */
const stripped = {
id,
requestorAvatar: null,
requestorDisplayName: null,
requestorPK,
timestamp
}
return stripped
})
emit('receivedRequests', processed)
})
}
on(
'subscribe:receivedRequests',
wrap({
handler: onReceivedRequests,
encryptionId,
subscriptionId: 'receivedRequests',
socket
})
)
on('unsubscribe', ({ subscriptionId }, response) => {
const callback = encryptedCallback(socket, response)
Subscriptions.remove({ deviceId: encryptionId, subscriptionId })
callback(null, {
message: 'Unsubscribed successfully!',
success: true
})
})
socket.on('disconnect', () => {
Subscriptions.removeDevice({ deviceId: encryptionId })
})
} catch (err) {
logger.error('GUNRPC: ' + err.message)
}
}
module.exports = startSocket

View file

@ -0,0 +1,127 @@
/**
* @typedef {() => void} Unsubscribe
*/
/** @type {Map<string, Map<string, { subscriptionId: string, unsubscribe?: () => void, metadata?: object }>>} */
const userSubscriptions = new Map()
/**
* Adds a new Subscription
* @param {Object} subscription
* @param {string} subscription.deviceId
* @param {string} subscription.subscriptionId
* @param {(Unsubscribe)=} subscription.unsubscribe
* @param {(object)=} subscription.metadata
*/
const add = ({ deviceId, subscriptionId, unsubscribe, metadata }) => {
const deviceSubscriptions = userSubscriptions.get(deviceId)
const subscriptions = deviceSubscriptions ?? new Map()
subscriptions.set(subscriptionId, {
subscriptionId,
unsubscribe,
metadata
})
userSubscriptions.set(deviceId, subscriptions)
}
/**
* Adds a new Subscription
* @param {Object} subscription
* @param {string} subscription.deviceId
* @param {string} subscription.subscriptionId
* @param {Unsubscribe} subscription.unsubscribe
*/
const attachUnsubscribe = ({
deviceId,
subscriptionId,
unsubscribe
}) => {
const deviceSubscriptions = userSubscriptions.get(deviceId)
const subscriptions = deviceSubscriptions
if (!subscriptions) {
return
}
const subscription = subscriptions.get(subscriptionId)
if (!subscription) {
return
}
subscriptions.set(subscriptionId, {
...subscription,
unsubscribe
})
userSubscriptions.set(deviceId, subscriptions)
}
/**
* Unsubscribes from a GunDB query
* @param {Object} subscription
* @param {string} subscription.deviceId
* @param {string} subscription.subscriptionId
*/
const remove = ({ deviceId, subscriptionId }) => {
const deviceSubscriptions = userSubscriptions.get(deviceId)
const subscriptions = deviceSubscriptions ?? new Map()
const subscription = subscriptions.get(subscriptionId);
if (subscription?.unsubscribe) {
subscription.unsubscribe()
}
subscriptions.delete(subscriptionId)
userSubscriptions.set(deviceId, subscriptions)
}
/**
* Unsubscribes from all GunDB queries for a specific device
* @param {Object} subscription
* @param {string} subscription.deviceId
*/
const removeDevice = ({ deviceId }) => {
const deviceSubscriptions = userSubscriptions.get(deviceId);
if (!deviceSubscriptions) {
return
}
Array.from(deviceSubscriptions.values()).map(subscription => {
if (subscription && subscription.unsubscribe) {
subscription.unsubscribe()
}
return subscription
})
userSubscriptions.set(deviceId, new Map());
}
/**
* Retrieves the specified subscription's info if it exists
* @param {Object} subscription
* @param {string} subscription.deviceId
* @param {string} subscription.subscriptionId
*/
const get = ({ deviceId, subscriptionId }) => {
const deviceSubscriptions = userSubscriptions.get(deviceId)
if (!deviceSubscriptions) {
return false
}
const subscription = deviceSubscriptions.get(subscriptionId)
return subscription
}
module.exports = {
add,
attachUnsubscribe,
get,
remove,
removeDevice
}

View file

@ -8,16 +8,9 @@ const Common = require('shock-common')
const mapValues = require('lodash/mapValues')
const auth = require('../services/auth/auth')
const Encryption = require('../utils/encryptionStore')
const LightningServices = require('../utils/lightningServices')
const {
getGun,
getUser,
isAuthenticated
} = require('../services/gunDB/Mediator')
const { deepDecryptIfNeeded } = require('../services/gunDB/rpc')
const GunEvents = require('../services/gunDB/contact-api/events')
const SchemaManager = require('../services/schema')
const { isAuthenticated } = require('../services/gunDB/Mediator')
const initGunDBSocket = require('../services/gunDB/sockets')
const { encryptedEmit, encryptedOn } = require('../utils/ECC/socket')
const TipsForwarder = require('../services/tipsCallback')
/**
@ -29,204 +22,7 @@ module.exports = (
/** @type {import('socket.io').Server} */
io
) => {
// This should be used for encrypting and emitting your data
const encryptedEmitLegacy = ({ eventName, data, socket }) => {
try {
if (Encryption.isNonEncrypted(eventName)) {
return socket.emit(eventName, data)
}
const deviceId = socket.handshake.auth['x-shockwallet-device-id']
const authorized = Encryption.isAuthorizedDevice({ deviceId })
if (!deviceId) {
throw {
field: 'deviceId',
message: 'Please specify a device ID'
}
}
if (!authorized) {
throw {
field: 'deviceId',
message: 'Please exchange keys with the API before using the socket'
}
}
const encryptedMessage = Encryption.encryptMessage({
message: data,
deviceId
})
return socket.emit(eventName, encryptedMessage)
} catch (err) {
logger.error(
`[SOCKET] An error has occurred while encrypting an event (${eventName}):`,
err
)
return socket.emit('encryption:error', err)
}
}
const onNewInvoice = (socket, subID) => {
const { lightning } = LightningServices.services
logger.warn('Subscribing to invoices socket...' + subID)
const stream = lightning.subscribeInvoices({})
stream.on('data', data => {
logger.info('[SOCKET] New invoice data:', data)
encryptedEmitLegacy({ eventName: 'invoice:new', data, socket })
if (!data.settled) {
return
}
SchemaManager.AddOrder({
type: 'invoice',
amount: parseInt(data.amt_paid_sat, 10),
coordinateHash: data.r_hash.toString('hex'),
coordinateIndex: parseInt(data.add_index, 10),
inbound: true,
toLndPub: data.payment_addr
})
})
stream.on('end', () => {
logger.info('New invoice stream ended, starting a new one...')
// Prevents call stack overflow exceptions
//process.nextTick(() => onNewInvoice(socket))
})
stream.on('error', err => {
logger.error('New invoice stream error:' + subID, err)
})
stream.on('status', status => {
logger.warn('New invoice stream status:' + subID, status)
switch (status.code) {
case 0: {
logger.info('[event:invoice:new] stream ok')
break
}
case 1: {
logger.info(
'[event:invoice:new] stream canceled, probably socket disconnected'
)
break
}
case 2: {
logger.warn('[event:invoice:new] got UNKNOWN error status')
break
}
case 12: {
logger.warn(
'[event:invoice:new] LND locked, new registration in 60 seconds'
)
process.nextTick(() =>
setTimeout(() => onNewInvoice(socket, subID), 60000)
)
break
}
case 13: {
//https://grpc.github.io/grpc/core/md_doc_statuscodes.html
logger.error('[event:invoice:new] INTERNAL LND error')
break
}
case 14: {
logger.error(
'[event:invoice:new] LND disconnected, sockets reconnecting in 30 seconds...'
)
process.nextTick(() =>
setTimeout(() => onNewInvoice(socket, subID), 30000)
)
break
}
default: {
logger.error('[event:invoice:new] UNKNOWN LND error')
}
}
})
return () => {
stream.cancel()
}
}
const onNewTransaction = (socket, subID) => {
const { lightning } = LightningServices.services
const stream = lightning.subscribeTransactions({})
logger.warn('Subscribing to transactions socket...' + subID)
stream.on('data', data => {
logger.info('[SOCKET] New transaction data:', data)
Promise.all(data.dest_addresses.map(SchemaManager.isTmpChainOrder)).then(
responses => {
const hasOrder = responses.some(res => res !== false)
if (hasOrder && data.num_confirmations > 0) {
//buddy needs to manage this
} else {
//business as usual
encryptedEmitLegacy({ eventName: 'transaction:new', data, socket })
}
}
)
})
stream.on('end', () => {
logger.info('New transactions stream ended, starting a new one...')
//process.nextTick(() => onNewTransaction(socket))
})
stream.on('error', err => {
logger.error('New transactions stream error:' + subID, err)
})
stream.on('status', status => {
logger.info('New transactions stream status:' + subID, status)
switch (status.code) {
case 0: {
logger.info('[event:transaction:new] stream ok')
break
}
case 1: {
logger.info(
'[event:transaction:new] stream canceled, probably socket disconnected'
)
break
}
case 2: {
//Happens to fire when the grpc client lose access to macaroon file
logger.warn('[event:transaction:new] got UNKNOWN error status')
break
}
case 12: {
logger.warn(
'[event:transaction:new] LND locked, new registration in 60 seconds'
)
process.nextTick(() =>
setTimeout(() => onNewTransaction(socket, subID), 60000)
)
break
}
case 13: {
//https://grpc.github.io/grpc/core/md_doc_statuscodes.html
logger.error('[event:transaction:new] INTERNAL LND error')
break
}
case 14: {
logger.error(
'[event:transaction:new] LND disconnected, sockets reconnecting in 30 seconds...'
)
process.nextTick(() =>
setTimeout(() => onNewTransaction(socket, subID), 30000)
)
break
}
default: {
logger.error('[event:transaction:new] UNKNOWN LND error')
}
}
})
return () => {
stream.cancel()
}
}
io.on('connection', socket => {
logger.info(`io.onconnection`)
logger.info('socket.handshake', socket.handshake)
io.on('connect', socket => {
const isLNDSocket = !!socket.handshake.auth.IS_LND_SOCKET
const isNotificationsSocket = !!socket.handshake.auth
.IS_NOTIFICATIONS_SOCKET
@ -240,89 +36,6 @@ module.exports = (
const subID = Math.floor(Math.random() * 1000).toString()
const isNotifications = isNotificationsSocket ? 'notifications' : ''
logger.info('[LND] New LND Socket created:' + isNotifications + subID)
/* not used by wallet anymore
const cancelInvoiceStream = onNewInvoice(socket, subID)
const cancelTransactionStream = onNewTransaction(socket, subID)
socket.on('disconnect', () => {
logger.info('LND socket disconnected:' + isNotifications + subID)
cancelInvoiceStream()
cancelTransactionStream()
})*/
}
})
io.of('gun').on('connect', socket => {
// TODO: off()
try {
if (!isAuthenticated()) {
socket.emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
const emit = encryptedEmit(socket)
const { $shock, publicKeyForDecryption } = socket.handshake.auth
const [root, path, method] = $shock.split('::')
// eslint-disable-next-line init-declarations
let node
if (root === '$gun') {
node = getGun()
} else if (root === '$user') {
node = getUser()
} else {
node = getGun().user(root)
}
for (const bit of path.split('>')) {
node = node.get(bit)
}
/**
* @param {ValidDataValue} data
* @param {string} key
*/
const listener = async (data, key) => {
try {
if (
typeof publicKeyForDecryption === 'string' &&
publicKeyForDecryption !== 'undefined' &&
publicKeyForDecryption.length > 15
) {
const decData = await deepDecryptIfNeeded(
data,
publicKeyForDecryption
)
emit('$shock', decData, key)
} else {
emit('$shock', data, key)
}
} catch (err) {
logger.error(
`Error for gun rpc socket, query ${$shock} -> ${err.message}`
)
}
}
if (method === 'on') {
node.on(listener)
} else if (method === 'open') {
node.open(listener)
} else if (method === 'map.on') {
node.map().on(listener)
} else if (method === 'map.once') {
node.map().once(listener)
} else {
throw new TypeError(
`Invalid method for gun rpc call : ${method}, query: ${$shock}`
)
}
} catch (err) {
logger.error('GUNRPC: ' + err.message)
}
})
@ -391,6 +104,10 @@ module.exports = (
}
})
io.of('gun').on('connect', socket => {
initGunDBSocket(socket)
})
/**
* @param {string} token
* @returns {Promise<boolean>}
@ -415,7 +132,7 @@ module.exports = (
/** @type {null|NodeJS.Timeout} */
let pingIntervalID = null
// TODO: Unused?
io.of('shockping').on(
'connect',
// TODO: make this sync
@ -467,225 +184,6 @@ module.exports = (
}
)
// TODO: do this through rpc
const emptyUnsub = () => {}
let chatsUnsub = emptyUnsub
io.of('chats').on('connect', async socket => {
const on = encryptedOn(socket)
const emit = encryptedEmit(socket)
try {
if (!isAuthenticated()) {
logger.info(
'not authenticated in gun for chats socket, will send NOT_AUTH'
)
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
logger.info('now checking token for chats socket')
const { token } = socket.handshake.auth
const isAuth = await isValidToken(token)
if (!isAuth) {
logger.warn('invalid token for chats socket')
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
if (chatsUnsub !== emptyUnsub) {
logger.error(
'Tried to set chats socket twice, this might be due to an app restart and the old socket not being recycled by socket.io in time, will disable the older subscription, which means the old socket wont work and data will be sent to this new socket instead'
)
chatsUnsub()
chatsUnsub = emptyUnsub
}
/**
* @param {Common.Schema.Chat[]} chats
*/
const onChats = chats => {
const processed = chats.map(
({
didDisconnect,
id,
lastSeenApp,
messages,
recipientPublicKey
}) => {
/** @type {Common.Schema.Chat} */
const stripped = {
didDisconnect,
id,
lastSeenApp,
messages,
recipientAvatar: null,
recipientDisplayName: null,
recipientPublicKey
}
return stripped
}
)
emit('$shock', processed)
}
chatsUnsub = GunEvents.onChats(onChats)
on('disconnect', () => {
chatsUnsub()
chatsUnsub = emptyUnsub
})
} catch (e) {
logger.error('Error inside chats socket connect: ' + e.message)
emit('$error', e.message)
}
})
let sentReqsUnsub = emptyUnsub
io.of('sentReqs').on('connect', async socket => {
const on = encryptedOn(socket)
const emit = encryptedEmit(socket)
try {
if (!isAuthenticated()) {
logger.info(
'not authenticated in gun for sentReqs socket, will send NOT_AUTH'
)
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
logger.info('now checking token for sentReqs socket')
const { token } = socket.handshake.auth
const isAuth = await isValidToken(token)
if (!isAuth) {
logger.warn('invalid token for sentReqs socket')
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
if (sentReqsUnsub !== emptyUnsub) {
logger.error(
'Tried to set sentReqs socket twice, this might be due to an app restart and the old socket not being recycled by io in time, will disable the older subscription, which means the old socket wont work and data will be sent to this new socket instead'
)
sentReqsUnsub()
sentReqsUnsub = emptyUnsub
}
/**
* @param {Common.Schema.SimpleSentRequest[]} sentReqs
*/
const onSentReqs = sentReqs => {
const processed = sentReqs.map(
({
id,
recipientChangedRequestAddress,
recipientPublicKey,
timestamp
}) => {
/**
* @type {Common.Schema.SimpleSentRequest}
*/
const stripped = {
id,
recipientAvatar: null,
recipientChangedRequestAddress,
recipientDisplayName: null,
recipientPublicKey,
timestamp
}
return stripped
}
)
emit('$shock', processed)
}
sentReqsUnsub = GunEvents.onSimplerSentRequests(onSentReqs)
on('disconnect', () => {
sentReqsUnsub()
sentReqsUnsub = emptyUnsub
})
} catch (e) {
logger.error('Error inside sentReqs socket connect: ' + e.message)
emit('$error', e.message)
}
})
let receivedReqsUnsub = emptyUnsub
io.of('receivedReqs').on('connect', async socket => {
const on = encryptedOn(socket)
const emit = encryptedEmit(socket)
try {
if (!isAuthenticated()) {
logger.info(
'not authenticated in gun for receivedReqs socket, will send NOT_AUTH'
)
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
logger.info('now checking token for receivedReqs socket')
const { token } = socket.handshake.auth
const isAuth = await isValidToken(token)
if (!isAuth) {
logger.warn('invalid token for receivedReqs socket')
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
if (receivedReqsUnsub !== emptyUnsub) {
logger.error(
'Tried to set receivedReqs socket twice, this might be due to an app restart and the old socket not being recycled by socket.io in time, will disable the older subscription, which means the old socket wont work and data will be sent to this new socket instead'
)
receivedReqsUnsub()
receivedReqsUnsub = emptyUnsub
}
/**
* @param {ReadonlyArray<Common.SimpleReceivedRequest>} receivedReqs
*/
const onReceivedReqs = receivedReqs => {
const processed = receivedReqs.map(({ id, requestorPK, timestamp }) => {
/** @type {Common.Schema.SimpleReceivedRequest} */
const stripped = {
id,
requestorAvatar: null,
requestorDisplayName: null,
requestorPK,
timestamp
}
return stripped
})
emit('$shock', processed)
}
receivedReqsUnsub = GunEvents.onSimplerReceivedRequests(onReceivedReqs)
on('disconnect', () => {
receivedReqsUnsub()
receivedReqsUnsub = emptyUnsub
})
} catch (e) {
logger.error('Error inside receivedReqs socket connect: ' + e.message)
emit('$error', e.message)
}
})
io.of('streams').on('connect', socket => {
console.log('a user connected')
socket.on('postID', postID => {

View file

@ -20,6 +20,7 @@ const nonEncryptedEvents = [
* @typedef {import('../../services/gunDB/Mediator').EncryptedEmission} EncryptedEmission
* @typedef {import('../../services/gunDB/Mediator').EncryptedEmissionLegacy} EncryptedEmissionLegacy
* @typedef {import('../../services/gunDB/contact-api/SimpleGUN').ValidDataValue} ValidDataValue
* @typedef {(data: any, callback: (error?: any, data?: any) => void) => void} SocketOnListener
*/
/**
@ -83,7 +84,7 @@ const encryptedEmit = socket => async (eventName, ...args) => {
/**
* @param {SimpleSocket} socket
* @returns {(eventName: string, callback: (data: any) => void) => void}
* @returns {(eventName: string, callback: SocketOnListener) => void}
*/
const encryptedOn = socket => (eventName, callback) => {
try {
@ -110,9 +111,9 @@ const encryptedOn = socket => (eventName, callback) => {
}
}
socket.on(eventName, async data => {
socket.on(eventName, async (data, response) => {
if (isNonEncrypted(eventName)) {
callback(data)
callback(data, response)
return
}
@ -122,8 +123,11 @@ const encryptedOn = socket => (eventName, callback) => {
encryptedMessage: data
})
callback(safeParseJSON(decryptedMessage))
callback(safeParseJSON(decryptedMessage), response)
return
}
callback(data, response)
})
} catch (err) {
logger.error(
@ -135,8 +139,60 @@ const encryptedOn = socket => (eventName, callback) => {
}
}
/**
* @param {SimpleSocket} socket
* @param {(error?: any, data?: any) => void} callback
* @returns {(...args: any[]) => Promise<void>}
*/
const encryptedCallback = (socket, callback) => async (...args) => {
try {
const deviceId = socket.handshake.auth.encryptionId
if (!deviceId) {
throw {
field: 'deviceId',
message: 'Please specify a device ID'
}
}
const authorized = ECC.isAuthorizedDevice({ deviceId })
if (!authorized) {
throw {
field: 'deviceId',
message: 'Please exchange keys with the API before using the socket'
}
}
const encryptedArgs = await Promise.all(
args.map(async data => {
if (!data) {
return data
}
const encryptedMessage = await ECC.encryptMessage({
message: typeof data === 'object' ? JSON.stringify(data) : data,
deviceId
})
return encryptedMessage
})
)
return callback(...encryptedArgs)
} catch (err) {
logger.error(
`[SOCKET] An error has occurred while emitting an event response:`,
err
)
return socket.emit('encryption:error', err)
}
}
module.exports = {
isNonEncrypted,
encryptedOn,
encryptedEmit
encryptedEmit,
encryptedCallback
}