首页云计算 正文

Linux C++高并发多线程服务器实战项目

2024-12-09 3 0条评论

Linux C++ 高并发多线程服务器实战项目

在实际开发中,Linux下的高并发服务器是非常具有挑战性和实用价值的应用场景之一。结合多线程编程和网络编程知识,可以构建一个高效稳定的高并发服务器。在本项目中,我们将通过Linux C++实现一个高并发多线程服务器,具体功能包括:

  1. 多线程并发处理
  2. 使用线程池优化资源分配
  3. 基于 epoll 的IO多路复用技术
  4. 客户端连接的管理和处理
  5. 数据的并发读写和同步控制

一、项目概述

本项目的目标是构建一个能够同时处理多个客户端请求的高并发服务器。我们将利用Linux系统的 epoll 进行I/O多路复用管理大量的客户端连接,使用C++标准库中的 std::thread 创建线程,并通过线程池来管理这些线程。这样可以确保系统在高并发情况下保持较高的性能和资源利用率。

二、主要技术点

技术点 说明
epoll 高效的IO多路复用机制,用于管理大量连接
多线程 使用 std::thread进行多线程并发处理
线程池 线程池用于管理线程,减少线程的频繁创建和销毁成本
互斥锁与条件变量 用于线程间的数据同步,保证线程安全
非阻塞IO 通过设置非阻塞套接字提高服务器的处理性能

三、项目架构设计

3.1 服务器核心结构

  1. 监听套接字:服务器使用 socket 创建套接字,绑定地址和端口,通过 listen() 函数监听客户端连接。
  2. epoll事件循环:使用 epoll 管理大量的客户端连接,监听套接字的读写事件,通过非阻塞的方式处理连接。
  3. 线程池处理任务:当有新的客户端请求时,将任务提交给线程池中的空闲线程进行处理。线程池可以提高并发处理能力,避免频繁创建销毁线程带来的开销。
  4. 客户端管理:服务器需要管理多个客户端连接,包括接收数据、发送数据、断开连接等。

3.2 数据处理流程

  1. 客户端连接:通过 accept() 函数接受新的客户端连接,生成新的套接字。
  2. 数据读取:客户端发送的数据通过 recv() 函数读取,服务器进行相应处理。
  3. 数据发送:服务器根据客户端请求处理完成后,通过 send() 函数发送响应数据。
  4. 连接断开:检测到客户端关闭连接,服务器将其从 epoll 中移除并关闭套接字。

四、具体代码实现

4.1 服务器启动与epoll设置

首先,初始化服务器套接字,并设置 epoll,然后进入事件循环。

#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <thread>
#include <vector>
#include <mutex>

#define MAX_EVENTS 1024
#define BUFFER_SIZE 4096

int setNonBlocking(int sockfd) {
    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
    return 0;
}

int main() {
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(8080);

    bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
    listen(listen_fd, 5);

    int epoll_fd = epoll_create(1);
    struct epoll_event ev, events[MAX_EVENTS];
    ev.events = EPOLLIN;
    ev.data.fd = listen_fd;
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev);

    while (true) {
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        for (int i = 0; i < nfds; i++) {
            if (events[i].data.fd == listen_fd) {
                // 接受新的客户端连接
                struct sockaddr_in client_addr;
                socklen_t client_len = sizeof(client_addr);
                int client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_len);
                setNonBlocking(client_fd);

                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = client_fd;
                epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev);
            } else {
                // 处理客户端请求
                char buffer[BUFFER_SIZE];
                int client_fd = events[i].data.fd;
                int n = read(client_fd, buffer, BUFFER_SIZE);
                if (n <= 0) {
                    close(client_fd);
                } else {
                    // 将数据回显给客户端
                    write(client_fd, buffer, n);
                }
            }
        }
    }

    close(listen_fd);
    return 0;
}

