【问题标题】:Spark Streaming with mapGroupsWithState使用 mapGroupsWithState 进行 Spark 流式处理
【发布时间】:2020-07-14 23:07:52
【问题描述】:

我正在编写一个有状态流式处理应用程序,我在其中使用 mapGroupsWithState 为组创建聚合,但我需要基于输入行中的多个列创建组。 “Spark:权威指南”中的所有示例仅使用一列,例如“用户”或“设备”。我正在使用类似于下面给出的代码。 如何在“groupByKey”中指定多个字段?

还有其他挑战。这本书说我们可以按照下面给出的方式使用“updateAcrossEvents”,但我得到编译时错误提示:错误:(43, 65) missing argument list for method updateAcrossEvents in object Main 只有在需要函数类型时,未应用的方法才会转换为函数。 您可以通过编写 updateAcrossEvents _updateAcrossEvents(_,_,_,_,_) 而不是 updateAcrossEvents 来明确此转换。 .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)

另一个挑战:编译器还抱怨我的 MyReport:错误:(41, 12) 无法找到存储在数据集中的类型的编码器。通过导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。_ 将在未来的版本中添加对序列化其他类型的支持。

我们将不胜感激帮助解决这些错误。提前致谢。

withEventTime
 .as[MyReport]
 .groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
 .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
 .writeStream
 .queryName("test_query")
 .format("memory")
 .outputMode("update")
 .start()

updateAcrossEvents:

def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[MyReport], oldState: GroupState[MyState]): MyState = {

 var state: MyState = if (oldState.exists) oldState.get else MyState.getNewState(tuple3._1, tuple3._2, tuple3._3)

 for (input <- inputs) {
 state = updateWithEvent(state, input)
 oldState.update(state)
 }

 state
}

updateWithEvent:

def updateWithEvent(state: MyState, report: MyReport): MyState = {

 state.someField1 = state.someField1 ++ Array(report.getSomeField1.longValue())
 state.someField2 = state.someField2 ++ Array(report.getSomeField2.longValue())

 state
}

【问题讨论】:

    标签: scala apache-spark databricks spark-structured-streaming


    【解决方案1】:

    您可以形成一个键元组 - 检查此代码:

    withEventTime
     .as[MyReport]
     .groupByKey(row => (row.getKeys.getKey1,row.getKeys.getKey2)) 
     .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
     .writeStream
     .queryName("test_query")
     .format("memory")
     .outputMode("update")
     .start()
    

    现在您获得了一个用于 (getKey1, getKey2) 组合的唯一组。您可能需要相应地更改更新功能。

    第二个问题:

    是的,spark 默认只支持案例类和原始类型。

    要消除此错误,请确保“MyReport”是一个案例类,并在上述代码之前使用以下代码导入隐式:

    import <your_spark_session_variable>.implicits._
    

    【讨论】:

    • 好的。我会尝试这些改变。 'updateAcrossEvents' 的错误呢?
    • 添加了“updateAcrossEvents”方法。仍然收到“未应用的方法...”错误消息。
    • 另外,当你按 2 个键分组时,你应该有 tuple2 而不是 tuple3
    • 您的 updateWithEvent 函数是否返回“MyState”对象?
    • 是的,如上面的代码所示。我做的不对吗?
    猜你喜欢
    • 2017-06-26
    • 2019-12-10
    • 2023-04-03
    • 2018-03-22
    • 2018-01-11
    • 2016-09-28
    • 2017-07-03
    • 2015-04-23
    • 2017-03-16
    相关资源
    最近更新 更多