日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

RUST異步流處理方法詳細講解_Rust語言

作者:上后左愛 ? 更新時間: 2023-01-15 編程語言

Stream 特質

在同步Rust 中流的核心是Iterator 提供了一種在序列中產生項的方法,并在它們之間進行阻塞,通過迭代器傳遞給其他迭代器

在異步Rust中流的核心Stream, 允許其他任務在當前阻塞等待時允許

Read/Write, AsyncRead/AsyncWrite

fn main() {
    let f = file::create("E:\\foot.txt").await?;
    f.write_all(b"hello world").await?;
    let f = file::open("E:\\foot.txt").await?;
    let mut buffer = Vec::new();
    f.read_to_end(&mut buffer).await?;
}

Stream 經典子流

source: 可以生成數據流

Sink: 可以消費數據流

Through: 消費數據,對其進行操作生成新數據流

Duplex: 流可以生成數據,也可以獨立消費數據(AsyncWrite/Read)

asyncread 和 Stream 區別

這兩種對byte 進行操作,AsyncRead 只能對byte進行操作(生成未解析數據),Stream對任何類型的數據進行操作(生成解析數據)

使用for_each_concurrent, try_for_each_concurrent 進行并發的處理流,進行流的處理

yield 匿名流

在async 異步過程中使用yield 關鍵字, 類似于Python 迭代產生時候可以返回,下一次從上一次返回值在進行開始跌打

try_join

如果某個發生錯誤后會立即返回數據

使用try_join 需要函數返回結果,并且錯誤的類型,才能正常運行

use futures;
use tokio::runtime::Runtime;
use std::io::Result;
async fn func1() -> Result<()> {
    tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await;
    println!("func1 finished!");
	Ok(())
}
async fn func2() -> Result<()> {
    println!("func2 finished!");
	Ok(())
}
async fn async_main() {
    let f1 = func1();
    let f2 = func2();

    if let Err(_) = futures::try_join!(f1, f2) {
		println!("Err!");
	}
}
fn main() {
    let mut runtime = Runtime::new().unwrap();
    runtime.block_on(async_main());
    println!("Hello, world!");
}

select

使用場景 有三個運行任務 ,只要其中一個完成后立馬返回,使用select

在使用select啟動使用pin_mut!(f1, f2), 使用select! 進行匹配

use futures::{select, future::FutureExt, pin_mut};
use tokio::runtime::Runtime;
use std::io::Result;
async fn func1() -> Result<()> {
	tokio::time::delay_for(tokio::time::Duration::from_secs(2)).await;
	println!("func1 finished!");
	Ok(())
}
async fn func2() -> Result<()> {
	println!("func2 finished!");
	Ok(())
}
async fn async_main() {
	let f1 = func1().fuse();
	let f2 = func2().fuse();
	pin_mut!(f1, f2);
	// 使用select 進行匹配
	select! {
		_ = f1 => println!("func1 finished++++++!"),
		_ = f2 => println!("func2 finished++++++!"),
	}
}
fn main() {
// 使用tokio的runtime()
	let mut runtime = Runtime::new().unwrap();
	runtime.block_on(async_main());
    println!("Hello, world!");
}

select! y與default/complete 一起聯合使用

complete :表示兩個都已經就緒,default表示兩個都沒有就緒

use futures::{future, select, executor};
async fn count() {
	let mut a_fut = future::ready(4);
	let mut b_fut = future::ready(6);
	let mut total = 0;	
	loop {
		select! {
			a = a_fut => total += a,
			b = b_fut => total += b,
			complete => break,   //表示所有的分支都已經完成,并且不會再取得進展的情況
			default => unreachable!(), //表示沒有分支完成
		}
	}
	assert_eq!(total, 10);
}
fn main() {
	executor::block_on(count());
    println!("Hello, world!");
}

complete 表示所有分支都已經完成,并且不會取得進展的情況,如上所示,使用loop 第一次b分支準備好,下一次循環可能是a分支,最后兩個分支都已經完成后 就break退出

complete 類似讓所有分支都完成后直接退出

SELECT宏幾個條件

  • select中使用Future必須首先UnPinFuture trait, Fused trait
  • 必須實現UnpinFuture原因在于select! 不是按照值獲取,按照引用獲取,這樣能夠在不獲取future所有權條件下,未完成的future可以繼續使用
  • 必須實現FusedFuture: select 完成后不在輪詢future,因此需要實現FusedFuture 跟蹤Future是否完成
  • 如果select使用stream,其stream 也是需要實現FusedStream

async 問號使用

如果返回類型有Result<T, E> 結果使用.await?

Send trait

在保證多線程安全時候 需要保證接口實現Send trait 、sync trait 才能保證多線程的安全

Send trait 表示數據能夠在線程間安全的發送,sync trait 能夠保證線程安全的引用

use std::rc::Rc;
#[derive(Default)]
struct NoSend(Rc<()>);
async fn bar() {}
async fn foo() {
	NoSend::default();
	//{
	//	let x = NoSend::default();
	//	//to do : xxxxx
	//}
	let _ = NoSend::default();
	bar().await;
}
//Send trait:如果所有的子類型都是實現Send trait的,那么它本身也是實現Send Trait的
// 如果內部沒有定義 只是使用 是一個Send Trait 主要是在 生成 匿名結構體中 會進行解析
not let x: impl Send Trait
//struct Foo {
//	f: Future,
//}
let x: Not impl Send Trait
//struct Foo {
//	x: NoSend, //not impl Send Trait
//	f: Future, //impl Send Trait
//}
fn required_send(_: impl Send) {}
fn main() {
	required_send(foo());
    println!("Hello, world!");
}

原文鏈接:https://blog.csdn.net/qq_27217017/article/details/123601095

欄目分類
最近更新