Spring - Apache Kafka
Overview
Spring Boot integration with Kafka and Kafka Streams
Github: https://github.com/gitorko/project80
Kafka
Kafka is a distributed, fault-tolerant, high-throughput and scalable stream processing and messaging system.
- Kafka as a publish-subscribe messaging system.
- Kafka as a queue (point-to-point) messaging system.
- Kafka as a stream processing system that reacts to events in real time.
- Kafka as a store for data.
Kafka stores streams of records (messages) in topics. Topics are partitioned and replicated across multiple nodes, which lets Kafka scale out as a distributed system. Producers publish data to topics, and consumer groups subscribe to topics.
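To make the partitioning idea concrete, here is a toy model in plain Java (no Kafka dependency) of how keyed records are routed to partitions. Kafka's default partitioner hashes the serialized key with murmur2; this sketch swaps in `String.hashCode()` as a stand-in, so the partition numbers it produces will not match Kafka's. The class `ToyPartitioner` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of key-based partitioning. Kafka's default partitioner uses a murmur2
// hash of the serialized key; String.hashCode() is only a stand-in here.
class ToyPartitioner {

    // Map a record key to one of numPartitions partitions.
    static int partitionFor(String key, int numPartitions) {
        // Clear the sign bit because hashCode() can be negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Append each {key, value} record to its partition's log, in send order.
    static List<List<String>> route(String[][] records, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (String[] record : records) {
            int partition = partitionFor(record[0], numPartitions);
            partitions.get(partition).add(record[0] + "=" + record[1]);
        }
        return partitions;
    }
}
```

Because every record with the same key lands in the same partition, records for one key keep their relative order, which is the basis of Kafka's per-partition ordering guarantee.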
- Data Store - Kafka is an append-only commit log, which means it can also act as a data store.
- Queue (point-to-point) - If only one consumer group subscribes to a topic, it behaves like a queue: each message is processed by one consumer in that group.
- Pub-Sub - If more than one consumer group subscribes to a topic, it behaves like a pub-sub messaging system: every group receives every message.
- Consumer Group - Each partition is assigned to at most one consumer within a group, so a group can usefully have at most as many consumers as the topic has partitions; any extra consumers stay idle.
- Partition - The producer determines which partition a record is written to, either explicitly or through its partitioner (by default, records with the same key go to the same partition).
- Replication - Each partition is replicated across brokers according to the topic's replication factor, so with a replication factor greater than 1 the loss of a single broker does not bring the system down.
- Ordering - Ordering of messages is guaranteed only within a partition, not across partitions.
- Offset - A consumer can choose to read records from the latest offset or from the beginning.
- Long polling - Kafka consumers pull records using a poll model, whereas RabbitMQ pushes messages to consumers.
- Adapters - Kafka Connect provides connectors that can be used to write data to databases and other endpoints.
- Stream - Kafka Streams provides stream processing capabilities.
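The consumer-group rule above can be illustrated with a toy round-robin assignment in plain Java. This is not Kafka's actual assignor (Kafka ships several, such as range, round-robin and cooperative-sticky); it only shows that each partition goes to exactly one consumer in a group, so consumers beyond the partition count receive nothing. The names `ToyAssignor` and `assign` are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy round-robin assignment of a topic's partitions to the consumers of ONE group.
// Illustrative only; this is not Kafka's assignor implementation.
class ToyAssignor {

    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>()); // every member starts with no partitions
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            // Each partition is owned by exactly one consumer in the group.
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }
}
```

For example, `ToyAssignor.assign(List.of("c1", "c2", "c3"), 2)` returns `{c1=[0], c2=[1], c3=[]}`: with two partitions, the third consumer in the group sits idle.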
Similar to Spring's RestTemplate or JdbcTemplate, which abstract REST and JDBC calls, Spring provides KafkaTemplate as a high-level abstraction for interacting with Kafka. Spring Cloud Stream offers an even higher level of abstraction that lets us integrate with Kafka, RabbitMQ and other messaging systems, so switching the messaging system mostly means changing the binder dependency and configuration rather than the producer or consumer code.
Kafka Producer & Consumer
package com.demo.project80.producer;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.demo.project80.pojo.User;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {

    @Value(value = "${topic.name}")
    private String topicName;

    private final KafkaTemplate<String, User> kafkaTemplate;

    @Bean
    public CommandLineRunner sendData() {
        return args -> {
            List<String> users = Arrays.asList("david", "john", "raj", "peter");
            Random random = new Random();
            // Publish a random user every 10 seconds until the application is stopped
            while (true) {
                User user = new User(users.get(random.nextInt(users.size())), random.nextInt(100));
                // No key is set, so the producer's partitioner chooses the partition
                ProducerRecord<String, User> producerRecord = new ProducerRecord<>(topicName, user);
                // Attach a unique message id as a record header
                producerRecord.headers().add("message-id", UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
                log.info("Sending user: {}", user);
                // send() is asynchronous; the returned future is not checked in this demo
                kafkaTemplate.send(producerRecord);
                TimeUnit.SECONDS.sleep(10);
            }
        };
    }

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducer.class, args);
    }

}
package com.demo.project80.consumer;

import com.demo.project80.pojo.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
@Slf4j
public class KafkaConsumer {

    // idIsGroup = false keeps spring.kafka.consumer.group-id (group-01) as the consumer group;
    // by default the listener id would be used as the group id instead
    @KafkaListener(id = "my-client-app", idIsGroup = false, topics = "${topic.name}")
    public void topicConsumer(ConsumerRecord<String, User> consumerRecord) {
        User user = consumerRecord.value();
        log.info("Received User : {}", user);
    }

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumer.class, args);
    }

}
spring:
  main:
    banner-mode: "off"
    web-application-type: none
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      client-id: my-client-consumer
      group-id: group-01
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.demo.project80.converter.MessageDeserializer
    producer:
      client-id: my-client-app
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.demo.project80.converter.MessageSerializer
      #value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
topic:
  name: mytopic.000
version: '2'
services:
  zookeeper:
    container_name: zookeeper
    image: 'bitnami/zookeeper:latest'
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafkaserver:
    hostname: kafkaserver
    container_name: kafkaserver
    image: 'bitnami/kafka:latest'
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_HOST_NAME=kafkaserver
      - ALLOW_PLAINTEXT_LISTENER=yes
    links:
      - zookeeper:zookeeper
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 9090:8080
    depends_on:
      - zookeeper
      - kafkaserver
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafkaserver:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
Run the main method of KafkaProducer, then that of KafkaConsumer.
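spring.kafka.consumer.group-id sets the consumer group of the client; Kafka uses group management to assign the topic's partitions to the consumers in that group. auto-offset-reset=earliest ensures that a new consumer group, which has no committed offsets yet, starts from the oldest available message.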
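The effect of auto-offset-reset can be modelled in a few lines of plain Java: a group that already has a committed offset resumes from it, while a group with none falls back to the reset policy, where earliest means the oldest retained offset and latest means the log end (only records produced afterwards are read). This is a simplified sketch for a single partition; the class `ToyOffsetReset` and its `startingOffset` method are hypothetical, and the real decision is made inside the Kafka consumer.

```java
import java.util.Map;

// Toy model of where a consumer group starts reading a single partition.
class ToyOffsetReset {

    enum ResetPolicy { EARLIEST, LATEST }

    // logStartOffset: oldest offset still retained; logEndOffset: offset the next record will get.
    static long startingOffset(String groupId, Map<String, Long> committedOffsets,
                               long logStartOffset, long logEndOffset, ResetPolicy policy) {
        Long committed = committedOffsets.get(groupId);
        if (committed != null) {
            // Existing group: resume where it left off.
            // (Kafka also applies the reset policy when a committed offset is out of range; ignored here.)
            return committed;
        }
        // New group: no committed offset, so the reset policy decides.
        return policy == ResetPolicy.EARLIEST ? logStartOffset : logEndOffset;
    }
}
```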
We can have multiple Kafka listeners for a topic with different group IDs, and a single consumer can listen to more than one topic. We created the topic mytopic.000 with only one partition. For a topic with multiple partitions, a @KafkaListener can explicitly be assigned particular partitions of a topic, each with an initial offset.
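// Listen to topic1 as a member of consumer group "group1"
@KafkaListener(topics = "topic1", groupId = "group1")

// Listen to topic1 and topic2 with a single listener
@KafkaListener(topics = {"topic1", "topic2"}, groupId = "group1")

// Read partitions 0 and 2 of topic1 from offset 0 (manual assignment, no group rebalancing)
@KafkaListener(topicPartitions = @TopicPartition(topic = "topic1",
        partitionOffsets = {
                @PartitionOffset(partition = "0", initialOffset = "0"),
                @PartitionOffset(partition = "2", initialOffset = "0")
        }))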
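To see why listeners with different group IDs turn a topic into pub-sub while listeners sharing a group behave like a queue, here is a toy delivery model in plain Java. Each group receives every record; inside a group, a record goes only to the consumer that owns its partition (ownership is a simple modulo here). The class and method names are hypothetical, and the model ignores offsets, rebalancing and failures.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of record delivery to consumer groups subscribed to one topic.
class ToyDelivery {

    // groups: group id -> consumer names in that group.
    // recordPartitions: the partition of each record, in offset order.
    // Returns "group/consumer" -> indices of the records that consumer receives.
    static Map<String, List<Integer>> deliver(Map<String, List<String>> groups,
                                              int[] recordPartitions) {
        Map<String, List<Integer>> received = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            List<String> members = group.getValue();
            for (String member : members) {
                received.put(group.getKey() + "/" + member, new ArrayList<>());
            }
            // Every group sees every record (pub-sub across groups)...
            for (int i = 0; i < recordPartitions.length; i++) {
                // ...but within a group only the partition's owner gets it (queue semantics).
                String owner = members.get(recordPartitions[i] % members.size());
                received.get(group.getKey() + "/" + owner).add(i);
            }
        }
        return received;
    }
}
```

With `group1 = [a, b]`, `group2 = [c]` and four records on partitions `0, 1, 0, 1`, consumers `a` and `b` split the records between them, while `c` alone receives all four.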
Kafka Streams
Kafka Streams is built on stream-table duality. Streams are a collection of immutable facts, whereas tables are a set of evolving facts: each new event for a key overwrites the previous one. Kafka Streams provides an abstraction for each: KStream handles a stream of records, and KTable manages a changelog stream holding the latest state for each key. For data that should not be partitioned, a GlobalKTable broadcasts the full table to every application instance.
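Stream-table duality can be shown without Kafka at all: replaying a stream of keyed change events and keeping only the latest value per key yields the table, and each table update can in turn be emitted as an event, which is what a KTable changelog is. The plain-Java sketch below is illustrative only; the `ToyTable` class and its helpers are hypothetical, and a null value is treated as a delete, mirroring Kafka's tombstone convention.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of stream-table duality: fold a stream of (key, value) events into a table.
class ToyTable {

    // Build an event; SimpleEntry (unlike Map.entry) accepts a null value.
    static Map.Entry<String, Integer> event(String key, Integer value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Later events for the same key overwrite earlier ones; a null value removes the key.
    static Map<String, Integer> toTable(List<Map.Entry<String, Integer>> stream) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : stream) {
            if (e.getValue() == null) {
                table.remove(e.getKey());
            } else {
                table.put(e.getKey(), e.getValue());
            }
        }
        return table;
    }
}
```

Replaying `david=30, john=25, david=31, john=null` produces the table `{david=31}`: the stream keeps all four facts, while the table keeps only the current state.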
With processing frameworks such as Apache Spark, Storm or Flink, we write code and deploy the jar to the cluster nodes where the actual work happens. Kafka Streams is instead a client library: we write the stream processing logic, and it runs anywhere the jar can run, without a separate processing cluster. Kafka Streams lets us consume from Kafka topics, analyze or transform the data, and optionally send the results to another Kafka topic.
We will now count the users by age (one count per distinct age value).
package com.demo.project80.stream;

import java.util.Properties;

import com.demo.project80.pojo.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.support.serializer.JsonSerde;

@SpringBootApplication
@Slf4j
public class KafkaStream {

    @Value(value = "${topic.name}")
    private String topicName;

    private static final Serde<String> STRING_SERDE = Serdes.String();

    @Bean
    public CommandLineRunner streamData() {
        return args -> {
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            // Read the topic as a stream of records with String keys and JSON-encoded User values
            KStream<String, User> streamOfUsers = streamsBuilder
                    .stream(topicName, Consumed.with(STRING_SERDE, new JsonSerde<>(User.class)));

            // Log every user as it arrives
            streamOfUsers.foreach((k, v) -> log.info("user: {}, age: {}", v.getName(), v.getAge()));

            // Re-key each record by age, then count the records for each age
            KTable<String, Long> userCountByAge = streamOfUsers
                    .map((k, v) -> new KeyValue<>(String.valueOf(v.getAge()), String.valueOf(v.getAge())))
                    .groupByKey(Grouped.with(STRING_SERDE, STRING_SERDE))
                    .count();
            // Log each update of the count table
            userCountByAge.toStream().foreach((age, count) -> log.info("Age: {} -> {}", age, count));

            KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), kStreamsConfigs());
            // Delete this application's local state so every run starts fresh (must be called before start())
            streams.cleanUp();
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        };
    }

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public Properties kStreamsConfigs() {
        Properties props = new Properties();
        // application.id also serves as the consumer group id of the streams application
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-group");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return props;
    }

    public static void main(String[] args) {
        SpringApplication.run(KafkaStream.class, args);
    }

}
Run the main method of KafkaStream.
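For intuition about what the map/groupByKey/count steps compute, here is a plain-Java equivalent: every incoming user increments the running count for its age, and each increment is one update to the KTable, which `toStream().foreach` then logs. In the real application Kafka Streams may cache and batch these updates, so not every intermediate count is necessarily logged. The class `ToyAgeCount` is hypothetical and the input ages are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of "count users by age": not Kafka Streams, just the same arithmetic.
class ToyAgeCount {

    // Returns the sequence of table updates, one per input record, formatted like the log line.
    static List<String> countByAge(int[] ages) {
        Map<String, Long> counts = new LinkedHashMap<>(); // the "table": age -> running count
        List<String> updates = new ArrayList<>();          // the "changelog" of that table
        for (int age : ages) {
            String key = String.valueOf(age);              // re-key by age, as the map step does
            long count = counts.merge(key, 1L, Long::sum);  // increment the running count
            updates.add("Age: " + key + " -> " + count);
        }
        return updates;
    }
}
```

For the ages `30, 25, 30` this yields the updates `Age: 30 -> 1`, `Age: 25 -> 1` and `Age: 30 -> 2`.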
Setup
Project 80
Spring Boot & Kafka
https://gitorko.github.io/spring-apache-kafka/
Version
Check version
$ java --version
openjdk 17.0.3 2022-04-19 LTS
Kafka
Kafka needs ZooKeeper in this setup; use docker compose to run ZooKeeper, Kafka and the kafka-ui dashboard as containers.
On Windows, ensure the C:\Windows\System32\drivers\etc\hosts file has these 2 entries. On Linux, ensure /etc/hosts has the same 2 entries.
127.0.0.1 zookeeper
127.0.0.1 kafkaserver
docker-compose -f docker/docker-compose.yml up
To create topic
docker exec -it kafkaserver /bin/bash
$ /opt/bitnami/kafka/bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic mytopic.000 --bootstrap-server localhost:9092
To delete topic
docker exec -it kafkaserver /bin/bash
$ /opt/bitnami/kafka/bin/kafka-topics.sh --delete --topic mytopic.000 --bootstrap-server localhost:9092
Clean up
docker-compose -f docker/docker-compose.yml stop
docker rm kafka-ui kafkaserver zookeeper
Restart
docker-compose -f docker/docker-compose.yml start
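The Kafka dashboard (kafka-ui) is available at http://localhost:9090, which the docker-compose file maps to the container's port 8080. Wait a few seconds, as it takes time to come up.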