forked from Glimesh/janus-ftl-plugin
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRelayThreadPool.cpp
More file actions
127 lines (108 loc) · 3.27 KB
/
RelayThreadPool.cpp
File metadata and controls
127 lines (108 loc) · 3.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/**
* @file RelayThreadPool.cpp
* @author Hayden McAfee (hayden@outlook.com)
* @version 0.1
* @date 2020-08-28
*
* @copyright Copyright (c) 2020 Hayden McAfee
*
*/
#include "RelayThreadPool.h"
#include "FtlStreamStore.h"
#include "FtlStream.h"
#include "JanusSession.h"
extern "C"
{
#include <debug.h>
}
#pragma region Constructor/Destructor
RelayThreadPool::RelayThreadPool(
std::shared_ptr<FtlStreamStore> ftlStreamStore,
unsigned int threadCount) :
ftlStreamStore(ftlStreamStore),
threadCount(threadCount)
{ }
#pragma endregion
#pragma region Public methods
void RelayThreadPool::Start()
{
std::lock_guard<std::mutex> lock(threadVectorMutex);
for (unsigned int i = 0; i < threadCount; ++i)
{
auto thread = std::thread(&RelayThreadPool::relayThreadMethod, this, i);
relayThreads.push_back(std::move(thread));
}
}
void RelayThreadPool::Stop()
{
std::lock_guard<std::mutex> lock(threadVectorMutex);
// Notify threads that we're stopping
stopping = true;
relayThreadCondition.notify_all();
// Join all threads
for (auto& thread : relayThreads)
{
if (thread.joinable())
{
thread.join();
}
}
// Clear thread pool
relayThreads.clear();
}
void RelayThreadPool::RelayPacket(RtpRelayPacket packet)
{
{
std::lock_guard<std::mutex> lock(relayMutex);
packetRelayQueue.push(packet);
}
relayThreadCondition.notify_one();
}
#pragma endregion
#pragma region Private methods
void RelayThreadPool::relayThreadMethod(unsigned int threadNumber)
{
JANUS_LOG(LOG_INFO, "FTL: Relay thread pool thread #%d started.\n", threadNumber);
std::unique_lock<std::mutex> lock(relayMutex);
while (true)
{
// Wait to be signaled that we have new packets to process.
// NOTE: `wait` will automatically release the lock until the condition
// variable is triggered, at which point it will hold the lock again.
relayThreadCondition.wait(lock,
[this]()
{
return (packetRelayQueue.size() > 0) || stopping;
});
// If we're stopping, release the lock and exit the thread.
if (stopping)
{
break;
}
// Pop a packet off of the queue, then clear and unlock the mutex so it can continue
// being filled while we're processing.
RtpRelayPacket packet = std::move(packetRelayQueue.front());
packetRelayQueue.pop();
lock.unlock();
// Find the stream we're sending this to the viewers of
std::shared_ptr<FtlStream> originStream =
ftlStreamStore->GetStreamByChannelId(packet.channelId);
if (originStream == nullptr)
{
JANUS_LOG(
LOG_WARN,
"FTL: Packet relay failed for non-existant stream with channel ID %lu\n",
packet.channelId);
continue;
}
std::list<std::shared_ptr<JanusSession>> channelSessions =
originStream->GetViewers();
for (const auto& session : channelSessions)
{
session->SendRtpPacket(packet);
}
lock.lock();
}
JANUS_LOG(LOG_INFO, "FTL: Relay thread pool thread #%d terminated.\n", threadNumber);
}
#pragma endregion