muduo库 之Poller
Poller.h
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
#ifndef MUDUO_NET_POLLER_H
#define MUDUO_NET_POLLER_H
#include <map>
#include <vector>
#include "muduo/base/Timestamp.h"
#include "muduo/net/EventLoop.h"
// IO复用类,聚合了EventLoop对象
// 实现了对poll、epoll的抽象
namespace muduo
{
namespace net
{
class Channel;
///
/// Base class for IO Multiplexing
///
/// This class doesn't own the Channel objects.
class Poller : noncopyable
{
public:
typedef std::vector<Channel*> ChannelList;
Poller(EventLoop* loop);
virtual ~Poller();
// 将就绪事件添加到活动通道中
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;
// 注册或更新通道
virtual void updateChannel(Channel* channel) = 0;
// 移除通道
virtual void removeChannel(Channel* channel) = 0;
// 是否拥有该通道
virtual bool hasChannel(Channel* channel) const;
// new一个IO复用 poll || epoll
static Poller* newDefaultPoller(EventLoop* loop);
// 当前通道只能在创建的线程中运行
void assertInLoopThread() const
{ ownerLoop_->assertInLoopThread(); }
protected:
// fd, Channel
typedef std::map<int, Channel*> ChannelMap;
ChannelMap channels_;
private:
EventLoop* ownerLoop_;
};
}// namespace net
}// namespace muduo
#endif// MUDUO_NET_POLLER_H
Poller.cpp
// Copyright 2010, Shuo Chen.All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#include "muduo/net/Poller.h"
#include "muduo/net/Channel.h"
using namespace muduo;
using namespace muduo::net;
Poller::Poller(EventLoop* loop)
: ownerLoop_(loop)
{ }
Poller::~Poller() = default;
// Reports whether this poller currently tracks the given channel.
//
// Returns true only when channels_ contains an entry for channel->fd() and
// that entry points at this very Channel object (a different channel using
// the same fd yields false). Asserts the loop-thread precondition first via
// assertInLoopThread().
bool Poller::hasChannel(Channel* channel) const
{
  assertInLoopThread();
  // Look up the channel registered for this fd, if any.
  const auto it = channels_.find(channel->fd());
  return it != channels_.end() && it->second == channel;
}
PollPoller.h
// Copyright 2010, Shuo Chen.All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
#ifndef MUDUO_NET_POLLER_POLLPOLLER_H
#define MUDUO_NET_POLLER_POLLPOLLER_H
#include "muduo/net/Poller.h"
#include <vector>
struct pollfd;
// poll()函数的封装
namespace muduo
{
namespace net
{
///
/// IO Multiplexing with poll(2).
///
/// Keeps a vector of struct pollfd (pollfds_) that is passed to ::poll().
class PollPoller : public Poller
{
 public:
  /// Array of pollfd entries handed to ::poll().
  using PollFdList = std::vector<struct pollfd>;

  PollPoller(EventLoop* loop);
  ~PollPoller() override;

  /// Calls ::poll() on pollfds_ and fills *activeChannels with the channels
  /// that reported events.
  Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;

  /// Registers a new channel or updates the events of an existing one.
  void updateChannel(Channel* channel) override;

  /// Removes a channel and its pollfd entry.
  void removeChannel(Channel* channel) override;

 private:
  /// Copies the returned events into the matching channels and appends
  /// those channels to *activeChannels.
  void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;

