第21章 最后的项目:构建多线程 web server

第21章 最后的项目:构建多线程 web server

这是一次漫长的旅途,不过我们已经抵达了本书的结尾。在本章中,我们将一同构建另一个项目,来展示最后几章所学,同时复习更早的章节。

作为最后的项目,我们将要实现一个返回 “hello” 的 web server,它在浏览器中看起来就如图 21-1 所示:

图 21-1: 我们最后将一起分享的项目

如下是构建 web server 的计划:

  1. 学习一些 TCP 与 HTTP 知识
  2. 在套接字(socket)上监听 TCP 请求
  3. 解析少量的 HTTP 请求
  4. 创建一个合适的 HTTP 响应
  5. 通过线程池改善 server 的吞吐量

在开始之前,我们先提两点说明。首先,这里使用的方法并不是使用 Rust 构建 web server 的最佳方式。crates.io 上有很多可用于生产环境的 crate,它们提供了比我们所要编写的更为完整的 web server 和线程池实现。然而,本章的目的在于学习,而不是走捷径。因为 Rust 是一个系统编程语言,我们能够选择处理什么层次的抽象,并能够选择比其他语言可能或可用的层次更低的层次。

其次,我们不会在这里使用 async 和 await。构建线程池本身已经是一个相当大的挑战,无需再加入构建异步运行时的复杂度!不过,我们会指出 async 和 await 在本章中会遇到的一些问题上的可能应用。

因此我们将手动编写一个基础的 HTTP server 和线程池,以便学习将来可能用到的 crate 背后的通用理念和技术。

21.1 构建单线程 web server

首先让我们创建一个可运行的单线程 web server。在开始之前,我们将快速了解一下构建 web server 所涉及到的协议。这些协议的细节超出了本书的范畴,不过一个简单的概括会提供我们所需的信息。

web server 中涉及到的两个主要协议是 超文本传输协议Hypertext Transfer ProtocolHTTP)和 传输控制协议Transmission Control ProtocolTCP)。这两者都是 请求 - 响应request-response)协议,也就是说,有 客户端client)来初始化请求,并有 服务端server)监听请求并向客户端提供响应。请求与响应的内容由协议本身定义。

TCP 是一个底层协议,它描述了信息如何从一个 server 到另一个的细节,不过其并不指定信息是什么。HTTP 构建于 TCP 之上,它定义了请求和响应的内容。从技术上讲可将 HTTP 用于其他协议之上,不过对于绝大部分情况,HTTP 通过 TCP 传输数据。我们将要做的就是处理 TCP 和 HTTP 请求与响应的原始字节数据。

监听TCP连接

我们的 web server 所需做的第一件事,就是监听 TCP 连接。标准库提供了 std::net 模块处理这些功能。让我们像往常一样新建一个项目:

$ cargo new hello Created binary (application) `hello` project $ cd hello

现在,在 src/main.rs 输入示例 21-1 中的代码,作为一个开始。这段代码会在地址 127.0.0.1:7878 上监听传入的 TCP 流。当获取到传入的流,它会打印出 Connection established!

文件名:src/main.rs

// 21.1节 构建单线程 web server ————监听TCP连接 参考 示例 21-1 // 示例 21-1: 监听传入的流并在接收到流时打印信息 use std::net::TcpListener; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); println!("Connection established! {:?}", stream); } println!("Hello Rust"); }

示例 21-1: 监听传入的流并在接收到流时打印信息

编译运行:

PS F:\rustproject\RustProgramLanguage\chapter21> cargo new example21_1_1 Creating binary (application) `example21_1_1` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21> cd example21_1_1 PS F:\rustproject\RustProgramLanguage\chapter21\example21_1_1> cargo run Compiling example21_1_1 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_1_1) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.35s Running `target\debug\example21_1_1.exe` Connection established! TcpStream { addr: 127.0.0.1:7878, peer: 127.0.0.1:50656, socket: 308 } Connection established! TcpStream { addr: 127.0.0.1:7878, peer: 127.0.0.1:50657, socket: 312 } Connection established! TcpStream { addr: 127.0.0.1:7878, peer: 127.0.0.1:50658, socket: 312 } Connection established! TcpStream { addr: 127.0.0.1:7878, peer: 127.0.0.1:50659, socket: 316 } Connection established! TcpStream { addr: 127.0.0.1:7878, peer: 127.0.0.1:50660, socket: 316 } Connection established! TcpStream { addr: 127.0.0.1:7878, peer: 127.0.0.1:50661, socket: 312 } Connection established! TcpStream { addr: 127.0.0.1:7878, peer: 127.0.0.1:50662, socket: 312 } Connection established! TcpStream { addr: 127.0.0.1:7878, peer: 127.0.0.1:50663, socket: 312 } Connection established! TcpStream { addr: 127.0.0.1:7878, peer: 127.0.0.1:50664, socket: 312 } Connection established! TcpStream { addr: 127.0.0.1:7878, peer: 127.0.0.1:50665, socket: 308 } Connection established! TcpStream { addr: 127.0.0.1:7878, peer: 127.0.0.1:50666, socket: 308 }

TcpListener 用于监听 TCP 连接。我们选择监听本地地址 127.0.0.1:7878。将这个地址拆开来看,冒号之前的部分是一个代表本机的 IP 地址(在每台计算机上,这个地址都指本机,并不特指作者的计算机),而 7878 是端口。选择这个端口出于两个原因:通常 HTTP 服务器不在这个端口上接受请求,所以它不太可能与你机器上运行的其它 web server 的端口冲突;而且 7878 在电话上打出来就是 "rust"(译者注:九宫格键盘上的英文)。

在这个场景中 bind 函数类似于 new 函数,在这里它返回一个新的 TcpListener 实例。这个函数叫做 bind 是因为,在网络领域,连接到要监听的端口称为“绑定到端口”(“binding to a port”)

bind 函数返回 Result<T, E>,这表明绑定可能会失败。例如,监听 80 端口需要管理员权限(非管理员用户只能监听大于 1023 的端口),所以如果尝试监听 80 端口而没有管理员权限,则会绑定失败。再比如,如果我们运行这个程序的两个实例,并因此有两个实例监听同一个端口,那么绑定也将失败。我们是出于学习目的来编写一个基础的服务器,不用关心处理这类错误,而仅仅使用 unwrap 在出现这些情况时直接停止程序。

TcpListenerincoming 方法返回一个迭代器,它提供了一系列的流(更准确的说是 TcpStream 类型的流)。stream)代表一个客户端和服务端之间打开的连接。连接connection)代表客户端连接服务端、服务端生成响应以及服务端关闭连接的整个请求 / 响应过程。为此,我们会从 TcpStream 读取客户端发送了什么并接着向流发送响应以向客户端发回数据。总体来说,这个 for 循环会依次处理每个连接并产生一系列的流供我们处理。

目前,处理流的代码中也有一个 unwrap 调用,如果 stream 出现任何错误会终止程序;如果没有任何错误,程序会打印一条消息。下一个示例中,我们将为成功的情况增加更多功能。当客户端连接到服务端时,incoming 方法是可能返回错误的,因为我们实际上不是在遍历连接,而是遍历 连接尝试connection attempts)。连接的尝试可能会因为多种原因不能成功,其中大部分是操作系统相关的。例如,很多系统限制它所能支持的同时打开的连接数,超出数量限制的新连接尝试会产生错误,直到一些现有的连接关闭为止。

让我们试试这段代码!首先在终端执行 cargo run,接着在浏览器中打开 127.0.0.1:7878。浏览器会显示出看起来类似于“连接重置”(“Connection reset”)的错误信息,因为 server 目前并没响应任何数据。如果我们观察终端,会发现当浏览器连接我们的服务端时,会打印出一系列的信息!

Running `target/debug/hello` Connection established! Connection established! Connection established!

有时,对于一次浏览器请求可能会打印出多条信息;原因可能是浏览器不仅请求页面,还请求其他资源,比如出现在浏览器标签页中的 favicon.ico 图标。

这也可能是因为浏览器尝试多次连接服务端,因为服务端没有响应任何数据。当 stream 在循环结束时离开作用域并被丢弃,其连接将作为 drop 实现的一部分被关闭。浏览器有时通过重连来处理关闭的连接,因为这些问题可能是暂时的。

浏览器有时还会在不发送任何请求的情况下打开多个连接,以便在稍后发送请求时能够更快地开始。出现这种情况时,我们的服务端会看到每个连接,而不管该连接上是否有请求。例如,许多基于 Chrome 的浏览器版本都会这样做;你可以通过使用私人浏览模式或更换其他浏览器来禁用该优化。

重要的是,我们已经成功获取了一个 TCP 连接的句柄!

记得当运行完特定版本的代码后,使用 ctrl-c 来停止程序。然后在你完成每次代码修改后,通过运行 cargo run 命令重新启动程序,以确保你正在运行最新的代码。

读取请求

让我们实现读取来自浏览器请求的功能!为了将首先获取连接和接着对连接采取操作两项职责分离,我们将开始写一个新函数来处理连接。在这个新的 handle_connection 函数中,我们从 TCP 流中读取数据,并打印出来,以便观察浏览器发送过来的数据。将代码修改为如示例 21-2 所示:

文件名:src/main.rs

// 21.1节 构建单线程 web server ————读取请求 参考 示例 21-2 // 示例 21-2: 读取 TcpStream 并打印数据 use std::{ io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } println!("Hello Rust"); } fn handle_connection(stream: TcpStream) { let buf_reader = BufReader::new(&stream); let http_request: Vec<_> = buf_reader .lines() .map(|result| result.unwrap()) .take_while(|line| !line.is_empty()) .collect(); println!("Request: {http_request:#?}"); } 

示例 21-2: 读取 TcpStream 并打印数据

编译运行,执行cargo run命令后,打开浏览器,输入127.0.0.1:7878

