使用Spring WebFlux进行实时事件流

  1. 介绍

在本文中,我们将探索Spring的新WebFlux框架,并使用它来构建实时事件流API。 另外,我们将通过Spring Webclient使用该API。

Webflux框架基于Project Reactor,它是JVM的非阻塞反应式编程库。 如果您想了解有关Reactive Spring项目的更多信息,可以访问Reactive Stack上的Web。

2. Maven依赖

让我们将spring-boot-starter-webflux依赖项添加到我们的Spring Boot项目中。 我们还可以为JUnit测试添加反应堆测试依赖项:

     org.springframework.boot   spring-boot-starter-webflux     org.springframework.boot   spring-boot-starter-test   test     org.projectlombok   lombok   true     io.projectreactor   reactor-test   test    

可以从Maven Central下载spring-boot-starter-webflux依赖项。

3.创建模型

现在,让我们创建一些模型类来表示我们的流事件。 我们可以使用项目Lombok中的注释来生成构造器,获取器和设置器:

 包com.baeldung.webflux.model;导入lombok.AllArgsConstructor; 
导入lombok.Data;
导入lombok.NoArgsConstructor; @AllArgsConstructor
@数据
@NoArgsConstructor
公共类Weather {private String temprature;
专用弦湿度;
私有String windSpeed;} @ AllArgsConstructor
@ NoArgsConstructor @ Data
公共类WeatherEvent {
私人天气;
私有LocalDateTime日期;
}

4.创建助焊剂源

为了生成连续的事件流,让我们创建一个WeatherEvent数据流:

  @Servicepublic类WeatherService {public Flux  streamWeather(){ 
Flux 间隔= Flux.interval(Duration.ofSeconds(1));
Flux 事件=
助焊剂
.fromStream(Stream.generate(
()->新的WeatherEvent(
新天气(getTemprature(),
getHumidity(),
getWindSpeed()),
LocalDateTime
。现在()))); 返回Flux.zip(事件,间隔,(键,值)->键);
}私有字符串getWindSpeed(){
String []风速=“ 100 km / h,101 km / h,
102 km / h,103 km / h,
104 km / h“ .split(”,“); return windSpeeds [new Random()。nextInt(windSpeeds.length)];
}私有字符串getHumidity(){
String []湿度=“ 40%,41%,
42%,42%,44%,45%,46%“。split(”,“);返回湿度[new Random()。nextInt(humidity.length)];}私有字符串getTemprature(){
String []温度=“ 19C,19.5C,20C,20.5C,
21C,21.5 C,22C,22.5C,23C,23.5C,24 C“
。分裂(”,”); 返回温度[新的Random()
.nextInt(tempratures.length)];}
}

时间间隔表示流量是无限的,并且每1秒从时钟发出规则的滴答声。

这些事件代表无限的天气事件通量,每个天气事件都会包装天气数据和该数据的时间戳。

最后,zip方法将两个源组合在一起,它等待每个源发出一个元素,然后使用组合函数将这些元素组合在一起。

5,创建端点

Webflux框架采用了新的功能模型来设计端点。

5.1。创建一个请求处理程序

我们需要某种可以将我们的控制器/路由器耦合到weatherService的请求处理机制。 因此,让我们创建一个请求处理程序:

  @零件 
公共类RequestHandler {
@Autowired
私有WeatherService weatherService; 公共Mono
streamWeather(ServerRequest request){return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(weatherService
.streamWeather(),WeatherEvent.class);}
}

ServerRequest和ServerResponse接口提供对HTTP消息的访问。 TEXT_EVENT_STREAM的内容类型使服务器可以将数据推送到客户端。

5.2。创建一个请求路由器

最后,我们需要创建一个控制器/路由器,它可以接受对天气流的请求,并将这些请求转发到适当的请求处理程序。

Spring Webflux还支持常规的spring-mvc样式控制器,但在本示例中,我们将利用功能样式路由。因此,让我们创建一个反应式样式端点:

  @组态 
公共类RequestRouter {@Bean
RouterFunction route(RequestHandler requestHandler){
返回RouterFunctions
.route(RequestPredicates
.GET(“ / weatherstream”),
requestHandler :: streamWeather);
}
}

RequestRouter类充当Spring功能性Web框架的入口点。

6,使用Curl客户端

启动Spring Boot应用程序并使用命令

  卷曲 http:// localhost:8080 / weatherevents 

请求天气事件流

7.使用Spring WebClient

让我们创建另一个具有相同maven依赖项的Spring Boot项目,我们将使用Spring WebClient消耗天气流.WebClient Interface是一个完全反应性强,功能更强大的客户端,它比传统的RestTemplate更强大:

 包com.baeldung.webflux.client;导入org.slf4j.Logger; 
