【问题标题】:Why java.util.concurrent.RejectedExecutionException using Twitter4J to sample tweets, when I restart twitterStream?当我重新启动 twitterStream 时,为什么 java.util.concurrent.RejectedExecutionException 使用 Twitter4J 对推文进行采样?
【发布时间】:2016-02-09 09:32:32
【问题描述】:

在以下 java 应用程序中,我使用 TwitterStream 使用示例函数收集推文。我需要在用户需要时启动和停止流,但出现以下异常:

java.util.concurrent.RejectedExecutionException: Task twitter4j.StatusStreamBase$1@74e75335 rejected from java.util.concurrent.ThreadPoolExecutor@5117b235[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
    at twitter4j.DispatcherImpl.invokeLater(DispatcherImpl.java:58)
    at twitter4j.StatusStreamBase.handleNextElement(StatusStreamBase.java:80)
    at twitter4j.StatusStreamImpl.next(StatusStreamImpl.java:56)
    at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:568)

当用户按下“Crawl”或“Stop Crawling”时,方法actionPerformed 被正确调用。但是,如果用户按 Crawl,然后按 Stop,然后再次按 Crawl,则会出现上述错误

我有几个课程,但主要的课程如下:

第一个创建接口并与爬虫类通信。

import java.awt.FlowLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;

import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JTextArea;

public class StackOv extends JFrame implements ActionListener{

    private JTextArea usersSaved;
    private boolean alreadyCrawling;
    private boolean stopReceived;
    private Stream stream;
    private JButton Crawl;
    private JButton stopCrawl;
    private Mongo m;

    public StackOv(){
        this.stopReceived = false;
        this.alreadyCrawling = false;
        setLayout(new FlowLayout(FlowLayout.CENTER));
        Crawl = new JButton("Crawl");
        Crawl.setActionCommand("Crawl");
        Crawl.addActionListener(this);
        stopCrawl = new JButton("Stop Crawling");
        stopCrawl.setActionCommand("Stop Crawling");
        stopCrawl.addActionListener(this);
        m = new Mongo(); //instance of class that uses MongoDB
       /*
       *
       *bla bla bla create the rest of the interface as you wish
       *add(button)
       *add(button)
       *etc...
       */

    }


    public void setOut(String out){
        usersSaved.setText(out);
    }

    public void setOffAlreadyCrawling(){
        this.alreadyCrawling = false;
    }
    @Override
    public void actionPerformed(ActionEvent e){
        if(e.getActionCommand().equals("Stop Crawling") && !this.stopReceived){
            this.stopReceived = true;
            stream.setStop();
        }
        else if(e.getActionCommand().equals("Crawl") && !alreadyCrawling){
            if(stream != null && stream.isAlive()){
                stream.interrupt();
            }
            alreadyCrawling = true;
            stream = new Stream(m, this);
            //independently of using one of the following two calls, I get the same exception above
            stream.execute1();
            //stream.start();
            this.stopReceived = false;
        }
    }

    public void main(String[] args){
        StackOv so = new StackOv();
        so.setSize(800, 800);
        so.setVisible(true);
    }

}

以下类是爬虫类,当 stopCrawl 为 true 或 twitterStream 采样的推文数量超过最大限制时关闭 twitterStream。

import java.awt.TextArea;
import java.util.ArrayList;
import java.util.List;

import javax.swing.JTextArea;
import javax.swing.JTextField;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

public class Stream extends Thread{

    private Crawler cr;
    private TwitterStream twitterStream;
    private int maxTweets;
    private int usersSaved;
    private Mongo database;
    private CreateIndex ci;
    private TwitterSearch twitterSearch;
    private static boolean stopCrawl;

    public Stream(Mongo database, TwitterSearch twitterSearch){

        Stream.stopCrawl = false;
        this.database = database;
        this.cr = new Crawler(database);
        this.twitterStream = new TwitterStreamFactory(DefaultConfiguration.getConfiguration()).getInstance();
        this.maxTweets = 1000;
        ci = new CreateIndex(database);
        this.twitterSearch = twitterSearch;


    }

    public void setStop(){
        Stream.stopCrawl = true;
    }

    public void execute() throws TwitterException {

        final List<Status> statuses = new ArrayList<Status>();

        StatusListener listener = new StatusListener() {

            public void onStatus(Status status) {
                statuses.add(status);
                System.out.println(statuses.size() + ":" + status.getText());
                int usersIndexed = cr.retrieve(status.getUser());
                usersSaved = database.countDocuments();
                twitterSearch.setOut("usersSaved: "+usersSaved);
                if(usersIndexed > maxTweets || Stream.stopCrawl){
                    //ci.load();
                    ci.load(); //this call creates my index
                    twitterSearch.setOut("INDEX CREATED");
                    System.out.println("shutdown...");
                    twitterSearch.setOffAlreadyCrawling();
                    twitterStream.shutdown();
                }
            }

            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {

            }

            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {

            }

            public void onScrubGeo(long userId, long upToStatusId) {

            }

            public void onException(Exception ex) {
                ex.printStackTrace();
            }
            @Override
            public void onStallWarning(StallWarning arg0) {
                // TODO Auto-generated method stub

            }

        };


        twitterStream.addListener(listener);
        twitterStream.sample("en");
    }

    public void execute1(){
        try{
            this.execute();
        }catch(TwitterException e){
            e.printStackTrace();
        }
    }

    public void run(){
        try {
            this.execute();
        } catch (TwitterException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

【问题讨论】:

    标签: java twitter threadpool twitter4j


    【解决方案1】:

    当你的线程被关闭/关闭时,它会阻止它被“重新启动”,就像其他 java IO 类一样。换句话说,一旦你关闭它,你就不能真正重新启动它。我很确定在 Twitter 代码或您的代码中的某个地方,您的线程正在停止。为了防止这种情况发生,这里有一个可能有效的代码 sn-p:http://pastebin.com/APByKuiY 另外,试试这个堆栈溢出的东西:What could be the cause of RejectedExecutionException

    【讨论】:

    • 请将代码添加到答案中,这里最好有代码而不是外部站点。
    • 我自己以一种我不知道如何解释的奇怪方式解决了这个问题......现在我将面临其他问题。当我有时间时,我会评估你的答案,然后决定是否选择它作为最佳答案。对不起
    猜你喜欢
    • 1970-01-01
    • 2016-04-22
    • 2020-03-08
    • 2017-04-08
    • 1970-01-01
    • 2015-09-05
    • 1970-01-01
    • 2015-05-21
    • 1970-01-01
    相关资源
    最近更新 更多