【问题标题】:how do I fix the filter map and mapWithState function如何修复过滤器映射和 mapWithState 函数
【发布时间】:2018-06-28 08:08:55
【问题描述】:

我是一个新的 scala corder,我有一个 flatMap 函数,它返回一个 FlatMappedDStream 对象,它是一个触发流式作业,句柄函数返回一个 Map[String, Any],代码如下:

val parseAction = filterActions.flatMap(record => ParseOperation.parseMatch(categoryMap, record))

函数定义:

val parseMatch = (categoryMap: collection.Map[Int, String], record: Map[String, Any]) => {
record.get("operation").get.toString match {
  case "view" => parseView(categoryMap, record)
  case "impression" => parseRecord(record)
  case "click" => parseRecord(record)
  case _ => ListBuffer.apply(record)

}

}

parseMatch函数返回处理后的流式记录,类型为Map[String, Any],现在我想打印所有结果并放入新的过滤器map函数和mapWithState函数,我尝试了,但它不起作用。 错误代码如下:

val finalActions = parseAction.filter(record => record.get("invalid").get == None)

val userModels = finalActions.map(record => (record.asInstanceOf[Map[String, Any]].getOrElse("deviceid", ""), record))
                       .mapWithState(StateSpec.function(stateUpdateFunction))

mapWithState 函数是:

val stateUpdateFunction = (deviceId: Any, newRecord: Option[Map[String, Any]], stateData: State[Map[String, Any]]) => {
  XXXX

}

但是filter函数和mapWithState函数不正确,如何解决?

【问题讨论】:

    标签: scala apache-spark spark-streaming


    【解决方案1】:

    我已经修复了它,我将返回类型从 Map[String,Any] 修改为 ListBuffer[Map[String,Any]],它确实有效!

    【讨论】:

      猜你喜欢
      • 2021-09-15
      • 2022-01-14
      • 2011-06-30
      • 1970-01-01
      • 2019-12-26
      • 2019-09-29
      • 1970-01-01
      • 2020-04-06
      • 2021-12-27
      相关资源
      最近更新 更多