【问题标题】:Implement while loop in Reactor to fetch latest Elasticsearch index在 Reactor 中实现 while 循环以获取最新的 Elasticsearch 索引
【发布时间】:2020-12-09 16:45:55
【问题描述】:

我在响应式弹性搜索中的索引名称如下:

logs-2020.08.18
logs-2020.08.17
logs-2020.08.16

它将每天创建。

我想获取最新的索引名称并使用 reactiveElasticsearchClient 或 spring 数据获取日志。 有可能吗?

我在我的 spring webflux 应用程序中尝试了以下方式:

我有以下代码 sn-p 来查找索引可用性:

public Flux<Log> getLogFromLatestIndex(String serialId) {
    Calendar cal = Calendar.getInstance();
    String currentIndex = StringUtils.EMPTY;
    boolean indexExists = false;
    while (!indexExists) {
        currentIndex = String.format("logs-%s”, format(cal.getTime(), "yyyy.MM.dd"));
        indexExists = isIndexExists(currentIndex).block();
        cal.add(Calendar.DATE, -1); // Decrease day 1 until you find index
    }

    SearchQuery searchQuery = new NativeSearchQueryBuilder()
            .withQuery(matchQuery("serialId", serialId))
            .withIndices(currentIndex)
            .build();

    return reactiveElasticsearchTemplate.find(searchQuery, Log.class);
}

public Mono<Boolean> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName));
}

如何在不使用块的情况下获取布尔值

indexExists = isIndexExists(currentIndex).block();

显然我会得到以下错误:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2

【问题讨论】:

  • 你的应用是 Spring WebFlux 应用吗?你想最后返回一个Mono&lt;String&gt;,这是最新索引的名称吗?
  • 是的,它是 spring webflux 应用程序。我需要最新的索引名称来搜索一些数据。如下所示: SearchQuery searchQuery = new NativeSearchQueryBuilder() .withQuery(matchQuery("serialId", serialId)) .withIndices(currentIndex) .build(); return reactiveElasticsearchTemplate.find(searchQuery, Log.class); // 最后我会返回flux。
  • @MartinTarjány 我已经更新了我的问题
  • cal 变量来自哪里?
  • 抱歉,现在更新

标签: java elasticsearch spring-webflux project-reactor reactive


【解决方案1】:

您可以使用Flux.generate(take/skip)(Until/While) 在反应器中执行while 循环。

注意事项:

  • Calendar 替换为 LocalDate,因为它是不可变的,更适合函数式/反应式编程。
  • isIndexExists 方法返回一个 Tuple 以获取索引名称的引用,但显然它可以根据需要替换为一些更具描述性的类
public Flux<Log> getLog(String serialId) {
    return Flux.generate(LocalDate::now, this::generateNextDate)
               .map(day -> String.format("logs-%s", day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd"))))
               .concatMap(this::isIndexExists)
               .skipUntil(Tuple2::getT2) // check index exists boolean and drop non-existing ones
               .next() // takes first existing
               .flatMapMany(tuple -> findLogs(tuple.getT1(), serialId));
}

private LocalDate generateNextDate(LocalDate currentDay, SynchronousSink<LocalDate> sink) {
    sink.next(currentDay);
    return currentDay.minusDays(1);
}

private Mono<Tuple2<String, Boolean>> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName))
            .map(exists -> Tuples.of(indexName, exists));
}

private Flux<Log> findLogs(String index, String serialId) {
    // your other ES query here
}

【讨论】:

  • 谢谢哥们。我认为它不会先获取 currentDay,它总是会获取 currentDay-1 作为第一个检查,对吗?
  • 哎呀,是的,你是对的。更新了答案,现在它从当天开始。
  • 它作为独立代码运行良好。但是相同的代码在应用程序中不起作用,因为它继续调用 onNext 方法。我收到“不止一次调用 onnext”错误。请协助。
  • 我只能用这么少的信息猜测。索引名称是否按预期生成?它们中的任何一个都存在于 ES 集群中吗?错误来自哪里?是来自答案中的代码还是引发此异常的其他部分?你能创建一个产生这个错误的例子吗?
  • 索引名称很好。是的,它存在于 ES 集群中。错误来自我们编写的代码,而不是来自任何地方。由于 onNext 一直在调用,因此我收到“连接在响应之前过早关闭”。我的代码与我们这里的代码完全相同。
【解决方案2】:

application.yml

spring.data.elasticsearch.client.reactive.endpoints: ip1:9200,ip2:9200,ip3:9200
spring.data.elasticsearch.rest.uris: ip1:9200,ip2:9200,ip3:9200

配置.java

@Configuration
public class ElasticSearchConfig extends AbstractReactiveElasticsearchConfiguration {

    @Value("${spring.data.elasticsearch.client.reactive.endpoints}")
    private String elasticSearchEndpoint;

    @Override
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
            .connectedTo(elasticSearchEndpoint.split(","))
            .withWebClientConfigurer(webClient -> {
                ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                        .codecs(configurer -> configurer.defaultCodecs()
                                .maxInMemorySize(-1))
                        .build();
                return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
            })
            .build();
    return ReactiveRestClients.create(clientConfiguration);

    }
}

Controller.java

@GetMapping("/getLog/{serialId}")
public Flux<Log> getLog(@PathVariable String serialId) {
    return loggerService.getLog(serialId);
}

剩下的都是你的代码。我只是在地图内打印索引名称。虽然我在弹性搜索中有索引logs-2020.08.21,但它一直在打印logs-2020.08.20、logs-2020.08.19、logs-2020.08.18等索引,最终会引发错误。

注意:当我尝试在 application.yml 中使用单个 ip 时,我得到了同样的错误。

public Flux<Log> getLog(String serialId) {
    return Flux.generate(LocalDate::now, this::generateNextDate)
           .map(day -> {
                System.out.println(String.format("logs-%s", day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd"))));
                return String.format("logs-%s", day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd")));
            })               
           .flatMap(this::isIndexExists)
           .skipUntil(Tuple2::getT2) // check index exists boolean and drop non-existing ones
           .next() // takes first existing
           .flatMapMany(tuple -> findLogs(tuple.getT1(), serialId));
}

private LocalDate generateNextDate(LocalDate currentDay, SynchronousSink<LocalDate> sink) {
    sink.next(currentDay);
    return currentDay.minusDays(1);
}

private Mono<Tuple2<String, Boolean>> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName))
        .map(exists -> Tuples.of(indexName, exists));
}

private Flux<Log> findLogs(String index, String serialId) {
    // your other ES query here
}

【讨论】:

    猜你喜欢
    • 2012-09-06
    • 1970-01-01
    • 1970-01-01
    • 2017-03-03
    • 2015-12-11
    • 2021-10-19
    • 1970-01-01
    • 1970-01-01
    • 2013-04-10
    相关资源
    最近更新 更多