【问题标题】:Spring WebFlux: WebSocketSession.send() doesn't send messagesSpring WebFlux:WebSocketSession.send() 不发送消息
【发布时间】:2021-11-16 18:41:54
【问题描述】:

你好 我正在尝试使用 Spring WebFlux 创建 webscoket 端点。我希望这个端点返回一些事件。
为此,我创建了事件的 ConnectableFlux,并在 handle(..) 方法中将其映射到 Flux。但是在我把它交给 WebSocketSession 之后,什么都没有发生——websocket 客户端没有收到任何东西。但与此同时,您可以在下面我的 handle(..) 方法中看到的 println(event.toString()) 实际上将信息打印到控制台。
你能告诉我我错过了什么吗?

public class EventWebsocketHandler implements WebSocketHandler {

    //  constructors and etc.

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        ObjectMapper objectMapper = new ObjectMapper();

        Flux<WebSocketMessage> messages = eventService.events()
                .flatMap(event -> {
                    try {
                        System.out.println(event.toString());
                        return Mono.just(objectMapper.writeValueAsString(event));
                    } catch (JsonProcessingException e) {
                        return Mono.error(e);
                    }
                })
                .map(session::textMessage);

        return session.send(messages);
    }

@Service
public class EventService {

    List<EventDto> events = new ArrayList<>();

    private final Flux<EventDto> eventFlux = Flux.<EventDto>create(fluxSink -> {
        while (true) {
            if (!events.isEmpty()) {
                fluxSink.next(events.get(0));
                events.remove(0);
            }
        }
    })
            .publish()
            .autoConnect();

    public void push(EventDto event) {
        events.add(event);
    }

    public Flux<EventDto> events() {
        return eventFlux;
    }

}

我的项目中有另一个 WebSocketHandler,它工作正常,这意味着配置一切正常:


public class MyWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Flux<Long> source = Flux.interval(Duration.ofMillis(1000 * 3));
        return session.send(source.map(l -> session.textMessage(String.valueOf(l))));
    }
}

【问题讨论】:

    标签: java spring-boot websocket spring-webflux spring-websocket


    【解决方案1】:

    这个

        private final Flux<EventDto> eventFlux = Flux.<EventDto>create(fluxSink -> {
        while (true) {
            if (!events.isEmpty()) {
                fluxSink.next(events.get(0));
                events.remove(0);
            }
        }
    })
            .publish()
            .autoConnect();
    

    必须用这个替换

    private final Sinks.Many<EventDto> processor = Sinks.many().multicast().onBackpressureBuffer();
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-06-12
      • 1970-01-01
      • 1970-01-01
      • 2012-01-01
      • 2017-01-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多