【问题标题】:spark structured streaming can not recive kafka messagespark结构化流无法接收kafka消息
【发布时间】:2018-08-17 21:58:15
【问题描述】:

我正在测试使用 kafka 的 spark 结构化流。我在 host28 上有一个 kafka-broker(0.10.1),默认分区编号:num.partitions=1

我的制作人:

bin/kafka-console-producer.sh --broker-list host28:6667 --topic test

当我使用时

bin/kafka-console-consumer.sh --zookeeper host26:2181,host27:2181,host28:2181 --topic test --from-beginning

bin/kafka-console-consumer.sh --bootstrap-server host8:6667 --topic test --from-beginning --partition 0

可以接收来自 kafka 的消息。

但是当使用时

bin/kafka-console-consumer.sh --bootstrap-server host28:6667 --topic test --from-beginning

或 spark 结构化流无法接收消息

public class Main {
    private static String APP_NAE = "test_streaming_from_kafka";
    private static String KAFKA_HOST = "host28:6667";
    private static String KAFKA_SUBSCRIBE = "test";
    public static void main(String[] args) throws Exception {

        SparkSession spark = SparkSession
                .builder()
                .appName(APP_NAE)
                .getOrCreate();

        DataStreamReader reader = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", KAFKA_HOST)
                .option("subscribe", KAFKA_SUBSCRIBE);

        StreamingQuery query = reader.load()
                .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "CAST(partition AS STRING)", "CAST(offset AS STRING)")
                .writeStream()
                .format("console")
                .start();

        query.awaitTermination();
    }
}

【问题讨论】:

    标签: apache-spark apache-kafka spark-structured-streaming


    【解决方案1】:

    已解决!

    我把spark log从INFO改成了DEBUG,然后我发现了这个:

    18/08/17 21:12:07 DEBUG AbstractCoordinator: 接收组 协调器响应 ClientResponse(receivedTimeMs=1534511527794, 断开=假,请求=客户端请求(期望响应=真, 回调=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@3d2afb1b, request=RequestSend(header={api_key=10,api_version=0,correlation_id=117,client_id=consumer-1}, 正文={group_id=spark-kafka-source-f7b2afd9-e1c6-4d16-b299-6d629599cdc8-42875004-driver-0}), createdTimeMs=1534511527794, sendTimeMs=1534511527794), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 18/08/17 21:12:07 DEBUG AbstractCoordinator:组协调器查找 团体用 spark-kafka-source-f7b2afd9-e1c6-4d16-b299-6d629599cdc8-42875004-driver-0 失败:组协调员不可用

    谷歌The group coordinator is not available找到this

    【讨论】:

    • 有人可以粘贴The group coordinator is not available 的分辨率吗?无法访问上面给出的链接
    猜你喜欢
    • 2020-10-27
    • 2019-01-29
    • 2018-08-20
    • 2017-04-04
    • 2020-05-27
    • 2020-01-31
    • 1970-01-01
    • 2017-08-23
    • 2021-02-21
    相关资源
    最近更新 更多