【问题标题】:MapReduce single item from streaming source against finite items from map in Hazelcast JetMapReduce 来自流源的单个项目与 Hazelcast Jet 中地图的有限项目
【发布时间】:2018-01-17 04:40:51
【问题描述】:

作为 Hazelcast Jet 的新手,我试图构建一个设置,其中来自无限源(即用户请求的 Map Journal)的单个项目针对(可能变化的和)巨大的参考项目 Map 进行 MapReduced。

具体来说,对于这个例子,我想确定向量映射(references)中最小欧几里德距离的向量的 ID(读取:float[]),给定一个使用过的 -定义的输入向量(查询)。

如果在单台机器上简单地实现,这将遍历引用的 Map 项并确定每个项到查询的欧几里得距离,同时保持 k 最小匹配,其中输入来自用户请求(HTTP POST、按钮单击等),计算完成后结果集立即可用。

我最近的做法是:

  • 在地图日志上收听请求
  • .distributed().broadcast()对映射作业的请求
  • 让映射作业获取参考向量的.localKeySet()
  • 发出 k 最小向量的 ID(按欧几里得距离)
  • 通过.partitioned(item -> item.requestId) 分区在单个节点上减少/收集结果
  • 将结果存储到客户端具有关键侦听器的地图中。

从概念上讲,这里的每个查询都是一个大小为1 的批次,而我实际上是在批量处理来的时候。但是,我很难让映射器和化简器知道批处理何时完成,以便收集器知道它们何时完成(以便他们可以发出最终结果)。

我尝试使用带有真实和虚假时间戳的水印(通过AtomicLong 实例自动获得)并从tryProcessWm 函数发出,但这似乎是一个非常脆弱的解决方案,因为一些事件被丢弃了。我还需要确保没有两个请求是交错的(即对请求 ID 使用分区),但同时让映射器在所有节点上运行...

我将如何完成这项任务?


编辑#1:

现在,我的映射器看起来像这样:

private static class EuclideanDistanceMapP extends AbstractProcessor {
    private IMap<Long, float[]> referenceVectors;

    final ScoreComparator comparator = new ScoreComparator();

    @Override
    protected void init(@Nonnull Context context) throws Exception {
        this.referenceVectors = context.jetInstance().getMap(REFERENCE_VECTOR_MAP_NAME);
        super.init(context);
    }

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        final Tuple3<Long, Long, float[]> query = (Tuple3<Long, Long, float[]>)item;
        final long requestId = query.f0();
        final long timestamp = query.f1();
        final float[] queryVector = query.f2();

        final TreeSet<Tuple2<Long, Float>> buffer = new TreeSet<>(comparator);
        for (Long vectorKey : referenceVectors.localKeySet()) {
            float[] referenceVector = referenceVectors.get(vectorKey);
            float distance = 0.0f;

            for (int i = 0; i < queryVector.length; ++i) {
                distance += (queryVector[i] - referenceVector[i]) * (queryVector[i] - referenceVector[i]);
            }

            final Tuple2<Long, Float> score = Tuple2.tuple2(vectorKey, (float) Math.sqrt(distance));
            if (buffer.size() < MAX_RESULTS) {
                buffer.add(score);
                continue;
            }

            // If the value is larger than the largest entry, discard it.
            if (comparator.compare(score, buffer.last()) >= 0) {
                continue;
            }

            // Otherwise we remove the largest entry after adding the new one.
            buffer.add(score);
            buffer.pollLast();
        }

        return tryEmit(Tuple3.tuple3(requestId, timestamp, buffer.toArray()));
    }

    private static class ScoreComparator implements Comparator<Tuple2<Long, Float>> {
        @Override
        public int compare(Tuple2<Long, Float> a, Tuple2<Long, Float> b) {
            return Float.compare(a.f1(), b.f1());
        }
    }
}

reducer 本质上是在复制它(当然,减去向量计算)。


编辑#2:

这是 DAG 设置。当前,当有多个并发请求时,它会失败。由于水印,大部分项目被丢弃。

DAG dag = new DAG();
Vertex sourceStream = dag.newVertex("source",
    SourceProcessors.<Long, float[], Tuple2<Long, float[]>>streamMapP(QUERY_VECTOR_MAP_NAME,
            e -> e.getType() == EntryEventType.ADDED || e.getType() == EntryEventType.UPDATED,
            e -> Tuple2.tuple2(e.getKey(), e.getNewValue()),true));

// simple map() using an AtomicLong to create the timestamp    
Vertex addTimestamps = dag.newVertex("addTimestamps", AddTimestampMapP::new);

// the class shown above.
Vertex map = dag.newVertex("map", EuclideanDistanceMapP::new);

Vertex insertWatermarks = dag.newVertex("insertWatermarks",
        insertWatermarksP((Tuple3<Long, Long, float[]> t) -> t.f1(), withFixedLag(0), emitByMinStep(1)));

