Spring Kafka Batch Listener Example
In the following tutorial we demonstrate how to setup a batch listener using Spring Kafka, Spring Boot and Maven. We start by configuring the BatchListener. You can optionally configure a BatchErrorHandler. We also demonstrate how to set the upper limit of batch size messages. When we receive messages we also have the possibility of grabbing header values for individual messages.
Project Setup
- Spring Kafka: 2.1.4.RELEASE
- Spring Boot: 2.0.0.RELEASE
- Apache Kafka: kafka_2.11-1.0.0
- Maven: 3.5
Maven Dependencies
We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the class-path.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.memorynotfound.spring.kafka</groupId> <artifactId>batch-listener</artifactId> <version>1.0.0-SNAPSHOT</version> <url>http://memorynotfound.com</url> <description>Spring Kafka - Batch Listener Example</description> <name>Spring Kafka - ${project.artifactId}</name> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${spring-kafka.version}</version> </dependency> <!-- testing --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>${spring-kafka.version}</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Sending Messages to Kafka
In a previous tutorial we saw how to produce and consume messages using Spring Kafka. The Sender and SenderConfig are identical. In the following example we show how to batch receive messages using a BatchListener.
Configuring a Batch Listener
Starting with version 1.1, @KafkaListener methods can be configured to receive the entire batch of consumer records received from the consumer poll. To configure the listener container factory to create batch listeners, set the batchListener property of the ConcurrentKafkaListenerContainerFactory to true.
We can optionally create a BatchErrorHandler by using the ConcurrentKafkaListenerContainerFactory#getContainerProperties().setBatchErrorHandler() and providing your Batch Error Handler.
We can configure Spring Kafka to set an upper limit for the batch size by setting the ConsumerConfig.MAX_POLL_RECORDS_CONFIG to a value that suits you. By default, the number of records received in each batch is dynamically calculated. In the following example we configured the upper limit to 5.
package com.memorynotfound.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.BatchLoggingErrorHandler; import java.util.HashMap; import java.util.Map; @EnableKafka @Configuration public class ListenerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch"); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5"); return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); factory.getContainerProperties().setBatchErrorHandler(new BatchLoggingErrorHandler()); return factory; } }
Batch Receive Kafka Messages using a Batch Listener
Since we are receiving batch messages we need to update the receive() method to accept a List of messages. Alternatively you can receive a List of Message<?> or ConsumerRecord<?,?&glt; objects with each offset, etc in each message, but it must be the only parameter (aside from an optional Acknowledgment when using manual commits) defined on the method.
While receiving batch messages it’s also possible to receive the complementary headers of individual messages. You’ll also need to accept a List of headers you want to get.
package com.memorynotfound.kafka.consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; import java.util.List; @Service public class Listener { private static final Logger LOG = LoggerFactory.getLogger(Listener.class); @KafkaListener(id = "batch-listener", topics = "${app.topic.batch}") public void receive(@Payload List<String> messages, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) List<Long> offsets) { LOG.info("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -"); LOG.info("beginning to consume batch messages"); for (int i = 0; i < messages.size(); i++) { LOG.info("received message='{}' with partition-offset='{}'", messages.get(i), partitions.get(i) + "-" + offsets.get(i)); } LOG.info("all batch messages consumed"); } }
Configure Application with application.yml
We also create a application.yml properties file which is located in the src/main/resources folder. These properties are injected in the configuration classes by spring boot.
spring: kafka: bootstrap-servers: localhost:9092 app: topic: foo: foo.t logging: level: root: ERROR org.springframework.web: ERROR com.memorynotfound: DEBUG
Running the Application
Finally, we wrote a simple Spring Boot application to demonstrate the application. In order for this demo to work, we need a Kafka Server running on localhost on port 9092, which is the default configuration of Kafka.
package com.memorynotfound.kafka; import com.memorynotfound.kafka.producer.Sender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SpringKafkaApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(SpringKafkaApplication.class, args); } @Autowired private Sender sender; @Override public void run(String... strings) throws Exception { for (int i = 1; i < 13; i++){ sender.send("message-" + i); } } }
Demo
When we run the application we receive the following output.
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.0.0.RELEASE) Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE sending message='message-1' to topic='batch.t' sending message='message-2' to topic='batch.t' sending message='message-3' to topic='batch.t' sending message='message-4' to topic='batch.t' sending message='message-5' to topic='batch.t' sending message='message-6' to topic='batch.t' sending message='message-7' to topic='batch.t' sending message='message-8' to topic='batch.t' sending message='message-9' to topic='batch.t' sending message='message-10' to topic='batch.t' sending message='message-11' to topic='batch.t' sending message='message-12' to topic='batch.t' - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - beginning to consume batch messages received message='message-1' with partition-offset='0-295' received message='message-2' with partition-offset='0-296' received message='message-3' with partition-offset='0-297' received message='message-4' with partition-offset='0-298' received message='message-5' with partition-offset='0-299' all batch messages consumed - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - beginning to consume batch messages received message='message-6' with partition-offset='0-300' received message='message-7' with partition-offset='0-301' received message='message-8' with partition-offset='0-302' received message='message-9' with partition-offset='0-303' received message='message-10' with partition-offset='0-304' all batch messages consumed - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - beginning to consume batch messages received message='message-11' with partition-offset='0-305' received message='message-12' with partition-offset='0-306' all batch messages consumed
Download
From:一号门
COMMENTS