【Posted】: 2018-06-28 08:08:55
【Question】:
I am new to Scala. I have a flatMap call that returns a FlatMappedDStream object. This is a Spark Streaming job, and the handler function returns a Map[String, Any]. The code is as follows:
val parseAction = filterActions.flatMap(record => ParseOperation.parseMatch(categoryMap, record))
Function definition:
val parseMatch = (categoryMap: collection.Map[Int, String], record: Map[String, Any]) => {
  record.get("operation").get.toString match {
    case "view" => parseView(categoryMap, record)
    case "impression" => parseRecord(record)
    case "click" => parseRecord(record)
    case _ => ListBuffer.apply(record)
  }
}
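The parseMatch function returns the processed streaming records, of type Map[String, Any]. Now I want to print all the results and feed them into a new filter/map step and a mapWithState step. I tried, but it does not work. The failing code is as follows: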
val finalActions = parseAction.filter(record => record.get("invalid").get == None)
val userModels = finalActions.map(record => (record.asInstanceOf[Map[String, Any]].getOrElse("deviceid", ""), record))
.mapWithState(StateSpec.function(stateUpdateFunction))
The mapWithState function is:
val stateUpdateFunction = (deviceId: Any, newRecord: Option[Map[String, Any]], stateData: State[Map[String, Any]]) => {
XXXX
}
However, the filter call and the mapWithState call are not correct. How can I fix this?
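To make the intent of the filter step concrete, here is a minimal sketch that uses plain Scala collections in place of the DStream, so it runs without Spark. It assumes the goal is to keep only records that have no "invalid" key; that reading is an assumption. The sample records and the FilterSketch name are hypothetical. Note that record.get("invalid").get throws NoSuchElementException when the key is absent, which is why the sketch checks for the key instead.

```scala
// Plain Scala collections stand in for the DStream; no Spark is needed to run this.
object FilterSketch {
  type Record = Map[String, Any]

  // Hypothetical sample data, not taken from the real stream.
  val records: Seq[Record] = Seq(
    Map("operation" -> "view", "deviceid" -> "d1"),
    Map("operation" -> "click", "deviceid" -> "d2", "invalid" -> true),
    Map("operation" -> "impression")
  )

  // Assumption: "valid" means the record has no "invalid" key at all.
  // Checking for the key avoids calling .get on an empty Option.
  val valid: Seq[Record] = records.filter(record => !record.contains("invalid"))

  // Key each record by its device id as a String, falling back to "" when missing.
  val keyed: Seq[(String, Record)] =
    valid.map(record => (record.get("deviceid").map(_.toString).getOrElse(""), record))

  def main(args: Array[String]): Unit = {
    keyed.foreach(println)
  }
}
```

Using a String key rather than Any is a design choice in this sketch. If the same approach were carried over to the streaming job, the first parameter of stateUpdateFunction would presumably need to be a String as well.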
【Discussion】:
Tags: scala apache-spark spark-streaming