Rust的并发

我爱海鲸 2024-03-08 12:21:58 rust学习

简介多线程

1、并发

Concurrent:程序的不同部分之间独立的执行

Parallel:程序的不同部分同时运行

Rust无畏并发:允许你编写没有细微Bug的代码,并在不引入新Bug的情况下易于重构

注意:我们的“并发”泛指concurrent 和 parallel。

2、使用线程同时运行代码

进程与线程

在大部分OS里,代码运行在进程(process)中,OS同时管理多个进程。

在你的程序里,各独立部分可以同时运行,运行这些独立部分的就是线程(thread)

多线程运行:

   提升性能表现

   增加复杂性:无法保障各线程的执行顺序

多线程可导致的问题

竞争状态,线程以不一致的顺序访问数据或资源

死锁,两个线程彼此等待对方使用完所持有的资源,线程无法继续

只在某些情况下发生的 Bug,很难可靠地复制现象和修复

实现线程的方式

通过调用OS的API来创建线程: 1:1模型

   需要较小的运行时

语言自己实现的线程(绿色线程):M:N模型

   需要更大的运行时

Rust:需要权衡运行时的支持

Rust标准库仅提供1:1模型的线程

除了汇编语言以外,其他的语言或多或少都会有运行时,像c/c++的运行时比较小

通过spawn创建新线程

通过thread:spawn函数可以创建新线程:

   参数:一个闭包(在新线程里运行的代码)

use std::{thread, time::Duration};

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!",i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!",i);
        thread::sleep(Duration::from_millis(1));
    }
}

上面的代码中我们能新建了一个线程,但是只要主线程结束,我们的分线程不管有没有执行完都会立马结束。

thread:sleep会导致当前线程暂停执行

通过join Handle来等待所有线程的完成

thread:spawn函数的返回值类型是JoinHandle

JoinHandle持有值的所有权

   调用其join方法,可以等待对应的其它线程的完成

join方法:调用handle 的 join方法会阻止当前运行线程的执行,直到 handle 所表示的这些线程终结。

use std::{thread, time::Duration};

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!",i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!",i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

use std::{thread, time::Duration};

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!",i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    
    handle.join().unwrap();

    for i in 1..5 {
        println!("hi number {} from the main thread!",i);
        thread::sleep(Duration::from_millis(1));
    }

}

使用move闭包

move闭包通常和 threadspawn函数一起使用,它允许你使用其它线程的数据

创建线程时,把值的所有权从一个线程转移到另一个线程

use std::thread;

fn main() {
    let v = vec![1,2,3];
    let handle = thread::spawn(move ||{
        println!("Here's a vector :{:?}",v)
    });

    handle.join().unwrap();
}

这里为啥要使用move呢?那是因为v的生命周期可能在闭包执行之前就已经结束了,所以需要把v的所有权移交到闭包中

3、使用消息传递来跨线程传递数据

一种很流行且能保证安全并发的技术就是:消息传递。

线程(或Actor)通过彼此发送消息(数据)来进行通信

Go语言的名言:不要用共享内存来通信,要用通信来共享内存。

Rust:Channel(标准库提供)

Channel

Channel包含:发送端、接收端

调用发送端的方法,发送数据

接收端会检查和接收到达的数据

如果发送端、接收端中任意一端被丢弃了,那么Channel就“关闭”了

创建Channel

使用mpsc::channel函数来创建Channel

  • mpsc表示multiple producer, single consumer(多个生产者、一个消费者)
  • ―返回一个tuple(元组):里面元素分别是发送端、接收端
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx,rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let reveived = rx.recv().unwrap();

    println!("Got {}",reveived);
}

在分线程中发送消息,主线程中接收消息,当主线程中的消息还没被接收到的时候,会一直阻塞知道接收到消息才会放通

发送端的send方法

参数:想要发送的数据

   返回:Result<T,E>

   ―如果有问题(例如接收端已经被丢弃),就返回一个错误

接收端的方法

   recv方法:阻止当前线程执行,直到Channel中有值被送来

   一旦有值收到,就返回Result<T,E>

   一当发送端关闭,就会收到一个错误

   try_recv方法:不会阻塞,

   ―立即返回Result<T,E>:

      ·有数据达到:返回Ok,里面包含着数据。

      否则,返回错误

   一通常会使用循环调用来检查try_recv的结果

