【发布时间】:2015-12-20 05:51:02
【问题描述】:
我有一个工作,每 10 秒从 Kafka 接收数据,然后我将数据格式化并插入到 cassandra,但是我的工作变得越来越慢,这非常令人困惑。
根据我的统计,每 10 秒不到 100 条消息,第一次处理最多只需要 1 秒,但几天后处理速度变慢,处理 10 秒需要 14 秒数据。
我很困惑,是否有某些因素会使工作变慢。
而且我注意到处理python -m pyspark.daemon也消耗越来越多的内存,有没有一些方法可以降低内存消耗。
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
24527 yao.yu 20 0 10.334g 9.823g 3580 R 96.8 66.9 3424:56 python
代码如下:
if __name__ == "__main__":
conf = SparkConf().setAppName("Kafka_To_Cassandra").set("spark.streaming.kafka.maxRatePerPartition", "1000")
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 10)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers, "auto.offset.reset": "smallest"})
lines = kvs.map(lambda x: x[1]) \
.filter(lambda s: 'identifier' in s) \
.filter(lambda s: 'app_name' in s) \
.filter(lambda s: 'app_version' in s)
map_lines = lines.map(mapper).filter(lambda s: 'JsonLoadException' not in s)
#map_lines.pprint()
map_lines.foreachRDD(lambda rdd: rdd.foreachPartition(save_to_cassandra))
ssc.start()
ssc.awaitTermination()
【问题讨论】:
-
看不到你的代码,很难说进程为什么会这样积累内存。
-
听起来像内存泄漏:如果您或其他应用程序正在使用内存但没有释放它,那么一段时间后,这个“泄漏”的内存将显着大于您的物理 RAM,这将迫使机器利用磁盘分页来补充 RAM,这对性能有非常负面的影响。重新启动或只是重新启动进程是一种快速修复,但不是解决方案。
-
@KlausD。嗨克劳斯,代码在上面,我认为原因可能是:
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers, "auto.offset.reset": "smallest"})
标签: python cassandra apache-spark spark-streaming pyspark