古月不傲 发表于 2020-11-26 17:50

c++11 线程池

原文地址:https://www.cnblogs.com/lzpong/p/6397997.html#include <iostream>
#include <vector>
#include <queue>
#include <atomic>
#include <future>
#include <condition_variable>
#include <thread>
#include <functional>
#include <stdexcept>

namespace wind
{
    #defineTHREADPOOL_MAX_NUM 16

    class threadpool
    {
      using Task = std::function<void()>;//定义类型
      std::vector<std::thread> m_pool;   //线程池
      std::queue<Task> m_tasks;            //任务队列
      std::mutex _lock;                  //同步
      std::condition_variable m_cv_task;   //条件阻塞
      std::atomic<bool> m_run;             //线程池是否执行
      std::atomic<int>m_idle_num;      //空闲线程数量
    public:
      inline threadpool(size_t size = 4)
      {
            this->m_run = true;
            this->m_idle_num = 0;
            addThread(size);
      }
      inline ~threadpool()
      {
            m_run = false;
            m_cv_task.notify_all();
            for (auto &thread : this->m_pool)
            {
                if(thread.joinable())
                  thread.join();
            }
      }
    public:
      template<class F, class... Args>
      auto commit(F&& f, Args&&... args) ->std::future<decltype(f(args...))>
      {
            if (!m_run)   
                throw std::runtime_error("commit on ThreadPool is stopped.");

            // 萃取类型 得到函数返回类型
            using RetType = decltype(f(args...));

            // 智能指针
            auto task = std::make_shared<std::packaged_task<RetType()>>(
                std::bind(std::forward<F>(f), std::forward<Args>(args)...)
            );
            
            std::future<RetType> future = task->get_future();
            // 插入一个任务
            {   
                std::lock_guard<std::mutex> lock{ _lock };
                m_tasks.emplace(
                  
                  {
                        (*task)();
                  });
            }
            // 通知任意一个线程 当前有任务到来
            m_cv_task.notify_one();

            return future;
      }
      //空闲线程数量
      int idlCount() { return m_idle_num; }
      //线程数量
      int thrCount() { return m_pool.size(); }
    private:
      //添加指定数量的线程
      void addThread(size_t size)
      {
            // 创建size个线程
            for (int i = 0; i < size; i++)
            {
                m_pool.emplace_back(
               
                {
                  while (m_run)
                  {
                        Task task;
                        {
                            std::unique_lock<std::mutex> lock{ _lock };
                            // 等待任务队列有任务到来
                            m_cv_task.wait(lock, {
                              return !this->m_run || !this->m_tasks.empty();
                            });
                            if (!m_run && m_tasks.empty())
                              return;
                            // 按先进先出从队列取一个 task
                            task = std::move(m_tasks.front());
                            m_tasks.pop();
                        }
                        //执行任务
                        m_idle_num--;
                        task();
                        m_idle_num++;
                  }
                });
                m_idle_num++;
            }
      }
    };
}

void fun1(int slp)
{
   printf("hello, fun1 !%d\n" ,std::this_thread::get_id());
   
   if (slp>0)
   {
      printf(" ======= fun1 sleep %d=========%d\n",slp, std::this_thread::get_id());
      std::this_thread::sleep_for(std::chrono::milliseconds(slp));
   }
}

struct gfun {
   int operator()(int n) {
         printf("%dhello, gfun !%d\n" ,n, std::this_thread::get_id() );
         return 42;
   }
};

class A {
public:
   static int Afun(int n = 0) {   //函数必须是 static 的才能直接使用线程池
         std::cout << n << "hello, Afun !" << std::this_thread::get_id() << std::endl;
         return n;
   }

