#include "JetByteTools\Admin\Admin.h"

#include "JetByteTools\SocketTools\WinsockWrapper.h"

#include "JetByteTools\Win32Tools\Exception.h"
#include "JetByteTools\Win32Tools\Utils.h"
#include "JetByteTools\Win32Tools\DebugTrace.h"
#include "JetByteTools\Win32Tools\SharedCriticalSectionFactory.h"
#include "JetByteTools\Win32Tools\StringConverter.h"
#include "JetByteTools\Win32Tools\GlobalName.h"
#include "JetByteTools\Win32Tools\ManualResetEvent.h"
#include "JetByteTools\Win32Tools\SEHException.h"

#include "StreamSocketConnectionManager.h"
#include "TestMessages.h"
#include "TestMessage.h"
#include "MessageCollection.h"

#pragma hdrstop

#include "CommandLine.h"

#include "JetByteTools\Win32Tools\PerformanceCounter.h"
#include "JetByteTools\Win32Tools\SecurityDescriptorAllowAll.h"
#include "JetByteTools\Win32Tools\SecurityAttributes.h"

#include "JetByteTools\IOTools\IOPool.h"
#include "JetByteTools\IOTools\BufferAllocator.h"

#include "JetByteTools\SocketTools\FullAddress.h"
#include "JetByteTools\SocketTools\AddressTypeIPv4.h"
#include "JetByteTools\SocketTools\SequencedStreamSocketAllocator.h"
#include "JetByteTools\SocketTools\SmartStreamSocket.h"

#include <iostream>

///////////////////////////////////////////////////////////////////////////////
// Using directives
///////////////////////////////////////////////////////////////////////////////

using JetByteTools::Milliseconds;
using JetByteTools::Win32::CException;
using JetByteTools::Core::Output;
using JetByteTools::Win32::ToString;
using JetByteTools::Win32::ToStringA;
using JetByteTools::Win32::_tstring;
using JetByteTools::Win32::CSharedCriticalSectionFactory;
using JetByteTools::Win32::CStringConverter;
using JetByteTools::Win32::CGlobalName;
using JetByteTools::Win32::CManualResetEvent;
using JetByteTools::Win32::CSecurityDescriptorAllowAll;
using JetByteTools::Win32::CSecurityAttributes;
using JetByteTools::Win32::CSEHException;
using JetByteTools::Win32::CWaitableCounter;
using JetByteTools::Win32::CPerformanceCounter;
using JetByteTools::Socket::CFullAddress;
using JetByteTools::Socket::CAddressTypeIPv4;
using JetByteTools::Socket::CAddressType;
using JetByteTools::Socket::CSmartStreamSocket;
using JetByteTools::Socket::CSequencedStreamSocketAllocator;
using JetByteTools::IO::CIOPool;
using JetByteTools::IO::CBufferAllocator;

using std::cout;
using std::endl;
using std::cin;

///////////////////////////////////////////////////////////////////////////////
// Static helper functions
///////////////////////////////////////////////////////////////////////////////

static void BuildMessagesForConnection(
   const CTestMessage::DataLength messageSize,
   CTestMessages &messages);

///////////////////////////////////////////////////////////////////////////////
// Program entry point
///////////////////////////////////////////////////////////////////////////////

/// Entry point of the echo server test client.
///
/// Parses the command line, builds one CTestMessages object per requested
/// connection, starts every connection (synchronously or asynchronously),
/// then waits for either the named shutdown event or for the connections
/// (and, when messages are configured, the data flow) to complete. The
/// received messages are then checked and the connector, IO pool and
/// allocators are shut down.
///
/// Return value:
///   0 - the test passed.
///   1 - the test failed, was shut down via the shutdown event, timed out,
///       or an exception was caught.
///   2 - the command line could not be parsed.

