【发布时间】:2020-12-17 05:52:39
【问题描述】:
我正在使用 Flink 1.11(通过 Python API 和 Anaconda 虚拟环境)和 Kafka 作为我的源和接收器。我正在将我的 Flink 作业提交到集群。所有都在 Docker(本地)上运行。
由于我是新手,所以我现在已经设置它,它本质上是使用一些窗口作为传递并慢慢构建它。
到目前为止我的设置:
if __name__ == "__main__":
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
table_config = TableConfig()
table_env = StreamTableEnvironment.create(env, table_config)
## Setup environment
# Use our previously configured Anaconda environment
table_env.add_python_archive("venv.zip")
table_env.get_config().set_python_executable("venv.zip/venv/bin/python")
shared_fields = {'a': DataTypes.STRING(), 'b': DataTypes.STRING(), 'c': DataTypes.STRING()}
source_data_topic = "eddn_topic"
table_env.connect(
Kafka()
.version("0.11")
.topic("test_sink")
.property("bootstrap.servers", bootstrap_host)
) \
.with_format(
Json()
.fail_on_missing_field(False)
) \
.with_schema(
Schema()
.fields(shared_fields)
) \
.create_temporary_table("stream_sink") \
source_ddl = f"""
CREATE TABLE testSource(
a STRING,
b STRING,
c STRING,
`timestamp` TIMESTAMP(3),
WATERMARK FOR `timestamp` AS `timestamp`
) with (
'connector' = 'kafka-0.11',
'properties.bootstrap.servers' = '{bootstrap_host}',
'topic' = 'test_source',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'latest-offset',
'json.fail-on-missing-field' = 'false',
'json.timestamp-format.standard' = 'ISO-8601',
'json.ignore-parse-errors' = 'false'
)
"""
table_env.execute_sql(source_ddl)
# Setup a 10-second Tumbling window
table = table_env.from_path("testSource") \
.select("a, b, c, timestamp") \
.window(Tumble.over("10.second").on("timestamp").alias("testWindow")) \
.group_by("testWindow, a, b, c") \
.select("*")
是的,我正在混合使用 execute_sql() 和 connect() 来设置我的表格,但这是为了我的学习目的。
从这里开始,这工作正常,消息出现在新的 Kafka 主题中:
table.insert_into("stream_sink")
table_env.execute("TestEnrichmentJob")
但是,即使转换为 Pandas DataFrame 并返回也不会产生消息:
pandasTable = table.to_pandas()
enriched_table = table_env.from_pandas(pandasTable, [DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()])
enriched_table.insert_into("stream_sink")
table_env.execute("TestEnrichmentJob")
在 Flink Web 界面中观察作业可以看出,这个 sink 任务正在接收数据但没有发送任何数据(作业也没有失败,只是继续运行)。 Kafka 显示消息正在从源主题中消费,但没有在接收器主题中生成。
我觉得我错过了一些明显的东西,我是流数据的新手。
- 我错过了什么吗?
- 一旦我需要进行更高级的操作,是否需要将其实现为Python UDFs?还是可以直接写成“正常”的 Pandas 操作?
【问题讨论】:
-
FWIW,正在准备这方面的教程:github.com/apache/flink/pull/13273/files。希望这将很快合并并在主文档中可用。
标签: python pandas apache-kafka apache-flink