StupidBeauty
Read times:1188Posted at: - no title specified

Rust0.11.0文档翻译:Rust任务及通信指南,The Rust Tasks and Communication Guide

内容目录

1 介绍

1.1 对于库的一个注解

2 基本概念

2.1 通信

2.2 后台计算:未来对象(Futures)

2.3 在不复制的情况下共享不可变数据:自动引用计数(Arc)

3 处理任务失败的情况

3.1 创建一个带有双向通信路径的任务

1 介绍

Rust通过以下二者的组合,提供了安全的并发功能:轻量级的、内存分隔的任务;和消息传递。这篇指南将会说明:Rust中的并发模型;该模型与Rust的类型系统有什么关系;以及,用于构建并发程序的基础库抽象。

Rust 中的任务与传统的线程不一样:它们 被称作 绿色线程 是轻量及的执行单元, Rust运行 时环境映射到少数几个操作系统线程中去,并且调度。 在多核系统中, Rust任务默认 会并行地调度。因为任务 创建起来要比传统的线程容易得多,所以 ,在一个典型的32 位系统上, Rust 可以创建数以十万计的并发任务 一般情况下,所有的 Rust代码 都是在任务中执行的,包括 main 函数

为了高效地使用内存,Rust中的任务拥有动态尺寸的栈。一个任务在刚开始的时候只会拥有一个较小的栈空间(目前是几千个字节,这取决于平台),之后就按照需要来获取更多的栈空间。与其它语言(例如C 语言)不同的是,Rust任务不会意外地对超出栈空间范围的内存进行写入操作,而引起崩溃或更坏的事情。

任务提供 了错误分隔及恢复能力。每当Rust 代码 因为 以下原因而发生一个致命错误时,运行 时系统会摧毁整个任务 显式调用 fail!() ;断言失败;或者其它 的无效操作。 与其它的语言(例如Java C++)不同, 在Rust中是无法捕获( catch )异常的。取而代之 的机制是,任务之间 可以互相监视其它任务的失败情况。

任务利用Rust 的类型系统来提供强大的内存安全保障。尤其 是,类型系统确保 了,任务之间无法共享 可变的状态。任务之间 是通过全局的 交换 传递 被拥有的 数据,以互相通信。

1.1 对于 库的一个注

尽管Rust的类型系统提供了用于构建安全且高效的任务的基础,但是,所有的任务功能本身是由标准库和同步(sync)库来实现的,而这两个东西仍然在开发当中,因此并非总是呈现出一个一致的或者说完整的接口。

供妳参考,以下是编写本文档时,与Rust 的并发性功能相关的标准模块列表:

  • •. std::task  - 与任务及任务调度相关的所有代码,

  • •. std::comm  - 消息传递接口,

  • •. sync::DuplexStream  - pipes::stream 的扩展,同时允许发送及接收,

  • •. sync::Arc  - 自动引用计数 Arc (atomically reference counted) )类型,用于安全 地共享不可变的数据,

  • •. sync::Semaphore  - 一个计数 、阻塞 、绑定-等待 型的信号灯,

  • •. sync::Mutex  - 一个阻塞 、绑定-等待型的互斥锁, 带有一个相关联的先入先出(FIFO)条件变量,

  • •. sync::RWLock  - 一个阻塞的、不饿死的读写锁, 带有一个相关联的条件变量,

  • •. sync::Barrier  - 一个栅栏,允许多个任务进行 同步以开始 某个计算过程

  • •. sync::TaskPool  - 一个任务池抽象,

  • •. sync::Future  - 一个类型,封装了某个计算过程的结果,该结果不一定已经完成,

  • •. sync::one  - 一个“ 一次性初始化 基本类型

  • •. sync::mutex  - 一个合适的互斥锁实现, 不会 对获取该锁的任务进行挑剔。

2 基本概念

用于创建 及管理任务的编程接口,位于 std 库的 task 模块中,因此 ,默认情况下可在所有 Rust代码 中使用。 最简单的情况下,调用 spawn 函数 ,并且传递一个闭包作为其参数 ,这样就可以创建一个任务 spawn 会在新的任务中执行该闭包。

// 使用一个命名函数,在某个不同的任务中输出一些深刻的文字

fn print_message() { println!("I am running in a different task!"); }

spawn(print_message);

// 使用`proc`表达式,在一个不同的任务中输出一些深刻的文字

// `proc`表达式的值会被计算成一个(匿名)被拥有的闭包。

