diff --git a/services/streams.js b/services/streams.js new file mode 100644 index 00000000..d567fb35 --- /dev/null +++ b/services/streams.js @@ -0,0 +1,150 @@ + +const EventEmitter = require('events') +const fetch = require('node-fetch') +const Key = require('./gunDB/contact-api/key') +const StreamLiveManager = new EventEmitter() + +const startedStream = (data) => { + StreamLiveManager.emit('awaitStream',data) +} +const endStream = (data) => { + StreamLiveManager.emit('endStream',data) +} +module.exports = {startedStream,endStream} + +//----------------------------------------- + +const intervalsWaitingAlive = {} +const intervalsStreamingViewers = {} +const intervalsWaitingMp4 = {} + +const clearStreamInterval = (postId, map) => { + if(!postId){ + return + } + if(map === "intervalsWaitingAlive"){ + if(!intervalsWaitingAlive[postId]){ + return + } + clearInterval(intervalsWaitingAlive[postId]) + delete intervalsWaitingAlive[postId] + } + if(map === "intervalsStreamingViewers"){ + if(!intervalsStreamingViewers[postId]){ + return + } + clearInterval(intervalsStreamingViewers[postId]) + delete intervalsStreamingViewers[postId] + } + if(map === "intervalsWaitingMp4"){ + if(!intervalsWaitingMp4[postId]){ + return + } + clearInterval(intervalsWaitingMp4[postId]) + delete intervalsWaitingMp4[postId] + } +} + +StreamLiveManager.on('awaitStream', data => { + const { postId, contentId, statusUrl } = data + if(intervalsWaitingAlive[postId]){ + clearStreamInterval(intervalsWaitingAlive[postId]) + } + const user = require('../services/gunDB/Mediator').getUser() + intervalsWaitingAlive[postId] = setInterval(async () => { + try { + const res = await fetch(statusUrl) + const j = await res.json() + if (!j.isLive) { + return + } + user + .get(Key.POSTS_NEW) + .get(postId) + .get('contentItems') + .get(contentId) + .get('liveStatus') + .put('live') + clearStreamInterval(postId,"intervalsWaitingAlive") + StreamLiveManager.emit('followStream', data) + //eslint-disable-next-line no-empty + } catch{} + }, 2 * 1000) + //kill sub after 10 minutes + setTimeout(()=>{ + clearStreamInterval(postId,"intervalsWaitingAlive") + },10 * 60 * 1000) +}) + +StreamLiveManager.on('followStream', data => { + const { postId, contentId, statusUrl } = data + if(intervalsStreamingViewers[postId]){ + clearStreamInterval(postId,"intervalsStreamingViewers") + } + const user = require('../services/gunDB/Mediator').getUser() + intervalsStreamingViewers[postId] = setInterval(async () => { + try { + const res = await fetch(statusUrl) + const j = await res.json() + if (typeof j.viewers !== 'number') { + return + } + user + .get(Key.POSTS_NEW) + .get(postId) + .get('contentItems') + .get(contentId) + .get('viewersCounter') + .put(j.viewers) + //eslint-disable-next-line no-empty + } catch{} + }, 5 * 1000) +}) + +StreamLiveManager.on('endStream', data => { + const { postId, contentId, endUrl, urlForMagnet, obsToken } = data + console.log("ending stream!") + clearStreamInterval(postId,"intervalsStreamingViewers") + if(intervalsWaitingMp4[postId]){ + clearStreamInterval(postId,"intervalsWaitingMp4") + } + const user = require('../services/gunDB/Mediator').getUser() + user + .get(Key.POSTS_NEW) + .get(postId) + .get('contentItems') + .get(contentId) + .get('liveStatus') + .put('waiting') + fetch(endUrl,{ + headers: { + 'Authorization': `Bearer ${obsToken}` + }, + }) + intervalsWaitingMp4[postId] = setInterval(async () => { + try { + const res = await fetch(urlForMagnet) + const j = await res.json() + if (!j.magnet) { + return + } + user + .get(Key.POSTS_NEW) + .get(postId) + .get('contentItems') + .get(contentId) + .get('liveStatus') + .put('wasLive') + user + .get(Key.POSTS_NEW) + .get(postId) + .get('contentItems') + .get(contentId) + .get('playbackMagnet') + .put(j.magnet) + clearStreamInterval(postId,"intervalsWaitingMp4") + //eslint-disable-next-line no-empty + } catch{} + }, 5 * 1000) +}) + diff --git a/src/routes.js b/src/routes.js index a4a9deeb..e0081722 100644 --- a/src/routes.js +++ b/src/routes.js @@ -16,8 +16,6 @@ const Big = require('big.js') const size = require('lodash/size') const { range, flatten, evolve } = require('ramda') const path = require('path') -const fetch = require('node-fetch') -const EventEmitter = require('events') const getListPage = require('../utils/paginate') const auth = require('../services/auth/auth') @@ -37,6 +35,7 @@ const GunKey = require('../services/gunDB/contact-api/key') const LV2 = require('../utils/lightningServices/v2') const GunWriteRPC = require('../services/gunDB/rpc') const Key = require('../services/gunDB/contact-api/key') +const { startedStream, endStream } = require('../services/streams') const DEFAULT_MAX_NUM_ROUTES_TO_QUERY = 10 const SESSION_ID = uuid() @@ -56,30 +55,6 @@ module.exports = async ( }) }) - const StreamLiveManager = new EventEmitter() - StreamLiveManager.on('followStream', data => { - const { postId, contentId, statusUrl } = data - const user = require('../services/gunDB/Mediator').getUser() - const interval = setInterval(async () => { - console.log('check!') - console.log(statusUrl) - const res = await fetch(statusUrl) - const j = await res.json() - console.log(j) - if (!j.isLive) { - return - } - user - .get(Key.POSTS_NEW) - .get(postId) - .get('contentItems') - .get(contentId) - .get('liveStatus') - .put('live') - clearInterval(interval) - }, 2000) - }) - const sanitizeLNDError = (message = '') => { if (message.toLowerCase().includes('unknown')) { const splittedMessage = message.split('UNKNOWN: ') @@ -3323,7 +3298,21 @@ module.exports = async ( //this is for wasLive/isLive status ap.post('/api/listenStream', (req, res) => { try { - StreamLiveManager.emit('followStream', req.body) + startedStream(req.body) + return res.status(200).json({ + ok: true + }) + } catch (e) { + console.log(e) + return res.status(500).json({ + errorMessage: + (typeof e === 'string' ? e : e.message) || 'Unknown error.' + }) + } + }) + ap.post('/api/stopStream', (req, res) => { + try { + endStream(req.body) return res.status(200).json({ ok: true })