¿Cómo crear un Flujo de Reactor de Primavera a partir del flujo de integración Http?

Tengo una pregunta muy similar a esta ¿Cómo crear un flujo de Spring Reactor a partir de una cola ActiveMQ?

Con una diferencia, los mensajes provienen del punto final de HTTP en lugar de la cola JMS. El problema es que el Canal de mensajes no se llena por algún motivo o Flux.from () no lo ha detectado. ¿Las entradas del registro muestran que GenericMessage se crea a partir del flujo de integración de HTTP con una carga útil como variable de ruta pero no se pone en cola / se publica en un canal? Intenté .channel(MessageChannels.queue()) y .channel(MessageChannels.publishSubscribe()) no hace ninguna diferencia, el flujo de eventos está vacío. Aquí está el código:

 @Bean public Publisher<Message> httpReactiveSource() { return IntegrationFlows. from(Http.inboundChannelAdapter("/eventmessage/{id}") .requestMapping(r -> r .methods(HttpMethod.POST) ) .payloadExpression("#pathVariables.id") ) .channel(MessageChannels.queue()) .log(LoggingHandler.Level.DEBUG) .log() .toReactivePublisher(); } @GetMapping(value="eventmessagechannel/{id}", produces=MediaType.TEXT_EVENT_STREAM_VALUE) public Flux eventMessages(@PathVariable String id){ return Flux.from(httpReactiveSource()) .map(Message::getPayload); } 

ACTUALIZACIÓN1:

construir.gradle

 buildscript { ext { springBootVersion = '2.0.0.M2' } repositories { mavenCentral() maven { url "https://repo.spring.io/snapshot" } maven { url "https://repo.spring.io/milestone" } } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") } } apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'org.springframework.boot' apply plugin: 'io.spring.dependency-management' version = '0.0.1-SNAPSHOT' sourceCompatibility = 1.8 repositories { mavenCentral() maven { url "https://repo.spring.io/snapshot" } maven { url "https://repo.spring.io/milestone" } } dependencies { compile('org.springframework.boot:spring-boot-starter-freemarker') compile('org.springframework.boot:spring-boot-starter-integration') compile('org.springframework.boot:spring-boot-starter-web') compile('org.springframework.boot:spring-boot-starter-webflux') compile('org.springframework.integration:spring-integration-http') testCompile('org.springframework.boot:spring-boot-starter-test') testCompile('io.projectreactor:reactor-test') } 

Actualización2

Funciona cuando @SpringBootApplication y @RestController se definen en un archivo, pero deja de funcionar cuando @SpringBootApplication y @RestController están en archivos separados.

TestApp.java

 package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class TestApp { public static void main(String[] args) { SpringApplication.run(TestApp.class, args); } } 

TestController.java

 package com.example.controller; import org.springframework.context.annotation.Bean; import org.reactivestreams.Publisher; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.channel.MessageChannels; import org.springframework.integration.handler.LoggingHandler; import org.springframework.integration.http.dsl.Http; import org.springframework.messaging.Message; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.GetMapping; import reactor.core.publisher.Flux; @RestController public class TestController { @Bean public Publisher<Message> httpReactiveSource() { return IntegrationFlows. from(Http.inboundChannelAdapter("/message/{id}") .requestMapping(r -> r .methods(HttpMethod.POST) ) .payloadExpression("#pathVariables.id") ) .channel(MessageChannels.queue()) .toReactivePublisher(); } @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux eventMessages() { return Flux.from(httpReactiveSource()) .map(Message::getPayload); } } 

Esto me funciona bien:

 @SpringBootApplication @RestController public class SpringIntegrationSseDemoApplication { public static void main(String[] args) { SpringApplication.run(SpringIntegrationSseDemoApplication.class, args); } @Bean public Publisher> httpReactiveSource() { return IntegrationFlows. from(Http.inboundChannelAdapter("/message/{id}") .requestMapping(r -> r .methods(HttpMethod.POST) ) .payloadExpression("#pathVariables.id") ) .channel(MessageChannels.queue()) .toReactivePublisher(); } @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux eventMessages() { return Flux.from(httpReactiveSource()) .map(Message::getPayload); } } 

Tengo estas dependencias en POM:

  org.springframework.boot spring-boot-starter-parent 2.0.0.BUILD-SNAPSHOT      org.springframework.boot spring-boot-starter-integration   org.springframework.boot spring-boot-starter-web   org.springframework.boot spring-boot-starter-webflux   org.springframework.integration spring-integration-http   org.springframework.boot spring-boot-starter-test test      org.springframework.boot spring-boot-maven-plugin    

Ejecuto la aplicación y tengo dos terminales:

 curl http://localhost:8080/events 

para escuchar las ESE.

Y en el segundo realizo esto:

 curl -X POST http://localhost:8080/message/foo curl -X POST http://localhost:8080/message/bar curl -X POST http://localhost:8080/message/666 

Entonces, el primer terminal responde como:

 data:foo data:bar data:666 

Tenga en cuenta que no necesitamos la dependencia de spring-boot-starter-webflux . El Flux a SSE funciona bien con MVC regular en el contenedor de Servlets.

Spring Integration también será compatible con WebFlux pronto: https://jira.spring.io/browse/INT-4300 . Entonces, podrás configurar allí algo como:

  IntegrationFlows .from(Http.inboundReactiveGateway("/sse") .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE))) 

Y confíe totalmente solo en WebFlux sin ninguna dependencia de Servlet Container.