Understanding Multithreading in C++

Multi-threading refers to an application with multiple threads running within a process, while multi-processing refers to an application organized across multiple OS-level processes.

A thread is a stream of instructions within a process. Each thread has its own instruction pointer, set of registers and stack memory. The virtual address space is process-specific, that is, common to all threads within a process. So, data on the heap can be readily accessed by all threads, for good or ill.

Multi-threading is a more lightweight form of concurrency: there is less context per thread than per process. As a result, thread creation and destruction, context switching and synchronization all cost less. The shared address space (noted above) means data sharing requires no extra work.

Multi-processing has the opposite benefits. Since processes are insulated from each other by the OS, an error in one process cannot bring down another process. Contrast this with multi-threading, in which an error in one thread can bring down all the threads in the process. Further, individual processes may run as different users and have different permissions.

Subsequent sections introduce some common problems with multi-threaded code, and solve them using low-level synchronization constructs.

Race Conditions

A race condition is where the behavior of code depends on the interleaving of multiple threads. This is perhaps the most fundamental problem with multi-threaded programming.

When analyzing or writing single-threaded code we only have to think about the sequence of statements right in front of us; we can assume that data will not magically change between statements. However, with improperly written multi-threaded code, non-local data can change unexpectedly due to the actions of another thread.

Race conditions can result in a high-level logical fault in your program, or (more excitingly) may even pierce C++'s statement-level abstraction. That is, we cannot assume that single C++ statements execute atomically, because they may compile to multiple assembly instructions. In short, we cannot guarantee the outcome of a statement such as foo += 1; if foo is non-local and may be accessed from multiple threads.
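
If you want to see the lost-update effect for yourself, here is a minimal sketch (the counter value and loop count are arbitrary, and it assumes Boost.Threads, which we use throughout this tutorial) in which two threads increment a shared integer without any synchronisation. The final total is usually less than the expected two million because increments are lost:

#include <iostream>
#include <boost/thread/thread.hpp>

int unsyncCounter = 0; // shared and unprotected

void incrementLots()
{
    for(int i = 0; i < 1000000; ++i)
    {
        ++unsyncCounter; // non-atomic read-modify-write
    }
}

int main()
{
    boost::thread threadA(incrementLots);
    boost::thread threadB(incrementLots);
    threadA.join();
    threadB.join();

    // expected 2000000; lost updates usually leave it lower
    std::cout << unsyncCounter << "\n";
    return 0;
}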

A contrived example follows.

Listing 1. A logical race condition

int sharedCounter = 50;

void* workerThread(void*)
{
    while(sharedCounter > 0)
    {
        doSomeWork();
        --sharedCounter;
    }
    return 0;
}

Now imagine that we start a number of threads, all executing workerThread(). If we have just one thread, doSomeWork() is going to be executed the correct number of times (whatever sharedCounter starts out at).

However, with more than one thread doSomeWork() will most likely be executed too many times. Exactly how many times depends on the number of threads spawned, computer architecture, operating system scheduling and...chance. The problem arises because we do not test and update sharedCounter as an atomic operation, so there is a period where the value of sharedCounter is incorrect. During this time other threads can pass the test when they really shouldn't have.

The value of sharedCounter on exit tells us how many extra times doSomeWork() was called. With a single thread, the final value of sharedCounter is of course 0. With multiple threads running, it will be somewhere between 0 and -(N-1), where N is the number of threads.

Moving the update adjacent to the test will not make these two operations atomic. The window during which sharedCounter is out of date will be smaller, but the race condition remains. An illustration of this non-solution follows:

Listing 2. Still a race condition

void* workerThread(void*)
{
    while(sharedCounter > 0)
    {
        --sharedCounter;
        doSomeWork();
    }
    return 0;
}

The solution is to use a mutex to synchronise the threads with respect to the test and update. Another way of saying this is that we need to define a critical section in which we both test and update the sharedCounter. The next section introduces mutexes and solves the example race condition.

Mutexes

A mutex is an OS-level synchronisation primitive that can be used to ensure a section of code can only be executed by one thread at a time.

It has two states: locked and unlocked. When the mutex is locked, any further attempt to lock it will block (the calling thread will be suspended). When the mutex becomes unlocked, one of the waiting threads (if any) will be resumed and will lock it. Furthermore, the mutex may only be unlocked by the thread that locked it.

If we have a resource we need to share between threads, we associate a mutex with it and use the mutex to synchronise resource access. All we need to do is ensure our code locks the mutex before using the resource, and unlocks it after it is finished. This will prevent race conditions related to multiple threads simultaneously accessing that resource.

Diagram 1. Two threads contending for a mutex

Mutexes in Practice - Boost.Threads solution

Boost.Threads is a part of the excellent Boost libraries. It has been intelligently designed to enhance safety by making error-prone code more difficult to write.

We'll be using Boost.Threads throughout the tutorial, since we may as well get used to using a well-designed C++ library from the outset. Furthermore, the upcoming C++0x standard (due sometime this decade) will use Boost.Threads as the model for its new threading support, so learning this library will help future-proof your C++ skills.

Listing 2. Boost.Threads synchronisation

int sharedCounter = 50;
boost::mutex counterMutex;

void solutionWorkerThread()
{
    while(sharedCounter > 0)
    {
        bool doWork = false;
        {
            // scoped_lock locks mutex
            boost::mutex::scoped_lock lock(counterMutex);
            if(sharedCounter > 0)
            {
                --sharedCounter;
                doWork = true;
            }
            // scoped_lock unlocks mutex automatically at end of scope
        }

        if(doWork) doSomeWork();
    }
}

In the above solution, the shared counter is checked and updated as an atomic operation (with respect to multiple threads) so the race condition is solved.

Note the way the scoped_lock works: the constructor locks the associated mutex and the destructor unlocks it. This is the RAII (Resource Acquisition Is Initialisation) idiom [1], and it helps exception safety. If an exception were thrown while we had locked the mutex, the scoped_lock would be destroyed during the normal stack unwinding process and the mutex would be automatically freed.

Exception safety is not an issue with this simple example, since no statement can throw while we have the mutex locked. However, real-world code will almost always benefit from the scoped_lock design.

Unfortunately concurrent code can have many problems: race conditions are only the most fundamental. The next problem we'll cover is called Deadlock, and it commonly arises from the interaction of blocking mutexes.

Deadlock

Deadlock is where one or more threads wait for resources that can never become available.

The classic case (illustrated below) is where two threads both require two shared resources, and they use blocking mutexes to lock them in opposite order. Thread A locks resource X while thread B locks resource Y. Next, thread A attempts to lock resource Y and thread B attempts to lock resource X: since both resources are already locked (by the other thread), both threads wait indefinitely.

The following diagram should make the sequence clear.

Figure 1. Classic deadlock

It is easy to write code where deadlock is inevitable; here is the classic case:

Listing 1. Classic deadlock

boost::mutex resourceX;
boost::mutex resourceY;

void deadlockThreadAFunc()
{
    uint counter = 0;
    while(true)
    {
        boost::mutex::scoped_lock lockX(resourceX);
        boost::thread::yield(); // to encourage deadlock
        boost::mutex::scoped_lock lockY(resourceY);

        std::cout << "threadA working: " << ++counter << "\n";
    }
}

void deadlockThreadBFunc()
{
    uint counter = 0;
    while(true)
    {
        boost::mutex::scoped_lock lockY(resourceY);
        boost::thread::yield(); // to encourage deadlock
        boost::mutex::scoped_lock lockX(resourceX);

        std::cout << "threadB working: " << ++counter << "\n";
    }
}

The yield statements in the above example force the current thread to stop executing and allow another thread to run. They are for demonstration purposes only, to encourage the deadlock to occur quickly. Without them, a single-core machine may run for some time before a context switch happens to fall between the two resource-locking statements (which is what triggers the deadlock).

For this toy example the fix is simple but non-intuitive. All we need to do is ensure we lock resources in a consistent order, so changing deadlockThreadBFunc() to lock resourceX before resourceY ensures there will be no deadlock. Unlock order is not significant.

One valuable technique for enforcing a consistent lock order is to always lock a group of resources in order of their memory addresses; a sketch of this idea follows below. However, it should be clear that deadlock becomes much more of a problem in non-trivial code with complex data-sharing requirements, where resources are locked at multiple levels and in many different contexts. This is one of the main reasons multi-threaded programming is so difficult: it sometimes requires coordination between multiple levels of your code, and that is the enemy of encapsulation.
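
As a rough sketch of the address-ordering idea (the OrderedLock class below is my own illustrative name, not part of Boost; it assumes the boost::mutex::scoped_lock used throughout), a small RAII helper can accept two mutexes in any order and always lock the lower-addressed one first:

#include <functional> // std::less gives a guaranteed total order over pointers

class OrderedLock : boost::noncopyable
{
public:
    // assumes a and b are distinct mutexes
    OrderedLock(boost::mutex& a, boost::mutex& b) :
        m_first(std::less<boost::mutex*>()(&a, &b) ? a : b),
        m_second(std::less<boost::mutex*>()(&a, &b) ? b : a)
    {}

private:
    // members are constructed in declaration order, so the lower-addressed
    // mutex is always locked before the higher-addressed one
    boost::mutex::scoped_lock m_first;
    boost::mutex::scoped_lock m_second;
};

With this helper, both deadlockThreadAFunc() and deadlockThreadBFunc() could replace their two scoped_locks with a single OrderedLock(resourceX, resourceY) or OrderedLock(resourceY, resourceX); either way, the underlying locking order is the same.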

Self Deadlock

Another problem is self-deadlock. Self-deadlock occurs when a single thread attempts to lock a mutex twice: the second attempt will block indefinitely. This can easily happen when the same resource is used at multiple levels within an algorithm.

In particular, consider a class that attempts to provide a threadsafe interface by synchronising all member function calls with a single internal mutex. The mutex is locked at the beginning of every method, and unlocked on method return. If that class now calls a member function from within a member function, there will be a self-deadlock.

To counter this problem there is the concept of recursive mutexes. A recursive mutex will allow multiple locks from within a single thread to succeed, though that thread must unlock the mutex as many times as it has locked it. The disadvantage of a recursive mutex is a slight performance decrease.
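 
A minimal sketch of the recursive mutex idea, using boost::recursive_mutex (the class and function names below are invented for illustration):

#include <vector>
#include <boost/thread/recursive_mutex.hpp>

class ThreadsafeList
{
public:
    void removeItem(int item)
    {
        // a second lock by the same thread succeeds with a recursive mutex;
        // with a plain boost::mutex this would self-deadlock when called from clear()
        boost::recursive_mutex::scoped_lock lock(m_mutex);
        // ... erase item from m_items ...
    }

    void clear()
    {
        boost::recursive_mutex::scoped_lock lock(m_mutex);
        // an interface method calling another interface method:
        // the mutex is now held twice by this thread
        removeItem(42);
    }

private:
    boost::recursive_mutex m_mutex;
    std::vector<int> m_items;
};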

Livelock

Livelock is when multiple threads continue to run (i.e. they do not block indefinitely as in deadlock), but the system as a whole fails to make progress due to a repeating pattern of non-productive resource contention.

Livelock may arise from attempts to avoid threads blocking (which can hurt performance) via a try-lock. A try-lock attempts to lock a mutex but does not block if the mutex is already locked. The following example should make usage of the Boost try-lock clear.

Listing 1. Contrived livelock

boost::try_mutex resourceX;
boost::try_mutex resourceY;

void threadAFunc()
{
    uint counter = 0;
    while(true)
    {
        boost::try_mutex::scoped_lock lockX(resourceX);
        boost::thread::yield(); // encourage the livelock

        boost::try_mutex::scoped_try_lock lockY(resourceY);
        if(lockY.locked() == false) continue;

        std::cout << "threadA working: " << ++counter << "\n";
    }
}

void threadBFunc()
{
    uint counter = 0;
    while(true)
    {
        boost::try_mutex::scoped_lock lockY(resourceY);
        boost::thread::yield(); // encourage the livelock

        boost::try_mutex::scoped_try_lock lockX(resourceX);
        if(lockX.locked() == false) continue;

        std::cout << "threadB working: " << ++counter << "\n";
    }
}

This code exhibits an almost total livelock, though for each yield statement removed the livelock becomes a little less severe. When I run this example, at best the threads manage a few pieces of work per second. How does the livelock occur? The probable sequence is: thread A locks resourceX while thread B locks resourceY; each then fails its try-lock on the other resource, gives up its own lock (via continue), immediately reacquires it, and the pattern repeats.

Another use of the term livelock involves starvation, where one part of a system monopolises system resources and starves another part of the system. For example, a system composed of request-queueing and request-servicing components might exhibit starvation if an overwhelming number of requests cause the request-queueing component to use all system resources.

Condition Variables

Mutexes cater to the most general form of resource sharing, but sometimes threads have more specific sharing requirements. Condition Variables allow us to express and control a directed dependency between threads. In general this means that one group of threads should only execute when a given condition becomes true, and this condition will be satisfied by another group of threads.

When thread operation is coordinated by a condition variable, one group of threads waits until a condition is triggered, at which point one or more of the waiting threads is woken.

As an example, consider two groups of threads: one producing something, and the other consuming it. The Producer-Consumer pattern is also useful outside multi-threaded programming, where it lets you decouple data/event/request production from consumption. For our contrived example, we produce and consume characters from the alphabet.

std::queue<char> producedChars;

boost::condition characterAvailable;
boost::mutex characterMutex;

void conditionDataProducer()
{
    unsigned char c = 'A';
    for(uint i = 0; i < numCharacters;)
    {
        if(c >= 'A' && c <= 'Z')
        {
            boost::mutex::scoped_lock lock(characterMutex);
            producedChars.push(c++);
            characterAvailable.notify_one();
            ++i;
        }
        else
        {
            c = 'A';
        }
    }
    boost::mutex::scoped_lock lock(characterMutex);
    producedChars.push(EOF);
    characterAvailable.notify_one();
}

void conditionDataConsumer()
{
    boost::mutex::scoped_lock lock(characterMutex);

    while(true)
    {
        // on entry, wait() releases the mutex and suspends this thread
        // on return, the mutex has been reacquired
        while(producedChars.empty())
        {
            characterAvailable.wait(lock);
        }

        if(producedChars.front() == EOF) break;
        producedChars.pop();
    }
}

Take a look at the consumer first: it acquires the mutex and then waits on the condition variable. When wait() is called the mutex is released and the calling thread is suspended; the consumer will only be resumed when another thread notifies the characterAvailable condition. The wait is wrapped in a while loop because the condition must be re-checked on wake-up: the queue may still be empty, for example if another consumer got to the new item first or the wake-up was spurious.

The producer simply pushes characters onto the shared container and then calls notify_one. This will allow one of the threads waiting on this condition (a consumer) to resume and process the new data. This will be much more efficient than having consumers endlessly polling an empty queue.

Condition variables are also useful in implementing the Monitor Object concurrency pattern, which we talk about next.

Scoped Locking

We've been using the scoped-locking idiom throughout the first chapter, but it deserves to be treated separately. After all, much multi-threaded programming is still done using more procedural APIs such as POSIX threads (pthreads), in which the programmer must explicitly unlock mutexes.

The Boost.Threads library uses the Resource-Acquisition-Is-Initialisation (RAII) idiom in the implementation of its scoped locks. When you create a scoped_lock object (associated with the mutex you want to acquire) the constructor locks the mutex, and the destructor unlocks the mutex. C++ has deterministic destruction, so the language guarantees that local objects will be destroyed when a scope is exited by any means.
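
To make the idiom concrete outside Boost, here is a minimal hand-rolled equivalent for the pthreads API mentioned above (the class name is mine; it is a sketch of the same idea, not the Boost implementation):

#include <pthread.h>

class PthreadScopedLock
{
public:
    explicit PthreadScopedLock(pthread_mutex_t& mutex) :
        m_mutex(mutex)
    {
        pthread_mutex_lock(&m_mutex); // constructor acquires the mutex
    }

    ~PthreadScopedLock()
    {
        pthread_mutex_unlock(&m_mutex); // destructor releases it on every exit path
    }

private:
    // non-copyable: copying a lock object would make ownership ambiguous
    PthreadScopedLock(const PthreadScopedLock&);
    PthreadScopedLock& operator=(const PthreadScopedLock&);

    pthread_mutex_t& m_mutex;
};

Whether an exception propagates, an early return fires, or the function simply falls off the end, the destructor runs and the mutex is released.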

The most obvious benefit of this idiom is that it is now impossible to forget to unlock a mutex. The scoped_lock destructor will be called no matter how or where the function is exited, so we no longer have to check that every return path unlocks the mutex. This is an especially big win when a function has multiple exit points, or when a maintenance programmer adds a new exit point without fully comprehending the function.

Another important benefit of scoped locking is that we gain a measure of exception safety. When exceptions are thrown local objects are destroyed during stack unwinding, so the scoped_lock will help ensure the function exits in a consistent state.

Arguably, a disadvantage of scoped locking is that it decreases the clarity of your locking strategy since unlocking is implicit rather than explicit.

Read/Write Lock AKA Reader/Writer Lock

A Read/Write Lock is a performance improvement over a standard mutex for cases where reads outnumber writes. The idea is to divide locks up into two classes: reading and writing. Multiple simultaneous read locks may be held, but write locks are exclusively held.

The exclusive writing lock ensures that race conditions do not occur, since if one client is writing to the data no other client may read or write. Also, the allowance for multiple simultaneous read locks decreases resource contention since multiple readers can safely use the shared data. This increases performance over a standard mutex for the assumed usage pattern of frequent simultaneous reads and infrequent writes.

However, a simple Read/Write Lock implementation that avoids starvation in all cases is not easy to create. The following implementation prioritises write locks, and so does not allow the assumed greater number of readers to starve writers. This is accomplished by making new read locks wait if there is a writer waiting for a lock.

The disadvantage of this implementation is that if usage is not as expected writers may overwhelm the mutex and starve readers indefinitely.

Listing 1. Read/Write Lock favouring writers

// multiple clients may read simultaneously
// but write access is exclusive
// writers are favoured over readers
class ReadWriteMutex : boost::noncopyable
{
public:
    ReadWriteMutex() :
        m_readers(0),
        m_pendingWriters(0),
        m_currentWriter(false)
    {}

    // nested class has access to ReadWriteMutex private members, as required
    class ScopedReadLock : boost::noncopyable
    {
    public:
        ScopedReadLock(ReadWriteMutex& rwLock) :
            m_rwLock(rwLock)
        {
            m_rwLock.acquireReadLock();
        }

        ~ScopedReadLock()
        {
            m_rwLock.releaseReadLock();
        }

    private:
        ReadWriteMutex& m_rwLock;
    };

    class ScopedWriteLock : boost::noncopyable
    {
    public:
        ScopedWriteLock(ReadWriteMutex& rwLock) :
            m_rwLock(rwLock)
        {
            m_rwLock.acquireWriteLock();
        }

        ~ScopedWriteLock()
        {
            m_rwLock.releaseWriteLock();
        }

    private:
        ReadWriteMutex& m_rwLock;
    };

private: // data
    boost::mutex m_mutex;

    unsigned int m_readers;
    boost::condition m_noReaders;

    unsigned int m_pendingWriters;
    bool m_currentWriter;
    boost::condition m_writerFinished;

private: // internal locking functions
    void acquireReadLock()
    {
        boost::mutex::scoped_lock lock(m_mutex);

        // require a while loop here, since when the writerFinished condition is notified
        // we should not allow readers to lock if there is a writer waiting
        // if there is a writer waiting, we continue waiting
        while(m_pendingWriters != 0 || m_currentWriter)
        {
            m_writerFinished.wait(lock);
        }
        ++m_readers;
    }

    void releaseReadLock()
    {
        boost::mutex::scoped_lock lock(m_mutex);
        --m_readers;

        if(m_readers == 0)
        {
            // must notify_all here, since if there are multiple waiting writers
            // they should all be woken (they continue to acquire the lock exclusively though)
            m_noReaders.notify_all();
        }
    }

    // this function is currently not exception-safe:
    // if the wait calls throw, m_pendingWriters can be left in an inconsistent state
    void acquireWriteLock()
    {
        boost::mutex::scoped_lock lock(m_mutex);

        // ensure subsequent readers block
        ++m_pendingWriters;

        // ensure all reader locks are released
        while(m_readers > 0)
        {
            m_noReaders.wait(lock);
        }

        // only continue when the current writer has finished
        // and another writer has not been woken first
        while(m_currentWriter)
        {
            m_writerFinished.wait(lock);
        }
        --m_pendingWriters;
        m_currentWriter = true;
    }

    void releaseWriteLock()
    {
        boost::mutex::scoped_lock lock(m_mutex);
        m_currentWriter = false;
        m_writerFinished.notify_all();
    }
};
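
Usage might look like the following sketch (the shared table and the two free functions are invented for illustration):

#include <map>
#include <string>

ReadWriteMutex tableMutex;
std::map<std::string, int> table; // shared data guarded by tableMutex

int lookup(const std::string& key)
{
    // many readers may hold this lock at the same time
    ReadWriteMutex::ScopedReadLock lock(tableMutex);
    std::map<std::string, int>::const_iterator it = table.find(key);
    return it != table.end() ? it->second : -1;
}

void update(const std::string& key, int value)
{
    // writers get exclusive access
    ReadWriteMutex::ScopedWriteLock lock(tableMutex);
    table[key] = value;
}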

Monitor Object

The Monitor Object pattern encapsulates an object's synchronisation policy along with its functionality, providing a thread-safe public interface and making the object a reliable system building block.

The Monitor Object is a normal object except that it provides a threadsafe interface, one that may be called safely without external synchronisation by any number of concurrent clients. The simplest form of this pattern is embodied by a class with a public interface synchronised with a per-object mutex. The first thing each interface method does is to lock that object's mutex, and the last thing it does is to unlock the mutex. This ensures only one interface method is executing at any time.

However, another common aspect of a Monitor Object is to have more complex blocking interactions between the interface methods. These interactions are coordinated by condition variables. For example, one interface method may internally reach a point where it must wait for some condition within the object to become true before it can continue. This is the kind of situation condition variables were born to handle.

Note that as the synchronisation complexity inside a Monitor Object rises, the value of encapsulating this complexity also rises (the burden on clients remains minimal). In lieu of the Monitor Object pattern, relying on clients to coordinate synchronisation could improve performance, but scattering an object's locking policy throughout its clients is not going to be a reliable design.

Let's see some code. We'll reimplement the example from the condition variable section, replacing the std::queue with a Monitor Object.

Listing 1. MonitorQueue.hpp

template <typename DataT>
class MonitorQueue : boost::noncopyable
{
public:
    void push(const DataT& newData)
    {
        boost::mutex::scoped_lock lock(m_monitorMutex);
        m_queue.push(newData);
        m_itemAvailable.notify_one();
    }

    DataT popWait()
    {
        boost::mutex::scoped_lock lock(m_monitorMutex);

        // re-check the condition after waking, in case another consumer
        // took the item first or the wake-up was spurious
        while(m_queue.empty())
        {
            m_itemAvailable.wait(lock);
        }

        DataT temp(m_queue.front());
        m_queue.pop();
        return temp;
    }

private:
    std::queue<DataT> m_queue;

    boost::mutex m_monitorMutex;
    boost::condition m_itemAvailable;
};

Listing 2. main.cpp

MonitorQueue<char> producedChars;

void dataProducer()
{
    unsigned char c = 'A';
    for(uint i = 0; i < numCharacters;)
    {
        if(c >= 'A' && c <= 'Z')
        {
            producedChars.push(c++);
            ++i;
        }
        else
        {
            c = 'A';
        }
    }
    producedChars.push(EOF);
}

void dataConsumer()
{
    while(true)
    {
        if(producedChars.popWait() == EOF) return;
    }
}

As you can see, I parameterized the MonitorQueue with the type of data it handles (I couldn't bear to hardcode this). You should also be able to see how much simpler the clients are than in the original example. Additionally, it is now a simple matter to create new consumers or producers: we've abstracted away the synchronization details so all this new code has to worry about is doing its specific job.

One issue to be aware of when implementing a Monitor Object is the possibility of self-deadlock, as mentioned in the deadlock section. This can be avoided by strictly following the rule that interface methods do not call other interface methods.
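
One common way to follow that rule is to push shared logic into private, unsynchronised helpers that assume the mutex is already held. A rough sketch (the account class below is invented for illustration):

class MonitorAccount : boost::noncopyable
{
public:
    MonitorAccount() : m_balance(0) {}

    void deposit(int amount)
    {
        boost::mutex::scoped_lock lock(m_mutex);
        addUnlocked(amount);
    }

    void depositWithBonus(int amount, int bonus)
    {
        boost::mutex::scoped_lock lock(m_mutex);
        // calling deposit() here would lock m_mutex a second time and self-deadlock;
        // instead both interface methods share a private helper that assumes the lock is held
        addUnlocked(amount);
        addUnlocked(bonus);
    }

private:
    void addUnlocked(int amount) { m_balance += amount; }

    boost::mutex m_mutex;
    int m_balance;
};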

The Monitor Object pattern is also called "Threadsafe Passive Object" ("passive" because it runs in the client thread). The next pattern we'll cover is Active Object.

Active Object

The Active Object pattern decouples method invocation from method execution by doing all work in a private, per-object thread. Whereas Monitor Object interface methods may block, Active Object interface methods return immediately after queueing the relevant processing on an internal "Activation List".

The full Active Object pattern can be quite complex since we need some way of returning method return values, as well as a way to bind method calls to their parameters for deferred execution. Implementations will vary, but conceptually the interface methods will immediately return an object called a "Future", through which the method result may be obtained when the queued processing is completed.

This class diagram shows the Active Object pattern (scheduler component not included for simplicity):

Simplified Active Object Pattern

The Proxy provides the Active Object public interface, and runs in the calling thread. It instantiates a Concrete Method implementation object along with the corresponding Future for the result, and these are added to the Activation Queue. The Active Object's private thread will continue to dequeue and execute methods while they are available from the Activation Queue.

An important implementation detail is that Future lifetime is non-trivial - it depends on both the client and the concrete implementation methods. In C++ a good solution is to use a shared pointer so that the Future is cleaned up when there are no more references to it.
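
The FutureResult type used in the listings below is not shown in this section; a minimal sketch of how it might be implemented with the primitives we have already covered (a mutex, a condition variable and a ready flag) follows. Treat it as an illustration rather than the definitive implementation:

template <typename ResultT>
class FutureResult : boost::noncopyable
{
public:
    FutureResult() : m_ready(false) {}

    // called by the servant thread when the method request completes
    void setResult(const ResultT& result)
    {
        boost::mutex::scoped_lock lock(m_mutex);
        m_result = result;
        m_ready = true;
        m_resultReady.notify_all();
    }

    // called by the client; blocks until the result is available
    ResultT getResult()
    {
        boost::mutex::scoped_lock lock(m_mutex);
        while(!m_ready)
        {
            m_resultReady.wait(lock);
        }
        return m_result;
    }

private:
    boost::mutex m_mutex;
    boost::condition m_resultReady;
    bool m_ready;
    ResultT m_result;
};

Wrapping instances in shared_ptr, as the listings do, handles the non-trivial lifetime mentioned above: the Future lives as long as either the client or the queued method request still references it.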

Let's have a look at a simplified implementation of the ActiveObject pattern. The servant we are wrapping up has a single function that takes a reasonably long time to run. In fact, the first parameter tells the doSomeWork function how long, in milliseconds, to execute for -- and we'll use this to test the results we get.

Listing 1. Servant interface.

class Servant
{
public:
    Servant();
    int doSomeWork(unsigned int, bool);
};

Now we'll turn this into an ActiveObject, but first let's see how it will be used:

Listing 2. int main

int main(int argc, char* argv[])
{
    ActiveObjectProxy activeObject;

    std::cout << "Dispatching...\n";
    shared_ptr<FutureResult<int> > result_a = activeObject.doSomeWork(500, false);
    shared_ptr<FutureResult<int> > result_b = activeObject.doSomeWork(1000, false);
    shared_ptr<FutureResult<int> > result_c = activeObject.doSomeWork(2000, false);
    shared_ptr<FutureResult<int> > result_d = activeObject.doSomeWork(4000, false);

    std::cout << "Waiting for results...\n";
    std::cout << result_a->getResult() << "\n"; // outputs 500 after blocking for ~500ms
    std::cout << result_b->getResult() << "\n"; // outputs 1000 after blocking for an additional ~1000ms
    std::cout << result_c->getResult() << "\n"; // outputs 2000 after blocking for an additional ~2000ms
    std::cout << result_d->getResult() << "\n"; // outputs 4000 after blocking for an additional ~4000ms

    std::cout << "Press <Enter> to continue...";
    std::cin.get();

    return 0;
}

The way the above code should work is that the method request dispatches should execute very quickly, and then the code should block when actually retrieving the results. In a less contrived example, you might be doing other work while periodically polling the FutureResult to see when the method request was complete.

Listing 3. ActiveObjectProxy

/**
The ActiveObjectProxy coordinates the behaviour of the ActiveObject, including instantiating
shared FutureResult instances, binding method request parameters, and adding concrete methods
to the activation queue.
*/
class ActiveObjectProxy
{
public:
    ActiveObjectProxy() :
        m_exit(false)
    {
        m_servantThread.reset(new boost::thread(boost::bind(&ActiveObjectProxy::processActivationQueue, this)));
    }

    ~ActiveObjectProxy()
    {
        // ensure servant thread is shut down
        {
            boost::mutex::scoped_lock lock(m_exitMutex);
            m_exit = true;
        }
        m_servantThread->join();
    }

    typedef FutureResult<int> ResultT;
    typedef shared_ptr<ResultT> RefCountedResultT;
    typedef boost::function<int (void)> CallbackT;

    RefCountedResultT doSomeWork(unsigned int inputInteger, bool inputBool)
    {
        // instantiate ref-counted FutureResult
        RefCountedResultT futureResult(new ResultT);

        // bind parameters to the relevant Servant method
        CallbackT callback = boost::bind(&Servant::doSomeWork, m_servant, inputInteger, inputBool);

        // instantiate and enqueue a concrete method request
        shared_ptr<MethodRequest> concreteMethod(new ConcreteMethod<int>(futureResult, callback));
        m_activationQueue.push(concreteMethod);

        return futureResult;
    }

private:
    // this method is executed in a private thread,
    // allowing the servant to do work in the background
    void processActivationQueue()
    {
        while(true)
        {
            {
                boost::mutex::scoped_lock lock(m_exitMutex);
                if(m_exit) return;
            }

            // attempt to dequeue and process the next waiting method request,
            // but don't wait any longer than 100ms
            const int waitMs = 100;
            try
            {
                m_activationQueue.pop(waitMs)->execute();
            }
            catch(const WaitTimeout&) {}
            // exceptions should not generally be used for non-exceptional flow control,
            // but it is a simple, adequate solution for this toy example
            // a better, but more confusing solution might use boost::optional
        }
    }

private:
    Servant m_servant;
    scoped_ptr<boost::thread> m_servantThread;

    typedef MonitorQueue<shared_ptr<MethodRequest> > ActivationQueueT;
    ActivationQueueT m_activationQueue;

    boost::mutex m_exitMutex;
    bool m_exit;
};

The ActiveObjectProxy coordinates the whole ActiveObject. It binds parameters and enqueues concrete method requests in the activation queue, and also manages the servant thread which processes those method requests.
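
The MethodRequest and ConcreteMethod types referenced above are not listed in this section either; a plausible minimal sketch, consistent with the FutureResult sketch given earlier, might look like this:

class MethodRequest
{
public:
    virtual ~MethodRequest() {}
    virtual void execute() = 0;
};

template <typename ResultT>
class ConcreteMethod : public MethodRequest
{
public:
    ConcreteMethod(shared_ptr<FutureResult<ResultT> > futureResult,
                   boost::function<ResultT (void)> callback) :
        m_futureResult(futureResult),
        m_callback(callback)
    {}

    // executed in the servant thread: run the bound call, then publish the result
    virtual void execute()
    {
        m_futureResult->setResult(m_callback());
    }

private:
    shared_ptr<FutureResult<ResultT> > m_futureResult;
    boost::function<ResultT (void)> m_callback;
};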
