【发布时间】:2018-02-22 04:46:39
【问题描述】:
我有一个 pyspark 程序,它可以接受配置中提到的多个 Kafka 流,如下所示:
[stream1]
server=10.0.0.1:9090
topic=log_topic
[stream2]
server=10.0.0.2:9090
topic=file_topic
所以我的代码使用上述配置来加载多个流,如下所示:
from configobj import ConfigObj
config = ConfigObj("my.conf")
for i, j in conf.iteritems():
stream = KafkaUtils.createStream(ssc, j['server'], "consumer_%s" % (i), {j['topic']: 1}).cache()
stream.pprint()
现在说stream1是否有以下传入内容:
aaaaa
aaaaa
aaaaa
...
而stream2的内容如下:
bbbbb
bbbbb
bbbbb
...
使用 pprint 函数,我期待看到以下输出:
-----------------------------
2017-09-13 16:54:32
-----------------------------
aaaaa
aaaaa
aaaaa
...
-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...
但我看到以下输出:
-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...
-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...
我知道在 for 循环的第二次迭代之后读取变量 stream 之后似乎存在延迟加载或读取变量的东西。谁能告诉我如何实现这一点,以便我可以在 for 循环中处理 2 个单独的流。
谢谢!
【问题讨论】:
标签: apache-spark pyspark apache-kafka spark-streaming