【问题标题】:How to read input from S3 in a Spark Streaming EC2 cluster application如何在 Spark Streaming EC2 集群应用程序中从 S3 读取输入
【发布时间】:2019-03-02 20:10:17
【问题描述】:

我正在尝试让我的 Spark Streaming 应用程序从 S3 目录读取他的输入,但在使用 spark-submit 脚本启动它后我一直收到此异常:

Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at org.apache.hadoop.fs.s3native.$Proxy6.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:195)
    at MainClass$.main(MainClass.scala:1190)
    at MainClass.main(MainClass.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

我正在按照此处http://spark.apache.org/docs/latest/ec2-scripts.html(页面底部)的建议通过此代码块设置这些变量:

val ssc = new org.apache.spark.streaming.StreamingContext(
  conf,
  Seconds(60))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId",args(2))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey",args(3))

args(2) 和 args(3) 当然是我的 AWS Access Key ID 和 Secrete Access Key。

为什么它一直说他们没有设置?

编辑:我也尝试过这种方式,但我得到了同样的例外:

val lines = ssc.textFileStream("s3n://"+ args(2) +":"+ args(3) + "@<mybucket>/path/")

【问题讨论】:

  • 您是否出于某种原因回避 IAM 角色?并想改用访问/密钥?
  • 不是真的,我只是按照文档上的说明进行操作(当时)。现在有了所有新的更新,对于大多数用例来说,这可能是一个过时的问题。
  • 您是否尝试过使用 s3a://.... 访问文件?

标签: amazon-ec2 amazon-s3 apache-spark


【解决方案1】:

在java中,以下是代码行。您必须在 SparkContext 中添加 AWS 凭据,而不是在 SparkSession 中。

JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
sc.hadoopConfiguration().set("fs.s3a.access.key", AWS_KEY);
sc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_KEY);

