C++如何实现带依赖关系的任务图执行?(DAG调度引擎)

2次阅读

用 std::shared_ptr 管理强依赖(如上游结果),std::weak_ptr 管理结构依赖(如前驱边),避免循环引用;拓扑排序需显式计算入度并正确初始化零入度队列;并发读写结果时优先用 std::shared_mutex 提升吞吐;任务执行应走线程池+std::packaged_task,避免频繁创建 std::Thread

C++如何实现带依赖关系的任务图执行?(DAG调度引擎)

怎么用 std::shared_ptrstd::weak_ptr 管理任务节点生命周期?

DAG 中节点之间互相引用(比如后继节点存前驱,前驱又存后继),裸指针std::unique_ptr 会直接导致循环引用、内存泄漏。必须用 std::shared_ptr 持有“强依赖”关系(如执行时需要访问的上游结果),用 std::weak_ptr 持有“结构依赖”关系(如拓扑排序所需的边)。

  • 节点类里:用 std::vector<:weak_ptr>> predecessors</:weak_ptr> 存前驱,避免增加引用计数
  • 执行逻辑中:调用 lock() 获取临时 std::shared_ptr,检查是否为空再读取结果
  • 错误现象:不加 weak_ptr,整个图构建完后所有节点引用计数 ≥2,析构不触发
  • 示例:
    struct TaskNode {   std::vector<std::weak_ptr<TaskNode>> predecessors;   std::function<void()> work;   std::shared_ptr<std::any> result; };

拓扑排序失败但没报错?检查入度更新和零入度队列初始化

很多实现卡在“某几个任务永远不启动”,表面看是调度器卡住,实际是拓扑排序阶段漏清入度或重复入队。

  • 初始化时:每个节点的入度必须显式计算,不能靠运行时动态推导;遍历所有边,对 edge.from → edge.to,执行 indegree[edge.to]++
  • 零入度队列:只把入度为 0 的节点 push 进去,且仅一次;不要在循环中反复扫描全部节点找入度 0 的
  • 常见坑:边存储用 std::vector<:pair int>></:pair> 但节点 ID 是指针或 shared_ptr,比较失效导致入度没加对
  • 兼容性注意:如果节点可动态增删,每次变更后必须重算入度,不能复用旧状态

并发执行时结果读取崩溃?std::shared_mutexstd::mutex 更合适

多个下游任务可能同时读同一个上游结果,而上游只写一次。用独占锁会严重串行化读操作。

  • 写结果时(上游执行完):用 std::shared_mutex::lock()
  • 读结果时(下游执行前):用 std::shared_mutex::lock_shared()
  • 错误现象:只用 std::mutex,5 个下游全阻塞在同一个锁上,吞吐掉一半以上
  • 性能影响:实测在 8 核机器上,读多写少场景下,shared_mutex 比普通 mutex 吞吐高 3–4 倍
  • 注意:c++17 起才有 std::shared_mutex;低于 C++17 可用 boost::shared_mutex,但需额外链接

为什么不能直接用 std::thread 拉起任务?优先走线程池 + std::packaged_task

DAG 节点数量可能远超系统线程数,频繁创建/销毁 std::thread 开销大,还容易触发 OS 线程数限制(尤其 windows 默认 2000 左右)。

  • 必须用固定大小线程池,比如用 std::queue<:packaged_task>></:packaged_task> 做任务队列
  • 每个节点包装成 std::packaged_task,投递到池中;完成时通过 std::future 通知下游
  • 容易踩的坑:
    • Lambda 直接传给 std::thread,捕获了局部 shared_ptr,线程还没跑完函数就返回,导致悬空
    • 线程池没设最大并发数,突发大量叶子节点触发数百线程,调度反成瓶颈
  • 示例关键片段:
    auto task = std::make_shared<std::packaged_task<void()>>([node]{ node->execute(); }); pool.enqueue([task]{ (*task)(); });

复杂点在于:节点执行时机受上游 completion signal、线程池空闲度、结果读取锁竞争三重影响,任何一环没对齐,就会出现“该跑没跑”或“跑完不通知”。别迷信自动拓扑——得亲手 trace 几个节点的 indegree 变化和 weak_ptr::lock() 返回值。

text=ZqhQzanResources