The C++ framework for developing highly scalable, high performance servers on Windows platforms.

Writing your first UDP client

As with writing a UDP server, the easiest way to start writing a new client is to look at the examples. The framework is designed to handle many connections, whether to a server or from a client, so the client examples tend to deal with 'multiple connections'. The basic TCP client example, for instance, would make a good starting point for something like a web spider that connects to a vast number of sites concurrently. Since UDP is connectionless by design, the simplest UDP client example uses a single client socket to talk to a single server socket; more advanced UDP client examples manage 'virtual connections' and use multiple sockets.

This example is also explained in more detail in UDP Echo Server Test.

Since simple UDP servers are connectionless and deal in single datagrams, our client opens a single connection to a server and sends a stream of datagrams. The client expects the server to echo the datagrams back; it validates the echoed data and keeps track of the number of datagrams sent and received. Since UDP datagrams can be discarded or lost, the client uses a timer to determine whether the test has timed out; without it the client could wait forever for datagrams that will never arrive. When the test is complete the client reports the number of datagrams sent and received and the minimum, maximum and average round trip time. The client also tracks the round trip time for every datagram sent and received and can produce this rather large report if required.

The TestMain.cpp file is where everything starts and, as usual, we start by parsing the command line and setting up the various objects that we need to create our connection manager.

          const CCommandLine::Count numRepeats = commandLine.Repeats();

          const CCommandLine::Count numPackets = commandLine.Packets();

          const CCommandLine::Count packetSize = commandLine.PacketSize();

          const CCommandLine::Count delayAfter = commandLine.PacketBatchSize();

          const CCommandLine::Count delayFor = commandLine.PacketBatchPause();

          const CCommandLine::Count maxSocketsInPool = numPackets;

          const CCommandLine::Count maxBuffersInPool = static_cast<CCommandLine::Count>(numPackets * 2);

          const CCommandLine::Count bufferSize = packetSize;

          CDatagramSocketAllocator socketAllocator(
             maxSocketsInPool,
             commandLine.SpinCount());

          CBufferAllocator bufferAllocator(
             bufferSize,
             maxBuffersInPool);

          CIOPool ioPool(
             0);                           // number of threads (0 = 2 x processors)

          ioPool.Start();


The test client operates on the basis of sending a certain number of datagrams (packets) of a certain size at a certain rate and, once it has sent them all, repeating the process a set number of times. This allows us to send more datagrams with less memory: sending 1 datagram 10000 times creates a single datagram in memory, whereas sending 10000 datagrams once creates 10000 datagrams in memory. Remember, this is a simple test; something more "real world" would likely operate somewhat differently.
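The trade-off between repeats and distinct datagrams is simple arithmetic, and a small sketch may make it concrete. The TestPlan struct and both helper functions below are hypothetical names used only for illustration; they count payload bytes alone and ignore the framework's own buffer and socket pools.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical description of a test run. The field names mirror the
// command line values used above, but the struct itself is illustrative.
struct TestPlan
{
   std::uint64_t numPackets;   // distinct datagrams built up front
   std::uint64_t packetSize;   // bytes per datagram
   std::uint64_t numRepeats;   // number of times the whole set is sent
};

// Total number of datagrams that will go out on the wire.
std::uint64_t TotalSends(const TestPlan &plan)
{
   assert(plan.numPackets > 0 && plan.numRepeats > 0);
   return plan.numPackets * plan.numRepeats;
}

// Payload bytes held in memory for the duration of the test. Repeats do
// not add to this figure because the same datagrams are sent again.
std::uint64_t PayloadBytesInMemory(const TestPlan &plan)
{
   return plan.numPackets * plan.packetSize;
}
```

Under these assumptions a plan of 1 packet repeated 10000 times and a plan of 10000 packets sent once produce the same number of sends, but the second holds 10000 times as much payload in memory.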

We build our datagrams here:

          CTestDatagrams datagrams(commandLine.Repeats());

          for (DWORD i = 0; i < numPackets; ++i)
          {
             datagrams.AddDatagram(new CTestDatagram(i, packetSize));
          }


The datagram itself is simple: it starts with an id, which we use as a key into the datagram collection to match the echo response to the original datagram. The rest of the data is then set to a pattern that is different for each datagram in the set. Note that if you send 1 datagram 10000 times you will be sending the same datagram 10000 times, whereas if you send 10000 datagrams once they'll all be different.

 CTestDatagram::CTestDatagram(
    const DWORD id,
    const DataLength messageSize)
    :  m_id(id),
       m_pData(new BYTE[messageSize]),
       m_messageSize(messageSize),
       m_repeatCount(0),
       m_currentSendIteration(0),
       m_currentRecvIteration(0)
 {
    memcpy(m_pData, &id, sizeof(id));

    for (size_t i = sizeof(id); i < m_messageSize; ++i)
    {
       if (i % 2)
       {
          m_pData[i] = static_cast<BYTE>(i);
       }
       else
       {
          m_pData[i] = static_cast<BYTE>(id);
       }
    }
 }
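Because every byte after the id can be recomputed from the id and the byte's offset, an echoed datagram can be checked without keeping a copy of what was sent. The following is a minimal, portable sketch of that idea using the same odd/even scheme as the constructor above. BuildPayload and IsValidEcho are hypothetical helpers, not part of the framework, and the sketch assumes the payload is at least as large as the id.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical helper: builds a payload with the id in the leading bytes,
// then the offset at odd positions and the id at even positions (both
// truncated to a single byte).
std::vector<std::uint8_t> BuildPayload(std::uint32_t id, std::size_t size)
{
   assert(size >= sizeof(id));

   std::vector<std::uint8_t> data(size);

   std::memcpy(data.data(), &id, sizeof(id));

   for (std::size_t i = sizeof(id); i < size; ++i)
   {
      data[i] = static_cast<std::uint8_t>((i % 2) ? i : id);
   }

   return data;
}

// Hypothetical helper: reads the id from the front of an echoed payload and
// checks every byte against a freshly built payload for that id.
bool IsValidEcho(const std::vector<std::uint8_t> &echo, std::uint32_t &idOut)
{
   if (echo.size() < sizeof(idOut))
   {
      return false;
   }

   std::memcpy(&idOut, echo.data(), sizeof(idOut));

   return echo == BuildPayload(idOut, echo.size());
}
```

The real test client looks the id up in its datagram collection rather than rebuilding the payload; rebuilding is used here only to keep the sketch self-contained.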


We then create our connection manager, set up our target address and prepare our buffer and socket pools if required.

          CDatagramSocketConnectionManager connector(
             ioPool,
             socketAllocator,
             bufferAllocator,
             datagrams,
             commandLine.ReadTimeout());

          const CAddressIPv4 address(
             commandLine.Server(),
             commandLine.Port());

          if (commandLine.PreAllocate())
          {
             socketAllocator.EnlargePool(maxSocketsInPool);

             bufferAllocator.EnlargePool(maxBuffersInPool);
          }


We then send our datagrams. Note that we have a simple send rate delay but we cannot limit our send rate to the rate that the server is operating at as we can with the TCP client; there's no "send after recv" concept. If we send too fast and either the server can't keep up with the inbound datagrams or we can't keep up with the replies then datagrams will be more likely to be lost. Because of this we have a timeout on our read call. If we wait too long for a datagram to come in and we're expecting a datagram then the test will fail.

             CPerformanceCounter counter(CPerformanceCounter::LockThreadToOneCPU);

             for (int y = 0; y < numRepeats; ++y)
             {
                for (int i = 0; i < numPackets && !datagrams.HasFailed(); ++i)
                {
                   ITestDatagram &datagram = datagrams.GetDatagram(i);

                   connector.SendTo(address, datagram.GetBytes(), datagram.GetSize());

                   if (delayAfter && ((i + 1) % delayAfter == 0))
                   {
                      cout << i + 1 << " packets sent" << endl;

                      if (delayFor)
                      {
                         Sleep(delayFor);
                      }
                   }
                }
             }
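The pacing rule in the send loop is easy to reason about in isolation: a pause happens after every batch of delayAfter packets, and a batch size of zero disables pausing. The sketch below restates that rule as portable functions so the effect of different settings can be estimated before running a test. All three function names are hypothetical and the figures cover sleep time only, not the time spent sending.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: true when the loop should pause after sending the
// packet at zero-based position index. A batch size of 0 disables pausing.
bool ShouldPauseAfter(std::uint32_t index, std::uint32_t batchSize)
{
   return batchSize != 0 && (index + 1) % batchSize == 0;
}

// Hypothetical helper: number of pauses taken while sending the set once.
std::uint32_t CountPauses(std::uint32_t numPackets, std::uint32_t batchSize)
{
   std::uint32_t pauses = 0;

   for (std::uint32_t i = 0; i < numPackets; ++i)
   {
      if (ShouldPauseAfter(i, batchSize))
      {
         ++pauses;
      }
   }

   assert(pauses <= numPackets);

   return pauses;
}

// Hypothetical helper: time spent sleeping during one pass over the set,
// which is a lower bound on how long that pass can take.
std::uint64_t MinimumSleepMillis(
   std::uint32_t numPackets,
   std::uint32_t batchSize,
   std::uint32_t pauseMillis)
{
   return static_cast<std::uint64_t>(CountPauses(numPackets, batchSize)) * pauseMillis;
}
```

For example, 10 packets with a batch size of 3 pause after the 3rd, 6th and 9th packets, so a 50ms pause adds at least 150ms to each pass.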


We then wait for all replies to arrive or for a signal to shut down. You can supply a command line parameter to tell the test to display its status every so often; this makes it possible to see whether the server is still sending or has stalled. With this and the timeout you can watch the effect of changing the rate at which you send your datagrams.

             HANDLE handlesToWaitFor[2];

             handlesToWaitFor[0] = shutdownEvent.GetWaitHandle();
             handlesToWaitFor[1] = datagrams.GetWaitHandle();

             const Milliseconds checkEvery = commandLine.DisplayStatusEvery();

             Milliseconds elapsedTime = 0;

             bool done = false;

             const Milliseconds dataFlowTimeoutMillis = commandLine.DataFlowTimeout();

             while (!done)
             {
                const Milliseconds timeout = (dataFlowTimeoutMillis == INFINITE ? checkEvery : min(checkEvery, dataFlowTimeoutMillis - elapsedTime));

                elapsedTime += timeout;

                DWORD waitResult = ::WaitForMultipleObjects(2, handlesToWaitFor, false, timeout);

                if (waitResult == WAIT_TIMEOUT)
                {
                   if (elapsedTime >= dataFlowTimeoutMillis)
                   {
                      cout << "Waiting for data flow to complete timed out after " << CStringConverter::TtoA(counter.GetElapsedTimeAsString()) << endl;

                      result = 1;
                      done = true;
                   }
                   else
                   {
                      cout << "Waiting for " << datagrams.GetNumberOfRepliesPending() << " packets to be echoed" << endl;

                      connector.DumpErrors();
                   }
                }
                else if (waitResult == WAIT_OBJECT_0)
                {
                   done = true;
                   result = 1;
                }
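The wait loop splits one overall timeout into shorter waits so that status can be displayed between them, and the final wait is shortened so the total never exceeds the overall timeout. A portable sketch of that slicing follows. NextWaitSlice and SlicesUntilTimeout are hypothetical names, kInfinite stands in for the Windows INFINITE value, and the second function assumes that every wait times out.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

using Milliseconds = std::uint32_t;

// Stand-in for the Windows INFINITE constant so the sketch stays portable.
constexpr Milliseconds kInfinite = 0xFFFFFFFF;

// Length of the next wait: the status interval, capped by whatever remains
// of the overall timeout (no cap applies when the timeout is infinite).
Milliseconds NextWaitSlice(
   Milliseconds checkEvery,
   Milliseconds overallTimeout,
   Milliseconds elapsed)
{
   if (overallTimeout == kInfinite)
   {
      return checkEvery;
   }

   assert(elapsed <= overallTimeout);

   return std::min(checkEvery, overallTimeout - elapsed);
}

// Hypothetical helper: the sequence of waits performed if nothing is ever
// signalled, ending once the overall timeout has been used up.
std::vector<Milliseconds> SlicesUntilTimeout(
   Milliseconds checkEvery,
   Milliseconds overallTimeout)
{
   assert(checkEvery > 0 && overallTimeout != kInfinite);

   std::vector<Milliseconds> slices;

   Milliseconds elapsed = 0;

   while (elapsed < overallTimeout)
   {
      const Milliseconds slice = NextWaitSlice(checkEvery, overallTimeout, elapsed);

      slices.push_back(slice);

      elapsed += slice;
   }

   return slices;
}
```

With a status interval of 1000ms and an overall timeout of 2500ms the waits are 1000ms, 1000ms and 500ms, after which the elapsed time equals the overall timeout and the loop reports a failure.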


When all the echoes have arrived, or when we time out, we display the stats on the test.

                else if (waitResult == WAIT_OBJECT_0 + 1)
                {
                   const _tstring elapsedTime = counter.GetElapsedTimeAsString();

                   cout << "Data flow complete in " << CStringConverter::TtoA(elapsedTime) << endl;

                   cout << "   Packets sent: " << datagrams.GetNumSent() << endl;
                   cout << "Echoes recieved: " << datagrams.GetNumReceived() << endl;
                   cout << "    Average RTT: " << datagrams.GetAverageRTT() << "ms" << endl;
                   cout << "        Max RTT: " << datagrams.GetMaxRTT() << "ms" << endl;
                   cout << "        Min RTT: " << datagrams.GetMinRTT() << "ms" << endl;

                   done = true;

                   result = datagrams.DataflowSucceeded() ? 0 : 1;

                   connector.DumpErrors();

                   if (commandLine.DumpRTT())
                   {
                      datagrams.DumpRTT();
                   }
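The minimum, maximum and average round trip times reported above can be maintained incrementally as each echo arrives, without storing every sample. The class below is a minimal sketch of one way to do that. RttStats is a hypothetical name and is not how CTestDatagrams is necessarily implemented; in particular, it does not keep the per-datagram values that DumpRTT() reports.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <limits>

// Hypothetical running summary of round trip times, in milliseconds.
class RttStats
{
   public :

      // Folds one measured round trip time into the summary.
      void Add(double rttMs)
      {
         assert(rttMs >= 0.0);

         m_min = std::min(m_min, rttMs);
         m_max = std::max(m_max, rttMs);
         m_total += rttMs;
         ++m_count;
      }

      std::size_t Count() const { return m_count; }

      // All three values are reported as 0 until the first sample arrives.
      double Min() const { return m_count ? m_min : 0.0; }
      double Max() const { return m_max; }
      double Average() const { return m_count ? m_total / m_count : 0.0; }

   private :

      double m_min = std::numeric_limits<double>::max();
      double m_max = 0.0;
      double m_total = 0.0;
      std::size_t m_count = 0;
};
```

In a multi-threaded client such as this one, calls to Add() would need to be protected by a lock, since echoes can complete on any I/O thread.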


And finally we clean up.

The connection manager is fairly simple; we only need to deal with a few callbacks. We use OnSendToCompleted() to start our read timer: once we've sent a datagram to the server, the server has until our read timeout expires to reply. If the server replies but the response datagram is discarded or lost, the timeout will fire and the test will time out. This allows you to push a server (and the test client!) to the point where you are sending datagrams too fast for it to respond before your timeout expires; you can then work out how many datagrams per second your server can reliably deal with under the prevailing networking conditions and how quickly it can respond. We use OnRecvFromCompleted() to validate the inbound datagram responses. We also use various error callbacks to track errors from the framework and shut down the test on failure.

 CDatagramSocketConnectionManager::CDatagramSocketConnectionManager(
    IIOPool &pool,
    IAllocateDatagramSockets &socketAllocator,
    IAllocateBuffers &bufferAllocator,
    CTestDatagrams &datagrams,
    const Milliseconds readTimeout)
    :  CFilteringDatagramSocketConnectionManager(*this, pool, socketAllocator, bufferAllocator),
       m_timerQueue(CThreadedCallbackTimerQueue::BestForPlatformNoLock),
       m_readTimeoutFilter(*this, m_timerQueue),
       m_readTimeout(readTimeout),
       m_datagrams(datagrams)
 {

 }

 CDatagramSocketConnectionManager::~CDatagramSocketConnectionManager()
 {
    WaitForShutdownToComplete();
 }

 bool CDatagramSocketConnectionManager::WaitForShutdownToComplete(
    const Milliseconds timeout)
 {
    const bool shutdownComplete = CFilteringDatagramSocketConnectionManager::WaitForShutdownToComplete(timeout);

    if (shutdownComplete)
    {
       m_timerQueue.WaitForShutdownToComplete();
    }

    return shutdownComplete;
 }

 void CDatagramSocketConnectionManager::OnRecvFromCompleted(
    IDatagramSocket &socket,
    const IAddress &address,
    IBuffer &buffer)
 {
    (void)socket;
    (void)address;

    //DEBUG_ONLY(Output(_T(" Local Address: ") + CAddressRenderer::AsString(socket.GetLocalAddress(), true)));
    //DEBUG_ONLY(Output(_T("Remote Address: ") + CAddressRenderer::AsString(address, true)));

    //DEBUG_ONLY(Output(DumpData(buffer.GetMemory(), buffer.GetUsed(), 60)));

    m_datagrams.ValidateReceivedData(buffer.GetMemory(), buffer.GetUsed());
 }

 void CDatagramSocketConnectionManager::OnSendToCompleted(
    IDatagramSocket &socket,
    const IAddress & /*address*/,
    IBuffer & /*buffer*/)
 {
    if (m_readTimeout != 0)
    {
       m_readTimeoutFilter.SetReadTimeout(socket, m_readTimeout, *this);
    }

    socket.RecvFrom();
 }

 void CDatagramSocketConnectionManager::OnSendToCompletionError(
    IDatagramSocket & /*socket*/,
    const IAddress & /*address*/,
    IBuffer & /*buffer*/,
    const DWORD lastError)
 {
    OnError(_T("OnSendToCompletionError - ") + GetLastErrorMessage(lastError, true));

    m_datagrams.Fail();
 }

 void CDatagramSocketConnectionManager::OnError(
    IDatagramSocket & /*socket*/,
    const _tstring &message)
 {
    ICriticalSection::Owner lock(m_criticalSection);

    m_errors[message]++;
 }

 void CDatagramSocketConnectionManager::OnError(
    const _tstring &message)
 {
    ICriticalSection::Owner lock(m_criticalSection);

    m_errors[message]++;
 }

 void CDatagramSocketConnectionManager::OnConnectionClosure(
    IDatagramSocket & /*socket*/,
    const ConnectionClosureReason /*reason*/)
 {
    // Suppress the debug message that the server common version gives us.
 }

 void CDatagramSocketConnectionManager::OnConnectionClosed(
    IDatagramSocket & /*socket*/)
 {
    // Suppress the debug message that the server common version gives us.
 }

 void CDatagramSocketConnectionManager::OnTimer(
    IDatagramSocket & /*socket*/,
    CReadTimeoutDatagramSocketConnectionFilter::UserData /*userData*/)
 {
    OnError(_T("Read timeout"));

    m_datagrams.Fail();
 }
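The read timeout amounts to a deadline that is armed when a send completes and that fails the test if no reply arrives in time. The framework does this with a timer queue and a connection filter, as shown above. The sketch below illustrates the same idea with a different, simpler technique: a table of deadlines that is polled with an explicit clock value. ReadDeadlines and its member functions are hypothetical names, and keying by a numeric id is an assumption made for the sketch.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <map>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical table of outstanding replies and the time by which each
// must arrive. This polls deadlines; the framework uses timer callbacks.
class ReadDeadlines
{
   public :

      // Call when a send completes: the reply is due by now + timeout.
      void Arm(std::uint32_t id, Clock::time_point now, std::chrono::milliseconds timeout)
      {
         assert(timeout.count() > 0);

         m_deadlines[id] = now + timeout;
      }

      // Call when a reply arrives. Returns false if nothing was pending.
      bool Complete(std::uint32_t id)
      {
         return m_deadlines.erase(id) == 1;
      }

      // Returns the ids whose deadline has passed at the given time, in
      // ascending id order.
      std::vector<std::uint32_t> Expired(Clock::time_point now) const
      {
         std::vector<std::uint32_t> ids;

         for (const auto &entry : m_deadlines)
         {
            if (entry.second <= now)
            {
               ids.push_back(entry.first);
            }
         }

         return ids;
      }

   private :

      std::map<std::uint32_t, Clock::time_point> m_deadlines;
};
```

Passing the current time in as a parameter, rather than reading the clock inside the class, keeps the sketch deterministic and easy to test. Any id returned by Expired() corresponds to the point at which the real client's OnTimer() would record a "Read timeout" error and fail the test.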



Generated on Sun Sep 12 19:06:45 2021 for The Server Framework - v7.4 by doxygen 1.5.3