【问题标题】:Pyspark Multiple Kafka Stream overwriting variablesPyspark 多个 Kafka 流覆盖变量
【发布时间】:2018-02-22 04:46:39
【问题描述】:

我有一个 pyspark 程序,它可以接受配置中提到的多个 Kafka 流,如下所示:

[stream1]
server=10.0.0.1:9090
topic=log_topic

[stream2]
server=10.0.0.2:9090
topic=file_topic

所以我的代码使用上述配置来加载多个流,如下所示:

from configobj import ConfigObj
config = ConfigObj("my.conf")
for i, j in conf.iteritems():
    stream = KafkaUtils.createStream(ssc, j['server'], "consumer_%s" % (i), {j['topic']: 1}).cache()
    stream.pprint()

现在说stream1是否有以下传入内容:

aaaaa
aaaaa
aaaaa
...

而stream2的内容如下:

bbbbb
bbbbb
bbbbb
...

使用 pprint 函数,我期待看到以下输出:

-----------------------------
2017-09-13 16:54:32
-----------------------------
aaaaa
aaaaa
aaaaa
...

-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...

但我看到以下输出:

-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...

-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...

我知道在 for 循环的第二次迭代之后读取变量 stream 之后似乎存在延迟加载或读取变量的东西。谁能告诉我如何实现这一点,以便我可以在 for 循环中处理 2 个单独的流。

谢谢!

【问题讨论】:

    标签: apache-spark pyspark apache-kafka spark-streaming


    【解决方案1】:

    类似:

    streams = []
    config = ConfigObj("my.conf")
    for i, j in conf.iteritems():
        stream = KafkaUtils.createStream(ssc, j['server'], "consumer_%s" % (i), {j['topic']: 1}).cache()
        streams.append(stream)
    
    all_topic = ssc.union(*streams)    
    all_topic.pprint()
    

    【讨论】:

    • 感谢张老师的回复。但是我不需要流的联合,而是我想根据一些逻辑对两个流应用不同的 map reduce 函数。我需要单独的流。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-09-26
    • 2015-08-14
    • 2014-01-19
    • 1970-01-01
    • 2021-11-28
    • 1970-01-01
    相关资源
    最近更新 更多