【讨论】:

    【解决方案2】:

    最新的 EMR 版本(在 4.6.0 上测试)需要以下配置:

    val sc = new SparkContext(conf)
    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.s3.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem")
    hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
    hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)
    

    尽管在大多数情况下,开箱即用的配置应该可以工作 - 如果您的 S3 凭据与您启动集群时使用的凭据不同。

    【讨论】:

      【解决方案3】:

      增加@nealmcb 的答案,最直接的方法是定义

      HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop 
      

      conf/spark-env.sh 中或在~/.bashrc~/.bash_profile 中导出该环境变量。

      只要您可以通过 hadoop 访问 s3,这将起作用。例如,如果你可以运行

      hadoop fs -ls s3n://path/
      

      然后hadoop就可以看到s3路径了。

      如果hadoop看不到路径,请按照How can I access S3/S3n from a local Hadoop 2.6 installation?中的建议

      【讨论】:

        【解决方案4】:

        这在 1.4.1 shell 中适用于我:

        val conf = sc.getConf
        conf.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
        conf.set("spark.hadoop.fs.s3.awsAccessKeyId", <your access key>)
        conf.set("spark.hadoop.fs.s3.awsSecretAccessKey", <your secret key>)
        SparkHadoopUtil.get.conf.addResource(SparkHadoopUtil.get.newConfiguration(conf))
        ...
        sqlContext.read.parquet("s3://...")
        

        【讨论】:

        • 这在技术上与@harel 的答案相同,您只需在配置树中从较高位置进行设置,而不是先进入 Hadoop 配置。但它是一样的。
        • spark shell 不一样 - 你已经在 shell 中有 sc 和 sqlContext,@harel 的回答会创建一个新的 conf 和新的 sc。
        【解决方案5】:

        奇怪。尝试在sparkContext 上执行.set。尝试在启动应用程序之前导出环境变量:

        export AWS_ACCESS_KEY_ID=<your access>
        export AWS_SECRET_ACCESS_KEY=<your secret>
        

        ^^这就是我们的做法。

        更新:根据@tribbloid 的说法,上述问题在 1.3.0 中出现,现在您必须使用 hdfs-site.xml 进行多年的工作,或者您可以做到(这在 spark-shell 中有效):

        val hadoopConf = sc.hadoopConfiguration;
        hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
        hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
        hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)
        

        【讨论】:

        • 您的意思是在使用 spark-submit 之前从 master 和所有 slave 的 shell 中导出它们?还是使用 sys.env 在应用程序内部进行?
        • 在 shell 中执行应用程序之前。只需在该 shell 中,无需在除您执行应用程序的 shell 之外的任何其他 shell 中执行它。
        • 在启动应用程序之前导出环境变量有效!谢谢!
        • 注意:在 SparkConf().set(...) 中设置 AWS_ACCESS_KEY_IDfs.s3n.awsAccessKeyId 并没有这样做。在 Env 中设置 AWS_ACCESS_KEY_ID 或在 spark-env.sh 中设置确实有效。可悲的是,非工作案例应该工作
        • 这在 Spark 1.3 之后不再有效。现在,如果您想静态设置它。您必须将 hdfs-site.xml 添加到 Spark 的 conf 目录中。没有办法在命令行中设置它。我不知道这个设计的意义,但它只是发生了
        【解决方案6】:

        我想将凭据更安全地放在我的一个加密分区上的配置文件中。所以我在运行我的 spark 应用程序之前执行了export HADOOP_CONF_DIR=~/Private/.aws/hadoop_conf,并将一个名为core-site.xml 的文件(通过ecryptfs 加密)放在该目录中,其中包含如下凭据:

        <?xml version="1.0"?>
        <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
        <configuration>
          <property>
          <name>fs.s3n.awsAccessKeyId</name>
          <value>my_aws_access_key_id_here</value>
          </property>
          <property>
          <name>fs.s3n.awsSecretAccessKey</name>
          <value>my_aws_secret_access_key_here</value>
          </property>
        </configuration>
        

        HADOOP_CONF_DIR也可以设置在conf/spark-env.sh中。

        【讨论】:

          【解决方案7】:

          对于使用 EMR 的用户,请使用 https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark 中所述的 Spark 构建,并且只需使用 s3:// URI 引用 S3。无需设置 S3 实施或其他配置,因为凭证由 IAM 或角色设置。

          【讨论】:

          • 建议使用 IAM 角色而不是以任何其他方式指定密钥。
          • 来自当前的 4.x EMR 管理员指南:docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/…
          • 我拥有用于​​创建 Spark 集群的 IAM 的所有 S3 权限。但我仍然面临这个错误。
          • @nishant 这是 EMR 吗?如果是这样,什么 EMR 版本?在 EMR 上,不要在 Spark 应用程序中设置 AWS 密钥。
          • @ChristopherB 我定义的角色有问题。我修复了它,现在它可以工作了。
          【解决方案8】:

          在 AWS EMR 上,上述建议不起作用。相反,我在 conf/core-site.xml 中更新了以下属性:

          fs.s3n.awsAccessKeyId 和 fs.s3n.awsSecretAccessKey 以及您的 S3 凭证。

          【讨论】:

          • “以上”没有任何意义,因为答案的顺序可以随时更改。对于 EMR,@ChristopherB 的答案看起来是正确的。
          【解决方案9】:

          以下配置对我有用,请确保您还设置了“fs.s3.impl”:

          val conf = new SparkConf().setAppName("Simple Application").setMaster("local")      
          val sc = new SparkContext(conf)
          val hadoopConf=sc.hadoopConfiguration;
          hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
          hadoopConf.set("fs.s3.awsAccessKeyId",myAccessKey)
          hadoopConf.set("fs.s3.awsSecretAccessKey",mySecretKey)
          

          【讨论】:

          • 比将秘密转储到 Env 中要干净得多。
          • 似乎在 Python 中 hadoopConfiguration 属性不可用。任何解决方法的想法?
          • 在 pyspark 中是 hadoopConf=sc._jsc.hadoopConfiguration()
          • @JosephLust 清洁器根本不设置这些,而是​​使用 IAM 角色。当不需要时,您正在强迫某些东西管理和保护这些秘密。
          • @JaysonMinard 好点。由于作业的多租户,我们不在 Spark 集群上使用 EC2 角色,其中特定作业仅限于特定存储桶。我们的分布式配置服务负责为适当的工作提供适当的秘密。
          猜你喜欢
          • 2018-12-08
          • 2017-04-17
          • 1970-01-01
          • 2016-03-28
          • 2013-12-13
          • 1970-01-01
          • 2012-03-27
          • 1970-01-01
          相关资源
          最近更新 更多