
本文详解如何利用 php fiber 实现真正的并发流读取:通过同时启动多个非阻塞流 fiber,并轮询调度,避免串行等待,从而高效获取多个远程 url 的分块内容。
本文详解如何利用 php fiber 实现真正的并发流读取:通过同时启动多个非阻塞流 fiber,并轮询调度,避免串行等待,从而高效获取多个远程 url 的分块内容。
在 PHP 8.1+ 中,Fiber 提供了轻量级协程能力,配合 stream_set_blocking(false) 可实现 I/O 多路复用式的异步读取。但需注意:Fiber 本身不提供自动调度器——若按顺序逐个 resume() 直至终止(如原代码),本质仍是同步执行;真正并发的关键在于 “并发启动 + 轮询协作”。
以下是一个完整、可运行的解决方案,支持对多个 http URL 同时发起非阻塞读取,并以 100 字节为单位交错处理响应流:
<?php function getFiberFromStream($stream, $url): Fiber { return new Fiber(function ($stream) use ($url): void { while (!feof($stream)) { // 非阻塞 fread:若无数据立即返回空字符串(不会挂起) $chunk = fread($stream, 100); if ($chunk !== '') { echo "reading " . strlen($chunk) . " bytes from $urln"; Fiber::suspend($chunk); } else { // 模拟短暂让出控制权,避免忙等(实际中可结合 usleep 或事件循环) Fiber::suspend(''); } } // feof 触发后自动退出,Fiber 终止 }); } function getContents(array $urls): array { $contents = []; $fibers = []; // 存储 [Fiber, current_content, stream] 三元组 // ✅ 第一阶段:并发启动所有 Fiber foreach ($urls as $index => $url) { $stream = fopen($url, 'r'); if (!$stream) { throw new RuntimeException("Failed to open stream for $url"); } stream_set_blocking($stream, false); // 关键:设为非阻塞模式 $fiber = getFiberFromStream($stream, $url); $initialContent = $fiber->start($stream); // 启动 Fiber,获取首个 chunk $fibers[$index] = [$fiber, $initialContent, $stream]; } // ✅ 第二阶段:轮询调度所有活跃 Fiber(协作式多任务) $hasActive = true; while ($hasActive) { $hasActive = false; foreach ($fibers as $index => &$item) { [$fiber, $content, $stream] = $item; if ($fiber->isTerminated()) { // Fiber 已完成:保存结果并关闭流 if ($stream) { fclose($stream); $item[2] = null; // 防重复关闭 } $contents[$urls[$index]] = $content; continue; } // Fiber 仍在运行:继续推进一步 $hasActive = true; try { $nextChunk = $fiber->resume(); $item[1] = $content .= $nextChunk; } catch (Throwable $e) { // 处理 Fiber 内部异常(如网络中断、SSL 错误等) error_log("Fiber error for {$urls[$index]}: " . $e->getMessage()); $item[1] = $content; // 保留已读内容 $fiber->throw($e); // 可选:向 Fiber 抛出异常 } } } return $contents; } // 使用示例(请确保目标站点允许爬取且网络可达) $urls = [ 'https://httpbin.org/delay/1', // 模拟慢响应 'https://httpbin.org/delay/1', 'https://httpbin.org/delay/1', ]; // ⚠️ 注意事项: // 1. fopen('https://...') 依赖 php-curl 或 OpenSSL 扩展,部分环境需配置 allow_url_fopen=On; // 2. 非阻塞流在 HTTPS 场景下可能受底层 SSL 握手限制,建议生产环境搭配 ReactPHP/swoole 等成熟事件循环; // 3. 此方案为纯 Fiber 协作调度,无内核级异步支持,高并发时仍受限于单线程吞吐; // 4. 实际项目中应增加超时控制(如 stream_get_meta_data($stream)['timed_out'])、重试机制与错误熔断。 var_dump(getContents($urls));
该实现的核心改进在于将“启动”与“执行”解耦:先批量创建并启动所有 Fiber,再通过外层 while 循环统一调度,每次仅推进每个未终止 Fiber 一次(resume()),从而实现类似 select() 的轮询效果。输出日志将呈现理想的交错模式:
reading 123 bytes from https://httpbin.org/delay/1 reading 98 bytes from https://httpbin.org/delay/1 reading 115 bytes from https://httpbin.org/delay/1 reading 0 bytes from https://httpbin.org/delay/1 reading 67 bytes from https://httpbin.org/delay/1 ...
✅ 总结:Fiber 不是银弹,但它是构建用户态异步逻辑的有力工具。要发挥其并发价值,必须主动设计协作调度逻辑——而非依赖“自动并行”。本方案虽简洁,却清晰揭示了 Fiber 编程的本质:手动控制执行流,以换取最大灵活性。进阶场景建议集成 ext-Event 或迁移到 Swoole/ReactPHP 等专业异步框架。
立即学习“PHP免费学习笔记(深入)”;