Spring - RabbitMQ

Overview

Spring with RabbitMQ, a message broker that implements the Advanced Message Queuing Protocol (AMQP).

Github: https://github.com/gitorko/project78

RabbitMQ

Exchanges are like post offices or mailboxes: clients publish messages to an AMQP exchange, and the exchange routes them to queues. There are four built-in exchange types:

  1. Direct Exchange – Routes messages to a queue by matching a complete routing key
  2. Fanout Exchange – Routes messages to all the queues bound to it
  3. Topic Exchange – Routes messages to multiple queues by matching a routing key to a pattern
  4. Headers Exchange – Routes messages based on message headers

Queues are bound to an exchange using a routing key, and messages are sent to an exchange with a routing key. AMQP (Advanced Message Queuing Protocol) is an open standard wire specification for asynchronous message communication. It provides a platform-neutral binary protocol, so, unlike JMS, it can be used across different environments and programming languages.

Remote procedure call (RPC) is a way of invoking a function on another computer and waiting for the result. The call is synchronous and blocking in nature, so the client waits for the response.

Code

Queue to send and receive messages

 1package com.demo.project78.queue;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import org.springframework.amqp.core.Queue;
 5import org.springframework.context.annotation.Bean;
 6import org.springframework.context.annotation.Configuration;
 7
 8@Configuration
 9public class QueueConfig {
10
11    @Bean
12    public Queue simpleQueue() {
13        return new Queue(AmqpConfig.SIMPLE_QUEUE, true);
14    }
15
16}
17
18
19
20
 1package com.demo.project78.queue;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.model.Customer;
 5import lombok.RequiredArgsConstructor;
 6import lombok.extern.slf4j.Slf4j;
 7import org.springframework.amqp.rabbit.core.RabbitTemplate;
 8import org.springframework.stereotype.Component;
 9
10@Component
11@Slf4j
12@RequiredArgsConstructor
13public class QueueSender {
14
15    private final RabbitTemplate rabbitTemplate;
16
17    public void send(Customer customer) {
18        rabbitTemplate.convertAndSend(AmqpConfig.SIMPLE_QUEUE, customer);
19        log.info("Sent to Simple Queue : {}", customer);
20    }
21}
 1package com.demo.project78.queue;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.model.Customer;
 5import lombok.extern.slf4j.Slf4j;
 6import org.springframework.amqp.rabbit.annotation.RabbitListener;
 7import org.springframework.stereotype.Component;
 8
 9@Component
10@Slf4j
11public class QueueReceiver {
12
13    @RabbitListener(queues = AmqpConfig.SIMPLE_QUEUE)
14    public void receive(Customer customer) {
15        log.info("{} Received '{}'", AmqpConfig.SIMPLE_QUEUE, customer);
16
17        //Simulate a failure on processing
18        if (customer.getName().equals("NO_NAME")) {
19            throw new RuntimeException("No customer name!");
20        }
21    }
22}

Direct exchange with routing key

 1package com.demo.project78.exchange;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import org.springframework.amqp.core.Binding;
 5import org.springframework.amqp.core.BindingBuilder;
 6import org.springframework.amqp.core.Queue;
 7import org.springframework.amqp.core.TopicExchange;
 8import org.springframework.context.annotation.Bean;
 9import org.springframework.context.annotation.Configuration;
10
11@Configuration
12public class ExchangeConfig {
13
14    @Bean
15    public Queue directQueue() {
16        return new Queue(AmqpConfig.DIRECT_QUEUE, true);
17    }
18
19    @Bean
20    TopicExchange directExchange() {
21        return new TopicExchange(AmqpConfig.DIRECT_EXCHANGE);
22    }
23
24    @Bean
25    Binding binding() {
26        return BindingBuilder.bind(directQueue()).to(directExchange()).with(AmqpConfig.DIRECT_KEY);
27    }
28
29}
 1package com.demo.project78.exchange;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.model.Customer;
 5import lombok.RequiredArgsConstructor;
 6import lombok.extern.slf4j.Slf4j;
 7import org.springframework.amqp.rabbit.core.RabbitTemplate;
 8import org.springframework.stereotype.Component;
 9
