写在前面:
最近看到了一篇讲 Rust 如何对框架进行抽象的文章,写得非常好,这两天抽空翻译了一下。
原文:https://tokio.rs/blog/2021-05-14-inventing-the-service-trait
正文
Tower
是一个模块化和可重复使用的用来构建 client 和 server 的组件库。其核心是Service
特性。一个Service
是一个异步函数,它接受一个请求并产生一个响应。然而,Tower
设计的某些方面可能不是那么一目了然。
与其解释今天存在于Tower
中的Service
特性,不如来看看Service
背后的设计考量。让我们试试看,如果今天重新设计实现它,我们会怎么做。
想象一下,你正在用 Rust 构建一个简单的 HTTP 框架。这个框架将允许用户提供接收请求并返回响应的处理逻辑来实现一个 HTTP 服务器。你可能会有这么一个 API:
1 | // 创建一个在 3000 端口监听的服务器 |
现在问题来了,the_users_application
应该是什么?
最简单的一个实现,可能是这样的:
1 | fn handle_request(request: HttpRequest) -> HttpResponse { |
其中HttpRequest
和HttpResponse
是由我们的框架提供的一些结构。有了这个,我们就可以这样实现Server::run
:
1 | impl Server { |
在这里,我们有一个异步函数run
,它接受一个闭包,这个闭包接受一个HttpRequest
并返回HttpResponse
。用户可以像这样使用我们的server
:
1 | fn handle_request(request: HttpRequest) -> HttpResponse { |
感觉还行,它让用户可以很容易地运行 HTTP 服务器而不必担心任何低层次的细节。
然而,我们目前的设计有一个问题:我们无法处理异步地处理请求。想象一下,我们的用户需要查询一个数据库,或者在处理请求的同时发送一个请求给其他服务器。目前,这样会导致我们需要同步等待 handler 的返回结果,从而导致了阻塞。
如果我们希望我们的服务器能够处理大量的并发连接,我们需要在等待该请求异步完成的同时为其他请求提供服务。我们可以通过让 handler 返回一个future
来解决这个问题。
1 | impl Server { |
API 的用法和之前差不多:
1 | // 现在是一个异步函数 |
这就比之前要好很多了,因为我们的 handler 现在可以调用其他异步函数啦。然而,我们仍然缺了点啥——如果我们的处理程序出错了怎么办?我们可以让 Handler 返回一个Result
:
1 | impl server { |
添加更多的功能
现在,假设我们想确保所有的请求都能及时完成或失败,而不是让客户端无限期地等待一个可能永远不会有的响应。
我们可以通过给每个请求添加一个超时来做到这一点。一个超时设置了handler
允许持续的最大时间的限制。如果它在这个时间内没有产生响应,就会返回一个错误。这使得客户端可以重试该请求或向用户报告错误,而不是永远等待。
最简单的方法可能是去修改Server
,使其可以配置一个超时,然后在每次调用handler
时应用该超时。然而,其实你也可以在不修改Server
的情况下添加一个超时。使用tokio::time::timeout
,我们可以写一个新的处理函数,让它调用我们之前的handle_request
,并且设置超时时间为 30 秒:
1 | async fn handler_with_timeout(request: HttpRequest) -> Result<HttpResponse, Error> { |
这提供了一个相当好的抽象,我们能够添加一个超时器而不改变任何现有的代码。
让我们用这种方式再增加一个功能。想象一下,我们正在写一个 JSON API,并且希望在所有的响应上有一个Content-Type: application/json
的头。我们可以用类似的方式包装handler_with_timeout
:
1 | async fn handler_with_timeout_and_content_type( |
我们现在有了一个处理程序,它将处理一个 HTTP 请求,超时为 30 秒,并且会设置好正确的Content-Type
头,所有这些都不需要修改我们原来的handle_request
函数或Server
结构。
设计可以以这种方式扩展的库是非常强大的,因为它允许用户通过增加一层新行为来扩展库的功能,而不需要等待库的维护者为其添加支持。
它也使测试变得更容易,因为你可以把你的代码分解成小的隔离的孤立的单元,并为它们编写细粒度的测试,而不必担心其他的部分。
然而,又有了一个问题:我们目前的设计是套娃,也就是实现一个处理函数来实现功能,并在其内部调用其他处理函数。这能 work,但如果我们想增加更多的额外功能,它并不能很好地扩展。
想象一下,我们有许多handle_with_*
函数,每一个都增加了一点儿新的行为。要硬编码谁调用谁的这个调用链将成为一种挑战。我们目前的调用链是:
handler_with_timeout_and_content_type
,调用handler_with_timeout
,调用handle_request
,实际处理请求。
如果我们能以某种方式组合这三个函数而不需要硬编码确切的顺序,那就更好了,就像这样:
1 | let final_handler = with_content_type(with_timeout(handle_request)); |
同时仍然能够像以前一样运行我们的处理程序。
1 | server.run(final_handler).await? |
你可以把with_content_type
和with_timeout
作为函数来实现,该函数接受一个F: Fn(HttpRequest) -> Future<Output = Result<HttpResponse, Error>
的参数并返回一个impl Fn(HttpRequest) -> Future<Output = Result<HttpResponse, Error>>
的闭包。这也不是不行,但所有这些闭包类型会很快变得难以处理。
Handler
trait
让我们来尝试另一种方法。与其让Server::run
接受了一个闭包(Fn(HttpRequest) -> …
),不如让我们定义一个新的 trait 来封装async fn(HttpRequest) -> Result<HttpResponse, Error>
:
1 | trait Handler { |
有了这样一个 trait,我们就可以编写实现它的具体类型,这样我们就不必到处用Fn
了。
然而,Rust 目前不支持 async trait 方法,所以我们有两个选择:
- 让
call
返回一个 Boxed Future,如Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>
。这也就是async-trait
干的事。 - 在
Handler
中添加一个关联的type Future
,这样用户就可以指定自己的类型。
我们采用方案二,因为它是最灵活的。有一个具体的 Future 类型的用户可以避免Box
的开销,而不在乎的用户也可以使用Pin<Box<...>>
。
1 | trait handler { |
我们仍然要求Handler::Future
实现输出为Result<HttpResponse, Error>
的Future
,因为那是Server::run
的要求。
让call
接受&mut self
是有用的,因为它允许处理程序在必要时更新他们的内部状态1。
让我们把原来的handle_request
函数转换为使用这个特性的实现:
1 | struct RequestHandler; |
那我们如何基于这个实现超时呢?请记住,我们的目标是允许我们在不修改每个单独部分的情况下,将不同的功能组合在一起。
我们可以定义一个通用的Timeout
结构,就像这样:
1 | struct Timeout<T> { |
然后我们可以为Timeout<T>
实现Handler
并委托给T
的Handler
实现。
1 | impl<T> Handler for Timeout<T> |
这里重要的一行是self.inner_handler.call(request)
,在这我们继续调用内部处理程序,让它做自己的事情而不管关它是什么。我们只需要知道它完成后会返回一个Result<HttpResponse, Error>
。
但是,这段代码编译不过:
1 | error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement |
编译出错的原因是,我们正在捕获一个&mut self
并将其移到一个异步的代码块中。这意味着我们的 Future 和&mut self
的生命周期是相同的。但是这并不符合我们的预期,因为我们可能想在多个线程上运行我们的 Future 以获得更好的性能,或者产生多个 Future 并将它们全部并行运行。如果对 handler 的引用存在于 Future2 中,这就不可能了。
因此,我们需要将&mut self
转换为一个有所有权的self
。这正是Clone
所做的。
1 | // 这必须是 Clone,才能使 Timeout<T> 成为 Clone |
请注意,在这种情况下,clone 是非常便宜的,因为RequestHandler
没有任何数据,Timeout<T>
只增加了一个Duration
(也就是实际上是Copy
)。
好,我们现在更进一步了,现在我们得到了另一个错误:
1 | error[E0310]: the parameter type `T` may not live long enough |
现在的问题是,因为T
可以是任何类型。它甚至可以是一个包含引用的类型,比如Vec<&'a str>
。然而这就拉胯了,原因和之前一样。我们需要返回的 Future 有一个'static
的生命周期,这样我们可以更容易地传递它。
编译器实际上已经告诉了我们该如何解决——加个T: 'static'
:
1 | impl<T> Handler for Timeout<T> |
返回的 Future 现在满足了'static'
寿命的要求,因为它不包含引用(并且任何T
包含的引用都是'static'
的)。现在,我们的代码可以编译了!
让我们创建一个类似的 handler 在响应中添加Content-Type
头:
1 |
|
这与Timeout
的模式非常相似。
接下来我们修改Server::run
以接受我们新的Handler Trait
。
1 | impl Server { |
我们现在可以将我们的三个 handler 组合在一起:
1 | JsonContentType { |
如果我们给我们的类型添加一些new
方法,那就更容易构建啦:
1 | let handler = RequestHandler; |
搞定!我们现在可以为RequestHandler
增加额外的功能而不必修改它的实现。理论上,我们可以把我们的JsonContentType
和Timeout
handler 放到一个crate
中,然后在crates.io
上把它作为一个库发布供其他用户使用!
让Handler
更加灵活
我们的Handler trait
看着还不错,但目前它只支持我们的HttpRequest
和HttpResponse
类型。如果这些变成了泛型,用户就可以使用他们想要的任何类型。
我们将 Request 作为 Trait 的泛型参数,这样服务就可以接受许多不同类型的请求。这样,我们的 handler 就可以用于不同的协议,而不仅仅是 HTTP 了。我们定义 Response 为一个关联类型,因为对于任意给定的请求类型,只能有且只有一种(相关的)响应类型:对应的调用返回的类型。
1 |
|
我们对RequestHandler
的实现现在变成了:
1 | impl Handler<HttpRequest> for RequestHandler { |
Timeout<T>
则有点不同,因为它包装了一些其他的Handler
,并添加了一个异步超时,它实际上并不关心请求或响应类型是什么,只要它所包装的Handler
使用相同的类型。
而Error
类型则有点不同。因为tokio::time::timeout
会返回Result<T, tokio::time::error::Elapsed>
,我们必须能够把tokio::time::error::Elapsed
转换成内部Handler
的错误类型。
如果我们把所有这些东西组合在一起,我们就能获得:
1 | // `Timeout`接受任何类型的`R`的请求,只要和`T`接受相同类型的请求 |
JsonContentType
也有点不同。它不关心请求或错误类型,但它关心响应类型。它必须是Response
,这样我们才能调用set_header
。
因此,实现如下:
1 | // 还是一个通用的请求类型 |
最后,传递给Server::run
的Handler
必须使用HttpRequest
和HttpResponse
。
1 | impl Server { |
创建 server 的代码不需要变:
1 | let handler = RequestHandler; |
到目前为止,我们有了一个Handler trait
,这可以将我们的应用程序分解成独立的小部分,并可以复用。看着不错!
“如果我告诉你……”
到目前为止,我们只讨论了 server 方面的事情。但是实际上,我们的Handler trait
也适用于 HTTP 客户端。比如,我们可以想象有个客户端的Handler
接受一些请求并异步地将其发送给互联网上的某 server,我们的Timeout
包装器在这里也很有用。JsonContentType
可能没啥用,因为设置响应头不是客户端的工作。
由于我们的Handler trait
对于定义服务器和客户端都很有用,Handler
可能不是一个合适的名字,毕竟客户端并不处理一个请求,它将请求发送给服务器,然后由服务器来处理它。让我们改称我们的 trait 为Service
:
1 | trait Service<Request> { |
这实际上几乎就是Tower
中定义的Service trait
了。如果你已经跟着看到了这里,你现在已经了解了Tower
的大部分内容了。除了Service trait
,Tower
还提供了一些实用工具,通过包装其它的Service
并实现一个Service
,就像我们对Timeout
和JsonContentType
所做的那样。这些Service
的组成方式与我们到目前为止所做的类似。
以下是一些由Tower
提供的Service
示例:
像Timeout
和JsonContentType
这样的类型通常被称为中间件,因为它们包裹着另一个Service
并以某种方式对请求或响应进行处理。像RequestHandler
这样的类型通常被称为叶子服务
,因为它们位于嵌套服务树的叶子上。实际的响应通常是在叶子服务中产生,并由中间件修改。
好了,到这里唯一(唯二?)我们剩下还没聊的是backpressure和poll_ready
。
Backpressure
想象一下,现在你想写一个限制请求速率的中间件,来包装一个Service
,以对底层服务的最大并发请求数进行限制。如果你的服务对它的负载量有一个硬性的上限,这将是非常有用的。
在我们目前的Service trait
中,我们并没有一个好的方法来实现这样的东西,我们可以尝试这样做:
1 | impl<R, T> Service<R> for ConcurrencyLimit<T> { |
如果没有剩余的容量,我们必须等待,并在容量可用时以某种方式得到通知。此外,我们必须在等待时将请求保留在内存中(也称为缓冲)。这意味着,等待的请求越多,我们的程序就会使用更多的内存——如果产生的请求超过我们的服务所能处理的数量,我们可能会耗尽内存。
只有当我们确定服务有能力处理请求时,才为请求分配空间,这将是更稳健的做法。否则,在我们等待我们的服务准备好时,我们有可能使用大量的内存来缓冲请求。
如果说Service
有这样一个方法,那就完美了:
1 | trait Service<R> { |
ready
将是一个异步函数,当服务有足够的容量来接收一个新的请求时,它就会完成并返回。我们将要求用户首先调用service.ready().await
,然后再进行service.call(require).await
。
将“调用服务”与“预留容量”分开,还可以有新的用法:比如我们可以维护一组“准备好的服务”,并在后台保持更新。这样,当一个请求到来时,我们已经有了一个可以使用的服务,而不需要首先等待它准备好。
通过这种设计,ConcurrencyLimit
可以在ready
内部计算容量,而不允许用户调用call
,直到有足够的容量。
不关心容量的服务可以从ready
中立即返回,或者如果它们包含了一些内部的Service
,它们可以委托给它内部的ready
方法。
然而,现在我们仍然不能在 trait 中定义异步函数。因此,我们可以给Service
定义另一个关联类型,叫做ReadyFuture
,但是必须返回一个Future
会给我们带来我们之前遇到的同样的生命周期问题。如果有一些方法可以解决这个问题就好了。
作为替代,我们可以从Future
特性中获得一些灵感,定义一个方法叫做poll_ready
。
1 | use std::task::{Context, Poll}; |
如果服务没有容量,poll_ready
将返回Poll::Pending
;当容量变得可用时,使用Context
中的waker
通知调用者。这时,可以再次调用poll_ready
,如果它返回Poll::Ready(())
,那么容量就被保留了,就可以调用call
了。
请注意,从技术上来说,没有任何东西可以阻止用户在没有确定服务准备好的情况下调用call
,然而,这样做被认为是违反了Service
的 API 调用约定。这时候call
可以panic
如果服务没有准备好。
poll_ready
不返回Future
也意味着我们能够快速检查一个服务是否准备好了,而不需要被迫等待它准备好。如果我们
调用poll_ready
并返回Poll::Pending
,我们可以决定去做其他事情而不是等待。举个例子,这允许你写个负载均衡器,通过服务返回Poll::Pending
的频率来估计服务的负载,并将请求发送到负载最小的服务。
使用类似于futures::future::poll_fn
或者tower::ServiceExt::ready
的东西,仍然可以获得一个等待服务容量可用的 Future。
这种服务与它们的调用者沟通其容量的概念被称为“反压传播”。你可以把它看作是服务向后反推他们的调用者,并且如果他们产生的请求太快了时,告诉他们需要放慢速度。其基本思想是,你不应该向一个没有能力处理的服务发送请求。相反,你应该等待(缓冲),放弃请求(减负),或以其他方式处理能力不足的问题。你可以在这里和这里了解更多关于背压的一般概念。
最后,在预留容量时也可能发生一些错误,所以poll_ready
也许应该返回Poll<Result<(), Self::Error>
。
有了这一改变,我们现在已经有了完整的tower::Service
特性。
1 | pub trait Service<Request> { |
许多中间件不添加自己的背压,而只是委托给被封装的服务的poll_ready
实现。然而,中间件的背压确实可以实现一些有趣的用例,例如各种速率限制、负载均衡和自动扩容。
由于你永远不知道一个Service
可能由哪些中间件组成,所以重要的是不要忘记调用poll_ready
。
有了这一切,调用服务的最常用方法是:
1 | use tower::{ |
脚注
1: 关于call
是否应该使用Pin<&mut Self>
,已经有了一些讨论。但是到目前为止,我们决定采用一个普通的 &mut self
,这意味着 handler(咳,Services)必须是Unpin
。在实践中,这很少出现问题。更多细节可以看这里。
2: 说得更准确一点,这要求响应返回的 Future 必须是'static'
的,因为写Box<dyn Future>
实际上会被 desugar 成Box<dyn Future + 'static>
,因此在fn call(&'_ mut self, ...)
中的匿名lifetime
并不满足这个要求。在未来,Rust编译器团队计划增加一个名为泛型关联类型(GAT)的功能,这将解决这个问题。泛型关联类型允许我们将响应的 future 定义为type Future<'a>
,call
定义为fn call<'a>(&'a mut self, ...) -> Self::Future<'a>
,但现在响应返回的 Future 必须是'static
的。