Consolexin's blog Consolexin's blog
首页
  • 算法基础

    • 图论
    • 字符串
    • 动态规划
    • 二分
    • 滑动窗口
    • 排序
  • Project

    • CppServer
  • 相关书籍

    • 现代C++编程
  • 书籍

    • SQL必知必会
    • MySQL必知必会
分类
标签
归档
GitHub (opens new window)

Consolexinhun

小学生
首页
  • 算法基础

    • 图论
    • 字符串
    • 动态规划
    • 二分
    • 滑动窗口
    • 排序
  • Project

    • CppServer
  • 相关书籍

    • 现代C++编程
  • 书籍

    • SQL必知必会
    • MySQL必知必会
分类
标签
归档
GitHub (opens new window)
  • README
  • day01-从一个最简单的socket开始
  • day02-不要放过任何一个错误
  • day03-高并发还得用epoll
  • day04-来看看我们的第一个类
  • day05-epoll高级用法-Channel登场
  • day06-服务器与事件驱动核心类登场
  • day07-为我们的服务器添加一个Acceptor
  • day08-一切皆是类,连TCP连接也不例外
  • day09-缓冲区-大作用
  • day10-加入线程池到服务器
  • day11-完善线程池,加入一个简单的测试程序
  • day12-将服务器改写为主从Reactor多线程模式
  • day13-支持业务逻辑自定义、完善Connection类
  • day14-重构核心库、使用智能指针
  • day15-重构Connection、修改生命周期
  • day16-使用CMake工程化
  • day17-使用EventLoopThreadPool、移交EventLoop
  • day18-HTTP有限状态转换机
  • day19-创建HTTP响应,实现HTTP服务器
  • day20-定时器的创建使用
  • day21-服务器主动关闭连接
  • day22-初步涉及日志库,定义自己的输出流LogStream
  • day23-定义前端日志库,实现同步输出
  • day24-异步日志库
  • day25-更有效的缓冲区
  • day26-监听写事件
  • day27-处理静态文件,实现POST请求
  • day28-文件服务器的简单实现,文件的展示和下载
  • day29-文件的上传
  • day30-WebBench的测试
  • CppServer
consolexinhun
2025-04-20

day17-使用EventLoopThreadPool、移交EventLoop

# day17-使用EventLoopThreadPool、移交EventLoop

在之前的操作中, 我们在main_reactor中创建了sub_reactor,但是这意味着只有在Loop时才能够调用CurrentThread::tid()获取j对应的线程ID,并将其认定为自身的sub_reactor的所属线程ID,这并没有问题,但是条理上不够清晰。

我们期望在将创建相应的线程,之后将sub_reactor的任务直接分配给相应的线程,并在初始化直接绑定线程ID,这更方便理解,也更方便将不同的模块进行分离。

此外,对于我们的多线程服务器,线程池其实并不需要task_queue,因为每一个线程执行的任务是确定且绑定的,因此也可以使用更简单且便于理解的线程池。

为了将构造EventLoop也就是构造sub_reactor的任务交给子线程并且我们的主线程能够调用相应的EventLoop绑定相应的TcpConnection。创建了EventLoopThread类,这个类将由主线程进行管理。该类主要是EventLoop与线程之间的操作。

当我们创建线程时,子线程将首先创建一个EventLoop对象,之后由主线程获取该对象的地址,并执行Loop函数。

void EventLoopThread::ThreadFunc(){
    // 由IO线程创建EventLoop对象
    EventLoop loop;
    {
        // 加锁
        std::unique_lock<std::mutex> lock(mutex_);
        loop_ = &loop; // 获取子线程的地址
        cv_.notify_one(); // loop_被创建成功,发送通知,唤醒主线程。
    }

    loop_->Loop(); // 开始循环,直到析构
    {
        std::unique_lock<std::mutex> lock(mutex_);
        loop_ = nullptr;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

而我们的主线程,将创建执行该函数的子线程,并获得由子线程所创建的EventLoop的地址。

EventLoop *EventLoopThread::StartLoop(){
    // 绑定当前线程的所执行的函数,并创建子线程
    // 在这个线程中创建EventLoop.
    thread_ = std::thread(std::bind(&EventLoopThread::ThreadFunc, this));
    EventLoop *loop = nullptr;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (loop_ == NULL){
            cv_.wait(lock); // 当IO线程未创建LOOP时,阻塞
        }
        // 将IO线程创建的loop_赋给主线程。
        loop = loop_;
    }
    // 返回创建好的线程。
    return loop;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

构造了EventLoopThread之后,EventLoopThreadPool就非常简单了,创建相应的EventLoopThread对象,并执行StartLoop函数,保存创建的EventLoop的地址,并在需要时返回即可。

// EventLoopThreadPool
class EventLoopThreadPool{

    public:
        DISALLOW_COPY_AND_MOVE(EventLoopThreadPool);
        EventLoopThreadPool(EventLoop *loop);
        ~EventLoopThreadPool();

        void SetThreadNums(int thread_nums);

        void start();

        // 获取线程池中的EventLoop
        EventLoop *nextloop();

    private:
        EventLoop *main_reactor_;
        // 保存对应的线程对象
        std::vector<std::unique_ptr<EventLoopThread>> threads_;
        // 保存创建的EventLoop
        std::vector<EventLoop *> loops_;

        int thread_nums_;

        int next_;
};
EventLoopThreadPool::EventLoopThreadPool(EventLoop *loop)
    : main_reactor_(loop),
      thread_nums_(0),
      next_(0){};

EventLoopThreadPool::~EventLoopThreadPool(){}

void EventLoopThreadPool::start(){
    for (int i = 0; i < thread_nums_; ++i){
        // 创建EventLoopThread对象,并保存由子线程创建的EventLoop的地址
        std::unique_ptr<EventLoopThread> ptr = std::make_unique<EventLoopThread>();
        threads_.push_back(std::move(ptr));
        loops_.push_back(threads_.back()->StartLoop());
    }
}

EventLoop *EventLoopThreadPool::nextloop(){
    EventLoop *ret = main_reactor_;
    if (!loop_.empty()){
        ret = loops_[next_++];
        // 采用轮询法调度。
        if (next_ == static_cast<int>(loops_.size())){
            next_ = 0;
        }
    }
    return ret;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

而我们只需要在TcpServer中创建一个EventLoopThreadPool对象,并对这个对象进行操作即可。并在启动服务器时,创建子线程EventLoop。

// TcpServer.cpp
TcpServer::TcpServer(EventLoop *loop, const char * ip, const int port){
    // ...
    // 创建线程池
    thread_pool_ = std::make_unique<EventLoopThreadPool>(loop);
}


void TcpServer::Start(){
    // 创建子线程和对应的EventLoop
    thread_pool_->start();
    // 主线程启动
    main_reactor_->Loop();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

上述的EventLoopThreadPool与之前的ThreadPool并没有性能上的提高,但是其将对线程的操作与TcpServer分离,将代码进一步的模块化,使可读性大大增强。

编辑 (opens new window)
上次更新: 2025/05/21, 06:42:57
day16-使用CMake工程化
day18-HTTP有限状态转换机

← day16-使用CMake工程化 day18-HTTP有限状态转换机→

最近更新
01
6-其他操作
05-20
02
4-联结
05-20
03
7-管理
05-20
更多文章>
Theme by Vdoing | Copyright © 2019-2025 Consolexinhun | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
×