Rust异步编程 一

本文同步于 Rust中文社区 ,本文时间:2018-12-20, 作者: Rust中文社区 ,简介:Rust中文社区

欢迎向Rust中文社区投稿, 投稿地址 ,好文将在以下地方直接展示

  1. Rust中文社区首页
  2. Rust中文社区 文章专栏
  3. 知乎专栏 Rust中文社区
  4. 思否专栏 Rust中文社区
  5. 简书专题 Rust中文社区
  6. 微博Rustlang-cn

欢迎来到 Rust 异步编程!如果你正打算用 Rust 写一些异步代码,那你就来对地方了。不管你打算构建Web服务器,数据库,还是操作系统,这本书都能帮助你运用 Rust 的异步编程工具最大化利用你的硬件。

  • 开始几章主要介绍异步编程,以及在 Rust 中的特别之处。
  • 中间几章讨论用于异步编程的关键的基础设施和控制流工具,并详细介绍了一些构建类库、应用时将性能和复用性最大化的最佳实践。
  • 本书的最后章节介绍了 Rust 的异步生态并提供了一些实现常见功能的例子。

接下来,让我们打开 Rust 异步编程世界的大门吧!

Rust异步编程中文版 同步官方更新,欢迎参与!同时 欢迎加入 Rust中文社区,共建Rust语言中文网络!

Why 异步

我们喜欢 Rust,因为它能让我们写出高效、安全的代码,但为什么要异步呢? 因为异步代码能让我们在同一个系统线程上并发执行多项任务。在一个典型的多线程应用里,如果你想同时下载两个不同的网页,你必须将这两项工作分配到两个不同的线程上,像这样:

fn get_two_sites() {
    // 创建两个线程分别执行各自的下载任务
    let thread_one = thread::spawn(|| download("https:://www.foo.com"));
    let thread_two = thread::spawn(|| download("https:://www.bar.com"));
     // 等待两个线程完成任务
    thread_one.join();
    thread_two.join();
}

对很多应用来说这就足够了——这样一来,多线程就被设计为只用来一次性执行多个不同任务。但这也带来了一些限制。在线程切换和跨线程共享数据上会产生很多额外开销。即使是一个什么都不做的线程也会用尽珍贵的系统资源,而这就是异步代码要减少的开销。我们可以使用 Rust 的 async/await! 重写上面的函数,实现执行多个任务的目标而不用创建多个线程:

async fn get_two_sites() {
    // Create a two different "futures" which, when run to completion,
    // will asynchronously download the webpages.
    // 创建两个不同的 future,当它们被完成执行时会异步下载不同的网页
    let future_one = download_async("https:://www.foo.com");
    let future_two = download_async("https:://www.bar.com");
     // 同时执行两个 future 使它们完成
    join!(future_one, future_two);
}

总之,相比多线程实现来说,异步实现的应用具有使用更少的资源获得更高性能的潜力。线程由操作系统支持,使用它们并不需要特别的编程模型——任何函数都可以创建一个线程,而调用一个使用多线程的函数就像调用一个普通函数一样简单。但是异步函数就需要语言层面或者类库层面提供特殊的支持才能工作。在 Rust 中, async fn 会创建一个异步函数,当它被调用时,会返回一个需要依次执行函数体来完成的 future 对象。 传统多线程应用也可以非常有效,Rust的较小的内存占用以及可预测性意味着你可以做更多的事,即使不使用 async 关键字。然而,异步编程模型增长的复杂性并不总是值得的,想清楚你的应用采用简单多线程模型是否会更好仍然是很重要的。

async/await! 入门

async/await! 是 Rust 语言用于编写像同步代码一样的异步函数的内置工具。 async 将一个代码块转化为一个实现了名为 Future 的特质(trait)的状态机。虽然在同步方法中调用阻塞函数会阻塞整个线程,但阻塞的 Futures 将让出线程控制权,允许其他 Futures 运行。

要创建异步函数,可以使用 async fn 语法:

async fn do_something() { ... }

async fn 返回的值是一个 Future ,需要在执行着上运行才能起作用:

// `block_on` blocks the current thread until the provided future has run to
// completion. Other executors provide more complex behavior, like scheudling
// multiple futures onto the same thread.
use futures::executor::block_on;

async fn hello_world() {
    println!("hello, world!");
}

