【发布时间】:2019-07-10 19:10:57
【问题描述】:
我是 docker 新手。我正在尝试使用 docker 运行火花流应用程序。
我有 kafka 和 spark 流应用程序分别在 2 个容器中运行。
我的 kafka 服务已启动并运行良好。我用 $KAFKA_HOME/bin/kafka-console-producer.sh 和 $KAFKA_HOME/bin/kafka-console-consumer.sh 进行了测试。我可以接收消息。
但是当我运行我的 spark 流应用程序时,它显示:
[Consumer clientId=consumer-1, groupId=consumer-spark] Connection to node -1 could not be established. Broker may not be available.
所以,我无法使用消息。
kafka : docker-compose.yml
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
build: .
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENERS: PLAINTEXT://:9092
depends_on:
- zookeeper
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Spark 流式处理代码:
val sparkConf = new SparkConf().setAppName("Twitter Ingest Data")
sparkConf.setIfMissing("spark.master", "local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val kafkaTopics = "sentiment"
val kafkaBroker = "kafka:9092"
val topics : Set[String] = kafkaTopics.split(",").map(_.trim).toSet
val kafkaParams = Map[String,Object](
"bootstrap.servers" -> kafkaBroker,
"group.id" -> "consumer-spark",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer]
)
logger.info("Connecting to broker...")
logger.info(s"kafkaParams: $kafkaParams")
val tweetStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams))
我不确定我是否遗漏了什么。
任何帮助将不胜感激!
【问题讨论】:
标签: docker apache-kafka spark-streaming