Параллельное программирование на С++ в действии. Практика разработки многопоточных программ | страница 34
В листинге 2.8 приведена простая реализация параллельной версии >std::accumulate
. Она распределяет работу между несколькими потоками и, чтобы не создавать слишком много потоков, задает ограничение снизу на количество элементов, обрабатываемых одним потоком. Отмстим, что в этой реализации предполагается, что ни одна операция не возбуждает исключений, хотя в принципе исключения возможны; например, конструктор >std::thread
возбуждает исключение, если не может создать новый поток. Но если добавить в этот алгоритм обработку исключений, он перестанет быть таким простым; эту тему мы рассмотрим в главе 8.
Листинг 2.8. Наивная реализация параллельной версии алгоритма >std::accumulate
>template
> struct accumulate_block {
> void operator()(Iterator first, Iterator last, T& result) {
> result = std::accumulate(first, last, result);
> }
>};
>template
>T parallel_accumulate(Iterator first, Iterator last, T init) {
> unsigned long const length = std::distance(first, last);
> if (!length) ←
(1)
> return init;
> unsigned long const min_per_thread = 25;
> unsigned long const max_threads =
> (length+min_per_thread - 1) / min_per_thread; ←
(2)
> unsigned long const hardware_threads =
> std::thread::hardware_concurrency();
> unsigned long const num_threads = ←
(3)
> std::min(
> hardware.threads != 0 ? hardware_threads : 2, max_threads);
> unsigned long const block_size = length / num_threads; ←
(4)
> std::vector
> std::vector
(5)
> Iterator block_start = first;
> for(unsigned long i = 0; i < (num_threads - 1); ++i) {
> Iterator block_end = block_start;
> std::advance(block_end, block_size); ←
(6)
> threads[i] = std::thread( ←
(7)
> accumulate_block
> block_start, block_end, std::ref(results(i)));
> block_start = block_end; ←
(8)
> }
> accumulate_block()(
> block_start, last, results[num_threads-1]); ←
(9)
> std::for_each(threads.begin(), threads.end(),
> std::mem_fn(&std::thread::join)); ←
(10)
> return
> std::accumulate(results.begin(), results.end(), init); ←
(11)
>}
Хотя функция довольно длинная, по существу она очень проста. Если входной диапазон пуст (1), то мы сразу возвращаем начальное значение >init
. В противном случае диапазон содержит хотя бы один элемент, поэтому мы можем разделить количество элементов на минимальный размер блока и получить максимальное число потоков (2).
Это позволит избежать создания 32 потоков на 32-ядерной машине, если диапазон состоит всего из пяти элементов.