【译】Inventing the Service trait

写在前面:

最近看到了一篇讲 Rust 如何对框架进行抽象的文章,写得非常好,这两天抽空翻译了一下。

原文:https://tokio.rs/blog/2021-05-14-inventing-the-service-trait

正文

Tower是一个模块化和可重复使用的用来构建 client 和 server 的组件库。其核心是Service特性。一个Service是一个异步函数,它接受一个请求并产生一个响应。然而,Tower设计的某些方面可能不是那么一目了然。

与其解释今天存在于Tower中的Service特性,不如来看看Service背后的设计考量。让我们试试看,如果今天重新设计实现它,我们会怎么做。

想象一下,你正在用 Rust 构建一个简单的 HTTP 框架。这个框架将允许用户提供接收请求并返回响应的处理逻辑来实现一个 HTTP 服务器。你可能会有这么一个 API:

1
2
3
4
5
// 创建一个在 3000 端口监听的服务器
let server = Server::new("127.0.0.1:3000").await?

// 以某种方式运行用户的应用程序
server.run(the_users_application).await?

现在问题来了,the_users_application应该是什么?

最简单的一个实现,可能是这样的:

1
2
3
fn handle_request(request: HttpRequest) -> HttpResponse {
// ...
}

其中HttpRequestHttpResponse是由我们的框架提供的一些结构。有了这个,我们就可以这样实现Server::run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
impl Server {
async fn run<F>(self, handler: F) -> Result<(), Error>
where
F: Fn(HttpRequest) -> HttpResponse,
{
let listener = TcpListener::bind(self.addr).await?

loop {
let mut connection = listener.accept().await?
let request = read_http_request(&mut connection).await?

// 调用由用户提供的处理程序
let response = handler(request);

write_http_response(connection).await?
}
}
}

在这里,我们有一个异步函数run,它接受一个闭包,这个闭包接受一个HttpRequest并返回HttpResponse。用户可以像这样使用我们的server

1
2
3
4
5
6
7
8
9
10
fn handle_request(request: HttpRequest) -> HttpResponse {
if request.path() == "/" {
HttpResponse::ok("Hello, World!" )
} else {
HttpResponse::not_found()
}
}

// 运行服务器并使用我们的 handle_request 函数处理请求
server.run(handle_request).await?

感觉还行,它让用户可以很容易地运行 HTTP 服务器而不必担心任何低层次的细节。

然而,我们目前的设计有一个问题:我们无法处理异步地处理请求。想象一下,我们的用户需要查询一个数据库,或者在处理请求的同时发送一个请求给其他服务器。目前,这样会导致我们需要同步等待 handler 的返回结果,从而导致了阻塞

如果我们希望我们的服务器能够处理大量的并发连接,我们需要在等待该请求异步完成的同时为其他请求提供服务。我们可以通过让 handler 返回一个future来解决这个问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
impl Server {
async fn run<F, Fut>(self, handler: F) -> Result<(), Error>
where
// handler 现在返回一个通用类型的 Fut
F: Fn(HttpRequest) -> Fut,
// FUT 是一个 Future,其输出是一个 HttpResponse
Fut: Future<Output = HttpResponse>,
{
let listener = TcpListener::bind(self.addr).await?

loop {
let mut connection = listener.accept().await?
let request = read_http_request(&mut connection).await?

// 等待由 handler 返回的 Future
let response = handler(request).await?

write_http_response(connection).await?
}
}
}

API 的用法和之前差不多:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 现在是一个异步函数
async fn handle_request(request: HttpRequest) -> HttpResponse {
if request.path() == "/" {
HttpResponse::ok("Hello, World!" )
} else if request.path() == "/important-data" {
// 我们现在可以在这里做异步的事情了
let some_data = fetch_data_from_database().await;
make_response(some_data)
} else {
HttpResponse::not_found()
}
}

// 运行 server 也是一样的
server.run(handle_request).await?

