【问题标题】:Can not modify value in JavaRDD无法修改 JavaRDD 中的值
【发布时间】:2018-12-05 06:30:08
【问题描述】:

我有一个关于如何更新 JavaRDD 值的问题。

我有一个JavaRDD<CostedEventMessage>,其中包含有关应该将其写入 kafka 主题分区的信息的消息对象。

我正在尝试使用以下代码更改此类对象的 partitionId 字段:

rddToKafka = rddToKafka.map(event -> repartitionEvent(event, numPartitions));

repartitionEvent 逻辑在哪里:

costedEventMessage.setPartitionId(1);
return costedEventMessage;

但是修改没有发生。

您能否告知为什么以及如何正确修改 JavaRDD 中的值?

【问题讨论】:

    标签: java apache-spark rdd


    【解决方案1】:

    Spark 是惰性的,因此从您粘贴在上面的代码中,不清楚您是否真的对 JavaRDD 执行了任何操作(例如 collectforEach),以及您是如何得出数据未更改的结论的。

    例如,如果您假设通过运行以下代码:

    List<CostedEventMessage> messagesLst = ...;
    JavaRDD<CostedEventMessage> rddToKafka = javaSparkContext.parallelize(messagesLst);
    rddToKafka = rddToKafka.map(event -> repartitionEvent(event, numPartitions));
    

    messagesLst 中的每个元素都将分区设置为 1,你错了。 如果您添加例如,那将是正确的:

    messagesLst = rddToKafka.collect();
    

    更多详情请参考documentation

    【讨论】:

    • 在我的情况下,我必须返回JavaRDD&lt;CostedEventMessage&gt;,并且不可能调用collect()。有什么解决方案可以用来修改JavaRDD&lt;CostedEventMessage&gt;
    • 返回它有什么用途?之后你用 RDD 做什么?
    • 我可以使用任何其他方法来强制 Spark 进行这些更改并再次获取 JavaRDD?
    • 外部代码使用 JavaRDD,我在 kafka 主题中返回它
    • 所以使用这个RDD的外部代码会通过对其执行一个动作来触发“修改”。目前尚不清楚您是如何得出未应用更改的结论的,我的意思是-您检查了哪些对象?您无法检查 RDD 本身。我真的建议你阅读文档
    猜你喜欢
    • 1970-01-01
    • 2016-04-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多