【发布时间】:2021-11-16 18:41:54
【问题描述】:
你好
我正在尝试使用 Spring WebFlux 创建 webscoket 端点。我希望这个端点返回一些事件。
为此,我创建了事件的 ConnectableFlux,并在 handle(..) 方法中将其映射到 Flux。但是在我把它交给 WebSocketSession 之后,什么都没有发生——websocket 客户端没有收到任何东西。但与此同时,您可以在下面我的 handle(..) 方法中看到的 println(event.toString()) 实际上将信息打印到控制台。
你能告诉我我错过了什么吗?
public class EventWebsocketHandler implements WebSocketHandler {
// constructors and etc.
@Override
public Mono<Void> handle(WebSocketSession session) {
ObjectMapper objectMapper = new ObjectMapper();
Flux<WebSocketMessage> messages = eventService.events()
.flatMap(event -> {
try {
System.out.println(event.toString());
return Mono.just(objectMapper.writeValueAsString(event));
} catch (JsonProcessingException e) {
return Mono.error(e);
}
})
.map(session::textMessage);
return session.send(messages);
}
@Service
public class EventService {
List<EventDto> events = new ArrayList<>();
private final Flux<EventDto> eventFlux = Flux.<EventDto>create(fluxSink -> {
while (true) {
if (!events.isEmpty()) {
fluxSink.next(events.get(0));
events.remove(0);
}
}
})
.publish()
.autoConnect();
public void push(EventDto event) {
events.add(event);
}
public Flux<EventDto> events() {
return eventFlux;
}
}
我的项目中有另一个 WebSocketHandler,它工作正常,这意味着配置一切正常:
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<Long> source = Flux.interval(Duration.ofMillis(1000 * 3));
return session.send(source.map(l -> session.textMessage(String.valueOf(l))));
}
}
【问题讨论】:
标签: java spring-boot websocket spring-webflux spring-websocket