C++ Thread Pool

最近实在是太忙(懒)了,要学的东西好多 (ง •̀_•́)ง,感觉要向全栈工程师迈进了。这半年里用到了很多技术,客户端用C++开发,Web服务器端用ASP .Net Core开发,测试工具用C#开发,数据库用SQL Server,训练数据和爬虫用Python,可能接下来训练NLP模型得用Tensorflow,JavaScript也加入到待学清单中。。。真让人头秃啊,一股脑的要灌这么多的知识,虽然学的都不太精,但是好歹知道了大概的处理流程。今天就记录下在用C++开发中,线程池的相关操作,算是一种总结吧,以后说不定还用得到。

关于线程池

为什么要用线程池?

最近在做一个语音识别的项目,用的百度的语音识别引擎,奈何是开发阶段(穷),免费的百度语音识别引擎服务器只有区区的5QPS。在进行大规模多线程测试的时候,经常会返回百度服务器访问错误,这是为啥啊?我摸了摸我的小脑袋瓜,发现问题并不简单,我瞅了眼百度语音识别的控制台,看到QPS的峰值达到了16,也就是说,规定1秒只能访问5次的服务器被我访问了16次,那服务器肯定要报错啊。这可咋整啊,老大说了先用免费的配额进行测试,之后再谈价格提高QPS,可我又必须要做多线程的测试呀,于是我就想到了用线程池。我把语音识别的任务放在线程池里,并且规定线程池里线程最多为5个,所以就不会出现1秒访问16次的问题了。

什么是线程池?

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

C++中线程池的实现

线程池的实现借鉴了这个C++线程池开源项目,这篇博文介绍关于C++线程池的实现原理很是详细,我在这里就讲一下用法吧,毕竟会用才是王道,这个C++线程池实现的非常简单,只需要在项目中添加一个.h文件和.cpp文件就行。

CThreadPool.h

#ifndef THREAD_POOL_HPP
#define THREAD_POOL_HPP

#include <functional>
#include <future>
#include <queue>

namespace ThreadPool
{
class ThreadPool {
public:
ThreadPool(size_t);
void StartThreadPool();
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
->std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
std::vector< std::thread > workers;
// the task queue
std::queue< std::packaged_task<void()> > tasks;

// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
size_t maxThreads;
};

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;

auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);

// don't allow enqueueing after stopping the pool
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");

tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}
#endif
}

CThreadPool.cpp

#include "CThreadPool.hpp"

namespace ThreadPool
{
// the constructor just launches some amount of workers
ThreadPool::ThreadPool(size_t threads)
: maxThreads(threads),stop(false)
{

}

// 启动线程池
void ThreadPool::StartThreadPool()
{
//是否已开启线程任务
if (workers.size() > 0)
{
return;
}

for (size_t i = 0; i<maxThreads; ++i)
workers.emplace_back([this] {
for (;;) {
std::packaged_task<void()> task;

{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this] { return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}

task();
}
});
}

// the destructor joins all threads
ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers) {
worker.join();
}
}
}

使用方法

  1. 在代码的最开头定义线程池全局变量,如:ThreadPool::ThreadPool rec_keyword_threadpool(5);
  2. 在初始化阶段启动线程池,如:rec_keyword_threadpool.StartThreadPool();
  3. 在线程池中加入绑定函数,如:std::future<int> result = rec_keyword_threadpool.enqueue(std::bind(&AudioTransform, clientID, accessToken, ref(allText)));
  4. 函数运行完毕后取出运行结果,如:int back_transform_int = result.get();

大概就是这么4个步骤,线程池的实现用到了std::future模板类,利用get()方法可以取得函数运行的结果,并且利用std::bind()方法绑定函数和参数,值得注意的是,如果参数是一个引用类型,一定要用ref()包起来,不然访问不了引用类型的值。

Author: Hongyi Guo
Link: https://guohongyi.com/2020/01/08/C-Thread-Pool/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.