Windows Sockets, IOCP и Delphi

Источник: habrahabr
Mr_Developer

Пролог


Недавно я столкнулся с необходимостью эффективной работы с сокетами в Windows приложении. Задача типичная для нагруженного сервера. Нетипичным тут будет казаться только язык реализации - Delphi.
Я хочу описать способ массовой асинхронной работы с большим количеством сокетов с использованием I/O Completion Ports. Microsoft в своей литературе рекомендует использовать именно эту технологию. Я думаю, многие с ней знакомы, но на всякий случай укажу ссылку на MSDN. Суть технологии в том, что система организует высокоэффективную очередь событий, а программа обрабатывает её из тред-пула, размер которого подобран по количеству вычислительных ядер. Данный подход имеет преимущества при большом количестве одновременно производимых асинхронных операций ввода вывода для разных конечных точек. Готовый исходник можно (лучше) сразу гляуть здесь. Не всё идеально, но для эксперементов сойдёт.

Roadmap


Я, в некотором смысле, буду придерживаться идеологии Node.Js во всём, что касается организации объектов и операций ввода вывода. 
В случае с серверной частью понадобиться реализовать следующее:
  • Прослушивание сокета. Принятием или отклонением новых соединений.
  • Отслеживание сигнала закрытия клиентских сокетов.

Для клиента первый пункт этого списка не актуален, но необходимо реализовать асинхронное подключение к серверу. В обоих классах будет возможность одновременного и чтения и записи на одну конечную точку.
Все созданные экземпляры клиентских и серверных сокетов будут использовать одну общую очередь сообщений и один тред-пул. Это нужно для возможности использовать оба типа сокетов в одном приложении оптимальным образом.

Реализация


Приступим. Для начала отмечу, что в связи с абсолютно асинхронной событийной моделью построения я буду реализовывать не классы а интерфейсы. Это очень удобно в данном случае, так как с конечного программиста снимается ответственность за выделенную память. Да и вообще, отследить тут её использование другим способом либо очень затратно либо вовсе невозможно. Очень много работы должно происходить при инициализации модуля. 
  • Создание списков сокетов разных типов.
  • Инициализация подсистемы сокетов.
  • Создание очереди сообщений.
  • Создание пула для обработки очереди.
  • Создание событий для сокетов.
  • Создание потоков отслеживающих сокетные события( например подключение нового клиента).

И так, секция инициализации содержит следующую процедуру, которая реализует список пункт за пунктом.
procedure Init;
var
  WSAData: TWsaData;
  i: Integer;
begin
  gClients := TProtoStore.Create;
  gListeners := TProtoStore.Create;
  gServerClients := TProtoStore.Create;
  if WSAStartup(MAKEWORD(2, 2), WSAData) <> 0 then
    raise IOCPClientException.Create(sErrorInit_WSAtartup);
  gIOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, CPUCount * 2);
  if gIOCP = INVALID_HANDLE_VALUE then
    raise IOCPClientException.Create(sErrorInit_CreateIoCompletionPort);
  for i := 1 to CPUCount * 2 do
  begin
    SetLength(gWorkers, Length(gWorkers) + 1);
    gWorkers[Length(gWorkers) - 1] := TWorkerThread.Create();
  end;
  gListenerAcceptEvent := WSACreateEvent;
  if gListenerAcceptEvent = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);
  gServerClientsCloseEvent := WSACreateEvent;
  if gServerClientsCloseEvent = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);
  gClisentsConnectAndCloseEvents := WSACreateEvent;
  if gClisentsConnectAndCloseEvents = WSA_INVALID_EVENT then
    raise IOCPClientException.Create(sErrorInit_WSACreateEvent);
  gClientSocketEventThread := TSocketEventThread.Create
    (gClisentsConnectAndCloseEvents, gClients, ET_EVENT_SIGNALED);
  gClientSocketEventThread.Start;
  gServerClientsSocketEventThread := TSocketEventThread.Create
    (gServerClientsCloseEvent, gServerClients, ET_EVENT_SIGNALED);
  gServerClientsSocketEventThread.Start;
  gServerSocketEventThread := TSocketEventThread.Create(gListenerAcceptEvent,
    gListeners, ET_EVENT_SIGNALED);
  gServerSocketEventThread.Start;
end;

Функция CreateIoCompletionPort в данном случае выполняет создание специальной очереди сообщений.
Видно, что для отслеживания событий на сокетах с разным назначением используется один и тот же класс потока TSocketEventThread. Потоки этого класса выполняют процедуру, которая ожидает сокетные события, и сразу же ставят в очередь сообщения (для каждого сокета относящегося к типу, который облуживает этот поток) о том что произошло какое-то событие. 
procedure TSocketEventThread.WaitForClientsEvents;
var
  WaitResult: DWORD;
