Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help


x-i18n: generated_at: “2026-03-01T14:46:41Z” model: gemini-3-flash-preview provider: google-gemini-cli source_hash: b494f9019a1e9a79992db4c056377995227764d2822bb8cd0f86d51ce4a08655 source_path: ch17-04-streams.md workflow: 16

流:顺序排列的 Future (Streams: Futures in Sequence)

回想一下在本章前面的“消息传递”一节中我们如何为异步通道使用接收端。异步 recv 方法会随时间产生一系列项。这是一个更通用模式的一个实例,称为“流 (stream)”。许多概念都可以自然地表示为流:队列中可用的项、当完整数据集对于计算机内存来说太大时从文件系统增量拉取的数据块,或者随时间通过网络到达的数据。因为流是 future,所以我们可以将它们与任何其他种类的 future 一起使用,并以有趣的方式组合它们。例如,我们可以批量处理事件以避免触发过多的网络调用,为一系列长时间运行的操作设置超时,或者限制用户界面事件以避免做无谓的工作。

Recall how we used the receiver for our async channel earlier in this chapter in the “Message Passing” section. The async recv method produces a sequence of items over time. This is an instance of a much more general pattern known as a stream. Many concepts are naturally represented as streams: items becoming available in a queue, chunks of data being pulled incrementally from the filesystem when the full data set is too large for the computer’s memory, or data arriving over the network over time. Because streams are futures, we can use them with any other kind of future and combine them in interesting ways. For example, we can batch up events to avoid triggering too many network calls, set timeouts on sequences of long-running operations, or throttle user interface events to avoid doing needless work.

我们在第 13 章Iterator 特征和 next 方法”一节中看到过一系列项,但迭代器和异步通道接收端之间有两个区别。第一个区别是时间:迭代器是同步的,而通道接收端是异步的。第二个区别是 API。当直接处理 Iterator 时,我们调用它的同步 next 方法。特别地,对于 trpl::Receiver 流,我们调用了一个异步 recv 方法。除此之外,这些 API 感觉非常相似,而且这种相似性并非巧合。流就像是迭代的一种异步形式。不过,虽然 trpl::Receiver 特别用于等待接收消息,但通用的流 API 则广泛得多:它像 Iterator 那样提供下一个项,但是是异步的。

We saw a sequence of items back in Chapter 13, when we looked at the Iterator trait in “The Iterator Trait and the next Method” section, but there are two differences between iterators and the async channel receiver. The first difference is time: iterators are synchronous, while the channel receiver is asynchronous. The second difference is the API. When working directly with Iterator, we call its synchronous next method. With the trpl::Receiver stream in particular, we called an asynchronous recv method instead. Otherwise, these APIs feel very similar, and that similarity isn’t a coincidence. A stream is like an asynchronous form of iteration. Whereas the trpl::Receiver specifically waits to receive messages, though, the general-purpose stream API is much broader: it provides the next item the way Iterator does, but asynchronously.

Rust 中迭代器和流之间的相似性意味着我们实际上可以从任何迭代器创建一个流。与迭代器一样,我们可以通过调用流的 next 方法然后 await 输出处理流,如示例 17-21 所示,这段代码目前还无法编译。

The similarity between iterators and streams in Rust means we can actually create a stream from any iterator. As with an iterator, we can work with a stream by calling its next method and then awaiting the output, as in Listing 17-21, which won’t compile yet.

{{#rustdoc_include ../listings/ch17-async-await/listing-17-21/src/main.rs:stream}}

我们从一个数字数组开始,将其转换为迭代器,然后调用 map 将所有值翻倍。然后我们使用 trpl::stream_from_iter 函数将该迭代器转换为流。接下来,我们使用 while let 循环在该流中的项到达时对其进行遍历。

We start with an array of numbers, which we convert to an iterator and then call map on to double all the values. Then we convert the iterator into a stream using the trpl::stream_from_iter function. Next, we loop over the items in the stream as they arrive with the while let loop.

不幸的是,当我们尝试运行这段代码时,它无法编译,而是报告没有可用的 next 方法:

Unfortunately, when we try to run the code, it doesn’t compile but instead reports that there’s no next method available:

error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
  --> src/main.rs:10:40
   |
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   |
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
1  + use crate::trpl::StreamExt;
   |
1  + use futures_util::stream::stream::StreamExt;
   |
1  + use std::iter::Iterator;
   |
1  + use std::str::pattern::Searcher;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~

(错误[E0599]:在当前作用域内,结构体 tokio_stream::iter::Iter 中找不到名为 next 的方法)

正如这份输出所解释的,编译器错误的原因是我们需要将正确的特征引入作用域,才能使用 next 方法。鉴于我们到目前为止的讨论,你可能会理所当然地认为那个特征是 Stream ,但它实际上是 StreamExt 。作为 extension 的缩写, Ext 是 Rust 社区中用一个特征扩展另一个特征的一种常见模式。

As this output explains, the reason for the compiler error is that we need the right trait in scope to be able to use the next method. Given our discussion so far, you might reasonably expect that trait to be Stream, but it’s actually StreamExt. Short for extension, Ext is a common pattern in the Rust community for extending one trait with another.

Stream 特征定义了一个有效地结合了 IteratorFuture 特征的底层接口。 StreamExtStream 之上提供了一组更高级别的 API,包括 next 方法以及类似于 Iterator 特征提供的其他工具方法。 StreamStreamExt 尚未成为 Rust 标准库的一部分,但大多数生态系统 crate 使用类似的定义。

The Stream trait defines a low-level interface that effectively combines the Iterator and Future traits. StreamExt supplies a higher-level set of APIs on top of Stream, including the next method as well as other utility methods similar to those provided by the Iterator trait. Stream and StreamExt are not yet part of Rust’s standard library, but most ecosystem crates use similar definitions.

解决编译器错误的方法是为 trpl::StreamExt 添加一个 use 语句,如示例 17-22 所示。

The fix to the compiler error is to add a use statement for trpl::StreamExt, as in Listing 17-22.

#![allow(unused)]
fn main() {
{{#rustdoc_include ../listings/ch17-async-await/listing-17-22/src/main.rs:all}}
}

将所有这些部分拼凑在一起,这段代码就能按我们想要的方式工作了!而且,既然我们将 StreamExt 引入了作用域,我们就可以使用它的所有工具方法,就像使用迭代器一样。

With all those pieces put together, this code works the way we want! What’s more, now that we have StreamExt in scope, we can use all of its utility methods, just as with iterators.