[Question title]: Kafka Streams KTable-KTable keyed join producing duplicate events (Detected out-of-order KTable update)
[Posted]: 2020-06-22 14:51:22
[Question]:

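I have two streams coming from two topics, orders and fsource. The orders topic is mostly static and rarely updated, while fsource is updated at a rate of about 1000 events per second. I use a KTable-KTable join here because both topics have the same key.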

POrder:
private String orderId; // {1,2,3,4,5,6}
private Double price;
private Long oTNum;//{1,2,3,4,5,6}

FSource:
private String orderId; // {1,2,3,4,5,6}
private Double adPrice;
private Long fTNum; // {1,2,3,4,5.........} Sequence number for each event

EnOrder:
private String orderId;
private Double price;
private Double adPrice;
private Long oTNum;
private Long fTNum;
private Long eTNum;//{1,2,3,4,5.........}

        public class EnStreamApp implements PTMS{
            
            private static final Logger logger = Logger.getLogger(EnStreamApp.class);
            
            public static void main(String[] args) {
                
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app1-application");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, URL_KAFKA_BROKERS);
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JSONSerdeComp<>().getClass());
                props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //       props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
                props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10); //commit as fast as possible
                props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);   
                props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000);
                
                 
                 
                 StreamsBuilder builder = new StreamsBuilder();
                 KTable<String, POrder> pOrderTable = builder.<String, POrder>table("orders"); // Static KTable
                 KTable<String, FSource> fTable = builder.<String, FSource>table("fsource"); // About 1000 events per second
                 
                 KTable<String, EnOrder> enrichedTable = pOrderTable.join(fTable, new ValueJoiner<POrder, FSource, EnOrder>() {

                        @Override
                        public EnOrder apply(POrder order, FSource fSource) {
                            EnOrder enOrder = EnOrder.builder()
                                            .orderId(order.getOrderId())
                                            .price(order.getPrice())
                                            .oTNum(order.getOTNum())
                                            .adPrice(fSource!=null ? fSource.getAdPrice():null)
                                            .fTNum(fSource!=null ? fSource.getFTNum():0)
                                            .eTNum(AtomicSequenceGenerator.INSTANCE.getNext())  // This should be in-sync with events from fSource fTNum                                    
                                            .build();
                            
                            logger.info(String.format("Enriched:{OrderId=%s, oTNum=%s, fTNum=%s, eTNum=%s}", enOrder.getOrderId(), enOrder.getOTNum(), enOrder.getFTNum(), enOrder.getETNum()));
                            return enOrder;
                        }
                    }); 
                
                 
                 enrichedTable.toStream().to("enriched", Produced.with(Serdes.String(), new JSONSerdeComp<>()));

                 KafkaStreams streams = new KafkaStreams(builder.build(), props);
                 streams.start();
                 
                 Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
            }

        }


    2020-06-16 14:00:56,577 INFO  com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-251f1c80-dfcf-433a-a361-5f0fc3cf887e-StreamThread-1]: Enriched:{OrderId=3, oTNum=3, fTNum=232, eTNum=232}
    2020-06-16 14:00:56,578 INFO  com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-251f1c80-dfcf-433a-a361-5f0fc3cf887e-StreamThread-1]: Enriched:{OrderId=4, oTNum=4, fTNum=233, eTNum=233}
    2020-06-16 14:00:56,578 INFO  com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-251f1c80-dfcf-433a-a361-5f0fc3cf887e-StreamThread-1]: Enriched:{OrderId=5, oTNum=5, fTNum=234, eTNum=234}

    2020-06-16 14:18:54,979 WARN  org.apache.kafka.streams.kstream.internals.KTableSource [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Detected out-of-order KTable update for fsource-STATE-STORE-0000000003 at offset 9560, partition 0.
    2020-06-16 14:26:50,799 INFO  com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=5, oTNum=5, fTNum=9564, eTNum=15742}
    2020-06-16 14:26:50,799 WARN  org.apache.kafka.streams.kstream.internals.KTableSource [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Detected out-of-order KTable update for fsource-STATE-STORE-0000000003 at offset 9562, partition 0.
    2020-06-16 14:26:50,799 INFO  com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=6, oTNum=6, fTNum=9565, eTNum=15743}
    2020-06-16 14:26:50,799 INFO  com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=1, oTNum=1, fTNum=9566, eTNum=15744}
    2020-06-16 14:26:50,799 INFO  com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=2, oTNum=2, fTNum=9567, eTNum=15745}
    2020-06-16 14:26:50,799 INFO  com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=3, oTNum=3, fTNum=9568, eTNum=15746}
    2020-06-16 14:26:50,799 INFO  com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=4, oTNum=4, fTNum=9569, eTNum=15747}

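The join looks fine for a while, but after some time I see the join function processing duplicate events. Ideally, one event from fsource should produce one event on the joined stream, so why does the join process more events than it receives?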

This looks correct:

    2020-06-16 14:00:56,578 INFO  com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-251f1c80-dfcf-433a-a361-5f0fc3cf887e-StreamThread-1]: Enriched:{OrderId=5, oTNum=5, fTNum=234, eTNum=234}

This looks wrong:

    2020-06-16 14:18:54,979 WARN  org.apache.kafka.streams.kstream.internals.KTableSource [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Detected out-of-order KTable update for fsource-STATE-STORE-0000000003 at offset 9560, partition 0.
    2020-06-16 14:26:50,799 INFO  com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=5, oTNum=5, fTNum=9564, eTNum=15742}
    2020-06-16 14:26:50,799 WARN  org.apache.kafka.streams.kstream.internals.KTableSource [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Detected out-of-order KTable update for fsource-STATE-STORE-0000000003 at offset 9562, partition 0.
    2020-06-16 14:26:50,799 INFO  com.amicngh.mr.streams.AaggregatorStream [my-app1-aggregation-application-82e14b19-9ae3-4ce7-9c1b-a041ccbe70ed-StreamThread-1]: Enriched:{OrderId=6, oTNum=6, fTNum=9565, eTNum=15743}

Any idea why the join isn't working as expected? Could these warnings be causing the problem? How can I fix it?

 <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.4.1</version>
</dependency>

Update:

Any idea why the join processes duplicate events? See below: I can see multiple eTNum values for the same fTNum.

2020-06-17 17:59:05,033 INFO  com.cs.pt.mr.streams.AaggregatorStream [my-app1-aggregation-application-47c109a1-d4ad-4d11-a833-9e24d8b99f6b-StreamThread-1]: Enriched:{OrderId=5, oTNum=5, fTNum=6989, eTNum=120749}
2020-06-17 17:59:19,194 INFO  com.cs.pt.mr.streams.AaggregatorStream [my-app1-aggregation-application-47c109a1-d4ad-4d11-a833-9e24d8b99f6b-StreamThread-1]: Enriched:{OrderId=5, oTNum=5, fTNum=6989, eTNum=139709}
2020-06-17 17:59:33,438 INFO  com.cs.pt.mr.streams.AaggregatorStream [my-app1-aggregation-application-47c109a1-d4ad-4d11-a833-9e24d8b99f6b-StreamThread-1]: Enriched:{OrderId=5, oTNum=5, fTNum=6989, eTNum=158669}

[Discussion]:

    Tags: java apache-kafka apache-kafka-streams


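    [Answer 1]:

    The warnings are really just warnings. They tell you that the input topic partition contains out-of-order data, i.e., the timestamps of records within the same partition go backwards. This can be a problem for the correctness of the join, because the join requires ordered input data. All data is still processed, but the result may be incorrect with respect to the time semantics of the join.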
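What "timestamps going backwards" means can be illustrated with a small, self-contained model. This is only a sketch and not the Kafka Streams implementation: the `Rec` class, the `outOfOrderOffsets` helper, and the sample timestamps are hypothetical, and the model assumes that an update is compared against the timestamp last stored for the same key and is applied either way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OutOfOrderCheck {

    /** Hypothetical record shape: a key plus an event timestamp in milliseconds. */
    static final class Rec {
        final String key;
        final long timestampMs;

        Rec(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Returns the offsets (list indices) of records whose timestamp is older
     * than the timestamp last stored for the same key.
     */
    static List<Integer> outOfOrderOffsets(List<Rec> partition) {
        Map<String, Long> lastTimestampByKey = new HashMap<>();
        List<Integer> flagged = new ArrayList<>();
        for (int offset = 0; offset < partition.size(); offset++) {
            Rec rec = partition.get(offset);
            Long previous = lastTimestampByKey.get(rec.key);
            if (previous != null && rec.timestampMs < previous) {
                flagged.add(offset); // this is where a warning would be logged
            }
            // Assumption of this model: the update is applied even when flagged.
            lastTimestampByKey.put(rec.key, rec.timestampMs);
        }
        return flagged;
    }

    public static void main(String[] args) {
        List<Rec> partition = Arrays.asList(
                new Rec("5", 1000L),
                new Rec("6", 1001L),
                new Rec("5", 990L),   // older than the previous update for key "5"
                new Rec("6", 1002L));
        System.out.println(outOfOrderOffsets(partition)); // prints [2]
    }
}
```

Running a check like this over a dump of the fsource topic (key and record timestamp per offset) would show whether the producer is really writing timestamps in order for each key.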

    Not sure why the output you posted is not what you expect:

    Enriched:{OrderId=5, oTNum=5, fTNum=9564, eTNum=15742}
    Enriched:{OrderId=6, oTNum=6, fTNum=9565, eTNum=15743}
    

    These look like two different keys?

    [Comments]:

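    • Here fTNum represents an event ID (a sequence number), and it should match eTNum, because one event on fsource should result in one enriched event. That means the values of fTNum and eTNum should always match. Don't you think so? I should also mention that I have a single partition and a single producer, to keep the stream ordered.
    • I can also see the join processing the same fTNum multiple times. Ideally, an event should be processed exactly once.
    • Not sure I fully understand the scenario. It is unclear from your description what the key of the input is. The join happens on the key, and I assume the key of both inputs is orderId? If that is correct, I don't see why fTNum should equal eTNum (except if orderId has the same 1:1 mapping to fTNum and eTNum in both streams respectively). Also, if fsource or order is updated for the same key (i.e., orderId), the result is updated accordingly. I don't think the logs really help, but a snippet of the input data might.
    • Yes, both input streams are keyed by orderId. "except if orderId has the same 1:1 mapping to fTNum and eTNum in both streams respectively" -> Don't you think that is the case? I mean, it is a KTable-KTable join, which means an event on fsource for orderId xyz is joined with the order for key xyz and results in one event (eTNum) on the merged stream?
    • If that is the case, I tend to agree. If each topic contains exactly one record with orderId = xyz, there should be only one join result. Hence my conclusion is that your input topics seem to contain multiple records with orderId = xyz. The fTNum in the join results you show is the same, so I assume the order topic has multiple records with the same orderId.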
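The point made in the comments above, that the result is updated whenever either side changes for a key, can be sketched with a simplified in-memory model of an inner table-table join. This is not the Kafka Streams API: `TableJoinModel`, `updateOrder`, `updateFSource`, and the counter standing in for eTNum are hypothetical names, and the model ignores tombstones, caching, and any reprocessing after a restart.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TableJoinModel {

    private final Map<String, String> orders = new HashMap<>(); // orderId -> order payload
    private final Map<String, Long> fsource = new HashMap<>();  // orderId -> latest fTNum
    private long joinerCalls = 0;                               // plays the role of eTNum
    final List<String> results = new ArrayList<>();

    /** An update on the orders side re-joins with the current fsource row, if one exists. */
    void updateOrder(String orderId, String order) {
        orders.put(orderId, order);
        Long fTNum = fsource.get(orderId);
        if (fTNum != null) {
            emit(orderId, fTNum);
        }
    }

    /** An update on the fsource side joins with the current order row, if one exists. */
    void updateFSource(String orderId, long fTNum) {
        fsource.put(orderId, fTNum);
        if (orders.containsKey(orderId)) {
            emit(orderId, fTNum);
        }
    }

    private void emit(String orderId, long fTNum) {
        joinerCalls++; // counts joiner invocations, not fsource events
        results.add("orderId=" + orderId + ", fTNum=" + fTNum + ", eTNum=" + joinerCalls);
    }

    long joinerCalls() {
        return joinerCalls;
    }

    public static void main(String[] args) {
        TableJoinModel join = new TableJoinModel();
        join.updateOrder("5", "order-v1"); // no fsource row yet, so no result
        join.updateFSource("5", 1L);       // result 1
        join.updateFSource("5", 2L);       // result 2
        join.updateOrder("5", "order-v2"); // result 3, re-joined with fTNum=2
        join.results.forEach(System.out::println);
    }
}
```

In this model the last two results carry the same fTNum but different eTNum values, because the second order record for the same orderId triggers another join result. A counter incremented inside the joiner therefore tracks joiner invocations and only stays equal to fTNum if the orders side never receives another record for that key.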