Spring - RabbitMQ

Overview

Spring with RabbitMQ message broker that implements Advanced Message Queuing Protocol(AMQP)

Github: https://github.com/gitorko/project78

RabbitMQ

Exchanges are like post offices or mailboxes and clients publish a message to an AMQP exchange. There are four built-in exchange types

  1. Direct Exchange – Routes messages to a queue by matching a complete routing key
  2. Fanout Exchange – Routes messages to all the queues bound to it
  3. Topic Exchange – Routes messages to multiple queues by matching a routing key to a pattern
  4. Headers Exchange – Routes messages based on message headers

Queues are bound to an exchange using a routing key. Messages are sent to an exchange with a routing key. AMQP (Advanced Message Queuing Protocol) is an open standard wire specification for asynchronous message communication, AMQP provides platform-neutral binary protocol standard, hence it can run on different environments & programming languages unlike JMS.

Remote procedure call (RPC) is a way to invoking a function on another computer and waiting for the result. The call is synchronous and blocking in nature, so the client will wait for the response.

Code

Queue to send and receive messages

 1package com.demo.project78.queue;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import org.springframework.amqp.core.Queue;
 5import org.springframework.amqp.core.QueueBuilder;
 6import org.springframework.context.annotation.Bean;
 7import org.springframework.context.annotation.Configuration;
 8
 9@Configuration
10public class QueueConfig {
11
12    @Bean
13    public Queue simpleQueue() {
14        return QueueBuilder.durable(AmqpConfig.SIMPLE_QUEUE).build();
15    }
16
17}
18
19
20
21
 1package com.demo.project78.queue;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.domain.Customer;
 5import lombok.RequiredArgsConstructor;
 6import lombok.extern.slf4j.Slf4j;
 7import org.springframework.amqp.rabbit.core.RabbitTemplate;
 8import org.springframework.stereotype.Component;
 9
10@Component
11@Slf4j
12@RequiredArgsConstructor
13public class QueueSender {
14
15    private final RabbitTemplate rabbitTemplate;
16
17    public void send(Customer customer) {
18        rabbitTemplate.convertAndSend(AmqpConfig.SIMPLE_QUEUE, customer);
19        log.info("Sent to {} : {}", AmqpConfig.SIMPLE_QUEUE, customer);
20    }
21}
 1package com.demo.project78.queue;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.domain.Customer;
 5import lombok.extern.slf4j.Slf4j;
 6import org.springframework.amqp.rabbit.annotation.RabbitListener;
 7import org.springframework.stereotype.Component;
 8
 9@Component
10@Slf4j
11public class QueueReceiver {
12
13    @RabbitListener(queues = AmqpConfig.SIMPLE_QUEUE, containerFactory = "rabbitListenerContainerFactory")
14    public void receive(Customer customer) {
15        log.info("{} Received: {}", AmqpConfig.SIMPLE_QUEUE, customer);
16
17        //Simulate a failure on processing
18        if (customer.getName().equals("NO_NAME")) {
19            throw new RuntimeException("No customer name!");
20        }
21    }
22}

Direct exchange with routing key

 1package com.demo.project78.exchange;
 2
 3import static com.demo.project78.config.AmqpConfig.DIRECT_ROUTING_KEY;
 4
 5import com.demo.project78.config.AmqpConfig;
 6import org.springframework.amqp.core.Binding;
 7import org.springframework.amqp.core.BindingBuilder;
 8import org.springframework.amqp.core.Exchange;
 9import org.springframework.amqp.core.ExchangeBuilder;
10import org.springframework.amqp.core.Queue;
11import org.springframework.amqp.core.QueueBuilder;
12import org.springframework.context.annotation.Bean;
13import org.springframework.context.annotation.Configuration;
14
15@Configuration
16public class ExchangeConfig {
17
18    @Bean
19    public Queue directQueue() {
20        return QueueBuilder.durable(AmqpConfig.DIRECT_QUEUE).build();
21    }
22
23
24    @Bean
25    public Exchange directExchange() {
26        /**
27         * DirectExchange - Routes messages with a routing key that exactly matches the binding key of a queue.
28         * routing logic is based on exact matches
29         */
30        return ExchangeBuilder.directExchange(AmqpConfig.DIRECT_EXCHANGE).durable(true).build();
31    }
32
33    @Bean
34    Binding binding(Queue directQueue, Exchange directExchange) {
35        return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY).noargs();
36    }
37
38}
 1package com.demo.project78.exchange;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.domain.Customer;
 5import lombok.RequiredArgsConstructor;
 6import lombok.extern.slf4j.Slf4j;
 7import org.springframework.amqp.rabbit.core.RabbitTemplate;
 8import org.springframework.stereotype.Component;
 9
