c++轻量线程池基于std::Thread、std::queue、std::mutex和std::condition_variable实现,支持任务提交、原子状态控制与优雅关闭,核心包含线程安全队列、工作线程循环、条件变量等待及析构自动清理。

用 C++ 实现一个轻量、实用的线程池,核心是管理一组复用的线程来异步执行任务,避免频繁创建/销毁线程的开销。下面是一个基于 std::thread、std::queue、std::mutex 和 std::condition_variable 的简洁实现,支持任务提交、自动扩容(可选)、优雅关闭,已在 C++11 及以上环境验证可用。
线程池基础结构设计
线程池需包含:任务队列(线程安全)、工作线程集合、同步原语(互斥锁 + 条件变量)、运行状态控制。所有任务封装为 std::function,支持 Lambda、函数指针、绑定对象等。
关键点:
- 任务队列用
std::queue+std::mutex保护,避免多线程竞争 - 用
std::condition_variable让空闲线程等待新任务,而非忙等 - 设置
m_stop原子标志位,控制线程退出循环 - 析构时调用
stop(),等待所有线程完成当前任务后退出
完整可运行代码示例
以下是一个最小可行线程池(无任务优先级、无动态扩缩容,但稳定高效):
立即学习“C++免费学习笔记(深入)”;
#include #include #include #include #include #include #include #include #include class ThreadPool { public: explicit ThreadPool(size_t threads = std::thread::hardware_concurrency()) : m_stop(false) { for (size_t i = 0; i < threads; ++i) { m_workers.emplace_back([this] { while (true) { std::function task; { std::unique_lock lock(m_queue_mutex); m_condition.wait(lock, [this] { return m_stop.load() || !m_tasks.empty(); }); if (m_stop.load() && m_tasks.empty()) return; task = std::move(m_tasks.front()); m_tasks.pop(); } task(); } }); } } template auto enqueue(F&& f, Args&&... args) -> std::future::type> { using return_type = typename std::result_of::type; auto task = std::make_shared>( std::bind(std::forward(f), std::forward(args)...) ); std::future res = task->get_future(); { std::unique_lock lock(m_queue_mutex); if (m_stop.load()) { throw std::runtime_error("enqueue on stopped ThreadPool"); } m_tasks.emplace([task]() { (*task)(); }); } m_condition.notify_one(); return res; } ~ThreadPool() { stop(); } private: void stop() { { std::unique_lock lock(m_queue_mutex); m_stop = true; } m_condition.notify_all(); for (std::thread &t : m_workers) { if (t.joinable()) t.join(); } } std::vector m_workers; std::queue> m_tasks; std::mutex m_queue_mutex; std::condition_variable m_condition; std::atomic m_stop; };
高并发使用技巧与注意事项
在真实项目中提升线程池稳定性与性能,注意以下实践:
- 避免任务阻塞过久:单个任务若长时间 I/O 或 sleep,会占用线程资源;可拆分为异步 I/O + 回调,或单独配 I/O 线程池
- 限制队列长度:无界队列可能 OOM,建议添加
if (m_tasks.size() > MAX_QUEUE_SIZE) throw ...防护 - 返回 future 时注意生命周期:不要让
std::future在线程池析构后被访问,否则会std::terminate - 异常处理要在线程内捕获:worker 线程中未捕获异常会导致整个程序终止,可在 task 包装层加 try/catch
- 考虑使用无锁队列(如 moodycamel::ConcurrentQueue):在极端高吞吐场景下减少锁争用
实际调用示例
提交普通函数、lambda、带返回值任务:
int main() { ThreadPool pool(4); // 提交无返回值任务 pool.enqueue([]{ std::cout << "Hello from threadn"; }); // 提交带参数的 lambda pool.enqueue([](int x, const std::string& s) { std::cout << "x=" << x << ", s=" << s << "n"; }, 42, "world"); // 提交有返回值任务,获取 future auto fut = pool.enqueue([] -> int { std::this_thread::sleep_for(std::chrono::milliseconds(100)); return 123; }); std::cout << "Result: " << fut.get() << "n"; // 阻塞等待结果 return 0; }
不复杂但容易忽略细节。按这个结构写,线程池就既安全又够用。