muduo源码阅读笔记(11、TcpClient)

news/2024/7/10 19:53:32 标签: 笔记, c++, linux, 架构, 后端, github, 开源

muduo源码阅读笔记(11、TcpClient)

Muduo源码笔记系列:

muduo源码阅读笔记(0、下载编译muduo)

muduo源码阅读笔记(1、同步日志)

muduo源码阅读笔记(2、对C语言原生的线程安全以及同步的API的封装)

muduo源码阅读笔记(3、线程和线程池的封装)

muduo源码阅读笔记(4、异步日志)

muduo源码阅读笔记(5、Channel和Poller)

muduo源码阅读笔记(6、EvevntLoop和Thread)

muduo源码阅读笔记(7、EventLoopThreadPool)

muduo源码阅读笔记(8、定时器TimerQueue)

muduo源码阅读笔记(9、TcpServer)

muduo源码阅读笔记(10、TcpConnection)

muduo源码阅读笔记(11、TcpClient)

前言

本章新涉及的文件有:

  1. TcpClient.h/cc:和TcpServer不同的是,TcpClient位于客户端,主要是对客户发起的连接进行管理,TcpClient只有一个loop,也会和TcpConnection配合,将三次握手连接成功的sockfd交由TcpConnection管理。

  2. Connector.h/cc:Muduo将一个客户端的sock分成了两个阶段,分别是:连接阶段、读写阶段,Connector就是负责fd的连接阶段,当一个sockfd连接成功后,将sockfd传给TcpClient,由TcpClient将sockfd传给TcpConnection进行读写管理,Connector和TcpServer的Acceptor在设计上有这类似的思想,不同的是,Connector是可以针对同一个ip地址进行多次连接,产生不同的sockfd、而Acceptor是去读listen sock来接收连接,产生不同sockfd。

总体来说,TcpClient的实现是严格遵循TcpServer的实现的,

Connector的实现

提供的接口:

class Connector : noncopyable,
                  public std::enable_shared_from_this<Connector>{
public:
    typedef std::function<void (int sockfd)> NewConnectionCallback;

    Connector(EventLoop* loop, const InetAddress& serverAddr);
    ~Connector();

    void setNewConnectionCallback(const NewConnectionCallback& cb)
    { newConnectionCallback_ = cb; }

    void start();  // can be called in any thread
    void restart();  // must be called in loop thread
    void stop();  // can be called in any thread

    const InetAddress& serverAddress() const { return serverAddr_; }

    private:
    enum States { kDisconnected, kConnecting, kConnected };
    static const int kMaxRetryDelayMs = 30*1000;
    static const int kInitRetryDelayMs = 500;

    void setState(States s) { state_ = s; }
    void startInLoop();
    void stopInLoop();
    void connect();
    void connecting(int sockfd);
    void handleWrite();
    void handleError();
    void retry(int sockfd);
    int removeAndResetChannel();
    void resetChannel();

    EventLoop* loop_; // 连接发起所在loop
    InetAddress serverAddr_;  // 连接到哪里
    bool connect_; // atomic  // 开始连接?
    States state_;  // FIXME: use atomic variable // 连接状态
    std::unique_ptr<Channel> channel_;  // fd读写以及读写事件管理,对epoll/poll/selectIO多路复用的抽象,方便跨平台。
    NewConnectionCallback newConnectionCallback_; // 一般是:TcpClient::newConnection
    int retryDelayMs_;  // 连接重试毫秒数。
};

简单记录一下连接阶段启动流程:

调用Connector::start()->

  1. connect_ 赋值为 true。

  2. 在loop任务队列追加Connector::startInLoop()回调任务

    1. 执行回调任务:Connector::startInLoop()

    2. 调用Connector::connect()

      1. 创建非阻塞的连接sock

      2. ::connect(sock, …)

      3. 调用Connector::connecting(int sockfd)

        1. new channel(sockfd)赋值给channel_将Connector::handleWrite()和Connector::handleError()设置给cahnnel的写回调以及错误处理回调

        2. 使能Poller开始监听sockfd

