This article was originally published in 2002 and it refers to designs and code that form part of The Free Framework. The design and code of The Server Framework has moved on considerably since this article was written. Please browse the documentation and the rest of this site to learn more about The Server Framework.
Overview
“How do you handle the problem of multiple pending WSARecv()
calls?” is a common question on
the Winsock news groups. It seems that everyone knows that it’s often a good idea to have more than
one outstanding read waiting on a socket and everyone’s equally aware that sometimes code doesn’t work
right when you do that. This article explains the potential problems with multiple pending recvs and
provides a solution within the reusable server framework that we’ve been developing over the last few
articles.
That’s out of order
There is a subtle issue when using IO completion ports with multiple threads. Although operations
using the IO completion port will always complete in the order that they were submitted, thread scheduling
issues may mean that the actual work associated with the completion is processed in an undefined order.
For example, if we were to submit three WSARecv()
requests on a socket then they are guaranteed to complete
in the order that we submitted them, however if we have 2 threads servicing the IO completion port two of the
completions could be being processed simultaneously. If the thread processing the ‘first’ WSARecv()
completion
is interrupted the second may be completely processed before the first. This is even more likely to occur
on machines with multiple processors where the two threads may really be executing simultaneously, but it’s
possible on single processor boxes too. As always, this is the kind of subtle problem that probably wont
show its face until you release the software to production…
The example above is easy to avoid, simply don’t have multiple WSARecv
requests outstanding on a single
socket. This is what we have done so far in the example servers developed in the previous articles.
However this reduces performance, it’s always more performant to have a receive pending when the data
actually arrives on the wire than it is to post a receive after the data has already arrived. Having
multiple WSARecv
calls outstanding ensures that there’s always a call pending. What’s more, the problem
isn’t limited to having multiple WSARecvs()
.In our server framework we marshal all socket IO calls from
the user’s threads into our IO thread pool using the IO completion port. This means that there is the
potential for a user thread to issue multiple consecutive writes to the socket and for them to be executed
in an undefined order.
In our example servers so far, code like this:
void CSocketServer::ReadCompleted(
Socket *pSocket,
CIOBuffer *pBuffer)
{
// do stuff...
pSocket->Write(pBuffer); // echo the command
pSocket->Write(pResponse1); // send part 1 response
pSocket->Write(pResponse2); // send part 2 response
}
is potentially unsafe because the writes don’t occur synchronously on the user’s thread, they are posted to the IO completion port and occur in the IO thread pool.
Preserving the order of IO completion operations is relatively straight forward. As you’ll remember, the
overlapped structure passed to all calls that use IO completion ports represents ‘per call’ data. We can,
and do, extend the overlapped structure to include our own ‘per call’ data by using the CIOBuffer
class.
If we add a sequence number to the CIOBuffer
we can set the sequence number to the ’next’ value in
the user’s thread and then make sure we process the buffers in order in the IO thread pool. This concept
applies to any IO completion port operation and each distinct operation requires its own sequence number.
For our server framework that means that our Socket class must now maintain independent sequence numbers
for read and write requests.
The sequence number management code inside the Socket’s Write method could be something like this:
pBuffer->SetSequenceNumber(m_writeSequenceNumber++);
To ensure that the sequence numbers actually represent the order that the operations are submitted requires
that the setting of the sequence number and the submission of the operation are an atomic operation. For
our socket writes this isn’t a problem as we only guarantee the order of writes that are performed on a
single thread, for socket reads we need to ensure that the allocation of the sequence number and call
to WSARecv()
occur without another thread having a chance to perform read at the same time. This involves using
a critical section to lock access to the socket during the sequence number allocation and WSARecv()
call.
Failure to lock in this area can lead to the actual order that the WSARecv()
calls are made failing to match
the ordering of the sequence numbers allocated.
Orderly processing
The code to ensure that the IO completions are handled in order is a little more complex. For each distinct
IO operation we need to keep track of the next sequence number that we can process. When a call to
GetQueuedCompletionStatus()
returns we need to compare the sequence number in the request with
the next sequence number that we can process. If these numbers match then we can process the request. If they don’t
then the request cannot be processed at this time. If an IO operation cannot be processed it should be
stored for later processing. The storage of the out of sequence request needs to be keyed on the sequence
number. When an IO thread finds that it can’t process the current request it should add the current request
to the store and see if there’s a request in the store that can be processed. When a request is processed
the last thing that the IO thread should do is atomically increment the value representing the next sequence
number to process and check to see if there’s an IO request in the store that can be processed.
The above strategy handles the situation where multiple IO requests complete concurrently. Only one thread can be processing an IO request that meets the criteria of being the next one to process, all other threads will simply add their requests to the store. When the thread that’s processing a request finishes processing it can check to see if there are other requests in the store that can now be processed. If a thread needs to store its IO request then it can do so and then check for a request that can be processed in an atomic operation.
It’s actually more complex to read about than it is to look at, the code to process an operation in order might look like this:
pBuffer = pSocket->m_outOfSequenceWrites.GetNext(pBuffer);
while(pBuffer)
{
DWORD dwFlags = 0;
DWORD dwSendNumBytes = 0;
if (SOCKET_ERROR == ::WSASend(
pSocket->m_socket,
pBuffer->GetWSABUF(),
1,
&dwSendNumBytes,
dwFlags,
pBuffer,
NULL))
{
// handle errors etc.
}
pBuffer = pSocket->m_outOfSequenceWrites.ProcessAndGetNext();
}
The store itself needs to map sequence numbers to CIOBuffers
. The obvious choice of data structure is
a std::map<>
though your performance requirements and profiling may dictate a different choice. GetNext()
takes a buffer, compares its sequence number with the next one we can process and either returns the
buffer or adds the buffer to the map and then checks the map to see if the first buffer in the map is
the one we can process. Remember that the map stores its elements in order of their keys and that we’re
using the sequence number as the key, so m_list.begin()
refers to the element in the map with the
lowest sequence number. If this function returns null
then we’re still waiting for the ’next’ buffer
to arrive.
CIOBuffer *CIOBuffer::InOrderBufferList::GetNext(
CIOBuffer *pBuffer)
{
CCriticalSection::Owner lock(m_criticalSection);
if (m_next == pBuffer->GetSequenceNumber())
{
return pBuffer;
}
BS::value_type value(pBuffer->GetSequenceNumber(), pBuffer));
std::pair<BS::iterator, bool> result = m_list.insert(value);
if (result.second == false)
{
// handle error, element already in map
}
CIOBuffer *pNext = 0;
BS::iterator it;
it = m_list.begin();
if (it != m_list.end())
{
if (it->first == m_next)
{
pNext = it->second;
m_list.erase(it);
}
}
return pNext;
}
After processing a buffer the thread can check to see if there’s another buffer that it can handle. It needs to increase the last processed value and perform the check atomically, hence the locking.
CIOBuffer *CIOBuffer::InOrderBufferList::ProcessAndGetNext()
{
CCriticalSection::Owner lock(m_criticalSection);
::InterlockedIncrement(&m_next);
CIOBuffer *pNext = 0;
BS::iterator it;
it = m_list.begin();
if (it != m_list.end())
{
if (it->first == m_next)
{
pNext = it->second;
m_list.erase(it);
}
}
return pNext;
}
Handling reads
If the CIOBuffer
used by every write that occurs contains a sequence number then similar code could be
used to ensure that completed read requests are processed in the correct order. However, there’s little
point in this code being placed in the server framework as different users of the framework may require different
functionality. The CSocketServer
derived class could use the CIOBuffer::InOrderBufferList
class to
maintain processing order or it could simply dispatch the read completions to another IO completion
port to pass them across to a business logic thread pool.
In this case it’s the code in the business logic thread pool that actually
processes the data and the order should be maintained there. It may even need to do both, ensuring packet
order in the CSocketServer
class itself so that it can successfully break the byte stream into messages
and then dispatching the messages to the business logic thread pool and ensuring that these complete messages
are also processed in the correct order.
Locking granularity
Each Socket must now keep track of independent read and write sequence numbers and maintain a map of
out of sequence write requests. Manipulation of the map and associated next sequence number counter must
be protected. We use a critical section to protect this code. Be aware that allocating a critical section for each
Socket
connection is potentially resource intensive. Instead we could choose to trade locking granularity for
performance. The CSocketServer
class already has a critical section that it uses to protect its lists
of Sockets
, we could pass a reference to this critical section to each Socket
rather than have them
create their own critical section. The problem with doing this is that we serialise every Socket’s map access.
This work performed inside the critical section is small, but a better solution might be to create a critical
section for every X sockets where X is a value that is determined by profiling your application.
Only paying for what you use
Including sequence numbers in all buffers used for sending and receiving and ensuring the writes are
processed in order adds a little overhead to the work of the IO threads. If you are sure that your server
doesn’t require this functionality, perhaps because you know that due to your protocol design there will
only ever be a single read or write request pending, you can opt not to include this functionality by
passing false as the useSequenceNumbers
flag in the CSocketServer
object’s constructor. Enabling read or write
sequence numbers independently is left as an exercise for the reader.
An example
To demonstrate the concept of ensuring the ordering of multiple reads and writes we’ve come up with a rather contrived example. The packet echo server that we developed in the previous article has been changed as follows:
- It now does its work in a business logic thread pool so that we can demonstrate maintaining receive order in both the socket server and the business logic thread pool when the socket server’s worker thread isn’t doing the processing itself.
- It works with larger packets; we use a two byte packet header rather than a one byte header. This two byte header represents the length of the packet using the following format: packetLength = byte1 + (byte2 * 256). The length of the packet includes the two byte header.
- When the client initially connects it posts a configurable number of reads. As each read completes it posts a new read so that it maintains the number of outstanding reads.
- It processes the reads in order, and due to the fact that we now have multiple reads outstanding,
CSocketServer::ProcessDataStream()
has changed so that when more data is required we don’t simply reissue a read to read more data into the same buffer. - It echoes the packet back to the client in pieces by issuing multiple write requests.
The large packet echo server is available for download from here, in the ThreadPoolLargePacketEchoServer directory. Testing using telnet is possible, though more complex, you may find it easier to use the test harness that we develop here to test it. As with the previous examples, the server runs until a named event is set and then shuts down. The very simple Server Shutdown program, available available as part of the download, provides an off switch for the server.
Although both the server and thread pool classes are configurable as to whether they use sequence numbers to maintain packet order these settings can only be set in one way for the server to actually work in the way that the test harness expects. All packet ordering flags must be set to true. The purpose of the flags is so that you can turn off the various sequencing required and see the effect on the test. It’s not intended that the server can run reliably in any other configuration.
CThreadPool pool(
5, // initial number of threads to create
5, // minimum number of threads to keep in the pool
10, // maximum number of threads in the pool
5, // maximum number of "dormant" threads
5000, // pool maintenance period (millis)
100, // dispatch timeout (millis)
10000, // dispatch timeout for when pool is at max threads
20, // (1) number of reads to post
true, // (2) maintain packet order with sequence numbers
true); // (3) echo packets with multiple writes
pool.Start();
CSocketServer server(
INADDR_ANY, // address to listen on
5001, // port to listen on
10, // max number of sockets to keep in the pool
10, // max number of buffers to keep in the pool
1024, // buffer size
pool,
65536, // max message size
true, // (4) maintain read packet order with sequence numbers
true, // (5) maintain write packet order with sequence numbers
true); // (6) issue a new read before we've completely processed this one
The configuration flags can be adjusted to witness the following effects:
- The configuration shown above ensures that packets into the read completion method are processed in sequence - this maintains the validity of the incoming packet data; Packets into the worker thread are maintained in sequence - this maintains the order of echoing the actual packets; and the sequence of write calls is maintained - this maintains the validity of the outgoing packet data.
- If the number of reads to post (1) is reduced to 1 then there is no need to maintain the read completion sequencing ((4) can be set to false) as long as read completion method doesn’t issue another read until it has completed the processing of the current one ((6) should be set to false).
- If the business logic thread pool doesn’t attempt to maintain packet ordering ((2) set to false) then the test will likely report sequence number mismatches - as the packets are echoed out of sequence, and response != message errors as multiple threads in the business logic thread pool attempt to write fragments of the message to the socket in an unsynchronised manner and thus interleave sections of different messages.
- If (2) is left set to false but (3) is also set to false then the test will only fail with sequence number mismatches as the threads are now echoing their packets in a single write so the data in the individual packets cant be corrupted by being interleaved with sections of other packets.
- Unfortunately it’s currently impossible to simply turn off write packet ordering at the socket server (option (5)) as doing so also turns off read packet sequencing and so makes it impossible to get valid data to the business logic thread so that it can echo it and we can witness the writes being performed out of sequence. If you’re interested in seeing this in action then you can hack at the socket server code.
Download
The following source was built using Visual Studio 6.0 SP5 and Visual Studio .Net. You need to have a version of the Microsoft Platform SDK installed
Note that the debug builds of the code waste a lot of CPU cycles due to the the debug trace output. It’s only worth profiling the release builds.
You can download the code from here.
Revision history
- 15th July 2002 - Initial revision.
- 12th August 2002 - Removed the race condition in socket closure - Thanks to David McConnell for pointing this out.
Derived class can receive connection reset and connection error notifications.
Socket
provides a means to determine if send/receive are connected. Dispatch to the thread pool now uses shared enums rather than hard coded constant values. General code cleaning and lint issues. - 14th January 2011 - Reprinted the article on The Server Framework which is the new home of the code that originally shipped with this article and which is now known as The Free Framework.