Spring Kafka JSON Serializer and Deserializer Example
The following tutorial demonstrates how to send and receive a Java Object as a JSON byte[] to and from Apache Kafka using Spring Kafka, Spring Boot and Maven. We’ll send a Java Object as a JSON byte[] to a Kafka Topic using a JsonSerializer. Afterwards, we’ll configure a JsonDeserializer to receive the JSON byte[] and automatically convert it back into a Java Object.
Project Setup
- Spring Kafka: 2.1.4.RELEASE
- Spring Boot: 2.0.0.RELEASE
- Apache Kafka: kafka_2.11-1.0.0
- Maven: 3.5
Maven Dependencies
We use Apache Maven to manage our project dependencies. Make sure the following dependencies are on the classpath. Since we are working with JSON, we need to include the Jackson JSON library com.fasterxml.jackson.core:jackson-databind.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.memorynotfound.spring.kafka</groupId>
    <artifactId>message-conversion-json</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://memorynotfound.com/spring-kafka-json-serializer-deserializer-example</url>
    <description>Spring Kafka - JSON Serializer Deserializer Example</description>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <!-- json support -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Simple POJO to Serialize/Deserialize
In this example we’ll send and receive a Foo object to and from a Kafka topic.
package com.memorynotfound.kafka;

public class Foo {

    private String name;
    private String description;

    public Foo() {
    }

    public Foo(String name, String description) {
        this.name = name;
        this.description = description;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    @Override
    public String toString() {
        return "Foo{" +
                "name='" + name + '\'' +
                ", description='" + description + '\'' +
                '}';
    }
}
Apache Kafka stores and transports byte[]. There are a number of built-in serializers and deserializers, but none of them handles JSON. Spring Kafka provides a JsonSerializer and a JsonDeserializer, which we can use to convert Java Objects to and from JSON.
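To illustrate what these converters do, here is a minimal, self-contained sketch that serializes a Foo to a JSON byte[] and reads it back without touching a broker. The class JsonRoundTripSketch is made up for this illustration and is not part of the tutorial project.

// Sketch only (not part of the tutorial project): round-trip a Foo through
// Spring Kafka's JSON converters without a Kafka broker.
package com.memorynotfound.kafka;

import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class JsonRoundTripSketch {

    public static void main(String[] args) {
        Foo original = new Foo("Spring Kafka", "JSON round trip");

        // Serialize the Java Object to a JSON byte[]
        JsonSerializer<Foo> serializer = new JsonSerializer<>();
        byte[] json = serializer.serialize("example.t", original);

        // Deserialize the JSON byte[] back into a typed Java Object
        JsonDeserializer<Foo> deserializer = new JsonDeserializer<>(Foo.class);
        Foo copy = deserializer.deserialize("example.t", json);

        System.out.println(new String(json)); // e.g. {"name":"Spring Kafka","description":"JSON round trip"}
        System.out.println(copy);

        serializer.close();
        deserializer.close();
    }
}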
Producing JSON messages with Spring Kafka
Let’s start by sending a Foo object to a Kafka Topic. Notice that we created a KafkaTemplate&lt;String, Foo&gt;, since the Java Objects we send to the Kafka topic are automatically transformed into a JSON byte[]. In this example we create a Message&lt;Foo&gt; using the MessageBuilder. It’s important to set the KafkaHeaders.TOPIC header to the topic we want to send the message to.
package com.memorynotfound.kafka.producer;

import com.memorynotfound.kafka.Foo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class FooSender {

    private static final Logger LOG = LoggerFactory.getLogger(FooSender.class);

    @Autowired
    private KafkaTemplate<String, Foo> kafkaTemplate;

    @Value("${app.topic.example}")
    private String topic;

    public void send(Foo data){
        LOG.info("sending data='{}' to topic='{}'", data, topic);

        Message<Foo> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, topic)
                .build();

        kafkaTemplate.send(message);
    }
}
Starting with version 2.1, type information can be conveyed in record Headers, allowing the handling of multiple types. In addition, the serializer/deserializer can be configured using Kafka properties; a small configuration sketch follows the list below.
- JsonSerializer.ADD_TYPE_INFO_HEADERS (default true); set to false to disable this feature on the serializer.
- JsonDeserializer.DEFAULT_KEY_TYPE; fallback type for deserialization of keys if no header information is present.
- JsonDeserializer.DEFAULT_VALUE_TYPE; fallback type for deserialization of values if no header information is present.
- JsonDeserializer.TRUSTED_PACKAGES (default java.util, java.lang); comma-delimited list of package patterns allowed for deserialization; * means deserialize all.
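For illustration, here is a minimal sketch of how these properties could be added to the producer and consumer configuration maps used later in this tutorial. The helper class JsonConverterProperties is made up for this example, and the constant names are those documented for the Spring Kafka 2.1.x release used here.

// Sketch only: the class name is made up for this example; the property keys
// come from Spring Kafka's JsonSerializer/JsonDeserializer constants.
package com.memorynotfound.kafka;

import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

public class JsonConverterProperties {

    // Producer side: switch off the type information headers if the consumers don't need them.
    public static Map<String, Object> producerJsonProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        return props;
    }

    // Consumer side: a fallback value type when no type header is present,
    // and the packages that are trusted for deserialization.
    public static Map<String, Object> consumerJsonProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(JsonDeserializer.DEFAULT_VALUE_TYPE, Foo.class.getName());
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.memorynotfound.kafka");
        return props;
    }
}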
We need to configure a serializer that supports JSON. We register it by setting ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to the JsonSerializer class. Finally, we type the ProducerFactory and KafkaTemplate with Foo as the value type.
package com.memorynotfound.kafka.producer;

import com.memorynotfound.kafka.Foo;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class FooSenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Foo> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Foo> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
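DefaultKafkaProducerFactory can also be given serializer instances directly, mirroring how the consumer configuration later in this tutorial passes a typed JsonDeserializer. As a sketch (this variant is not used in the rest of the tutorial), the producerFactory() bean above could be written as:

// Alternative sketch for the producerFactory() bean in FooSenderConfig above:
// pass serializer instances to the factory instead of relying on the
// *_SERIALIZER_CLASS_CONFIG properties.
@Bean
public ProducerFactory<String, Foo> producerFactory() {
    return new DefaultKafkaProducerFactory<>(
            producerConfigs(),          // bootstrap servers etc.
            new StringSerializer(),     // key serializer instance
            new JsonSerializer<Foo>()); // value serializer instance
}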
Consuming JSON Messages with Spring Kafka
Next, we’ll look at how we can receive JSON messages. In the FooListener we simply declare the Foo Java Object as a parameter of our listener method.
package com.memorynotfound.kafka.consumer;

import com.memorynotfound.kafka.Foo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class FooListener {

    private static final Logger LOG = LoggerFactory.getLogger(FooListener.class);

    @KafkaListener(topics = "${app.topic.example}")
    public void receive(@Payload Foo data,
                        @Headers MessageHeaders headers) {
        LOG.info("received data='{}'", data);
        headers.keySet().forEach(key -> {
            LOG.info("{}: {}", key, headers.get(key));
        });
    }
}
The FooListenerConfig is a bit more complex. First we need to register the appropriate deserializer, which converts a JSON byte[] into a Java Object, by setting ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to the JsonDeserializer class. Next we create a ConsumerFactory and pass it the consumer configuration, the key deserializer and the typed JsonDeserializer&lt;&gt;(Foo.class). Finally, we need to make sure the ConsumerFactory and the ConcurrentKafkaListenerContainerFactory both use Foo as the value type.
package com.memorynotfound.kafka.consumer;

import com.memorynotfound.kafka.Foo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class FooListenerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Foo> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(Foo.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Foo> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Foo> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Configure with application.yml
We also create an application.yml properties file, located in the src/main/resources folder. These properties are injected into the configuration classes by Spring Boot.
spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    example: example.t

logging:
  level:
    root: WARN
    org.springframework.web: INFO
    com.memorynotfound: DEBUG
Running with Spring Boot
Finally, we write a simple Spring Boot application to demonstrate the example. For this demo to work, we need a Kafka server running on localhost on port 9092, which is the default Kafka configuration.
package com.memorynotfound.kafka;

import com.memorynotfound.kafka.producer.FooSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private FooSender sender;

    @Override
    public void run(String... strings) throws Exception {
        Foo foo = new Foo("Spring Kafka", "sending and receiving JSON messages");
        sender.send(foo);
    }
}
Output
When we run the application we receive the following output.
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.0.RELEASE)

Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
No active profile set, falling back to default profiles: default
sending data='Foo{name='Spring Kafka', description='sending and receiving JSON messages'}' to topic='example.t'
received data='Foo{name='Spring Kafka', description='sending and receiving JSON messages'}'
kafka_offset: 18
kafka_consumer: org.apac[email protected]
kafka_timestampType: CREATE_TIME
kafka_receivedMessageKey: null
kafka_receivedPartitionId: 0
kafka_receivedTopic: example.t
kafka_receivedTimestamp: 1520332684097
__TypeId__: [99, 111, 109, 46, 109, 101, 109, 111, 114, 121, 110, 111, 116, 102, 111, 117, 110, 100, 46, 107, 97, 102, 107, 97, 46, 70, 111, 111]