【Question title】: Kafka consumer doesn't receive any messages from its producer
【Posted】: 2016-09-13 03:55:25
【Question description】:

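Below is the Python code I wrote for a Kafka producer. I'm not sure whether the messages are actually being published to the Kafka broker, because the consumer side doesn't receive any messages. When I test with the console producer command instead, my consumer Python program works fine.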

from __future__ import print_function

import sys
from pyspark import SparkContext
from kafka import KafkaClient, SimpleProducer


def sendkafka(messages):
    ## Connect to the broker (host:port)
    kafka = KafkaClient("localhost:9092")
    ## Asynchronous producer: sends in batches of 5 messages or every 10 seconds
    producer = SimpleProducer(kafka, async=True, batch_send_every_n=5,
                              batch_send_every_t=10)
    send_counts = 0
    for message in messages:
        try:
            print(message)
            ## Set topic name and push messages to the Kafka broker
            yield producer.send_messages('test', message.encode('utf-8'))
        except Exception as e:
            print("Error: %s" % str(e))
        else:
            send_counts += 1
    print("The count of prediction results which were sent IN THIS PARTITION "
          "is %d.\n" % send_counts)


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: spark-submit producer1.py <input file>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="PythonRegression")

    ## Connect and read the file.
    rawData = sc.textFile(sys.argv[1])

    ## Find and skip the first (header) row
    dataHeader = rawData.first()
    data = rawData.filter(lambda x: x != dataHeader)

    ## mapPartitions is lazy; collect() is the action that actually runs sendkafka
    sentRDD = data.mapPartitions(sendkafka)
    sentRDD.collect()

    ## Stop the SparkContext
    sc.stop()
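In the producer above, `SimpleProducer` is created with `async=True` and batching, so messages are buffered and sent by a background thread. If the Spark task finishes before a batch is flushed, some messages may never reach the broker. That is not confirmed as the cause here, but it is worth ruling out. Below is a minimal sketch of a partition-level sender that flushes explicitly, assuming kafka-python's newer `KafkaProducer` (which has `send(topic, value)` and `flush()`) in place of `SimpleProducer`; `send_partition` and `make_producer` are hypothetical names.

```python
from typing import Any, Callable, Iterable, Iterator


def send_partition(lines: Iterable[str],
                   make_producer: Callable[[], Any],
                   topic: str = "test") -> Iterator[int]:
    """Send every line of one Spark partition to Kafka, then flush.

    make_producer is a hypothetical factory returning an object with
    send(topic, value) and flush(), e.g. kafka-python's KafkaProducer.
    Yields a single per-partition count so it works with RDD.mapPartitions.
    """
    # Build the producer inside the partition: it holds network connections
    # and threads, so it cannot be shipped from the driver to executors.
    producer = make_producer()
    count = 0
    for line in lines:
        producer.send(topic, line.encode("utf-8"))
        count += 1
    # Block until every buffered record has been transmitted, so the task
    # cannot finish while messages are still waiting in the send buffer.
    producer.flush()
    yield count


# Usage in the Spark driver (requires kafka-python and a reachable broker):
#   from kafka import KafkaProducer
#   factory = lambda: KafkaProducer(bootstrap_servers="localhost:9092")
#   print(sum(data.mapPartitions(lambda it: send_partition(it, factory)).collect()))
```

Because each partition returns its own count, summing the collected values gives the total number of records handed to the producer, which can then be compared with what the consumer receives.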

Here is my consumer Python code:

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if len(sys.argv) < 3:
    print("Program that pulls messages from Kafka brokers.")
    print("Usage: consume.py <zk> <topic>", file=sys.stderr)

else:
    ## Loads settings from system properties, for launching via spark-submit.
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")

    ## Create a StreamingContext with a 10-second batch interval from the existing SparkContext.
    ssc = StreamingContext(sc, 10)

    ## Take the two arguments after the Python script name
    zkQuorum, topic = sys.argv[1:3]

    ## Create an input stream that pulls messages from Kafka brokers
    ## ({topic: 1} = consume the topic with one receiver thread).
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer",
                                  {topic: 1})

    ## Each record is a (key, value) pair; keep only the message value
    lines = kvs.map(lambda x: x[1])

    ## Print the messages pulled from Kafka brokers
    lines.pprint()

    ## Save the pulled messages as files
    ## lines.saveAsTextFiles("OutputA")

    ## Start receiving data and processing it
    ssc.start()

    ## Block the current process until the context terminates
    ssc.awaitTermination()

【Question discussion】:

    Tags: python apache-spark apache-kafka spark-streaming kafka-python


    【Answer 1】:

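    I usually debug this kind of problem by running kafka-console-consumer (part of Apache Kafka) against the topic you are trying to produce to. If the console consumer receives the messages, you know they reached Kafka.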

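    If you run the producer first, let it finish, and only then start the consumer, the problem may be that the consumer starts at the end of the log and is waiting for new messages. Make sure you start the consumer first, or configure it to start from the beginning automatically (sorry, I'm not sure how to do that with your Python client).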
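For the receiver-based `KafkaUtils.createStream` used in the question's consumer, one way to start from the beginning is to pass the Kafka 0.8 consumer property `auto.offset.reset=smallest` through the `kafkaParams` argument. A hedged sketch, assuming a Spark 1.x/2.x PySpark that still ships `pyspark.streaming.kafka` (it was removed in Spark 3.0); the helper names and the default group id are hypothetical. The property only applies when the consumer group has no committed offsets, which is why a fresh group id is used instead of the question's `"spark-streaming-consumer"`.

```python
def kafka_params_from_beginning() -> dict:
    """Extra properties for the Kafka 0.8 high-level consumer.

    'smallest' makes a group with no committed offsets begin at the earliest
    retained message instead of the end of the log ('largest', the default).
    """
    return {"auto.offset.reset": "smallest"}


def create_stream_from_beginning(ssc, zk_quorum: str, topic: str,
                                 group_id: str = "spark-streaming-consumer-debug"):
    """Same call as in the question, but reading the topic from the beginning.

    group_id is a hypothetical fresh group name: a group that already has
    committed offsets resumes from them and ignores auto.offset.reset.
    """
    # Imported lazily: pyspark.streaming.kafka only exists in Spark < 3.0.
    from pyspark.streaming.kafka import KafkaUtils
    return KafkaUtils.createStream(ssc, zk_quorum, group_id, {topic: 1},
                                   kafkaParams=kafka_params_from_beginning())
```

In the question's consumer, this would replace the `KafkaUtils.createStream(...)` call with `kvs = create_stream_from_beginning(ssc, zkQuorum, topic)`; PySpark adds `zookeeper.connect` and `group.id` to `kafkaParams` itself.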

    【Discussion】:

      【Answer 2】:

      You can check whether the number of messages in the topic increases as the producer sends requests:

      ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
      --broker-list <Kafka_broker_hostname>:<broker_port> --topic Que1 \
      --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'

      If the number of messages increases, the producer is working correctly.
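The awk pipeline above splits each line of GetOffsetShell output on `:` and sums the third field. With `--time -1`, that field is the latest (end) offset of each partition, so the sum grows as messages are appended to the topic. A small Python equivalent, assuming the same `topic:partition:offset` line format, makes it easy to compare snapshots taken before and after running the producer; the function names are hypothetical.

```python
def total_latest_offset(shell_output: str) -> int:
    """Sum the offset column of GetOffsetShell output.

    Assumes one 'topic:partition:offset' line per partition, as printed with
    --time -1 --offsets 1. Blank lines are ignored.
    """
    total = 0
    for line in shell_output.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split off only the last two fields: partition and offset.
        _, _, offset = line.rsplit(":", 2)
        total += int(offset)
    return total


def producer_made_progress(before: str, after: str) -> bool:
    """True if the topic's total end offset grew between two snapshots."""
    return total_latest_offset(after) > total_latest_offset(before)
```

The snapshots could be captured, for example, with `subprocess.run(cmd, capture_output=True, text=True).stdout`, where `cmd` is the `kafka-run-class.sh` part of the command above (without the awk pipe) split into a list.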

      【Discussion】:

        【Answer 3】:

        Well, I think something was wrong with my local ZooKeeper or Kafka, because I tested it on another server and it worked fine. Thanks to everyone who replied, though ;)

        【Discussion】:
