From a Single-Threaded to a Multithreaded Server
Right now, the server will process each request in turn, meaning it won’t process a second connection until the first connection is finished processing. If the server received more and more requests, this serial execution would be less and less optimal. If the server receives a request that takes a long time to process, subsequent requests will have to wait until the long request is finished, even if the new requests can be processed quickly. We’ll need to fix this, but first we’ll look at the problem in action.
Simulating a Slow Request
We’ll look at how a slowly processing request can affect other requests made to our current server implementation. Listing 21-10 implements handling a request to /sleep with a simulated slow response that will cause the server to sleep for five seconds before responding.
{{#rustdoc_include ../listings/ch21-web-server/listing-21-10/src/main.rs:here}}
We switched from if to match now that we have three cases. We need to
explicitly match on a slice of request_line to pattern-match against the
string literal values; match doesn’t do automatic referencing and
dereferencing, like the equality method does.
The first arm is the same as the if block from Listing 21-9. The second arm
matches a request to /sleep. When that request is received, the server will
sleep for five seconds before rendering the successful HTML page. The third arm
is the same as the else block from Listing 21-9.
You can see how primitive our server is: Real libraries would handle the recognition of multiple requests in a much less verbose way!
Start the server using cargo run. Then, open two browser windows: one for
http://127.0.0.1:7878 and the other for http://127.0.0.1:7878/sleep. If you
enter the / URI a few times, as before, you’ll see it respond quickly. But if
you enter /sleep and then load /, you’ll see that / waits until sleep
has slept for its full five seconds before loading.
There are multiple techniques we could use to avoid requests backing up behind a slow request, including using async as we did in Chapter 17; the one we’ll implement is a thread pool.
Improving Throughput with a Thread Pool
A thread pool is a group of spawned threads that are ready and waiting to handle a task. When the program receives a new task, it assigns one of the threads in the pool to the task, and that thread will process the task. The remaining threads in the pool are available to handle any other tasks that come in while the first thread is processing. When the first thread is done processing its task, it’s returned to the pool of idle threads, ready to handle a new task. A thread pool allows you to process connections concurrently, increasing the throughput of your server.
We’ll limit the number of threads in the pool to a small number to protect us from DoS attacks; if we had our program create a new thread for each request as it came in, someone making 10 million requests to our server could wreak havoc by using up all our server’s resources and grinding the processing of requests to a halt.
Rather than spawning unlimited threads, then, we’ll have a fixed number of
threads waiting in the pool. Requests that come in are sent to the pool for
processing. The pool will maintain a queue of incoming requests. Each of the
threads in the pool will pop off a request from this queue, handle the request,
and then ask the queue for another request. With this design, we can process up
to N requests concurrently, where N is the number of threads. If each
thread is responding to a long-running request, subsequent requests can still
back up in the queue, but we’ve increased the number of long-running requests
we can handle before reaching that point.
This technique is just one of many ways to improve the throughput of a web server. Other options you might explore are the fork/join model, the single-threaded async I/O model, and the multithreaded async I/O model. If you’re interested in this topic, you can read more about other solutions and try to implement them; with a low-level language like Rust, all of these options are possible.
Before we begin implementing a thread pool, let’s talk about what using the pool should look like. When you’re trying to design code, writing the client interface first can help guide your design. Write the API of the code so that it’s structured in the way you want to call it; then, implement the functionality within that structure rather than implementing the functionality and then designing the public API.
Similar to how we used test-driven development in the project in Chapter 12, we’ll use compiler-driven development here. We’ll write the code that calls the functions we want, and then we’ll look at errors from the compiler to determine what we should change next to get the code to work. Before we do that, however, we’ll explore the technique we’re not going to use as a starting point.
Spawning a Thread for Each Request
First, let’s explore how our code might look if it did create a new thread for every connection. As mentioned earlier, this isn’t our final plan due to the problems with potentially spawning an unlimited number of threads, but it is a starting point to get a working multithreaded server first. Then, we’ll add the thread pool as an improvement, and contrasting the two solutions will be easier.
Listing 21-11 shows the changes to make to main to spawn a new thread to handle each stream within the for loop.
{{#rustdoc_include ../listings/ch21-web-server/listing-21-11/src/main.rs:here}}
As you learned in Chapter 16, thread::spawn will create a new thread and then
run the code in the closure in the new thread. If you run this code and load
/sleep in your browser, then / in two more browser tabs, you’ll indeed see
that the requests to / don’t have to wait for /sleep to finish. However, as
we mentioned, this will eventually overwhelm the system because you’d be making
new threads without any limit.
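To make the pattern concrete, here is a self-contained sketch of spawn-per-request, with a vector of hypothetical path strings standing in for real TcpStream connections (the actual listing iterates over `listener.incoming()` instead):

```rust
use std::thread;
use std::time::Duration;

fn main() {
    // Hypothetical stand-ins for incoming connections; the real server
    // would iterate over listener.incoming() instead.
    let requests = vec!["/", "/sleep", "/"];
    let mut handles = Vec::new();

    for path in requests {
        // One new thread per "connection": the slow /sleep request no
        // longer blocks the fast ones, but nothing bounds the thread count.
        handles.push(thread::spawn(move || {
            if path == "/sleep" {
                thread::sleep(Duration::from_millis(50));
            }
            path.len()
        }));
    }

    let total: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
    println!("handled all requests; total path bytes = {total}");
    assert_eq!(total, 8);
}
```

Because each request gets its own thread, the sleep only delays its own handler; the cost is that ten million requests would mean ten million threads.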
You may also recall from Chapter 17 that this is exactly the kind of situation where async and await really shine! Keep that in mind as we build the thread pool and think about how things would look different or the same with async.
Creating a Finite Number of Threads
We want our thread pool to work in a similar, familiar way so that switching
from threads to a thread pool doesn’t require large changes to the code that
uses our API. Listing 21-12 shows the hypothetical interface for a ThreadPool
struct we want to use instead of thread::spawn.
{{#rustdoc_include ../listings/ch21-web-server/listing-21-12/src/main.rs:here}}
We use ThreadPool::new to create a new thread pool with a configurable number
of threads, in this case four. Then, in the for loop, pool.execute has a
similar interface as thread::spawn in that it takes a closure that the pool
should run for each stream. We need to implement pool.execute so that it
takes the closure and gives it to a thread in the pool to run. This code won’t
yet compile, but we’ll try so that the compiler can guide us in how to fix it.
Building ThreadPool Using Compiler-Driven Development
Make the changes in Listing 21-12 to src/main.rs, and then let’s use the
compiler errors from cargo check to drive our development. Here is the first
error we get:
{{#include ../listings/ch21-web-server/listing-21-12/output.txt}}
Great! This error tells us we need a ThreadPool type or module, so we’ll
build one now. Our ThreadPool implementation will be independent of the kind
of work our web server is doing. So, let’s switch the hello crate from a
binary crate to a library crate to hold our ThreadPool implementation. After
we change to a library crate, we could also use the separate thread pool
library for any work we want to do using a thread pool, not just for serving
web requests.
Create a src/lib.rs file that contains the following, which is the simplest
definition of a ThreadPool struct that we can have for now:
{{#rustdoc_include ../listings/ch21-web-server/no-listing-01-define-threadpool-struct/src/lib.rs}}
Then, edit the main.rs file to bring ThreadPool into scope from the library
crate by adding the following code to the top of src/main.rs:
{{#rustdoc_include ../listings/ch21-web-server/no-listing-01-define-threadpool-struct/src/main.rs:here}}
This code still won’t work, but let’s check it again to get the next error that we need to address:
{{#include ../listings/ch21-web-server/no-listing-01-define-threadpool-struct/output.txt}}
This error indicates that next we need to create an associated function named
new for ThreadPool. We also know that new needs to have one parameter
that can accept 4 as an argument and should return a ThreadPool instance.
Let’s implement the simplest new function that will have those
characteristics:
{{#rustdoc_include ../listings/ch21-web-server/no-listing-02-impl-threadpool-new/src/lib.rs}}
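As a rough sketch of what this step amounts to (the real code lives in the included listing; the underscore on `_size` is just to silence the unused-parameter warning in this standalone version):

```rust
pub struct ThreadPool;

impl ThreadPool {
    // Simplest possible `new`: accept the size, ignore it for now,
    // and hand back a unit-struct ThreadPool.
    pub fn new(_size: usize) -> ThreadPool {
        ThreadPool
    }
}

fn main() {
    let _pool = ThreadPool::new(4);
    println!("constructed a ThreadPool with requested size 4");
}
```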
We chose usize as the type of the size parameter because we know that a
negative number of threads doesn’t make any sense. We also know we’ll use this
4 as the number of elements in a collection of threads, which is what the
usize type is for, as discussed in the “Integer Types” section in Chapter 3.
Let’s check the code again:
{{#include ../listings/ch21-web-server/no-listing-02-impl-threadpool-new/output.txt}}
Now the error occurs because we don’t have an execute method on ThreadPool.
Recall from the “Creating a Finite Number of
Threads” section that we
decided our thread pool should have an interface similar to thread::spawn. In
addition, we’ll implement the execute function so that it takes the closure
it’s given and gives it to an idle thread in the pool to run.
We’ll define the execute method on ThreadPool to take a closure as a
parameter. Recall from the “Moving Captured Values Out of
Closures” section in Chapter 13 that we can
take closures as parameters with three different traits: Fn, FnMut, and
FnOnce. We need to decide which kind of closure to use here. We know we’ll
end up doing something similar to the standard library thread::spawn
implementation, so we can look at what bounds the signature of thread::spawn
has on its parameter. The documentation shows us the following:
```rust
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static,
```
The F type parameter is the one we’re concerned with here; the T type
parameter is related to the return value, and we’re not concerned with that. We
can see that spawn uses FnOnce as the trait bound on F. This is probably
what we want as well, because we’ll eventually pass the argument we get in
execute to spawn. We can be further confident that FnOnce is the trait we
want to use because the thread for running a request will only execute that
request’s closure one time, which matches the Once in FnOnce.
The F type parameter also has the trait bound Send and the lifetime bound
'static, which are useful in our situation: We need Send to transfer the
closure from one thread to another and 'static because we don’t know how long
the thread will take to execute. Let’s create an execute method on
ThreadPool that will take a generic parameter of type F with these bounds:
{{#rustdoc_include ../listings/ch21-web-server/no-listing-03-define-execute/src/lib.rs:here}}
We still use the () after FnOnce because this FnOnce represents a closure
that takes no parameters and returns the unit type (). Just like function
definitions, the return type can be omitted from the signature, but even if we
have no parameters, we still need the parentheses.
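Written out as a standalone sketch (the body is intentionally empty at this stage, and the unit-struct `ThreadPool` from the earlier step is assumed):

```rust
pub struct ThreadPool;

impl ThreadPool {
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // Deliberately empty for now: the closure is accepted,
        // then simply dropped without ever being run.
        let _ = f;
    }
}

fn main() {
    let pool = ThreadPool;
    pool.execute(|| println!("this closure is not run yet"));
    println!("execute accepted a closure");
}
```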
Again, this is the simplest implementation of the execute method: It does
nothing, but we’re only trying to make our code compile. Let’s check it again:
{{#include ../listings/ch21-web-server/no-listing-03-define-execute/output.txt}}
It compiles! But note that if you try cargo run and make a request in the
browser, you’ll see the errors in the browser that we saw at the beginning of
the chapter. Our library isn’t actually calling the closure passed to execute
yet!
Note: A saying you might hear about languages with strict compilers, such as Haskell and Rust, is “If the code compiles, it works.” But this saying is not universally true. Our project compiles, but it does absolutely nothing! If we were building a real, complete project, this would be a good time to start writing unit tests to check that the code compiles and has the behavior we want.
Consider: What would be different here if we were going to execute a future instead of a closure?
Validating the Number of Threads in new
We aren’t doing anything with the parameters to new and execute. Let’s
implement the bodies of these functions with the behavior we want. To start,
let’s think about new. Earlier we chose an unsigned type for the size
parameter because a pool with a negative number of threads makes no sense.
However, a pool with zero threads also makes no sense, yet zero is a perfectly
valid usize. We’ll add code to check that size is greater than zero before
we return a ThreadPool instance, and we’ll have the program panic if it
receives a zero by using the assert! macro, as shown in Listing 21-13.
{{#rustdoc_include ../listings/ch21-web-server/listing-21-13/src/lib.rs:here}}
We’ve also added some documentation for our ThreadPool with doc comments.
Note that we followed good documentation practices by adding a section that
calls out the situations in which our function can panic, as discussed in
Chapter 14. Try running cargo doc --open and clicking the ThreadPool struct
to see what the generated docs for new look like!
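A compressed sketch of this step, combining the assert! with the documented panic condition (the real code is in Listing 21-13):

```rust
pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        ThreadPool
    }
}

fn main() {
    let _pool = ThreadPool::new(4);
    // ThreadPool::new(0) would panic because of the assert! above.
    println!("pool of 4 created; size 0 would have panicked");
}
```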
Instead of adding the assert! macro as we’ve done here, we could change new
into build and return a Result like we did with Config::build in the I/O
project in Listing 12-9. But we’ve decided in this case that trying to create a
thread pool without any threads should be an unrecoverable error. If you’re
feeling ambitious, try to write a function named build with the following
signature to compare with the new function:
```rust
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
```
Creating Space to Store the Threads
Now that we have a way to know we have a valid number of threads to store in
the pool, we can create those threads and store them in the ThreadPool struct
before returning the struct. But how do we “store” a thread? Let’s take another
look at the thread::spawn signature:
```rust
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static,
```
The spawn function returns a JoinHandle<T>, where T is the type that the
closure returns. Let’s try using JoinHandle too and see what happens. In our
case, the closures we’re passing to the thread pool will handle the connection
and not return anything, so T will be the unit type ().
The code in Listing 21-14 will compile, but it doesn’t create any threads yet.
We’ve changed the definition of ThreadPool to hold a vector of
thread::JoinHandle<()> instances, initialized the vector with a capacity of
size, set up a for loop that will run some code to create the threads, and
returned a ThreadPool instance containing them.
{{#rustdoc_include ../listings/ch21-web-server/listing-21-14/src/lib.rs:here}}
We’ve brought std::thread into scope in the library crate because we’re
using thread::JoinHandle as the type of the items in the vector in
ThreadPool.
Once a valid size is received, our ThreadPool creates a new vector that can
hold size items. The with_capacity function performs the same task as
Vec::new but with an important difference: It pre-allocates space in the
vector. Because we know we need to store size elements in the vector, doing
this allocation up front is slightly more efficient than using Vec::new,
which resizes itself as elements are inserted.
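A sketch of roughly what Listing 21-14 describes (the commented loop is the placeholder where threads will actually be created):

```rust
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        // Pre-allocate room for exactly `size` handles up front,
        // instead of letting the vector grow as handles are pushed.
        let threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
}

fn main() {
    let pool = ThreadPool::new(4);
    assert!(pool.threads.capacity() >= 4);
    assert_eq!(pool.threads.len(), 0); // no threads spawned yet
    println!("vector pre-allocated for 4 thread handles");
}
```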
When you run cargo check again, it should succeed.
Sending Code from the ThreadPool to a Thread
We left a comment in the for loop in Listing 21-14 regarding the creation of
threads. Here, we’ll look at how we actually create threads. The standard
library provides thread::spawn as a way to create threads, and
thread::spawn expects to get some code the thread should run as soon as the
thread is created. However, in our case, we want to create the threads and have
them wait for code that we’ll send later. The standard library’s
implementation of threads doesn’t include any way to do that; we have to
implement it manually.
We’ll implement this behavior by introducing a new data structure between the
ThreadPool and the threads that will manage this new behavior. We’ll call
this data structure Worker, which is a common term in pooling
implementations. The Worker picks up code that needs to be run and runs the
code in its thread.
Think of people working in the kitchen at a restaurant: The workers wait until orders come in from customers, and then they’re responsible for taking those orders and filling them.
Instead of storing a vector of JoinHandle<()> instances in the thread pool,
we’ll store instances of the Worker struct. Each Worker will store a single
JoinHandle<()> instance. Then, we’ll implement a method on Worker that will
take a closure of code to run and send it to the already running thread for
execution. We’ll also give each Worker an id so that we can distinguish
between the different instances of Worker in the pool when logging or
debugging.
Here is the new process that will happen when we create a ThreadPool. We’ll
implement the code that sends the closure to the thread after we have Worker
set up in this way:

- Define a `Worker` struct that holds an `id` and a `JoinHandle<()>`.
- Change `ThreadPool` to hold a vector of `Worker` instances.
- Define a `Worker::new` function that takes an `id` number and returns a `Worker` instance that holds the `id` and a thread spawned with an empty closure.
- In `ThreadPool::new`, use the `for` loop counter to generate an `id`, create a new `Worker` with that `id`, and store the `Worker` in the vector.
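One way the steps above can come together (essentially Listing 21-15, compressed into a single file with a small main for demonstration):

```rust
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        // For now the spawned thread runs an empty closure and exits.
        let thread = thread::spawn(|| {});
        Worker { id, thread }
    }
}

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id));
        }
        ThreadPool { workers }
    }
}

fn main() {
    let pool = ThreadPool::new(4);
    let ids: Vec<usize> = pool.workers.iter().map(|w| w.id).collect();
    assert_eq!(ids, vec![0, 1, 2, 3]);
    println!("created workers {ids:?}");
}
```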
If you’re up for a challenge, try implementing these changes on your own before looking at the code in Listing 21-15.
Ready? Here is Listing 21-15 with one way to make the preceding modifications.
{{#rustdoc_include ../listings/ch21-web-server/listing-21-15/src/lib.rs:here}}
We’ve changed the name of the field on ThreadPool from threads to workers
because it’s now holding Worker instances instead of JoinHandle<()>
instances. We use the counter in the for loop as an argument to
Worker::new, and we store each new Worker in the vector named workers.
External code (like our server in src/main.rs) doesn’t need to know the implementation details regarding using a Worker struct within ThreadPool, so we make the Worker struct and its new function private. The Worker::new function uses the id we give it and stores a JoinHandle<()> instance that is created by spawning a new thread using an empty closure.
Note: If the operating system can’t create a thread because there aren’t enough system resources, `thread::spawn` will panic. That will cause our whole server to panic, even though the creation of some threads might succeed. For simplicity’s sake, this behavior is fine, but in a production thread pool implementation, you’d likely want to use `std::thread::Builder` and its `spawn` method that returns `Result` instead.
This code will compile and will store the number of Worker instances we
specified as an argument to ThreadPool::new. But we’re still not processing
the closure that we get in execute. Let’s look at how to do that next.
Sending Requests to Threads via Channels
The next problem we’ll tackle is that the closures given to thread::spawn do
absolutely nothing. Currently, we get the closure we want to execute in the
execute method. But we need to give thread::spawn a closure to run when we
create each Worker during the creation of the ThreadPool.
We want the Worker structs that we just created to fetch the code to run from
a queue held in the ThreadPool and send that code to its thread to run.
The channels we learned about in Chapter 16—a simple way to communicate between
two threads—would be perfect for this use case. We’ll use a channel to function
as the queue of jobs, and `execute` will send a job from the `ThreadPool` to
the `Worker` instances, which will send the job to its thread. Here is the plan:

- The `ThreadPool` will create a channel and hold on to the sender.
- Each `Worker` will hold on to the receiver.
- We’ll create a new `Job` struct that will hold the closures we want to send down the channel.
- The `execute` method will send the job it wants to execute through the sender.
- In its thread, the `Worker` will loop over its receiver and execute the closures of any jobs it receives.
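The first step of the plan can be sketched like this. Note that this version returns the receiver from `new` so the demo can observe it; the real listing instead keeps the receiver inside `new` to hand to the workers:

```rust
use std::sync::mpsc;

// Placeholder for now; later it becomes a type alias for a boxed closure.
struct Job;

pub struct ThreadPool {
    sender: mpsc::Sender<Job>,
}

impl ThreadPool {
    pub fn new(_size: usize) -> (ThreadPool, mpsc::Receiver<Job>) {
        // The pool holds the sending end; the receiving end will be
        // shared with the workers in the next step.
        let (sender, receiver) = mpsc::channel();
        (ThreadPool { sender }, receiver)
    }
}

fn main() {
    let (pool, receiver) = ThreadPool::new(4);
    pool.sender.send(Job).unwrap();
    assert!(receiver.recv().is_ok());
    println!("sent one Job down the pool's channel");
}
```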
Let’s start by creating a channel in ThreadPool::new and holding the sender in the ThreadPool instance, as shown in Listing 21-16. The Job struct doesn’t hold anything for now but will be the type of item we’re sending down the channel.
{{#rustdoc_include ../listings/ch21-web-server/listing-21-16/src/lib.rs:here}}
In ThreadPool::new, we create our new channel and have the pool hold the
sender. This will successfully compile.
Let’s try passing a receiver of the channel into each Worker as the thread
pool creates the channel. We know we want to use the receiver in the thread that
the Worker instances spawn, so we’ll reference the receiver parameter in the
closure. The code in Listing 21-17 won’t quite compile yet.
{{#rustdoc_include ../listings/ch21-web-server/listing-21-17/src/lib.rs:here}}
We’ve made some small and straightforward changes: We pass the receiver into
Worker::new, and then we use it inside the closure.
When we try to check this code, we get this error:
{{#include ../listings/ch21-web-server/listing-21-17/output.txt}}
The code is trying to pass receiver to multiple Worker instances. This
won’t work, as you’ll recall from Chapter 16: The channel implementation that
Rust provides is multiple producer, single consumer. This means we can’t
just clone the consuming end of the channel to fix this code. We also don’t
want to send a message multiple times to multiple consumers; we want one list
of messages with multiple Worker instances such that each message gets
processed once.
Additionally, taking a job off the channel queue involves mutating the
receiver, so the threads need a safe way to share and modify receiver;
otherwise, we might get race conditions (as covered in Chapter 16).
Recall the thread-safe smart pointers discussed in Chapter 16: To share
ownership across multiple threads and allow the threads to mutate the value, we
need to use Arc<Mutex<T>>. The Arc type will let multiple Worker instances
own the receiver, and Mutex will ensure that only one Worker gets a job from
the receiver at a time. Listing 21-18 shows the changes we need to make.
{{#rustdoc_include ../listings/ch21-web-server/listing-21-18/src/lib.rs:here}}
In ThreadPool::new, we put the receiver in an Arc and a Mutex. For each
new Worker, we clone the Arc to bump the reference count so that the
Worker instances can share ownership of the receiver.
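The sharing mechanics can be seen in isolation in this small sketch (the `for_worker_*` bindings stand in for the clones each `Worker` would receive):

```rust
use std::sync::{mpsc, Arc, Mutex};

struct Job;

fn main() {
    let (_sender, receiver) = mpsc::channel::<Job>();

    // Wrap the single receiver so many workers can share and lock it.
    let receiver = Arc::new(Mutex::new(receiver));

    // Each new Worker gets its own clone, bumping the reference count.
    let for_worker_0 = Arc::clone(&receiver);
    let for_worker_1 = Arc::clone(&receiver);
    assert_eq!(Arc::strong_count(&receiver), 3);

    // Only one clone can hold the lock (and thus pull a job) at a time;
    // here the queue is empty, so try_recv reports an error.
    assert!(for_worker_0.lock().unwrap().try_recv().is_err());

    drop(for_worker_1);
    assert_eq!(Arc::strong_count(&receiver), 2);
    println!("receiver shared across workers via Arc<Mutex<_>>");
}
```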
With these changes, the code compiles! We’re getting there!
Implementing the execute Method
Let’s finally implement the execute method on ThreadPool. We’ll also change
Job from a struct to a type alias for a trait object that holds the type of
closure that execute receives. As discussed in the “Type Synonyms and Type
Aliases” section in Chapter 20, type aliases
allow us to make long types shorter for ease of use. Look at Listing 21-19.
{{#rustdoc_include ../listings/ch21-web-server/listing-21-19/src/lib.rs:here}}
After creating a new `Job` instance using the closure we get in `execute`, we send that job down the sending end of the channel. We’re calling `unwrap` on `send` for the case that sending fails. This might happen if, for example, we stop all our threads from executing, meaning the receiving end has stopped receiving new messages. At the moment, we can’t stop our threads from executing: our threads continue executing as long as the pool exists. The reason we use `unwrap` is that we know the failure case won’t happen, but the compiler doesn’t know that.
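To see just this piece in isolation, here is a minimal sketch: the `Job` alias and the body of `execute` mirror the listing, while the `CountingPool` struct, the `run_one_job` helper, and the atomic counter are invented for the example so the job’s effect can be observed.

```rust
use std::sync::mpsc;

// The type alias from the listing: a boxed closure that can be sent
// across threads and called exactly once.
type Job = Box<dyn FnOnce() + Send + 'static>;

// Pared-down stand-in for ThreadPool, holding only the sender.
struct CountingPool {
    sender: mpsc::Sender<Job>,
}

impl CountingPool {
    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        // unwrap: we assume the receiving end is still alive, as it is
        // for the chapter's pool while the pool exists.
        self.sender.send(job).unwrap();
    }
}

// Queue one job, then play the Worker's role by hand: pull the job
// off the channel and run it, observing its side effect via a counter.
fn run_one_job() -> usize {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    let (sender, receiver) = mpsc::channel::<Job>();
    let pool = CountingPool { sender };

    let counter = Arc::new(AtomicUsize::new(0));
    let c = Arc::clone(&counter);
    pool.execute(move || {
        c.fetch_add(1, Ordering::SeqCst);
    });

    let job = receiver.recv().unwrap();
    job(); // calling a Box<dyn FnOnce()> consumes the box
    counter.load(Ordering::SeqCst)
}

fn main() {
    assert_eq!(run_one_job(), 1);
    println!("job executed once");
}
```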
But we’re not quite done yet! In the `Worker`, our closure being passed to `thread::spawn` still only references the receiving end of the channel. Instead, we need the closure to loop forever, asking the receiving end of the channel for a job and running the job when it gets one. Let’s make the change shown in Listing 21-20 to `Worker::new`.
{{#rustdoc_include ../listings/ch21-web-server/listing-21-20/src/lib.rs:here}}
Here, we first call `lock` on the `receiver` to acquire the mutex, and then we call `unwrap` to panic on any errors. Acquiring a lock might fail if the mutex is in a poisoned state, which can happen if some other thread panicked while holding the lock rather than releasing it. In this situation, calling `unwrap` to have this thread panic is the correct action to take. Feel free to change this `unwrap` to an `expect` with an error message that is meaningful to you.
If we get the lock on the mutex, we call `recv` to receive a `Job` from the channel. A final `unwrap` moves past any errors here as well, which might occur if the thread holding the sender has shut down, similar to how the `send` method returns `Err` if the receiver shuts down.
The call to `recv` blocks, so if there is no job yet, the current thread will wait until a job becomes available. The `Mutex<T>` ensures that only one `Worker` thread at a time is trying to request a job.
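The lock/recv/run loop can be sketched on its own like this. One deliberate deviation from the listing: instead of unwrapping `recv`, this sketch matches on its result and breaks out of the loop when the channel closes, so the example can shut down cleanly when the sender is dropped. The `spawn_worker` and `run_jobs` helper names are invented for the example.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

// Spawn one worker that loops forever: lock, recv, run.
fn spawn_worker(
    id: usize,
    receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || loop {
        // The MutexGuard is a temporary inside this `let` statement,
        // so the lock is already released by the time the job runs.
        let message = receiver.lock().unwrap().recv();
        match message {
            Ok(job) => {
                println!("Worker {id} got a job; executing.");
                job();
            }
            Err(_) => break, // channel closed: nothing more to do
        }
    })
}

// Run `n_jobs` counter-increment jobs on `n_workers` workers and
// report how many jobs actually executed.
fn run_jobs(n_workers: usize, n_jobs: usize) -> usize {
    let (sender, receiver) = mpsc::channel::<Job>();
    let receiver = Arc::new(Mutex::new(receiver));
    let done = Arc::new(AtomicUsize::new(0));

    let workers: Vec<_> = (0..n_workers)
        .map(|id| spawn_worker(id, Arc::clone(&receiver)))
        .collect();

    for _ in 0..n_jobs {
        let done = Arc::clone(&done);
        sender
            .send(Box::new(move || {
                done.fetch_add(1, Ordering::SeqCst);
            }))
            .unwrap();
    }
    drop(sender); // closes the channel so every worker's loop ends

    for worker in workers {
        worker.join().unwrap();
    }
    done.load(Ordering::SeqCst)
}

fn main() {
    assert_eq!(run_jobs(2, 6), 6);
    println!("all jobs ran");
}
```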
Our thread pool is now in a working state! Give it a `cargo run` and make some requests:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
--> src/lib.rs:7:5
|
6 | pub struct ThreadPool {
| ---------- field in this struct
7 | workers: Vec<Worker>,
| ^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: fields `id` and `thread` are never read
--> src/lib.rs:48:5
|
47 | struct Worker {
| ------ fields in this struct
48 | id: usize,
| ^^
49 | thread: thread::JoinHandle<()>,
| ^^^^^^
warning: `hello` (lib) generated 2 warnings
Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Success! We now have a thread pool that executes connections asynchronously. There are never more than four threads created, so our system won’t get overloaded if the server receives a lot of requests. If we make a request to `/sleep`, the server will be able to serve other requests by having another thread run them.
Note: If you open `/sleep` in multiple browser windows simultaneously, they might load one at a time in five-second intervals. Some web browsers execute multiple instances of the same request sequentially for caching reasons. This limitation is not caused by our web server.
This is a good time to pause and consider how the code in Listings 21-18, 21-19, and 21-20 would be different if we were using futures instead of a closure for the work to be done. What types would change? How would the method signatures be different, if at all? What parts of the code would stay the same?
After learning about the `while let` loop in Chapters 17 and 19, you might be wondering why we didn’t write the `Worker` thread code as shown in Listing 21-21.
{{#rustdoc_include ../listings/ch21-web-server/listing-21-21/src/lib.rs:here}}
This code compiles and runs but doesn’t result in the desired threading behavior: a slow request will still cause other requests to wait to be processed. The reason is somewhat subtle: the `Mutex` struct has no public `unlock` method because the ownership of the lock is based on the lifetime of the `MutexGuard<T>` within the `LockResult<MutexGuard<T>>` that the `lock` method returns. At compile time, the borrow checker can then enforce the rule that a resource guarded by a `Mutex` cannot be accessed unless we hold the lock. However, this implementation can also result in the lock being held longer than intended if we aren’t mindful of the lifetime of the `MutexGuard<T>`.
The code in Listing 21-20 that uses `let job = receiver.lock().unwrap().recv().unwrap();` works because with `let`, any temporary values used in the expression on the right-hand side of the equal sign are immediately dropped when the `let` statement ends. However, `while let` (and `if let` and `match`) does not drop temporary values until the end of the associated block. In Listing 21-21, the lock remains held for the duration of the call to `job()`, meaning other `Worker` instances cannot receive jobs.
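This temporary-scope rule can be observed directly with `try_lock`, which reports whether the mutex is currently held without blocking. The sketch below uses `if let`, which follows the same temporary-drop rule as `while let` and `match`; the `guard_lifetimes` helper name is made up for the example.

```rust
use std::sync::Mutex;

// Returns (free_after_let, held_inside_if_let, free_after_block).
fn guard_lifetimes() -> (bool, bool, bool) {
    let m = Mutex::new(0);

    // With `let`, the MutexGuard temporary from lock() is dropped as
    // soon as the statement ends, so the mutex is free right after.
    let _value = *m.lock().unwrap();
    let free_after_let = m.try_lock().is_ok();

    // With `if let` (and `while let`/`match`), the guard temporary
    // lives until the end of the block, so the mutex is still locked
    // inside it.
    let mut held_inside = false;
    if let 0 = *m.lock().unwrap() {
        held_inside = m.try_lock().is_err();
    }
    let free_after_block = m.try_lock().is_ok();

    (free_after_let, held_inside, free_after_block)
}

fn main() {
    assert_eq!(guard_lifetimes(), (true, true, true));
    println!("guard dropped with let, held through if let");
}
```

This is exactly why Listing 21-21 serializes the requests: the guard produced in the `while let` condition stays alive while `job()` runs, so no other `Worker` can lock the receiver in the meantime.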