【问题标题】:Correct way to make Left Join on two Streams in Apache Flink在 Apache Flink 中的两个流上进行左连接的正确方法
【发布时间】:2020-04-06 06:35:09
【问题描述】:

我正在使用 Apache Flink 开发欺诈检测系统,但我是初学者,并且被这个问题困扰:

我想从两个流中进行左连接,一个包含当前交易,另一个包含与银行的验证交易,在那里我可以找到是否存在诸如被盗卡等错误。所以我需要加入它们知道一张卡过去是否被拒绝过。

   DataStream<Card> currentDataStream =  getCardsStream(env, Parameters.CURRENT_SOCKET)
            .keyBy((card) -> card.getCardID);

    DataStream<Card> historicDataStream =  getCardsStream(env, Parameters.HISTORIC_SOCKET)
            .keyBy((card) -> card.getCardID()); 

我现在正在做的是一个 RichCoFlatMapFunction,它在每次historyDataStream 到达时更新一个名为 historicList 的列表状态,并返回一个包含当前卡的元组和一个包含所有连接事件的列表编号:

public class LeftJoin extends RichCoFlatMapFunction<Card, Card, Tuple2<Card, List<Card>> > {

    private ValueState<Card> currentValueState;
    private ListState<Card> historicListState;

    @Override
    public void open(Configuration parameters) throws Exception {
        currentValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("Current State", Card.class));
        historicListState = getRuntimeContext().getListState(new ListStateDescriptor<>("historic state", Card.class));
    }

    @Override
    public void flatMap1(Card currentCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
        Iterable<Card> historicCardList =  historicListState.get();

        //If there is a coincidence
        if (Iterables.size(historicCardList) > 0) {
            out.collect(new Tuple2<>(currentCard, Lists.newArrayList(historicCardList) ));
        } else {
            currentValueState.update(currentCard);
            //Returning null if there are no cards for the Id
            out.collect(new Tuple2<>(currentCard, null));
        }
    }

    @Override
    public void flatMap2(Card historicCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
        historicListState.add(historicCard); //Updates the historicListState
    }
}

问题是List&lt;Card&gt; 给我带来了很多麻烦,因为我想检查包含的卡片的规则,因为它总是会再次获得所有卡片,我需要一种方法来标记我已经使用的卡片根据我的规则进行处理,如下所示:

  //I don't like this list because it always gets me all the join coincidences
    for (Card card : historicList) {

        //Comparar cada regla del Broadcast state con el error que contiene el elemento card
        if (rule.getBankDecision().equals(card.getErrors())) {


            //Evaluate some rules
            for (Long stateEventTime : windowState.keys()) {
                if (isStateValueInWindow(stateEventTime, windowStartForEvent, System.currentTimeMillis())) {
                    aggregateValuesInState(stateEventTime, aggregator);
                }

            }
    }

有没有更好的方法将加入的卡片作为流获取?

【问题讨论】:

    标签: stream left-join apache-flink


    【解决方案1】:

    希望我理解正确,如果不是请纠正我。

    1. private ValueState&lt;Card&gt; currentValueState 是多余的(在这个例子中,你只更新它而不读取它的值)
    2. 如果我对您的理解正确,问题是您在整个historyListState 上发出了您的规则系统,而您已经检查了其中一些。 为什么不从historyListState 中删除已经超过规则的卡片?

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-10-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多