Winsock Registered I/O - Traditional Multi-threaded IOCP UDP Example Server


This article presents the sixth in my series of example servers for comparing the performance of the Winsock Registered I/O Networking Extensions, RIO, and traditional Windows networking APIs. This example server is a traditional multi-threaded, IOCP-based, UDP design that we can compare with the multi-threaded RIO IOCP UDP example server. I’ve been looking at the Winsock Registered I/O Networking Extensions since October, when they first appeared as part of the Windows 8 Developer Preview, though lately most of my testing has been on Windows Server 2012 RC. Whilst exploring and understanding the new API I spent some time putting together some simple UDP servers using the various notification styles that RIO provides. I then put together some equally simple UDP servers using the “traditional” APIs so that I could compare performance. This series of blog posts describes each of the example servers in turn. You can find an index to all of the articles about the Winsock Registered I/O example servers here.

A traditional multi-threaded IOCP UDP server

This server is structured in a similar way to the other example servers and uses the same shared helper code and limited error handling (see here for more details). We start by initialising things in a similar way to the other servers.

int _tmain(int argc, _TCHAR* argv[])
{
   if (argc > 2)
   {
      cout << "Usage: IOCPUDPMT [workLoad]" << endl;

      return 1;   // too many arguments: show usage and stop
   }

   if (argc > 1)
   {
      g_workIterations = _ttol(argv[1]);
   }

   SetupTiming("IOCP UDP MT");

   InitialiseWinsock();

   SOCKET s = CreateSocket(WSA_FLAG_OVERLAPPED);

   HANDLE hIOCP = CreateIOCP();

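   // Associate the socket with the port; completion key 1 marks socket I/O,
   // the worker threads treat key 0 as a request to stop.
   if (0 == ::CreateIoCompletionPort(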
      reinterpret_cast<HANDLE>(s),
      hIOCP,
      1,
      0))
   {
      ErrorExit("CreateIoCompletionPort");
   }

   Bind(s, PORT);

   PostIOCPRecvs(RECV_BUFFER_SIZE, IOCP_PENDING_RECVS);

   CreateIOCPThreads(NUM_IOCP_THREADS);

   WaitForProcessingStarted();

   WaitForProcessingStopped();

   StopIOCPThreads();

   PrintTimings();
}

To help simulate servers that actually do some work with each datagram, we can pass a command line argument that sets g_workIterations, which controls how much ‘busy work’ DoWork() performs for each datagram.
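As a rough illustration of that idea, here is a minimal, portable sketch of both halves: parsing the optional work-load argument and burning a controllable amount of CPU per datagram. ParseWorkIterations and BusyWork are hypothetical names; the real DoWork() and the default value of g_workIterations live in the example's shared headers and may work differently. Unlike the listing above, this sketch rejects bad input instead of carrying on.

```cpp
#include <cstdlib>

// Hypothetical helper: parse the optional [workLoad] argument.
// Returns defaultValue when no argument is given and -1 on bad usage.
long ParseWorkIterations(int argc, const char *const argv[], long defaultValue)
{
   if (argc > 2)
   {
      return -1;   // too many arguments: caller should print usage and exit
   }

   if (argc == 2)
   {
      char *end = nullptr;
      const long value = std::strtol(argv[1], &end, 10);

      if (end == argv[1] || *end != '\0' || value < 0)
      {
         return -1;   // not a non-negative decimal integer
      }

      return value;
   }

   return defaultValue;
}

// Hypothetical stand-in for DoWork(): spin for 'iterations' steps and return a
// value that depends on every step, so the caller can accumulate it (as
// ThreadFunction does with workValue) and the loop cannot be optimised away.
int BusyWork(long iterations)
{
   unsigned int value = 1;

   for (long i = 0; i < iterations; ++i)
   {
      value = value * 1664525u + 1013904223u;   // cheap linear congruential step
   }

   return static_cast<int>(value & 0x7fffffff);
}
```

For example, `ParseWorkIterations(2, argv, 0)` with `argv[1]` set to `"500"` yields 500, and each processed datagram would then call `BusyWork(500)`.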

Once the socket is created and bound, we need to post some read requests. This involves creating and managing a set of buffers in a similar way to the RIO server, though here we don’t need to register the buffers before performing I/O with them.

inline void PostIOCPRecvs(
   const DWORD recvBufferSize,
   const DWORD pendingRecvs)
{
   DWORD totalBuffersAllocated = 0;

   while (totalBuffersAllocated < pendingRecvs)
   {
      DWORD receiveBuffersAllocated = 0;

      // Ask only for the buffers that are still needed.
      char *pBuffer = AllocateBufferSpace(
         recvBufferSize,
         pendingRecvs - totalBuffersAllocated,
         receiveBuffersAllocated);

      totalBuffersAllocated += receiveBuffersAllocated;

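      DWORD offset = 0;

      // One overlapped structure (with its WSABUF) per buffer in this chunk.
      // These are not freed; they must stay valid while the receives are pending.
      EXTENDED_OVERLAPPED *pBufs = new EXTENDED_OVERLAPPED[receiveBuffersAllocated];

      DWORD bytesRecvd = 0;
      DWORD flags = 0;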

      for (DWORD i = 0; i < receiveBuffersAllocated; ++i)
      {
         EXTENDED_OVERLAPPED *pOverlapped = pBufs + i;

         ZeroMemory(pOverlapped, sizeof(EXTENDED_OVERLAPPED));

         pOverlapped->buf.buf = pBuffer + offset;
         pOverlapped->buf.len = recvBufferSize;

         offset += recvBufferSize;

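         flags = 0;   // WSARecv can write to flags, so reset it for each call

         if (SOCKET_ERROR == ::WSARecv(
            g_s,
            &(pOverlapped->buf),
            1,
            &bytesRecvd,
            &flags,
            static_cast<OVERLAPPED *>(pOverlapped),
            0))
         {
            // WSA_IO_PENDING means the receive was queued successfully.
            const DWORD lastError = ::WSAGetLastError();

            if (lastError != WSA_IO_PENDING)
            {
               ErrorExit("WSARecv", lastError);
            }
         }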
      }

      if (totalBuffersAllocated != pendingRecvs)
      {
         cout << totalBuffersAllocated << " receives pending" << endl;
      }
   }

   cout << totalBuffersAllocated << " total receives pending" << endl;
}
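The buffer handling in PostIOCPRecvs is plain pointer arithmetic: one contiguous chunk is sliced into fixed-size receive buffers at pBuffer + offset. A portable sketch of just that slicing step follows. BufferSlice mirrors the len/buf layout of WSABUF without needing the Winsock headers, and CarveBuffers is a hypothetical helper, not part of the example code. Because this is traditional overlapped I/O, the slices can be handed straight to WSARecv with no registration step.

```cpp
#include <cstddef>
#include <vector>

// Mirrors the shape of WSABUF (len, then buf) without needing <winsock2.h>.
struct BufferSlice
{
   unsigned long len;
   char *buf;
};

// Split one contiguous block into 'count' fixed-size receive buffers using the
// same "base pointer + running offset" arithmetic as PostIOCPRecvs.
// The caller must ensure pBlock points to at least bufferSize * count bytes.
std::vector<BufferSlice> CarveBuffers(char *pBlock, unsigned long bufferSize, std::size_t count)
{
   std::vector<BufferSlice> slices;
   slices.reserve(count);

   std::size_t offset = 0;

   for (std::size_t i = 0; i < count; ++i)
   {
      slices.push_back(BufferSlice{bufferSize, pBlock + offset});
      offset += bufferSize;
   }

   return slices;
}
```

Slicing one large allocation keeps the buffers adjacent in memory and means only one allocation has to be managed, rather than one per pending receive.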

We’re using the same buffer allocation code as the earlier RIO servers, so see here for more details or download the code at the end of this article.
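The allocator itself lives in the shared headers and isn't reproduced here. Purely as a hedged illustration, assuming it rounds the requested space up to the system allocation granularity (reported by GetSystemInfo as dwAllocationGranularity, typically 64 KB) and then reports how many whole buffers fit, the arithmetic would look like this. AllocationPlan and PlanAllocation are hypothetical names.

```cpp
#include <cstdint>

// Hypothetical result of planning one allocation.
struct AllocationPlan
{
   std::uint64_t bytes;        // size to request from the OS
   std::uint32_t bufferCount;  // whole buffers that fit in that space
};

// Assumption for illustration: round the requested space up to a multiple of
// 'granularity', then count how many buffers of 'bufferSize' fit.
// bufferSize and granularity must both be non-zero.
AllocationPlan PlanAllocation(
   std::uint32_t bufferSize,
   std::uint32_t buffersWanted,
   std::uint32_t granularity)
{
   const std::uint64_t wanted =
      static_cast<std::uint64_t>(bufferSize) * buffersWanted;

   const std::uint64_t rounded =
      ((wanted + granularity - 1) / granularity) * granularity;

   return AllocationPlan{
      rounded,
      static_cast<std::uint32_t>(rounded / bufferSize)};
}
```

Under that assumption, asking for 100 buffers of 2048 bytes with a 64 KB granularity would reserve 262,144 bytes, enough for 128 buffers, so a single call could yield more buffers than were requested.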

We then create our worker threads and start everything up. The main work is done in the worker thread function shown below.

unsigned int __stdcall ThreadFunction(
   void *pV)
{
#ifdef TRACK_THREAD_STATS
   const DWORD index = (DWORD)(ULONG_PTR)pV;

   ThreadData &threadData = g_threadData[index];

   threadData.threadId = ::GetCurrentThreadId();

   threadData.maxPacketsProcessed = 1;
   threadData.minPacketsProcessed = 1;
#endif

   DWORD numberOfBytes = 0;

   ULONG_PTR completionKey = 0;

   OVERLAPPED *pOverlapped = 0;

   if (!::GetQueuedCompletionStatus(
      g_hIOCP,
      &numberOfBytes,
      &completionKey,
      &pOverlapped,
      INFINITE))
   {
      ErrorExit("GetQueuedCompletionStatus");
   }

   int workValue = 0;

   // Key 1 is the socket's completion key; key 0 is treated as "stop".
   if (completionKey == 1)
   {
      bool done = false;

      ::SetEvent(g_hStartedEvent);

      DWORD bytesRecvd = 0;
      DWORD flags = 0;

      do
      {
#ifdef TRACK_THREAD_STATS
         threadData.dequeueCalled++;

         threadData.packetsProcessed++;
#endif

         if (numberOfBytes == EXPECTED_DATA_SIZE)
         {
            ::InterlockedIncrement(&g_packets);

            workValue += DoWork(g_workIterations);

            EXTENDED_OVERLAPPED *pExtOverlapped = static_cast<EXTENDED_OVERLAPPED *>(pOverlapped);

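            flags = 0;   // WSARecv can write to flags, so reset it for each call

            // Reuse the same buffer and overlapped structure for the next receive.
            if (SOCKET_ERROR == ::WSARecv(
               g_s,
               &(pExtOverlapped->buf),
               1,
               &bytesRecvd,
               &flags,
               pExtOverlapped,
               0))
            {
               const DWORD lastError = ::WSAGetLastError();

               if (lastError != WSA_IO_PENDING)
               {
                  ErrorExit("WSARecv", lastError);
               }
            }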

            done = false;

         }
         else
         {
            done = true;
         }

         if (!done)
         {
            if (!::GetQueuedCompletionStatus(
               g_hIOCP,
               &numberOfBytes,
               &completionKey,
               &pOverlapped,
               INFINITE))
            {
               const DWORD lastError = ::GetLastError();

               if (lastError != ERROR_OPERATION_ABORTED)
               {
                  ErrorExit("GetQueuedCompletionStatus", lastError);
               }
            }

            if (completionKey == 0)
            {
               done = true;
            }
         }
      }
      while (!done);
   }

   ::SetEvent(g_hStoppedEvent);

   return workValue;
}
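The control flow of ThreadFunction can be modelled portably to show how the workers and the shutdown protocol fit together. In this sketch CompletionQueue is a hypothetical, heavily simplified stand-in for an I/O completion port: a blocking FIFO with no concurrency throttling and none of the LIFO thread wake-up order that a real IOCP uses. It only mimics the "key 1 means data, key 0 means stop" convention seen above, and it assumes StopIOCPThreads posts one key-0 completion per thread, which the article doesn't show.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// A completion as a worker sees it: key 1 = socket I/O, key 0 = shutdown.
struct Completion
{
   unsigned long key;
   unsigned long bytes;
};

// Hypothetical stand-in for an I/O completion port: a blocking FIFO.
class CompletionQueue
{
public:
   void Post(Completion c)
   {
      {
         std::lock_guard<std::mutex> lock(m_mutex);
         m_queue.push_back(c);
      }
      m_cond.notify_one();
   }

   Completion Get()
   {
      std::unique_lock<std::mutex> lock(m_mutex);
      m_cond.wait(lock, [this] { return !m_queue.empty(); });
      const Completion c = m_queue.front();
      m_queue.pop_front();
      return c;
   }

private:
   std::mutex m_mutex;
   std::condition_variable m_cond;
   std::deque<Completion> m_queue;
};

// Same shape as ThreadFunction: handle datagrams of the expected size and stop
// on a key-0 completion or on any other size.
void Worker(CompletionQueue &queue, unsigned long expectedSize, std::atomic<long> &packets)
{
   for (;;)
   {
      const Completion c = queue.Get();

      if (c.key == 0 || c.bytes != expectedSize)
      {
         break;
      }

      ++packets;   // the real server does its busy work and re-posts WSARecv here
   }
}

// Start the workers, deliver 'datagrams' completions, then post one shutdown
// completion per thread and wait for every worker to finish.
long RunModel(std::size_t threads, std::size_t datagrams, unsigned long expectedSize)
{
   CompletionQueue queue;
   std::atomic<long> packets{0};
   std::vector<std::thread> workers;

   for (std::size_t i = 0; i < threads; ++i)
   {
      workers.emplace_back(Worker, std::ref(queue), expectedSize, std::ref(packets));
   }

   for (std::size_t i = 0; i < datagrams; ++i)
   {
      queue.Post(Completion{1, expectedSize});
   }

   for (std::size_t i = 0; i < threads; ++i)
   {
      queue.Post(Completion{0, 0});
   }

   for (std::thread &t : workers)
   {
      t.join();
   }

   return packets.load();
}
```

Because every datagram is queued before the shutdown completions, each one is consumed before any worker can see a key-0 entry, so `RunModel(4, 1000, 1024)` counts all 1000 datagrams.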

We’ve added some optional per-thread stats collection, enabled by defining TRACK_THREAD_STATS, as this has proved useful in reasoning about the relative performance of the IOCP servers. These stats are displayed along with the timings at the end of the test.
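The code that prints these stats is in the shared headers. As a hedged illustration of the kind of summary such counters allow, the sketch below aggregates per-thread counts into a total, the busiest and quietest thread, and packets per dequeue. ThreadStats and Summarise are hypothetical and only loosely follow the ThreadData fields used above. Since each GetQueuedCompletionStatus call in this server returns a single datagram, packets per dequeue should be 1.0, which contrasts with RIO, where one RIODequeueCompletion call can return many results.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical per-thread counters, loosely modelled on ThreadData.
struct ThreadStats
{
   std::uint64_t dequeueCalled;
   std::uint64_t packetsProcessed;
};

struct StatsSummary
{
   std::uint64_t totalPackets = 0;
   std::uint64_t busiestThread = 0;    // most packets handled by one thread
   std::uint64_t quietestThread = 0;   // fewest packets handled by one thread
   double packetsPerDequeue = 0.0;     // 1.0 when each dequeue yields one datagram
};

StatsSummary Summarise(const std::vector<ThreadStats> &threads)
{
   StatsSummary s;

   if (threads.empty())
   {
      return s;
   }

   std::uint64_t totalDequeues = 0;
   s.quietestThread = threads.front().packetsProcessed;

   for (const ThreadStats &t : threads)
   {
      s.totalPackets += t.packetsProcessed;
      totalDequeues += t.dequeueCalled;
      s.busiestThread = std::max(s.busiestThread, t.packetsProcessed);
      s.quietestThread = std::min(s.quietestThread, t.packetsProcessed);
   }

   if (totalDequeues != 0)
   {
      s.packetsPerDequeue =
         static_cast<double>(s.totalPackets) / static_cast<double>(totalDequeues);
   }

   return s;
}
```

A large gap between the busiest and quietest threads suggests the work isn't being spread evenly across the pool.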

The code for this example can be downloaded from here. This code requires Visual Studio 2012, but it would work with earlier compilers if you have a Windows SDK that supports RIO. Note that Shared.h and Constants.h contain helper functions and tuning constants for ALL of the examples, so there will be code in there that this example does not use. You should be able to unzip each example into the same directory structure so that they all share the same headers. This lets you tune all of the examples identically, so that any performance comparisons make sense.

Join in

Comments and suggestions are more than welcome. I’m learning as I go here and I’m quite likely to have made some mistakes or come up with some erroneous conclusions, so feel free to put me straight and help make these examples better.

Code is here

Code - updated 15th April 2023

Full source can be found here on GitHub.

This isn’t production code; error handling is simply “panic and run away”.

This code is licensed with the MIT license.