Featured image of post 在 Cloudflare Workers Rust 中使用异步代码(Future)

在 Cloudflare Workers Rust 中使用异步代码(Future)

介绍如何在 Cloudflare Workers Rust 中使用异步代码(Future)。由于 Cloudflare Workers 的 wasm 环境限制,tokio 库无法直接使用。重点讲解如何使用 futures crate 的 join! 和 join_all 实现并发,以及如何使用 buffer_unordered 限制并发数。

Cloudflare Workers 全面支持 Rust,其原理是使用 wasm_bindgen,先将 Rust 代码编译成 wasm,然后再在 CF 的环境中运行。

由于 wasm 环境本身是单线程的,因此我们平常用的 tokio,以及其中的许多工具方法(比如 spawn)等都不能用了。

如果直接在 Cloudflare Workers 中使用 tokio 的话会报类似这样的错:

image-20250416092505020

Cloudflare Workers 确实是提供了 Rust 开发的文档,但文档非常简略,其中的例子虽然有涉及到异步,但基本都是简单的在 async 函数中调用 await 这样的操作。并没有介绍用什么方法能做到类似 tokio 中那样,同时 spawn 几个异步函数,最后使用 join 统一收集结果的操作。

这篇文章试图弥补一部分这方面的空白。

使用 async/await

Cloudflare Workers 本身提供一个异步的执行环境(Future Executor),因此原生的 async/await 是可以自然使用的。

这也是官方文档中唯一介绍的方法,具体而言:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use std::sync::Once;
use tracing::info;
use tracing_subscriber::fmt;
use tracing_subscriber_wasm::MakeConsoleWriter;
use worker::*;

static START: Once = Once::new();

#[event(fetch)]
async fn fetch(req: HttpRequest, _env: Env, _ctx: Context) -> anyhow::Result<Response> {
    console_error_panic_hook::set_once();
    START.call_once(|| {
        fmt()
            .with_writer(MakeConsoleWriter::default().map_trace_level_to(tracing::Level::DEBUG))
            .without_time()
            .init();
    });

    fut().await;
    fut2().await;

    Ok(Response::ok("ok")?)
}

async fn fut() -> String {
    let resp = reqwest::get("https://httpbin.org/status/400")
        .await
        .unwrap()
        .status()
        .to_string();
    info!("fut");
    resp
}
async fn fut2() -> String {
    let resp = reqwest::get("https://httpbin.org/status/500")
        .await
        .unwrap()
        .status()
        .to_string();
    info!("fut2");
    resp
}

输出:

1
2
 INFO test_workers: fut
 INFO test_workers: fut2 

先执行 fut 然后执行 fut2,虽然用了 await 但还是同步操作,到这里还没什么特别的。

使用 futures 的 API

如果要一起 launch 两个 future,让它们同时开始执行,并且最后收集返回结果,只是简单的 await 就不行了。

在 tokio 里,我们可能会先用 tokio::spawn 启动两个异步任务,然后去 join 返回的两个 JoinHandle。

其实 spawn 也算是 tokio 提供的一种工具函数,目的是马上执行传入的 future。但这样的工具函数在 Cloudflare Workers 或者说 wasm 的环境中不存在。

我们需要做的是直接使用 futures 自己的 API 来编写异步代码。

简单情况:使用 futures::join!

简单情况可以直接使用 futures::join!。

其中 futures 是一个独立的 crate,需要先安装。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
use std::sync::Once;
use tracing::info;
use tracing_subscriber::fmt;
use tracing_subscriber_wasm::MakeConsoleWriter;
use worker::wasm_bindgen_futures::spawn_local;
use worker::*;

static START: Once = Once::new();

#[event(fetch)]
async fn fetch(req: HttpRequest, _env: Env, _ctx: Context) -> anyhow::Result<Response> {
    console_error_panic_hook::set_once();
    START.call_once(|| {
        fmt()
            .with_writer(MakeConsoleWriter::default().map_trace_level_to(tracing::Level::DEBUG))
            .without_time()
            .init();
    });

    let handle = fut();
    let handle2 = fut2();
    let tuple = futures::join!(handle, handle2);
    info!("{:#?}", tuple);

    Ok(Response::ok("ok")?)
}

async fn fut() -> String {
    let resp = reqwest::get("https://httpbin.org/status/400")
        .await
        .unwrap()
        .status()
        .to_string();
    info!("fut");
    resp
}
async fn fut2() -> String {
    let resp = reqwest::get("https://httpbin.org/status/500")
        .await
        .unwrap()
        .status()
        .to_string();
    info!("fut2");
    resp
}

输出结果:

1
2
3
4
5
6
7
INFO test_workers: fut2
 INFO test_workers: fut

 INFO test_workers: (
    "400 Bad Request",
    "500 Internal Server Error",
)

输出时间基本相当,确实是并行的。

数组的情况:使用 futures::future::join_all

如果 future 很多,是一个数组的话,就需要用到这种方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
use std::sync::Once;
use tracing::info;
use tracing_subscriber::fmt;
use tracing_subscriber_wasm::MakeConsoleWriter;
use worker::wasm_bindgen_futures::spawn_local;
use worker::*;

static START: Once = Once::new();

#[event(fetch)]
async fn fetch(req: HttpRequest, _env: Env, _ctx: Context) -> anyhow::Result<Response> {
    console_error_panic_hook::set_once();
    START.call_once(|| {
        fmt()
            .with_writer(MakeConsoleWriter::default().map_trace_level_to(tracing::Level::DEBUG))
            .without_time()
            .init();
    });

    let mut arr = vec![];
    for i in 1..5 {
        arr.push(fut());
    }
    let res = futures::future::join_all(arr).await;
    info!("{:#?}", res);

    Ok(Response::ok("ok")?)
}

