【发布时间】:2018-08-17 21:58:15
【问题描述】:
我正在测试使用 kafka 的 spark 结构化流。我在 host28 上有一个 kafka-broker(0.10.1),默认分区编号:num.partitions=1
我的制作人:
bin/kafka-console-producer.sh --broker-list host28:6667 --topic test
当我使用时
bin/kafka-console-consumer.sh --zookeeper host26:2181,host27:2181,host28:2181 --topic test --from-beginning
或
bin/kafka-console-consumer.sh --bootstrap-server host8:6667 --topic test --from-beginning --partition 0
可以接收来自 kafka 的消息。
但是当使用时
bin/kafka-console-consumer.sh --bootstrap-server host28:6667 --topic test --from-beginning
或 spark 结构化流无法接收消息
public class Main {
private static String APP_NAE = "test_streaming_from_kafka";
private static String KAFKA_HOST = "host28:6667";
private static String KAFKA_SUBSCRIBE = "test";
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession
.builder()
.appName(APP_NAE)
.getOrCreate();
DataStreamReader reader = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", KAFKA_HOST)
.option("subscribe", KAFKA_SUBSCRIBE);
StreamingQuery query = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "CAST(partition AS STRING)", "CAST(offset AS STRING)")
.writeStream()
.format("console")
.start();
query.awaitTermination();
}
}
【问题讨论】:
标签: apache-spark apache-kafka spark-structured-streaming