【发布时间】:2020-07-14 23:07:52
【问题描述】:
我正在编写一个有状态流式处理应用程序,我在其中使用 mapGroupsWithState 为组创建聚合,但我需要基于输入行中的多个列创建组。 “Spark:权威指南”中的所有示例仅使用一列,例如“用户”或“设备”。我正在使用类似于下面给出的代码。 如何在“groupByKey”中指定多个字段?
还有其他挑战。这本书说我们可以按照下面给出的方式使用“updateAcrossEvents”,但我得到编译时错误提示:错误:(43, 65) missing argument list for method updateAcrossEvents in object Main
只有在需要函数类型时,未应用的方法才会转换为函数。
您可以通过编写 updateAcrossEvents _ 或 updateAcrossEvents(_,_,_,_,_) 而不是 updateAcrossEvents 来明确此转换。
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
另一个挑战:编译器还抱怨我的 MyReport:错误:(41, 12) 无法找到存储在数据集中的类型的编码器。通过导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。_ 将在未来的版本中添加对序列化其他类型的支持。
我们将不胜感激帮助解决这些错误。提前致谢。
withEventTime
.as[MyReport]
.groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
.writeStream
.queryName("test_query")
.format("memory")
.outputMode("update")
.start()
updateAcrossEvents:
def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[MyReport], oldState: GroupState[MyState]): MyState = {
var state: MyState = if (oldState.exists) oldState.get else MyState.getNewState(tuple3._1, tuple3._2, tuple3._3)
for (input <- inputs) {
state = updateWithEvent(state, input)
oldState.update(state)
}
state
}
updateWithEvent:
def updateWithEvent(state: MyState, report: MyReport): MyState = {
state.someField1 = state.someField1 ++ Array(report.getSomeField1.longValue())
state.someField2 = state.someField2 ++ Array(report.getSomeField2.longValue())
state
}
【问题讨论】:
标签: scala apache-spark databricks spark-structured-streaming