当连接成功,会触发sockfd的写事件,从而调用Connector::handleWrite()->

  1. 将sockfd和channel_解绑,并将channel_ rest。

  2. 调用newConnectionCallback_(也即TcpClient::newConnection)将连接完成的sockfd传给TcpClient处理

感兴趣的读者,可以自行阅读源码,了解连接过程中,stop、retry的流程。

实现的伪代码:


void Connector::start(){
    connect_ = true;
    loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}

void Connector::startInLoop(){
    loop_->assertInLoopThread();
    assert(state_ == kDisconnected);
    if (connect_){
        connect();
    }else{
        LOG_DEBUG << "do not connect";
    }
}

void Connector::stop(){
    connect_ = false;
    loop_->queueInLoop(std::bind(&Connector::stopInLoop, this)); // FIXME: unsafe
    // FIXME: cancel timer
}

void Connector::stopInLoop(){
    loop_->assertInLoopThread();
    if (state_ == kConnecting){
        setState(kDisconnected);
        int sockfd = removeAndResetChannel();
        retry(sockfd);
    }
}

void Connector::connect(){
    int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
    int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
    int savedErrno = (ret == 0) ? 0 : errno;
    switch (savedErrno){
        case 0:
        case EINPROGRESS:
        case EINTR:
        case EISCONN:
            connecting(sockfd);
            break;
        /*...*/
    }
}

void Connector::connecting(int sockfd){
    setState(kConnecting);
    assert(!channel_);
    channel_.reset(new Channel(loop_, sockfd));
    channel_->setWriteCallback(
        std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
    channel_->setErrorCallback(
        std::bind(&Connector::handleError, this)); // FIXME: unsafe

    // channel_->tie(shared_from_this()); is not working,
    // as channel_ is not managed by shared_ptr
    channel_->enableWriting();
}

int Connector::removeAndResetChannel(){
    channel_->disableAll();
    channel_->remove();
    int sockfd = channel_->fd();
    // Can't reset channel_ here, because we are inside Channel::handleEvent
    loop_->queueInLoop(std::bind(&Connector::resetChannel, this)); // FIXME: unsafe
    return sockfd;
}

void Connector::resetChannel(){
    channel_.reset();
}

void Connector::handleWrite(){
    LOG_TRACE << "Connector::handleWrite " << state_;

    if (state_ == kConnecting){
        int sockfd = removeAndResetChannel();
        int err = sockets::getSocketError(sockfd);

        if (err){
            LOG_WARN << "Connector::handleWrite - SO_ERROR = "
                    << err << " " << strerror_tl(err);
            retry(sockfd);
        }else{
            setState(kConnected);
            if (connect_){
                newConnectionCallback_(sockfd);
            }else{
                sockets::close(sockfd);
            }
        }
    }else{
        // what happened?
        assert(state_ == kDisconnected);
    }
}

void Connector::handleError(){
    LOG_ERROR << "Connector::handleError state=" << state_;
    if (state_ == kConnecting){
        int sockfd = removeAndResetChannel();
        int err = sockets::getSocketError(sockfd);
        LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err);
        retry(sockfd);
    }
}

void Connector::retry(int sockfd){
    sockets::close(sockfd);
    setState(kDisconnected);
    if (connect_){
        LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
                    << " in " << retryDelayMs_ << " milliseconds. ";
        loop_->runAfter(retryDelayMs_/1000.0, // 稍后重试
                        std::bind(&Connector::startInLoop, shared_from_this()));
        retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);  // 超时加倍
    }else{
        LOG_DEBUG << "do not connect";
    }
}

TcpClient的实现

提供的接口:

class TcpClient : noncopyable
{
public:
    // TcpClient(EventLoop* loop);
    // TcpClient(EventLoop* loop, const string& host, uint16_t port);
    TcpClient(EventLoop* loop,
            const InetAddress& serverAddr,
            const string& nameArg);
    ~TcpClient();  // force out-line dtor, for std::unique_ptr members.

    void connect();
    void disconnect();
    void stop();