PS F:\rustproject\RustProgramLanguage\chapter21> cargo new example21_1_2 Creating binary (application) `example21_1_2` package PS F:\rustproject\RustProgramLanguage\chapter21> cd example21_1_2 PS F:\rustproject\RustProgramLanguage\chapter21\example21_1_2> cargo run Compiling example21_1_2 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_1_2) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.20s Running `target\debug\example21_1_2.exe` Request: [ "GET / HTTP/1.1", "Host: 127.0.0.1:7878", "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0", "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language: zh-CN,zh;q=0.9,zh-TW;q=0.8,zh-HK;q=0.7,en-US;q=0.6,en;q=0.5", "Accept-Encoding: gzip, deflate, br, zstd", "DNT: 1", "Sec-GPC: 1", "Connection: keep-alive", "Upgrade-Insecure-Requests: 1", "Sec-Fetch-Dest: document", "Sec-Fetch-Mode: navigate", "Sec-Fetch-Site: none", "Sec-Fetch-User: ?1", "Priority: u=0, i", ] Request: [ "GET / HTTP/1.1", "Host: 127.0.0.1:7878", "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0", "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language: zh-CN,zh;q=0.9,zh-TW;q=0.8,zh-HK;q=0.7,en-US;q=0.6,en;q=0.5", "Accept-Encoding: gzip, deflate, br, zstd", "DNT: 1", "Sec-GPC: 1", "Connection: keep-alive", "Upgrade-Insecure-Requests: 1", "Sec-Fetch-Dest: document", "Sec-Fetch-Mode: navigate", "Sec-Fetch-Site: none", "Sec-Fetch-User: ?1", "Priority: u=0, i", ] 后面还有很多个Request

这里将 std::io::preludestd::io::BufReader 引入作用域,来获取读写流所需的 trait 和类型。在 main 函数的 for 循环中,相比获取到连接时打印信息,现在调用新的 handle_connection 函数并向其传递 stream

handle_connection 中,我们新建了一个 BufReader 实例来封装一个 stream 的引用。BufReader 通过替我们管理 std::io::Read trait 方法的调用增加了缓冲。

我们创建了一个 http_request 变量来收集浏览器发送给服务端的请求行。这里增加了 Vec<_> 类型注解表明希望将这些行收集到一个 vector 中。

BufReader 实现了 std::io::BufRead trait,它提供了 lines 方法。lines 方法通过遇到换行符(newline)字节就切分数据流来返回一个 Result<String, std::io::Error> 的迭代器。为了获取每一个 String,我们通过 map 并 unwrap 每一个 Result。如果数据不是有效的 UTF-8 编码或者读取流遇到问题时,Result 可能是一个错误。同理,用于生产环境的程序应该更优雅地处理这些错误,不过出于简单的目的我们选择在错误情况下停止程序。

浏览器通过连续发送两个换行符来代表一个 HTTP 请求的结束,所以为了从流中获取一个请求,我们会读取行直到遇到一个空字符串的行。一旦将这些行收集进 vector,就可以使用友好的 debug 格式化打印它们,以便看看浏览器发送给服务端的指令。

让我们试一试!启动程序并再次在浏览器中发起请求。注意浏览器中仍然会出现错误页面,不过终端中程序的输出现在看起来像这样:

$ cargo run Compiling hello v0.1.0 (file:///projects/hello) Finished dev [unoptimized + debuginfo] target(s) in 0.42s Running `target/debug/hello` Request: [ "GET / HTTP/1.1", "Host: 127.0.0.1:7878", "User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:99.0) Gecko/20100101 Firefox/99.0", "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", "Accept-Language: en-US,en;q=0.5", "Accept-Encoding: gzip, deflate, br", "DNT: 1", "Connection: keep-alive", "Upgrade-Insecure-Requests: 1", "Sec-Fetch-Dest: document", "Sec-Fetch-Mode: navigate", "Sec-Fetch-Site: none", "Sec-Fetch-User: ?1", "Cache-Control: max-age=0", ]

根据不同的浏览器,可能会出现稍微不同的输出。现在我们打印出了请求数据,可以通过观察第一行 GET 之后的路径就可以看出为何会从浏览器得到多个连接。如果重复的连接都是请求 /,就知道了浏览器尝试重复获取 / 因为它没有从程序得到响应。

让我们拆开请求数据来理解浏览器向程序请求了什么。

仔细观察HTTP请求

HTTP 是一个基于文本的协议,同时一个请求有如下格式:

Method Request-URI HTTP-Version CRLF headers CRLF message-body

"GET / HTTP/1.1",

第一行叫做 请求行request line),它存放了客户端请求了什么的信息。请求行的第一部分是所使用的 method,比如 GETPOST,这描述了客户端如何进行请求。这里客户端使用了 GET 请求,表明它在请求信息。

请求行接下来的部分是 /,它代表客户端请求的 统一资源标识符Uniform Resource IdentifierURI):URI 大体上但也不完全类似于 URL(统一资源定位符Uniform Resource Locators)。URI 和 URL 之间的区别对于本章的目的来说并不重要,不过 HTTP 规范使用术语 URI,所以这里可以简单的将 URL 理解为 URI。

最后一部分是客户端使用的 HTTP 版本,然后请求行以 CRLF 序列 (CRLF 代表回车和换行,carriage return line feed,这是打字机时代的术语!)结束。CRLF 序列也可以写成\r\n,其中\r是回车符,\n是换行符。CRLF 序列将请求行与其余请求数据分开。请注意,打印 CRLF 时,我们会看到一个新行开始,而不是\r\n

观察目前为止运行程序所接收到的请求行数据,可以看到 GET 是 method,/ 是请求 URI,而 HTTP/1.1 是版本。

Host: 开始的其余的行是 headers;GET 请求没有 body。

如果你希望的话,可以尝试用不同的浏览器发送请求,或请求不同的地址,比如 127.0.0.1:7878/test,来观察请求数据如何变化。

现在我们知道了浏览器请求了什么。让我们返回一些数据!

编写响应

我们将实现在客户端请求的响应中发送数据的功能。响应具有如下格式:

HTTP-Version Status-Code Reason-Phrase CRLF headers CRLF message-body

第一行叫做 状态行status line),它包含响应的 HTTP 版本、一个数字状态码(status code)用以总结请求的结果和一个描述之前状态码的文本原因短语(reason phrase)。CRLF 序列之后是任意 header,另一个 CRLF 序列,和响应的 body。

这里是一个使用 HTTP 1.1 版本的响应例子,其状态码为 200,原因短语为 OK,没有 header,也没有 body:

HTTP/1.1 200 OK\r\n\r\n

状态码 200 是一个标准的成功响应。这些文本是一个微型的成功 HTTP 响应。让我们将这些文本写入流作为成功请求的响应!在 handle_connection 函数中,我们需要去掉打印请求数据的 println!,并替换为示例 21-3 中的代码:

文件名:src/main.rs

// 21.1节 构建单线程 web server ————编写响应 参考 示例 21-3 // 示例 21-3: 将一个微型成功 HTTP 响应写入流 use std::{ io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } println!("Hello Rust"); } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let _http_request: Vec<_> = buf_reader .lines() .map(|result| result.unwrap()) .take_while(|line| !line.is_empty()) .collect(); //println!("Request: {http_request:#?}"); let response = "HTTP/1.1 200 OK\r\n\r\n"; stream.write_all(response.as_bytes()).unwrap(); }

示例 21-3: 将一个微型成功 HTTP 响应写入流

编译运行:

PS F:\rustproject\RustProgramLanguage\chapter21> cargo new example21_1_3 Creating binary (application) `example21_1_3` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21> cd example21_1_3 PS F:\rustproject\RustProgramLanguage\chapter21\example21_1_3> cargo run Compiling example21_1_3 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_1_3) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.30s Running `target\debug\example21_1_3.exe`

浏览器输入127.0.0.1:7878

新代码中的第一行定义了变量 response 来存放成功消息的数据。接着,我们在 response 上调用 as_bytes 将字符串数据转换为字节数组。因为 streamwrite_all 方法获取一个 &[u8] 并直接将这些字节发送给连接。因为 write_all 操作可能会失败,所以像之前那样对任何错误结果使用 unwrap。同理,在真实世界的应用中这里需要添加错误处理。

有了这些修改,运行我们的代码并进行请求。由于不再向终端打印任何数据,所以不会再看到除了 Cargo 以外的任何输出。不过当在浏览器中加载 127.0.0.1:7878 时,会得到一个空页面而不是错误。我们刚刚手写了接收 HTTP 请求并发送响应!

返回真正的HTML

让我们实现不只是返回空页面的功能。在项目根目录创建一个新文件,hello.html,不是在 src 目录。在此可以放入任何你期望的 HTML 内容;示例 21-4 展示了一个可能的文本:

文件名:hello.html

<!DOCTYPE html> <html lang="en"> <head> <meta charset="utf-8"> <title>Hello!</title> </head> <body> <h1>Hello!</h1> <p>Hi from Rust</p> </body> </html>

示例 21-4: 一个示例 HTML 文件作为响应返回

这是一个极简 HTML5 文档包含一个标题和一小段文本。为了在服务端接收请求时返回它,需要如示例 21-5 所示修改 handle_connection 来读取 HTML 文件,将其加入到响应的 body 中并发送:

文件名:src/main.rs

// 21.1节 构建单线程 web server ————返回真正的HTML 参考 示例 21-5 // 示例 21-5: 将 hello.html 的内容作为响应 body 发送 use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } println!("Hello Rust"); } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let _http_request: Vec<_> = buf_reader .lines() .map(|result| result.unwrap()) .take_while(|line| !line.is_empty()) .collect(); let status_line = "HTTP/1.1 200 OK"; let contents = fs::read_to_string("hello.html").unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length:{length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }

示例 21-5: 将 hello.html 的内容作为响应 body 发送

编译运行:

PS F:\rustproject\RustProgramLanguage\chapter21> cargo new example21_1_4 Creating binary (application) `example21_1_4` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21> cd example21_1_4 PS F:\rustproject\RustProgramLanguage\chapter21\example21_1_4> cargo run Compiling example21_1_4 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_1_4) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.13s Running `target\debug\example21_1_4.exe`

