【问题标题】:How to stream data from kafka avro console to HDFS using kafka-connect-hdfs?如何使用 kafka-connect-hdfs 将数据从 kafka avro 控制台流式传输到 HDFS?
【发布时间】:2018-10-08 12:18:20
【问题描述】:

我正在尝试运行 kafka-connect-hdfs,但没有成功。

我在 .bash_profile 中添加了以下行并运行了 'source ~/.bash_profile'

export LOG_DIR=~/logs

quickstart-hdfs.properties配置文件是

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
hdfs.url=xxx.xxx.xxx.xxx:xxxx # placeholder
flush.size=3

hadoop.conf.dir = /etc/hadoop/conf/
logs.dir = ~/logs
topics.dir = ~/topics
topics=test_hdfs

我正在遵循中概述的快速入门说明 https://docs.confluent.io/current/connect/connect-hdfs/docs/hdfs_connector.html

connector-avro-stanalone.properties 文件的内容是:

bootstrap.servers=yyy.yyy.yyy.yyy:yyyy # This is the placeholder for the Kafka broker url with the appropriate port
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets

我的主目录中有 quickstart-hdfs.properties 和 connector-avro-stanalone.properties,我运行:

confluent load hdfs-sink -d quickstart-hdfs.properties

我不确定如何访问我的主目录中 connector-avro-stanalone.properties 文件中的信息。

当我运行:'confluent log connect'时,我收到以下错误:

[2018-04-26 17:36:00,217] INFO Couldn't start HdfsSinkConnector: (io.confluent.connect.hdfs.HdfsSinkTask:90)
org.apache.kafka.connect.errors.ConnectException: java.lang.reflect.InvocationTargetException
        at io.confluent.connect.storage.StorageFactory.createStorage(StorageFactory.java:56)
        at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:213)
        at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:101)
        at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:82)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:267)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:163)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at io.confluent.connect.storage.StorageFactory.createStorage(StorageFactory.java:51)
        ... 12 more
Caused by: java.io.IOException: No FileSystem for scheme: xxx.xxx.xxx.xxx -> This is the hdfs_url in quickstart-hdfs.properties file without the port
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
        at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:2691)
        at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:420)
        at io.confluent.connect.hdfs.storage.HdfsStorage.<init>(HdfsStorage.java:56)
        ... 17 more
[2018-04-26 17:36:00,217] INFO Shutting down HdfsSinkConnector. (io.confluent.connect.hdfs.HdfsSinkTask:91)

任何解决此问题的帮助将不胜感激。

【问题讨论】:

    标签: hadoop apache-kafka apache-kafka-connect confluent-platform


    【解决方案1】:

    原因:java.io.IOException: No FileSystem for scheme: xxx.xxx.xxx.xxx

    您需要hdfs.url=hdfs://xxx.yyy.zzz.abc 以及名称节点端口

    此外,您需要删除属性文件中等号周围的空格

    【讨论】:

    • 嗨@cricket_007,感谢您的回复。在配置文件中,hdfs.url 已经具有来自 /etc/hadoop/conf/hdfs-site.xml 中的属性值的 url 以及相应的端口,其名称为 dfs.namenode.https-地址。我有两个 hdfs-site.xml 文件,一个位于 /etc/hadoop/conf/hdfs-site.xml,另一个位于 /usr/hdp/current/hadoop-client/conf/hdfs-site.xml,尽管这些文件是完全相同的。只是从 quickstart-hdfs.properties 中删除空格并不会改变任何东西。另外,我在我的主目录中使用了 quickstart-hdfs.properties。
    • 不是你需要的HTTPS地址。它是 Namenode RPC 地址。 ~/logs~/topics 也必须是 HDFS 上的绝对路径
    • 您好 cricket_007,我按照您的建议进行了更改,但问题仍然存在。谢谢
    • 您需要确保 plugin.path=/usr/share/java/ 是包含 kafka-connect-hdfs 的目录(假设您正在运行 Confluent 4.x)
    • 嗨 cricket_007,我修复了 hdfs.url 并确保 plugin.path 正确。我知道得到一个不同的错误:
    猜你喜欢
    • 2018-03-19
    • 2016-04-27
    • 2017-04-07
    • 1970-01-01
    • 2021-08-20
    • 2019-10-12
    • 2016-12-02
    • 2017-01-08
    • 2017-03-22
    相关资源
    最近更新 更多