本帖最后由 古月不傲 于 2021-1-16 03:08 编辑
Thread.h
[C++] 纯文本查看 复制代码 #ifndef __THREAD_H__
#define __THREAD_H__
#include <boost/noncopyable.hpp>
#include <functional>
#include <string>
#include <pthread.h>
#include <memory>
namespace wind
{
/// Minimal wrapper around one POSIX thread that executes a stored callback.
///
/// Inherits boost::noncopyable, so instances cannot be copied.
/// Usage: construct with a callback, call start() once, then join() once.
class Thread : public boost::noncopyable
{
public:
    /// Argument block whose address is handed to the thread entry function.
    struct ThreadData
    {
        /// @param cb callback the new thread will execute (stored as a copy).
        ThreadData(const std::function<void()> &cb) :
            m_cb(cb)
        {
        }
        /// Invokes the stored callback.
        void run() const
        {
            m_cb();
        }
        const std::function<void()> m_cb;
    };
public:
    /// @param cb callback to run on the thread once start() is called.
    ///
    /// The initializer list follows the member declaration order below, which
    /// is the order in which members are actually initialized.
    Thread(std::function<void()> cb) :
        m_tid(0),
        m_bStarted(false),
        m_bJoined(false),
        m_cb(std::move(cb))
    {
    }
    /// Launches the thread.
    void start();
    /// Waits for the thread to finish.
    void join();
private:
    /// Entry function passed to pthread_create; `arg` points to a ThreadData.
    static void *thread_enter(void *arg);
    // NOTE(review): no definition of set_name() is visible in the accompanying
    // Thread.cpp listing -- confirm before calling it.
    void set_name();
private:
    pthread_t m_tid;                          // thread id filled in by pthread_create
    bool m_bStarted;                          // set once start() has been called
    bool m_bJoined;                           // set once join() has been called
    std::function<void()> m_cb;               // task callback
    std::shared_ptr<ThreadData> m_threadData; // argument block for the thread
};
}
#endif
Thread.cpp
[C++] 纯文本查看 复制代码 #include "include/Thread.h"
#include <iostream>
#include <assert.h>
#include <memory>
namespace wind
{
// Starts the thread.
//
// Wraps the stored callback in a ThreadData object and creates a POSIX thread
// whose entry point is thread_enter. Calling start() on an already started
// Thread trips the assert.
//
// If pthread_create fails, the object is rolled back to the "not started"
// state and the error code is written to std::cerr, so a later join() is not
// attempted on a thread id that was never valid.
void Thread::start()
{
    assert(!m_bStarted);
    m_bStarted = true;
    m_threadData = std::make_shared<ThreadData>(m_cb);
    const int err = pthread_create(&m_tid, nullptr, thread_enter,
                                   static_cast<void *>(m_threadData.get()));
    if (err != 0)
    {
        // No thread was created: undo the bookkeeping done above.
        m_bStarted = false;
        m_threadData.reset();
        std::cerr << "Thread::start: pthread_create failed, error " << err << '\n';
    }
}
// Waits for the thread to finish.
//
// Preconditions (checked by assert): start() has been called and join() has
// not been called before. When asserts are compiled out (NDEBUG) a violated
// precondition turns the call into a no-op instead of passing an invalid or
// already-joined thread id to pthread_join.
void Thread::join()
{
    assert(m_bStarted);
    assert(!m_bJoined);
    if (!m_bStarted || m_bJoined)
        return;
    m_bJoined = true;
    const int err = pthread_join(m_tid, nullptr);
    if (err != 0)
    {
        // The thread was not joined; clear the flag so the state stays truthful.
        m_bJoined = false;
        std::cerr << "Thread::join: pthread_join failed, error " << err << '\n';
    }
}
// 线程入口
void *Thread::thread_enter(void *arg)
{
ThreadData *data = static_cast<ThreadData *>(arg);
data->run();
return nullptr;
}
}
Threadpool.h
[C++] 纯文本查看 复制代码 #ifndef __THREADPOOL_H__
#define __THREADPOOL_H__
#include <boost/noncopyable.hpp>
#include "Thread.h"
#include <functional>
#include <vector>
#include <deque>
#include <memory>
#include <string>
#include <mutex>
#include <condition_variable>
#include <atomic>
namespace wind
{
/// Fixed-size thread pool with a bounded task queue.
///
/// init_pool() spawns the workers; add_task() enqueues work (blocking while the
/// queue is full); the destructor stops the workers and joins them.
class Threadpool
{
public:
    /// Stops the pool: clears the running flag, wakes every waiter and joins
    /// all worker threads. Does nothing if the pool was never started.
    ~Threadpool()
    {
        if (m_running)
        {
            {
                std::lock_guard<std::mutex> locker(m_mutex);
                m_running = false;
                // Wake all waiters so that nobody stays blocked in wait() and
                // misses the change of m_running.
                m_notFullCondition.notify_all();
                m_notEmptyCondition.notify_all();
            }
            for (auto &e : m_threads)
                e->join();
        }
    }
    /// Creates and starts the worker threads.
    void init_pool();
    /// Adds one task to the pool.
    void add_task(std::function<void()> task);
private:
    /// Function executed by every worker Thread.
    void runThread();
    /// Removes one task from the queue.
    std::function<void()> take_task();
    /// True when the queue is bounded (m_maxnum > 0) and has reached its limit.
    bool is_full() const
    {
        return m_maxnum > 0 && m_tasks.size() >= static_cast<std::size_t>(m_maxnum);
    }
private:
    int m_threadNum = 10;                            // number of worker threads
    std::vector<std::shared_ptr<Thread>> m_threads;  // the workers
    std::deque<std::function<void()>> m_tasks {};    // pending tasks
    std::mutex m_mutex;                              // guards m_tasks
    std::condition_variable m_notEmptyCondition;     // signalled when a task is queued
    std::condition_variable m_notFullCondition;      // signalled when a task is removed
    // Atomic because the worker loop reads it without holding m_mutex while the
    // destructor writes it; a plain bool here would be a data race.
    std::atomic<bool> m_running {false};             // whether the pool is running
    int m_maxnum = 10;                               // maximum queue length
};
}
#endif
Threadpool.cpp
[C++] 纯文本查看 复制代码 #include "include/Threadpool.h"
#include <memory>
#include <functional>
#include <assert.h>
#include <mutex>
#include <exception>
namespace wind
{
// 创建线程池
void Threadpool::init_pool()
{
assert(m_threads.empty());
m_threads.reserve(m_threadNum);
m_running = true;
for (unsigned i = 0; i < m_threadNum; i++)
{
m_threads.emplace_back(
std::make_shared<Thread>(std::bind(&Threadpool::runThread, this)));
m_threads[i]->start();
}
}
// Adds one task.
//
// With no worker threads the task is executed immediately on the calling
// thread. Otherwise the caller blocks while the queue is full and the pool is
// running, then the task is queued and one worker is woken. If the pool has
// stopped, the task is dropped.
void Threadpool::add_task(std::function<void()> task)
{
    if (m_threads.empty())
    {
        task();
        return;
    }
    std::unique_lock<std::mutex> guard(m_mutex);
    // m_running is part of the predicate so that a shutdown notification
    // releases a producer blocked on a full queue.
    m_notFullCondition.wait(guard, [this] { return !(is_full() && m_running); });
    if (!m_running)
        return;
    m_tasks.push_back(std::move(task));
    m_notEmptyCondition.notify_one();
}
// Removes and returns one task.
//
// Blocks while the queue is empty and the pool is running. Returns an empty
// std::function once the pool has stopped. After a task is removed, one
// producer waiting for free space is woken when the queue is bounded.
std::function<void()> Threadpool::take_task()
{
    std::unique_lock<std::mutex> locker(m_mutex);
    // m_running is part of the predicate so that a shutdown notification
    // releases a worker blocked on an empty queue.
    m_notEmptyCondition.wait(locker, [this] { return !m_tasks.empty() || !m_running; });
    if (!m_running)
        return {};
    std::function<void()> func;
    if (!m_tasks.empty())
    {
        // Move the task out: the queue slot is popped right after, so copying
        // the callable (and whatever it captured) would be wasted work.
        func = std::move(m_tasks.front());
        m_tasks.pop_front();
        // Tell producers that the queue is no longer full.
        if (m_maxnum > 0)
            m_notFullCondition.notify_one();
    }
    return func;
}
// Worker loop executed by every Thread of the pool: while the pool is running,
// fetch a task and execute it if it is non-empty.
void Threadpool::runThread()
{
    while (m_running)
    {
        if (std::function<void()> job = take_task())
            job();
    }
}
}
ThreadTest.cpp
[C++] 纯文本查看 复制代码 #include "include/Threadpool.h"
#include <iostream>
#include <unistd.h>
#include <thread>
#include <chrono>
// Demo task 1: writes a fixed line to standard output.
void print()
{
    fputs("任务1\n", stdout);
}
// Demo task 2: writes a line containing the integer argument to standard output.
void ttt(int a)
{
    fprintf(stdout, "任务2 a = %d\n", a);
}
// Demo task 3: writes a fixed line to standard output.
void ccc()
{
    fputs("任务3\n", stdout);
}
// Demo driver: starts a pool, submits two rounds of the three demo tasks
// (the second task receives 5, then 7), and sleeps for one second before the
// pool goes out of scope.
int main(void)
{
    wind::Threadpool pool;
    pool.init_pool();

    const int rounds[] = {5, 7};
    for (const int value : rounds)
    {
        pool.add_task(print);
        pool.add_task([value] { ttt(value); });
        pool.add_task(ccc);
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 0;
}
CMakeLists
[C++] 纯文本查看 复制代码 # 设置CMake支持的版本
cmake_minimum_required(VERSION 3.5)
# Extra compiler flags, appended to CMAKE_CXX_FLAGS below
set(CXX_FLAG "-g -pthread -rdynamic")
# Project name
project(project)
# Locate third-party libraries (currently disabled)
# find_package(Boost 1.75.0 REQUIRED COMPONENTS filesystem system)
#
# if (Boost_FOUND)
# message("boost found")
# else()
# message("cannot found boost")
# endif()
# Build the executable
add_executable(project
    Thread.cpp
    Threadpool.cpp
    ThreadTest.cpp
)
# Header search path
target_include_directories(project
    PRIVATE
        ${PROJECT_SOURCE_DIR}/include)
# For g++, add the extra options -std=c++17 -Wold-style-cast
if (CMAKE_COMPILER_IS_GNUCXX)
    set(CMAKE_CXX_FLAGS "-std=c++17 -Wold-style-cast ${CXX_FLAG}")
endif (CMAKE_COMPILER_IS_GNUCXX)
# Link third-party libraries (currently disabled)
# target_link_libraries(project PRIVATE ${Boost_LIBRARIES})
# Define the EX3 preprocessor symbol for this target.
# add_compile_options() takes raw compiler flags, not a target and a scope
# keyword, and only affects targets created after the call, so the
# "<target> PRIVATE <name>" form belongs to target_compile_definitions().
target_compile_definitions(project PRIVATE EX3)
run.sh
[C++] 纯文本查看 复制代码 #!/bin/bash
# Recreate the build folder from scratch, then configure and build the binary.
dir="build"
# Remove a stale build directory next to this script's working directory.
# Only ./build is checked: a directory named "build" deeper in the tree must
# not trigger the removal.
if [ -d "${dir}" ]; then
    rm -rf "${dir}"
fi
mkdir -p "${dir}" && cd "${dir}" && cmake .. && make
if [ $? -ne 0 ]; then
    echo "error"
    exit 1
fi
|