ServerMain.cpp
file that puts together the objects required to run the server and configures them, and SocketServer.h
and SocketServer.cpp
files that provide a link between the framework and the socket server callbacks that you will implement to act on various network events that happen during the lifetime of the connections to your server. We'll start by looking at the SocketServer.h
file. For the simplest EchoServer this file could look something like this:
#include "JetByteTools\SocketTools\StreamSocketServer.h" #include "JetByteTools\SocketTools\StreamSocketServerExCallback.h" #include <string> class CSocketServer : public JetByteTools::Socket::CStreamSocketServer, private JetByteTools::Socket::CStreamSocketServerExCallback { public : CSocketServer( const std::string &welcomeMessage, const JetByteTools::Socket::IFullAddress &address, const JetByteTools::Socket::ListenBacklog listenBacklog, JetByteTools::IO::IIOPool &pool, JetByteTools::Socket::IAllocateStreamSockets &socketAllocator, JetByteTools::IO::IAllocateBuffers &bufferAllocator, JetByteTools::Socket::ILimitConnections &connectionLimiter = JetByteTools::Socket::CConnectionLimiter::NoLimitLimiter); ~CSocketServer(); private : // Implement just the bits of IStreamSocketServerCallback that we need virtual void OnConnectionEstablished( JetByteTools::Socket::IStreamSocket &socket, const JetByteTools::Socket::IAddress &address); virtual void OnReadCompleted( JetByteTools::Socket::IStreamSocket &socket, JetByteTools::IO::IBuffer &buffer); // Our business logic void EchoMessage( JetByteTools::Socket::IStreamSocket &socket, JetByteTools::IO::IBuffer &buffer) const; const std::string m_welcomeMessage; /// No copies do not implement CSocketServer(const CSocketServer &rhs); /// No copies do not implement CSocketServer &operator=(const CSocketServer &rhs); };
CSocketServer
class derives from the framework's basic TCP server class but it doesn't actually need to except for the convenience of being able to manipulate a single object as your server. The important base class is the CStreamSocketServerExCallback
as this allows you to override just the callback functions that you're interested in dealing with rather than having to provide a default implementation for the whole of IStreamSocketServerCallback
. We derive from CStreamSocketServerExCallback
rather than a CStreamSocketServerCallback
class as there's only one 'do nothing' implementation of the stream socket callback interfaces, it derives from the most specific interface, IStreamSocketServerExCallback
which, as you'll see, derives from IStreamSocketServerCallback
, etc. CSocketServer( const std::string &welcomeMessage, const JetByteTools::Socket::IFullAddress &address, const ListenBacklog listenBacklog, JetByteTools::IO::IIOPool &pool, JetByteTools::Socket::IAllocateStreamSockets &socketAllocator, JetByteTools::IO::IAllocateBuffers &bufferAllocator, JetByteTools::Socket::ILimitConnections &connectionLimiter = JetByteTools::Socket::CConnectionLimiter::NoLimitLimiter);
std::string
which is simply part of our simple server's "business logic", we'll ignore that for now. Next comes an instance of IFullAddress
which is the address that we'll be listening on. There are several concrete addressing classes that we can select from, but for now we'll assume that we're passing in an instance of CAddressIPv4
which is a TCP/IPv4 address. Notice that we could simply pass in an instance of CAddressIPv6
if we wished our server to listen on a TCP/IPv6 address or CFullAddress
if we didn't know what kind of address we'd be using (CFullAddress can construct itself from string representations of addresses and can represent any valid address type given a valid construction string). Next comes the listenBacklog
which is the maximum length of the queue of pending connections. If set to SOMAXCONN
, the underlying service provider responsible for the server's listening socket will set the backlog to a maximum reasonable value. Something small usually works well for most simple servers, and you can increase it if you find that your server is refusing connections. Note that this isn't the number of connections that your server can handle, it's just the number of connections that are queing to be accepted by the server, the server accepts connections very quickly and so this queue of pending connections can usually be quite small. Next comes an instance of IIOPool
this is where all of the multi-threading is done. The reason that the pool is passed in rather than part of the server itself is that multiple servers can share the same pool, and, indeed, the pool can be shared with other code that performs asynchronous I/O if required. Next is an instance of IAllocateStreamSockets
, our socket allocator. This can often pool sockets for later reuse so that once the server is running with the 'normal' number of clients connecting and disconnecting there's no need to allocate memory. Likewise an instance of IAllocateBuffers
does the same for our data buffers. Finally there's an optional instance of ILimitConnections
. This is a very important part of a high availability server that needs (or could) service many thousands of concurrent connections. The connection limiter can protect the machine that the server runs on from running out of essential resources, the result of which is often a Blue Screen Of Death, see Limiting Resource Usage for more details... CSocketServer::CSocketServer( const string &welcomeMessage, const IFullAddress &address, const ListenBacklog listenBacklog, IIOPool &pool, IAllocateStreamSockets &socketAllocator, IAllocateBuffers &bufferAllocator, ILimitConnections &connectionLimiter) : CStreamSocketServer(address, listenBacklog, *this, pool, socketAllocator, bufferAllocator, NoZeroByteRead, connectionLimiter), m_welcomeMessage(welcomeMessage) { }
OnConnectionEstablished()
and OnReadCompleted()
. OnConnectionEstablished()
is called, not surprisingly, when a new connection is established to your server. void CSocketServer::OnConnectionEstablished( IStreamSocket &socket, const IAddress & /*address*/) { Output(_T("OnConnectionEstablished")); if (socket.TryWrite(m_welcomeMessage.c_str(), GetStringLength<DWORD>(m_welcomeMessage))) { socket.TryRead(); } }
void CSocketServer::OnReadCompleted( IStreamSocket &socket, IBuffer &buffer) { try { EchoMessage(socket, buffer); socket.Read(); } catch(const CException &e) { Output(_T("ReadCompleted - Exception - ") + e.GetDetails()); socket.AbortConnection(); } catch(...) { Output(_T("ReadCompleted - Unexpected exception")); socket.AbortConnection(); } }
OnReadCompleted()
handler is called and we are given a buffer which contains the bytes that were read from the TCP stream. Now, remember, TCP connections are an unstructured stream of bytes, so this buffer may contain one or one hundred bytes, it doesn't matter that the client at the other end sent a "packet" of exactly 100 bytes in a single call to send, we can recieve any number of those bytes as the result of our read completing. We may, or may not, get the rest of the bytes later on; we probably will and, when testing on a LAN in your office you're unlikely to see too much packet fragmentation, but, you have to assume that every lump of data that is sent to you will arrive one byte at a time, each as the result of a separate call to OnReadCompleted()
(in fact, there's scope for someone to write a TCP stream filter that can be used during development and that ensures that you get fragmented packets when your reads complete...) EchoMessage()
function and, well, this is what it does with them: void CSocketServer::EchoMessage( IStreamSocket &socket, IBuffer &buffer) const { DEBUG_ONLY(Output(_T("Data Echoed -\r\n") + DumpData(buffer.GetMemory(), buffer.GetUsed(), 60, true))); socket.Write(buffer); }
Read()
and the call to Write()
could fail and throw exceptions, possibly due to the connection being closed by the client or due to a network problem. If we wanted to be able to cleanly deal with these failures then we might choose to use TryRead()
and TryWrite()
instead and deal with the failures directly. ServerMain.cpp
for this server might look like this: int main(int /*argc*/, char * /*argv[ ]*/) { try { CIOPool pool( 0); // Number of threads, 0 = 2 x number of CPU pool.Start(); CStreamSocketAllocator socketAllocator( 10); // Number of sockets kept in the pool CBufferAllocator bufferAllocator( 1024, // Size of the data buffers 10); // Number of buffers kept in the pool const CAddressIPv4 address( INADDR_ANY, // Accept connections on all interfaces 5050); // Accept connections on port 5050 const ListenBacklog listenBacklog = 5; CConnectionLimiter connectionLimiter( 1000); // Allow, at most, this many connections CSocketServer server( "Welcome to echo server\r\n", address, listenBacklog, pool, socketAllocator, bufferAllocator, connectionLimiter);
server.Start(); server.StartAcceptingConnections();
CManualResetEvent shutdownEvent(CGlobalName(_T("JetByteToolsServerShutdown"))); CManualResetEvent pauseResumeEvent(CGlobalName(_T("JetByteToolsServerPauseResume"))); HANDLE handlesToWaitFor[2]; handlesToWaitFor[0] = shutdownEvent.GetWaitHandle(); handlesToWaitFor[1] = pauseResumeEvent.GetWaitHandle(); bool accepting = true; bool done = false; while (!done) { DWORD waitResult = ::WaitForMultipleObjects(2, handlesToWaitFor, false, INFINITE); if (waitResult == WAIT_OBJECT_0) { done = true; } else if (waitResult == WAIT_OBJECT_0 + 1) { if (accepting) { server.StopAcceptingConnections(); } else { server.StartAcceptingConnections(); } accepting = !accepting; } else { throw CException( _T("CSimpleServerShutdownHandler::WaitForShutdownRequest()"), _T("Unexpected result from WaitForMultipleObjects - ") + ToString(waitResult)); } }
server.WaitForShutdownToComplete(); pool.WaitForShutdownToComplete();
bufferAllocator.Flush(); socketAllocator.ReleaseSockets();
#include "JetByteTools\Admin\Admin.h" #include "JetByteTools\SocketTools\WinsockWrapper.h" #include "JetByteTools\Win32Tools\Exception.h" #include "JetByteTools\Win32Tools\Utils.h" #include "JetByteTools\Win32Tools\ManualResetEvent.h" #include "JetByteTools\Win32Tools\GlobalName.h" #include "SocketServer.h" #include "JetByteTools\IOTools\IOPool.h" #include "JetByteTools\IOTools\BufferAllocator.h" #include "JetByteTools\SocketTools\AddressIPv4.h" #include "JetByteTools\SocketTools\StreamSocketAllocator.h" #include "ServerCommon\SimpleServerShutdownHandler.h" #pragma hdrstop /////////////////////////////////////////////////////////////////////////////// // Using directives /////////////////////////////////////////////////////////////////////////////// using JetByteTools::Win32::_tstring; using JetByteTools::Win32::CException; using JetByteTools::Core::Output; using JetByteTools::IO::CIOPool; using JetByteTools::IO::CBufferAllocator; using JetByteTools::Socket::CAddressIPv4; using JetByteTools::Socket::CStreamSocketAllocator; using JetByteTools::Socket::CConnectionLimiter; /////////////////////////////////////////////////////////////////////////////// // Program entry point /////////////////////////////////////////////////////////////////////////////// int main(int /*argc*/, char * /*argv[ ]*/) { try { CIOPool pool( 0); // Number of threads, 0 = 2 x number of CPU pool.Start(); CStreamSocketAllocator socketAllocator( 10); // Number of sockets kept in the pool CBufferAllocator bufferAllocator( 1024, // Size of the data buffers 10); // Number of buffers kept in the pool const CAddressIPv4 address( INADDR_ANY, // Accept connections on all interfaces 5050); // Accept connections on port 5050 const ListenBacklog listenBacklog = 5; CConnectionLimiter connectionLimiter( 1000); // Allow, at most, this many connections CSocketServer server( "Welcome to echo server\r\n", address, listenBacklog, pool, socketAllocator, bufferAllocator, connectionLimiter); server.Start(); server.StartAcceptingConnections(); CSimpleServerShutdownHandler shutdownHandler(server); shutdownHandler.WaitForShutdownRequest(); server.WaitForShutdownToComplete(); pool.WaitForShutdownToComplete(); bufferAllocator.Flush(); socketAllocator.ReleaseSockets(); } catch(const CException &e) { Output(_T("Exception: ") + e.GetDetails()); } catch(...) { Output(_T("Unexpected exception")); } return 0; }