10@Component
11@Slf4j
12@RequiredArgsConstructor
13public class ExchangeSender {
14
15    private final RabbitTemplate rabbitTemplate;
16
17    public void send(Customer customer, String routingKey) {
18        rabbitTemplate.convertAndSend(AmqpConfig.DIRECT_EXCHANGE, routingKey, customer);
19        log.info("Direct with key: {} Sent to Exchange: {}", routingKey, customer);
20    }
21}
 1package com.demo.project78.exchange;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.model.Customer;
 5import lombok.extern.slf4j.Slf4j;
 6import org.springframework.amqp.rabbit.annotation.RabbitListener;
 7import org.springframework.stereotype.Component;
 8
 9@Component
10@Slf4j
11public class ExchangeReceiver {
12
13    @RabbitListener(queues = AmqpConfig.DIRECT_QUEUE)
14    public void receive(Customer customer) {
15        log.info("{} Received '{}'", AmqpConfig.DIRECT_QUEUE, customer);
16    }
17
18}

Fanout exchange

 1package com.demo.project78.fanout;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import org.springframework.amqp.core.BindingBuilder;
 5import org.springframework.amqp.core.Declarables;
 6import org.springframework.amqp.core.FanoutExchange;
 7import org.springframework.amqp.core.Queue;
 8import org.springframework.context.annotation.Bean;
 9import org.springframework.context.annotation.Configuration;
10
11import static org.springframework.amqp.core.BindingBuilder.bind;
12
13@Configuration
14public class FanoutExchangeConfig {
15
16    @Bean
17    public Declarables fanoutBindings() {
18        Queue fanoutQueue1 = new Queue(AmqpConfig.FANOUT_QUEUE_1, false);
19        Queue fanoutQueue2 = new Queue(AmqpConfig.FANOUT_QUEUE_2, false);
20        FanoutExchange fanoutExchange = new FanoutExchange(AmqpConfig.FANOUT_EXCHANGE);
21
22        return new Declarables(
23                fanoutQueue1,
24                fanoutQueue2,
25                fanoutExchange,
26                bind(fanoutQueue1).to(fanoutExchange),
27                BindingBuilder.bind(fanoutQueue2).to(fanoutExchange));
28    }
29}
 1package com.demo.project78.fanout;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.model.Customer;
 5import lombok.RequiredArgsConstructor;
 6import lombok.extern.slf4j.Slf4j;
 7import org.springframework.amqp.rabbit.core.RabbitTemplate;
 8import org.springframework.stereotype.Component;
 9
10@Component
11@Slf4j
12@RequiredArgsConstructor
13public class FanoutSender {
14
15    private final RabbitTemplate rabbitTemplate;
16
17    public void send(Customer customer, String routingKey) {
18        rabbitTemplate.convertAndSend(AmqpConfig.FANOUT_EXCHANGE, routingKey, customer);
19        //routing key doesnt matter
20        log.info("Fanout with key: {} Sent to Exchange: {}", routingKey, customer);
21    }
22}
 1package com.demo.project78.fanout;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.model.Customer;
 5import lombok.extern.slf4j.Slf4j;
 6import org.springframework.amqp.rabbit.annotation.RabbitListener;
 7import org.springframework.stereotype.Component;
 8
 9@Component
10@Slf4j
11public class FanoutReceiver {
12
13    @RabbitListener(queues = AmqpConfig.FANOUT_QUEUE_1)
14    public void receive1(Customer customer) {
15        log.info("{} Received '{}'", AmqpConfig.FANOUT_QUEUE_1, customer);
16    }
17
18    @RabbitListener(queues = AmqpConfig.FANOUT_QUEUE_2)
19    public void receive2(Customer customer) {
20        log.info("{} Received '{}'", AmqpConfig.FANOUT_QUEUE_2, customer);
21    }
22}

Topic exchange

 1package com.demo.project78.topic;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import org.springframework.amqp.core.BindingBuilder;
 5import org.springframework.amqp.core.Declarables;
 6import org.springframework.amqp.core.Queue;
 7import org.springframework.amqp.core.TopicExchange;
 8import org.springframework.context.annotation.Bean;
 9import org.springframework.context.annotation.Configuration;
