【发布时间】:2017-09-18 12:44:25
【问题描述】:
我有一个 scala 代码,它以 csv 作为输入,读取每一行,对每一行执行文档分类并将预测的文档标签存储到 MySQL 数据库中。
sn-p 的问题是,有时 csv 有 3200 行,完成整个操作需要很长时间。我需要转换此代码,例如 csv 在执行程序之间分发,执行文档预测并存储标签。
下面是代码sn -p -
val reader = new CSVReader(new FileReader(args(4)))
var readFirstLine = false;
for (row <- reader.readAll) {
if(readFirstLine) {
var date = row(1).split(" ");
var split_date = date(0).split('-').toList;
val documentTransformed = tf.transform(row(2).split(" "))
val emotionPredicted = model.predict(documentTransformed)
val emotionMapped = emotionMaps(emotionPredicted);
//Insert Emotions
var query = "insert into emotions_values(user_id, year, month, day, emotion)" + "values ('"+ args(5) +"', '"+ split_date(0) +"', '"+ split_date(1) +"', '"+ split_date(2) +"', '"+ emotionMapped +"')";
statement.executeUpdate(query)
val polarityPredicted = polarityModel.predict(documentTransformed)
val polarityMapped = polarityMaps(polarityPredicted);
//Insert Polarity
var polarityQuery = "insert into polarity_values(user_id, year, month, day, polarity)" + "values ('"+ args(5) +"', '"+ split_date(0) +"', '"+ split_date(1) +"', '"+ split_date(2) +"', '"+ polarityMapped +"')";
statement.executeUpdate(polarityQuery)
}
else {
readFirstLine = true;
}
}
【问题讨论】:
-
这是一个相当宽泛的问题——您似乎是在要求我们为您编写一个完整的 Spark 作业。如果您可以将范围缩小到需要帮助的特定 Spark 问题,您可能会得到更好的答案。
-
我需要做些什么来将我的行从 csv 分发到执行程序,进行文档标记并将预测标记插入到 mySQL?我已经在做文档标签并将数据插入到 mysql 中。我只需要了解如何将 csv 行分发给执行者?
-
有一个用于 spark 的 csv 阅读器:github.com/databricks/spark-csv 我不确定它是否已过时。
-
你不需要这个库,因为它的功能已经被 Spark 本身吸收了。
标签: apache-spark apache-spark-sql rdd apache-spark-mllib