   static std::string Bfun(int n, std::string str, char c) {
         std::cout << n << "hello, Bfun !"<< str.c_str() <<"" << (int)c <<"" << std::this_thread::get_id() << std::endl;
         return str;
   }
};
int main()
{
   try {
         wind::threadpool executor{16};
         A a;
         std::future<void> ff = executor.commit(fun1,0);
         std::future<int> fg = executor.commit(gfun{},0);
         std::future<int> gg = executor.commit(a.Afun, 9999); //IDE提示错误,但可以编译运行
         std::future<std::string> gh = executor.commit(A::Bfun, 9998,"mult args", 123);
         std::future<std::string> fh = executor.commit([]()->std::string { std::cout << "hello, fh !" << std::this_thread::get_id() << std::endl; return "hello,fh ret !"; });

         std::cout << " =======sleep ========= " << std::this_thread::get_id() << std::endl;
         std::this_thread::sleep_for(std::chrono::microseconds(900));

         for (int i = 0; i < 5; i++) {
             executor.commit(fun1,i*100 );
         }
         std::cout << " =======commit all ========= " << std::this_thread::get_id()<< " idlsize="<<executor.idlCount() << std::endl;

         std::cout << " =======sleep ========= " << std::this_thread::get_id() << std::endl;
         std::this_thread::sleep_for(std::chrono::seconds(3));

         ff.get(); //调用.get()获取返回值会等待线程执行完,获取返回值
         std::cout << fg.get() << "" << fh.get().c_str()<< "" << std::this_thread::get_id() << std::endl;

         std::cout << " =======sleep ========= " << std::this_thread::get_id() << std::endl;
         std::this_thread::sleep_for(std::chrono::seconds(3));

         std::cout << " =======fun1,55 ========= " << std::this_thread::get_id() << std::endl;
         executor.commit(fun1,55).get();    //调用.get()获取返回值会等待线程执行完

         std::cout << "end... " << std::this_thread::get_id() << std::endl;


         wind::threadpool pool(4);
         std::vector< std::future<int> > results;

         for (int i = 0; i < 16; ++i) {
             results.emplace_back(
               pool.commit( {
                     std::cout << "hello " << i << std::endl;
                     std::this_thread::sleep_for(std::chrono::seconds(1));
                     std::cout << "world " << i << std::endl;
                     return i*i;
               })
             );
         }
         std::cout << " =======commit all2 ========= " << std::this_thread::get_id() << std::endl;

         for (auto && result : results)
             std::cout << result.get() << ' ';
         std::cout << std::endl;
         return 0;
   }
catch (std::exception& e) {
   std::cout << "some unhappy happened..." << std::this_thread::get_id() << e.what() << std::endl;
}

    return 0;
}


c++11 第一次见 看了两天才搞懂 头皮发麻 c++小白可以看下 晕的先大致看一下下面 突然感觉好陌生
#include <iostream>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <vector>
#include <functional>

std::atomic<bool> g_var(false);
std::mutex mtx;
std::condition_variable con;

void fun1()
{
    std::unique_lock<std::mutex> mm(mtx);
    con.wait(mm,
    []
    {
      return g_var == true;
    });
    printf("fun1 条件变量成立\n");
    // 执行下面结果
}

void fun2()
{
    std::unique_lock<std::mutex> mm(mtx);
    con.wait(mm,
    []
    {
      return g_var == false;
    });
    printf("fun2 条件变量成立\n");
    // 执行下面结果
}

void fun3()
{
    std::unique_lock<std::mutex> mm(mtx);
    con.wait(mm,
    []
    {
      return g_var == true;
    });
    printf("fun3 条件变量成立\n");
    // 执行下面结果
}

int main(void)
{
    std::vector<std::thread> vec;

    vec.emplace_back(fun1);
    vec.emplace_back(fun2);
    vec.emplace_back(fun3);

    for (auto &e : vec)
    {
      if (e.joinable())
            e.join();
    }
      
    return 0;
}

c1earlov8 发表于 2020-11-26 19:20

好家伙 在论坛还能学C++
页: [1]
查看完整版本: c++11 线程池