10
11@Configuration
12public class TopicExchangeConfig {
13
14    @Bean
15    public Declarables topicBindings() {
16        Queue topicQueue1 = new Queue(AmqpConfig.TOPIC_QUEUE_1, false);
17        Queue topicQueue2 = new Queue(AmqpConfig.TOPIC_QUEUE_2, false);
18
19        TopicExchange topicExchange = new TopicExchange(AmqpConfig.TOPIC_EXCHANGE);
20
21        return new Declarables(
22                topicQueue1,
23                topicQueue2,
24                topicExchange,
25                BindingBuilder
26                        .bind(topicQueue1)
27                        .to(topicExchange).with("*.booking.*"),
28                BindingBuilder
29                        .bind(topicQueue2)
30                        .to(topicExchange).with("#.error"));
31    }
32}
 1package com.demo.project78.topic;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.model.Customer;
 5import lombok.RequiredArgsConstructor;
 6import lombok.extern.slf4j.Slf4j;
 7import org.springframework.amqp.rabbit.core.RabbitTemplate;
 8import org.springframework.stereotype.Component;
 9
10@Component
11@Slf4j
12@RequiredArgsConstructor
13public class TopicSender {
14
15    private final RabbitTemplate rabbitTemplate;
16
17    public void send(Customer customer, String routingKey) {
18        rabbitTemplate.convertAndSend(AmqpConfig.TOPIC_EXCHANGE, routingKey, customer);
19        log.info("Topic with key: {} Sent to Exchange: {}", routingKey, customer);
20    }
21}
 1package com.demo.project78.topic;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.model.Customer;
 5import lombok.extern.slf4j.Slf4j;
 6import org.springframework.amqp.rabbit.annotation.RabbitListener;
 7import org.springframework.stereotype.Component;
 8
 9@Component
10@Slf4j
11public class TopicReceiver {
12
13    @RabbitListener(queues = AmqpConfig.TOPIC_QUEUE_1)
14    public void receive1(Customer customer) {
15        log.info("{} Received '{}'", AmqpConfig.TOPIC_QUEUE_1, customer);
16    }
17
18    @RabbitListener(queues = AmqpConfig.TOPIC_QUEUE_2)
19    public void receive2(Customer customer) {
20        log.info("{} Received '{}'", AmqpConfig.TOPIC_QUEUE_2, customer);
21    }
22}

Error handling & Exchange creation

 1package com.demo.project78.config;
 2
 3import com.fasterxml.jackson.databind.ObjectMapper;
 4import lombok.extern.slf4j.Slf4j;
 5import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
 6import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 7import org.springframework.amqp.rabbit.core.RabbitTemplate;
 8import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
 9import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
10import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
11import org.springframework.context.annotation.Bean;
12import org.springframework.context.annotation.Configuration;
13import org.springframework.util.ErrorHandler;
14
15@Configuration
16@Slf4j
17public class AmqpConfig {
18
19    public static final String SIMPLE_QUEUE = "simple.queue";
20
21    public static final String DIRECT_EXCHANGE = "direct.exchange";
22    public static final String DIRECT_QUEUE = "direct.queue";
23    public static final String DIRECT_KEY = "direct-key";
24
25    public static final String RPC_EXCHANGE = "rpc.exchange";
26    public static final String RPC_QUEUE = "rpc.queue";
27    public static final String RPC_KEY = "rpc-key";
28
29    public static final String FANOUT_QUEUE_1 = "fanout.queue1";
30    public static final String FANOUT_QUEUE_2 = "fanout.queue2";
31    public static final String FANOUT_EXCHANGE = "fanout.exchange";
32    public static final String FANOUT_KEY1 = "*.fan-key1.*";
33    public static final String FANOUT_KEY2 = "*.fan-key2.*";
34
35    public static final String TOPIC_QUEUE_1 = "topic.booking";
36    public static final String TOPIC_QUEUE_2 = "topic.error";
37    public static final String TOPIC_EXCHANGE = "topic.exchange";
38
39    @Bean
40    public Jackson2JsonMessageConverter jsonMessageConverter() {
41        ObjectMapper objectMapper = new ObjectMapper();
42        return new Jackson2JsonMessageConverter(objectMapper);
43    }
44
45    @Bean
46    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
47        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
48        rabbitTemplate.setMessageConverter(jsonMessageConverter());
49        return rabbitTemplate;
50    }
51
52    /**
53     * Without setting the error handle the failed message getting re-queued will cause infinite loops
54     */
55    @Bean
56    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
57        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
58        factory.setConnectionFactory(connectionFactory);
59        factory.setMessageConverter(jsonMessageConverter());
60        factory.setErrorHandler(errorHandler());
61        return factory;
62    }
63
64    @Bean
65    public ErrorHandler errorHandler() {
66        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
67    }
68
69    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
70        @Override
71        public boolean isFatal(Throwable t) {
72            if (t instanceof ListenerExecutionFailedException) {
73                log.error("Fatal error: {}", t.getMessage());
74                return false;
75            }
76            return super.isFatal(t);
77        }
78    }
79}
80

