Spring Kafka and Spring Boot Configuration Example
In the following tutorial we demonstrate how to configure Spring Kafka with Spring Boot. Spring Boot uses sensible default to configure Spring Kafka. We can override these defaults using the application.yml property file.
Project Setup
- Spring Kafka: 2.1.4.RELEASE
- Spring Boot: 2.0.0.RELEASE
- Apache Kafka: kafka_2.11-1.0.0
- Maven: 3.5
Previously we saw how to create a spring kafka consumer and producer which manually configures the Producer and Consumer. In this example we’ll use Spring Boot to automatically configure them for us using sensible defaults.
Download and Install Apache Kafka
To download and install Apache Kafka, please read the official documentation here. This tutorial assumes that server is started using the default configuration and no server ports are changed.
Maven Dependencies
We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the class-path.
<?xml version= "1.0" encoding= "UTF-8" ?> <project xmlns= "http://maven.apache.org/POM/4.0.0" xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http: //maven.apache.org/POM/4.0.0 http: //maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion> 4.0 . 0 </modelVersion> <groupId>com.memorynotfound.spring.kafka</groupId> <artifactId>spring-boot</artifactId> <version> 1.0 . 0 -SNAPSHOT</version> <url>http: //memorynotfound.com</url> <description>Spring Kafka Spring Boot Configuration Example</description> <name>Spring Kafka - ${project.artifactId}</name> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version> 2.0 . 0 .RELEASE</version> </parent> <properties> <java.version> 1.8 </java.version> <project.build.sourceEncoding>UTF- 8 </project.build.sourceEncoding> <spring-kafka.version> 2.1 . 4 .RELEASE</spring-kafka.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${spring-kafka.version}</version> </dependency> <!-- testing --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>${spring-kafka.version}</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project> |
Sending Spring Kafka Messages with Spring Boot
Spring Boot automatically configures and initializes a KafkaTemplate based on the properties configured in the application.yml property file. By using the @Service annotation we make the Sender class eligible for the spring container to do auto discovery.
package com.memorynotfound.kafka; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class Sender { private static final Logger LOG = LoggerFactory.getLogger(Sender. class ); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Value ( "${app.topic.foo}" ) private String topic; public void send(String message){ LOG.info( "sending message='{}' to topic='{}'" , message, topic); kafkaTemplate.send(topic, message); } } |
Receiving Kafka Messages with Spring Boot
The ConcurrentKafkaListenerContainerFactory and KafkaMessageListenerContainer beans are also automatically configured by Spring Boot. You can optionally configure these beans using the application.yml property file.
By annotating a method with @KafkaListener annotation Spring Kafka will automatically create a message listener container.
package com.memorynotfound.kafka; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; @Service public class Receiver { private static final Logger LOG = LoggerFactory.getLogger(Receiver. class ); @KafkaListener (topics = "${app.topic.foo}" ) public void receive( @Payload String message, @Headers MessageHeaders headers) { LOG.info( "received message='{}'" , message); headers.keySet().forEach(key -> LOG.info( "{}: {}" , key, headers.get(key))); } } |
Configure Application with application.yml
Spring Boot tries to automatically configure your application with sensible defaults based on the specified dependencies inside your pom.xml file. We haven’t configured any Consumer, Producer or KafkaTemplate beans, spring boot will auto-configure them using spring boot default values. These values can be overridden using the application.yml property file. You can find more information about Spring Boot Kafka Properties.
We also create a application.yml properties file which is located in the src/main/resources folder. These properties are injected in the configuration classes by spring boot.
spring: kafka: consumer: group-id: foo auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer app: topic: foo: foo.t logging: level: root: WARN org.springframework.web: INFO com.memorynotfound: DEBUG |
Running the Application
Finally, we wrote a simple Spring Boot application to demonstrate the application. In order for this demo to work, we need a Kafka Server running on localhost on port 9092, which is the default configuration of Kafka.
package com.memorynotfound.kafka; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SpringKafkaApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(SpringKafkaApplication. class , args); } @Autowired private Sender sender; @Override public void run(String... strings) throws Exception { sender.send( "Spring Kafka and Spring Boot Configuration Example" ); } } |
Demo
When we run the application we receive the following output.
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | ' _| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2. 0.0 .RELEASE) Running with Spring Boot v2. 0.0 .RELEASE, Spring v5. 0.4 .RELEASE Started SpringKafkaApplication in 2.334 seconds (JVM running for 3.186 ) sending message= 'Spring Kafka and Spring Boot Configuration Example' to topic= 'foo.t' received message= 'Spring Kafka and Spring Boot Configuration Example' kafka_offset: 144 kafka_nativeHeaders: RecordHeaders(headers = [], isReadOnly = false ) kafka_consumer: <a href= "/cdn-cgi/l/email-protection" class = "__cf_email__" data-cfemail= "305f42571e5140515358551e5b51565b511e535c59555e44431e535f5e43455d55421e7b51565b51735f5e43455d5542700654070554040852" >[email protected ]</a> kafka_timestampType: CREATE_TIME kafka_receivedMessageKey: null kafka_receivedPartitionId: 0 kafka_receivedTopic: foo.t kafka_receivedTimestamp: 1520508611795 |
Download
From:一号门
COMMENTS