【问题标题】:Elasticsearch pyspark connection in insecure mode不安全模式下的 Elasticsearch pyspark 连接
【发布时间】:2020-11-30 00:42:24
【问题描述】:

我的最终目标是将数据从 hdfs 插入到 elasticsearch,但我面临的问题是连接性

我可以使用以下 curl 命令连接到我的 elasticsearch 节点

curl -u username -X GET https://xx.xxx.xx.xxx:9200/_cat/indices?v' --insecure

但是当涉及到与 spark 的连接时,我无法这样做。我插入数据的命令是 df.write.mode("append").format('org.elasticsearch.spark.sql').option("es.net.http.auth.user", "username").option("es.net.http.auth.pass", "password").option("es.index.auto.create","true").option('es.nodes', 'https://xx.xxx.xx.xxx').option('es.port','9200').save('my-index/my-doctype')

我得到的错误是

org.elastisearch.hadoop.EsHadoopIllegalArgumentException:Cannot detect ES version - typical this happens if then network/Elasticsearch cluster is not accessible or when targetting a Wan/Cloud instance without the proper setting 'es.nodes.wan.only'
....
....
Caused by: org.elasticseach.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proy settings)- all nodes failed; tried [[xx.xxx.xx.xxx:9200]]
....
...

这里,pyspark 相当于 curl --insecure

谢谢

【问题讨论】:

    标签: apache-spark elasticsearch curl pyspark elasticsearch-hadoop


    【解决方案1】:

    经过多次尝试和不同的配置选项。我找到了一种不安全地连接在 https 上运行的 elastisearch 的方法

            dfToEs.write.mode("append").format('org.elasticsearch.spark.sql') \
            .option("es.net.http.auth.user", username) \
            .option("es.net.http.auth.pass", password) \
            .option("es.net.ssl", "true") \
            .option("es.net.ssl.cert.allow.self.signed", "true") \
            .option("mergeSchema", "true") \
            .option('es.index.auto.create', 'true') \
            .option('es.nodes', 'https://{}'.format(es_ip)) \
            .option('es.port', '9200') \
            .option('es.batch.write.retry.wait', '100s') \
            .save('{index}/_doc'.format(index=index))
    

    (es.net.ssl, true)
    

    我们还必须提供如下自签名证书

    (es.net.ssl.cert.allow.self.signed, true)
    

    【讨论】:

      【解决方案2】:

      你可以试试下面的 sparkConfs,

      val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.es.index.auto.create", "true")
      .set("spark.es.nodes", "yourESaddress")
      .set("spark.es.port", "9200")
      .set("spark.es.net.http.auth.user","")
      .set("spark.es.net.http.auth.pass", "")
      .set("spark.es.resource", indexName)
      .set("spark.es.nodes.wan.only", "true")
      

      那么你仍然面临问题,es.net.ssl = true 看看。

      如果仍然出现错误,请尝试添加以下配置,

      'es.resource' = 'ctrl_rater_resumen_lla/hb',
      'es.nodes' = 'localhost',
      'es.port' = '9200',
      'es.index.auto.create' = 'true',
      'es.index.read.missing.as.empty' = 'true',
      'es.nodes.discovery'='true',
      'es.net.ssl'='false'
      'es.nodes.client.only'='false',
      'es.nodes.wan.only' = 'true'
      'es.net.http.auth.user'='xxxxx',
      'es.net.http.auth.pass' = 'xxxxx'
      'es.nodes.discovery' = 'false'
      
      

      【讨论】:

        【解决方案3】:

        我确实检查了很多东西,最后我可以在 AWS ElasticSearch 服务 (ES) 中编写,但使用 scala/spark。

        1. 在 VPC 中,创建安全组以使用端口 443 从 EMR 访问 ES(ES 中的入站规则到 EMR 的 SG,以及 EMR 中的入站规则到同一端口)
        2. 使用 telnet 命令检查来自 EMR 主节点的连接性
            telnet xyz.eu-west-1.es.amazonaws.com 443
        
        1. 一旦检查以上,使用 curl 命令检查应用程序级别

          curl https://xyz.eu-west-1.es.amazonaws.com:443/domainname/_search?pretty=true&?q=*```
          
        2. 之后,转到代码,在我的例子中,我使用 spark-shell 进行了测试,但服务器配置文件包含在 start 中,如下所示:

           spark-shell --jars elasticsearch-spark-20_2.11-7.1.1.jar --conf spark.es.nodes="xyz.eu-west-1.es.amazonaws.com" --conf spark.es.port=443 --conf spark.es.nodes.wan.only=true --conf spark.es.nodes.discovery="false" --conf spark.es.index.auto.create="true" --conf spark.es.resource="domain/doc" --conf spark.es.scheme="https"
          
          1. 终于要写代码了:
          import java.util.Date
          import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
          import org.elasticsearch.spark._
          import org.elasticsearch.spark.sql._
          val dateformat =  new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
          val currentdate = dateformat.format(new Date)
          val colorsDF = spark.read.json("multilinecolors.json")
          val mcolors = colorsDF.withColumn("Date",lit(currentdate))
          mcolors.write.mode("append").format("org.elasticsearch.spark.sql").option("es.net.http.auth.user", "").option("es.net.http.auth.pass", "").option("es.net.ssl", "true").option("es.net.ssl.cert.allow.self.signed", "true").option("mergeSchema", "true").option("es.index.auto.create", "true").option("es.nodes","https://xyz.eu-west-1.es.amazonaws.com").option("es.port", "443").option("es.batch.write.retry.wait", "100").save("domainname/_doc")```
          
          
          
          
          
          
          

        【讨论】:

          猜你喜欢
          • 2021-07-04
          • 2023-03-05
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2016-08-27
          • 1970-01-01
          相关资源
          最近更新 更多