    TcpConnectionPtr connection() const
    {
    MutexLockGuard lock(mutex_);
    return connection_;
    }

    EventLoop* getLoop() const { return loop_; }
    bool retry() const { return retry_; }
    void enableRetry() { retry_ = true; }

    const string& name() const
    { return name_; }

    /// Set connection callback.
    /// Not thread safe.
    void setConnectionCallback(ConnectionCallback cb)
    { connectionCallback_ = std::move(cb); }

    /// Set message callback.
    /// Not thread safe.
    void setMessageCallback(MessageCallback cb)
    { messageCallback_ = std::move(cb); }

    /// Set write complete callback.
    /// Not thread safe.
    void setWriteCompleteCallback(WriteCompleteCallback cb)
    { writeCompleteCallback_ = std::move(cb); }

private:
    /// Not thread safe, but in loop
    void newConnection(int sockfd);
    /// Not thread safe, but in loop
    void removeConnection(const TcpConnectionPtr& conn);

    EventLoop* loop_; // 运行在那个loop
    ConnectorPtr connector_; // avoid revealing Connector // 连接器
    const string name_; // TcpClient名
    ConnectionCallback connectionCallback_;   // 连接建立和断开回调
    MessageCallback messageCallback_;   // 可读回调
    WriteCompleteCallback writeCompleteCallback_;   // 写完回调
    bool retry_;   // atomic  重连
    bool connect_; // atomic  // 已经连接?
    // always in loop thread
    int nextConnId_;  // 字面意思
    mutable MutexLock mutex_;
    TcpConnectionPtr connection_ GUARDED_BY(mutex_);  // 连接读写管理器
};

TcpClient核心函数TcpClient::newConnection,该函数会作为连接器的回调,当sockfd连接成功后,该函数被调用,设置必要信息后,为该sockfd产生一个TcpConnection对象,后续该fd的读写,全权交由TcpConnection处理。逻辑比较简单,实现如下:

实现的伪代码:

TcpClient::TcpClient(EventLoop* loop,
                     const InetAddress& serverAddr,
                     const string& nameArg)
  : loop_(CHECK_NOTNULL(loop)),
    connector_(new Connector(loop, serverAddr)),
    name_(nameArg),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    retry_(false),
    connect_(true),
    nextConnId_(1){
    
    connector_->setNewConnectionCallback(
        std::bind(&TcpClient::newConnection, this, _1));
    // FIXME setConnectFailedCallback
    LOG_INFO << "TcpClient::TcpClient[" << name_
            << "] - connector " << get_pointer(connector_);
}

void TcpClient::connect(){
    // FIXME: check state
    LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to "
            << connector_->serverAddress().toIpPort();
    connect_ = true;
    connector_->start();
}

void TcpClient::disconnect(){
    connect_ = false;

    {
        MutexLockGuard lock(mutex_);
        if (connection_){
            connection_->shutdown();
        }
    }
}

void TcpClient::stop(){
    connect_ = false;
    connector_->stop();
}

void TcpClient::newConnection(int sockfd){
    loop_->assertInLoopThread();
    InetAddress peerAddr(sockets::getPeerAddr(sockfd));
    char buf[32];
    snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
    ++nextConnId_;
    string connName = name_ + buf;

    InetAddress localAddr(sockets::getLocalAddr(sockfd));
    // FIXME poll with zero timeout to double confirm the new connection
    // FIXME use make_shared if necessary
    TcpConnectionPtr conn(new TcpConnection(loop_,
                                            connName,
                                            sockfd,
                                            localAddr,
                                            peerAddr));

    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);
    conn->setCloseCallback(
        std::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
    {
        MutexLockGuard lock(mutex_);
        connection_ = conn;
    }
    conn->connectEstablished(); // 同一loop,可以直接调用
}

void TcpClient::removeConnection(const TcpConnectionPtr& conn){
    loop_->assertInLoopThread();
    assert(loop_ == conn->getLoop());

    {
        MutexLockGuard lock(mutex_);
        assert(connection_ == conn);
        connection_.reset();
    }

    loop_->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
    if (retry_ && connect_){
    LOG_INFO << "TcpClient::connect[" << name_ << "] - Reconnecting to "
                << connector_->serverAddress().toIpPort();
    connector_->restart();
    }
}

