RabbitMQ Streams
Overview
RabbitMQ Stream implementation.
Github: https://github.com/gitorko/project74
RabbitMQ Streams
Streams implement append-only log, messages are persistent and replicated.
- Large fan-outs - Deliver the same message to multiple subscribers
- Replay / Time-travelling - Read messages from any point.
- Throughput Performance - Log based messaging deliver performance compared to traditional queues.
- Large logs - Streams are designed to store larger amounts of data in an efficient manner with minimal in-memory overhead.
Code
1package com.demo.project74;
2
3import java.nio.charset.StandardCharsets;
4import java.time.Duration;
5import java.util.concurrent.CountDownLatch;
6import java.util.concurrent.TimeUnit;
7import java.util.stream.IntStream;
8
9import com.rabbitmq.stream.ByteCapacity;
10import com.rabbitmq.stream.Consumer;
11import com.rabbitmq.stream.Environment;
12import com.rabbitmq.stream.Message;
13import com.rabbitmq.stream.OffsetSpecification;
14import com.rabbitmq.stream.Producer;
15import lombok.SneakyThrows;
16import lombok.extern.slf4j.Slf4j;
17import org.springframework.scheduling.annotation.Async;
18import org.springframework.scheduling.annotation.EnableAsync;
19import org.springframework.stereotype.Service;
20
21@EnableAsync
22@Service
23@Slf4j
24public class AsyncService {
25
26 private static final int MESSAGE_COUNT = 10;
27 private static final String STREAM_NAME = "my-stream";
28
29 @SneakyThrows
30 @Async
31 public void producer() {
32 log.info("Starting producer!");
33 try (Environment environment = Environment.builder().uri("rabbitmq-stream://localhost:5552").build()) {
34 environment.streamCreator()
35 .stream(STREAM_NAME)
36 .maxAge(Duration.ofHours(6))
37 .maxSegmentSizeBytes(ByteCapacity.MB(500))
38 .create();
39 Producer producer = environment
40 .producerBuilder()
41 .stream(STREAM_NAME)
42 .build();
43
44 CountDownLatch confirmLatch = new CountDownLatch(MESSAGE_COUNT);
45 IntStream.range(0, MESSAGE_COUNT).forEach(i -> {
46 Message message = producer.messageBuilder()
47 .properties()
48 .creationTime(System.currentTimeMillis())
49 .messageId(i)
50 .messageBuilder()
51 .addData(("customer_" + i).getBytes(StandardCharsets.UTF_8))
52 .build();
53 producer.send(message, confirmationStatus -> confirmLatch.countDown());
54 log.info("Published: {}", message.getBody());
55 try {
56 TimeUnit.SECONDS.sleep(1);
57 } catch (InterruptedException e) {
58 throw new RuntimeException(e);
59 }
60 });
61 boolean done = confirmLatch.await(1, TimeUnit.MINUTES);
62 log.info("Completed send: {}", done);
63 //environment.deleteStream(STREAM_NAME);
64 }
65 }
66
67 @SneakyThrows
68 @Async
69 public void consumer() {
70 log.info("Starting consumer!");
71 TimeUnit.SECONDS.sleep(2);
72 try (Environment environment = Environment.builder().uri("rabbitmq-stream://localhost:5552").build()) {
73 Consumer consumer = environment.consumerBuilder()
74 .stream(STREAM_NAME)
75 .offset(OffsetSpecification.last())
76 .messageHandler((context, message) -> {
77 log.info("Consumed: {}", message.getBody());
78 })
79 .build();
80 //Don't let the thread end.
81 CountDownLatch finishLatch = new CountDownLatch(1);
82 finishLatch.await();
83 }
84 }
85}
Setup
1# Project 74
2
3RabbitMQ Stream
4
5[https://gitorko.github.io/rabbitmq-stream/](https://gitorko.github.io/rabbitmq-stream/)
6
7### Version
8
9Check version
10
11```bash
12$java --version
13openjdk version "21.0.3" 2024-04-16 LTS
14```
15
16### RabbitMQ
17
18Run the docker command to start a rabbitmq instance
19
20```bash
21docker run -it --hostname my-rabbit --rm --name my-rabbit -e RABBITMQ_DEFAULT_USER=guest \
22-e RABBITMQ_DEFAULT_PASS=guest -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
23-p 8080:15672 -p 5672:5672 -p 5552:5552 rabbitmq:3-management
24```
25
26```bash
27docker exec my-rabbit rabbitmq-plugins enable rabbitmq_stream
28```
29
30Open the rabbitmq console
31
32[http://localhost:8080](http://localhost:8080)
33
34```
35user:guest
36pwd: guest
37```
38
39### Dev
40
41```bash
42./gradlew bootRun
43```
References
comments powered by Disqus