这就比之前要好很多了,因为我们的 handler 现在可以调用其他异步函数啦。然而,我们仍然缺了点啥——如果我们的处理程序出错了怎么办?我们可以让 Handler 返回一个Result

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
impl server {
async fn run<F, Fut>(self, handler: F) -> Result<(), Error>
where
F: Fn(HttpRequest) -> Fut,
// 响应的 Future 允许返回 Error
Fut: Future<Output = Result<HttpResponse, Error>>。
{
let listener = TcpListener::bind(self.addr).await?

loop {
let mut connection = listener.accept().await?
let request = read_http_request(&mut connection).await?

// 对响应的 Future 进行模式匹配
match handler(request).await {
Ok(response) => write_http_response(connection).await?
Err(error) => handle_error_somehow(error, connection)。
}
}
}
}

添加更多的功能

现在,假设我们想确保所有的请求都能及时完成或失败,而不是让客户端无限期地等待一个可能永远不会有的响应。

我们可以通过给每个请求添加一个超时来做到这一点。一个超时设置了handler允许持续的最大时间的限制。如果它在这个时间内没有产生响应,就会返回一个错误。这使得客户端可以重试该请求或向用户报告错误,而不是永远等待。

最简单的方法可能是去修改Server,使其可以配置一个超时,然后在每次调用handler时应用该超时。然而,其实你也可以在不修改Server的情况下添加一个超时。使用tokio::time::timeout,我们可以写一个新的处理函数,让它调用我们之前的handle_request,并且设置超时时间为 30 秒:

1
2
3
4
5
6
7
8
9
10
11
12
async fn handler_with_timeout(request: HttpRequest) -> Result<HttpResponse, Error> {
let result = tokio::time::timeout(
Duration::from_secs(30),
handle_request(request)
).await

match result {
Ok(Ok(response)) => Ok(response)。
Ok(Err(error)) => Err(error),
Err(_timeout_elapsed) => Err(Error::timeout() )。
}
}

这提供了一个相当好的抽象,我们能够添加一个超时器而不改变任何现有的代码。

让我们用这种方式再增加一个功能。想象一下,我们正在写一个 JSON API,并且希望在所有的响应上有一个Content-Type: application/json的头。我们可以用类似的方式包装handler_with_timeout

1
2
3
4
5
6
7
async fn handler_with_timeout_and_content_type(
request: HttpRequest,
) -> Result<HttpResponse, Error> {
let mut response = handler_with_timeout(request).await?
response.set_header("Content-Type", "application/json")。
Ok(response)
}

我们现在有了一个处理程序,它将处理一个 HTTP 请求,超时为 30 秒,并且会设置好正确的Content-Type头,所有这些都不需要修改我们原来的handle_request函数或Server结构。

设计可以以这种方式扩展的库是非常强大的,因为它允许用户通过增加一层新行为来扩展库的功能,而不需要等待库的维护者为其添加支持。

它也使测试变得更容易,因为你可以把你的代码分解成小的隔离的孤立的单元,并为它们编写细粒度的测试,而不必担心其他的部分。

然而,又有了一个问题:我们目前的设计是套娃,也就是实现一个处理函数来实现功能,并在其内部调用其他处理函数。这能 work,但如果我们想增加更多的额外功能,它并不能很好地扩展。

想象一下,我们有许多handle_with_*函数,每一个都增加了一点儿新的行为。要硬编码谁调用谁的这个调用链将成为一种挑战。我们目前的调用链是:

  1. handler_with_timeout_and_content_type,调用
  2. handler_with_timeout,调用
  3. handle_request,实际处理请求。

如果我们能以某种方式组合这三个函数而不需要硬编码确切的顺序,那就更好了,就像这样:

1
let final_handler = with_content_type(with_timeout(handle_request));

同时仍然能够像以前一样运行我们的处理程序。

1
server.run(final_handler).await?

你可以把with_content_typewith_timeout作为函数来实现,该函数接受一个F: Fn(HttpRequest) -> Future<Output = Result<HttpResponse, Error>的参数并返回一个impl Fn(HttpRequest) -> Future<Output = Result<HttpResponse, Error>>的闭包。这也不是不行,但所有这些闭包类型会很快变得难以处理。

