【问题标题】:How to stream data from Azure eventhub using Pyspark?如何使用 Pyspark 从 Azure eventthub 流式传输数据?
【发布时间】:2019-12-04 13:18:48
【问题描述】:

我在 azure 中创建了一个事件中心,并使用 python 脚本在其上发布了一些消息。我可以使用另一个 python 脚本从事件中心获取消息,但我无法使用 Pyspark 流式传输消息。下面是我用来流式传输消息的 Pyspark 代码:

connectionString = <MyConnectionString>

ehConf = {
  'eventhubs.connectionString' : connectionString
}

ehConf['eventhubs.consumerGroup'] = "$default"

df = spark.readStream.format("eventhubs").options(**ehConf).load()

df.writeStream.format("parquet").outputMode("append").option("path", "azure_streaming_test").option("checkpointLocation", "azure_streaming_checkpoint").start()
query.awaitTermination()

通过在 Pyspark shell 中运行上述代码,我遇到了以下错误:

java.lang.IncompatibleClassChangeError:方法 'com.microsoft.azure.eventhubs.EventHubClient com.microsoft.azure.eventhubs.EventHubClient.createSync(java.lang.String, java.util.concurrent.ScheduledExecutorService)' 必须是 InterfaceMethodref常数

附上报错信息截图。

需要纠正什么?提前致谢! Error Message

【问题讨论】:

    标签: azure apache-spark pyspark spark-streaming azure-eventhub


    【解决方案1】:

    我确实尝试过重现这个问题,下面的代码运行良好。你能告诉我你使用的是哪个版本的 Pyspark 吗?您找到问题的解决方案了吗?

    from pyspark.sql.functions import *
    
    from pyspark.sql.types import *
    
    connectionString = "XX"
    
    
    ehConf = {
    
      'eventhubs.connectionString' : connectionString
    
    }
    
    ehConf['eventhubs.consumerGroup'] = "$Default"
    
    df = spark.readStream.format("eventhubs").options(**ehConf).load()
    
    Schema = StructType([StructField("cardNumber", StringType(), True),
    
                          StructField("transactionId", StringType(), True),
    
                          StructField("transactionTime", StringType(), True)                   
    
    
    
                        ])
    
    rawData = df. \
    
      selectExpr("cast(Body as string) as json"). \
    
      select(from_json("json", Schema).alias("data")). \
    
      select("data.*")
    
    parsedData=rawData.select('transactionId','cardNumber','transactionTime')  
    
    display(parsedData)
    
    
    
    df.writeStream.format("parquet").outputMode("append").option("path", "/azure_streaming_test").option("checkpointLocation", "/azure_streaming_checkpoint").start()
    

    【讨论】:

    • 感谢 Himanshu 的帮助。不确定错误的确切原因。但现在我在 Azure/HD Insight 集群上的 Databricks 上运行相同的代码,并且运行良好。之前我在 AWS EMR 上运行它。
    猜你喜欢
    • 1970-01-01
    • 2019-04-15
    • 1970-01-01
    • 1970-01-01
    • 2018-08-28
    • 1970-01-01
    • 2016-02-16
    • 2016-03-24
    • 1970-01-01
    相关资源
    最近更新 更多