Перейти к основному содержимому

Потоки и синхронные каналы в Rust

Разработчику Архитектору

Потоки и синхронные каналы в Rust

О чём эта глава

Поток ОС (std::thread) — отдельная «дорожка» выполнения с собственным стеком. Потоки могут работать параллельно на разных ядрах процессора. Это отличается от async (отдельная глава): там много задач по очереди отдают управление runtime, пока одна ждёт диск или сеть.

В Rust оба подхода допустимы. Компилятор через трейты Send и Sync запрещает передавать между потоками типы, для которых это небезопасно — так в safe-коде отсекают типичные data race (гонки данных).

База владения: типы и владение. Общие примитивы: важные трейты.


Поток и задача — короткая аналогия

ПонятиеАналогия
ПроцессОтдельная программа с памятью
ПотокРабочий внутри процесса; потоки делят память процесса
async-задачаЗапись в планировщике: «когда сокет готов — продолжить эту функцию»

Один HTTP-сервер на Tokio может обслуживать тысячи соединений на небольшом числе потоков. Тяжёлый расчёт на всех ядрах CPU лучше разнести по std::thread или библиотеке rayon.


Когда потоки, когда async

СценарийПодход
Много ожидания сети или диска в одном сервисеasync + Tokio
CPU-bound расчёты на всех ядрахstd::thread или rayon
Фоновая работа из обычного sync-кода (CLI, часть GUI)thread::spawn
Блокирующая библиотека внутри async-сервераtokio::task::spawn_blocking

Async снижает стоимость ожидания I/O — пока один запрос ждёт ответа сети, runtime переключается на другой. Async не ускоряет чистую математику на CPU: для этого нужны потоки или пул rayon.


Запуск потока и join

use std::thread;
use std::time::Duration;

fn main() {
let handle = thread::spawn(|| {
thread::sleep(Duration::from_millis(100));
42
});

let answer = handle.join().expect("поток завершился с паникой");
println!("{answer}");
}

Разбор:

  • thread::spawn(closure) — создаёт поток и сразу начинает выполнять замыкание.
  • Замыкание || { ... } возвращает 42 — это значение попадёт в join.
  • handle.join() — главный поток ждёт завершения дочернего. Тип: Result<T, Box<dyn Any + Send>>. Если внутри потока была panic, join вернёт Err.
  • thread::sleep — блокирует только этот поток ОС на 100 мс.

Передача данных в поток — move

По умолчанию замыкание заимствует переменные из окружения. Для передачи владения в другой поток нужно move:

let data = vec![1, 2, 3];
thread::spawn(move || {
println!("{:?}", data);
});
// здесь `data` уже нельзя использовать — владение ушло в поток

Компилятор проверит, что data реализует Send. Если бы это был Rc (счётчик ссылок без атомарности), spawn выдал бы ошибку: Rc нельзя безопасно делить между потоками.


Канал mpsc

mpsc = multi-producer, single-consumer — много отправителей, один получатель. Стандартный модуль: std::sync::mpsc.

Идея: потоки передают владение сообщением по очереди (как конвейер) — у получателя одна копия данных, без совместной записи в общую структуру.

use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
tx.send("готово".to_string()).unwrap();
});

match rx.recv() {
Ok(msg) => println!("{msg}"),
Err(_) => println!("отправитель отключился"),
}
}
ИмяРоль
txSender — конец «отправки»
rxReceiver — конец «приёма»
send(value)Перемещает value в очередь; блокируется, если буфер переполнен (у channel() буфер неограничен)
recv()Блокирующее чтение; ждёт сообщение

Дополнительные возможности:

  • tx.clone() — второй отправитель в другом потоке.
  • recv_timeout(dur) — ждать не дольше заданного времени.
  • try_recv() — проверить очередь без долгого ожидания (удобно в цикле событий).
  • rx.iter() — итератор: каждый элемент — следующее сообщение, пока все Sender не уничтожены.

Сообщениями передают String, Vec, структуры — всё, что владеет данными целиком. Сырые указатели без договорённостей между потоками в учебном safe-коде не используют.


Общее изменяемое состояние — Arc + Mutex

Каналы хороши, когда владение переезжает от потока к потоку. Если нужен общий счётчик или кэш, применяют разделяемое состояние:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..4 {
let c = Arc::clone(&counter);
handles.push(thread::spawn(move || {
let mut n = c.lock().unwrap();
*n += 1;
}));
}

for h in handles {
h.join().unwrap();
}
println!("{}", *counter.lock().unwrap());
}
ТипЗачем
Mutex<T>В каждый момент только один поток держит &mut T (эксклюзивный доступ).
Arc<Mutex<T>>Arc — атомарный счётчик ссылок, можно клонировать в несколько spawn; Mutex — защита данных.
lock().unwrap()Захват; при панике в другом потоке с удержанным lock вернётся PoisonError (редко).
*n += 1Разыменование MutexGuard как изменяемой ссылки на число.

Итог печатает 4: четыре потока по разу увеличили счётчик.

RwLock — много читателей или один писатель (удобно для кэша). AtomicUsize — счётчик без блокировки для простых операций += 1.


Send и Sync — что проверяет компилятор

ТрейтСмысл
Sendзначение можно переместить в другой поток
Syncна &T можно смотреть из нескольких потоков одновременно (ссылка &T сама Send)

Примеры:

  • i32, String, Vec<T> (если T: Send) — Send.
  • Rc<T>не Send: счётчик ссылок без атомарных операций.
  • Arc<Mutex<T>>Send, если T: Send.

Ошибка компилятора вида Rc<dyn Fn()> cannot be sent between threads — защита: вы пытались разделить тип, не предназначенный для многопоточности.


Потоки внутри async (Tokio)

В async fn нельзя долго блокировать worker runtime — остальные запросы на этом потоке встанут.

// в async-обработчике — блокирует worker
std::thread::sleep(std::time::Duration::from_secs(1));

// корректная пауза в async
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

Синхронная библиотека (сжатие, старый SQLite, блокирующий HTTP):

let result = tokio::task::spawn_blocking(|| heavy_sync_work())
.await
.expect("join spawn_blocking")?;

spawn_blocking берёт поток из отдельного пула; основной runtime продолжает обслуживать I/O. Тяжёлую работу выносите туда осознанно, пул ограничен.


Типичные ошибки

  1. Дедлок — поток A ждёт lock X, поток B ждёт lock Y, каждый держит другой lock. Решение: один порядок захвата, меньше вложенных Mutex, чаще каналы.
  2. Долгий I/O под Mutex — остальные потоки простаивают. Копируйте нужные данные, отпускайте guard, затем сетевой вызов.
  3. thread::sleep в async — «замораживает» воркер Tokio.
  4. Паника в spawn без join — ошибка теряется; в серверах логируют результат join или используют обёртки.

Связанные материалы


См. также

Другие статьи этого же раздела в боковом меню (как на странице «О разделе»).