【发布时间】:2018-07-19 15:03:10
【问题描述】:
我正在使用here in spark streaming documentation 提供的策略来提交卡夫卡本身。我的流程是这样的: 主题 A --> Spark Stream [foreachRdd 进程 -> 发送到主题 b] 提交到主题 A 的偏移量
JavaInputDStream<ConsumerRecord<String, Request>> kafkaStream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, Request>Subscribe(inputTopics, kafkaParams)
);
kafkaStream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
rdd.foreachPartition(
consumerRecords -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(String.format("$s %d %d $d", o.topic(), o.partition(), o.fromOffset(), o.untilOffset()));
consumerRecords.forEachRemaining(record -> doProcess(record));
});
((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
}
);
假设 RDD 从主题 A 获得 10 个事件,在处理每个事件时,我向主题 B 发送一个新事件。现在假设其中一个响应失败。现在我不想将那个特定的偏移量提交给主题 A。主题 A 和 B 具有相同数量的分区 N。所以每个 RDD 应该从同一个分区消费。继续处理的最佳策略是什么?如何重置流以尝试处理主题 A 中的这些事件,直到成功?我知道我是否不能在不提交的情况下继续处理该分区,因为这会自动移动偏移量并且不会再次处理失败的记录。
我不知道流/rdd 是否有可能继续尝试仅为该分区处理相同的消息,而其他分区/rdd 可以继续工作。如果我从那个特定的 RDD 中抛出一个异常,我的工作会发生什么。会失败吗?我需要手动重新启动它吗?对于普通消费者,您可以重试/恢复,但我不确定 Streaming 会发生什么。
【问题讨论】:
标签: apache-spark apache-kafka spark-streaming