【问题标题】:Flink Collector issue when Collection Object with Map of Object class使用 Object 类的 Map 收集对象时的 Flink 收集器问题
【发布时间】:2020-09-29 19:21:19
【问题描述】:

我面临一个问题,当我从 flink flatmap 收集器收集对象时,我没有正确收集到值。我得到了对象引用,但它没有给我实际价值。

dataStream.filter(new FilterFunction<GenericRecord>() {
      @Override
      public boolean filter(GenericRecord record) throws Exception {
        if (record.get("user_id") != null) {
          return true;
        }
        return false;
      }
    }).flatMap(new ProfileEventAggregateFlatMapFunction(aggConfig))
        .map(new MapFunction<ProfileEventAggregateEmittedTuple, String>() {
          @Override
          public String map(
              ProfileEventAggregateEmittedTuple profileEventAggregateEmittedTupleNew)
              throws Exception {
            String res=null;
            try {
              ObjectMapper mapper = new ObjectMapper();
              mapper.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
              res= mapper.writeValueAsString(profileEventAggregateEmittedTupleNew);
            } catch (Exception e) {
              e.printStackTrace();
            }
            return res;
          }
        }).print();




public class ProfileEventAggregateFlatMapFunction extends
    RichFlatMapFunction<GenericRecord, ProfileEventAggregateEmittedTuple> {

  private final ProfileEventAggregateTupleEmitter aggregator;
  ObjectMapper mapper = ObjectMapperPool.getInstance().get();

  public ProfileEventAggregateFlatMapFunction(String config) throws IOException {
    this.aggregator = new ProfileEventAggregateTupleEmitter(config);
  }

  @Override
  public void flatMap(GenericRecord event,
      Collector<ProfileEventAggregateEmittedTuple> collector) throws Exception {
    try {

      List<ProfileEventAggregateEmittedTuple> aggregateTuples = aggregator.runAggregates(event);

      for (ProfileEventAggregateEmittedTuple tuple : aggregateTuples) {

        collector.collect(tuple);
      }
}}

调试结果: 我在收集器中收集的元组

tuple = {ProfileEventAggregateEmittedTuple@7880} 
 profileType = "userprofile"
 key = "1152473"
 businessType = "keyless"
 name = "consumer"
 aggregates = {ArrayList@7886}  size = 1
  0 = {ProfileEventAggregate@7888} "geo_id {geo_id=1} {keyless_select_destination_cnt=1, total_estimated_distance=12.5}"
   entityType = "geo_id"
   dimension = {LinkedHashMap@7891}  size = 1
    "geo_id" -> {Integer@7897} 1
     key = "geo_id"
     value = {Integer@7897} 1
   metrics = {LinkedHashMap@7892}  size = 2
    "keyless_select_destination_cnt" -> {Long@7773} 1
     key = "keyless_select_destination_cnt"
     value = {Long@7773} 1
    "total_estimated_distance" -> {Double@7904} 12.5
     key = "total_estimated_distance"
     value = {Double@7904} 12.5

这是我的地图函数 .map(new MapFunction()

 profileEventAggregateEmittedTuple = {ProfileEventAggregateEmittedTuple@7935} 
 profileType = "userprofile"
 key = "1152473"
 businessType = "keyless"
 name = "consumer"
 aggregates = {GenericData$Array@7948}  size = 1
  0 = {ProfileEventAggregate@7950} "geo_id {geo_id=java.lang.Object@863dce2} {keyless_select_destination_cnt=java.lang.Object@7cdb4bfc, total_estimated_distance=java.lang.Object@52e81f57}"
   entityType = "geo_id"
   dimension = {HashMap@7952}  size = 1
    "geo_id" -> {Object@7957} 
     key = "geo_id"
     value = {Object@7957} 
      Class has no fields
   metrics = {HashMap@7953}  size = 2
    "keyless_select_destination_cnt" -> {Object@7962} 
     key = "keyless_select_destination_cnt"
     value = {Object@7962} 
      Class has no fields
    "total_estimated_distance" -> {Object@7963} 

请帮助我了解为什么我没有得到正确的数据。

public class ProfileEventAggregateEmittedTuple implements Cloneable, Serializable {
  private String profileType;
  private String key;
  private String businessType;
  private String name;
  private List<ProfileEventAggregate> aggregates = new ArrayList<ProfileEventAggregate>();
  private long startTime;
  private long endTime;

  public String getProfileType() {
    return profileType;
  }

  public void setProfileType(String profileType) {
    this.profileType = profileType;
  }

  public String getKey() {
    return key;
  }

  public void setKey(String key) {
    this.key = key;
  }

  public String getBusinessType() {
    return businessType;
  }

  public void setBusinessType(String businessType) {
    this.businessType = businessType;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public List<ProfileEventAggregate> getAggregates() {
    return aggregates;
  }

  public void addAggregate(ProfileEventAggregate aggregate) {
    this.aggregates.add(aggregate);
  }

  public void setAggregates(List<ProfileEventAggregate> aggregates) {
    this.aggregates = aggregates;
  }

  public long getStartTime() {
    return startTime;
  }

  public void setStartTime(long startTime) {
    this.startTime = startTime;
  }

  public long getEndTime() {
    return endTime;
  }

  public void setEndTime(long endTime) {
    this.endTime = endTime;
  }

 @Override
  public ProfileEventAggregateEmittedTuple clone() {
    ProfileEventAggregateEmittedTuple clone = new ProfileEventAggregateEmittedTuple();

    clone.setProfileType(this.profileType);
    clone.setKey(this.key);
    clone.setBusinessType(this.businessType);
    clone.setName(this.name);

    for (ProfileEventAggregate aggregate : this.aggregates) {
      clone.addAggregate(aggregate.clone());
    }
    return clone;
  }

public class ProfileEventAggregate  implements Cloneable, Serializable {

  private String entityType;
  private Map<String, Object> dimension =new LinkedHashMap<String, Object>();
  private Map<String, Object> metrics = new LinkedHashMap<String, Object>();

  public Map<String, Object> getDimension() {
    return dimension;
  }

  public void setDimension(Map<String, Object> dimension) {
    this.dimension.putAll(dimension);
  }

  public void addDimension(String dimensionKey, Object dimensionValue) {
    this.dimension.put(dimensionKey, dimensionValue);
  }

  public Map<String, Object> getMetrics() {
    return metrics;
  }
  public void addMetric(String metricKey, Object metricValue) {
    this.metrics.put(metricKey, metricValue);
  }
  public void setMetrics(Map<String, Object> metrics) {
    this.metrics.putAll(metrics);
  }
  public String getEntityType() {
    return entityType;
  }
  public void setEntityType(String entityType) {
    this.entityType = entityType;
  }

  @Override
  public ProfileEventAggregate clone()  {
    ProfileEventAggregate clone = new ProfileEventAggregate();

    clone.setEntityType(this.entityType);
    clone.getDimension().putAll(this.getDimension());
    clone.getMetrics().putAll(this.metrics);
    return clone;
  }

【问题讨论】:

    标签: java java-8 apache-flink flink-streaming


    【解决方案1】:

    当您不enableObjectReuse 时,将使用您配置的序列化程序(似乎是Avro?)复制对象。

    在您的情况下,您使用 Map 无法推断出合理的模式。

    最简单的解决方法是发送至enableObjectReuse。否则,请确保您的序列化程序与您的数据匹配。因此,您可以在使用 AvroSerializer#copy 的地方添加一个单元测试,并确保您的 POJO 是 properly annotated,如果您想坚持使用 Avro 反射,甚至更好地使用架构优先方法,您可以在其中使用 generate your Java POJO with a Avro schema 并使用特定的 Avro。

    让我们讨论一些替代方案:

    • 使用GenericRecord。不要将其转换为 Java 类型,而是直接访问 GenericRecord。这通常是完整记录灵活的唯一方法(例如,您的工作接受任何输入并将其写入 S3)。
    • 非规范化架构。您可以使用class EventInformation { int id; String predicate; Object value; },而不是使用一些class Event { int id; Map&lt;String, Object&gt; data; }。您需要对所有信息进行分组以进行处理。但是,您会在使用 Avro 时遇到相同类型的问题。
    • 使用宽模式。查看前面的方法,如果事先知道不同的谓词,那么您可以使用它来制作宽模式class Event { int id; Long predicate1; Integer predicate2; ... String predicateN; },其中所有条目都可以为空,并且其中大多数确实是null。编码null 非常便宜。
    • 放弃 Avro。 Avro 是全类型的。您可能想要使用更动态的东西。 Protobuf 有Any 支持任意子消息。
    • 使用 Kryo。 Kryo 可以序列化任意对象树,但代价是速度较慢且开销更大。

    如果要写入数据,还需要考虑添加类型信息以进行正确反序列化的解决方案。例如,查看此JSON question。但是还有更多的方法来实现它。

    【讨论】:

    • 谢谢。我添加了 pojo 类,让我知道那里出了什么问题。
    • 这就像我担心的那样:您使用Map&lt;String, Object&gt; 无法推断出合理的模式。对于指标,您可能想要使用Map&lt;String, Long&gt;Map&lt;String, Double&gt;。如果dimension 捕获任意值,您可能不得不求助于Map&lt;String, String&gt;。请注意,一般而言,这些模式在序列化成本和计算方面完全没有效率。使用可空类型的宽模式通常要好得多。
    • 我想让它泛化。我没有为每个事件使用特定的类,因为有很多事件,每次有新事件发生时我都必须更改代码。我保留了 Map 的指标,因为它可以是 Long、Int、Double 任何类型的值。同样,维度值也可以是字符串、整数、长整数等。什么是更好的设计来使其泛化。请建议。你能用可为空的类型详细说明宽模式吗
    • 感谢 Arvid 提供有关这些方法的详细信息。我在序列化部分不是很好,你能否推荐一些好东西来阅读以获得一些深入的知识。另外,我仍然不清楚通过使用 enableObjectReuse 它将如何工作。
    • 在这里你可以阅读更多关于enableObjectReuseserialization in Flink的信息。
    猜你喜欢
    • 2020-03-23
    • 2015-11-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-08-10
    • 2012-07-26
    • 1970-01-01
    相关资源
    最近更新 更多