【问题标题】:How do I join two streams in apache flink?如何在 apache flink 中加入两个流?
【发布时间】:2020-05-22 16:32:35
【问题描述】:

我开始使用 flink 并查看one of the official tutorials

据我了解,本练习的目标是在时间属性上加入两个流。

任务:

这个练习的结果是一个 Tuple2 记录的数据流,每个不同的rideId 对应一个。你应该忽略 END 活动,并且仅在每次骑行开始时加入活动 其对应的票价数据。

结果流应该被打印到标准输出。

问题: EnrichmentFunction 又如何能够加入这两个流。它怎么知道参加哪个展会?我希望它能够缓冲多个展会/游乐设施,直到有一个匹配的合作伙伴。

据我了解,它只是保存了它看到的每一次骑行/公平,并将其与下一个最佳骑行/公平结合起来。为什么这是一个正确的连接?

提供的解决方案:

/*
 * Copyright 2017 data Artisans GmbH
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dataartisans.flinktraining.solutions.datastream_java.state;

import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiFare;
import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiFareSource;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.ExerciseBase;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

/**
 * Java reference implementation for the "Stateful Enrichment" exercise of the Flink training
 * (http://training.data-artisans.com).
 *
 * The goal for this exercise is to enrich TaxiRides with fare information.
 *
 * Parameters:
 * -rides path-to-input-file
 * -fares path-to-input-file
 *
 */
public class RidesAndFaresSolution extends ExerciseBase {
    public static void main(String[] args) throws Exception {

        ParameterTool params = ParameterTool.fromArgs(args);
        final String ridesFile = params.get("rides", pathToRideData);
        final String faresFile = params.get("fares", pathToFareData);

        final int delay = 60;                   // at most 60 seconds of delay
        final int servingSpeedFactor = 1800;    // 30 minutes worth of events are served every second

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(ExerciseBase.parallelism);

        DataStream<TaxiRide> rides = env
                .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
                .filter((TaxiRide ride) -> ride.isStart)
                .keyBy("rideId");

        DataStream<TaxiFare> fares = env
                .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
                .keyBy("rideId");

        DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
                .connect(fares)
                .flatMap(new EnrichmentFunction());

        printOrTest(enrichedRides);

        env.execute("Join Rides with Fares (java RichCoFlatMap)");
    }

    public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
        // keyed, managed state
        private ValueState<TaxiRide> rideState;
        private ValueState<TaxiFare> fareState;

        @Override
        public void open(Configuration config) {
            rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
            fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
        }

        @Override
        public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiFare fare = fareState.value();
            if (fare != null) {
                fareState.clear();
                out.collect(new Tuple2(ride, fare));
            } else {
                rideState.update(ride);
            }
        }

        @Override
        public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiRide ride = rideState.value();
            if (ride != null) {
                rideState.clear();
                out.collect(new Tuple2(ride, fare));
            } else {
                fareState.update(fare);
            }
        }
    }
}

【问题讨论】:

    标签: java apache-flink


    【解决方案1】:

    在这个特定的training exercise on stateful enrichment 上下文中,rideId 的每个值都有三个事件——TaxiRide 开始事件、TaxiRide 结束事件和 TaxiFare。本练习的目的是将每个 TaxiRide 开始事件与一个具有相同rideId 的 TaxiFare 事件联系起来——或者换句话说,在rideId 上加入乘车流和票价流,同时知道每个事件只有一个。

    这个练习展示了键控状态在 Flink 中是如何工作的。键控状态实际上是一个分片键值存储。当我们有 ValueState 的项目时,例如 ValueState&lt;TaxiRide&gt; rideState,Flink 将在其状态后端为键的每个不同值(rideId)存储单独的记录。

    每次调用flatMap1flatMap2 时,上下文中都会隐含一个键(rideId),当我们调用rideState.update(ride)rideState.value() 时,我们不是在访问单个变量,而是在设置并使用 rideId 作为键在键值存储中获取条目。

    在本练习中,两个流都由rideId 键控,因此对于每个不同的rideId,可能有一个rideState 元素和一个fareState 元素。因此,提供的解决方案是缓冲大量的游乐设施和票价,但每个rideId 仅缓冲一个(这已经足够了,因为游乐设施和票价在此数据集中完美配对)。

    所以,你问:

    EnrichmentFunction 又是如何加入这两个流的。它如何知道要加入哪种票价?

    答案是

    加入票价相同的rideId

    您询问过的这个特定练习展示了如何实现简单的扩充连接,以了解键控状态和连接流的概念。但更复杂的连接当然可以使用 Flink。请参阅 joins using the DataStream APIjoins with Flink's Table APIjoins with Flink SQL 上的文档。

    【讨论】:

    • 非常感谢!我将这两个状态作为“单个变量”而不是作为键值存储。现在一切都说得通了。 P.S:感谢您在 HPI 的演讲。
    猜你喜欢
    • 1970-01-01
    • 2019-04-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-09
    相关资源
    最近更新 更多