【问题标题】:Storm-jms Spout collecting Avro messages and sending down stream?Storm-jms Spout 收集 Avro 消息并向下发送?
【发布时间】:2015-06-19 21:27:54
【问题描述】:

我是 Avro 格式的新手。我正在尝试使用 Storm-Jms spout 从 JMS 队列中收集 Avro 消息,并使用 hdfs bolt 将它们发送到 hdfs。

队列正在发送 avro,但我无法使用 HDFS BOLT 以 avro 格式获取它们。

如何正确收集 avro 消息并将其发送到下游,而不会在 hdfs 中出现编码错误。

【问题讨论】:

  • 您应该将收到的异常消息添加到您的问题中。
  • 嗨 Joshua 我在风暴中没有遇到任何异常我能够从 JMS 读取数据并将其放置在 hdfs 但是在使用 HDFS-bolt 读取放置在 hdfs 中的 .avro 文件时,我收到了错误当我尝试使用 HIVE 读取文件时。这是错误:java.io.IOException: java.io.IOException: Not a data file。
  • 我认为 Storm 需要一些类似于 HDFS BOLT 中 Flume Avroevent 序列化程序的东西。
  • 看起来 Storm 需要机制来将元组序列化为 HDFS 螺栓中的 avro 元组。

标签: hadoop jms message-queue apache-storm avro


【解决方案1】:

现有的 HDFS Bolt 不支持写入 avro 文件,我们需要通过以下更改来克服这个问题。在此示例代码中,我使用从我的 spout 获取 JMS 消息并将这些 JMS 字节消息转换为 AVRO 并将它们发送到 HDFS。

此代码可作为修改 AbstractHdfsBolt 中方法的示例。

public void execute(Tuple tuple) {          
        try {               
            long length = bytesMessage.getBodyLength();
            byte[] bytes = new byte[(int)length];
            ///////////////////////////////////////
            bytesMessage.readBytes(bytes);
            String replyMessage = new String(bytes, "UTF-8");

            datumReader = new SpecificDatumReader<IndexedRecord>(schema);
            decoder = DecoderFactory.get().binaryDecoder(bytes, null);

            result = datumReader.read(null, decoder);                               
            synchronized (this.writeLock) {                 
                dataFileWriter.append(result);                                      
                dataFileWriter.sync();
                this.offset += bytes.length;                    
               if (this.syncPolicy.mark(tuple, this.offset)) {
                   if (this.out instanceof HdfsDataOutputStream) {
                        ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
                    } else {
                        this.out.hsync();
                        this.out.flush();
                    }
                    this.syncPolicy.reset();
                }
               dataFileWriter.flush();
            }

            if(this.rotationPolicy.mark(tuple, this.offset)){
                rotateOutputFile(); // synchronized
                this.offset = 0;
                this.rotationPolicy.reset();
            }
        } catch (IOException | JMSException e) {
            LOG.warn("write/sync failed.", e);
            this.collector.fail(tuple);
        } 
    }

@Override
void closeOutputFile() throws IOException {
    this.out.close();
}

@Override
Path createOutputFile() throws IOException {
    Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
    this.out = this.fs.create(path);
    dataFileWriter.create(schema, out);
    return path;
}

@Override
void doPrepare(Map conf, TopologyContext topologyContext,OutputCollector collector) throws IOException {
    // TODO Auto-generated method stub
     LOG.info("Preparing HDFS Bolt...");
     try {

            schema = new Schema.Parser().parse(new File("/home/*******/********SchemafileName.avsc"));
        } catch (IOException e1) {              
            e1.printStackTrace();
        }
     this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
     datumWriter = new SpecificDatumWriter<IndexedRecord>(schema);
     dataFileWriter = new DataFileWriter<IndexedRecord>(datumWriter);
     JMSAvroUtils JASV = new JMSAvroUtils();         
}

【讨论】:

    猜你喜欢
    • 2017-02-23
    • 2018-08-14
    • 1970-01-01
    • 2020-12-09
    • 2017-12-27
    • 2015-12-28
    • 2014-12-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多