Handlertrait

让我们来尝试另一种方法。与其让Server::run接受了一个闭包(Fn(HttpRequest) -> …),不如让我们定义一个新的 trait 来封装async fn(HttpRequest) -> Result<HttpResponse, Error>:

1
2
3
trait Handler {
async fn call(&mut self, request: HttpRequest) -> Result<HttpResponse, Error>;
}

有了这样一个 trait,我们就可以编写实现它的具体类型,这样我们就不必到处用Fn了。

然而,Rust 目前不支持 async trait 方法,所以我们有两个选择:

  1. call返回一个 Boxed Future,如Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>。这也就是async-trait干的事。
  2. Handler中添加一个关联的type Future,这样用户就可以指定自己的类型。

我们采用方案二,因为它是最灵活的。有一个具体的 Future 类型的用户可以避免Box的开销,而不在乎的用户也可以使用Pin<Box<...>>

1
2
3
4
5
trait handler {
type Future: Future<Output = Result<HttpResponse, Error>>;

fn call(&mut self, request: HttpRequest) -> Self::Future;
}

我们仍然要求Handler::Future实现输出为Result<HttpResponse, Error>Future,因为那是Server::run的要求。

call接受&mut self是有用的,因为它允许处理程序在必要时更新他们的内部状态1

让我们把原来的handle_request函数转换为使用这个特性的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct RequestHandler;

impl Handler for RequestHandler {
// 为了简单起见,我们在这里使用 Pin<Box<...>>,但也可以定义我们的
// 自己的 Future 类型,以避免开销。
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

fn call(&mut self, request: HttpRequest) -> Self::Future {
Box::pin(async move {
// 与我们之前的实现相同
if request.path() == "/" {
Ok(HttpResponse::ok("Hello, World!"))
} else if request.path() == "/important-data" {
let some_data = fetch_data_from_database().await?;
Ok(make_response(some_data))
} else {
Ok(HttpResponse::not_found())
}
})
}
}

那我们如何基于这个实现超时呢?请记住,我们的目标是允许我们在不修改每个单独部分的情况下,将不同的功能组合在一起。

我们可以定义一个通用的Timeout结构,就像这样:

1
2
3
4
5
struct Timeout<T> {
// T 实现了`Handler'的类型
inner_handler: T,
duration: Duration,
}

然后我们可以为Timeout<T>实现Handler并委托给THandler实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
impl<T> Handler for Timeout<T>
where
T: Handler,
{
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

fn call(&mut self, request: HttpRequest) -> Self::Future {
Box::pin(async move {
let result = tokio::time::timeout(
self.duration,
self.inner_handler.call(request),
).await;

match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error),
Err(_timeout) => Err(Error::timeout()),
}
})
}
}

这里重要的一行是self.inner_handler.call(request),在这我们继续调用内部处理程序,让它做自己的事情而不管关它是什么。我们只需要知道它完成后会返回一个Result<HttpResponse, Error>

但是,这段代码编译不过:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/lib.rs:145:29
|
144 | fn call(&mut self, request: HttpRequest) -> Self::Future {
| --------- this data with an anonymous lifetime `'_`...
145 | Box::pin(async move {
| _____________________________^
146 | | let result = tokio::time::timeout(
147 | | self.duration,
148 | | self.inner_handler.call(request),
... |
155 | | }
156 | | })
| |_________^ ...is captured here, requiring it to live as long as `'static`

编译出错的原因是,我们正在捕获一个&mut self并将其移到一个异步的代码块中。这意味着我们的 Future 和&mut self的生命周期是相同的。但是这并不符合我们的预期,因为我们可能想在多个线程上运行我们的 Future 以获得更好的性能,或者产生多个 Future 并将它们全部并行运行。如果对 handler 的引用存在于 Future2 中,这就不可能了。

因此,我们需要将&mut self转换为一个有所有权的self。这正是Clone所做的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 这必须是 Clone,才能使 Timeout<T> 成为 Clone
#[derive(Clone)]
struct RequestHandler;

impl Handler for RequestHandler {
// ...
}

#[derive(Clone)]
struct Timeout<T> {
inner_handler: T,
duration: Duration,
}

impl<T> Handler for Timeout<T>
where
T: Handler + Clone,
{
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

fn call(&mut self, request: HttpRequest) -> Self::Future {
// 获得`&mut self`的所有权
let mut this = self.clone();

Box::pin(async move {
let result = tokio::time::timeout(
this.duration,
this.inner_handler.call(request),
).await;

match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error),
Err(_timeout) => Err(Error::timeout()),
}
})
}
}