const
  TimeOut: DWORD = 100;
begin
  WaitResult := WSAWaitForMultipleEvents(1, @fEvent, FALSE, TimeOut, FALSE);
  if WaitResult = WSA_WAIT_FAILED then
    raise IOCPClientException.Create
      (sErrorWaitForClientsEvents_WSAWaitForMultipleEvents);
  if WaitResult = WSA_WAIT_EVENT_0 then
  begin
    if not WSAResetEvent(fEvent) then
      raise IOCPClientException.Create
        (sErrorWaitForClientsEvents_WSAResetEvent);
    fStore.Post(fKey);
  end;
end;

Тут метод fStore.Post(fKey); как раз и выполняет отправку сообщений в очередь.
procedure TProtoStore.Post(CompletionKey: DWORD);
var
  i: Integer;
begin
  fLock.Enter;
  try
    for i := 0 to Length(ProtoArray) - 1 do
    begin
      ProtoArray[i]._AddRef;
      if not PostQueuedCompletionStatus(gIOCP, 0, CompletionKey,
        POverlapped(ProtoArray[i])) then
      begin
        ProtoArray[i]._Release;
        raise IOCPClientException.Create(sErrorPost_PostQueuedCompletionStatus);
      end;
    end;
  finally
    fLock.Leave;
  end;
end;

Особое внимание заслуживает тут использование объектов с интерфейсами.
Метод _AddRef используется для того, чтобы обозначить тот факт, что объект "находится в очереди" и его не следует уничтожать. (Позже после обработки будет вызван _Release). Процедура PostQueuedCompletionStatus непосредственно выполняет постановку сообщения в очередь.
Пул обработает каждое сообщение в асинхронном режиме.
Для этого он выполняет следующую процедуру.
procedure TWorkerThread.ProcessIOCP;
var
  NumberOfBytes: DWORD;
  CompletionKey: NativeUInt;
  Overlapped: POverlapped;
  Proto: TIOCPSocketProto;
begin
  if not((not GetQueuedCompletionStatus(gIOCP, NumberOfBytes, CompletionKey,
    Overlapped, INFINITE)) and (Overlapped = nil)) then
  begin
    if CompletionKey = ET_EVENT_SIGNALED then
    begin
      Proto := TIOCPSocketProto(Overlapped);
      with Proto do
      begin
        IOCPProcessEventsProc();
        _Release;
      end
    end
    else if CompletionKey <> 0 then
    begin
      Proto := TIOCPSocketProto(CompletionKey);
      if Proto.IOCPProcessIOProc(NumberOfBytes, Overlapped) then
        Proto._Release;
    end;
  end
end;

Процедура GetQueuedCompletionStatus служит для получения сообщения из очереди. Далее определяется является ли это сообщение сообщением о завершенном вводе/выводе или это сообщение о произошедшем событии. Тут продемонстрированы два способа передать через очередь какую-то информацию, в данном случае это ссылка на конкретный экземпляр класса сокетов. 
Обработка ведётся унифицировано для всех типов сокетов, это достигнуто с помощью наследования от общего предка который содержит общие обработчики, допускается их переопределение. 
Рассмотрим механизм обработки сокетных событий.
procedure TIOCPSocketProto.IOCPProcessEventsProc();
var
  WSAEvents: TWsaNetworkEvents;
  AcceptedSocket: TSocket;
  RemoteAddress: string;
begin
  if fStateLock <> CLI_SOCKET_LOCK_CLOSED then
  begin
    fClosingLock.BeginRead;
    try
      if (fStateLock <> CLI_SOCKET_LOCK_CLOSED) then
        if WSAEnumNetworkEvents(fSocket, 0, WSAEvents) <> SOCKET_ERROR then
        begin
          if ((WSAEvents.lNetworkEvents and FD_CONNECT) <> 0) then
          begin
            if 0 = WSAEvents.iErrorCode[FD_CONNECT_BIT] then
              InterlockedExchange(fStateLock, CLI_SOCKET_LOCK_CONNECTED);
            CallOnConnect;
          end;
          if ((WSAEvents.lNetworkEvents and FD_CLOSE) <> 0) and
            (0 = WSAEvents.iErrorCode[FD_CLOSE_BIT]) then
            CallOnClose;
          if ((WSAEvents.lNetworkEvents and FD_ACCEPT) <> 0) and
            (0 = WSAEvents.iErrorCode[FD_ACCEPT_BIT]) then
          begin
            AcceptedSocket := DoAccept(RemoteAddress);
            if AcceptedSocket <> INVALID_SOCKET then
            begin
              fClientClass.Create(AcceptedSocket, fOnConnect, fOnClose,
                RemoteAddress).Prepare;
            end;
          end;
        end
    finally
      fClosingLock.EndRead;
    end;
  end;
