【问题标题】:Spring Cloud Dataflow - http | kafka and kafka | hdfs - Getting Raw message in HDFSSpring Cloud 数据流 - http |卡夫卡和卡夫卡| hdfs - 在 HDFS 中获取原始消息
【发布时间】:2023-04-06 06:32:01
【问题描述】:

我正在 SCDF(本地服务器 1.7.3)中创建一个基本流,其中我正在配置 2 个流。 1. HTTP -> Kafka 主题 2. Kafka 主题 -> HDFS

流:

stream create --name ingest_from_http --definition "http --port=8000 --path-pattern=/test > :streamtest1"
stream deploy --name ingest_from_http --properties "app.http.spring.cloud.stream.bindings.output.producer.headerMode=raw"

stream create --name ingest_to_hdfs --definition ":streamtest1 > hdfs --fs-uri=hdfs://<host>:8020 --directory=/tmp/hive/sensedev/streamdemo/ --file-extension=xml --spring.cloud.stream.bindings.input.consumer.headerMode=raw" 

我在 /tmp/hive/sensedev/streamdemo/ 位置创建了一个 Hive 托管表

DROP TABLE IF EXISTS gwdemo.xml_test;
CREATE TABLE gwdemo.xml_test(

id int,

name string

 )

ROW FORMAT SERDE 'com.ibm.spss.hive.serde2.xml.XmlSerDe'

WITH SERDEPROPERTIES (

"column.xpath.id"="/body/id/text()",

"column.xpath.name"="/body/name/text()"


)

STORED AS

INPUTFORMAT 'com.ibm.spss.hive.serde2.xml.XmlInputFormat'

OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'

LOCATION '/tmp/hive/sensedev/streamdemo'

TBLPROPERTIES (

"xmlinput.start"="<body>",

"xmlinput.end"="</body>")

;

测试:

  1. Hive 是否能够读取 XML : 将 xml 文件放在该位置 /tmp/hive/sensedev/streamdemo。

文件内容:&lt;body&gt;&lt;id&gt;1&lt;/id&gt;&lt;name&gt;Test1&lt;/name&gt;&lt;/body&gt;

在表上运行 SELECT 命令时,它正确显示了上述记录。

  1. 当使用 http post 在 SCDF 中发布记录时,我得到了正确的数据 在 Kafka Consumer 但是当我检查 HDFS 时,xml 文件是 正在创建,但我在这些文件中收到原始消息。 示例:

    数据流>http post --target http:///test --data "&lt;body&gt;&lt;id&gt;2&lt;/id&gt;&lt;name&gt;Test2&lt;/name&gt;&lt;/body&gt;" --contentType application/xml

在 Kafka 控制台消费者中,我能够读取正确的 XML 消息:&lt;body&gt;&lt;id&gt;2&lt;/id&gt;&lt;name&gt;Test2&lt;/name&gt;&lt;/body&gt;

 $ hdfs dfs -cat /tmp/hive/sensedev/streamdemo/hdfs-sink-2.xml
[B@31d94539

问题: 1. 我错过了什么?如何在 HDFS 中新创建的 XML 文件中获取正确的 XML 记录?

【问题讨论】:

    标签: hadoop hive apache-kafka hdfs spring-cloud-dataflow


    【解决方案1】:

    HDFS Sink 需要一个 Java 序列化对象。

    【讨论】:

      猜你喜欢
      • 2017-06-12
      • 1970-01-01
      • 2016-05-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-02-08
      • 2018-03-06
      相关资源
      最近更新 更多