600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > rust异步编程--理解并发/多线程/回调/异步/future/promise/async/await/tokio

rust异步编程--理解并发/多线程/回调/异步/future/promise/async/await/tokio

时间:2024-05-19 17:17:26

相关推荐

rust异步编程--理解并发/多线程/回调/异步/future/promise/async/await/tokio

1. 异步编程简介

通常我们将消息通信分成同步和异步两种:

同步就是消息的发送方要等待消息返回才能继续处理其它事情异步就是消息的发送方不需要等待消息返回就可以处理其它事情

很显然异步允许我们同时做更多事情,往往也能获得更高的性能。异步编程,是一种被越来越多编程语言支持的并发编程模型。

1.1 常见的并发编程模型

并发编程相对于常规、顺序式编程不够成熟或“标准化”。结果是,我们表达并发的方式不一样,取决于语言支持哪种并发模型。

常见的并发模型有:

OS 线程

不需要编程模型作任何改动,这使得表达并发很容易。然而,线程间同步可能会很困难,并且性能开销很大。线程池可以较少一部分开销,但是不足够支持超大量 IO 密集负载。事件驱动编程

可以变得高性能,但倾向于导致冗长,“非线性”的控制流。数据流和错误传播通常就变得很难跟进了。协程

就像线程,但不需要改变编程模型,于是他们变得便于使用。像异步,他们可以支持大量的任务。然而,他们抽象了对于系统编程和自定义运行时实现非常重要的底层细节。

1.2 异步 vs 回调

异步编程的核心问题是如何处理通信:要么有办法知道通信有没有完成,要么能保证在通信完成后执行一段特定的逻辑。前者就是通知机制,比如信号量、条件变量等;后者就是callback,即回调。

当一项任务需要分成多个异步阶段完成时,就需要在每个阶段的回调函数中加入下阶段回调的代码,最终产生下面这样金字塔形状的代码:

