1、并发
Concurrent:程序的不同部分之间独立的执行
Parallel:程序的不同部分同时运行
Rust无畏并发:允许你编写没有细微Bug的代码,并在不引入新Bug的情况下易于重构
注意:我们的“并发”泛指concurrent 和 parallel。
2、使用线程同时运行代码
进程与线程
在大部分OS里,代码运行在进程(process)中,OS同时管理多个进程。
在你的程序里,各独立部分可以同时运行,运行这些独立部分的就是线程(thread)
多线程运行:
提升性能表现
增加复杂性:无法保障各线程的执行顺序
多线程可导致的问题
竞争状态,线程以不一致的顺序访问数据或资源
死锁,两个线程彼此等待对方使用完所持有的资源,线程无法继续
只在某些情况下发生的 Bug,很难可靠地复制现象和修复
实现线程的方式
通过调用OS的API来创建线程: 1:1模型
需要较小的运行时
语言自己实现的线程(绿色线程):M:N模型
需要更大的运行时
Rust:需要权衡运行时的支持
Rust标准库仅提供1:1模型的线程
除了汇编语言以外,其他的语言或多或少都会有运行时,像c/c++的运行时比较小
通过spawn创建新线程
通过thread:spawn函数可以创建新线程:
参数:一个闭包(在新线程里运行的代码)
use std::{thread, time::Duration};
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!",i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!",i);
thread::sleep(Duration::from_millis(1));
}
}
上面的代码中我们能新建了一个线程,但是只要主线程结束,我们的分线程不管有没有执行完都会立马结束。
thread:sleep会导致当前线程暂停执行
通过join Handle来等待所有线程的完成
thread:spawn函数的返回值类型是JoinHandle
JoinHandle持有值的所有权
调用其join方法,可以等待对应的其它线程的完成
join方法:调用handle 的 join方法会阻止当前运行线程的执行,直到 handle 所表示的这些线程终结。
use std::{thread, time::Duration};
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!",i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!",i);
thread::sleep(Duration::from_millis(1));
}
handle.join().unwrap();
}
use std::{thread, time::Duration};
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!",i);
thread::sleep(Duration::from_millis(1));
}
});
handle.join().unwrap();
for i in 1..5 {
println!("hi number {} from the main thread!",i);
thread::sleep(Duration::from_millis(1));
}
}
使用move闭包
move闭包通常和 threadspawn函数一起使用,它允许你使用其它线程的数据
创建线程时,把值的所有权从一个线程转移到另一个线程
use std::thread;
fn main() {
let v = vec![1,2,3];
let handle = thread::spawn(move ||{
println!("Here's a vector :{:?}",v)
});
handle.join().unwrap();
}
这里为啥要使用move呢?那是因为v的生命周期可能在闭包执行之前就已经结束了,所以需要把v的所有权移交到闭包中
3、使用消息传递来跨线程传递数据
一种很流行且能保证安全并发的技术就是:消息传递。
线程(或Actor)通过彼此发送消息(数据)来进行通信
Go语言的名言:不要用共享内存来通信,要用通信来共享内存。
Rust:Channel(标准库提供)
Channel
Channel包含:发送端、接收端
调用发送端的方法,发送数据
接收端会检查和接收到达的数据
如果发送端、接收端中任意一端被丢弃了,那么Channel就“关闭”了
创建Channel
使用mpsc::channel函数来创建Channel
- mpsc表示multiple producer, single consumer(多个生产者、一个消费者)
- ―返回一个tuple(元组):里面元素分别是发送端、接收端
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx,rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let reveived = rx.recv().unwrap();
println!("Got {}",reveived);
}
在分线程中发送消息,主线程中接收消息,当主线程中的消息还没被接收到的时候,会一直阻塞知道接收到消息才会放通
发送端的send方法
参数:想要发送的数据
返回:Result<T,E>
―如果有问题(例如接收端已经被丢弃),就返回一个错误
接收端的方法
recv方法:阻止当前线程执行,直到Channel中有值被送来
一旦有值收到,就返回Result<T,E>
一当发送端关闭,就会收到一个错误
try_recv方法:不会阻塞,
―立即返回Result<T,E>:
·有数据达到:返回Ok,里面包含着数据。
否则,返回错误
一通常会使用循环调用来检查try_recv的结果
Channel和所有权转移
所有权在消息传递中非常重要:能帮你编写安全、并发的代码
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx,rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {}",val);
});
let reveived = rx.recv().unwrap();
println!("Got {}",reveived);
}
这个时候就会报错了,因为val已经被借用出去了
发送多个值,看到接收者在等待
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx,rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(1));
}
});
for receive in rx {
println!("Got {}",receive);
}
}
通过克隆创建多个发送者
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx,rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
let vals = vec![
String::from("1:hi"),
String::from("1:from"),
String::from("1:the"),
String::from("1:thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_millis(200));
}
});
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(200));
}
});
for receive in rx {
println!("Got {}",receive);
}
}
4、共享状态的并发
Go语言的名言:不要用共享内存来通信,要用通信来共享内存。
Rust支持通过共享状态来实现并发。
Channel类似单所有权:一旦将值的所有权转移至Channel,就无法使用它了
共享内存并发类似多所有权:多个线程可以同时访问同一块内存
使用Mutex来每次只允许一个线程来访问数据
Mutex是 mutual exclusion(互斥锁)的简写
在同一时刻,Mutex只允许一个线程来访问某些数据
想要访问数据:
–线程必须首先获取互斥锁(lock)
lock数据结构是mutex的一部分,它能跟踪谁对数据拥有独占访问权
mutex通常被描述为:通过锁定系统来保护它所持有的数据
Mutex的两条规则
在使用数据之前,必须尝试获取锁(lock)
使用完mutex所保护的数据,必须对数据进行解锁,以便其它线程可以获取锁。
Mutex<T>的API
通过Mutex:new(数据)来创建Mutex<T>
- Mutex<T>是一个智能指针
访问数据前,通过lock方法来获取锁
会阻塞当前线程lock可能会失败
返回的是 Mutex
Guard(智能指针,实现了Deref和 Drop)
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
}
println!("m = {:?}",m);
}
多线程共享Mutex<T>
使用Arc<T>来进行原子引用计数
Arc<T>和Rc<T>类似,它可以用于并发情景
A: atpmic,原子的
为什么所有的基础类型都不是原子的,为什么标准库类型不默认使用Arc<T>?
需要性能作为代价
Arc<T>和Rc<T>的API是相同的
use std::sync::{Mutex,Arc};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move||{
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result : {}",*counter.lock().unwrap());
}
RefCell<T>/Rc<T> vs Mutex<T>/Arc<T>
Mutex<T>提供了内部可变性,和Cell家族一样
我们使用RefCell<T>来改变Rc<T>里面的内容
我们使用Mutex<T>来改变Arc<T>里面的内容
注意:Mutex<>有死锁风险
5、通过Send和 SyncTrait 来扩展并发
Send和 Sync trait
Rust语言的并发特性较少,目前讲的并发特新都来自标准库(而不是语言本身)
无需局限于标准库的并发,可以自己实现并发
但在 Rust语言中有两个并发概念:
- std::marker::Sync和std::marker.Send这两个trait
Send:允许线程间转移所有权
实现Send trait的类型可在线程间转移所有权
Rust中几乎所有的类型都实现了Send
一但Rc<T>没有实现Send,它只用于单线程情景
任何完全由Send类型组成的类型也被标记为Send
除了原始指针之外,几乎所有的基础类型都是Send
Sync:允许从多线程访问
实现Sync的类型可以安全的被多个线程引用
也就是说:如果T是Sync,那么&T就是Send
一引用可以被安全的送往另一个线程
基础类型都是Sync
完全由Sync类型组成的类型也是Sync
但,Rc<T>不是Sync的
RefCell<T>和Cell<T>家族也不是Sync的
而,Mutex<T>是Sync的
手动来实现Send和 Sync是不安全的