需求
经常会有这样的需求,需要根据具体参数的不同调用不同的处理函数。
比如根据 URL 的不同,调用不同的路由 handler;根据 command 的不同,调用不同的处理 handler。
这些需求基本都可以抽象为一件事:把 function 放在 HashMap 里。
如果是异步编程的话,那么就是把 async function 放在 HashMap 里了。
这件事在 Rust 中做起来,要比想象中困难。
今天就来浅谈一下这个主题。
框架
我们希望设计一个路由器结构体,里面放一个 HashMap。
HashMap 以 String 为 key,具体的处理函数为 value。
在初始化的时候把处理函数按不同的 key 放到 HashMap 里,调用时根据不同情况按 key 取出。
简化的话,处理函数就定为接受一个 String 并返回一个 String。
大概是这种感觉:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::sleep;
type HandlerFn = // TODO 需要写一个 Handler 的具体类型
#[derive(Default, Clone)]
struct Router {
table: Arc<RwLock<HashMap<String, HandlerFn>>>,
}
impl Router {
// TODO 需要一个 add 函数
// async fn add
}
#[tokio::main]
async fn main() {
let mut router = Router::default();
router.add("handler1", handler1).await;
router.add("handler2", handler2).await;
let handle1 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler1").unwrap();
handler("req1".into()).await;
});
handle
};
let handle2 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler2").unwrap();
handler("req2".into()).await;
});
handle
};
let _ = tokio::join!(handle1, handle2);
}
async fn handler1(req: String) -> String {
sleep(Duration::from_secs(1)).await;
println!("handler1");
"handler1".into()
}
async fn handler2(req: String) -> String {
sleep(Duration::from_secs(1)).await;
println!("handler2");
"handler2".into()
}
|
HandlerFn 的类型
我们的 HandlerFn 应该是一个 async 函数。
在 Rust 里 async 函数实际上就是一个普通函数,只不过它需要返回 Future。
我们先来尝试一下这么做:
1
2
3
4
5
|
type HandlerFn = Fn(String) -> Future<Output = String>;
#[derive(Default, Clone)]
struct Router {
table: Arc<RwLock<HashMap<String, HandlerFn>>>,
}
|
输出的错误是:
1
2
3
4
5
6
7
8
9
10
|
error[E0782]: expected a type, found a trait
--> src\bin\attempt1.rs:8:18
|
8 | type HandlerFn = Fn(String) -> Future<Output = String>;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
help: you can add the `dyn` keyword if you want a trait object
|
8 | type HandlerFn = dyn Fn(String) -> Future<Output = String>;
| +++
|
Fn 和 Future 都是特征,而不是具体类型。
作为 HashMap 的 value 我们需要一个具体的类型。
直接加 dyn 显然是不行的,需要先把它进行装箱。
所以变成了这样子:
1
2
3
4
5
|
type HandlerFn = Box<dyn Fn(String) -> Box<dyn Future<Output = String>>>;
#[derive(Default, Clone)]
struct Router {
table: Arc<RwLock<HashMap<String, HandlerFn>>>,
}
|
这样编译姑且是没问题的。
但这不是最终的类型,之后会说明为什么。
实现 add 函数
接下来需要一个函数把 handler 放到 HashMap 里,供初始化的时候调用。
我们可能会写出这样的代码,这里直接把我们上面定义的 HandlerFn 作为参数:
1
2
3
4
5
6
|
impl Router {
async fn add(&mut self, key: &str, handler: HandlerFn) {}
}
...
router.add("handler1", handler1).await;
router.add("handler2", handler2).await;
|
但是 router.add 调用的时候报错了:
1
2
3
4
5
6
7
8
9
10
|
error[E0308]: mismatched types
--> src\bin\attempt1.rs:26:28
|
26 | router.add("handler1", handler1).await;
| --- ^^^^^^^^ expected `Box<dyn Fn(String) -> Box<...>>`, found fn item
| |
| arguments to this method are incorrect
|
= note: expected struct `Box<(dyn Fn(String) -> Box<(dyn Future<Output = String> + 'static)> + 'static)>`
found fn item `fn(String) -> impl Future<Output = String> {handler1}`
|
因为我们是直接把 async function 传到 add 里面的,async function 又没有装箱,当然不能直接传进去。
那我们应该为 add 函数的 handler 这个参数定一个类型,让它能直接接收 async function 的引用。
参数类型
看编译的报错信息,似乎可以这样写:
1
|
async fn add(&mut self, key: &str, handler: Fn(String) -> impl Future<Output = String>) {}
|
很遗憾这样是不行的:
1
2
3
4
5
6
7
|
error[E0562]: `impl Trait` is not allowed in the return type of `Fn` trait bounds
--> src\bin\attempt1.rs:14:63
|
14 | async fn add(&mut self, key: &str, handler: Fn(String) -> impl Future<Output = String>) {}
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: `impl Trait` is only allowed in arguments and return types of functions and methods
|
这里的 handler
参数的类型被指定为 Fn(String) -> impl Future<Output = String>
。这是一个 Trait 约束,它要求 handler
参数的类型必须实现 Fn
Trait,并且这个 Fn
Trait 的调用签名是接受一个 String
参数,并返回一个实现了 Future<Output = String>
Trait 的某个具体但未指定类型。
impl Trait
主要用于函数的参数位置(表示接受任何实现了该 Trait 的类型)和函数的返回类型位置(表示返回一个实现了该 Trait 的某个具体类型,但调用者不知道具体是哪个类型,这被称为“不透明返回类型”)。它是一种语法糖,用于避免写出复杂的具体类型名称,尤其是在处理闭包和异步函数返回的 Future 类型时非常方便。
当你在参数位置使用 Fn(Args) -> Return
时,你是在定义一个 Trait 约束,说明参数的类型必须实现 Fn
Trait,并且其调用签名符合 (Args) -> Return
。这里的 Return
部分实际上是在描述 Fn
Trait 的关联类型 Output
。
impl Future<Output = String>
表示一个不透明的、具体类型。而 Fn
Trait 约束的 Return
位置需要的是一个具体的类型或者一个关联类型的定义。你不能在一个 Trait 定义(或者 Trait 约束,它本质上是基于 Trait 定义的)中使用 impl Trait
来表示关联类型,因为 impl Trait
本身不是一个具体的类型名称,它只是一个类型占位符,其具体类型只有实现 Trait 的那个类型才知道。
简单来说,impl Trait
是用来隐藏具体类型的,而 Trait 定义(或 Trait 约束)需要知道它操作的类型是什么(即使是通过关联类型)。你不能说一个 Trait 的关联类型是“某个实现了 Future 的东西”,你必须说它是“一个实现了 Future 的具体类型 MyFuture
”或者使用一个泛型参数来代表这个具体类型。
对 handler 本身和返回的 Future 装箱
我们改成用泛型类型来实现:
1
2
3
4
5
6
7
8
9
|
impl Router {
async fn add<Fun, Fut>(&mut self, key: &str, handler: Fun)
where
Fun: Fn(String) -> Fut,
Fut: Future<Output = String>,
{
self.table.write().await.insert(key.to_string(), handler);
}
}
|
这样可以了,但是有新的报错:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
error[E0308]: mismatched types
--> src\bin\attempt1.rs:19:58
|
14 | async fn add<Fun, Fut>(&mut self, key: &str, handler: Fun)
| --- found this type parameter
...
19 | self.table.write().await.insert(key.to_string(), handler);
| ------ ^^^^^^^ expected `Box<dyn Fn(String) -> Box<...>>`, found type parameter `Fun`
| |
| arguments to this method are incorrect
|
= note: expected struct `Box<(dyn Fn(String) -> Box<(dyn Future<Output = String> + 'static)> + 'static)>`
found type parameter `Fun`
|
因为 HashMap 的 value 必须是一个 Box 装箱的 Fn,所以直接把 handler 作为 Fn 传进去不行。
那我们用 Box::new 装箱?
1
2
3
4
5
6
7
8
9
|
impl Router {
async fn add<Fun, Fut>(&mut self, key: &str, handler: Fun)
where
Fun: Fn(String) -> Fut,
Fut: Future<Output = String>,
{
self.table.write().await.insert(key.to_string(), Box::new(handler));
}
}
|
还是不行:
1
2
3
4
5
6
7
8
9
10
11
12
|
error[E0271]: expected `Fun` to be a type parameter that returns `Box<dyn Future<Output = String>>`, but it returns `Fut`
--> src\bin\attempt1.rs:19:58
|
14 | async fn add<Fun, Fut>(&mut self, key: &str, handler: Fun)
| --- found this type parameter
...
19 | self.table.write().await.insert(key.to_string(), Box::new(handler));
| ^^^^^^^^^^^^^^^^^ expected `Box<dyn Future<Output = String>>`, found type parameter `Fut`
|
= note: expected struct `Box<(dyn Future<Output = String> + 'static)>`
found type parameter `Fut`
= note: required for the cast from `Box<Fun>` to `Box<(dyn Fn(String) -> Box<(dyn Future<Output = String> + 'static)> + 'static)>`
|
为啥?因为我们 HandlerFn 要求的 Future 也是装箱的。只把 handler 本身装箱不行。
那我怎么可以改掉 handler 返回的 Future 类型?
答案是使用闭包:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
impl Router {
async fn add<Fun, Fut>(&mut self, key: &str, handler: Fun)
where
Fun: Fn(String) -> Fut,
Fut: Future<Output = String>,
{
self.table.write().await.insert(
key.to_string(),
Box::new(move |s| {
let fut = handler(s);
Box::new(fut)
}),
);
}
}
|
这里两个 Box::new,第一个是把 handler 函数装箱,第二个是把调用这个 handler 函数返回的 Future 装箱。
注意闭包的 move,因为我们要把 handler 的所有权移动到闭包里。
这样看起来行了吧?
解决生命周期问题
下面是新的报错:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
error[E0310]: the parameter type `Fun` may not live long enough
--> src\bin\attempt1.rs:21:13
|
21 | / Box::new(move |s| {
22 | | let fut = handler(s);
23 | | Box::new(fut)
24 | | }),
| | ^
| | |
| |______________the parameter type `Fun` must be valid for the static lifetime...
| ...so that the type `Fun` will meet its required lifetime bounds
|
help: consider adding an explicit lifetime bound
|
16 | Fun: Fn(String) -> Fut + 'static,
| +++++++++
|
我们既然要把 handler 函数装箱,那 Box 就必须拥有它的所有权。
而我们使用 Fun 类型约束传入的 handler 函数的,因此我们必须约束传入的 handler 是所有权的,而不是一个引用。
解决方法是给 Fun 加上’static 约束:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
impl Router {
async fn add<Fun, Fut>(&mut self, key: &str, handler: Fun)
where
Fun: 'static + Fn(String) -> Fut,
Fut: Future<Output = String>,
{
self.table.write().await.insert(
key.to_string(),
Box::new(move |s| {
let fut = handler(s);
Box::new(fut)
}),
);
}
}
|
关于’static 和所有权到底有啥关系,推荐看看这篇文章:Rust 中常见的有关生命周期的误解
好了,又有新的报错:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
error[E0310]: the parameter type `Fut` may not live long enough
--> src\bin\attempt1.rs:23:17
|
23 | Box::new(fut)
| ^^^^^^^^^^^^^
| |
| the parameter type `Fut` must be valid for the static lifetime...
| ...so that the type `Fut` will meet its required lifetime bounds
|
help: consider adding an explicit lifetime bound
|
17 | Fut: Future<Output = String> + 'static,
| +++++++++
|
由于我们也给 Future 装箱了,所以我们也要保证 Fut 类型拥有所有权。同样给它加上’static 约束:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
impl Router {
async fn add<Fun, Fut>(&mut self, key: &str, handler: Fun)
where
Fun: 'static + Fn(String) -> Fut,
Fut: 'static + Future<Output = String>,
{
self.table.write().await.insert(
key.to_string(),
Box::new(move |s| {
let fut = handler(s);
Box::new(fut)
}),
);
}
}
|
这样没问题了,但很可惜东窗事发,下面 tokio::spawn 那边出问题了。
解决 Pin、Send 和 Sync 问题
Future 必须是 Unpin
还记得我们的 main 函数是啥?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
#[tokio::main]
async fn main() {
let mut router = Router::default();
router.add("handler1", handler1).await;
router.add("handler2", handler2).await;
let handle1 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler1").unwrap();
handler("req1".into()).await;
});
handle
};
let handle2 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler2").unwrap();
handler("req2".into()).await;
});
handle
};
let _ = tokio::join!(handle1, handle2);
}
|
报错:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
error[E0277]: `dyn Future<Output = String>` cannot be unpinned
--> src\bin\attempt1.rs:38:36
|
38 | handler("req1".into()).await;
| -----------------------^^^^^
| | ||
| | |the trait `Unpin` is not implemented for `dyn Future<Output = String>`
| | help: remove the `.await`
| this call returns `dyn Future<Output = String>`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = String>>` to implement `Future`
= note: required for `Box<dyn Future<Output = String>>` to implement `IntoFuture`
|
这是说 Future 必须要是 Unpin 的,因为我们在一个 tokio 协程中调用了 await,在这个 await 点上它会不断去 poll 这个 Future,在这个过程中它可能在线程中移动,导致其中的自引用指针不安全。所以必须把它 Pin 起来。用 Pin 包装一层之后,Pin<Box<dyn Future>> 本身是 Unpin 的,就可以安全地 await 它了。
好,那我们把它 Pin 一下。由于 Future 已经在 Box 里了,我们直接使用 Box::into_pin:
1
2
3
4
5
6
7
8
9
10
11
|
let handle1 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler1").unwrap();
let fut = handler("req1".into());
let pinned_fut = Box::into_pin(fut);
pinned_fut.await;
});
handle
};
|
注意哈,关于 Pin 常用的有三个函数的区别:
- Box::into_pin(x):x 本身是个 Box<Future>,把它装到 Pin 里,变成 Pin<Box<Future>>
- Box::pin(x):x 本身是个 Future,把它装到 Pin<Box>里,变成 Pin<Box<Future>>;相当于 Box::into_pin(Box::new(x))
- Pin::new(x):x 是个&mut 指针;基本用不到这个函数
Fn 必须是 Send
下面来看新的报错:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
error[E0277]: `(dyn Fn(String) -> Box<(dyn Future<Output = String> + 'static)> + 'static)` cannot be sent between threads safely
--> src\bin\attempt1.rs:35:35
|
35 | let handle = tokio::spawn(async move {
| ______________________------------_^
| | |
| | required by a bound introduced by this call
36 | | let lock = router_clone.table.read().await;
37 | | let handler = lock.get("handler1").unwrap();
38 | | let fut = handler("req1".into());
39 | | let pinned_fut = Box::into_pin(fut);
40 | | pinned_fut.await;
41 | | });
| |_________^ `(dyn Fn(String) -> Box<(dyn Future<Output = String> + 'static)> + 'static)` cannot be sent between threads safely
|
= help: the trait `Send` is not implemented for `(dyn Fn(String) -> Box<(dyn Future<Output = String> + 'static)> + 'static)`
= note: required for `Unique<(dyn Fn(String) -> Box<(dyn Future<Output = String> + 'static)> + 'static)>` to implement `Send`
|
tokio::spawn
创建的任务可以在不同的线程上执行,Tokio 运行时可能会在 .await
点之间将任务从一个线程移动到另一个线程。当一个任务被移动时,它所拥有的所有数据(包括从 Router
中读取并正在使用的 HandlerFn
)也必须能够安全地跨线程移动。Fn
特征对象(dyn Fn(...)
)代表一个函数或闭包,它可能捕获了环境中的变量。如果这个函数或闭包捕获了非 Send
的数据(例如 Rc
或裸指针),那么将它移动到另一个线程是不安全的,会导致数据竞争或其他未定义行为。因此,为了保证在多线程环境中安全地从共享的 HashMap
中获取并调用 Fn
特征对象,该特征对象本身必须实现 Send
特征,表明它可以安全地在线程间转移所有权。
为什么要显式声明?因为 Box 里面的特征对象 Rust 是不会帮我们自动推导它的约束的,所以 Send、Sync 包括’static,如果要约束的话,都得我们自己写。
因此改一下 HandlerFn 的类型定义:
1
|
type HandlerFn = Box<dyn Fn(String) -> Box<dyn Future<Output = String>> + Send>;
|
Fn 必须是 Sync
解决之后,又来一个报错:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
error[E0277]: `(dyn Fn(String) -> Box<(dyn Future<Output = String> + 'static)> + Send + 'static)` cannot be shared between threads safely
--> src\bin\attempt1.rs:35:35
|
35 | let handle = tokio::spawn(async move {
| ______________________------------_^
| | |
| | required by a bound introduced by this call
36 | | let lock = router_clone.table.read().await;
37 | | let handler = lock.get("handler1").unwrap();
38 | | let fut = handler("req1".into());
39 | | let pinned_fut = Box::into_pin(fut);
40 | | pinned_fut.await;
41 | | });
| |_________^ `(dyn Fn(String) -> Box<(dyn Future<Output = String> + 'static)> + Send + 'static)` cannot be shared between threads safely
|
= help: the trait `Sync` is not implemented for `(dyn Fn(String) -> Box<(dyn Future<Output = String> + 'static)> + Send + 'static)`
= note: required for `Unique<(dyn Fn(String) -> Box<(dyn Future<Output = String> + 'static)> + Send + 'static)>` to implement `Sync`
|
这段代码中 Fn
需要是 Sync
的原因,是我们用了 RwLock,因为存储在 RwLock
内部的 HandlerFn
会在多个线程间通过共享引用(读锁)被并发访问和调用,而通过共享引用调用 Fn
trait 的方法要求该类型是 Sync
。
一个 Fn
闭包可能捕获了非 Sync
的环境数据(例如 Rc
或 Cell
)。Send
bound 只保证 trait 对象本身可以跨线程移动,但不能保证通过共享引用并发调用它是安全的。为了保证通过共享引用并发调用是安全的,需要 Sync
bound。
如果我们把 RwLock 改成 Mutex,就不需要这个 Sync 了。因为 Mutex 是独占访问的。
因此:
1
|
type HandlerFn = Box<dyn Fn(String) -> Box<dyn Future<Output = String>> + Send + Sync>;
|
Future 必须是 Send
来看这部分最后一个报错:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
error: future cannot be sent between threads safely
--> src\bin\attempt1.rs:35:22
|
35 | let handle = tokio::spawn(async move {
| ______________________^
36 | | let lock = router_clone.table.read().await;
37 | | let handler = lock.get("handler1").unwrap();
38 | | let fut = handler("req1".into());
39 | | let pinned_fut = Box::into_pin(fut);
40 | | pinned_fut.await;
41 | | });
| |__________^ future created by async block is not `Send`
|
= help: the trait `Send` is not implemented for `dyn Future<Output = String>`
note: future is not `Send` as it awaits another future which is not `Send`
--> src\bin\attempt1.rs:40:13
|
40 | pinned_fut.await;
| ^^^^^^^^^^ await occurs here on type `Pin<Box<dyn Future<Output = String>>>`, which is not `Send`
|
前面说了,在任务执行过程中,特别是在 .await
点之间,Tokio 运行时可能会将这个任务从一个线程移动到另一个线程上继续执行。因此,传递给 tokio::spawn
的 Future
必须是 Send
的,因为 Tokio 运行时需要在其内部的线程池中安全地调度和执行这个 Future,这可能涉及将 Future 的状态在不同的线程之间移动。
所以我们来改一下:
1
|
type HandlerFn = Box<dyn Fn(String) -> Box<dyn Future<Output = String> + Send> + Send + Sync>;
|
需要注意,我们改了 HandlerFn 之后,别忘了把 add 函数里面 Fun 和 Fut 的声明也给相应改了,不然对不上一样编译不通过:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
impl Router {
async fn add<Fun, Fut>(&mut self, key: &str, handler: Fun)
where
Fun: 'static + Sync + Send + Fn(String) -> Fut,
Fut: 'static + Send + Future<Output = String>,
{
self.table.write().await.insert(
key.to_string(),
Box::new(move |s| {
let fut = handler(s);
Box::new(fut)
}),
);
}
}
|
到目前为止的完整代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::sleep;
type HandlerFn = Box<dyn Fn(String) -> Box<dyn Future<Output = String> + Send> + Send + Sync>;
#[derive(Default, Clone)]
struct Router {
table: Arc<RwLock<HashMap<String, HandlerFn>>>,
}
impl Router {
async fn add<Fun, Fut>(&mut self, key: &str, handler: Fun)
where
Fun: 'static + Sync + Send + Fn(String) -> Fut,
Fut: 'static + Send + Future<Output = String>,
{
self.table.write().await.insert(
key.to_string(),
Box::new(move |s| {
let fut = handler(s);
Box::new(fut)
}),
);
}
}
#[tokio::main]
async fn main() {
let mut router = Router::default();
router.add("handler1", handler1).await;
router.add("handler2", handler2).await;
let handle1 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler1").unwrap();
let fut = handler("req1".into());
let pinned_fut = Box::into_pin(fut);
pinned_fut.await;
});
handle
};
let handle2 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler2").unwrap();
let fut = handler("req2".into());
let pinned_fut = Box::into_pin(fut);
pinned_fut.await;
});
handle
};
let _ = tokio::join!(handle1, handle2);
}
async fn handler1(req: String) -> String {
sleep(Duration::from_secs(1)).await;
println!("handler1");
"handler1".into()
}
async fn handler2(req: String) -> String {
sleep(Duration::from_secs(1)).await;
println!("handler2");
"handler2".into()
}
|
终于没有编译错误了,运行试试:
1
2
3
4
|
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.04s
Running `target\debug\attempt1.exe`
handler2
handler1
|
把 Pin<Box>包装的 Future 放到 HashMap 里
现在我们有这样的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
let handle1 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler1").unwrap();
let fut = handler("req1".into());
let pinned_fut = Box::into_pin(fut);
pinned_fut.await;
});
handle
};
let handle2 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler2").unwrap();
let fut = handler("req2".into());
let pinned_fut = Box::into_pin(fut);
pinned_fut.await;
});
handle
};
|
我们每次调用 handler 的时候都要把 Future 给 Pin 一下,有点麻烦。所以我们最好把 Pin 的封装这个过程放到 add 函数里。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
// 这里改了返回的 Future 类型
type HandlerFn = Box<dyn Fn(String) -> Pin<Box<dyn Future<Output = String> + Send>> + Send + Sync>;
#[derive(Default, Clone)]
struct Router {
table: Arc<RwLock<HashMap<String, HandlerFn>>>,
}
impl Router {
async fn add<Fun, Fut>(&mut self, key: &str, handler: Fun)
where
Fun: 'static + Sync + Send + Fn(String) -> Fut,
Fut: 'static + Send + Future<Output = String>,
{
self.table.write().await.insert(
key.to_string(),
Box::new(move |s| {
let fut = handler(s);
Box::pin(fut) // 这里从 into_pin 改成了 pin
}),
);
}
}
|
这样,我们就可以直接对 HashMap 获取到的 Future 进行 await:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
let handle1 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler1").unwrap();
let fut = handler("req1".into());
fut.await;
});
handle
};
let handle2 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler2").unwrap();
let fut = handler("req2".into());
fut.await;
});
handle
};
|
使用#[async_trait]
一直到目前为止我们都是自己封装的,感觉要写一大坨类型和约束,还要操作闭包,特别麻烦,有没有简单一点的办法?
有的,就是使用 async_trait 这个 crate。
Rust 一般是不允许在 trait 里面使用 async 函数的,如果硬要用的话可能会碰到这样的报错:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
error[E0038]: the trait `HandlerFn` cannot be made into an object
--> src\bin\attempt2.rs:31:9
|
31 | self.table.write().await.insert(key.to_string(),Box::new(handler));
| ^^^^^^^^^^^^^^^^^^ `HandlerFn` cannot be made into an object
|
note: for a trait to be "dyn-compatible" it needs to allow building a vtable to allow the call to be resolvable dynamically; for more information visit <https://doc.rust-lang.org/reference/items/traits.html#object-safety>
--> src\bin\attempt2.rs:9:14
|
8 | trait HandlerFn {
| --------- this trait cannot be made into an object...
9 | async fn handle(&self, req: String) -> String;
| ^^^^^^ ...because method `handle` is `async`
= help: consider moving `handle` to another trait
|
看这里:this trait cannot be made into an object... because method handle is async
。
但这个 crate 可以帮助我们实现这一点。
实际上还是一种语法糖,它帮我们解决了繁琐的封装工作。
请看改过的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
#[async_trait]
trait HandlerFn {
async fn handle(&self, req: String) -> String;
}
#[async_trait]
impl<Fun, Fut> HandlerFn for Fun
where
Fun: 'static + Sync + Send + Fn(String) -> Fut,
Fut: 'static + Send + Future<Output = String>,
{
async fn handle(&self, req: String) -> String {
self(req).await
}
}
#[derive(Default, Clone)]
struct Router {
table: Arc<RwLock<HashMap<String, Box<dyn HandlerFn>>>>,
}
impl Router {
async fn add<Fun, Fut>(&mut self, key: &str, handler: Fun)
where
Fun: 'static + Sync + Send + Fn(String) -> Fut,
Fut: 'static + Send + Future<Output = String>,
{
self.table
.write()
.await
.insert(key.to_string(), Box::new(handler));
}
}
#[tokio::main]
async fn main() {
let mut router = Router::default();
router.add("handler1", handler1).await;
router.add("handler2", handler2).await;
let handle1 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler1").unwrap();
let fut = handler.handle("req1".into());
fut.await;
});
handle
};
let handle2 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler2").unwrap();
let fut = handler.handle("req2".into());
fut.await;
});
handle
};
let _ = tokio::join!(handle1, handle2);
}
|
- 我们把 HandlerFn 声明为了 async trait,其中有一个 handle 方法
- 我们为 Fun 函数类型实现了 HandlerFn 这个 trait,里面用 self(req).await 调用了函数自身
- 现在往 HashMap 里面放的 value 变成了 Box 包装的 HandlerFn 这个特征对象
- add 方法改成直接把 Box 包装的 handler 函数 insert 到 HashMap 里
- tokio::spawn 里改成了调用拿出来 HandlerFn 特征对象的 handle 函数
看看现在的报错:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
error[E0277]: `(dyn HandlerFn + 'static)` cannot be sent between threads safely
--> src\bin\attempt2.rs:47:35
|
47 | let handle = tokio::spawn(async move {
| ______________________------------_^
| | |
| | required by a bound introduced by this call
48 | | let lock = router_clone.table.read().await;
49 | | let handler = lock.get("handler1").unwrap();
50 | | let fut = handler.handle("req1".into());
51 | | fut.await;
52 | | });
| |_________^ `(dyn HandlerFn + 'static)` cannot be sent between threads safely
|
= help: the trait `Send` is not implemented for `(dyn HandlerFn + 'static)`
= note: required for `Unique<(dyn HandlerFn + 'static)>` to implement `Send`
|
似曾相识吧,原因是现在我们把 Box<dyn xx>给写到 table 这个 HashMap 的类型上面了,然后忘记加 Send 和 Sync 约束了。
改一下:
1
2
3
|
struct Router {
table: Arc<RwLock<HashMap<String, Box<dyn HandlerFn + Send + Sync>>>>,
}
|
现在编译通过了,运行也没问题,这次的完整代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
use async_trait::async_trait;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::sleep;
#[async_trait]
trait HandlerFn {
async fn handle(&self, req: String) -> String;
}
#[async_trait]
impl<Fun, Fut> HandlerFn for Fun
where
Fun: 'static + Sync + Send + Fn(String) -> Fut,
Fut: 'static + Send + Future<Output = String>,
{
async fn handle(&self, req: String) -> String {
self(req).await
}
}
#[derive(Default, Clone)]
struct Router {
table: Arc<RwLock<HashMap<String, Box<dyn HandlerFn + Send + Sync>>>>,
}
impl Router {
async fn add<Fun, Fut>(&mut self, key: &str, handler: Fun)
where
Fun: 'static + Sync + Send + Fn(String) -> Fut,
Fut: 'static + Send + Future<Output = String>,
{
self.table
.write()
.await
.insert(key.to_string(), Box::new(handler));
}
}
#[tokio::main]
async fn main() {
let mut router = Router::default();
router.add("handler1", handler1).await;
router.add("handler2", handler2).await;
let handle1 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler1").unwrap();
let fut = handler.handle("req1".into());
fut.await;
});
handle
};
let handle2 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler2").unwrap();
let fut = handler.handle("req2".into());
fut.await;
});
handle
};
let _ = tokio::join!(handle1, handle2);
}
async fn handler1(req: String) -> String {
sleep(Duration::from_secs(1)).await;
println!("handler1");
"handler1".into()
}
async fn handler2(req: String) -> String {
sleep(Duration::from_secs(1)).await;
println!("handler2");
"handler2".into()
}
|
为啥现在不需要把 Future 给 Pin 起来了?
因为我们用了async_trait
宏,会转换我们的代码,使得 async fn
方法在编译后实际上返回一个堆分配的、已 Pin 的特征对象,通常是 Pin<Box<dyn Future + Send + 'life>>
这样的类型。非常方便。
提前释放读锁
看下面的代码:
1
2
3
4
5
6
7
8
9
10
|
let handle1 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler1").unwrap();
let fut = handler.handle("req1".into());
fut.await;
});
handle
};
|
lock 会一直持有,直到 fut 的 await 结束,持有时间太长了感觉。
虽然在我们的例子里,HashMap 写入只有在初始化时进行,之后就都是读了。RwLock 支持并发读,所以这样没什么性能问题。
但能修还是修一下吧。
下面看完整代码,其中包含注释:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
use async_trait::async_trait;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::sleep;
#[async_trait]
trait HandlerFn: Send + Sync + 'static {
async fn handle(&self, req: String) -> String;
}
#[async_trait]
impl<Fun, Fut> HandlerFn for Fun
where
Fun: 'static + Sync + Send + Fn(String) -> Fut,
Fut: 'static + Send + Future<Output = String>,
{
async fn handle(&self, req: String) -> String {
self(req).await
}
}
// HashMap 的 value 把 Arc 换成了 Box
type RouterTableValue = Arc<dyn HandlerFn + Send + Sync>;
#[derive(Default, Clone)]
struct Router {
table: Arc<RwLock<HashMap<String, RouterTableValue>>>,
}
impl Router {
async fn add<Fun, Fut>(&mut self, key: &str, handler: Fun)
where
Fun: 'static + Sync + Send + Fn(String) -> Fut,
Fut: 'static + Send + Future<Output = String>,
{
self.table
.write()
.await
.insert(key.to_string(), Arc::new(handler));
}
}
#[tokio::main]
async fn main() {
let mut router = Router::default();
router.add("handler1", handler1).await;
router.add("handler2", handler2).await;
let handle1 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler1").unwrap();
let handler_clone = handler.clone(); // clone 一份
drop(lock); // 这里释放锁
let fut = handler_clone.handle("req1".into());
fut.await;
});
handle
};
let handle2 = {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let lock = router_clone.table.read().await;
let handler = lock.get("handler2").unwrap();
let handler_clone = handler.clone();
drop(lock);
let fut = handler_clone.handle("req2".into());
fut.await;
});
handle
};
let _ = tokio::join!(handle1, handle2);
}
async fn handler1(req: String) -> String {
sleep(Duration::from_secs(1)).await;
println!("handler1");
"handler1".into()
}
async fn handler2(req: String) -> String {
sleep(Duration::from_secs(1)).await;
println!("handler2");
"handler2".into()
}
|
为啥把装 HandlerFn 的 Box 换成 Arc 就行了?
Box 和 Arc 都是智能指针,其中 Box 单纯是为了把数据分配在堆上,而 Arc 还有一个功能就是引用计数。
Arc 在 clone 的时候会给内部 HandlerFn 的引用计数增加 1,然后返回一个完整的拥有所有权的 Arc 对象。
使用lock.get("handler1").unwrap()
获取到的东西是&RouterTableValue
这个不可变引用。
如果调用在这上面调用handler.clone()
的话,当RouterTableValue
是不同的东西,效果也不同:
- 如果是 Box,实际上只是把这个不可变引用给 clone 了一份,得到的仍然是和原来一样的
&RouterTableValue
,仍然持有对 lock 的借用
- 如果是 Arc,调用的会是 Arc 实现的 clone 方法,获取到的是一个全新的
Arc<dyn HandlerFn+Send+Sync>
,就不持有对 lock 的借用了,所以之后可以安全地 drop 掉 lock
总结
这样一个小小的需求,在 Rust 里实现起来还是挺麻烦的。
还涉及到了很多比较进阶的知识,不太了解的话可能会有点晕。
所以可以和 AI 多多交流,这篇文章写作的过程中,一些解释部分也用了 AI 生成。(Gemini 2.5 Flash 思考模式,目前感觉很强)
如果在 Go 里实现的话,一般也就是写个函数类型,完事,然后读取写入的时候记得加个锁。
不过 Rust 大概正是麻烦,才保证了运行时的安全吧。
这一套编译错误解决下来也感觉学到了不少东西。