Administrator
发布于 2026-01-06 / 5 阅读
0
0

IO多路复用-epoll

完整C++实现

#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cerrno>
#include <csignal>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <vector>

/**
 * Single-threaded TCP echo server driven by a Linux epoll event loop.
 *
 * Usage: construct with a port, call init(), then run(). run() blocks until
 * epoll_wait() fails with an error other than EINTR or running_ is cleared.
 * stop() (also invoked by the destructor) closes every descriptor the server
 * owns: the listening socket, the epoll instance and all accepted clients.
 *
 * The listening socket is registered level-triggered. Client sockets are
 * non-blocking and registered edge-triggered (EPOLLET), so every readiness
 * notification is drained until recv() reports EAGAIN.
 *
 * The class owns raw file descriptors and is therefore non-copyable.
 */
class EpollServer {
private:
    int server_fd_;
    int epoll_fd_;
    int port_;
    bool running_;
    // client_open_[fd] is non-zero while fd is an accepted client socket still
    // owned by this server; lets stop() close connections that are still open.
    std::vector<char> client_open_;
    static constexpr int MAX_EVENTS = 1024;
    static constexpr int BUFFER_SIZE = 4096;

    // Puts fd into non-blocking mode. Returns -1 (errno set by fcntl) on failure.
    int set_nonblocking(int fd) {
        const int flags = fcntl(fd, F_GETFL, 0);
        if (flags == -1) return -1;
        return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    }

