Multithreading with the Actor Model


Multithreaded application design has been a significant challenge since I first understood the problem in the 80's. It turns out the folks at ØMQ also have a thing or two to say about this; their take introduces the Actor model and is worth repeating for those new to the subject. You do not need to be a developer with multithreading experience, but the more you've suffered from multithreaded code design, the more you'll enjoy this article.

Why Write Multithreaded Code?

Before we look at how to write multithreaded code using ØMQ, it's worth asking why we want to do this at all.  In my experience, there are two reasons why people write multithreaded code.
  1. It is a way to get concurrency, i.e. to handle many events in parallel.  A typical example would be to write a web server capable of handling many requests in parallel.
  2. It is a way to get performance, i.e. to use more CPU cores in parallel.  A typical example would be a supercomputing grid capable of running thousands of tasks in parallel.
Multithreading is about distributing work over multiple CPUs.  An ideal design would let us use 100% of each core, and add cores up to any scale.  However, the traditional approach not only wastes a lot of CPU time on wait states and context switching, it also fails to scale beyond ten or so cores, due to increasing conflicts between threads.

Here is one example from our ØMQ experience.  In 2005 we wrote our AMQP messaging server, OpenAMQ.  The first versions used the same virtual threading engine as Xitami, and could push 50,000 messages per second in and out (100K in total).  These are largish, complex messages, with nasty stateful processing on each message, so crunching 50K in a second was a good result.  It was 5x more than the software we were replacing.  But our goal was to process 100K per second.  So after some thought and discussion with our client, who liked multithreading, we chose the "random insanity" option and built a real multithreaded version of our engine from scratch.

The result was two years of late nights tracking down weird errors and making the code more and more complex until it was finally robust.  OpenAMQ is extremely solid nowadays, and this pain is apparently "normal" in multithreaded applications, but coming from years of doing painless pseudo-threading, it was a shock.

Worse, OpenAMQ was slower on one core and did not scale linearly.  It did 35K when running on one core, and 120K when running on four.  So the exercise was worth it, in terms of meeting our goals, but it was horribly expensive.

The Failure of Traditional Multithreading
The core problem with conventional concurrent programming techniques is that they all make one major assumption that just happens to be insane. The assumption perhaps comes from Dijkstra's theory that "software = data structures + algorithms", which is flawed.  Software consisting of object classes and methods is no better: it just merges the data structures and algorithms into a single pot, with multiple levels of abstraction tangled together like rice noodles.

Before we expound a less insane model of software, let's see why placing data structures (or objects) in such a central position fails to scale across multiple cores.

In Dijkstra's model of software, its object-oriented successors, and even the IBM-gifted relational database, data is the golden honeycomb that the busy little bees of algorithms work on.  It's all about the data structures, the relations, the indexes, the sets, and the algorithms we use to compute on these sets.  We can call this the "Data + Compute" model of software:



When two busy algorithmic bees try to modify the same data, they naturally negotiate with each other over who will go first.  It's what you would do in the real world when you 'negotiate' with a smaller, weaker car for that one remaining parking place right next to the Starbucks.  First one in wins, the other has to wait or go somewhere else.

And so the entire mainstream science of concurrent programming has focused on indicator lights, hand signals, horns, bumpers, accident reporting forms, traffic courts, bail procedures, custodial sentences, bitter divorces, alcoholic deaths, generational trauma, and all the other consequences of letting two withdrawn caffeine addicts fight over the same utterly essential parking spot.

