Monday, August 02, 2010

Working with RabbitMQ in Spring applications

Recently SpringSource released Spring AMQP 1.0.0.M1. Now, if you are a Spring shop working with RabbitMQ, you don't need to write low level code to connect to RabbitMQ server anymore. Instead, you can use well-known Spring abstractions (message templates and containers) to produce/consume AMQP messages, the same approach you would use for JMS. Here is my previous example re-implemented using Spring AMQP.

Very simple application classes (sender and receiver)

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class MessageSender {

@Autowired
private AmqpTemplate template;

public void send(String text) {
template.convertAndSend(text);
}
}

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class MessageHandler implements MessageListener {

@Override
public void onMessage(Message message) {
System.out.println("Received message: " + message);
}
}

and pretty standard application context

<context:annotation-config />

<bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.SingleConnectionFactory"
p:username="guest" p:password="guest" p:virtualHost="/" p:port="5672">
<constructor-arg value="lab.ndpar.com" />
</bean>

<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"
p:connectionFactory-ref="rabbitConnectionFactory"
p:routingKey="myRoutingKey"
p:exchange="myExchange" />

<bean id="messageSender" class="com.ndpar.spring.rabbitmq.MessageSender" />


<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"
p:connectionFactory-ref="rabbitConnectionFactory"
p:queueName="myQueue"
p:messageListener-ref="messageListener" />

<bean id="messageListener" class="com.ndpar.spring.rabbitmq.MessageHandler" />

That's it, simple and clean.

Resources

• Spring AMQP official page

• Source code for this blog

28 comments:

Kshitiz said...

Hi
I found your post useful. i tried running it, but getting the following errors:

Exception in thread "main" org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0'; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:169)
at org.springframework.context.support.DefaultLifecycleProcessor.access$1(DefaultLifecycleProcessor.java:154)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:335)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:143)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:108)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:908)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:428)
at org.springframework.context.support.ClassPathXmlApplicationContext.(ClassPathXmlApplicationContext.java:139)
at org.springframework.context.support.ClassPathXmlApplicationContext.(ClassPathXmlApplicationContext.java:83)
at rabbitMQSimpleApp.Sender.main(Sender.java:9)
Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitUtils.convertRabbitAccessException(RabbitUtils.java:118)
at org.springframework.amqp.rabbit.support.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:107)
at org.springframework.amqp.rabbit.listener.AbstractRabbitListeningContainer.start(AbstractRabbitListeningContainer.java:200)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:166)
... 9 more
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:121)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:145)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:548)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:71)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.createBlockingQueueConsumer(SimpleMessageListenerContainer.java:214)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.initializeConsumers(SimpleMessageListenerContainer.java:191)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:157)
at org.springframework.amqp.rabbit.listener.AbstractRabbitListeningContainer.start(AbstractRabbitListeningContainer.java:197)
... 10 more

Kshitiz said...

My applicationcontext.xml has only these lines changed:

bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.SingleConnectionFactory"
p:username="guest" p:password="guest" p:port="5672">

constructor-arg value="localhost"

Andrey Paramonov said...

It looks like the queue does not exist. Can you execute the following command on the box where RabbitMQ server is running

sudo rabbitmqctl list_queues

and make sure you see 'myQueue' queue in the output?

Kshitiz said...

Thanks Andrey, that might be the cause of the problem. But I have to create the queue programatically. I have referred the sample hello world application example given with spring amqp framework. But since they have written so much code in java files, I am now trying to transfer all the configuration code from java to xml file. May be you can help me in that direction. Actually, I have a doubt with the ConsumerConfiguration class. They have defined listenerContainer() in ConsumerConfiguration class which is returning a SimpleMessageListenerContainer object. Now I am not able to find out that who is using that SimpleMessageListenerContainer object in the hierarchy of ConsumerConfiguration class? I have to map this object to something in my xml config file. After running my producer code, message is correctly being sent to the Broker but nothing happens when I run my Consumer.

Here is the code i am using:

Producer.java
==============
package rabbitMQSampleApp1;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ConfigurableApplicatio nContext;
import org.springframework.context.support.ClassPathXmlAp plicationContext;

public class Producer {

public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = new ClassPathXmlApplicationContext("rabbitMQSampleApp1/rabbitConfiguration.xml");
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Message Sent");
}

}


ConsumerHandler.java
======================
package rabbitMQSampleApp1;

public class ConsumerHandler {
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}

Kshitiz said...

