古月不傲 发表于 2021-1-22 21:34

muduo库 之Poller

poller.h
// Copyright 2010, Shuo Chen.All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.

#ifndef MUDUO_NET_POLLER_H
#define MUDUO_NET_POLLER_H

#include <map>
#include <vector>

#include "muduo/base/Timestamp.h"

#include "muduo/net/EventLoop.h"

// IO复用类,聚合了EventLoop对象
// 实现了对poll、epoll的抽象
namespace muduo
{
        namespace net
        {
                class Channel;
                ///
                /// Base class for IO Multiplexing
                ///
                /// This class doesn't own the Channel objects.
                class Poller : noncopyable
                {
                        public:
                                typedef std::vector<Channel*> ChannelList;

                                Poller(EventLoop* loop);
                                virtual ~Poller();

                                // 将就绪事件添加到活动通道中
                                virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;
                                // 注册或更新通道
                                virtual void updateChannel(Channel* channel) = 0;
                                // 移除通道
                                virtual void removeChannel(Channel* channel) = 0;
                                // 是否拥有该通道
                                virtual bool hasChannel(Channel* channel) const;

                                // new一个IO复用 poll || epoll
                                static Poller* newDefaultPoller(EventLoop* loop);
                                // 当前通道只能在创建的线程中运行
                                void assertInLoopThread() const
                                {        ownerLoop_->assertInLoopThread();        }

                        protected:
                                // fd, Channel
                                typedef std::map<int, Channel*> ChannelMap;
                                ChannelMap channels_;
                        private:
                                EventLoop* ownerLoop_;
                };
        }// namespace net
}// namespace muduo

#endif// MUDUO_NET_POLLER_H

poller.cpp
// Copyright 2010, Shuo Chen.All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include "muduo/net/Poller.h"

#include "muduo/net/Channel.h"

using namespace muduo;
using namespace muduo::net;

Poller::Poller(EventLoop* loop)
        : ownerLoop_(loop)
{        }

Poller::~Poller() = default;

// 是否拥有该通道
bool Poller::hasChannel(Channel* channel) const
{
        assertInLoopThread();
        // 获取fd对应的通道
        auto it = channels_.find(channel->fd());
        return (it != channels_.end() && it->second == channel;)
}

PollPoller.h
// Copyright 2010, Shuo Chen.All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.

#ifndef MUDUO_NET_POLLER_POLLPOLLER_H
#define MUDUO_NET_POLLER_POLLPOLLER_H

#include "muduo/net/Poller.h"

#include <vector>

struct pollfd;

// poll()函数的封装
namespace muduo
{
        namespace net
        {
                ///
                /// IO Multiplexing with poll(2).
                ///
                class PollPoller : public Poller
                {
                        public:
                                typedef std::vector<struct pollfd> PollFdList;        // struct pollfd数组

                                PollPoller(EventLoop* loop);
                                ~PollPoller() override;

                                // 初始化poll
                                Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;
                                // 注册或更新通道
                                void updateChannel(Channel* channel) override;
                                // 移除通道
                                void removeChannel(Channel* channel) override;
                        private:
                                // 填充活动的通道
                                void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;
                        private:
                                PollFdList pollfds_;        // pfd数组
                };
        }// namespace net
}// namespace muduo
#endif// MUDUO_NET_POLLER_POLLPOLLER_H

PollPoller.cpp
// Copyright 2010, Shuo Chen.All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include "muduo/net/poller/PollPoller.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Types.h"
#include "muduo/net/Channel.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>

using namespace muduo;
using namespace muduo::net;

PollPoller::PollPoller(EventLoop* loop)
        : Poller(loop)
{        }

PollPoller::~PollPoller() = default;