10@Component
11@Slf4j
12@RequiredArgsConstructor
13public class ExchangeSender {
14
15    final RabbitTemplate rabbitTemplate;
16
17    public void send(Customer customer, String routingKey) {
18        rabbitTemplate.convertAndSend(AmqpConfig.DIRECT_EXCHANGE, routingKey, customer);
19        log.info("Sent to {} with Key: {}, {}", AmqpConfig.DIRECT_EXCHANGE, routingKey, customer);
20    }
21}
 1package com.demo.project78.exchange;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.domain.Customer;
 5import lombok.extern.slf4j.Slf4j;
 6import org.springframework.amqp.rabbit.annotation.RabbitListener;
 7import org.springframework.stereotype.Component;
 8
 9@Component
10@Slf4j
11public class ExchangeReceiver {
12
13    @RabbitListener(queues = AmqpConfig.DIRECT_QUEUE)
14    public void receive(Customer customer) {
15        log.info("{} Received {}", AmqpConfig.DIRECT_QUEUE, customer);
16    }
17
18}

Fanout exchange

 1package com.demo.project78.fanout;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import org.springframework.amqp.core.BindingBuilder;
 5import org.springframework.amqp.core.Declarables;
 6import org.springframework.amqp.core.FanoutExchange;
 7import org.springframework.amqp.core.Queue;
 8import org.springframework.context.annotation.Bean;
 9import org.springframework.context.annotation.Configuration;
10
11import static org.springframework.amqp.core.BindingBuilder.bind;
12
13@Configuration
14public class FanoutExchangeConfig {
15
16    @Bean
17    public Declarables fanoutBindings() {
18        Queue fanoutQueue1 = new Queue(AmqpConfig.FANOUT_QUEUE_1, false);
19        Queue fanoutQueue2 = new Queue(AmqpConfig.FANOUT_QUEUE_2, false);
20        FanoutExchange fanoutExchange = new FanoutExchange(AmqpConfig.FANOUT_EXCHANGE);
21
22        return new Declarables(
23                fanoutQueue1,
24                fanoutQueue2,
25                fanoutExchange,
26                bind(fanoutQueue1).to(fanoutExchange),
27                BindingBuilder.bind(fanoutQueue2).to(fanoutExchange));
28    }
29}
 1package com.demo.project78.fanout;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.domain.Customer;
 5import lombok.RequiredArgsConstructor;
 6import lombok.extern.slf4j.Slf4j;
 7import org.springframework.amqp.rabbit.core.RabbitTemplate;
 8import org.springframework.stereotype.Component;
 9
10@Component
11@Slf4j
12@RequiredArgsConstructor
13public class FanoutSender {
14
15    private final RabbitTemplate rabbitTemplate;
16
17    public void send(Customer customer, String routingKey) {
18        rabbitTemplate.convertAndSend(AmqpConfig.FANOUT_EXCHANGE, routingKey, customer);
19        //routing key doesnt matter
20        log.info("Sent to {} with Key: {},  {}", AmqpConfig.FANOUT_EXCHANGE, routingKey, customer);
21    }
22}
 1package com.demo.project78.fanout;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.domain.Customer;
 5import lombok.extern.slf4j.Slf4j;
 6import org.springframework.amqp.rabbit.annotation.RabbitListener;
 7import org.springframework.stereotype.Component;
 8
 9@Component
10@Slf4j
11public class FanoutReceiver {
12
13    @RabbitListener(queues = AmqpConfig.FANOUT_QUEUE_1)
14    public void receive1(Customer customer) {
15        log.info("{} Received {}", AmqpConfig.FANOUT_QUEUE_1, customer);
16    }
17
18    @RabbitListener(queues = AmqpConfig.FANOUT_QUEUE_2)
19    public void receive2(Customer customer) {
20        log.info("{} Received {}", AmqpConfig.FANOUT_QUEUE_2, customer);
21    }
22}

