【问题标题】:How to create a DStream from a List of string?如何从字符串列表创建 DStream?
【发布时间】:2017-03-09 14:41:43
【问题描述】:

我有一个字符串列表,但我找不到将列表更改为火花流 DStream 的方法。 我试过这个:

val tmpList = List("hi", "hello")    
val rdd = sqlContext.sparkContext.parallelize(Seq(tmpList))   
val rowRdd = rdd.map(v => Row(v: _*))

但是eclipse说sparkContext不是sqlContext的成员,所以,我该怎么做呢? 请感谢您的帮助。

【问题讨论】:

    标签: apache-spark streaming dstream


    【解决方案1】:

    DStream 是 RDD 的序列,它是在您将接收到的数据注册到某个流媒体源(如 Kafka)时创建的。为了测试您是否想从 RDD 列表中创建 DStream,您可以执行以下操作:

    val rdd1 = sqlContext.sparkContext.parallelize(Seq(tmpList))
    val rdd2 = sqlContext.sparkContext.parallelize(Seq(tmpList1))
    ssc.queueStream[String](mutable.Queue(rdd1,rdd2))
    

    希望它能回答你的问题。

    【讨论】:

    • 感谢您的回答,我对 Spark 很陌生,我不太明白您的回答。您说:从 RDD 列表中创建 DStream。但是我怎样才能得到一个带有 String 列表的 RDD 列表,因为我不确定我在问题中写的代码是 write。
    • 谢谢,我重写了代码:val sparkContext = new SparkContext(sparkConf) val rdd = sparkContext.parallelize(coutList) val resultInputStream = ssc.queueStream(scala.collection.mutable.Queue(rdd) ) val results = resultInputStream.map(x=>x), sqlContext 是 org.apache.spark.sql.SQLContext 类的对象吗?以及我写的代码是不是写的?
    • 嗨,因为 ssc 是 StreamingContext 的对象:val ssc = new StreamingContext(sparkConf, Seconds(10)),所以当我添加 val sparkContext = new SparkContext(sparkConf) 时,有一个SparkException 说这个 JVM 中可能只运行了一个 SparkContext,所以 sparkContext 和 ssc 之间可能存在中断,你知道为什么吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-12-23
    • 2011-04-20
    • 2016-02-13
    • 1970-01-01
    • 2016-12-02
    • 1970-01-01
    相关资源
    最近更新 更多