【问题标题】:How can i run spark-streaming wthout interruptions如何在不中断的情况下运行火花流
【发布时间】:2019-12-14 11:12:46
【问题描述】:

我正在尝试在 twitter-streaming 的帮助下保存来自 twitter 的推文。但我有一个问题:我的程序在一段时间后停止工作(取决于 1 毫秒的批处理间隔,接近 4-5 秒)。所以,你能帮我解决这个问题吗)。请告诉我有什么问题吗?

当批处理间隔接近 100 毫秒时,我会看到一些记录,例如

19/08/06 23:45:26 INFO BlockRDD: Removing RDD 103 from persistence list
19/08/06 23:45:26 INFO BlockManager: Removing RDD 103
19/08/06 23:45:26 INFO TwitterInputDStream: Removing blocks of RDD BlockRDD[103] at createStream at Twitter.java:35 of time 1565124324340 ms
19/08/06 23:45:26 INFO ReceivedBlockTracker: Deleting batches: 1565124324320 ms
19/08/06 23:45:26 INFO InputInfoTracker: remove old batch metadata: 1565124324320 ms
-------------------------------------------
Time: 1565124325500 ms

当批处理间隔“大”并且没有任何数据可用时,我只会看到有关 Spark UI 启动和完成的消息。

package TwitterAnalysis;

import org.apache.spark.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.twitter.*;

import twitter4j.Status;



public class Twitter {

    private static void setTwitterOAuth() {
        System.setProperty("twitter4j.oauth.consumerKey", TwitterOAuthKey.consumerKey);
        System.setProperty("twitter4j.oauth.consumerSecret", TwitterOAuthKey.consumerSecret);
        System.setProperty("twitter4j.oauth.accessToken", TwitterOAuthKey.accessToken);
        System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterOAuthKey.accessTokenSecret);
    }



    public static void main(String [] args) {

        setTwitterOAuth();

        SparkConf conf = new SparkConf().setMaster("local[*]")
                                         .setAppName("SparkTwitter");

      //  JavaSparkContext sparkContext = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));


        JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);

        //Stream that contains just tweets in english
        JavaDStream<Status> enTweetsDStream=twitterStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));
        enTweetsDStream.persist(StorageLevel.MEMORY_AND_DISK());


        enTweetsDStream.print();
        jssc.start();


    }

}

【问题讨论】:

    标签: java apache-spark twitter spark-streaming


    【解决方案1】:

    根据这个答案:Spark 2.0.0 twitter streaming driver is no longer available 在 spark 2.0 及更高版本中没有可用的 twitter-streaming-driver。解决方案选择早期版本的 Spark)

    【讨论】:

      猜你喜欢
      • 2017-08-04
      • 1970-01-01
      • 2021-05-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-04-15
      • 2020-11-25
      相关资源
      最近更新 更多