标签搜索

目 录CONTENT

文章目录

IO多路转接之epoll.md

小小城
2021-08-22 / 0 评论 / 0 点赞 / 5 阅读 / 9,319 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-05-02,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

IO多路转接之epoll

@[toc]

一、epool

是为处理大批量句柄而作了改进的pol

  •  1.相关函数

==a.==

int epoll_create(int size);

创建一个epoll的句柄.自从linux2.6.8之后,size参数是被忽略的.
用完之后, ==必须调用close()关闭==

官方的讲:创建一个epoll的句柄

其实呢,它是创建了一个epoll模型:

 1. 在操作系统内核构建一个红黑树

    节点 : 表示要关心的哪个文件描述符的事件
    key键 :用文件描述符作为key键

 2. 在操作系统内核构建一个回调机制 

    作用:就是减少了操作系统的开销(不用操作系统再去轮询的找就绪事件)
         有这么一个机制告诉我们,我们所关心的文件描述符的时间已经就绪
 3. 在操作系统内核构建一个就绪队列

    如何构建的:有了回调机制,告诉了我们所关心的文件描述符的事件已经就绪
              接下来就是把该文件描述符拷贝到就绪队列中;等我们处理的时候
              就不用轮询的去找就绪事件,而是 从就绪队列的开始找epoll_wait() 
              的返回值(>0,成功的情况下)这么大的一个区间, 
              这段区间就是当前的就绪事件

这三个组在一起的返回值是fd(文件描述符)

==b.==

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
参数作用
epfdepoll_create()的返回值(epoll的句柄)
op表示动作,用三个宏来表示,
fd是需要监听的fd
struct epoll_event *event是告诉内核需要监听什么事

op的三个宏

  • EPOLL_CTL_ADD :注册新的fd到epfd中
  • EPOLL_CTL_MOD :修改已经注册的fd的监听事件
  • EPOLL_CTL_DEL :从epfd中删除一个fd

struct epoll_event的结构:

typedef union epoll_data {
               void        *ptr;
               int          fd;
               uint32_t     u32;
               uint64_t     u64;
           } epoll_data_t;

           struct epoll_event {
               uint32_t     events;      /* Epoll events */
               epoll_data_t data;        /* User data variable */
           };

events可以是下面几个宏:

  • EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
  • EPOLLOUT : 表示对应的文件描述符可以写;
  • EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
  • EPOLLERR :表示对应的文件描述符发生错误;
  • EPOLLHUP : 表示对应的文件描述符被挂断;
  • EPOLLET : 将EPOLL设为边缘触发(EdgeTriggered)模式, 这是相对于水平触发(Level Triggered)来说的.
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要 再次把这个socket加入到EPOLL队列里
epoll的事件注册函数:要关心哪个文件描述符的事件

1. 第一个参数是epoll_create()的返回值:一个文件描述符;

2.第二个参数表示动作,三个宏来表示:

 EPOLL_CTL_ADD:注册新的fd到epfd中;

 EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

 EPOLL_CTL_DEL:从epfd中删除一个fd;

3. 第三个参数是需要监听的fd ;

4. 第四个参数是告诉内核需要监听什么事 .

events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这个应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发⽣错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式, 
          这是相对于水平触发(LevelTriggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续  
             监听这个socket的话,需要再次把这个socket加入到EPOLL队列中。

==c.==

int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);
参数作用
epfdepoll_create函数的返回值
struct epoll_event *events结构体,告诉内核需要监听什么事
maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size
timeout是超时时间 (毫秒,0会立即返回,-1是永久阻塞)
返回 :关心事件已经就绪的事件

1. > 0 :满足就绪条件的事件个数
2. 0 : 在规定的时间内没有事件发生(超出timeout设置的时间)
3. -1 :错误 
       原因由errno标识;此时中间三个参数的值变得不可预测。

  •  2.epoll原理

当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关