    // Registers fd with the epoll instance for the given event mask.
    // Returns false (after logging) on failure; the caller keeps ownership of
    // fd and decides whether to close it.
    bool add_to_epoll(int fd, uint32_t events) {
        struct epoll_event ev;
        std::memset(&ev, 0, sizeof(ev));
        ev.events = events;
        ev.data.fd = fd;

        if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) == -1) {
            std::cerr << "Failed to add fd to epoll: " << std::strerror(errno) << std::endl;
            return false;
        }
        return true;
    }

    // Unregisters fd from the epoll instance. Must be called while fd is open.
    void remove_from_epoll(int fd) {
        epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
    }

    // Records fd as a live client connection owned by this server.
    void remember_client(int fd) {
        const std::size_t slot = static_cast<std::size_t>(fd);
        if (slot >= client_open_.size()) {
            client_open_.resize(slot + 1, 0);
        }
        client_open_[slot] = 1;
    }

    // Tears down one client connection: unregister first, then close, so the
    // epoll_ctl call never operates on an already-closed descriptor number.
    void close_client(int fd) {
        remove_from_epoll(fd);
        close(fd);
        if (fd >= 0 && static_cast<std::size_t>(fd) < client_open_.size()) {
            client_open_[static_cast<std::size_t>(fd)] = 0;
        }
    }

    // Closes every descriptor the server owns and resets the members so the
    // object can be initialized again. Safe to call repeatedly.
    void release_descriptors() {
        for (std::size_t fd = 0; fd < client_open_.size(); ++fd) {
            if (client_open_[fd]) {
                close(static_cast<int>(fd));
            }
        }
        client_open_.clear();

        if (server_fd_ != -1) {
            close(server_fd_);
            server_fd_ = -1;
        }
        if (epoll_fd_ != -1) {
            close(epoll_fd_);
            epoll_fd_ = -1;
        }
    }

    // Writes len bytes to fd, retrying on partial writes and EINTR.
    // MSG_NOSIGNAL prevents SIGPIPE from killing the process when the peer has
    // already closed. Returns false if the data could not be written in full;
    // that includes EAGAIN, because this server keeps no per-connection output
    // queue (a complete implementation would buffer and wait for EPOLLOUT).
    bool send_all(int fd, const char* data, std::size_t len) {
        std::size_t sent = 0;
        while (sent < len) {
            const ssize_t n = send(fd, data + sent, len - sent, MSG_NOSIGNAL);
            if (n > 0) {
                sent += static_cast<std::size_t>(n);
                continue;
            }
            if (n == -1 && errno == EINTR) {
                continue;
            }
            if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                std::cerr << "Send buffer full on fd " << fd
                          << ", dropping connection" << std::endl;
            } else if (n == -1) {
                std::cerr << "Send error on fd " << fd << ": "
                          << std::strerror(errno) << std::endl;
            } else {
                std::cerr << "Send error on fd " << fd << ": no progress" << std::endl;
            }
            return false;
        }
        return true;
    }

    // Accepts every connection currently queued on the listening socket and
    // registers each one edge-triggered for read and peer-hangup events.
    void handle_accept() {
        while (true) {
            struct sockaddr_in client_addr;
            socklen_t client_len = sizeof(client_addr);

            const int client_fd = accept(
                server_fd_, reinterpret_cast<struct sockaddr*>(&client_addr), &client_len);
            if (client_fd == -1) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    return; // backlog drained; not an error on a non-blocking socket
                }
                if (errno == EINTR || errno == ECONNABORTED) {
                    continue; // transient; try the next queued connection
                }
                std::cerr << "Accept failed: " << std::strerror(errno) << std::endl;
                return;
            }

            // Peer address for the log line.
            char ip_buffer[INET_ADDRSTRLEN];
            const char* client_ip =
                inet_ntop(AF_INET, &client_addr.sin_addr, ip_buffer, sizeof(ip_buffer));
            if (client_ip == nullptr) {
                client_ip = "unknown";
            }
            const int client_port = ntohs(client_addr.sin_port);

            std::cout << "New connection from " << client_ip << ":" << client_port
                      << " (fd: " << client_fd << ")" << std::endl;

            // Edge-triggered handling requires a non-blocking socket.
            if (set_nonblocking(client_fd) == -1) {
                std::cerr << "Failed to set non-blocking: " << std::strerror(errno) << std::endl;
                close(client_fd);
                continue;
            }

            if (!add_to_epoll(client_fd, EPOLLIN | EPOLLRDHUP | EPOLLET)) {
                close(client_fd);
                continue;
            }
            remember_client(client_fd);
        }
    }

    // Drains all readable data from client_fd and echoes it back. Closes the
    // connection on end-of-stream, on a receive error, or when the echo cannot
    // be written in full.
    void handle_client_data(int client_fd) {
        char buffer[BUFFER_SIZE];

        // Edge-triggered mode: keep reading until the kernel reports EAGAIN.
        while (true) {
            const ssize_t bytes_read = recv(client_fd, buffer, sizeof(buffer), 0);

            if (bytes_read > 0) {
                // write() instead of operator<< so payloads containing NUL
                // bytes are logged in full.
                std::cout << "Received from fd " << client_fd << ": ";
                std::cout.write(buffer, bytes_read);

                if (!send_all(client_fd, buffer, static_cast<std::size_t>(bytes_read))) {
                    close_client(client_fd);
                    return;
                }
                continue;
            }

            if (bytes_read == 0) {
                // Orderly shutdown by the peer.
                std::cout << "Client fd " << client_fd << " disconnected" << std::endl;
                close_client(client_fd);
                return;
            }

            if (errno == EINTR) {
                continue;
            }
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                return; // everything currently available has been read
            }

            std::cerr << "Recv error on fd " << client_fd << ": "
                      << std::strerror(errno) << std::endl;
            close_client(client_fd);
            return;
        }
    }

    // Creates the listening socket and the epoll instance and registers the
    // former with the latter. Returns false (after logging) on the first
    // failure; descriptors opened so far are left for the caller to release.
    bool open_listener() {
        if (port_ < 0 || port_ > 65535) {
            std::cerr << "Invalid port: " << port_ << std::endl;
            return false;
        }

        server_fd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (server_fd_ == -1) {
            std::cerr << "Failed to create socket: " << std::strerror(errno) << std::endl;
            return false;
        }

        // Allow quick restarts on the same address.
        int opt = 1;
        if (setsockopt(server_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
            std::cerr << "Failed to set SO_REUSEADDR: " << std::strerror(errno) << std::endl;
            return false;
        }

        struct sockaddr_in server_addr;
        std::memset(&server_addr, 0, sizeof(server_addr));
        server_addr.sin_family = AF_INET;
        server_addr.sin_addr.s_addr = INADDR_ANY;
        server_addr.sin_port = htons(static_cast<uint16_t>(port_));

        if (bind(server_fd_, reinterpret_cast<struct sockaddr*>(&server_addr),
                 sizeof(server_addr)) == -1) {
            std::cerr << "Failed to bind: " << std::strerror(errno) << std::endl;
            return false;
        }

        if (listen(server_fd_, 128) == -1) {
            std::cerr << "Failed to listen: " << std::strerror(errno) << std::endl;
            return false;
        }

        if (set_nonblocking(server_fd_) == -1) {
            std::cerr << "Failed to set non-blocking: " << std::strerror(errno) << std::endl;
            return false;
        }

        epoll_fd_ = epoll_create1(0);
        if (epoll_fd_ == -1) {
            std::cerr << "Failed to create epoll: " << std::strerror(errno) << std::endl;
            return false;
        }

        // Level-triggered registration for the listening socket.
        return add_to_epoll(server_fd_, EPOLLIN);
    }

public:
    // Stores the port to listen on; no descriptors are opened until init().
    explicit EpollServer(int port)
        : server_fd_(-1), epoll_fd_(-1), port_(port), running_(false) {}

    // Copying would make two objects close the same descriptors.
    EpollServer(const EpollServer&) = delete;
    EpollServer& operator=(const EpollServer&) = delete;

    ~EpollServer() {
        stop();
    }

    // Creates, binds and starts listening on the server socket, then sets up
    // the epoll instance. Returns true on success. On failure every descriptor
    // opened so far is closed and false is returned. Calling init() on an
    // already initialized server releases the previous descriptors first.
    bool init() {
        release_descriptors();

        if (!open_listener()) {
            release_descriptors();
            return false;
        }

        std::cout << "Server initialized on port " << port_ << std::endl;
        return true;
    }

    // Runs the event loop. Returns immediately if init() has not succeeded;
    // otherwise blocks until running_ is cleared or epoll_wait() fails with an
    // error other than EINTR.
    void run() {
        if (server_fd_ == -1 || epoll_fd_ == -1) {
            std::cerr << "Server not initialized" << std::endl;
            return;
        }

        running_ = true;
        std::cout << "Server started. Waiting for connections..." << std::endl;

        struct epoll_event events[MAX_EVENTS];

        while (running_) {
            // 1000 ms timeout so the running_ flag is re-checked periodically.
            const int num_events = epoll_wait(epoll_fd_, events, MAX_EVENTS, 1000);

            if (num_events == -1) {
                if (errno == EINTR) {
                    continue; // interrupted by a signal
                }
                std::cerr << "epoll_wait error: " << std::strerror(errno) << std::endl;
                break;
            }

            for (int i = 0; i < num_events; ++i) {
                const int fd = events[i].data.fd;
                const uint32_t ready = events[i].events;

                if (fd == server_fd_) {
                    handle_accept();
                    continue;
                }

                if (ready & EPOLLIN) {
                    // Read first even when a hang-up is reported together with
                    // EPOLLIN: data sent just before the peer closed must still
                    // be echoed. The read loop closes the socket at EOF/error.
                    handle_client_data(fd);
                } else if (ready & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
                    std::cout << "Client fd " << fd << " disconnected" << std::endl;
                    close_client(fd);
                }
            }
        }

        running_ = false;
    }

    // Clears the running flag and closes all descriptors owned by the server.
    // Safe to call more than once; the message is printed on every call.
    void stop() {
        running_ = false;
        release_descriptors();
        std::cout << "Server stopped." << std::endl;
    }
};

