Fearless Concurrency
Concurrency generally covers two related ideas:
- Concurrent: different parts of a program execute independently
- Parallel: different parts of a program run at the same time
Rust's "fearless concurrency" lets you write code that is free of subtle concurrency bugs and is easy to refactor without introducing new ones.
Processes and threads
In most operating systems, code runs inside processes, and the OS manages many processes at once.
Within a single program, independent parts can also run at the same time; the features that run these independent parts are called threads.
Although multithreading can improve performance, it also adds complexity, and the execution order of the threads is not guaranteed.
Multithreading can cause problems such as:
- Race conditions, where threads access data or resources in an inconsistent order
- Deadlocks, where two threads each wait for a resource the other holds, so neither can continue
- Bugs that only appear in specific situations and are hard to reproduce and fix reliably
How threads are implemented:
- By calling OS APIs to create threads: the 1:1 model, which requires a relatively small runtime
- By the language implementing its own threads (green threads): the M:N model, which requires a larger runtime
Rust has to trade off runtime support, so its standard library only provides threads in the 1:1 model.
Creating a thread with thread::spawn
Straight to the code:
```rust
use std::{thread, time::Duration};

fn main() {
    // The closure contains the code the new thread runs
    thread::spawn(|| {
        for i in 1..10 {
            println!("number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}
```
In this code the spawned thread never finishes its loop, because the program exits as soon as the main thread is done; the output may also differ between runs.
The thread::spawn function returns a JoinHandle. To fix the problem above, we can use the JoinHandle to wait for the spawned thread to complete.
A JoinHandle is an owned value; calling its join method blocks the current thread until the thread the handle represents terminates, which is how we wait for that thread to finish.
```rust
use std::{thread, time::Duration};

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    // Block until the spawned thread has finished
    handle.join().unwrap();
}
```
Now the spawned thread runs to completion before the program exits.
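As a small variation on the listing above (not from the original text), moving the join call before the main thread's loop forces the spawned thread to print everything first, so the two loops no longer interleave:

```rust
use std::{thread, time::Duration};

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    // Joining here blocks main until the spawned thread is done
    handle.join().unwrap();

    for i in 1..5 {
        println!("number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}
```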
Using move closures with threads
A move closure is often used together with thread::spawn: it lets the new thread use data from another thread by moving ownership of the captured values into the spawned thread when it is created.
```rust
use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    // `move` transfers ownership of v into the closure
    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}
```
Without move, this code does not compile: Rust cannot guarantee that v lives as long as the closure, so v might be dropped in the main thread while the spawned thread still holds a reference to it.
Message passing
Message passing is a popular and safe concurrency technique: threads communicate by sending each other messages.
Channel
A channel is provided by the standard library and has two halves: a sender and a receiver. Code calls methods on the sender to send data, while the receiver checks for and receives arriving data. If either half is dropped, the channel is considered closed.
Creating a channel
A channel is created with the mpsc::channel function; mpsc stands for multiple producer, single consumer. The call returns a tuple whose first element is the sender and whose second element is the receiver.
```rust
use std::{sync::mpsc, thread};

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap(); // ownership of val is transferred
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}
```
The sender's send method takes the value to send and returns a Result<T, E>; it returns an error if something goes wrong, for example if the receiver has already been dropped.
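As a minimal sketch of that error case (my own illustration, not the original example), dropping the receiver before sending makes send return an Err that hands the unsent value back:

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    drop(rx); // close the receiving half

    // send fails because the receiver no longer exists
    match tx.send(String::from("hi")) {
        Ok(()) => println!("sent"),
        Err(e) => println!("send failed: {}", e),
    }
}
```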
Methods on the receiving end:
- recv: blocks the current thread until a value is sent into the channel, then returns it as Result<T, E>; once the sender is closed, it returns an error.
- try_recv: does not block; it immediately returns Result<T, E>, an Ok containing the data if something has arrived and an error otherwise. try_recv is usually called in a loop to poll for results (a polling sketch follows below).
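Here is a minimal polling sketch with try_recv; the message text and sleep intervals are illustrative assumptions:

```rust
use std::{sync::mpsc, thread, time::Duration};

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        tx.send(String::from("done")).unwrap();
    });

    loop {
        match rx.try_recv() {
            // A value has arrived
            Ok(msg) => {
                println!("Got: {}", msg);
                break;
            }
            // Nothing yet; do other work and poll again
            Err(mpsc::TryRecvError::Empty) => {
                println!("still waiting...");
                thread::sleep(Duration::from_millis(10));
            }
            // The sender was dropped without sending
            Err(mpsc::TryRecvError::Disconnected) => break,
        }
    }
}
```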
Channel and transfer of ownership
Ownership matters a great deal in message passing: it helps you write safe, concurrent code.
In the example above, send takes ownership of val, so the sending thread can no longer use it afterwards.
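For example, code along these lines would be rejected by the compiler if the commented line were restored, because send has already taken ownership of val (the error in the comment is the diagnostic the compiler typically reports):

```rust
use std::{sync::mpsc, thread};

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        // println!("val is {}", val); // error[E0382]: borrow of moved value: `val`
    });

    println!("Got: {}", rx.recv().unwrap());
}
```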
Sending multiple values and watching the receiver wait
```rust
use std::{sync::mpsc, thread, time::Duration};

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(1000));
        }
    });

    // Treat rx as an iterator; the loop ends when the channel closes
    for received in rx {
        println!("Got: {}", received);
    }
}
```
Create multiple senders by cloning
```rust
use std::{sync::mpsc, thread, time::Duration};

fn main() {
    let (tx, rx) = mpsc::channel();
    // Clone the sender so two threads can send into the same channel
    let tx1 = mpsc::Sender::clone(&tx);

    thread::spawn(move || {
        let vals = vec![
            String::from("1:hi"),
            String::from("1:from"),
            String::from("1:the"),
            String::from("1:thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_millis(1000));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(1000));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}
```
Shared-state concurrency
Message passing shares memory by communicating; Rust also supports concurrency through shared state, that is, communicating by sharing memory.
A channel is like single ownership: once a value is sent down the channel, the sender can no longer use it. Shared memory is like multiple ownership: several threads can access the same memory at the same time.
Mutex
Mutex is short for mutual exclusion: a mutex allows only one thread to access a piece of data at any given time. To access the data, a thread must first acquire the mutex's lock.
The lock is a data structure that is part of the mutex and keeps track of who currently has exclusive access to the data.
A mutex is often described as guarding the data it holds via the locking system.
Rules for using a mutex:
- You must attempt to acquire the lock before using the data
- When you are done with the data the mutex guards, you must unlock the data so that other threads can acquire the lock
The Mutex<T> API
Mutex<T> is created with Mutex::new(data); Mutex<T> is a smart pointer.
Before accessing the data, call the lock method to acquire the lock. This blocks the current thread; the call can fail, and on success it returns a MutexGuard, a smart pointer that implements Deref and Drop.
```rust
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        // Acquire the lock; the MutexGuard derefs to the inner data
        let mut num = m.lock().unwrap();
        *num = 6;
    } // the guard goes out of scope here, releasing the lock

    println!("m = {:?}", m);
}
```
The lock method acquires the lock; when the returned MutexGuard goes out of scope, its Drop implementation automatically releases the lock.
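If the lock needs to be released before the end of a scope, the guard can also be dropped explicitly; a small sketch (not from the original):

```rust
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    let mut num = m.lock().unwrap();
    *num = 6;
    // Release the lock early by dropping the guard explicitly
    drop(num);

    // Locking again succeeds because the previous guard was dropped
    println!("m = {:?}", m.lock().unwrap());
}
```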
Sharing a Mutex<T> between multiple threads
Let's start with an example that does not compile:
```rust
use std::{sync::Mutex, thread};

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    // Create ten threads
    for _ in 0..10 {
        let handle = thread::spawn(move || {
            // error: counter is moved into the first closure
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
```
The error occurs because ownership of counter is moved into the closure on the first iteration of the loop, so later iterations (and the final println!) can no longer use it.
We learned that Rc<T> lets a value have multiple owners, but using Rc<T> here still produces an error: Rc<T> does not implement the Send trait, so it cannot be transferred safely between threads.
In concurrent scenarios we use Arc<T> instead, which provides atomic reference counting. Arc<T> works much like Rc<T> and has an almost identical API; the A stands for atomic. The standard library does not use Arc<T> by default, and primitive types are not atomic, because thread safety comes with a performance cost that you should only pay when you actually need it.
```rust
use std::{sync::{Arc, Mutex}, thread};

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    // Create ten threads, each with its own clone of the Arc
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
```
RefCell<T>/Rc<T> VS Mutex<T>/Arc<T>
Mutex<T> provides interior mutability, just like the Cell family:
- We use RefCell<T> to mutate the contents inside an Rc<T>
- We use Mutex<T> to mutate the contents inside an Arc<T>
- Note: Mutex<T> comes with the risk of deadlock (a sketch of a deadlock follows below)
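As a hedged sketch of that deadlock risk (my own example, not from the original): two threads that take two mutexes in opposite orders can each end up waiting for the lock the other holds. The sleeps only make the bad interleaving likely; the program is expected to hang, which is the point:

```rust
use std::{sync::{Arc, Mutex}, thread, time::Duration};

fn main() {
    let a = Arc::new(Mutex::new(0));
    let b = Arc::new(Mutex::new(0));

    let (a1, b1) = (Arc::clone(&a), Arc::clone(&b));
    let t1 = thread::spawn(move || {
        let _ga = a1.lock().unwrap(); // thread 1 holds a
        thread::sleep(Duration::from_millis(50));
        let _gb = b1.lock().unwrap(); // ...and waits for b
    });

    let (a2, b2) = (Arc::clone(&a), Arc::clone(&b));
    let t2 = thread::spawn(move || {
        let _gb = b2.lock().unwrap(); // thread 2 holds b
        thread::sleep(Duration::from_millis(50));
        let _ga = a2.lock().unwrap(); // ...and waits for a -> deadlock
    });

    t1.join().unwrap();
    t2.join().unwrap();
    println!("this line is likely never reached");
}
```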
The Send and Sync traits
The Rust language itself has very few concurrency features; most of the ones we have used come from the standard library rather than the language. We are not limited to the standard library's concurrency tools, either: we can build our own or use ones written by others.
There are, however, two concurrency concepts embedded in the language itself, the marker traits:
std::marker::Sync
std::marker::Send
Note: Manually implementing Send and Sync is not safe.
Send
- Allows ownership to be transferred between threads
- Almost all Rust types implement Send, but Rc<T> does not, so Rc<T> is only suitable for single-threaded scenarios (see the sketch after this list)
- Any type composed entirely of Send types is automatically marked as Send
- Almost all primitive types are Send, with the exception of raw pointers
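A small sketch of the difference (my own illustration): moving an Rc<T> into a spawned thread is rejected because Rc<T> is not Send, while the Arc<T> version compiles and runs. The commented lines show the failing variant:

```rust
use std::{sync::Arc, thread};

fn main() {
    // let data = std::rc::Rc::new(5);
    // thread::spawn(move || println!("{}", data));
    // error[E0277]: `Rc<i32>` cannot be sent between threads safely

    let data = Arc::new(5);
    let handle = thread::spawn(move || println!("{}", data));
    handle.join().unwrap();
}
```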
Sync
- A type that implements Sync can be safely referenced from multiple threads
- In other words: if T is Sync, then &T is Send, i.e. the reference can safely be sent to another thread
- Primitive types are Sync
- Types composed entirely of Sync types are also Sync; however, Rc<T> is not Sync, and RefCell<T> and the Cell<T> family are not Sync either, while Mutex<T> is Sync (a compile-time check sketch follows below)
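One idiomatic way to check these properties at compile time is a helper function bounded by Send + Sync; a sketch (the helper name assert_send_sync is mine):

```rust
use std::sync::{Arc, Mutex};

// Compiles only for types that are both Send and Sync
fn assert_send_sync<T: Send + Sync>() {}

fn main() {
    assert_send_sync::<i32>();             // primitive types are Send + Sync
    assert_send_sync::<Mutex<i32>>();      // Mutex<T> is Sync
    assert_send_sync::<Arc<Mutex<i32>>>(); // composed entirely of Send + Sync types

    // These would fail to compile if uncommented:
    // assert_send_sync::<std::rc::Rc<i32>>();        // Rc<T> is neither Send nor Sync
    // assert_send_sync::<std::cell::RefCell<i32>>(); // RefCell<T> is not Sync
}
```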