stream manager

This commit is contained in:
hatim boufnichel 2021-05-12 23:01:32 +02:00
parent 03d73e1067
commit 7d8ee58e9c
2 changed files with 166 additions and 27 deletions

150
services/streams.js Normal file
View file

@ -0,0 +1,150 @@
const EventEmitter = require('events')
const fetch = require('node-fetch')
const Key = require('./gunDB/contact-api/key')
const StreamLiveManager = new EventEmitter()
const startedStream = (data) => {
StreamLiveManager.emit('awaitStream',data)
}
const endStream = (data) => {
StreamLiveManager.emit('endStream',data)
}
module.exports = {startedStream,endStream}
//-----------------------------------------
const intervalsWaitingAlive = {}
const intervalsStreamingViewers = {}
const intervalsWaitingMp4 = {}
const clearStreamInterval = (postId, map) => {
if(!postId){
return
}
if(map === "intervalsWaitingAlive"){
if(!intervalsWaitingAlive[postId]){
return
}
clearInterval(intervalsWaitingAlive[postId])
delete intervalsWaitingAlive[postId]
}
if(map === "intervalsStreamingViewers"){
if(!intervalsStreamingViewers[postId]){
return
}
clearInterval(intervalsStreamingViewers[postId])
delete intervalsStreamingViewers[postId]
}
if(map === "intervalsWaitingMp4"){
if(!intervalsWaitingMp4[postId]){
return
}
clearInterval(intervalsWaitingMp4[postId])
delete intervalsWaitingMp4[postId]
}
}
StreamLiveManager.on('awaitStream', data => {
const { postId, contentId, statusUrl } = data
if(intervalsWaitingAlive[postId]){
clearStreamInterval(intervalsWaitingAlive[postId])
}
const user = require('../services/gunDB/Mediator').getUser()
intervalsWaitingAlive[postId] = setInterval(async () => {
try {
const res = await fetch(statusUrl)
const j = await res.json()
if (!j.isLive) {
return
}
user
.get(Key.POSTS_NEW)
.get(postId)
.get('contentItems')
.get(contentId)
.get('liveStatus')
.put('live')
clearStreamInterval(postId,"intervalsWaitingAlive")
StreamLiveManager.emit('followStream', data)
//eslint-disable-next-line no-empty
} catch{}
}, 2 * 1000)
//kill sub after 10 minutes
setTimeout(()=>{
clearStreamInterval(postId,"intervalsWaitingAlive")
},10 * 60 * 1000)
})
StreamLiveManager.on('followStream', data => {
const { postId, contentId, statusUrl } = data
if(intervalsStreamingViewers[postId]){
clearStreamInterval(postId,"intervalsStreamingViewers")
}
const user = require('../services/gunDB/Mediator').getUser()
intervalsStreamingViewers[postId] = setInterval(async () => {
try {
const res = await fetch(statusUrl)
const j = await res.json()
if (typeof j.viewers !== 'number') {
return
}
user
.get(Key.POSTS_NEW)
.get(postId)
.get('contentItems')
.get(contentId)
.get('viewersCounter')
.put(j.viewers)
//eslint-disable-next-line no-empty
} catch{}
}, 5 * 1000)
})
StreamLiveManager.on('endStream', data => {
const { postId, contentId, endUrl, urlForMagnet, obsToken } = data
console.log("ending stream!")
clearStreamInterval(postId,"intervalsStreamingViewers")
if(intervalsWaitingMp4[postId]){
clearStreamInterval(postId,"intervalsWaitingMp4")
}
const user = require('../services/gunDB/Mediator').getUser()
user
.get(Key.POSTS_NEW)
.get(postId)
.get('contentItems')
.get(contentId)
.get('liveStatus')
.put('waiting')
fetch(endUrl,{
headers: {
'Authorization': `Bearer ${obsToken}`
},
})
intervalsWaitingMp4[postId] = setInterval(async () => {
try {
const res = await fetch(urlForMagnet)
const j = await res.json()
if (!j.magnet) {
return
}
user
.get(Key.POSTS_NEW)
.get(postId)
.get('contentItems')
.get(contentId)
.get('liveStatus')
.put('wasLive')
user
.get(Key.POSTS_NEW)
.get(postId)
.get('contentItems')
.get(contentId)
.get('playbackMagnet')
.put(j.magnet)
clearStreamInterval(postId,"intervalsWaitingMp4")
//eslint-disable-next-line no-empty
} catch{}
}, 5 * 1000)
})

View file

@ -16,8 +16,6 @@ const Big = require('big.js')
const size = require('lodash/size')
const { range, flatten, evolve } = require('ramda')
const path = require('path')
const fetch = require('node-fetch')
const EventEmitter = require('events')
const getListPage = require('../utils/paginate')
const auth = require('../services/auth/auth')
@ -37,6 +35,7 @@ const GunKey = require('../services/gunDB/contact-api/key')
const LV2 = require('../utils/lightningServices/v2')
const GunWriteRPC = require('../services/gunDB/rpc')
const Key = require('../services/gunDB/contact-api/key')
const { startedStream, endStream } = require('../services/streams')
const DEFAULT_MAX_NUM_ROUTES_TO_QUERY = 10
const SESSION_ID = uuid()
@ -56,30 +55,6 @@ module.exports = async (
})
})
const StreamLiveManager = new EventEmitter()
StreamLiveManager.on('followStream', data => {
const { postId, contentId, statusUrl } = data
const user = require('../services/gunDB/Mediator').getUser()
const interval = setInterval(async () => {
console.log('check!')
console.log(statusUrl)
const res = await fetch(statusUrl)
const j = await res.json()
console.log(j)
if (!j.isLive) {
return
}
user
.get(Key.POSTS_NEW)
.get(postId)
.get('contentItems')
.get(contentId)
.get('liveStatus')
.put('live')
clearInterval(interval)
}, 2000)
})
const sanitizeLNDError = (message = '') => {
if (message.toLowerCase().includes('unknown')) {
const splittedMessage = message.split('UNKNOWN: ')
@ -3323,7 +3298,21 @@ module.exports = async (
//this is for wasLive/isLive status
ap.post('/api/listenStream', (req, res) => {
try {
StreamLiveManager.emit('followStream', req.body)
startedStream(req.body)
return res.status(200).json({
ok: true
})
} catch (e) {
console.log(e)
return res.status(500).json({
errorMessage:
(typeof e === 'string' ? e : e.message) || 'Unknown error.'
})
}
})
ap.post('/api/stopStream', (req, res) => {
try {
endStream(req.body)
return res.status(200).json({
ok: true
})