day17-使用EventLoopThreadPool、移交EventLoop
# day17-使用EventLoopThreadPool、移交EventLoop
在之前的操作中, 我们在main_reactor中创建了sub_reactor,但是这意味着只有在Loop时才能够调用CurrentThread::tid()获取j对应的线程ID,并将其认定为自身的sub_reactor的所属线程ID,这并没有问题,但是条理上不够清晰。
我们期望在将创建相应的线程,之后将sub_reactor的任务直接分配给相应的线程,并在初始化直接绑定线程ID,这更方便理解,也更方便将不同的模块进行分离。
此外,对于我们的多线程服务器,线程池其实并不需要task_queue,因为每一个线程执行的任务是确定且绑定的,因此也可以使用更简单且便于理解的线程池。
为了将构造EventLoop也就是构造sub_reactor的任务交给子线程并且我们的主线程能够调用相应的EventLoop绑定相应的TcpConnection。创建了EventLoopThread类,这个类将由主线程进行管理。该类主要是EventLoop与线程之间的操作。
当我们创建线程时,子线程将首先创建一个EventLoop对象,之后由主线程获取该对象的地址,并执行Loop函数。
void EventLoopThread::ThreadFunc(){
// 由IO线程创建EventLoop对象
EventLoop loop;
{
// 加锁
std::unique_lock<std::mutex> lock(mutex_);
loop_ = &loop; // 获取子线程的地址
cv_.notify_one(); // loop_被创建成功,发送通知,唤醒主线程。
}
loop_->Loop(); // 开始循环,直到析构
{
std::unique_lock<std::mutex> lock(mutex_);
loop_ = nullptr;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
而我们的主线程,将创建执行该函数的子线程,并获得由子线程所创建的EventLoop的地址。
EventLoop *EventLoopThread::StartLoop(){
// 绑定当前线程的所执行的函数,并创建子线程
// 在这个线程中创建EventLoop.
thread_ = std::thread(std::bind(&EventLoopThread::ThreadFunc, this));
EventLoop *loop = nullptr;
{
std::unique_lock<std::mutex> lock(mutex_);
while (loop_ == NULL){
cv_.wait(lock); // 当IO线程未创建LOOP时,阻塞
}
// 将IO线程创建的loop_赋给主线程。
loop = loop_;
}
// 返回创建好的线程。
return loop;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
构造了EventLoopThread之后,EventLoopThreadPool就非常简单了,创建相应的EventLoopThread对象,并执行StartLoop函数,保存创建的EventLoop的地址,并在需要时返回即可。
// EventLoopThreadPool
class EventLoopThreadPool{
public:
DISALLOW_COPY_AND_MOVE(EventLoopThreadPool);
EventLoopThreadPool(EventLoop *loop);
~EventLoopThreadPool();
void SetThreadNums(int thread_nums);
void start();
// 获取线程池中的EventLoop
EventLoop *nextloop();
private:
EventLoop *main_reactor_;
// 保存对应的线程对象
std::vector<std::unique_ptr<EventLoopThread>> threads_;
// 保存创建的EventLoop
std::vector<EventLoop *> loops_;
int thread_nums_;
int next_;
};
EventLoopThreadPool::EventLoopThreadPool(EventLoop *loop)
: main_reactor_(loop),
thread_nums_(0),
next_(0){};
EventLoopThreadPool::~EventLoopThreadPool(){}
void EventLoopThreadPool::start(){
for (int i = 0; i < thread_nums_; ++i){
// 创建EventLoopThread对象,并保存由子线程创建的EventLoop的地址
std::unique_ptr<EventLoopThread> ptr = std::make_unique<EventLoopThread>();
threads_.push_back(std::move(ptr));
loops_.push_back(threads_.back()->StartLoop());
}
}
EventLoop *EventLoopThreadPool::nextloop(){
EventLoop *ret = main_reactor_;
if (!loop_.empty()){
ret = loops_[next_++];
// 采用轮询法调度。
if (next_ == static_cast<int>(loops_.size())){
next_ = 0;
}
}
return ret;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
而我们只需要在TcpServer中创建一个EventLoopThreadPool对象,并对这个对象进行操作即可。并在启动服务器时,创建子线程EventLoop。
// TcpServer.cpp
TcpServer::TcpServer(EventLoop *loop, const char * ip, const int port){
// ...
// 创建线程池
thread_pool_ = std::make_unique<EventLoopThreadPool>(loop);
}
void TcpServer::Start(){
// 创建子线程和对应的EventLoop
thread_pool_->start();
// 主线程启动
main_reactor_->Loop();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
上述的EventLoopThreadPool与之前的ThreadPool并没有性能上的提高,但是其将对线程的操作与TcpServer分离,将代码进一步的模块化,使可读性大大增强。