// 那个闭包会在所创建的任务运行的时候调用`println!(...)`

spawn(proc() println!("I am also running in a different task!") );

Rust 中,创建任务并没有什么特殊的:任务 并不是语言语义中的一个什么概念。而是, Rust 的类型系统提供 了用来实现安全的并发性的所有必要工具:尤其 是, 被拥有的类型 。语言 将实现细节 留给标准库来做。

spawn 函数 的类型签名是狠简单的: fn spawn(f: proc()) 。因为 它只接受 被拥有的闭包, 而被拥有的闭包中只包含被拥有的数据,所以, spawn 可以安全地将整个闭包和与它关联的状态移动到一个完全不同的任务中去执行。 就与任何其它的闭包一样,传递 spawn 的函数可以捕获周围环境中的数据,并且携带到其它的任务中去。

// 在局部生成某些状态

let child_task_number = generate_task_number();

spawn(proc() {

// 在远程任务中捕获它

println! ( "I am child number {}" , child_task_number);

});

2.1 通信

我们已经创建了一个新任务,要是能跟它通信的话就狠有意思了。回想 一下, Rust 中没有共享的可变 状态,所以, 一个任务无法操作另一个任务所拥有的变量。 我们用的手段是管道( pipes )。

管道只是一对端点:一个用于发送消息,另一个用于接收消息。管道是低级的通信手段,因此以多种形式存在,每种形式都适用于特定的用途。接下来,我们讲述那些最常用的用途。

创建管道 的最简单手段就是,使用 channel 函数 来创建一个 (Sender, Receiver) 二元 对。 Rust术语 中,发送者( sender )是指管道的发送端,接收 者( receiver )是管道的接收端。研究 一下下面这个例子,它并行地计算两个结果

let (tx, rx): (Sender<int>, Receiver<int>) = channel();

spawn(proc() {

let result = some_expensive_computation();

tx.send(result);

});

some_other_expensive_computation();

let result = rx.recv();

