【问题标题】:Read CSV File in Spark and Write it to Cassandra在 Spark 中读取 CSV 文件并将其写入 Cassandra
【发布时间】:2015-04-30 11:48:17
【问题描述】:

我正在尝试使用 Spark 读取 CVS 文件,然后将其保存到 Cassandra。当我使用琐碎的值时,保存到 Cassandra 是有效的。

我有一个包含以下值的文件:

id,name,tag1|tag2|tag3

我想将它存储在 cassandra 表中:

id bigint, name varchar, tags set

我为此定义了一个案例类:

case class Item(id:Integer,name:String,tag:Set[String])

然后我使用这个表达式从 CVS 文件中取出 RDD

val items = sc.textFile("items.csv").map(l => l.split(",") match {case Array (a,b,c) => Item(Integer.parseInt(a),b,c.split("\\|").toSet)})

当我现在对项目(开始处理)调用 collectsaveToCassandra 时,我收到以下错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 29.0 failed 1 times, most recent failure: Lost task 1.0 in stage 29.0 (TID 38, localhost): scala.MatchError: [Ljava.lang.String;@6030bbe6 (of class [Ljava.lang.String;) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:33) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:33) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

【问题讨论】:

  • 什么是 case class Item(id:Integer,name:String,tag[String]) ?标签是 Set[String] 吗?
  • 是的,它是一个 Set[String]。我在上面改了

标签: scala csv apache-spark


【解决方案1】:

如前所述,问题在于对某些输入进行拆分会生成一个数组,该数组的元素少于或多于匹配中使用的 3 个元素。

但用于匹配的partialFuntion 可用于过滤匹配 符合匹配条件的元素。 rdd.collect{partialFunction} 正是为此而生的:

val data = sc.textFile("items.csv")
val arrayData = data.map(l => l.split(","))
val items = arrayData.collect{case Array (a,b,c) => Item(Integer.parseInt(a),b,c.split("\\|").toSet)})
 items.saveToCassandra(...)
  • 注意1:您还应该防止脏值。例如parseInt 在一个不是 int 数字的值上,...)
  • 注意2:rdd.collect{partialFunc}(使用部分函数过滤/映射数据)不应与rdd.collect(将数据返回给驱动程序)混淆)

【讨论】:

  • 你们说的都对,数据格式不正确。我将此标记为已接受的答案,因为当您不依赖整个数据时,它提供了一个很好的解决方法。
【解决方案2】:

如果您的输入 不是 3 个条目的数组,例如

,您将收到匹配错误
String("a,b").split(",") match {
   case Array(a,b,c) => ....
}

所以我怀疑这是一些输入数据问题,您需要在 match 中解决它。

【讨论】:

    【解决方案3】:

    我使用下面来保存我的 CSV 文件“|”与 cassandra DB 分开。 希望这会有所帮助

    package com
    import java.io.FileInputStream
    import java.util.Properties
    import org.apache.log4j.LogManager
    import org.apache.spark.{SparkConf, SparkContext}
    
    object CsvLoad {
      def main(args: Array[String]): Unit = {
        val log = LogManager.getRootLogger
    
        log.info("**********JAR EXECUTION STARTED**********")
        val properties: Properties = new Properties
        properties.load(new FileInputStream(args(0)))
    
        val sparkConf = new SparkConf()
          .setAppName(getClass.getName)
          .set("spark.cassandra.connection.host", properties.getProperty("CASSANDRA_HOST"))
          .set("spark.cassandra.connection.port", properties.getProperty("CASSANDRA_PORT"))
          .setMaster("local[*]")
          .set("spark.cassandra.auth.username", properties.getProperty("CASSANDRA_USERNAME"))
          .set("spark.cassandra.auth.password", "Jcloud@1357")
          .set("spark.cassandra.output.concurrent.writes", "32")
          .set("spark.cassandra.output.consistency.level", "ONE")
          .set("spark.cassandra.input.split.size_in_mb","67108864")
          .set("spark.cassandra.output.batch.grouping.key", "none")
    
         val sc=new SparkContext(sparkConf)
        val spark = new org.apache.spark.sql.SQLContext(sc)
        val data = spark.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema", "true").
          load("D:\\data.csv")
    
        val output = data.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "mytable", "keyspace" -> "test", "cluster" -> "Test Cluster"))
        output.save()
      }
    
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-10-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-03-13
      • 1970-01-01
      • 2018-08-09
      • 1970-01-01
      相关资源
      最近更新 更多