getData = function(param, callback){$.get('/get/'+param,function(responseText){callback(responseText);});}getData(0, function(a){getData(a, function(b){getData(b, function(c){getData(c, function(d){getData(d, function(e){// ...});});});});});

可以想象当回调层次继续增加时,代码有多恐怖。这就是回调噩梦。

1.3 异步 vs 多线程

OS 线程

适合少量任务,因为线程会有 CPU 和内存开销。生成和切换线程是代价相当昂贵,甚至闲置的线程也会消耗系统资源。一个线程池库可以减轻这些开销,但并不能全部健康。然而,线程能让你重新利用存在的同步代码,而不需要大改源代码——不需要特别的编程模型。

异步

极大地降低了 CPU 和内存开销,尤其是再负载大量越过IO 边界的任务,例如服务器和数据库。同样,你可以处理比 OS 线程更高数量级的任务,因为异步运行时使用少量(昂贵的)线程来处理大量任务

这个例子的目标,是并发地下载两个网页。在典型的线程化(threaded)应用中,我们需要生成线程来达到并发:

fn get_two_sites() {// 生成两个线程来下载网页.let thread_one = thread::spawn(|| download("https:://"));let thread_two = thread::spawn(|| download("https:://"));// 等待两个线程运行下载完成.thread_one.join().expect("thread one panicked");thread_two.join().expect("thread two panicked");}

然而,下载网页是小任务,为了这么少量工作创建线程相当浪费。对更大的应用来说,这很容易就会变成瓶颈。在异步 Rust,我们能够并发地运行这些任务而不需要额外的线程:

async fn get_two_sites_async() {// 创建两个不同的 "futures", 当创建完成之后将异步下载网页.let future_one = download_async("https:://");let future_two = download_async("https:://");// 同时运行两个 "futures" 直到完成.join!(future_one, future_two);}

这里没有创建额外的线程。此外,所有函数调用都是静态分发的,也没有堆分配!然而,我们需要先编写能够异步执行的代码。

1.4 Future和Promise

Future和Promise来源于函数式语言,其目的是分离一个值和产生值的方法,从而简化异步代码的处理。

Future指一个只读的值的容器,这个值可能立即可用,也可能在未来某个时间可用。而Promise则是一个只能写入一次的对象。每个Promise关联一个Future,对Promise的写入会令Future的值可用。

Future与Promise配合起来可以实现一种可靠的通知机制,即我们可以异步执行一个方法,通过返回的Future来知道异步方法何时结束、是否成功、返回值是什么。

// 调用方void SyncOperation() {Promise<int> promise;RunAsync(std::bind(AsyncFunc, promise));Future<int> future = promise.GetFuture();int result = future.Get(); // wait until future is done}// 接收方void AsyncFunc(Promise<int> promise) {// do somethingpromise.Done(result);}

Promise的一个重要特性就是它支持then,可以将金字塔式的回调组织为链式,极大地降低了理解和维护的难度:

getData = function(param, callback){return new Promise(function(resolve, reject) {$.get('/get/'+param,function(responseText){resolve(responseText);});});}getData(0).then(getData).then(getData).then(getData).then(getData);

1.5 async/await

async/.await在promise链式代码的基础上,更进一步,让异步函数编写得像同步代码。

getData = async function(param, callback){return new Promise(function(resolve, reject) {$.get('/get/'+param,function(responseText){resolve(responseText);});});}var data = await getData(0);var data1 = await getData(data);var data2 = await getData(data1);var data3 = await getData(data2);var data4 = await getData(data3);

这种写法要比Promise链更接近同步,也更易懂,但其底层依然是Promise。这种写法很接近于协程:用Promise来实现yield和resume,它就是一种协程。

async在运行之前什么都不做。运行async函数的最常见方式是 await它。当在async函数上调用 await时,它将尝试运行以完成它。如果函数被阻止,它将让出当前线程。当可以取得更多进展时,执行者将继续运行,以便 await 解决。

2. rust并发编程

2.1 rust多线程

下面是一个简单的程序,它可以显示10次Sleepus消息,每次间隔 0.5秒;同时显示5次Interruptus消息,每次间隔1秒。

use std::thread::{sleep, spawn};use std::time::Duration;fn sleepus() {for i in 1..=10 {println!("Sleepus {}", i);sleep(Duration::from_millis(500));}}fn interruptus() {for i in 1..=5 {println!("Interruptus {}", i);sleep(Duration::from_millis(1000));}}fn main() {let sleepus = spawn(sleepus);let interruptus = spawn(interruptus);sleepus.join().unwrap();interruptus.join().unwrap();}

可以看到,和其他语言的多线程编程写法基本类似。不需要对同步函数代码做太大修改。

2.2 基于async/await的rust异步编程

我们对上面的例子,进行异步改造,实现在单一线程内让两个任务 协作执行。

use async_std::task::{sleep, spawn};use std::time::Duration;async fn sleepus() {for i in 1..=10 {println!("Sleepus {}", i);sleep(Duration::from_millis(500)).await;}}async fn interruptus() {for i in 1..=5 {println!("Interruptus {}", i);sleep(Duration::from_millis(1000)).await;}}#[async_std::main]async fn main() {let sleepus = spawn(sleepus());interruptus().await;sleepus.await;}

看起来有很多修改,不过实际上,我们的代码结构和之前的版本基本是一致的。

异步函数能够与普通的 Rust 函数一样使用。但是,调用这些函数不意味着执行这些函数,调用 async fn 类型的函数返回的是一个代表该操作的标识。在概念上他跟一个无参的闭包函数类型。为了能够真正的执行它,你需要在函数返回的标识上使用 .await 操作。

比如:

async fn say_world() {println!("world");}#[tokio::main]async fn main() {// Calling `say_world()` does not execute the body of `say_world()`let op = say_hello();// This println! comes firstprintln!("hello");// Calling `.await` on `op` starts executing `say_world`.op.await;}

输出

helloworld

3. rust Tokio库

Tokio 是 Rust 的异步 runtime,可用于编写快速、可靠的网络应用。Tokio 还提供用于 TCP、UDP、计时器、多线程、工作窃取算法(work-stealing)调度等的 API。

3.1 Tokio 入门

我们从写一个最基础的的 Tokio 程序开始,这个程序会连接到 MiniRedis 的服务端,然后设置一个 key 为 hello,value 为 world 的键值对,然后再把这个键值对读取回来。

打开 Cargo.toml ,并在 [dependencies] 后添加下面的代码

tokio = {version = "1", features = ["full"] }mini-redis = "0.4"

代码如下

use mini_redis::{client, Result};#[tokil::main]pub async fn main() -> Result<()> {// Open a connection to the mini-redis address.let mut client = client::connect("127.0.0.1:6379").await?;// Set the key "hello" with value "world"client.set("hello", "world".into()).await?;// Get key "hello"let result = client.get("hello").await?;println!("got value from the server; result={:?}", result);Ok(())}

接下来花点时间梳理下我们刚才做的事情。代码并不多,但其中却触发了许多的事情。

let mut client = client::connect("127.0.0.1:6379").await?;

函数 client::connect 是 mini-redis 这个包所提供的,他会使用指定的地址来异步的创建一个 TCP 连接,当这个连接建立成功时, client 则保存了该函数返回的结果。尽管这个操作是异步发生的,但代码 看起来 却是同步的。其中唯一指示了该操作为异步的只有 .await 操作符。

用来启动程序的 main 函数其他普通的 Rust 程序的有所不同:

被定义为 async fn添加了 #[tokio::main] 宏

async fn 函数在我们需要执行异步操作的上下文中被使用。然而,异步函数需要通过 runtime 来运行,runtime 中包含异步任务的调度器,他提供了事件驱动的 I/O、定时器等。runtime 并不会自动的运行,所以需要在主函数中运行它。

我们在 async fn main() 函数中添加的 #[tokio::main] 宏会将其转换为同步的 fn main() 函数,该函数会初始化 runtime 并执行我们定义的异步的 main 函数。

比如

#[tokio::main]async fn main() {println!("hello");}

会被转换为

fn main() {let mut rt = tokio::runtime::Runtime::new().unwrap();rt.block_on(async {println!("hello");})}

3.2 Spawning(并发)

接下来,我们写一个Redis服务端

use mini_redis::{Connection, Frame};use tokio::net::{TcpListener, TcpStream};#[tokio::main]async fn main() {let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();loop {// The second item contains the IP and Port or the new connectionlet (socket, _) = listener.accept().await.unwrap();process(socket).await;}}async fn process(socket: TcpStream) {// The `Connection` lets us read/write redis **frame** instead of// byte streams. The `Connection` type is defined by mini-redislet mut connection = Connection::new(socket);if let Some(frame) = connection.read_frame().await.unwrap() {println!("GOT: {:?}", frame);// Response with an errorlet response = Frame::Error("unimplemented".to_string());connection.write_frame(&response);}}

每次只能处理一个请求。当接收了一个连接后,服务端会在当前循环中一直堵塞直到返回信息完全写到套接字中。

我们希望 Redis 服务能够同时处理多个请求,所以我们需要让他并发 (Concurrenty) 起来。

use tokio::net::TcpListener;#[tokio::main]async fn main() {let listener = TcpListener::bind("127.0.0.1:6379")loop {let (socket, _) = listerner.accept().await.unwrap();// A new task is spqwned for each inbound socket. the socket is// moved to the new task and processed there.tokio::spawn(async move {process(socket).await;});}}

Tokio 的任务是异步的绿色线程,他通过传递给 tokio::spawn 的 async 语句块创建,这个函数接收 async 语句块后返回一个 JoinHandle,调用者则通过 JoinHandle 与创建的任务交互。有些传递的 async 语句块是具有返回值的,调用者通过 JoinHandle 的 .await 来获取其返回值,

#[tokio::main]async fn main() {let handle = tokio::spawn(async {"return value"});// Do some other worklet out = handle.await.unwrap();println!("GOT {}", out);}

任务在 Tokio 中是非常轻量的,实际上他只需要申请一次 64 个字节的内存。所以程序可以轻松的产生成千上万的任务。

接下来继续实现 process 函数来处理接收的命令。我们将使用 HashMap 来存储收到的值,SET 操作会插入一条新的记录到 HashMap 中,而 GET 操作则从中读取。并且,我们还会使用一个循环来处理来自同个连接的多个命令。

async fn process(socket: TcpStream) {// The `Connection` lets us read/write redis **frame** instead of// byte streams. The `Connection` type is defined by mini-redisuse mini_redis::Command::{self, Get, Set};use std::collections::HashMap;// A hashmap is used to store datalet mut db = HashMap::new();// Connection, provided by `mini-redis`, handles parsing frames from// the socketlet mut connection = Connection::new(socket);while let Some(frame) = connection.read_frame().await.unwrap() {let response = match Command::from_frame(frame).unwrap() {Set(cmd) => {db.insert(cmd.key().to_string(), cmd.value().to_vec());Frame::Simple("OK".to_string())}Get(cmd) => {if let Some(value) = db.get(cmd.key()) {Frame::Bulk(value.clone().into())} else {Frame::Null}}cmd => panic!("unimplemented {:?}", cmd),};connection.write_frame(&response).await.unwrap();}

现在我们能获取跟设置信息了,但还存在一个问题。设置的信息还没办法在不同的连接中共享,如果其他的套接字连接尝试使用 GET 命令获取 hello 的值,他将找不到任何东西。

3.3 Channel(消息队列)

Channel大部分用在消息传递的场景中。

Tokio 提供了数种用于处理不同场景的 Channel

mpsc: 多生产者、单消费者的 Channel,能够发送多个信息oneshot 单生产者、单消费者的 Channel,只能发送一个信息broadcast 多生产者、多消费者,能够发送多个信息,每个消费者都能收到所有信息watch 单生产者、多消费者,能够发送多个信息,但不会保存历史信息,消费者只能收到最新的信息

我们创建一个 mppsc 类型的 Channel

use tokio::sync::mpsc;#[tokio::main]async fn main() {// Create a new channel with a capacity of at most 32let (tx, mut rx) = mpsc::channel(32);}

mpsc 的 Channel 将用来发送命令给管理 Redis 连接的任务,其多生产者的模式允许多个任务通过他来发送消息。创建 Channel 的函数返回了两个值,一个发送者跟一个接收者,这两个句柄通常是分开使用的,他们会被移到到不同的任务中。

创建 Channel 时设置了容量为 32,如果消息发送的速度超过了接收的速度,这个 Channel 只会最多保存 32 个消息,当其中保存的消息超过了 32 时,继续调用 send(…).await 会让发送的任务进入睡眠,直到接收者又从 Channel 中消费了消息。

在使用中会通过 clone 发送者的方式,来让多个任务同时发送消息,如下例

use tokio::sync::mpsc;#[tokio::main]async fn main() -> Result<()> {let (tx, mut rx) = mpsc::channel(32);let tx2 = tx.clone();tokio::spawn(async move {tx.send("sending from first handle").await;});tokio::spawn(async move {tx2.send("sending from second handle").await;});while let Some(message) = rx.recv().await {println!("GOT = {}", message);}Ok(())}

每个消息最后都会发送给唯一的接收者,因为通过 mpsc 创建的接收者是不能 clone 的。

当所有发送者出了自身的作用域或被 drop 后就不再允许发送消息了,在这个时候接收者会返回 None,意味着所有的发送者已经被销毁,所以 Channel 也已经被关闭了。

3.4 Select(等待多个异步任务)

需要可以并发运行程序时,可以通过 spawn 创建一个新的任务。

tokio::select! 宏允许我们等待多个异步的任务,并且在其中一个完成时返回。

use tokio::sync::oneshot;#[tokio::main]async fn main() {let (tx1, rx1) = oneshot::channel();let (tx2, rx2) = oneshot::channel();tokio::spawn(async {let _ = tx1.send("one");});tokio::spawn(async {let _ = tx2.send("two");});tokio::select! {val = rx1 => {println!("rx1 completed first with {:?}", val);}val = rx2 => {println!("rx2 completed first with {:?}", val);}}}

这里我们使用了两个 OneShot Channel, 每个 Channel 都可能会先完成,select! 语句同时等待这两个 Channel,并在操作完成时将其返回值绑定到语句块的 val 变量,然后执行对应的完成语句。

要注意的是,另一个未完成的操作将会被丢弃,在这个示例中,对应的操作是等待每一个 oneshot::Receiver 的结果,最后未完成的那个 Channel 将会被丢弃。

3.5 Streams(异步迭代)

Stream 表示一个异步的数据序列,我们用 Stream Trait 来表示跟标准库的 std::iter::Iterator 类似的概念。

Tokio 提供的 Stream 支持是通过一个独立的包来实现的,他就是 tokio-stream

到目前为止,Rust 这门编程语言尚未支持异步的循环,因此要对 Stream 进行迭代我们需要用到 while let 循环及 StreamExt::next().

use tokio_stream::StreamExt;#[tokio::main]async fn main() {let mut stream = tokio_stream::iter(&[1, 2, 3]);while let Some(v) = stream.next().await {println!("GOT = {:?}", v);}}

现在来看一个略微复杂的 Mini-Redis 客户端的例子。

use tokio_stream::StreamExt;use mini_redis::client;async fn publish() -> mini_redis::Result<()> {let mut client = client::connect("127.0.0.1:6379").await?;// Publish some dataclient.publish("numbers", "1".into()).await?lclient.publish("numbers", "two".into()).await?lclient.publish("numbers", "3".into()).await?lclient.publish("numbers", "four".into()).await?lclient.publish("numbers", "five".into()).await?lclient.publish("numbers", "6".into()).await?l}async fn subscribe() -> mini_redis::Result<()> {let client = client::connect("127.0.0.1:6379").await?;let subscriber = client::subscribe(vec!["numbers".to_string()]).await?;let messages = subscriber.into_stream();tokio::pin!(messages);while let Some(msg) = messages.next().await {println!("Got = {:?}", msg);}Ok(())}#[tokio::main]async fn main() -> mini_redis::Result<()> {tokio::spawn(async {publish().await});subscribe().await?;println!("DONE");Ok(())}

在上面的代码中我们创建了一个用 Mini-Redis 在 numbers频道中发布消息的任务,而在主任务中,我们订阅了 number 频道,并且每次在收到该频道的消息时将它打印了出来。

在订阅之后,我们在订阅者上面调用了 into_stream() 函数,这个函数消费了 Subscriber 然后返回了一个 在接收到消息时迭代数据的 Stream。

参考

https://fuzhe1989.github.io//01/30/future-promise/

/news/124525/tokio-1-0-released

/people/sinsay-chen/posts

/docs/async-book/

进一步学习Rust

欢迎加入我的知识星球,我会提供一些学习资料(书籍/视频)以及解答一些问题。

大家一起学习Rust 😃

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。