Spring Boot WebFlux + Server-sent events example
In this article, we will show you how to develop a reactive web application using Server-Sent Events (SSE). Technologies used:
- Spring Boot 2.1.2.RELEASE
- Spring WebFlux 5.1.4.RELEASE
- Thymeleaf 3.0.11.RELEASE
- JUnit 5.3.2
- Maven 3
In Spring, the controller returns JSON and sets the response content type to MediaType.TEXT_EVENT_STREAM_VALUE (text/event-stream).
@RestController public class CommentController { @GetMapping(path = "/comment/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<Comment> feed() { //...
In JavaScript, use EventSource to send a request to the above endpoint.
/**
 * Minimal Server-Sent Events client for the "/comment/stream" endpoint.
 * Call start() to open the stream and stop() to close it.
 */
function loadComments() {

    // Active EventSource connection; stays null until start() is called.
    this.source = null;

    this.start = function () {

        this.source = new EventSource("/comment/stream");

        this.source.addEventListener("message", function (event) {

            // Each event carries one JSON-encoded comment.
            var comment = JSON.parse(event.data);

            //... update somewhere

        });

        // Inside this handler `this` is the EventSource itself;
        // close the connection as soon as an error is reported.
        this.source.onerror = function () {
            this.close();
        };

    };

    this.stop = function () {
        // start() may never have run (e.g. the page is unloaded early).
        if (this.source) {
            this.source.close();
        }
    };

}

// Declared explicitly instead of relying on an implicit global.
var comment = new loadComments();

window.onload = function () {
    comment.start();
};

window.onbeforeunload = function () {
    comment.stop();
};
1. Project Directory
2. Maven
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>com.mkyong.spring.reactive</groupId>
    <artifactId>webflux-thymeleaf-sse</artifactId>
    <version>1.0</version>

    <properties>
        <java.version>1.8</java.version>
        <junit-jupiter.version>5.3.2</junit-jupiter.version>
    </properties>

    <!-- Inherit dependency and plugin management from Spring Boot -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
    </parent>

    <dependencies>

        <!-- webflux reactive -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <!-- thymeleaf -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>

        <!-- exclude junit 4, prefer junit 5 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- junit 5 -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>${junit-jupiter.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- development-time tooling, not passed on to dependants -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>

    </dependencies>

    <build>
        <plugins>

            <!-- packages and runs the application (mvn spring-boot:run) -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>

            <!-- test runner -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.0</version>
            </plugin>

        </plugins>
    </build>

</project>
Display the project dependencies.
$ mvn dependency:tree [INFO] com.mkyong.spring.reactive:webflux-thymeleaf-sse:jar:1.0 [INFO] +- org.springframework.boot:spring-boot-starter-webflux:jar:2.1.2.RELEASE:compile [INFO] | +- org.springframework.boot:spring-boot-starter:jar:2.1.2.RELEASE:compile [INFO] | | +- org.springframework.boot:spring-boot-starter-logging:jar:2.1.2.RELEASE:compile [INFO] | | | +- ch.qos.logback:logback-classic:jar:1.2.3:compile [INFO] | | | | \- ch.qos.logback:logback-core:jar:1.2.3:compile [INFO] | | | +- org.apache.logging.log4j:log4j-to-slf4j:jar:2.11.1:compile [INFO] | | | | \- org.apache.logging.log4j:log4j-api:jar:2.11.1:compile [INFO] | | | \- org.slf4j:jul-to-slf4j:jar:1.7.25:compile [INFO] | | +- javax.annotation:javax.annotation-api:jar:1.3.2:compile [INFO] | | \- org.yaml:snakeyaml:jar:1.23:runtime [INFO] | +- org.springframework.boot:spring-boot-starter-json:jar:2.1.2.RELEASE:compile [INFO] | | +- com.fasterxml.jackson.core:jackson-databind:jar:2.9.8:compile [INFO] | | | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.0:compile [INFO] | | | \- com.fasterxml.jackson.core:jackson-core:jar:2.9.8:compile [INFO] | | +- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.9.8:compile [INFO] | | +- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.9.8:compile [INFO] | | \- com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.9.8:compile [INFO] | +- org.springframework.boot:spring-boot-starter-reactor-netty:jar:2.1.2.RELEASE:compile [INFO] | | \- io.projectreactor.netty:reactor-netty:jar:0.8.4.RELEASE:compile [INFO] | | +- io.netty:netty-codec-http:jar:4.1.31.Final:compile [INFO] | | | \- io.netty:netty-codec:jar:4.1.31.Final:compile [INFO] | | +- io.netty:netty-codec-http2:jar:4.1.31.Final:compile [INFO] | | +- io.netty:netty-handler:jar:4.1.31.Final:compile [INFO] | | | +- io.netty:netty-buffer:jar:4.1.31.Final:compile [INFO] | | | \- io.netty:netty-transport:jar:4.1.31.Final:compile [INFO] | | | \- 
io.netty:netty-resolver:jar:4.1.31.Final:compile [INFO] | | +- io.netty:netty-handler-proxy:jar:4.1.31.Final:compile [INFO] | | | \- io.netty:netty-codec-socks:jar:4.1.31.Final:compile [INFO] | | \- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.31.Final:compile [INFO] | | +- io.netty:netty-common:jar:4.1.31.Final:compile [INFO] | | \- io.netty:netty-transport-native-unix-common:jar:4.1.31.Final:compile [INFO] | +- org.hibernate.validator:hibernate-validator:jar:6.0.14.Final:compile [INFO] | | +- javax.validation:validation-api:jar:2.0.1.Final:compile [INFO] | | +- org.jboss.logging:jboss-logging:jar:3.3.2.Final:compile [INFO] | | \- com.fasterxml:classmate:jar:1.4.0:compile [INFO] | +- org.springframework:spring-web:jar:5.1.4.RELEASE:compile [INFO] | | \- org.springframework:spring-beans:jar:5.1.4.RELEASE:compile [INFO] | +- org.springframework:spring-webflux:jar:5.1.4.RELEASE:compile [INFO] | | \- io.projectreactor:reactor-core:jar:3.2.5.RELEASE:compile [INFO] | | \- org.reactivestreams:reactive-streams:jar:1.0.2:compile [INFO] | \- org.synchronoss.cloud:nio-multipart-parser:jar:1.1.0:compile [INFO] | +- org.slf4j:slf4j-api:jar:1.7.25:compile [INFO] | \- org.synchronoss.cloud:nio-stream-storage:jar:1.1.3:compile [INFO] +- org.springframework.boot:spring-boot-starter-thymeleaf:jar:2.1.2.RELEASE:compile [INFO] | +- org.thymeleaf:thymeleaf-spring5:jar:3.0.11.RELEASE:compile [INFO] | | \- org.thymeleaf:thymeleaf:jar:3.0.11.RELEASE:compile [INFO] | | +- org.attoparser:attoparser:jar:2.0.5.RELEASE:compile [INFO] | | \- org.unbescape:unbescape:jar:1.1.6.RELEASE:compile [INFO] | \- org.thymeleaf.extras:thymeleaf-extras-java8time:jar:3.0.2.RELEASE:compile [INFO] +- org.springframework.boot:spring-boot-starter-test:jar:2.1.2.RELEASE:test [INFO] | +- org.springframework.boot:spring-boot-test:jar:2.1.2.RELEASE:test [INFO] | +- org.springframework.boot:spring-boot-test-autoconfigure:jar:2.1.2.RELEASE:test [INFO] | +- com.jayway.jsonpath:json-path:jar:2.4.0:test 
[INFO] | | \- net.minidev:json-smart:jar:2.3:test [INFO] | | \- net.minidev:accessors-smart:jar:1.2:test [INFO] | | \- org.ow2.asm:asm:jar:5.0.4:test [INFO] | +- org.assertj:assertj-core:jar:3.11.1:test [INFO] | +- org.mockito:mockito-core:jar:2.23.4:test [INFO] | | +- net.bytebuddy:byte-buddy:jar:1.9.7:test [INFO] | | +- net.bytebuddy:byte-buddy-agent:jar:1.9.7:test [INFO] | | \- org.objenesis:objenesis:jar:2.6:test [INFO] | +- org.hamcrest:hamcrest-core:jar:1.3:test [INFO] | +- org.hamcrest:hamcrest-library:jar:1.3:test [INFO] | +- org.skyscreamer:jsonassert:jar:1.5.0:test [INFO] | | \- com.vaadin.external.google:android-json:jar:0.0.20131108.vaadin1:test [INFO] | +- org.springframework:spring-core:jar:5.1.4.RELEASE:compile [INFO] | | \- org.springframework:spring-jcl:jar:5.1.4.RELEASE:compile [INFO] | +- org.springframework:spring-test:jar:5.1.4.RELEASE:test [INFO] | \- org.xmlunit:xmlunit-core:jar:2.6.2:test [INFO] | \- javax.xml.bind:jaxb-api:jar:2.3.1:test [INFO] | \- javax.activation:javax.activation-api:jar:1.2.0:test [INFO] +- org.junit.jupiter:junit-jupiter-engine:jar:5.3.2:test [INFO] | +- org.apiguardian:apiguardian-api:jar:1.0.0:test [INFO] | +- org.junit.platform:junit-platform-engine:jar:1.3.2:test [INFO] | | +- org.junit.platform:junit-platform-commons:jar:1.3.2:test [INFO] | | \- org.opentest4j:opentest4j:jar:1.1.1:test [INFO] | \- org.junit.jupiter:junit-jupiter-api:jar:5.3.2:test [INFO] \- org.springframework.boot:spring-boot-devtools:jar:2.1.2.RELEASE:compile (optional) [INFO] +- org.springframework.boot:spring-boot:jar:2.1.2.RELEASE:compile [INFO] | \- org.springframework:spring-context:jar:5.1.4.RELEASE:compile [INFO] | +- org.springframework:spring-aop:jar:5.1.4.RELEASE:compile [INFO] | \- org.springframework:spring-expression:jar:5.1.4.RELEASE:compile [INFO] \- org.springframework.boot:spring-boot-autoconfigure:jar:2.1.2.RELEASE:compile
3. Spring Boot + Spring WebFlux
3.1 Spring WebFlux annotation-based controller. To enable data streaming, set produces = MediaType.TEXT_EVENT_STREAM_VALUE on the mapping.
package com.mkyong.reactive.controller; import com.mkyong.reactive.model.Comment; import com.mkyong.reactive.repository.CommentRepository; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; @RestController public class CommentController { @Autowired private CommentRepository commentRepository; @GetMapping(path = "/comment/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<Comment> feed() { return this.commentRepository.findAll();
package com.mkyong.reactive.controller; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.web.bind.annotation.GetMapping; @Controller public class MainController { @GetMapping("/") public String index(final Model model) { return "index";
3.2 In the repository, return a Flux object.
package com.mkyong.reactive.repository; import com.mkyong.reactive.model.Comment; import reactor.core.publisher.Flux; public interface CommentRepository { Flux<Comment> findAll();
package com.mkyong.reactive.repository;

import com.mkyong.reactive.model.Comment;
import com.mkyong.reactive.utils.CommentGenerator;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

/**
 * {@link CommentRepository} that fabricates a random comment on a fixed
 * one-second interval instead of reading from a data store.
 */
@Repository
public class ReactiveCommentRepository implements CommentRepository {

    /**
     * Builds a stream that produces one generated comment per interval tick.
     *
     * @return a stream of randomly generated comments
     */
    @Override
    public Flux<Comment> findAll() {

        // simulate data streaming every 1 second.
        return Flux.interval(Duration.ofSeconds(1))
                // drop ticks instead of failing when the subscriber cannot keep up
                .onBackpressureDrop()
                .map(this::generateComment)
                // flatten each generated list into individual comments
                .flatMapIterable(x -> x);

    }

    /**
     * Creates a single-element list holding one random comment.
     *
     * @param interval tick number emitted by the interval stream (not used)
     * @return a list containing exactly one generated comment
     */
    private List<Comment> generateComment(long interval) {

        Comment obj = new Comment(
                CommentGenerator.randomAuthor(),
                CommentGenerator.randomMessage(),
                CommentGenerator.getCurrentTimeStamp());
        return Arrays.asList(obj);

    }

}
3.3 A utils class to generate random comments.
package com.mkyong.reactive.utils;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * Static helpers that produce random comment data for the demo stream.
 */
public class CommentGenerator {

    // Pattern used for the timestamp shown next to each comment.
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");

    // Shared generator, seeded from the clock at class-load time.
    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final List<String> COMMENT_AUTHOR =
            Arrays.asList(
                    "Mkyong", "Oliver", "Jack", "Harry", "Jacob",
                    "Isla", "Emily", "Poppy", "Ava", "Isabella");

    private static final List<String> COMMENT_MESSAGE =
            Arrays.asList(
                    "I Love this!", "Me too!", "Wow",
                    "True!", "Hello everyone here?", "Good!");

    /**
     * @return one author name picked at random from the fixed author list
     */
    public static String randomAuthor() {
        return COMMENT_AUTHOR.get(RANDOM.nextInt(COMMENT_AUTHOR.size()));
    }

    /**
     * @return one message picked at random from the fixed message list
     */
    public static String randomMessage() {
        return COMMENT_MESSAGE.get(RANDOM.nextInt(COMMENT_MESSAGE.size()));
    }

    /**
     * @return the current local date-time formatted as "yyyy/MM/dd HH:mm:ss"
     */
    public static String getCurrentTimeStamp() {
        return TIMESTAMP_FORMAT.format(LocalDateTime.now());
    }

}
3.4 Comment model.
package com.mkyong.reactive.model; public class Comment { private String author; private String message; private String timestamp; //getter, setter and constructor
3.5 Start Spring Boot.
package com.mkyong.reactive; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class CommentWebApplication { public static void main(String[] args) { SpringApplication.run(CommentWebApplication.class, args);
4. Thymeleaf
There is no special reactive tag in the Thymeleaf template; it just uses the normal loop.
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">

    <!-- Stylesheets, resolved through Thymeleaf link expressions -->
    <link data-th-href="@{/css/bootstrap.min.css}" rel="stylesheet">
    <link data-th-href="@{/css/main.css}" rel="stylesheet">
</head>
<body>

<div class="container">
    <div class="row">

        <div id="title">
            <h1>Spring WebFlux + Server Sent Events</h1>
        </div>

        <!-- main.js looks this table up by its id and inserts rows into its tbody -->
        <table id="comments" class="table table-striped">
            <thead>
            <tr>
                <th width="10%">Author</th>
                <th width="60%">Message</th>
                <th width="30%">Date</th>
            </tr>
            </thead>
            <tbody>
            <!-- One row per entry of the "comments" model attribute -->
            <tr class="result" data-th-each="comment : ${comments}">
                <td>[[${comment.author}]]</td>
                <td>[[${comment.message}]]</td>
                <td>[[${comment.timestamp}]]</td>
            </tr>
            </tbody>
        </table>

    </div>
</div>

<script data-th-src="@{/js/main.js}"></script>

</body>
</html>
5. JavaScript EventSource.
The key is to use the JavaScript EventSource class to send a request, listen for the message event, and insert the streamed data into a table as it arrives.
/**
 * Server-Sent Events client for the "/comment/stream" endpoint.
 * Every received comment is inserted as the first row of the
 * table with id "comments".
 */
function loadComments() {

    // Active EventSource connection; stays null until start() is called.
    this.source = null;

    this.start = function () {

        var commentTable = document.getElementById("comments");

        this.source = new EventSource("/comment/stream");

        this.source.addEventListener("message", function (event) {

            // These events are JSON, so parsing and DOM fiddling are needed
            var comment = JSON.parse(event.data);

            // Newest comment goes to the top of the table body.
            var row = commentTable.getElementsByTagName("tbody")[0].insertRow(0);
            var cell0 = row.insertCell(0);
            var cell1 = row.insertCell(1);
            var cell2 = row.insertCell(2);

            // textContent rather than innerHTML: the streamed values are data
            // and must never be interpreted as markup.
            cell0.className = "author-style";
            cell0.textContent = comment.author;

            cell1.className = "text";
            cell1.textContent = comment.message;

            cell2.className = "date";
            cell2.textContent = comment.timestamp;

        });

        // Inside this handler `this` is the EventSource itself;
        // close the connection as soon as an error is reported.
        this.source.onerror = function () {
            this.close();
        };

    };

    this.stop = function () {
        // start() may never have run (e.g. the page is unloaded early).
        if (this.source) {
            this.source.close();
        }
    };

}

// Declared explicitly instead of relying on an implicit global.
var comment = new loadComments();

/*
 * Register callbacks for starting and stopping the SSE controller.
 */
window.onload = function () {
    comment.start();
};

window.onbeforeunload = function () {
    comment.stop();
};
6. Unit Test
Use WebTestClient to test the streaming response.
package com.mkyong.reactive;

import com.mkyong.reactive.model.Comment;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;

import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Starts the application on a random port and checks the comment stream endpoint.
 */
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class TestCommentWebApplication {

    @Autowired
    private WebTestClient webClient;

    /**
     * Requests /comment/stream as an event stream, reads the first three
     * comments and verifies that exactly three were received.
     */
    @Test
    public void testCommentStream() {

        List<Comment> comments = webClient
                .get().uri("/comment/stream")
                .accept(MediaType.TEXT_EVENT_STREAM)
                .exchange()
                .expectStatus().isOk()
                .returnResult(Comment.class)
                .getResponseBody()
                .take(3)        // take 3 comment objects, then cancel the stream
                .collectList()
                .block();

        comments.forEach(System.out::println);

        assertEquals(3, comments.size());

    }

}
7. Demo
$ mvn spring-boot:run 2019-02-11 15:41:17.657 INFO 257192 --- [ restartedMain] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
URL = http://localhost:8080
The data is streamed, and a random comment is displayed every second.
From:一号门
Previous:Spring Boot + JUnit 5 + Mockito
COMMENTS