Here, with less visual impact and more substance, is what really happens:

  • Two threads cannot safely access non-trivial data at the same time.  It is possible to do atomic accesses to short chunks of memory (bytes, words) but it rapidly becomes a question of what CPU you're using.
  • There are ways to access larger types like pointers atomically but these require pretty freaky assembly language 'compare and swap' instructions that are definitely not portable, and not something you want to try to teach ordinary developers.
  • So, two threads need to agree in advance what data they won't touch at the same time, and for how long.  Unfortunately there is no way to say "this data is special", rather the code has to say, like every over-confident driver heading for disaster, "I'm special".  I.e. every single piece of code that wants to access some data has to do all the effort of being careful.
  • Threads can do this by sending each other atomic signals, called 'semaphores'.  Or, they can raise mutual exclusion zones ('mutexes'), which are like signals to the operating system saying, "I want to be the only person in this mutex zone".  Or they can define 'critical sections' that say, "I really don't trust anyone at all, while I'm in this section of code please don't let anyone else at all here".

These techniques all try to achieve the same thing, namely safe access to shared data.  What they all end up doing is:
  1. Stopping other threads that want to access the same data.  When the reason for concurrency is to get performance, stopping threads for any reason except "there is no work to do" is a Really Bad Idea.
  2. Breaking, because it is impossible to eliminate bugs in such a model.  Concurrent code that works under normal loads will break under heavier loads, as developers wrongly judge the size of critical sections, forget mutexes, or find threads deadlocked like two drivers half-way into the same parking spot.
  3. Getting complex, because the solution to all these problems is to add yet more untestable synchronization mechanisms.
In practice, and this is being optimistic, the best classic multithreaded applications can scale to perhaps ten threads, at around ten times the cost of writing equivalent single-threaded code, and that's it.  Above ten threads, the cost of locking exceeds the real work, so adding another thread slows things down.

The only way to scale beyond single digit concurrency is to share less data.  If you share less data, or use black magic techniques like flipped data structures (you lock and modify a copy of a structure, then use an atomic compare-and-swap to flip a pointer from the live copy to your new version), you can scale further.  (That last technique serializes writers only, and lets readers work on a safe copy.)

Lots of Chatty Boxes
Reducing the conflicts over data is the way to scale concurrency.  The more you reduce the conflicts, the more you can scale.  So if there were a way to write real software with zero shared data, it would scale infinitely. There is no catch. As with many things in technology, this is not a new idea; it's simply an old one that never made the mainstream.  Many fantastically great ideas are like this: they don't build on existing (mediocre) work, so they are never adopted by the market.

Historically, the Data + Compute theory of software stems from the earliest days of commercial computers, when IBM estimated the global market at 5,000 computers. The original model of computing is basically "huge big thing that does stuff". As hardware models became software models, so the Big Iron model became Data + Compute.  But in a world where every movable object will eventually have a computer embedded in it, Data + Compute turns into the shared state dog pit where algorithms fight it out over memory that is so expensive it has to be shared.

But there are alternative models of concurrent computing to the shared-state model that most mainstream languages and manufacturers have adopted.  The relevant alternate reality, from the early 1970's, is "computing = lots of boxes that send each other messages".  As Wikipedia says of the Actor model of computing:

"Unlike previous models of computation, the Actor model was inspired by physical laws... Its development was "motivated by the prospect of highly parallel computing machines consisting of dozens, hundreds or even thousands of independent microprocessors, each with its own local memory and communications processor, communicating via a high-performance communications network." Since that time, the advent of massive concurrency through multi-core computer architectures has rekindled interest in the Actor model."

Just as Von Neumann's Big Iron view of the world translated into software, the Actor model of lots of chatty boxes translates into a new model for software.  Instead of boxes, think "tasks".  *Software = tasks + messages*.  It turns out that this works a lot better than focusing on pretty bijoux data structures (and who doesn't enjoy crafting an elegant doubly-linked list or super-efficient hash table?)

So here are some interesting things about the Actor model, apart from "how on earth did mainstream computer science ignore such a powerfully accurate view of software for so long":
  • It's a better model of a real world with trillions of CPUs.
  • It lets you create massive concurrency with no resource conflicts.
  • It scales literally without limit.
  • It lets you exercise every CPU to maximum capacity.

