RabbitMQ Streams

Overview

A Spring Boot example that publishes to and consumes from a RabbitMQ stream.

Github: https://github.com/gitorko/project74

RabbitMQ Streams

Streams implement an append-only log; messages are persistent and replicated. Typical use cases:

  1. Large fan-outs - Deliver the same message to multiple subscribers
  2. Replay / Time-travelling - Read messages from any point.
  3. Throughput Performance - Log-based messaging delivers higher throughput than traditional queues.
  4. Large logs - Streams are designed to store larger amounts of data in an efficient manner with minimal in-memory overhead.

Code

 1package com.demo.project74;
 2
 3import java.nio.charset.StandardCharsets;
 4import java.time.Duration;
 5import java.util.concurrent.CountDownLatch;
 6import java.util.concurrent.TimeUnit;
 7import java.util.stream.IntStream;
 8
 9import com.rabbitmq.stream.ByteCapacity;
10import com.rabbitmq.stream.Consumer;
11import com.rabbitmq.stream.Environment;
12import com.rabbitmq.stream.Message;
13import com.rabbitmq.stream.OffsetSpecification;
14import com.rabbitmq.stream.Producer;
15import lombok.SneakyThrows;
16import lombok.extern.slf4j.Slf4j;
17import org.springframework.scheduling.annotation.Async;
18import org.springframework.scheduling.annotation.EnableAsync;
19import org.springframework.stereotype.Service;
20
21@EnableAsync
22@Service
23@Slf4j
24public class AsyncService {
25
26    private static final int MESSAGE_COUNT = 10;
27    private static final String STREAM_NAME = "my-stream";
28
29    @SneakyThrows
30    @Async
31    public void producer() {
32        log.info("Starting producer!");
33        try (Environment environment = Environment.builder().uri("rabbitmq-stream://localhost:5552").build()) {
34            environment.streamCreator()
35                    .stream(STREAM_NAME)
36                    .maxAge(Duration.ofHours(6))
37                    .maxSegmentSizeBytes(ByteCapacity.MB(500))
38                    .create();
39            Producer producer = environment
40                    .producerBuilder()
41                    .stream(STREAM_NAME)
42                    .build();
43
44            CountDownLatch confirmLatch = new CountDownLatch(MESSAGE_COUNT);
45            IntStream.range(0, MESSAGE_COUNT).forEach(i -> {
46                Message message = producer.messageBuilder()
47                        .properties()
48                        .creationTime(System.currentTimeMillis())
49                        .messageId(i)
50                        .messageBuilder()
51                        .addData(("customer_" + i).getBytes(StandardCharsets.UTF_8))
52                        .build();
53                producer.send(message, confirmationStatus -> confirmLatch.countDown());
54                log.info("Published:  {}", message.getBody());
55                try {
56                    TimeUnit.SECONDS.sleep(1);
57                } catch (InterruptedException e) {
58                    throw new RuntimeException(e);
59                }
60            });
61            boolean done = confirmLatch.await(1, TimeUnit.MINUTES);
62            log.info("Completed send: {}", done);
63            //environment.deleteStream(STREAM_NAME);
64        }
65    }
66
67    @SneakyThrows
68    @Async
69    public void consumer() {
70        log.info("Starting consumer!");
71        TimeUnit.SECONDS.sleep(2);
72        try (Environment environment = Environment.builder().uri("rabbitmq-stream://localhost:5552").build()) {
73            Consumer consumer = environment.consumerBuilder()
74                    .stream(STREAM_NAME)
75                    .offset(OffsetSpecification.last())
76                    .messageHandler((context, message) -> {
77                        log.info("Consumed:  {}", message.getBody());
78                    })
79                    .build();
80            //Don't let the thread end.
81            CountDownLatch finishLatch = new CountDownLatch(1);
82            finishLatch.await();
83        }
84    }
85}

Setup

 1# Project 74
 2
 3RabbitMQ Stream
 4
 5[https://gitorko.github.io/rabbitmq-stream/](https://gitorko.github.io/rabbitmq-stream/)
 6
 7### Version
 8
 9Check version
10
11```bash
12$java --version
13openjdk version "21.0.3" 2024-04-16 LTS
14```
15
16### RabbitMQ
17
18Run the docker command to start a rabbitmq instance
19
20```bash
21docker run -it --hostname my-rabbit --rm --name my-rabbit -e RABBITMQ_DEFAULT_USER=guest \
22-e RABBITMQ_DEFAULT_PASS=guest -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
23-p 8080:15672 -p 5672:5672 -p 5552:5552 rabbitmq:3-management 
24```
25
26```bash
27docker exec my-rabbit rabbitmq-plugins enable rabbitmq_stream
28```
29
30Open the rabbitmq console
31
32[http://localhost:8080](http://localhost:8080)
33
34```
35user:guest
36pwd: guest
37```
38
39### Dev
40
41```bash
42./gradlew bootRun
43```

References

https://www.rabbitmq.com/streams.html

comments powered by Disqus