const CCommandLine::Count numRepeats = commandLine.Repeats(); const CCommandLine::Count numPackets = commandLine.Packets(); const CCommandLine::Count packetSize = commandLine.PacketSize(); const CCommandLine::Count delayAfter = commandLine.PacketBatchSize(); const CCommandLine::Count delayFor = commandLine.PacketBatchPause(); const CCommandLine::Count maxSocketsInPool = numPackets; const CCommandLine::Count maxBuffersInPool = static_cast<CCommandLine::Count>(numPackets * 2); const CCommandLine::Count bufferSize = packetSize; CDatagramSocketAllocator socketAllocator( maxSocketsInPool, commandLine.SpinCount()); CBufferAllocator bufferAllocator( bufferSize, maxBuffersInPool); CIOPool ioPool( 0); // number of threads (0 = 2 x processors) ioPool.Start();
CTestDatagrams datagrams(commandLine.Repeats()); for (DWORD i = 0; i < numPackets; ++i) { datagrams.AddDatagram(new CTestDatagram(i, packetSize)); }
Each test datagram starts with its id
which we use as a key into the datagram collection for matching the echo response to the original datagram. The rest of the data is then set to a pattern that is different for each datagram in the set. Note that if you send 1 datagram 10000 times you will be sending the same datagram 10000 times, whereas if you send 10000 datagrams once they'll all be different...
// Builds the payload for one test datagram.
//
// id          - identifier stored in m_id and copied into the first bytes of
//               the payload.
// messageSize - total payload size in bytes; the buffer is allocated with
//               exactly this many bytes.
//
// After the id, bytes at odd offsets hold the low byte of the offset and
// bytes at even offsets hold the low byte of the id.
CTestDatagram::CTestDatagram(
   const DWORD id,
   const DataLength messageSize)
   :  m_id(id),
      m_pData(new BYTE[messageSize]),
      m_messageSize(messageSize),
      m_repeatCount(0),
      m_currentSendIteration(0),
      m_currentRecvIteration(0)
{
   // Never copy more of the id than the buffer can hold: an unconditional
   // sizeof(id) copy writes past the end of m_pData when messageSize is
   // smaller than the id.
   const size_t idBytes = (messageSize < sizeof(id)) ? static_cast<size_t>(messageSize) : sizeof(id);

   memcpy(m_pData, &id, idBytes);

   for (size_t i = idBytes; i < m_messageSize; ++i)
   {
      if (i % 2)
      {
         m_pData[i] = static_cast<BYTE>(i);
      }
      else
      {
         m_pData[i] = static_cast<BYTE>(id);
      }
   }
}
CDatagramSocketConnectionManager connector( ioPool, socketAllocator, bufferAllocator, datagrams, commandLine.ReadTimeout()); const CAddressIPv4 address( commandLine.Server(), commandLine.Port()); if (commandLine.PreAllocate()) { socketAllocator.EnlargePool(maxSocketsInPool); bufferAllocator.EnlargePool(maxBuffersInPool); }
// Times the whole data flow; the elapsed time is reported once it completes.
CPerformanceCounter counter(CPerformanceCounter::LockThreadToOneCPU);

