Transfer Data Between Threads with Message Passing
One increasingly popular approach to ensuring safe concurrency is message passing, where threads or actors communicate by sending each other messages containing data. Here’s the idea in a slogan from the Go language documentation: “Do not communicate by sharing memory; instead, share memory by communicating.”
To accomplish message-sending concurrency, Rust’s standard library provides an implementation of channels. A channel is a general programming concept by which data is sent from one thread to another.
You can imagine a channel in programming as being like a directional channel of water, such as a stream or a river. If you put something like a rubber duck into a river, it will travel downstream to the end of the waterway.
A channel has two halves: a transmitter and a receiver. The transmitter half is the upstream location where you put the rubber duck into the river, and the receiver half is where the rubber duck ends up downstream. One part of your code calls methods on the transmitter with the data you want to send, and another part checks the receiving end for arriving messages. A channel is said to be closed if either the transmitter or receiver half is dropped.
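To make "closed" concrete, here is a minimal sketch that uses only std::sync::mpsc. It relies on the send and recv methods discussed later in this section. The helper function names are hypothetical. Each helper drops one half of a fresh channel and reports whether the operation on the other half failed.

```rust
use std::sync::mpsc;

/// Hypothetical helper: drops the receiver, then tries to send.
/// Returns true if send reported an error.
fn send_after_receiver_dropped() -> bool {
    let (tx, rx) = mpsc::channel::<i32>();
    drop(rx); // the receiving half is gone, so the channel is closed
    tx.send(1).is_err()
}

/// Hypothetical helper: drops the transmitter, then tries to receive.
/// Returns true if recv reported an error.
fn recv_after_transmitter_dropped() -> bool {
    let (tx, rx) = mpsc::channel::<i32>();
    drop(tx); // the only transmitting half is gone, so the channel is closed
    rx.recv().is_err()
}

fn main() {
    println!("send after receiver dropped fails: {}", send_after_receiver_dropped());
    println!("recv after transmitter dropped fails: {}", recv_after_transmitter_dropped());
}
```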
Here, we’ll work up to a program that has one thread to generate values and send them down a channel, and another thread that will receive the values and print them out. We’ll be sending simple values between threads using a channel to illustrate the feature. Once you’re familiar with the technique, you could use channels for any threads that need to communicate with each other, such as a chat system or a system where many threads perform parts of a calculation and send the parts to one thread that aggregates the results.
First, in Listing 16-6, we’ll create a channel but not do anything with it. Note that this won’t compile yet because Rust can’t tell what type of values we want to send over the channel.
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}
We create a new channel using the mpsc::channel function; mpsc stands for
multiple producer, single consumer. In short, the way Rust’s standard library
implements channels means a channel can have multiple sending ends that
produce values but only one receiving end that consumes those values. Imagine
multiple streams flowing together into one big river: Everything sent down any
of the streams will end up in one river at the end. We’ll start with a single
producer for now, but we’ll add multiple producers when we get this example
working.
The mpsc::channel function returns a tuple, the first element of which is the
sending end—the transmitter—and the second element of which is the receiving
end—the receiver. The abbreviations tx and rx are traditionally used in
many fields for transmitter and receiver, respectively, so we name our
variables as such to indicate each end. We’re using a let statement with a
pattern that destructures the tuples; we’ll discuss the use of patterns in
let statements and destructuring in Chapter 19. For now, know that using a
let statement in this way is a convenient approach to extract the pieces of
the tuple returned by mpsc::channel.
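One way to make Listing 16-6 compile before anything is sent is to spell out the element types of the tuple. In the standard library, the two halves are std::sync::mpsc::Sender<T> and std::sync::mpsc::Receiver<T>. This sketch fixes T as String, an arbitrary choice for illustration, and also shows the equivalent turbofish form.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

fn main() {
    // Annotating the destructuring pattern tells Rust that T is String, so this
    // compiles even though nothing is ever sent. The leading underscores
    // silence the unused-variable warnings.
    let (_tx, _rx): (Sender<String>, Receiver<String>) = mpsc::channel();

    // Equivalent: name T with a turbofish on the function call itself.
    let (_tx2, _rx2) = mpsc::channel::<String>();
}
```

In ordinary code you rarely need either form. As soon as the code sends a value, as Listing 16-7 does, Rust infers T from that value.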
Let’s move the transmitting end into a spawned thread and have it send one string so that the spawned thread is communicating with the main thread, as shown in Listing 16-7. This is like putting a rubber duck in the river upstream or sending a chat message from one thread to another.
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}
Again, we’re using thread::spawn to create a new thread and then using move
to move tx into the closure so that the spawned thread owns tx. The spawned
thread needs to own the transmitter to be able to send messages through the
channel.
The transmitter has a send method that takes the value we want to send. The
send method returns a Result<T, E> type, so if the receiver has already
been dropped and there’s nowhere to send a value, the send operation will
return an error. In this example, we’re calling unwrap to panic in case of an
error. But in a real application, we would handle it properly: Return to
Chapter 9 to review strategies for proper error handling.
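Here's one possible shape for handling the error properly when the only failure is a dropped receiver. This is a sketch, not the only strategy. send_or_recover is a hypothetical helper, and what to do with the returned value (log it, retry elsewhere, discard it) depends on the application. It relies on the fact that the error type, std::sync::mpsc::SendError<T>, is a tuple struct that hands the unsent value back to the caller.

```rust
use std::sync::mpsc;

/// Hypothetical helper: tries to send `val` and, instead of panicking, hands the
/// value back to the caller if the receiver has already been dropped.
fn send_or_recover(tx: &mpsc::Sender<String>, val: String) -> Option<String> {
    match tx.send(val) {
        Ok(()) => None,
        // SendError<T> is a tuple struct wrapping the value that couldn't be sent,
        // so ownership of it comes back to us.
        Err(mpsc::SendError(unsent)) => Some(unsent),
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    drop(rx); // simulate a receiver that has already gone away

    match send_or_recover(&tx, String::from("hi")) {
        None => println!("delivered"),
        Some(unsent) => eprintln!("receiver dropped; {unsent:?} was not delivered"),
    }
}
```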
In Listing 16-8, we’ll get the value from the receiver in the main thread. This is like retrieving the rubber duck from the water at the end of the river or receiving a chat message.
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}
The receiver has two useful methods: recv and try_recv. We’re using recv,
short for receive, which will block the main thread’s execution and wait
until a value is sent down the channel. Once a value is sent, recv will
return it in a Result<T, E>. When the transmitter is dropped and the channel
closes, recv will return an error to signal that no more values will be
coming; any values sent before that point are still delivered first.
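To observe that recv really blocks, this sketch times a call to recv while the sending thread sleeps before sending. The helper timed_recv and the value 42 are illustrative assumptions. The clock starts before the sender is spawned, so the measured wait can't be shorter than the sender's delay.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: a spawned thread sleeps for `delay` and then sends 42.
/// Returns what recv produced and how long the calling thread was blocked.
fn timed_recv(delay: Duration) -> (Result<u32, mpsc::RecvError>, Duration) {
    let (tx, rx) = mpsc::channel();
    let start = Instant::now(); // started before the sender exists

    thread::spawn(move || {
        thread::sleep(delay);
        tx.send(42).unwrap();
    });

    // Blocks the calling thread until the value arrives.
    let received = rx.recv();
    (received, start.elapsed())
}

fn main() {
    let (received, waited) = timed_recv(Duration::from_millis(200));
    println!("recv returned {received:?} after blocking for {waited:?}");
}
```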
The try_recv method doesn’t block, but will instead return a Result<T, E>
immediately: an Ok value holding a message if one is available and an Err
value if there aren’t any messages this time. Using try_recv is useful if
this thread has other work to do while waiting for messages: We could write a
loop that calls try_recv every so often, handles a message if one is
available, and otherwise does other work for a little while until checking
again.
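The polling loop described above might look like the following sketch. The counter of idle rounds and the 10 ms sleep stand in for "other work" and are purely illustrative. Note that try_recv distinguishes two kinds of Err through std::sync::mpsc::TryRecvError. Empty means no message is ready yet. Disconnected means every transmitter has been dropped and no messages remain, so the loop can stop.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: polls `rx` without blocking until the channel closes.
/// Returns the messages received and how many times the channel was empty.
fn poll_until_closed(rx: mpsc::Receiver<String>) -> (Vec<String>, u32) {
    let mut messages = Vec::new();
    let mut idle_rounds = 0;
    loop {
        match rx.try_recv() {
            Ok(msg) => messages.push(msg),
            Err(TryRecvError::Empty) => {
                // Nothing ready yet: this sleep is a stand-in for other work.
                idle_rounds += 1;
                thread::sleep(Duration::from_millis(10));
            }
            // All transmitters are gone and the buffer is drained.
            Err(TryRecvError::Disconnected) => break,
        }
    }
    (messages, idle_rounds)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for word in ["polling", "works"] {
            tx.send(word.to_string()).unwrap();
            thread::sleep(Duration::from_millis(50));
        }
    });
    let (messages, idle_rounds) = poll_until_closed(rx);
    println!("got {messages:?}; found the channel empty {idle_rounds} times");
}
```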
We’ve used recv in this example for simplicity; we don’t have any other work
for the main thread to do other than wait for messages, so blocking the main
thread is appropriate.
When we run the code in Listing 16-8, we’ll see the value printed from the main thread:
Got: hi
Perfect!
Transferring Ownership Through Channels
The ownership rules play a vital role in message sending because they help you
write safe, concurrent code. Preventing errors in concurrent programming is the
advantage of thinking about ownership throughout your Rust programs. Let’s do
an experiment to show how channels and ownership work together to prevent
problems: We’ll try to use val in the spawned thread after we’ve sent it
down the channel. Try compiling the code in Listing 16-9 to see why this code
isn’t allowed.
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}
Here, we try to print val after we’ve sent it down the channel via tx.send.
Allowing this would be a bad idea: Once the value has been sent to another
thread, that thread could modify or drop it before we try to use the value
again. Potentially, the other thread’s modifications could cause errors or
unexpected results due to inconsistent or nonexistent data. However, Rust gives
us an error if we try to compile the code in Listing 16-9:
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:27
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
Our concurrency mistake has caused a compile-time error. The send function
takes ownership of its parameter, and when the value is moved, the receiver
takes ownership of it. This stops us from accidentally using the value again
after sending it; the ownership system checks that everything is okay.
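Suppose the spawned thread in Listing 16-9 genuinely needs val after sending it. The compiler's complaint points to two fixes: use the value before the send, or send a clone and keep the original. Here is a minimal sketch of both, in a hypothetical function send_clone_and_keep. Cloning allocates a second String, so the two threads then hold independent copies rather than sharing one.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: returns (value received by the calling thread,
/// value the spawned thread kept after sending).
fn send_clone_and_keep() -> (String, String) {
    let (tx, rx) = mpsc::channel();

    let handle = thread::spawn(move || {
        let val = String::from("hi");
        // Fix 1: use the value before ownership moves into send.
        println!("about to send {val}");
        // Fix 2: send a clone; the original stays owned by this thread.
        tx.send(val.clone()).unwrap();
        println!("val is {val}");
        val // hand the kept copy back through the JoinHandle
    });

    let received = rx.recv().unwrap();
    let kept = handle.join().unwrap();
    (received, kept)
}

fn main() {
    let (received, kept) = send_clone_and_keep();
    println!("Got: {received}; the spawned thread kept: {kept}");
}
```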
Sending Multiple Values
The code in Listing 16-8 compiled and ran, but it didn’t clearly show us that two separate threads were talking to each other over the channel.
In Listing 16-10, we’ve made some modifications that will prove the code in Listing 16-8 is running concurrently: The spawned thread will now send multiple messages and pause for a second between each message.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}
This time, the spawned thread has a vector of strings that we want to send to
the main thread. We iterate over them, sending each individually, and pause
between each by calling the thread::sleep function with a Duration value of
one second.
In the main thread, we’re not calling the recv function explicitly anymore:
Instead, we’re treating rx as an iterator. For each value received, we’re
printing it. When the channel is closed, iteration will end.
When running the code in Listing 16-10, you should see the following output with a one-second pause in between each line:
Got: hi
Got: from
Got: the
Got: thread
Because we don’t have any code that pauses or delays in the for loop in the
main thread, we can tell that the main thread is waiting to receive values from
the spawned thread.
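Treating rx as an iterator in Listing 16-10 is shorthand for calling recv in a loop until it returns an error. The following sketch writes out both forms side by side. The function names are hypothetical. The into_iter call comes from the standard library's IntoIterator implementation for Receiver<T>, which is what a for loop over rx uses.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: drains `rx` with an explicit recv loop. recv returns
/// Err only once the channel is closed and empty, which ends the loop.
fn drain_with_recv(rx: mpsc::Receiver<String>) -> Vec<String> {
    let mut out = Vec::new();
    while let Ok(received) = rx.recv() {
        out.push(received);
    }
    out
}

/// Hypothetical helper: the same thing, written with the receiver's iterator.
fn drain_with_iter(rx: mpsc::Receiver<String>) -> Vec<String> {
    rx.into_iter().collect()
}

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for word in ["hi", "from", "the", "thread"] {
            tx.send(word.to_string()).unwrap();
        }
        // tx is dropped when this closure returns, which closes the channel.
    });
    for received in drain_with_recv(rx) {
        println!("Got: {received}");
    }
}
```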
Creating Multiple Producers
Earlier we mentioned that mpsc was an acronym for multiple producer, single
consumer. Let’s put mpsc to use and expand the code in Listing 16-10 to
create multiple threads that all send values to the same receiver. We can do so
by cloning the transmitter, as shown in Listing 16-11.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--
}
This time, before we create the first spawned thread, we call clone on the
transmitter. This will give us a new transmitter we can pass to the first
spawned thread. We pass the original transmitter to a second spawned thread.
This gives us two threads, each sending different messages to the one receiver.
When you run the code, your output should look something like this:
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
You might see the values in another order, depending on your system. This is
what makes concurrency interesting as well as difficult. If you experiment with
thread::sleep, giving it various values in the different threads, the runs
will become more nondeterministic and produce different output each time.
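Listing 16-11 clones the transmitter by hand, but the same idea scales to a loop. That is one way to build the aggregation system mentioned at the start of this section. The sketch below rests on a few assumptions: parallel_sum is a hypothetical function, the data is split with slice::chunks, and each worker sends a single partial sum. It also uses usize::div_ceil, which needs a reasonably recent Rust toolchain. One detail matters here that Listing 16-11 sidesteps by moving tx into the second thread. Iterating over rx only ends once every transmitter is gone, so the original tx must be dropped explicitly, or the receiving side would wait forever.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical aggregation: split `numbers` into roughly `workers` chunks,
/// sum each chunk on its own thread, and add up the partial sums as they arrive.
fn parallel_sum(numbers: &[u64], workers: usize) -> u64 {
    let (tx, rx) = mpsc::channel();
    // Keep the chunk size at least 1, because `chunks` panics on 0.
    let chunk_size = numbers.len().div_ceil(workers.max(1)).max(1);

    for chunk in numbers.chunks(chunk_size) {
        let tx = tx.clone(); // one transmitter per producer thread
        let chunk = chunk.to_vec(); // each thread owns its own copy of the data
        thread::spawn(move || {
            let partial: u64 = chunk.iter().sum();
            tx.send(partial).unwrap();
        });
    }

    // The original transmitter was never moved into a thread. Dropping it here
    // lets the iteration below end once every worker's clone is dropped too.
    drop(tx);

    rx.iter().sum()
}

fn main() {
    let numbers: Vec<u64> = (1..=100).collect();
    println!("sum = {}", parallel_sum(&numbers, 4));
}
```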
Now that we’ve looked at how channels work, let’s look at a different method of concurrency.