Earlier we talked about how Rust manages threads and how to program with locks in Rust. Today we continue our study of concurrent programming.
Atomic types
Many programming languages provide atomic types, and Rust is no exception. In the previous article we covered the use of locks in Rust; with locks, you need to be careful about deadlocks. Atomic types are the main tool programming languages give us for lock-free concurrent programming. For those of you familiar with Java: the compiler and CPU do not guarantee that code executes exactly in the order it was written; they may reorder instructions as an optimization, a phenomenon known as instruction reordering. Rust's multithreaded memory model permits the same kind of reordering; what atomic types add is the ability to constrain it, by attaching an explicit memory ordering to each atomic operation.
In general, atomic types provide the following operations:
- Load: Reads the value from the atomic type
- Store: Writes a value to an atomic type
- CAS (compare-and-swap): atomically compares the current value with an expected value and, only if they match, swaps in a new one
- Swap (exchange): atomically replaces the value and returns the previous one
- Fetch-add (and fetch-sub/and/or/xor): a family of atomic read-modify-write operations for addition, subtraction, and bitwise logic
Ok, with those basic concepts behind us, let's take a look at what atomic types Rust provides. Rust's atomic types are defined in the standard library module std::sync::atomic, which currently offers 12 atomic types.
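These operations map directly onto methods of the atomic types. Here is a minimal sketch using AtomicUsize, with SeqCst ordering everywhere for simplicity (load, store, fetch_add, compare_exchange, and swap are the actual method names in std):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let counter = AtomicUsize::new(0);

    // Store: write a value atomically.
    counter.store(5, Ordering::SeqCst);

    // Load: read the current value atomically.
    assert_eq!(counter.load(Ordering::SeqCst), 5);

    // Fetch-add: add and return the *previous* value in one atomic step.
    let previous = counter.fetch_add(3, Ordering::SeqCst);
    assert_eq!(previous, 5);
    assert_eq!(counter.load(Ordering::SeqCst), 8);

    // CAS: swap in 100 only if the value is still the expected 8.
    let result = counter.compare_exchange(8, 100, Ordering::SeqCst, Ordering::SeqCst);
    assert_eq!(result, Ok(8));

    // Exchange (swap): unconditionally replace the value, returning the old one.
    let old = counter.swap(1, Ordering::SeqCst);
    assert_eq!(old, 100);
    assert_eq!(counter.load(Ordering::SeqCst), 1);
}
```

Each read-modify-write method returns the value that was there before the operation, which is what makes lock-free algorithms possible.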
The following code demonstrates how to implement a simple spin lock using atomic types.
```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
    let spinlock = Arc::new(AtomicUsize::new(1));

    let spinlock_clone = spinlock.clone();
    let thread = thread::spawn(move || {
        spinlock_clone.store(0, Ordering::SeqCst);
    });

    while spinlock.load(Ordering::SeqCst) != 0 {}

    if let Err(panic) = thread.join() {
        println!("Thread had an error: {:?}", panic);
    }
}
```
The spawned thread uses AtomicUsize's store method to set the value to 0, while the main thread uses the load method to read it, spinning in an empty loop as long as the value is not 0. Both store and load take the same parameter, Ordering::SeqCst, which specifies the memory ordering of the atomic operation.
Looking at the documentation, Ordering is an enum, defined as:
```rust
pub enum Ordering {
    Relaxed,
    Release,
    Acquire,
    AcqRel,
    SeqCst,
}
```
Its purpose is to hand control of memory ordering to the developer: we choose how strongly each atomic operation is ordered. Let's look at what these five orderings mean:
- Relaxed: the weakest ordering; only the operation itself is atomic, with no ordering guarantees relative to other memory operations
- Release: applies to store operations; all writes made before a Release store become visible to any thread whose Acquire load observes that store
- Acquire: applies to load operations; once an Acquire load observes a Release store from another thread, all writes made before that store are visible to the loading thread
- AcqRel: combines the two; a read-modify-write operation performs its load with Acquire ordering and its store with Release ordering
- SeqCst: the strongest ordering; on top of Acquire/Release guarantees, all SeqCst operations across all threads appear in a single, globally agreed-upon order

Unless you fully understand the weaker orderings, SeqCst is generally the safest default, at a modest performance cost.
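To make the Acquire/Release pair concrete, here is a minimal sketch of the classic publish pattern: one thread writes a payload and then sets a flag with Release; another thread spins on an Acquire load of the flag and is then guaranteed to see the payload. The variable names are mine, chosen for illustration.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;

fn main() {
    let data = Arc::new(AtomicUsize::new(0));
    let ready = Arc::new(AtomicBool::new(false));

    let (data_w, ready_w) = (data.clone(), ready.clone());
    let writer = thread::spawn(move || {
        // Relaxed is enough for the payload write itself...
        data_w.store(42, Ordering::Relaxed);
        // ...because the Release store "publishes" everything written before it.
        ready_w.store(true, Ordering::Release);
    });

    // Spin until the Acquire load observes the Release store; from that point
    // on, all writes made before the Release are guaranteed visible here.
    while !ready.load(Ordering::Acquire) {}
    assert_eq!(data.load(Ordering::Relaxed), 42);

    writer.join().unwrap();
}
```

If both stores and the flag load were Relaxed instead, the reader could, in principle, see the flag as true while still reading a stale payload.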
Inter-thread communication
The Go documentation famously advises: "Do not communicate by sharing memory; instead, share memory by communicating."
The Rust standard library adopts the CSP (communicating sequential processes) concurrency model, relying on channels for communication between threads. Channels live in the standard library module std::sync::mpsc, which defines three main types:
- Sender: sends messages asynchronously (send never blocks)
- SyncSender: sends messages synchronously (send blocks when the channel's buffer is full)
- Receiver: receives messages
Let’s take a look at how a channel is created and sends and receives messages.
```rust
use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}
```
First, we use the channel() function to create a channel; it returns a (Sender, Receiver) tuple, and its buffer is conceptually unbounded. Alternatively, sync_channel() creates a channel with a bounded buffer whose size you specify, returning a (SyncSender, Receiver) tuple; sends on it block once the buffer is full.
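As a quick aside, a minimal sketch of sync_channel's bounded, blocking behavior (the bound of 1 here is just for illustration):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // A bounded channel: send() blocks once the buffer holds `bound` messages.
    let (tx, rx) = mpsc::sync_channel::<i32>(1);

    let sender = thread::spawn(move || {
        tx.send(1).unwrap(); // fits in the buffer, returns immediately
        tx.send(2).unwrap(); // blocks until the receiver frees a slot
    });

    assert_eq!(rx.recv().unwrap(), 1);
    assert_eq!(rx.recv().unwrap(), 2);
    sender.join().unwrap();
}
```

This back-pressure is the practical difference from channel(): a slow receiver throttles fast senders instead of letting the buffer grow without bound.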
Next, in the child thread, we define a string variable and use the send() method to send it into the channel. send() returns a Result, and here we simply call unwrap(), which panics if the send fails (it fails when the Receiver has been dropped).
At the end of main, we use the recv() function to receive the message.
It is important to note that send() takes ownership of its argument, so if you try to use the val variable after sending it, the program will fail to compile with a "use of moved value" error.
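If you do need the value afterwards, clone it before sending. A small sketch (sending and receiving on the same thread works here because the channel buffers the message):

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    let val = String::from("hi");

    // tx.send(val) would move `val` out of this scope; using it afterwards
    // triggers error[E0382]: borrow of moved value. Cloning first keeps a
    // usable copy in this thread:
    tx.send(val.clone()).unwrap();

    println!("still usable here: {}", val);
    assert_eq!(rx.recv().unwrap(), "hi");
}
```

The move semantics are deliberate: once a value is sent, the receiving thread owns it, so there is no risk of two threads mutating it at once.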
Now that we have seen how to use a channel to communicate between threads, here is an exercise: run the code below and see whether it executes successfully. If it does not, how would you modify it?
```rust
use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    for i in 0..5 {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(i).unwrap();
        });
    }

    for j in rx.iter() {
        println!("{:?}", j);
    }
}
```
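If you want to check your answer: rx.iter() only ends once every Sender has been dropped, and the original tx in main never is, so the program prints five numbers and then hangs forever. One possible fix is to drop tx before draining the receiver:

```rust
use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    for i in 0..5 {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(i).unwrap();
        });
    }

    // Drop the original sender: rx.iter() terminates only when
    // every Sender (including this one) has gone out of scope.
    drop(tx);

    for j in rx.iter() {
        println!("{:?}", j);
    }
}
```

With the drop in place, the loop prints the numbers 0 through 4 (in whatever order the threads happen to run) and then main exits cleanly.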
The thread pool
In practice, if we spawn a fresh thread for every task, the overhead of repeatedly creating and destroying threads becomes significant and can even become a bottleneck for system performance. The usual answer to this problem is a thread pool.
Rust's standard library does not ship a ready-made thread pool, but several third-party crates provide one. Here I am using the threadpool crate. First add the dependency threadpool = "1.7.1" to Cargo.toml; you can then bring it into scope with use threadpool::ThreadPool; in our program.
```rust
use threadpool::ThreadPool;
use std::sync::mpsc::channel;

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = ThreadPool::new(n_workers);

    let (tx, rx) = channel();
    for _ in 0..n_jobs {
        let tx = tx.clone();
        pool.execute(move || {
            tx.send(1).expect("channel will be there waiting for the pool");
        });
    }

    assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);
}
```
Here we use ThreadPool::new() to create a ThreadPool with four worker threads. The execute() method submits a closure to the pool, which runs it on one of the idle worker threads.
Conclusion
Today we looked at three features of Rust concurrent programming: atomic types, interthread communication, and the use of thread pools.
Atomic types are an important tool for lock-free concurrency, and inter-thread communication and thread pools come up constantly in real work. Of course, there is far more to concurrent programming than this; you can explore further on your own or discuss it with me.