我们在开头 use 语句将标准库的文件系统模块 fs 引入作用域。打开和读取文件的代码应该看起来很熟悉;在第十二章 I/O 项目中的 Listing 12-4 就用到了类似的代码。

接下来,使用 format! 将文件内容加入到将要写入流的成功响应的 body 中。为了确保构造出有效的 HTTP 响应,我们添加了 Content-Length header,其值设为响应 body 的大小,在这里也就是 hello.html 文件的大小。

使用 cargo run 运行程序,在浏览器加载 127.0.0.1:7878,你应该会看到渲染后的 HTML!

目前忽略了 http_request 中的请求数据并无条件的发送了 HTML 文件的内容。这意味着如果尝试在浏览器中请求 127.0.0.1:7878/something-else 也会得到同样的 HTML 响应。目前我们的 server 的作用是非常有限的,也不是大部分 server 所做的那样;让我们检查请求并只对格式良好(well-formed)的请求 / 发送 HTML 文件。

验证请求并有选择的进行响应

目前我们的 web server 不管客户端请求什么都会返回相同的 HTML 文件。让我们增加在返回 HTML 文件前检查浏览器是否请求 /,并在其请求任何其他内容时返回错误的功能。为此需要如示例 21-6 那样修改 handle_connection。新代码接收到的请求的内容与已知的 / 请求做比较,并增加了 ifelse 块来区别处理请求:

文件名:src/main.rs

// 21.1节 构建单线程 web server ————验证请求并有选择的进行响应 参考 示例 21-6 // 示例 21-6: 以不同于其它请求的方式处理 / 请求 use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } println!("Hello Rust"); } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); println!("request_line = {:#?}", request_line); if request_line == "GET / HTTP/1.1" { let status_line = "HTTP/1.1 200 OK"; let contents = fs::read_to_string("hello.html").unwrap(); let length = contents.len(); let response = format!( "{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}" ); stream.write_all(response.as_bytes()).unwrap(); } else { println!("Request Error!"); } }

示例 21-6: 以不同于其它请求的方式处理 / 请求

编译运行:

PS F:\rustproject\RustProgramLanguage\chapter21> cargo new example21_1_5 Creating binary (application) `example21_1_5` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21> cd example21_1_5 PS F:\rustproject\RustProgramLanguage\chapter21\example21_1_5> cargo run Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.00s Running `target\debug\example21_1_5.exe` request_line = "GET / HTTP/1.1" 

浏览器请求

我们只看 HTTP 请求的第一行,所以不同于将整个请求读取进 vector 中,这里调用 next 从迭代器中获取第一项。第一个 unwrap 负责处理 Option 并在迭代器没有项时停止程序。第二个 unwrap 处理 Result 并与示例 21-2 中增加的 map 中的 unwrap 有着相同的效果。

接下来检查 request_line 是否等于一个 / 路径的 GET 请求。如果是,if 代码块返回 HTML 文件的内容。

如果 request_line 等于一个 / 路径的 GET 请求,就说明接收的是其它请求。我们之后会在 else 块中增加代码来响应所有其他请求。

现在如果运行代码并请求 127.0.0.1:7878,就会得到 hello.html 中的 HTML。如果进行任何其他请求,比如 127.0.0.1:7878/something-else,则会得到像运行示例 21-1 和 21-2 中代码那样的连接错误。

现在向示例 21-7 的 else 块增加代码来返回一个带有 404 状态码的响应,这代表了所请求的内容没有找到。接着也会返回一个 HTML 向浏览器终端用户渲染该响应。

文件名:src/main.rs

// 21.1节 构建单线程 web server ————验证请求并有选择的进行响应 参考 示例 21-7 // 示例 21-7: 对于任何不是 / 的请求返回 404 状态码的响应和错误页面 use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } println!("Hello Rust"); } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); println!("request_line = {:#?}", request_line); if request_line == "GET / HTTP/1.1" { let status_line = "HTTP/1.1 200 OK"; let contents = fs::read_to_string("hello.html").unwrap(); let length = contents.len(); let response = format!( "{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}" ); stream.write_all(response.as_bytes()).unwrap(); } else { let status_line = "HTTP/1.1 404 NOT FOUND"; let contents = fs::read_to_string("404.html").unwrap(); let length = contents.len(); let response = format!( "{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}" ); stream.write_all(response.as_bytes()).unwrap(); } }

示例 21-7: 对于任何不是 / 的请求返回 404 状态码的响应和错误页面

编译运行:

PS F:\rustproject\RustProgramLanguage\chapter21> cargo new example21_1_6 Creating binary (application) `example21_1_6` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21> cd example21_1_6 PS F:\rustproject\RustProgramLanguage\chapter21\example21_1_6> cargo run Compiling example21_1_6 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_1_6) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.31s Running `target\debug\example21_1_6.exe` request_line = "GET / HTTP/1.1"

这里,响应的状态行有状态码 404 和原因短语 NOT FOUND。仍然没有返回任何 header,而其 body 将是 404.html 文件中的 HTML。需要在 hello.html 同级目录创建 404.html 文件作为错误页面;这一次也可以随意使用任何 HTML 或使用示例 21-8 中的示例 HTML:

文件名:404.html

<!DOCTYPE html> <html lang="en"> <head> <meta charset="utf-8"> <title>Hello!</title> </head> <body> <h1>Oops!</h1> <p>Sorry, I don't know what you're asking for.</p> </body> </html>

示例 21-8: 任何 404 响应所返回错误页面内容样例

有了这些修改,再次运行服务端。请求 127.0.0.1:7878 应该会返回 hello.html 的内容,而对于任何其他请求,比如 127.0.0.1:7878/foo,应该会返回 404.html 中的错误 HTML。

稍加重构

目前 ifelse 块中的代码存在大量重复:他们都读取文件并将其内容写入流。唯一的区别是状态行和文件名。为使代码更简洁,将这些区别分别提取到各自的 ifelse 中,对状态行和文件名变量赋值;然后在读取文件和写入响应的代码中无条件地使用这些变量。重构后取代了大段 ifelse 块代码后的结果如示例 21-9 所示:

文件名:src/main.rs

// 21.1节 构建单线程 web server ————稍加重构 参考 示例 21-9 // 示例 21-9: 重构使得 if 和 else 块中只包含两个情况所不同的代码 use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } println!("Hello Rust"); } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); println!("request_line = {:#?}", request_line); let (status_line, filename) = if request_line == "GET / HTTP/1.1" { ("HTTP/1.1 200 OK", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND", "404.html") }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!( "{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}" ); stream.write_all(response.as_bytes()).unwrap(); }

示例 21-9: 重构使得 ifelse 块中只包含两个情况所不同的代码

编译运行:

PS F:\rustproject\RustProgramLanguage\chapter21> cargo new example21_1_7 Creating binary (application) `example21_1_7` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21> cd example21_1_7 PS F:\rustproject\RustProgramLanguage\chapter21\example21_1_7> cargo run Compiling example21_1_7 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_1_7) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.30s Running `target\debug\example21_1_7.exe` request_line = "GET / HTTP/1.1" 

输入http://127.0.0.1:7878/foo

现在 ifelse 块所做的唯一的事就是在一个元组中返回合适的状态行和文件名的值;接着使用第十九章讲到的使用模式的 let 语句通过解构元组的两部分给 filenameheader 赋值。

之前读取文件和写入响应的冗余代码现在位于 ifelse 块之外,并会使用变量 status_linefilename。这样更易于观察这两种情况真正有何不同,还意味着如果需要改变如何读取文件或写入响应时只需要更新一处的代码。示例 21-9 中代码的行为与示例 21-8 完全相同。

好极了!我们现在有了一个 40 行左右 Rust 代码的小而简单的服务端,它对一个请求返回页面内容而对所有其他请求返回 404 响应。

目前服务端运行于单线程中,这意味着它一次只能处理一个请求。让我们模拟一些慢请求来看看这为何会成为一个问题。然后我们将修复它使得服务端能够同时处理多个请求。

21.2 将单线程server变为多线程server

目前服务端会依次处理每一个请求,意味着它在完成第一个连接的处理之前不会处理第二个连接。如果服务端正接收越来越多的请求,这类串行操作会使性能越来越差。如果一个请求花费很长时间来处理,随后而来的请求则不得不等待这个长请求结束,即便这些新请求可以很快就处理完。我们需要修复这种情况,不过首先让我们实际尝试一下这个问题。

在当前服务端实现中模拟慢请求

让我们看看一个慢请求如何影响当前服务端实现中的其他请求。示例 21-10 通过模拟慢响应实现了 /sleep 请求处理,它会使服务端在响应之前休眠五秒。

文件名:src/main.rs

// 21.2节 将单线程 server 变为多线程 server ————在当前服务端实现中模拟慢请求 // 参考 示例 21-10: 通过休眠五秒来模拟慢请求 use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length:{length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); } 

示例 21-10: 通过休眠五秒来模拟慢请求

编译运行:

PS F:\rustproject\RustProgramLanguage\chapter21> cargo new example21_2_1 Creating binary (application) `example21_2_1` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21> cd example21_2_1 PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_1> cargo run Compiling example21_2_1 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_1) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.34s Running `target\debug\example21_2_1.exe` 

if 切换到 match 后现在有三个分支了。我们需要显式地匹配一个 slice 的 request_line 以模式匹配字符串字面值。match 不会像相等方法那样自动引用和解引用。

第一个分支与示例 21-9 中的 if 代码块相同。第二个分支匹配一个 /sleep 请求。当接收到这个请求时,server 在渲染成功 HTML 页面之前会先休眠五秒。第三个分支与示例 21-9 中的 else 代码块相同。

现在就可以真切的看出我们的服务端有多么的原始:真实的库将会以更简洁的方式处理多个请求的识别!

使用 cargo run 启动服务端,并接着打开两个浏览器窗口:一个请求 http://127.0.0.1:7878/ 而另一个请求 http://127.0.0.1:7878/sleep 。如果像之前一样多次请求 /,会发现响应的比较快速。不过如果请求 /sleep 之后再请求 /,就会看到 / 会等待直到 sleep 休眠完五秒之后才响应。

