古月不傲 发表于 2021-1-15 22:49

简易线程池

本帖最后由 古月不傲 于 2021-1-16 03:08 编辑

Thread.h
#ifndef __THREAD_H__
#define __THREAD_H__

#include <boost/noncopyable.hpp>
#include <functional>
#include <string>
#include <pthread.h>
#include <memory>

namespace wind
{
        class Thread : public boost::noncopyable
        {
                public:
                        struct ThreadData
                        {
                                ThreadData(const std::function<void()> &cb) :
                                        m_cb(cb)
                                {
                                }
                                // 线程执行回调
                                void run()
                                {
                                        m_cb();
                                }
                                const std::function<void()> m_cb;
                        };

                public:
                        Thread(std::function<void()> cb) :
                                m_cb(std::move(cb)),
                                m_tid(0),
                                m_bStarted(false),
                                m_bJoined(false)
                {
                }
                        void start();
                        void join();
                private:
                        static void *thread_enter(void *arg);
                        void set_name();
                private:
                        pthread_t                                                m_tid;                        // 线程ID
                        bool                                                         m_bStarted;                // 线程正在运行
                        bool                                                         m_bJoined;                // 线程正在等待
                        std::function<void()>                         m_cb;                        // 任务回调
                        std::shared_ptr<ThreadData>                m_threadData;        // 任务参数
        };
}

#endif

Thread.cpp
#include "include/Thread.h"

#include <iostream>
#include <assert.h>
#include <memory>

namespace wind
{
        // 线程启动
        void Thread::start()
        {
                assert(!m_bStarted);
                m_bStarted = true;

                m_threadData = std::make_shared<ThreadData>(m_cb);
                pthread_create(&m_tid, nullptr, thread_enter, static_cast<void *>(m_threadData.get()));
        }

        // 线程等待
        void Thread::join()
        {
                assert(m_bStarted);
                assert(!m_bJoined);

                m_bJoined = true;

                pthread_join(m_tid, nullptr);
        }

        // 线程入口
        void *Thread::thread_enter(void *arg)
        {
                ThreadData *data = static_cast<ThreadData *>(arg);
                data->run();

                return nullptr;
        }
}

Threadpool.h
#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__

#include <boost/noncopyable.hpp>
#include "Thread.h"
#include <functional>
#include <vector>
#include <deque>
#include <memory>
#include <string>
#include <mutex>
#include <condition_variable>
#include <atomic>

namespacewind
{
        class Threadpool
        {
                public:
                        ~Threadpool()
                        {
                                if (m_running)
                                {
                                        {
                                                std::lock_guard<std::mutex> locker(m_mutex);
                                                m_running = false;

                                                // 唤醒为了不让wait卡住,否则m_running收不到信号
                                                m_notFullCondition.notify_all();
                                                m_notEmptyCondition.notify_all();
                                        }
                                        for (auto &e : m_threads)
                                                e->join();
                                }
                        }
                        // 创建线程池
                        void init_pool();
                        // 加入一个任务
                        void add_task(std::function<void()> task);
                private:
                        // Thread线程回调的函数
                        void runThread();
                        // 取出一个任务
                        std::function<void()> take_task();
                        bool is_full()
                        {
                                return m_maxnum > 0 && m_tasks.size() >= m_maxnum;
                        }
                private:
                        int                                                                   m_threadNum = 10;                // 线程数量
                        std::vector<std::shared_ptr<Thread>>         m_threads;                                // 线程池
                        std::deque<std::function<void()>>                 m_tasks {};                                // 任务队列
                        std::mutex                                                                 m_mutex;                                // 互斥锁
                        std::condition_variable                                 m_notEmptyCondition;        // 任务队列不空
                        std::condition_variable                                 m_notFullCondition;                // 任务队列不满
                        bool                                                                         m_running = false;                // 线程池是否正在运行
                        int                                                                         m_maxnum = 10;                        // 任务队列最大数量
        };
}

#endif

Threadpool.cpp
#include "include/Threadpool.h"
#include <memory>
#include <functional>
#include <assert.h>
#include <mutex>
#include <exception>

namespace wind
{
        // 创建线程池
        void Threadpool::init_pool()
        {
                assert(m_threads.empty());
                m_threads.reserve(m_threadNum);
                m_running = true;

                for (unsigned i = 0; i < m_threadNum; i++)
                {
                        m_threads.emplace_back(
                                        std::make_shared<Thread>(std::bind(&Threadpool::runThread, this)));
                        m_threads->start();
                }
        }

