Abstract
RabbitMQ (http://www.rabbitmq.com) is a popular Open Source
message queuing system that implements the Advanced Message Queuing Protocol
(AMQP). It has been estimated that there
are presently some 30,000 production deployments of RabbitMQ across the globe,
and this number is growing rapidly. Most of these deployments are
business-critical, underpinning everything from internet-based pizza ordering systems
through to providing the central nervous system for OpenStack-based cloud
deployments. RabbitMQ
currently supports versions 0.8.0 and 0.9.1 of AMQP and will soon also provide
support for 1.0. However, a somewhat overlooked capability of RabbitMQ is its
ability to also readily provide support via its flexible plugin architecture
for a variety other popular Open Source message queuing protocols, including
STOMP, MQTT, ZeroMQ, and RESTful messaging via the RabbitHub plugin. Most good
message queuing protocols share many features in common; however some are
better suited to a particular set of use cases than others. This ability of RabbitMQ to be
able to seamlessly receive and propagate messages simultaneously via multiple
protocols is an extremely powerful facility, and one that affords great
flexibility. For example, it means that it is possible to use the most
appropriate protocol for a particular function or to simultaneously use
different protocols to disseminate the same data to different types of users
via the most appropriate protocol without having to develop and maintain any
separate gateway components. The following text discusses this ability of
RabbitMQ to support multiple message queuing protocols and presents a number of
simple examples to illustrate how this facility may be used.
Introduction
I like
visiting Belgium. Aside from the fact the food is good and the beer excellent,
the people are friendly and I never have any communication problems, as just
about everyone speaks and understands English. I have always taken the somewhat
arrogant position that I do not need to learn another language; however the
truth of the matter is that I am not sure whether I could do it, and it
fascinates me greatly to observe how the vast majority of the Belgian
population seem to be able to speak at least three languages and seamlessly
interchange between them as the need arises (such as when dealing with
monoglots such as myself). Generally speaking my Belgian friends will have a
preferred language (one in which they are strongest) and for sure there may be
occasions when a particular word or colloquialism from one language will not be
understood or will not be interpreted correctly; however for the most part the
mapping is exceptionally good in terms of both accuracy and efficiency. Nor
does it seem to be a particular bother for my friends to contend with my
inadequacies; for them it is simply a normal part of life to speak multiple
languages, doing so even as part of day-to-day family life, where husband and
wife may have a different preferred language, and children will from birth
learn to communicate in both.
There are
clearly advantages to being able to communicate with people in multiple
languages, and in an analogous fashion there are advantages to message queuing
software being able to accommodate and seamlessly map between different message
queuing protocols. The world is not uniform, and while one day there may be
something approaching a common language that is spoken and understood by just
about everyone, there will always be accents, colloquialisms, and other
divergences that will ensure that total uniformity will never be achieved.
While it is
perhaps somewhat easier to argue the merits and feasibility of having a
standardised messaging protocol over the merits and feasibility of having a standardised
language for the entire human race, the notion of a “one-size-fits-all” messaging
protocol is similarly flawed. Some messaging protocols are optimised for
low-latency transmission of small messages, while others are designed for
efficient transmission of large messages; some messaging protocols sacrifice
performance for reliability, while others favour performance at the risk of
occasional message loss, and so on. Any attempt to accommodate all possible
messaging scenarios within the scope of a single ubiquitous protocol must
entail some level compromise, thereby making the resultant protocol less
well-suited to one or more messaging scenarios over a less generic protocol
that has been optimally designed for a specific use case.
Evolution
has not resulted in one type of plant or animal; it has resulted in a vast
array of forms with unique characteristics that allow them to take optimal
advantage of their particular environment. Similar comments are applicable to
message queuing protocols (and indeed to all other aspects of software). While
most good message queuing protocols share many features in common, some are better
suited to a particular task than others. Nor does evolution stand still, and
what may seem like a good idea today may not be such a good idea tomorrow. The
key therefore is adaptability. Just as with plant and animal species, if
software cannot readily (and potentially rapidly) adapt to change then it will
become extinct.
RabbitMQ (http://www.rabbitmq.com) is a
popular Open Source message queuing system that implements the Advanced Message
Queuing Protocol (AMQP). RabbitMQ currently supports versions 0.8.0 and 0.9.1
of AMQP and will soon also provide support for AMQP 1.0. However, a somewhat
overlooked but extremely powerful capability of RabbitMQ is its ability to also
readily provide support for other popular message queuing protocols via its
flexible plugin architecture. For example, there are plugins[1]
available for STOMP (http://stomp.github.com/), MQTT (http://mqtt.org/), ZeroMQ (http://www.zeromq.org/)[2],
and for RESTful messaging via HTTP (https://github.com/tonyg/rabbithub). Each of
these protocols was designed to address a particular subset of messaging use
cases, and while AMQP is for the most part able to accommodate all of these use
cases, the specific protocols are better-suited to the particular set of use
cases for which they were specifically designed. For example, MQTT (Message
Queue Telemetry Transport) was designed to facilitate the transfer of data from
pervasive devises over high-latency or otherwise constrained network links.
This task may be achieved using AMQP, but MQTT is arguably a more appropriate
choice of protocol for these types of scenario.
The fact that the RabbitMQ broker can support the MQTT protocol
via a plugin and can map between MQTT and its native AMQP protocol means for
example that data can be published into the messaging environment via MQTT and
disseminated to consumers via AMQP without developers needing to implement any specific
gateway components; they only need to write the MQTT producer and AMQP consumer
functions. Alternatively, data received via MQTT may be disseminated via one of
the other protocols listed above using the relevant plugin, or potentially a
combination of protocols could be used in tandem – a message published to a fanout
or topic exchange in the RabbitMQ broker via the STOMP protocol may for example
be simultaneously consumed via AMQP, HTTP, ZeroMQ, or STOMP-based consumers.
Being able to seamlessly publish messages via one protocol and
consume them via another (or via a combination of others) using this plugin
model is an incredibly powerful concept, and one that illustrates very well not
only the richness and flexibility of the AMQP 0.9.1 model but also the
flexibility and adaptability of RabbitMQ and of the Erlang programming
language, in which RabbitMQ is written (see http://www.erlang.org).
It should be
noted that all of the protocols mentioned above are essentially Open Standards,
and indeed one of the key motivations behind the creation of AMQP was to escape
the “middleware hell” (as it was described by John O’Hara, one of the fathers
of AMQP) of having multiple proprietary message queuing products that speak
different languages (protocols) scattered across the enterprise and having to
integrate between them by developing complicated gateways and adapters, which
then require costly on-going maintenance and support. However there is a
significant difference between developing gateways and adapters to bridge
proprietary messaging technologies and having a core open messaging technology
that can be readily extended via plugins to facilitate multi-protocol
communication amongst a set of Open Standards-based messaging protocols. To
extend the language analogy somewhat, it might be likened in part to hiring and
paying a translator versus learning to speak the language oneself; the latter
being a decidedly better option on multiple levels, assuming you have the
aptitude for it.
The
following text presents and discusses some simple examples of how these
RabbitMQ protocol plugins may be used. These examples by no means provide a
complete illustration of everything that can be done, but are intended to
illustrate some of the basic permutations that can be achieved, which will
hopefully serve to stimulate the reader to come up with other ideas. For additional
insights into what other permutations may be possible and for details about
installing and configuring the plugins, the reader should refer the relevant plugin
documentation (see links provided above).
Some examples
For the purposes of developing and testing the examples
presented below (and for a bit of fun) a 6-node RabbitMQ cluster was created on
HP Cloud (http://www.hpcloud.com) with 2
nodes in each Availability Zone (AZ). The configuration details common to all
cluster nodes were as follows:
- Ubuntu 12.04 LTS 64-bit, 8 vCPU, 32GB RAM
- RabbitMQ 2.8.7 (specifically build 2.8.7.31106 or later, as this includes some important fixes to the MQTT plugin)
- Erlang OTP 15B02 (Debian-based Linux distribution available from the Erlang Solutions repository; https://www.erlang-solutions.com/downloads/download-erlang-otp)
- RabbitMQ STOMP 2.8.7 plugin
- RabbitMQ MQTT 2.8.7 plugin
- RabbitHub plugin (version 0.0.1)
The cluster nodes in AZ1 had the RabbitHub plugin
enabled; the nodes in AZ2 had the STOMP plugin enabled; and the nodes in AZ3
had the MQTT plugin enabled. The RabbitMQ Management Plugin (http://www.rabbitmq.com/management.html) was
enabled on all cluster nodes. The example clients (producers and consumers)
described below were run from a personal laptop that communicated with the
cluster over the internet.
Whilst such a lavish configuration is most certainly not
required in order to experiment with the different protocol plugins (a
considerably smaller single RabbitMQ instance would generally suffice), using a
cluster with different plugins enabled on different nodes serves as a means of
demonstrating the perhaps obvious fact that messages published via one protocol
to one of the cluster nodes can be consumed via different protocols from other cluster
nodes.
A crude illustration of the cluster configuration is
provided below.
It should also be noted at this point that there is
nothing particularly special that needs to be done in order to install RabbitMQ
or to set up a RabbitMQ cluster on HP Cloud other than to ensure that all necessary
ports are open for the relevant security group(s). Depending on the types of
virtual instance used, the root file system (/) can be
quite small relative to the available RAM, and it may therefore also be wise to
consider placing the Mnesia partition under /mnt (which is
generally much larger) in order to avoid any flow control issues associated
with RabbitMQ’s disk space monitoring.
Example #1:
The first example illustrates using a simple STOMP
consumer to consume messages published to the cluster via AMQP. The code for
the consumer is shown below.
#include
<stdio.h>
#include
<stdlib.h>
#include
<string.h>
#include
"stompdef.h"
static
void die(void *handle)
{
fprintf(stderr, "%s\n",
STOMP_Errstr(handle));
exit(EXIT_FAILURE);
}
static
int foobar(void *handle, char *idata, size_t *ilen, char **odata, size_t *olen)
{
fprintf(stderr, "%.*s\n", (int)
*ilen, idata);
return (0);
}
int
main()
{
void * handle
= NULL;
if ((handle = STOMP_Init()) == NULL)
{
die(handle);
}
if (STOMP_Connect(handle, "az2-2xl-1",
61613, "guest", "guest") == -1)
{
die(handle);
}
if (STOMP_Subscribe(handle,
"/queue/stomp", 0, 0) == -1)
{
die(handle);
}
if (STOMP_Register(handle,
"/queue/stomp", foobar) == -1)
{
die(handle);
}
while (1)
{
if (STOMP_Consume(handle, NULL, NULL) ==
-1)
{
fprintf(stderr, "%s\n",
STOMP_Errstr(handle));
break;
}
}
if (STOMP_Disconnect(handle) == -1)
{
die(handle);
}
STOMP_Done(handle);
return (0);
}
The consumer uses a simple C API developed by the author
that supports STOMP 1.1[3]
and currently runs on HP OpenVMS and on most UNIX/Linux variants[4].
A detailed description of the API is beyond the scope of this document; however
the operation of the API should hopefully be reasonably evident from examining the
sample code. Possibly the only aspect of the code that warrants some
explanation at this time is the STOMP_Register() function.
This function can be used to associate a callback function (foobar() in this
case) with a particular destination. When STOMP_Consume() consumes a
message, it will examine the value of the destination header and will invoke
the associated callback, if one has been registered. A default callback may also
be specified, or STOMP_Consume() can itself
return directly the details of any messages that are read. To keep the sample code
as simple as possible, some of the more advanced features of STOMP such as
acknowledgements and transactions have not been used here; however the API does
support these functions.
Note that if you wish to retain consumed messages for
subsequent processing, it is important to take a copy of the message, as memory
allocated by the API for storage of consumed messages may be reclaimed by
subsequent API calls.
Running this program will result in the creation of a
persistent queue named “stomp” (if it did
not already exist) under the default vhost. As with all queues, “stomp” will be
automatically bound to the AMQP default exchange, and if we now publish a
message to the default exchange using a routing key of “stomp”, the
message will be received and displayed by the STOMP client. Running the
following piece of producer code (which uses the Pika Python AMQP library)
would cause the STOMP consumer to display the message “Hello STOMP
consumer”. Note that the message is published to a different
cluster node from that to which the consumer is connected.
import pika
conn =
pika.BlockingConnection(pika.ConnectionParameters(host='az1-2xl-1'))
chan = conn.channel()
chan.basic_publish(exchange='',
routing_key='stomp', body='Hello STOMP consumer!')
conn.close()
One limitation with STOMP that should not be forgotten is
that it is (as its name specifies) a text-based protocol, while AMQP can
accommodate data of any type. Certainly it may be possible to get around this
limitation of the STOMP protocol by encoding any binary data; however a better
approach would be to use another protocol better suited to the transmission of
such data, such as AMQP or MQTT. It might also be suggested that being
constrained to the transmission of textual messages is not a particularly
significant limitation, as the use of JSON-formatted messages is today commonplace,
and such messages may contain an assortment of data types, encoded or
otherwise.
Example #2:
This next example provides a simple illustration of how
messages published to a fanout exchange may be consumed via both AMQP and STOMP
clients.
The following code fragment shows the producer for this
example. Instead of publishing to the default exchange, messages are published
to the predefined fanout exchange amq.fanout with an
empty routing key (the routing key is not relevant in this situation since all
queues bound to a fanout exchange receive all messages, regardless of the
content of the routing key).
import pika
conn =
pika.BlockingConnection(pika.ConnectionParameters( host='az1-2xl-2'))
chan = conn.channel()
chan.basic_publish(exchange='amq.fanout',
routing_key='', body='Silly message')
conn.close()
The STOMP consumer for this example is listed below. The
key difference from the previous example is that instead of specifying a queue
destination in the call to STOMP_Subscribe(), we specify
an exchange destination (“/exchange/amq.fanout”).
Internally, the STOMP plugin creates an exclusive auto-delete queue for the
subscription and binds this queue to the specified exchange (amq.fanout). Note that
for this example we have also not bothered to register a callback function to
process messages, but instead use the last two arguments of the STOMP_Consume() call to
return consumed messages and their length.
As noted previously, if you wish to retain messages for
subsequent processing, it is necessary to make a copy of them, as memory
allocated internally by the API to hold message data may be freed or reused by
subsequent API calls.
#include
<stdio.h>
#include
<stdlib.h>
#include
<string.h>
#include
"stompdef.h"
static
void die(void *handle)
{
fprintf(stderr, "%s\n",
STOMP_Errstr(handle));
exit(EXIT_FAILURE);
}
int
main(int argc, char *argv[])
{
void * handle
= NULL;
size_t len;
char * data;
if ((handle = STOMP_Init()) == NULL)
{
die(handle);
}
if (STOMP_Connect(handle,
"az2-2xl-2", 61613, "guest", "guest") == -1)
{
die(handle);
}
if (STOMP_Subscribe(handle,
"/exchange/amq.fanout", 0, 0) == -1)
{
die(handle);
}
while (1)
{
if (STOMP_Consume(handle, &data,
&len) == -1)
{
fprintf(stderr, "%s\n",
STOMP_Errstr(handle));
break;
}
fprintf(stderr, "Received message:
%.*s\n", (int) len, data);
fprintf(stderr, "destination:
%s\n", STOMP_GetHeader(handle, "destination"));
}
if (STOMP_Disconnect(handle) == -1)
{
die(handle);
}
STOMP_Done(handle);
return (0);
}
In addition to displaying the message text, the above
code also outputs the value of the STOMP destination header, which is obtained
using the STOMP_GetHeader() function.
When using the STOMP plugin, it is important to
understand how STOMP destinations are mapped to exchanges, queues, and routing
keys. The examples provided here are trivial illustrations of what is possible
in this regard, and a complete description of these mappings can be found at http://www.rabbitmq.com/stomp.html.
The AMQP consumer for this example is as follows. If you run
this and the STOMP consumer and then run the producer, the message will be
received and displayed simultaneously by both consumers.
import pika
creds = pika.PlainCredentials(username='guest',
password='guest')
params =
pika.ConnectionParameters(host='az3-2xl-2', credentials=creds)
conn = pika.BlockingConnection(params)
chan = conn.channel()
res =
chan.queue_declare(exclusive=True)
q = res.method.queue
chan.queue_bind(exchange='amq.fanout',
queue=q)
def callback(chan, method,
properties, body):
print "%r" % (body,)
chan.basic_consume(callback,
queue=q)
chan.start_consuming()
Example #3:
This next example extends the previous example by adding
an HTTP consumer, using the RabbitHub plugin.
The following trivial Ruby Sinatra script provides the
necessary consumer code. The script implements a single route named “/hubsub” and
specifies both get and post verbs (methods)
for this route, such that HTTP GET requests for /hubsub will run
the get method, and POST requests
will execute the post method.
require 'sinatra'
get '/hubsub' do
puts "Received #{params}"
puts "Responding to challenge
request..."
params[:"hub.challenge"]
end
post '/hubsub' do
puts "Received message: #{params}
"
end
Note that both HTTP POST and GET requests for
the same route must be supported in this way, as RabbitHub uses both methods
with the same URL for different purposes.
Before the above script will be posted any messages by RabbitHub, it must first be registered. This can be achieved using a simple cURL command such as the following:
Before the above script will be posted any messages by RabbitHub, it must first be registered. This can be achieved using a simple cURL command such as the following:
curl -vd "hub.mode=subscribe&hub.callback=http://10.1.1.251:4567/hubsub&hub.topic=foo\
&hub.verify=sync&hub.lease_seconds=86400"
\
http://guest:guest@az1-2xl-1:55670/subscribe/x/amq.fanout
Running this command will cause the RabbitHub plugin
(listening at http://az1-2xl-1:55670) to issue an
HTTP GET request to the Ruby Sinatra script listening at http://10.1.1.251:4567/hubsub. This
request consists of a simple “challenge” string that must be responded to be
sending back to RabbitHub the same string. If this process completes successfully
then the endpoint http://10.1.1.251:4567/hubsub will be
registered with RabbitHub and this endpoint will be POSTed any
messages that are published to the amq.fanout exchange.
This may be readily verified by running the Pika publisher from the previous
example. In addition to the STOMP and AMQP consumers receiving any messages
that are published, messages should also be received and displayed by the Ruby
Sinatra script.
Before moving on to the next example, there are a several
points regarding the operation of RabbitHub that should be mentioned. Firstly,
it should be noted that when subscribing it is possible to specify the
subscription duration in seconds via the hub.lease_seconds parameter
in the subscription GET request. For the cURL command
shown above, a subscription duration of 1 day (86400 seconds) has been
specified. Once this duration is reached, the subscription will be terminated
by RabbitHub, and no more messages will be posted to the associated endpoint. If
no duration is specified when creating the subscription, an essentially
infinite value is used. More information regarding this and other options
supported by RabbitHub may be found at https://github.com/tonyg/rabbithub.
Another important factor that must be considered when
using RabbitHub to deliver messages via HTTP is the performance of HTTP
relative to that of AMQP. For a given network, HTTP will invariably be
considerably slower than AMQP (or indeed any of the other protocols mentioned
here), and care must be taken to ensure that HTTP endpoints are able to keep up
with message rates and will not precipitate RabbitMQ flow control to prevent
messages being published faster than they can be consumed.
Example #4:
In addition
to consuming messages via protocols other than AMQP, it is also straightforward
to publish messages via other protocols. For example, the following cURL
command may be used to publish a message via HTTP to the RabbitHub plugin that
will be delivered into all queues that are bound to the amq.fanout exchange. Assuming that all of the consumers
described in Examples #2 and #3 above were running, all would receive a copy of
the message.
curl -v -d "Hello via
HTTP" \
http://guest:guest@az1-2xl-1:55670/endpoint/x/amq.fanout?hub.topic=anything
Similarly, messages may be published via the STOMP
protocol. The following script uses the stomp.py
Python client to publish a message into RabbitMQ via the STOMP plugin that
again will be delivered into all queues bound to the amq.fanout exchange. Additional information about stomp.py may be found at http://code.google.com/p/stomppy/.
import sys
import stomp
conn = stomp.Connection([('az2-2xl-1',
61613)], 'guest', 'guest')
conn.start()
conn.connect(wait=True)
conn.send('This is a
test!', destination='/exchange/amq.fanout')
conn.disconnect()
Example #5:
This and the next example illustrate the use of the
relatively new MQTT adapter, which provides RabbitMQ with the ability to
support the MQTT 3.1 protocol. As with the plugins used for the previous
examples, it is possible via the MQTT adapter to seamlessly mix the use of MQTT
with other protocols.
The following C program illustrates a simple MQTT
consumer implemented using the Paho client API (see http://andypiper.co.uk/2012/03/10/paho-gets-started/ for a good
overview on getting stated with the Paho client). When this program is run, it
will connect to the MQTT adapter, and assuming that the client it successfully
authenticated, the adapter will create a queue for the consumer with a name
derived from the MQTT client ID and QOS (quality of service), and this queue
will be bound by default to the amq.topic exchange
with a routing key of “MQTT Examples”[5]. Once this infrastructure has been set up, the
consumer will listen for messages.
#include
"stdio.h"
#include
"stdlib.h"
#include
"string.h"
#include
"MQTTClient.h"
#define
ADDRESS
"tcp://az3-2xl-1:1883"
#define
CLIENTID "ExampleClientSub"
#define
TOPIC "MQTT Examples"
#define
QOS 1
#define
TIMEOUT 10000L
volatile
MQTTClient_deliveryToken deliveredtoken;
void
delivered(void *context, MQTTClient_deliveryToken dt)
{
printf("Message with token value %d
delivery confirmed\n", dt);
deliveredtoken = dt;
}
int
msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message
*message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
payloadptr = message->payload;
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void
connlost(void *context, char *cause)
{
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
}
int
main(int argc, char* argv[])
{
MQTTClient client;
MQTTClient_connectOptions conn_opts =
MQTTClient_connectOptions_initializer;
int rc;
int ch;
MQTTClient_create(&client, ADDRESS,
CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = "guest";
conn_opts.password = "guest";
MQTTClient_setCallbacks(client, NULL,
connlost, msgarrvd, delivered);
if ((rc = MQTTClient_connect(client,
&conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return
code %d\n", rc);
exit(-1);
}
printf("Subscribing to topic %s\nfor
client %s using QoS%d\n\n"
"Press Q<Enter> to
quit\n\n", TOPIC, CLIENTID, QOS);
MQTTClient_subscribe(client, TOPIC, QOS);
do
{
ch = getchar();
} while(ch!='Q' && ch != 'q');
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return rc;
}
If we now use the following Pika Python script to publish
a message via AMQP to the amq.topic exchange using
a routing key of “MQTT Examples”, the message
will be received and displayed by the MQTT consumer.
import pika
connection =
pika.BlockingConnection(pika.ConnectionParameters(host='az1-2xl-1'))
channel =
connection.channel()
channel.basic_publish(exchange='amq.topic',
routing_key='MQTT Examples', body='Hello World!')
connection.close()
Example #6:
This final example extends the previous example by adding
an MQTT publisher and a STOMP consumer to the configuration.
The code for the publisher (again implemented using the
Paho C API) is illustrated below. Messages will be published via the adapter to
the RabbitMQ amq.topic exchange,
whereupon they will be routed to any queues bound to the amq.topic exchange
with routing key “MQTT Examples”.
#include
"stdio.h"
#include
"stdlib.h"
#include
"string.h"
#include
"MQTTClient.h"
#define
ADDRESS
"tcp://az3-2xl-1:1883"
#define
CLIENTID "ExampleClientPub"
#define
TOPIC "MQTT Examples"
#define
PAYLOAD "Hello World!"
#define
QOS 1
#define
TIMEOUT 10000L
int
main(int argc, char* argv[])
{
MQTTClient client;
MQTTClient_connectOptions conn_opts =
MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg =
MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
int rc;
MQTTClient_create(&client, ADDRESS,
CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = "guest";
conn_opts.password = "guest";
if ((rc = MQTTClient_connect(client,
&conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return
code %d\n", rc);
exit(-1);
}
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
MQTTClient_publishMessage(client, TOPIC,
&pubmsg, &token);
printf("Waiting for up to %d seconds
for publication of %s\n"
"on topic %s for client with
ClientID: %s\n",
(int)(TIMEOUT/1000), PAYLOAD,
TOPIC, CLIENTID);
rc = MQTTClient_waitForCompletion(client,
token, TIMEOUT);
printf("Message with delivery token %d
delivered\n", token);
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return rc;
}
In addition to consuming messages via the MQTT consumer
presented in the previous example, we can also simultaneously consume these
messages via STOMP simply by changing the subscription in the consumer code for
Example #2 to specify the following topic destination. Alternatively an
explicit exchange destination could be used (see https://www.rabbitmq.com/stomp.html for
additional information).
if (STOMP_Subscribe(handle,
"/topic/MQTT Examples", 0, 0) == -1)
{
die(handle);
}
Summary
It has been
estimated that there are some 30,000 deployments of RabbitMQ across the globe
(and this number is growing rapidly, somewhat in accordance with the
reproductive rate of its mammalian namesake), handling everything from internet
pizza orders through to providing the central nervous system for
OpenStack-based cloud deployments. It may be speculated that the individuals
who chose RabbitMQ for these deployments for the most part were not concerned
about what protocol standard the software supported; this was largely an
irrelevance to most of them or at most an interesting side note. What they arguably
cared more about was that the software did the job required and did it well,
and that it was easy to use with good administrative tools, had good support
and documentation, and that there was a client API available for their language
or choice. Having standards is important from an interoperability perspective,
but it becomes less relevant in an Open Source world and where wire protocols
are fully open and readily amenable to the implementation of client API’s in any
programming language; it also becomes less relevant when the messaging product
is highly adaptable and able to natively support and map between different Open
Source messaging protocols and protocol versions in an efficient and seamless
manner.
RabbitMQ is a humble and modest polyglot that does not boast about
its ability to speak multiple languages and different dialects of the same
language[6],
but perhaps it should. The ability to be able to seamlessly receive and
propagate data via multiple protocols affords great flexibility. It means that
it is possible to use the most appropriate protocol for a particular function
or to simultaneously use different protocols to disseminate the same data to
different types of users via the most appropriate mechanism without having to
develop and maintain separate gateway components. Other than enabling the
appropriate plugin, in general all that is required is the specification a few
configuration details and potentially some level user management. For example,
while AMQP might be the best way to distribute data to users directly connected
to the corporate LAN, it may be more appropriate for remote field workers using
mobile devices to receive the same data via HTTP, which can be readily
facilitated by enabling the RabbitHub plugin and subscribing the users to the
relevant data feeds.
There are client APIs available in numerous languages for all of
the messaging protocols mentioned here (ZeroMQ, STOMP, HTTP, MQTT), and none of
the protocols is particularly complex to understand or use. While it is true
that some of these protocol plugins are still evolving and that there are some
limitations in terms of fully mapping some of the protocols to the AMQP 0.9.1
model, these matters are quite insignificant compared to the capabilities that
these plugins provide. The software is actively supported by the RabbitMQ team
and the wider RabbitMQ community, and it is very likely that support for
additional protocols will be provided in the future, along with ongoing
enhancements to existing plugins.
[1] Details of
these and other plugins can be found via the RabbitMQ web site (http://www.rabbitmq.com). Note that
in addition to the protocol plugins discussed in this document, there are also
plugins for different authentication mechanisms, federation, message replication,
management, alternative exchange types, and numerous other functions.
[2] ZeroMQ is not strictly an Open Standard; however this seems to
have had little bearing on its continued impressive rate of adoption, which is
perhaps indicative of the fact that the overall quality and usefulness of a
particular piece of Open Source software is of considerably greater importance
to its users than the standards it supports.
[3] Version 1.2
of the STOMP protocol specification was published October 22nd 2012,
and includes several updates. The API will be updated in due course to include
support for these updates.
[4] Details of
other STOMP client implementations in various languages (and in various states
of completeness) may be found at http://stomp.github.com/implementations.html. It is
hoped that the C API used for some of the examples in this document can be made
available in the not too distant future.
[5] An
alternative exchange name and various other adapter settings can be specified in
the RabbitMQ configuration file (rabbitmq.config). See http://hg.rabbitmq.com/rabbitmq-mqtt/file/default/README.md for details of the various configuration
options.
[6] AMQP 1.0 is
arguably not a dialect of earlier versions of the protocol, being a
considerable departure from earlier versions in several respects (see http://www.amqp.org for more information), and it will be most
interesting to monitor the adoption rate of 1.0 or whether developers prefer to
continue using the 0.9.1 model. The motivations behind the design and inception
of AMQP 1.0 are possibly a topic for subsequent discussion. There has already
been much discussion on this topic – see for example http://www.250bpm.com/blog:11, which provides some excellent perspectives
on the matter.