【问题标题】:create stream on kafka topic using KSQL使用 KSQL 在 kafka 主题上创建流
【发布时间】:2019-03-25 14:12:53
【问题描述】:

以下是我来自 kafka 主题的示例日志

2019-03-04T08:53:03.023Z "cd8cbe" 100.212.212.212 - - [20/Feb/2019:12:13:33 +0000] "GET http://dl-mysite.com/drm/PRIORITY1080/HINDI_MOVIES/somemovie.mp4/video/avc1/4/seg-1281.m4s HTTP/1.1" 200 325040 "-" "Dalvik/2.1.0 (Linux; U; Android 6.0; Le X509 Build/DHXOSOP5801911241S)" "256" "0.000"

我正在尝试使用 KSQL 在上述主题上创建流。下面是我用来创建流的脚本。运行以下流创建脚本后,它返回“流创建”消息,但是 select 语句 (select * from test_duplicate_stream;) 不返回任何内容。

CREATE STREAM test_duplicate_stream (logArrivalTime varchar,edgeid varchar,ip varchar,col1_empty varchar,col2_empty varchar, eventdate varchar,url varchar,response_code int,response_length BIGINT,col3_empty varchar,user_agent varchar,request_length varchar, response_time varchar) WITH (kafka_topic='test_duplicate',VALUE_FORMAT='DELIMITED');

我认为“DELIMITED”不是在这里使用的正确值,因为我的字段不是逗号分隔而是空格分隔。为我的日志创建流的正确方法是什么?

【问题讨论】:

  • 你原题目的格式是什么?
  • 我的主题是空格分隔

标签: apache-kafka ksqldb


【解决方案1】:

KSQL 目前支持:

  • JSON
  • 逗号分隔(分隔)
  • Avro

如果您的数据不是上述格式之一,那么您将无法在不先更改序列化的情况下处理它。

另请参阅Notes on troubleshooting KSQL,了解您的查询何时不返回数据。

【讨论】:

  • 谢谢罗宾!目前我的主题有空格分隔的字段,有什么方法可以将数据从当前主题复制到另一个主题并在复制时更改新主题的格式?
  • 最简单的方法是以更结构化的格式编写数据,但如果这不是一个选项,那么您将不得不以某种方式对其进行预处理,我认为 KSQL 不能目前提供帮助。挑战在于它不仅仅是空格作为分隔符,而且只是引号之外的空格,对吗?因此,虽然包含空格,但它是单个值 "Dalvik/2.1.0 (Linux; U; Android 6.0; Le X509 Build/DHXOSOP5801911241S)"
猜你喜欢
  • 1970-01-01
  • 2021-07-17
  • 2019-06-22
  • 2019-02-10
  • 1970-01-01
  • 2021-10-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多