【发布时间】:2016-11-15 17:46:43
【问题描述】:
我有一个流式应用程序,当我尝试调用 HiveContext.getOrCreate 时,它会因以下 stmt 出错。 '包 hive 中的对象 HiveContext 无法在包 org.apache.spark.sql.hive 中访问'
我的应用程序需要 HiveContext 而不是 SQLContext,并且每次都创建新的 HiveContext 不是一个可行的解决方案。
这是我的代码 sn-p:
object sampleStreamingApp {
def createStreamingContext(checkpointDirectory: String): StreamingContext = {
val conf = new SparkConf().setAppName("sampleStreaming")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Milliseconds(5000))
ssc.checkpoint(checkpointDirectory)
val smDStream = ssc.textFileStream("/user/hdpuser/data")
val smSplitted = smDStream.map( x => x.split(";") ).map( x => Row.fromSeq( x ) )
smSplitted.foreachRDD { rdd =>
val sqlContext = HiveContext.getOrCreate(rdd.sparkContext)
import sqlContext.implicits._
<other code logic goes here>
}
}
ssc
}
def main(args: Array[String]) {
val checkpointDirectory = "hdfs://localhost:8020/user/dfml/checkpointing/AAA"
val ssc = StreamingContext.getActiveOrCreate(checkpointDirectory, () => createStreamingContext(checkpointDirectory))
ssc.start()
ssc.awaitTermination()
}
}
任何帮助将不胜感激
【问题讨论】:
标签: apache-spark apache-spark-sql spark-streaming spark-dataframe