Spring Kafka Adding Custom Header to Kafka Message Example
In this tutorial we demonstrate how to add/read custom headers to/from a Kafka Message using Spring Kafka. We start by adding headers using either Message<?> or ProducerRecord<String, String>. Followed by reading the values inside the KafkaListener using @Header annotation and MessageHeaders class.
Project Setup
- Spring Kafka: 2.1.4.RELEASE
- Spring Boot: 2.0.0.RELEASE
- Apache Kafka: kafka_2.11-1.0.0
- Maven: 3.5
Maven Dependencies
We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the class-path. Important: We need to include the com.fasterxml.jackson.core:jackson-databind dependency for working with rich header values.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.memorynotfound.spring.kafka</groupId> <artifactId>message-headers</artifactId> <version>1.0.0-SNAPSHOT</version> <url>http://memorynotfound.com</url> <name>Spring Kafka - ${project.artifactId}</name> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version> <spring-boot.version>2.0.0.RELEASE</spring-boot.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${spring-kafka.version}</version> </dependency> <!-- for serializing/deserializing complex headers --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <!-- testing --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>${spring-kafka.version}</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
The 0.11.0.0 Apache Kafka client introduced support for headers in messages. Spring for Apache Kafka supports mapping these headers to/from MessageHeaders since version 2.0.
Sending Custom Headers with Spring Kafka
Let’s start by adding custom header values to a Kafka Message. We can add headers to a Kafka message using either Message<?> or ProducerRecord<String, String> like shown in the following code.
package com.memorynotfound.kafka.producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; @Service public class Sender { private static final Logger LOG = LoggerFactory.getLogger(Sender.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Value("${app.topic.foo}") private String topicFoo; @Value("${app.topic.bar}") private String topicBar; public void sendFoo(String data){ Message<String> message = MessageBuilder .withPayload(data) .setHeader(KafkaHeaders.TOPIC, topicFoo) .setHeader(KafkaHeaders.MESSAGE_KEY, "999") .setHeader(KafkaHeaders.PARTITION_ID, 0) .setHeader("X-Custom-Header", "Sending Custom Header with Spring Kafka") .build(); LOG.info("sending message='{}' to topic='{}'", data, topicFoo); kafkaTemplate.send(message); } public void sendBar(String data){ List<Header> headers = new ArrayList<>(); headers.add(new RecordHeader("X-Custom-Header", "Sending Custom Header with Spring Kafka".getBytes())); ProducerRecord<String, String> bar = new ProducerRecord<>(topicBar, 0, "111", data, headers); LOG.info("sending message='{}' to topic='{}'", data, topicBar); kafkaTemplate.send(bar); } }
We configure the KafkaTemplate inside the SenderConfig class. For simplicity we used a StringSerializer for both key and value fields.
package com.memorynotfound.kafka.producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; @Configuration public class SenderConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
Reading Custom Header Values with Spring Kafka
Previously we saw how to send custom header values. Now we are going to read those values. We have a couple of options. We can inject each header individually using the @Header annotation. Or we can inject the MessageHeaders which you can use to iterate over each header. You can use whichever you find more suitable.
package com.memorynotfound.kafka.consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; @Service public class Listener { private static final Logger LOG = LoggerFactory.getLogger(Listener.class); @KafkaListener(topics = "${app.topic.foo}") public void receive(@Payload String data, @Header(KafkaHeaders.OFFSET) Long offset, @Header(KafkaHeaders.CONSUMER) KafkaConsumer<String, String> consumer, @Header(KafkaHeaders.TIMESTAMP_TYPE) String timestampType, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partitionId, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp, @Header("X-Custom-Header") String customHeader) { LOG.info("- - - - - - - - - - - - - - -"); LOG.info("received message='{}'", data); LOG.info("consumer: {}", consumer); LOG.info("topic: {}", topic); LOG.info("message key: {}", messageKey); LOG.info("partition id: {}", partitionId); LOG.info("offset: {}", offset); LOG.info("timestamp type: {}", timestampType); LOG.info("timestamp: {}", timestamp); LOG.info("custom header: {}", customHeader); } @KafkaListener(topics = "${app.topic.bar}") public void receive(@Payload String data, @Headers MessageHeaders messageHeaders) { LOG.info("- - - - - - - - - - - - - - -"); LOG.info("received message='{}'", data); messageHeaders.keySet().forEach(key -> { Object value = messageHeaders.get(key); if (key.equals("X-Custom-Header")){ LOG.info("{}: {}", key, new String((byte[])value)); } else { LOG.info("{}: {}", key, value); } }); } }
The DefaultKafkaHeaderMapper maps the key to the MessageHeaders header name. To support rich content i’ll automatically convert objects to JSON. You can optionally initialize the DefaultKafkaHeaderMapper using your own ObjectMapper and patterns.
package com.memorynotfound.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.support.DefaultKafkaHeaderMapper; import java.util.HashMap; import java.util.Map; @EnableKafka @Configuration public class ListenerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public DefaultKafkaHeaderMapper headerMapper(){ return new DefaultKafkaHeaderMapper(); } }
Configure with application.yml
We also create a application.yml properties file which is located in the src/main/resources folder. These properties are injected in the configuration classes by spring boot.
spring: kafka: bootstrap-servers: localhost:9092 app: topic: foo: foo.t bar: bar.t logging: level: root: WARN org.springframework.web: INFO com.memorynotfound: DEBUG
Running with Spring Boot
Finally, we wrote a simple Spring Boot application to demonstrate the application. In order for this demo to work, we need a Kafka Server running on localhost on port 9092, which is the default configuration of Kafka.
package com.memorynotfound.kafka; import com.memorynotfound.kafka.producer.Sender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SpringKafkaApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(SpringKafkaApplication.class, args); } @Autowired private Sender sender; @Override public void run(String... strings) throws Exception { String data = "Spring Kafka Custom Header Example"; sender.sendFoo(data); sender.sendBar(data); } }
Output
When we run the application we receive the following output.
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.0.0.RELEASE) Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE No active profile set, falling back to default profiles: default sending message='Spring Kafka Custom Header Example' to topic='foo.t' sending message='Spring Kafka Custom Header Example' to topic='bar.t' - - - - - - - - - - - - - - - received message='Spring Kafka Custom Header Example' X-Custom-Header: Sending Custom Header with Spring Kafka kafka_offset: 49 kafka_consumer: [email protected] kafka_timestampType: CREATE_TIME kafka_receivedMessageKey: 111 kafka_receivedPartitionId: 0 kafka_receivedTopic: bar.t kafka_receivedTimestamp: 1520322866725 - - - - - - - - - - - - - - - received message='Spring Kafka Custom Header Example' consumer: [email protected] topic: foo.t message key: 999 partition id: 0 offset: 137 timestamp type: CREATE_TIME timestamp: 1520322866701 custom header: Sending Custom Header with Spring Kafka
Download
From:一号门
COMMENTS