struct eventpoll{
	....
	/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
	struct rb_root rbr;
	/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
	struct list_head rdlist;
	....
};
  • 每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件.
  • 这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插 入时间效率是lgn,其中n为树的高度).
  • 而所有添加到epoll中的事件都会与设备(网卡)驱动程建立回调关系,也就是说,当响应的事件发生时 会调用这个回调方法.
  • 这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中
  • 在epoll中,对于每一个事件,都会建立一个epitem结构体
struct epitem{
	struct rb_node rbn;//红黑树节点
	struct list_head rdllink;//双向链表节点
	struct epoll_filefd ffd; //事件句柄信息
	struct eventpoll *ep; //指向其所属的eventpoll对象
	struct epoll_event event; //期待发生的事件类型
}
  • 当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可.
  • 如果rdlist不为空,则把发生的事件拷贝到用户态,同时将事件数量返回给用户. 这个操作的时间复杂度 是O(1)

总结一下, epoll的使用过程就是三部曲:

  • 调用epoll_create创建一个epoll句柄;

  • 调用epoll_ctl, 将要监控的文件描述符进行注册;

  • 调用epoll_wait, 等待文件描述符就绪

  •  3 . epoll的优点(和 select 的缺点对应)

  • 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开

  • 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)

  • 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中,epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响.

  • 没有数量限制: 文件描述符数目无上限

  •  4.epoll的工作方式

  • 水平触发(LT)和边缘触发(ET)

假如有这样一个例子:

我们已经把一个tcp socket添加到epoll描述符

这个时候socket的另一端被写入了2KB的数据

调用epoll_wait,并且它会返回. 说明它已经准备好读取操作

然后调用read, 只读取了1KB的数据

继续调用epoll_wait......
  •  水平触发Level Triggered 工作模式
    epoll默认状态下就是LT工作模式.

  • 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分.

  • 如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait

  • 仍然会立刻返回并通知socket读事件就绪.

  • 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.

  • ==支持阻塞读写和非阻塞读写==

  •  边缘触发Edge Triggered工作模式

  • 如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式.

  • 当epoll检测到socket上事件就绪时, 必须立刻处理.

  • 如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候,epoll_wait 不会再返回了.

  • 也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会.

  • ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epoll.

  • ==只支持非阻塞的读写==

select和poll其实也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET

  •  5.理解ET模式和非阻塞文件描述符

使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是 "工程实践" 上的要求.

  • 假设这样的场景: 服务器接受到一个10k的请求, 会向客户端返回一个应答数据. 如果客户端收不到应答, 不会发送第二个10k请求
  • 如果服务端写的代码是阻塞式的read, 并且一次只 read 1k 数据的话(read不能保证一次就把所有的数据都读出来, 参考 man手册的说明, 可能被信号打断), 剩下的9k数据就会待在缓冲区中
  • 此时由于 epoll 是ET模式, 并不会认为文件描述符读就绪.
  • epoll_wait 就不会再次返回. 剩下的 9k 数据会一直在缓冲区中. 直到下一次客户端再给服务器写数据. epoll_wait 才能返回

但是问题来了.

  • 服务器只读到1k个数据, 要10k读完才会给客户端返回响应数据.
  • 客户端要读到服务器的响应, 才会发送下一个请求
  • 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数据

所以, 为了解决上述问题(阻塞read不一定能一下把完整的请求读完), 于是就可以使用非阻塞轮训的方式来读缓冲区,
保证一定能把完整的请求都读出来

  •  6.epoll的优点还有一个内存映射机制,这样的说法正确吗
  1. 内存映射机制: 内存直接把就绪队列映射到用户态

  2. 但是我觉得这种说法是错误的。

    1. 就绪队列是操作系统在管理
    2. 而操作系统就不会把自己的内部暴露给用户态,如果暴露出去就不安全,
    3. 我们在使用epoll_wait时,会告诉了我们所关心的文件描述符的事件已经就绪,而这个时候有一个回调机制会告诉我们:所关心的哪个文件描述符的事件已经就绪不用操作系统一一去找,减少的操作系统的开销;
    4. 接下来就是把该文件描述符拷贝到就绪队列中;等我们处理的时候就不用轮询的去找就绪事件,而是 从就绪队列的开始找epoll_wait() 的返回值(>0,成功的情况下)这么大的一个区间,这段区间就是当前的就绪事件
    5. 这个过程中,并没有映射,如果有映射的话,再传一个缓冲区,岂不是多此一举