请注意,在这种情况下,clone 是非常便宜的,因为RequestHandler没有任何数据,Timeout<T>只增加了一个Duration(也就是实际上是Copy)。

好,我们现在更进一步了,现在我们得到了另一个错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
error[E0310]: the parameter type `T` may not live long enough
--> src/lib.rs:149:9
|
140 | impl<T> Handler for Timeout<T>
| - help: consider adding an explicit lifetime bound...: `T: 'static`
...
149 | / Box::pin(async move {
150 | | let result = tokio::time::timeout(
151 | | this.duration,
152 | | this.inner_handler.call(request),
... |
159 | | }
160 | | })
| |__________^ ...so that the type `impl Future` will meet its required lifetime bounds

现在的问题是,因为T可以是任何类型。它甚至可以是一个包含引用的类型,比如Vec<&'a str>。然而这就拉胯了,原因和之前一样。我们需要返回的 Future 有一个'static的生命周期,这样我们可以更容易地传递它。

编译器实际上已经告诉了我们该如何解决——加个T: 'static'

1
2
3
4
5
6
impl<T> Handler for Timeout<T>
where
T: Handler + Clone + 'static,
{
// ...
}

返回的 Future 现在满足了'static'寿命的要求,因为它不包含引用(并且任何T包含的引用都是'static'的)。现在,我们的代码可以编译了!

让我们创建一个类似的 handler 在响应中添加Content-Type头:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#[derive(Clone)]
struct JsonContentType<T> {
inner_handler: T,
}

impl<T> Handler for JsonContentType<T>
where
T: Handler + Clone + 'static,
{
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

fn call(&mut self, request: HttpRequest) -> Self::Future {
let mut this = self.clone();

Box::pin(async move {
let mut response = this.inner_handler.call(request).await?;
response.set_header("Content-Type", "application/json");
Ok(response)
})
}
}

这与Timeout的模式非常相似。

接下来我们修改Server::run以接受我们新的Handler Trait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
impl Server {
async fn run<T>(self, mut handler: T) -> Result<(), Error>
where
T: Handler,
{
let listener = TcpListener::bind(self.addr).await?;

loop {
let mut connection = listener.accept().await?;
let request = read_http_request(&mut connection).await?;

// have to call `Handler::call` here
match handler.call(request).await {
Ok(response) => write_http_response(connection, response).await?,
Err(error) => handle_error_somehow(error, connection),
}
}
}
}

我们现在可以将我们的三个 handler 组合在一起:

1
2
3
4
5
6
JsonContentType {
inner_handler: Timeout {
inner_handler: RequestHandler,
duration: Duration::from_secs(30),
},
}

如果我们给我们的类型添加一些new方法,那就更容易构建啦:

1
2
3
4
5
6
7
let handler = RequestHandler;
let handler = Timeout::new(handler, Duration::from_secs(30));
let handler = JsonContentType::new(handler);

// `handler` has type `JsonContentType<Timeout<RequestHandler>>`

server.run(handler).await

搞定!我们现在可以为RequestHandler增加额外的功能而不必修改它的实现。理论上,我们可以把我们的JsonContentTypeTimeout handler 放到一个crate中,然后在crates.io上把它作为一个库发布供其他用户使用!

Handler更加灵活

