【问题标题】:How Do I Generate Event Stream Every Time Variable Is Changed?每次更改变量时如何生成事件流?
【发布时间】:2020-04-22 14:37:22
【问题描述】:

我想在每次整数变量改变时发送更新

@PutMapping
public void update() {
    integer = random.nextInt();
}

@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Integer> eventFlux() {
    Flux<Integer> eventFlux = Flux.fromStream(
            Stream.generate(() -> integer)
    );
    return Flux.zip(eventFlux, onUpdate()).map(Tuple2::getT1);
}

private Flux<Long> onUpdate() {
    return Flux.interval(Duration.ofSeconds(1));
}

【问题讨论】:

  • 那么,您的问题是什么?你有什么错误吗?请清楚地解释您的查询!
  • 就目前的代码而言,它每秒都在发送事件,我想知道如何仅在变量值更改时发送事件。谢谢。
  • 如果对你有帮助请点此链接stackoverflow.com/questions/55923326/…

标签: java spring-boot stream reactive-programming spring-webflux


【解决方案1】:

您可以使用FluxProcessor 作为:

  • 上游Subscriber;您可以使用它来添加数字:
    @PutMapping("/{number}")
    public void update(@PathVariable Integer number) {
        processor.onNext(number);
    }
  • 下游PublisherFlux);哪些客户可以订阅接收添加的号码:
    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Integer> eventFlux() {
        return processor;
    }

以下是一个完整的工作示例:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;

import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;

@SpringBootApplication
@EnableWebFlux
@RestController
public class EventStreamApp implements WebFluxConfigurer {

    public static void main(String[] args) {
        SpringApplication.run(EventStreamApp.class);
    }

    private FluxProcessor<Integer, Integer> processor = DirectProcessor.create();

    @PutMapping("/{number}")
    public void update(@PathVariable Integer number) {
        processor.onNext(number);
    }

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Integer> eventFlux() {
        return processor;
    }
}

Complete code on GitHub

希望这会有所帮助。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-01-27
    • 1970-01-01
    • 2010-11-30
    • 2021-08-20
    • 2016-12-21
    • 1970-01-01
    • 2015-01-01
    • 2023-01-04
    相关资源
    最近更新 更多