【问题标题】:flink: getting byte[] data from kafkaflink:从kafka获取字节[]数据
【发布时间】:2015-10-28 18:34:03
【问题描述】:

我正在使用 flink-1.0-SNAPSHOT 来使用来自 kafka 的数据。数据以 Snappy 压缩 byte[] 的形式传入,传递给 thrift 供以后使用。

当我使用 flink 检索数据时,它会以某种方式损坏或处理不当,以至于无法解压缩。代码来源于this示例,如下:

DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<String, String>() {

    @Override public String map(String value) throws Exception {
    boolean bvalid = Snappy.isValidCompressedBuffer(value.getBytes());
 });

isValidCompressedBuffer 每次都返回 false。

众所周知,通过其他途径使用的数据是好的。

我错过了什么?


解决方案:

我发布此内容是因为我找不到任何使用 RawSchema 的示例。

public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // parse user parameters
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    DataStream<byte[]> dataStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new RawSchema(), parameterTool.getProperties()));

    dataStream.map(new MapFunction<byte[], Object>() {
        @Override
        public Object map(byte[] bytes) throws Exception {
            boolean bvali = Snappy.isValidCompressedBuffer(bytes);

            });
            return 0;
        }
    }).print();
    env.execute();
}

【问题讨论】:

  • 如何将数据输入 Kafka?
  • 公司其他部门的一些制作人,我无法控制。可以说我可以使用其他方式读取数据,所以我知道格式。我想通过 flink 让它工作,这样我就可以将流速与我们现有的管道进行比较。
  • Kafka 开箱即用地进行快速压缩。您不需要解压缩数据 - 消费者获得解压缩的数据。
  • @AnatolyDeyneka - 到目前为止,这还不是我的经验。我将节俭对象发送到 Kafka 并使用解压缩->反序列化过程来获取消费者端的位。只要我们使用 Kafka,就一直是这样(至少对我而言)。
  • 好的。据我了解,您不使用 kafka 压缩,而是放置了已压缩的对象。在这种情况下,问题可能出在“字符串值”中。将字节读取为字符串是不正确的。你能把它改成 map(byte[] value) 再试一次吗?

标签: apache-kafka snappy apache-flink


【解决方案1】:

将字节消息作为字符串读取是不正确的。 您应该按原样读取字节然后解压缩:

public Object map(byte[] bytes) throws Exception {
    boolean bvalid = Snappy.isValidCompressedBuffer(bytes);
    ...

【讨论】:

    猜你喜欢
    • 2018-05-09
    • 2016-11-08
    • 2021-01-05
    • 1970-01-01
    • 1970-01-01
    • 2017-08-13
    • 2018-02-01
    • 2021-01-09
    • 2020-09-18
    相关资源
    最近更新 更多