day15-重构Connection、修改生命周期
# day15-重构TcpConnection、修改生命周期
首先,在debug过程中,对内容在此进行了精简,去掉了一些无效的代码,例如将
Socket类去掉了,这是因为Socket类定义的操作一般只由Accpetor来调用,直接将其封装在Acceptor中更容易让人理解。本章内容偏多,主要是为了理清程序运行的逻辑时,对代码进行了大幅度的更改。
在昨天的重构中,将每个类独属的资源使用unique_ptr进行了包装。但是对于TcpConnection这个类,其生命周期模糊,使用unique_ptr很容易导致内存泄漏。这是因为我们的对于TcpConnection是被动关闭,当我们channel在handleEvent时,发现客户端传入了关闭连接的信息后,直接对onClose进行了调用。因此如果使用unqiue_ptr时,我们在调用onclose时已经销毁了tcpconnection,而对应的channel也会被移除,但是此时的HandleEvent并没有结束,因此存在了内存泄漏。
- 针对,这个问题,总要从三个步骤进行。
- 使用
shared_ptr智能指针管理TcpConnection。 - 在
HandleEvent和销毁时,增加引用计数。 - 将
HandleClose操作移交给main_reactor_进行。
- 使用
# 使用shared_ptr对TcpConnection进行管理
为了解决这个问题,我们采用shared_ptr对TcpConnection进行了管理。这样就方便延长TcpConnection的生命周期。
具体的应用并不赘述,将TcpConnection继承自enable_shared_from_this即可使用shared_ptr管理。并在TcpServer中使用shared_ptr保存TcpConnection。
// TcpConnection.h
class TcpConnection : public std::enable_shared_from_this<TcpConnection>{
//...
}
// TcpServer.h
class TcpServer{
private:
std::map<int, std::shared_ptr<TcpConnection>> connectionsMap_;
}
2
3
4
5
6
7
8
9
10
# 增加TcpConnection的引用计数
在当前状态下,在创建TcpConnection会将其加入到connectionsMap_使其引用计数变成了1,之后当TcpConnection处理HandleEvent受到关闭信号时,会直接调用HandleClose,这时会将TcpConnection从connectionsMaps_释放,引用计数变成0,直接销毁,但是HandleEvent并没有处理结束,从而导致了内存泄漏。
为了解决该问题,进行了两点处理。
- 在
HandleEvent时,增加了引用计数。
具体的,在Channel处增加一个指向TcpConnection的weak_ptr,当进行HandleEvent时,增加其应用计数。
// Channel.h
class Channel{
public:
void HandleEvent() const;
void HandleEventWithGuard() const;
void Tie(const std::shared_ptr<void> &ptr);
private:
std::weak_ptr<void> tie_;
}
//Channel.cpp
void Channel::HandleEvent() const{
if(tie_){
std::shared_ptr<void> guard() = tie_.lock();
HandleEventWithGuard();
}else{
HandleEventWithGuard();
}
}
void Channel::HandleEventWithGuard() const{
if(ready_events_ & READ_EVENT){
read_callback_();
}
if(ready_events_ & WRITE_EVENT){
write_callback_();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
当我们建立TcpConnection时,会首先将其绑定在Channel的tie_上,由于shared_from_this无法在构造函数处调用,因此将部分操作进行分离,并保证在构造函数执行结束后调用该函数。随后,在令其进行监听读操作,这样就可以保证Channel在HandleEvent时,会增加TcpConnection的引用计数。
// TcpConnection.cpp
void TcpConnection::ConnectionEstablished(){
state_ = ConnectionState::Connected;
channel_->Tie(shared_from_this());
channel_->EnableRead();
if (on_connect_){
on_connect_(shared_from_this());
}
}
// TcpServer.cpp
inline void TcpServer::HandleNewConnection(int fd){
assert(fd != -1);
uint64_t random = fd % sub_reactors_.size();
// 创建TcpConnection对象
std::shared_ptr<TcpConnection> conn = std::make_shared<TcpConnection>(sub_reactors_[random].get(), fd, next_conn_id_);
std::function<void(const std::shared_ptr<TcpConnection> &)> cb = std::bind(&TcpServer::HandleClose, this, std::placeholders::_1);
conn->set_connection_callback(on_connect_);
conn->set_close_callback(cb);
conn->set_message_callback(on_message_);
connectionsMap_[fd] = conn;
// 分配id
++next_conn_id_;
if(next_conn_id_ == 1000){
next_conn_id_ = 1;
}
// 将connection分配给Channel的tie,增加计数 并开始监听读事件
conn->ConnectionEstablished();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
这样就保证了只有当HandleEvent结束后,TcpConnection的引用计数才会变成0。
- 在销毁时,
HandleEvent结束后,增加引用计数。
上述操作主要是在HandleEvent进行时,增加了TcpConnection的引用计数。在HandleEvent之后增加引用计数可以使程序更加鲁棒。
具体的,我们在EventLoop处,增加一个to_do_list_列表,并在每次TcpConnection的销毁时,向to_do_list_处增加一个TcpConnection销毁程序从而增加TcpConnection的计数,这个列表中的任务只有在HandleEvent之运行,这样就保证了TcpConnection的销毁,必然时在HandleEvent之后的。
void EventLoop::Loop(){
while(true){
for (Channel *active_ch : poller_->Poll()){
active_ch->HandleEvent();
}
DoToDoList();
}
}
void EventLoop::DoToDoList(){
// 此时已经epoll_wait出来,可能存在阻塞在epoll_wait的可能性。
std::vector < std::function<void()>> functors;
{
// 加锁 保证线程同步
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(to_do_list_);
}
for(const auto& func: functors){
func();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
在HandleClose时,会将TcpConnection的ConnectionDestructor加入到to_do_list_中.
// TcpServer.cpp
void TcpConnection::ConnectionDestructor(){
//std::cout << CurrentThread::tid() << " TcpConnection::ConnectionDestructor" << std::endl;
// 将该操作从析构处,移植该处,增加性能,因为在析构前,当前`TcpConnection`已经相当于关闭了。
// 已经可以将其从loop处离开。
loop_->DeleteChannel(channel_.get());
}
inline void TcpServer::HandleClose(const std::shared_ptr<TcpConnection> & conn){
auto it = connectionsMap_.find(conn->fd());
assert(it != connectionsMap_.end());
connectionsMap_.erase(connectionsMap_.find(conn->fd()));
EventLoop *loop = conn->loop();
loop->QueueOneFunc(std::bind(&TcpConnection::ConnectionDestructor, conn));
}
// EventLoop.cpp
void EventLoop::QueueOneFunc(std::function<void()> cb){
{
// 加锁,保证线程同步
std::unique_lock<std::mutex> lock(mutex_);
to_do_list_.emplace_back(std::move(cb));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
当前的版本对TcpConnection的生命周期管理已经差不多是安全的了。在连接到来时,创建TcpConnection,并用shared_ptr管理,加入到ConnectionsMap_中,此时引用计数为1.
而当HandleEvent时,使用weak_ptr.lock增加TcpConnection的应用计数,引用计数为2。
当关闭时,TcpServer中erase后TcpConnection引用计数变为了1。之后将再次当前连接的销毁程序加入到DoToDoList使引用计数变为2, 而HandleEvent之后完之后引用计数变为了1。当DoToDoList执行完成之后,引用计数变成了0。便自动销毁。
# 将从ConnectionsMaps_释放TcpConnection的操作移交给main_reactor_
考虑这样一个问题,当同时有多个连接来连接时,而正好某个连接正好在关闭时,程序会发生什么。
当sub_reactor_在处理HandleEvent时,接收到关闭请求,此时其会调用HandleClose,在这个函数中,会有一个connectionsMap_.erase()操作。但是此时main_reactor_可能正在接收连接会向connectionsMap_中添加连接。由于connectionsMap_底层是红黑树,并不支持同时读写操作。因此这是线程冲突的。
因此对于此操作有两种可能,一个是加锁,另一个就是将HandleClose的中的connectionsMap_.erase操作移交给main_reactor_来操作。
在这里实现了第二种操作,为了实现这种操作,必须要获得当前运行线程的id,并判断其与对应reactor_的线程id是否相同。
我们使用定义了CurrenntThread来获取当前运行线程的线程id.
由于EventLoop的创建是在主线程中,只是将EventLoop::Loop分配给了不同的子线程,因此在Loop函数中调用get_id()并将其保存在EventLoop的成员变量中。
// EventLoop.cpp
void EventLoop::Loop(){
// 将Loop函数分配给了不同的线程,获取执行该函数的线程
tid_ = CurrentThread::tid();
while (true)
{
for (Channel *active_ch : poller_->Poll()){
active_ch->HandleEvent();
}
DoToDoList();
}
}
2
3
4
5
6
7
8
9
10
11
12
当我们判断当前运行的线程是否是EventLoop对应的线程,只需要比较tid_即可。
bool EventLoop::IsInLoopThread(){
return CurrentThread::tid() == tid_;
}
2
3
通过以上操作,我们就可以将其保证connectionsMap_.erase操作由main_reactor_线程进行操作。
具体的,我们对HandleClose进行一层额外的封装。当调用HandleClose时,会判断调用该函数的线程是否是main_reactor_对应的线程,如果是,就直接运行,如果不是,则加入main_reactor_的to_do_list_中,由main_reactor_后续进行操作。
// TcpServer.cpp
inline void TcpServer::HandleClose(const std::shared_ptr<TcpConnection> & conn){
std::cout << CurrentThread::tid() << " TcpServer::HandleClose" << std::endl;
// 由main_reactor_来执行`HandleCloseInLoop`函数,来保证线程安全
main_reactor_->RunOneFunc(std::bind(&TcpServer::HandleCloseInLoop, this, conn));
}
inline void TcpServer::HandleCloseInLoop(const std::shared_ptr<TcpConnection> & conn){
std::cout << CurrentThread::tid() << " TcpServer::HandleCloseInLoop - Remove connection id: " << conn->id() << " and fd: " << conn->fd() << std::endl;
auto it = connectionsMap_.find(conn->fd());
assert(it != connectionsMap_.end());
connectionsMap_.erase(connectionsMap_.find(conn->fd()));
EventLoop *loop = conn->loop();
loop->QueueOneFunc(std::bind(&TcpConnection::ConnectionDestructor, conn));
}
// EventLoop.cpp
void EventLoop::RunOneFunc(std::function<void()> cb){
if(IsInLoopThread()){
cb();
}else{
QueueOneFunc(cb);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# eventfd异步唤醒机制
但是上述仍然存在一个比较严重的问题,由于to_do_list_只有在HandleEvent之后进行处理,如果当前Epoller监听的没有事件发生,那么就会堵塞在epoll_wait处,这对于服务器的性能影响是灾难性的。为此,我们希望在将任务加入to_do_list_时,唤醒相应的Epoller。
为了实现该操作,在EventLoop处,增加了一个wakeup_channel_,并对其进行监听读操作。当我们为to_do_list_添加任务时,如果如果不是当前线程,就随便往wakeup_channel_对应的fd写点东西,此时,读事件会监听到,就不会再阻塞在epoll_wait中了,并可以迅速执行HandleCloseInLoop操作,释放TcpConnection。
muduo中主要是通过eventfd来实现的。
// EventLoop.cpp
EventLoop::EventLoop() : tid_(CurrentThread::tid()) {
poller_ = std::make_unique<Epoller>();
wakeup_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
wakeup_channel_ = std::make_unique<Channel>(wakeup_fd_, this);
calling_functors_ = false;
wakeup_channel_->set_read_callback(std::bind(&EventLoop::HandleRead, this));
wakeup_channel_->EnableRead();
}
void EventLoop::QueueOneFunc(std::function<void()> cb){
{
// 加锁,保证线程同步
std::unique_lock<std::mutex> lock(mutex_);
to_do_list_.emplace_back(std::move(cb));
}
// 如果调用当前函数的并不是当前当前EventLoop对应的的线程,将其唤醒。主要用于关闭TcpConnection
// 由于关闭连接是由对应`TcpConnection`所发起的,但是关闭连接的操作应该由main_reactor所进行(为了释放ConnectionMap的所持有的TcpConnection)
if (!IsInLoopThread() || calling_functors_) {
uint64_t write_one_byte = 1;
ssize_t write_size = ::write(wakeup_fd_, &write_one_byte, sizeof(write_one_byte));
(void) write_size;
assert(write_size == sizeof(write_one_byte));
}
}
void EventLoop::HandleRead(){
// 用于唤醒EventLoop
uint64_t read_one_byte = 1;
ssize_t read_size = ::read(wakeup_fd_, &read_one_byte, sizeof(read_one_byte));
(void) read_size;
assert(read_size == sizeof(read_one_byte));
return;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39