Consolexin's blog Consolexin's blog
首页
  • 算法基础

    • 图论
    • 字符串
    • 动态规划
    • 二分
    • 滑动窗口
    • 排序
  • Project

    • CppServer
  • 相关书籍

    • 现代C++编程
  • 书籍

    • SQL必知必会
    • MySQL必知必会
分类
标签
归档
GitHub (opens new window)

Consolexinhun

小学生
首页
  • 算法基础

    • 图论
    • 字符串
    • 动态规划
    • 二分
    • 滑动窗口
    • 排序
  • Project

    • CppServer
  • 相关书籍

    • 现代C++编程
  • 书籍

    • SQL必知必会
    • MySQL必知必会
分类
标签
归档
GitHub (opens new window)
  • README
  • day01-从一个最简单的socket开始
  • day02-不要放过任何一个错误
  • day03-高并发还得用epoll
  • day04-来看看我们的第一个类
  • day05-epoll高级用法-Channel登场
  • day06-服务器与事件驱动核心类登场
  • day07-为我们的服务器添加一个Acceptor
  • day08-一切皆是类,连TCP连接也不例外
  • day09-缓冲区-大作用
  • day10-加入线程池到服务器
  • day11-完善线程池,加入一个简单的测试程序
  • day12-将服务器改写为主从Reactor多线程模式
  • day13-支持业务逻辑自定义、完善Connection类
  • day14-重构核心库、使用智能指针
  • day15-重构Connection、修改生命周期
    • day16-使用CMake工程化
    • day17-使用EventLoopThreadPool、移交EventLoop
    • day18-HTTP有限状态转换机
    • day19-创建HTTP响应,实现HTTP服务器
    • day20-定时器的创建使用
    • day21-服务器主动关闭连接
    • day22-初步涉及日志库,定义自己的输出流LogStream
    • day23-定义前端日志库,实现同步输出
    • day24-异步日志库
    • day25-更有效的缓冲区
    • day26-监听写事件
    • day27-处理静态文件,实现POST请求
    • day28-文件服务器的简单实现,文件的展示和下载
    • day29-文件的上传
    • day30-WebBench的测试
    • CppServer
    consolexinhun
    2025-04-20
    目录

    day15-重构Connection、修改生命周期

    # day15-重构TcpConnection、修改生命周期

    • 首先,在debug过程中,对内容在此进行了精简,去掉了一些无效的代码,例如将Socket类去掉了,这是因为Socket类定义的操作一般只由Accpetor来调用,直接将其封装在Acceptor中更容易让人理解。

    • 本章内容偏多,主要是为了理清程序运行的逻辑时,对代码进行了大幅度的更改。

    在昨天的重构中,将每个类独属的资源使用unique_ptr进行了包装。但是对于TcpConnection这个类,其生命周期模糊,使用unique_ptr很容易导致内存泄漏。这是因为我们的对于TcpConnection是被动关闭,当我们channel在handleEvent时,发现客户端传入了关闭连接的信息后,直接对onClose进行了调用。因此如果使用unqiue_ptr时,我们在调用onclose时已经销毁了tcpconnection,而对应的channel也会被移除,但是此时的HandleEvent并没有结束,因此存在了内存泄漏。

    • 针对,这个问题,总要从三个步骤进行。
      1. 使用shared_ptr智能指针管理TcpConnection。
      2. 在HandleEvent和销毁时,增加引用计数。
      3. 将HandleClose操作移交给main_reactor_进行。

    # 使用shared_ptr对TcpConnection进行管理

    为了解决这个问题,我们采用shared_ptr对TcpConnection进行了管理。这样就方便延长TcpConnection的生命周期。

    具体的应用并不赘述,将TcpConnection继承自enable_shared_from_this即可使用shared_ptr管理。并在TcpServer中使用shared_ptr保存TcpConnection。

    // TcpConnection.h
    class TcpConnection : public std::enable_shared_from_this<TcpConnection>{
        //...
    }
    
    // TcpServer.h
    class TcpServer{
        private:
            std::map<int, std::shared_ptr<TcpConnection>> connectionsMap_;
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    # 增加TcpConnection的引用计数

    在当前状态下,在创建TcpConnection会将其加入到connectionsMap_使其引用计数变成了1,之后当TcpConnection处理HandleEvent受到关闭信号时,会直接调用HandleClose,这时会将TcpConnection从connectionsMaps_释放,引用计数变成0,直接销毁,但是HandleEvent并没有处理结束,从而导致了内存泄漏。

    为了解决该问题,进行了两点处理。

    1. 在HandleEvent时,增加了引用计数。

    具体的,在Channel处增加一个指向TcpConnection的weak_ptr,当进行HandleEvent时,增加其应用计数。

    // Channel.h
    class Channel{
    public:
        void HandleEvent() const;
        void HandleEventWithGuard() const;
        void Tie(const std::shared_ptr<void> &ptr);
    private:
        std::weak_ptr<void> tie_;
    }
    //Channel.cpp
    void Channel::HandleEvent() const{
        if(tie_){
            std::shared_ptr<void> guard() = tie_.lock();
            HandleEventWithGuard();
        }else{
            HandleEventWithGuard();
        }
    }
    void Channel::HandleEventWithGuard() const{
        if(ready_events_ & READ_EVENT){
            read_callback_();
        }
        if(ready_events_ & WRITE_EVENT){
            write_callback_();
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26

    当我们建立TcpConnection时,会首先将其绑定在Channel的tie_上,由于shared_from_this无法在构造函数处调用,因此将部分操作进行分离,并保证在构造函数执行结束后调用该函数。随后,在令其进行监听读操作,这样就可以保证Channel在HandleEvent时,会增加TcpConnection的引用计数。

    // TcpConnection.cpp
    void TcpConnection::ConnectionEstablished(){
        state_ = ConnectionState::Connected;
        channel_->Tie(shared_from_this());
        channel_->EnableRead();
        if (on_connect_){
            on_connect_(shared_from_this());
        }
    }
    
    // TcpServer.cpp
    inline void TcpServer::HandleNewConnection(int fd){
        assert(fd != -1);
        uint64_t random = fd % sub_reactors_.size();
        
        // 创建TcpConnection对象
        std::shared_ptr<TcpConnection> conn = std::make_shared<TcpConnection>(sub_reactors_[random].get(), fd, next_conn_id_);
        std::function<void(const std::shared_ptr<TcpConnection> &)> cb = std::bind(&TcpServer::HandleClose, this, std::placeholders::_1);
        conn->set_connection_callback(on_connect_);
        conn->set_close_callback(cb);
        conn->set_message_callback(on_message_);
    
        connectionsMap_[fd] = conn;
        // 分配id
        ++next_conn_id_;
        if(next_conn_id_ == 1000){
            next_conn_id_ = 1;
        }
    
        // 将connection分配给Channel的tie,增加计数 并开始监听读事件
        conn->ConnectionEstablished();
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32

    这样就保证了只有当HandleEvent结束后,TcpConnection的引用计数才会变成0。

    1. 在销毁时,HandleEvent结束后,增加引用计数。

    上述操作主要是在HandleEvent进行时,增加了TcpConnection的引用计数。在HandleEvent之后增加引用计数可以使程序更加鲁棒。

    具体的,我们在EventLoop处,增加一个to_do_list_列表,并在每次TcpConnection的销毁时,向to_do_list_处增加一个TcpConnection销毁程序从而增加TcpConnection的计数,这个列表中的任务只有在HandleEvent之运行,这样就保证了TcpConnection的销毁,必然时在HandleEvent之后的。

    void EventLoop::Loop(){
        while(true){
            for (Channel *active_ch : poller_->Poll()){
                active_ch->HandleEvent();
            }
            DoToDoList();
        }
    }
    
    void EventLoop::DoToDoList(){
        // 此时已经epoll_wait出来,可能存在阻塞在epoll_wait的可能性。
        std::vector < std::function<void()>> functors;
        {
            // 加锁 保证线程同步
            std::unique_lock<std::mutex> lock(mutex_); 
            functors.swap(to_do_list_);
        }
        for(const auto& func: functors){
            func();
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21

    在HandleClose时,会将TcpConnection的ConnectionDestructor加入到to_do_list_中.

    // TcpServer.cpp
    
    void TcpConnection::ConnectionDestructor(){
        //std::cout << CurrentThread::tid() << " TcpConnection::ConnectionDestructor" << std::endl;
        // 将该操作从析构处,移植该处,增加性能,因为在析构前,当前`TcpConnection`已经相当于关闭了。
        // 已经可以将其从loop处离开。
        loop_->DeleteChannel(channel_.get());
    }
    
    inline void TcpServer::HandleClose(const std::shared_ptr<TcpConnection> & conn){
    
        auto it = connectionsMap_.find(conn->fd());
        assert(it != connectionsMap_.end());
        connectionsMap_.erase(connectionsMap_.find(conn->fd()));
    
        EventLoop *loop = conn->loop();
        loop->QueueOneFunc(std::bind(&TcpConnection::ConnectionDestructor, conn));
    }
    
    // EventLoop.cpp
    void EventLoop::QueueOneFunc(std::function<void()> cb){
        {
            // 加锁,保证线程同步
            std::unique_lock<std::mutex> lock(mutex_);
            to_do_list_.emplace_back(std::move(cb));
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27

    当前的版本对TcpConnection的生命周期管理已经差不多是安全的了。在连接到来时,创建TcpConnection,并用shared_ptr管理,加入到ConnectionsMap_中,此时引用计数为1.

    而当HandleEvent时,使用weak_ptr.lock增加TcpConnection的应用计数,引用计数为2。

    当关闭时,TcpServer中erase后TcpConnection引用计数变为了1。之后将再次当前连接的销毁程序加入到DoToDoList使引用计数变为2, 而HandleEvent之后完之后引用计数变为了1。当DoToDoList执行完成之后,引用计数变成了0。便自动销毁。

    # 将从ConnectionsMaps_释放TcpConnection的操作移交给main_reactor_

    考虑这样一个问题,当同时有多个连接来连接时,而正好某个连接正好在关闭时,程序会发生什么。

    当sub_reactor_在处理HandleEvent时,接收到关闭请求,此时其会调用HandleClose,在这个函数中,会有一个connectionsMap_.erase()操作。但是此时main_reactor_可能正在接收连接会向connectionsMap_中添加连接。由于connectionsMap_底层是红黑树,并不支持同时读写操作。因此这是线程冲突的。

    因此对于此操作有两种可能,一个是加锁,另一个就是将HandleClose的中的connectionsMap_.erase操作移交给main_reactor_来操作。

    在这里实现了第二种操作,为了实现这种操作,必须要获得当前运行线程的id,并判断其与对应reactor_的线程id是否相同。

    我们使用定义了CurrenntThread来获取当前运行线程的线程id.

    由于EventLoop的创建是在主线程中,只是将EventLoop::Loop分配给了不同的子线程,因此在Loop函数中调用get_id()并将其保存在EventLoop的成员变量中。

    // EventLoop.cpp
    void EventLoop::Loop(){
        // 将Loop函数分配给了不同的线程,获取执行该函数的线程
        tid_ = CurrentThread::tid();
        while (true)
        {
            for (Channel *active_ch : poller_->Poll()){
                active_ch->HandleEvent();
            }
            DoToDoList();
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    当我们判断当前运行的线程是否是EventLoop对应的线程,只需要比较tid_即可。

    bool EventLoop::IsInLoopThread(){
        return CurrentThread::tid() == tid_;
    }
    
    1
    2
    3

    通过以上操作,我们就可以将其保证connectionsMap_.erase操作由main_reactor_线程进行操作。

    具体的,我们对HandleClose进行一层额外的封装。当调用HandleClose时,会判断调用该函数的线程是否是main_reactor_对应的线程,如果是,就直接运行,如果不是,则加入main_reactor_的to_do_list_中,由main_reactor_后续进行操作。

    // TcpServer.cpp
    inline void TcpServer::HandleClose(const std::shared_ptr<TcpConnection> & conn){
        std::cout <<  CurrentThread::tid() << " TcpServer::HandleClose"  << std::endl;
        // 由main_reactor_来执行`HandleCloseInLoop`函数,来保证线程安全
        main_reactor_->RunOneFunc(std::bind(&TcpServer::HandleCloseInLoop, this, conn));
    }
    
    inline void TcpServer::HandleCloseInLoop(const std::shared_ptr<TcpConnection> & conn){
        std::cout << CurrentThread::tid() << " TcpServer::HandleCloseInLoop - Remove connection id: " <<  conn->id() << " and fd: " << conn->fd() << std::endl;
        auto it = connectionsMap_.find(conn->fd());
        assert(it != connectionsMap_.end());
        connectionsMap_.erase(connectionsMap_.find(conn->fd()));
    
        EventLoop *loop = conn->loop();
        loop->QueueOneFunc(std::bind(&TcpConnection::ConnectionDestructor, conn));
    }
    
    // EventLoop.cpp
    void EventLoop::RunOneFunc(std::function<void()> cb){
        if(IsInLoopThread()){
            cb();
        }else{
            QueueOneFunc(cb);
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25

    # eventfd异步唤醒机制

    但是上述仍然存在一个比较严重的问题,由于to_do_list_只有在HandleEvent之后进行处理,如果当前Epoller监听的没有事件发生,那么就会堵塞在epoll_wait处,这对于服务器的性能影响是灾难性的。为此,我们希望在将任务加入to_do_list_时,唤醒相应的Epoller。

    为了实现该操作,在EventLoop处,增加了一个wakeup_channel_,并对其进行监听读操作。当我们为to_do_list_添加任务时,如果如果不是当前线程,就随便往wakeup_channel_对应的fd写点东西,此时,读事件会监听到,就不会再阻塞在epoll_wait中了,并可以迅速执行HandleCloseInLoop操作,释放TcpConnection。

    muduo中主要是通过eventfd来实现的。

    // EventLoop.cpp
    EventLoop::EventLoop() : tid_(CurrentThread::tid()) { 
        poller_ = std::make_unique<Epoller>();
        wakeup_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
        wakeup_channel_ = std::make_unique<Channel>(wakeup_fd_, this);
        calling_functors_ = false;
    
        wakeup_channel_->set_read_callback(std::bind(&EventLoop::HandleRead, this));
        wakeup_channel_->EnableRead();
    }
    
    
    void EventLoop::QueueOneFunc(std::function<void()> cb){
        {
            // 加锁,保证线程同步
            std::unique_lock<std::mutex> lock(mutex_);
            to_do_list_.emplace_back(std::move(cb));
        }
    
        // 如果调用当前函数的并不是当前当前EventLoop对应的的线程,将其唤醒。主要用于关闭TcpConnection
        // 由于关闭连接是由对应`TcpConnection`所发起的,但是关闭连接的操作应该由main_reactor所进行(为了释放ConnectionMap的所持有的TcpConnection)
        if (!IsInLoopThread() || calling_functors_) {
            uint64_t write_one_byte = 1;  
            ssize_t write_size = ::write(wakeup_fd_, &write_one_byte, sizeof(write_one_byte));
            (void) write_size;
            assert(write_size == sizeof(write_one_byte));
        } 
    }
    
    
    void EventLoop::HandleRead(){
        // 用于唤醒EventLoop
        uint64_t read_one_byte = 1;
        ssize_t read_size = ::read(wakeup_fd_, &read_one_byte, sizeof(read_one_byte));
        (void) read_size;
        assert(read_size == sizeof(read_one_byte));
        return;
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    编辑 (opens new window)
    上次更新: 2025/05/21, 06:42:57
    day14-重构核心库、使用智能指针
    day16-使用CMake工程化

    ← day14-重构核心库、使用智能指针 day16-使用CMake工程化→

    最近更新
    01
    6-其他操作
    05-20
    02
    4-联结
    05-20
    03
    7-管理
    05-20
    更多文章>
    Theme by Vdoing | Copyright © 2019-2025 Consolexinhun | MIT License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式
    ×