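[Posted]: 2020-06-13 02:57:54
[Question]:
I created a sample direct Kafka stream in Spark. The topic has 30 partitions in Kafka, but all of the consumers run on the same executor machine.
Kafka Manager screenshot.
As I understand the direct Kafka stream, the driver hands the offsets to the executors, and the executors poll Kafka with them.
Spark version: 2.4
Sample code: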
import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import java.util.Arrays;
import java.util.HashMap;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("StreamingTest");
        conf.set("spark.shuffle.service.enabled", "true");
        conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
        conf.set("spark.streaming.backpressure.enabled", "true");
        conf.set("spark.streaming.concurrentJobs", "1");
        conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC");
        conf.set("spark.streaming.backpressure.pid.minRate", "1500");

        // 5-second micro-batches
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream1 = createKafkaStream(ssc, "test-topic-1");

        // First output operation: simulate ~2 ms of work per record
        kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            System.out.println("Processing test-topic-1");
            try {
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

        // Second output operation: commit offsets back to Kafka
        kafkaStream1.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            // Ranges of the form [0, fromOffset): committing them commits the start offset of this batch
            final OffsetRange[] beginOffsets = Arrays.stream(offsetRanges)
                    .map(o -> OffsetRange.create(o.topicPartition(), 0, o.fromOffset()))
                    .toArray(OffsetRange[]::new);
            rdd.foreachPartition(partition -> {
                // Looks up the range for this task's partition; the value is not used further
                OffsetRange o = beginOffsets[TaskContext.get().partitionId()];
            });
            ((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(beginOffsets);
        });

        ssc.start();
        ssc.awaitTermination();
    }

    public static JavaInputDStream<ConsumerRecord<Object, Object>> createKafkaStream(JavaStreamingContext ssc, String topic) {
        HashMap<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-ids>");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic + "hrishi-testing-nfr-7");
        kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);
        kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 80000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        return KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(ImmutableList.of(topic), kafkaParams));
    }
}
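One thing that may be worth ruling out is how many executors are actually registered with the driver. As far as I understand it, LocationStrategies.PreferConsistent() gives each topic partition a preferred executor picked from the executors the driver currently knows about, so if only one executor is registered (or every executor sits on one host), all 30 partitions would be hinted to the same machine. The sketch below is a simplified, JDK-only model of that idea and not Spark's own code: the class PlacementSketch, the methods partitionHash and assign, the hash formula, and the host names are all hypothetical and only illustrate hash-modulo placement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Simplified model (an assumption, not Spark's implementation) of mapping
// topic partitions to executors by hashing the partition and taking a modulo.
class PlacementSketch {

    // Stand-in hash for a (topic, partition) pair; assumed for illustration only.
    static int partitionHash(String topic, int partition) {
        int result = 31 + partition;
        return 31 * result + Objects.hashCode(topic);
    }

    // Assigns partitions 0..numPartitions-1 to executors; returns executor -> partitions.
    static Map<String, List<Integer>> assign(String topic, int numPartitions, List<String> executors) {
        Map<String, List<Integer>> byExecutor = new TreeMap<>();
        for (String executor : executors) {
            byExecutor.put(executor, new ArrayList<>());
        }
        if (executors.isEmpty()) {
            return byExecutor; // nothing registered, so no placement hint
        }
        for (int p = 0; p < numPartitions; p++) {
            // floorMod keeps the index non-negative even for negative hashes
            int index = Math.floorMod(partitionHash(topic, p), executors.size());
            byExecutor.get(executors.get(index)).add(p);
        }
        return byExecutor;
    }

    public static void main(String[] args) {
        // One registered executor: every partition lands on it.
        System.out.println(assign("test-topic-1", 30, Arrays.asList("host-a:1")));
        // Three registered executors: partitions are spread across them.
        System.out.println(assign("test-topic-1", 30,
                Arrays.asList("host-a:1", "host-b:1", "host-c:1")));
    }
}
```

If this model matches what is happening, the Executors tab of the Spark UI should show a single executor (or a single host) while the job runs. Keep in mind that a preferred location is only a hint to the scheduler, so the real placement can differ from this model.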
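[Comments]:
-
The DStream API is deprecated. Have you tried Structured Streaming?
-
Yes, I know DStream is deprecated, but my use case is limited to mapPartitions, so that is not a problem, and I can't afford a migration right now. I doubt this is caused by DStream.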
-
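You can map over a DataFrame as well. In any case, please show the complete code; this property looks suspicious: spark.streaming.concurrentJobs
-
Added the complete sample code, @cricket_007.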
-
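It looks like your code doesn't really do anything except commit offsets. Have you tried using only a very basic set of properties? What are you trying to optimize?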
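A rough sketch of what "a very basic set of properties" could look like for a first test is shown below. It keeps only the connection, group, deserializer, and offset settings from the question and leaves out the tuning values (heartbeat interval, session timeout, max.poll.*, fetch size). The class MinimalKafkaParams and its build method are hypothetical helpers, the keys are the standard Kafka consumer property names written as plain strings, and the deserializers are given as class names instead of Class objects so the snippet needs nothing beyond the JDK.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds only the consumer settings a first test needs.
class MinimalKafkaParams {

    static Map<String, Object> build(String bootstrapServers, String groupId) {
        Map<String, Object> params = new HashMap<>();
        params.put("bootstrap.servers", bootstrapServers);
        params.put("group.id", groupId);
        // Deserializers given by class name; same classes as in the question.
        params.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        params.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        params.put("auto.offset.reset", "earliest");
        params.put("enable.auto.commit", false);
        return params;
    }

    public static void main(String[] args) {
        // "<broker-ids>" is the placeholder from the question, not a real address.
        Map<String, Object> params = build("<broker-ids>", "test-topic-1hrishi-testing-nfr-7");
        params.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting map could be passed to ConsumerStrategies.Subscribe in place of kafkaParams in createKafkaStream. If the partitions then spread across executors, the removed settings can be added back one at a time to find the one that changes the behavior.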