我们的Handler trait看着还不错,但目前它只支持我们的HttpRequestHttpResponse类型。如果这些变成了泛型,用户就可以使用他们想要的任何类型。

我们将 Request 作为 Trait 的泛型参数,这样服务就可以接受许多不同类型的请求。这样,我们的 handler 就可以用于不同的协议,而不仅仅是 HTTP 了。我们定义 Response 为一个关联类型,因为对于任意给定的请求类型,只能有且只有一种(相关的)响应类型:对应的调用返回的类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

trait Handler<Request> {
type Response;

// 错误也应该是一个关联类型。没有理由让它成为一个
// 硬编码的类型
type Error;

// 之前的 Future 类型,但现在它的输出必须使用
// 相关的 Response 和 Error 类型。
type Future: Future<Output = Result<Self::Response, Self::Error>>;

// call 没有变化,但注意这里的 Request 是个泛型,
// 而不是我们之前所使用的 HttpRequest 类型。
fn call(&mut self, request: Request) -> Self::Future;
}

我们对RequestHandler的实现现在变成了:

1
2
3
4
5
6
7
8
9
impl Handler<HttpRequest> for RequestHandler {
type Response = HttpResponse;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

fn call(&mut self, request: Request) -> Self::Future {
// 和之前一样
}
}

Timeout<T>则有点不同,因为它包装了一些其他的Handler,并添加了一个异步超时,它实际上并不关心请求或响应类型是什么,只要它所包装的Handler使用相同的类型。

Error类型则有点不同。因为tokio::time::timeout会返回Result<T, tokio::time::error::Elapsed>,我们必须能够把tokio::time::error::Elapsed转换成内部Handler的错误类型。

如果我们把所有这些东西组合在一起,我们就能获得:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// `Timeout`接受任何类型的`R`的请求,只要和`T`接受相同类型的请求
impl<R, T> Handler<R> for Timeout<T>
where
// 实际的请求类型必须不包含
// 引用。编译器会告诉我们要添加
// 这个,如果我们不这样做的话
R: 'static,
// `T`必须接受`R`类型的请求
T: Handler<R> + Clone + 'static,
// 我们必须能够将一个超时的请求转换为
// `T`的错误类型
T::Error: From<tokio::time::error::Elapsed>,
{
// 我们的响应类型与`T`相同,因此我们
// 不需要修改它
type Response = T::Response;

// 错误类型也是一样的
type Error = T::Error;

// Future 必须输出一个具有正确类型的`Result`。
type Future = Pin<Box<dyn Future<Output = Result<T::Response, T::Error>>>>;

fn call(&mut self, request: R) -> Self::Future {
let mut this = self.clone();

Box::pin(async move {
let result = tokio::time::timeout(
this.duration,
this.inner_handler.call(request),
).await;

match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error),
Err(elapsed) => {
// 转换错误类型
Err(T::Error::from(elapsed))
}
}
})
}
}

JsonContentType也有点不同。它不关心请求或错误类型,但它关心响应类型。它必须是Response,这样我们才能调用set_header

因此,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 还是一个通用的请求类型
impl<R, T> Handler<R> for JsonContentType<T>
where
R: 'static,
// `T`必须接受任何类型的`R`的请求,并返回`HttpResponse`类型的响应。
T: Handler<R, Response = HttpResponse> + Clone + 'static,
{
type Response = HttpResponse;

// 我们的错误类型和`T`一致
type Error = T::Error;

type Future = Pin<Box<dyn Future<Output = Result<Response, T::Error>>>>;

fn call(&mut self, request: R) -> Self::Future {
let mut this = self.clone();

Box::pin(async move {
let mut response = this.inner_handler.call(request).await?;
response.set_header("Content-Type", "application/json");
Ok(response)
})
}
}

最后,传递给Server::runHandler必须使用HttpRequestHttpResponse

1
2
3
4
5
6
7
8
impl Server {
async fn run<T>(self, mut handler: T) -> Result<(), Error>
where
T: Handler<HttpRequest, Response = HttpResponse>,
{
// ...
}
}