Channel和所有权转移

   所有权在消息传递中非常重要:能帮你编写安全、并发的代码

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx,rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}",val);
    });

    let reveived = rx.recv().unwrap();

    println!("Got {}",reveived);
}

这个时候就会报错了,因为val已经被借用出去了

发送多个值,看到接收者在等待

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx,rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(1));
        }
    });

    for receive in rx {
        println!("Got {}",receive);
    }

}

通过克隆创建多个发送者

 

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx,rx) = mpsc::channel();

    let tx1 = mpsc::Sender::clone(&tx);

    thread::spawn(move || {
        let vals = vec![
            String::from("1:hi"),
            String::from("1:from"),
            String::from("1:the"),
            String::from("1:thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_millis(200));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(200));
        }
    });

    for receive in rx {
        println!("Got {}",receive);
    }

}

4、共享状态的并发

Go语言的名言:不要用共享内存来通信,要用通信来共享内存。

Rust支持通过共享状态来实现并发。

Channel类似单所有权:一旦将值的所有权转移至Channel,就无法使用它了

共享内存并发类似多所有权:多个线程可以同时访问同一块内存

使用Mutex来每次只允许一个线程来访问数据

   Mutex是 mutual exclusion(互斥锁)的简写

   在同一时刻,Mutex只允许一个线程来访问某些数据

想要访问数据:

   –线程必须首先获取互斥锁(lock)

   lock数据结构是mutex的一部分,它能跟踪谁对数据拥有独占访问权

   mutex通常被描述为:通过锁定系统来保护它所持有的数据

Mutex的两条规则

   在使用数据之前,必须尝试获取锁(lock)

   使用完mutex所保护的数据,必须对数据进行解锁,以便其它线程可以获取锁。

Mutex<T>的API

   通过Mutex:new(数据)来创建Mutex<T>

   - Mutex<T>是一个智能指针

访问数据前,通过lock方法来获取锁

   会阻塞当前线程lock可能会失败

   返回的是 Mutex

   Guard(智能指针,实现了Deref和 Drop)

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();

        *num = 6;

    }

    println!("m = {:?}",m);
}

多线程共享Mutex<T>

使用Arc<T>来进行原子引用计数

Arc<T>和Rc<T>类似,它可以用于并发情景

A: atpmic,原子的

为什么所有的基础类型都不是原子的,为什么标准库类型不默认使用Arc<T>?

需要性能作为代价

Arc<T>和Rc<T>的API是相同的

use std::sync::{Mutex,Arc};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));

    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move||{
            let mut num = counter.lock().unwrap();

            *num += 1;
        });

        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result : {}",*counter.lock().unwrap());
}

RefCell<T>/Rc<T> vs Mutex<T>/Arc<T>

Mutex<T>提供了内部可变性,和Cell家族一样

我们使用RefCell<T>来改变Rc<T>里面的内容

我们使用Mutex<T>来改变Arc<T>里面的内容

注意:Mutex<>有死锁风险

5、通过Send和 SyncTrait 来扩展并发

Send和 Sync trait

Rust语言的并发特性较少,目前讲的并发特新都来自标准库(而不是语言本身)

无需局限于标准库的并发,可以自己实现并发

但在 Rust语言中有两个并发概念:
- std::marker::Sync和std::marker.Send这两个trait

Send:允许线程间转移所有权

实现Send trait的类型可在线程间转移所有权

Rust中几乎所有的类型都实现了Send
一但Rc<T>没有实现Send,它只用于单线程情景

任何完全由Send类型组成的类型也被标记为Send

除了原始指针之外,几乎所有的基础类型都是Send

Sync:允许从多线程访问

实现Sync的类型可以安全的被多个线程引用

也就是说:如果T是Sync,那么&T就是Send

一引用可以被安全的送往另一个线程
基础类型都是Sync
完全由Sync类型组成的类型也是Sync

   但,Rc<T>不是Sync的

   RefCell<T>和Cell<T>家族也不是Sync的

而,Mutex<T>是Sync的

手动来实现Send和 Sync是不安全的

你好:我的2025