有多种技术可以用来避免所有请求都排在慢请求之后,包括我们在第十七章中所使用的异步;我们将要实现的一个便是线程池。

使用线程池改善吞吐量

线程池thread pool)是一组预先分配的等待或准备处理任务的线程。当程序收到一个新任务,线程池中的一个线程会被分配该任务,并负责处理它。其余线程在该线程处理任务的同时可以处理任何其他接收到的任务。当第一个线程处理完任务时,它会返回空闲线程池中等待处理新任务。线程池允许我们并发处理连接,提高服务端的吞吐量。

我们会将池中线程限制为较少的数量,以防拒绝服务(Denial of Service,DoS)攻击;如果程序为每一个接收的请求都新建一个线程,某人向服务端发起千万级的请求时会耗尽服务器的资源并导致请求处理陷入停滞。

不同于分配无限的线程,线程池中将有固定数量的等待线程。当新进请求时,将请求发送到线程池中做处理。线程池会维护一个接收请求的队列。每一个线程会从队列中取出一个请求,处理请求,接着向队列获取下一个请求。通过这种设计,则可以并发处理 N 个请求,其中 N 为线程数。如果每一个线程都在响应慢请求,之后的请求仍然会阻塞队列,不过相比之前已经增加了能处理的慢请求的数量。

这个设计仅仅是多种改善 web 服务端吞吐量的方法之一。其他可供探索的方法有 fork/join 模型(fork/join model)、单线程异步 I/O 模型(single-threaded async I/O model)或者多线程异步 I/O 模型(multi-threaded async I/O model)。如果你对这个主题感兴趣,则可以阅读更多关于其他解决方案的内容并尝试实现它们;对于一个像 Rust 这样的底层语言,所有这些方法都是可行的。

在开始之前,让我们讨论一下线程池应用看起来如何。当尝试设计代码时,首先编写客户端接口(client interface)有助于指导代码设计。以期望的调用方式来构建 API 代码的结构,接着在这个结构之内实现功能,而不是先实现功能再设计公有 API。

类似于第十二章项目中使用的测试驱动开发。这里将要使用编译器驱动开发(compiler-driven development)。我们将编写调用所期望的函数的代码,接着观察编译器错误告诉我们接下来需要修改什么使得代码可以工作。不过在开始之前,我们将探索不会作为起点使用的技术。

为每一个请求分配线程

首先,让我们探索一下如果为每一个连接都创建一个线程的代码看起来如何。这并不是最终方案,因为正如之前讲到的它会潜在的分配无限的线程,不过这是一个可用的多线程服务端的起点。接着我们会增加线程池作为改进,这样比较两个方案将会更容易。示例 21-11 展示了 main 的改变,它在 for 循环中为每一个流分配了一个新线程进行处理:

文件名:src/main.rs

// 21.2节 将单线程 server 变为多线程 server ————为每一个请求分配线程 // 参考 示例 21-11: 为每一个流新建一个线程 use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } println!("Hello Rust"); } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length:{length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }

示例 21-11: 为每一个流新建一个线程

编译运行:

PS F:\rustproject\RustProgramLanguage\chapter21> cargo new example21_2_2 Creating binary (application) `example21_2_2` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21> cd example21_2_2 PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_2> cargo run Compiling example21_2_2 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_2) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.52s Running `target\debug\example21_2_2.exe`

正如第十六章讲到的,thread::spawn 会创建一个新线程并在其中运行闭包中的代码。如果运行这段代码并在在浏览器中加载 /sleep,接着在另两个浏览器标签页中加载 /,确实会发现 / 请求不必等待 /sleep 结束。不过正如之前提到的,这最终会使系统崩溃因为我们会无限制地创建新线程。

你可能也会回想起第十七章中正是这一类情况才是 async 和 await 真正闪光的地方!在我们用线程池构建项目时请记住并思考这与异步有什么不同或相同的地方。

创建有限数量的线程

我们期望线程池以类似且熟悉的方式工作,以便从线程切换到线程池并不会对使用该 API 的代码做出大幅修改。示例 21-12 展示我们希望用来替换 thread::spawnThreadPool 结构体的假想接口:

文件名:src/main.rs

// 21.2节 将单线程 server 变为多线程 server ————创建有限数量的线程 // 参考 示例 21-12: 假想的 ThreadPool 接口 use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); let pool = ThreadPool::new(4); pool.execure(|| { handle_connection(stream); }); } println!("Hello Rust"); } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length:{length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }

示例 21-12: 假想的 ThreadPool 接口

我们使用 ThreadPool::new 创建一个具有可配置线程数的新线程池,在这里是四。这样在 for 循环中,pool.execute 有着类似 thread::spawn 的接口,它获取一个线程池运行于每一个流的闭包。我们需要实现 pool.execute,使其能够接收闭包并将其传递给线程池中的线程执行。这段代码还不能编译,但我们可以尝试让编译器指导我们如何修复它。

采用编译器驱动开发构建ThreadPool

继续并对示例 21-12 中的 src/main.rs 做出修改,并利用来自 cargo check 的编译器错误来驱动开发。下面是我们得到的第一个错误:

PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_3> cargo check Checking example21_2_3 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_3) error[E0433]: failed to resolve: use of undeclared type `ThreadPool` --> src\main.rs:17:20 | 17 | let pool = ThreadPool::new(4); | ^^^^^^^^^^ use of undeclared type `ThreadPool` For more information about this error, try `rustc --explain E0433`. error: could not compile `example21_2_3` (bin "example21_2_3") due to 1 previous error PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_3>

太好了!这个错误告诉我们需要一个 ThreadPool 类型或模块,所以我们现在就来构建一个。ThreadPool 的实现会与 web 服务端的特定工作相独立。所以让我们从 hello crate 切换到存放 ThreadPool 实现的新库 crate。切换为库 crate 之后,我们就可以在任何工作中使用这个单独的线程池库,而不仅仅是处理网络请求。

创建 src/lib.rs 文件,它包含了目前可用的最简单的 ThreadPool 定义:

文件名:src/lib.rs

pub struct ThreadPool;

接着编辑 main.rs 文件通过在 src/main.rs 的开头增加如下代码将 ThreadPool 从库 crate 引入作用域:

文件名:src/main.rs

use hello::ThreadPool;

这仍然不能工作,再次尝试运行来得到下一个需要解决的错误:

PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_3> cargo check Locking 1 package to latest Rust 1.92.0 compatible version Adding hello v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_3\hello) Checking hello v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_3\hello) Checking example21_2_3 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_3) error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope --> src\main.rs:18:32 | 18 | let pool = ThreadPool::new(4); | ^^^ function or associated item not found in `ThreadPool` For more information about this error, try `rustc --explain E0599`. error: could not compile `example21_2_3` (bin "example21_2_3") due to 1 previous error PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_3>

此错误表明下一步是为 ThreadPool 创建一个叫做 new 的关联函数。我们还知道 new 需要有一个参数可以接受 4,而且 new 应该返回 ThreadPool 实例。让我们实现拥有此特征的最小化 new 函数:

文件夹:src/lib.rs

pub struct ThreadPool; impl ThreadPool { pub fn new(size: usize) -> ThreadPool { ThreadPool } }

这里选择 usize 作为 size 参数的类型,因为我们知道线程数为负没有意义。我们还知道将使用 4 作为线程集合的元素数量,这也就是使用 usize 类型的原因,如第三章 “整型” 部分所讲。

再次编译检查这段代码:

PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_3> cargo check Checking hello v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_3\hello) Checking example21_2_3 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_3) error[E0599]: no method named `execure` found for struct `ThreadPool` in the current scope --> src\main.rs:19:14 | 19 | pool.execure(|| { | -----^^^^^^^ method not found in `ThreadPool` For more information about this error, try `rustc --explain E0599`. error: could not compile `example21_2_3` (bin "example21_2_3") due to 1 previous error PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_3>

这里发生错误是因为并没有 ThreadPool 上的 execute 方法。回忆 “创建有限数量的线程” 部分我们决定线程池应该有与 thread::spawn 类似的接口,同时我们将实现 execute 函数来获取传递的闭包并将其传递给池中的空闲线程执行。

我们会在 ThreadPool 上定义 execute 函数来获取一个闭包参数。回忆第十三章的 “将捕获的值移出闭包和 Fn trait” 部分,闭包作为参数时可以使用三个不同的 trait:FnFnMutFnOnce。我们需要决定这里应该使用哪种闭包。最终需要实现的类似于标准库的 thread::spawn,所以我们可以观察 thread::spawn 的签名在其参数中使用了何种 bound。查看文档会发现:

pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static,

F 是这里我们关心的参数;T 与返回值有关所以我们并不关心。考虑到 spawn 使用 FnOnce 作为 F 的 trait bound,这可能也是我们需要的,因为最终会将传递给 execute 的参数传给 spawn。因为处理请求的线程只会执行闭包一次,这也进一步确认了 FnOnce 是我们需要的 trait,这里符合 FnOnceOnce 的意思。

F 还有 trait bound Send 和生命周期绑定 'static,这对我们的情况也是有意义的:需要 Send 来将闭包从一个线程转移到另一个线程,而 'static 是因为并不知道线程会执行多久。让我们编写一个使用带有这些 bound 的泛型参数 FThreadPoolexecute 方法:

文件名:src/lib.rs

pub struct ThreadPool; #[allow(unused)] impl ThreadPool { pub fn new(size: usize) -> ThreadPool { ThreadPool } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } }

FnOnce trait 仍然需要之后的 (),因为这里的 FnOnce 代表一个没有参数也没有返回值的闭包。正如函数的定义,返回值类型可以从签名中省略,不过即便没有参数也需要括号。

这里再一次增加了 execute 方法的最小化实现:它没有做任何工作,只是尝试让代码能够编译。再次进行检查:

PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_3> cargo check Checking hello v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_3\hello) Checking example21_2_3 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_3) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.07s PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_3>