  PollFdList pollfds_;  // pollfd array passed to ::poll()
};
}// namespace net
}// namespace muduo
#endif// MUDUO_NET_POLLER_POLLPOLLER_H
PollPoller.cpp
// Copyright 2010, Shuo Chen.All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#include "muduo/net/poller/PollPoller.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Types.h"
#include "muduo/net/Channel.h"
#include <assert.h>
#include <errno.h>
#include <poll.h>
using namespace muduo;
using namespace muduo::net;
PollPoller::PollPoller(EventLoop* loop)
: Poller(loop)
{ }
PollPoller::~PollPoller() = default;
// Waits for events on all registered pollfd entries.
//
// timeoutMs:      timeout passed straight to ::poll().
// activeChannels: receives the channels that have pending events.
// Returns the time sampled immediately after ::poll() returns.
//
// Errors other than EINTR are logged with LOG_SYSERR; nothing is thrown and
// activeChannels is left untouched in that case.
Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  // XXX pollfds_ shouldn't change
  // data() is used instead of &*begin() so that an empty pollfds_ does not
  // dereference the end iterator.
  int numEvents = ::poll(pollfds_.data(), pollfds_.size(), timeoutMs);
  // Save errno right away: the calls below may overwrite it.
  int savedErrno = errno;
  Timestamp now(Timestamp::now());

  // Every branch is braced on purpose. NOTE(review): if LOG_TRACE expands to
  // an unbraced `if (level check) stream` statement (confirm in
  // muduo/base/Logging.h), a brace-less `else if (...) LOG_TRACE << ...; else`
  // would bind the trailing `else` to the macro's hidden `if`, so the error
  // branch would run on timeouts and be skipped on real errors.
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happened";
    fillActiveChannels(numEvents, activeChannels);
  }
  else if (numEvents == 0)
  {
    // Timed out without any event.
    LOG_TRACE << " nothing happened";
  }
  else
  {
    if (savedErrno != EINTR)
    {
      // Restore errno so LOG_SYSERR reports the failure from ::poll().
      errno = savedErrno;
      LOG_SYSERR << "PollPoller::poll()";
    }
  }
  return now;
}
// 填充就绪事件到活动通道列表中
void PollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const
{
for (auto pfd = pollfds_.begin(); pfd != pollfds_.end() && numEvents > 0; ++pfd)
{
if (pfd->revents > 0)
{
// 表示填充了一个事件要--
--numEvents;
// typedef std::map<int, Channel*> ChannelMap;
// 取出指定的通道
auto ch = channels_.find(pfd->fd);
assert(ch != channels_.end());
Channel* channel = ch->second;
assert(channel->fd() == pfd->fd);
// 设置就绪事件
channel->set_revents(pfd->revents);
// 加入活动的通道
activeChannels->push_back(channel);
}
}
}
// Registers a channel with the poller or updates an existing registration.
//
// A channel whose index() is negative is treated as new: a pollfd entry is
// appended to pollfds_, the entry's position is stored back into the channel
// via set_index(), and the channel is recorded in channels_ under its fd.
// Otherwise the pollfd entry at channel->index() is refreshed with the
// channel's current events.
void PollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
  if (channel->index() < 0)
  {
    // a new one, add to pollfds_
    assert(channels_.find(channel->fd()) == channels_.end());
    struct pollfd pfd;
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    pollfds_.push_back(pfd);
    // The channel remembers where its pollfd lives in pollfds_.
    int idx = static_cast<int>(pollfds_.size()) - 1;
    channel->set_index(idx);
    channels_[pfd.fd] = channel;
  }
  else
  {
    // update existing one
    assert(channels_.find(channel->fd()) != channels_.end());
    assert(channels_[channel->fd()] == channel);
    int idx = channel->index();
    assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
    struct pollfd& pfd = pollfds_[idx];
    // The stored fd is either the real fd or its "ignored" encoding (below).
    assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    if (channel->isNoneEvent())
    {
      // ignore this pollfd
      // A negative fd makes poll(2) skip the entry. The extra -1 keeps the
      // mapping reversible and makes fd 0 negative as well (-0 would still
      // be 0).
      pfd.fd = -channel->fd()-1;
    }
  }
}
// Removes a channel from channels_ and drops its pollfd entry.
//
// Preconditions (checked with assert): the channel is registered, it no
// longer watches any event, and its pollfd entry holds the "ignored"
// encoding -fd-1. The entry is removed without shifting the vector: the
// last element is swapped into the freed slot and the vector is shrunk by
// one, after which the moved channel's index is corrected.
void PollPoller::removeChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd();
  assert(channels_.find(channel->fd()) != channels_.end());
  assert(channels_[channel->fd()] == channel);
  assert(channel->isNoneEvent());
  int idx = channel->index();
  assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
  // pfd is only read by the assert; the cast silences unused warnings
  // when asserts are compiled out.
  const struct pollfd& pfd = pollfds_[idx]; (void)pfd;
  assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events());
  size_t n = channels_.erase(channel->fd());
  assert(n == 1); (void)n;
  if (implicit_cast<size_t>(idx) == pollfds_.size()-1)
  {
    // Already the last element: just drop it.
    pollfds_.pop_back();
  }
  else
  {
    // Remember which fd sits at the end before it is moved.
    int channelAtEnd = pollfds_.back().fd;
    iter_swap(pollfds_.begin()+idx, pollfds_.end()-1);
    if (channelAtEnd < 0)
    {
      // Decode the "ignored" form -fd-1 back into the real fd.
      channelAtEnd = -channelAtEnd-1;
    }
    // The channel that used to be last now lives at idx.
    channels_[channelAtEnd]->set_index(idx);
    pollfds_.pop_back();
  }
}
EPollPoller.h
// Copyright 2010, Shuo Chen.All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H
#define MUDUO_NET_POLLER_EPOLLPOLLER_H
#include "muduo/net/Poller.h"
#include <vector>
struct epoll_event;
// epoll()函数封装
namespace muduo
{
namespace net
{
///
/// IO Multiplexing with epoll(4).
///
/// Holds the epoll file descriptor and the buffer that epoll_wait() fills.
class EPollPoller : public Poller
{
 public:
  EPollPoller(EventLoop* loop);
  ~EPollPoller() override;

