C++如何实现一个Actor模型_利用C++构建高并发的Actor并发模型

2次阅读

Actor模型通过独立实体间的消息传递实现并发,每个Actor拥有私有状态、邮箱和行为逻辑,c++中可利用std::Thread线程安全队列模拟,如counterActor示例所示,通过消息触发状态变更,避免共享内存,确保线程安全,虽无原生支持但能高效构建高并发系统。

C++如何实现一个Actor模型_利用C++构建高并发的Actor并发模型

Actor模型是一种处理并发计算的数学模型,每个Actor是独立的实体,通过消息传递进行通信,不共享状态。在C++中实现Actor模型,虽然不像erlang或Akka那样原生支持,但借助现代C++(C++11及以上)的多线程异步机制,完全可以构建出高效、安全的高并发Actor系统。

Actor模型核心思想

每个Actor拥有:

  • 独立的状态:不与其他Actor共享内存
  • 邮箱(Mailbox):接收其他Actor发送的消息
  • 行为逻辑:处理消息时执行的函数
  • 创建与通信能力:可创建新Actor,并向其他Actor发送消息

消息通过异步方式发送,Actor逐个处理邮箱中的消息,避免竞态条件。

使用std::thread与消息队列实现Actor

最基础的方式是为每个Actor分配一个线程或共享线程池,配合线程安全的消息队列。

立即学习C++免费学习笔记(深入)”;

C++如何实现一个Actor模型_利用C++构建高并发的Actor并发模型

风车Ai翻译

跨境电商必备ai翻译工具

C++如何实现一个Actor模型_利用C++构建高并发的Actor并发模型 407

查看详情 C++如何实现一个Actor模型_利用C++构建高并发的Actor并发模型

// 简单的消息基类

#include
#include
#include
#include #include
#include

Struct Message {
    virtual ~Message() = default;
};

// 线程安全的消息队列
template
class ThreadSafeQueue {
private:
    mutable std::mutex mtx;
    std::queue data_queue;
public:
    void push(T msg) {
        std::lock_guard<:mutex> lock(mtx);
        data_queue.push(std::move(msg));
    }
    bool try_pop(T& value) {
        std::lock_guard<:mutex> lock(mtx);
        if (data_queue.empty()) return false;
        value = std::move(data_queue.front());
        data_queue.pop();
        return true;
    }
    bool empty() const {
        std::lock_guard<:mutex> lock(mtx);
        return data_queue.empty();
    }
};

定义Actor基类与行为

Actor运行在一个独立的上下文中,循环从邮箱取消息并处理。

class Actor {
public:
    using MessagePtr = std::shared_ptr;
    virtual ~Actor() { stop(); }

    void start() {
        running = true;
        thread = std::thread([this] { run(); });
    }

    void stop() {
        running = false;
        if (thread.joinable()) thread.join();
    }

    void send(MessagePtr msg) {
        mailbox.push(std::move(msg));
    }

protected:
    virtual void onMessage(const MessagePtr& msg) = 0;

private:
    void run() {
        while (running) {
            MessagePtr msg;
            if (mailbox.try_pop(msg)) {
                if (msg) onMessage(msg);
            } else {
                std::this_thread::sleep_for(std::chrono::nanoseconds(10));
            }
        }
    }

    ThreadSafeQueue mailbox;
    std::thread thread;
    std::atomic running{false};
};

实现具体Actor示例

比如一个计数Actor,接收增加指令并打印当前值。

struct IncrementMsg : Message {
    int delta;
    IncrementMsg(int d) : delta(d) {}
};

class CounterActor : public Actor {
private:
    int count = 0;

protected:
    void onMessage(const MessagePtr& msg) override {
        if (auto inc = dynamic_cast(msg.get())) {
            count += inc->delta;
            printf(“Count is now: %dn”, count);
        }
    }
};

int main() {
    CounterActor counter;
    counter.start();

    counter.send(std::make_shared(1));
    counter.send(std::make_shared(2));
    counter.send(std::make_shared(3));

    std::this_thread::sleep_for(std::chrono::seconds(1)); // 等待处理
    return 0;
}

基本上就这些。这个简单框架展示了Actor模型的核心:封装状态、消息驱动、异步通信。实际生产中可进一步优化,如使用无锁队列、Actor池、调度器分离等。C++虽无原生Actor支持,但凭借其灵活性和性能优势,完全能构建出媲美Erlang的高并发Actor系统。

text=ZqhQzanResources