Перетворюємо наш однопотоковий сервер на багатопотоковий
Зараз наш сервер обробляє кожен запит по черзі, тобто він не обробить друге з'єднання, поки не завершить обробку першого. Якщо сервер отримує більше і більше запитів, це послідовне виконання буде все менш і менш оптимальним. Якщо сервер отримує запит, що обробляється довгий час, наступні запити муситимуть чекати, доки довгий запит не буде завершено, навіть якщо нові запити можна обробити швидко. Нам потрібно буде виправити це, але спершу ми поглянемо на цю проблему в дії.
Симуляція повільного запиту в поточній реалізації сервера
Ми подивимося на те, як запит з повільною обробкою може вплинути на інші запити, зроблені до нашої поточної реалізації сервера. Блок коду 20-10 реалізує обробку запиту до /sleep з симуляцією повільної реакції, що заблокує сервер у режимі сну на 5 секунд до відповіді.
Файл: src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { // --snip-- let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
Тепер ми перейшли з if
на match
, бо у нас є три випадки. Ми маємо явно зіставляти слайс з request_line
із шаблоном зі стрічковими літералами; match
не робить автоматичних посилань і розіменувань, як методи порівняння на рівність.
Перший рукав той же, що й у блоці if
з Блоку коду 20-9. Другий рукав зіставляє запит зі /sleep. Коли цей запит отримано, сервер спатиме 5 секунд перед передачею успішної HTML сторінки. Третій рукав той же, що й у блоці else
з Блоку коду 20-9.
Ви можете побачити, наскільки примітивними є наш сервер: справжні бібліотеки оброблять розпізнавання кількох запитів у набагато менш розлогий спосіб!
Запустімо сервер командою cargo run
. Тоді відкрийте два вікна браузера: одне для http://127.0.0.1:7878/, а інше - для http://127.0.0.1:7878/sleep. Якщо ви введете URL / кілька разів, то, як і раніше, ви побачите, що відповідь надходить швидко. Але якщо ви введете /sleep, а потім завантажте /, ви побачите, що / чекає, доки sleep
"проспить" 5 секунд до завантаження.
Існує безліч методів, якими ми могли б скористатися, щоб уникнути гальмування через повільний запит; те, що ми реалізуємо - це пул потоків.
Поліпшення пропускної здатності за допомогою пулу потоків
Пул потоків - це група породжених потоків, що чекають і готові до обробки завдання. Коли програма отримує нове завдання, то призначає один із потоків з пулу на це завдання, і цей потік обробляє завдання. Решта потоків у пулі доступні, щоб обробити будь-яке інше завдання, що надійде, поки перший потік зайнятий обробкою. Коли перший потік завершить обробку свого завдання, то повернеться до пулу незайнятих потоків, готовий обробляти нове завдання. Пул потоків дозволяє вам обробляти з'єднання конкурентно, збільшуючи пропускну здатність вашого сервера.
Ми обмежимо кількість потоків в пулі невеликим числом, щоб захистити нас від атак на відмову в обслуговуванні (Denial of Service, DoS); якби наша програма створювала по потоку на кожен вхідний запит, то хтось, створивши 10 мільйонів запитів до нашого сервера, може обвалити його, вичерпавши всі його ресурси та призвівши до повної зупинки обробки запитів.
Замість необмеженого породження потоків ми матимемо фіксовану кількість потоків, що чекатимуть у пулі. Вхідні запити надсилатимуться в пул для обробки. Пул підтримуватиме чергу вхідних запитів. Кожен з потоків у пулі братиме запит з цієї черги, оброблятиме його і запитуватиме наступний запит з черги. З таким дизайном ми можемо обробити до N
запитів конкурентно, де N
є кількістю потоків. Якщо кожен потік відповідатиме на довгий запит, наступні запити все ж накопичуватиметься в черзі, але ми збільшили кількість довгих запитів, які ми можемо обробити до досягнення цього моменту.
Ця техніка - лише один із багатьох способів покращити пропускну здатність вебсервера. Інші варіанти, які ви можете дослідити, включають модель fork/join, однопотокова модель асинхронного I/O та *багатопотокова модель асинхронного I/O *. Якщо ви зацікавилися цією темою, то можете прочитати більше про інші рішення і спробувати реалізувати їх; усі ці варіанти доступні низькорівневій мові на кшталт Rust.
Перед тим, як почати реалізовувати пул тредів, поговоримо про те, як має виглядати його використання. Коли ви намагаєтеся проєктувати код, написання спершу клієнтського інтерфейсу може допомогти керувати проєктуванням. Напишіть API коду, щоб він був структурованим відносно способу його виклику; тоді реалізуйте функціональність відповідно до цієї структури, а не спершу реалізуйте функціональність, а тоді проєктуйте публічний API.
Подібно до того, як ми використовували керовану тестами розробку у проєкті з Розділу 12, ми використаємо тут керовану компілятором розробку. Ми напишемо код, що викликає потрібні нам функції, а потім ми подивимося на помилки компілятора, щоб вирішити, що ми маємо далі змінити, щоб цей код працював. Але перед цим ми дослідимо техніку, яку ми не збираємося використовувати, як відправну точку.
Породження потоку для кожного запиту
Спершу дослідимо, як наш код міг би виглядати, якби створював новий потік для кожного з'єднання. Як зазначено раніше, ми не плануємо так робити через проблеми з потенційним породженням нескінченої кількості потоків, але це вихідна точка, щоб спершу отримати робочий багатопотоковий сервер. Тоді ми покращимо код, додавши пул потоків, і відмінності між двома рішеннями стануть очевиднішими. Блок коду 20-11 показує зміни, які треба внести, щоб main
породжував новий потік для обробки кожного потоку у циклі for
.
Файл: src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
Як ви дізналися у Розділі 16, thread::spawn
створить новий потік, а потім запустить код у замиканні в цьому новому потоці. Якщо ви запустите цей код і завантажите в браузері /sleep, а тоді / у двох додаткових вкладках браузера, ви й справді побачите, що запит до / не мусить чекати, доки не завершиться /sleep. Однак, як ми згадували, це кінець-кінцем перенавантажить систему, бо нові потоки створюються без будь-яких обмежень.
Створення скінченної кількості потоків
Ми хочемо, щоб наш пул потоків працював у схожий, знайомий спосіб, щоб перехід з потоків до пулу потоків не вимагав значних змін у коді, що використовує наш API. Блок коду 20-12 показує гіпотетичний інтерфейс для структури ThreadPool
, яку ми хочемо використати замість thread::spawn
.
Файл: src/main.rs
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Ми використовуємо ThreadPool::new
для створення нового пулу потоків налаштовуваним числом потоків, у цьому випадку чотирма. Тоді, в циклі for
циклу, pool.execute
має інтерфейс, подібний до thread::spawn
у тому, що він для кожного вхідного потоку приймає замикання, яке пул має виконати. Ми маємо реалізувати pool.execute
так, щоб він приймав замикання і передавав його треду в пулі на виконання. Цей код ще не компілюється, але ми спробуємо це зробити, щоб компілятор міг підказати, як це виправити.
Збірка ThreadPool
за допомогою керованої компілятором розробки
Внесіть зміни з Блоку коду 20-12 до src/main.rs, а потім використаймо помилки компілятора з cargo check
для керування розробкою. Ось яку першу помилку ми отримуємо:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` due to previous error
Чудово! Ця помилка говорить, що нам потрібен тип чи модуль ThreadPool
, тож ми його створимо. Наша реалізація ThreadPool
буде незалежною від виду роботи, що її виконує наш вебсервер. Отже, переробимо крейт hello
з двійкового крейта на бібліотеку, де міститиметься наша реалізація ThreadPool
. Після перероблення на бібліотечний крейт ми також могли б використати окрему бібліотеку пулу потоків для будь-якої роботи, яку ми хочемо виконати за допомогою пулу потоків, а не лише для обслуговування вебзапитів.
Створіть src/lib.rs, що містить найпростіше визначення структури ThreadPool
, яке ми можемо наразі мати:
Файл: src/lib.rs
pub struct ThreadPool;
Далі відредагуйте файл main.rs, щоб ввести ThreadPool
до області видимості з бібліотечного крейта, додавши наступний код зверху src/main.rs:
Файл: src/lib.rs
use hello::ThreadPool;
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Цей код все ще не працює, але перевірмо його ще раз, щоб отримати наступну помилку, над якою нам потрібно буде працювати:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error
Ця помилка означає, що нам необхідно створити для ThreadPool
асоційовану функцію з назвою new
. Ми також знаємо, що new
повинна мати один параметр, який може прийняти 4
як аргумент і має повернути екземпляр ThreadPool
. Реалізуймо найпростішу функцію new
, що матиме такі характеристики:
Файл: src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
Ми обрали типом параметра size
тип usize
, бо ми знаємо що від'ємна кількість потоків не має сенсу. Ми також знаємо, що ми використаємо 4 як число елементів у колекції потоків, а це саме те, для чого призначений тип usize
, як говорилося у підрозділі “Цілі типи” Розділу 3.
Ще раз перевіримо код:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| ^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error
Тепер стається помилка, бо ми не маємо методу execute
на ThreadPool
. Згадайте з підрозділу "Створення скінченної кількості потоків" , що ми вирішили, що інтерфейс нашого пула тредів має бути схожим на thread::spawn
. На додачу ми реалізуємо функцію execute
, щоб приймала передане їй замикання і передавало її вільному потоку з пула на виконання.
Ми визначимо метод execute
для ThreadPool
так, щоб він приймав параметром замикання. Згадайте з підрозділу “Переміщення захоплених значень із замикання та трейти Fn
” Розділу 13, що ми можемо приймати замикання параметрами за допомогою трьох різних трейтів: Fn
, FnMut
і FnOnce
. Ми маємо вирішити, який тип замикань використовується тут. Ми знаємо, що в результаті вийде щось схоже на реалізацію thread::spawn
зі стандартної бібліотеки, тож можемо подивитися на обмеження на параметр з сигнатури thread::spawn
. Документація показує нам таке:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
Тип-параметр F
- це те, що нас тут цікавить; тип-параметр T
стосується значення, що повертається, і він нас не цікавить. Ми бачимо, що spawn
використовує FnOnce
як обмеження трейту для F
. Ймовірно, це те саме, що нам треба, тому що ми зрештою передамо аргумент, який отримали у execute
, до spawn
. Ми можемо бути впевнені, що FnOnce
- це трейт, який ми хочемо використовувати, оскільки потік для виконання запиту виконає замикання цього запиту тільки один раз, що відповідає Once
у FnOnce
.
Тип-параметр F
також має трейтове обмеження Send
і обмеження часу Існування 'static
, що є корисним у нашій ситуації: нам потрібен Send
, щоб передавати замикання від одного потоку до іншого, і 'static
, бо ми не знаємо, скільки часу виконуватиметься потік. Створімо метод execute
для ThreadPool
, що прийматиме узагальнений параметр типу F
із цими обмеженнями:
Файл: src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
Ми все ще використовуємо ()
після FnOnce
, бо FnOnce
представляє замикання, що не приймає параметрів і повертає одиничний тип ()
. Як і у визначеннях функцій, тип, що повертається, можна не вказувати у сигнатурі, але навіть якщо ми не маємо параметрів, то все одно потребуємо дужки.
Знову ж таки, це найпростіша реалізація методу execute
: вона не робить нічого, але ми намагаємося лише змусити наш код компілюватися. Ще раз перевіримо:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 0.24s
Компілюється! Але зверніть увагу, що якщо ви спробуєте запустити cargo run
і зробити запит у браузері, то побачите в браузері помилки, які ми вже бачили на початку розділу. Наша бібліотека ще не викликає замикання, передане до execute
!
Примітка: ви могли чути, що про мови з жорсткими компіляторами, такими як Haskell and Rust, кажуть "якщо код компілюється, то він працює." Але це твердження не завжди правильне. Наш проєкт компілюється, але абсолютно нічого не робить! Якби ми збирали реальний, повний проєкт, це був би вдалий час почати написати юніт-тести, щоб перевірити, що код компілюється і має бажану поведінку.
Валідація числа потоків у new
Ми ще нічого не робимо з параметрами new
та execute
. Реалізуймо тіла цих функцій з бажаною для нас поведінкою. Для початку, подумаємо про new
. Раніше ми вибрали беззнаковий тип для параметра size
, бо пул з від'ємним числом потоків не має сенсу. Однак пул з нулем потоків також не має жодного сенсу, проте нуль є абсолютно валідним usize
. Ми додамо код, щоб перевірити, чи size
є більшим, ніж нуль, перш ніж повертати екземпляр ThreadPool
і змусимо програму паніку якщо вона отримує нуль, використовуючи макрос assert!
, як показано в Блоці коду 20-13.
Файл: src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
Ми також додали трохи документації до нашого ThreadPool
документаційним коментарем. Зверніть увагу, що ми слідували за хорошими практиками документації, додавши розділ, який описує ситуації, в яких наша функція може панікувати, як обговорювалося в Розділі 14. Спробуйте запустити cargo doc --open
і натисніть на структуру ThreadPool
, щоб побачити як виглядає документація для new
!
Замість додавати макрос assert!
, як ми зробили тут, ми могли б змінити new
на build
і повертати Result
, як ми робили з Config::build
у проєкті I/O з Блоку коду 12-9. Але ми вирішили, що в цьому випадку створити пул потоків без жодного потоку має бути невиправною помилкою. Якщо ви почуваєтеся амбітним, спробуйте написати функцію, що зветься build
, щоб порівняти з функцією new
, з такою сигнатурою:
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
Створення місця для зберігання потоків
Тепер, коли ми можемо переконатися, що маємо валідну кількість потоків для зберігання в пулі, ми можемо створити ці потоки і зберегти їх у структурі ThreadPool
перед тим, як її повертати. Але як нам "зберегти" потік? Ще раз погляньмо на сигнатуру thread::spawn
:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
Функція spawn
повертає JoinHandle<T>
, де T
- тип, що повертає замикання. Спробуймо також використати JoinHandle
і побачимо, що вийде. У нашому випадку, замикання, які ми передаємо до пулу потоків, будуть обробляти з'єднання, нічого не повертаючи, так що T
буде одинчним типом ()
.
Код у Блоці коду 20-14 скомпілюється, але ще не створює жодних потоків. Ми змінили визначення ThreadPool
, додавши в нього вектор екземплярів thread::JoinHandle<()>
, ініціалізували цей вектор об'ємом size
, організували цикл for
, який виконуватиме певний код для створення потоків, та повернули екземпляр ThreadPool
, що містить їх.
Файл: src/lib.rs
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
Ми ввели до області видимості std::thread
з бібліотечного крейта, бо ми використовуємо thread::JoinHandle
як тип елементів у векторі у ThreadPool
.
Коли отримано валідний розмір, наш ThreadPool
створює новий вектор, що може містити size
елементів. Функція with_capacity
виконує те саме завдання, що й Vec::new
, але з важливою відмінністю: вона наперед виділяє місце у векторі. Оскільки ми знаємо, що нам потрібно зберігати size
елементів у векторі, цей розподіл наперед є дещо ефективнішим, ніж використання Vec::new
, який змінює розмір при вставленні елементів.
Коли ви знову запустите cargo check
, він має відпрацювати успішно.
Структура Worker
, відповідальна за пересилання коду з ThreadPool
до потоку
Ми залишили коментар у циклі for
у Блоці коду 20-14 про створення потоків. Тут ми розберемо, як насправді створювати потоки. Стандартна бібліотека уможливлює створення потоків через thread::spawn
, який очікує отримати якийсь код, який потік має запустити, щойно його було створено. Однак у нашому випадку ми хочемо створити потоки, що очікують на код, який ми надішлемо пізніше. Реалізація зі стандартної бібліотеки не надає жодного способу це зробити; ми маємо реалізувати його вручну.
Ми реалізуємо таку поведінку, впровадивши нову структуру даних між ThreadPool
і потоками, що оброблятиме цю поведінку. Ми назвемо цю структуру даних Worker, що є звичним терміном у реалізації пула. Worker бере код, який потрібно запустити і запускає його в своєму потоці. Уявіть працівників кухні в ресторані: вони чекають, поки не прийде замовлення від клієнтів, і тоді вони відповідають за прийняття цих замовлень і їхнє виконання.
Замість зберігання вектора екземплярів JoinHandle<()>
у пулі потоків, Ми зберігатимемо екземпляри структуриWorker
. Кожен Worker
зберігатиме один екземпляр JoinHandle<()>
. Тоді ми реалізуємо метод для Worker
, який прийматиме замикання з кодом для запуску і відправлятиме його в уже робочий потік на виконання. Також ми надамо кожному worker id
, щоб ми могли розрізняти різних worker в пулі для журналювання або налагодження.
Ось новий процес, що відбудеться, коли ми створимо ThreadPool
. Ми реалізуємо код, який відправляє замикання в потік після того, як в нас вже є Worker
, налаштований таким чином:
- Визначимо структуру
Worker
, яка міститьid
іJoinHandle<()>
. - Змінимо
ThreadPool
, щоб містив вектор екземплярівWorker
. - Визначимо функцію
Worker::new
, що приймає номерid
і повертає екземплярWorker
, що міститьid
та потік, породжений із порожнім замиканням. - У
ThreadPool::new
ми використовуємо лічильник циклуfor
, щоб згенеруватиid
, створити новогоWorker
з цимid
, і зберегти worker у векторі.
If you’re up for a challenge, try implementing these changes on your own before looking at the code in Listing 20-15.
Готові? Ось Блок коду 20-15 з одним із можливих способів зробити описані зміни.
Файл: src/lib.rs
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
Ми змінили назву поля у ThreadPool
з threads
на workers
, бо воно тепер містить екземпляри Worker
замість JoinHandle<()>
. Ми використовуємо лічильник циклу for
циклі як аргумент Worker::new
, і зберігаємо кожен новий Worker
у векторі під назвою workers
.
Зовнішній код (скажімо, наш сервер з src/main.rs) не має знати деталей реалізації стосовно використання структури Worker
у ThreadPool
, тож ми робимо структуру Worker
і її функцію new
приватними. Функція Worker::new
використовує id
, що ми їй передаємо, і зберігає екземпляр JoinHandle<()>
, створений породженням нового потоку за допомогою порожнього замикання.
Примітка: якщо операційна система не може створити потік через нестачу системних ресурсів,
thread::spawn
панікуватиме. Це призведе до паніки усього нашого сервера, навіть якщо створення деяких потоків і буде вдалим. Заради простоти ця поведінка прийнятна, але у виробничій реалізації пулу потоків ви, швидше за все, захочете скористатисяstd::thread::Builder
і його методомspawn
, що повертає натомістьResult
.
Цей код скомпілюється і зберігатиме кількість екземплярів Worker
, яку ми передали як аргумент для ThreadPool::new
. Але ми все ще не обробляємо замикання, які ми отримали у execute
. Подивімося, як це зробити, далі.
Надсилання запитів до потоків через канали
Наступна проблема, якою ми займемося, полягає в тому, що замикання, передані thread::spawn
, не роблять абсолютно нічого. Наразі ми отримуємо замикання, що хочемо виконати, у методі execute
. Але ми маємо передати до thread::spawn
якесь замикання, коли ми створюємо кожного Worker
при створенні ThreadPool
.
Ми хочемо, щоб щойно створені структури Worker
отримували код з черги, що міститься в ThreadPool
, і відправляли цей код у свій потік на виконання.
Канали, про які ми дізналися у Розділі 16 — простий спосіб спілкування між двома потоками — ідеально підходять для цього випадку. Ми скористаємося каналом як чергою завдань, і execute
відправить завдання з ThreadPool
до екземплярів Worker
, які перешлють завдання до своїх потоків. Ось наш план:
ThreadPool
створить канал і утримуватиме відправника.- Кожен
Worker
утримуватиме отримувача. - Ми створимо нову структуру
Job
, що міститиме замикання, що їх ми хочемо відправити каналом. - Метод
execute
відправить завдання, яке треба виконати, через відправника. - У своєму потоці
Worker
буде в циклі запитувати свого отримувача і виконувати замикання з отриманих завдань.
Почнімо з створення каналу в ThreadPool::new
та утримання відправника у екземплярі ThreadPool
, як показано у Блоці коду 20-16. Структура Job
наразі не містить нічого, але буде типом елементів, що їх ми відправляємо каналом.
Ім'я файлу: src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
У ThreadPool::new
ми створюємо новий канал і пул тепер містить відправника. Це успішно компілюється.
Спробуймо передати отримувача каналу усім worker, коли пул потоків створює канал. Ми знаємо, що хотіли б використати приймач у потоці, породженому worker, тож ми посилатимемося на параметр receiver
у замиканні. Код у Блоці коду 20-17 поки що не компілюється.
Файл: src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
Ми зробили деякі дрібні і очевидні зміни: ми передаємо приймач до Worker::new
, а потім використовуємо його всередині замикання.
Коли ми спробуємо перевірити цей код, то отримаємо таку помилку:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` due to previous error
Код намагається передати receiver
кільком екземплярам Worker
. Так не виходить, бо, як ви пам'ятаєте з Розділу 16, реалізація каналу в Rust має багатьох виробників і одного споживача. Це означає, що ми не можемо просто клонувати споживацький вихід каналу, щоб виправити код. Також ми не хочемо надсилати повідомлення кілька разів декільком споживачам; ми хочемо єдиниц список повідомлень з декількома worker, таким чином, щоб кожне повідомлення було оброблене один раз.
На додачу, приймання завдання з черги в каналі включає зміну receiver
, тож потокам потрібен безпечний спосіб спільно використовувати та змінювати receiver
; інакше ми можемо отримати стан гонитви (як розповідалося в Розділі 16).
Згадайте потокобезпечні розумні вказівники, про які йшлося в Розділі 16: щоб розділити володіння між кількома потоками і дозволити потокам змінювати значення, нам треба було скористатися Arc<Mutex<T>>
. Тип Arc
дозволить кільком worker володіти приймачем, а Mutex
гарантує, що лише один worker отримує завдання з приймача за раз. Блок коду 20-18 показує зміни, які ми маємо зробити.
Файл: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
// --snip--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
У ThreadPool::new
, ми розміщуємо приймач у Arc
і Mutex
. Для кожного нового worker ми клонуємо Arc
, щоб збільшити лічильник посилань, щоб worker могли спільно володіти приймачем.
З цими змінами, код компілюється! Ми вже близько!
Реалізація методу execute
Нарешті реалізуймо метод execute
для ThreadPool
. Також ми змінимо Job
зі структури на псевдонім типу для трейтового об'єкта, який містить тип замикання, яку приймає execute
. Як уже говорилося в підрозділі “Створення типів-синонімів за допомогою псевдонімів типів”
Розділу 19, псевдоніми типів дозволяють нам скорочувати довгі типи для простоти використання. Подивіться на Блок коду 20-19.
Файл: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
Після створення нового екземпляра Job
за допомогою замикання, що ми отримали в execute
, ми підправляємо це завдання через вхід каналу. Ми викликаємо unwrap
для send
на випадок, якщо відправлення буде невдалим. Це може статися якщо, наприклад, ми зупинимо всі потоки, тобто вихід каналу припинить отримувати нові повідомлення. На цей час ми не можемо зупинити наші потоки: вони продовжують виконуватись, доки пул існує. Причина, чому ми використовуємо unwrap
, полягає в тому, що ми знаємо, що невдача тут неможлива, але компілятор цього не знає.
Та ми ще не зовсім закінчили! У worker наше замикання, що передається до thread::spawn
, лише посилається на вихід каналу. Натомість нам треба, щоб замикання у вічному циклі отримувало з вихідного кінця каналу завдання і після отримання виконувало його. Зробімо зміни, показані в Блоці коду 20-20, у Worker::new
.
Файл: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
Тут ми спершу викликаємо lock
для receiver
, щоб отримати м'ютекс, а потім викликаємо unwrap
для паніки, якщо сталася якась помилка. Здійснення блокування може призвести до невдачі, якщо м'ютекс знаходиться у стані poisoned, що може статися, якщо якийсь інший потік запанікував, поки утримував блокування, а не відпустив його. У цій ситуації виклик unwrap
для паніки є правильною дією. Можете за бажання змінити unwrap
на expect
зі змістовним для вас повідомленням про помилку.
Якщо ми отримали блокування м'ютекса, то викличемо recv
, щоб отримати Job
з каналу. Останній unwrap
також покриває всі помилки, що могли виникнути якщо потік, що утримує відправника, завершився, так само як метод send
повертає Err
, якщо отримувач завершився.
Виклик recv
блокує, тож якщо завдань немає, поточний потік чекатиме, доки не з'явиться доступне завдання. Mutex<T>
гарантує, що лише один потік worker
за раз намагатиметься отримати завдання.
Наш пул потоків нарешті у робочому стані! Виконайте cargo run
і зробіть кілька запитів:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: field is never read: `id`
--> src/lib.rs:48:5
|
48 | id: usize,
| ^^^^^^^^^
warning: field is never read: `thread`
--> src/lib.rs:49:5
|
49 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: `hello` (lib) generated 3 warnings
Finished dev [unoptimized + debuginfo] target(s) in 1.40s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Успіх! Тепер у нас є пул потоків, який виконує з'єднання асинхронно. Також ніколи не буде створено більше ніж чотири потоки, тому наша система не перенавантажиться, якщо сервер отримає забагато запитів. Якщо ми робимо запит до /sleep, сервер буде мати можливість обслуговувати інші запити, бо їх виконувати буде інший потік.
Примітка: якщо ви відкриєте /sleep в декількох вікнах браузера одночасно, вони можуть вантажитися по одному з 5-секундним інтервалом. Деякі веббраузери виконують кілька екземплярів одного запиту послідовно для потреб кешування. Це обмеження не викликане нашим вебсервером.
Після вивчення циклу while let
у Розділі 18, ви можете поцікавитися, чому ми не написали код потоку worker, як показано в Блоці коду 20-21.
Файл: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
Цей код компілюється і запускається, але не дає бажаного багатопотокового результату: повільний запит усе ще змушує інші потоки чекати на обробку. Причина дещо тонка: структура Mutex
не має публічного методу unlock
, тому що володіння блокуванням базується на часі існування MutexGuard<T>
у LockResult<MutexGuard<T>>
, повернутим методом lock
. Під час компіляції borrow checker може гарантувати правило, що ресурс, захищений Mutex
, не буде доступним, якщо ми не маємо блокування. Однак ця реалізація також призведе до того, що блокування буде утримуватися довше, ніж потрібно, якщо ми не пам'ятаємо про час існування MutexGuard<T>
.
Код у Блоці коду 20-20, що робить let job = receiver.lock().unwrap().recv().unwrap();
, працює, бо в let
будь-які тимчасові значення, використані у правій стороні знаку рівності, негайно очищуються, коли завершується інструкція let
. Проте, while let
(і if let
та match
) не очищують тимчасові значення до кінця відповідного блоку. У Блоці коду 20-21 блокування утримується на час виклику job()
, тобто інші worker не можуть отримувати завдання.
ch19-04-advanced-types.html#creating-type-synonyms-with-type-aliases ch13-01-closures.html#moving-captured-values-out-of-the-closure-and-the-fn-traits