Basics of multithreading in Rust

Greetings!

I think everyone already knows that multithreading is a must-have for most programs.

Rust offers good solutions for multithreaded tasks. Common problems like data races or memory mismanagement have no place in Rust thanks to its ownership and borrowing system.

Multithreading in Rust

First, it is worth understanding what threads are. A thread is the smallest unit of execution that can be scheduled by the operating system. In Rust, threads are created with the standard library's std::thread module, which lets you perform several tasks at the same time.

use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("Hello from the spawned thread! {}", i);
            thread::sleep(std::time::Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("Hello from the main thread! {}", i);
        thread::sleep(std::time::Duration::from_millis(1));
    }

    handle.join().unwrap();
}

We create a new thread with thread::spawn and synchronize with the main thread via join.

A data race occurs when two or more threads access the same data at the same time and at least one of them modifies it. In Rust, the ownership and borrowing system prevents data races at compile time.
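One pattern the compiler steers you toward is moving ownership of the data into a thread and passing results back over a channel. A minimal sketch (the data and names here are illustrative, not from the article):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // Ownership of `data` moves into the spawned thread, so the main
    // thread can no longer touch it: the compiler enforces this.
    let data = vec![1, 2, 3];
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        // `data` is owned exclusively here; no other thread can access it.
        let sum: i32 = data.iter().sum();
        tx.send(sum).unwrap();
    });

    // Receiving over the channel is the only way to see the result.
    println!("Sum: {}", rx.recv().unwrap());
}
```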

Mutex, or mutual exclusion, is a synchronization mechanism used to prevent multiple threads from accessing a shared resource at the same time. Mutex&lt;T&gt; provides safe access to the data it contains: only one thread can hold the lock at any given moment.

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

Atomic operations are guaranteed to complete entirely, without the possibility of interruption by other threads. Rust's atomic types, such as AtomicBool or AtomicUsize, provide primitives for safe operations in a multi-threaded environment:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for _ in 0..1000 {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", counter.load(Ordering::SeqCst));
}

RwLock allows many threads to read the data at the same time, but only one thread to modify it at any given moment:

use std::sync::{RwLock, Arc};
use std::thread;

fn main() {
    let data = Arc::new(RwLock::new(5));

    let reader = thread::spawn({
        let data = Arc::clone(&data);
        move || {
            println!("Data value: {}", *data.read().unwrap());
        }
    });

    let writer = thread::spawn({
        let data = Arc::clone(&data);
        move || {
            let mut data = data.write().unwrap();
            *data += 1;
        }
    });

    reader.join().unwrap();
    writer.join().unwrap();
    println!("New data value: {}", *data.read().unwrap());
}

When data is wrapped in Arc, it can be safely shared between multiple threads. Each Arc::clone increments the reference count, and the count is decremented when a clone is dropped; the data is freed once the count reaches zero.
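The counting is directly observable through Arc::strong_count; a small sketch:

```rust
use std::sync::Arc;

fn main() {
    let data = Arc::new(42);
    // One owner so far.
    println!("count = {}", Arc::strong_count(&data));

    // Cloning the Arc bumps the count to 2.
    let clone = Arc::clone(&data);
    println!("count = {}", Arc::strong_count(&data));

    // Dropping a clone brings it back down to 1.
    drop(clone);
    println!("count = {}", Arc::strong_count(&data));
}
```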

Data immutability: if the data never changes, it is safe to share between threads without any additional synchronization. Rust encourages the use of immutable data wherever possible.
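As a sketch (the settings here are made up for illustration), read-only data behind an Arc can be shared by any number of threads with no lock at all:

```rust
use std::sync::Arc;
use std::thread;

fn main() {
    // Immutable shared data: no Mutex needed, because no thread writes.
    let config = Arc::new(vec!["host=localhost", "port=8080"]);

    let handles: Vec<_> = (0..4)
        .map(|id| {
            let config = Arc::clone(&config);
            thread::spawn(move || {
                // Concurrent read-only access is always safe.
                println!("thread {} sees {} settings", id, config.len());
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
```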

Thread Pools and Rayon

A thread pool is a collection of pre-created threads that can execute tasks. Instead of creating a new thread every time one is needed (a relatively expensive operation), tasks can be submitted to a pool, where they are executed by the available threads.

Thread pools are widely used for distributed data processing, parallel computing, and any scenario where independent tasks can run in parallel. Creating and managing a thread pool by hand can be quite a challenge, but fortunately the Rust ecosystem has libraries that simplify the process.
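To see why it is fiddly, here is a minimal hand-rolled pool sketch using only the standard library: a fixed set of workers pulls boxed closures from a shared channel (the Job alias and the worker count of 4 are our own choices, not from any library):

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// A job is any closure that can be sent to another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

fn main() {
    let (tx, rx) = mpsc::channel::<Job>();
    // All workers share the single receiver behind a Mutex.
    let rx = Arc::new(Mutex::new(rx));

    // Spawn a fixed set of worker threads up front.
    let workers: Vec<_> = (0..4)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || loop {
                // Take the lock only long enough to receive one job,
                // then release it before running the job.
                let job = rx.lock().unwrap().recv();
                match job {
                    Ok(job) => job(),
                    Err(_) => break, // channel closed: shut down
                }
            })
        })
        .collect();

    // Submit tasks without spawning any new threads.
    for i in 0..8 {
        tx.send(Box::new(move || println!("task {} done", i))).unwrap();
    }

    // Dropping the sender closes the channel so the workers exit.
    drop(tx);
    for worker in workers {
        worker.join().unwrap();
    }
}
```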

Rayon is a library in the Rust ecosystem designed to simplify parallel computing. It makes it easy to apply parallel iteration and other parallel patterns to data, greatly simplifying the process of writing multi-threaded code.

Parallel iterators are the foundation of Rayon: they let you run iterator operations, such as map, in parallel:

use rayon::prelude::*;

fn main() {
    let nums = vec![1, 2, 3, 4, 5];
    let squares: Vec<_> = nums.par_iter().map(|&i| i * i).collect();
    println!("{:?}", squares);
}

The code takes a vector of numbers and creates a new vector containing the squares of those numbers using parallel map.

use rayon::prelude::*;

fn main() {
    let nums = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let even_nums: Vec<_> = nums.par_iter().filter(|&&x| x % 2 == 0).collect();
    println!("{:?}", even_nums);
}

We filter the vector of numbers, leaving only even numbers, again using parallel operations.

Rayon also provides parallel reductions such as reduce and fold:

use rayon::prelude::*;

fn main() {
    let nums = vec![1, 2, 3, 4, 5];
    let sum: i32 = nums.par_iter().copied().reduce(|| 0, |a, b| a + b);
    println!("Sum: {}", sum);
}

We sum up the numbers in the vector using parallel reduce.

use rayon::prelude::*;

fn main() {
    let nums = vec![1, 2, 3, 4, 5];
    let sum: i32 = nums.par_iter().fold(|| 0, |a, &b| a + b).sum();
    println!("Sum: {}", sum);
}

We do the same as in the previous example, but use fold instead of reduce: fold produces per-thread partial sums, which sum then combines into the final result.

Rayon also offers functions for parallel sorting of arrays and vectors.

use rayon::prelude::*;

fn main() {
    let mut nums = vec![10, 5, 3, 1, 4, 2, 6, 9, 8, 7];
    nums.par_sort();
    println!("{:?}", nums);
}

We sort the vector of numbers in ascending order using parallel sorting.

Rayon also makes it easy to parallelize execution of arbitrary tasks:

use rayon::prelude::*;

fn main() {
    let data = vec![1, 2, 3, 4, 5];

    data.into_par_iter().for_each(|num| {
        println!("Processing number: {}", num);
    });
}

We run the closure on each element of the vector in parallel.

Suppose we have a large vector of integers. We need to perform the following steps:

  1. Filter the numbers leaving only those that are divisible by 3.

  2. Square each filtered number.

  3. Calculate the sum of all squares.

use rayon::prelude::*;

fn main() {
    // Create a large data vector
    let data: Vec<i32> = (1..=1000000).collect();

    // Process the data in parallel
    let result: i32 = data
        .par_iter() // use a parallel iterator
        .filter(|&&x| x % 3 == 0) // keep the numbers divisible by 3
        .map(|&x| x * x) // square each filtered number
        .sum(); // add up all the squares

    println!("Sum of squares of numbers divisible by 3: {}", result);
}

We create a vector data containing the numbers from 1 to 1,000,000 and call par_iter() to turn a regular iterator into a parallel one that processes the data across multiple threads. With filter we select the numbers divisible by 3, map squares each filtered number, and sum adds up all the squares, giving us the final result.

A bit about asynchronous programming in Rust

Asynchronous programming in Rust is built around an event loop model, where the program executes code in response to events (such as the completion of I/O operations). This lets a program handle many tasks concurrently without creating a separate thread for each one.

Future is the basic building block of asynchronous programming in Rust. A Future is an abstraction over an operation that may complete at some point in the future. When polled, it reports one of two states:

  1. Pending: the operation has not yet completed.

  2. Ready: the operation is complete and the result is available.

There is no separate "cancelled" state: a Future is cancelled simply by dropping it. A newly created Future starts out Pending and becomes Ready when the operation completes. A Future does no work on its own; an asynchronous executor is required to drive it to completion.
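To make the state machine concrete, here is a toy Future of our own (not from any library) that reports Pending twice before becoming Ready, polled by hand with a do-nothing waker built from the standard library alone:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A toy future that stays Pending for `self.0` polls, then becomes Ready.
struct CountDown(u32);

impl Future for CountDown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.0 == 0 {
            Poll::Ready("done")
        } else {
            self.0 -= 1;
            Poll::Pending
        }
    }
}

// A waker that does nothing; a real executor would use the waker
// to learn when the task should be polled again.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = CountDown(2);
    let mut fut = Pin::new(&mut fut);

    // Poll by hand, the way an executor would.
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Pending => println!("Pending"),
            Poll::Ready(msg) => {
                println!("Ready: {}", msg);
                break;
            }
        }
    }
}
```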

An asynchronous executor is the component that drives Futures: it starts asynchronous tasks and polls them to completion. The Rust ecosystem offers several executor libraries, for example tokio and async-std. tokio is, of course, a big topic of its own that we will cover in future articles. For now, here are a few examples.

Let's create two asynchronous functions and connect them with the then method, which builds a chain of Futures.

use futures::future::FutureExt; // for access to the `then` method
use tokio;

async fn compute_length(input: &str) -> usize {
    input.len()
}

async fn report_length(length: usize) {
    println!("String length: {}", length);
}

#[tokio::main]
async fn main() {
    let input = "Hello, world!";
    compute_length(input)
        .then(|length| report_length(length))
        .await;
}

compute_length first computes the length of the string, and then report_length prints it; then chains the two operations together.

An async move block captures variables from the surrounding environment by value:

use tokio;

#[tokio::main]
async fn main() {
    let my_string = String::from("Hello, async world!");
    tokio::spawn(async move {
        println!("{}", my_string);
    }).await.unwrap();
}

my_string is moved into the async block, and tokio::spawn runs this asynchronous task on the executor.

The join! macro lets you wait for several Futures to complete at the same time:

use tokio;

async fn task_one() -> String {
    "Task one result".to_string()
}

async fn task_two() -> String {
    "Task two result".to_string()
}

#[tokio::main]
async fn main() {
    let (result_one, result_two) = tokio::join!(task_one(), task_two());
    println!("{}, {}", result_one, result_two);
}

task_one and task_two run concurrently, and join! waits for both tasks to complete.

The select! macro lets you wait on several Futures and continue with whichever finishes first:

use tokio::time::{sleep, Duration};
use tokio::select;

async fn task_slow() -> &'static str {
    sleep(Duration::from_secs(5)).await;
    "Slow task finished"
}

async fn task_fast() -> &'static str {
    sleep(Duration::from_secs(1)).await;
    "Fast task finished"
}

#[tokio::main]
async fn main() {
    select! {
        result = task_slow() => {
            println!("{}", result);
        }
        result = task_fast() => {
            println!("{}", result);
        }
    };
}

task_slow and task_fast start at the same time, but select! lets the program continue as soon as the fastest task completes.


Overall, Rust offers, in my opinion, one of the safest and at the same time most interesting approaches to multithreading, thanks to its type system and ownership model.