end;

Здесь интересно применён класс TMultiReadExclusiveWriteSynchronizer. Он используется для предотвращения попытки закрыть сокет и уничтожить объект из другой нити пула (fClosingLock.BeginRead). Все операции с сокетом проходят как операции чтения для этого объекта синхронизации, кроме операции создания и операции закрытия сокета - они являются операциями записи и потому могут выполняться только при монопольном владении ресурсом.
Во всём же остальном работа с сокетами в данной процедуре совершенно обыкновенная.
Единственное что в этой процедуре стоит рассмотреть дополнительно - это подключение нового клиента к серверу, метод DoAccept. 
function TIOCPSocketProto.DoAccept(var RemoteAddress: string): TSocket;
var
  addr: TSockAddr;
  addrlen: Integer;
  dwCallbackData: NativeUInt;
  RemoteAddrLen: DWORD;
begin
  dwCallbackData := NativeUInt(self);
  addrlen := SizeOf(addr);
  Result := WSAAccept(fSocket, @addr, @addrlen, ServerAcceptCallBack,
    dwCallbackData);
  if Result <> INVALID_SOCKET then
  begin
    SetLength(RemoteAddress, 255);
    RemoteAddrLen := Length(RemoteAddress);
    if WSAAddressToString(addr, addrlen, nil, PChar(<hh user=RemoteAddress>[1]),
      RemoteAddrLen) = SOCKET_ERROR then
      raise IOCPClientException.Create(sErrorAccept_WSAAddressToString);
    SetLength(RemoteAddress, RemoteAddrLen - 1)
  end
end;

Здесь ключевым моментом является использование WSAAccept. Эта функция позволяет отклонять подключение клиентов таким образом, что клиент на самом деле получает событие FD_CONNECT.
Это предочтительный путь для организации так называемых чёрных списков.
Идём далее. Расмотрим организацию ввода вывода. Сделаем это на примере операции чтения.
procedure TIOCPSocketProto.Read(Length: DWORD;
  OnRead, OnReadProcess: TOnReadEvent);
var
  Bytes, Flags: DWORD;
  WsaBuf: TWsaBuf;