fn main() {
    let future = hello_world(); // Nothing is printed
    block_on(future); // `future` is run and "hello, world!" is printed
}

async fn 中,你可以使用 await! 等待另一种实现 Future 特性的类型完成,例如另一个 async fn 的输出。 与 block_on 不同, await! 不会阻止当前线程,而是异步等待 Future 完成,如果 Future 无法取得进展,则允许其他任务运行。

例如,假设我们有三个 async fnlearn_songsing_songdance

async fn learn_song() -> Song { ... }
async fn sing_song(song: Song) { ... }
async fn dance() { ... }

一种执行“学习”、“唱歌” 和 “跳舞” 的方法是,在执行每一项任务时阻塞:

fn main() {
  let song = block_on(learn_song());
  block_on(sing_song(song));
  block_on(dance);
}

但是,我们使用这种方式并没有发挥出最大的性能——我们只是把它们一个个执行了。很明显,我们唱歌之前必须要学会它,但是在学歌和唱歌的同时我们也是可以跳舞的。要实现这样的效果,我们可以分别创建两个 async fn 来并发地执行:

async fn learn_and_sing() {
    // 在唱歌之前等待学歌完成
    // 这里我们使用 `await!` 而不是 `block_on` 来防止阻塞线程,这样就可以同时执行 `dance` 了。
    let song = await!(learn_song());
    await!(sing_song(song));
}
 async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();
     // `join!` 类似于 `await!` ,但是可以等待多个 future 并发完成
    join!(f1, f2)
}
 fn main() {
    block_on(async_main());
}

在本例中,学歌必须发生在唱歌之前,但是学习和唱歌当同时都可以跳舞。如果我们在 learn_and_sing 中使用 block_on(learn_song()) 而不是 await!(learn_song()) 的话,它的执行将阻塞至学歌结束,就无法同时跳舞了。通过 await! 学歌这一操作,我们允许其他任务并发执行。

到目前为止你已经学会了 async/await! 的基本用法,现在我们尝试写一个例子。

应用:简单HTTP服务器

让我们使用 async/ await! 构建一个echo服务器!

首先,请 rustup update nightly 确保您的Rust最新 -- 我们正在使用最前沿的功能,因此保持最新状态至关重要。完成后,运行 cargo +nightly new async-await-echo 以创建新项目,然后打开生成的 async-await-echo 文件夹。

让我们在 Cargo.toml 文件中添加一些依赖项:

[dependencies]
# The latest version of the "futures" library, which has lots of utilities
# for writing async code. Enable the "tokio-compat" feature to include the
# functions for using futures 0.3 and async/await with the Tokio library.
futures-preview = { version = "0.3.0-alpha.9", features = "tokio-compat"] }
# Hyper is an asynchronous HTTP library. We'll use it to power our HTTP
# server and to make HTTP requests.
hyper = "0.12.9"
# Tokio is a runtime for asynchronous I/O applications. Hyper uses
# it for the default server runtime. The `tokio` crate also provides an
# an `await!` macro similar to the one in `std`, but it supports `await!`ing
# both futures 0.1 futures (the kind used by Hyper and Tokio) and
# futures 0.3 futures (the kind produced by the new `async`/`await!` language
# feature).
tokio = { version = "0.1.11", features = ["async-await-preview"] }

现在我们已经完成依赖项了,让我们开始编写一些代码。打开 src/main.rs 并在文件的顶部启用以下功能:

#![feature(async_await, await_macro, futures_api)]
  • async_await 增加了对 async fn 语法的支持。
  • await_macro 增加了对 await! 宏的支持。
  • futures_api 增加了对 nightly std::future 和 std::task 模块的支持,这些模块定义了核心Future特征和依赖类型。

另外,我们还要添加一些导入:

use {
    hyper::{
        // Miscellaneous types from Hyper for working with HTTP.
        Body, Client, Request, Response, Server, Uri,
         // This function turns a closure which returns a future into an
        // implementation of the the Hyper `Service` trait, which is an
        // asynchronous function from a generic `Request` to a `Response`.
        service::service_fn,
         // A function which runs a future to completion using the Hyper runtime.
        rt::run,
    },
    futures::{
        // `TokioDefaultSpawner` tells futures 0.3 futures how to spawn tasks
        // onto the Tokio runtime.
        compat::TokioDefaultSpawner,
         // Extension traits providing additional methods on futures.
        // `FutureExt` adds methods that work for all futures, whereas
        // `TryFutureExt` adds methods to futures that return `Result` types.
        future::{FutureExt, TryFutureExt},
    },
    std::net::SocketAddr,
     // This is the redefinition of the await! macro which supports both
    // futures 0.1 (used by Hyper and Tokio) and futures 0.3 (the new API
    // exposed by `std::future` and implemented by `async fn` syntax).
    tokio::await,
};