现在就只有警告了!这意味着能够编译了!注意如果尝试 cargo run 运行程序并在浏览器中发起请求,仍会在浏览器中出现在本章开始时那样的错误。这个库实际上还没有调用传递给 execute 的闭包!

一个你可能听说过的关于像 Haskell 和 Rust 这样有严格编译器的语言的说法是 “如果代码能够编译,它就能工作”。不过这个说法并不是普适的。我们的项目可以编译,不过它完全没有做任何工作!如果构建一个真实且功能完整的项目,则需花费大量的时间来开始编写单元测试来检查代码能否编译 并且 拥有期望的行为。

思考一下:如果这里要执行的是一个 future 而不是闭包会有什么不同?

在new中验证线程池的线程数量

这里并没有对 newexecute 的参数做任何操作。让我们用期望的行为来实现这些函数。以考虑 new 作为开始。之前选择使用无符号类型作为 size 参数的类型,因为线程数为负的线程池没有意义。然而,线程数为零的线程池同样没有意义,不过零是一个完全有效的 usize 值。让我们增加在返回 ThreadPool 实例之前检查 size 是否大于零的代码,并使用 assert! 宏在得到零时 panic,如示例 21-13 所示:

文件名:src/lib.rs

impl ThreadPool { /// 创建一个新的线程池。 /// /// size 是池中线程的数量。 /// /// # Panics /// /// 如果 size 为 0,`new` 方法会 panic。 pub fn new(size: usize) -> ThreadPool { assert!(size > 0); ThreadPool } // --snip-- }

示例 21-13: 实现 ThreadPool::newsize 为零时 panic

这里也用文档注释为 ThreadPool 增加了一些文档。注意这里遵循了良好的文档实践并增加了一个部分来提示函数会 panic 的情况,正如第十四章所讨论的。尝试运行 cargo doc --open 并点击 ThreadPool 结构体来查看生成的 new 的文档看起来如何!

相比像这里使用 assert! 宏,也可以让 new 像之前 I/O 项目中示例 12-9 中 Config::build 那样将 new 更改为 build 并返回一个 Result,不过在这里我们选择创建一个没有任何线程的线程池应该是不可恢复的错误。如果你想做的更好,尝试编写一个采用如下签名的名为 build 的函数来对比一下 new 函数:

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
分配空间以存储线程

现在我们已经有了一种方法来确保线程池中的线程数有效,就可以实际创建这些线程并在返回结构体之前将它们存储在 ThreadPool 结构体中。不过如何 “存储” 一个线程?让我们再看看 thread::spawn 的签名:

pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static,

spawn 返回 JoinHandle<T>,其中 T 是闭包返回的类型。尝试使用 JoinHandle 来看看会发生什么。在我们的情况中,传递给线程池的闭包会处理连接并不返回任何值,所以 T 将会是单元类型 ()

示例 21-14 中的代码可以编译,不过实际上还并没有创建任何线程。我们改变了 ThreadPool 的定义来存放一个 thread::JoinHandle<()> 的 vector 实例,使用 size 容量来初始化,并设置一个 for 循环来运行创建线程的代码,并返回包含这些线程的 ThreadPool 实例:

文件名:src/lib.rs

use std::thread; pub struct ThreadPool { threads: Vec<thread::JoinHandle<()>>, } #[allow(unused)] impl ThreadPool { /// 创建一个新的线程池。 /// Create a new ThreadPool. /// size 是池中线程的数量。 /// The size is the number of threads in the pool. /// # Panics /// /// 如果 size 为 0,`new` 方法会 panic。 /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut threads = Vec::with_capacity(size); for _ in 0..size { // 创建一些线程并将它们存入 vector 中。 } ThreadPool {threads} } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } } 

示例 21-14: 为 ThreadPool 创建一个 vector 来存放线程

这里将 std::thread 引入库 crate 的作用域,因为使用了 thread::JoinHandle 作为 ThreadPool 中 vector 元素的类型。

在得到了有效的数量之后,ThreadPool 新建一个存放 size 个元素的 vector。with_capacity 函数与 Vec::new 做了同样的工作,不过有一个重要的区别:它为 vector 预先分配空间。因为已经知道了 vector 中需要 size 个元素,预先进行分配比仅仅 Vec::new 要稍微有效率一些,因为 Vec::new 随着插入元素而重新改变大小。

如果再次运行 cargo check,它应该会成功。

完整的实验代码:

项目结构:

example21_2_3\hello\src\lib.rs文件:

use std::thread; pub struct ThreadPool { threads: Vec<thread::JoinHandle<()>>, } #[allow(unused)] impl ThreadPool { /// 创建一个新的线程池。 /// Create a new ThreadPool. /// size 是池中线程的数量。 /// The size is the number of threads in the pool. /// # Panics /// /// 如果 size 为 0,`new` 方法会 panic。 /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut threads = Vec::with_capacity(size); for _ in 0..size { // 创建一些线程并将它们存入 vector 中。 } ThreadPool {threads} } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } } 

example21_2_3\src\main.rs文件

// 21.2节 将单线程 server 变为多线程 server ————创建有限数量的线程 // 参考 示例 21-12: 假想的 ThreadPool 接口 use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; use hello::ThreadPool; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); let pool = ThreadPool::new(4); pool.execute(|| { handle_connection(stream); }); } println!("Hello Rust"); } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length:{length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }

example21_2_3\Cargo.toml文件

[package] name = "example21_2_3" version = "0.1.0" edition = "2024" [dependencies] hello = {path = "./hello"}

编译运行:

PS F:\rustproject\RustProgramLanguage\chapter21> cargo new example21_2_3 Creating binary (application) `example21_2_3` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21> cd example21_2_3 PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_3> cargo new hello --lib Creating library `hello` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_3> PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_3> cargo run Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.00s Running `target\debug\example21_2_3.exe` 

请求失败

Worker结构体负责将代码从ThreadPool传递给线程

示例 21-14 的 for 循环中留下了一个关于创建线程的注释。这里,我们来看看如何实际创建线程。标准库提供了 thread::spawn 作为创建线程的方法,thread::spawn 期望获取一些一旦创建线程就应该执行的代码。然而,我们希望开始线程并使其等待稍后传递的代码。标准库的线程实现并没有包含这么做的方法;我们必须手动实现。

我们将要实现的行为是创建线程并稍后发送代码,这会在 ThreadPool 和线程间引入一个新数据类型来管理这种新行为。这个数据结构称为 Worker,这是一个池实现中的常见概念。Worker 会获取需要运行的代码,并在该 worker 的线程中运行该代码。

想象一下在餐馆厨房工作的员工:员工等待来自顾客的订单,他们负责接单并完成它们。

不同于在线程池中储存一个 JoinHandle<()> 实例的 vector,我们会储存 Worker 结构体的实例。每一个 Worker 会储存一个单独的 JoinHandle<()> 实例。接着会在 Worker 上实现一个方法,该方法将闭包发送到已经运行的线程中执行。我们还会赋予每个 worker 一个 id,这样就可以在日志和调试中区别线程池中的不同 Worker 的实例。

如下是创建 ThreadPool 时会发生的新过程。在通过如下方式设置完 Worker 之后,我们会实现向线程发送闭包的代码:

  1. 定义存放 idJoinHandle<()>Worker 结构体。
  2. 修改 ThreadPool 存放一个 Worker 实例的 vector。
  3. 定义 Worker::new 函数,它获取一个 id 数字并返回一个带有 id 和用空闭包分配的线程的 Worker 实例。
  4. ThreadPool::new 中,使用 for 循环计数生成 id,使用这个 id 新建 Worker,并储存进 vector 中。

如果你渴望挑战,在查示例 21-15 中的代码之前尝试自己实现这些修改。

准备好了吗?示例 21-15 就是一个做出了上述修改的例子:

文件名:src/lib.rs

use std::thread; pub struct ThreadPool { workers: Vec<Worker>, } impl ThreadPool { // --snip-- /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers } } // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize) -> Worker { let thread = thread::spawn(|| {}); Worker { id, thread } } }

示例 21-15: 修改 ThreadPool 存放 Worker 实例而不是直接存放线程

这里将 ThreadPool 中字段名从 threads 改为 workers,因为它现在存储 Worker 而不是 JoinHandle<()>。使用 for 循环中的计数作为 Worker::new 的参数,并将每一个新建的 Worker 存储在叫做 workers 的 vector 中。

Worker 结构体和其 new 函数是私有的,因为外部代码(比如 src/main.rs 中的 server)并不需要知道关于 ThreadPool 中使用 Worker 结构体的实现细节。Worker::new 函数使用 id 参数并存储了使用一个空闭包创建的 JoinHandle<()> 实例。

注意:如果操作系统因为没有足够的系统资源而无法创建线程时,thread::spawn 会 panic。这会导致整个 server panic,即使一些线程可能创建成功了。出于简单的考虑,这个行为是可行的,不过在一个生产级别的线程池实现中,你可能会希望使用 std::thread::Builder 和其 spawn 方法来返回一个 Result

这段代码能够编译并用指定给 ThreadPool::new 的参数创建存储了一系列的 Worker 实例,不过 仍然 没有处理 execute 中得到的闭包。让我们聊聊接下来怎么做。

使用信道向线程发送请求

下一个需要解决的问题是传递给 thread::spawn 的闭包完全没有做任何工作。目前,我们在 execute 方法中获得期望执行的闭包,不过在创建 ThreadPool 的过程中创建每一个 Worker 时需要向 thread::spawn 传递一个要运行的闭包。

我们希望刚创建的 Worker 结构体能够从 ThreadPool 的队列中获取需要执行的代码,并发送到线程中执行。

在第十六章,我们学习了 信道 —— 一个沟通两个线程的简单手段 —— 对于这个例子来说则是绝佳的选择。这里信道将充当任务队列的作用,execute 将通过 ThreadPool 向其中线程正在寻找工作的 Worker 实例发送任务。计划如下:

  1. ThreadPool 会创建一个信道并持有发送端。
  2. 每个 Worker 将持有接收端。
  3. 新建一个 Job 结构体来存放用于向信道中发送的闭包。
  4. execute 方法会在发送者发出期望执行的工作。
  5. 在线程中,Worker 会遍历接收者并执行任何接收到的工作。

让我们以在 ThreadPool::new 中创建信道并让 ThreadPool 实例充当发送者开始,如示例 21-16 所示。Job 结构体目前为空,但它将作为我们通过通道发送的类型:

文件名:src/lib.rs

use std::{sync::mpsc, thread}; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { // --snip-- /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers, sender } } // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize) -> Worker { let thread = thread::spawn(|| {}); Worker { id, thread } } }

示例 21-16: 修改 ThreadPool 来储存一个传输 Job 实例的发送者

ThreadPool::new 中,新建了一个信道,并接着让线程池在接收端等待。这段代码能够成功编译。

让我们尝试在线程池创建每个 worker 时将接收端传递给它们。须知我们希望在 worker 所分配的线程中使用接收者,所以将在闭包中引用 receiver 参数。示例 21-17 中展示的代码还不能编译:

文件名:src/lib.rs

use std::{sync::mpsc, thread}; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { // --snip-- /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, receiver)); } ThreadPool { workers, sender } } // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } } // --snip-- struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { let thread = thread::spawn(|| { receiver; }); Worker { id, thread } } }

示例 21-17: 将信道的接收端传递给 worker

这是一些简单而直观的修改:将接收端传递进了 Worker::new,并接着在闭包中使用它。

如果尝试 check 代码,会得到这个错误:

PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_4> cargo check Checking hello v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_4\hello) error[E0061]: this function takes 2 arguments but 1 argument was supplied --> hello\src\lib.rs:31:26 | 31 | workers.push(Worker::new(id)); | ^^^^^^^^^^^---- argument #2 of type `std::sync::mpsc::Receiver<Job>` is missing | note: associated function defined here --> hello\src\lib.rs:51:8 | 51 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { | ^^^ ----------------------------- help: provide the argument | 31 | workers.push(Worker::new(id, /* std::sync::mpsc::Receiver<Job> */)); | ++++++++++++++++++++++++++++++++++++++ For more information about this error, try `rustc --explain E0061`. error: could not compile `hello` (lib) due to 1 previous error PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_4>

这段代码尝试将 receiver 传递给多个 Worker 实例。这是不行的,回忆第十六章:Rust 所提供的信道实现是多 生产者,单 消费者 的。这意味着不能简单的克隆信道的消费端来解决问题。我们也不希望将一个消息向多个消费者发送多次;我们希望有一个消息列表和多个 worker 这样每个消息就只会处理一次。

另外,从信道队列中取出任务涉及到修改 receiver,所以这些线程需要一个能安全的共享和修改 receiver 的方式,否则可能导致竞争状态(参考第十六章)。

回忆一下第十六章讨论的线程安全智能指针,为了在多个线程间共享所有权并允许线程修改其值,需要使用 Arc<Mutex<T>>Arc 使得多个 Worker 实例拥有接收端,而 Mutex 则确保一次只有一个 Worker 能从接收端得到任务。示例 21-18 展示了所需的修改:

文件名:src/lib.rs

use std::{ sync::{Arc, Mutex, mpsc}, thread, }; // --snip-- pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { // --snip-- /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } } // --snip-- struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { // --snip-- let thread = thread::spawn(|| { receiver; }); Worker { id, thread } } }

示例 21-18: 使用 ArcMutex 在 worker 间共享接收者

ThreadPool::new 中,将接收端放入 ArcMutex 中。对于每一个新 WorkerArc 来增加引用计数,如此这些 Worker 实例就可以共享接收者的所有权了。

通过这些修改,代码可以编译了!我们已经快完成了!

PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_4> cargo build Compiling hello v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_4\hello) Compiling example21_2_4 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_4) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.29s PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_4>

实现execute方法

最后让我们实现 ThreadPool 上的 execute 方法。同时也要修改 Job 结构体:它将不再是结构体,Job 将是一个有着 execute 接收到的闭包类型的 trait 对象的类型别名。第二十章 “使用类型别名创建类型同义词” 部分提到过,类型别名允许将长的类型变短。观察示例 21-19:

文件名:src/lib.rs

use std::{ sync::{Arc, Mutex, mpsc}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } // --snip-- type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { // --snip-- /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } // --snip-- struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(|| { receiver; }); Worker { id, thread } } }

示例 21-19: 为存放每一个闭包的 Box 创建一个 Job 类型别名,接着在信道中发出任务

在使用 execute 得到的闭包新建 Job 实例之后,将这些任务从信道的发送端发出。这里调用 send 上的 unwrap,因为发送可能会失败,这可能发生于例如停止了所有线程执行的情况,这意味着接收端停止接收新消息了。不过目前我们无法停止线程执行;只要线程池存在它们就会一直执行。使用 unwrap 是因为我们知道失败不可能发生,不过编译器不知道这些。

不过到此事情还没有结束!在 Worker 中,传递给 thread::spawn 的闭包仍然还只是 引用 了信道的接收端。相反我们需要闭包一直循环,向信道的接收端请求任务,并在得到任务时执行它们。如示例 21-20 对 Worker::new 做出修改:

文件名:src/lib.rs

// 21.2节 将单线程 server 变为多线程 server ————Worker 结构体负责将代码从 ThreadPool 传递给线程 // 参考 示例 21-15, 21-16, 21-17, 21-18, 21-19, 21-20 use std::{ sync::{Arc, Mutex, mpsc}, thread, }; #[allow(unused)] pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; #[allow(unused)] impl ThreadPool { /// 创建一个新的线程池。 /// Create a new ThreadPool. /// size 是池中线程的数量。 /// The size is the number of threads in the pool. /// # Panics /// /// 如果 size 为 0,`new` 方法会 panic。 /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool {workers, sender} } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } #[allow(unused)] struct Worker { id: usize, thread: thread::JoinHandle<()>, } #[allow(unused)] impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); } println!("thread end==============================="); }); Worker {id, thread} } }

示例 21-20: 在 worker 线程中接收并执行任务

这里,首先在 receiver 上调用了 lock 来获取互斥器,接着 unwrap 在出现任何错误时 panic。如果互斥器处于一种叫做 被污染poisoned)的状态时获取锁可能会失败,这可能发生于其他线程在持有锁时 panic 了且没有释放锁。在这种情况下,调用 unwrap 使其 panic 是正确的行为。请随意将 unwrap 改为包含有意义错误信息的 expect

如果锁定了互斥器,接着调用 recv 从信道中接收 Job。最后的 unwrap 也绕过了一些错误,这可能发生于持有信道发送端的线程停止的情况,类似于如果接收端关闭时 send 方法如何返回 Err 一样。

调用 recv 会阻塞当前线程,所以如果还没有任务,其会等待直到有可用的任务。Mutex<T> 确保一次只有一个 Worker 线程尝试请求任务。

现在线程池处于可以运行的状态了!执行 cargo run 并发起一些请求:

PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_4> cargo run Compiling hello v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_4\hello) Compiling example21_2_4 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_4) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.36s Running `target\debug\example21_2_4.exe` Worker 0 got a job; executing. thread '<unnamed>' (16176) panicked at hello\src\lib.rs:62:59: called `Result::unwrap()` on an `Err` value: RecvError note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace thread '<unnamed>' (7284) panicked at hello\src\lib.rs:62:43: called `Result::unwrap()` on an `Err` value: PoisonError { .. } thread '<unnamed>' (5160) panicked at hello\src\lib.rs:62:43: called `Result::unwrap()` on an `Err` value: PoisonError { .. } thread '<unnamed>' (1704) panicked at hello\src\lib.rs:62:43: called `Result::unwrap()` on an `Err` value: PoisonError { .. }

成功了!现在我们有了一个可以异步执行连接的线程池!它绝不会创建超过四个线程,所以当服务端收到大量请求时系统也不会负担过重。如果请求 /sleep,server 也能够通过另外一个线程处理其他请求。

注意如果同时在多个浏览器窗口打开 /sleep,它们可能会彼此间隔地加载 5 秒,因为一些浏览器出于缓存的原因会顺序执行相同请求的多个实例。这些限制并不是由于我们的 web 服务端造成的。

完整的实验代码:

项目结构:

example21_2_4\hello\src\lib.rs文件:

// 21.2节 将单线程 server 变为多线程 server ————Worker 结构体负责将代码从 ThreadPool 传递给线程 // 参考 示例 21-15, 21-16, 21-17, 21-18, 21-19, 21-20 use std::{ sync::{Arc, Mutex, mpsc}, thread, }; #[allow(unused)] pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; #[allow(unused)] impl ThreadPool { /// 创建一个新的线程池。 /// Create a new ThreadPool. /// size 是池中线程的数量。 /// The size is the number of threads in the pool. /// # Panics /// /// 如果 size 为 0,`new` 方法会 panic。 /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool {workers, sender} } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } #[allow(unused)] struct Worker { id: usize, thread: thread::JoinHandle<()>, } #[allow(unused)] impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); } println!("thread end==============================="); }); Worker {id, thread} } }

example21_2_4\src\main.rs文件

// 21.2节 将单线程 server 变为多线程 server ————Worker 结构体负责将代码从 ThreadPool 传递给线程 // 参考 示例 21-12: 假想的 ThreadPool 接口 use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; use hello::ThreadPool; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); let pool = ThreadPool::new(4); pool.execute(|| { handle_connection(stream); }); } println!("Hello Rust"); } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length:{length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }

example21_2_4\Cargo.toml文件:

[package] name = "example21_2_4" version = "0.1.0" edition = "2024" [dependencies] hello = {path = "./hello"}

编译运行:

PS F:\rustproject\RustProgramLanguage\chapter21> cargo new example21_2_4 Creating binary (application) `example21_2_4` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21> cd example21_2_4 PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_4> cargo new hello --lib Creating library `hello` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_4> PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_4> cargo run Compiling hello v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_4\hello) Compiling example21_2_4 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_4) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.33s Running `target\debug\example21_2_4.exe` Worker 0 got a job; executing. Worker 0 shutting down. Worker 1 shutting down. Worker 3 shutting down. Worker 2 shutting down. Worker 0 got a job; executing. Worker 0 shutting down. Worker 1 shutting down. Worker 3 shutting down. Worker 2 shutting down.

在学习了第十七章和第十八章的 while let 循环之后,你可能会好奇为何不能如此编写 worker 线程,如示例 21-21 所示:

文件名:src/lib.rs

// 21.2节 将单线程 server 变为多线程 server ————实现execute方法 // 参考 示例 21-21: 一个使用 while let 的 Worker::new 替代实现 use std::{ sync::{Arc, Mutex, mpsc}, thread, }; #[allow(unused)] pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; #[allow(unused)] impl ThreadPool { /// 创建一个新的线程池。 /// Create a new ThreadPool. /// size 是池中线程的数量。 /// The size is the number of threads in the pool. /// # Panics /// /// 如果 size 为 0,`new` 方法会 panic。 /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool {workers, sender} } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } #[allow(unused)] struct Worker { id: usize, thread: thread::JoinHandle<()>, } #[allow(unused)] impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { // 这段代码可以编译和运行,但是并不会产生所期望的线程行为:一个慢请求仍然会导致其他请求等待执行。 // while let 会自动处理 Result while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {id} got a job; executing."); job(); } // 当 recv() 返回 Err 时循环结束 println!("Worker {} shutting down.", id); }); Worker {id, thread} } }

示例 21-21: 一个使用 while letWorker::new 替代实现

这段代码可以编译和运行,但是并不会产生所期望的线程行为:一个慢请求仍然会导致其他请求等待执行。其原因有些微妙:Mutex 结构体没有公有 unlock 方法,因为锁的所有权依赖 lock 方法返回的 LockResult<MutexGuard<T>>MutexGuard<T> 的生命周期。这允许借用检查器在编译时确保绝不会在没有持有锁的情况下访问由 Mutex 守护的资源,不过如果没有认真的思考 MutexGuard<T> 的生命周期的话,也可能会导致比预期更久的持有锁。

示例 21-20 中的代码使用的 let job = receiver.lock().unwrap().recv().unwrap(); 之所以可以工作是因为对于 let 来说,当 let 语句结束时任何表达式中等号右侧使用的临时值都会立即被丢弃。然而 while letif letmatch)直到相关的代码块结束都不会丢弃临时值。在示例 21-21 中,job() 调用期间锁一直持续,这也意味着其他的 Worker 实例无法接收任务。

编译运行:

PS F:\rustproject\RustProgramLanguage\chapter21> cargo new example21_2_5 Creating binary (application) `example21_2_5` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21> cd example21_2_5 PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_5> cargo new hello --lib Creating library `hello` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21\example21_2_5> cargo run Locking 1 package to latest Rust 1.92.0 compatible version Compiling hello v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_5\hello) Compiling example21_2_5 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_2_5) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.58s Running `target\debug\example21_2_5.exe` Worker 1 got a job; executing. Worker 1 shutting down. Worker 0 shutting down. Worker 2 shutting down. Worker 3 shutting down.

浏览器请求:

21.3 优雅停机与清理

示例 21-20 中的代码如期通过使用线程池异步的响应请求。这里有一些警告说 workersidthread 字段没有直接被使用,这提醒了我们并没有清理所有的内容。当使用不那么优雅的 ctrl-c 终止主线程时,所有其他线程也会立刻停止,即便它们正处于处理请求的过程中。

现在我们要为 ThreadPool 实现 Drop trait 对线程池中的每一个线程调用 join,这样这些线程在关闭前将会执行完它们的请求。接着会为 ThreadPool 实现一个告诉线程它们应该停止接收新请求并结束的方式。为了实践这些代码,修改服务端在优雅停机(graceful shutdown)之前只接受两个请求。

在我们开始时需要注意的是:这一切都不会影响处理执行闭包的那部分代码因此如果我们在异步运行时中使用线程池,所有操作也完全相同。

为ThreadPool实现Drop Trait

现在开始为线程池实现 Drop。当线程池被丢弃时,应该 join 所有线程以确保它们完成其操作。示例 21-22 展示了 Drop 实现的第一次尝试;这些代码还不能够编译:

文件名:src/lib.rs

impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } }

示例 21-22: 当线程池离开作用域时 join 每个线程

这里首先遍历线程池中的每个 workers。这里使用了 &mut 因为 self 本身是一个可变引用而且也需要能够修改 worker。对于每一个线程,会打印出说明信息表明此特定 Worker 实例正在关闭,接着在 Worker 实例的线程上调用 join。如果 join 调用失败,通过 unwrap 使得 panic 并进行不优雅的关闭。

