Параллельное программирование на С++ в действии. Практика разработки многопоточных программ | страница 34



В листинге 2.8 приведена простая реализация параллельной версии >std::accumulate. Она распределяет работу между несколькими потоками и, чтобы не создавать слишком много потоков, задает ограничение снизу на количество элементов, обрабатываемых одним потоком. Отмстим, что в этой реализации предполагается, что ни одна операция не возбуждает исключений, хотя в принципе исключения возможны; например, конструктор >std::thread возбуждает исключение, если не может создать новый поток. Но если добавить в этот алгоритм обработку исключений, он перестанет быть таким простым; эту тему мы рассмотрим в главе 8.


Листинг 2.8. Наивная реализация параллельной версии алгоритма >std::accumulate

>template

> struct accumulate_block {

> void operator()(Iterator first, Iterator last, T& result) {

>  result = std::accumulate(first, last, result);

> }

>};


>template

>T parallel_accumulate(Iterator first, Iterator last, T init) {

> unsigned long const length = std::distance(first, last);

> if (!length) ←(1)

>  return init;


> unsigned long const min_per_thread = 25;

> unsigned long const max_threads =

>  (length+min_per_thread - 1) / min_per_thread; ←(2)


> unsigned long const hardware_threads =

>  std::thread::hardware_concurrency();


> unsigned long const num_threads = ←(3)

>  std::min(

>   hardware.threads != 0 ? hardware_threads : 2, max_threads);


> unsigned long const block_size = length / num_threads; ←(4)


> std::vector results(num_threads);

> std::vector threads(num_threads - 1); ←(5)


> Iterator block_start = first;

> for(unsigned long i = 0; i < (num_threads - 1); ++i) {

>  Iterator block_end = block_start;

>  std::advance(block_end, block_size); ←(6)


>  threads[i] = std::thread( ←(7)

>   accumulate_block(),

>   block_start, block_end, std::ref(results(i)));

>  block_start = block_end;  ←(8)

> }

> accumulate_block()(

>  block_start, last, results[num_threads-1]); ←(9)


> std::for_each(threads.begin(), threads.end(),

> std::mem_fn(&std::thread::join)); ←(10)


> return

>  std::accumulate(results.begin(), results.end(), init); ←(11)

>}

Хотя функция довольно длинная, по существу она очень проста. Если входной диапазон пуст (1), то мы сразу возвращаем начальное значение >init. В противном случае диапазон содержит хотя бы один элемент, поэтому мы можем разделить количество элементов на минимальный размер блока и получить максимальное число потоков (2).

Это позволит избежать создания 32 потоков на 32-ядерной машине, если диапазон состоит всего из пяти элементов.