目录

异步编程:使用 stream

本章的重点

  • Stream 提供一个异步的数据序列。

  • 数据序列包括用户生成的事件和从文件读取的数据。

  • 你可以使用 Stream API 中的 listen() 方法和 await for 关键字来处理一个 Stream。

  • 当出现错误时,Stream 提供一种处理错误的方式。

  • Stream 有两种类型:Single-Subscription 和 Broadcast。

FutureStream 类是 Dart 异步编程的核心。

Future 表示一个不会立即完成的计算过程。与普通函数直接返回结果不同的是异步函数返回一个将会包含结果的 Future。该 Future 会在结果准备好时通知调用者。

Stream 是一系列异步事件的序列。其类似于一个异步的 Iterable,不同的是当你向 Iterable 获取下一个事件时它会立即给你,但是 Stream 则不会立即给你而是在它准备好时告诉你。

接收 Stream 事件

Stream 可以通过许多方式创建,这个话题我们会在另一篇文章详述,而这些所有的创建方式都可以相同的方式在代码中使用:像使用 for 循环 迭代一个 Iterable 一样,我们可以使用 异步 for 循环 (通常我们直接称之为 await for)来迭代 Stream 中的事件。例如:

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

该代码只是简单地接收整型事件流中的每一个事件并将它们相加,然后返回(被 Future 包裹)相加后的整型值。当循环体结束时,函数会暂停直到下一个事件到达或 Stream 完成。

内部使用 await for 循环的函数需要使用 async 关键字标记。

下面的示例中使用了 async* 函数生成一个简单的整型 Stream 来测试上一个代码片段:

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

void main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

错误事件

当 Stream 再也没有需要处理的事件时会变为完成状态,与此同时,调用者可以像接收到新事件回调那样接收 Stream 完成的事件回调。当使用 await for 循环读取事件时,循环会在 Stream 完成时停止。

有时在 Stream 完成前会出现错误;比如从远程服务器获取文件时出现网络请求失败,或者创建事件时出现 bug,尽管错误总是会有可能存在,但它出现时应该告知使用者。

Stream 可以像提供数据事件那样提供错误事件。大多数 Stream 会在第一次错误出现后停止,但其也可以提供多次错误并可以在在出现错误后继续提供数据事件。在本篇文档中我们只讨论 Stream 最多出现并提供一次错误事件的情况。

当使用 await for 读取 Stream 时,如果出现错误,则由循环语句抛出,同时循环结束。你可以使用 try-catch 语句捕获错误。下面的示例会在循环迭代到参数值等于 4 时抛出一个错误:

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  try {
    await for (final value in stream) {
      sum += value;
    }
  } catch (e) {
    return -1;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    if (i == 4) {
      throw Exception('Intentional exception');
    } else {
      yield i;
    }
  }
}

void main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // -1
}

Stream 的使用

Stream 类中包含了许多像 Iterable 类中一样的辅助方法帮助你实现一些常用的操作。例如,你可以使用 Stream API 中的 lastWhere() 方法从 Stream 中找出最后一个正整数。

Future<int> lastPositive(Stream<int> stream) =>
    stream.lastWhere((x) => x >= 0);

Stream 的两种类型

Stream 有两种类型。

Single-Subscription 类型的 Stream

最常见的类型是一个 Stream 只包含了某个众多事件序列的一个。而这些事件需要按顺序提供并且不能丢失。当你读取一个文件或接收一个网页请求时就需要使用这种类型的 Stream。

这种 Stream 只能设置一次监听。重复设置则会丢失原来的事件,而导致你所监听到的剩余其它事件毫无意义。当你开始监听时,数据将以块的形式提供和获取。

Broadcast 类型的 Stream

另一种流是针对单个消息的,这种流可以一次处理一个消息。例如可以将其用于浏览器的鼠标事件。

你可以在任何时候监听这种 Stream,且在此之后你可以获取到任何触发的事件。这种流可以在同一时间设置多个不同的监听器同时监听,同时你也可以在取消上一个订阅后再次对其发起监听。

处理 Stream 的方法

