Spring Kafka Consumer and Producer Example
This tutorial demonstrates how to send and receive messages with Spring Kafka. We start by creating a Spring Kafka Producer which is able to send messages to a Kafka topic. Next we create a Spring Kafka Consumer which is able to listen to messages sent to a Kafka topic. We configure both with appropriate key/value serializers and deserializers. Finally we demonstrate everything using a simple Spring Boot application.
Download and Install Apache Kafka
To download and install Apache Kafka, please read the official Apache Kafka documentation. This tutorial assumes that the server is started using the default configuration and that no server ports are changed.
Project Setup
- Spring Kafka: 2.1.4.RELEASE
- Spring Boot: 2.0.0.RELEASE
- Apache Kafka: kafka_2.11-1.0.0
- Maven: 3.5
Project Structure
Let’s start by looking at the project structure.
Maven Dependencies
We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the classpath.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.memorynotfound.spring.kafka</groupId>
    <artifactId>producer-consumer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://memorynotfound.com/spring-kafka-consumer-producer-example</url>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Spring Kafka Sending Messages to Topic
Let’s start by sending messages. We use the KafkaTemplate class which wraps a Producer and provides high-level operations to send data to Kafka topics. Both asynchronous and synchronous methods are provided, with the async methods returning a Future.
package com.memorynotfound.kafka.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.topic.foo}")
    private String topic;

    public void send(String message){
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}
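The send(...) call in the Sender discards the future that KafkaTemplate returns, so a failed send would go unnoticed. The sketch below shows the two usual ways of consuming such a result: blocking with a timeout (synchronous) and reacting in a callback (asynchronous). It rests on some assumptions: a plain java.util.concurrent.CompletableFuture stands in for the future returned by KafkaTemplate.send(...), whose concrete type depends on the Spring Kafka version, and the class SendResultHandling with its method names is hypothetical and not part of this project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; a stand-in that uses JDK futures only.
class SendResultHandling {

    // Synchronous style: block until the send completes or the timeout expires.
    // This works with any java.util.concurrent.Future.
    static String waitForResult(Future<String> sendFuture, long timeoutMs) {
        try {
            return "sent: " + sendFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The send itself failed; the original error is the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    // Asynchronous style: register a callback and return immediately.
    static CompletableFuture<String> describeWhenDone(CompletableFuture<String> sendFuture) {
        return sendFuture.handle((result, error) ->
                error == null ? "sent: " + result : "failed: " + error.getMessage());
    }
}
```

Blocking is the simpler option when a caller must know the outcome before continuing, while the callback style keeps the sending thread free, which usually matters more under load.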
In order to successfully send messages to a Kafka topic, we need to configure the KafkaTemplate. This configuration is handled by the SenderConfig class.
We configure the KafkaTemplate using an implementation of ProducerFactory, more specifically the DefaultKafkaProducerFactory. We initialize this producer factory with a Map<String, Object> whose keys are taken from the ProducerConfig class.
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG specifies a list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG specifies the serializer class for keys, which must implement the org.apache.kafka.common.serialization.Serializer interface.
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG specifies the serializer class for values, which must implement the org.apache.kafka.common.serialization.Serializer interface.
For a complete list of configuration options take a look at the ProducerConfig class.
package com.memorynotfound.kafka.producer;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
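A malformed bootstrap list typically only shows up when the client first tries to connect. As an illustration of the host1:port1,host2:port2 format described above, the following sketch parses such a list with plain JDK classes so that a typo can be caught early. The class BootstrapServers is hypothetical; the Kafka client performs its own parsing, and this is not that implementation.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for validating a bootstrap server list up front.
class BootstrapServers {

    static List<InetSocketAddress> parse(String list) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : list.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            // Require a non-empty host before the colon and a port after it.
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException(
                        "expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved performs no DNS lookup; it rejects out-of-range ports.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }
}
```

Calling BootstrapServers.parse("localhost:9092") yields a single address, while an entry without a port raises an IllegalArgumentException.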
Spring Kafka Listening Messages from Topic
Next, we’ll show how to listen to messages from a Kafka topic. The Receiver class will consume messages from a Kafka topic. We create the listen() method and annotate it with the @KafkaListener annotation, which marks the method as the target of a Kafka message listener on the specified topics.

package com.memorynotfound.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @KafkaListener(topics = "${app.topic.foo}")
    public void listen(@Payload String message) {
        LOG.info("received message='{}'", message);
    }
}

This mechanism requires an @EnableKafka annotation on one of the @Configuration classes and a listener container factory, which is used to configure the underlying ConcurrentMessageListenerContainer.
It is important that we use the same type of key/value deserializers which we used in the SenderConfig class.
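To see why the two sides must agree, the sketch below mimics the byte formats of Kafka's string and integer serializers using only the JDK: UTF-8 bytes for strings and 4 big-endian bytes for integers. The class SerializerMismatch and its methods are hypothetical stand-ins, not the Kafka classes themselves.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; imitates the wire formats with JDK calls only.
class SerializerMismatch {

    // Same format as StringSerializer with its default encoding: UTF-8 bytes.
    static byte[] serializeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Same format as StringDeserializer with its default encoding.
    static String deserializeString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    // Same format as IntegerSerializer: 4 bytes, big-endian.
    static byte[] serializeInt(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    public static void main(String[] args) {
        // Matching pair: the value survives the round trip.
        System.out.println(deserializeString(serializeString("hello")));

        // Mismatched pair: the bytes 0x41 0x42 0x43 0x44 of an integer are
        // silently read back as the text "ABCD" instead of a number.
        System.out.println(deserializeString(serializeInt(0x41424344)));
    }
}
```

The mismatch raises no error here; the consumer simply receives a value the producer never intended, which is why such bugs can be hard to spot.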
- ConsumerConfig.GROUP_ID_CONFIG specifies a unique string that identifies the consumer group this consumer belongs to.
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG specifies what to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
  - earliest: automatically reset the offset to the earliest offset
  - latest: automatically reset the offset to the latest offset
  - none: throw exception to the consumer if no previous offset is found for the consumer’s group
  - anything else: throw exception to the consumer.
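The reset policy above can be read as a small decision function. The following is a simplified model of that decision, not the consumer client's actual code; the class OffsetResetModel and its parameters are hypothetical, and "latest" is taken to mean the offset just past the last record in the log.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of how a starting offset is chosen.
class OffsetResetModel {

    static long startingOffset(OptionalLong committed, long earliest, long latest, String policy) {
        // A committed offset that still exists in the log always wins.
        if (committed.isPresent()
                && committed.getAsLong() >= earliest
                && committed.getAsLong() <= latest) {
            return committed.getAsLong();
        }
        // Otherwise fall back to the configured auto.offset.reset policy.
        switch (policy) {
            case "earliest":
                return earliest;
            case "latest":
                return latest;
            default: // "none" and anything else
                throw new IllegalStateException(
                        "no valid offset and auto.offset.reset=" + policy);
        }
    }
}
```

With a log spanning offsets 5 to 20 and no committed offset, "earliest" starts at 5 and "latest" at 20. The ReceiverConfig in this tutorial uses "earliest", so a new consumer group reads the topic from the beginning.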
Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances. If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.
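The difference between load balancing and broadcasting can be illustrated with a small in-memory model. This is only a sketch: the class ConsumerGroupModel is hypothetical, and the modulo rule stands in for the partition assignment that Kafka's group coordinator actually negotiates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of consumer-group delivery.
class ConsumerGroupModel {

    // Maps a group name to the consumer names in that group.
    private final Map<String, List<String>> groups = new LinkedHashMap<>();

    void join(String group, String consumer) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(consumer);
    }

    // Returns the consumers that receive a record from the given partition
    // (assumed non-negative): exactly one per subscribing group.
    List<String> deliver(int partition) {
        List<String> receivers = new ArrayList<>();
        for (List<String> members : groups.values()) {
            receivers.add(members.get(partition % members.size()));
        }
        return receivers;
    }
}
```

If consumers c1 and c2 both join group "foo", a record from partition 0 reaches only c1 and a record from partition 1 only c2. If they join two different groups instead, every record reaches both.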
For a complete list of configuration options take a look at the ConsumerConfig class.
package com.memorynotfound.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Configure Application with application.yml
We also create an application.yml properties file which is located in the src/main/resources folder. These properties are injected into the configuration classes by Spring Boot.

spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    foo: foo.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.memorynotfound: DEBUG
Running the Application
Finally, we write a simple Spring Boot application that sends a single message on startup. In order for this demo to work, we need a Kafka server running on localhost on port 9092, which is the default configuration of Kafka.

package com.memorynotfound.kafka;

import com.memorynotfound.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerConsumerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ProducerConsumerApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        sender.send("Spring Kafka Producer and Consumer Example");
    }
}
Demo
When we run the application we receive the following output.
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.0.RELEASE)

Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
sending message='Spring Kafka Producer and Consumer Example' to topic='foo.t'
received message='Spring Kafka Producer and Consumer Example'
From:一号门