Topic exchange

 1package com.demo.project78.topic;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import org.springframework.amqp.core.BindingBuilder;
 5import org.springframework.amqp.core.Declarables;
 6import org.springframework.amqp.core.ExchangeBuilder;
 7import org.springframework.amqp.core.Queue;
 8import org.springframework.amqp.core.QueueBuilder;
 9import org.springframework.amqp.core.TopicExchange;
10import org.springframework.context.annotation.Bean;
11import org.springframework.context.annotation.Configuration;
12
13@Configuration
14public class TopicExchangeConfig {
15
16    @Bean
17    public Declarables topicBindings() {
18        Queue topicQueue1 = QueueBuilder.durable(AmqpConfig.TOPIC_QUEUE_1).build();
19        Queue topicQueue2 = QueueBuilder.durable(AmqpConfig.TOPIC_QUEUE_2).build();
20
21        TopicExchange topicExchange = ExchangeBuilder.topicExchange(AmqpConfig.TOPIC_EXCHANGE).durable(true).build();
22
23        return new Declarables(
24                topicQueue1,
25                topicQueue2,
26                topicExchange,
27                BindingBuilder
28                        .bind(topicQueue1)
29                        .to(topicExchange).with("*.booking.*"),
30                BindingBuilder
31                        .bind(topicQueue2)
32                        .to(topicExchange).with("#.reward"));
33    }
34}
 1package com.demo.project78.topic;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.domain.Customer;
 5import lombok.RequiredArgsConstructor;
 6import lombok.extern.slf4j.Slf4j;
 7import org.springframework.amqp.rabbit.core.RabbitTemplate;
 8import org.springframework.stereotype.Component;
 9
10@Component
11@Slf4j
12@RequiredArgsConstructor
13public class TopicSender {
14
15    private final RabbitTemplate rabbitTemplate;
16
17    public void send(Customer customer, String routingKey) {
18        rabbitTemplate.convertAndSend(AmqpConfig.TOPIC_EXCHANGE, routingKey, customer);
19        log.info("Sent to {} with Key: {},  {}", AmqpConfig.TOPIC_EXCHANGE, routingKey, customer);
20    }
21}
 1package com.demo.project78.topic;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.domain.Customer;
 5import lombok.extern.slf4j.Slf4j;
 6import org.springframework.amqp.rabbit.annotation.RabbitListener;
 7import org.springframework.stereotype.Component;
 8
 9@Component
10@Slf4j
11public class TopicReceiver {
12
13    @RabbitListener(queues = AmqpConfig.TOPIC_QUEUE_1)
14    public void receive1(Customer customer) {
15        log.info("[Processor1] {}  Received {}", AmqpConfig.TOPIC_QUEUE_1, customer);
16    }
17
18    @RabbitListener(queues = AmqpConfig.TOPIC_QUEUE_2)
19    public void receive2(Customer customer) {
20        log.info("[Processor2] {} Received {}", AmqpConfig.TOPIC_QUEUE_2, customer);
21    }
22}

Error handling & Exchange creation

 1package com.demo.project78.config;
 2
 3import com.fasterxml.jackson.databind.ObjectMapper;
 4import lombok.extern.slf4j.Slf4j;
 5import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
 6import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 7import org.springframework.amqp.rabbit.core.RabbitTemplate;
 8import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
 9import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
