GunDB Socket.io functionality rewritten

This commit is contained in:
emad-salah 2021-04-18 21:15:48 +00:00
parent 58f3e343f4
commit 9a95baf03c
5 changed files with 554 additions and 294 deletions

View file

@ -257,7 +257,7 @@ const Config = require('../config')
/**
* @typedef {object} SimpleSocket
* @prop {(eventName: string, data?: Emission|EncryptedEmissionLegacy|EncryptedEmission|ValidDataValue) => void} emit
* @prop {(eventName: string, handler: (data: any) => void) => void} on
* @prop {(eventName: string, handler: (data: any, callback: (err?: any, data?: any) => void) => void) => void} on
* @prop {{ auth: { [key: string]: any } }} handshake
*/

View file

@ -0,0 +1,366 @@
/**
* @format
*/
const logger = require('winston')
const Common = require('shock-common')
const uuidv4 = require('uuid/v4')
const { getGun, getUser, isAuthenticated } = require('../Mediator')
const { deepDecryptIfNeeded } = require('../rpc')
const Subscriptions = require('./subscriptions')
const GunEvents = require('../contact-api/events')
const {
encryptedEmit,
encryptedOn,
encryptedCallback
} = require('../../../utils/ECC/socket')
const TipsForwarder = require('../../tipsCallback')
const auth = require('../../auth/auth')
const ALLOWED_GUN_METHODS = ['map', 'map.on', 'on', 'once', 'load', 'then']
/**
* @typedef {import('../contact-api/SimpleGUN').ValidDataValue} ValidDataValue
*/
/**
* @typedef {(data: ValidDataValue, key: string, _msg: any, event: any) => (void | Promise<void>)} GunListener
* @typedef {{ reconnect: boolean, token: string }} SubscriptionOptions
*/
/**
* @param {string} token
* @returns {Promise<boolean>}
*/
const isValidToken = async token => {
const validation = await auth.validateToken(token)
if (typeof validation !== 'object') {
return false
}
if (validation === null) {
return false
}
if (typeof validation.valid !== 'boolean') {
return false
}
return validation.valid
}
/**
* @param {string} root
*/
const getNode = root => {
if (root === '$gun') {
return getGun()
}
if (root === '$user') {
return getUser()
}
return getGun().user(root)
}
/**
* @param {import("../contact-api/SimpleGUN").GUNNode} node
* @param {string} path
*/
const getGunQuery = (node, path) => {
const bits = path.split('>')
const query = bits.reduce((gunQuery, bit) => gunQuery.get(bit), node)
return query
}
/**
* Dynamically construct a GunDB query call
* @param {any} query
* @param {string} method
*/
const getGunListener = (query, method) => {
const methods = method.split('.')
const listener = methods.reduce((listener, method) => {
if (typeof listener === 'function') {
return listener[method]()
}
return listener[method]()
}, query)
return listener
}
/**
* @param {Object} queryData
* @param {(eventName: string, ...args: any[]) => Promise<void>} queryData.emit
* @param {string} queryData.publicKeyForDecryption
* @param {string} queryData.subscriptionId
* @param {string} queryData.deviceId
* @returns {GunListener}
*/
const queryListenerCallback = ({
emit,
publicKeyForDecryption,
subscriptionId,
deviceId
}) => async (data, key, _msg, event) => {
try {
const subscription = Subscriptions.get({
deviceId,
subscriptionId
})
if (subscription && event && event.off) {
event.off()
}
const eventName = `subscription:${subscriptionId}`
if (publicKeyForDecryption?.length > 15) {
const decData = await deepDecryptIfNeeded(data, publicKeyForDecryption)
emit(eventName, decData, key)
return
}
emit(eventName, data, key)
} catch (err) {
logger.error(`Error for gun rpc socket: ${err.message}`)
}
}
/**
* @param {Object} GunSocketOptions
* @param {() => (import('./subscriptions').Unsubscribe | void)} GunSocketOptions.handler
* @param {string} GunSocketOptions.subscriptionId
* @param {string} GunSocketOptions.encryptionId
* @param {import('socket.io').Socket} GunSocketOptions.socket
* @returns {(options: SubscriptionOptions, response: (error?: any, data?: any) => void) => Promise<void>}
*/
const wrap = ({ handler, subscriptionId, encryptionId, socket }) => {
return async ({ reconnect = false, token }, response) => {
const callback = encryptedCallback(socket, response)
const emit = encryptedEmit(socket)
const subscription = Subscriptions.get({
deviceId: encryptionId,
subscriptionId
})
if (subscription && !reconnect) {
callback({
field: 'subscription',
message:
"You're already subscribed to this event, you can re-subscribe again by setting 'reconnect' to true "
})
return
}
if (reconnect) {
Subscriptions.remove({
deviceId: encryptionId,
subscriptionId
})
}
if (!subscription || reconnect) {
const isAuth = await isValidToken(token)
if (!isAuth) {
logger.warn('invalid token specified')
emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
const unsubscribe = handler()
if (unsubscribe) {
Subscriptions.attachUnsubscribe({
deviceId: encryptionId,
subscriptionId,
unsubscribe
})
}
}
callback(null, {
message: 'Subscribed successfully!',
success: true
})
}
}
/** @param {import('socket.io').Socket} socket */
const startSocket = socket => {
try {
if (!isAuthenticated()) {
socket.emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
const emit = encryptedEmit(socket)
const on = encryptedOn(socket)
const { encryptionId } = socket.handshake.auth
on('subscribe:query', ({ $shock, publicKey }, response) => {
const [root, path, method] = $shock.split('::')
const callback = encryptedCallback(socket, response)
if (!ALLOWED_GUN_METHODS.includes(method)) {
callback(`Invalid method for gun rpc call: ${method}, query: ${$shock}`)
return
}
const subscriptionId = uuidv4()
const queryCallback = queryListenerCallback({
emit,
publicKeyForDecryption: publicKey,
subscriptionId,
deviceId: encryptionId
})
const node = getNode(root)
const query = getGunQuery(node, path)
/** @type {(cb?: GunListener) => void} */
const listener = getGunListener(query, method)
Subscriptions.add({
deviceId: encryptionId,
subscriptionId
})
callback(null, {
subscriptionId
})
listener(queryCallback)
})
const onChats = () => {
return GunEvents.onChats(chats => {
const processed = chats.map(
({
didDisconnect,
id,
lastSeenApp,
messages,
recipientPublicKey
}) => {
/** @type {Common.Schema.Chat} */
const stripped = {
didDisconnect,
id,
lastSeenApp,
messages,
recipientAvatar: null,
recipientDisplayName: null,
recipientPublicKey
}
return stripped
}
)
emit('chats', processed)
})
}
on(
'subscribe:chats',
wrap({
handler: onChats,
encryptionId,
subscriptionId: 'chats',
socket
})
)
const onSentRequests = () => {
return GunEvents.onSimplerSentRequests(sentReqs => {
const processed = sentReqs.map(
({
id,
recipientChangedRequestAddress,
recipientPublicKey,
timestamp
}) => {
/**
* @type {Common.Schema.SimpleSentRequest}
*/
const stripped = {
id,
recipientAvatar: null,
recipientChangedRequestAddress,
recipientDisplayName: null,
recipientPublicKey,
timestamp
}
return stripped
}
)
emit('sentRequests', processed)
})
}
on(
'subscribe:sentRequests',
wrap({
handler: onSentRequests,
encryptionId,
subscriptionId: 'sentRequests',
socket
})
)
const onReceivedRequests = () => {
return GunEvents.onSimplerReceivedRequests(receivedReqs => {
const processed = receivedReqs.map(({ id, requestorPK, timestamp }) => {
/** @type {Common.Schema.SimpleReceivedRequest} */
const stripped = {
id,
requestorAvatar: null,
requestorDisplayName: null,
requestorPK,
timestamp
}
return stripped
})
emit('receivedRequests', processed)
})
}
on(
'subscribe:receivedRequests',
wrap({
handler: onReceivedRequests,
encryptionId,
subscriptionId: 'receivedRequests',
socket
})
)
on('streams:postID', postID => {
TipsForwarder.addSocket(postID, socket)
})
on('unsubscribe', (subscriptionId, response) => {
const callback = encryptedCallback(socket, response)
Subscriptions.remove({ deviceId: encryptionId, subscriptionId })
callback(null, {
message: 'Unsubscribed successfully!',
success: true
})
})
socket.on('disconnect', () => {
Subscriptions.removeDevice({ deviceId: encryptionId })
})
} catch (err) {
logger.error('GUNRPC: ' + err.message)
}
}
module.exports = startSocket

View file

@ -0,0 +1,127 @@
/**
* @typedef {() => void} Unsubscribe
*/
/** @type {Map<string, Map<string, { subscriptionId: string, unsubscribe?: () => void, metadata?: object }>>} */
const userSubscriptions = new Map()
/**
* Adds a new Subscription
* @param {Object} subscription
* @param {string} subscription.deviceId
* @param {string} subscription.subscriptionId
* @param {(Unsubscribe)=} subscription.unsubscribe
* @param {(object)=} subscription.metadata
*/
const add = ({ deviceId, subscriptionId, unsubscribe, metadata }) => {
const deviceSubscriptions = userSubscriptions.get(deviceId)
const subscriptions = deviceSubscriptions ?? new Map()
subscriptions.set(subscriptionId, {
subscriptionId,
unsubscribe,
metadata
})
userSubscriptions.set(deviceId, subscriptions)
}
/**
* Adds a new Subscription
* @param {Object} subscription
* @param {string} subscription.deviceId
* @param {string} subscription.subscriptionId
* @param {Unsubscribe} subscription.unsubscribe
*/
const attachUnsubscribe = ({
deviceId,
subscriptionId,
unsubscribe
}) => {
const deviceSubscriptions = userSubscriptions.get(deviceId)
const subscriptions = deviceSubscriptions
if (!subscriptions) {
return
}
const subscription = subscriptions.get(subscriptionId)
if (!subscription) {
return
}
subscriptions.set(subscriptionId, {
...subscription,
unsubscribe
})
userSubscriptions.set(deviceId, subscriptions)
}
/**
* Unsubscribes from a GunDB query
* @param {Object} subscription
* @param {string} subscription.deviceId
* @param {string} subscription.subscriptionId
*/
const remove = ({ deviceId, subscriptionId }) => {
const deviceSubscriptions = userSubscriptions.get(deviceId)
const subscriptions = deviceSubscriptions ?? new Map()
const subscription = subscriptions.get(subscriptionId);
if (subscription?.unsubscribe) {
subscription.unsubscribe()
}
subscriptions.delete(subscriptionId)
userSubscriptions.set(deviceId, subscriptions)
}
/**
* Unsubscribes from all GunDB queries for a specific device
* @param {Object} subscription
* @param {string} subscription.deviceId
*/
const removeDevice = ({ deviceId }) => {
const deviceSubscriptions = userSubscriptions.get(deviceId);
if (!deviceSubscriptions) {
return
}
Array.from(deviceSubscriptions.values()).map(subscription => {
if (subscription && subscription.unsubscribe) {
subscription.unsubscribe()
}
return subscription
})
userSubscriptions.set(deviceId, new Map());
}
/**
* Retrieves the specified subscription's info if it exists
* @param {Object} subscription
* @param {string} subscription.deviceId
* @param {string} subscription.subscriptionId
*/
const get = ({ deviceId, subscriptionId }) => {
const deviceSubscriptions = userSubscriptions.get(deviceId)
if (!deviceSubscriptions) {
return false
}
const subscription = deviceSubscriptions.get(subscriptionId)
return subscription
}
module.exports = {
add,
attachUnsubscribe,
get,
remove,
removeDevice
}

View file

@ -8,16 +8,10 @@ const Common = require('shock-common')
const mapValues = require('lodash/mapValues')
const auth = require('../services/auth/auth')
const Encryption = require('../utils/encryptionStore')
const LightningServices = require('../utils/lightningServices')
const {
getGun,
getUser,
isAuthenticated
} = require('../services/gunDB/Mediator')
const { deepDecryptIfNeeded } = require('../services/gunDB/rpc')
const { isAuthenticated } = require('../services/gunDB/Mediator')
const initGunDBSocket = require('../services/gunDB/sockets')
const GunEvents = require('../services/gunDB/contact-api/events')
const SchemaManager = require('../services/schema')
const { encryptedEmit, encryptedOn } = require('../utils/ECC/socket')
const TipsForwarder = require('../services/tipsCallback')
/**
@ -29,204 +23,7 @@ module.exports = (
/** @type {import('socket.io').Server} */
io
) => {
// This should be used for encrypting and emitting your data
const encryptedEmitLegacy = ({ eventName, data, socket }) => {
try {
if (Encryption.isNonEncrypted(eventName)) {
return socket.emit(eventName, data)
}
const deviceId = socket.handshake.auth['x-shockwallet-device-id']
const authorized = Encryption.isAuthorizedDevice({ deviceId })
if (!deviceId) {
throw {
field: 'deviceId',
message: 'Please specify a device ID'
}
}
if (!authorized) {
throw {
field: 'deviceId',
message: 'Please exchange keys with the API before using the socket'
}
}
const encryptedMessage = Encryption.encryptMessage({
message: data,
deviceId
})
return socket.emit(eventName, encryptedMessage)
} catch (err) {
logger.error(
`[SOCKET] An error has occurred while encrypting an event (${eventName}):`,
err
)
return socket.emit('encryption:error', err)
}
}
const onNewInvoice = (socket, subID) => {
const { lightning } = LightningServices.services
logger.warn('Subscribing to invoices socket...' + subID)
const stream = lightning.subscribeInvoices({})
stream.on('data', data => {
logger.info('[SOCKET] New invoice data:', data)
encryptedEmitLegacy({ eventName: 'invoice:new', data, socket })
if (!data.settled) {
return
}
SchemaManager.AddOrder({
type: 'invoice',
amount: parseInt(data.amt_paid_sat, 10),
coordinateHash: data.r_hash.toString('hex'),
coordinateIndex: parseInt(data.add_index, 10),
inbound: true,
toLndPub: data.payment_addr
})
})
stream.on('end', () => {
logger.info('New invoice stream ended, starting a new one...')
// Prevents call stack overflow exceptions
//process.nextTick(() => onNewInvoice(socket))
})
stream.on('error', err => {
logger.error('New invoice stream error:' + subID, err)
})
stream.on('status', status => {
logger.warn('New invoice stream status:' + subID, status)
switch (status.code) {
case 0: {
logger.info('[event:invoice:new] stream ok')
break
}
case 1: {
logger.info(
'[event:invoice:new] stream canceled, probably socket disconnected'
)
break
}
case 2: {
logger.warn('[event:invoice:new] got UNKNOWN error status')
break
}
case 12: {
logger.warn(
'[event:invoice:new] LND locked, new registration in 60 seconds'
)
process.nextTick(() =>
setTimeout(() => onNewInvoice(socket, subID), 60000)
)
break
}
case 13: {
//https://grpc.github.io/grpc/core/md_doc_statuscodes.html
logger.error('[event:invoice:new] INTERNAL LND error')
break
}
case 14: {
logger.error(
'[event:invoice:new] LND disconnected, sockets reconnecting in 30 seconds...'
)
process.nextTick(() =>
setTimeout(() => onNewInvoice(socket, subID), 30000)
)
break
}
default: {
logger.error('[event:invoice:new] UNKNOWN LND error')
}
}
})
return () => {
stream.cancel()
}
}
const onNewTransaction = (socket, subID) => {
const { lightning } = LightningServices.services
const stream = lightning.subscribeTransactions({})
logger.warn('Subscribing to transactions socket...' + subID)
stream.on('data', data => {
logger.info('[SOCKET] New transaction data:', data)
Promise.all(data.dest_addresses.map(SchemaManager.isTmpChainOrder)).then(
responses => {
const hasOrder = responses.some(res => res !== false)
if (hasOrder && data.num_confirmations > 0) {
//buddy needs to manage this
} else {
//business as usual
encryptedEmitLegacy({ eventName: 'transaction:new', data, socket })
}
}
)
})
stream.on('end', () => {
logger.info('New transactions stream ended, starting a new one...')
//process.nextTick(() => onNewTransaction(socket))
})
stream.on('error', err => {
logger.error('New transactions stream error:' + subID, err)
})
stream.on('status', status => {
logger.info('New transactions stream status:' + subID, status)
switch (status.code) {
case 0: {
logger.info('[event:transaction:new] stream ok')
break
}
case 1: {
logger.info(
'[event:transaction:new] stream canceled, probably socket disconnected'
)
break
}
case 2: {
//Happens to fire when the grpc client lose access to macaroon file
logger.warn('[event:transaction:new] got UNKNOWN error status')
break
}
case 12: {
logger.warn(
'[event:transaction:new] LND locked, new registration in 60 seconds'
)
process.nextTick(() =>
setTimeout(() => onNewTransaction(socket, subID), 60000)
)
break
}
case 13: {
//https://grpc.github.io/grpc/core/md_doc_statuscodes.html
logger.error('[event:transaction:new] INTERNAL LND error')
break
}
case 14: {
logger.error(
'[event:transaction:new] LND disconnected, sockets reconnecting in 30 seconds...'
)
process.nextTick(() =>
setTimeout(() => onNewTransaction(socket, subID), 30000)
)
break
}
default: {
logger.error('[event:transaction:new] UNKNOWN LND error')
}
}
})
return () => {
stream.cancel()
}
}
io.on('connection', socket => {
logger.info(`io.onconnection`)
logger.info('socket.handshake', socket.handshake)
const isLNDSocket = !!socket.handshake.auth.IS_LND_SOCKET
const isNotificationsSocket = !!socket.handshake.auth
.IS_NOTIFICATIONS_SOCKET
@ -240,89 +37,6 @@ module.exports = (
const subID = Math.floor(Math.random() * 1000).toString()
const isNotifications = isNotificationsSocket ? 'notifications' : ''
logger.info('[LND] New LND Socket created:' + isNotifications + subID)
/* not used by wallet anymore
const cancelInvoiceStream = onNewInvoice(socket, subID)
const cancelTransactionStream = onNewTransaction(socket, subID)
socket.on('disconnect', () => {
logger.info('LND socket disconnected:' + isNotifications + subID)
cancelInvoiceStream()
cancelTransactionStream()
})*/
}
})
io.of('gun').on('connect', socket => {
// TODO: off()
try {
if (!isAuthenticated()) {
socket.emit(Common.Constants.ErrorCode.NOT_AUTH)
return
}
const emit = encryptedEmit(socket)
const { $shock, publicKeyForDecryption } = socket.handshake.auth
const [root, path, method] = $shock.split('::')
// eslint-disable-next-line init-declarations
let node
if (root === '$gun') {
node = getGun()
} else if (root === '$user') {
node = getUser()
} else {
node = getGun().user(root)
}
for (const bit of path.split('>')) {
node = node.get(bit)
}
/**
* @param {ValidDataValue} data
* @param {string} key
*/
const listener = async (data, key) => {
try {
if (
typeof publicKeyForDecryption === 'string' &&
publicKeyForDecryption !== 'undefined' &&
publicKeyForDecryption.length > 15
) {
const decData = await deepDecryptIfNeeded(
data,
publicKeyForDecryption
)
emit('$shock', decData, key)
} else {
emit('$shock', data, key)
}
} catch (err) {
logger.error(
`Error for gun rpc socket, query ${$shock} -> ${err.message}`
)
}
}
if (method === 'on') {
node.on(listener)
} else if (method === 'open') {
node.open(listener)
} else if (method === 'map.on') {
node.map().on(listener)
} else if (method === 'map.once') {
node.map().once(listener)
} else {
throw new TypeError(
`Invalid method for gun rpc call : ${method}, query: ${$shock}`
)
}
} catch (err) {
logger.error('GUNRPC: ' + err.message)
}
})

View file

@ -20,6 +20,7 @@ const nonEncryptedEvents = [
* @typedef {import('../../services/gunDB/Mediator').EncryptedEmission} EncryptedEmission
* @typedef {import('../../services/gunDB/Mediator').EncryptedEmissionLegacy} EncryptedEmissionLegacy
* @typedef {import('../../services/gunDB/contact-api/SimpleGUN').ValidDataValue} ValidDataValue
* @typedef {(data: any, callback: (error?: any, data?: any) => void) => void} SocketOnListener
*/
/**
@ -83,7 +84,7 @@ const encryptedEmit = socket => async (eventName, ...args) => {
/**
* @param {SimpleSocket} socket
* @returns {(eventName: string, callback: (data: any) => void) => void}
* @returns {(eventName: string, callback: SocketOnListener) => void}
*/
const encryptedOn = socket => (eventName, callback) => {
try {
@ -110,9 +111,9 @@ const encryptedOn = socket => (eventName, callback) => {
}
}
socket.on(eventName, async data => {
socket.on(eventName, async (data, response) => {
if (isNonEncrypted(eventName)) {
callback(data)
callback(data, response)
return
}
@ -122,7 +123,7 @@ const encryptedOn = socket => (eventName, callback) => {
encryptedMessage: data
})
callback(safeParseJSON(decryptedMessage))
callback(safeParseJSON(decryptedMessage), response)
}
})
} catch (err) {
@ -135,8 +136,60 @@ const encryptedOn = socket => (eventName, callback) => {
}
}
/**
* @param {SimpleSocket} socket
* @param {(error?: any, data?: any) => void} callback
* @returns {(...args: any[]) => Promise<void>}
*/
const encryptedCallback = (socket, callback) => async (...args) => {
try {
const deviceId = socket.handshake.auth.encryptionId
if (!deviceId) {
throw {
field: 'deviceId',
message: 'Please specify a device ID'
}
}
const authorized = ECC.isAuthorizedDevice({ deviceId })
if (!authorized) {
throw {
field: 'deviceId',
message: 'Please exchange keys with the API before using the socket'
}
}
const encryptedArgs = await Promise.all(
args.map(async data => {
if (!data) {
return data
}
const encryptedMessage = await ECC.encryptMessage({
message: typeof data === 'object' ? JSON.stringify(data) : data,
deviceId
})
return encryptedMessage
})
)
return callback(...encryptedArgs)
} catch (err) {
logger.error(
`[SOCKET] An error has occurred while emitting an event response:`,
err
)
return socket.emit('encryption:error', err)
}
}
module.exports = {
isNonEncrypted,
encryptedOn,
encryptedEmit
encryptedEmit,
encryptedCallback
}