【问题标题】:Convert Scala Serialized Code to perform Parallel Operations转换 Scala 序列化代码以执行并行操作
【发布时间】:2017-09-18 12:44:25
【问题描述】:

我有一个 scala 代码,它以 csv 作为输入,读取每一行,对每一行执行文档分类并将预测的文档标签存储到 MySQL 数据库中。

sn-p 的问题是,有时 csv 有 3200 行,完成整个操作需要很长时间。我需要转换此代码,例如 csv 在执行程序之间分发,执行文档预测并存储标签。

下面是代码sn -p -

    val reader = new CSVReader(new FileReader(args(4)))
    var readFirstLine = false;

    for (row <- reader.readAll) {
        if(readFirstLine) {
            var date = row(1).split(" ");
            var split_date = date(0).split('-').toList;
            val documentTransformed = tf.transform(row(2).split(" "))
            val emotionPredicted = model.predict(documentTransformed)
            val emotionMapped = emotionMaps(emotionPredicted);          

            //Insert Emotions               
            var query = "insert into emotions_values(user_id, year, month, day, emotion)" + "values ('"+ args(5) +"', '"+ split_date(0) +"', '"+ split_date(1) +"', '"+ split_date(2) +"', '"+ emotionMapped +"')";
            statement.executeUpdate(query)

            val polarityPredicted = polarityModel.predict(documentTransformed)
            val polarityMapped = polarityMaps(polarityPredicted);

            //Insert Polarity
            var polarityQuery = "insert into polarity_values(user_id, year, month, day, polarity)" + "values ('"+ args(5) +"', '"+ split_date(0) +"', '"+ split_date(1) +"', '"+ split_date(2) +"', '"+ polarityMapped +"')";
            statement.executeUpdate(polarityQuery)
        }
        else {
            readFirstLine = true;
        }
    }

【问题讨论】:

  • 这是一个相当宽泛的问题——您似乎是在要求我们为您编写一个完整的 Spark 作业。如果您可以将范围缩小到需要帮助的特定 Spark 问题,您可能会得到更好的答案。
  • 我需要做些什么来将我的行从 csv 分发到执行程序,进行文档标记并将预测标记插入到 mySQL?我已经在做文档标签并将数据插入到 mysql 中。我只需要了解如何将 csv 行分发给执行者?
  • 有一个用于 spark 的 csv 阅读器:github.com/databricks/spark-csv 我不确定它是否已过时。
  • 你不需要这个库,因为它的功能已经被 Spark 本身吸收了。

标签: apache-spark apache-spark-sql rdd apache-spark-mllib


【解决方案1】:

您需要做的就是使用 Spark 中内置的 CSV 功能:

sparkSession.read
    .option("header", "true")
    .option("inferSchema", "true") //Maybe
    .csv(args(4))
    .rdd { row =>
       ...
    }

这会将 CSV 的内容转换为 RDD,然后您可以根据需要对其进行操作。请注意,只需将header 选项设置为true 会忽略第一行。

我建议您研究一下您是否可以使用由 csv 方法返回的 DataFrame - 这将使您能够利用 Spark 中的 Catalyst optimizations - 而不是由返回的 RDD rdd 方法。

【讨论】:

    猜你喜欢
    • 2021-07-17
    • 1970-01-01
    • 2016-05-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-28
    • 2015-01-27
    • 1970-01-01
    相关资源
    最近更新 更多