10import org.springframework.context.annotation.Bean;
11import org.springframework.context.annotation.Configuration;
12import org.springframework.util.ErrorHandler;
13
14@Configuration
15@Slf4j
16public class AmqpConfig {
17
18    public static final String SIMPLE_QUEUE = "project78.simple.queue";
19
20    public static final String DIRECT_EXCHANGE = "project78.direct.exchange";
21    public static final String DIRECT_QUEUE = "project78.direct.queue";
22    public static final String DIRECT_ROUTING_KEY = "project78.direct.key";
23
24    public static final String RPC_EXCHANGE = "project78.rpc.exchange";
25    public static final String RPC_QUEUE = "project78.rpc.queue";
26    public static final String RPC_KEY = "project78.rpc.key";
27
28    public static final String FANOUT_QUEUE_1 = "project78.fanout.queue1";
29    public static final String FANOUT_QUEUE_2 = "project78.fanout.queue2";
30    public static final String FANOUT_EXCHANGE = "project78.fanout.exchange";
31    public static final String FANOUT_KEY1 = "*.fan-key1.*";
32    public static final String FANOUT_KEY2 = "*.fan-key2.*";
33
34    public static final String TOPIC_QUEUE_1 = "project78.topic.booking";
35    public static final String TOPIC_QUEUE_2 = "project78.topic.reward";
36    public static final String TOPIC_EXCHANGE = "project78.topic.exchange";
37
38    static final ObjectMapper objectMapper = new ObjectMapper();
39
40    @Bean
41    public Jackson2JsonMessageConverter jsonMessageConverter() {
42        return new Jackson2JsonMessageConverter(objectMapper);
43    }
44
45    @Bean
46    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
47        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
48        rabbitTemplate.setMessageConverter(jsonMessageConverter());
49        return rabbitTemplate;
50    }
51
52    /**
53     * Without setting the error handle the failed message getting re-queued will cause infinite loops
54     */
55    @Bean
56    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
57                                                                               Jackson2JsonMessageConverter jsonMessageConverter,
58                                                                               ErrorHandler errorHandler) {
59        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
60        factory.setConnectionFactory(connectionFactory);
61        factory.setMessageConverter(jsonMessageConverter);
62        factory.setErrorHandler(errorHandler);
63        //Not to re-queue
64        factory.setDefaultRequeueRejected(false);
65        return factory;
66    }
67
68    @Bean
69    public ErrorHandler errorHandler() {
70        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
71    }
72
73    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
74        @Override
75        public boolean isFatal(Throwable t) {
76            /**
77             * Check exception and decide to re-queue or not.
78             * default is set to not re-queue in spring application property default-requeue-rejected: false
79             */
80            return super.isFatal(t);
81        }
82    }
83}
84

RPC

 1package com.demo.project78.rpc.config;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import org.springframework.amqp.core.Binding;
 5import org.springframework.amqp.core.BindingBuilder;
 6import org.springframework.amqp.core.Exchange;
 7import org.springframework.amqp.core.ExchangeBuilder;
 8import org.springframework.amqp.core.Queue;
 9import org.springframework.amqp.core.QueueBuilder;
10import org.springframework.context.annotation.Bean;
11import org.springframework.context.annotation.Configuration;
12
13@Configuration
14public class RpcConfig {
15
16    @Bean
17    public Queue rpcQueue() {
18        return QueueBuilder.durable(AmqpConfig.RPC_QUEUE).build();
19    }
20
21    @Bean
22    Exchange rpcExchange() {
23        return ExchangeBuilder.directExchange(AmqpConfig.RPC_EXCHANGE).durable(true).build();
24    }
25
26    @Bean
27    Binding binding(Queue rpcQueue, Exchange rpcExchange) {
28        return BindingBuilder.bind(rpcQueue).to(rpcExchange).with(AmqpConfig.RPC_KEY).noargs();
29    }
30
31}
32
 1package com.demo.project78.rpc.client;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.domain.Customer;
 5import lombok.RequiredArgsConstructor;
 6import lombok.extern.slf4j.Slf4j;
 7import org.springframework.amqp.rabbit.core.RabbitTemplate;
 8import org.springframework.stereotype.Component;
 9
10@Component
11@RequiredArgsConstructor
12@Slf4j
13public class RpcClient {
14
15    private final RabbitTemplate rabbitTemplate;
16
17    public String send(Customer customer, String routingKey) {
18        rabbitTemplate.setReplyTimeout(60000);
19        log.info("RPC Call with key: {} Sent to Exchange: {}", routingKey, customer);
20        String response = (String) rabbitTemplate.convertSendAndReceive(AmqpConfig.RPC_EXCHANGE, routingKey, customer);
21        log.info("RPC Call got '{}'", response);
22        return response;
23    }
24
25}
 1package com.demo.project78.rpc.server;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.domain.Customer;
 5import lombok.SneakyThrows;
 6import lombok.extern.slf4j.Slf4j;
 7import org.springframework.amqp.rabbit.annotation.RabbitListener;
 8import org.springframework.stereotype.Component;
 9
10import java.util.concurrent.TimeUnit;
11
12@Component
13@Slf4j
14public class RpcServer {
15
16    @SneakyThrows
17    @RabbitListener(queues = AmqpConfig.RPC_QUEUE)
18    public String receive(Customer customer) {
19        log.info("{} Received {}", AmqpConfig.RPC_QUEUE, customer);
20        TimeUnit.SECONDS.sleep(2);
21        return "Hello world, " + customer.getName();
22    }
23
24}