// timoutMs:超时时间
Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
        // XXX pollfds_ shouldn't change
        // 注册事件,产生的事件存入pollfd.revent中
        // 返回就绪事件
        int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);

        int savedErrno = errno;

        // 当前的时间戳
        Timestamp now(Timestamp::now());
        // 填充就绪事件到活动通道列表中
        if (numEvents > 0)
        {
                LOG_TRACE << numEvents << " events happened";

                // 填充就绪事件到活动通道列表中
                fillActiveChannels(numEvents, activeChannels);
        }
        // 没有就绪事件
        else if (numEvents == 0)
                LOG_TRACE << " nothing happened";
        else
        {
                if (savedErrno != EINTR)
                {
                        errno = savedErrno;
                        LOG_SYSERR << "PollPoller::poll()";
                }
        }
        // 返回事件到来的时间戳
        return now;
}

// 填充就绪事件到活动通道列表中
void PollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const
{
        for (auto pfd = pollfds_.begin(); pfd != pollfds_.end() && numEvents > 0; ++pfd)
        {
                if (pfd->revents > 0)
                {
                        // 表示填充了一个事件要--
                        --numEvents;

                        // typedef std::map<int, Channel*> ChannelMap;
                        // 取出指定的通道
                        auto ch = channels_.find(pfd->fd);

                        assert(ch != channels_.end());

                        Channel* channel = ch->second;
                        assert(channel->fd() == pfd->fd);

                        // 设置就绪事件
                        channel->set_revents(pfd->revents);
                        // 加入活动的通道
                        activeChannels->push_back(channel);
                }
        }
}

// 注册或更新通道
void PollPoller::updateChannel(Channel* channel)
{
        Poller::assertInLoopThread();
        LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();

        // 如果通道不在pfd数组中
        if (channel->index() < 0)
        {        // 注册通道
                assert(channels_.find(channel->fd()) == channels_.end());

                struct pollfd pfd;
                pfd.fd = channel->fd();
                pfd.events = static_cast<short>(channel->events());
                pfd.revents = 0;
                // 加入pfd数组中
                pollfds_.push_back(pfd);

                // 设置通道在pfd数组中的索引
                int idx = static_cast<int>(pollfds_.size()) - 1;
                channel->set_index(idx);
                // typedef std::map<int, Channel*> ChannelMap;
                // 将通道加入通道列表
                channels_ = channel;
        }
        else
        {        // 更新通道
                assert(channels_.find(channel->fd()) != channels_.end());
                assert(channels_ == channel);

                int idx = channel->index();
                assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));

                struct pollfd& pfd = pollfds_;
                assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
                pfd.fd = channel->fd();
                pfd.events = static_cast<short>(channel->events());
                pfd.revents = 0;
                // 设置该事件为不关注
                if (channel->isNoneEvent())
                        // ignore this pollfd
                        // 设为负数即可 -1是为了优化,因为有一个特殊的数字0
                        pfd.fd = -channel->fd()-1;
        }
}

// 移除通道
void PollPoller::removeChannel(Channel* channel)
{
        Poller::assertInLoopThread();
        LOG_TRACE << "fd = " << channel->fd();
        assert(channels_.find(channel->fd()) != channels_.end());
        assert(channels_ == channel);
        assert(channel->isNoneEvent());

        int idx = channel->index();
        assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));

        const struct pollfd& pfd = pollfds_; (void)pfd;
        assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events());

        size_t n = channels_.erase(channel->fd());
        assert(n == 1); (void)n;

        // 如果通道是最后一次元素,直接pop
        if (implicit_cast<size_t>(idx) == pollfds_.size()-1)
                pollfds_.pop_back();
        else
        {
                // 取出最后一个元素的fd
                int channelAtEnd = pollfds_.back().fd;
                // 交换idx和最后一个元素
                iter_swap(pollfds_.begin() + idx, pollfds_.end() - 1);
                if (channelAtEnd < 0)
                        channelAtEnd = -channelAtEnd-1;

                // 重新设置一下索引,因为已经交换位置
                channels_->set_index(idx);
                // 再移除最后一个元素即可,就是channel
                pollfds_.pop_back();
        }
}


EpollPoller.h
// Copyright 2010, Shuo Chen.All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.

#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H
#define MUDUO_NET_POLLER_EPOLLPOLLER_H

#include "muduo/net/Poller.h"