导入org.slf4j.LoggerFactory;
导入org.springframework.boot.ApplicationRunner;
导入org.springframework.context.annotation.Bean;
导入org.springframework.context.annotation.Configuration;
导入org.springframework.web.reactive.function.client.WebClient; @Configuration
公共类WeatherEventWebClient {记录器logger = LoggerFactory.getLogger(WeatherEventWebClient.class); @ Bean
WebClient client(){
返回WebClient.create(“ http:// localhost:8080 /”);
}@豆角,扁豆
ApplicationRunnerRunner(WebClient webClient){
返回参数-> webClient.get()
.uri(“ weatherstream”)
.retrieve()
.bodyToFlux(WeatherEvent.class)
.subscribe(数据-> logger.info(data.toString()));
}
}

与RestTeamplate一样,WebClient还提供了从响应主体中提取模型数据的功能。 在这里,bodyToFlux函数从响应中提取WeatherEvent通量,subscribe函数触发流并将通量元素传递给使用者函数,在这种情况下为logger.info。

AplicationRunner bean将开始侦听来自端口8080上部署的应用程序的天气事件。

在此示例中,客户端应用程序部署在端口8081上,并从部署在端口8080的服务器应用程序请求数据。

在客户端记录天气流

2018-06-13 20:03:34.142 INFO 10864 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext : Started HttpServer on /0:0:0:0:0:0:0:0:8081 2018-06-13 20:03:34.143 INFO 10864 --- [ main] osbweb.embedded.netty.NettyWebServer : Netty started on port(s): 8081 2018-06-13 20:03:34.148 INFO 10864 --- [ main] .SpringWebFluxStreamingClientApplication : Started SpringWebFluxStreamingClientApplication in 3.463 seconds (JVM running for 4.257) 2018-06-13 20:03:35.473 INFO 10864 --- [ctor-http-nio-1] cbwclient.WeatherEventWebClient : WeatherEvent(weather=Weather(temprature=22 C, humidity=45 %, windSpeed=101 km/h), date=2018-06-13T20:03:34.285) 2018-06-13 20:03:36.289 INFO 10864 --- [ctor-http-nio-1] cbwclient.WeatherEventWebClient : WeatherEvent(weather=Weather(temprature=19.5 C, humidity=42 %, windSpeed=102 km/h), date=2018-06-13T20:03:35.354) 2018-06-13 20:03:37.288 INFO 10864 --- [ctor-http-nio-1] cbwclient.WeatherEventWebClient : WeatherEvent(weather=Weather(temprature=20 C, humidity=40 %, windSpeed=104 km/h), date=2018-06-13T20:03:36.286) 2018-06-13 20:03:38.288 INFO 10864 --- [ctor-http-nio-1] cbwclient.WeatherEventWebClient : WeatherEvent(weather=Weather(temprature=21 C, humidity=41 %, windSpeed=102 km/h), date=2018-06-13T20:03:37.285) 2018-06-13 20:03:39.289 INFO 10864 --- [ctor-http-nio-1] cbwclient.WeatherEventWebClient : WeatherEvent(weather=Weather(temprature=22.5 C, humidity=42 %, windSpeed=103 km/h), date=2018-06-13T20:03:38.285) 2018-06-13 20:03:40.287 INFO 10864 --- [ctor-http-nio-1] cbwclient.WeatherEventWebClient : WeatherEvent(weather=Weather(temprature=20.5 C, humidity=45 %, windSpeed=100 km/h), date=2018-06-13T20:03:39.285) 2018-06-13 20:03:41.288 INFO 10864 --- [ctor-http-nio-1] cbwclient.WeatherEventWebClient : WeatherEvent(weather=Weather(temprature=23.5 C, humidity=42 %, windSpeed=101 km/h), date=2018-06-13T20:03:40.285) 2018-06-13 20:03:42.289 INFO 10864 --- [ctor-http-nio-1] cbwclient.WeatherEventWebClient : WeatherEvent(weather=Weather(temprature=22.5 C, humidity=46 %, windSpeed=103 km/h), date=2018-06-13T20:03:41.285) 2018-06-13 20:03:43.288 INFO 10864 --- [ctor-http-nio-1] cbwclient.WeatherEventWebClient : WeatherEvent(weather=Weather(temprature=21 C, humidity=42 %, windSpeed=104 km/h), date=2018-06-13T20:03:42.286) 2018-06-13 20:03:44.288 INFO 10864 --- [ctor-http-nio-1] cbwclient.WeatherEventWebClient : WeatherEvent(weather=Weather(temprature=19 C, humidity=40 %, windSpeed=104 km/h), date=2018-06-13T20:03:43.285) 2018-06-13 20:03:45.288 INFO 10864 --- [ctor-http-nio-1] cbwclient.WeatherEventWebClient : WeatherEvent(weather=Weather(temprature=23 C, humidity=40 %, windSpeed=103 km/h), date=2018-06-13T20:03:44.285)

8,结论

我们创建了一个Spring Webflux应用程序,该应用程序可以传输实时天气数据。 这个示例应用程序使用Project Reactor的各种功能来生成非阻塞数据流。本文还介绍了Spring的函数式编程模型来生成和使用Web服务。

该示例的代码可以在我的GitHub存储库中找到。