Волшебство Parallel.ForEach. Краткий обзор

В этой части мы рассмотрим внутренний механизм работы параллельного цикла Parallel.ForEach. Вам может казаться, что понимание внутреннего механизма параллельного цикла при создании программ вам не к чему, однако автор приводит несколько доводов для того что бы вы более глубоко вникли в этот механизм.

Во-первых вы узнаете, тонкости использования OtlDataManager и сможете более глубоко применять эти знания в ваших приложениях.
Во-вторых - это интересная тема :)
И в третьих использование Parallel.ForEach гораздо более лучшее решение, чем использование доморощенного многопоточного кода, так как OTL более гибка.

Давайте начнем знакомство со страшной диаграммой. Картина ниже показывает внутренний механизм OtlDataManager. В будущем мы будем часто обращаться к этой диаграмме, но пока Вы можете благополучно пропустить её
OtlDataManager
Впечатляет, не правда ли. Но давайте сосредоточимся на коде.
Parallel.ForEach(1, 1000)
  .Execute(
    procedure (const elem: integer)
    begin
    end);

Этот простой код выполняется в цикле от 1 до 1000 на всех доступных ядрах параллельно и выполняет простую процедуру, которая не содержит рабочего кода. При первом взгляде, код ничего не делает - но внутри механизма распараллеливания он делает работу в очень сложной манере.
Метод ForEach создает новый объект TOmniParallelLoop<integer>. Этот объект, который координирует параллельные задачи, и является исходным провайдером, который знает, как получить доступ к значению перечисления, которое в данный момент перечисляется (от 1 до 1000 в этом примере)
OtlDataManager содержит четырех других исходных провайдера - один для каждого типа данных, который можно передать к методу ForEach (подробнее мы рассмотрим это далее). Как пишет автор "Если бы была бы потребность расширить ForEach с новым источником перечисления, я должен был бы только добавить немного простых методов к модулю OtlParallel и написать новый исходный провайдер".

class function Parallel.ForEach(low, high: integer; step: integer):
  IOmniParallelLoop<integer>;
begin
  Result := TOmniParallelLoop<integer>.Create(
  CreateSourceProvider(low, high, step), true);
end; { Parallel.ForEach }

В конце концов, вызывается InternalExecuteTask. Этот метод ответственен за создание и старт параллельных задач цикла.
InternalExecuteTask сначала создает менеджер данных  и привязан к исходному поставщику (сравните это с картиной выше - есть один исходный поставщик и один менеджер данных).
Затем он создает соответствующее число задач и вызывает определенный для задачи метод делегата от каждого.
Этот делегат обертывает Ваш параллельный код и предоставляет ему надлежащий вход (и иногда выход) в созданную задачу.
procedure TOmniParallelLoopBase.InternalExecuteTask(
  taskDelegate: TOmniTaskDelegate);
var
  dmOptions    : TOmniDataManagerOptions;
  iTask        : integer;
  numTasks     : integer;
  task         : IOmniTaskControl;
  begin
    …
    oplDataManager := CreateDataManager(oplSourceProvider,
      numTasks, dmOptions);
    …
    for iTask := 1 to numTasks do
    begin
      task := CreateTask(
        procedure (const task: IOmniTask)
        begin
          …
          taskDelegate(task);
          …
        end,
        …
      task.Schedule(GParallelPool);
    end;
    …
  end;
end;

Объект "менеджер данных" это часть в  TOmniParallelLoop<T>. Он является глобальным для всех делегатов. Это сделано для того, чтобы можно было его просто использовать и вызывать в делегате задачи.
Более совершенный проект должен был бы послать этот объект делегату задачи как входящий параметр. Возможно, автор сделает это в будущем,  однако в версии 2.0 менеджер данных один и он глобальный, примите это за факт.
Самый простой делегат задачи (ниже) только создает локальную  очередь и передает  значение перечисления друг за другом. Такой подход приводит ко многим локальным очередям. Полученный результат передается в задачу связанную с менеджером данных.
На случай если Вы задаетесь вопросом, что такое loopBody - то это анонимный метод, который Вы передали в методе Execute  Parallel.ForEach.

procedure InternalExecuteTask(const task: IOmniTask)
var
  localQueue: TOmniLocalQueue;
  value     : TOmniValue;
begin
  localQueue := oplDataManager.CreateLocalQueue;
  try
    while (not Stopped) and localQueue.GetNext(value)
    do
      loopBody(task, value);
    finally
      FreeAndNil(localQueue);
    end;
end;


Давайте повторим:
  • Исходный провайдер создан. 
  • Менеджер данных создан и связан с исходным провайдером.  
  • Каждая задача создает свою собственную локальную очередь и использует ее, чтобы получить доступ к исходным данным 
Как вы видите, локальная очередь получает данные в пакетах (data package) с менеджера данных и посылает эти данные в выходной буфер который удостоверяется, что присланные данные получены в правильном порядке.
Если задача исчерпывает работу, она просит новый пакет данных от менеджера данных, который получает эти данные от исходного провайдера (более подробно рассмотрим это далее). Если исходный провайдер исчерпает данные, то менеджер данных попытается украсть некоторые данные от других задач. Схематично это выглядит так:

Все это было разработано, чтобы обеспечить быстрый доступ к данным (блокирование ограничено исходным провайдером все другие взаимодействия происходят без блокировки), что обеспечивает хорошее распределение рабочей нагрузки.
 
Автор: Кузан Дмитрий

Страница сайта http://test.interface.ru
Оригинал находится по адресу http://test.interface.ru/home.asp?artId=25040