Vertex combine = dag.newVertex("combine", CombineP::new);

// simple map() that drops the timestamp
Vertex removeTimestamps = dag.newVertex("removeTimestamps", RemoveTimestampMapP::new);

// Using a list here for testing.
Vertex sink = dag.newVertex("sink", SinkProcessors.writeListP(SINK_NAME));

dag.edge(between(sourceStream, addTimestamps))
    .edge(between(addTimestamps, map.localParallelism(1))
        .broadcast()
        .distributed())
    .edge(between(map, insertWatermarks).isolated())
    .edge(between(insertWatermarks, combine.localParallelism(1))
            .distributed()
            .partitioned((Tuple2<Long, Tuple2<Long, Float>[]> item) -> item.f0()))
    .edge(between(combine, removeTimestamps)
            .partitioned((Tuple3<Long, Long, Tuple2<Long, Float>[]> item) -> item.f0()))
    .edge(between(removeTimestamps, sink.localParallelism(1)));

编辑#3:

这是我当前的组合器实现。我假设所有物品都会根据水印订购;或者一般来说,相同的组合器实例只会收集相同请求的项目。但这似乎不是真的……

private static class CombineP extends AbstractProcessor {
    private final ScoreComparator comparator = new ScoreComparator();
    private final TreeSet<Tuple2<Long, Float>> buffer = new TreeSet<>(comparator);
    private Long requestId;
    private Long timestamp = -1L;

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        final Tuple3<Long, Long, Tuple2<Long, Float>[]> itemTuple = (Tuple3<Long, Long, Tuple2<Long, Float>[]>)item;
        requestId = itemTuple.f0();

        final long currentTimestamp = itemTuple.f1();
        if (currentTimestamp > timestamp) {
            buffer.clear();
        }
        timestamp = currentTimestamp;

        final Object[] scores = itemTuple.f2();

        for (Object scoreObj : scores) {
            final Tuple2<Long, Float> score = (Tuple2<Long, Float>)scoreObj;

            if (buffer.size() < MAX_RESULTS) {
                buffer.add(score);
                continue;
            }

            // If the value is larger than the largest entry, discard it.
            if (comparator.compare(score, buffer.last()) >= 0) {
                continue;
            }

            // Otherwise we remove the largest entry after adding the new one.
            buffer.add(score);
            buffer.pollLast();
        }

        return true;
    }

    @Override
    protected boolean tryProcessWm(int ordinal, @Nonnull Watermark wm) {
        // return super.tryProcessWm(ordinal, wm);
        return tryEmit(Tuple3.tuple3(requestId, timestamp, buffer.toArray())) && super.tryProcessWm(ordinal, wm);
    }

    private static class ScoreComparator implements Comparator<Tuple2<Long, Float>> {
        @Override
        public int compare(Tuple2<Long, Float> a, Tuple2<Long, Float> b) {
            return Float.compare(a.f1(), b.f1());
        }
    }
}

【问题讨论】:

  • 当您说“映射作业”时,您的意思是您对 Processor 的自定义实现?
  • 是的。我添加了代码以进行澄清。
  • 为了保持最佳结果,使用PriorityQueue 更有效(它使用内部的最小/最大堆)。此外,每次从tryEmit 获得false 时,您都会处理大量工作
  • 你所有的处理器都应该有一个本地并行度。特别是,我认为由于insertWatermarksP 的并行化,您正在丢弃项目,这会在流中引入偏斜。
  • 是的,你不能指望分布式边缘背后的如此严格的顺序。这些物品是分批发送的。当您实施像Processor 这样的低级基础设施时,您不会受到处理管道中发生的任何混乱的影响。因此,您应该保留从时间戳到组合数据的映射,当您观察到水印 N 时,您可以发出(并从映射中删除)时间戳小于 N 的所有项目。

标签: hazelcast hazelcast-jet


【解决方案1】:

您必须始终记住,两个顶点之间的项目可以重新排序。当您有并行请求时,它们的中间结果可以在CombineP 中交错。

CombineP 中,您可以依靠中间结果的数量等于集群中的成员数量这一事实。从globalParallelism / localParallelism 计算init 中的参与成员数。当您收到此数量的中间体时,您可以发出最终结果。

另一个技巧可能是在每个成员上并行运行多个请求。您可以通过使用两条边来实现这一点: 1.广播+分布式边缘到并行= 1个处理器 2. 单播边缘到并行度=N 处理器

还要注意localKeys不适用于大地图:查询大小为limited

这是上面的代码。代码适用于 Jet 0.5:

DAG:

DAG dag = new DAG();
Vertex sourceStream = dag.newVertex("source",
        streamMapP(QUERY_VECTOR_MAP_NAME,
                e -> e.getType() == EntryEventType.ADDED || e.getType() == EntryEventType.UPDATED,
                e -> entry(e.getKey(), e.getNewValue()),true));

