【问题标题】:Flink on Python with Kafka and Pandas won't send to sinkFlink on Python with Kafka 和 Pandas 不会发送到 sink
【发布时间】:2020-12-17 05:52:39
【问题描述】:

我正在使用 Flink 1.11(通过 Python API 和 Anaconda 虚拟环境)和 Kafka 作为我的源和接收器。我正在将我的 Flink 作业提交到集群。所有都在 Docker(本地)上运行。

由于我是新手,所以我现在已经设置它,它本质上是使用一些窗口作为传递并慢慢构建它。

到目前为止我的设置:

if __name__ == "__main__":

  env = StreamExecutionEnvironment.get_execution_environment()
  env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
  table_config = TableConfig()
  table_env = StreamTableEnvironment.create(env, table_config)

  ## Setup environment
  # Use our previously configured Anaconda environment
  table_env.add_python_archive("venv.zip")
  table_env.get_config().set_python_executable("venv.zip/venv/bin/python")

  shared_fields = {'a': DataTypes.STRING(), 'b': DataTypes.STRING(), 'c': DataTypes.STRING()}

  source_data_topic = "eddn_topic"

  table_env.connect( 
      Kafka()
      .version("0.11")
      .topic("test_sink")
      .property("bootstrap.servers", bootstrap_host)
    ) \
    .with_format(
      Json()
      .fail_on_missing_field(False)
    ) \
    .with_schema(
        Schema()
        .fields(shared_fields)
    ) \
    .create_temporary_table("stream_sink") \

  source_ddl = f"""
          CREATE TABLE testSource(
              a STRING,
              b STRING,
              c STRING,
              `timestamp` TIMESTAMP(3),
              WATERMARK FOR `timestamp` AS `timestamp`
          ) with (
              'connector' = 'kafka-0.11',
              'properties.bootstrap.servers' = '{bootstrap_host}',
              'topic' = 'test_source',
              'properties.group.id' = 'testGroup',
              'format' = 'json',
              'scan.startup.mode' = 'latest-offset',
              'json.fail-on-missing-field' = 'false',
              'json.timestamp-format.standard' = 'ISO-8601',
              'json.ignore-parse-errors' = 'false'
          )
          """

  table_env.execute_sql(source_ddl)

  # Setup a 10-second Tumbling window
  table = table_env.from_path("testSource") \
            .select("a, b, c, timestamp") \
            .window(Tumble.over("10.second").on("timestamp").alias("testWindow")) \
            .group_by("testWindow, a, b, c") \
            .select("*")

是的,我正在混合使用 execute_sql()connect() 来设置我的表格,但这是为了我的学习目的。

从这里开始,这工作正常,消息出现在新的 Kafka 主题中:

  table.insert_into("stream_sink") 
  table_env.execute("TestEnrichmentJob")

但是,即使转换为 Pandas DataFrame 并返回也不会产生消息:

  pandasTable = table.to_pandas()
  enriched_table = table_env.from_pandas(pandasTable, [DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()])
  enriched_table.insert_into("stream_sink") 

  table_env.execute("TestEnrichmentJob")

在 Flink Web 界面中观察作业可以看出,这个 sink 任务正在接收数据但没有发送任何数据(作业也没有失败,只是继续运行)。 Kafka 显示消息正在从源主题中消费,但没有在接收器主题中生成。

我觉得我错过了一些明显的东西,我是流数据的新手。

  1. 我错过了什么吗?
  2. 一旦我需要进行更高级的操作,是否需要将其实现为Python UDFs?还是可以直接写成“正常”的 Pandas 操作?

【问题讨论】:

标签: python pandas apache-kafka apache-flink


【解决方案1】:

您缺少的一件事:Flink 源永远不会显示任何传入的记录,而 Flink 接收器永远不会显示任何传出的记录。也就是说,Flink Web UI 中显示的numRecordsInnumRecordsOutnumRecordsInPerSecondnumRecordsOutPerSecond 指标仅测量 Flink 内部的流量,而忽略与 Kafka 等外部系统的通信。

编辑:

我自己还没有尝试过,但是即将发布的教程中有一个示例显示如何做到这一点:

enriched_table.execute_insert("stream_sink").get_job_client().get_job_execution_result().result()

【讨论】:

  • 很公平。 Kafka 仍然显示消息正在从我的源主题中被消费,所以我至少知道它们是从那里出来的。
  • 但是,Kafka 接收器主题没有任何内容。就我的问题而言,我提到 Flink 中的指标有点让人分心。
  • 你在用眨眼规划器吗?
  • 我只是使用上面的链接切换到它。不过运气不好。
猜你喜欢
  • 2018-11-04
  • 1970-01-01
  • 2021-11-08
  • 1970-01-01
  • 1970-01-01
  • 2021-02-21
  • 1970-01-01
  • 2019-02-04
  • 2016-10-16
相关资源
最近更新 更多