  /// Calls epoll_wait() and appends the channels with pending events to
  /// *activeChannels.
  Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;

  /// Registers a channel or updates its events.
  void updateChannel(Channel* channel) override;

  /// Removes a channel.
  void removeChannel(Channel* channel) override;

 private:
  /// Initial number of entries in events_.
  static const int kInitEventListSize = 16;

  /// Maps an epoll_ctl operation code to a short string.
  static const char* operationToString(int op);

  /// Copies the events returned by epoll_wait() into their channels and
  /// appends those channels to *activeChannels.
  void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;

  /// Issues epoll_ctl(operation) for the channel's fd.
  void update(int operation, Channel* channel);

  using EventList = std::vector<struct epoll_event>;

  int epollfd_;       // epoll file descriptor
  EventList events_;  // buffer filled by epoll_wait()
};
}// namespace net
}// namespace muduo
#endif// MUDUO_NET_POLLER_EPOLLPOLLER_H
EPollPoller.cpp
// Copyright 2010, Shuo Chen.All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#include "muduo/net/poller/EPollPoller.h"
#include "muduo/base/Logging.h"
#include "muduo/net/Channel.h"
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <sys/epoll.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
// On Linux, the constants of poll(2) and epoll(4)
// are expected to be the same.
// The checks below fail the build if any listed EPOLL* flag differs from
// its POLL* counterpart.
static_assert(EPOLLIN == POLLIN, "epoll uses same flag values as poll");
static_assert(EPOLLPRI == POLLPRI, "epoll uses same flag values as poll");
static_assert(EPOLLOUT == POLLOUT, "epoll uses same flag values as poll");
static_assert(EPOLLRDHUP == POLLRDHUP,"epoll uses same flag values as poll");
static_assert(EPOLLERR == POLLERR, "epoll uses same flag values as poll");
static_assert(EPOLLHUP == POLLHUP, "epoll uses same flag values as poll");
// Registration states that EPollPoller stores in a channel's index().
namespace
{
// Not tracked by this poller (not present in channels_).
constexpr int kNew = -1;
// Present in channels_ and registered with epoll.
constexpr int kAdded = 1;
// Still present in channels_, but removed from epoll with EPOLL_CTL_DEL.
constexpr int kDeleted = 2;
}  // namespace
// Creates the epoll instance (with EPOLL_CLOEXEC) and sizes the event buffer
// to kInitEventListSize entries. A failed epoll_create1() is reported through
// LOG_SYSFATAL.
EPollPoller::EPollPoller(EventLoop* loop)
  : Poller(loop),
    epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
    events_(kInitEventListSize)
{
  if (epollfd_ < 0)
  {
    LOG_SYSFATAL << "EPollPoller::EPollPoller";
  }
}

