【问题标题】:Scala: To fetch more than 10000 documents/messages from ElasticsearchScala:从 Elasticsearch 获取超过 10000 个文档/消息
【发布时间】:2019-11-22 01:15:51
【问题描述】:

我使用的是 Scala 2.12,而我们使用的是 Elasticsearch 5.2.2。 我的要求是仅根据条件进行获取/搜索。搜索将一次性返回 10,000 多个文档或消息。所以我不能使用常规搜索。 数据(每个文档/消息)是一个复杂的 JSON,我可以稍后对其进行解析。 因此,我需要获取所有这些消息并将其存储在 Json 或任何内容的单个列表中。 我对 Scala 不是很流利。我可以在 Scala 中使用 Elastic4s 进行搜索。我看到它有滚动和扫描选项,但没有找到任何完整的工作示例。所以寻求一些帮助。

我看到一些示例代码如下,但需要更多帮助来获取所有内容并将所有内容放在上面:

client.execute {
   search in "index" / "type" query <yourquery> scroll "1m"
}

client.execute {
   search scroll <id>
}

但是如何获取滚动 id 以及如何继续获取所有数据?


更新:

上面提到了scala版本和ES版本。

我正在使用以下示例:

SBT:

libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-core" % "7.0.2"

libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-http" % "5.5.10"

libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-http-streams" % "6.5.1"

libraryDependencies += "org.elasticsearch" % "elasticsearch" % "5.6.0"

代码:

import com.sksamuel.elastic4s.ElasticsearchClientUri
import com.sksamuel.elastic4s.requests.common.RefreshPolicy
import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.http.Response
import com.sksamuel.elastic4s.http.search.SearchResponse
import com.sksamuel.elastic4s.HttpClient

import com.sksamuel.elastic4s.http.ElasticDsl._

val client = HttpClient(ElasticsearchClientUri("host", 9200))

val resp1 = client.execute {
     search("index")
       .matchQuery("key", "value")
       .scroll("1m")
       .limit(500)
   }.await.result

val resp2 = client.execute {
      searchScroll(resp1.scrollId.get).keepAlive(1.minute)
    }.await

我认为我没有为 elastic4s 模块使用正确的版本。

问题:

  • import com.sksamuel.elastic4s.HttpClient:无法识别 HttpClient 类。当我尝试初始化“客户端”变量时,它显示错误 HttpClient not found。

  • 接下来,在我的 resp2 中,当我试图获取“scrollId”时,它无法识别。如何从 resp1 中获取 scrollId?

基本上,这里缺少什么?

更新 2:

我根据 github 上的示例(示例)更改了以下依赖项的版本

libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-http" % "6.3.3"

代码:

val client = ElasticClient(ElasticProperties("http://host:9200"))

现在,我收到以下错误;

错误:

Symbol 'type <none>.term.BuildableTermsQuery' is missing from the classpath.
[error] This symbol is required by 'method com.sksamuel.elastic4s.http.search.SearchHandlers.BuildableTermsNoOp'.
[error] Make sure that type BuildableTermsQuery is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'SearchHandlers.class' was compiled against an incompatible version of <none>.term.
[error]     val client = ElasticClient(ElasticProperties("host:9200"))
[error]                                                 ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

【问题讨论】:

  • 你看过 elastic4s 中的scroll tests 吗?另一种解决方案是使用search_after,它不需要使用特定的端点并保留滚动ID。
  • 我正在更新我的问题,提供更多细节。

标签: scala elasticsearch elastic4s


【解决方案1】:

我个人会将 Akka Streams 用于这种类型的工作流,因为它使并行处理和工作流构建更容易。

参考文档可能有点密集,但基本思想是您从具有一个输出的 Source 开始...通过任意数量的 Flow 推动它...然后收集到 Sink 中。

Elastic4s 支持(几乎)本机使用此功能,因此您不必直接处理滚动等。

现在,我不知道你想对你的记录做什么。但是为您的数据创建一个 Source 是这样的:

import akka.stream.scaladsl.{GraphDSL, Sink, Source}

class MyIndexer(indexName:String) {
  def getIndexSource(client:ElasticClient)(implicit actorRefFactory: ActorRefFactory) = Source.fromPublisher(
    client.publisher(search(indexName) (your-query-here) sortByFieldAsc "originalSource.ctime" scroll "5m")
  )
}

致电MyIndexer.getIndexSource 将返回Source[SearchHit]。然后,您可以将 SearchHit 转换为您的域对象,但是您通常会处理来自 Elastic4s 的结果(在我的情况下,使用 Circe 的 generic.auto;与使用非流接口时可以使用 .to[Domainobject] 相同)。

您可能想知道 ActorRefFactory 隐含的;那就是akka ActorSystem。如果您正在使用 Play 框架之类的东西,您可以通过使用依赖注入在任何注入的类(即class MyClass @Inject() (implicit sys:ActorSystem))中请求 ActorSystem 的实例来免费获得它。如果您使用的是普通 Scala,您可以在 Main 函数中执行此操作:

  private implicit val actorSystem = ActorSystem("some-name-here")
  private implicit val mat:Materializer = ActorMaterializer.create(actorSystem)

并使用隐式参数将这些值传递到需要它们的位置。

一个关于如何使用它来获得所有结果序列的示例(可能不完全是您需要的,给出描述,但一个很好的例子)会像这样工作:

import com.sksamuel.elastic4s.circe._
import io.circe.generic.auto._

val source = indexer.getIndexSource(esclient)
val resultFuture = source
  .log("logger-name-here")
  .map(_.to[Yourdomainobject])
  .toMat(Sink.seq[Yourdomainobject])(Keep.right)
  .run()

resultFuture
  .map(resultSeq=>{ do stuff with result seq })
  .recover({
      case err:Throwable=>{handle error}
  })

现在,如果您想高效地进行处理,您需要将处理实现为 GraphStages 并将其固定到此处的流中。我一直在实现一堆扫描仪,它们可以处理数十万个对象,每个扫描仪只不过是一个 Main 函数,它设置并运行一个流来执行所有实际处理。

我倾向于将我的逻辑设计为流程图,然后将图表的每个框实现为单独的 akka GraphStage,然后将它们连接在一起并使用广播和合并等内置元素来获得良好的并行处理。

希望这有一些用处!

【讨论】:

  • 哦,我建议将所有这些 elastic4s 依赖项保持相同的版本,这应该可以防止您遇到那些烦人的类加载器错误。当我弄乱库版本时,有时我不得不做一个sbt clean 并删除我的target/scala-2.12 目录以防止它变得混乱
猜你喜欢
  • 1970-01-01
  • 2017-05-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-08-04
  • 1970-01-01
  • 2021-05-22
相关资源
最近更新 更多