int main(
   int /*argc*/,
   char ** /*argv*/)
{
   ::SetErrorMode(SEM_FAILCRITICALERRORS | SEM_NOGPFAULTERRORBOX | SEM_NOOPENFILEERRORBOX);

   // Stays at 2 only if the command line fails to parse.

   int result = 2;

   CSEHException::Translator sehTranslator;

   try
   {
      CCommandLine commandLine;

      if (commandLine.Parse())
      {
         // From here on we assume failure until the test proves otherwise.

         result = 1;

         const CCommandLine::Count messageRepeatCount = commandLine.Messages();

         const CSequencedStreamSocketAllocator::SocketCount numConnections = commandLine.Connections();

         const Milliseconds transferRate = commandLine.MessageRate();

         const CBufferAllocator::BufferSize messageSize = commandLine.MessageSize();

         // NOTE(review): delayAfter holds a batch size (a connection count)
         // even though it is typed as Milliseconds - confirm the intended type.

         const Milliseconds delayAfter = commandLine.ConnectionBatchSize();

         const Milliseconds delayFor = commandLine.ConnectionBatchPause();

         const bool sendAfterRecv = commandLine.SendAfterRecv();

         const bool syncConnect = commandLine.SyncConnect();

         const bool holdConnections = commandLine.HoldConnections();

         const CSequencedStreamSocketAllocator::SocketCount maxSocketsInPool = numConnections;

         const CBufferAllocator::BufferCount maxBuffersInPool = numConnections * 2;

         const CBufferAllocator::BufferSize bufferSize = commandLine.MessageSize();

         const bool fragments = commandLine.Fragments();

         const bool multipleMessages = commandLine.MultipleMessages();

         const Milliseconds dataFlowTimeoutMillis = commandLine.MessageFlowTimeout();

         // NOTE(review): the meaning of 47 is not visible here - presumably
         // the number of shared critical sections; confirm against the factory.

         CSharedCriticalSectionFactory lockFactory(47);

         CSequencedStreamSocketAllocator socketAllocator(
            lockFactory,
            maxSocketsInPool);

         CBufferAllocator bufferAllocator(
            bufferSize,
            maxBuffersInPool);

         CIOPool ioPool(
            0);                  // number of threads (0 = 2 x processors)

         ioPool.Start();

         CStreamSocketConnectionManager connector(
            ioPool,
            socketAllocator,
            bufferAllocator);

         const CAddressType addressPreference = CAddressTypeIPv4::instance;

         const CFullAddress address(
            commandLine.Server(),
            commandLine.Port(),
            addressPreference);

         if (!connector.AsyncConnectAvailable(address.Type()) && !syncConnect)
         {
            commandLine.ShowHelp(
               _T("ConnectEx() is not available on this platform.\n")
               _T("Use -syncConnect to force synchronous connections"));

            return 1;
         }

         if (commandLine.PreAllocate())
         {
            socketAllocator.EnlargePool(maxSocketsInPool);

            bufferAllocator.EnlargePool(maxBuffersInPool);
         }

         // now loop opening connections and kicking things off...

         CMessageCollection messages(numConnections);

         try
         {
            // build messages here and pass in to each connection

            if (messageRepeatCount != 0)
            {
               cout << "Building Messages" << endl;
            }

            size_t i = 0;

            const CBufferAllocator::BufferSize skipSignOnMessageBytes = commandLine.SkipSignOnMessageBytes();

            for (i = 0; i < numConnections; ++i)
            {
               CTestMessages *pMessages = new CTestMessages(
                  i,
                  messageRepeatCount,
                  transferRate,
                  messages,
                  sendAfterRecv,
                  holdConnections,
                  skipSignOnMessageBytes,
                  fragments,
                  multipleMessages);

               BuildMessagesForConnection(messageSize, *pMessages);

               messages.Add(pMessages);
            }

            cout << "Creating " << numConnections << " connections" << endl;

            // i is reused below as the count of connection attempts so far.

            i = 0;

            CPerformanceCounter counter(CPerformanceCounter::LockThreadToOneCPU);

            bool ok = true;

            // Stop issuing connects as soon as one attempt yields no socket.

            for (CMessageCollection::Iterator it = messages.Begin();
               ok && it != messages.End();
               ++it)
            {
               CTestMessages *pMessages = *it;

               if (syncConnect)
               {
                  CSmartStreamSocket socket = connector.ConnectNoThrow(address, pMessages);

                  ok = (socket.Get() != 0);
               }
               else
               {
                  CSmartStreamSocket socket = connector.AsyncConnectNoThrow(address, pMessages);

                  ok = (socket.Get() != 0);
               }

               // Report progress (and optionally pause) after every batch.

               if (delayAfter && (++i % delayAfter == 0))
               {
                  cout << i << " connections created" << endl;

                  if (delayFor)
                  {
                     Sleep(delayFor);
                  }
               }
            }

            if (ok)
            {
               cout << "All connections in progress" << endl;
            }
            else
            {
               // Not every connection was started, so signal the completion
               // event ourselves; otherwise the wait below would only end
               // via the shutdown event.

               SetEvent(messages.GetConnectionsCompleteEvent());
            }

            CSecurityDescriptorAllowAll sd;

            CSecurityAttributes sa(sd);

            CManualResetEvent shutdownEvent(CGlobalName(_T("JetByteToolsServerShutdown")), &sa);

            // Index 0 is the external shutdown request, index 1 is the
            // "work complete" event for the current phase.

            HANDLE handlesToWaitFor[2];

            handlesToWaitFor[0] = shutdownEvent.GetWaitHandle();
            handlesToWaitFor[1] = messages.GetConnectionsCompleteEvent();

            const DWORD waitResult = ::WaitForMultipleObjects(2, handlesToWaitFor, false, INFINITE);

            if (waitResult == WAIT_OBJECT_0)
            {
               // Shutdown was requested before the connections completed.

               result = 1;
            }
            else if (waitResult == WAIT_OBJECT_0 + 1)
            {
               const _tstring elapsedTime = counter.GetElapsedTimeAsString();

               cout << "All connections complete in " << CStringConverter::TtoA(elapsedTime) << endl;

               cout << messages.ConnectionsEstablished() << " established. "
                    << messages.ConnectionsFailed() << " failed."<< endl;

               if (messageRepeatCount != 0)
               {
                  cout << "Waiting for data flow to complete" << endl;

                  handlesToWaitFor[1] = messages.GetMessagesCompleteEvent();

                  // Named distinctly so that it does not shadow waitResult.

                  const DWORD dataFlowWaitResult = ::WaitForMultipleObjects(2, handlesToWaitFor, false, dataFlowTimeoutMillis);

                  if (dataFlowWaitResult == WAIT_TIMEOUT)
                  {
                     const _tstring elapsedTime = counter.GetElapsedTimeAsString();

                     cout << "Waiting for data flow to complete timed out after " << CStringConverter::TtoA(elapsedTime) << endl;

                     result = 1;
                  }
                  else if (dataFlowWaitResult == WAIT_OBJECT_0)
                  {
                     result = 1;
                  }
                  else if (dataFlowWaitResult == WAIT_OBJECT_0 + 1)
                  {
                     const _tstring elapsedTime = counter.GetElapsedTimeAsString();

                     cout << "Data flow complete in " << CStringConverter::TtoA(elapsedTime) << endl;

                     result = 0;
                  }
                  else
                  {
                     throw CException(_T("main()"), _T("Unexpected result from WaitForMultipleObjects"));
                  }
               }
               else
               {
                  // No messages to send; connecting was the whole test.

                  result = 0;
               }
            }
            else
            {
               throw CException(_T("main()"), _T("Unexpected result from WaitForMultipleObjects"));
            }

            if (commandLine.HoldConnections() &&
                commandLine.IsInteractive() &&
                0 != messages.ConnectionsEstablished())
            {
               cout << "Press return to close connections" << endl;

               char input;

               cin >> std::noskipws >> input;
            }
         }
         catch(const CException &e)
         {
            cout << "Exception: " << CStringConverter::TtoA(e.GetWhere()) << " - " << CStringConverter::TtoA(e.GetWhat()) << endl;

            result = 1;
         }
         catch(...)
         {
            cout << "Unexpected exception" << endl;

            result = 1;
         }

         // The checks and the shutdown sequence below run even if the
         // connection phase above threw.

         if (messageRepeatCount != 0)
         {
            CMessageCollection::StringErrors errors;

            if (!messages.CheckMessages(errors))
            {
               result = 1;
            }

            if (errors.size() != 0)
            {
               cout << "Message errors:" << endl;

               for (CMessageCollection::StringErrors::const_iterator it = errors.begin();
                  it != errors.end();
                  ++it)
               {
                  cout << ToStringA(it->second) << " - " << CStringConverter::TtoA(it->first) << endl;
               }
            }
         }

         cout << "Wait for connector shutdown..." << endl;

         if (!connector.WaitForShutdownToComplete(10000))
         {
            cout << "Connector shutdown timedout!" << endl;

            connector.ForceShutdown();
         }

         cout << "Wait for IO pool shutdown..." << endl;

         ioPool.WaitForShutdownToComplete();

         cout << "Waiting for buffer allocator to flush..." << endl;

         bufferAllocator.Flush();

         cout << "Waiting for socket allocator to flush..." << endl;

         socketAllocator.ReleaseSockets();

         connector.DumpErrors();

         cout << "All done..." << endl;
      }
   }
   catch(const CException &e)
   {
      cout << "Exception: " << CStringConverter::TtoA(e.GetDetails()) << endl;

      result = 1;
   }
   catch(const CSEHException &e)
   {
      cout << "SEH Exception: " << CStringConverter::TtoA(e.GetDetails()) << endl;

      result = 1;
   }
   catch(const char *pMsg)
   {
      cout << pMsg << endl;

      result = 1;
   }
   catch(...)
   {
      cout << "Unexpected exception" << endl;

      result = 1;
   }

   cout << "Test " << (result == 0 ? "Passed" : "Failed") << endl;

   return result;
}

