【Question title】: Spark 2.4.0 dependencies to write to AWS Redshift
【Posted】: 2019-09-06 08:40:50
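【Question】:

I'm struggling to find the right package dependencies, and their matching versions, for writing to a Redshift DB with the PySpark micro-batch approach.

What are the correct dependencies to achieve this?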

【Discussion】:

    Tags: apache-spark pyspark spark-streaming amazon-redshift


    【Answer 1】:

    As the AWS tutorial suggests, you have to provide the Redshift JDBC driver:

    wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar
    

    After downloading this jar, this is how I passed it, together with the other dependencies, to spark-submit:

    spark-submit --master yarn --deploy-mode cluster \
      --jars RedshiftJDBC4-no-awssdk-1.2.20.1043.jar \
      --packages com.databricks:spark-redshift_2.10:2.0.0,org.apache.spark:spark-avro_2.11:2.4.0,com.eclipsesource.minimal-json:minimal-json:0.9.4 \
      my_script.py
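If you would rather not put the dependencies on the spark-submit command line, the same jar and Maven coordinates can in principle be supplied through the standard Spark properties spark.jars and spark.jars.packages. This is a sketch under assumptions, not what the answer ran: redshift_dependency_conf and build_session are hypothetical helper names, the jar is assumed to sit in the current working directory, and both properties only take effect if they are set before the SparkContext starts (for example when the script is launched with plain python instead of spark-submit).

```python
"""Sketch: the spark-submit --jars/--packages flags expressed as Spark config.

Assumption: the Redshift JDBC jar was downloaded into the current directory.
spark.jars / spark.jars.packages only apply if set before the SparkContext starts.
"""

# Same Maven coordinates as the --packages flag in the spark-submit command
REDSHIFT_PACKAGES = (
    "com.databricks:spark-redshift_2.10:2.0.0",
    "org.apache.spark:spark-avro_2.11:2.4.0",
    "com.eclipsesource.minimal-json:minimal-json:0.9.4",
)

# Same local jar as the --jars flag (assumed to be in the working directory)
REDSHIFT_JARS = ("RedshiftJDBC4-no-awssdk-1.2.20.1043.jar",)


def redshift_dependency_conf(jars=REDSHIFT_JARS, packages=REDSHIFT_PACKAGES):
    """Return Spark config entries equivalent to --jars and --packages."""
    return {
        # Both properties take comma-separated lists
        "spark.jars": ",".join(jars),
        "spark.jars.packages": ",".join(packages),
    }


def build_session(app_name="redshift-streaming"):
    """Create a SparkSession carrying the Redshift dependencies (requires pyspark)."""
    # Imported lazily so redshift_dependency_conf works without Spark installed
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in redshift_dependency_conf().items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


if __name__ == "__main__":
    # Print the settings instead of starting Spark
    for key, value in redshift_dependency_conf().items():
        print(f"{key}={value}")
```

The coordinates are copied unchanged from the command above. Note that they mix a Scala 2.10 artifact (spark-redshift_2.10) with a Scala 2.11 one (spark-avro_2.11); if the job fails with class-loading or binary-compatibility errors, check that every _2.xx suffix matches the Scala version of your Spark build.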
    

    Finally, this is my_script.py, which I passed to spark-submit:

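    from pyspark.sql import SparkSession
    
    # my_role, my_redshift_url, my_redshift_user, my_redshift_password,
    # my_redshift_schema, my_redshift_table, my_aws_access_key_id,
    # my_aws_secret_access_key, my_schema_file_path, my_source_path and
    # my_checkpoint_location are placeholders for your own values.
    
    def foreach_batch_function(df, epoch_id, table_name):
        # Write one micro-batch to Redshift: spark-redshift stages the data
        # in tempdir on S3, then Redshift loads it using the IAM role.
        df.write \
            .format("com.databricks.spark.redshift") \
            .option("aws_iam_role", my_role) \
            .option("url", my_redshift_url) \
            .option("user", my_redshift_user) \
            .option("password", my_redshift_password) \
            .option("dbtable", my_redshift_schema + "." + table_name) \
            .option("tempdir", "s3://my/temp/dir") \
            .mode("append") \
            .save()
    
    spark = SparkSession.builder.getOrCreate()
    
    sc = spark.sparkContext
    
    # Credentials Spark uses to write the staging files to S3
    sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", my_aws_access_key_id)
    sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", my_aws_secret_access_key)
    
    # Streaming file sources need an explicit schema; reuse one from an existing Parquet file
    my_schema = spark.read.parquet(my_schema_file_path).schema
    
    df = spark \
        .readStream \
        .schema(my_schema) \
        .option("maxFilesPerTrigger", 100) \
        .parquet(my_source_path)
    
    # Every 30 seconds, hand the new micro-batch to foreach_batch_function
    df.writeStream \
        .trigger(processingTime='30 seconds') \
        .foreachBatch(lambda df, epochId: foreach_batch_function(df, epochId, my_redshift_table)) \
        .option("checkpointLocation", my_checkpoint_location) \
        .start(outputMode="update").awaitTermination()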
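The lambda passed to foreachBatch only exists to bind the table name. A minimal sketch of an alternative, assuming you prefer to keep the connection settings in one place: functools.partial binds a dict of writer options, and DataFrameWriter.options(**...) applies them in a single call. redshift_options, write_batch and make_batch_writer are hypothetical helper names; the option keys and the format string are the ones used in the script above.

```python
from functools import partial


def redshift_options(url, user, password, schema, table, tempdir, iam_role):
    """Collect the spark-redshift options used in the answer into one dict."""
    return {
        "url": url,
        "user": user,
        "password": password,
        "dbtable": f"{schema}.{table}",  # schema-qualified target table
        "tempdir": tempdir,              # S3 staging area for the load
        "aws_iam_role": iam_role,        # role Redshift assumes to read tempdir
    }


def write_batch(df, epoch_id, options):
    """foreachBatch callback: append one micro-batch to Redshift."""
    (df.write
       .format("com.databricks.spark.redshift")
       .options(**options)
       .mode("append")
       .save())


def make_batch_writer(options):
    """Bind the options so Spark can call the result as fn(df, epoch_id)."""
    return partial(write_batch, options=options)


# Usage with the answer's placeholders (hypothetical wiring):
# opts = redshift_options(my_redshift_url, my_redshift_user, my_redshift_password,
#                         my_redshift_schema, my_redshift_table,
#                         "s3://my/temp/dir", my_role)
# df.writeStream.foreachBatch(make_batch_writer(opts)) ...
```

Behaviour is meant to match the lambda version; the difference is only that the options are built once and the callback stays a named, inspectable function.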
    
