【问题标题】:Creating single object DataFrame for predictions为预测创建单个对象 DataFrame
【发布时间】:2017-07-08 00:43:02
【问题描述】:

一旦我的分类模型得到训练,我希望它们在我的 Web 应用程序中使用,以便对为给定会话收集的数据进行分类预测。 那是: 1) 我有一些会话数据结构需要映射到 DataFrame 行 2) 将 DataFrame 行输入到我的 ML 模型中以预测分类 3) 将预测与原始会话一起使用,在浏览器前向用户显示。

到目前为止,我所看到的创建 DataFrame 作为 Spark 管道输入的示例是从文件等数据源创建它的。现在,首先创建单个 POJO 或 JsonNode,将其序列化为仅包含记录的文件,然后使用该文件创建 DataFrame 来提供模型,这似乎有点笨拙。 在写这篇文章时,我也觉得为每个请求创建和拆除 ML 管道可能不是一个好主意,这似乎遵循了这种方法。

所以也许我应该更好地考虑“Spark Streaming”?

将映射的会话数据输入某种消息队列并将其输入我的 Spark 管道?什么样的“流”适合这里?

我在某处读到,Spark 流式处理以微批次的形式消耗流,而不是逐条记录 - 这意味着在收集到足够的记录以填满微批次之前会有一些延迟(或者在考虑微批次之前有一些预配置的延迟要“足够饱”)。这对 Web 应用程序的响应能力意味着什么?我可以每 100 毫秒触发一次微批处理吗?

如果有人能指出我正确的方向,我将不胜感激。 也许 Spark 不适合这里,我应该切换到 Apache Flink?

提前致谢,伯恩德

【问题讨论】:

  • 不,您不需要 Spark Streaming(尽管您也可以用它实现一些东西)。您需要的是为您的模型服务。如果您在 Spark 中执行此操作,它是一个保持实时 Spark 会话的应用程序。它接受来自您的 Web 应用程序的请求并将其传递给 Spark 进行评分。您通常会根据请求(而不是文件!)构建单记录数据框,并将其传递给您从训练模型中获得的 SparkML transformer。然后你用结果回复你的客户。

标签: spark-streaming


【解决方案1】:

好的,现在我已经找到了一些方法来解决我的问题,也许是这样 帮助别人:

使用包含一个元组的序列并分别命名列

val df= spark.createDataFrame(
  Seq("val1", "val2")
).toDF("label1", "label2")    

使用 JSON 字符串

val sqlContext = spark.sqlContext 
val jsonData= """{ "label1": "val1", "label2": "val2" }"""    
val rdd= sparkSession.sparkContext.parallelize(Seq(jsonData))
val df= sqlContext.read.json(rdd)

不工作:从序列案例类对象创建:

val sqlContext = sparkSession.sqlContext 
import sqlContext.implicits._

val myData= Seq(Feat("value1", "value2"))
val ds: Dataset[Feat]= myData.toDS()
ds.show(10, false)

编译正常,但在运行时产生异常:

 [错误] a.a.OneForOneStrategy - java.lang.RuntimeException:
编码时出错:java.lang.ClassCastException:
es.core.recommender.Feat 不能转换为 es.core.recommender.Feat

我很想包含更多的堆栈跟踪,但是这个光荣的编辑器 不会让我...

很高兴知道为什么这个替代方案不起作用......

【讨论】:

    猜你喜欢
    • 2021-03-01
    • 1970-01-01
    • 2016-01-24
    • 2017-12-11
    • 2020-12-05
    • 2014-06-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多