【问题标题】:my custom flink metrics reporter don't work我的自定义 flink 指标报告器不起作用
【发布时间】:2021-08-25 07:08:46
【问题描述】:

使用 flink 1.13.1 版本

我写了一个自定义指标报告器,但它似乎在我的 flink 中不起作用。 当我启动 flink 时,JobManager 显示警告日志如下:

2021-08-25 14:54:06,243 WARN  org.apache.flink.runtime.metrics.ReporterSetup               [] - The reporter factory (org.apache.flink.metrics.kafka.KafkaReporterFactory) could not be found for reporter kafka. Available factories: [org.apache.flink.metrics.slf4j.Slf4jReporterFactory, org.apache.flink.metrics.datadog.DatadogHttpReporterFactory, org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory, org.apache.flink.metrics.graphite.GraphiteReporterFactory, org.apache.flink.metrics.statsd.StatsDReporterFactory, org.apache.flink.metrics.prometheus.PrometheusReporterFactory, org.apache.flink.metrics.jmx.JMXReporterFactory, org.apache.flink.metrics.influxdb.InfluxdbReporterFactory].
2021-08-25 14:54:06,245 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl          [] - No metrics reporter configured, no metrics will be exposed/reported.

但我已经在 plugins 文件夹中创建了名为“metrics-kafka”的文件夹,而不是打包指标报告器并将 jar 文件复制到“metrics-kafka”或 lib 文件夹(两个文件夹都不起作用)

我的 flink 配置文件:

metrics.reporter.kafka.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
metrics.reporter.kafka.class: org.apache.flink.metrics.kafka.KafkaReporter
metrics.reporter.kafka.interval: 15 SECONDS

我的指标报告器工厂类:

package org.apache.flink.metrics.kafka

import org.apache.flink.metrics.reporter.{InterceptInstantiationViaReflection, MetricReporter, MetricReporterFactory}
import java.util.Properties

@InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.metrics.kafka.KafkaReporter")
class KafkaReporterFactory extends MetricReporterFactory{
  override def createMetricReporter(properties: Properties): MetricReporter = {
    new KafkaReporter()
  }
}

还有我的记者班:

package org.apache.flink.metrics.kafka

import org.apache.flink.metrics.MetricConfig
import org.apache.flink.metrics.reporter.{InstantiateViaFactory, Scheduled}

@InstantiateViaFactory(factoryClassName = "org.apache.flink.metrics.kafka.KafkaReporterFactory")
class KafkaReporter extends MyAbstractReporter with Scheduled{
  some code ...
}

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    我发现需要在 '/resources/META-INF/services/' 中添加名为 'org.apache.flink.metrics.reporter.MetricReporterFactory' 的文件,并在此文件中写入工厂类路径,如 'org.apache .flink.metrics.kafka.KafkaReporterFactory'

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-10-11
      • 1970-01-01
      • 2021-09-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多