ZeroMQ Messaging

NOTE:
The ZeroMQ Source code is available from the ZeroMQ Web site. This website provides examples and is not a mirror for downloading ZeroMQ.

(Request/Response pattern example: C/C++ Server/Client, BASIC Server/Client. Compiled projects and a DLL library can be downloaded.)

See the API Reference for more information on each of the functions in the API and also see the ØMQ Guide.


ZeroMQ Introduction


If you are new to messaging, the benefit is that you can implement multiple connections to multiple "endpoints" (clients or servers, for example) and distribute work over processor cores, networked machines, and even remote machines via the internet.

This is significant for server-side implementation, because rigid development design often breaks when it needs to scale. This can be a significant issue for developers who build database servers and get hit with unexpected traffic. Just when the bean counters are rubbing their hands together picturing a Wall Street IPO and new Ferraris all round, the business expansion hits an invisible wall... oh, and you're fired. (I have actually watched this play out.)

ØMQ delivers blobs of data (messages) to nodes. You can map nodes to threads, processes, or boxes. It gives your applications a single socket API to work with, no matter what the actual transport (TCP, inter-process, multicast etc). It automatically reconnects to peers as they come and go. It queues messages at both sender and receiver, as needed. It manages these queues carefully to ensure processes don't run out of memory, overflowing to disk when appropriate. It does all I/O in background threads and handles socket errors. It uses lock-free techniques for talking between nodes, so there are never locks, waits, semaphores, or deadlocks.

The concept of messaging is that endpoints can be "dumb", meaning they can function without having to know anything about any other part of the processing. A message is discrete and contains a BLOB payload, so an SQL endpoint can receive a request and produce a result set without knowing where the result is to be delivered or worrying about accounting for lost bytes. Clients produce a message and wait for the response; servers receive a message, complete some task and return a response. Because a client can send work out to multiple servers (across multiple cores/machines/networks), you can instantly add horsepower without having to rewrite all your code. Even with badly implemented code, messaging middleware provides scaling with little overhead.

From the ØMQ website: "After participating in the standardization of the AMQP protocol almost from its inception in 2004, we came to realize that the major obstacle to its adoption was its inherent complexity. ØMQ ("Zero-Em-Queue") started life in 2007 as an iMatix project to build a low-latency version of the OpenAMQ messaging product. The ØMQ project was built around an open source community of expert engineers from iMatix, Cisco, Intel, Novell, and other firms. The project has always focussed on benchmarking, transparent tests, and user-driven growth. It is a small, fast, and free software library that gives you message-passing concurrency for applications in most common languages. It's portable, runs on lots of operating systems and has loads of language interfaces enabling your applications to harness many cores, and many boxes."

Unlike heavyweight messaging middleware, ØMQ does NOT have a central hub (a broker, the "hub and spoke" architecture) through which all messages pass and in which messages are queued or managed. This means there is no single point of failure. ØMQ is to messaging what SQLite is to databases: it requires no configuration and is lightweight and flexible. Your server app need not even be running before your client app sends messages to it; they will be queued and delivered to the server once it has started. This brokerless model is particularly interesting for minimum latency and maximum performance.

The authors argue that any RPC mechanism (HTTP, AJAX, REST etc.) is in fact a leaky abstraction on top of message-passing, and that the message-passing model is far superior to the shared-state model for writing distributed applications. ØMQ was primarily intended to power stock-trading businesses, where performance is crucial.

ØMQ is evolving fast. While the design concepts remain strong, as witnessed by the increasing bravado of the authors, the implementation is racing to keep up with some fundamental problems exposed by the wider user base (assert conflicts et al.). A high-level C binding called czmq has emerged that implements a nice wrapper to make writing applications faster and more concise. Why C? It remains one of the few languages that is highly portable and library independent. The manual states: "The general model is that of a class (the source package) that provides objects (in fact C structures). The application creates objects and then works with them. When done, the application destroys the object."
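The czmq create/work/destroy model looks like this minimal sketch (assuming the czmq v1 API; the endpoint and payload are illustrative):

#include <czmq.h>

int main (void)
{
    zctx_t *ctx = zctx_new ();                      // create the context object
    void *req = zsocket_new (ctx, ZMQ_REQ);         // sockets belong to the context
    zsocket_connect (req, "tcp://localhost:5555");  // illustrative endpoint

    zstr_send (req, "Hello");                       // work with the objects
    char *reply = zstr_recv (req);                  // blocks until the reply arrives
    free (reply);

    zctx_destroy (&ctx);                            // destroy when done (closes sockets too)
    return 0;
}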


Communication Transport

Server applications can run anywhere (over the network or the internet) as the communication utilizes TCP via IP. TCP was chosen very deliberately because, despite its flaws with raw throughput (throttling etc.), it works very reliably and is widely available. TCP does a great job even though it is 1-to-1 and byte oriented. ØMQ utilizes the socket paradigm and offers four different transports:
  1. INPROC an In-Process communication model
  2. IPC an Inter-Process communication model
  3. MULTICAST multicast via PGM, possibly encapsulated in UDP
  4. TCP a network based transport
The TCP transport is often the best choice; it is very performant and robust. However, when there is no need to cross the machine boundary it can be interesting to look at the IPC or INPROC protocols to lower the latency even more. IPC is basically abstracted from the LAN, but it is not implemented on Windows yet due to the UNIX origins of the code. INPROC is an inter-thread protocol that requires binding before connecting.

ØMQ offers four core patterns of operation (the complete list of patterns is in the manual). A pattern is nothing more than a pair of sockets with matching types.


1. Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.

2. Publish-Subscribe (Pub-Sub), which connects a set of publishers to a set of subscribers. This is a data distribution pattern. Note: ØMQ sends the channel as part of the message; use a NULL (empty) string for the subscription name in zmq_setsockopt() to subscribe to all channels on that port, as in the sketch below.
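A subscriber-side sketch (2.x C API, assuming a context 'ctx' already exists; the endpoint and "NASDAQ" channel are illustrative):

void *sub = zmq_socket (ctx, ZMQ_SUB);
zmq_connect (sub, "tcp://localhost:5556");

/* empty filter: subscribe to every channel on this port */
zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0);

/* or subscribe to one channel by prefix */
zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "NASDAQ", 6);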

3. Pipeline, which connects nodes in a fan-out / fan-in pattern that can have multiple steps, and loops, using PUSH/PULL sockets. This is a parallel task distribution and collection pattern, sketched below.
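The two ends of a pipeline step are PUSH and PULL sockets. A fragment sketch (2.x C API, assuming a context 'ctx'; endpoints are illustrative):

/* ventilator: fan tasks out to however many workers connect */
void *push = zmq_socket (ctx, ZMQ_PUSH);
zmq_bind (push, "tcp://*:5557");

/* worker: pull tasks in; results would go out a second PUSH socket */
void *pull = zmq_socket (ctx, ZMQ_PULL);
zmq_connect (pull, "tcp://localhost:5557");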

4. Exclusive pair, which connects two sockets in an exclusive pair. This is a low-level pattern for specific, advanced use cases. We'll see an example at the end of this chapter.

Interestingly, because of the implications of the messaging model (that all endpoints are essentially dumb, meaning they are totally independent and have no coupling with other components), it becomes possible to extend and combine patterns within a distributed system of components that connect using the same or different messaging patterns.

The primary benefit of ØMQ is message queuing and task management without the use of locks. The focus has been on speed: with very small messages it can theoretically saturate a 10Gb switch, which works out to an end-to-end latency of 13.4 µs, or 8 million messages per second.

A big advantage of ØMQ is that it does not force a payload protocol on you. Your messages can be designed to be anything that makes sense for your implementation and are just treated as BLOBs. The biggest drawback of the AMQP spec is that it has become a hopeless sprawling mess. There are many comparisons of messaging middleware, but in the end, simplicity wins the day for most projects.

ØMQ is a fairly simple programming interface that allows you to create a messaging system. The C DLL is around 600k, and the API is quite straightforward to use from most languages that can call into it with the standard C CDECL calling convention (Windows).


In this article we look at the C API in particular. Its latency is a few microseconds above the raw C++ code (~35 µs) until the network limit (1Gb/sec) is reached; once messages are large enough to exhaust the network (~256 bytes), the throughputs of C++ and C are exactly the same. Messages are simply a stream of bytes preceded by a length descriptor.


For more in-depth information see the abundant articles written by the authors. An excellent presentation by Pieter and then Martin (9:45) is in this video download. They explain the design choices made in the ØMQ implementation, including why it makes sense to offload most functionality onto proven layers like TCP/IP.


End Point Implementations


ØMQ servers probably make good candidates for a Windows service. As you can see in the example server code, the flow loops back to the zmq_recv() call, which blocks until a new request is received. ØMQ clients can be bolted onto just about any application as a means of farming out workload. Messages are sent by the client asynchronously, so your application does not have to wait for the message to be processed, an important element of an event-based framework. ØMQ (0.3) allows a single thread to handle an unlimited number of underlying network connections.

ZMQ has four unique transports:
  1. INPROC - In-Process communication model
  2. IPC - Inter-Process communication model
  3. MULTICAST - multicast via PGM
  4. TCP - network based transport

As noted above, the TCP transport is often the best choice: it is very performant and robust, and the other transports are not guaranteed to be available on every platform. IPC and INPROC lower latency when there is no requirement to go outside the local machine, and MULTICAST is for special cases. I am going to deal with TCP, assuming you are considering messaging as a means of assigning work scalably to different machines in some sort of distributed system.


In general the Server will bind to a port first and then the client will connect.
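For example (2.x C API; 'rep_socket' and 'req_socket' are hypothetical socket handles, and the port is illustrative):

/* server side: bind to an interface and port */
zmq_bind (rep_socket, "tcp://*:5555");

/* client side: connect to the server's address */
zmq_connect (req_socket, "tcp://localhost:5555");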

ØMQ offers three devices, one for each pattern:
  1. QUEUE for the request/response pattern
  2. FORWARDER for the publish/subscribe pattern
  3. STREAMER for the pipelining pattern

(There is actually a fourth pattern, PAIR, but it will block once the high-water mark is reached, making it suitable only for carefully thought-out circumstances.)

The request/response pattern works well for most messaging and protocols like HTTP, SMTP etc.



The Socket Object

A major difference between a ØMQ application and a conventional networked application is that one socket handles all incoming/outgoing connections. When you publish to a thousand subscribers, it's via one socket. When you distribute work among twenty services, it's via one socket. When you collect data from a thousand web applications, it's via one socket.

In general you should create a new socket flow for each type of problem you need to solve. If you have a pub/sub model but you need to synchronize it, then add a request/response pattern to do that (fig 23 above).

Socket objects (zmq_socket) can occur zero or more times within a device object, and have these properties:

  • The name is any value except "type".
  • "type" - (string) - specifies the type of the socket.
  • "bind" - (string) - specifies zero or more endpoints to bind the socket to.
  • "connect" - (string) - specifies zero or more endpoints to connect the socket to.
  • "option" - (object) - specifies configuration of the socket.

Available socket types:

  • "sub" - ZMQ_SUB
  • "pub" - ZMQ_PUB
  • "req" - ZMQ_REQ
  • "rep" - ZMQ_REP
  • "xreq" - ZMQ_XREQ
  • "xrep" - ZMQ_XREP
  • "push" - ZMQ_PUSH
  • "pull" - ZMQ_PULL
  • "pair" - ZMQ_PAIR

As covered here, the procedural steps for a simple Client & Server Request/Response pattern are:

Your Client will initialize a ZMQ_REQ socket because it will initiate the request by calling zmq_send.
Your Server will initialize a ZMQ_REP socket because it awaits a request and then produces a response.

Client
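A minimal client sketch (2.x C API, as used throughout this article; the endpoint and payload are illustrative):

#include <zmq.h>
#include <stdio.h>
#include <string.h>

int main (void)
{
    void *ctx = zmq_init (1);                    /* one I/O thread */
    void *sock = zmq_socket (ctx, ZMQ_REQ);      /* the client initiates requests */
    zmq_connect (sock, "tcp://localhost:5555");

    zmq_msg_t request;
    zmq_msg_init_size (&request, 5);
    memcpy (zmq_msg_data (&request), "Hello", 5);
    zmq_send (sock, &request, 0);                /* queued asynchronously */
    zmq_msg_close (&request);

    zmq_msg_t reply;
    zmq_msg_init (&reply);
    zmq_recv (sock, &reply, 0);                  /* blocks until the response arrives */
    printf ("Received %d bytes\n", (int) zmq_msg_size (&reply));
    zmq_msg_close (&reply);

    zmq_close (sock);
    zmq_term (ctx);
    return 0;
}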

Server
Since the server must wait for requests to arrive, it must block. This can be done either by calling zmq_recv() in blocking mode, or by polling with the zmq_poll() function, which provides a mechanism for applications to multiplex input/output events in a level-triggered fashion over a set of sockets. If you need to terminate a server that is blocked, the API provides a solution: pass the context handle to another thread and have it call zmq_term(). That causes zmq_recv() (and any other subsequent message function) to return with error code ETERM, allowing you to shut the server down.
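Putting that together, a minimal blocking server sketch (2.x C API; the endpoint and payload are illustrative):

#include <zmq.h>
#include <string.h>

int main (void)
{
    void *ctx = zmq_init (1);
    void *sock = zmq_socket (ctx, ZMQ_REP);      /* the server awaits requests */
    zmq_bind (sock, "tcp://*:5555");             /* bind before clients connect */

    while (1) {
        zmq_msg_t request;
        zmq_msg_init (&request);
        if (zmq_recv (sock, &request, 0) != 0)   /* blocks; fails with ETERM after zmq_term() */
            break;
        /* ... do the work here ... */
        zmq_msg_close (&request);

        zmq_msg_t reply;
        zmq_msg_init_size (&reply, 5);
        memcpy (zmq_msg_data (&reply), "World", 5);
        zmq_send (sock, &reply, 0);
        zmq_msg_close (&reply);
    }
    zmq_close (sock);
    zmq_term (ctx);
    return 0;
}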


Messages

ØMQ messages are blobs of any size from zero to n bytes (whatever fits in available memory). They have no format; it is up to you to design a message format that represents your data (JSON and XML are obvious examples). Messages are in fact instances of zmq_msg_t structures (or classes, depending on your language), and you create and pass around zmq_msg_t objects, not blocks of data. To release (not destroy) a message you call zmq_msg_close(3). This drops a reference, and eventually ØMQ will destroy the message.

After you have passed the message (structure) to zmq_send(3), the message is "cleared" (its size set to zero) and you cannot access it any longer. You cannot send the same message twice without cloning it first using zmq_msg_copy(3).
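Cloning before a second send might look like this sketch (2.x C API; 'msg' and socket 's' are assumed to exist):

zmq_msg_t copy;
zmq_msg_init (&copy);
zmq_msg_copy (&copy, &msg);   /* 'copy' now shares the content of 'msg' */
zmq_send (s, &msg, 0);        /* 'msg' is cleared by the send */
zmq_send (s, &copy, 0);       /* the clone can still be sent */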

Some other things that are worth knowing about messages:

  • ØMQ sends and receives them atomically, i.e. you get a whole message, or you don't get it at all.
  • ØMQ does not send a message right away but at some indeterminate later time.
  • You can send zero-length messages, e.g. for sending a signal from one thread to another.
  • A message must fit in memory. If you want to send files of arbitrary sizes, you should break them into pieces and send each piece as a separate message.
  • You must call zmq_msg_close(3) when finished with a message, in languages that don't automatically destroy objects when a scope closes.

Multipart messages are used for sending larger amounts of data as a single message. If you are sending a message with five parts, you must construct, send, and destroy five zmq_msg items. The first four parts are sent with:

  • zmq_send (socket, &message, ZMQ_SNDMORE);

The last part is sent with:

  • zmq_send (socket, &message, 0);

After each receive has completed in the client, you check whether there is more data with:

  • zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);

If there is, loop back and receive it, as in the sketch below.
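A receive loop following that recipe (2.x C API, where the ZMQ_RCVMORE option is an int64_t):

int64_t more;
size_t more_size = sizeof (more);
do {
    zmq_msg_t part;
    zmq_msg_init (&part);
    zmq_recv (socket, &part, 0);              /* blocks until the next part arrives */
    /* ... process this part ... */
    zmq_msg_close (&part);
    zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
} while (more);                               /* zero once the final part is in */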

Some things to know about multipart messages:

  • When you send a multipart message, the first part (and all following parts) are only sent when you send the final part.
  • If you are using zmq_poll(3), when you receive the first part of a message, all the rest have also arrived.
  • You will receive all parts of a message, or none at all.
  • Each part of a message is a separate zmq_msg item.
  • You will receive all parts of a message whether or not you check the RCVMORE option.
  • On sending, ØMQ queues message parts in memory until the last is received, then sends them all.
  • There is no way to cancel a partially sent message, except by closing the socket.

To do zero-copy you use zmq_msg_init_data(3) to create a message that refers to a block of data already allocated on the heap with malloc(), and then you pass that to zmq_send(3). When you create the message you also pass a function that ØMQ will call to free the block of data, when it has finished sending the message. This is the simplest example, assuming 'buffer' is a block of 1000 bytes allocated on the heap:
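A sketch of that call sequence, assuming 'buffer' is 1000 bytes allocated with malloc() (the same pattern reappears under Performance & Memory below):

void my_free (void *data, void *hint)
{
    free (data);   /* called by ØMQ once the send completes */
}

zmq_msg_t msg;
zmq_msg_init_data (&msg, buffer, 1000, my_free, NULL);
zmq_send (socket, &msg, 0);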

In this way, you can send multiple buffers from different sources as individual message parts. We send each field as a length-delimited frame. To the application it looks like a series of send and recv calls. But internally the multiple parts get written to the network and read back with single system calls, so it's very efficient.

There is no way to do zero-copy on receive: ØMQ delivers you a buffer that you can store as long as you wish but it will not write data directly into application buffers.



Pub-Sub matches messages based on the prefix. Putting the key into a separate frame makes the matching very obvious, since there is no chance an application will accidentally match on part of the data. The ØMQ subscription filter rejects or accepts the entire multipart message (key plus data) based on the key. You will never get part of a multipart message.
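A sketch of the key-in-its-own-frame approach (2.x C API; 'pub' and 'sub' are hypothetical sockets, and the "NASDAQ" key and payload are illustrative):

/* publisher: key frame first, then the data frame */
zmq_msg_t key;
zmq_msg_init_size (&key, 6);
memcpy (zmq_msg_data (&key), "NASDAQ", 6);
zmq_send (pub, &key, ZMQ_SNDMORE);
zmq_msg_close (&key);

zmq_msg_t data;
zmq_msg_init_size (&data, 9);
memcpy (zmq_msg_data (&data), "AAPL 1.23", 9);
zmq_send (pub, &data, 0);
zmq_msg_close (&data);

/* subscriber: the filter matches against the first (key) frame */
zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "NASDAQ", 6);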

Message size handling is governed by a constant in the header file: ZMQ_MAX_VSM_SIZE = 30 bytes. Messages at or below this size ("very small messages") are stored inline in the zmq_msg_t structure itself, typically on the stack, while larger messages are allocated on the heap (depending upon the implementation). The constant can be changed for longer messages, but that means rebuilding the library.

Polling

When using a Pub/Sub pattern, the subscriber may wait a long time before a message arrives. Since zmq_recv() blocks, this will freeze your application and make user interaction impossible unless you dedicate a second thread to the blocking call. Another approach is to use zmq_poll(3). This call can be placed in a tight loop (without tying up the processor) and sets the revents flag (a bitmask containing ZMQ_POLLIN) when a message is available. Even though zmq_recv() blocks, because a message is already waiting it can then be called without setting the non-blocking option.
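A polling loop might look like this sketch (2.x C API, where the zmq_poll timeout is in microseconds; 'sub' is a hypothetical SUB socket):

zmq_pollitem_t items [] = {
    { sub, 0, ZMQ_POLLIN, 0 }
};
while (1) {
    zmq_poll (items, 1, 100000);              /* wait up to 100 ms */
    if (items [0].revents & ZMQ_POLLIN) {
        zmq_msg_t msg;
        zmq_msg_init (&msg);
        zmq_recv (sub, &msg, 0);              /* a message is waiting, so this won't block */
        /* ... handle the message ... */
        zmq_msg_close (&msg);
    }
    /* ... service the user interface here ... */
}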

This design allows all aspects of a connection to be managed from a single thread. There are many examples in C in the download.


Performance & Memory

When calling zmq_recv(), the API function zmq_msg_data() returns a pointer to the message buffer, so the application may use the buffer without copying the data out of the message struct. That buffer remains valid until zmq_msg_close() is called, because that call triggers the free() (the library handled the memory allocation on the receive side). Performance will suffer if you copy memory for each message.
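Reading in place might look like this sketch (2.x C API; process_data is a hypothetical handler):

zmq_msg_t msg;
zmq_msg_init (&msg);
zmq_recv (s, &msg, 0);

/* work directly on the library-owned buffer: no copy */
process_data (zmq_msg_data (&msg), zmq_msg_size (&msg));

zmq_msg_close (&msg);   /* only now is the buffer freed */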

In high performance networking, copying data must be avoided wherever possible. This article demonstrates the impact of single copy of the data on latency. It shows, for example, that for 256MB of data, single copy can increase latency by 0.1 second. Data must be copied from memory to a network interface card and vice versa. This cannot be avoided, but copying user data into ØMQ messages can.

Consider the following example. We create a message a million bytes long and copy the user data into it before sending:

zmq_msg_t msg;
zmq_msg_init_size (&msg, 1000000);              /* the library allocates 1 MB */
memcpy (zmq_msg_data (&msg), buffer, 1000000);  /* the copy we want to avoid */
zmq_send (s, &msg, 0);
But since we already have the data in a buffer, we can pass a pointer to it instead of copying it into the message. To accomplish this we define a deallocation function for the buffer and pass it to ØMQ along with the buffer:
void my_free (void *data, void *hint)
{
    free (data);   /* buffer allocated with malloc() is now freed */
}

Once the deallocation function is defined we can create a "zero-copy" message and pass it the buffer and deallocation function:

zmq_msg_t msg;
void *hint = NULL;
zmq_msg_init_data (&msg, buffer, 1000000, my_free, hint);  /* no copy: the message points at buffer */
zmq_send (s, &msg, 0);

The buffer is now owned by the message. It will be deallocated once the message is sent. DO NOT deallocate the buffer in your code.


Some notes on using the HWM option:

  • This affects both the transmit and receive buffers of a single socket. Some sockets (PUB, PUSH) only have transmit buffers. Some (SUB, PULL, REQ, REP) only have receive buffers. Some (DEALER, ROUTER, PAIR) have both transmit and receive buffers.
  • When your socket reaches its high-water mark, it will either block or drop data depending on the socket type. PUB sockets will drop data if they reach their high-water mark, while other socket types will block.
  • Over the inproc transport, the sender and receiver share the same buffers, so the real HWM is the sum of the HWMs set by both sides. In effect, if one side does not set a HWM, there is no limit to the buffer size.
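Setting the option on a 2.x socket (where ZMQ_HWM takes a uint64_t) might look like this sketch:

uint64_t hwm = 1000;                                /* queue at most 1000 messages */
zmq_setsockopt (s, ZMQ_HWM, &hwm, sizeof (hwm));    /* set before bind/connect */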




Multi Threading for Multi Cores and the Actor Model

Today, multi-core CPUs have become ubiquitous. While clock speeds are stable at around 2-3GHz, the number of cores per chip is doubling every couple of years, and this is likely to continue because CPU designers have signaled multi-core to be the next growth avenue for their products. 

However, traditional compilers are not well equipped to leverage this evolution. The most widely used languages, C and C++, do not offer any support for concurrency, so developers typically roll their own using threading APIs. Languages that do support concurrency (Java, Python, .NET, Ruby) do it in a brute-force manner. Depending on the implementation (there are over a dozen Ruby interpreters, for example) they may offer "green threading" or true multi-threading. Neither approach scales, due to its reliance on locks. Only Erlang was designed specifically with multi-threading in mind.

As covered in this article, the four deadly horsemen of performance are:

 1. Data copies
 2. Context switches
 3. Lock contention
 4. Memory allocation

For this reason, single-threaded processing is dramatically faster than multi-threaded processing: it involves no context switching and no synchronisation/locking mechanisms. In addition (as mentioned in this article), traditional approaches have many inherent problems, such as:
  • Forgotten synchronization
  • Incorrect granularity
  • Read and write tearing
  • Lock-free reordering
  • Lock convoys
  • Two-step dance
  • Priority inversion
Short of writing in a language designed around multi-threading (Erlang), messaging is the only way to write conventional code that scales to any number of cores, avoids all locks, costs little more than conventional single-threaded programming, is easy to learn, and does not crash in strange ways. ØMQ passes messages between threads using lock-free algorithms. The result is that ØMQ application code looks just like ordinary single-threaded code, except that it processes messages. For developers who need scalability across multiple cores/boxes, ØMQ provides a simple, reliable solution.

Using just a single thread within ØMQ (thus avoiding synchronization altogether) is not possible: ØMQ must process incoming messages even while the client application is busy with business tasks. If it did not, incoming messages would be delayed on the other side of the connection until the client application ceded control to ØMQ, damaging latency severely. The minimum number of threads required is therefore two.

Although lock-free and wait-free synchronization is extremely fast, it is still not enough to reach several million messages a second. To achieve this, messages are batched, avoiding synchronization altogether for most messages. For example, if synchronization is done only for every 100th message, and the actual enqueueing and dequeueing of messages is almost free, then the synchronization cost of passing a given number of messages through the pipe drops to 1% of the unbatched cost.



Network Addressing


Addresses are composed of the name of the protocol to use followed by a colon (:) and a protocol-specific address.
Available protocols:

tcp
The address is composed of IP address and port delimited by colon sign (:). The IP address can be a hostname (with 'connect') or a network interface name (with 'bind').
  

Examples
  • "tcp://eth0:5555"
  • "tcp://192.168.0.1:20000"  
  • "tcp://www.somehost.com:80"
  • "tcp://*:5555" - * for wildcard (INET_ANYADDR)
  • "tcp://lo:5555" - Note: interface names like "lo" are OS specific and thus non-portable
A server node can bind to many endpoints, and it can do this using a single socket. This means it will accept connections across different transports, but you cannot bind to the same endpoint twice; that will cause an exception. Each time a client connects, the server node's socket gets another connection. There is no inherent limit to how many connections a socket can have. A client node can also connect to many endpoints using a single socket.
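For example, one socket accepting connections over several transports (endpoints are illustrative; the ipc one will not work on Windows):

zmq_bind (s, "tcp://*:5555");
zmq_bind (s, "inproc://workers");
zmq_bind (s, "ipc:///tmp/workers.ipc");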

In most cases, which node acts as client, and which as server, is about network topology rather than message flow. However, there are cases (resending when connections are broken) where the same socket type will behave differently if it's a server or if it's a client.



pgm & udp
Both protocols have the same address format: the network interface to use, a semicolon (;), the multicast group IP address, a colon (:) and the port.
    Examples:
  • "pgm://eth2;224.0.0.1:8000"
  • "udp://192.168.0.111;224.1.1.1:5555"




Load Distribution


ØMQ comes alive when we want to distribute requests amongst multiple servers. To accomplish this we create multiple instances of the server, each listening on a different port. Now when we create the client connection we specify all the ports on which servers are listening for requests, and ØMQ will load-balance the requests amongst the machines in round-robin fashion. Obviously the major benefit of using TCP is that we can now distribute load to other machines and harness their horsepower. Since ØMQ has more than ten language bindings and runs on multiple platforms, you have the ability to utilize almost any hardware.
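A sketch of a client fanning requests across two servers (2.x C API, assuming a context 'ctx'; the host names and port are illustrative):

void *req = zmq_socket (ctx, ZMQ_REQ);
zmq_connect (req, "tcp://server-a:5555");
zmq_connect (req, "tcp://server-b:5555");
/* successive requests now round-robin between the two servers */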




Limitations
ØMQ is fast and lightweight, but it offers no persistence or transactions. There is also no way to detect failed or slow consumers, meaning messages can be dropped without any notification to the producer.

Further Reading
Linux C++ pseudo code.
Discussing other patterns (with diagrams)
The ØMQ website has some code examples on the cookbook page. Keep in mind that on Windows, Winsock must be initialized to use the C code examples if you copy and paste them directly into a compiler.


