【问题标题】:Can a model be created on Spark batch and use it in Spark streaming?可以在 Spark 批处理上创建模型并在 Spark 流式传输中使用它吗?
【发布时间】:2016-05-09 11:11:36
【问题描述】:

我可以在 Spark 批处理中创建一个模型并将其用于 Spark 流式处理以进行实时处理吗?

我在 Apache Spark 网站上看到了各种示例,其中训练和预测都建立在相同类型的处理(线性回归)之上。

【问题讨论】:

  • 据我了解,Spark 流式传输并不是真正的流式传输。它将流分成批次,这将使您训练的批次模型能够很好地工作。如果你想对实时数据进行真正的流处理,你可以看看 Kafka、Flink 或 Storm。
  • @erip 我不确定 OP 在亚秒级流媒体中的意思是“实时”。
  • @YuvalItzchakov 它还有什么其他含义?实时批量训练数据?
  • @erip 你永远不知道他们说“实时”是什么意思。可能 2-3 秒的处理时间对他们来说就足够了。

标签: apache-spark machine-learning spark-streaming


【解决方案1】:

我可以在 Spark 批处理中创建一个模型并将其用于 Spark 流式处理以进行实时处理吗?

当然,是的。在 Spark 社区中,他们称之为离线训练在线预测。 Spark 中的许多训练算法允许您将模型保存在文件系统 HDFS/S3 上。流式应用程序可以加载相同的模型。您只需调用模型的 predict 方法进行预测即可。

请参阅this link 中的 Streaming + MLLib 部分。

例如,如果您想离线训练决策树并在线进行预测......

批量申请 -

    val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,impurity, maxDepth, maxBins)
    model.save(sc, "target/tmp/myDecisionTreeClassificationModel")

在流式应用程序中 -

    val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
    sameModel.predict(newData)

【讨论】:

  • 在batch中,sc属于sparkcontext,sc在streaming中又属于spark batch或者它的sparkstreamingcontext。我正在使用 Java 开发应用程序.. 在这里我看到保存和加载操作仅适用于 Sparkcontext(Scala) 但不适用于 JavaSparkContext
  • @Saurabh StreamingContext 包装了原始的 SparkContext。对于 java 和 scala 也是如此。查看this 示例,切换到“Java”选项卡以获取线性回归保存和加载示例(每个示例的最后两行)。看JavaStreamingContext的这个javadoc,sparkContext()方法会返回JavaSparkContext。
【解决方案2】:

这是我刚刚实施的另一种解决方案。

我在 spark-Batch 中创建了一个模型。 假设最终的模型对象名称是 regmodel。

final LinearRegressionModel regmodel =algorithm.run(JavaRDD.toRDD(parsedData));

并且火花上下文名称是 sc as

JavaSparkContext sc = new JavaSparkContext(sparkConf);

现在,我在同一代码中使用同一 sc 创建火花流

final JavaStreamingContext jssc = new JavaStreamingContext(sc,new Duration(Integer.parseInt(conf.getWindow().trim())));

并像这样进行预测:

JavaPairDStream<Double, Double> predictvalue = dist1.mapToPair(new PairFunction<LabeledPoint, Double,Double>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Tuple2<Double, Double> call(LabeledPoint v1) throws Exception {
                    Double p = v1.label();
                    Double q = regmodel.predict(v1.features());
                    return new Tuple2<Double, Double>(p,q);
                }
            });

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-01-11
    • 2019-05-19
    • 1970-01-01
    • 1970-01-01
    • 2018-05-27
    • 2017-12-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多