二、基于epoll实现服务器(LT)

#pragma once
#include <vector>
#include <functional>
#include <sys/epoll.h>
#include "tcp_socket.hpp"

typedef std::function<void (const std::string&, std::string* resp)> Handler;

class Epoll 
{
  public:
    Epoll() 
    {
      epoll_fd_ = epoll_create(10);

    }

    ~Epoll() 
    {
      close(epoll_fd_);

    }

    bool Add(TcpSocket& sock) const 
    {
      int fd = sock.GetFd();
      printf("[Epoll Add] fd = %d\n", fd);

      epoll_event ev;
      ev.data.fd = fd;
      ev.events = EPOLLIN;//设置为ET模式

      int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
      if (ret < 0) 
      {
        perror("epoll_ctl ADD");
        return false;
      }
      return true;

    }

    bool Del(TcpSocket& sock) const 
    {
      int fd = sock.GetFd();
      printf("[Epoll Del] fd = %d\n", fd);

      int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);
      if (ret < 0) 
      {
        perror("epoll_ctl DEL");
        return false;
      }
      return true;

    }

    bool Wait(std::vector<TcpSocket>* output) const 
    {
      output->clear();

      epoll_event events[1000];
      int nfds = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1);
      if (nfds < 0) 
      {
        perror("epoll_wait");
        return false;

      }

      // [注意!] 此处必须是循环到 nfds, 不能多循环

      for (int i = 0; i < nfds; ++i) 
      {
        TcpSocket sock(events[i].data.fd);
        output->push_back(sock);
      }

      return true;
    }

  private:
    int epoll_fd_;
};

class TcpEpollServer 
{
  public:
    TcpEpollServer(const std::string& ip, uint16_t port) : ip_(ip), port_(port) 
    {}

    bool Start(Handler handler) 
    {
      //1. 创建 socket
      TcpSocket listen_sock;
      CHECK_RET(listen_sock.Socket());

      // 2. 绑定
      CHECK_RET(listen_sock.Bind(ip_, port_));

      // 3. 监听
      CHECK_RET(listen_sock.Listen(5));

      // 4. 创建 Epoll 对象, 并将 listen_sock 加入进去
      Epoll epoll;
      epoll.Add(listen_sock);

      // 5. 进入事件循环
      for (;;) 
      {
        // 6. 进行 epoll_wait
        std::vector<TcpSocket> output;
        if (!epoll.Wait(&output)) 
        {
          continue;
        }

        // 7. 根据就绪的文件描述符的种类决定如何处理
        for (size_t i = 0; i < output.size(); ++i) 
        {
          if (output[i].GetFd() == listen_sock.GetFd()) 
          {
            // 如果是 listen_sock, 就调用 accept
            TcpSocket new_sock;
            listen_sock.Accept(&new_sock);
            epoll.Add(new_sock);
          }
          else 
          {
            // 如果是 new_sock, 就进行一次读写
            std::string req, resp;
            bool ret = output[i].Recv(&req);
            if (!ret) 
            {
              // [注意!!] 需要把不用的 socket 关闭
              // 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了
              epoll.Del(output[i]);
              output[i].Close();
              continue;
            }

            handler(req, &resp);

            output[i].Send(resp);

          } // end for
        } // end for (;;)
      }
      return true;
    }
  private:
    std::string ip_;
    uint16_t port_;
};