Vertex identity = dag.newVertex("identity", mapP(identity()))
        .localParallelism(1);
Vertex map = dag.newVertex("map", peekOutputP(EuclideanDistanceMapP::new));
Vertex combine = dag.newVertex("combine", peekOutputP(new CombineMetaSupplier()));
Vertex sink = dag.newVertex("sink", writeListP(SINK_NAME));

dag.edge(between(sourceStream, identity)
           .broadcast()
           .distributed())
   .edge(between(identity, map))
   .edge(between(map, combine)
           .distributed()
           .partitioned((Entry item) -> item.getKey()))
   .edge(between(combine, sink));

EuclideanDistanceMapP 类:

private static class EuclideanDistanceMapP extends AbstractProcessor {

    private IMap<Long, float[]> referenceVectors;
    final ScoreComparator comparator = new ScoreComparator();
    private Object pendingItem;

    @Override
    protected void init(@Nonnull Context context) throws Exception {
        this.referenceVectors = context.jetInstance().getMap(REFERENCE_VECTOR_MAP_NAME);
        super.init(context);
    }

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        if (pendingItem == null) {
            final Entry<Long, float[]> query = (Entry<Long, float[]>) item;
            final long requestId = query.getKey();
            final float[] queryVector = query.getValue();

            final PriorityQueue<Entry<Long, Float>> buffer = new PriorityQueue<>(comparator.reversed());
            for (Long vectorKey : referenceVectors.localKeySet()) {
                float[] referenceVector = referenceVectors.get(vectorKey);
                float distance = 0.0f;

                for (int i = 0; i < queryVector.length; ++i) {
                    distance += (queryVector[i] - referenceVector[i]) * (queryVector[i] - referenceVector[i]);
                }

                final Entry<Long, Float> score = entry(vectorKey, (float) Math.sqrt(distance));
                if (buffer.size() < MAX_RESULTS || comparator.compare(score, buffer.peek()) < 0) {
                    if (buffer.size() == MAX_RESULTS)
                        buffer.remove();
                    buffer.add(score);
                }
            }
            pendingItem = entry(requestId, buffer.toArray(new Entry[0]));
        }
        if (tryEmit(pendingItem)) {
            pendingItem = null;
            return true;
        }
        return false;
    }
}

组合P类:

private static class CombineP extends AbstractProcessor {
    private final ScoreComparator comparator = new ScoreComparator();
    private final Map<Long, PriorityQueue<Entry<Long, Float>>> buffer = new HashMap<>();
    private final Map<Long, Integer> accumulatedCount = new HashMap<>();
    private final int upstreamMemberCount;
    private Entry<Long, Entry<Long, Float>[]> pendingItem;

    private CombineP(int upstreamMemberCount) {
        this.upstreamMemberCount = upstreamMemberCount;
    }

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        if (pendingItem == null) {
            final Entry<Long, Entry<Long, Float>[]> localValue = (Entry<Long, Entry<Long, Float>[]>) item;
            long requestId = localValue.getKey();
            PriorityQueue<Entry<Long, Float>> globalValue = buffer.computeIfAbsent(requestId, key -> new PriorityQueue<>(comparator.reversed()));
            globalValue.addAll(asList(localValue.getValue()));
            while (globalValue.size() > MAX_RESULTS) {
                globalValue.remove();
            }
            int count = accumulatedCount.merge(requestId, 1, Integer::sum);
            if (count == upstreamMemberCount) {
                // we've received enough local values, let's emit and remove the accumulator
                pendingItem = entry(requestId, globalValue.toArray(new Entry[0]));
                Arrays.sort(pendingItem.getValue(), comparator);
                buffer.remove(requestId);
                accumulatedCount.remove(requestId);
            } else {
                return true;
            }
        }
        if (tryEmit(pendingItem)) {
            pendingItem = null;
            return true;
        }
        return false;
    }
}

您还需要CombineP 的自定义元供应商:

private static class CombineMetaSupplier implements ProcessorMetaSupplier {
    private int upstreamMemberCount;

    @Override
    public void init(@Nonnull Context context) {
        upstreamMemberCount = context.totalParallelism() / context.localParallelism();
    }

    @Nonnull
    @Override
    public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses) {
        return address -> ProcessorSupplier.of(() -> new CombineP(upstreamMemberCount));
    }
}

【讨论】:

  • 当我在作业运行时向集群添加更多节点时,globalParallelism / localParallelism 假设是否成立?
  • 新成员不会默默开始参与工作。如果作业重新调整为它们,它将停止并重新运行,因此您的初始化将使用新值再次运行。但要支持这一点,您必须参与快照协议。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-03-08
  • 1970-01-01
  • 1970-01-01
  • 2012-01-26
  • 1970-01-01
相关资源
最近更新 更多