【问题标题】:How to get state for multiple keyBy in flink using queryable state client?如何使用可查询状态客户端在 flink 中获取多个 keyBy 的状态?
【发布时间】:2018-04-11 05:09:21
【问题描述】:

我使用的是 Flink 1.4.2,我有一个场景需要使用两个键。 例如

KeyedStream<UsageStatistics, Tuple> keyedStream = stream.keyBy("clusterId", "ssid");
usageCounts = keyedStream.process(new CustomProcessFunction(windowSize,queryableStateName));

值描述会

ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, SsidTotalUsage.class);
        descriptor.setQueryable(queryableStateName);

谁能建议我使用可查询状态客户端获取 flink 中多个键的状态?

QueryableClient 下面对于单个键“clusterId”运行良好。

kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, clusterId, BasicTypeInfo.STRING_TYPE_INFO, descriptor);

多个键的 type_info 应该是什么?任何与此相关的建议/示例或参考都会非常有帮助?

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    我找到了解决办法。

    我在 valueStateDescription 中给出了 TypeHint。

    在 Flink 作业中:

    TypeInformation<SsidTotalUsage> typeInformation = TypeInformation.of(new TypeHint<SsidTotalUsage>() {});
    
    ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, typeInformation);
    

    在客户端:

    ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, typeInformation);
    

    我有两个键,所以我使用了 Tuple2 类并设置我的键的值,如下所示。 注意:如果你有两个以上的键,那么你必须根据你的键选择 Tuple3、Tuple4 类。

     Tuple2<String, String> tuple = new Tuple2<>();
     tuple.f0 = clusterId;
     tuple.f1 = ssid;
    

    然后我提供了 TypeHint。

    TypeHint<Tuple2<String, String>> typeHint = new TypeHint<Tuple2<String, String>>() {};
    
    CompletableFuture<ValueState<SsidTotalUsage>> kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, tuple, typeHint, descriptor);
    

    在上面的代码中,getState 方法将返回 ImmutableValueState 所以我需要像下面这样得到我的 pojo。

    ImmutableValueState<SsidTotalUsage> state = (ImmutableValueState<SsidTotalUsage>) kvState.get();
    
    totalUsage = state.value();
    

    【讨论】:

    • 感谢您发布解决您自己问题的方法,这对其他人有帮助。 Flink 文档是否缺少此信息?如果是这样,请打开一个 Jira 问题添加它,这样它将使更多人受益,谢谢
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-11-12
    • 1970-01-01
    • 1970-01-01
    • 2016-02-07
    • 2020-01-23
    • 2013-05-07
    • 1970-01-01
    相关资源
    最近更新 更多