async fn fut() -> String {
    let resp = reqwest::get("https://httpbin.org/status/400")
        .await
        .unwrap()
        .status()
        .to_string();
    info!("fut");
    resp
}
async fn fut2() -> String {
    let resp = reqwest::get("https://httpbin.org/status/500")
        .await
        .unwrap()
        .status()
        .to_string();
    info!("fut2");
    resp
}

输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
INFO test_workers: fut
 INFO test_workers: fut
 INFO test_workers: fut
 INFO test_workers: fut

 INFO test_workers: [
    "400 Bad Request",
    "400 Bad Request",
    "400 Bad Request",
    "400 Bad Request",
]

但有一个限制是 future 的类型必须要一致,如果不一致的话(如 fut 和 fut2),就会出错。例如我们把 fut2 也给 push 进去:

1
2
3
4
for i in 1..5 {
        arr.push(fut());
}
arr.push(fut2());

报错:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
error[E0308]: mismatched types
    --> src\lib.rs:24:14
     |
22   |         arr.push(fut());
     |         ---      ----- this argument has type `impl futures::Future<Output = std::string::String>`...
     |         |
     |         ... which causes `arr` to have type `Vec<impl futures::Future<Output = std::string::String>>`
23   |     }
24   |     arr.push(fut2());
     |         ---- ^^^^^^ expected future, found a different future
     |         |
     |         arguments to this method are incorrect
     |
     = help: consider `await`ing on both `Future`s
     = note: distinct uses of `impl Trait` result in different opaque types

限制并发数:使用 futures::stream::iter 配合 buffer_unordered

如果要限制并发数,在 tokio 中我们可能会使用 Semaphore 等,但在 wasm 环境中没有 Semaphore。

为此我们可以使用 futures 包 StreamExt 里面的 buffer_unordered。

代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
use futures::{FutureExt, StreamExt};
use std::sync::Once;
use tracing::info;
use tracing_subscriber::fmt;
use tracing_subscriber_wasm::MakeConsoleWriter;
use worker::wasm_bindgen_futures::spawn_local;
use worker::*;

static START: Once = Once::new();

#[event(fetch)]
async fn fetch(req: HttpRequest, _env: Env, _ctx: Context) -> anyhow::Result<Response> {
    console_error_panic_hook::set_once();
    START.call_once(|| {
        fmt()
            .with_writer(MakeConsoleWriter::default().map_trace_level_to(tracing::Level::DEBUG))
            .without_time()
            .init();
    });

    let mut arr = vec![];
    for i in 1..5 {
        arr.push(fut());
    }
    let res = futures::stream::iter(arr)
        .buffer_unordered(2) // 限制并发数为 2
        .collect::<Vec<_>>()
        .await;
    info!("{:#?}", res);

    Ok(Response::ok("ok")?)
}

async fn fut() -> String {
    let resp = reqwest::get("https://httpbin.org/status/400")
        .await
        .unwrap()
        .status()
        .to_string();
    info!("fut");
    resp
}
async fn fut2() -> String {
    let resp = reqwest::get("https://httpbin.org/status/500")
        .await
        .unwrap()
        .status()
        .to_string();
    info!("fut2");
    resp
}

输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 INFO test_workers: fut
 INFO test_workers: fut
 INFO test_workers: fut
 INFO test_workers: fut

 INFO test_workers: [
    "400 Bad Request",
    "400 Bad Request",
    "400 Bad Request",
    "400 Bad Request",
]

这样的话每次最多有两个请求在执行。

关于 wasm_bindgen_futures::spawn_local

spawn_local 似乎是 wasm 提供的一种类似 tokio 中 spawn 的方法,目的是马上执行一个传入的 future。

示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
use futures::{FutureExt, StreamExt};
use std::sync::Once;
use tracing::info;
use tracing_subscriber::fmt;
use tracing_subscriber_wasm::MakeConsoleWriter;
use worker::wasm_bindgen_futures::spawn_local;
use worker::*;

static START: Once = Once::new();

#[event(fetch)]
async fn fetch(req: HttpRequest, _env: Env, _ctx: Context) -> anyhow::Result<Response> {
    console_error_panic_hook::set_once();
    START.call_once(|| {
        fmt()
            .with_writer(MakeConsoleWriter::default().map_trace_level_to(tracing::Level::DEBUG))
            .without_time()
            .init();
    });

    spawn_local(async {
        fut().await;
    });
    spawn_local(async {
        fut2().await;
    });

    Ok(Response::ok("ok")?)
}

async fn fut() -> String {
    let resp = reqwest::get("https://httpbin.org/status/400")
        .await
        .unwrap()
        .status()
        .to_string();
    info!("fut");
    resp
}
async fn fut2() -> String {
    let resp = reqwest::get("https://httpbin.org/status/500")
        .await
        .unwrap()
        .status()
        .to_string();
    info!("fut2");
    resp
}

但在测试过程中发现这些问题:

  • 传入的 Future,Output 必须是 (),也就是没有返回值,这样的话就无法收集 future 的执行结果了;如果只是这样的话似乎还可以用 channel 等方式发送数据,关键是下面一个问题。
  • 函数本身也没有返回值,没有一个 handle 可以 await,导致程序马上执行到 Ok(Response::ok(“ok”)?) 这一行就退出了,根本来不及等 future 跑完。

查看文档的过程中发现文档很简略,也没有例子。所以非常意义不明。

不过好在我们用 futures 包里面的工具就行了,也用不到这个方法。

Licensed under CC BY-NC-SA 4.0