让我们仔细研究一下这个示例。首先 let 语句创建 了一个用于 发送及接收整数的流( let 语句 的左侧, (tx, rx) ,是一个解构( destructuring let 的例子: 这个模式将一个元组分割成两个组件 )。

let (tx, rx): (Sender<int>, Receiver<int>) = channel();

子任务会使用发送器来向亲代任务发送数据,亲代任务会使用接收器来等待并接收数据。下一条语句,创建了子代任务。

spawn(proc() {

let result = some_expensive_computation();

tx.send(result);

});

注意 ,在创建任务闭包时, 就隐式地将 tx 转移 给子任务了:闭包捕获 了环境中的 tx Sender Receiver 都是可发送的类型,因此 可以被任务捕获,或者 在任务之间传递。 在这个示例中, 子任务进行了一项费时的计算,然后 将结果通过捕获 的频道发送回来。

最后,亲代任务自身也做了一些费时的计算,然后在接收器上等待子任务发来的结果:

some_other_expensive_computation();

let result = rx.recv();

channel 创建的 Sender Receiver 二元对,使得妳可以在单个发送者和单个接收者之间高效地通信,但是 不允许多个发送者使用同一个 Sender 值, 也不允许多个接收者使用同一个 Receiver 值。那么 ,如果我们的示例代码中需要由多个任务同时计算多个结果的话,怎么搞?下面 这个程序的代码写得不对:

let (tx, rx) = channel();

spawn(proc() {

tx.send(some_expensive_computation());

});

// 错误!前面的一个spawn语句已经拥有了发送器,

// 所以编译器不会允许再次捕获它

spawn(proc() {

tx.send(some_expensive_computation());

});

办法 还是有的,我们可以复制 tx ,这样就可以被多个发送者使用了。

let (tx, rx) = channel();

for init_val in range(0u, 3) {

// 创建 一个新的频道把柄( handle )以向子任务发送信息

let child_tx = tx.clone();

spawn( proc () {

child_tx.send(some_expensive_computation(init_val));

});

}

let result = rx.recv() + rx.recv() + rx.recv();

复制 一个 Sender 的话,会产生针对同一个频道的一个新的把柄, 这样就允许多个任务向单个接收器发送数据了。 它会在内部升级这个频道,以支持此处 所需的功能, 这就意味着, 未被复制的频道呢, 可以避免用于处理多个发送者所需的开销。但是 这个事实不会影响到对于频道的使用:升级过程是透明的。

注意 以上这个复制的示例是比较牵强的,因为 妳可以直接使用三对 Sender ,但是 ,这个示例只是为了说明问题。 供妳参考,如果使用多个通信 流的话,可以写出以下示例代码。

// 创建一个端口向量,其中每个端口用于一个子任务

let rxs = Vec::from_fn(3, |init_val| {

let (tx, rx) = channel();

spawn( proc () {

tx.send(some_expensive_computation(init_val));

});

rx

});

// 等待每个端口,最后将它们的结果累加起来

let result = rxs.iter().fold(0, |accum, rx| accum + rx.recv() );

2.2 后台计算 :未来对象 Futures

利用 sync::Future rust 中就有了一种机制, 可以先请求进行一个计算,日后再获取其结果。

以下基本示例展示了这种用法。

use std::sync::Future;

fn fib(n: u64) -> u64 {

// 费时 的计算,会返回一个 uint

12586269025

}

let mut delayed_fib = Future::spawn(proc() fib(50));

make_a_sandwich();

println!("fib(50) = {}", delayed_fib.get())

无论 fib(50) 需要计算多久, 在调用 future::spawn 时都会立即返回一个 future 对象。然后 妳可以趁着 fib 仍然在计算的时候给自己做个三明治来吃吃。日后 ,可调用 get 来获取这个方法的执行结果。 这个调用会阻塞,直到那个值变为可用( 也就是说 计算已经完 )。注意 这个未来对象必须是可变的, 这样,它就可以保存结果, 以便下次调用 get 时返回。

以下是另一个示例,展示的是如果使用未来对象来进行后台计算。这个负载会平均分配到各个CPU内核上。

fn partial_sum(start: uint) -> f64 {

let mut local_sum = 0f64 ;

for num in range(start* 100000 , (start+ 1 )* 100000 ) {

local_sum += (num as f64 + 1.0 ).powf(- 2.0 );

}

local_sum

}

fn main() {

let mut futures = Vec::from_fn( 1000 , |ind| Future::spawn( proc () { partial_sum(ind) }));

let mut final_res = 0f64 ;

for ft in futures.mut_iter()  {

final_res += ft.get();

}

println! ( "π^2/6 is not far from : {}" , final_res);

}

2.3 在不复制的情况下共享不可变数据:自动引用计数 Arc

要想在不同任务之间共享不可变数据的话,第一种手段就是使用管道,我们之前已经见识过。这样的话,对于每个任务,都会对该数据做一次复制以便于共享。某些情况下,这会导致显著的浪费内存,而且会引起对于同一份数据的多次不必要的复制。

为了避免这个问题, 妳可以使用Rust 的 sync 库中实现的自动引用计数(Atomically Reference Counted)包装器( Arc )。用上 了自动引用计数之后,数据 就不必为每个任务复制一次了。自动引用计数对象 就充当着被共享的数据的引用,并且只有 这个引用会被共享及复制。

以下是一个简单示例,展示了如何使用自动引用计数。我们希望,在一个巨大的浮点数向量上,并行地运行多个计算过程。每个任务都需要访问到整个向量,以完成它的任务。

use std::rand;

use std::sync::Arc;

fn pnorm(nums: &[f64], p: uint) -> f64 {

nums.iter().fold( 0.0 , |a, b| a + b.powf(p as f64)).powf( 1.0 / (p as f64))

}

fn main() {

let numbers = Vec::from_fn( 1000000 , |_| rand::random::<f64>());

let numbers_arc = Arc::new(numbers);

for num in range( 1u , 10 ) {

let task_numbers = numbers_arc.clone();

spawn( proc () {

println! ( "{}-norm = {}" , num, pnorm(task_numbers.as_slice(), num));

});

}

}

函数 pnorm 对该向量进行一个简单的计算( 它计算每个元素的幂,以给定的参数为指数,然后再计算这个值的幂的倒数,最后将所有结果相加 )。 该向量的自动引用计数对象是由下面这行代码创建的

let numbers_arc=Arc::new(numbers);

并且,每个任务都会捕获它的一个唯一的副本。这个过程只会复制包装器本身,而不会复制内容。在任务的代码体中,所捕获的自动引用计数对象可被当作是对于底层向量的一个不可变的引用,就像它是一个局部变量一样。

let task_numbers = numbers_arc.clone();

spawn(proc() {

// 捕获task_numbers并且使用 它,就好像它就是底层的向量一样

println! ( "{}-norm = {}" , num, pnorm(task_numbers.as_slice(), num));

});

arc 模块 中还实现了对于可变数据的自动引用计数功能,在此不赘述。

3 处理任务 失败 的情况

Rust 中内置了一种抛出异常的机制。 fail!() ( 也可以带上一个错误字符串作为参数: fail!( ~reason) ) assert! 结构(如果某个逻辑表达式 的值为假(false),它就会调用 fail!() ) 都可以用于抛出异常。 当某个任务抛出异常时,该任务会展开它的栈—— 一路运行对应的析构函数,并且释放内存——然后退出 与C++中的异常不同的是, Rust 里的异常对于单个任务来说是不可恢复的: 一旦某个任务失败了,就没有任何方法能够“捕获”那个异常。

尽管单个任务无法 从错误中恢复,但是,任务互相之间可以告知自己的失败。 最简单的处理任务失败情况的方式就是使用 try 函数 它类似于 spawn ,只是会立即阻塞,直到 子任务结束。 try 返回 一个类型为 Result<T, ()> 的值。 Result 是一个枚举( enum )类型,有两个变种: Ok Err 在以下示例中,因为 Result 的类型参数是 int () ,所以,调用 者可以 对结果进行模式匹配, 以检查, 它是一个带 int 字段的 Ok 结果(代表 着成功 ),还是一个 Err 结果(代表 着因为某个错误而终止运行 )。

let result: Result<int, ()> = task::try(proc() {

if some_condition() {

calculate_result()

} else {

fail! ( "oops!" );

}

});

assert!(result.is_err());

spawn 不同,使用 try 启动 的函数会返回一个值, try 会尽责地将这个值用一个 Result 枚举来回传给调用者。如果 子任务正常终止,则 try 会返回一个 Ok 结果 ;如果 子任务失败了, try 会返回一个 Error 结果。

注意 目前 ,失败的任务不会产生 一个有用的错误值 ( try 一定会返回 Err(()) ) 在未来,也许能够 让任务解析到被传递给 fail!() 的值。

待办 :需要 future_result 进行讨论, 以让失败模式变得有用。

但是,并非所有失败的地位都是平等的。某些情况下,妳可能确实需要终止整个程序(也许妳写的是一个断言,并且当它应验的话,就表示出现了一个不可恢复的逻辑错误);其它情况下,妳可能希望将该失败情况限制在某个范围内(也许是从外部世界来的一小砣输入信息,妳正在并行地处理它,但发现它不符合要求,而对它进行处理的过程无法继续下去)

3.1 创建 一个带有双向通信路径的任务

一个常见的事情就是,启动 一个子任务,并且让亲代任务和子任务之间能够互相交换消息。 sync::comm::duplex 函数支持 这种使用模式。 我们简要地看看怎么使用它。

为了观察 duplex 的工作过程, 我们创建一个子任务, 它会重复地接收一个 uint 消息,转换 为字符串,然后 将那个字符串回复回来。 当子任务接收到 0 的时候就会终止。 以下是子任务的函数

use std::comm::DuplexStream;

fn stringifier(channel: &DuplexStream<String, uint>) {

let mut value: uint;

loop {

value = channel.recv();

channel.send(value.to_str());

if value == 0 { break ; }

}

}

DuplexStream 的实现支持同时发送和接收。 stringifier 函数接受 一个 DuplexStream 作为参数, 它可以发送字符串 (第一 个类型参数 ) 及接收 uint 消息(第二 个类型参数 ) 。函数 体本身只是简单地循环,从频道中读取数据,然后回复结果。实际 的回复内容只是 所接收到的值的字符串版本, uint::to_str(value)

以下是亲代任务的代码:

use std::comm::duplex;

let (from_child, to_child) = duplex();

spawn(proc() {

stringifier( & to_child);

});

from_child.send(22);

assert!(from_child.recv().as_slice() == "22");

from_child.send(23);

from_child.send(0);

assert!(from_child.recv().as_slice() == "23");

assert!(from_child.recv().as_slice() == "0");

亲代任务首先调用 DuplexStream 来创建一对双向通信端点 。然后使用 task::spawn 来创建子任务, 子任务会捕获其中一个通信频道。结果 呢,亲代任务和子任务 都可以互相发送及接收数据。

Your opinions

Your name:Email:Website url:Opinion content:
- no title specified

HxLauncher: Launch Android applications by voice commands