
Rxjs 的 `first` 操作符用于获取 Observable 发出的第一个值。其核心在于“第一个值”的定义:如果 Observable 发出的是一个数组作为整体,`first` 将返回整个数组;而如果 Observable 将数组中的每个元素分别发出,`first` 则返回数组的第一个元素。本文将详细解析这两种情况,并通过示例代码展示 `of` 和 `from` 操作符在处理集合数据时的差异,并介绍如何通过 `mergeAll` 等操作符实现预期的数据扁平化处理。
RxJS first 操作符核心机制
first 操作符是 RxJS 中一个常用且直观的过滤操作符,它的作用是从源 Observable 中取出它发出的第一个值,然后完成(complete)。这意味着一旦找到符合条件的值,整个数据流就会终止。如果源 Observable 在发出任何值之前就完成了,或者没有满足指定谓词(predicate)的值,并且没有提供默认值,那么 first 将会抛出一个 EmptyError。
理解 first 操作符的关键在于明确“第一个值”的定义。它始终作用于 Observable 实际发出的数据流中的第一个项,而不会深入到该项的内部结构。
集合类型数据发射行为解析
当处理数组或其他集合类型数据时,first 操作符的行为常常会引起混淆。这主要源于 Observable 创建操作符(如 of 和 from)在处理集合数据时的不同策略。
场景一:将数组作为一个整体发出
当使用 of 操作符或直接通过 new Observable 的 next 方法发出一个数组时,这个数组会被视为一个单一的值。first 操作符会捕获到这个完整的数组,并将其作为第一个值返回。
示例代码:
import { Observable, of } from 'rxjs'; import { first } from 'rxjs/operators'; // 方式一:使用 new Observable 发出数组 new Observable<number[]>(subscriber => { subscriber.next([1, 2, 3, 4]); // 整个数组 [1,2,3,4] 被作为一个单一的值发出 subscriber.complete(); }).pipe( first() ).subscribe({ next: (response) => console.log('Observable emits array:', response), // 输出:[1, 2, 3, 4] error: (err) => console.error(err), complete: () => console.log('Completed') }); // 方式二:使用 of 操作符发出数组 of([1, 2, 3, 4]).pipe( first() ).subscribe({ next: (response) => console.log('Of emits array:', response), // 输出:[1, 2, 3, 4] error: (err) => console.error(err), complete: () => console.log('Completed') });
在这两种情况下,first() 操作符接收到的第一个(也是唯一一个)值就是完整的数组 [1, 2, 3, 4]。因此,它会直接将这个数组传递给 subscribe 的 next 回调函数。
场景二:将数组中的元素逐个发出
与 of 不同,from 操作符专门设计用于将可迭代对象(如数组、字符串、Set、map)或 promise 转换为 Observable,并将其内部的元素逐个发出。
示例代码:
import { from } from 'rxjs'; import { first } from 'rxjs/operators'; from([1, 2, 3, 4]).pipe( first() ).subscribe({ next: (response) => console.log('From emits individual:', response), // 输出:1 error: (err) => console.error(err), complete: () => console.log('Completed') });
在这个例子中,from([1, 2, 3, 4]) 会首先发出 1,然后发出 2,以此类推。当 first() 操作符接收到第一个值 1 时,它就会立即将 1 传递给 next 回调,并使 Observable 完成。后续的 2, 3, 4 将不会被发出,因为 first 已经找到了它所需的值并终止了流。
如何实现数组元素的扁平化处理
如果你希望 Observable 发出一个数组,但又想让 first 操作符能够获取到数组内部的第一个元素(即 1 而不是 [1,2,3,4]),你需要将这个数组“扁平化”成一个 Observable,使其逐个发出内部元素。
RxJS 提供了多种扁平化操作符,其中 mergeAll 是一个直接的解决方案,它会将一个发出 Observables 的 Observable 扁平化,使其发出内部 Observable 的所有值。
示例代码:
import { of } from 'rxjs'; import { first, mergeAll } from 'rxjs/operators'; of([1, 2, 3, 4]).pipe( mergeAll(), // 将发出 [1,2,3,4] 的 Observable 转换为发出 1, 2, 3, 4 的 Observable first() ).subscribe({ next: (response) => console.log('Of + mergeAll + first:', response), // 输出:1 error: (err) => console.error(err), complete: () => console.log('Completed') });
在这个流程中:
- of([1, 2, 3, 4]) 首先发出一个值:[1, 2, 3, 4] (这是一个数组)。
- mergeAll() 接收到这个数组。由于 mergeAll 期望接收的是一个 Observable,或者一个可转换为 Observable 的值(如 Promise 或可迭代对象),它会将这个数组视为一个内部 Observable,并开始逐个发出数组中的元素:1,然后 2,以此类推。
- first() 操作符接收到 mergeAll 发出的第一个值 1,然后将其传递给 next 回调,并完成整个流。
除了 mergeAll,你也可以使用 flatMap (即 mergeMap)、concatMap 或 switchMap 结合 from 来实现更复杂的扁平化逻辑,具体选择哪个操作符取决于你对并发、顺序和取消行为的需求。
import { of, from } from 'rxjs'; import { first, mergeMap } from 'rxjs/operators'; of([1, 2, 3, 4]).pipe( mergeMap(arr => from(arr)), // 对于每个发出的数组,创建一个新的 Observable 逐个发出其元素 first() ).subscribe({ next: (response) => console.log('Of + mergeMap(from) + first:', response), // 输出:1 error: (err) => console.error(err), complete: () => console.log('Completed') });
注意事项与总结
- 理解“值”的边界:RxJS 操作符处理的是 Observable 发出的“值”本身。一个数组 [1,2,3,4] 作为一个整体被发出时,它就是一个值;而当 from 操作符将其解构后,1、2、3、4 才是各自独立的值。
- of 与 from 的区别:
- of(…items):将括号内的所有参数作为独立的项依次发出。如果参数本身是一个数组,它会把这个数组作为一个整体发出。
- from(iterable):将可迭代对象(如数组、字符串、Set、Map)中的每个元素逐个发出。
- 扁平化操作符的重要性:当源 Observable 发出的是包含多个值的集合(例如一个数组),但你希望操作符作用于集合内部的每个元素时,你需要使用 mergeAll、flatMap (mergeMap) 等扁平化操作符来改变数据流的结构。
- first() 的完成机制:一旦 first() 找到它需要的值(无论是第一个值还是第一个满足谓词的值),它就会立即完成整个 Observable 流,阻止后续值的发出。
通过深入理解 Observable 的数据发射机制以及 first 等操作符的工作原理,可以有效避免在 RxJS 编程中遇到的常见混淆,从而更准确地控制数据流。


