Spring - Apache Kafka

Overview

Spring Boot integration with Kafka & Kafka Streams.

GitHub: https://github.com/gitorko/project80

Kafka

Kafka is a distributed, fault-tolerant, high-throughput, scalable stream-processing and messaging system.

  1. Kafka as a publisher-subscriber messaging system.
  2. Kafka as a queue (point-to-point) messaging system.
  3. Kafka as a stream-processing system that reacts to events in real time.
  4. Kafka as a store for data.

Kafka stores streams of records (messages) in topics. Topics are partitioned and replicated across multiple nodes, which is how Kafka scales and stays fault-tolerant as a distributed system. Producers publish data to topics. Consumer groups subscribe to topics.

  1. Data Store - Kafka is an append-only commit log, which means it can also act as a data store.
  2. Queue (point-to-point) - If only one consumer group subscribes to a topic, it behaves like a queue (point-to-point) messaging system.
  3. Pub-Sub - If more than one consumer group subscribes to a topic, it behaves like a pub-sub messaging system.
  4. Consumer Group - The number of consumers in a group must be less than or equal to the number of partitions. You can't have more active consumers in a group than there are partitions.
  5. Partition - A producer can target a specific partition explicitly, or supply a record key and let the partitioner pick one (see the sketch after this list).
  6. Replication - Partitions are replicated across brokers, so the loss of one node doesn't bring down the system.
  7. Ordering - Ordering of messages is guaranteed only within a partition, not across partitions.
  8. Offset - A consumer can choose to read records from the latest offset or from the beginning.
  9. Long polling - Kafka consumers use a poll model, compared to RabbitMQ which uses a push model.
  10. Adapters - Kafka Connect provides adapters that can write data to databases and other endpoints.
  11. Streams - Kafka Streams provides stream-processing capabilities.
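
A minimal sketch of keyed publishing with Spring's KafkaTemplate, illustrating points 5 and 7 above. This class is illustrative and not part of the project; it assumes the KafkaTemplate<String, User> bean configured later in this post.

import com.demo.project80.pojo.User;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KeyedUserSender {

    private final KafkaTemplate<String, User> kafkaTemplate;

    public KeyedUserSender(KafkaTemplate<String, User> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String topicName, User user) {
        // send(topic, key, value): the key is hashed by the default partitioner,
        // so every record with the same key lands in the same partition and
        // per-key ordering is preserved.
        kafkaTemplate.send(topicName, user.getName(), user);
    }
}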

Similar to Spring's RestTemplate or JdbcTemplate, which abstract away REST/JDBC calls, Spring provides KafkaTemplate, a high-level abstraction for interacting with Kafka. An even higher level of abstraction is provided by Spring Cloud Stream, which lets us integrate with Kafka, RabbitMQ, and other messaging systems, so that when the messaging system changes you don't need to make code changes in the producer or consumer.
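
For example, with Spring Cloud Stream's functional style a consumer is just a java.util.function.Consumer bean bound to a destination via configuration; switching brokers means swapping the binder dependency, not the code. A minimal sketch (this project uses KafkaTemplate, not Spring Cloud Stream; the bean and binding names here are illustrative and assume the spring-cloud-stream dependency with a Kafka binder):

// Sketch: a broker-agnostic consumer with Spring Cloud Stream.
// The binding name follows the functional convention <beanName>-in-0;
// the destination is set in configuration, e.g.
//   spring.cloud.stream.bindings.consumeUser-in-0.destination=mytopic.000
import java.util.function.Consumer;

import com.demo.project80.pojo.User;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StreamConfig {

    @Bean
    public Consumer<User> consumeUser() {
        // Business logic only; no Kafka or RabbitMQ APIs leak in here.
        return user -> System.out.println("Received: " + user);
    }
}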

Kafka Producer & Consumer

package com.demo.project80.producer;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.demo.project80.pojo.User;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {

    @Value(value = "${topic.name}")
    private String topicName;

    private final KafkaTemplate<String, User> kafkaTemplate;

    @Bean
    public CommandLineRunner sendData() {
        return args -> {
            List<String> users = Arrays.asList("david", "john", "raj", "peter");
            Random random = new Random();
            while (true) {
                // Build a random user and attach a message-id header for traceability.
                User user = new User(users.get(random.nextInt(users.size())), random.nextInt(100));
                ProducerRecord<String, User> producerRecord = new ProducerRecord<>(topicName, user);
                producerRecord.headers().add("message-id", UUID.randomUUID().toString().getBytes());
                log.info("Sending user: {}", user);
                kafkaTemplate.send(producerRecord);
                TimeUnit.SECONDS.sleep(10);
            }
        };
    }

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducer.class, args);
    }

}
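
Note that kafkaTemplate.send() is asynchronous. If you need to confirm delivery, you can attach a callback to the returned future. A sketch of a helper that could live in the producer class above (assumes Spring Kafka 3.x, where send() returns a CompletableFuture; older versions return a ListenableFuture instead):

// Sketch: confirming delivery of an asynchronous send.
private void sendWithCallback(KafkaTemplate<String, User> kafkaTemplate,
                              ProducerRecord<String, User> producerRecord) {
    kafkaTemplate.send(producerRecord).whenComplete((result, ex) -> {
        if (ex != null) {
            log.error("Failed to send user", ex);
        } else {
            log.info("Sent to partition {} at offset {}",
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
        }
    });
}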
package com.demo.project80.consumer;

import com.demo.project80.pojo.User;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
@Slf4j
public class KafkaConsumer {

    // Note: the listener id also acts as the consumer group id by default
    // (idIsGroup = true), overriding spring.kafka.consumer.group-id.
    @SneakyThrows
    @KafkaListener(id = "my-client-app", topics = "${topic.name}")
    public void topicConsumer(ConsumerRecord<String, User> consumerRecord) {
        User user = consumerRecord.value();
        log.info("Received User : {}", user);
    }

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumer.class, args);
    }

}
application.yaml:

spring:
  main:
    banner-mode: "off"
    web-application-type: none
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      client-id: my-client-consumer
      group-id: group-01
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.demo.project80.converter.MessageDeserializer
    producer:
      client-id: my-client-app
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.demo.project80.converter.MessageSerializer
      #value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
topic:
  name: mytopic.000
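
The custom MessageSerializer and MessageDeserializer classes referenced above live in the project repository and are not reproduced here. A minimal JSON-based sketch of what such converters can look like (assuming Jackson on the classpath; not necessarily identical to the repo's implementation, and each class would go in its own file):

// Sketch: JSON value serializer/deserializer for User.
package com.demo.project80.converter;

import com.demo.project80.pojo.User;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

public class MessageSerializer implements Serializer<User> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, User data) {
        try {
            return data == null ? null : objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing User", e);
        }
    }
}

class MessageDeserializer implements Deserializer<User> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public User deserialize(String topic, byte[] data) {
        try {
            return data == null ? null : objectMapper.readValue(data, User.class);
        } catch (Exception e) {
            throw new SerializationException("Error deserializing User", e);
        }
    }
}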
docker-compose.yml:

version: '2'
services:
  zookeeper:
    container_name: zookeeper
    image: 'bitnami/zookeeper:latest'
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafkaserver:
    hostname: kafkaserver
    container_name: kafkaserver
    image: 'bitnami/kafka:latest'
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_HOST_NAME=kafkaserver
      - ALLOW_PLAINTEXT_LISTENER=yes
    links:
      - zookeeper:zookeeper
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 9090:8080
    depends_on:
      - zookeeper
      - kafkaserver
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafkaserver:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

Run the main method of the producer and then the consumer.

The group-id identifies your consumer group; Kafka uses group management to assign topic partitions to the consumers in the group. Setting auto-offset-reset=earliest ensures a new consumer group starts from the oldest available message.

We can have multiple Kafka listeners for a topic, each with a different group id, and a consumer can listen to more than one topic. We have created the topic 'mytopic.000' with only one partition. For a topic with multiple partitions, @KafkaListener can explicitly subscribe to a particular partition of a topic with an initial offset:

// Single topic with an explicit consumer group:
@KafkaListener(topics = "topic1", groupId = "group1")

// Multiple topics in one listener:
@KafkaListener(topics = {"topic1", "topic2"}, groupId = "group1")

// Specific partitions of a topic, each starting from a given offset:
@KafkaListener(topicPartitions = @TopicPartition(topic = "topic1",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"),
    @PartitionOffset(partition = "2", initialOffset = "0")
}))

Kafka Streams

Kafka Streams is built on a stream-table duality. A stream is a collection of immutable facts, while a table is a set of evolving facts where each new event overwrites the previous state for a key. Kafka Streams provides an abstraction for each: KStream handles the stream of records, and KTable manages a changelog stream holding the latest state of a given key. For data that is not partitioned, we can use GlobalKTable to broadcast information to all tasks.
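
A minimal sketch of how the three abstractions are created from a StreamsBuilder (the topic names here are placeholders, not topics used by this project):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class DualityExample {

    public void build(StreamsBuilder builder) {
        Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());

        // Every record is an independent, immutable fact.
        KStream<String, String> stream = builder.stream("events-topic", consumed);

        // Changelog semantics: the latest value per key wins; partitioned like the topic.
        KTable<String, String> table = builder.table("state-topic", consumed);

        // Fully replicated to every application instance; useful for broadcast/lookup data.
        GlobalKTable<String, String> globalTable = builder.globalTable("lookup-topic", consumed);
    }
}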

When we use other projects like Apache Spark, Storm, or Flink, we write code and copy the jar to the nodes where the actual work happens. With Kafka Streams, we write the processing logic as a regular application, and it can run anywhere the jar can run. Kafka Streams enables us to consume from Kafka topics, analyze or transform data, and potentially send it to another Kafka topic.

We will now count the users by age group.

package com.demo.project80.stream;

import java.util.Properties;

import com.demo.project80.pojo.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.serializer.JsonSerde;

@SpringBootApplication
@Slf4j
public class KafkaStream {

    @Value(value = "${topic.name}")
    private String topicName;

    private static final Serde<String> STRING_SERDE = Serdes.String();

    @Bean
    public CommandLineRunner streamData() {
        return args -> {
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            // Read the topic as a stream of User records, deserialized from JSON.
            KStream<String, User> streamOfUsers = streamsBuilder
                    .stream(topicName, Consumed.with(STRING_SERDE, new JsonSerde<>(User.class)));

            streamOfUsers.foreach((k, v) -> {
                log.info("user: {}, age: {}", v.getName(), v.getAge());
            });

            // Re-key each record by age, then count the records per age group.
            KTable<String, Long> userCountByAge = streamOfUsers
                    .map((k, v) -> new KeyValue<>(v.getAge(), String.valueOf(v.getAge())))
                    .groupBy((k, w) -> w, Grouped.with(STRING_SERDE, STRING_SERDE))
                    .count();
            userCountByAge.toStream().foreach((age, count) -> log.info("Age: " + age + " -> " + count));

            KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), kStreamsConfigs());
            streams.cleanUp();
            streams.start();
            // Close the streams client gracefully on JVM shutdown.
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        };
    }

    private Properties kStreamsConfigs() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-group");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return props;
    }

    public static void main(String[] args) {
        SpringApplication.run(KafkaStream.class, args);
    }

}

Run the main method of KafkaStream.

Setup

Project 80: Spring Boot & Kafka - https://gitorko.github.io/spring-apache-kafka/

Version

Check version

$ java --version
openjdk 17.0.3 2022-04-19 LTS

Kafka

To run Kafka we need Zookeeper. Use the docker-compose command to run Kafka as a container.

For Windows ensure the C:\Windows\System32\drivers\etc\hosts file has these 2 entries. For Linux ensure /etc/hosts has these 2 entries.

127.0.0.1 zookeeper
127.0.0.1 kafkaserver

Then bring up the containers:

docker-compose -f docker/docker-compose.yml up

To create topic

docker exec -it kafkaserver /bin/bash
$ /opt/bitnami/kafka/bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic mytopic.000 --bootstrap-server localhost:9092
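
Alternatively, if you prefer to create the topic from the application itself, Spring Boot auto-configures a KafkaAdmin that creates any topic declared as a NewTopic bean on startup. A sketch (this project creates the topic via the CLI instead):

// Sketch: declaring the topic as a bean so Spring Kafka's KafkaAdmin
// creates it on startup. Equivalent to the kafka-topics.sh command above.
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    @Bean
    public NewTopic myTopic() {
        return TopicBuilder.name("mytopic.000")
                .partitions(1)
                .replicas(1)
                .build();
    }
}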

To delete topic

docker exec -it kafkaserver /bin/bash
$ /opt/bitnami/kafka/bin/kafka-topics.sh --delete --topic mytopic.000 --bootstrap-server localhost:9092

Clean up

docker-compose -f docker/docker-compose.yml stop
docker rm kafka-ui kafkaserver zookeeper

Restart

docker-compose -f docker/docker-compose.yml start

Dashboard for Kafka. Wait a few seconds, as it takes time to come up.

Open http://localhost:9090/

References

https://www.baeldung.com/java-kafka-streams

https://kafka.apache.org/quickstart

https://www.baeldung.com/java-kafka-streams-vs-kafka-consumer

https://tanzu.vmware.com/developer/guides/kafka-gs/

https://kafka.apache.org/