一旦导入完成,我们就可以开始整理样板,以便我们提供请求服务:

async fn serve_req(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    unimplemented!()
}
async fn run_server(addr: SocketAddr) {
    println!("Listening on http://{}", addr);
     // Create a server bound on the provided address
    let serve_future = Server::bind(&addr)
        // Serve requests using our `async serve_req` function.
        // `serve` takes a closure which returns a type implementing the
        // `Service` trait. `service_fn` returns a value implementing the
        // `Service` trait, and accepts a closure which goes from request
        // to a future of the response. In order to use our `serve_req`
        // function with Hyper, we have to box it and put it in a compatability
        // wrapper to go from a futures 0.3 future (the kind returned by
        // `async fn`) to a futures 0.1 future (the kind used by Hyper).
        .serve(|| service_fn(|req|
            serve_req(req).boxed().compat(TokioDefaultSpawner)
        ));
     // Wait for the server to complete serving or exit with an error.
    // If an error occurred, print it to stderr.
    if let Err(e) = await!(serve_future) {
        eprintln!("server error: {}", e);
    }
}
fn main() {
    // Set the address to run our socket on.
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
     // Call our run_server function, which returns a future.
    // As with every `async fn`, we need to run that future in order for
    // `run_server` to do anything. Additionally, since `run_server` is an
    // `async fn`, we need to convert it from a futures 0.3 future into a
    // futures 0.1 future.
    let futures_03_future = run_server(addr);
    let futures_01_future =
        futures_03_future.unit_error().boxed().compat(TokioDefaultSpawner);
     // Finally, we can run the future to completion using the `run` function
    // provided by Hyper.
    run(futures_01_future);
}

如果您现在 cargo run ,您应该在终端上看到 "Listening on http://127.0.0.1:300" 消息。如果您在所选择的浏览器中打开该URL,您将看到 "thread ... panicked at 'not yet implemented'." 现在我们只需要实际处理请求。首先,让我们返回一条静态消息:

async fn serve_req(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    // Always return successfully with a response containing a body with
    // a friendly greeting ;)
    Ok(Response::new(Body::from("hello, world!")))
}

如果你 cargo run 再次刷新页面,你应该看到 "hello, world!" 出现在浏览器中。恭喜!您刚刚在Rust中编写了第一个异步Web服务器。

您还可以检查请求本身,其中包含请求URI,HTTP版本,标头和其他元数据等信息。例如,我们可以打印出请求的URI,如下所示:

println!("Got request at {:?}", req.uri());

你可能已经注意到我们在处理请求时还没有做任何异步 - 我们只是立即响应,所以我们没有利用 async fn 给我们的灵活性。让我们尝试使用 Hyper 的HTTP客户端将用户的请求代理到另一个网站,而不仅仅是返回静态消息。

我们首先解析出我们想要请求的URL:

let url_str = "http://www.rust-lang.org/en-US/";
let url = url_str.parse::<Uri>().expect("failed to parse URL");

然后我们可以创建一个新的 hyper::Client 并使用它来发出 GET 请求,将响应返回给用户:

let res = await!(Client::new().get(url));
// Return the result of the request directly to the user
println!("request finished --returning response");
res

Client::get 返回 hyper::client::FutureResponse ,他实现了 Future<Output = Result<Response, Error>> (或 Future<Item = Response, Error = Error> 在futures 0.1)。当我们 await! 这个 future 时,HTTP请求已发出,当前任务被暂停,并且一旦响应可用,任务就排队等待继续。

现在,如果 cargo run 您在浏览器中打开 http://127.0.0.1:3000/foo ,您将看到Rust主页,以及以下终端输出:\

Listening on http://127.0.0.1:3000
Got request at /foo
making request to http://www.rust-lang.org/en-US/
request finished-- returning response

恭喜!您刚刚完成了代理HTTP请求。

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章