【问题标题】:Cannot Write Data to ElasticSearch with AbstractReactiveElasticsearchConfiguration无法使用 AbstractReactiveElasticsearchConfiguration 将数据写入 ElasticSearch
【发布时间】:2019-11-25 18:00:00
【问题描述】:

我正在尝试将数据写入我的本地 Elasticsearch Docker 容器 (7.4.2),为简单起见,我使用了 Spring 提供的 AbstractReactiveElasticsearchConfiguration 也覆盖了 entityMapper 函数。我构建了扩展 ReactiveElasticsearchRepository 的存储库 最后,我使用我的自动装配存储库来 saveAll() 我包含数据的元素集合。但是 Elasticsearch 不写入任何数据。我还有一个 REST 控制器,它开始我的整个过程,基本上什么都不返回,DeferredResult>

来自我的 ApiDelegateImpl 的 REST 方法

  @Override
  public DeferredResult<ResponseEntity<Void>> openUsageExporterStartPost() {

    final DeferredResult<ResponseEntity<Void>> deferredResult = new DeferredResult<>();

    ForkJoinPool.commonPool().execute(() -> {

          try {
            openUsageExporterAdapter.startExport();
            deferredResult.setResult(ResponseEntity.accepted().build());

          } catch (Exception e) {
            deferredResult.setErrorResult(e);
          }
        }
    );

    return deferredResult;
  }

我的 Elasticsearch 配置

@Configuration
public class ElasticSearchConfig extends AbstractReactiveElasticsearchConfiguration {

  @Value("${spring.data.elasticsearch.client.reactive.endpoints}")
  private String elasticSearchEndpoint;

  @Bean
  @Override
  public EntityMapper entityMapper() {

    final ElasticsearchEntityMapper entityMapper = new ElasticsearchEntityMapper(elasticsearchMappingContext(), new DefaultConversionService());
    entityMapper.setConversions(elasticsearchCustomConversions());
    return entityMapper;
  }

  @Override
  public ReactiveElasticsearchClient reactiveElasticsearchClient() {
    ClientConfiguration clientConfiguration = ClientConfiguration.builder()
        .connectedTo(elasticSearchEndpoint)
        .build();

    return ReactiveRestClients.create(clientConfiguration);
  }
}

我的仓库

public interface OpenUsageRepository extends ReactiveElasticsearchRepository<OpenUsage, Long> {

}

我的 DTO

@Data
@Document(indexName = "open_usages", type = "open_usages")
@TypeAlias("OpenUsage")
public class OpenUsage {

  @Field(name = "id")
  @Id
  private Long id;

  ......
}

我的适配器实现

  @Autowired
  private final OpenUsageRepository openUsageRepository;

  ...transform entity into OpenUsage...

  public void doSomething(final List<OpenUsage> openUsages){
   openUsageRepository.saveAll(openUsages)
  }

最后是我的 IT 测试

@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestPropertySource(locations = {"classpath:application-it.properties"})
@ContextConfiguration(initializers = OpenUsageExporterApplicationIT.Initializer.class)
class OpenUsageExporterApplicationIT {


  @LocalServerPort
  private int port;

  private final static String STARTCALL = "http://localhost:%s/open-usage-exporter/start/";

  @Container
  private static ElasticsearchContainer container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:6.8.4").withExposedPorts(9200);

  static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

    @Override
    public void initialize(final ConfigurableApplicationContext configurableApplicationContext) {

      final List<String> pairs = new ArrayList<>();

      pairs.add("spring.data.elasticsearch.client.reactive.endpoints=" + container.getContainerIpAddress() + ":" + container.getFirstMappedPort());
      pairs.add("spring.elasticsearch.rest.uris=http://" + container.getContainerIpAddress() + ":" + container.getFirstMappedPort());
      TestPropertyValues.of(pairs).applyTo(configurableApplicationContext);
    }
  }

  @Test
  void testExportToES() throws IOException, InterruptedException {

    final List<OpenUsageEntity> openUsageEntities = dbPreparator.insertTestData();
    assertTrue(openUsageEntities.size() > 0);

    final String result = executeRestCall(STARTCALL);

    // Awaitility here tells me nothing is in ElasticSearch :(

  }

  private String executeRestCall(final String urlTemplate) throws IOException {

    final String url = String.format(urlTemplate, port);

    final HttpUriRequest request = new HttpPost(url);
    final HttpResponse response = HttpClientBuilder.create().build().execute(request);

    // Get the result.
    return EntityUtils.toString(response.getEntity());
  }
}

【问题讨论】:

    标签: spring-boot elasticsearch junit5 java-11 spring-reactive


    【解决方案1】:
    public void doSomething(final List<OpenUsage> openUsages){
     openUsageRepository.saveAll(openUsages)
    }
    

    这在末尾缺少分号,所以它不应该编译。

    但我认为这只是一个错字,实际上有一个分号。

    无论如何,saveAll() 返回一个Flux。这个Flux 只是一个保存数据的方法,直到有人(或类似blockLast())调用subscribe() 之前它不会“执行”。您只需将 Flux 扔掉即可,因此不会执行保存。

    如何解决这个问题?一种选择是添加.blockLast() call:

    openUsageRepository.saveAll(openUsages).blockLast();
    

    但这将以阻塞方式保存数据,有效地破坏反应性。

    另一种选择是,如果您调用 saveAll() 的代码支持反应性只是返回 saveAll() 返回的 Flux,但是,因为您的 doSomething() 具有 @987654334 @返回类型,这个值得怀疑。

    无论如何,您的startExport()doSomething() 的连接方式尚未可知。但看起来您的“调用代码”没有使用任何反应性概念,因此真正的解决方案是重写调用代码以使用反应性(在其上获取 Publishersubscribe(),然后等到数据到达),或恢复使用阻塞 API(ElasticsearchRepository 而不是 ReactiveElasticsearchRepository)。

    【讨论】:

    • 是的,我意识到需要订阅频道的人,我最初的想法是 elasticsearch 会为我执行 block(),但我是个傻瓜。然而,我的想法得出结论,如果我应该以反应为目标,我需要返回响应通量。无论如何感谢您的回复
    猜你喜欢
    • 1970-01-01
    • 2017-05-14
    • 1970-01-01
    • 1970-01-01
    • 2017-10-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-13
    相关资源
    最近更新 更多