#include <vector>

struct epoll_event;

// epoll()函数封装
namespace muduo
{
        namespace net
        {
                ///
                /// IO Multiplexing with epoll(4).
                ///
                class EPollPoller : public Poller
                {
                        public:
                                EPollPoller(EventLoop* loop);
                                ~EPollPoller() override;

                                // 调用 epoll_wait 将就绪事件添加到活动通道中
                                Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;
                                // 注册或更新通道
                                void updateChannel(Channel* channel) override;
                                // 移除通道
                                void removeChannel(Channel* channel) override;
                        private:
                                // epoll可以接收的事件个数
                                static const int kInitEventListSize = 16;               

                                // epoll_ctrl的操作类型转字符串
                                static const char* operationToString(int op);
                                // 将就绪事件添加到活动通道中
                                void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;
                                // 调用 epoll_ctl 更新事件
                                void update(int operation, Channel* channel);

                                typedef std::vector<struct epoll_event> EventList;

                                int epollfd_;                // epoll句柄
                                EventList events_;        // 事件列表
                };
        }// namespace net
}// namespace muduo
#endif// MUDUO_NET_POLLER_EPOLLPOLLER_H

EpollPoller.cpp
// Copyright 2010, Shuo Chen.All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include "muduo/net/poller/EPollPoller.h"

#include "muduo/base/Logging.h"
#include "muduo/net/Channel.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <sys/epoll.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

// On Linux, the constants of poll(2) and epoll(4)
// are expected to be the same.
static_assert(EPOLLIN == POLLIN,      "epoll uses same flag values as poll");
static_assert(EPOLLPRI == POLLPRI,      "epoll uses same flag values as poll");
static_assert(EPOLLOUT == POLLOUT,      "epoll uses same flag values as poll");
static_assert(EPOLLRDHUP == POLLRDHUP,"epoll uses same flag values as poll");
static_assert(EPOLLERR == POLLERR,      "epoll uses same flag values as poll");
static_assert(EPOLLHUP == POLLHUP,      "epoll uses same flag values as poll");

// 事件状态        未雕琢、已加入、已删除
namespace
{
        const int kNew = -1;
        const int kAdded = 1;
        const int kDeleted = 2;
}

// 构造
EPollPoller::EPollPoller(EventLoop* loop)
        :
                Poller(loop),
                epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
                events_(kInitEventListSize)
{
        if (epollfd_ < 0)
                LOG_SYSFATAL << "EPollPoller::EPollPoller";
}

// 析沟
EPollPoller::~EPollPoller()
{
        // 关闭epoll句柄
        ::close(epollfd_);
}

// 调用 epoll_wait         将就绪事件添加到活动通道中
// timeoutMs:                等待时间
// activeChannels:        活动通道列表
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
        // 打印通道列表中的元素个数
        LOG_TRACE << "fd total count " << channels_.size();
        // 从双向链表中取就绪事件
        int numEvents = ::epoll_wait(
                        epollfd_,
                        &*events_.begin(),
                        static_cast<int>(events_.size()),
                        timeoutMs);

        int savedErrno = errno;

        // 当前时间戳
        Timestamp now(Timestamp::now());

        // 存在就绪事件
        if (numEvents > 0)
        {
                LOG_TRACE << numEvents << " events happened";

                // 将就绪事件添加到活动通道中
                fillActiveChannels(numEvents, activeChannels);

                // 如果就绪事件满了,就动态扩容两倍的大小
                if (implicit_cast<size_t>(numEvents) == events_.size())
                        events_.resize(events_.size()*2);
        }
        else if (numEvents == 0)
                LOG_TRACE << "nothing happened";
        else
        {
                if (savedErrno != EINTR)
                {
                        errno = savedErrno;
                        LOG_SYSERR << "EPollPoller::poll()";
                }
        }
        // 返回事件到来的时间戳
        return now;
}

