【问题标题】:ThreadPoolExecutor : Get a specific Runnable that is being executedThreadPoolExecutor :获取正在执行的特定 Runnable
【发布时间】:2017-06-23 14:05:42
【问题描述】:

我正在使用ThreadPoolExecutor 在后台执行多个长时间运行的任务,ThreadPoolExecutor 的池大小为 4,因此当添加超过 4 个任务时,它们被推送到队列中,并且当 4 个任务之一完成时任务从队列中弹出执行。

我想知道有什么方法可以访问当前正在执行且不在队列中的Runnable 对象,即前 4 个任务。

目标:我想在任何给定点获取任务的当前状态,在mThreadPoolExecutor.getQueue() 的帮助下我正在访问正在排队并准备执行的任务,请建议我访问当前正在执行的任务的方法执行,以便我可以在需要时附加和删除侦听器/处理程序。

我的 Runnable 类:

public class VideoFileUploadRunner implements Runnable {

    private final VideoFileSync mVideoFileSync;
    private final DataService dataService;

    private Handler handler;

    public VideoFileUploadRunner(VideoFileSync videoFileSync, DataService dataService) {
        this.mVideoFileSync = videoFileSync;
        this.dataService = dataService;

    }

    public int getPK()
    {
        return  mVideoFileSync.get_idPrimaryKey();
    }

    public void setHandler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public void run() {
        try {

            if (mVideoFileSync.get_idPrimaryKey() < 0) {
                addEntryToDataBase();
            }
            updateStatus(VideoUploadStatus.IN_PROGRESS);
            FileUploader uploader = new FileUploader();
            updateStatus(uploader.uploadFile(mVideoFileSync.getVideoFile()));



        } catch (Exception e) {
            updateStatus(VideoUploadStatus.FAILED);
            e.printStackTrace();
        }
    }

    private void addEntryToDataBase() {
        int pk = dataService.saveVideoRecordForSync(mVideoFileSync);
        mVideoFileSync.set_idPrimaryKey(pk);
    }

    private void updateStatus(VideoUploadStatus status) {
        if (handler != null) {
            Message msg = new Message();
            Bundle b = new Bundle();
            b.putString(AppConstants.Sync_Status, status.toString());
            msg.setData(b);
            handler.sendMessage(msg);
        }
        dataService.updateUploadStatus(mVideoFileSync.get_idPrimaryKey(), status.toString());


    }
} 

在任务进度列表视图中:

public void setData(VideoFileSync fileSync) {
        tvIso.setText(fileSync.getVideoFile().getISO_LOOP_EQUP());
        tvUnit.setText(fileSync.getVideoFile().getUnit());
        tvName.setText(fileSync.getVideoFile().getLocalPath());
        tvStatus.setText(fileSync.getCurentStatus().toString());
        addHandleForUpdate(fileSync);
    }

    private void addHandleForUpdate(VideoFileSync fileSync) {

        Handler.Callback callBack = new Handler.Callback() {
            @Override
            public boolean handleMessage(Message msg) {
                if(msg.getData()!=null)
                {
                    tvStatus.setText(msg.getData().getString(AppConstants.Sync_Status));

                }
                return false;
            }
        };
        mHadler = new Handler(Looper.getMainLooper(),callBack);

        VideoFileUploadRunner runner = VideoUploadManager.getInstance().getRunnerForSyncFile(fileSync);
        if(runner!=null)
        runner.setHandler(mHadler);
    }

在 VideoUploadManager 中,我有以下方法返回 Runnable 对象,在这里我需要帮助,以便我可以返回当前正在执行的任务。

public synchronized VideoFileUploadRunner getRunnerForSyncFile(VideoFileSync fileSync) {
        Iterator<Runnable> itr = mThreadPoolExecutor.getQueue().iterator();
        while (itr.hasNext()) {
            VideoFileUploadRunner runner = (VideoFileUploadRunner) itr.next();
            if (runner.getPK() == fileSync.get_idPrimaryKey()) {
                return runner;
            }
        }
        return null;

    } 

【问题讨论】:

  • 这样我就可以在需要时在其上附加和删除侦听器/处理程序。你能详细说明你的意思吗?
  • 我正在开发移动应用程序,其中我有一个屏幕来显示任务的当前状态,用户可以关闭应用程序并返回检查状态,所以当用户在屏幕上时,我想将处理程序附加到可运行对象
  • 而不是从 runnable 的外部尝试通过 executor 找到 runnable 并将侦听器附加到它。从 runnables run 方法内部,绑定到外部侦听器并在结束时取消绑定方法。因此,您可以在 runnable 中发布您的事件、订阅和取消订阅,并且只有当前处于活动状态的 runnable 才会发布它们的更新。
  • 使用FutureTask 可能会有所帮助。看看stackoverflow.com/questions/30789402/…
  • @DCoder 查看我的回答。您可以使用 ThreadFactory 为您保存 Runnable 引用。请留下评论以进一步澄清。

标签: java android multithreading runnable threadpoolexecutor


【解决方案1】:

这个答案与我上面的评论有关。

与其尝试通过执行器找到可运行对象并为其附加侦听器,不如在创建时将侦听器绑定到可运行对象,并将事件从可运行对象的执行代码发布到侦听器。

只有当前活动的可运行文件会发布他们的更新。

这是一个例子。

为你的监听器创建一个接口来实现。您的监听器可以是线程池执行器、私有内部类等。

/** 
 * Callback interface to notify when a video upload's state changes 
 */
interface IVideoUploadListener {

    /**
     * Called when a video upload's state changes

     * @param pUploadId The ID of the video upload
     * @param pStatus The new status of the upload
     */
    void onStatusChanged(int pUploadId, VideoUploadStatus pStatus);
}

为您的状态类型创建一个枚举(例如)

/**
 * Enum to hold different video upload states
 */
enum VideoUploadStatus {
    IN_PROGRESS,
    ADDED_TO_DB,
    FILE_UPLOADED,
    FINISHED,
    FAILED
}

在每个 Runnable 中保存监听器的引用。

public class VideoFileUploadRunner implements Runnable {

    private final IVideoUploadListener mUploadListener;
    private final VideoFileSync mVideoFileSync;
    private final DataService   mDataService;
    private Handler mHandler;

    // etc...
}

通过构造函数传递接口的实例

public VideoFileUploadRunner(IVideoUploadListener pUploadListener, VideoFileSync pVideoFileSync, DataService pDataService) {
    mUploadListener = pUploadListener;
    mVideoFileSync  = pVideoFileSync;
    mDataService    = pDataService;
}

在 run 方法中,根据需要向侦听器发布更新。

@Override
public void run() {
    mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.IN_PROGRESS);
    try {
        if (mVideoFileSync.get_idPrimaryKey() < 0) {
            addEntryToDataBase();
            mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.ADDED_TO_DB);
        }
        FileUploader uploader = new FileUploader();
        uploader.uploadFile(mVideoFileSync.getVideoFile());
        mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.FILE_UPLOADED);

        // Other logic here...

        mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.FINISHED);
    }

    catch (Exception e) {
        mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.FAILED);
        e.printStackTrace();
    }
}

您的 onStatusChanged() 方法的侦听器实现应该是同步的。这将有助于避免竞争条件导致的错误结果。

private IVideoUploadListener mUploadListener = new IVideoUploadListener() {
    @Override
    public synchronized void onStatusChanged(int pUploadId, VideoUploadStatus pStatus) {
        Log.i("ListenerTag", "Video file with ID " + pUploadId + " has the status " + pStatus.toString());
    }
};

【讨论】:

    【解决方案2】:

    我的答案集中在这个问题上:“如何知道正在执行哪些可运行”。

    这种方法保持活动 Runnables 的并发集:

    private final Set<VideoFileUploadRunner> active = Collections.newSetFromMap(new ConcurrentHashMap<>());
    

    提交给 ThreadPoolExecutor 的 Runnable 应该用更新这个集合的 Runnable 装饰:

    class DecoratedRunnable implements Runnable {
    
        final VideoFileUploadRunner runnable;
    
        public DecoratedRunnable(VideoFileUploadRunner runnable) {
            this.runnable = runnable;
        }
    
        @Override
        public void run() {
            active.add(runnable); // add to set
            try {
                runnable.run();
            } finally {
                active.remove(runnable); // finally remove from set (even when something goes wrong)
            }
        }
    }
    

    所以我们可以在提交之前装饰VideoFileUploadRunner 实例:

    executorService.submit(new DecoratedRunnable(videoFileUploadRunner));
    

    getRunnerForSyncFile 方法将简单地像这样实现:

    public VideoFileUploadRunner getRunnerForSyncFile(VideoFileSync fileSync) {
        return active.stream()
                .filter(videoFileUploadRunner -> videoFileUploadRunner.getPK() == fileSync.get_idPrimaryKey())
                .findAny()
                .orElse(null);
    }
    

    备注:作为@Charlie cmets,这不是将侦听器附加到 Runnable 的最佳方式。您可以请求从 VideoFileUploadRunnerrun() 方法内部设置消息处理程序,或使用 MessageHandler 集初始化此类实例,或使用此装饰方法将其排除在 VideoFileUploadRunner 类之外。

    【讨论】:

    • 这类似于this。应该是重复的。
    • @CKing 链接到了吗?
    • 我的意思是我链接到了。
    • 那个答案没有使用 try.. 最后虽然:p
    【解决方案3】:

    最好的方法是公开一个同步变量,保存当前执行任务的信息。

    public MyTask implements Runnable {
        private String id;
        private Map<String, MyTask> mapTasks;
    
        public MyTask(String id, Map<String, MyTask> mapTasks) {
            this.id = id;
            this.mapTasks = mapTasks;
        }
    
        public void run() {
             synchronized(mapTasks) {
                 mapTasks.put(id, this);
             }
    
             ...
    
             synchronized(mapTasks) {
                 mapTasks.remove(id);
             }
        }
    }
    
    
    // Create a map of tasks
    Map<String, MyTask> mapTasks = new HashMap<String, MyTask>();
    
    // How to create tasks
    MyTask myTask1 = new MyTask("task1", mapTasks);
    MyTask myTask2 = new MyTask("task2", mapTasks);
    
    executorService.execute(myTask1);
    executorService.execute(myTask2);
    
    ....
    

    并打印当前正在执行的任务列表:

    public void printCurrentExecutingTasks(Map<String, MyTask> tasks) {
        for (String id: tasks.keySet()) {
            System.out.println("Executing task with id: " + id);
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2015-05-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-01-27
      • 1970-01-01
      • 2021-07-23
      • 2011-07-24
      • 1970-01-01
      相关资源
      最近更新 更多