【问题标题】:How to use EventHubsForeachWriter in Azure Databricks如何在 Azure Databricks 中使用 EventHubsForeachWriter
【发布时间】:2018-11-28 14:46:36
【问题描述】:

我正在尝试使用 EventHubsForeachWriter,如图 here:

val ehConf = EventHubsConf("YOUR_CONNECTION_STRING") 
val writer = EventHubsForeachWriter(ehConf)

val query =
  streamingSelectDF
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(ProcessingTime("25 seconds"))
    .start()

但我遇到了一个例外:

notebook:22: error: type mismatch;
 found   : org.apache.spark.sql.eventhubs.EventHubsForeachWriter
 required: org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]
    .foreach(writer)

【问题讨论】:

  • 对我来说也一样。希望得到修复并更新您。在此期间您没有找到解决方案?

标签: spark-structured-streaming azure-eventhub azure-databricks


【解决方案1】:

在这个 github issue 中找到了答案。

所以,我想以下应该可行:

val query =
  streamingSelectDF
    .select(to_json(struct("*")) as 'body)
    .selectExpr("cast(body as string)")
    .as[String]
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(ProcessingTime("25 seconds"))
    .start()

【讨论】:

  • 是的,我也是在那里发布问题的人.. :)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-02-29
  • 2020-07-07
  • 2022-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多