Bug fixes

This commit is contained in:
emad-salah 2021-04-21 12:22:04 +00:00
parent c6f4259a18
commit e0310f13ca
2 changed files with 106 additions and 276 deletions

View file

@ -18,14 +18,22 @@ const {
const TipsForwarder = require('../../tipsCallback') const TipsForwarder = require('../../tipsCallback')
const auth = require('../../auth/auth') const auth = require('../../auth/auth')
const ALLOWED_GUN_METHODS = ['map', 'map.on', 'on', 'once', 'load', 'then'] const ALLOWED_GUN_METHODS = [
'map',
'map.on',
'on',
'once',
'load',
'then',
'open'
]
/** /**
* @typedef {import('../contact-api/SimpleGUN').ValidDataValue} ValidDataValue * @typedef {import('../contact-api/SimpleGUN').ValidDataValue} ValidDataValue
*/ */
/** /**
* @typedef {(data: ValidDataValue, key: string, _msg: any, event: any) => (void | Promise<void>)} GunListener * @typedef {(data: ValidDataValue, key?: string, _msg?: any, event?: any) => (void | Promise<void>)} GunListener
* @typedef {{ reconnect: boolean, token: string }} SubscriptionOptions * @typedef {{ reconnect: boolean, token: string }} SubscriptionOptions
*/ */
@ -77,21 +85,34 @@ const getGunQuery = (node, path) => {
} }
/** /**
* Dynamically construct a GunDB query call * Executes a GunDB query call using the specified method
* @param {any} query * @param {any} query
* @param {string} method * @param {string} method
* @param {GunListener} listener
*/ */
const getGunListener = (query, method) => { const executeGunQuery = (query, method, listener) => {
const methods = method.split('.') if (!ALLOWED_GUN_METHODS.includes(method)) {
const listener = methods.reduce((listener, method) => { throw {
if (typeof listener === 'function') { field: 'method',
return listener[method]() message: `Invalid GunDB method specified (${method}). `
}
} }
return listener[method]() if (method === 'on') {
}, query) return query.on(listener)
}
return listener if (method === 'open') {
return query.open(listener)
}
if (method === 'map.on') {
return query.map().on(listener)
}
if (method === 'map.once') {
return query.map().once(listener)
}
} }
/** /**
@ -113,18 +134,21 @@ const queryListenerCallback = ({
deviceId, deviceId,
subscriptionId subscriptionId
}) })
if (subscription && event && event.off) { if (subscription && !subscription.unsubscribe && event) {
event.off() Subscriptions.attachUnsubscribe({
deviceId,
subscriptionId,
unsubscribe: () => event.off()
})
} }
const eventName = `subscription:${subscriptionId}` const eventName = `query:data`
if (publicKeyForDecryption?.length > 15) { if (publicKeyForDecryption?.length > 15) {
const decData = await deepDecryptIfNeeded(data, publicKeyForDecryption) const decData = await deepDecryptIfNeeded(data, publicKeyForDecryption)
emit(eventName, { subscriptionId, response: { data: decData, key } })
emit(eventName, decData, key)
return return
} }
emit(eventName, data, key) emit(eventName, { subscriptionId, response: { data, key } })
} catch (err) { } catch (err) {
logger.error(`Error for gun rpc socket: ${err.message}`) logger.error(`Error for gun rpc socket: ${err.message}`)
} }
@ -138,8 +162,18 @@ const queryListenerCallback = ({
* @param {import('socket.io').Socket} GunSocketOptions.socket * @param {import('socket.io').Socket} GunSocketOptions.socket
* @returns {(options: SubscriptionOptions, response: (error?: any, data?: any) => void) => Promise<void>} * @returns {(options: SubscriptionOptions, response: (error?: any, data?: any) => void) => Promise<void>}
*/ */
const wrap = ({ handler, subscriptionId, encryptionId, socket }) => { const wrap = ({ handler, subscriptionId, encryptionId, socket }) => async (
return async ({ reconnect = false, token }, response) => { { reconnect, token },
response
) => {
try {
logger.info('Subscribe function executing...')
if (!isAuthenticated()) {
logger.warn('GunDB is not yet authenticated')
socket.emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
const callback = encryptedCallback(socket, response) const callback = encryptedCallback(socket, response)
const emit = encryptedEmit(socket) const emit = encryptedEmit(socket)
const subscription = Subscriptions.get({ const subscription = Subscriptions.get({
@ -148,15 +182,21 @@ const wrap = ({ handler, subscriptionId, encryptionId, socket }) => {
}) })
if (subscription && !reconnect) { if (subscription && !reconnect) {
callback({ const error = {
field: 'subscription', field: 'subscription',
message: message:
"You're already subscribed to this event, you can re-subscribe again by setting 'reconnect' to true " "You're already subscribed to this event, you can re-subscribe again by setting 'reconnect' to true "
}) }
logger.error('Duplicate subscription:', error)
callback(error)
return return
} }
if (reconnect) { if (subscription && reconnect) {
if (subscription.unsubscribe) {
subscription.unsubscribe()
}
Subscriptions.remove({ Subscriptions.remove({
deviceId: encryptionId, deviceId: encryptionId,
subscriptionId subscriptionId
@ -187,31 +227,41 @@ const wrap = ({ handler, subscriptionId, encryptionId, socket }) => {
message: 'Subscribed successfully!', message: 'Subscribed successfully!',
success: true success: true
}) })
} catch (error) {
logger.error('Socket wrapper error:', error)
} }
} }
/** @param {import('socket.io').Socket} socket */ /** @param {import('socket.io').Socket} socket */
const startSocket = socket => { const startSocket = socket => {
try { try {
if (!isAuthenticated()) {
socket.emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
const emit = encryptedEmit(socket) const emit = encryptedEmit(socket)
const on = encryptedOn(socket) const on = encryptedOn(socket)
const { encryptionId } = socket.handshake.auth const { encryptionId } = socket.handshake.auth
on('subscribe:query', ({ $shock, publicKey }, response) => { on('subscribe:query', ({ $shock, publicKey }, response) => {
const [root, path, method] = $shock.split('::') const subscriptionId = uuidv4()
const callback = encryptedCallback(socket, response) try {
if (!isAuthenticated()) {
if (!ALLOWED_GUN_METHODS.includes(method)) { socket.emit(Common.Constants.ErrorCode.NOT_AUTH)
callback(`Invalid method for gun rpc call: ${method}, query: ${$shock}`)
return return
} }
const subscriptionId = uuidv4() const [root, path, method] = $shock.split('::')
const socketCallback = encryptedCallback(socket, response)
if (!ALLOWED_GUN_METHODS.includes(method)) {
socketCallback(
`Invalid method for gun rpc call: ${method}, query: ${$shock}`
)
return
}
Subscriptions.add({
deviceId: encryptionId,
subscriptionId
})
const queryCallback = queryListenerCallback({ const queryCallback = queryListenerCallback({
emit, emit,
publicKeyForDecryption: publicKey, publicKeyForDecryption: publicKey,
@ -219,21 +269,17 @@ const startSocket = socket => {
deviceId: encryptionId deviceId: encryptionId
}) })
socketCallback(null, {
subscriptionId
})
const node = getNode(root) const node = getNode(root)
const query = getGunQuery(node, path) const query = getGunQuery(node, path)
/** @type {(cb?: GunListener) => void} */
const listener = getGunListener(query, method)
Subscriptions.add({ executeGunQuery(query, method, queryCallback)
deviceId: encryptionId, } catch (error) {
subscriptionId emit(`query:error`, { subscriptionId, response: { data: error } })
}) }
callback(null, {
subscriptionId
})
listener(queryCallback)
}) })
const onChats = () => { const onChats = () => {
@ -346,7 +392,7 @@ const startSocket = socket => {
TipsForwarder.addSocket(postID, socket) TipsForwarder.addSocket(postID, socket)
}) })
on('unsubscribe', (subscriptionId, response) => { on('unsubscribe', ({ subscriptionId }, response) => {
const callback = encryptedCallback(socket, response) const callback = encryptedCallback(socket, response)
Subscriptions.remove({ deviceId: encryptionId, subscriptionId }) Subscriptions.remove({ deviceId: encryptionId, subscriptionId })
callback(null, { callback(null, {

View file

@ -11,7 +11,6 @@ const auth = require('../services/auth/auth')
const LightningServices = require('../utils/lightningServices') const LightningServices = require('../utils/lightningServices')
const { isAuthenticated } = require('../services/gunDB/Mediator') const { isAuthenticated } = require('../services/gunDB/Mediator')
const initGunDBSocket = require('../services/gunDB/sockets') const initGunDBSocket = require('../services/gunDB/sockets')
const GunEvents = require('../services/gunDB/contact-api/events')
const { encryptedEmit, encryptedOn } = require('../utils/ECC/socket') const { encryptedEmit, encryptedOn } = require('../utils/ECC/socket')
const TipsForwarder = require('../services/tipsCallback') const TipsForwarder = require('../services/tipsCallback')
/** /**
@ -23,7 +22,7 @@ module.exports = (
/** @type {import('socket.io').Server} */ /** @type {import('socket.io').Server} */
io io
) => { ) => {
io.on('connection', socket => { io.on('connect', socket => {
const isLNDSocket = !!socket.handshake.auth.IS_LND_SOCKET const isLNDSocket = !!socket.handshake.auth.IS_LND_SOCKET
const isNotificationsSocket = !!socket.handshake.auth const isNotificationsSocket = !!socket.handshake.auth
.IS_NOTIFICATIONS_SOCKET .IS_NOTIFICATIONS_SOCKET
@ -105,6 +104,10 @@ module.exports = (
} }
}) })
io.of('gun').on('connect', socket => {
initGunDBSocket(socket)
})
/** /**
* @param {string} token * @param {string} token
* @returns {Promise<boolean>} * @returns {Promise<boolean>}
@ -129,7 +132,7 @@ module.exports = (
/** @type {null|NodeJS.Timeout} */ /** @type {null|NodeJS.Timeout} */
let pingIntervalID = null let pingIntervalID = null
// TODO: Unused?
io.of('shockping').on( io.of('shockping').on(
'connect', 'connect',
// TODO: make this sync // TODO: make this sync
@ -181,225 +184,6 @@ module.exports = (
} }
) )
// TODO: do this through rpc
const emptyUnsub = () => {}
let chatsUnsub = emptyUnsub
io.of('chats').on('connect', async socket => {
const on = encryptedOn(socket)
const emit = encryptedEmit(socket)
try {
if (!isAuthenticated()) {
logger.info(
'not authenticated in gun for chats socket, will send NOT_AUTH'
)
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
logger.info('now checking token for chats socket')
const { token } = socket.handshake.auth
const isAuth = await isValidToken(token)
if (!isAuth) {
logger.warn('invalid token for chats socket')
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
if (chatsUnsub !== emptyUnsub) {
logger.error(
'Tried to set chats socket twice, this might be due to an app restart and the old socket not being recycled by socket.io in time, will disable the older subscription, which means the old socket wont work and data will be sent to this new socket instead'
)
chatsUnsub()
chatsUnsub = emptyUnsub
}
/**
* @param {Common.Schema.Chat[]} chats
*/
const onChats = chats => {
const processed = chats.map(
({
didDisconnect,
id,
lastSeenApp,
messages,
recipientPublicKey
}) => {
/** @type {Common.Schema.Chat} */
const stripped = {
didDisconnect,
id,
lastSeenApp,
messages,
recipientAvatar: null,
recipientDisplayName: null,
recipientPublicKey
}
return stripped
}
)
emit('$shock', processed)
}
chatsUnsub = GunEvents.onChats(onChats)
on('disconnect', () => {
chatsUnsub()
chatsUnsub = emptyUnsub
})
} catch (e) {
logger.error('Error inside chats socket connect: ' + e.message)
emit('$error', e.message)
}
})
let sentReqsUnsub = emptyUnsub
io.of('sentReqs').on('connect', async socket => {
const on = encryptedOn(socket)
const emit = encryptedEmit(socket)
try {
if (!isAuthenticated()) {
logger.info(
'not authenticated in gun for sentReqs socket, will send NOT_AUTH'
)
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
logger.info('now checking token for sentReqs socket')
const { token } = socket.handshake.auth
const isAuth = await isValidToken(token)
if (!isAuth) {
logger.warn('invalid token for sentReqs socket')
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
if (sentReqsUnsub !== emptyUnsub) {
logger.error(
'Tried to set sentReqs socket twice, this might be due to an app restart and the old socket not being recycled by io in time, will disable the older subscription, which means the old socket wont work and data will be sent to this new socket instead'
)
sentReqsUnsub()
sentReqsUnsub = emptyUnsub
}
/**
* @param {Common.Schema.SimpleSentRequest[]} sentReqs
*/
const onSentReqs = sentReqs => {
const processed = sentReqs.map(
({
id,
recipientChangedRequestAddress,
recipientPublicKey,
timestamp
}) => {
/**
* @type {Common.Schema.SimpleSentRequest}
*/
const stripped = {
id,
recipientAvatar: null,
recipientChangedRequestAddress,
recipientDisplayName: null,
recipientPublicKey,
timestamp
}
return stripped
}
)
emit('$shock', processed)
}
sentReqsUnsub = GunEvents.onSimplerSentRequests(onSentReqs)
on('disconnect', () => {
sentReqsUnsub()
sentReqsUnsub = emptyUnsub
})
} catch (e) {
logger.error('Error inside sentReqs socket connect: ' + e.message)
emit('$error', e.message)
}
})
let receivedReqsUnsub = emptyUnsub
io.of('receivedReqs').on('connect', async socket => {
const on = encryptedOn(socket)
const emit = encryptedEmit(socket)
try {
if (!isAuthenticated()) {
logger.info(
'not authenticated in gun for receivedReqs socket, will send NOT_AUTH'
)
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
logger.info('now checking token for receivedReqs socket')
const { token } = socket.handshake.auth
const isAuth = await isValidToken(token)
if (!isAuth) {
logger.warn('invalid token for receivedReqs socket')
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
if (receivedReqsUnsub !== emptyUnsub) {
logger.error(
'Tried to set receivedReqs socket twice, this might be due to an app restart and the old socket not being recycled by socket.io in time, will disable the older subscription, which means the old socket wont work and data will be sent to this new socket instead'
)
receivedReqsUnsub()
receivedReqsUnsub = emptyUnsub
}
/**
* @param {ReadonlyArray<Common.SimpleReceivedRequest>} receivedReqs
*/
const onReceivedReqs = receivedReqs => {
const processed = receivedReqs.map(({ id, requestorPK, timestamp }) => {
/** @type {Common.Schema.SimpleReceivedRequest} */
const stripped = {
id,
requestorAvatar: null,
requestorDisplayName: null,
requestorPK,
timestamp
}
return stripped
})
emit('$shock', processed)
}
receivedReqsUnsub = GunEvents.onSimplerReceivedRequests(onReceivedReqs)
on('disconnect', () => {
receivedReqsUnsub()
receivedReqsUnsub = emptyUnsub
})
} catch (e) {
logger.error('Error inside receivedReqs socket connect: ' + e.message)
emit('$error', e.message)
}
})
io.of('streams').on('connect', socket => { io.of('streams').on('connect', socket => {
console.log('a user connected') console.log('a user connected')
socket.on('postID', postID => { socket.on('postID', postID => {