Question title: How to set up Kafka as a dependency when using Delta Lake in PySpark?
Posted: 2022-12-01 21:41:58
Question:

According to the Delta Lake documentation, this is how to set up Delta Lake in a regular Python script:

import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

The official Spark docs for Kafka integration (the "Structured Streaming + Kafka Integration Guide") show how to add the Kafka dependency with spark-submit (through the --packages parameter), but not how to do it from a Python script.

After some digging, it turns out that you can also add this dependency when building the Spark session, through the spark.jars.packages configuration:

import pyspark
from delta import *

packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1",
]

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", ",".join(packages)) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
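The packages list above pins the connector at 3.3.1, while the PySpark version reported below is 3.3.0. The Kafka integration guide lists the connector under the same version as Spark itself, so keeping the two aligned is the safer default. A minimal sketch, assuming a Scala 2.12 build (matching the "_2.12" suffix used above); kafka_connector_package is a hypothetical helper, not part of PySpark or delta-spark. It only tidies up the version and does not by itself change how the package list reaches Spark.

```python
from typing import Optional

# Assumption: a Scala 2.12 build of Spark, as implied by the "_2.12" suffix above.
SCALA_BINARY_VERSION = "2.12"


def kafka_connector_package(spark_version: Optional[str] = None,
                            scala_version: str = SCALA_BINARY_VERSION) -> str:
    """Hypothetical helper: Maven coordinate of the Structured Streaming Kafka connector.

    If spark_version is None, the version of the installed pyspark package is
    used, so the connector follows the Spark runtime instead of being pinned.
    """
    if spark_version is None:
        import pyspark  # imported lazily so the helper also works without Spark installed
        spark_version = pyspark.__version__
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


packages = [kafka_connector_package("3.3.0")]
print(",".join(packages))  # org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0
```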

However, when I try to stream to Kafka using the Spark session created above, I still get the following error:

Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

I'm using Delta 2.1.0 and PySpark 3.3.0.
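For context, a streaming write to Kafka from that session would look roughly like the sketch below. The broker address, topic and checkpoint path are placeholders, kafka_sink_options and start_kafka_sink are hypothetical helper names, and serializing each row to JSON is just one choice of payload. Spark resolves the "kafka" format when start() is called, which is where the error quoted above is raised if the connector JAR is not on the classpath.

```python
from typing import Dict


def kafka_sink_options(bootstrap_servers: str, topic: str,
                       checkpoint_location: str) -> Dict[str, str]:
    """Hypothetical helper: options for the Structured Streaming Kafka sink."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,  # comma-separated host:port list
        "topic": topic,                                  # target topic for every row
        "checkpointLocation": checkpoint_location,       # where the query stores its progress
    }


def start_kafka_sink(streaming_df, bootstrap_servers: str, topic: str,
                     checkpoint_location: str):
    """Hypothetical helper: serialize each row to JSON and stream it to Kafka.

    streaming_df is assumed to be a streaming pyspark.sql.DataFrame; the
    returned object is the StreamingQuery produced by start().
    """
    opts = kafka_sink_options(bootstrap_servers, topic, checkpoint_location)
    return (
        streaming_df
        .selectExpr("to_json(struct(*)) AS value")  # the Kafka sink expects a "value" column
        .writeStream
        .format("kafka")
        .options(**opts)
        .start()  # the "kafka" data source is looked up here
    )
```

For example, start_kafka_sink(events_df, "localhost:9092", "events", "/tmp/checkpoints/events") would start the query, where events_df, the broker, the topic and the path are all placeholders.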

    Tags: python pyspark apache-kafka databricks delta-lake


    Answer 1:

    It turns out that configure_spark_with_delta_pip overwrites any packages you set in spark.jars.packages, because it writes its own value (the Delta package plus any extra packages) to that key (source). The proper way is to pass the additional packages through its extra_packages parameter when setting up your Spark session:

    import pyspark
    from delta import *
    
    packages = [
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1",
    ]
    
    builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

    # extra_packages is appended to the Delta package that
    # configure_spark_with_delta_pip writes to spark.jars.packages
    spark = configure_spark_with_delta_pip(builder, extra_packages=packages).getOrCreate()
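To see why the first attempt fails, here is a pure-Python model of that behaviour. It assumes, as in the delta-spark 2.x source, that configure_spark_with_delta_pip builds the string "io.delta:delta-core_2.12:<delta version>" plus any extra_packages and writes it to spark.jars.packages with a single config() call. ToyBuilder and delta_packages are hypothetical stand-ins, not the real PySpark or delta-spark APIs. Because a later config() call on the same key replaces the earlier value, a Kafka package set beforehand is lost, while one passed through extra_packages survives.

```python
from typing import Dict, List, Optional


class ToyBuilder:
    """Hypothetical stand-in for SparkSession.Builder: config() stores key/value
    pairs, and a later call with the same key replaces the earlier value."""

    def __init__(self) -> None:
        self.options: Dict[str, str] = {}

    def config(self, key: str, value: str) -> "ToyBuilder":
        self.options[key] = value
        return self


def delta_packages(builder: ToyBuilder, delta_version: str,
                   extra_packages: Optional[List[str]] = None) -> ToyBuilder:
    """Model of configure_spark_with_delta_pip (assumption: Delta 2.x, Scala 2.12)."""
    artifacts = [f"io.delta:delta-core_2.12:{delta_version}"] + (extra_packages or [])
    return builder.config("spark.jars.packages", ",".join(artifacts))


kafka = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1"

# Question's approach: the Kafka package is set first, then overwritten.
overwritten = delta_packages(ToyBuilder().config("spark.jars.packages", kafka), "2.1.0")
print(overwritten.options["spark.jars.packages"])
# io.delta:delta-core_2.12:2.1.0

# Answer's approach: the Kafka package travels through extra_packages.
merged = delta_packages(ToyBuilder(), "2.1.0", extra_packages=[kafka])
print(merged.options["spark.jars.packages"])
# io.delta:delta-core_2.12:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1
```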
    
