Gunsmith restart stuff in separate methods, improve re-forge
This commit is contained in:
parent
051d5aa5df
commit
7af0ce5bfc
4 changed files with 172 additions and 86 deletions
|
|
@ -2268,13 +2268,12 @@ module.exports = async (
|
|||
|
||||
/**
|
||||
* @typedef {object} HandleGunFetchParams
|
||||
* @prop {'once'|'load'} type
|
||||
* @prop {'once'|'load'|'specialOnce'} type
|
||||
* @prop {boolean} startFromUserGraph
|
||||
* @prop {string} path
|
||||
* @prop {string=} publicKey
|
||||
* @prop {string=} publicKeyForDecryption
|
||||
* @prop {string=} epubForDecryption
|
||||
* @prop {boolean=} mustBePopulated
|
||||
*/
|
||||
/**
|
||||
* @param {HandleGunFetchParams} args0
|
||||
|
|
@ -2286,8 +2285,7 @@ module.exports = async (
|
|||
path,
|
||||
publicKey,
|
||||
publicKeyForDecryption,
|
||||
epubForDecryption,
|
||||
mustBePopulated
|
||||
epubForDecryption
|
||||
}) => {
|
||||
const keys = path.split('>')
|
||||
const { gun, user } = require('../services/gunDB/Mediator')
|
||||
|
|
@ -2317,8 +2315,9 @@ module.exports = async (
|
|||
}
|
||||
}
|
||||
|
||||
if (type === 'once') node.once(listener, { mustBePopulated })
|
||||
if (type === 'once') node.once(listener)
|
||||
if (type === 'load') node.load(listener)
|
||||
if (type === 'specialOnce') node.specialOnce(listener)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -2342,8 +2341,31 @@ module.exports = async (
|
|||
startFromUserGraph: false,
|
||||
type: 'once',
|
||||
publicKeyForDecryption,
|
||||
epubForDecryption,
|
||||
mustBePopulated: !!req.header('must-be-populated')
|
||||
epubForDecryption
|
||||
})
|
||||
res.status(200).json({
|
||||
data
|
||||
})
|
||||
} catch (e) {
|
||||
logger.error(e)
|
||||
res.status(500).json({
|
||||
errorMessage: e.message
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
ap.get('/api/gun/specialOnce/:path', async (req, res) => {
|
||||
try {
|
||||
const publicKeyForDecryption = req.header(PUBKEY_FOR_DECRYPT_HEADER)
|
||||
const epubForDecryption = req.header(EPUB_FOR_DECRYPT_HEADER)
|
||||
const { path } = req.params
|
||||
logger.info(`Gun special once: ${path}`)
|
||||
const data = await handleGunFetch({
|
||||
path,
|
||||
startFromUserGraph: false,
|
||||
type: 'specialOnce',
|
||||
publicKeyForDecryption,
|
||||
epubForDecryption
|
||||
})
|
||||
res.status(200).json({
|
||||
data
|
||||
|
|
@ -2391,8 +2413,31 @@ module.exports = async (
|
|||
startFromUserGraph: true,
|
||||
type: 'once',
|
||||
publicKeyForDecryption,
|
||||
epubForDecryption,
|
||||
mustBePopulated: !!req.header('must-be-populated')
|
||||
epubForDecryption
|
||||
})
|
||||
res.status(200).json({
|
||||
data
|
||||
})
|
||||
} catch (e) {
|
||||
logger.error(e)
|
||||
res.status(500).json({
|
||||
errorMessage: e.message
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
ap.get('/api/gun/user/specialOnce/:path', async (req, res) => {
|
||||
try {
|
||||
const publicKeyForDecryption = req.header(PUBKEY_FOR_DECRYPT_HEADER)
|
||||
const epubForDecryption = req.header(EPUB_FOR_DECRYPT_HEADER)
|
||||
const { path } = req.params
|
||||
logger.info(`Gun user special once: ${path}`)
|
||||
const data = await handleGunFetch({
|
||||
path,
|
||||
startFromUserGraph: true,
|
||||
type: 'specialOnce',
|
||||
publicKeyForDecryption,
|
||||
epubForDecryption
|
||||
})
|
||||
res.status(200).json({
|
||||
data
|
||||
|
|
@ -2431,11 +2476,11 @@ module.exports = async (
|
|||
|
||||
ap.get('/api/gun/otheruser/:publicKey/:type/:path', async (req, res) => {
|
||||
try {
|
||||
const allowedTypes = ['once', 'load', 'open']
|
||||
const allowedTypes = ['once', 'load', 'open', 'specialOnce']
|
||||
const publicKeyForDecryption = req.header(PUBKEY_FOR_DECRYPT_HEADER)
|
||||
const epubForDecryption = req.header(EPUB_FOR_DECRYPT_HEADER)
|
||||
const { path /*:rawPath*/, publicKey, type } = req.params
|
||||
logger.info(`gun otheruser ${type}: ${path}`)
|
||||
logger.info(`Gun other user ${type}: ${path}`)
|
||||
// const path = decodeURI(rawPath)
|
||||
if (!publicKey || publicKey === 'undefined') {
|
||||
res.status(400).json({
|
||||
|
|
|
|||
|
|
@ -161,7 +161,7 @@ const auth = (alias, pass) => {
|
|||
/** @param {Smith.GunMsg} msg */
|
||||
const _cb = msg => {
|
||||
if (msg.type === 'auth') {
|
||||
logger.info('Received auth reply.', msg)
|
||||
logger.info(`Received ${msg.ack.sea ? 'ok' : 'bad'} auth reply.`)
|
||||
currentGun.off('message', _cb)
|
||||
|
||||
isAuthing = false
|
||||
|
|
@ -190,17 +190,13 @@ const auth = (alias, pass) => {
|
|||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns null if there's no cached credentials.
|
||||
* @returns {Promise<GunT.UserPair|null>}
|
||||
*/
|
||||
const autoAuth = () => {
|
||||
const autoAuth = async () => {
|
||||
if (!lastAlias || !lastPass) {
|
||||
logger.info('No credentials cached, will not auto-auth')
|
||||
return Promise.resolve(null)
|
||||
return
|
||||
}
|
||||
logger.info('Credentials cached, will auth.')
|
||||
return auth(lastAlias, lastPass)
|
||||
await auth(lastAlias, lastPass)
|
||||
}
|
||||
|
||||
const flushPendingPuts = () => {
|
||||
|
|
@ -227,20 +223,36 @@ const flushPendingPuts = () => {
|
|||
logger.info(`Sent ${messages.length} pending puts.`)
|
||||
}
|
||||
|
||||
let isReforging = false
|
||||
let isForging = false
|
||||
|
||||
/** @returns {Promise<void>} */
|
||||
const isReady = () =>
|
||||
new Promise(res => {
|
||||
if (isForging || isAuthing) {
|
||||
setTimeout(() => {
|
||||
isReady().then(res)
|
||||
}, 1000)
|
||||
} else {
|
||||
logger.info('isReady')
|
||||
res()
|
||||
}
|
||||
})
|
||||
|
||||
const forge = () => {
|
||||
if (isReforging) {
|
||||
if (isForging) {
|
||||
throw new Error('Double forge?')
|
||||
}
|
||||
logger.info('Will reforge')
|
||||
isReforging = true
|
||||
|
||||
isForging = true
|
||||
if (currentGun) {
|
||||
logger.info('Will reforge')
|
||||
currentGun.off('message', handleMsg)
|
||||
currentGun.disconnect()
|
||||
currentGun.kill()
|
||||
}
|
||||
logger.info('Killed current gun')
|
||||
} else {
|
||||
logger.info('Will forge')
|
||||
}
|
||||
const newGun = fork('utils/GunSmith/gun.js')
|
||||
currentGun = newGun
|
||||
logger.info('Forged new gun')
|
||||
|
|
@ -284,14 +296,15 @@ const forge = () => {
|
|||
|
||||
logger.info('Finished reforging, will now auto-auth')
|
||||
|
||||
isReforging = false
|
||||
autoAuth()
|
||||
autoAuth().then(() => {
|
||||
isForging = false
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} path
|
||||
* @param {boolean=} afterMap
|
||||
* @returns {GunT.GUNNode}
|
||||
* @returns {GunT.GUNNode & Smith.GunSmithNode}
|
||||
*/
|
||||
function createReplica(path, afterMap = false) {
|
||||
/** @type {(GunT.Listener|GunT.LoadListener)[]} */
|
||||
|
|
@ -366,29 +379,15 @@ function createReplica(path, afterMap = false) {
|
|||
mapListeners.delete(l)
|
||||
}
|
||||
},
|
||||
on(cb, { mustBePopulated } = {}) {
|
||||
on(cb) {
|
||||
listenersForThisRef.push(cb)
|
||||
|
||||
let canaryPeep = false
|
||||
const canary = () => {
|
||||
canaryPeep = true
|
||||
}
|
||||
listenersForThisRef.push(canary)
|
||||
const checkCanary = () =>
|
||||
setTimeout(() => {
|
||||
if (!canaryPeep && mustBePopulated) {
|
||||
forge()
|
||||
checkCanary()
|
||||
}
|
||||
}, 5000)
|
||||
|
||||
if (afterMap) {
|
||||
// eslint-disable-next-line no-multi-assign
|
||||
const listeners =
|
||||
pathToMapListeners[path] || (pathToMapListeners[path] = new Set())
|
||||
|
||||
listeners.add(cb)
|
||||
listeners.add(canary)
|
||||
|
||||
/** @type {Smith.SmithMsgMapOn} */
|
||||
const msg = {
|
||||
|
|
@ -402,7 +401,6 @@ function createReplica(path, afterMap = false) {
|
|||
pathToListeners[path] || (pathToListeners[path] = new Set())
|
||||
|
||||
listeners.add(cb)
|
||||
listeners.add(canary)
|
||||
|
||||
/** @type {Smith.SmithMsgOn} */
|
||||
const msg = {
|
||||
|
|
@ -414,8 +412,7 @@ function createReplica(path, afterMap = false) {
|
|||
|
||||
return this
|
||||
},
|
||||
once(cb, _opts) {
|
||||
const opts = { ...{ mustBePopulated: false, wait: 500 }, ..._opts }
|
||||
once(cb, opts = { wait: 500 }) {
|
||||
// We could use this.on() but then we couldn't call .off()
|
||||
const tmp = createReplica(path, afterMap)
|
||||
if (afterMap) {
|
||||
|
|
@ -430,14 +427,9 @@ function createReplica(path, afterMap = false) {
|
|||
|
||||
setTimeout(() => {
|
||||
tmp.off()
|
||||
if (cb) {
|
||||
if (opts.mustBePopulated && !isPopulated(lastVal)) {
|
||||
forge()
|
||||
this.once(cb, { ...opts, wait: 5000, mustBePopulated: false })
|
||||
} else {
|
||||
cb(lastVal, path.split('>')[path.split('>').length - 1])
|
||||
}
|
||||
}
|
||||
const keys = path.split('>')
|
||||
// eslint-disable-next-line no-unused-expressions
|
||||
cb && cb(lastVal, keys[keys.length - 1])
|
||||
}, opts.wait)
|
||||
|
||||
return this
|
||||
|
|
@ -464,9 +456,9 @@ function createReplica(path, afterMap = false) {
|
|||
path,
|
||||
type: 'put'
|
||||
}
|
||||
if (!isAuthing && !isReforging) {
|
||||
isReady().then(() => {
|
||||
currentGun.send(msg)
|
||||
}
|
||||
})
|
||||
return this
|
||||
},
|
||||
set(data, cb) {
|
||||
|
|
@ -525,11 +517,52 @@ function createReplica(path, afterMap = false) {
|
|||
}
|
||||
}
|
||||
},
|
||||
then(opts) {
|
||||
then() {
|
||||
return new Promise(res => {
|
||||
this.once(data => {
|
||||
res(data)
|
||||
}, opts)
|
||||
})
|
||||
})
|
||||
},
|
||||
specialOn(cb) {
|
||||
let canaryPeep = false
|
||||
|
||||
const checkCanary = () =>
|
||||
setTimeout(() => {
|
||||
if (!canaryPeep) {
|
||||
forge()
|
||||
isReady().then(checkCanary)
|
||||
}
|
||||
}, 5000)
|
||||
|
||||
checkCanary()
|
||||
return this.on((data, key) => {
|
||||
canaryPeep = true
|
||||
cb(data, key)
|
||||
})
|
||||
},
|
||||
specialOnce(cb, _wait = 500) {
|
||||
this.once((data, key) => {
|
||||
if (isPopulated(data) || _wait === 4500) {
|
||||
cb(data, key)
|
||||
} else {
|
||||
forge()
|
||||
isReady().then(() => {
|
||||
this.specialOnce(cb, _wait * 3)
|
||||
})
|
||||
}
|
||||
})
|
||||
return this
|
||||
},
|
||||
specialThen() {
|
||||
return new Promise((res, rej) => {
|
||||
this.specialOnce(data => {
|
||||
if (isPopulated(data)) {
|
||||
res(data)
|
||||
} else {
|
||||
rej(new Error(`Could not fetch data at path ${path}`))
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -538,7 +571,7 @@ function createReplica(path, afterMap = false) {
|
|||
let userReplicaCalled = false
|
||||
|
||||
/**
|
||||
* @returns {GunT.UserGUNNode}
|
||||
* @returns {GunT.UserGUNNode & Smith.GunSmithNode}
|
||||
*/
|
||||
function createUserReplica() {
|
||||
if (userReplicaCalled) {
|
||||
|
|
@ -548,7 +581,7 @@ function createUserReplica() {
|
|||
|
||||
const baseReplica = createReplica('$user')
|
||||
|
||||
/** @type {GunT.UserGUNNode} */
|
||||
/** @type {GunT.UserGUNNode & Smith.GunSmithNode} */
|
||||
const completeReplica = {
|
||||
...baseReplica,
|
||||
get _() {
|
||||
|
|
@ -638,7 +671,7 @@ function createUserReplica() {
|
|||
}
|
||||
|
||||
/**
|
||||
* @typedef {GunT.GUNNode & { reforge(): void }} RootNode
|
||||
* @typedef {GunT.GUNNode & Smith.GunSmithNode & { reforge(): void }} RootNode
|
||||
*/
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -58,14 +58,6 @@ namespace GunT {
|
|||
|
||||
export type LoadListener = (data: LoadListenerData, key: string) => void
|
||||
|
||||
export interface GunSmithFetchOpts {
|
||||
/**
|
||||
* GunSmith exclusive. If set to true, gun will be restarted to force
|
||||
* replication of this data.
|
||||
*/
|
||||
mustBePopulated?: boolean
|
||||
}
|
||||
|
||||
export interface GUNNode {
|
||||
_: Soul
|
||||
/**
|
||||
|
|
@ -86,29 +78,16 @@ namespace GunT {
|
|||
>
|
||||
}
|
||||
get(key: string): GUNNode
|
||||
load(this: GUNNode, cb?: LoadListener): GUNNode
|
||||
load(this: GUNNode, cb?: LoadListener): void
|
||||
map(): GUNNode
|
||||
off(): void
|
||||
on(
|
||||
this: GUNNode,
|
||||
cb: Listener,
|
||||
opts?: {
|
||||
change?: boolean
|
||||
} & GunSmithFetchOpts
|
||||
): void
|
||||
once(
|
||||
this: GUNNode,
|
||||
cb?: Listener,
|
||||
opts?: { wait?: number } & GunSmithFetchOpts
|
||||
): GUNNode
|
||||
on(this: GUNNode, cb: Listener): void
|
||||
once(this: GUNNode, cb?: Listener, opts?: { wait?: number }): void
|
||||
user(): UserGUNNode
|
||||
user(pub: string): GUNNode
|
||||
put(data: ValidDataValue, cb?: Callback): GUNNode
|
||||
put(data: ValidDataValue, cb?: Callback): void
|
||||
set(data: ValidDataValue, cb?: Callback): GUNNode
|
||||
/**
|
||||
* @param options Gunsmith only.
|
||||
*/
|
||||
then(opts?: GunSmithFetchOpts): Promise<ListenerData>
|
||||
then(): Promise<ListenerData>
|
||||
}
|
||||
|
||||
export interface CreateAck {
|
||||
|
|
|
|||
|
|
@ -3,6 +3,35 @@
|
|||
*/
|
||||
/// <reference path="GunT.ts" />
|
||||
namespace Smith {
|
||||
export interface GunSmithNode {
|
||||
/**
|
||||
* @override
|
||||
*/
|
||||
map(): GunSmithNode
|
||||
/**
|
||||
* @override
|
||||
*/
|
||||
set(data: GunT.ValidDataValue, cb?: GunT.Callback): GunT.GUNNode
|
||||
/**
|
||||
* Gun will be restarted to force replication of data
|
||||
* if needed.
|
||||
* @param cb
|
||||
*/
|
||||
specialOn(cb: GunT.Listener): void
|
||||
/**
|
||||
* Gun will be restarted to force replication of data
|
||||
* if needed.
|
||||
* @param cb
|
||||
* @param _wait
|
||||
*/
|
||||
specialOnce(cb: GunT.Listener, _wait?: number): GunSmithNode
|
||||
/**
|
||||
* Gun will be restarted to force replication of data
|
||||
* if needed.
|
||||
*/
|
||||
specialThen(): Promise<GunT.ListenerData>
|
||||
}
|
||||
|
||||
export interface PendingPut {
|
||||
cb: GunT.Callback
|
||||
data: GunT.ValidDataValue
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue