今天,我将把第二部分带到我以前关于Java EE 7批处理和《魔兽世界–第1部分》的帖子中。 在本文中,我们将了解如何从第1部分中获得的数据中汇总和提取指标。
概括
批处理目的是下载魔兽世界拍卖行的数据,处理拍卖并提取指标。 这些指标将建立拍卖项目价格随时间变化的历史记录。 在第1部分中 ,我们已经下载了数据并将其插入数据库。
应用程序
处理作业
在将原始数据添加到数据库之后,我们将添加一个带有Chunk样式处理的步骤。 在块中,我们将读取聚合的数据,然后将其插入数据库中的另一个表中以便于访问。 这是在process-job.xml :
process-job.xml
<step id="importStatistics">
<chunk item-count="100">
<reader ref="processedAuctionsReader"/>
<processor ref="processedAuctionsProcessor"/>
<writer ref="processedAuctionsWriter"/>
</chunk>
</step>
块一次读取一个数据,并在事务内创建要写出的块。 从ItemReader读入一项,交给ItemProcessor并进行聚合。 一旦读取的项目数等于提交间隔,就通过ItemWriter写入整个块,然后提交事务。
ProcessedAuctionsReader
在读者中,我们将使用数据库功能选择和汇总指标。
ProcessedAuctionsReader.java
@Named
public class ProcessedAuctionsReader extends AbstractAuctionFileProcess implements ItemReader {
@Resource(name = "java:comp/DefaultDataSource")
protected DataSource dataSource;
private PreparedStatement preparedStatement;
private ResultSet resultSet;
@Override
public void open(Serializable checkpoint) throws Exception {
Connection connection = dataSource.getConnection();
preparedStatement = connection.prepareStatement(
"SELECT" +
" itemid as itemId," +
" sum(quantity)," +
" sum(bid)," +
" sum(buyout)," +
" min(bid / quantity)," +
" min(buyout / quantity)," +
" max(bid / quantity)," +
" max(buyout / quantity)" +
" FROM auction" +
" WHERE auctionfile_id = " +
getContext().getFileToProcess().getId() +
" GROUP BY itemid" +
" ORDER BY 1",
ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY,
ResultSet.HOLD_CURSORS_OVER_COMMIT
);
// Weird bug here. Check https://java.net/bugzilla/show_bug.cgi?id=5315
//preparedStatement.setLong(1, getContext().getFileToProcess().getId());
resultSet = preparedStatement.executeQuery();
}
@Override
public void close() throws Exception {
DbUtils.closeQuietly(resultSet);
DbUtils.closeQuietly(preparedStatement);
}
@Override
public Object readItem() throws Exception {
return resultSet.next() ? resultSet : null;
}
@Override
public Serializable checkpointInfo() throws Exception {
return null;
}
在此示例中,我们通过使用具有简单可滚动结果集的纯JDBC获得最佳性能结果。 这样,仅执行一个查询,并根据需要在readItem中提取结果。 您可能想探索其他替代方法。
Plain JPA在标准中没有可滚动的结果集,因此您需要对结果进行分页。 这将导致多个查询,这将减慢阅读速度。 另一个选择是使用新的Java 8 Streams API来执行聚合操作。 这些操作很快,但是您需要从数据库中选择整个数据集到流中。 最终,这会削弱您的性能。
我确实尝试了这两种方法,并通过使用数据库聚合功能获得了最佳结果。 我并不是说这始终是最好的选择,但是在这种情况下,这是最好的选择。
在实施过程中,我还发现了Batch中的错误。 您可以在这里检查。 在PreparedStatement中设置参数时会引发异常。 解决方法是将参数直接注入查询SQL中。 丑陋,我知道...
ProcessedAuctionsProcessor
在处理器中,让我们将所有聚合值存储在一个holder对象中,以存储在数据库中。
ProcessedAuctionsProcessor.java
@Named
public class ProcessedAuctionsProcessor extends AbstractAuctionFileProcess implements ItemProcessor {
@Override
@SuppressWarnings("unchecked")
public Object processItem(Object item) throws Exception {
ResultSet resultSet = (ResultSet) item;
AuctionItemStatistics auctionItemStatistics = new AuctionItemStatistics();
auctionItemStatistics.setItemId(resultSet.getInt(1));
auctionItemStatistics.setQuantity(resultSet.getLong(2));
auctionItemStatistics.setBid(resultSet.getLong(3));
auctionItemStatistics.setBuyout(resultSet.getLong(4));
auctionItemStatistics.setMinBid(resultSet.getLong(5));
auctionItemStatistics.setMinBuyout(resultSet.getLong(6));
auctionItemStatistics.setMaxBid(resultSet.getLong(7));
auctionItemStatistics.setMaxBuyout(resultSet.getLong(8));
auctionItemStatistics.setTimestamp(getContext().getFileToProcess().getLastModified());
auctionItemStatistics.setAvgBid(
(double) (auctionItemStatistics.getBid() / auctionItemStatistics.getQuantity()));
auctionItemStatistics.setAvgBuyout(
(double) (auctionItemStatistics.getBuyout() / auctionItemStatistics.getQuantity()));
auctionItemStatistics.setRealm(getContext().getRealm());
return auctionItemStatistics;
}
}
由于指标会及时记录数据的准确快照,因此计算仅需执行一次。 这就是为什么我们要保存汇总指标。 它们永远不会改变,我们可以轻松地检查历史。
如果您知道源数据是不可变的,并且需要对其进行操作,那么建议您将结果保留在某处。 这样可以节省您的时间。 当然,如果将来要多次访问此数据,则需要平衡。 如果不是这样,也许您就不需要经历持久化数据的麻烦了。
ProcessedAuctionsWriter
最后,我们只需要将数据写到数据库中即可:
ProcessedAuctionsWriter.java
@Named
public class ProcessedAuctionsWriter extends AbstractItemWriter {
@PersistenceContext
protected EntityManager em;
@Override
@SuppressWarnings("unchecked")
public void writeItems(List items) throws Exception {
List<AuctionItemStatistics> statistis = (List<AuctionItemStatistics>) items;
statistis.forEach(em::persist);
}
}
指标
现在,为了对数据做一些有用的事情,我们将公开一个REST端点,以对所计算的指标执行查询。 方法如下:
WowBusinessBean.java
@Override @GET
@Path("items")
public List<AuctionItemStatistics> findAuctionItemStatisticsByRealmAndItem(@QueryParam("realmId") Long realmId,
@QueryParam("itemId") Integer itemId) {
Realm realm = (Realm) em.createNamedQuery("Realm.findRealmsWithConnectionsById")
.setParameter("id", realmId)
.getSingleResult();
// Workaround for https://bugs.eclipse.org/bugs/show_bug.cgi?id=433075 if using EclipseLink
List<Realm> connectedRealms = new ArrayList<>();
connectedRealms.addAll(realm.getConnectedRealms());
List<Long> ids = connectedRealms.stream().map(Realm::getId).collect(Collectors.toList());
ids.add(realmId);
return em.createNamedQuery("AuctionItemStatistics.findByRealmsAndItem")
.setParameter("realmIds", ids)
.setParameter("itemId", itemId)
.getResultList();
}
如果您还记得第1部分中的一些细节,那么魔兽世界服务器称为Realms 。 这些领域可以相互链接并共享同一拍卖行 。 为此,我们还拥有有关领域之间如何相互联系的信息。 这很重要,因为我们可以在所有连接的领域中搜索拍卖品 。 其余的逻辑只是简单的查询以获取数据。
在开发过程中,我还发现了Eclipse Link (如果在Glassfish中运行)和Java 8的错误。显然, Eclipse Link返回的基础Collection的元素计数设置为0。如果您使用Streams,则此方法效果不佳尝试内联查询调用以及Stream操作。 流将认为它为空,并且不会返回任何结果。 您可以在这里阅读更多有关此的内容。
接口
我还使用Angular和Google Charts开发了一个小界面来显示指标。 看一看:
在这里,我在寻找一个名为“Aggra(葡萄牙语)”的境界与拍卖项目编号72092对应于鬼铁矿石 。 如您所见,我们可以检查待售数量,出价和买断值以及价格随时间的波动。 整齐? 我可能会写另一篇关于将来构建Web Interface的文章。
资源资源
您可以从我的github存储库中克隆完整的工作副本,然后将其部署到Wildfly或Glassfish中 。 您可以在那里找到部署说明: 魔兽世界拍卖
也请检查Java EE示例项目,其中包含大量完整的批处理示例。
翻译自: https://www.javacodegeeks.com/2015/01/java-ee-7-batch-processing-and-world-of-warcraft-part-2.html