Erlang/Elixir如何利用OTP实现高并发XML上传和处理

1次阅读

Plug是更主流的选择,但需禁用默认xml解析并用body_stream配合SAX流式处理,避免OOM;GenServer应只存轻量状态,解析逻辑下沉;I/O操作需与CPU解析解耦,用Task.Supervisor限流;动态worker适配不同Schema,确保资源隔离。

Erlang/Elixir如何利用OTP实现高并发XML上传和处理

XML上传接口cowboy 还是 plug?选错会卡死连接

在 Elixir 中,plug 是更主流的选择,但关键不在框架本身,而在底层 http 解析行为。默认的 Plug.Parsersmultipart 或大 XML 体不做流式处理,会把整个请求体读进内存,导致上传几百 MB XML 时进程 OOM 或 GC 压力飙升。

必须禁用默认 XML 解析,并手动接管原始 body 流:

plug Plug.Parsers,   parsers: [:urlencoded, :json],   pass: ["*/*"],   json_decoder: Jason

然后在 controller 中用 conn.body_params 取不到 XML,改用 conn.body_stream 获取 enumerable 流,再传给 SAX 解析器(如 xmerl_sax_parser 或 Elixir 的 Saxy)。

  • 别在 body_params 里找 XML —— 它只处理表单编码和 JSON
  • body_stream{module, fun, args} 三元组,需用 Stream.Resource/3 封装成可迭代流
  • 若用 cowboy 直接写 handler,要设 max_body_lengthstream_handlers,否则 Cowboy 会拒绝大包

GenServer 处理 XML 时为何越跑越慢?状态膨胀是主因

常见错误是把整棵解析后的 XML 树(比如 xmerl 返回的 {xmlElement, ...})存进 GenServer 状态。erlang 的 term 内存不共享,每次更新都复制全量结构,10MB XML 树反复修改会触发频繁 GC 和调度器争抢。

正确做法是:只保留轻量上下文(如 upload_idfile_refstate),把实际解析逻辑下沉到无状态函数,或用 Agent 管理临时数据。

  • Enum.reduce/3 + SAX 回调做增量处理,边读边存关键字段到 ETS 或 Postgres
  • 避免在 handle_cast 中调用 xmerl_scan:String/1 —— 它吃内存且非流式
  • 若必须构建树,用 xmerl_scan:file/2[{continuation_fun, Fun}] 实现分片解析

并发瓶颈不在解析,而在文件落地和 DB 写入

OTP 应用常误以为 “开了 1000 个 GenServer 就能吞 1000 并发 XML”,实际压测发现 CPU 利用率不足 30%,磁盘 I/O 或 DB 连接池早被打满。XML 解析本身是 CPU-bound,但落地为文件或插入数据库是 I/O-bound,会阻塞调度器。

必须拆离这两类操作:

  • 上传 → 解析 → 提取元数据 → 写入 ETS(瞬时)→ 投递 {:process_xml, upload_id}Task.Supervisor
  • DB 写入用 Ecto.Repo.insert_all/3 批量提交,避免逐条 insert/2
  • 文件存储优先走 :prim_file.write/2File.stream!/3 + Stream.into/2,绕过 Erlang IO server 争抢

同时限制后台任务数:

supervisor(Task.Supervisor, [[name: Myapp.TaskSup, shutdown: {:shutdown, 5000}, max_children: 50]])

,防止雪崩。

为什么 Supervisor 树里要加 simple_one_for_one?不是所有 XML 都一样

不同业务线上传的 XML Schema 差异极大:有的含二进制附件(base64),有的需校验 XSD,有的要转发到 kafka。硬编码一个 GenServer 类型无法适配。

simple_one_for_one 动态启动专用 worker,每个实例绑定唯一 upload_id 和配置项:

Supervisor.start_link([   {MyApp.XMLWorker, {upload_id, schema, options}} ], strategy: :simple_one_for_one)

这样既能隔离故障(某次上传崩溃不影响其他),又能按需加载依赖(如仅 XSD 校验才启动 xsd 库)。

  • 别把 schema 验证逻辑塞进通用 parser —— xmerl_xsd 是同步阻塞调用,会拖慢整个 worker
  • worker 初始化时通过 start_link/1 传参,而非从 Registry 查 —— 减少竞争点
  • 超时必须设:用 Process.flag(:trap_exit, true) + Process.exit(self(), :kill) 防止坏 XML 卡死进程

真正难的不是并发数字,而是让每个 XML 在独立资源边界内完成“上传→校验→落库→通知”闭环,且不互相污染。OTP 提供的是骨架,填什么肉,得看 XML 的实际重量和流向。

text=ZqhQzanResources