To explain again how broken the shared-state model is, imagine there was just one mobile phone directory in the world.  Forget the questions of privacy and who gets to choose which number goes with "Mom&Dad".  Just consider the pain if access had to be serialized.

Luckily, every mobile phone has its own directory, and they communicate with each other by sending messages. Mobile phones have demonstrated this works well, and scales to the point where we have over 3 billion mobile phones on the planet with no contest over access to a SIM card.

But the actor model has more advantages over shared state concurrency:
  • While shared state has very fuzzy contracts between threads, Actor thrives on contractual interfaces.  Messages are contracts that are easy to document, validate, and enforce.
  • While shared state is insanely sensitive to every possible aspect of the environment, Actor is insensitive to language, operating system, CPU architecture, time of day, and so on.
  • While shared state is sensitive to timing, and demands precise coordination and synchronization, Actor doesn't know or care. Tasks are asynchronous and do what they do as they want to do it.
  • While shared state looks calm and reliable but cannot handle stress, Actor performs just as elegantly when hit by massive storms of data. It crunches through the work without slowing or stopping.
  • While shared state code is complex and has many cross-thread dependencies, Actor code is serial, event-driven, and much easier to write.
  • While shared state code is practically impossible to fully test, Actor code is trivial to stress test, and once it works, it always works.
Ironically, the reason IBM was able to run thousands of concurrent interactive sessions on its mainframes in the 80's and 90's was that it basically reinvented the Actor model and called it "CICS".  Mainframe transaction monitors turned COBOL sloths into nimble Actors. For decades the world's airlines and banks depended on this to scale their applications up to handle tens of thousands of interactive users.

So, eliminate shared state, turn your application into tasks that communicate only by sending each other messages, and those tasks can run without ever locking or waiting for other tasks to make way for them.  It's kind of like discovering that hey, there are other Starbucks, other parking spaces, and frankly it's easy enough to give everyone their own private city. Of course you need some good connectivity between your tasks.

How to use ØMQ for Multithreading
ØMQ is not RFC 1149.  No bird seed, no mops.  It's just a small library you link into your applications.  Let's look at how to send a message from one thread to another.  This program has a main thread and a child thread.  The main thread wants to know when the child thread has finished its work:



//  Show inter-thread signalling using ØMQ sockets
#include "zhelpers.h"

static void * child_thread (void *context)
{
    void *socket = zmq_socket (context, ZMQ_PAIR);
    assert (zmq_connect (socket, "inproc://sink") == 0);

    s_send (socket, "happy");
    s_send (socket, "sad");
    s_send (socket, "done");

    zmq_close (socket);
    return (NULL);
}

int main ()
{
    s_version ();
    //  Threads communicate via shared context
    void *context = zmq_init (1);

    //  Create sink socket, bind to inproc endpoint
    void *socket = zmq_socket (context, ZMQ_PAIR);
    assert (zmq_bind (socket, "inproc://sink") == 0);

    //  Start child thread
    pthread_t thread;
    pthread_create (&thread, NULL, child_thread, context);

    //  Get messages from child thread
    while (1) {
        char *mood = s_recv (socket);
        printf ("You're %s\n", mood);
        int done = (strcmp (mood, "done") == 0);
        free (mood);        //  Free every received string, including "done"
        if (done)
            break;
    }
    zmq_close (socket);
    zmq_term (context);
    return 0;
}


There is no direct mapping from traditional multithreaded code to ØMQ code.  Whereas shared-state threads interact in many indirect and subtle ways, ØMQ threads interact only by sending and receiving messages.


You should follow some rules to write happy multithreaded code with ØMQ:

  • You MUST NOT access the same data from multiple threads. Classic MT techniques such as mutexes are an anti-pattern in ØMQ applications. The only exception is a ØMQ context object, which is threadsafe.
  • You MUST create a ØMQ context for your process, and pass that to all threads that you want to connect via inproc sockets.
  • You MAY treat threads as separate tasks, with their own context, but these threads cannot communicate over inproc. However they will be easier to break into standalone processes afterwards.
  • You MUST NOT share ØMQ sockets between threads. ØMQ sockets are not threadsafe. Technically it's possible to do this, but it demands semaphores, locks, or mutexes. This will make your application slow and fragile. The only place where it's remotely sane to share sockets between threads is in language bindings that need to do magic like garbage collection on sockets.