// Closes the epoll file descriptor.
EPollPoller::~EPollPoller()
{
  ::close(epollfd_);
}
// Waits for events with epoll_wait() and collects the ready channels.
//
// timeoutMs:      timeout passed straight to epoll_wait().
// activeChannels: receives the channels that have pending events.
// Returns the time sampled immediately after epoll_wait() returns.
//
// Errors other than EINTR are logged with LOG_SYSERR; activeChannels is left
// untouched in that case.
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  LOG_TRACE << "fd total count " << channels_.size();
  int numEvents = ::epoll_wait(epollfd_,
                               events_.data(),
                               static_cast<int>(events_.size()),
                               timeoutMs);
  // Save errno right away: the calls below may overwrite it.
  int savedErrno = errno;
  Timestamp now(Timestamp::now());

  // Every branch is braced on purpose. NOTE(review): if LOG_TRACE expands to
  // an unbraced `if (level check) stream` statement (confirm in
  // muduo/base/Logging.h), a brace-less `else if (...) LOG_TRACE << ...; else`
  // would bind the trailing `else` to the macro's hidden `if`, so the error
  // branch would run on timeouts and be skipped on real errors.
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happened";
    fillActiveChannels(numEvents, activeChannels);
    // The buffer was filled completely, so more events may be pending:
    // double its size for the next call.
    if (implicit_cast<size_t>(numEvents) == events_.size())
    {
      events_.resize(events_.size()*2);
    }
  }
  else if (numEvents == 0)
  {
    // Timed out without any event.
    LOG_TRACE << "nothing happened";
  }
  else
  {
    // error happens, log uncommon ones
    if (savedErrno != EINTR)
    {
      // Restore errno so LOG_SYSERR reports the failure from epoll_wait().
      errno = savedErrno;
      LOG_SYSERR << "EPollPoller::poll()";
    }
  }
  return now;
}
// Transfers the first numEvents entries of events_ to their channels.
//
// Each entry's data.ptr holds the Channel* stored by update(); the entry's
// event mask is written to that channel with set_revents() and the channel
// is appended to *activeChannels.
void EPollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const
{
  // epoll_wait() never reports more events than the buffer can hold.
  assert(implicit_cast<size_t>(numEvents) <= events_.size());
  for (int i = 0; i < numEvents; ++i)
  {
    Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
#ifndef NDEBUG
    // Debug builds only: the channel must still be registered under its fd.
    int fd = channel->fd();
    ChannelMap::const_iterator it = channels_.find(fd);
    assert(it != channels_.end());
    assert(it->second == channel);
#endif
    channel->set_revents(events_[i].events);
    activeChannels->push_back(channel);
  }
}
// Registers a channel with epoll or updates/removes its registration,
// depending on the state kept in channel->index():
//   kNew     - record the channel in channels_, then EPOLL_CTL_ADD.
//   kDeleted - already in channels_, EPOLL_CTL_ADD again.
//   kAdded   - EPOLL_CTL_DEL when the channel watches no event (the state
//              becomes kDeleted), otherwise EPOLL_CTL_MOD.
void EPollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  const int index = channel->index();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events()
            << " index = " << index;
  if (index == kNew || index == kDeleted)
  {
    // a new one, add with EPOLL_CTL_ADD
    int fd = channel->fd();
    if (index == kNew)
    {
      assert(channels_.find(fd) == channels_.end());
      channels_[fd] = channel;
    }
    else // index == kDeleted
    {
      // The channel stayed in channels_ while it was out of epoll.
      assert(channels_.find(fd) != channels_.end());
      assert(channels_[fd] == channel);
    }
    channel->set_index(kAdded);
    update(EPOLL_CTL_ADD, channel);
  }
  else
  {
    // update existing one with EPOLL_CTL_MOD/DEL
    // fd is only read by the asserts; the cast silences unused warnings
    // when asserts are compiled out.
    int fd = channel->fd(); (void)fd;
    assert(channels_.find(fd) != channels_.end());
    assert(channels_[fd] == channel);
    assert(index == kAdded);
    if (channel->isNoneEvent())
    {
      // No event of interest is left: take the fd out of epoll but keep the
      // channel in channels_.
      update(EPOLL_CTL_DEL, channel);
      channel->set_index(kDeleted);
    }
    else
    {
      update(EPOLL_CTL_MOD, channel);
    }
  }
}
// Removes a channel from channels_ and, when it is still registered with
// epoll (state kAdded), from the epoll set as well.
//
// Preconditions (checked with assert): the channel is registered under its
// fd, watches no event, and its state is kAdded or kDeleted. On return the
// channel's index is reset to kNew.
void EPollPoller::removeChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  int fd = channel->fd();
  LOG_TRACE << "fd = " << fd;
  assert(channels_.find(fd) != channels_.end());
  assert(channels_[fd] == channel);
  assert(channel->isNoneEvent());
  int index = channel->index();
  assert(index == kAdded || index == kDeleted);
  // n is only read by the assert; the cast silences unused warnings when
  // asserts are compiled out.
  size_t n = channels_.erase(fd); (void)n;
  assert(n == 1);

  // A kDeleted channel was already taken out of epoll by updateChannel().
  if (index == kAdded)
  {
    update(EPOLL_CTL_DEL, channel);
  }
  channel->set_index(kNew);
}
// Applies one epoll_ctl() operation to the channel's file descriptor.
//
// operation: EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL.
// channel:   supplies the fd and the event mask; its address is stored in
//            the epoll_event's data.ptr.
// A failed EPOLL_CTL_DEL is logged with LOG_SYSERR, any other failure with
// LOG_SYSFATAL.
void EPollPoller::update(int operation, Channel* channel)
{
  const int fd = channel->fd();

  struct epoll_event event {};
  event.events = channel->events();
  event.data.ptr = channel;

  LOG_TRACE << "epoll_ctl op = " << operationToString(operation)
    << " fd = " << fd << " event = { " << channel->eventsToString() << " }";

  const int rc = ::epoll_ctl(epollfd_, operation, fd, &event);
  if (rc >= 0)
  {
    return;
  }

  if (operation == EPOLL_CTL_DEL)
  {
    LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
  }
  else
  {
    LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
  }
}
// Returns a short name for an epoll_ctl operation code: "ADD", "DEL" or
// "MOD". Any other value trips an assert; when asserts are compiled out
// "Unknown Operation" is returned instead.
const char* EPollPoller::operationToString(int op)
{
  if (op == EPOLL_CTL_ADD)
  {
    return "ADD";
  }
  if (op == EPOLL_CTL_DEL)
  {
    return "DEL";
  }
  if (op == EPOLL_CTL_MOD)
  {
    return "MOD";
  }
  assert(false && "ERROR op");
  return "Unknown Operation";
}
页:
[1]