代码解释

  • socket(AF_INET, SOCK_STREAM, 0):创建一个TCP套接字。
  • bind():绑定套接字到服务器地址和端口8080。
  • listen():让套接字进入监听状态,等待客户端连接。
  • epoll_create():创建一个epoll实例。
  • epoll_ctl():将监听套接字加入epoll实例中。
  • epoll_wait():等待事件的发生,处理I/O。
  • accept():接受客户端连接并设置为非阻塞。
  • read()/write():读取和写入客户端数据。

4.2 线程池实现

线程池用于管理多个工作线程,避免频繁的创建和销毁线程。线程池接受来自 epoll 的事件,分发给线程执行。

#include <queue>
#include <functional>
#include <condition_variable>

class ThreadPool {
public:
    ThreadPool(size_t numThreads);
    ~ThreadPool();

    void enqueue(std::function<void()> task);

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;

    std::mutex queueMutex;
    std::condition_variable condition;
    bool stop;
};

ThreadPool::ThreadPool(size_t numThreads) : stop(false) {
    for (size_t i = 0; i < numThreads; ++i) {
        workers.emplace_back([this] {
            while (true) {
                std::function<void()> task;

                {
                    std::unique_lock<std::mutex> lock(this->queueMutex);
                    this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                    if (this->stop && this->tasks.empty())
                        return;
                    task = std::move(this->tasks.front());
                    this->tasks.pop();
                }

                task();
            }
        });
    }
}

ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread &worker : workers)
        worker.join();
}

void ThreadPool::enqueue(std::function<void()> task) {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        tasks.emplace(task);
    }
    condition.notify_one();
}

代码解释

  • ThreadPool:线程池类,管理线程的创建和销毁。
  • enqueue():将新的任务加入任务队列,并唤醒工作线程执行任务。
  • condition_variable:用于在线程间进行同步,防止任务队列被多个线程同时操作。

4.3 集成线程池

在服务器中,将从 epoll 获取到的事件任务分配给线程池处理。

ThreadPool pool(4);  // 创建一个包含4个线程的线程池

while (true) {
    int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
    for (int i = 0; i < nfds; i++) {
        if (events[i].data.fd == listen_fd) {
            // 接受新的客户端连接
            struct sockaddr_in client_addr;
            socklen_t client_len = sizeof(client_addr);
            int client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_len);
            setNonBlocking(client_fd);

            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = client_fd;
            epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev);
        } else {
            // 将处理客户端的请求分配到线程池
            int client_fd = events[i].data.fd;
            pool.enqueue([client_fd]() {
                char buffer[BUFFER_SIZE];
                int n = read(client_fd, buffer, BUFFER_SIZE);
                if (n <= 0) {
                    close(client_fd);
                } else {
                    write(client_fd

, buffer, n);
                }
            });
        }
    }
}

代码解释

  • 使用 ThreadPoolenqueue() 函数将处理客户端的任务分配给线程池的线程去执行,提升并发处理能力。

五、性能优化与注意事项

  1. 减少锁竞争:在多线程环境中,锁的使用是不可避免的,但应尽量减少锁的粒度和竞争,使用局部变量和线程局部存储可以避免不必要的锁。
  2. 合理的线程数:线程池的线程数应根据服务器硬件资源配置,通常是CPU核心数的1到2倍,以充分利用多核CPU的并发能力。
  3. 非阻塞IO:在高并发场景下,使用非阻塞IO和 epoll 这种高效的多路复用机制,可以减少系统调用开销,提高性能。

六、总结

本项目通过C++结合Linux的 epoll 和多线程机制实现了一个高并发服务器。使用 epoll 来管理大量的客户端连接,通过线程池并发处理客户端请求,有效提高了服务器的并发性能和资源利用率。这个项目展示了高性能服务器开发的关键技术点,适合用于高并发、低延迟的网络应用场景。

文章版权及转载声明

本文作者:admin 网址:http://news.edns.com/post/183297.html 发布于 2024-12-09
文章转载或复制请以超链接形式并注明出处。

取消
微信二维码
微信二维码
支付宝二维码