【问题标题】:spark streaming with results send to another topic using comitAsync使用 comitAsync 将结果发送到另一个主题的火花流
【发布时间】:2018-07-19 15:03:10
【问题描述】:

我正在使用here in spark streaming documentation 提供的策略来提交卡夫卡本身。我的流程是这样的: 主题 A --> Spark Stream [foreachRdd 进程 -> 发送到主题 b] 提交到主题 A 的偏移量

    JavaInputDStream<ConsumerRecord<String, Request>> kafkaStream = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, Request>Subscribe(inputTopics, kafkaParams)
    );

    kafkaStream.foreachRDD(rdd -> {
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
                rdd.foreachPartition(
                        consumerRecords -> {
                            OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
                            System.out.println(String.format("$s %d %d $d", o.topic(), o.partition(), o.fromOffset(), o.untilOffset()));
                            consumerRecords.forEachRemaining(record -> doProcess(record));
                        });

                ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
            }
    );

假设 RDD 从主题 A 获得 10 个事件,在处理每个事件时,我向主题 B 发送一个新事件。现在假设其中一个响应失败。现在我不想将那个特定的偏移量提交给主题 A。主题 A 和 B 具有相同数量的分区 N。所以每个 RDD 应该从同一个分区消费。继续处理的最佳策略是什么?如何重置流以尝试处理主题 A 中的这些事件,直到成功?我知道我是否不能在不提交的情况下继续处理该分区,因为这会自动移动偏移量并且不会再次处理失败的记录。

我不知道流/rdd 是否有可能继续尝试仅为该分区处理相同的消息,而其他分区/rdd 可以继续工作。如果我从那个特定的 RDD 中抛出一个异常,我的工作会发生什么。会失败吗?我需要手动重新启动它吗?对于普通消费者,您可以重试/恢复,但我不确定 Streaming 会发生什么。

【问题讨论】:

    标签: apache-spark apache-kafka spark-streaming


    【解决方案1】:

    这是我想出的,它接受输入数据,然后使用输出主题发送请求。生产者必须在 foreach 循环中创建,否则 spark 将尝试序列化并将其发送给所有工作人员。请注意,响应是异步发送的。这意味着我在这个系统中至少使用了一种语义。

    kafkaStream.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            rdd.foreachPartition(
                    partition -> {
                        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
                        System.out.println(String.format("%s %d %d %d", o.topic(), o.partition(), o.fromOffset(), o.untilOffset()));
    
                        // Print statements in this section are shown in the executor's stdout logs
                        KafkaProducer<String, MLMIOutput> producer = new KafkaProducer(producerConfig(o.partition()));
                        partition.forEachRemaining(record -> {
    
                            System.out.println("request: "+record.value());
    
                            Response data = new  Response …
                            // As as debugging technique, users can write to DBFS to verify that records are being written out
                            // dbutils.fs.put("/tmp/test_kafka_output",data,true)
                            ProducerRecord<String, Response> message = new ProducerRecord(outputTopic, null, data);
                            Future<RecordMetadata> result = producer.send(message);
                            try {
                                RecordMetadata metadata = result.get();
                                System.out.println(String.format("offset='$d' partition='%d' topic='%s'timestamp='$d",
                                metadata.offset(),metadata.partition(),metadata.topic(),metadata.timestamp()));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } catch (ExecutionException e) {
                                e.printStackTrace();
                            }
                        });
                        producer.close();
                    });
    
            ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
        }
    

    );

    【讨论】:

      猜你喜欢
      • 2016-10-06
      • 2016-12-16
      • 2019-10-11
      • 2020-04-11
      • 2015-10-13
      • 2020-05-09
      • 1970-01-01
      • 1970-01-01
      • 2023-01-30
      相关资源
      最近更新 更多