创建 server 的代码不需要变:

1
2
3
4
5
let handler = RequestHandler;
let handler = Timeout::new(handler, Duration::from_secs(30));
let handler = JsonContentType::new(handler);

server.run(handler).await

到目前为止,我们有了一个Handler trait,这可以将我们的应用程序分解成独立的小部分,并可以复用。看着不错!

“如果我告诉你……”

到目前为止,我们只讨论了 server 方面的事情。但是实际上,我们的Handler trait也适用于 HTTP 客户端。比如,我们可以想象有个客户端的Handler接受一些请求并异步地将其发送给互联网上的某 server,我们的Timeout包装器在这里也很有用。JsonContentType可能没啥用,因为设置响应头不是客户端的工作。

由于我们的Handler trait对于定义服务器和客户端都很有用,Handler可能不是一个合适的名字,毕竟客户端并不处理一个请求,它将请求发送给服务器,然后由服务器来处理它。让我们改称我们的 trait 为Service

1
2
3
4
5
6
7
trait Service<Request> {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;

fn call(&mut self, request: Request) -> Self::Future;
}

这实际上几乎就是Tower中定义的Service trait了。如果你已经跟着看到了这里,你现在已经了解了Tower的大部分内容了。除了Service traitTower还提供了一些实用工具,通过包装其它的Service并实现一个Service,就像我们对TimeoutJsonContentType所做的那样。这些Service的组成方式与我们到目前为止所做的类似。

以下是一些由Tower提供的Service示例:

  • Timeout——这与我们之前实现的超时基本相同。
  • Retry——自动重试失败的请求。
  • RateLimit——限制一个服务在一段时间内收到的请求数量。

TimeoutJsonContentType这样的类型通常被称为中间件,因为它们包裹着另一个Service并以某种方式对请求或响应进行处理。像RequestHandler这样的类型通常被称为叶子服务,因为它们位于嵌套服务树的叶子上。实际的响应通常是在叶子服务中产生,并由中间件修改。

好了,到这里唯一(唯二?)我们剩下还没聊的是backpressurepoll_ready

Backpressure

想象一下,现在你想写一个限制请求速率的中间件,来包装一个Service,以对底层服务的最大并发请求数进行限制。如果你的服务对它的负载量有一个硬性的上限,这将是非常有用的。

在我们目前的Service trait中,我们并没有一个好的方法来实现这样的东西,我们可以尝试这样做:

1
2
3
4
5
6
7
8
impl<R, T> Service<R> for ConcurrencyLimit<T> {
fn call(&mut self, request: R) -> Self::Future {
// 1. 检查当前正在处理的请求数的计数器。
// 2. 如果有剩余的容量,将请求发送到`T`,并增加计数器。
// 3. 如果没有,则等到有能力时再进行处理。
// 4. 当返回响应后,减去计数器。
}
}

如果没有剩余的容量,我们必须等待,并在容量可用时以某种方式得到通知。此外,我们必须在等待时将请求保留在内存中(也称为缓冲)。这意味着,等待的请求越多,我们的程序就会使用更多的内存——如果产生的请求超过我们的服务所能处理的数量,我们可能会耗尽内存。
只有当我们确定服务有能力处理请求时,才为请求分配空间,这将是更稳健的做法。否则,在我们等待我们的服务准备好时,我们有可能使用大量的内存来缓冲请求。

如果说Service有这样一个方法,那就完美了:

1
2
3
trait Service<R> {
async fn ready(&mut self);
}

ready将是一个异步函数,当服务有足够的容量来接收一个新的请求时,它就会完成并返回。我们将要求用户首先调用service.ready().await,然后再进行service.call(require).await

将“调用服务”与“预留容量”分开,还可以有新的用法:比如我们可以维护一组“准备好的服务”,并在后台保持更新。这样,当一个请求到来时,我们已经有了一个可以使用的服务,而不需要首先等待它准备好。