begin
  fClosingLock.BeginRead;
  try
    if fStateLock = CLI_SOCKET_LOCK_CONNECTED then
    begin
      if InterlockedCompareExchange(fReadLock, IO_PROCESS, IO_IDLE) = IO_IDLE
      then
      begin
        fOnRead := OnRead;
        fOnReadProcess := OnReadProcess;
        fReaded := 0;
        fReadBufLength := Length;
        fReadBuffer := nil;
        GetMem(fReadBuffer, Length);
        if fReadBuffer <> nil then
        begin
          Bytes := 0;
          FillChar(fOverlappedRead, SizeOf(fOverlappedRead), 0);
          WsaBuf.buf := fReadBuffer;
          WsaBuf.len := fReadBufLength;
          Flags := 0;
          Bytes := 0;
          _AddRef;
          if (WSARecv(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedRead, nil)
            = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then
          begin
            FreeMem(fReadBuffer, Length);
            InterlockedExchange(fReadLock, IO_IDLE);
            _Release;
            raise IOCPClientException.Create(sErrorRead_WSARecv);
          end;
        end
        else
          raise IOCPClientException.Create(sErrorRead_GetMem);
      end
      else
        raise IOCPClientException.Create(sErrorRead_InProcess);
    end
    else
      raise IOCPClientException.Create(sErrorRead_NotConnected);
  finally
    fClosingLock.EndRead;
  end;
end;

Здесь пришлось использовать интерлокед блокировку, т.к. она очень быстрая и удовлетворяет потребность в отсечении попытки повторного вызова опрации ввода/вывода. Память выделяется под буфер единажды в каждой операции. Далее вызывается чтение из сокета в асинхронном режиме. Объект также "помечается" с помощью AddRef, для невозможности его удаления во время нахождения в очереди. По завершении вычитывания пакета сообщения об этом автоматически выставляется в очередь.
Рассмотрим, что происходит при выборке из очереди сообщения о завершенном вводе/выводе.
function TIOCPSocketProto.IOCPProcessIOProc(NumberOfBytes: DWORD;
  Overlapped: POverlapped): Boolean;
var
  Bytes, Flags: DWORD;
  WsaBuf: TWsaBuf;
begin
  Result := FALSE;
  fClosingLock.BeginRead;
  try
    if Overlapped = @fOverlappedRead then
    begin
      if NumberOfBytes <> 0 then
      begin
        if fReadLock = IO_PROCESS then
        begin
          inc(fReaded, NumberOfBytes);
          if fReaded < fReadBufLength then
          begin
            CallOnReadProcess;
            WsaBuf.buf := fReadBuffer;
            inc(WsaBuf.buf, fReaded);
            WsaBuf.len := fReadBufLength;
            dec(WsaBuf.len, fReaded);
            Flags := 0;
            Bytes := 0;
            if (WSARecv(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedRead,
              nil) = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then
            begin
              CallOnRead;
              Result := True;
            end
          end
          else
          begin
            CallOnReadProcess;
            CallOnRead;
            Result := True;
          end;
        end
      end
      else
      begin
        CallOnRead;
        Result := True;
      end;
    end
    else if Overlapped = @fOverlappedWrite then
    begin
      if NumberOfBytes <> 0 then
      begin
        if fWriteLock = IO_PROCESS then
        begin
          inc(fWrited, NumberOfBytes);
          if fWrited < fWriteBufLength then
          begin
            CallOnWriteProcess;
            WsaBuf.buf := fWriteBuffer;
            inc(WsaBuf.buf, fWrited);
            WsaBuf.len := fWriteBufLength;
            dec(WsaBuf.len, fWrited);
            Flags := 0;
            Bytes := 0;
            if (WSASend(fSocket, @WsaBuf, 1, Bytes, Flags, @fOverlappedWrite,
              nil) = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then
            begin
              CallOnWrite;
              Result := True;
            end
          end
          else
          begin
            CallOnWriteProcess;
            CallOnWrite;
            Result := True;
          end;
        end
      end
      else
      begin
        CallOnWrite;
        Result := True;
      end;
    end
  finally
    fClosingLock.EndRead;
  end;
end;

Суть этой процедуры в том, что она вызывает чтение или запись в сокет до того момента, когда выделенный буфер не окажется заполненным. Интересный момент в данном случае, это определение типа операции по ссылке на оверлапед структуру. Эту ссылку предоставляет очередь и необходимо лишь сравнить её с соответсвующими полями класса, в которых храняться структуры для чтения и записи.
Так же примечательно, то что если операция чтения/записи выполнилась мгновенно, то она всё равно попадает в очередь, однако это можно настроить через апи.
Стоит так же рассмотреть создание класса сокета и внедрение в очередь.
constructor TIOCPClientSocket.Create(RemoteAddress: string;
  OnConnect, OnClose: TOnSimpleSocketEvenet);
var
  lRemoteAddress: TSockAddr;
  lRemoteAddressLength: Integer;
begin
  inherited Create();
  fStore := gClients;
  fOnConnect := OnConnect;
  fOnClose := OnClose;
  fStateLock := 0;
  fRemoteAddressStr := RemoteAddress;
  fSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
  if fSocket = INVALID_SOCKET then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSASocket);
  if (WSAEventSelect(fSocket, gClisentsConnectAndCloseEvents,
    FD_CONNECT or FD_CLOSE) = SOCKET_ERROR) then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAEventSelect);
  if CreateIoCompletionPort(fSocket, gIOCP, NativeUInt(self), 0) = 0 then
    raise IOCPClientException.Create
      (sErrorTIOCPClientSocket_CreateIoCompletionPort);
  fStateLock := CLI_SOCKET_LOCK_CREATED;
  fStore.Add(self);
  lRemoteAddressLength := SizeOf(lRemoteAddress);
  lRemoteAddress.sa_family := AF_INET;
  if WSAStringToAddress(PChar(@fRemoteAddressStr[1]), AF_INET, nil,
    lRemoteAddress, lRemoteAddressLength) = SOCKET_ERROR then
    raise IOCPClientException.Create
      (sErrorTIOCPClientSocket_WSAStringToAddress);
  if (WSAConnect(fSocket, lRemoteAddress, lRemoteAddressLength, nil, nil, nil,
    nil) = SOCKET_ERROR) and (WSAGetLastError <> WSAEWOULDBLOCK) then
    raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAConnect);
end; 

В конструкторе клиентского сокета создаётся непосрественно сокет (WSASocket), регестрируется в очереди (CreateIoCompletionPort), асоциируется с событием и вызывает асинхронную функцию подключения(WSAConnect). Сам факт подключения ожидается в потоке который был рассмотрен первым(поток ожидания событий в сокетах). Тот в свою очередь поставит это событие в очередь.

Эпилог


В данной статье кратко рассмотрены, на мой взгляд, удачные приёмы создания классов для событийного программирования. 
Удалось создать класс для выскопроизводительной работы с сокетами для Delphi. Тема эта освещена в целом крайне слабо и я планирую продолжить эту публикацию ещё 2 - 3 постами по темам контекстов сокетов при использовании интерфейсов и создание защищённых соединений при использовании IOCP (криптопровайдеры и Winsock Secure Socket Extensions). Полный код примера здесь.

Страница сайта http://test.interface.ru
Оригинал находится по адресу http://test.interface.ru/home.asp?artId=29256