Spring - RabbitMQ
Overview
Spring with the RabbitMQ message broker, which implements the Advanced Message Queuing Protocol (AMQP).
Github: https://github.com/gitorko/project78
RabbitMQ
Exchanges are like post offices or mailboxes: clients publish messages to an AMQP exchange, which routes them to queues. There are four built-in exchange types:
- Direct Exchange – Routes messages to a queue by matching a complete routing key
- Fanout Exchange – Routes messages to all the queues bound to it
- Topic Exchange – Routes messages to multiple queues by matching a routing key to a pattern
- Headers Exchange – Routes messages based on message headers
Queues are bound to an exchange using a routing key. Messages are sent to an exchange with a routing key. AMQP (Advanced Message Queuing Protocol) is an open standard wire specification for asynchronous message communication. It provides a platform-neutral binary protocol standard, so, unlike JMS, it can be used across different environments and programming languages.
A remote procedure call (RPC) is a way of invoking a function on another computer and waiting for the result. The call is synchronous and blocking in nature, so the client waits for the response.
Code
Queue to send and receive messages
1package com.demo.project78.queue;
2
3import com.demo.project78.config.AmqpConfig;
4import org.springframework.amqp.core.Queue;
5import org.springframework.amqp.core.QueueBuilder;
6import org.springframework.context.annotation.Bean;
7import org.springframework.context.annotation.Configuration;
8
9@Configuration
10public class QueueConfig {
11
12 @Bean
13 public Queue simpleQueue() {
14 return QueueBuilder.durable(AmqpConfig.SIMPLE_QUEUE).build();
15 }
16
17}
18
19
20
21
1package com.demo.project78.queue;
2
3import com.demo.project78.config.AmqpConfig;
4import com.demo.project78.domain.Customer;
5import lombok.RequiredArgsConstructor;
6import lombok.extern.slf4j.Slf4j;
7import org.springframework.amqp.rabbit.core.RabbitTemplate;
8import org.springframework.stereotype.Component;
9
10@Component
11@Slf4j
12@RequiredArgsConstructor
13public class QueueSender {
14
15 private final RabbitTemplate rabbitTemplate;
16
17 public void send(Customer customer) {
18 rabbitTemplate.convertAndSend(AmqpConfig.SIMPLE_QUEUE, customer);
19 log.info("Sent to {} : {}", AmqpConfig.SIMPLE_QUEUE, customer);
20 }
21}
1package com.demo.project78.queue;
2
3import com.demo.project78.config.AmqpConfig;
4import com.demo.project78.domain.Customer;
5import lombok.extern.slf4j.Slf4j;
6import org.springframework.amqp.rabbit.annotation.RabbitListener;
7import org.springframework.stereotype.Component;
8
9@Component
10@Slf4j
11public class QueueReceiver {
12
13 @RabbitListener(queues = AmqpConfig.SIMPLE_QUEUE, containerFactory = "rabbitListenerContainerFactory")
14 public void receive(Customer customer) {
15 log.info("{} Received: {}", AmqpConfig.SIMPLE_QUEUE, customer);
16
17 //Simulate a failure on processing
18 if (customer.getName().equals("NO_NAME")) {
19 throw new RuntimeException("No customer name!");
20 }
21 }
22}
Direct exchange with routing key
1package com.demo.project78.exchange;
2
3import static com.demo.project78.config.AmqpConfig.DIRECT_ROUTING_KEY;
4
5import com.demo.project78.config.AmqpConfig;
6import org.springframework.amqp.core.Binding;
7import org.springframework.amqp.core.BindingBuilder;
8import org.springframework.amqp.core.Exchange;
9import org.springframework.amqp.core.ExchangeBuilder;
10import org.springframework.amqp.core.Queue;
11import org.springframework.amqp.core.QueueBuilder;
12import org.springframework.context.annotation.Bean;
13import org.springframework.context.annotation.Configuration;
14
15@Configuration
16public class ExchangeConfig {
17
18 @Bean
19 public Queue directQueue() {
20 return QueueBuilder.durable(AmqpConfig.DIRECT_QUEUE).build();
21 }
22
23
24 @Bean
25 public Exchange directExchange() {
26 /**
27 * DirectExchange - Routes messages with a routing key that exactly matches the binding key of a queue.
28 * routing logic is based on exact matches
29 */
30 return ExchangeBuilder.directExchange(AmqpConfig.DIRECT_EXCHANGE).durable(true).build();
31 }
32
33 @Bean
34 Binding binding(Queue directQueue, Exchange directExchange) {
35 return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY).noargs();
36 }
37
38}
1package com.demo.project78.exchange;
2
3import com.demo.project78.config.AmqpConfig;
4import com.demo.project78.domain.Customer;
5import lombok.RequiredArgsConstructor;
6import lombok.extern.slf4j.Slf4j;
7import org.springframework.amqp.rabbit.core.RabbitTemplate;
8import org.springframework.stereotype.Component;
9
10@Component
11@Slf4j
12@RequiredArgsConstructor
13public class ExchangeSender {
14
15 final RabbitTemplate rabbitTemplate;
16
17 public void send(Customer customer, String routingKey) {
18 rabbitTemplate.convertAndSend(AmqpConfig.DIRECT_EXCHANGE, routingKey, customer);
19 log.info("Sent to {} with Key: {}, {}", AmqpConfig.DIRECT_EXCHANGE, routingKey, customer);
20 }
21}
1package com.demo.project78.exchange;
2
3import com.demo.project78.config.AmqpConfig;
4import com.demo.project78.domain.Customer;
5import lombok.extern.slf4j.Slf4j;
6import org.springframework.amqp.rabbit.annotation.RabbitListener;
7import org.springframework.stereotype.Component;
8
9@Component
10@Slf4j
11public class ExchangeReceiver {
12
13 @RabbitListener(queues = AmqpConfig.DIRECT_QUEUE)
14 public void receive(Customer customer) {
15 log.info("{} Received {}", AmqpConfig.DIRECT_QUEUE, customer);
16 }
17
18}
Fanout exchange
1package com.demo.project78.fanout;
2
3import com.demo.project78.config.AmqpConfig;
4import org.springframework.amqp.core.BindingBuilder;
5import org.springframework.amqp.core.Declarables;
6import org.springframework.amqp.core.FanoutExchange;
7import org.springframework.amqp.core.Queue;
8import org.springframework.context.annotation.Bean;
9import org.springframework.context.annotation.Configuration;
10
11import static org.springframework.amqp.core.BindingBuilder.bind;
12
13@Configuration
14public class FanoutExchangeConfig {
15
16 @Bean
17 public Declarables fanoutBindings() {
18 Queue fanoutQueue1 = new Queue(AmqpConfig.FANOUT_QUEUE_1, false);
19 Queue fanoutQueue2 = new Queue(AmqpConfig.FANOUT_QUEUE_2, false);
20 FanoutExchange fanoutExchange = new FanoutExchange(AmqpConfig.FANOUT_EXCHANGE);
21
22 return new Declarables(
23 fanoutQueue1,
24 fanoutQueue2,
25 fanoutExchange,
26 bind(fanoutQueue1).to(fanoutExchange),
27 BindingBuilder.bind(fanoutQueue2).to(fanoutExchange));
28 }
29}
1package com.demo.project78.fanout;
2
3import com.demo.project78.config.AmqpConfig;
4import com.demo.project78.domain.Customer;
5import lombok.RequiredArgsConstructor;
6import lombok.extern.slf4j.Slf4j;
7import org.springframework.amqp.rabbit.core.RabbitTemplate;
8import org.springframework.stereotype.Component;
9
10@Component
11@Slf4j
12@RequiredArgsConstructor
13public class FanoutSender {
14
15 private final RabbitTemplate rabbitTemplate;
16
17 public void send(Customer customer, String routingKey) {
18 rabbitTemplate.convertAndSend(AmqpConfig.FANOUT_EXCHANGE, routingKey, customer);
19 //routing key doesnt matter
20 log.info("Sent to {} with Key: {}, {}", AmqpConfig.FANOUT_EXCHANGE, routingKey, customer);
21 }
22}
1package com.demo.project78.fanout;
2
3import com.demo.project78.config.AmqpConfig;
4import com.demo.project78.domain.Customer;
5import lombok.extern.slf4j.Slf4j;
6import org.springframework.amqp.rabbit.annotation.RabbitListener;
7import org.springframework.stereotype.Component;
8
9@Component
10@Slf4j
11public class FanoutReceiver {
12
13 @RabbitListener(queues = AmqpConfig.FANOUT_QUEUE_1)
14 public void receive1(Customer customer) {
15 log.info("{} Received {}", AmqpConfig.FANOUT_QUEUE_1, customer);
16 }
17
18 @RabbitListener(queues = AmqpConfig.FANOUT_QUEUE_2)
19 public void receive2(Customer customer) {
20 log.info("{} Received {}", AmqpConfig.FANOUT_QUEUE_2, customer);
21 }
22}
Topic exchange
1package com.demo.project78.topic;
2
3import com.demo.project78.config.AmqpConfig;
4import org.springframework.amqp.core.BindingBuilder;
5import org.springframework.amqp.core.Declarables;
6import org.springframework.amqp.core.ExchangeBuilder;
7import org.springframework.amqp.core.Queue;
8import org.springframework.amqp.core.QueueBuilder;
9import org.springframework.amqp.core.TopicExchange;
10import org.springframework.context.annotation.Bean;
11import org.springframework.context.annotation.Configuration;
12
13@Configuration
14public class TopicExchangeConfig {
15
16 @Bean
17 public Declarables topicBindings() {
18 Queue topicQueue1 = QueueBuilder.durable(AmqpConfig.TOPIC_QUEUE_1).build();
19 Queue topicQueue2 = QueueBuilder.durable(AmqpConfig.TOPIC_QUEUE_2).build();
20
21 TopicExchange topicExchange = ExchangeBuilder.topicExchange(AmqpConfig.TOPIC_EXCHANGE).durable(true).build();
22
23 return new Declarables(
24 topicQueue1,
25 topicQueue2,
26 topicExchange,
27 BindingBuilder
28 .bind(topicQueue1)
29 .to(topicExchange).with("*.booking.*"),
30 BindingBuilder
31 .bind(topicQueue2)
32 .to(topicExchange).with("#.reward"));
33 }
34}
1package com.demo.project78.topic;
2
3import com.demo.project78.config.AmqpConfig;
4import com.demo.project78.domain.Customer;
5import lombok.RequiredArgsConstructor;
6import lombok.extern.slf4j.Slf4j;
7import org.springframework.amqp.rabbit.core.RabbitTemplate;
8import org.springframework.stereotype.Component;
9
10@Component
11@Slf4j
12@RequiredArgsConstructor
13public class TopicSender {
14
15 private final RabbitTemplate rabbitTemplate;
16
17 public void send(Customer customer, String routingKey) {
18 rabbitTemplate.convertAndSend(AmqpConfig.TOPIC_EXCHANGE, routingKey, customer);
19 log.info("Sent to {} with Key: {}, {}", AmqpConfig.TOPIC_EXCHANGE, routingKey, customer);
20 }
21}
1package com.demo.project78.topic;
2
3import com.demo.project78.config.AmqpConfig;
4import com.demo.project78.domain.Customer;
5import lombok.extern.slf4j.Slf4j;
6import org.springframework.amqp.rabbit.annotation.RabbitListener;
7import org.springframework.stereotype.Component;
8
9@Component
10@Slf4j
11public class TopicReceiver {
12
13 @RabbitListener(queues = AmqpConfig.TOPIC_QUEUE_1)
14 public void receive1(Customer customer) {
15 log.info("[Processor1] {} Received {}", AmqpConfig.TOPIC_QUEUE_1, customer);
16 }
17
18 @RabbitListener(queues = AmqpConfig.TOPIC_QUEUE_2)
19 public void receive2(Customer customer) {
20 log.info("[Processor2] {} Received {}", AmqpConfig.TOPIC_QUEUE_2, customer);
21 }
22}
Error handling & Exchange creation
1package com.demo.project78.config;
2
3import com.fasterxml.jackson.databind.ObjectMapper;
4import lombok.extern.slf4j.Slf4j;
5import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
6import org.springframework.amqp.rabbit.connection.ConnectionFactory;
7import org.springframework.amqp.rabbit.core.RabbitTemplate;
8import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
9import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
10import org.springframework.context.annotation.Bean;
11import org.springframework.context.annotation.Configuration;
12import org.springframework.util.ErrorHandler;
13
14@Configuration
15@Slf4j
16public class AmqpConfig {
17
18 public static final String SIMPLE_QUEUE = "project78.simple.queue";
19
20 public static final String DIRECT_EXCHANGE = "project78.direct.exchange";
21 public static final String DIRECT_QUEUE = "project78.direct.queue";
22 public static final String DIRECT_ROUTING_KEY = "project78.direct.key";
23
24 public static final String RPC_EXCHANGE = "project78.rpc.exchange";
25 public static final String RPC_QUEUE = "project78.rpc.queue";
26 public static final String RPC_KEY = "project78.rpc.key";
27
28 public static final String FANOUT_QUEUE_1 = "project78.fanout.queue1";
29 public static final String FANOUT_QUEUE_2 = "project78.fanout.queue2";
30 public static final String FANOUT_EXCHANGE = "project78.fanout.exchange";
31 public static final String FANOUT_KEY1 = "*.fan-key1.*";
32 public static final String FANOUT_KEY2 = "*.fan-key2.*";
33
34 public static final String TOPIC_QUEUE_1 = "project78.topic.booking";
35 public static final String TOPIC_QUEUE_2 = "project78.topic.reward";
36 public static final String TOPIC_EXCHANGE = "project78.topic.exchange";
37
38 static final ObjectMapper objectMapper = new ObjectMapper();
39
40 @Bean
41 public Jackson2JsonMessageConverter jsonMessageConverter() {
42 return new Jackson2JsonMessageConverter(objectMapper);
43 }
44
45 @Bean
46 public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
47 final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
48 rabbitTemplate.setMessageConverter(jsonMessageConverter());
49 return rabbitTemplate;
50 }
51
52 /**
53 * Without setting the error handle the failed message getting re-queued will cause infinite loops
54 */
55 @Bean
56 public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
57 Jackson2JsonMessageConverter jsonMessageConverter,
58 ErrorHandler errorHandler) {
59 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
60 factory.setConnectionFactory(connectionFactory);
61 factory.setMessageConverter(jsonMessageConverter);
62 factory.setErrorHandler(errorHandler);
63 //Not to re-queue
64 factory.setDefaultRequeueRejected(false);
65 return factory;
66 }
67
68 @Bean
69 public ErrorHandler errorHandler() {
70 return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
71 }
72
73 public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
74 @Override
75 public boolean isFatal(Throwable t) {
76 /**
77 * Check exception and decide to re-queue or not.
78 * default is set to not re-queue in spring application property default-requeue-rejected: false
79 */
80 return super.isFatal(t);
81 }
82 }
83}
84
RPC
1package com.demo.project78.rpc.config;
2
3import com.demo.project78.config.AmqpConfig;
4import org.springframework.amqp.core.Binding;
5import org.springframework.amqp.core.BindingBuilder;
6import org.springframework.amqp.core.Exchange;
7import org.springframework.amqp.core.ExchangeBuilder;
8import org.springframework.amqp.core.Queue;
9import org.springframework.amqp.core.QueueBuilder;
10import org.springframework.context.annotation.Bean;
11import org.springframework.context.annotation.Configuration;
12
13@Configuration
14public class RpcConfig {
15
16 @Bean
17 public Queue rpcQueue() {
18 return QueueBuilder.durable(AmqpConfig.RPC_QUEUE).build();
19 }
20
21 @Bean
22 Exchange rpcExchange() {
23 return ExchangeBuilder.directExchange(AmqpConfig.RPC_EXCHANGE).durable(true).build();
24 }
25
26 @Bean
27 Binding binding(Queue rpcQueue, Exchange rpcExchange) {
28 return BindingBuilder.bind(rpcQueue).to(rpcExchange).with(AmqpConfig.RPC_KEY).noargs();
29 }
30
31}
32
1package com.demo.project78.rpc.client;
2
3import com.demo.project78.config.AmqpConfig;
4import com.demo.project78.domain.Customer;
5import lombok.RequiredArgsConstructor;
6import lombok.extern.slf4j.Slf4j;
7import org.springframework.amqp.rabbit.core.RabbitTemplate;
8import org.springframework.stereotype.Component;
9
10@Component
11@RequiredArgsConstructor
12@Slf4j
13public class RpcClient {
14
15 private final RabbitTemplate rabbitTemplate;
16
17 public String send(Customer customer, String routingKey) {
18 rabbitTemplate.setReplyTimeout(60000);
19 log.info("RPC Call with key: {} Sent to Exchange: {}", routingKey, customer);
20 String response = (String) rabbitTemplate.convertSendAndReceive(AmqpConfig.RPC_EXCHANGE, routingKey, customer);
21 log.info("RPC Call got '{}'", response);
22 return response;
23 }
24
25}
1package com.demo.project78.rpc.server;
2
3import com.demo.project78.config.AmqpConfig;
4import com.demo.project78.domain.Customer;
5import lombok.SneakyThrows;
6import lombok.extern.slf4j.Slf4j;
7import org.springframework.amqp.rabbit.annotation.RabbitListener;
8import org.springframework.stereotype.Component;
9
10import java.util.concurrent.TimeUnit;
11
12@Component
13@Slf4j
14public class RpcServer {
15
16 @SneakyThrows
17 @RabbitListener(queues = AmqpConfig.RPC_QUEUE)
18 public String receive(Customer customer) {
19 log.info("{} Received {}", AmqpConfig.RPC_QUEUE, customer);
20 TimeUnit.SECONDS.sleep(2);
21 return "Hello world, " + customer.getName();
22 }
23
24}
Rest
1package com.demo.project78.controller;
2
3import com.demo.project78.config.AmqpConfig;
4import com.demo.project78.domain.Customer;
5import com.demo.project78.exchange.ExchangeSender;
6import com.demo.project78.fanout.FanoutSender;
7import com.demo.project78.queue.QueueSender;
8import com.demo.project78.rpc.client.RpcClient;
9import com.demo.project78.topic.TopicSender;
10import lombok.RequiredArgsConstructor;
11import org.springframework.http.ResponseEntity;
12import org.springframework.web.bind.annotation.GetMapping;
13import org.springframework.web.bind.annotation.RestController;
14
15@RestController
16@RequiredArgsConstructor
17public class HomeController {
18
19 final QueueSender queueSender;
20 final ExchangeSender exchangeSender;
21 final FanoutSender fanoutSender;
22 final TopicSender topicSender;
23 final RpcClient rpcClient;
24
25 @GetMapping("/simple")
26 public ResponseEntity simple() {
27 Customer customer = Customer.builder()
28 .name("Jack")
29 .age(35)
30 .build();
31 //Simple object sent to queue
32 queueSender.send(customer);
33 return ResponseEntity.ok().build();
34 }
35
36 @GetMapping("/direct")
37 public ResponseEntity directExchange() {
38 Customer customer = Customer.builder()
39 .name("Adam")
40 .age(40)
41 .build();
42 //Will be received by the queue
43 exchangeSender.send(customer, AmqpConfig.DIRECT_ROUTING_KEY);
44 //Will not be received by any queue
45 exchangeSender.send(customer, "");
46 return ResponseEntity.ok().build();
47 }
48
49 @GetMapping("/fanout")
50 public ResponseEntity fanOut() {
51 Customer customer1 = Customer.builder()
52 .name("Raj")
53 .age(30)
54 .build();
55 Customer customer2 = Customer.builder()
56 .name("David")
57 .age(32)
58 .build();
59
60 //All queue registered will receive message irrespective of routing key
61 fanoutSender.send(customer1, AmqpConfig.FANOUT_KEY1);
62 fanoutSender.send(customer2, AmqpConfig.FANOUT_KEY2);
63 return ResponseEntity.ok().build();
64 }
65
66 @GetMapping("/topic")
67 public ResponseEntity topicExchange() {
68 Customer customer = Customer.builder()
69 .name("David")
70 .age(32)
71 .build();
72 //This will goto the booking queue only.
73 topicSender.send(customer, "status.booking.confirmed");
74 //This will goto both the queue as it has booking + reward key
75 topicSender.send(customer, "status.booking.reward");
76 return ResponseEntity.ok().build();
77 }
78
79 @GetMapping("/error")
80 public ResponseEntity error() {
81 Customer customer = Customer.builder()
82 .name("NO_NAME")
83 .age(35)
84 .build();
85 queueSender.send(customer);
86 return ResponseEntity.ok().build();
87 }
88
89 @GetMapping("/rpc")
90 public ResponseEntity<String> rpc() {
91 Customer customer = Customer.builder()
92 .name("Adam")
93 .age(40)
94 .build();
95 String response = rpcClient.send(customer, AmqpConfig.RPC_KEY);
96 return ResponseEntity.ok(response);
97 }
98}
Postman
Import the Postman collection into Postman.
Setup
1# Project 78
2
3Spring & RabbitMQ
4
5[https://gitorko.github.io/spring-amqp/](https://gitorko.github.io/spring-amqp/)
6
7### Version
8
9Check version
10
11```bash
12$java --version
13openjdk 21.0.3 2024-04-16 LTS
14```
15
16### RabbitMQ
17
18Run the docker command to start a rabbitmq instance
19
20```bash
21docker run -d --hostname my-rabbit --name my-rabbit -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest -p 8085:15672 -p 5672:5672 rabbitmq:3-management
22```
23
24Open the rabbitmq console
25
26[http://localhost:8085](http://localhost:8085)
27
28```
29user:guest
30pwd: guest
31```
32
33### Dev
34
35```bash
36./gradlew bootRun
37```
References
https://spring.io/projects/spring-amqp
https://www.rabbitmq.com/tutorials/tutorial-six-spring-amqp.html