【发布时间】:2014-02-01 02:44:38
【问题描述】:
我有一个数据迁移服务,它从一个数据库中分块读取数据,然后将其迁移到另一个数据库中。
为了处理数据的“块”——我正在尝试使用 RecursiveAction 和分叉连接池。这样做的原因是我想并行执行这些“块”上的工作,然后获取另一个块,然后执行,然后获取另一个块,等等。
发生的事情是我的进程刚刚停止。我在日志中没有看到异常,也没有看到死锁线程。我的代码如下,我的问题是:
- 我的 Worker 中是否缺少某些内容?我是否需要返回一些值或调用一些方法来释放资源?
- 我在这里使用 RecursiveAction 真的很愚蠢吗?我应该使用不同的方法来并行处理大量工作吗?
- 我的线程转储中有一个工作线程似乎只是在等待 - 这是正常现象还是我的问题的某种迹象?
ForkJoinPool-1-worker-18 id=12191 state=WAITING - 等待 (一个 java.util.concurrent.ForkJoinPool) - 在 sun.misc.Unsafe.park(Native Method) 处锁定 (a java.util.concurrent.ForkJoinPool) java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) 在 java.util.concurrent.ForkJoinPool.tryAwaitWork(ForkJoinPool.java:864) 在 java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:647) 在 java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
代码:
@Component
public class BulkMigrationService {
final ForkJoinPool pool = new ForkJoinPool();
private static final Logger log = LoggerFactory.getLogger(BulkMigrationService.class);
private SourceDataApi api;
private final Migrator migrator;
private MetadataService metadataService;
@Autowired
public BulkMigrationService(SourceDataApi api, Migrator migrator, MetadataService metadataService) {
this.api = api;
this.migrator = migrator;
this.metadataService = metadataService;
}
public void migrate(Integer batchSize, Long max) throws MigrationException {
Long currentCount = 0l;
Integer currentIndex = 0;
while (currentCount < max) {
List<String> itemsToMigrate = api.findItemRange(currentIndex, currentIndex + batchSize);
if (assetsToMigrate.size() > 0) {
MigrateForkedWorker starter = new MigrateForkedWorker(assetsToMigrate);
pool.invoke(starter);
}
currentCount += assetsToMigrate.size();
currentIndex += batchSize - 1;
if (log.isDebugEnabled()) {
log.debug("Migrated " + currentCount + " Items.");
}
}
}
public class MigrateForkedWorker extends RecursiveAction {
private int max = 10;
private List<String> allItems;
public MigrateForkedWorker(List<String> allItems) {
this.allItems = allItems;
}
@Override
protected void compute() {
if (allItems.size() <= max) {
for (String itemInfo : allItems) {
try {
migrator.migrateAsset(itemInfo);
}
catch (MigrationException e) {
e.printStackTrace();
}
}
}
else {
int targetSize = allItems.size() % 2 == 0 ? allItems.size() / 2 : (allItems.size() + 1) / 2;
List<List<String>> splits = Lists.partition(allItems, targetSize);
MigrateForkedWorker migrateForkedWorkerOne = new MigrateForkedWorker(splits.get(0));
MigrateForkedWorker migrateForkedWorkerTwo = new MigrateForkedWorker(splits.get(1));
invokeAll(migrateForkedWorkerOne, migrateForkedWorkerTwo);
}
}
}
}
【问题讨论】:
-
我想知道我是否看到这样的东西:stackoverflow.com/questions/16894929/… 我可能只是不太了解这些东西,无法使用它。最近在互联网上阅读:开发人员遇到了问题,尽管“我知道,我会使用线程”。现在他有10个问题
标签: java multithreading concurrency