【问题标题】:Scala ListBuffer cannot be used as a POJO type in FlinkScala ListBuffer 不能用作 Flink 中的 POJO 类型
【发布时间】:2019-12-18 13:10:27
【问题描述】:

当我运行下面的代码时。日志打印:

class scala.collection.mutable.ListBuffer 不包含字段 scala$collection$mutable$ListBuffer$$start 的设置器

Class class scala.collection.mutable.ListBuffer 不能用作 POJO 类型,因为并非所有字段都是有效的 POJO 字段,必须作为 GenericType 处理。

代码:

private lazy val schoolDescriptor = new ListStateDescriptor[School]("schoolDescriptor", classOf[School])


context.globalState.getListSate(schoolDescriptor).update(ListBuffer(new School))

类定义:

class School {
   var classes: ListBuffer[Class] = ListBuffer()
}

class Class {
   var students: ListBuffer[Class] = ListBuffer()
}

class Student {
   var name = ""
}

如果POJO有ListBuffer类型字段,而ListBuffer的元素也有ListBuffer类型字段怎么办?

【问题讨论】:

  • 您为什么使用ListBuffer 而不是ListArray?它最好用于优化算法,而不是作为公共字段中的一流数据类型。
  • 可变性 + 分布式计算 = 灾难 - 在尝试使用它们之前,您可能首先尝试了解这些框架是如何工作的。然后,很清楚为什么这不起作用,为什么即使它起作用也不是一个好主意。

标签: scala apache-flink flink-streaming


【解决方案1】:

在 cmets 中已经有一些提示可以解决您关于不变性的问题。

一般来说,我也建议这样做,因为当您使用 Flink 状态时,通用 API 合同是 如果你更新你的状态对象(schoolDescriptor),你必须用它调用 state#update。

这可能适用于堆状态而不调用更新(并不总是由 API 保证),但不会起作用,例如用于 RocksDB 状态后端。 如果您使用纯 POJO [1],序列化也容易得多。

在您的非 POJO 情况下,一般方法是实现您的自定义 org.apache.flink.api.common.typeutils.TypeSerializer 或注册自定义序列化器 [2] 使用另一个状态描述符构造函数:ListStateDescriptor(String name, TypeSerializer typeSerializer) 或重构您的类以支持开箱即用的序列化 [3]。

来自安德烈

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#rules-for-pojo-types

[2]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/custom_serializers.html

[3]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-09-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多