【问题标题】:Spark Structured Streaming cannot read from kafka inside dockerSpark Structured Streaming 无法从 docker 内的 kafka 读取
【发布时间】:2021-12-05 22:38:00
【问题描述】:

提交 spark 结构化流作业以从 kafka 读取后无法修复问题。 spark作业代码示例:

object KafkaStructuredStreaming {


  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName(getClass.getName)
      .master("spark://spark-master:7077")
      .getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("startingOffsets", "earliest")
      .option("subscribe", "tweet-upload-6")
      .option("enable.auto.commit", false)
      .option("group.id", "Structured-Streaming-Examples")
      .option("failOnDataLoss", false)
      .load()

    df.printSchema()

    val consoleOutput = df.writeStream
      .outputMode("append")
      .format("console")
      .start()
    consoleOutput.awaitTermination()

  }
}

注意:Kafka 和 spark 节点位于同一个 docker 网络中,之前使用 spark-streaming 一切正常,但我切换到结构化流,因为我遇到了单个输入流 -> 多个输出流的问题。

我现在收到错误:

07721793-driver-0] Error connecting to node kafka:9092 (id: 1001 rack: null)
submit-spark-job  | java.net.UnknownHostException: kafka
submit-spark-job  |     at java.net.InetAddress.getAllByName0(InetAddress.java:1282)
submit-spark-job  |     at java.net.InetAddress.getAllByName(InetAddress.java:1194)
submit-spark-job  |     at java.net.InetAddress.getAllByName(InetAddress.java:1128)
submit-spark-job  |     at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
submit-spark-job  |     at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:111)
submit-spark-job  |     at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:512)
submit-spark-job  |     at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466)
submit-spark-job  |     at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:172)
submit-spark-job  |     at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:985)
submit-spark-job  |     at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311)
submit-spark-job  |     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:498)
submit-spark-job  |     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
submit-spark-job  |     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306)
submit-spark-job  |     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1367)
submit-spark-job  | 21/10/18 22:44:41 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-4d4a80ae-01a7-4679-8393-42aaa0e35e6a-307721793-driver-0-1, groupId=spark-kafka-source-4d4a80ae-01a7-4679-8393-42aaa0e35e6a-307721793-driver-0] Error connecting to node kafka:9092 (id: 1001 rack: null)
submit-spark-job  | java.net.UnknownHostException: kafka

还有

85949862d6d3-1705030665-driver-0] Group coordinator kafka:9092 (id: 2147482646 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
submit-spark-job  | 21/10/18 23:03:46 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job  | 21/10/18 23:03:47 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job  | 21/10/18 23:03:47 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job  | 21/10/18 23:03:48 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job  | 21/10/18 23:03:48 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (172.18.0.7 executor 0): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
submit-spark-job  |     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
submit-spark-job  |     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
submit-spark-job  |     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
submit-spark-job  |     at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.createConsumer(KafkaDataConsumer.scala:124)
submit-spark-job  |     at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.<init>(KafkaDataConsumer.scala:61)
submit-spark-job  |     at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:206)
submit-spark-job  |     at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:201)
submit-spark-job  |     at org.apache.commons.pool2.BaseKeyedPooledObjectFactory.makeObject(BaseKeyedPooledObjectFactor

Docker 编写文件:

version: '2'

services:
  zookeeper:
    image: zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: linuxkitpoc/kafka:latest
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  tweet-producer:
    image: mkovacevic/tweet-producer-app:latest
    ports:
      - "8080:8080"
    tty: true
    depends_on:
      - kafka

(在默认网络中创建:twitter-streaming_default) 和

version: '3'
services:
  spark-master:
    image: bde2020/spark-master:3.1.1-hadoop3.2
    container_name: spark-master
    hostname:  spark-master
    ports:
      - "7077:7077"
    environment:
      - INIT_DAEMON_STEP=setup_spark
  spark-worker-1:
    image: bde2020/spark-worker:3.1.1-hadoop3.2
    container_name: spark-worker-1
    depends_on:
      - spark-master
    ports:
      - "8081:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  spark-worker-2:
    image: bde2020/spark-worker:3.1.1-hadoop3.2
    container_name: spark-worker-2
    depends_on:
      - spark-master
    ports:
      - "8082:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  submit-spark:
    image: mkovacevic/spark-streaming-app:latest
    container_name: submit-spark-job
    depends_on:
      - spark-master
      - spark-worker-1
      - spark-worker-2

networks:
  default:
    external: true
    name: twitter-streaming_default

有什么建议吗??

【问题讨论】:

  • 首先向我们展示您的/etc/hosts 或Windows 上的任何等效项。尝试 ping kafka:9092。似乎主机名kafka 没有正确解析。另外,发布您的 docker 文件,以便我们查看网络设置。没有那个它必须说什么。

标签: scala apache-spark apache-kafka streaming spark-structured-streaming


【解决方案1】:

每个撰写文件都会创建自己的、隔离的默认桥接网络。

如果您想使用两个文件,那么您必须明确地将网络添加到每个服务。否则,您需要将所有服务放在一个文件中

顺便说一句,linuxkitpoc/kafka 图像已经多年没有更新了。我建议你为 Kafka 使用其他东西。个人推荐来自Bitnami

【讨论】:

  • 看起来 Bitnami 图像有帮助! @OneCricketeer 谢谢!
  • 该图像不是问题(除非代理实际上没有正确启动)。但是,如果这解决了您的问题,请随时使用帖子旁边的复选标记接受答案
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-12-30
  • 2019-10-18
  • 2020-07-12
  • 2021-05-08
  • 1970-01-01
  • 2021-05-22
相关资源
最近更新 更多