在这里插入图片描述

在这里插入图片描述

三、基于epoll实现服务器(LT)

#pragma once
#include <vector>
#include <functional>
#include <sys/epoll.h>
#include "tcp_socket.hpp"

typedef std::function<void (const std::string&, std::string* resp)> Handler;


//如果需要设置为非阻塞方式,需要在tcp_socket.hpp中提供非阻塞方式的recv和send接口
class Epoll 
{
  public:
    Epoll() 
    {
      epoll_fd_ = epoll_create(10);
    }

    ~Epoll() 
    {
      close(epoll_fd_);
    }

    bool Add(TcpSocket& sock, bool epoll_et = false) const 
    {
      int fd = sock.GetFd();
      printf("[Epoll Add] fd = %d\n", fd);

      epoll_event ev;
      ev.data.fd = fd;
      if (epoll_et)//如果为true,说明要设为非阻塞方式 
      {
        ev.events = EPOLLIN | EPOLLET;
      } 
      else 
      {
        ev.events = EPOLLIN;
      }

      int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
      if (ret < 0) 
      {
        perror("epoll_ctl ADD");
        return false;
      }

      return true;
    }

    bool Del(TcpSocket& sock) const 
    {
      int fd = sock.GetFd();
      printf("[Epoll Del] fd = %d\n", fd);

      int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);
      if (ret < 0) 
      {
        perror("epoll_ctl DEL");
        return false;
      }

      return true;
    }

    bool Wait(std::vector<TcpSocket>* output) const 
    {
      output->clear();

      epoll_event events[1000];
      int nfds = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1);
      if (nfds < 0) 
      {
        perror("epoll_wait");
        return false;
      }

      // [注意!] 此处必须是循环到 nfds, 不能多循环
      for (int i = 0; i < nfds; ++i) 
      {
        TcpSocket sock(events[i].data.fd);
        output->push_back(sock);
      }

      return true;
    }

  private:
    int epoll_fd_;
};

class TcpEpollServer 
{
  public:
    TcpEpollServer(const std::string& ip, uint16_t port) : ip_(ip), port_(port) 
    {}

    bool Start(Handler handler) 
    {
      // 1. 创建 socket
      TcpSocket listen_sock;
      CHECK_RET(listen_sock.Socket());

      // 2. 绑定
      CHECK_RET(listen_sock.Bind(ip_, port_));

      // 3. 监听
      CHECK_RET(listen_sock.Listen(5));

      // 4. 创建 Epoll 对象, 并将 listen_sock 加入进去
      Epoll epoll;
      epoll.Add(listen_sock);

      // 5. 进入事件循环
      for (;;) 
      {
        // 6. 进行 epoll_wait
        std::vector<TcpSocket> output;
        if (!epoll.Wait(&output)) 
        {
          continue;
        }

        // 7. 根据就绪的文件描述符的种类决定如何处理
        for (size_t i = 0; i < output.size(); ++i) 
        {
          if (output[i].GetFd() == listen_sock.GetFd()) 
          {
            // 如果是 listen_sock, 就调用 accept
            TcpSocket new_sock;
            listen_sock.Accept(&new_sock);
            epoll.Add(new_sock, true);
          } 
          else 
          {
            // 如果是 new_sock, 就进行一次读写
            std::string req, resp;
            bool ret = output[i].RecvNoBlock(&req);
            if (!ret) 
            {
              // [注意!!] 需要把不用的 socket 关闭
              // 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了
              epoll.Del(output[i]);
              output[i].Close();
              continue;
            }

            handler(req, &resp);

            output[i].SendNoBlock(resp);
            printf("[client %d] req: %s, resp: %s\n", output[i].GetFd(),
                req.c_str(), resp.c_str());
          } // end for
        } // end for (;;)
      }
      return true;
    }

  private:
    std::string ip_;
    uint16_t port_;
};

在这里插入图片描述

在这里插入图片描述

0

评论区