PS F:\rustproject\RustProgramLanguage\chapter21\example21_3_1> cargo check Locking 1 package to latest Rust 1.92.0 compatible version Adding hello v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_3_1\hello) Checking hello v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_3_1\hello) error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference --> hello\src\lib.rs:55:13 | 55 | worker.thread.join().unwrap(); | ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call | | | move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait | note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread` --> /rustc/ded5c06cf21d2b93bffd5d884aa6e96934ee4234\library\std\src\thread\mod.rs:1962:17 For more information about this error, try `rustc --explain E0507`. error: could not compile `hello` (lib) due to 1 previous error PS F:\rustproject\RustProgramLanguage\chapter21\example21_3_1>

这里的错误告诉我们并不能调用 join,因为我们只有每一个 worker 的可变借用,而 join 需要获取其参数的所有权。为了解决这个问题,需要一个方法将 thread 移动出拥有其所有权的 Worker 实例以便 join 可以消费这个线程。示例 18-15 中我们曾见过这么做的方法:如果 Worker 存放的是 Option<thread::JoinHandle<()>,就可以在 Option 上调用 take 方法将值从 Some 成员中移动出来而对 None 成员不做处理。换句话说,正在运行的 Workerthread 将是 Some 成员值,而当需要清理 worker 时,将 Some 替换为 None,这样 worker 就没有可以运行的线程了。

然而,这种情况会在丢弃 Worker 时出现。相应地,我们必须在任何访问 worker.thread 时处理 Option<thread::JoinHandle<()>>。在惯用的 Rust 代码中 Option 用的很多,但当你发现自己总是知道 Option 中一定会有值,却还要将其包装在 Option 中来应对这一场景时,就应该考虑其他更优雅的方法了。

在这个例子中,存在一个更好的替代方案:Vec::drain 方法。它接受一个 range 参数来指定哪些项要从 Vec 中移除,并返回一个这些项的迭代器。使用 .. range 语法会从 Vec 中移除所有值。

因此我们需要像下面这样更新 ThreadPooldrop 实现:

文件名:src/lib.rs

impl Drop for ThreadPool { fn drop(&mut self) { for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } }

这解决了编译器错误且不需要对我们的代码做其它更改。

完整的实验代码:

项目结构:

example21_3_1\hello\src\lib.rs文件:

// 21.3节 优雅停机与清理 ————为 ThreadPool 实现 Drop Trait // 参考 示例 21-22: 当线程池离开作用域时 join 每个线程 use std::{ sync::{Arc, Mutex, mpsc}, thread, }; #[allow(unused)] pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; #[allow(unused)] impl ThreadPool { /// 创建一个新的线程池。 /// Create a new ThreadPool. /// size 是池中线程的数量。 /// The size is the number of threads in the pool. /// # Panics /// /// 如果 size 为 0,`new` 方法会 panic。 /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool {workers, sender} } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { //for worker in &mut self.workers { for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } } #[allow(unused)] struct Worker { id: usize, thread: thread::JoinHandle<()>, } #[allow(unused)] impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { // 这段代码可以编译和运行,但是并不会产生所期望的线程行为:一个慢请求仍然会导致其他请求等待执行。 // while let 会自动处理 Result while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {id} got a job; executing."); job(); } // 当 recv() 返回 Err 时循环结束 println!("Worker {} shutting down.", id); }); Worker {id, thread} } }

example21_3_1\src\main.rs文件:

// 21.2节 将单线程 server 变为多线程 server ————Worker 结构体负责将代码从 ThreadPool 传递给线程 // 参考 示例 21-12: 假想的 ThreadPool 接口 use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; use hello::ThreadPool; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); let pool = ThreadPool::new(4); pool.execute(|| { handle_connection(stream); }); } println!("Hello Rust"); } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length:{length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }

example21_3_1\Cargo.toml文件:

[package] name = "example21_3_1" version = "0.1.0" edition = "2024" [dependencies] hello = {path = "./hello"}

编译运行:

PS F:\rustproject\RustProgramLanguage\chapter21> cargo new example21_3_1 Creating binary (application) `example21_3_1` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21> cd example21_3_1 PS F:\rustproject\RustProgramLanguage\chapter21\example21_3_1> cargo new hello --lib Creating library `hello` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21\example21_3_1> PS F:\rustproject\RustProgramLanguage\chapter21\example21_3_1> cargo run Compiling hello v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_3_1\hello) Compiling example21_3_1 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_3_1) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.67s Running `target\debug\example21_3_1.exe` Shutting down worker 0 Worker 2 got a job; executing. 

浏览器请求:

向线程发送信号使期停止接收任务

有了所有这些修改,代码就能编译且没有任何警告。然而,坏消息是,这些代码还不能以我们期望的方式运行。问题的关键在于 Worker 实例中分配的线程所运行的闭包中的逻辑:此时,调用 join 并不会关闭线程,因为它们一直 loop 来寻找任务。如果采用这个实现来尝试丢弃 ThreadPool,则主线程会永远阻塞在等待第一个线程结束上。

为了修复这个问题,我们将修改 ThreadPooldrop 实现并修改 Worker 循环。

首先修改 ThreadPooldrop 实现在等待线程结束前显式地丢弃 sender。示例 21-23 展示了 ThreadPool 显式丢弃 sender 所作的修改。与处理线程时不同,这里确实需要使用 Option,以便能够使用 Option::takesenderThreadPool 中移出。

文件名:src/lib.rs

pub struct ThreadPool { workers: Vec<Worker>, sender: Option<mpsc::Sender<Job>>, } // --snip-- impl ThreadPool { pub fn new(size: usize) -> ThreadPool { // --snip-- ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take()); for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } }

示例 21-23: 在 join Worker 线程之前显式丢弃 sender

丢弃 sender 会关闭信道,这表明不会有更多的消息被发送。这时 Worker 实例中的无限循环中的所有 recv 调用都会返回错误。在示例 21-24 中,我们修改 Worker 循环在这种情况下优雅地退出,这意味着当 ThreadPooldrop 实现调用 join 时线程会结束。

文件名:src/lib.rs

impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let message = receiver.lock().unwrap().recv(); match message { Ok(job) => { println!("Worker {id} got a job; executing."); job(); } Err(_) => { println!("Worker {id} disconnected; shutting down."); break; } } } }); Worker { id, thread } } }

示例 21-24:当 recv 返回错误时显式退地出循环

为了实践这些代码,如示例 21-25 所示修改 main 在优雅停机服务端之前只接受两个请求:

文件名:src/main.rs

fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } println!("Shutting down."); }

示例 21-25: 在处理两个请求之后通过退出循环来停止服务端

你不会希望真实世界的 web 服务端只处理两次请求就停机了,这只是为了展示优雅停机和清理处于正常工作状态。

take 方法定义于 Iterator trait,这里限制循环最多头 2 次。ThreadPool 会在 main 的结尾离开作用域, drop 实现会运行。

使用 cargo run 启动服务端,并发起三个请求。第三个请求应该会失败,而终端的输出应该看起来像这样:

PS F:\rustproject\RustProgramLanguage\chapter21\example21_3_2> cargo run Compiling example21_3_2 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_3_2) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.27s Running `target\debug\example21_3_2.exe` Worker 0 got a job; executing. Hello Rust Shutting down worker 0 Worker 3 disconnected; shutting down. Worker 2 disconnected; shutting down. Worker 1 got a job; executing. Worker 0 disconnected; shutting down. Shutting down worker 1 Worker 1 disconnected; shutting down. Shutting down worker 2 Shutting down worker 3

可能会出现不同顺序的 Worker ID 和信息输出。可以从信息中看到服务是如何运行的:Worker 实例 0 和 3 获取了头两个请求。server 会在头第二个请求后停止接受请求,ThreadPoolDrop 实现甚至会在 Worker 3 开始工作之前就开始执行。丢弃 sender 会断开所有 Worker 实例的连接并让它们关闭。每个 Worker 实例在断开时会打印出一个信息,接着线程池调用 join 来等待每一个 Worker 线程结束。

注意在这个特定的运行过程中一个有趣的地方在于:ThreadPool 丢弃 sender,而在任何 Worker 收到消息之前,就尝试 join Worker 0 Worker 0 还没有从 recv 获得一个错误,所以主线程阻塞直到 Worker 0 结束。与此同时,Worker 3 接收到一个任务接着所有线程会收到一个错误。一旦 Worker 0 结束,主线程就等待余下其他 worker 结束。此时它们都退出了循环并停止。

恭喜!现在我们完成了这个项目,也有了一个使用线程池异步响应请求的基础 web 服务端。我们能对服务端执行优雅停机,它会清理线程池中的所有线程。

如下是完整的代码参考:

项目结构:

example21_3_2\hello\src\lib.rs文件:

// 21.3节 优雅停机与清理 ————向线程发送信号使期停止接收任务 // 参考 示例 21-23: 在 join Worker 线程之前显式丢弃 sender // 示例 21-24:当 recv 返回错误时显式退地出循环 use std::{ sync::{Arc, Mutex, mpsc}, thread, }; #[allow(unused)] pub struct ThreadPool { workers: Vec<Worker>, sender: Option<mpsc::Sender<Job>>, } type Job = Box<dyn FnOnce() + Send + 'static>; #[allow(unused)] impl ThreadPool { /// 创建一个新的线程池。 /// Create a new ThreadPool. /// size 是池中线程的数量。 /// The size is the number of threads in the pool. /// # Panics /// /// 如果 size 为 0,`new` 方法会 panic。 /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take()); for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } } #[allow(unused)] struct Worker { id: usize, thread: thread::JoinHandle<()>, } #[allow(unused)] impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let message = receiver.lock().unwrap().recv(); match message { Ok(job) => { println!("Worker {id} got a job; executing."); job(); } Err(_) => { println!("Worker {id} disconnected; shutting down."); break; } } } // end loop // 循环结束 //println!("loop end=======================."); }); Worker {id, thread} } }

example21_3_2\src\main.rs文件:

// 21.3节 优雅停机与清理 ————向线程发送信号使期停止接收任务 // 参考 示例 21-25: 在处理两个请求之后通过退出循环来停止服务端 use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; use hello::ThreadPool; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } println!("Hello Rust"); } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length:{length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }

example21_3_2\Cargo.toml文件:

[package] name = "example21_3_2" version = "0.1.0" edition = "2024" [dependencies] hello = {path = "./hello"}

编译运行:

PS F:\rustproject\RustProgramLanguage\chapter21> cargo new example21_3_2 Creating binary (application) `example21_3_2` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21> cd example21_3_2 PS F:\rustproject\RustProgramLanguage\chapter21\example21_3_2> cargo new hello --lib Creating library `hello` package note: see more `Cargo.toml` keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html PS F:\rustproject\RustProgramLanguage\chapter21\example21_3_2> cargo run Compiling example21_3_2 v0.1.0 (F:\rustproject\RustProgramLanguage\chapter21\example21_3_2) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.27s Running `target\debug\example21_3_2.exe` Worker 0 got a job; executing. Hello Rust Shutting down worker 0 Worker 3 disconnected; shutting down. Worker 2 disconnected; shutting down. Worker 1 got a job; executing. Worker 0 disconnected; shutting down. Shutting down worker 1 Worker 1 disconnected; shutting down. Shutting down worker 2 Shutting down worker 3 PS F:\rustproject\RustProgramLanguage\chapter21\example21_3_2> 

浏览器请求两次

我们还能做得更多!如果你希望继续增强这个项目,如下是一些点子:

  • ThreadPool 和其公有方法增加更多文档
  • 为库的功能增加测试
  • unwrap 调用改为更健壮的错误处理
  • 使用 ThreadPool 进行其他不同于处理网络请求的任务
  • crates.io 上寻找一个线程池 crate 并使用它实现一个类似的 web 服务端,将其 API 和鲁棒性与我们的实现做对比

总结

好极了!你已经完成了本书的学习!由衷感谢你与我们一道踏上这段 Rust 之旅。现在你已经准备好实现自己的 Rust 项目并帮助他人了。请不要忘记我们的社区,这里有其他 Rustaceans 正乐于帮助你迎接 Rust 之路上的任何挑战。

Read more

Java在AI时代的崛起:从传统机器学习到AIGC的全栈解决方案

Java在AI时代的崛起:从传统机器学习到AIGC的全栈解决方案

个人名片 🎓作者简介:java领域优质创作者 🌐个人主页:码农阿豪 📞工作室:新空间代码工作室(提供各种软件服务) 💌个人邮箱:[[email protected]] 📱个人微信:15279484656 🌐个人导航网站:www.forff.top 💡座右铭:总有人要赢。为什么不能是我呢? * 专栏导航: 码农阿豪系列专栏导航 面试专栏:收集了java相关高频面试题,面试实战总结🍻🎉🖥️ Spring5系列专栏:整理了Spring5重要知识点与实战演练,有案例可直接使用🚀🔧💻 Redis专栏:Redis从零到一学习分享,经验总结,案例实战💐📝💡 全栈系列专栏:海纳百川有容乃大,可能你想要的东西里面都有🤸🌱🚀 目录 * Java在AI时代的崛起:从传统机器学习到AIGC的全栈解决方案 * 一、Java AI生态概览:多样化的技术选择 * 1.1 深度学习框架:接轨主流AI技术 * Deep Java Library

【Coze-AI智能体平台】解锁 Coze 工作流:逻辑控制・数据处理・AIGC 多媒体全场景实战

【Coze-AI智能体平台】解锁 Coze 工作流:逻辑控制・数据处理・AIGC 多媒体全场景实战

🔥小龙报:个人主页 🎬作者简介:C++研发,嵌入式,机器人方向学习者 ❄️个人专栏:《coze智能体开发平台》 ✨ 永远相信美好的事情即将发生 文章目录 * 前言 * 一、业务逻辑节点 * 1.1 选择器节点 * 1.2 意图识别节点 * 1.3 循环节点 * 1.4 批处理节点 * 1.5 变量聚合节点 * 1.6 代码节点 * 1.6.1 JSON? * 1.6.2 python异步编程 * 1.7 数据库节点 * 1.7.1 新增数据节点 * 1.7.2

服务器环境 VsCode:Github Copilot 安装完成却用不了?关键步骤补全

GitHub Copilot在VS Code中无法使用的关键解决步骤 1. 基础环境检查 * VS Code版本:确保使用最新版(至少≥1.60),旧版可能导致兼容问题 * Copilot状态:在VS Code左侧活动栏点击Copilot图标(飞机形状),检查是否显示已登录和启用状态 * 网络环境:Copilot需访问GitHub服务器,尝试关闭代理或检查防火墙是否屏蔽api.github.com 2. 核心配置步骤 # 步骤1:检查Copilot是否激活 # 在VS Code命令面板(Ctrl+Shift+P)输入: > GitHub Copilot: Check Status # 步骤2:重置授权令牌(常见问题根源) > GitHub Copilot: Reset GitHub Copilot Token # 步骤3:强制刷新扩展 >