【问题标题】:Spark Redshift with Python使用 Python 的 Spark Redshift
【发布时间】:2016-11-13 10:36:12
【问题描述】:

我正在尝试将 Spark 与亚马逊 Redshift 连接,但出现此错误:

我的代码如下:

from pyspark.sql import SQLContext
from pyspark import SparkContext

sc = SparkContext(appName="Connect Spark with Redshift")
sql_context = SQLContext(sc)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", <ACCESSID>)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", <ACCESSKEY>)

df = sql_context.read \
    .option("url", "jdbc:redshift://example.coyf2i236wts.eu-central-    1.redshift.amazonaws.com:5439/agcdb?user=user&password=pwd") \
    .option("dbtable", "table_name") \
    .option("tempdir", "bucket") \
    .load()

【问题讨论】:

    标签: apache-spark amazon-redshift databricks


    【解决方案1】:

    我认为s3n:// URL 样式已被弃用和/或删除。

    尝试将您的密钥定义为"fs.s3.awsAccessKeyId"

    【讨论】:

    • 谢谢,我试过改了,还是一样的错误
    • s3n 仍在使用中且未被弃用,请参考此链接github.com/databricks/…
    【解决方案2】:

    如果您使用的是数据块,我认为您不必创建新的 sql 上下文,因为他们这样做是因为您只需使用 sqlContext,请尝试使用以下代码:

    from pyspark.sql import SQLContext
        sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID")
        sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
    
    df = sqlContext.read \ .......
    

    可能bucket没有挂载

    dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)
    

    【讨论】:

      【解决方案3】:

      我认为您需要将.format("com.databricks.spark.redshift") 添加到您的sql_context.read 通话中;我的直觉是 Spark 无法推断此数据源的格式,因此您需要明确指定我们应该使用 spark-redshift 连接器。

      有关此错误的更多详细信息,请参阅https://github.com/databricks/spark-redshift/issues/230

      【讨论】:

        【解决方案4】:

        错误是由于缺少依赖项。

        验证您在 spark 主目录中是否有这些 jar 文件:

        1. spark-redshift_2.10-3.0.0-preview1.jar
        2. RedshiftJDBC41-1.1.10.1010.jar
        3. hadoop-aws-2.7.1.jar
        4. aws-java-sdk-1.7.4.jar
        5. (aws-java-sdk-s3-1.11.60.jar)(较新的版本,但并非一切都适用)

        把这些jar文件放到$SPARK_HOME/jars/然后启动spark

        pyspark --jars $SPARK_HOME/jars/spark-redshift_2.10-3.0.0-preview1.jar,$SPARK_HOME/jars/RedshiftJDBC41-1.1.10.1010.jar,$SPARK_HOME/jars/hadoop-aws-2.7.1.jar,$SPARK_HOME/jars/aws-java-sdk-s3-1.11.60.jar,$SPARK_HOME/jars/aws-java-sdk-1.7.4.jar
        

        (SPARK_HOME 应该是 = "/usr/local/Cellar/apache-spark/$SPARK_VERSION/libexec")

        这将运行带有所有必要依赖项的 Spark。请注意,如果您使用 awsAccessKeys,您还需要指定身份验证类型 'forward_spark_s3_credentials'=True。

        from pyspark.sql import SQLContext
        from pyspark import SparkContext
        
        sc = SparkContext(appName="Connect Spark with Redshift")
        sql_context = SQLContext(sc)
        sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", <ACCESSID>)
        sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", <ACCESSKEY>)
        
        df = sql_context.read \
             .format("com.databricks.spark.redshift") \
             .option("url", "jdbc:redshift://example.coyf2i236wts.eu-central-    1.redshift.amazonaws.com:5439/agcdb?user=user&password=pwd") \
             .option("dbtable", "table_name") \
             .option('forward_spark_s3_credentials',True) \
             .option("tempdir", "s3n://bucket") \
             .load()
        

        之后的常见错误是:

        • Redshift 连接错误:“SSL 关闭”
          • 解决方案: .option("url", "jdbc:redshift://example.coyf2i236wts.eu-central- 1.redshift.amazonaws.com:5439/agcdb?user=user&amp;password=pwd?ssl=true&amp;sslfactory=org.postgresql.ssl.NonValidatingFactory")
        • S3 错误:卸载数据时,例如在 df.show() 之后,您会收到消息:“您尝试访问的存储桶必须使用指定的端点进行寻址。请将所有未来的请求发送到此端点。”
          • 解决方案:bucket 和 cluster 必须在同一个 region 内运行

        【讨论】:

        • 伙计,我使用了此处列出的依赖项的确切列表,但仍然出现错误:java.lang.IllegalArgumentException: Invalid hostname in URI s3n://somewherejava.lang.IllegalArgumentException: Invalid hostname in URI s3://somweheres3a 给了我java.lang.NoClassDefFoundError: com/amazonaws/SdkClientException at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:235) :@
        【解决方案5】:

        这是连接到 redshift 的分步过程。

        • 下载红移连接器文件。试试下面的命令
        wget "https://s3.amazonaws.com/redshift-downloads/drivers/RedshiftJDBC4-1.2.1.1001.jar"
        
        • 将以下代码保存在 python 文件中(您要运行的.py)和 相应地替换凭据。
        from pyspark.conf import SparkConf
        from pyspark.sql import SparkSession
        
        #initialize the spark session 
        spark = SparkSession.builder.master("yarn").appName("Connect to redshift").enableHiveSupport().getOrCreate()
        sc = spark.sparkContext
        sqlContext = HiveContext(sc)
        
        sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "<ACCESSKEYID>")
        sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "<ACCESSKEYSECTRET>")
        
        
        taxonomyDf = sqlContext.read \
            .format("com.databricks.spark.redshift") \
            .option("url", "jdbc:postgresql://url.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") \
            .option("dbtable", "table_name") \
            .option("tempdir", "s3://mybucket/") \
            .load() 
        
        • 如下运行 spark-submit
        spark-submit --packages com.databricks:spark-redshift_2.10:0.5.0 --jars RedshiftJDBC4-1.2.1.1001.jar test.py
        

        【讨论】:

          【解决方案6】:

          如果您使用 Spark 2.0.4 并在 AWS EMR 集群上运行您的代码,请按照以下步骤操作:-

          1) 使用以下命令下载 Redshift JDBC jar:-

          wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar
          

          参考:-AWS Document

          2) 将以下代码复制到 python 文件中,然后将所需的值替换为您的 AWS 资源:-

          import pyspark
          from pyspark.sql import SQLContext
          from pyspark.sql import SparkSession
          
          spark = SparkSession.builder.getOrCreate()
          
          spark._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "access key")
          spark._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "secret access key")
          
          sqlCon = SQLContext(spark)
          df = sqlCon.createDataFrame([
              (1, "A", "X1"),
              (2, "B", "X2"),
              (3, "B", "X3"),
              (1, "B", "X3"),
              (2, "C", "X2"),
              (3, "C", "X2"),
              (1, "C", "X1"),
              (1, "B", "X1"),
          ], ["ID", "TYPE", "CODE"])
          
          df.write \
            .format("com.databricks.spark.redshift") \
            .option("url", "jdbc:redshift://HOST_URL:5439/DATABASE_NAME?user=USERID&password=PASSWORD") \
            .option("dbtable", "TABLE_NAME") \
            .option("aws_region", "us-west-1") \
            .option("tempdir", "s3://BUCKET_NAME/PATH/") \
            .mode("error") \
            .save()
          

          3) 运行以下 spark-submit 命令:-

          spark-submit --name "App Name" --jars RedshiftJDBC4-no-awssdk-1.2.20.1043.jar --packages com.databricks:spark-redshift_2.10:2.0.0,org.apache.spark:spark-avro_2.11:2.4.0,com.eclipsesource.minimal-json:minimal-json:0.9.4 --py-files python_script.py python_script.py
          

          注意:-

          1) 在Reshift集群的安全组的入站规则中应该允许EMR节点的Public IP地址(spark-submit作业将在该节点上运行)。

          2) Redshift 集群和“tempdir”下使用的 S3 位置应该在同一个地理位置。在上面的示例中,这两个资源都在 us-west-1 中。

          3) 如果数据很敏感,请确保保护所有通道。为了确保连接安全,请按照提到的步骤here配置下。

          【讨论】:

            猜你喜欢
            • 2018-07-17
            • 1970-01-01
            • 1970-01-01
            • 2019-10-22
            • 1970-01-01
            • 1970-01-01
            • 2017-04-17
            • 2015-04-16
            • 1970-01-01
            相关资源
            最近更新 更多