【问题标题】:Error in twitter streaming推特流媒体错误
【发布时间】:2015-10-20 00:22:39
【问题描述】:

我正在使用火花流编写一个 twitter 连接器。
我正面临以下异常

ERROR ReceiverTracker:取消注册流 0 的接收器:重新启动 延迟 2000 毫秒的接收器:启动 Twitter 流时出错 - java.lang.NullPointerException 在 org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:89) 在 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) 在 org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply$mcV$sp(ReceiverSupervisor.scala:159) 在 org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:152) 在 org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:152) 在 scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 在 scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 在 scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) 在 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 在 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 在 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 在 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

下面是相关代码sn-p。

val config = new twitter4j.conf.ConfigurationBuilder()
    .setOAuthConsumerKey("*********************")
               .setOAuthConsumerSecret("**********************************************")
    .setOAuthAccessToken("****************************************************")
    .setOAuthAccessTokenSecret("**********************************************************")
    .build

val twitter_auth = new TwitterFactory(config)
val a = new twitter4j.auth.OAuthAuthorization(config)
val atwitter : Option[twitter4j.auth.Authorization] =  Some(twitter_auth.getInstance(a).getAuthorization())

val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// ssc.checkpoint("D:/test")
val stream = TwitterUtils.createStream(ssc, atwitter, null, StorageLevel.MEMORY_AND_DISK_2)

val hashTags = stream.map(status => status.getUser().getName())
hashTags.foreachRDD(rdd => {
  rdd.foreach(println)
})

ssc.start()
ssc.awaitTermination()

谁能帮我解决这个问题?
谢谢:)

【问题讨论】:

    标签: twitter apache-spark streaming twitter4j spark-streaming


    【解决方案1】:

    转到抛出异常的那一行,我们可以看到:

    if (filters.size > 0) {

    要使该行抛出 NPE,filters 必须为 null,这正是 TwitterStream 实例化时发生的情况:

    val 流 = TwitterUtils.createStream(ssc, atwitter, null, StorageLevel.MEMORY_AND_DISK_2)

    作为filter 一个序列,使用Seq() 而不是null 对其进行初始化。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-03-16
      • 2014-12-12
      • 1970-01-01
      • 2014-04-12
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多