const CCommandLine::Count numRepeats = commandLine.Repeats(); const CCommandLine::Count numPackets = commandLine.Packets(); const CCommandLine::Count packetSize = commandLine.PacketSize(); const CCommandLine::Count delayAfter = commandLine.PacketBatchSize(); const CCommandLine::Count delayFor = commandLine.PacketBatchPause(); const CCommandLine::Count maxSocketsInPool = numPackets; const CCommandLine::Count maxBuffersInPool = static_cast<CCommandLine::Count>(numPackets * 2); const CCommandLine::Count bufferSize = packetSize; CDatagramSocketAllocator socketAllocator( maxSocketsInPool, commandLine.SpinCount()); CBufferAllocator bufferAllocator( bufferSize, maxBuffersInPool); CIOPool ioPool( 0); // number of threads (0 = 2 x processors) ioPool.Start();
CTestDatagrams datagrams(commandLine.Repeats()); for (DWORD i = 0; i < numPackets; ++i) { datagrams.AddDatagram(new CTestDatagram(i, packetSize)); }
id
which we use as a key into the datagram collection for matching the echo response to the original datagram. The rest of the data is then set to a pattern that is different for each datagram in the set. Note that if you send 1 datagram 10000 times you will be sending the same datagram 10000 times, whereas if you send 10000 datagrams once they'll all be different... CTestDatagram::CTestDatagram( const DWORD id, const DataLength messageSize) : m_id(id), m_pData(new BYTE[messageSize]), m_messageSize(messageSize), m_repeatCount(0), m_currentSendIteration(0), m_currentRecvIteration(0) { memcpy(m_pData, &id, sizeof(id)); for (size_t i = sizeof(id); i < m_messageSize; ++i) { if (i % 2) { m_pData[i] = static_cast<BYTE>(i); } else { m_pData[i] = static_cast<BYTE>(id); } } }
CDatagramSocketConnectionManager connector( ioPool, socketAllocator, bufferAllocator, datagrams, commandLine.ReadTimeout()); const CAddressIPv4 address( commandLine.Server(), commandLine.Port()); if (commandLine.PreAllocate()) { socketAllocator.EnlargePool(maxSocketsInPool); bufferAllocator.EnlargePool(maxBuffersInPool); }
CPerformanceCounter counter(CPerformanceCounter::LockThreadToOneCPU); for (int y = 0; y < numRepeats; ++y) { for (int i = 0; i < numPackets && !datagrams.HasFailed(); ++i) { ITestDatagram &datagram = datagrams.GetDatagram(i); connector.SendTo(address, datagram.GetBytes(), datagram.GetSize()); if (delayAfter && ((i + 1) % delayAfter == 0)) { cout << i + 1 << " packets sent if (delayFor) { Sleep(delayFor); } } } }
HANDLE handlesToWaitFor[2]; handlesToWaitFor[0] = shutdownEvent.GetWaitHandle(); handlesToWaitFor[1] = datagrams.GetWaitHandle(); const Milliseconds checkEvery = commandLine.DisplayStatusEvery(); Milliseconds elapsedTime = 0; bool done = false; const Milliseconds dataFlowTimeoutMillis = commandLine.DataFlowTimeout(); while (!done) { const Milliseconds timeout = (dataFlowTimeoutMillis == INFINITE ? checkEvery : min(checkEvery, dataFlowTimeoutMillis - elapsedTime)); elapsedTime += timeout; DWORD waitResult = ::WaitForMultipleObjects(2, handlesToWaitFor, false, timeout); if (waitResult == WAIT_TIMEOUT) { if (elapsedTime >= dataFlowTimeoutMillis) { cout << "Waiting for data flow to complete timed out after " << CStringConverter::TtoA(counter.GetElapsedTimeAsString()) << endl; result = 1; done = true; } else { cout << "Waiting for " << datagrams.GetNumberOfRepliesPending() << " packets to be echoed connector.DumpErrors(); } } else if (waitResult == WAIT_OBJECT_0) { done = true; result = 1;
else if (waitResult == WAIT_OBJECT_0 + 1) { const _tstring elapsedTime = counter.GetElapsedTimeAsString(); cout << "Data flow complete in " << CStringConverter::TtoA(elapsedTime) << endl; cout << " Packets sent: " << datagrams.GetNumSent() << endl; cout << "Echoes recieved: " << datagrams.GetNumReceived() << endl; cout << " Average RTT: " << datagrams.GetAverageRTT() << "ms cout << " Max RTT: " << datagrams.GetMaxRTT() << "ms cout << " Min RTT: " << datagrams.GetMinRTT() << "ms done = true; result = datagrams.DataflowSucceeded() ? 0 : 1; connector.DumpErrors(); if (commandLine.DumpRTT()) { datagrams.DumpRTT(); }
OnSendToCompleted()
to start our read timer. Once we've sent a datagram to the server the server has until our read timeout expires to reply. If the server replies but the response datagram gets discarded or lost then the timeout will fire and the test will timeout. This allows you to push a server (and the test client!) to the point where you are sending datagrams too fast for it to respond to before your timeout expires; you can then work out how many datagrams per second your server can reliably deal with under the prevailing networking conditions and how quickly it can respond. We use OnReadCompleted() to validate the inbound datagram responses. We also use various error callbacks to track errors from the framework and shutdown the test on failure.
CDatagramSocketConnectionManager::CDatagramSocketConnectionManager(
IIOPool &pool,
IAllocateDatagramSockets &socketAllocator,
IAllocateBuffers &bufferAllocator,
CTestDatagrams &datagrams,
const Milliseconds readTimeout)
: CFilteringDatagramSocketConnectionManager(*this, pool, socketAllocator, bufferAllocator),
m_timerQueue(CThreadedCallbackTimerQueue::BestForPlatformNoLock),
m_readTimeoutFilter(*this, m_timerQueue),
m_readTimeout(readTimeout),
m_datagrams(datagrams)
{
}
CDatagramSocketConnectionManager::~CDatagramSocketConnectionManager()
{
WaitForShutdownToComplete();
}
bool CDatagramSocketConnectionManager::WaitForShutdownToComplete(
const Milliseconds timeout)
{
const bool shutdownComplete = CFilteringDatagramSocketConnectionManager::WaitForShutdownToComplete(timeout);
if (shutdownComplete)
{
m_timerQueue.WaitForShutdownToComplete();
}
return shutdownComplete;
}
void CDatagramSocketConnectionManager::OnRecvFromCompleted(
IDatagramSocket &socket,
const IAddress &address,
IBuffer &buffer)
{
(void)socket;
(void)address;
//DEBUG_ONLY(Output(_T(" Local Address: ") + CAddressRenderer::AsString(socket.GetLocalAddress(), true)));
//DEBUG_ONLY(Output(_T("Remote Address: ") + CAddressRenderer::AsString(address, true)));
//DEBUG_ONLY(Output(DumpData(buffer.GetMemory(), buffer.GetUsed(), 60)));
m_datagrams.ValidateReceivedData(buffer.GetMemory(), buffer.GetUsed());
}
void CDatagramSocketConnectionManager::OnSendToCompleted(
IDatagramSocket &socket,
const IAddress & /*address*/,
IBuffer & /*buffer*/)
{
if (m_readTimeout != 0)
{
m_readTimeoutFilter.SetReadTimeout(socket, m_readTimeout, *this);
}
socket.RecvFrom();
}
void CDatagramSocketConnectionManager::OnSendToCompletionError(
IDatagramSocket & /*socket*/,
const IAddress & /*address*/,
IBuffer & /*buffer*/,
const DWORD lastError)
{
OnError(_T("OnSendToCompletionError - ") + GetLastErrorMessage(lastError, true));
m_datagrams.Fail();
}
void CDatagramSocketConnectionManager::OnError(
IDatagramSocket & /*socket*/,
const _tstring &message)
{
ICriticalSection::Owner lock(m_criticalSection);
m_errors[message]++;
}
void CDatagramSocketConnectionManager::OnError(
const _tstring &message)
{
ICriticalSection::Owner lock(m_criticalSection);
m_errors[message]++;
}
void CDatagramSocketConnectionManager::OnConnectionClosure(
IDatagramSocket & /*socket*/,
const ConnectionClosureReason /*reason*/)
{
// Suppress the debug message that the server common version gives us.
}
void CDatagramSocketConnectionManager::OnConnectionClosed(
IDatagramSocket & /*socket*/)
{
// Suppress the debug message that the server common version gives us.
}
void CDatagramSocketConnectionManager::OnTimer(
IDatagramSocket & /*socket*/,
CReadTimeoutDatagramSocketConnectionFilter::UserData /*userData*/)
{
OnError(_T("Read timeout"));
m_datagrams.Fail();
}