///////////////////////////////////////////////////////////////////////////////
// Static helper functions
///////////////////////////////////////////////////////////////////////////////

/// Adds a single newly allocated CTestMessage of messageSize to the supplied
/// per-connection message set.

static void BuildMessagesForConnection(
   const CTestMessage::DataLength messageSize,
   CTestMessages &messages)
{
   messages.AddMessage(new CTestMessage(messageSize));
}
EchoServerTest: v6.4 - Copyright (c) 2009 JetByte Limited

Usage: EchoServerTest -server xxx.xxx.xxx.xxx -port xxxx

Command line parameters:
 r -server               The server address to connect to (dotted ip).
 r -port                 The port to connect to.
 o -connections          The number of connections to create.
                         Defaults to 1000
 o -connectionBatchSize  Batch connections in groups of X size.
                         Defaults to 0 (no batching)
 o -connectionBatchDelay Delay for Y milliseconds between each batch.
                         Defaults to 0 (no delay)
 o -messages             Number of messages to send on each connection.
                         Defaults to 0 (don't send any data)
 o -messageSize          Size of each message.
                         Defaults to 1024
 o -messageRate          Delay for Y milliseconds between each message.
                         Defaults to 1000
 o -sendAfterRecv        Wait for echo of previous message before starting
                         timer for next send (reduce pace to server speed).
 o -syncConnect          Use synchronous connect rather than ConnectEx().
 o -hold                 Hold all connections open until test completes.
 o -pause                Wait for a key press before closing connections.
 o -preallocate          Preallocate sockets and buffers before connecting.
 o -skipSignOnBytes      Ignores the first x bytes that the server sends.
                         This can be used to ignore a welcome message.
 o -messageFlowTimeout   Time in milliseconds that is allowed for the test
                         to complete once all connections are established.
                         Defaults to no timeout, the test doesn't complete
                         until the data flow completes.
 o -spinCount            The spin count used for per socket critical sections.

 r = required, o = optional
///
-server YYY
and -port XXX
to tell us which server to connect to, the client will begin to construct the objects that it needs in order to run. -server 192.168.0.44
) or an IPv6 address (-server [ffff:ffff:ffff:ffff:ffff]
and the client will connect appropriately. listenBacklog
. Note that in most circumstances it's not especially realistic to issue 1000s of connections at the same time. Batching the connections into more realistic numbers and delaying between the batches tends to give a more real world test.

/// Connection manager used by the echo server test client. It derives from
/// the library's stream socket connection manager and handles the connection,
/// read, reset and error callbacks for each test connection. It owns a timer
/// queue and keeps per-error occurrence counts which DumpErrors() reports.

class CStreamSocketConnectionManager :
   public JetByteTools::Socket::CStreamSocketConnectionManager,
   private JetByteTools::Win32::IQueueTimers::Timer,
   private JetByteTools::Socket::CStreamSocketServerExCallback
{
   public :

      CStreamSocketConnectionManager(
         JetByteTools::IO::IIOPool &pool,
         JetByteTools::Socket::IAllocateStreamSockets &socketAllocator,
         JetByteTools::IO::IAllocateBuffers &bufferAllocator);

      ~CStreamSocketConnectionManager();

      /// Reports the errors that have been collected by the callbacks.

      void DumpErrors() const;

   private :

      // Connection manager callbacks

      virtual void OnPreOutgoingConnect(
         JetByteTools::Win32::IIndexedOpaqueUserData &userData,
         const JetByteTools::Socket::IAddress &address,
         const void *pUserData);

      virtual void OnConnectionEstablished(
         JetByteTools::Socket::IStreamSocket &socket,
         const JetByteTools::Socket::IAddress &address);

      virtual void OnOutgoingConnectionFailed(
         JetByteTools::Socket::IStreamSocket &socket,
         const JetByteTools::Socket::IAddress &address,
         const DWORD lastError);

      virtual void OnSocketReleased(
         JetByteTools::Win32::IIndexedOpaqueUserData &userData);

      virtual void OnReadCompleted(
         JetByteTools::Socket::IStreamSocket &socket,
         JetByteTools::IO::IBuffer &buffer);

      virtual void OnConnectionReset(
         JetByteTools::Socket::IStreamSocket &socket,
         const DWORD lastError);

      virtual void OnError(
         const JetByteTools::Win32::_tstring &message);

      // Implement IQueueTimers::Timer

      virtual void OnTimer(
         UserData userData);

      // Helpers

      JetByteTools::IO::IBuffer *ProcessData(
         JetByteTools::Socket::IStreamSocket &socket,
         JetByteTools::IO::IBuffer &buffer);

      void SendData(
         JetByteTools::Socket::IStreamSocket &socket);

      void SetSendTimer(
         JetByteTools::Socket::IStreamSocket &socket,
         const CTestMessages &testMessages);

      CThreadedCallbackTimerQueue m_timerQueue;

      // One lock per error map below.

      JetByteTools::Win32::CCriticalSection m_connectionErrorCriticalSection;
      JetByteTools::Win32::CCriticalSection m_connectionResetErrorCriticalSection;
      JetByteTools::Win32::CCriticalSection m_errorCriticalSection;

      // Per socket user data slots: the timer handle and the CTestMessages
      // pointer for the connection.

      const JetByteTools::Win32::IIndexedOpaqueUserData::UserDataIndex m_timerHandleIndex;
      const JetByteTools::Win32::IIndexedOpaqueUserData::UserDataIndex m_userDataIndex;

      // Occurrence counts keyed by error message or by error code.

      typedef std::map<JetByteTools::Win32::_tstring, size_t> StringErrors;
      typedef std::map<DWORD, size_t> LastErrors;

      LastErrors m_connectionErrors;
      LastErrors m_connectionResetErrors;
      StringErrors m_errors;

      void DumpErrors(
         const JetByteTools::Win32::_tstring &errorType,
         const LastErrors &lastErrors) const;

      /// No copies do not implement
      CStreamSocketConnectionManager(const CStreamSocketConnectionManager &rhs);
      /// No copies do not implement
      CStreamSocketConnectionManager &operator=(const CStreamSocketConnectionManager &rhs);
};
/// Destructor. Calls WaitForShutdownToComplete() before the object's members
/// are destroyed (presumably blocking until shutdown has finished - confirm
/// against the base class).

CStreamSocketConnectionManager::~CStreamSocketConnectionManager()
{
   this->WaitForShutdownToComplete();
}
/// Stores the opaque pUserData that accompanied the connect call, which is
/// treated as a CTestMessages pointer, in the connection's user data slot at
/// m_userDataIndex. The address is not used.

void CStreamSocketConnectionManager::OnPreOutgoingConnect(
   IIndexedOpaqueUserData &userData,
   const IAddress & /*address*/,
   const void *pUserData)
{
   const CTestMessages *pConstMessages = static_cast<const CTestMessages *>(pUserData);

   // The user data slot holds a non-const pointer, so constness is cast away.

   userData.SetUserPointer(m_userDataIndex, const_cast<CTestMessages *>(pConstMessages));
}
\ref JetByteTools::Socket::IStreamSocketConnectionManagerCallback::OnPreOutgoingConnect "OnPreOutgoingConnect()"
is called. This allows us to pass data from the connect call to the user data of the socket that will be managing the connection. See here for more details on how these callbacks are sequenced and when they occur. We simply take the pUserData
that was passed into the connect call in TestMain.cpp
and store it in our per connection user data so that it will be available to other callbacks.

/// Marks the connection attempt as successfully completed, creates a timer
/// and stores its handle in the socket's user data, issues a read and then
/// calls SendData() for the socket. The address is not used.

void CStreamSocketConnectionManager::OnConnectionEstablished(
   IStreamSocket &socket,
   const IAddress & /*address*/)
{
   CTestMessages *pMessages = static_cast<CTestMessages *>(socket.GetUserPointer(m_userDataIndex));

   pMessages->ConnectionComplete(true);

   socket.SetUserData(m_timerHandleIndex, m_timerQueue.CreateTimer());

   socket.Read();

   SendData(socket);
}

/// Counts the failure under its error code, marks the socket as having no
/// timer, marks the connection attempt as failed and then clears the
/// per connection user data pointer. The address is not used.

void CStreamSocketConnectionManager::OnOutgoingConnectionFailed(
   IStreamSocket &socket,
   const IAddress & /*address*/,
   const DWORD lastError)
{
   {
      // Hold the lock only while the error count is updated.

      ICriticalSection::Owner guard(m_connectionErrorCriticalSection);

      ++m_connectionErrors[lastError];
   }

   CTestMessages *pMessages = static_cast<CTestMessages *>(socket.GetUserPointer(m_userDataIndex));

   socket.SetUserData(m_timerHandleIndex, IQueueTimers::InvalidHandleValue);

   pMessages->ConnectionComplete(false);

   socket.SetUserPointer(m_userDataIndex, 0);
}
OnConnectionEstablished()
handler simply extracts the per connection data, marks the connection attempt as completed, creates a timer handle in case we will be using timers to pace the data that we send, issues a read and then starts sending data to the server. OnOutgoingConnectionFailed()
is called and we log the error for this connection, mark the connection as failed and clear the per connection user data. void CStreamSocketConnectionManager::OnSocketReleased( IIndexedOpaqueUserData &userData) { CTestMessages *pTestMessages = static_cast<CTestMessages*>(userData.GetUserPointer(m_userDataIndex)); IQueueTimers::Handle timerHandle = userData.GetUserData(m_timerHandleIndex); if (timerHandle != IQueueTimers::InvalidHandleValue) { m_timerQueue.DestroyTimer(timerHandle); } if (pTestMessages) { pTestMessages->MessagesComplete(false); } }
OnSocketReleased()
will be called. This cleans up any timer handle that may have been in use and marks the message flow as failed; note that inside our CTestMessages
class we only allow the first call to MessagesComplete()
to update our success state. If the message flow completed successfully then MessagesComplete()
will already have been called with true
as a parameter and we will ignore this call. void CStreamSocketConnectionManager::OnReadCompleted( IStreamSocket &socket, IBuffer &buffer) { try { IBuffer *pBuffer = ProcessData(socket, buffer); socket.TryRead(pBuffer); } catch(const CException &e) { OutputEx(_T("ReadCompleted - Exception - ") + e.GetDetails()); socket.Shutdown(); } catch(...) { OutputEx(_T("ReadCompleted - Unexpected exception")); socket.Shutdown(); } } void CStreamSocketConnectionManager::OnConnectionReset( IStreamSocket & /*socket*/, const DWORD lastError) { ICriticalSection::Owner lock(m_connectionResetErrorCriticalSection); m_connectionResetErrors[lastError]++; } void CStreamSocketConnectionManager::OnError( const _tstring &message) { ICriticalSection::Owner lock(m_errorCriticalSection); m_errors[message]++; }
OnReadCompleted()
looks remarkably like the code in the simple server as all of our 'business logic' is in ProcessData()
. The other two callbacks simply log connection errors for us so that we can report them when the client exits.