【问题标题】:Flink Count of Events using metric使用度量的 Flink 事件计数
【发布时间】:2020-06-10 06:50:01
【问题描述】:

我在 kafka 中有一个主题,我在其中获取 json 格式的多种类型的事件。我创建了一个文件流接收器,通过分桶将这些事件写入 S3。

FlinkKafkaConsumer errorTopicConsumer = new FlinkKafkaConsumer(ERROR_KAFKA_TOPICS,
                new SimpleStringSchema(),
                properties);
        final StreamingFileSink<Object> errorSink = StreamingFileSink
                .forRowFormat(new Path(outputPath + "/error"), new SimpleStringEncoder<>("UTF-8"))
                .withBucketAssigner(new EventTimeBucketAssignerJson())
                .build();

        env.addSource(errorTopicConsumer)
                .name("error_source")
                .setParallelism(1)
                .addSink(errorSink)
                .name("error_sink").setParallelism(1);
public class EventTimeBucketAssignerJson implements BucketAssigner<Object, String> {

    @Override
    public String getBucketId(Object record, Context context) {
        StringBuffer partitionString = new StringBuffer();
        Tuple3<String, Long, String> tuple3 = (Tuple3<String, Long, String>) record;
        try {
            partitionString.append("event_name=")
                    .append(tuple3.f0).append("/");

            String timePartition = TimeUtils.getEventTimeDayPartition(tuple3.f1);
            partitionString.append(timePartition);
        } catch (Exception e) {
            partitionString.append("year=").append(Constants.DEFAULT_YEAR).append("/")
                    .append("month=").append(Constants.DEFAULT_MONTH).append("/")
                    .append("day=").append(Constants.DEFAULT_DAY);
        }
        return partitionString.toString();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

现在我想将每个事件的每小时计数发布为 prometheus 的指标,并在其上发布一个 grafana 仪表板。

所以请帮助我如何使用 flink 指标实现每个事件的每小时计数并发布到 prometheus。

谢谢

【问题讨论】:

    标签: stream apache-flink flink-streaming


    【解决方案1】:

    通常,这是通过简单地为请求创建一个计数器然后使用 Prometheus 中的rate() 函数来完成的,这将为您提供给定时间内的请求率。

    但是,如果您出于某种原因想要自己执行此操作,那么您可以执行类似于 org.apache.kafka.common.metrics.stats.Rate 中所做的操作。因此,在这种情况下,您需要收集样本列表及其收集时间,以及您要用于计算速率的窗口大小,然后您可以简单地进行计算,即删除样本超出范围并已过期,然后只需计算窗口中有多少样本。

    然后您可以将Gauge 设置为计算值。

    【讨论】:

    • 谢谢多米尼克。正如您提到的创建计数器,但问题是我想为每种事件类型单独计数。所以在我的数据中有一个名为 event_name 的字段,我想要计数对应于每个事件。
    • 从技术上讲,您可以简单地为每个 EventType 创建一个带有 Counter 的 Map 并为每个事件递增它们。
    • 我无法完全了解您。能否提供示例代码。
    猜你喜欢
    • 1970-01-01
    • 2023-03-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多