The C++ framework for developing highly scalable, high performance servers on Windows platforms.

Example Servers - Multi Connection TCP Client

This example shows you how to build a client that can manage multiple connections, send data to and receive data from a server, and validate it. We use this kind of client to stress test our servers, but the basic design could be used to create any client that needs to create multiple connections, such as a web spider. Some of the basic structure is very similar to the Basic Echo Server example; you should read about that first and have a good understanding of how everything fits together. The server example uses a helper library, ServerCommon, which provides some utility code that many of the example servers use, such as command line parsing, allocators and server callback implementations that display debug traces to show what they're doing.

This example is shipped with all licensed versions of The Server Framework and it requires the core server framework libraries (see here for licensing options). You can always download the latest version of this example from here. Although you will need the correct libraries to build it, you can still look at the example code, see how it works and perhaps get ideas from it. A compiled Unicode release build of this example is available on request if you require it for performance analysis of the framework.

The TestMain.cpp file for this example looks something like the following:

 #include "JetByteTools\Admin\Admin.h"

 #include "JetByteTools\SocketTools\WinsockWrapper.h"

 #include "JetByteTools\Win32Tools\Exception.h"
 #include "JetByteTools\Win32Tools\Utils.h"
 #include "JetByteTools\Win32Tools\DebugTrace.h"
 #include "JetByteTools\Win32Tools\SharedCriticalSectionFactory.h"
 #include "JetByteTools\Win32Tools\StringConverter.h"
 #include "JetByteTools\Win32Tools\GlobalName.h"
 #include "JetByteTools\Win32Tools\ManualResetEvent.h"
 #include "JetByteTools\Win32Tools\SEHException.h"

 #include "StreamSocketConnectionManager.h"
 #include "TestMessages.h"
 #include "TestMessage.h"
 #include "MessageCollection.h"

 #pragma hdrstop

 #include "CommandLine.h"

 #include "JetByteTools\Win32Tools\PerformanceCounter.h"
 #include "JetByteTools\Win32Tools\SecurityDescriptorAllowAll.h"
 #include "JetByteTools\Win32Tools\SecurityAttributes.h"

 #include "JetByteTools\IOTools\IOPool.h"
 #include "JetByteTools\IOTools\BufferAllocator.h"

 #include "JetByteTools\SocketTools\FullAddress.h"
 #include "JetByteTools\SocketTools\AddressTypeIPv4.h"
 #include "JetByteTools\SocketTools\SequencedStreamSocketAllocator.h"
 #include "JetByteTools\SocketTools\SmartStreamSocket.h"

 #include <iostream>

 ///////////////////////////////////////////////////////////////////////////////
 // Using directives
 ///////////////////////////////////////////////////////////////////////////////

 using JetByteTools::Milliseconds;

 using JetByteTools::Win32::CException;
 using JetByteTools::Core::Output;
 using JetByteTools::Win32::ToString;
 using JetByteTools::Win32::ToStringA;
 using JetByteTools::Win32::_tstring;
 using JetByteTools::Win32::CSharedCriticalSectionFactory;
 using JetByteTools::Win32::CStringConverter;
 using JetByteTools::Win32::CGlobalName;
 using JetByteTools::Win32::CManualResetEvent;
 using JetByteTools::Win32::CSecurityDescriptorAllowAll;
 using JetByteTools::Win32::CSecurityAttributes;
 using JetByteTools::Win32::CSEHException;
 using JetByteTools::Win32::CWaitableCounter;
 using JetByteTools::Win32::CPerformanceCounter;

 using JetByteTools::Socket::CFullAddress;
 using JetByteTools::Socket::CAddressTypeIPv4;
 using JetByteTools::Socket::CAddressType;
 using JetByteTools::Socket::CSmartStreamSocket;
 using JetByteTools::Socket::CSequencedStreamSocketAllocator;

 using JetByteTools::IO::CIOPool;
 using JetByteTools::IO::CBufferAllocator;

 using std::cout;
 using std::endl;
 using std::cin;

 ///////////////////////////////////////////////////////////////////////////////
 // Static helper functions
 ///////////////////////////////////////////////////////////////////////////////

 static void BuildMessagesForConnection(
    const CTestMessage::DataLength messageSize,
    CTestMessages &messages);

 ///////////////////////////////////////////////////////////////////////////////
 // Program entry point
 ///////////////////////////////////////////////////////////////////////////////

 int main(int /*argc*/, char * /*argv[ ]*/)
 {
    ::SetErrorMode(SEM_FAILCRITICALERRORS | SEM_NOGPFAULTERRORBOX | SEM_NOOPENFILEERRORBOX);

    int result = 2;

    CSEHException::Translator sehTranslator;

    try
    {
       CCommandLine commandLine;

       if (commandLine.Parse())
       {
          result = 1;

          const CCommandLine::Count messageRepeatCount = commandLine.Messages();

          const CSequencedStreamSocketAllocator::SocketCount numConnections = commandLine.Connections();

          const Milliseconds transferRate = commandLine.MessageRate();

          const CBufferAllocator::BufferSize messageSize = commandLine.MessageSize();

          const Milliseconds delayAfter = commandLine.ConnectionBatchSize();

          const Milliseconds delayFor = commandLine.ConnectionBatchPause();

          const bool sendAfterRecv = commandLine.SendAfterRecv();

          const bool syncConnect = commandLine.SyncConnect();

          const bool holdConnections = commandLine.HoldConnections();

          const CSequencedStreamSocketAllocator::SocketCount maxSocketsInPool = numConnections;

          const CBufferAllocator::BufferCount maxBuffersInPool = numConnections * 2;

          const CBufferAllocator::BufferSize bufferSize = commandLine.MessageSize();

          const bool fragments = commandLine.Fragments();

          const bool multipleMessages = commandLine.MultipleMessages();

          const Milliseconds dataFlowTimeoutMillis = commandLine.MessageFlowTimeout();

          CSharedCriticalSectionFactory lockFactory(47);

          CSequencedStreamSocketAllocator socketAllocator(
             lockFactory,
             maxSocketsInPool);

          CBufferAllocator bufferAllocator(
             bufferSize,
             maxBuffersInPool);

          CIOPool ioPool(
             0);                           // number of threads (0 = 2 x processors)

          ioPool.Start();

          CStreamSocketConnectionManager connector(
             ioPool,
             socketAllocator,
             bufferAllocator);

          const CAddressType addressPreference = CAddressTypeIPv4::instance;

          const CFullAddress address(
             commandLine.Server(),
             commandLine.Port(),
             addressPreference);

          if (!connector.AsyncConnectAvailable(address.Type()) && !syncConnect)
          {
             commandLine.ShowHelp(
                _T("ConnectEx() is not available on this platform.\n")
                _T("Use -syncConnect to force synchronous connections"));

             return 1;
          }

          if (commandLine.PreAllocate())
          {
             socketAllocator.EnlargePool(maxSocketsInPool);

             bufferAllocator.EnlargePool(maxBuffersInPool);
          }

          // now loop opening connections and kicking things off...

          CMessageCollection messages(numConnections);

          try
          {
             // build messages here and pass in to each connection

             if (messageRepeatCount != 0)
             {
                cout << "Building Messages" << endl;
             }

             size_t i = 0;

             const CBufferAllocator::BufferSize skipSignOnMessageBytes = commandLine.SkipSignOnMessageBytes();

             for (i = 0; i < numConnections; ++i)
             {
                CTestMessages *pMessages = new CTestMessages(
                   i,
                   messageRepeatCount,
                   transferRate,
                   messages,
                   sendAfterRecv,
                   holdConnections,
                   skipSignOnMessageBytes,
                   fragments,
                   multipleMessages);

                BuildMessagesForConnection(messageSize, *pMessages);

                messages.Add(pMessages);
             }

             cout << "Creating " << numConnections << " connections" << endl;

             i = 0;

             CPerformanceCounter counter(CPerformanceCounter::LockThreadToOneCPU);

             bool ok = true;

             for (CMessageCollection::Iterator it = messages.Begin(); ok && it != messages.End(); ++it)
             {
                CTestMessages *pMessages = *it;

                if (syncConnect)
                {
                   CSmartStreamSocket socket = connector.ConnectNoThrow(address, pMessages);

                   ok = (socket.Get() != 0);
                }
                else
                {
                   CSmartStreamSocket socket = connector.AsyncConnectNoThrow(address, pMessages);

                   ok = (socket.Get() != 0);
                }

                if (delayAfter && (++i % delayAfter == 0))
                {
                   cout << i << " connections created" << endl;

                   if (delayFor)
                   {
                      Sleep(delayFor);
                   }
                }
             }

             if (ok)
             {
                cout << "All connections in progress" << endl;
             }
             else
             {
                SetEvent(messages.GetConnectionsCompleteEvent());
             }
             CSecurityDescriptorAllowAll sd;

             CSecurityAttributes sa(sd);

             CManualResetEvent shutdownEvent(CGlobalName(_T("JetByteToolsServerShutdown")), &sa);

             HANDLE handlesToWaitFor[2];

             handlesToWaitFor[0] = shutdownEvent.GetWaitHandle();
             handlesToWaitFor[1] = messages.GetConnectionsCompleteEvent();

             DWORD waitResult = ::WaitForMultipleObjects(2, handlesToWaitFor, false, INFINITE);

             if (waitResult == WAIT_OBJECT_0)
             {
                result = 1;
             }
             else if (waitResult == WAIT_OBJECT_0 + 1)
             {
                const _tstring elapsedTime = counter.GetElapsedTimeAsString();

                cout << "All connections complete in " << CStringConverter::TtoA(elapsedTime) << endl;
                cout << messages.ConnectionsEstablished() << " established. " << messages.ConnectionsFailed() << " failed."<< endl;

                if (messageRepeatCount != 0)
                {
                   cout << "Waiting for data flow to complete" << endl;

                   handlesToWaitFor[1] = messages.GetMessagesCompleteEvent();

                   DWORD waitResult = ::WaitForMultipleObjects(2, handlesToWaitFor, false, dataFlowTimeoutMillis);

                   if (waitResult == WAIT_TIMEOUT)
                   {
                      const _tstring elapsedTime = counter.GetElapsedTimeAsString();

                      cout << "Waiting for data flow to complete timed out after " << CStringConverter::TtoA(elapsedTime) << endl;

                      result = 1;
                   }
                   else if (waitResult == WAIT_OBJECT_0)
                   {
                      result = 1;
                   }
                   else if (waitResult == WAIT_OBJECT_0 + 1)
                   {
                      const _tstring elapsedTime = counter.GetElapsedTimeAsString();

                      cout << "Data flow complete in " << CStringConverter::TtoA(elapsedTime) << endl;

                      result = 0;
                   }
                   else
                   {
                      throw CException(_T("main()"), _T("Unexpected result from WaitForMultipleObjects"));
                   }
                }
                else
                {
                   result = 0;
                }
             }
             else
             {
                throw CException(_T("main()"), _T("Unexpected result from WaitForMultipleObjects"));
             }

             if (commandLine.HoldConnections() && commandLine.IsInteractive() && 0 != messages.ConnectionsEstablished())
             {
                cout << "Press return to close connections" << endl;

                char input;

                cin >> std::noskipws >> input;
             }
          }
          catch(const CException &e)
          {
             cout << "Exception: " << CStringConverter::TtoA(e.GetWhere()) << " - " << CStringConverter::TtoA(e.GetWhat()) << endl;

             result = 1;
          }
          catch(...)
          {
             cout << "Unexpected exception" << endl;

             result = 1;
          }

          if (messageRepeatCount != 0)
          {
             CMessageCollection::StringErrors errors;

             if (!messages.CheckMessages(errors))
             {
                result = 1;
             }

             if (errors.size() != 0)
             {
                cout << "Message errors:" << endl;

                for (CMessageCollection::StringErrors::const_iterator it = errors.begin(); it != errors.end(); ++it)
                {
                   cout << ToStringA(it->second) << " - " << CStringConverter::TtoA(it->first) << endl;
                }
             }
          }

          cout << "Wait for connector shutdown..." << endl;

          if (!connector.WaitForShutdownToComplete(10000))
          {
             cout << "Connector shutdown timedout!" << endl;

             connector.ForceShutdown();
          }

          cout << "Wait for IO pool shutdown..." << endl;

          ioPool.WaitForShutdownToComplete();

          cout << "Waiting for buffer allocator to flush..." << endl;

          bufferAllocator.Flush();

          cout << "Waiting for socket allocator to flush..." << endl;

          socketAllocator.ReleaseSockets();

          connector.DumpErrors();

          cout << "All done..." << endl;
       }
    }
    catch(const CException &e)
    {
       cout << "Exception: " << CStringConverter::TtoA(e.GetDetails()) << endl;

       result = 1;
    }
    catch(const CSEHException &e)
    {
       cout << "SEH Exception: " << CStringConverter::TtoA(e.GetDetails()) << endl;

       result = 1;
    }
    catch(const char *pMsg)
    {
       cout << pMsg << endl;

       result = 1;
    }
    catch(...)
    {
       cout << "Unexpected exception" << endl;

       result = 1;
    }

    cout << "Test " << (result == 0 ? "Passed" : "Failed") << endl;

    return result;
 }

 ///////////////////////////////////////////////////////////////////////////////
 // Static helper functions
 ///////////////////////////////////////////////////////////////////////////////

 static void BuildMessagesForConnection(
    const CTestMessage::DataLength messageSize,
    CTestMessages &messages)
 {
    messages.AddMessage(new CTestMessage(messageSize));
 }

We start by setting up a Windows Structured Exception Handling translator which will translate SEH exceptions into C++ exceptions.

Once we're set up to deal with any exceptions, we parse the command line. There are some command line arguments that are required, so running the client without them gives some help:
 EchoServerTest: v6.4 - Copyright (c) 2009 JetByte Limited
 Usage: EchoServerTest -server xxx.xxx.xxx.xxx -port xxxx

 Command line parameters:
  r -server                The server address to connect to (dotted ip).
  r -port                  The port to connect to.
  o -connections           The number of connections to create.
                           Defaults to 1000
  o -connectionBatchSize   Batch connections in groups of X size.
                           Defaults to 0 (no batching)
  o -connectionBatchDelay  Delay for Y milliseconds between each batch.
                           Defaults to 0 (no delay)
  o -messages              Number of messages to send on each connection.
                           Defaults to 0 (don't send any data)
  o -messageSize           Size of each message.
                           Defaults to 1024
  o -messageRate           Delay for Y milliseconds between each message.
                           Defaults to 1000
  o -sendAfterRecv         Wait for echo of previous message before starting
                           timer for next send (reduce pace to server speed).
  o -syncConnect           Use synchronous connect rather than ConnectEx().
  o -hold                  Hold all connections open until test completes.
  o -pause                 Wait for a key press before closing connections.
  o -preallocate           Preallocate sockets and buffers before connecting.
  o -skipSignOnBytes       Ignores the first x bytes that the server sends.
                           This can be used to ignore a welcome message.
  o -messageFlowTimeout    Time in milliseconds that is allowed for the test
                           to complete once all connections are established.
                           Defaults to no timeout, the test doesn't complete
                           until the data flow completes.
  o -spinCount             The spin count used for per socket critical sections.

  r = required, o = optional

Assuming we are passed the correct arguments, at least -server YYY and -port XXX to tell us which server to connect to, the client will begin to construct the objects that it needs in order to run.

Unlike the simple echo server, we don't bother with an instance of a lock factory class; each of our sockets will have its own critical section. We talk about how and why you might tune lock usage here. In general, using 'shared locks' should work fine and uses fewer resources, so it's a good default choice, but here we use unique locks per connection. We specify this decision to the framework by creating a socket allocator which doesn't take a lock factory as a parameter. Next we create our buffer allocator and our I/O pool.
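The trade-off between shared locks and per-connection locks can be sketched in standard C++. This is only an illustration of the idea, not the framework's lock factory: SharedLockPool and LockFor() are hypothetical names, and std::mutex stands in for a Win32 critical section.

```cpp
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical illustration: a fixed pool of locks shared by many connections.
// With per-connection locks each connection would own its own std::mutex instead.
class SharedLockPool
{
   public :

      // A pool size of 0 is treated as 1 so that LockFor() never divides by zero.
      explicit SharedLockPool(const std::size_t numLocks)
         :  m_locks(numLocks == 0 ? 1 : numLocks)
      {
      }

      // Connections whose ids map to the same slot share one lock, which saves
      // resources at the cost of some contention between unrelated connections.
      std::mutex &LockFor(const std::size_t connectionId)
      {
         return m_locks[connectionId % m_locks.size()];
      }

      std::size_t Size() const
      {
         return m_locks.size();
      }

   private :

      std::vector<std::mutex> m_locks;
};
```

With a pool of 47 locks, connections 0 and 47 would contend for the same lock, whereas unique locks per connection never contend with each other but need one lock per socket.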

Once these objects are ready we create our custom CStreamSocketConnectionManager. This class is the one that contains our data sending and connection management logic and is the heart of the client.

Next we pass the server connection details that we've been provided to an instance of JetByteTools::Socket::CFullAddress. This allows our client to be address family agnostic. You can pass either an IPv4 address (-server 192.168.0.44) or an IPv6 address (-server [ffff:ffff:ffff:ffff:ffff]) and the client will connect appropriately.

The client will default to using asynchronous connection establishment if it's supported by the underlying operating system and the user hasn't specifically restricted its use with a command line switch.

If we have been requested to preallocate our sockets and buffers we do so now. Doing this can improve performance slightly as we can create our sockets and buffers before they're needed and then simply remove them from the pools when we need them.
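As a rough sketch of why preallocation helps, the following standard C++ pool creates its buffers up front so that later allocations only remove an existing buffer from the free list. SimpleBufferPool and its members are hypothetical names chosen for illustration, and unlike the framework's allocators this sketch is single-threaded.

```cpp
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical single-threaded pool, for illustration only.
class SimpleBufferPool
{
   public :

      typedef std::vector<char> Buffer;
      typedef std::unique_ptr<Buffer> BufferPtr;

      explicit SimpleBufferPool(const std::size_t bufferSize)
         :  m_bufferSize(bufferSize),
            m_buffersCreated(0)
      {
      }

      // Pay the allocation cost now, before any connection needs a buffer.
      void EnlargePool(const std::size_t count)
      {
         for (std::size_t i = 0; i < count; ++i)
         {
            m_free.push_back(Create());
         }
      }

      // Reuse a pooled buffer if there is one, otherwise create a new one.
      BufferPtr Allocate()
      {
         if (m_free.empty())
         {
            return Create();
         }

         BufferPtr buffer = std::move(m_free.back());

         m_free.pop_back();

         return buffer;
      }

      void Release(BufferPtr buffer)
      {
         m_free.push_back(std::move(buffer));
      }

      std::size_t BuffersCreated() const
      {
         return m_buffersCreated;
      }

   private :

      BufferPtr Create()
      {
         ++m_buffersCreated;

         return BufferPtr(new Buffer(m_bufferSize));
      }

      const std::size_t m_bufferSize;
      std::size_t m_buffersCreated;
      std::vector<BufferPtr> m_free;
};
```

After EnlargePool(n), the first n calls to Allocate() do not create anything new, which is the effect that the preallocation switch is after.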

We're now ready to construct the messages that we're going to send. This particular test client is optimised to send the same message multiple times and to reduce the memory used in doing so. To this end we only create a single message and then 'repeat' it. This means that we only need one message in memory and we can send 100,000 copies if we wish. You may prefer to create separate messages and not repeat them, if, for example, you need to send a sequence of messages.
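The 'single message, repeated' idea can be sketched as follows. This is not the CTestMessages implementation; RepeatedMessage and Next() are hypothetical names that only show how one payload in memory can stand in for any number of sends.

```cpp
#include <cstddef>
#include <string>

// Hypothetical illustration: one payload in memory, handed out repeatCount times.
class RepeatedMessage
{
   public :

      RepeatedMessage(
         const std::string &payload,
         const std::size_t repeatCount)
         :  m_payload(payload),
            m_remaining(repeatCount)
      {
      }

      // Returns the shared payload, or a null pointer once every repeat has been
      // handed out. The same object is returned each time; nothing is copied.
      const std::string *Next()
      {
         if (m_remaining == 0)
         {
            return nullptr;
         }

         --m_remaining;

         return &m_payload;
      }

      std::size_t Remaining() const
      {
         return m_remaining;
      }

   private :

      const std::string m_payload;
      std::size_t m_remaining;
};
```

Memory use stays at one payload however large the repeat count is. A client that needs a sequence of distinct messages would hold a collection of payloads instead.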

Once the messages are created we can begin to create connections to the server. We loop and issue the connect requests in batches with a delay between them. This allows us to simulate batches of connections occurring very close together. The number of concurrent connections that this client can manage to a particular server will be limited by the server's listenBacklog. Note that in most circumstances it's not especially realistic to issue 1000s of connections at the same time. Batching the connections into more realistic numbers and delaying between the batches tends to give a more real world test.
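The batching logic can be sketched independently of the framework. In this sketch ConnectInBatches() and its connect parameter are hypothetical; the callable stands in for the real connect call and returns false on failure, which stops the loop in the same way that the ok flag does in TestMain.cpp.

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>

// Hypothetical illustration: issue connects in batches, pausing after each full
// batch. Returns the number of batch boundaries that were reached.
inline std::size_t ConnectInBatches(
   const std::size_t numConnections,
   const std::size_t batchSize,                    // 0 = no batching
   const std::chrono::milliseconds pause,          // 0 = no delay
   const std::function<bool(std::size_t)> &connect)
{
   std::size_t batches = 0;
   std::size_t issued = 0;

   bool ok = true;

   for (std::size_t i = 0; ok && i < numConnections; ++i)
   {
      ok = connect(i);

      ++issued;

      if (batchSize != 0 && issued % batchSize == 0)
      {
         ++batches;

         if (pause.count() != 0)
         {
            std::this_thread::sleep_for(pause);
         }
      }
   }

   return batches;
}
```

For example, 10 connections with a batch size of 4 reach two batch boundaries (after the 4th and 8th connects), and the final two connections are issued without a further pause.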

Once all of the connections are in progress the client waits for the connections to complete and then waits for the data flow to complete. At this point the client is running. Since everything that the client does happens on its own I/O thread pool, the main application thread is no longer required and can simply sit around and wait until it's time to shut the client down. We do that by waiting for the data flow to complete. Alternatively you can shut the client down using the ServerShutdown utility that ships as part of these examples.
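The main thread's 'wait for shutdown or completion, whichever comes first' step uses WaitForMultipleObjects() in TestMain.cpp. A portable sketch of the same pattern, using a condition variable rather than Win32 events, might look like this. TwoEventWaiter and WaitOutcome are hypothetical names, and the sketch is an analogy rather than a replacement for the Win32 call.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

enum class WaitOutcome { Shutdown, Complete, TimedOut };

// Hypothetical illustration: block until either of two flags is set, or time out.
class TwoEventWaiter
{
   public :

      void SignalShutdown() { Set(m_shutdown); }

      void SignalComplete() { Set(m_complete); }

      WaitOutcome Wait(const std::chrono::milliseconds timeout)
      {
         std::unique_lock<std::mutex> lock(m_mutex);

         const bool signalled = m_condition.wait_for(
            lock,
            timeout,
            [this] { return m_shutdown || m_complete; });

         if (!signalled)
         {
            return WaitOutcome::TimedOut;
         }

         // Shutdown is checked first, in the same way that the shutdown event
         // occupies the first slot of the handle array in TestMain.cpp.
         return m_shutdown ? WaitOutcome::Shutdown : WaitOutcome::Complete;
      }

   private :

      void Set(bool &flag)
      {
         {
            std::lock_guard<std::mutex> lock(m_mutex);

            flag = true;
         }

         m_condition.notify_all();
      }

      std::mutex m_mutex;
      std::condition_variable m_condition;
      bool m_shutdown = false;
      bool m_complete = false;
};
```

The I/O threads would call SignalComplete() when the last connection finishes, and an external shutdown request would call SignalShutdown(); the main thread only ever calls Wait().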

Once the client is ready to shut down we simply tell it to do so and then wait for it to finish and clean up the resources used.

The CStreamSocketConnectionManager class provides a link between the framework and the socket server callbacks that you will implement to act on various network events that happen during the lifetime of the connections from your client. The class definition looks like this:

 class CStreamSocketConnectionManager :
    public JetByteTools::Socket::CStreamSocketConnectionManager,
    private JetByteTools::Win32::IQueueTimers::Timer,
    private JetByteTools::Socket::CStreamSocketServerExCallback
 {
    public :

       CStreamSocketConnectionManager(
          JetByteTools::IO::IIOPool &pool,
          JetByteTools::Socket::IAllocateStreamSockets &socketAllocator,
          JetByteTools::IO::IAllocateBuffers &bufferAllocator);

       ~CStreamSocketConnectionManager();

       void DumpErrors() const;

    private :

       virtual void OnPreOutgoingConnect(
          JetByteTools::Win32::IIndexedOpaqueUserData &userData,
          const JetByteTools::Socket::IAddress &address,
          const void *pUserData);

       virtual void OnConnectionEstablished(
          JetByteTools::Socket::IStreamSocket &socket,
          const JetByteTools::Socket::IAddress &address);

       virtual void OnOutgoingConnectionFailed(
          JetByteTools::Socket::IStreamSocket &socket,
          const JetByteTools::Socket::IAddress &address,
          const DWORD lastError);

       virtual void OnSocketReleased(
          JetByteTools::Win32::IIndexedOpaqueUserData &userData);

       virtual void OnReadCompleted(
          JetByteTools::Socket::IStreamSocket &socket,
          JetByteTools::IO::IBuffer &buffer);

       virtual void OnConnectionReset(
           JetByteTools::Socket::IStreamSocket &socket,
           const DWORD lastError);

       virtual void OnError(
          const JetByteTools::Win32::_tstring &message);

       // Implement IQueueTimers::Timer

       virtual void OnTimer(
          UserData userData);

       JetByteTools::IO::IBuffer *ProcessData(
          JetByteTools::Socket::IStreamSocket &socket,
          JetByteTools::IO::IBuffer &buffer);

       void SendData(
          JetByteTools::Socket::IStreamSocket &socket);

       void SetSendTimer(
          JetByteTools::Socket::IStreamSocket &socket,
          const CTestMessages &testMessages);

       CThreadedCallbackTimerQueue m_timerQueue;

       JetByteTools::Win32::CCriticalSection m_connectionErrorCriticalSection;
       JetByteTools::Win32::CCriticalSection m_connectionResetErrorCriticalSection;
       JetByteTools::Win32::CCriticalSection m_errorCriticalSection;

       const JetByteTools::Win32::IIndexedOpaqueUserData::UserDataIndex m_timerHandleIndex;

       const JetByteTools::Win32::IIndexedOpaqueUserData::UserDataIndex m_userDataIndex;

       typedef std::map<JetByteTools::Win32::_tstring, size_t> StringErrors;

       typedef std::map<DWORD, size_t> LastErrors;

       LastErrors m_connectionErrors;
       LastErrors m_connectionResetErrors;

       StringErrors m_errors;

       void DumpErrors(
          const JetByteTools::Win32::_tstring &errorType,
          const LastErrors &lastErrors) const;

       /// No copies do not implement
       CStreamSocketConnectionManager(const CStreamSocketConnectionManager &rhs);
       /// No copies do not implement
       CStreamSocketConnectionManager &operator=(const CStreamSocketConnectionManager &rhs);
 };

This is more complex than the simple echo server since this client records information on connection errors and sends data at various user configurable rates (which uses per connection timers).

As with the server class shown in the simple echo server example, it's vitally important that we have a destructor and it's vitally important that the destructor calls the WaitForShutdownToComplete() method of the base class. So, ideally, the destructor will look something like this.

 CStreamSocketConnectionManager::~CStreamSocketConnectionManager()
 {
    WaitForShutdownToComplete();
 }


The reason for this is that we need to be sure that the connection manager has shut down before our destruction begins. We have supplied a reference to ourself as the callback interface to the base class, and if we didn't wait for the base class to complete its shutdown then we (including our callback interface implementation) would be destroyed before our base class, which could then call into a callback object that no longer exists. This problem does not occur in servers that are built without deriving from the server base class; see the callback echo server example for details.

We implement several callback methods from JetByteTools::Socket::IStreamSocketConnectionManagerCallback but not all of them, so we inherit from CStreamSocketServerExCallback which lives in the ServerCommon library and which inherits from JetByteTools::Socket::CStreamSocketServerExCallback and provides some standard debug trace output for other callback methods. Note that CStreamSocketServerExCallback implements ALL of the callbacks that servers OR clients (which are merely subsets of servers) can generate. We also inherit from JetByteTools::Socket::CStreamSocketConnectionManager but this is entirely optional and we only do this as a convenience, see EchoServerCallback for an example server which does not inherit from a server base class (the same logic can be applied to connection manager base classes).

 void CStreamSocketConnectionManager::OnPreOutgoingConnect(
    IIndexedOpaqueUserData &userData,
    const IAddress & /*address*/,
    const void *pUserData)
 {
    CTestMessages *pTestMessages = const_cast<CTestMessages *>(static_cast<const CTestMessages*>(pUserData));

    userData.SetUserPointer(m_userDataIndex, pTestMessages);
 }

Before an outgoing connection is established, OnPreOutgoingConnect() is called. This allows us to pass data from the connect call to the user data of the socket that will be managing the connection. See here for more details on how these callbacks are sequenced and when they occur. We simply take the pUserData that was passed into the connect call in TestMain.cpp and store it in our per connection user data so that it will be available to other callbacks.
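The way a pointer travels from the connect call into per connection storage, and back out again in later callbacks, can be sketched with a simple slot array. UserDataSlots, StoreOnPreConnect() and MessagesFor() are hypothetical names, and this is not the framework's IIndexedOpaqueUserData implementation; only the casts mirror those used above.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical illustration of per connection, index addressed, opaque storage.
class UserDataSlots
{
   public :

      explicit UserDataSlots(const std::size_t numSlots)
         :  m_slots(numSlots, nullptr)
      {
      }

      void SetUserPointer(const std::size_t index, void *pData)
      {
         m_slots.at(index) = pData;
      }

      void *GetUserPointer(const std::size_t index) const
      {
         return m_slots.at(index);
      }

   private :

      std::vector<void *> m_slots;
};

// Stand-in for the per connection state that the client attaches to each socket.
struct TestMessages
{
   int connectionId;
};

// Mirrors OnPreOutgoingConnect(): recover the typed pointer and store it.
inline void StoreOnPreConnect(
   UserDataSlots &userData,
   const std::size_t userDataIndex,
   const void *pUserData)
{
   TestMessages *pMessages = const_cast<TestMessages *>(
      static_cast<const TestMessages *>(pUserData));

   userData.SetUserPointer(userDataIndex, pMessages);
}

// Mirrors the later callbacks: fetch the same pointer back by index.
inline TestMessages *MessagesFor(
   const UserDataSlots &userData,
   const std::size_t userDataIndex)
{
   return static_cast<TestMessages *>(userData.GetUserPointer(userDataIndex));
}
```

Each user of the storage keeps its own index, so the timer handle and the message pointer can sit side by side in the same per connection storage without knowing about each other.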

Depending on the success of the connection establishment attempt one of the following callbacks will be called.

 void CStreamSocketConnectionManager::OnConnectionEstablished(
    IStreamSocket &socket,
    const IAddress & /*address*/)
 {
    CTestMessages *pTestMessages = static_cast<CTestMessages*>(socket.GetUserPointer(m_userDataIndex));

    pTestMessages->ConnectionComplete(true);

    const IQueueTimers::Handle timerHandle = m_timerQueue.CreateTimer();

    socket.SetUserData(m_timerHandleIndex, timerHandle);

    socket.Read();

    SendData(socket);
 }

 void CStreamSocketConnectionManager::OnOutgoingConnectionFailed(
    IStreamSocket &socket,
    const IAddress & /*address*/,
    const DWORD lastError)
 {
    {
       ICriticalSection::Owner lock(m_connectionErrorCriticalSection);

       m_connectionErrors[lastError]++;
    }

    CTestMessages *pTestMessages = static_cast<CTestMessages*>(socket.GetUserPointer(m_userDataIndex));

    socket.SetUserData(m_timerHandleIndex, IQueueTimers::InvalidHandleValue);

    pTestMessages->ConnectionComplete(false);

    socket.SetUserPointer(m_userDataIndex, 0);
 }

Our OnConnectionEstablished() handler simply extracts the per connection data, marks the connection attempt as completed, creates a timer handle in case we will be using timers to pace the data that we send, issues a read and then starts sending data to the server.

If the connection fails then OnOutgoingConnectionFailed() is called and we log the error for this connection, mark the connection as failed and clear the per connection user data.

 void CStreamSocketConnectionManager::OnSocketReleased(
    IIndexedOpaqueUserData &userData)
 {
    CTestMessages *pTestMessages = static_cast<CTestMessages*>(userData.GetUserPointer(m_userDataIndex));

    IQueueTimers::Handle timerHandle = userData.GetUserData(m_timerHandleIndex);

    if (timerHandle != IQueueTimers::InvalidHandleValue)
    {
       m_timerQueue.DestroyTimer(timerHandle);
    }

    if (pTestMessages)
    {
       pTestMessages->MessagesComplete(false);
    }
 }

When the connection finally completes, either due to a failure to connect or successful completion of the required data flow, OnSocketReleased() will be called. This cleans up any timer handle that may have been in use and marks the message flow as failed; note that inside our CTestMessages class we only allow the first call to MessagesComplete() to update our success state. If the message flow completed successfully then MessagesComplete() will already have been called with true as a parameter and we will ignore this call.
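One way to get 'only the first call counts' behaviour is an atomic compare-and-exchange on a small state value. The source does not show how CTestMessages does this, so the following is an assumption made for illustration, and CompletionState is a hypothetical name.

```cpp
#include <atomic>

// Hypothetical illustration: the first call to MessagesComplete() decides the
// outcome and later calls are ignored, even if they race on different threads.
class CompletionState
{
   private :

      enum { StatePending, StateSucceeded, StateFailed };

   public :

      CompletionState()
         :  m_state(StatePending)
      {
      }

      // Returns true if this call set the outcome, false if it was already set.
      bool MessagesComplete(const bool success)
      {
         int expected = StatePending;

         return m_state.compare_exchange_strong(
            expected,
            success ? StateSucceeded : StateFailed);
      }

      bool Succeeded() const
      {
         return m_state.load() == StateSucceeded;
      }

   private :

      std::atomic<int> m_state;
};
```

With this shape, a successful data flow calls MessagesComplete(true) first, and the later MessagesComplete(false) from the socket release path leaves the recorded success untouched.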

 void CStreamSocketConnectionManager::OnReadCompleted(
    IStreamSocket &socket,
    IBuffer &buffer)
 {
    try
    {
       IBuffer *pBuffer = ProcessData(socket, buffer);

       socket.TryRead(pBuffer);
    }
    catch(const CException &e)
    {
       OutputEx(_T("ReadCompleted - Exception - ") + e.GetDetails());
       socket.Shutdown();
    }
    catch(...)
    {
       OutputEx(_T("ReadCompleted - Unexpected exception"));
       socket.Shutdown();
    }
 }

 void CStreamSocketConnectionManager::OnConnectionReset(
    IStreamSocket & /*socket*/,
    const DWORD lastError)
 {
    ICriticalSection::Owner lock(m_connectionResetErrorCriticalSection);

    m_connectionResetErrors[lastError]++;
 }

 void CStreamSocketConnectionManager::OnError(
    const _tstring &message)
 {
    ICriticalSection::Owner lock(m_errorCriticalSection);

    m_errors[message]++;
 }

Our OnReadCompleted() looks remarkably like the code in the simple server as all of our 'business logic' is in ProcessData(). The other two callbacks simply log connection errors for us so that we can report them when the client exits.
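The error logging callbacks all follow the same pattern: take a lock, then increment a counter in a map keyed by the error. A portable sketch of that pattern, with std::mutex standing in for the critical section and a hypothetical ErrorCounter class, could look like this.

```cpp
#include <cstddef>
#include <map>
#include <mutex>

// Hypothetical illustration: count occurrences of each error code. Callbacks can
// arrive on any I/O thread, so every access to the map is made under the lock.
class ErrorCounter
{
   public :

      typedef std::map<unsigned long, std::size_t> Counts;

      void Record(const unsigned long errorCode)
      {
         std::lock_guard<std::mutex> lock(m_mutex);

         // operator[] value-initialises a missing entry to 0 before the increment.
         ++m_counts[errorCode];
      }

      // Returns a copy so that the caller can report it without holding the lock.
      Counts Snapshot() const
      {
         std::lock_guard<std::mutex> lock(m_mutex);

         return m_counts;
      }

   private :

      mutable std::mutex m_mutex;

      Counts m_counts;
};
```

Keeping one counter and one lock per category of error, as the class definition above does with its three critical sections, means that a burst of connection failures does not contend with unrelated error reports.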

The rest of the code deals with sending the data and validating the response messages.

Generated on Sun Sep 12 19:06:45 2021 for The Server Framework - v7.4 by doxygen 1.5.3