// Signal handler: announces the signal on stdout, then terminates the process
// through the signal's default action.
//
// Only async-signal-safe calls are used (write, signal, raise); iostreams are
// not safe inside a handler. After printing, the default disposition is
// restored and the signal is re-raised, so a signal whose default action is to
// terminate (e.g. SIGINT, SIGTERM) ends the process with the conventional
// "killed by signal" status instead of being swallowed.
void signal_handler(int sig) {
    const int saved_errno = errno;

    static const char prefix[] = "\nReceived signal ";
    static const char suffix[] = ", shutting down...\n";

    char message[64];
    std::size_t length = 0;

    for (const char* p = prefix; *p != '\0'; ++p) {
        message[length++] = *p;
    }

    // Manual decimal conversion: snprintf and friends are not signal-safe.
    unsigned int magnitude = static_cast<unsigned int>(sig);
    if (sig < 0) {
        message[length++] = '-';
        magnitude = 0u - magnitude;
    }
    char digits[12];
    int digit_count = 0;
    do {
        digits[digit_count++] = static_cast<char>('0' + magnitude % 10u);
        magnitude /= 10u;
    } while (magnitude != 0u);
    while (digit_count > 0) {
        message[length++] = digits[--digit_count];
    }

    for (const char* p = suffix; *p != '\0'; ++p) {
        message[length++] = *p;
    }

    const ssize_t written = write(STDOUT_FILENO, message, length);
    (void)written; // nothing useful can be done about a failed write here

    // Hand the signal back to its default action.
    std::signal(sig, SIG_DFL);
    std::raise(sig);

    errno = saved_errno;
}

// Parses text as a decimal TCP port in the range 1-65535.
// On success stores the value in port and returns true; otherwise leaves port
// untouched and returns false (empty input, trailing characters, out of range).
static bool parse_port(const char* text, int& port) {
    errno = 0;
    char* end = nullptr;
    const long value = std::strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE || value < 1 || value > 65535) {
        return false;
    }
    port = static_cast<int>(value);
    return true;
}

// Entry point: `program [port]`. Starts the echo server on the given port
// (default 8888). Returns 1 if the port argument is invalid or the server
// cannot be initialized, 0 when the event loop returns.
int main(int argc, char* argv[]) {
    // Listening port; 8888 unless overridden by the first argument.
    int port = 8888;
    if (argc > 1 && !parse_port(argv[1], port)) {
        std::cerr << "Invalid port '" << argv[1] << "' (expected 1-65535)" << std::endl;
        std::cerr << "Usage: " << argv[0] << " [port]" << std::endl;
        return 1;
    }

    // Route SIGINT and SIGTERM through signal_handler.
    std::signal(SIGINT, signal_handler);
    std::signal(SIGTERM, signal_handler);

    // Create and run the server.
    EpollServer server(port);

    if (!server.init()) {
        std::cerr << "Failed to initialize server" << std::endl;
        return 1;
    }

    server.run();

    return 0;
}


评论