向运行时交出控制权
Yielding Control to the Runtime
回想一下 “我们的第一个异步程序” 一节,在每个等待点(await point),如果正在等待的 future 尚未准备就绪,Rust 都会给运行时一个机会来暂停当前任务并切换到另一个任务。反之亦然:Rust 仅 在等待点暂停异步块并将控制权交回给运行时。等待点之间的所有内容都是同步的。
Recall from the “Our First Async Program” section that at each await point, Rust gives a runtime a chance to pause the task and switch to another one if the future being awaited isn’t ready. The inverse is also true: Rust only pauses async blocks and hands control back to a runtime at an await point. Everything between await points is synchronous.
这意味着如果你在一个没有等待点的异步块中做了一堆工作,那个 future 将会阻塞任何其他 future 取得进展。你有时可能会听到这被称为一个 future 饿死(starving)了其他 future。在某些情况下,这可能不是什么大问题。但是,如果你正在进行某种昂贵的设置或长时间运行的工作,或者你有一个会无限期地持续执行某个特定任务的 future,你就需要考虑何时以及在哪里将控制权交回给运行时。
That means if you do a bunch of work in an async block without an await point, that future will block any other futures from making progress. You may sometimes hear this referred to as one future starving other futures. In some cases, that may not be a big deal. However, if you are doing some kind of expensive setup or long-running work, or if you have a future that will keep doing some particular task indefinitely, you’ll need to think about when and where to hand control back to the runtime.
让我们模拟一个长时间运行的操作来说明饥饿问题,然后探索如何解决它。示例 17-14 引入了一个 slow 函数。
Let’s simulate a long-running operation to illustrate the starvation problem,
then explore how to solve it. Listing 17-14 introduces a slow function.
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
// We will call `slow` here later
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
这段代码使用 std::thread::sleep 而不是 trpl::sleep,因此调用 slow 将会阻塞当前线程若干毫秒。我们可以使用 slow 来代表现实世界中既耗时又具有阻塞性的操作。
This code uses std::thread::sleep instead of trpl::sleep so that calling
slow will block the current thread for some number of milliseconds. We can
use slow to stand in for real-world operations that are both long-running and
blocking.
在示例 17-15 中,我们使用 slow 来模仿在一对 future 中执行这种 CPU 密集型工作。
In Listing 17-15, we use slow to emulate doing this kind of CPU-bound work in
a pair of futures.
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
let a = async {
println!("'a' started.");
slow("a", 30);
slow("a", 10);
slow("a", 20);
trpl::sleep(Duration::from_millis(50)).await;
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
slow("b", 10);
slow("b", 15);
slow("b", 350);
trpl::sleep(Duration::from_millis(50)).await;
println!("'b' finished.");
};
trpl::select(a, b).await;
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
每个 future 仅在执行完一堆慢速操作 之后 才将控制权交回给运行时。如果你运行这段代码,你将看到如下输出:
Each future hands control back to the runtime only after carrying out a bunch of slow operations. If you run this code, you will see this output:
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
与示例 17-5 中我们使用 trpl::select 来竞争获取两个 URL 的 future 一样,一旦 a 完成,select 就会结束。然而,在这两个 future 中对 slow 的调用之间没有交错。a future 执行其所有工作直到 trpl::sleep 调用被等待(awaited),然后 b future 执行其所有工作直到它自己的 trpl::sleep 调用被等待,最后 a future 完成。为了允许两个 future 在它们的慢速任务之间取得进展,我们需要等待点,以便我们可以将控制权交回给运行时。这意味着我们需要一些我们可以等待的东西!
As with Listing 17-5 where we used trpl::select to race futures fetching two
URLs, select still finishes as soon as a is done. There’s no interleaving
between the calls to slow in the two futures, though. The a future does all
of its work until the trpl::sleep call is awaited, then the b future does
all of its work until its own trpl::sleep call is awaited, and finally the
a future completes. To allow both futures to make progress between their slow
tasks, we need await points so we can hand control back to the runtime. That
means we need something we can await!
我们已经可以在示例 17-15 中看到这种交接的发生:如果我们移除 a future 末尾的 trpl::sleep,它将会在 b future 根本 没有运行的情况下完成。让我们尝试使用 trpl::sleep 函数作为起点,让操作轮流取得进展,如示例 17-16 所示。
We can already see this kind of handoff happening in Listing 17-15: if we
removed the trpl::sleep at the end of the a future, it would complete
without the b future running at all. Let’s try using the trpl::sleep
function as a starting point for letting operations switch off making progress,
as shown in Listing 17-16.
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
let one_ms = Duration::from_millis(1);
let a = async {
println!("'a' started.");
slow("a", 30);
trpl::sleep(one_ms).await;
slow("a", 10);
trpl::sleep(one_ms).await;
slow("a", 20);
trpl::sleep(one_ms).await;
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
trpl::sleep(one_ms).await;
slow("b", 10);
trpl::sleep(one_ms).await;
slow("b", 15);
trpl::sleep(one_ms).await;
slow("b", 350);
trpl::sleep(one_ms).await;
println!("'b' finished.");
};
trpl::select(a, b).await;
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
我们在每次调用 slow 之间添加了带有等待点的 trpl::sleep 调用。现在两个 future 的工作交错进行了:
We’ve added trpl::sleep calls with await points between each call to slow.
Now the two futures’ work is interleaved:
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.
由于 a future 在调用 trpl::sleep 之前调用了 slow,所以它仍然运行了一会儿才将控制权交给 b,但之后每当其中一个 future 碰到等待点时,它们就会来回切换。在本例中,我们在每次调用 slow 之后都这样做,但我们可以按照对我们最有意义的任何方式来分解工作。
The a future still runs for a bit before handing off control to b, because
it calls slow before ever calling trpl::sleep, but after that the futures
swap back and forth each time one of them hits an await point. In this case, we
have done that after every call to slow, but we could break up the work in
whatever way makes the most sense to us.
然而,我们在这里并不是真的想要 休眠(sleep):我们想要尽可能快地取得进展。我们只需要将控制权交回给运行时。我们可以直接使用 trpl::yield_now 函数做到这一点。在示例 17-17 中,我们将所有那些 trpl::sleep 调用替换为 trpl::yield_now。
We don’t really want to sleep here, though: we want to make progress as fast
as we can. We just need to hand back control to the runtime. We can do that
directly, using the trpl::yield_now function. In Listing 17-17, we replace
all those trpl::sleep calls with trpl::yield_now.
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
let a = async {
println!("'a' started.");
slow("a", 30);
trpl::yield_now().await;
slow("a", 10);
trpl::yield_now().await;
slow("a", 20);
trpl::yield_now().await;
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
trpl::yield_now().await;
slow("b", 10);
trpl::yield_now().await;
slow("b", 15);
trpl::yield_now().await;
slow("b", 350);
trpl::yield_now().await;
println!("'b' finished.");
};
trpl::select(a, b).await;
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
这段代码不仅更清楚地表达了实际意图,而且比使用 sleep 快得多,因为像 sleep 所使用的计时器通常在粒度上有限制。例如,我们正在使用的 sleep 版本总是至少休眠一毫秒,即使我们传递给它一纳秒的 Duration。再说一遍,现代计算机是 飞快 的:它们可以在一毫秒内做很多事情!
This code is both clearer about the actual intent and can be significantly
faster than using sleep, because timers such as the one used by sleep often
have limits on how granular they can be. The version of sleep we are using,
for example, will always sleep for at least a millisecond, even if we pass it a
Duration of one nanosecond. Again, modern computers are fast: they can do a
lot in one millisecond!
这意味着异步甚至对计算密集型任务也很有用,这取决于你的程序还在做其他什么事情,因为它提供了一个有用的工具来构建程序不同部分之间的关系(但代价是异步状态机的开销)。这是一种 协作式多任务(cooperative multitasking)的形式,其中每个 future 都有权通过等待点决定何时移交控制权。因此,每个 future 也有责任避免阻塞太长时间。在某些基于 Rust 的嵌入式操作系统中,这是 唯一 的多任务处理方式!
This means that async can be useful even for compute-bound tasks, depending on what else your program is doing, because it provides a useful tool for structuring the relationships between different parts of the program (but at a cost of the overhead of the async state machine). This is a form of cooperative multitasking, where each future has the power to determine when it hands over control via await points. Each future therefore also has the responsibility to avoid blocking for too long. In some Rust-based embedded operating systems, this is the only kind of multitasking!
当然,在现实的代码中,你通常不会在每一行都交替进行函数调用和等待点。虽然以这种方式交出控制权的成本相对较低,但并不是免费的。在许多情况下,尝试分解计算密集型任务可能会使其显著变慢,因此有时为了 整体 性能,让操作短暂阻塞会更好。务必进行测量,看看代码实际的性能瓶颈在哪里。然而,如果你 确实 看到很多你原本预期会并发发生的工作正在串行发生,那么记住底层的动态是很重要的!
In real-world code, you won’t usually be alternating function calls with await points on every single line, of course. While yielding control in this way is relatively inexpensive, it’s not free. In many cases, trying to break up a compute-bound task might make it significantly slower, so sometimes it’s better for overall performance to let an operation block briefly. Always measure to see what your code’s actual performance bottlenecks are. The underlying dynamic is important to keep in mind, though, if you are seeing a lot of work happening in serial that you expected to happen concurrently!
构建我们自己的异步抽象
Building Our Own Async Abstractions
我们还可以将 future 组合在一起以创建新的模式。例如,我们可以利用已有的异步构建块构建一个 timeout 函数。完成后,结果将成为另一个构建块,我们可以使用它来创建更多的异步抽象。
We can also compose futures together to create new patterns. For example, we can
build a timeout function with async building blocks we already have. When
we’re done, the result will be another building block we could use to create
still more async abstractions.
示例 17-18 展示了我们期望这个 timeout 如何与慢速 future 一起工作。
Listing 17-18 shows how we would expect this timeout to work with a slow
future.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_secs(2)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
让我们来实现它!首先,让我们考虑 timeout 的 API:
Let’s implement this! To begin, let’s think about the API for timeout:
-
它本身需要是一个异步函数,以便我们可以等待它。
-
它的第一个参数应该是要运行的 future。我们可以使其泛型化,以便让它适用于任何 future。
-
它的第二个参数将是等待的最长时间。如果我们使用
Duration,那将很容易传递给trpl::sleep。 -
它应该返回一个
Result。如果 future 成功完成,Result将是Ok且带有 future 产生的值。如果超时先到期,Result将是Err且带有超时等待的持续时间。 -
It needs to be an async function itself so we can await it.
-
Its first parameter should be a future to run. We can make it generic to allow it to work with any future.
-
Its second parameter will be the maximum time to wait. If we use a
Duration, that will make it easy to pass along totrpl::sleep. -
It should return a
Result. If the future completes successfully, theResultwill beOkwith the value produced by the future. If the timeout elapses first, theResultwill beErrwith the duration that the timeout waited for.
示例 17-19 展示了这一声明。
Listing 17-19 shows this declaration.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_secs(2)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
async fn timeout<F: Future>(
future_to_try: F,
max_time: Duration,
) -> Result<F::Output, Duration> {
// Here is where our implementation will go!
}
这满足了我们对类型的目标。现在让我们考虑我们需要的 行为:我们想要让传入的 future 与持续时间进行竞争。我们可以使用 trpl::sleep 从持续时间创建一个计时器 future,并使用 trpl::select 将该计时器与调用者传入的 future 一起运行。
That satisfies our goals for the types. Now let’s think about the behavior we
need: we want to race the future passed in against the duration. We can use
trpl::sleep to make a timer future from the duration, and use trpl::select
to run that timer with the future the caller passes in.
在示例 17-20 中,我们通过对等待 trpl::select 的结果进行匹配来实现 timeout。
In Listing 17-20, we implement timeout by matching on the result of awaiting
trpl::select.
extern crate trpl; // required for mdbook test
use std::time::Duration;
use trpl::Either;
// --snip--
fn main() {
trpl::block_on(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_secs(2)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
async fn timeout<F: Future>(
future_to_try: F,
max_time: Duration,
) -> Result<F::Output, Duration> {
match trpl::select(future_to_try, trpl::sleep(max_time)).await {
Either::Left(output) => Ok(output),
Either::Right(_) => Err(max_time),
}
}
trpl::select 的实现是不公平的:它总是按照参数传入的顺序轮询它们(其他 select 实现会随机选择先轮询哪个参数)。因此,我们首先将 future_to_try 传递给 select,这样即使 max_time 是非常短的持续时间,它也有机会完成。如果 future_to_try 最先完成,select 将返回带有来自 future_to_try 输出的 Left。如果 timer 最先完成,select 将返回带有计时器输出 () 的 Right。
The implementation of trpl::select is not fair: it always polls arguments in
the order in which they are passed (other select implementations will
randomly choose which argument to poll first). Thus, we pass future_to_try to
select first so it gets a chance to complete even if max_time is a very
short duration. If future_to_try finishes first, select will resolve to Left
with the output from future_to_try. If timer finishes first, select will
resolve to Right with the timer’s output of ().
如果 future_to_try 成功并且我们得到了一个 Left(output),我们返回 Ok(output)。如果休眠计时器先到期且我们得到了一个 Right(()),我们就用 _ 忽略 (),转而返回 Err(max_time)。
If the future_to_try succeeds and we get a Left(output), we return
Ok(output). If the sleep timer elapses instead and we get a Right(()), we
ignore the () with _ and return Err(max_time) instead.
至此,我们有了一个由另外两个异步助手构建而成的、可以工作的 timeout。如果我们运行代码,它将在超时后打印出失败模式:
With that, we have a working timeout built out of two other async helpers. If
we run our code, it will print the failure mode after the timeout:
Failed after 2 seconds
因为 future 可以与其他 future 组合,所以你可以使用更小的异步构建块构建非常强大的工具。例如,你可以使用相同的方法将超时与重试结合起来,然后再将其与网络调用(如示例 17-5 中的那些)等操作结合使用。
Because futures compose with other futures, you can build really powerful tools using smaller async building blocks. For example, you can use this same approach to combine timeouts with retries, and in turn use those with operations such as network calls (such as those in Listing 17-5).
在实践中,你通常会直接使用 async 和 await,其次是使用 select 这种函数和 join! 这种宏来控制最外层 future 的执行方式。
In practice, you’ll usually work directly with async and await, and
secondarily with functions such as select and macros such as the join!
macro to control how the outermost futures are executed.
我们现在已经看到了几种同时处理多个 future 的方法。接下来,我们将通过 streams 看看如何随时间处理一系列的 future。
We’ve now seen a number of ways to work with multiple futures at the same time. Up next, we’ll look at how we can work with multiple futures in a sequence over time with streams.