Spring Kafka Forwarding Listener Results using @SendTo
This tutorial demonstrates how to forward listener results using the @SendTo annotation using Spring Kafka, Spring Boot and Maven. We can use static typed topics, runtime expressions or application initialization expressions. Take a look at the following example.
Project Setup
- Spring Kafka: 2.1.4.RELEASE
- Spring Boot: 2.0.0.RELEASE
- Apache Kafka: kafka_2.11-1.0.0
- Maven: 3.5
Maven Dependencies
We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the class-path.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.memorynotfound.spring.kafka</groupId> <artifactId>send-to</artifactId> <version>1.0.0-SNAPSHOT</version> <url>http://memorynotfound.com</url> <description>Spring Kafka - forwarding listener results using @SendTo</description> <name>Spring Kafka - ${project.artifactId}</name> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${spring-kafka.version}</version> </dependency> <!-- testing --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>${spring-kafka.version}</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Sending Messages to Kafka Topic
We use the Constants object as a placeholder for the Kafka topics. This way we can easily use constants to specify which topic we are sending to.
package com.memorynotfound.kafka; public class Constants { public static final String FOO_TOPIC = "foo.t"; public static final String BAR_TOPIC = "bar.t"; }
Spring Boot auto-configures the KafkaTemplate using properties from the application.yml property file. We use it to send a message to a Kafka topic.
package com.memorynotfound.kafka; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import static com.memorynotfound.kafka.Constants.FOO_TOPIC; @Service public class Sender { private static final Logger LOG = LoggerFactory.getLogger(Sender.class); @Autowired private KafkaTemplate<String, Double> kafkaTemplate; public void send(Double data){ LOG.info("sending data='{}' to topic='{}'", data, FOO_TOPIC); kafkaTemplate.send(FOO_TOPIC, data); } }
Forwarding Listener Results using @SendTo
Starting from version 2.0, if you also annotate a @KafkaListener with a @SendTo annotation and the method invocation returns a result, the result will be forwarded to the topic specified by the @SendTo annotation.
The @SendTo value argument can accept several forms:
- @SendTo("someTopic") specifies a static topic to rout to.
- @SendTo("#{someExpression}") specifies an application initialization expression. This expression is evaluated once during application context initialization and’ll be forwarded to the result.
- @SendTo("!{someExpression}") specifies a runtime expression. This expression is evaluated at runtime. The #root object for the evaluation has 3 properties:
- request – the inbound ConsumerRecord (or ConsumerRecords object for a batch listener)
- source – the Message<?> converted from the request.
- result – the method return result.
- @SendTo() (no properties) is treaded as !{source.headers["kafka_replyTopic']} (since version 2.1.3).
package com.memorynotfound.kafka; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Service; import static com.memorynotfound.kafka.Constants.BAR_TOPIC; import static com.memorynotfound.kafka.Constants.FOO_TOPIC; @Service public class Receiver { private static final Logger LOG = LoggerFactory.getLogger(Receiver.class); @SendTo(BAR_TOPIC) @KafkaListener(topics = FOO_TOPIC) public Double calculate(Double data) { LOG.info("calculating square root from='{}'", data); return Math.sqrt(data); } @KafkaListener(topics = BAR_TOPIC) public void result(Double data) { LOG.info("received square root='{}'", data); } }
Configure Application with application.yml
Spring Boot tries to automatically configure your application with sensible defaults based on the specified dependencies inside your pom.xml file. We haven’t configured any Consumer, Producer or KafkaTemplate beans, spring boot will auto-configure them using spring boot default values. These values can be overridden using the application.yml property file. You can find more information about Spring Boot Kafka Properties.
We also create a application.yml properties file which is located in the src/main/resources folder. These properties are injected in the configuration classes by spring boot.
spring: kafka: consumer: group-id: foo auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.DoubleDeserializer value-deserializer: org.apache.kafka.common.serialization.DoubleDeserializer producer: key-serializer: org.apache.kafka.common.serialization.DoubleSerializer value-serializer: org.apache.kafka.common.serialization.DoubleSerializer logging: level: root: WARN org.springframework.web: INFO com.memorynotfound: DEBUG
Running the Application
Finally, we wrote a simple Spring Boot application to demonstrate the application. In order for this demo to work, we need a Kafka Server running on localhost on port 9092, which is the default configuration of Kafka.
package com.memorynotfound.kafka; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SpringKafkaApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(SpringKafkaApplication.class, args); } @Autowired private Sender sender; @Override public void run(String... strings) throws Exception { sender.send(123.123); } }
When we run the application we receive the following output.
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.0.0.RELEASE) Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE Started SpringKafkaApplication in 2.334 seconds (JVM running for 3.186) Started SpringKafkaApplication in 3.272 seconds (JVM running for 4.534) sending data='123.123' to topic='foo.t' calculating square root from='123.123' received square root='11.096080389038285'