// Send the full sequence of datagrams numRepeats times. The inner loop stops
// early as soon as the datagram collection reports a failure.
for (int y = 0; y < numRepeats; ++y)
{
   for (int i = 0; i < numPackets && !datagrams.HasFailed(); ++i)
   {
      ITestDatagram &datagram = datagrams.GetDatagram(i);

      connector.SendTo(address, datagram.GetBytes(), datagram.GetSize());

      // Batching: after every delayAfter packets report progress and, if a
      // pause was configured, sleep for delayFor milliseconds.
      if (delayAfter && ((i + 1) % delayAfter == 0))
      {
         cout << i + 1 << " packets sent" << endl;

         if (delayFor)
         {
            Sleep(delayFor);
         }
      }
   }
}
HANDLE handlesToWaitFor[2]; handlesToWaitFor[0] = shutdownEvent.GetWaitHandle(); handlesToWaitFor[1] = datagrams.GetWaitHandle(); const Milliseconds checkEvery = commandLine.DisplayStatusEvery(); Milliseconds elapsedTime = 0; bool done = false; const Milliseconds dataFlowTimeoutMillis = commandLine.DataFlowTimeout(); while (!done) { const Milliseconds timeout = (dataFlowTimeoutMillis == INFINITE ? checkEvery : min(checkEvery, dataFlowTimeoutMillis - elapsedTime)); elapsedTime += timeout; DWORD waitResult = ::WaitForMultipleObjects(2, handlesToWaitFor, false, timeout); if (waitResult == WAIT_TIMEOUT) { if (elapsedTime >= dataFlowTimeoutMillis) { cout << "Waiting for data flow to complete timed out after " << CStringConverter::TtoA(counter.GetElapsedTimeAsString()) << endl; result = 1; done = true; } else { cout << "Waiting for " << datagrams.GetNumberOfRepliesPending() << " packets to be echoed connector.DumpErrors(); } } else if (waitResult == WAIT_OBJECT_0) { done = true; result = 1;
else if (waitResult == WAIT_OBJECT_0 + 1) { const _tstring elapsedTime = counter.GetElapsedTimeAsString(); cout << "Data flow complete in " << CStringConverter::TtoA(elapsedTime) << endl; cout << " Packets sent: " << datagrams.GetNumSent() << endl; cout << "Echoes recieved: " << datagrams.GetNumReceived() << endl; cout << " Average RTT: " << datagrams.GetAverageRTT() << "ms cout << " Max RTT: " << datagrams.GetMaxRTT() << "ms cout << " Min RTT: " << datagrams.GetMinRTT() << "ms done = true; result = datagrams.DataflowSucceeded() ? 0 : 1; connector.DumpErrors(); if (commandLine.DumpRTT()) { datagrams.DumpRTT(); }
We use OnSendToCompleted()
to start our read timer. Once we've sent a datagram to the server, the server has until our read timeout expires to reply. If the server replies but the response datagram gets discarded or lost then the timeout will fire and the test will time out. This allows you to push a server (and the test client!) to the point where you are sending datagrams too fast for it to respond to before your timeout expires; you can then work out how many datagrams per second your server can reliably deal with under the prevailing networking conditions and how quickly it can respond. We use OnRecvFromCompleted() to validate the inbound datagram responses. We also use various error callbacks to track errors from the framework and shut down the test on failure.
// Constructs the test client's connection manager.
//
// pool            - I/O pool forwarded to the filtering base class.
// socketAllocator - socket allocator forwarded to the base class.
// bufferAllocator - buffer allocator forwarded to the base class.
// datagrams       - the test datagram collection; held by reference in
//                   m_datagrams, so it must outlive this object.
// readTimeout     - stored in m_readTimeout; OnSendToCompleted() only sets a
//                   read timeout when this value is non-zero.
CDatagramSocketConnectionManager::CDatagramSocketConnectionManager(
IIOPool &pool,
IAllocateDatagramSockets &socketAllocator,
IAllocateBuffers &bufferAllocator,
CTestDatagrams &datagrams,
const Milliseconds readTimeout)
// The base class is handed *this as its first argument (presumably so that
// this object receives the connection callbacks - confirm in the base class).
: CFilteringDatagramSocketConnectionManager(*this, pool, socketAllocator, bufferAllocator),
m_timerQueue(CThreadedCallbackTimerQueue::BestForPlatformNoLock),
// NOTE(review): the filter is constructed from m_timerQueue, so m_timerQueue
// must be declared before m_readTimeoutFilter in the class - confirm in the header.
m_readTimeoutFilter(*this, m_timerQueue),
m_readTimeout(readTimeout),
m_datagrams(datagrams)
{
}
// Blocks until shutdown has completed before the object is destroyed. The
// call passes no argument, so whatever default timeout the declaration
// supplies (not visible in this file section) is used.
CDatagramSocketConnectionManager::~CDatagramSocketConnectionManager()
{
   this->WaitForShutdownToComplete();
}
// Waits for the base connection manager to finish shutting down and, only if
// it did so, also waits for the timer queue to shut down.
//
// timeout - passed straight through to the base class wait.
//
// Returns the base class result: true when its shutdown completed, false
// otherwise (in which case the timer queue is not waited on).
bool CDatagramSocketConnectionManager::WaitForShutdownToComplete(
   const Milliseconds timeout)
{
   if (!CFilteringDatagramSocketConnectionManager::WaitForShutdownToComplete(timeout))
   {
      return false;
   }

   m_timerQueue.WaitForShutdownToComplete();

   return true;
}
// Receive-completion callback: hands the received bytes to the datagram
// collection for validation.
//
// socket  - unused except by the optional debug tracing below.
// address - unused except by the optional debug tracing below.
// buffer  - holds the received datagram; its memory and used length are
//           passed to ValidateReceivedData().
void CDatagramSocketConnectionManager::OnRecvFromCompleted(
   IDatagramSocket &socket,
   const IAddress &address,
   IBuffer &buffer)
{
   // Only referenced by the disabled tracing; mark as deliberately unused.
   static_cast<void>(socket);
   static_cast<void>(address);

   // Uncomment to trace each received datagram in debug builds.
   //DEBUG_ONLY(Output(_T(" Local Address: ") + CAddressRenderer::AsString(socket.GetLocalAddress(), true)));
   //DEBUG_ONLY(Output(_T("Remote Address: ") + CAddressRenderer::AsString(address, true)));
   //DEBUG_ONLY(Output(DumpData(buffer.GetMemory(), buffer.GetUsed(), 60)));

   m_datagrams.ValidateReceivedData(buffer.GetMemory(), buffer.GetUsed());
}
// Send-completion callback: sets the read timeout for the socket (when one is
// configured) and then calls RecvFrom() on it to pick up the echoed reply.
//
// socket - the socket the datagram was sent on.
// The address and buffer arguments are not used.
void CDatagramSocketConnectionManager::OnSendToCompleted(
   IDatagramSocket &socket,
   const IAddress & /*address*/,
   IBuffer & /*buffer*/)
{
   // A read timeout of zero means no timeout is set.
   const bool readTimeoutEnabled = (m_readTimeout != 0);

   if (readTimeoutEnabled)
   {
      // *this is passed as the third argument (presumably the object whose
      // OnTimer() is called when the timeout fires - confirm in the filter).
      m_readTimeoutFilter.SetReadTimeout(socket, m_readTimeout, *this);
   }

   socket.RecvFrom();
}
// Send-failure callback: records the error text and marks the test as failed.
//
// lastError - error code, rendered into the recorded message via
//             GetLastErrorMessage().
// The socket, address and buffer arguments are not used.
void CDatagramSocketConnectionManager::OnSendToCompletionError(
   IDatagramSocket & /*socket*/,
   const IAddress & /*address*/,
   IBuffer & /*buffer*/,
   const DWORD lastError)
{
   const _tstring description = _T("OnSendToCompletionError - ") + GetLastErrorMessage(lastError, true);

   OnError(description);

   m_datagrams.Fail();
}
// Per-socket error callback: counts one more occurrence of the given message.
// The socket is ignored, so errors are tallied by message text only.
//
// message - error text used as the key into m_errors.
void CDatagramSocketConnectionManager::OnError(
   IDatagramSocket & /*socket*/,
   const _tstring &message)
{
   // m_errors is updated while holding m_criticalSection.
   ICriticalSection::Owner guard(m_criticalSection);

   ++m_errors[message];
}
// Socket-less error callback: counts one more occurrence of the given
// message.
//
// message - error text used as the key into m_errors.
void CDatagramSocketConnectionManager::OnError(
   const _tstring &message)
{
   // m_errors is updated while holding m_criticalSection.
   ICriticalSection::Owner guard(m_criticalSection);

   ++m_errors[message];
}
// Connection-closure callback, deliberately implemented as a no-op.
// Both arguments are ignored.
void CDatagramSocketConnectionManager::OnConnectionClosure(
IDatagramSocket & /*socket*/,
const ConnectionClosureReason /*reason*/)
{
// Intentionally empty: this replaces the "server common" implementation so
// that its debug message is not produced during the test.
}
// Connection-closed callback, deliberately implemented as a no-op.
// The socket argument is ignored.
void CDatagramSocketConnectionManager::OnConnectionClosed(
IDatagramSocket & /*socket*/)
{
// Intentionally empty: this replaces the "server common" implementation so
// that its debug message is not produced during the test.
}
// Timer callback (presumably invoked by the read timeout filter set up in
// OnSendToCompleted() - confirm in the filter): records a "Read timeout"
// error and marks the test as failed. Both arguments are ignored.
void CDatagramSocketConnectionManager::OnTimer(
   IDatagramSocket & /*socket*/,
   CReadTimeoutDatagramSocketConnectionFilter::UserData /*userData*/)
{
   const _tstring reason(_T("Read timeout"));

   OnError(reason);

   m_datagrams.Fail();
}
The datagram collection is fairly straightforward and deals with maintaining statistics for each datagram. The datagram class maintains round trip time details for each time it is sent and deals with validation of responses.
You can configure the test from the command line using the following command line arguments.
Usage: EchoServerUDPTest -server xxx.xxx.xxx.xxx -port xxxx
Command line parameters:
r -server The server address to connect to (dotted ip).
r -port The port to connect to.
o -packets The number of packets to send.
Defaults to 100
o -packetSize Size of each packet.
Defaults to 1024
o -packetBatchSize Batch packets in groups of X size.
Defaults to 0 (no batching)
o -packetBatchDelay Delay for Y milliseconds between each batch.
Defaults to 0 (no delay)
o -repeat The number of times to send each sequence of packets.
Defaults to 1, no repeats, send each sequence only once.
Note that you can run a test with more packets echoed
per connection using less memory by setting high value
for -repeat and a low value for -packets.
o -readTimeout Fail the test if it takes longer than Y milliseconds
for an echoed packet to arrive back.
Defaults to 0; no timeout
o -preallocate Preallocate sockets and buffers before connecting.
o -spinCount The spin count used for per socket critical sections.
o -displayRTT Display the round trip times for each packet.
o -dataFlowTimeout Fail the test if it takes longer than Y milliseconds
for all dataflow to complete.
Defaults to no timeout
o -displayStatusEvery During data flow, display the status of each connection
every Y milliseconds.
Defaults to not displaying status.
o -displayRTT Display the round trip times for each packet.
r = required, o = optional