Sunday, March 14, 2010

Get started with RabbitMQ

RabbitMQ is an open-source implementation of AMQP. If you don't know what AMQP is, I encourage you to check it out on the official web site, or alternatively read articles listed on the reference page. Here I want to mention only the reasons why it drew my attention as an Erlang enthusiast and Java developer working in financial industry:
  • AMQP is a replacement for TIBCO Randezvous;
  • in terms of functionality it's a superset of JMS;
  • it's written in Erlang, which means fault-tolerance, reliability and high performance.

In this blog post I just want to show how to install RabbitMQ on Ubuntu box, and verify that it works with simple Groovy client.

Installing RabbitMQ server


As everything with Ubuntu, this step is pretty trivial:

$ sudo apt-get install rabbitmq-server

The only requirement for this package is Erlang distribution. If you already have Erlang installed on your system, the installation of rabbitmq-server is a quick procedure. The following directories will be created during the installation:


/usr/lib/rabbitmq/binexecutables added to the path
/usr/lib/erlang/lib/rabbitmq_server-1.x.xcompiled modules
/var/lib/rabbitmq/mnesiapersistent storage for messages
/var/log/rabbitmqlog files (e.g. startup_log, rabbit.log)

After installation is finished the RabbitMQ server is started and listens to incoming requests on port 5672. You can check /var/log/rabbitmq/startup_log file to see if everything was ok.

Groovy clients


I followed official Java client API to build two scripts: consumer.groovy
import com.rabbitmq.client.*

@Grab(group='com.rabbitmq', module='amqp-client', version='1.7.2')
params = new ConnectionParameters(
username: 'guest',
password: 'guest',
virtualHost: '/',
requestedHeartbeat: 0
)
factory = new ConnectionFactory(params)
conn = factory.newConnection('lab.ndpar.com', 5672)
channel = conn.createChannel()

exchangeName = 'myExchange'; queueName = 'myQueue'

channel.exchangeDeclare exchangeName, 'direct'
channel.queueDeclare queueName
channel.queueBind queueName, exchangeName, 'myRoutingKey'

def consumer = new QueueingConsumer(channel)
channel.basicConsume queueName, false, consumer

while (true) {
delivery = consumer.nextDelivery()
println "Received message: ${new String(delivery.body)}"
channel.basicAck delivery.envelope.deliveryTag, false
}
channel.close()
conn.close()

and publisher.groovy
import com.rabbitmq.client.*

@Grab(group='com.rabbitmq', module='amqp-client', version='1.7.2')
params = new ConnectionParameters(
username: 'guest',
password: 'guest',
virtualHost: '/',
requestedHeartbeat: 0
)
factory = new ConnectionFactory(params)
conn = factory.newConnection('lab.ndpar.com', 5672)
channel = conn.createChannel()

channel.basicPublish 'myExchange', 'myRoutingKey', null, "Hello, world!".bytes

channel.close()
conn.close()

Now start consumer in one terminal window
$ groovy consumer.groovy

and run publisher in another:
$ groovy publisher.groovy

On the consumer window you should see Received message: Hello, world! text, which means RabbitMQ works correctly.

Monitoring logs


You can check RabbitMQ logs by doing tail -f /var/log/rabbitmq/rabbit.log For example, starting the consumer results the following log entries:
=INFO REPORT==== 14-Mar-2010::11:20:53 ===
accepted TCP connection on 0.0.0.0:5672 from 192.168.2.10:62424

=INFO REPORT==== 14-Mar-2010::11:20:53 ===
starting TCP connection <0.24154.1> from 192.168.2.10:62424

Running the publisher:
=INFO REPORT==== 14-Mar-2010::11:22:08 ===
accepted TCP connection on 0.0.0.0:5672 from 192.168.2.10:62432

=INFO REPORT==== 14-Mar-2010::11:22:08 ===
starting TCP connection <0.24232.1> from 192.168.2.10:62432

=INFO REPORT==== 14-Mar-2010::11:22:08 ===
closing TCP connection <0.24232.1> from 192.168.2.10:62432

Now if we terminate the consumer by ^C there will be a warning
=WARNING REPORT==== 14-Mar-2010::11:25:03 ===
exception on TCP connection <0.24154.1> from 192.168.2.10:62424
connection_closed_abruptly

=INFO REPORT==== 14-Mar-2010::11:25:03 ===
closing TCP connection <0.24154.1> from 192.168.2.10:62424

but the connection is closed properly by the server.

That's it for now. Stay tuned for the future updates on my RabbitMQ experience.

Links


• Rapid application prototyping with Groovy DSL

4 comments:

Wolfgang Schell said...

Hi Andrey,

thanks for this nice tutorial.

I tried it with the newly released RabbitMQ 2.0 and had to adjust some parts due to API changes in the corresponding java client.

The connection part now looks like this:

@Grab(group='com.rabbitmq', module='amqp-client', version='2.0.0')
import com.rabbitmq.client.*

params = [
username: 'guest',
password: 'guest',
virtualHost: '/',
requestedHeartbeat: 0
]
factory = new ConnectionFactory(params)
conn = factory.newConnection(['localhost', 5672] as Address)
channel = conn.createChannel()


Note that class ConnectionParameters no longer exists and ConnectionFactory.newConnection() expects an Address as parameter.


When declaring a queue, some more parameters are necessary as well:


channel.queueDeclare queueName, false, false, true, [:]


Otherwise the scripts run just fine.

Thanks again,

Wolfgang

Andrey Paramonov said...

Hi Wolfgang, thank you for the update!

Matt Passell said...

Thanks for the nice little example!

I've updated the code to work with the latest Java client and posted it as a Gist on GitHub - https://gist.github.com/3801945

Andrey Paramonov said...

Thanks Matt!