RPC

 1package com.demo.project78.rpc.config;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import org.springframework.amqp.core.Binding;
 5import org.springframework.amqp.core.BindingBuilder;
 6import org.springframework.amqp.core.Queue;
 7import org.springframework.amqp.core.TopicExchange;
 8import org.springframework.context.annotation.Bean;
 9import org.springframework.context.annotation.Configuration;
10
11@Configuration
12public class RpcConfig {
13
14    @Bean
15    public Queue rpcQueue() {
16        return new Queue(AmqpConfig.RPC_QUEUE, true);
17    }
18
19    @Bean
20    TopicExchange rpcExchange() {
21        return new TopicExchange(AmqpConfig.RPC_EXCHANGE);
22    }
23
24    @Bean
25    Binding binding() {
26        return BindingBuilder.bind(rpcQueue()).to(rpcExchange()).with(AmqpConfig.RPC_KEY);
27    }
28
29}
30
 1package com.demo.project78.rpc.client;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.model.Customer;
 5import lombok.RequiredArgsConstructor;
 6import lombok.extern.slf4j.Slf4j;
 7import org.springframework.amqp.rabbit.core.RabbitTemplate;
 8import org.springframework.stereotype.Component;
 9
10@Component
11@RequiredArgsConstructor
12@Slf4j
13public class RpcClient {
14
15    private final RabbitTemplate rabbitTemplate;
16
17    public void send(Customer customer, String routingKey) {
18        rabbitTemplate.setReplyTimeout(60000);
19        log.info("RPC Call with key: {} Sent to Exchange: {}", routingKey, customer);
20        String response = (String) rabbitTemplate.convertSendAndReceive(AmqpConfig.RPC_EXCHANGE, routingKey, customer);
21        log.info("RPC Call got '{}'", response);
22    }
23
24}
 1package com.demo.project78.rpc.server;
 2
 3import com.demo.project78.config.AmqpConfig;
 4import com.demo.project78.model.Customer;
 5import lombok.SneakyThrows;
 6import lombok.extern.slf4j.Slf4j;
 7import org.springframework.amqp.rabbit.annotation.RabbitListener;
 8import org.springframework.stereotype.Component;
 9
10import java.util.concurrent.TimeUnit;
11
12@Component
13@Slf4j
14public class RpcServer {
15
16    @SneakyThrows
17    @RabbitListener(queues = AmqpConfig.RPC_QUEUE)
18    public String receive(Customer customer) {
19        log.info("{} Received '{}'", AmqpConfig.RPC_QUEUE, customer);
20        TimeUnit.SECONDS.sleep(5);
21        return "Hello world, " + customer.getName();
22    }
23
24
25}

Setup

Project 78

Spring & RabbitMQ

https://gitorko.github.io/spring-amqp/

Version

Check version

1$java --version
2openjdk 17.0.3 2022-04-19 LTS

RabbitMQ

Run the docker command to start a rabbitmq instance

1docker run -d --hostname my-rabbit --name my-rabbit -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest -p 8080:15672 -p 5672:5672 rabbitmq:3-management

Open the rabbitmq console

http://localhost:8080

1user:guest
2pwd: guest

Dev

1./gradlew bootRun

References

https://spring.io/projects/spring-amqp

https://www.rabbitmq.com/tutorials/tutorial-six-spring-amqp.html

comments powered by Disqus