        // 加入一个任务
        void Threadpool::add_task(std::function<void()> task)
        {
                // 如果线程池是空的那么就执行执行该任务
                if (m_threads.empty())
                        task();
                else
                {
                        std::unique_lock<std::mutex> locker(m_mutex);
                        // 如果任务队列已满,当前线程睡眠并等待条件成立
                        while (is_full() && m_running)
                        {
                                // 等待m_running是为了正确收到退出信号
                                m_notFullCondition.wait(locker, {return is_full() == false || m_running == false;});
                                break;
                        }
                        if (!m_running)
                                return;

                        m_tasks.push_back(std::move(task));
                        m_notEmptyCondition.notify_one();
                }
        }

        // 取出一个任务
        std::function<void()> Threadpool::take_task()
        {
                std::unique_lock<std::mutex> locker(m_mutex);
                // 如果任务队列为空,当前线程睡眠并等待条件成立
                while (m_tasks.empty() && m_running)
                {
                        // 等待m_running是为了正确收到退出信号
                        m_notEmptyCondition.wait(locker, {return m_tasks.empty() == false || m_running == false;});
                        break;
                }
                if (!m_running)
                        return {};

                std::function<void()> func;
                // 取出一个任务
                if (!m_tasks.empty())
                {
                        func = m_tasks.front();
                        m_tasks.pop_front();

                        // 通知队列不满了
                        if (m_maxnum > 0)
                                m_notFullCondition.notify_one();
                }
                return func;
        }

        // Thread线程回调的函数
        void Threadpool::runThread()
        {
                while (m_running)       
                {
                        std::function<void()> task = take_task();
                       
                        if (task)
                                task();
                }
        }
}

ThreadTest.cpp
#include "include/Threadpool.h"
#include <iostream>
#include <unistd.h>
#include <thread>
#include <chrono>

void print()
{
        printf("任务1\n");
}

void ttt(int a)
{
        printf("任务2 a = %d\n", a);
}

void ccc()
{
        printf("任务3\n");
}

int main(void)
{
        wind::Threadpool pool;
        pool.init_pool();

        pool.add_task(print);
        pool.add_task(std::bind(&ttt, 5));
        pool.add_task(ccc);
        pool.add_task(print);
        pool.add_task(std::bind(&ttt, 7));
        pool.add_task(ccc);

        std::this_thread::sleep_for(std::chrono::seconds(1));
       
        return 0;
}

CMakeLists
# 设置CMake支持的版本
cmake_minimum_required(VERSION 3.5)

# 设置变量
set(CXX_FLAG "-g -pthread -rdynamic")

# 设置项目名称
project(project)

# 寻找第三方库
# find_package(Boost 1.75.0 REQUIRED COMPONENTS filesystem system)
#
# if (Boost_FOUND)
#         message("boost found")
# else()
#         message("cannot found boost")
# endif()

# 生成可执行文件
add_executable(project
    Thread.cpp
        Threadpool.cpp
        ThreadTest.cpp
    )

# 包含头文件
target_include_directories(project
    PRIVATE
    ${PROJECT_SOURCE_DIR}/include)

# 如果是g++,添加额外的编译选项-std=c++17 -Wold-style-cast
if (CMAKE_COMPILER_IS_GNUCXX)
    set(CMAKE_CXX_FLAGS "-std=c++17 -Wold-style-cast ${CXX_FLAG}")
endif (CMAKE_COMPILER_IS_GNUCXX)

# 添加第三方库
# target_link_libraries(project PRIVATE ${Boost_LIBRARIES})
# 添加编译选项
add_compile_options(project PRIVATE EX3)

run.sh
#!/bin/bash
# Auto create the build folder and generate binary.

dir="build"

find . -name ${dir} | grep "build"
if [ $? -eq 0 ]; then
    rm -rf ${dir}
fi

mkdir -p build && cd build && cmake .. && make
if [ $? -ne 0 ]; then
    echo "error"
    exit 1
fi



Eaglecad 发表于 2021-1-15 23:36

C++11的线程他不香吗,还可以跨平台

古月不傲 发表于 2021-1-15 23:50

Eaglecad 发表于 2021-1-15 23:36
C++11的线程他不香吗,还可以跨平台
香啊,不习惯。windows中我会用CreateThraed

lingyinGR 发表于 2021-1-16 07:52

感谢分享

wobzhidao 发表于 2021-1-16 08:17

这个软件确实不错的

JuncoJet 发表于 2021-1-16 09:42

这年头多线程的场合越来越少了,高性能的都被异步取代了

好久丿好酒 发表于 2021-1-16 17:34

感谢楼主分享

duier 发表于 2021-1-21 12:04

Eaglecad 发表于 2021-1-15 23:36
C++11的线程他不香吗,还可以跨平台

大佬们好

Eaglecad 发表于 2021-1-21 19:03

duier 发表于 2021-1-21 12:04
大佬们好

阿弥陀佛,这位道友你好

GrabySky 发表于 2022-7-9 18:50

基于boost的线程池?
页: [1]
查看完整版本: 简易线程池