细节明细:

疑问

在TcpConnection::handleClose()实现当中,为什么没有调用close,关闭sockfd?也看了一下TcpConnection的析构、TcpConnection::connectDestroyed(),没有一个地方调用了close来关闭sockfd

解答

github上提交了discuss,待更新。。。

总结

Muduo设计的TcpServer和TcpClient代码思想及其统一,一些算法题也是需要这样的抽象思维,所以我认为这也是以后从事it最重要的品质,可以避免很多不必要的bug。。


http://www.niftyadmin.cn/n/5349821.html

相关文章

【线性代数与矩阵论】矩阵的酉相似

矩阵的酉相似&#xff08;合同变换&#xff09; 2023年11月7日 #algebra 文章目录 矩阵的酉相似&#xff08;合同变换&#xff09;1. 酉矩阵2. 酉相似3. Schur分解定理4. 正规矩阵5. 酉相似对角化6. Hermit矩阵&#xff0c;反Hermit矩阵及酉矩阵的特性7. Hermit矩阵的正定性下…

UE4 CustomDepthMobile流程小记

原生UE opaque材质中获取CustomDepth/CustomStencil会报错 在其Compile中调用的函数中没有看到报错逻辑 材质节点的逻辑都没有什么问题&#xff0c;所以看一下报错 在HLSLMaterialTranslator::Translate中 修改之后 mobile流程的不透明材质可以直接获取SceneTexture::customd…

React中文官网已经搬迁了,原网址内容将不再更新

注意1&#xff1a;React中文官网已经搬迁至-React 官方中文文档&#xff0c;原网址内容将不再更新 注意2&#xff1a;React官网已经将React的定义由“用于构建用户界面的 JavaScript 库”更改为“用于构建 Web 和原生交互界面的库”。

Linux:命名管道及其实现原理

文章目录 命名管道指令级命名管道代码级命名管道 本篇要引入的内容是命名管道 命名管道 前面的总结中已经搞定了匿名管道&#xff0c;但是匿名管道有一个很严重的问题&#xff0c;它只允许具有血缘关系的进程进行通信&#xff0c;那如果是两个不相关的进程进行通信&#xff0…

【GitHub项目推荐--十六进制编辑器】【转载】

一款名为 ImHex 的十六进制编辑器获得了 15.3k 的 Star。十六进制编辑器可以让你以十六进制的形式查看或编辑文件的二进制数据&#xff0c;并用较为友好的界面来编辑二进制数据&#xff0c;和常见的十六进制编辑器 GNOME Hex Editor 等不一样&#xff0c;ImHex 功能非常强大&am…

DolphinDB学习(2):增删改查数据表(分布式表的基本操作)

文章目录 创建数据表1. 创建数据表全流程2. 核心&#xff1a;创建table3. 在已有的数据表中追加新的数据 数据表自身的操作1. 查询有哪些数据表2. 删除某张数据表3. 修改数据表的名称 博客里只介绍最常见的分区表&#xff08;createPartitionedTable&#xff09;的创建方法&…

Compose | UI组件(六) | 选择框

文章目录 前言Checkbox 复选框的含义Checkbox 复选框的使用Switch 单选框的含义Switch 单选框的使用Slider 滑竿组件的含义Slider 滑竿组件的使用 总结 前言 随着移动端的技术不断更新迭代&#xff0c;Compose也运用的越来越广泛&#xff0c;很多人都开始学习Compose 本文主要…

2024 高级前端面试题之 ES6 「精选篇」

该内容主要整理关于 ES6 的相关面试题&#xff0c;其他内容面试题请移步至 「最新最全的前端面试题集锦」 查看。 ES6模块精选篇 1. ES5、ES6和ES2015有什么区别?2. babel是什么&#xff0c;有什么作用?3. let有什么用&#xff0c;有了var为什么还要用let&#xff1f;4. 举一…