下面这些 Stream<T> 类中的方法可以对 Stream 进行处理并返回结果:

Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object? needle);
Future<E> drain<E>([E? futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function()? orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = '']);
Future<T> lastWhere(bool Function(T element) test, {T Function()? orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function()? orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();

上述所有的方法,除了 drain() and pipe() 方法外,都在 Iterable 类中有对应的相似方法。如果你在异步函数中使用了 await for 循环(或者只是在另一个方法中使用),那么使用上述的这些方法将会更加容易。例如,一些代码实现大概是这样的:

Future<bool> contains(Object? needle) async {
  await for (final event in this) {
    if (event == needle) return true;
  }
  return false;
}

Future forEach(void Function(T element) action) async {
  await for (final event in this) {
    action(event);
  }
}

Future<List<T>> toList() async {
  final result = <T>[];
  await forEach(result.add);
  return result;
}

Future<String> join([String separator = '']) async =>
    (await toList()).join(separator);

(上述代码只是个简单的示例,实际的实现逻辑可能要稍微复杂一点。)

修改 Stream 的方法

下面的方法可以对原始的 Stream 进行处理并返回新的 Stream。当调用了这些方法后,设置在原始 Stream 上的监听器会先监听被转换后的新 Stream,待新的 Stream 处理完成后才会转而回去监听原始的 Stream。

Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);

Iterable 类中也有一些将一个 iterable 转换为另一个 iterable 的方法,上述的这些方法与 Iterable 类中的这些方法相似。如果你在异步函数中使用了 await for 循环,那么使用上述的这些方法将会更加容易。

Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next)? equals]);

asyncExpand()asyncMap() 方法与 expand()map() 方法类似,不同的是前两者允许将一个异步函数作为函数参数。 Iterable 中没有与 distinct() 类似的方法,但是在不久的将来可能会加上。

Stream<T> handleError(Function onError, {bool Function(dynamic error)? test});
Stream<T> timeout(Duration timeLimit,
    {void Function(EventSink<T> sink)? onTimeout});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);

最后这三个方法比较特殊。它们用于处理 await for 循环不能处理的错误:当循环执行过程中出现错误时,该循环会结束同时取消 Stream 上的订阅且不能恢复。你可以使用 handleError() 方法在 await for 循环中使用 Stream 前将相关错误移除。

Stream<S> mapLogErrors<S, T>(
  Stream<T> stream,
  S Function(T event) convert,
) async* {
  var streamWithoutErrors = stream.handleError((e) => log(e));
  await for (final event in streamWithoutErrors) {
    yield convert(event);
  }
}

transform() 方法

transform() 方法并不只是用于处理错误;它更是一个通用的 Stream “map 映射”。通常而言,一个 “map 映射”会为每一个输入事件设置一个值。但是对于 I/O Stream 而言,它可能会使用多个输入事件来生成一个输出事件。这时候使用 StreamTransformer 就可以做到这一点。例如像 Utf8Decoder 这样的解码器就是一个变换器。一个变换器只需要实现一个 bind() 方法,其可通过 async 函数轻松实现。

读取和解码文件

下面的代码示例读取一个文件并在其 Stream 上执行了两次变换。第一次转换是将文件数据转换成 UTF-8 编码格式,然后将转换后的数据变换成一个 LineSplitter 执行。文件中除了 # 开头的行外其它的行都会被打印出来。

import 'dart:convert';
import 'dart:io';

void main(List<String> args) async {
  var file = File(args[0]);
  var lines = utf8.decoder
      .bind(file.openRead())
      .transform(const LineSplitter());
  await for (final line in lines) {
    if (!line.startsWith('#')) print(line);
  }
}

listen() 方法

最后一个重要的方法是 listen()。这是一个“底层”方法,其它所有的 Stream 方法都根据 listen() 方法定义。

StreamSubscription<T> listen(void Function(T event)? onData,
    {Function? onError, void Function()? onDone, bool? cancelOnError});

你只需继承 Stream 类并实现 listen() 方法来创建一个 Stream 类型的子类。 Stream 类中所有其它的方法都依赖于对 listen() 方法的调用。

listen() 方法可以让你对一个 Stream 进行监听。在你对一个 Stream 进行监听前,它只不过是个惰性对象,该对象描述了你想查看的事件。当你对其进行监听后,其会返回一个 StreamSubscription 对象,该对象用以表示一个生产事件的活跃的 Stream。这与 Iterable 对象的实现方式类似,不同的是 Iterable 对象可返回迭代器并可以进行真实的迭代操作。

Stream 允许你暂停、继续甚至完全取消一个订阅。你也可以为其设置一个回调,该回调会在每一个数据事件、错误事件以及 Stream 自身关闭时通知调用者。

其它资源信息

可以阅读下面的文档获取更多关于在 Dart 中使用 Stream 和异步编程的信息: