【问题标题】:Java Fork Join Pool getting stuckJava Fork Join Pool 卡住了
【发布时间】:2014-02-01 02:44:38
【问题描述】:

我有一个数据迁移服务,它从一个数据库中分块读取数据,然后将其迁移到另一个数据库中。

为了处理数据的“块”——我正在尝试使用 RecursiveAction 和分叉连接池。这样做的原因是我想并行执行这些“块”上的工作,然后获取另一个块,然后执行,然后获取另一个块,等等。

发生的事情是我的进程刚刚停止。我在日志中没有看到异常,也没有看到死锁线程。我的代码如下,我的问题是:

  • 我的 Worker 中是否缺少某些内容?我是否需要返回一些值或调用一些方法来释放资源?
  • 我在这里使用 RecursiveAction 真的很愚蠢吗?我应该使用不同的方法来并行处理大量工作吗?
  • 我的线程转储中有一个工作线程似乎只是在等待 - 这是正常现象还是我的问题的某种迹象?

ForkJoinPool-1-worker-18 id=12191 state=WAITING - 等待 (一个 java.util.concurrent.ForkJoinPool) - 在 sun.misc.Unsafe.park(Native Method) 处锁定 (a java.util.concurrent.ForkJoinPool) java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) 在 java.util.concurrent.ForkJoinPool.tryAwaitWork(ForkJoinPool.java:864) 在 java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:647) 在 java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)

代码:

@Component
public class BulkMigrationService {

final ForkJoinPool pool = new ForkJoinPool();

private static final Logger log = LoggerFactory.getLogger(BulkMigrationService.class);

private SourceDataApi api;
private final Migrator migrator;
private MetadataService metadataService;

@Autowired
public BulkMigrationService(SourceDataApi api, Migrator migrator, MetadataService metadataService) {
    this.api = api;
    this.migrator = migrator;
    this.metadataService = metadataService;
}

public void migrate(Integer batchSize, Long max) throws MigrationException {

    Long currentCount = 0l;
    Integer currentIndex = 0;

    while (currentCount < max) {
        List<String> itemsToMigrate = api.findItemRange(currentIndex, currentIndex + batchSize);

        if (assetsToMigrate.size() > 0) {
            MigrateForkedWorker starter = new MigrateForkedWorker(assetsToMigrate);
            pool.invoke(starter);
        }

        currentCount += assetsToMigrate.size();
        currentIndex += batchSize - 1;
        if (log.isDebugEnabled()) {
            log.debug("Migrated " + currentCount + " Items.");
        }
    }

}

public class MigrateForkedWorker extends RecursiveAction {

    private int max = 10;
    private List<String> allItems;

    public MigrateForkedWorker(List<String> allItems) {
        this.allItems = allItems;

    }

    @Override
    protected void compute() {
        if (allItems.size() <= max) {
            for (String itemInfo : allItems) {

                try {
                    migrator.migrateAsset(itemInfo);

                }
                catch (MigrationException e) {
                    e.printStackTrace();
                }
            }
        }
        else {
            int targetSize = allItems.size() % 2 == 0 ? allItems.size() / 2 : (allItems.size() + 1) / 2;
            List<List<String>> splits = Lists.partition(allItems, targetSize);
            MigrateForkedWorker migrateForkedWorkerOne = new MigrateForkedWorker(splits.get(0));
            MigrateForkedWorker migrateForkedWorkerTwo = new MigrateForkedWorker(splits.get(1));

            invokeAll(migrateForkedWorkerOne, migrateForkedWorkerTwo);
        }
    }
}
 }

【问题讨论】:

  • 我想知道我是否看到这样的东西:stackoverflow.com/questions/16894929/… 我可能只是不太了解这些东西,无法使用它。最近在互联网上阅读:开发人员遇到了问题,尽管“我知道,我会使用线程”。现在他有10个问题

标签: java multithreading concurrency


【解决方案1】:

您的第一个问题是您正在使用invokeAll()。这只是向池提交新请求并等待它们完成。按照 JavaDoc 中的示例:使用 fork() migrateForkedWorkerOne.fork(); migrateForkedWorkerTwo.compute(); migrateForkedWorkerOne.join();

【讨论】:

    猜你喜欢
    • 2021-09-26
    • 1970-01-01
    • 2017-10-01
    • 2012-12-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多