// 将就绪事件添加到活动通道列表中
void EPollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const
{
        // 断言就绪事件 <= 事件列表大小
        assert(implicit_cast<size_t>(numEvents) <= events_.size());

        // 添加就绪事件们
        for (int i = 0; i < numEvents; ++i)
        {
                // 取出事件关联的通道
                Channel* channel = static_cast<Channel*>(events_.data.ptr);
                // 调试模式
#ifndef NDEBUG
                // 断言通道列表中存在该通道
                int fd = channel->fd();
                ChannelMap::const_iterator it = channels_.find(fd);
                assert(it != channels_.end());
                assert(it->second == channel);
#endif
                // 设置就绪事件
                channel->set_revents(events_.events);
                // 将对应的通道添加到活动通道列表中
                activeChannels->push_back(channel);
        }
}

// 更新通道
void EPollPoller::updateChannel(Channel* channel)
{
        Poller::assertInLoopThread();

        // 取通道的索引
        const int index = channel->index();
        LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events()
                << " index = " << index;

        // 注册事件
        if (index == kNew || index == kDeleted)
        {
                // a new one, add with EPOLL_CTL_ADD
                int fd = channel->fd();
                if (index == kNew)
                {
                        // 断言不存在该通道
                        assert(channels_.find(fd) == channels_.end());
                        // 将该通道添加到通道列表中
                        channels_ = channel;
                }
                else // index == kDeleted
                {
                        // 断言存在该通道
                        assert(channels_.find(fd) != channels_.end());
                        assert(channels_ == channel);
                }
                // 设置为已加入状态
                channel->set_index(kAdded);
                // 将通道加入到epool中
                update(EPOLL_CTL_ADD, channel);
        }
        // 更新事件
        else
        {
                // update existing one with EPOLL_CTL_MOD/DEL
                int fd = channel->fd(); (void)fd;

                // 断言存在该通道
                assert(channels_.find(fd) != channels_.end());
                assert(channels_ == channel);
                assert(index == kAdded);

                // 移除不关注的事件
                if (channel->isNoneEvent())
                {
                        update(EPOLL_CTL_DEL, channel);
                        // 设置为以删除状态
                        channel->set_index(kDeleted);
                }
                // 更新事件
                else
                        update(EPOLL_CTL_MOD, channel);
        }
}

// 移除事件
void EPollPoller::removeChannel(Channel* channel)
{
        Poller::assertInLoopThread();

        int fd = channel->fd();
        LOG_TRACE << "fd = " << fd;

        // 断言存在该通道
        assert(channels_.find(fd) != channels_.end());
        assert(channels_ == channel);
        // 断言不关注该事件
        assert(channel->isNoneEvent());

        int index = channel->index();
        assert(index == kAdded || index == kDeleted);

        // 移除该通道
        size_t n = channels_.erase(fd); (void)n;
        assert(n == 1);

        // 移除该事件
        if (index == kAdded)
                update(EPOLL_CTL_DEL, channel);

        // 设置为未雕琢状态
        channel->set_index(kNew);
}

// 调用 epoll_ctl 更新事件
// operation:         操作类型
// channel:                通道,管理事件的注册与更新
void EPollPoller::update(int operation, Channel* channel)
{
        struct epoll_event event {};

        // 取出要处理的事件
        event.events = channel->events();
        // 将通道关联到event中
        event.data.ptr = channel;

        int fd = channel->fd();
        LOG_TRACE << "epoll_ctl op = " << operationToString(operation)
                << " fd = " << fd << " event = { " << channel->eventsToString() << " }";

        // 将事件加入到红黑树中
        if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
        {
                if (operation == EPOLL_CTL_DEL)
                        LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
                else
                        LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
        }
}

// epoll_ctrl的操作类型转字符串
const char* EPollPoller::operationToString(int op)
{
        switch (op)
        {
                case EPOLL_CTL_ADD:
                        return "ADD";
                case EPOLL_CTL_DEL:
                        return "DEL";
                case EPOLL_CTL_MOD:
                        return "MOD";
                default:
                        assert(false && "ERROR op");
                        return "Unknown Operation";
        }
}
页: [1]
查看完整版本: muduo库 之Poller