If you need to start more than one device in an application, for example, you will want to run each in its own thread. It is easy to make the error of creating the device sockets in one thread, and then passing the sockets to the device in another thread. This may appear to work but will fail randomly. Remember: do not use or close sockets except in the thread that created them.

If you follow these rules, you can quite easily split threads into separate processes when you need to. Application logic can sit in threads, processes, or boxes: whatever your scale needs.

ØMQ uses native OS threads rather than virtual "green" threads. The advantage is that you don't need to learn any new threading API, and that ØMQ threads map cleanly to your operating system. You can use standard tools like Intel's ThreadChecker to see what your application is doing. The disadvantages are that your code, when it for instance starts new threads, won't be portable, and that if you have a huge number of threads (thousands), some operating systems will get stressed.

Let's see how this works in practice. We'll turn our old Hello World server into something more capable. The original server was a single thread. If the work per request is low, that's fine: one ØMQ thread can run at full speed on a CPU core, with no waits, doing an awful lot of work. But realistic servers have to do non-trivial work per request. A single core may not be enough when 10,000 clients hit the server all at once. So a realistic server must start multiple worker threads. It then accepts requests as fast as it can and distributes them to its worker threads. The worker threads grind through the work and eventually send their replies back.

You can of course do all this using a queue device and external worker processes, but often it's easier to start one process that gobbles up sixteen cores, than sixteen processes, each gobbling up one core. Further, running workers as threads will cut out a network hop, latency, and network traffic.

The MT version of the Hello World service basically collapses the queue device and workers into a single process:

// Multithreaded Hello World server

#include "zhelpers.h"
#include <pthread.h>

static void *
worker_routine (void *context) {
    //  Socket to talk to dispatcher
    void *receiver = zmq_socket (context, ZMQ_REP);
    zmq_connect (receiver, "inproc://workers");

    while (1) {
        char *string = s_recv (receiver);
        printf ("Received request: [%s]\n", string);
        free (string);
        //  Do some 'work'
        sleep (1);
        //  Send reply back to client
        s_send (receiver, "World");
    }
    zmq_close (receiver);
    return NULL;
}

int main (void)
{
    void *context = zmq_init (1);

    //  Socket to talk to clients
    void *clients = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (clients, "tcp://*:5555");

    //  Socket to talk to workers
    void *workers = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (workers, "inproc://workers");

    //  Launch pool of worker threads
    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, context);
    }
    //  Connect work threads to client threads via a queue
    zmq_device (ZMQ_QUEUE, clients, workers);

    //  We never get here but clean up anyhow
    zmq_close (clients);
    zmq_close (workers);
    zmq_term (context);
    return 0;
}

How it works:

  • The server starts a set of worker threads. Each worker thread creates a REP socket and then processes requests on this socket. Worker threads are just like single-threaded servers. The only differences are the transport (inproc instead of tcp), and the bind-connect direction.
  • The server creates a ROUTER socket to talk to clients and binds this to its external interface (over tcp).
  • The server creates a DEALER socket to talk to the workers and binds this to its internal interface (over inproc).
  • The server starts a QUEUE device that connects the two sockets. The QUEUE device keeps a single queue for incoming requests, and distributes those out to workers. It also routes replies back to their origin.
Note that creating threads is not portable across operating systems. The POSIX threading library is pthreads, but on Windows you have to use a different API.

Here the 'work' is just a one-second pause. We could do anything in the workers, including talking to other nodes. This is what the MT server looks like in terms of ØMQ sockets and nodes. Note how the request-reply chain is REQ-ROUTER-queue-DEALER-REP:


Signalling

When you start making multithreaded applications with ØMQ, you'll hit the question of how to coordinate your threads. Though you might be tempted to insert 'sleep' statements, or to use multithreading techniques such as semaphores or mutexes, the only mechanism you should use is ØMQ messages. Remember the story of The Drunkards and the Beer Bottle.

Here is a simple example showing three threads that signal each other when they are ready.



// Multithreaded relay

#include "zhelpers.h"
#include <pthread.h>

static void *
step1 (void *context) {
    //  Connect to step2 and tell it we're ready
    void *xmitter = zmq_socket (context, ZMQ_PAIR);
    zmq_connect (xmitter, "inproc://step2");
    s_send (xmitter, "READY");
    zmq_close (xmitter);

    return NULL;
}

static void *
step2 (void *context) {
    //  Bind inproc socket before starting step1
    void *receiver = zmq_socket (context, ZMQ_PAIR);
    zmq_bind (receiver, "inproc://step2");
    pthread_t thread;
    pthread_create (&thread, NULL, step1, context);

    //  Wait for signal and pass it on
    char *string = s_recv (receiver);
    free (string);
    zmq_close (receiver);

    //  Connect to step3 and tell it we're ready
    void *xmitter = zmq_socket (context, ZMQ_PAIR);
    zmq_connect (xmitter, "inproc://step3");
    s_send (xmitter, "READY");
    zmq_close (xmitter);

    return NULL;
}

int main (void)
{
    void *context = zmq_init (1);

    //  Bind inproc socket before starting step2
    void *receiver = zmq_socket (context, ZMQ_PAIR);
    zmq_bind (receiver, "inproc://step3");
    pthread_t thread;
    pthread_create (&thread, NULL, step2, context);

    //  Wait for signal
    char *string = s_recv (receiver);
    free (string);
    zmq_close (receiver);

    printf ("Test successful!\n");
    zmq_term (context);
    return 0;
}



This is a classic pattern for multithreading with ØMQ:

  1. Two threads communicate over inproc, using a shared context.
  2. The parent thread creates one socket, binds it to an inproc:// endpoint, and then starts the child thread, passing the context to it.
  3. The child thread creates the second socket, connects it to that inproc:// endpoint, and then signals to the parent thread that it's ready.

Note that multithreaded code using this pattern does not scale out to processes. If you use inproc and socket pairs, you are building a tightly bound application. Do this when low latency is really vital. For all normal apps, use one context per thread, and ipc or tcp. Then you can easily break your threads out to separate processes, or boxes, as needed.

This is the first time we've shown an example using PAIR sockets. Why use PAIR? Other socket combinations might seem to work but they all have side-effects that could interfere with signaling:

  • You can use PUSH for the sender and PULL for the receiver. This looks simple and will work, but remember that PUSH will load-balance messages across all available receivers. If you start two receivers by accident (e.g. you already have one running and you start a second), you'll "lose" half of your signals. PAIR has the advantage of refusing more than one connection; the pair is exclusive.
  • You can use DEALER for the sender and ROUTER for the receiver. ROUTER, however, wraps your message in an "envelope", meaning your zero-size signal turns into a multipart message. If you don't care about the data, treat anything as a valid signal, and don't read more than once from the socket, that won't matter. If, however, you decide to send real data, you will suddenly find ROUTER providing you with "wrong" messages. DEALER also load-balances, giving the same risk as PUSH.
  • You can use PUB for the sender and SUB for the receiver. This will correctly deliver your messages exactly as you sent them, and PUB does not load-balance as PUSH and DEALER do. However, you need to configure the subscriber with an empty subscription, which is annoying. Worse, the reliability of the PUB-SUB link is timing-dependent, and messages can get lost if the SUB socket is still connecting while the PUB socket sends its message.

For these reasons, PAIR makes the best choice for coordination between pairs of threads.







