This article was originally published in 2002 and it refers to designs and code that form part of The Free Framework. The design and code of The Server Framework has moved on considerably since this article was written. Please browse the documentation and the rest of this site to learn more about The Server Framework.
Overview
In the previous article we designed a reusable socket server class to make writing high performance socket-based servers easy. We presented a series of simple examples, from the humble echo server through to a slightly more real-world packet echo server and a fake POP3 server. This article continues to make the example server more usable in the real world by adding a business logic thread pool to the server, so that messages are processed by a thread that isn’t part of the IO thread pool. This helps to maintain the scalability and performance of the server by moving potentially blocking work off into its own thread pool.
Why do we need another thread pool?
To be able to handle variable load it’s often useful to have a thread pool that can be expanded and contracted depending on the current load on the server. As we pointed out in the last article, all of our asynchronous socket IO is handled by the socket server’s thread pool. The threads in this pool cannot be terminated whilst they have outstanding IO operations, or those operations will be cancelled. This means that the socket server’s thread pool cannot shrink unless we keep track of the IO operations associated with each worker thread and only allow a thread to terminate when all of its IO operations have completed. To maintain performance we also need to make sure that the threads in the socket server’s thread pool do not block: there are a finite number of them, and if they all block then no socket IO will occur until they unblock. The easiest way to ensure that the IO threads don’t block is to move the business logic processing out of the IO thread pool and into a new thread pool. The IO threads then simply handle the IO, chunk the data stream into messages and pass the messages off to the business logic thread pool.
A business logic thread pool
Our requirements for the business logic thread pool are that it should be flexible and capable of increasing and decreasing the number of worker threads as the load on the server dictates. Passing work items into the thread pool should be a non-blocking operation so that the IO threads can operate at maximum efficiency, but we need to be able to tell when a work item hasn’t been picked up by a thread within a certain time period so that we can add more threads to the pool. We also need to keep track of the number of idle threads that we have and, every so often, reduce the number of threads in the pool to conserve resources in times of low server load.
As you would probably expect, the thread pool uses IO Completion Ports to dispatch work items to worker threads. To monitor how long a work item waits before it is picked up, and therefore to work out when we need to add more threads to the pool, we use an event. When we dispatch a work item to the IO Completion Port we wait on the event for a configurable timeout period. When a thread picks up a work item from the completion port, the first thing that it does is signal the event. If all threads are busy when we dispatch our work item, our timeout may expire before a thread signals the event. In this case we may wish to add another thread to the pool to deal with the work load. The dispatch code could look something like this:
void CThreadPool::HandleDispatch(
   ULONG_PTR completionKey,
   DWORD dwNumBytes,
   OVERLAPPED *pOverlapped)
{
   m_dispatchCompleteEvent.Reset();

   bool processed = false;

   m_workPort.PostStatus(completionKey, dwNumBytes, pOverlapped);

   // wait for a worker thread to signal that it has picked up the work item
   bool threadStarted = false;

   while (!processed)
   {
      DWORD result = m_dispatchCompleteEvent.Wait(m_timeoutMillis);

      if (result == WAIT_OBJECT_0)
      {
         processed = true;
      }
      else if (result == WAIT_TIMEOUT)
      {
         if (!threadStarted &&
             m_processingThreads == m_activeThreads &&
             (size_t)m_activeThreads < m_maxThreads)
         {
            StartWorkerThread();
            threadStarted = true;
         }
      }
      else
      {
         throw CWin32Exception(_T("CThreadPool::HandleDispatch()"), GetLastError());
      }
   }
}
Whilst there are threads available to process the work items we don’t need to start new threads. As soon as all of the threads in the pool are busy processing work items we may time out during the dispatch and then, if we’re not already running with the maximum number of threads that we’ve been configured for, we start a new thread. The actual code is slightly more complex as it handles shutdown requests and adjusts the timeout when we’re already running at our maximum number of threads. The dispatcher needs to know how many threads we have in the pool and how many of those threads are processing, so each worker thread calls back to the thread pool to let the pool know what state it’s in.
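The blocking dispatch logic above can be tried out without IO Completion Ports. The following is a minimal, portable sketch, assuming standard C++ threads in place of the Win32 primitives: a mutex-protected queue stands in for the work port and a condition variable stands in for m_dispatchCompleteEvent. The class name GrowingPool and all of its members are hypothetical and are not part of The Free Framework.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical portable analogue of HandleDispatch(); not the framework's code.
class GrowingPool {
public:
    GrowingPool(std::size_t initialThreads, std::size_t maxThreads,
                std::chrono::milliseconds timeout)
        : maxThreads_(maxThreads), timeout_(timeout) {
        std::lock_guard<std::mutex> lock(mutex_);
        for (std::size_t i = 0; i < initialThreads; ++i) StartWorker();
    }

    ~GrowingPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        workReady_.notify_all();
        for (auto &t : threads_) t.join();  // workers drain the queue first
    }

    // Blocking dispatch, intended to be called from a single maintenance thread.
    void Dispatch(std::function<void()> item) {
        std::unique_lock<std::mutex> lock(mutex_);
        pickedUp_ = false;
        work_.push_back(std::move(item));
        workReady_.notify_one();
        bool threadStarted = false;
        while (!pickedUp_) {
            if (!pickupSignal_.wait_for(lock, timeout_, [this] { return pickedUp_; })) {
                // Timed out: grow only if every thread is busy and we are under the cap.
                if (!threadStarted && processing_ == threads_.size() &&
                    threads_.size() < maxThreads_) {
                    StartWorker();
                    threadStarted = true;
                }
            }
        }
    }

    std::size_t ThreadCount() {
        std::lock_guard<std::mutex> lock(mutex_);
        return threads_.size();
    }

private:
    void StartWorker() { threads_.emplace_back([this] { Run(); }); }  // mutex_ held

    void Run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            workReady_.wait(lock, [this] { return stopping_ || !work_.empty(); });
            if (work_.empty()) return;  // stopping and nothing left to do
            std::function<void()> item = std::move(work_.front());
            work_.pop_front();
            pickedUp_ = true;  // first thing: tell the dispatcher we have it
            pickupSignal_.notify_one();
            ++processing_;
            lock.unlock();
            item();
            lock.lock();
            --processing_;
        }
    }

    const std::size_t maxThreads_;
    const std::chrono::milliseconds timeout_;
    std::mutex mutex_;
    std::condition_variable workReady_, pickupSignal_;
    std::deque<std::function<void()>> work_;
    std::vector<std::thread> threads_;
    std::size_t processing_ = 0;
    bool pickedUp_ = false;
    bool stopping_ = false;
};
```

Because this Dispatch() waits until a worker has taken the item, calling it directly from an IO thread would reintroduce exactly the blocking we are trying to avoid.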
The problem with this piece of work item dispatch code is that it doesn’t fulfill our requirement of being able to dispatch a work item to the pool in a non-blocking fashion. To achieve that, we add another level of indirection, and another IO Completion Port.
Non-blocking dispatch
To ensure that users wishing to dispatch a work item to the thread pool can do so without blocking we implement the user level dispatch function as follows:
void CThreadPool::Dispatch(
   ULONG_PTR completionKey,
   DWORD dwNumBytes /*= 0*/,
   OVERLAPPED *pOverlapped /*= 0*/)
{
   if (completionKey == 0)
   {
      throw CException(
         _T("CThreadPool::Dispatch()"),
         _T("0 is an invalid value for completionKey"));
   }

   m_dispatchPort.PostStatus(completionKey, dwNumBytes, pOverlapped);
}
The restriction on 0-valued completion keys is unfortunate, but it allows us to shut down the thread pool’s dispatch thread by posting a 0 to its completion port. The thread pool now has two IO Completion Ports. The dispatch port is serviced by a single maintenance thread which executes the HandleDispatch() method to dispatch work items to the worker threads. Users dispatch without blocking, and the maintenance thread dispatches in a blocking manner so that it can expand the thread pool when it needs to. The work item port is serviced by a variable number of threads. We’ve seen how we know when we need to expand the number of threads; now we’ll look at how we reduce the number of threads when the work load is low.
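The two-port arrangement can be sketched in the same portable style. This is an illustration under stated assumptions rather than the framework’s implementation: the hypothetical NonBlockingDispatcher uses a mutex-protected queue in place of the dispatch IO Completion Port, reserves a key of 0 as the shutdown sentinel, and has a single maintenance thread that passes every other key to a handler that is allowed to block, which is where a HandleDispatch()-style wait would go.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical portable analogue of the dispatch port and its maintenance thread.
class NonBlockingDispatcher {
public:
    using Key = std::uintptr_t;

    explicit NonBlockingDispatcher(std::function<void(Key)> blockingHandler)
        : handler_(std::move(blockingHandler)), maintenance_([this] { Run(); }) {}

    ~NonBlockingDispatcher() {
        PostRaw(0);  // 0 is reserved as the "stop the maintenance thread" sentinel
        maintenance_.join();
    }

    // User-level dispatch: only enqueues, never waits for a worker.
    void Dispatch(Key key) {
        if (key == 0) throw std::invalid_argument("0 is an invalid value for key");
        PostRaw(key);
    }

private:
    void PostRaw(Key key) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(key);
        }
        cv_.notify_one();
    }

    void Run() {
        for (;;) {
            Key key;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return !queue_.empty(); });
                key = queue_.front();
                queue_.pop_front();
            }
            if (key == 0) return;  // sentinel: shut down
            handler_(key);         // may block, e.g. waiting for a worker to pick up
        }
    }

    std::function<void(Key)> handler_;
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<Key> queue_;
    std::thread maintenance_;  // declared last so everything above exists before it runs
};
```

The sentinel is queued behind any outstanding keys, so items dispatched before shutdown are still handled in order.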
Shutting down dormant threads
Work items often come in batches: the thread pool gets busy, expands, services all of the work items and then becomes less busy. At this point the pool contains threads which aren’t being used but which are still consuming resources. These dormant threads can safely be shut down, as the pool can expand again when load increases. The question is, how do we decide when to shut down some threads?
The maintenance thread that handles our blocking dispatch also checks for dormant threads. Periodically (the interval is configurable) the maintenance thread uses an algorithm to decide whether it should shut some threads down. The current algorithm is as follows:
void CThreadPool::HandleDormantThreads()
{
   if ((size_t)m_activeThreads > m_minThreads)
   {
      const size_t dormantThreads = m_activeThreads - m_processingThreads;

      if (dormantThreads > m_maxDormantThreads)
      {
         const size_t threadsToShutdown = (dormantThreads - m_maxDormantThreads) / 2 + 1;

         StopWorkerThreads(threadsToShutdown);
      }
   }
}
If we have more threads than the minimum we’re allowed to have, we find out how many threads aren’t currently processing work items. If that number is more than the number of dormant threads that we’re allowed to have, we shut down half of the excess (rounded down) plus one, so at least one thread is always stopped. Stopping worker threads is simply a case of posting an IO completion key of 0 to the work port for each worker thread that we want to shut down.
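The arithmetic in HandleDormantThreads() is easy to check in isolation. The free function below is a hypothetical restatement of that calculation with the member variables turned into parameters; it mirrors the original and, like it, does not clamp the result to keep the pool at or above the minimum.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical restatement of the HandleDormantThreads() calculation.
// Returns how many threads to stop this maintenance period (0 = leave the pool alone).
std::size_t ThreadsToShutdown(std::size_t activeThreads,
                              std::size_t processingThreads,
                              std::size_t minThreads,
                              std::size_t maxDormantThreads)
{
    if (activeThreads <= minThreads)
        return 0;  // never shrink at or below the configured minimum

    const std::size_t dormant = activeThreads - processingThreads;

    if (dormant <= maxDormantThreads)
        return 0;  // an acceptable number of idle threads

    // Half of the excess (integer division rounds down) plus one.
    return (dormant - maxDormantThreads) / 2 + 1;
}
```

With the configuration used later in this article (a minimum of 5 threads and at most 5 dormant), a pool of 10 idle threads would shrink to 7 in one maintenance period and to 5 in the next, after which it stops shrinking.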
Doing the work
We now have a thread pool that fulfills our requirements of automatic expansion and contraction depending upon load, and non-blocking dispatch for users. The remaining task is to allow a derived class to provide its own WorkerThread class to do the work. The worker thread class must implement the following interface:
virtual bool Initialise();

virtual void Process(
   ULONG_PTR completionKey,
   DWORD dwNumBytes,
   OVERLAPPED *pOverlapped) = 0;

virtual void Shutdown();
Initialise() is called when the worker thread is first created, Shutdown() is called when the thread is terminating and Process() is called for each work item.
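The lifecycle described above can be illustrated with a simplified, single-threaded driver. Everything here is an assumption for illustration: WorkItem, RunWorker() and CountingWorker are hypothetical, unsigned long and void * stand in for DWORD and OVERLAPPED *, a deque stands in for the work port, and we assume that Initialise() returning false means the thread should not run. A completion key of 0 stops the worker, as described in the previous section.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>

struct WorkItem {  // hypothetical stand-in for a completion packet
    std::uintptr_t completionKey;
    unsigned long numBytes;
    void *pOverlapped;
};

class WorkerThread {
public:
    virtual ~WorkerThread() = default;
    virtual bool Initialise() { return true; }
    virtual void Process(std::uintptr_t completionKey, unsigned long numBytes,
                         void *pOverlapped) = 0;
    virtual void Shutdown() {}
};

// Hypothetical driver: roughly what a pool thread might do with its worker object.
void RunWorker(WorkerThread &worker, std::deque<WorkItem> &port) {
    if (!worker.Initialise()) return;  // assumption: false means "don't start"
    while (!port.empty()) {
        WorkItem item = port.front();
        port.pop_front();
        if (item.completionKey == 0) break;  // 0 = "stop this thread"
        worker.Process(item.completionKey, item.numBytes, item.pOverlapped);
    }
    worker.Shutdown();
}

// Example derived worker that records the calls it receives.
class CountingWorker : public WorkerThread {
public:
    std::string log;
    bool Initialise() override { log += "I"; return true; }
    void Process(std::uintptr_t key, unsigned long, void *) override {
        log += std::to_string(key);
    }
    void Shutdown() override { log += "S"; }
};
```

Items queued behind the 0 key are left for the remaining threads, which is why stopping one thread per posted 0 works.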
A socket server with a business logic thread pool
Now that we have a suitable thread pool we can integrate it with our fake POP3 socket server so that the actual processing of commands can occur in the business logic pool whilst the IO pool is left to get on with the IO operations. We can also move socket closure off to the business logic pool so that we don’t block the IO threads with a lingering socket close.
The first thing we need to do is create and configure our thread pool. We then pass a reference to it to our socket server, which in turn passes it on to our IO threads.
CThreadPool pool(
   5,       // initial number of threads to create
   5,       // minimum number of threads to keep in the pool
   10,      // maximum number of threads in the pool
   5,       // maximum number of "dormant" threads
   5000,    // pool maintenance period (millis)
   100,     // dispatch timeout (millis)
   10000);  // dispatch timeout for when pool is at max threads

pool.Start();

CSocketServer server(
   INADDR_ANY,   // address to listen on
   5001,         // port to listen on
   10,           // max number of sockets to keep in the pool
   10,           // max number of buffers to keep in the pool
   1024,         // buffer size
   pool);

server.Start();
When our socket server has a complete, distinct, message to process it can dispatch it to the thread pool for processing, rather than processing it on one of its IO threads.
void CSocketServer::ProcessCommand(
   CSocketServer::Socket *pSocket,
   CIOBuffer *pBuffer)
{
   // take references for the business logic thread, which releases them
   pSocket->AddRef();
   pBuffer->AddRef();

   m_pool.Dispatch(
      reinterpret_cast<ULONG_PTR>(pSocket),
      0,
      pBuffer->GetAsOverlapped());
}
Since we’re passing the socket and IO buffer to another thread we have to increment their reference counts so that they don’t get cleaned up from underneath us. Over in our business logic thread we can finally process the message, and then release the references we took on the socket and IO buffer.
void CThreadPoolWorkerThread::Process(
   ULONG_PTR completionKey,
   DWORD operation,
   OVERLAPPED *pOverlapped)
{
   Socket *pSocket = reinterpret_cast<Socket *>(completionKey);
   CIOBuffer *pBuffer = CIOBuffer::FromOverlapped(pOverlapped);

   ProcessMessage(pSocket, pBuffer);

   pSocket->Release();
   pBuffer->Release();
}
Since the socket class marshals all IO requests back to the IO thread pool, we can safely make read and write requests from our business logic thread even though the thread may be terminated before the IO request completes.
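The AddRef()/Release() handoff in ProcessCommand() and Process() is the usual intrusive reference counting pattern. The sketch below assumes a simple atomic counter; RefCounted and HandOff() are hypothetical stand-ins for the framework’s Socket and CIOBuffer reference counting, not its actual code. Whichever thread drops the last reference deletes the object, so the IO side can release its own reference without waiting for the business logic thread.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical intrusive reference count, standing in for Socket / CIOBuffer.
class RefCounted {
public:
    explicit RefCounted(std::atomic<int> &destroyed) : destroyed_(destroyed) {}

    void AddRef() { refs_.fetch_add(1, std::memory_order_relaxed); }

    void Release() {
        // acq_rel makes earlier writes visible to whichever thread deletes.
        if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
            ++destroyed_;
            delete this;
        }
    }

private:
    ~RefCounted() = default;    // only Release() may destroy the object
    std::atomic<int> refs_{1};  // the creator holds the first reference
    std::atomic<int> &destroyed_;
};

// Mirrors ProcessCommand(): take a reference before handing the object to another thread.
void HandOff(RefCounted *p, std::thread &worker) {
    p->AddRef();
    worker = std::thread([p] {
        // ... business logic would run here ...
        p->Release();  // mirrors the Release() at the end of Process()
    });
}
```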
Maintaining per-connection state
The final thing that our server may need to do is associate some internal server state with a particular socket connection. The Socket class makes this particularly easy as it provides the following member functions:
void *GetUserPtr() const;
void SetUserPtr(void *pData);
unsigned long GetUserData() const;
void SetUserData(unsigned long data);
These provide access to a single void * user data pointer which is stored in the Socket. The common usage pattern for this user data is as follows. When the connection is established, the socket server is notified by OnConnectionEstablished(); the server can allocate a new per-connection data structure and associate it with the socket passed to OnConnectionEstablished() by calling SetUserPtr(). In subsequent read and write completions, the pointer to the per-connection user data structure can be extracted with GetUserPtr(). When the connection is terminated, the server is notified by OnConnectionClosed() and the per-connection user data can be retrieved and deleted.
Although there are two versions of the user data access functions, one for a void * and one for an unsigned long, there is only a single storage location. The two versions are merely for convenience and to reduce casting if the user data required is simply an index into an internal server structure rather than a pointer.
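One way to expose a single storage location through both a pointer and an integer interface is to hold the value as a std::uintptr_t and convert on access. This is a hypothetical sketch, not the framework’s COpaqueUserData class; the conversions are standard C++, though the integer value that corresponds to a given pointer is implementation-defined.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical sketch: one user data slot, two convenience accessors.
class UserDataSlot {
public:
    void *GetUserPtr() const { return reinterpret_cast<void *>(value_); }
    void SetUserPtr(void *pData) { value_ = reinterpret_cast<std::uintptr_t>(pData); }

    unsigned long GetUserData() const { return static_cast<unsigned long>(value_); }
    void SetUserData(unsigned long data) { value_ = data; }

private:
    std::uintptr_t value_ = 0;  // the single storage location shared by both views
};
```

Setting the data through one accessor overwrites whatever was stored through the other, so a given server should pick one view and use it consistently.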
The example server marshals the OnConnectionEstablished() and OnConnectionClosed() calls across to the business logic thread pool and maintains some fairly trivial per-connection user data there. The data we maintain is the address of the client connection (obtained from the buffer passed into OnConnectionEstablished()) and the number of messages that have been processed on this particular connection.
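The usage pattern just described can be sketched with hypothetical stand-ins: the Socket struct below only models the GetUserPtr()/SetUserPtr() pair, and OnMessage() is an invented name for the point where the example server processes a message. The real callbacks take different parameters. The sketch also assumes that messages for a single connection are not processed concurrently; if they could be, the counter would need synchronization.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in that only models the user pointer accessors.
struct Socket {
    void *userPtr = nullptr;
    void *GetUserPtr() const { return userPtr; }
    void SetUserPtr(void *pData) { userPtr = pData; }
};

struct ConnectionData {  // per-connection state, as in the example server
    std::string clientAddress;
    unsigned long messagesProcessed = 0;
};

// Allocate the per-connection data when the connection is established.
void OnConnectionEstablished(Socket &socket, const std::string &address) {
    socket.SetUserPtr(new ConnectionData{address});
}

// Hypothetical per-message hook: recover the data and update it.
void OnMessage(Socket &socket) {
    auto *data = static_cast<ConnectionData *>(socket.GetUserPtr());
    ++data->messagesProcessed;
}

// Retrieve and delete the data when the connection closes.
void OnConnectionClosed(Socket &socket) {
    delete static_cast<ConnectionData *>(socket.GetUserPtr());
    socket.SetUserPtr(nullptr);
}
```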
The complete example
The shell of a POP3 server which performs its business logic processing in a separate thread pool from its IO can be downloaded from here, in the SimpleProtocolServer2 directory. The server has a call to ::Sleep() within its message processing code so that the processing takes some time and blocks. Notice how the IO on other connections is unaffected by this and, if you want, add a similar call to the server we developed at the end of the last article and compare the behavior.
As with the other examples, simply telnet to localhost 5001 to test the server. The server runs until a named event is set and then shuts down. The very simple Server Shutdown program, available as part of the download, provides the off switch.
Download
The following source was built using Visual Studio 6.0 SP5 and Visual Studio .NET. You need to have a version of the Microsoft Platform SDK installed.
Note that the debug builds of the code waste a lot of CPU cycles due to the debug trace output. It’s only worth profiling the release builds.
You can download the code from here.
Revision history
- 21st May 2002 - Initial revision.
- 27th May 2002 - Added pause/resume functionality to all servers and the server shutdown program. Use CSocket to protect from resource leaks when creating the listening socket. Refactored the Socket and CIOBuffer classes so that common list management code is now in CNodeList and common user data code is now in COpaqueUserData.
- 29th May 2002 - Linting and general code cleaning.
- 18th June 2002 - Removed call to ReuseAddress() during the creation of the listening socket as it is not required - Thanks to Alun Jones for pointing this out to me.
- 28th June 2002 - Adjusted how we handle socket closure.
- 30th June 2002 - Removed the requirement for users to subclass the socket server’s worker thread class. All of the work can now be done by simply subclassing the socket server class.
- 15th July 2002 - Socket closure notifications now occur when the server shuts down whilst there are active connections. SocketServer can now be set to ensure read and write packet sequences.
- 23rd July 2002 - Bug fix to CSocketServer::ProcessDataStream(). We were reusing the buffer when we shouldn’t have been. Code was fine up until the changes on 30th June and is fine again now. Thanks to an anonymous CodeProject reader for pointing this out to me.
- 6th August 2002 - Fixed download link. Thanks to Marco Marigo for reporting the problem.
- 12th August 2002 - Removed the race condition in socket closure - Thanks to David McConnell for pointing this out. Derived class can receive connection reset and connection error notifications. Socket provides a means to determine if send/receive are connected. Dispatch to the thread pool now uses shared enums rather than hard coded constant values. General code cleaning and lint issues.
- 14th January 2011 - Reprinted the article on The Server Framework which is the new home of the code that originally shipped with this article and which is now known as The Free Framework.