【发布时间】:2018-01-17 04:40:51
【问题描述】:
作为 Hazelcast Jet 的新手,我试图构建一个设置,其中来自无限源(即用户请求的 Map Journal)的单个项目针对(可能变化的和)巨大的参考项目 Map 进行 MapReduced。
具体来说,对于这个例子,我想确定向量映射(references)中最小欧几里德距离的向量的 ID(读取:float[]),给定一个使用过的 -定义的输入向量(查询)。
如果在单台机器上简单地实现,这将遍历引用的 Map 项并确定每个项到查询的欧几里得距离,同时保持 k 最小匹配,其中输入来自用户请求(HTTP POST、按钮单击等),计算完成后结果集立即可用。
我最近的做法是:
- 在地图日志上收听请求
-
.distributed().broadcast()对映射作业的请求 - 让映射作业获取参考向量的
.localKeySet() - 发出 k 最小向量的 ID(按欧几里得距离)
- 通过
.partitioned(item -> item.requestId)分区在单个节点上减少/收集结果 - 将结果存储到客户端具有关键侦听器的地图中。
从概念上讲,这里的每个查询都是一个大小为1 的批次,而我实际上是在批量处理来的时候。但是,我很难让映射器和化简器知道批处理何时完成,以便收集器知道它们何时完成(以便他们可以发出最终结果)。
我尝试使用带有真实和虚假时间戳的水印(通过AtomicLong 实例自动获得)并从tryProcessWm 函数发出,但这似乎是一个非常脆弱的解决方案,因为一些事件被丢弃了。我还需要确保没有两个请求是交错的(即对请求 ID 使用分区),但同时让映射器在所有节点上运行...
我将如何完成这项任务?
编辑#1:
现在,我的映射器看起来像这样:
private static class EuclideanDistanceMapP extends AbstractProcessor {
private IMap<Long, float[]> referenceVectors;
final ScoreComparator comparator = new ScoreComparator();
@Override
protected void init(@Nonnull Context context) throws Exception {
this.referenceVectors = context.jetInstance().getMap(REFERENCE_VECTOR_MAP_NAME);
super.init(context);
}
@Override
protected boolean tryProcess0(@Nonnull Object item) {
final Tuple3<Long, Long, float[]> query = (Tuple3<Long, Long, float[]>)item;
final long requestId = query.f0();
final long timestamp = query.f1();
final float[] queryVector = query.f2();
final TreeSet<Tuple2<Long, Float>> buffer = new TreeSet<>(comparator);
for (Long vectorKey : referenceVectors.localKeySet()) {
float[] referenceVector = referenceVectors.get(vectorKey);
float distance = 0.0f;
for (int i = 0; i < queryVector.length; ++i) {
distance += (queryVector[i] - referenceVector[i]) * (queryVector[i] - referenceVector[i]);
}
final Tuple2<Long, Float> score = Tuple2.tuple2(vectorKey, (float) Math.sqrt(distance));
if (buffer.size() < MAX_RESULTS) {
buffer.add(score);
continue;
}
// If the value is larger than the largest entry, discard it.
if (comparator.compare(score, buffer.last()) >= 0) {
continue;
}
// Otherwise we remove the largest entry after adding the new one.
buffer.add(score);
buffer.pollLast();
}
return tryEmit(Tuple3.tuple3(requestId, timestamp, buffer.toArray()));
}
private static class ScoreComparator implements Comparator<Tuple2<Long, Float>> {
@Override
public int compare(Tuple2<Long, Float> a, Tuple2<Long, Float> b) {
return Float.compare(a.f1(), b.f1());
}
}
}
reducer 本质上是在复制它(当然,减去向量计算)。
编辑#2:
这是 DAG 设置。当前,当有多个并发请求时,它会失败。由于水印,大部分项目被丢弃。
DAG dag = new DAG();
Vertex sourceStream = dag.newVertex("source",
SourceProcessors.<Long, float[], Tuple2<Long, float[]>>streamMapP(QUERY_VECTOR_MAP_NAME,
e -> e.getType() == EntryEventType.ADDED || e.getType() == EntryEventType.UPDATED,
e -> Tuple2.tuple2(e.getKey(), e.getNewValue()),true));
// simple map() using an AtomicLong to create the timestamp
Vertex addTimestamps = dag.newVertex("addTimestamps", AddTimestampMapP::new);
// the class shown above.
Vertex map = dag.newVertex("map", EuclideanDistanceMapP::new);
Vertex insertWatermarks = dag.newVertex("insertWatermarks",
insertWatermarksP((Tuple3<Long, Long, float[]> t) -> t.f1(), withFixedLag(0), emitByMinStep(1)));
Vertex combine = dag.newVertex("combine", CombineP::new);
// simple map() that drops the timestamp
Vertex removeTimestamps = dag.newVertex("removeTimestamps", RemoveTimestampMapP::new);
// Using a list here for testing.
Vertex sink = dag.newVertex("sink", SinkProcessors.writeListP(SINK_NAME));
dag.edge(between(sourceStream, addTimestamps))
.edge(between(addTimestamps, map.localParallelism(1))
.broadcast()
.distributed())
.edge(between(map, insertWatermarks).isolated())
.edge(between(insertWatermarks, combine.localParallelism(1))
.distributed()
.partitioned((Tuple2<Long, Tuple2<Long, Float>[]> item) -> item.f0()))
.edge(between(combine, removeTimestamps)
.partitioned((Tuple3<Long, Long, Tuple2<Long, Float>[]> item) -> item.f0()))
.edge(between(removeTimestamps, sink.localParallelism(1)));
编辑#3:
这是我当前的组合器实现。我假设所有物品都会根据水印订购;或者一般来说,相同的组合器实例只会收集相同请求的项目。但这似乎不是真的……
private static class CombineP extends AbstractProcessor {
private final ScoreComparator comparator = new ScoreComparator();
private final TreeSet<Tuple2<Long, Float>> buffer = new TreeSet<>(comparator);
private Long requestId;
private Long timestamp = -1L;
@Override
protected boolean tryProcess0(@Nonnull Object item) {
final Tuple3<Long, Long, Tuple2<Long, Float>[]> itemTuple = (Tuple3<Long, Long, Tuple2<Long, Float>[]>)item;
requestId = itemTuple.f0();
final long currentTimestamp = itemTuple.f1();
if (currentTimestamp > timestamp) {
buffer.clear();
}
timestamp = currentTimestamp;
final Object[] scores = itemTuple.f2();
for (Object scoreObj : scores) {
final Tuple2<Long, Float> score = (Tuple2<Long, Float>)scoreObj;
if (buffer.size() < MAX_RESULTS) {
buffer.add(score);
continue;
}
// If the value is larger than the largest entry, discard it.
if (comparator.compare(score, buffer.last()) >= 0) {
continue;
}
// Otherwise we remove the largest entry after adding the new one.
buffer.add(score);
buffer.pollLast();
}
return true;
}
@Override
protected boolean tryProcessWm(int ordinal, @Nonnull Watermark wm) {
// return super.tryProcessWm(ordinal, wm);
return tryEmit(Tuple3.tuple3(requestId, timestamp, buffer.toArray())) && super.tryProcessWm(ordinal, wm);
}
private static class ScoreComparator implements Comparator<Tuple2<Long, Float>> {
@Override
public int compare(Tuple2<Long, Float> a, Tuple2<Long, Float> b) {
return Float.compare(a.f1(), b.f1());
}
}
}
【问题讨论】:
-
当您说“映射作业”时,您的意思是您对
Processor的自定义实现? -
是的。我添加了代码以进行澄清。
-
为了保持最佳结果,使用
PriorityQueue更有效(它使用内部的最小/最大堆)。此外,每次从tryEmit获得false时,您都会处理大量工作 -
你所有的处理器都应该有一个本地并行度。特别是,我认为由于
insertWatermarksP的并行化,您正在丢弃项目,这会在流中引入偏斜。 -
是的,你不能指望分布式边缘背后的如此严格的顺序。这些物品是分批发送的。当您实施像
Processor这样的低级基础设施时,您不会受到处理管道中发生的任何混乱的影响。因此,您应该保留从时间戳到组合数据的映射,当您观察到水印 N 时,您可以发出(并从映射中删除)时间戳小于 N 的所有项目。