Rest

 1package com.demo.project78.controller;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.domain.Customer;
 5import com.demo.project78.exchange.ExchangeSender;
 6import com.demo.project78.fanout.FanoutSender;
 7import com.demo.project78.queue.QueueSender;
 8import com.demo.project78.rpc.client.RpcClient;
 9import com.demo.project78.topic.TopicSender;
10import lombok.RequiredArgsConstructor;
11import org.springframework.http.ResponseEntity;
12import org.springframework.web.bind.annotation.GetMapping;
13import org.springframework.web.bind.annotation.RestController;
14
15@RestController
16@RequiredArgsConstructor
17public class HomeController {
18
19    final QueueSender queueSender;
20    final ExchangeSender exchangeSender;
21    final FanoutSender fanoutSender;
22    final TopicSender topicSender;
23    final RpcClient rpcClient;
24
25    @GetMapping("/simple")
26    public ResponseEntity simple() {
27        Customer customer = Customer.builder()
28                .name("Jack")
29                .age(35)
30                .build();
31        //Simple object sent to queue
32        queueSender.send(customer);
33        return ResponseEntity.ok().build();
34    }
35
36    @GetMapping("/direct")
37    public ResponseEntity directExchange() {
38        Customer customer = Customer.builder()
39                .name("Adam")
40                .age(40)
41                .build();
42        //Will be received by the queue
43        exchangeSender.send(customer, AmqpConfig.DIRECT_ROUTING_KEY);
44        //Will not be received by any queue
45        exchangeSender.send(customer, "");
46        return ResponseEntity.ok().build();
47    }
48
49    @GetMapping("/fanout")
50    public ResponseEntity fanOut() {
51        Customer customer1 = Customer.builder()
52                .name("Raj")
53                .age(30)
54                .build();
55        Customer customer2 = Customer.builder()
56                .name("David")
57                .age(32)
58                .build();
59
60        //All queue registered will receive message irrespective of routing key
61        fanoutSender.send(customer1, AmqpConfig.FANOUT_KEY1);
62        fanoutSender.send(customer2, AmqpConfig.FANOUT_KEY2);
63        return ResponseEntity.ok().build();
64    }
65
66    @GetMapping("/topic")
67    public ResponseEntity topicExchange() {
68        Customer customer = Customer.builder()
69                .name("David")
70                .age(32)
71                .build();
72        //This will goto the booking queue only.
73        topicSender.send(customer, "status.booking.confirmed");
74        //This will goto both the queue as it has booking + reward key
75        topicSender.send(customer, "status.booking.reward");
76        return ResponseEntity.ok().build();
77    }
78
79    @GetMapping("/error")
80    public ResponseEntity error() {
81        Customer customer = Customer.builder()
82                .name("NO_NAME")
83                .age(35)
84                .build();
85        queueSender.send(customer);
86        return ResponseEntity.ok().build();
87    }
88
89    @GetMapping("/rpc")
90    public ResponseEntity<String> rpc() {
91        Customer customer = Customer.builder()
92                .name("Adam")
93                .age(40)
94                .build();
95        String response = rpcClient.send(customer, AmqpConfig.RPC_KEY);
96        return ResponseEntity.ok(response);
97    }
98}

Postman

Import the postman collection to postman

Postman Collection

Setup

 1# Project 78
 2
 3Spring & RabbitMQ
 4
 5[https://gitorko.github.io/spring-amqp/](https://gitorko.github.io/spring-amqp/)
 6
 7### Version
 8
 9Check version
10
11```bash
12$java --version
13openjdk 21.0.3 2024-04-16 LTS
14```
15
16### RabbitMQ
17
18Run the docker command to start a rabbitmq instance
19
20```bash
21docker run -d --hostname my-rabbit --name my-rabbit -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest -p 8085:15672 -p 5672:5672 rabbitmq:3-management
22```
23
24Open the rabbitmq console
25
26[http://localhost:8085](http://localhost:8085)
27
28```
29user:guest
30pwd: guest
31```
32
33### Dev
34
35```bash
36./gradlew bootRun
37```

References

https://spring.io/projects/spring-amqp

https://www.rabbitmq.com/tutorials/tutorial-six-spring-amqp.html

comments powered by Disqus