【问题标题】:How to avoid one Spark Streaming window blocking another window with both running some native Python code如何避免一个 Spark Streaming 窗口阻塞另一个窗口同时运行一些本机 Python 代码
【发布时间】:2016-05-04 06:25:42
【问题描述】:

我正在使用两个不同的窗口运行 Spark Streaming(一个窗口用于使用 SKLearn 训练模型,另一个用于基于该模型预测值),我想知道如何避免一个窗口(“慢”训练window) 来训练模型,而不会“阻塞”“快速”预测窗口。
我的简化代码如下所示:

conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)

stream = ssc.socketTextStream("localhost", 7000)


import Custom_ModelContainer

### Window 1 ###
### predict data based on model computed in window 2 ###

def predict(time, rdd):
    try:
       # ... rdd conversion to df, feature extraction etc...

       # regular python code 
       X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
       pred = Custom_ModelContainer.getmodel().predict(X)

       # send prediction to GUI

    except Exception, e: print e

predictionStream = stream.window(60,60)
predictionStream.foreachRDD(predict)


### Window 2 ###
### fit new model ###

def trainModel(time, rdd):
try:
    # ... rdd conversion to df, feature extraction etc...

    X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
    y = np.array(df.map(lambda lp: lp.label).collect())

    # train test split etc...

    model = SVR().fit(X_train, y_train)
    Custom_ModelContainer.setModel(model)

except Exception, e: print e

modelTrainingStream = stream.window(600,600)
modelTrainingStream.foreachRDD(trainModel)

(注意:Custom_ModelContainer是我写的一个类,用来保存和检索训练好的模型)

我的设置通常运行良好,但每次在第二个窗口中训练一个新模型时(大约需要一分钟),第一个窗口在模型训练完成之前不会计算预测。实际上,我想这是有道理的,因为模型拟合和预测都是在主节点上计算的(在非分布式设置中 - 由于 SKLearn)。

所以我的问题如下:是否可以在单个工作节点(而不是主节点)上训练模型?如果是这样,我怎样才能实现后者,这真的能解决我的问题吗?

如果没有,关于如何在不延迟窗口 1 中的计算的情况下进行此类设置的任何其他建议?

非常感谢任何帮助。

编辑:我想更普遍的问题是: 如何在两个不同的工作人员上并行运行两个不同的任务?

【问题讨论】:

    标签: python apache-spark scikit-learn spark-streaming


    【解决方案1】:

    我认为您正在寻找的是属性:“spark.streaming.concurrentJobs”,默认为 1。增加它应该允许您并行运行多个 foreachRDD 函数。

    In JobScheduler.scala:

    private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
    

    如果您要并行修改和读取,请注意自定义模型容器上的线程安全。 :)

    【讨论】:

      【解决方案2】:

      免责声明:这只是一组想法。这些都没有经过实际测试。


      您可以尝试以下几件事:

      1. 不要将collect 转为predictscikit-learn 模型通常是可序列化的,因此可以在集群上轻松处理预测过程:

        def predict(time, rdd):
            ... 
        
            model = Custom_ModelContainer.getmodel()
            pred = (df.rdd.map(lambda lp: lp.features.toArray())
                .mapPartitions(lambda iter: model.predict(np.array(list(iter)))))
            ...
        

        它不仅应该并行化预测,而且如果未将原始数据传递到 GUI,还应该减少必须收集的数据量。

      2. 尝试collect异步发送数据。 PySpark 不提供collectAsync 方法,但您可以尝试使用concurrent.futures 实现类似的功能:

        from pyspark.rdd import RDD
        from concurrent.futures import ThreadPoolExecutor
        
        executor = ThreadPoolExecutor(max_workers=4)
        
        def submit_to_gui(*args): ...
        
        def submit_if_success(f):
            if not f.exception():
                executor.submit(submit_to_gui, f.result())
        

        从 1 继续。

        def predict(time, rdd):
            ...
            f = executor.submit(RDD.collect, pred)
            f.add_done_callback(submit_if_success)
            ...
        
      3. 如果您真的想使用本地 scikit-learn 模型,请尝试使用上述期货 collectfit。您也可以尝试只收集一次,尤其是在没有缓存数据的情况下:

        def collect_and_train(df):
            y, X = zip(*((p.label, p.features.toArray()) for p in df.collect()))
            ...
            return SVR().fit(X_train, y_train)
        
        def set_if_success(f):
            if not f.exception():
                Custom_ModelContainer.setModel(f.result())  
        
        def trainModel(time, rdd): 
           ...
            f = excutor.submit(collect_and_train, df)
            f.add_done_callback(set_if_success) 
           ...
        
      4. 使用现有的解决方案(如spark-sklearn)或自定义方法将训练过程移至集群:

        • 简单的解决方案 - 准备您的数据,coalesce(1) 并使用 mapPartitions 训练单个模型。
        • 分布式解决方案 - 使用 mapPartitions 为每个分区创建和验证单独的模型,收集模型并用作集成,例如通过平均或中值预测。
      5. 扔掉scikit-learn 并使用可以在分布式流式环境中训练和维护的模型(例如StreamingLinearRegressionWithSGD)。

        您当前的方法使 Spark 过时了。如果您可以在本地训练模型,那么您很有可能可以在本地机器上更快地执行所有其他任务。否则你的程序将在collect 上失败。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2015-09-30
        • 2023-03-24
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多