通过这种设计,ConcurrencyLimit可以在ready内部计算容量,而不允许用户调用call,直到有足够的容量。

不关心容量的服务可以从ready中立即返回,或者如果它们包含了一些内部的Service,它们可以委托给它内部的ready方法。

然而,现在我们仍然不能在 trait 中定义异步函数。因此,我们可以给Service定义另一个关联类型,叫做ReadyFuture,但是必须返回一个Future会给我们带来我们之前遇到的同样的生命周期问题。如果有一些方法可以解决这个问题就好了。

作为替代,我们可以从Future特性中获得一些灵感,定义一个方法叫做poll_ready

1
2
3
4
5
use std::task::{Context, Poll};

trait Service<R> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()>;
}

如果服务没有容量,poll_ready将返回Poll::Pending;当容量变得可用时,使用Context中的waker通知调用者。这时,可以再次调用poll_ready,如果它返回Poll::Ready(()),那么容量就被保留了,就可以调用call了。

请注意,从技术上来说,没有任何东西可以阻止用户在没有确定服务准备好的情况下调用call,然而,这样做被认为是违反了Service的 API 调用约定。这时候call可以panic如果服务没有准备好。

poll_ready不返回Future也意味着我们能够快速检查一个服务是否准备好了,而不需要被迫等待它准备好。如果我们
调用poll_ready并返回Poll::Pending,我们可以决定去做其他事情而不是等待。举个例子,这允许你写个负载均衡器,通过服务返回Poll::Pending的频率来估计服务的负载,并将请求发送到负载最小的服务。

使用类似于futures::future::poll_fn或者tower::ServiceExt::ready的东西,仍然可以获得一个等待服务容量可用的 Future。

这种服务与它们的调用者沟通其容量的概念被称为“反压传播”。你可以把它看作是服务向后反推他们的调用者,并且如果他们产生的请求太快了时,告诉他们需要放慢速度。其基本思想是,你不应该向一个没有能力处理的服务发送请求。相反,你应该等待(缓冲),放弃请求(减负),或以其他方式处理能力不足的问题。你可以在这里这里了解更多关于背压的一般概念。

最后,在预留容量时也可能发生一些错误,所以poll_ready也许应该返回Poll<Result<(), Self::Error>

有了这一改变,我们现在已经有了完整的tower::Service特性。

1
2
3
4
5
6
7
8
9
10
11
12
pub trait Service<Request> {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;

fn poll_ready(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>>;

fn call(&mut self, req: Request) -> Self::Future;
}

许多中间件不添加自己的背压,而只是委托给被封装的服务的poll_ready实现。然而,中间件的背压确实可以实现一些有趣的用例,例如各种速率限制、负载均衡和自动扩容。

由于你永远不知道一个Service可能由哪些中间件组成,所以重要的是不要忘记调用poll_ready

有了这一切,调用服务的最常用方法是:

1
2
3
4
5
6
7
8
9
10
11
use tower::{
Service,
// for the `ready` method
ServiceExt,
};

let response = service
// wait for the service to have capacity
.ready().await?
// send the request
.call(request).await?;
— David Pedersen (@davidpdrsn)

脚注

1: 关于call是否应该使用Pin<&mut Self> ,已经有了一些讨论。但是到目前为止,我们决定采用一个普通的 &mut self,这意味着 handler(咳,Services)必须是Unpin。在实践中,这很少出现问题。更多细节可以看这里

2: 说得更准确一点,这要求响应返回的 Future 必须是'static'的,因为写Box<dyn Future>实际上会被 desugar 成Box<dyn Future + 'static>,因此在fn call(&'_ mut self, ...) 中的匿名lifetime并不满足这个要求。在未来,Rust编译器团队计划增加一个名为泛型关联类型(GAT)的功能,这将解决这个问题。泛型关联类型允许我们将响应的 future 定义为type Future<'a>call定义为fn call<'a>(&'a mut self, ...) -> Self::Future<'a>,但现在响应返回的 Future 必须是'static的。