【问题标题】:Connecting from Spark to ElasticSearch using Hadoop not working使用 Hadoop 从 Spark 连接到 ElasticSearch 不起作用
【发布时间】:2016-12-10 19:50:10
【问题描述】:

我在从我的 Java 代码连接到本地运行的 ElasticSearch 节点时遇到问题,该代码作为提交给 Spark 的作业运行(在本地运行)。但是,当我不使用 Spark 时,连接没有问题。还运行 Python 作业并将其提交给 spark 工作正常。

我知道对于 Java,我需要通过端口 9300 而不是 9200(HTTP 端口)进行连接。尽管如此,我总是得到同样的例外,阅读或写作没有区别:

16/08/04 16:51:55 错误 NetworkClient: 节点 [服务器 localhost 未能以有效的 HTTP 响应响应] 失败 (localhost:9300);没有其他节点了 - 正在中止... 线程“主”org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException 中的异常:连接错误(检查网络和/或代理设置)- 所有节点都失败;试过 [[localhost:9300]] 在 org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:102) 在 org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:282) 在 org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:266) 在 org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:270) 在 org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:108) 在 org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:90) 在 org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:61) 在 org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:434) 在 org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:415) 在 org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120) 在 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 在 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 在 scala.Option.getOrElse(Option.scala:120) 在 org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 在 org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 在 org.apache.spark.rdd.RDD.take(RDD.scala:1302) 在 org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1342) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 在 org.apache.spark.rdd.RDD.first(RDD.scala:1341) 在 org.apache.spark.api.java.JavaPairRDD.first(JavaPairRDD.scala:211) 在 com.dd.mediaforce.spark.most_popular.ExecutorMostPopular.main(ExecutorMostPopular.java:564) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

我们在多个节点上运行 Spark 和 ElasticSearch。 Python 代码在这里运行良好,但尝试使用这种 ES 设置的 Java 代码也无助于解决问题。

我正在使用从 Java 连接的代码:

    SparkConf _sparkConf = new SparkConf()
            .setMaster("local[*]")
            .setAppName("Test");
    JavaSparkContext jsc = new JavaSparkContext(_sparkConf);
    Configuration conf = new Configuration();
    conf.set("cluster.name", "our_clustername");
    conf.set("es.nodes", "localhost");
    conf.setInt("es.port", 9300);
    conf.set("es.resource", index_and_type);
    JavaPairRDD readRdd = jsc.newAPIHadoopRDD(conf, org.elasticsearch.hadoop.mr.EsInputFormat.class, org.apache.hadoop.io.NullWritable.class, org.elasticsearch.hadoop.mr.LinkedMapWritable.class);
    System.out.println(readRdd.first());
    jsc.stop();

以下使用 TransportClient(并且没有 Spark)的 Java 代码如上所述连接到 ES 没有问题,写入和读取都可以正常工作:

    Client client = TransportClient.builder().settings(settings).build().addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));

    ImmutableOpenMap<String, IndexMetaData> indices = client.admin().cluster().prepareState().get().getState().getMetaData().getIndices();
    for (ObjectCursor<IndexMetaData> value : indices.values()) {
        log.info("Index: " + value.index + " : " + value.toString());
    }

    GetResponse response = client.prepareGet("index_name", "type_name", "1").get();
    log.info(response.getIndex() + " : " + response.getId() + " : " + response.isExists());

    String field_id = "6";
    IndexRequest indexRequest = new IndexRequest("index_name", "type", "2")
        .source(jsonBuilder()
                .startObject()
                .prettyPrint()
                .field("field_id", field_id)
                .field("another_field", "value")
                .field("integer_field", 100)
                .endObject());

    UpdateRequest updateRequest = new UpdateRequest("index_name", "type_name", article_id)
        .doc(jsonBuilder()
                .startObject()
                .prettyPrint()
                .field("field_id", field_id)
                .field("another_field", "value")
                .field("integer_field", 100)
                .endObject())
                .upsert(indexRequest);

    UpdateResponse responseUpdate = client.update(updateRequest).get();
    log.info(responseUpdate.getIndex() + " : " + responseUpdate.getGetResult() + " : " + responseUpdate.getType());
    client.close();

欢迎提出任何建议,因为我已经被困在这里好几天了,没有任何进一步的印象。我显然用谷歌搜索了这个问题并在 StackOverflow 上进行了搜索,但到目前为止我还没有找到我的问题的答案。

为了完整起见,一些 Python 代码也可以很好地使用 Spark 读写 ES。

conf = SparkConf()
conf = conf.setAppName('Test')
sc = SparkContext(conf=conf)

#Omitting some of the code in creating some_rdd on Spark: 

index_and_type = index_name + '/type_name'
groovy_script = "if (ctx._source.%s) { ctx._source.%s+=value } else { ctx._source.%s=value }" % (field, field, field)

es_db_connection_dictionary = {
    "es.nodes": db_hosts,
    "es.port": db_port,
    "es.resource": index_and_type,
    "es.write.operation": "upsert",
    "es.mapping.id": "field_id",
    "es.update.script": groovy_script,
    "es.update.script.params": "value:%s" % integer_field,
    "es.http.timeout": "10s"
}


es_input = views_tuple_rdd.map(lambda item: (item[0],
        {
            'field_id': item[0],
            "integer_field": item[1],
            "another_field": client_name,
        }))

es_input.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_db_connection_dictionary)

【问题讨论】:

  • 通常如果你使用elasticsearch-spark连接器,你不需要使用端口9300,默认是9200。它的行为与常规的elasticsearch API不同
  • 我也试过了,但是当我这样做时,我得到了这个异常:Exception in thread "main" java.lang.StringIndexOutOfBoundsException: String index out of range: -1 at java.lang.String.substring(String.java:1967) at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:99) at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:61) at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:434) 它似乎也找不到任何节点。
  • 感谢@eliasah,您确实为我指明了正确的方向。我使用的是旧版本的 elasticsearch-hadoop 库。它的 2.0 版实际上与 ES 2.x 不兼容。为此,我需要 2.2 或更高版本,尽管这留下了一些不太难解决的库版本冲突。所以,是的,通过 Spark 连接到 ES 确实需要端口 9200,而不是 9300。这仅适用于本机 Java 传输客户端。
  • 我已经添加了答案。如果你想关闭问题,你可以接受它。
  • 虽然这个问题有一个公认的答案,但我也遇到了类似的问题。我的具体问题与设置 es.nodes.wan.only=true (默认为 false)有关。

标签: java hadoop elasticsearch apache-spark


【解决方案1】:

通常,如果您使用的是 elasticsearch-spark 连接器,则如果默认端口为 9200,则不需要使用端口 9300。它的行为与常规的 elasticsearch API 不同。

而且您似乎还使用了与 elasticsearch 不兼容的连接器版本。这是一个常见的错误,因为大多数情况下它们主要在 2.x 中。

我相信弹性搜索 5.x 不会出现这种情况,他们已将所有其他弹性产品版本都与之对齐。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-10-16
  • 1970-01-01
  • 1970-01-01
  • 2019-06-02
  • 1970-01-01
  • 2016-12-09
  • 2016-12-10
相关资源
最近更新 更多