Consumer.java
===============
package rabbitMQSampleApp1;

import org.springframework.context.annotation.AnnotationC onfigApplicationContext;

public class Consumer {

public static void main(String[] args) {
new AnnotationConfigApplicationContext("rabbitMQSample App1/rabbitConfiguration.xml");
}
}

rabbitConfiguration.xml
========================
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schem...-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

<!-- Define a connectionFactory -->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection. SingleConnectionFactory">
<constructor-arg value="localhost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>

<!-- Tell the AMQP Admin about that connectionFactory -->
<bean id="amqpAdmin" class="org.springframework.amqp.rabbit.core.Rabbit Admin">
<constructor-arg ref="connectionFactory"/>
</bean>

<!-- Since the AMQP Admin knows about the connectionFactory,
we can create a queue on Rabbit Broker using the RabbitTemplate provided by Spring framework-Rabbit APIs -->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.Rabbit Template"
p:connectionFactory-ref="connectionFactory" p:queue="hello.world.queue" />

<!-- With the help of all the configurations mentioned above this line, a producer can simply use an
amqp template in its java file to publish anything on the that queue only ??
Question is that how and why an amqp template objects sends the notification only on that queue? -->


<!-- Below are the consumer settings -->

<!-- make one consumer handler -->
<bean id="consumerHandler" class="rabbitMQSampleApp1.ConsumerHandler" />

<!-- pass a new instance of that consumer handler as a constructor argument to a messageListenerAdaptor object-->
<bean id="messageListenerAdaptor" class="org.springframework.amqp.rabbit.listener.ad apter.MessageListenerAdapter">
<constructor-arg ref="consumerHandler"/>
</bean>

<!-- make an object of SimpleMessageListenerContainer -->
<bean class="org.springframework.amqp.rabbit.listener.Si mpleMessageListenerContainer"
p:connectionFactory-ref="connectionFactory" p:queueName="hello.world.queue"
p:messageListener-ref="messageListenerAdaptor" />

<!-- What to do with the object of SimpleMessageListenerContainer -->

</beans>

Andrey Paramonov said...

SimpleMessageListenerContainer is a Spring infrastructural bean, you shouldn't inject it to any of your classes. Instead, you need to pass your message listener to it, which you already do.

To learn how to create queue/exchange/binding programmatically please take a look at Admin class and corresponding application context.

Hope it helps.

Kshitiz said...

Thanks a lot Andrey,

I am able to run the sample now, the only doubt I have now is about my Producer class.

Producer.java
=============
public class Producer {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = new ClassPathXmlApplicationContext("rabbitMQSampleApp/rabbitConfiguration.xml");
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);

/* We have retrieved an amqpTemplate from the context.
* Producer can simply use that amqp template to publish anything.
* Question is that how and why this amqp template object
* sends the notification only on queue which we have declared through Admin class , i.e. "myQueue"
* where is this mapping present ? */

amqpTemplate.convertAndSend("Hello World");
System.out.println("Message Sent");
}
}

Andrey Paramonov said...

Template sends messages to the exchange, not to the queue. Then RabbitMQ server dispatches the message to appropriate queue(s)
based on the type of the exchange and routing key. And the "mapping" between exchange and queue is done in Admin:

Binding binding = new Binding(queue, exchange, "myRoutingKey");
admin.declareBinding(binding);

Kshitiz said...

Thanks a lot Andrey, I have understood it completely.. :)

Actually I am trying to create one notification engine (PubSub model). Do you think this model will work and is scalable and extensible?

Restful APIs <--> Spring Integration(lighweight ESBs like framework) <--> ActiveMQ (+MySql for message persistence)

Andrey Paramonov said...

I don't see RabbitMQ in your list :) but anyways...

It's hard to answer your question. Since it's too generic, the answer will be generic too: Keep your design simple, use proven technologies, follow design principles, and your application will be scalable and extensible.

Andrey Paramonov said...

I've changed the source repository for this post to http://github.com/ndpar/rabbitmq-spring-demo

Kshitiz said...

Hi Andrey,

I am back on using RabbitMQ as it seems cloud ready. This is my flow now:

Restful APIs <--> Spring Integration(tentative) <--> Spring AMQP <--> RabbitMQ

Will be needing help from you ;)

Kshitiz said...

Hi Andrey,

I am trying to run a simple 'fanout' app. I am not able to find out that how does a listener configures the binding of its queue to an existing 'fanout' exchange. As I understand, there are two ways to listen to a message:

=============
1. Implement MessageListener and override the onMessage method along with the configuration of SimpleMessageListenerContainer (where there is no mention of exchange & only the queueName can be set).

2. Use the RabbitTemplate's receiveAndConvert() method along with the configuration of RabbitTemplate (where there is no mention of queue & only the exchange can be set).

============

So the question is where and how do we bind the exchange (on which a published message has been sent) to the consumer/listener's queue?

Thanks,
Kshitiz

Andrey Paramonov said...

Hi Kshitiz,

I guess your question is how to create exchange, queue and binding declaratively. If so, add the following snippet to your app context. It will create fanout and queue and bind them:

<bean id="fanout" class="org.springframework.amqp.core.FanoutExchange">
<constructor-arg value="fanout.name" />
</bean>

<bean id="queue" class="org.springframework.amqp.core.Queue">
<constructor-arg value="queue.name" />
</bean>

<bean id="binding" class="org.springframework.amqp.core.Binding">
<constructor-arg index="0" ref="queue" />
<constructor-arg index="1" ref="fanout" />
</bean>

<bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
<constructor-arg ref="rabbitConnectionFactory" />
</bean>

- Andrey -

Kshitiz said...

Thanks Andrey,

So we have to bind the exchange and queues at the broker level. does that mean we should know in advance that how many consumers are sitting out there and on which queue(s) are they listening?

Kshitiz

Andrey Paramonov said...

No, you can bind any exchange to any queue, even if some of the are already bound.

Kshitiz said...

Hi Andrey,

Well please forgive me If I am not understanding it. I have to publish a message and that should be delivered to each subscriber for that message. Now, what are the steps I should take after creating an exchange and send a message to that exchange..

1. Should I keep some queues (bounded to that exchange) handy so that a consumer binds to one of those handy queues and get the message?
In this case, I should be knowing how many consumers are going to get that message and make those many queues handy at the broker level..

OR

2. Create a queue, and bind to that queue in the consumer's code
but I don't know how to do that as I understand, there are two ways to listen to a message:
Either implement MessageListener and override the onMessage method along with the configuration of SimpleMessageListenerContainer (where there is no mention of exchange & only the queueName can be set).
Or use the RabbitTemplate's receiveAndConvert() method along with the configuration of RabbitTemplate (where there is no mention of queue & only the exchange can be set).

Please let me know about the other ways I should do it..

Thanks,
Kshitiz

Kshitiz said...

Even if I have to make as many queues as there are consumers (since I want that the message published should be available to each consumer), I am not able to get the messages consumed at the consumer end.

My application is a war file deployed in Tomcat and there is no exception as such. Interestingly, a separate java application (as urs) is working fine.

I have implemented MessageListener and overriden the onMessage method along with the configuration of SimpleMessageListenerContainer.

Do I need to acknowledge the message or something else to consume it properly?

Kshitiz said...

My consumer configurations and java files are exactly like yours !

Andrey Paramonov said...

@Kshitiz I assume it works now.

Kshitiz said...

nope Andrey, Please read my last 3 comments...I am still struggling to get the message consumed under Tomcat

Kshitiz said...

Hi Andrey,

It's working now. The next milestone I have to achieve is a faster consumption of each message

The scenario is like this:

I have to send emails/SMSes to different people after consuming the messages internally.

How should I go about it.

My requirement is actually neither a direct or a topic posting. it's like post a message and consume it faster by having multiple consumers ready. The message should be consumed only by one consumer...


Should i make a no. of consumers using a shared rabbitmq queue in which each consumer is capable of consuming the message posted on the exchange and then passes it to a new thread and becomes free.

But having a single queue might hamper the performance. is there any other way ? what do you suggest?

Andrey Paramonov said...

@Kshitiz What you described is called work distribution pattern.

For performance I don't have concrete numbers. Try to implement your solution, and see if the performance is good for you.

Kshitiz said...

Thanks for the directions Andrey :)

Unknown said...

Hi,
I have following configuration




























I am getting following error

channel error; reason: {#method(reply-code=404,reply-text=NOT_FOUND - no queue 'queue.name' in vhost '/',class-id=50,method-id=10),null,""}
can any one reply

Shane said...

Andrey
I want to programmaticaly set Listeners on the ListenerContainer is this possible?

Andrey Paramonov said...

@Shane

Yes, it's possible. Just look up your ListenerContainer from the ApplicationContext, and call setMessageListener method.

Manoj Arya said...

Simple and efficient !! works :)