【问题标题】:Decrypt Kafka Avro Message解密 Kafka Avro 消息
【发布时间】:2016-08-28 11:11:01
【问题描述】:

我正在尝试解析 kafka 消息,它采用某种加密的 AVRO 格式。我有以下 AvroSchema.avsc avro 架构文件:

{
    "type": "record",
    "namespace": "kafka.events",
    "name": "AvroSchema",
        "fields": [
            { "name": "product_id", "type": "string" },
            { "name": "available_to_promise_quantity", "type": "double" },
            { "name": "online_available_to_promise_quantity", "type": "double" },
            { "name": "stores_available_to_promise_quantity", "type": "double" },
            { "name": "is_infinite_inventory", "type": "boolean", "default" : false },
            { "name": "event_timestamp", "type": "long" },
            { "name": "previous_event", "type": "AvroSchema" }
        ]
 }

现在,我编写了以下代码来获取 JSON 格式的数据:

for (final KafkaStream<byte[], byte[]> stream : streams){
    ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator();
    byte[] consumedEncryptedMessage;
    MessageAndMetadata<byte[], byte[]> consumedEntry;
    while(consumerIterator.hasNext()){
        consumedEntry = consumerIterator.next();
            if(null != consumedEntry){
                consumedEncryptedMessage = consumedEntry.message();
                    try {
                            Schema schema = null;
                            schema = new Schema.Parser().parse(new File("src/AvroSchema.avsc"));
                            DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema);
                            Decoder decoder = DecoderFactory.get().binaryDecoder(consumedEncryptedMessage , null);
                            GenericRecord decryptedmsg = null;
                            decryptedmsg = reader.read(null, decoder);
                            System.out.println(decryptedmsg);
                        }
                        catch(Exception e) {
                            e.printStackTrace();
                            System.out.println(e);
                        }

请帮助我如何解密消息。

加密的字节消息属于这种类型:080-21-0001 :�Aw�@@��A�ǐ�U :�Aw�@@��A

我按照建议进行了更改,现在我有以下代码:

while(consumerIterator.hasNext()){
    consumedEntry = consumerIterator.next();
        if(null != consumedEntry){
            consumedEncryptedMessage = consumedEntry.message();
                try {
                    Schema schema = new Schema.Parser().parse(new File("src/AVROSchema.avsc"));
                    File myfile = new File("/Users/z001ldc/Desktop/myfile.txt");
                    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
                    FileUtils.writeByteArrayToFile(myfile, consumedEncryptedMessage);
                    @SuppressWarnings("resource")
                    DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(myfile, reader);
                    while (dataFileReader.hasNext()) {
                        decryptedMessage = dataFileReader.next(decryptedMessage);
                        System.out.println(decryptedMessage.get("product_id").toString());
                    }
                }
                catch(Exception e) {
                    e.printStackTrace();
                    System.out.println(e);
                }

但我仍然收到“不是数据文件”的错误。

【问题讨论】:

    标签: java json stack-overflow apache-kafka avro


    【解决方案1】:

    反序列化不需要解密

    首先你得到你的架构行

    schema = new Schema.Parser().parse(new File("src/AvroSchema.avsc"));
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
    

    然后

    DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(consumedEncryptedMessage, reader);
    GenericRecord user = null;
    while (dataFileReader.hasNext()) {
    // Reuse user object by passing it to next(). This saves us from
    // allocating and garbage collecting many objects for files with
    // many items.
    user = dataFileReader.next(user);
    System.out.println(user);
    

    【讨论】:

    • DataFileReader dataFileReader = new DataFileReader(consumedEncryptedMessage, reader);此行显示错误,因为 consumeEntryMessage 不是文件,它是 byte[] 类型。我应该将消息作为文件使用吗?
    • Ohhh .. 没有看到您有一系列事件,如模式 ATPEvent 中指定的那样。您能否尝试从您的架构中删除它以进行测试并尝试说 System.out.println(result.get("product_id").toString()) 。希望那是罪魁祸首,您的其余代码看起来不错。
    • 有没有办法只在字节数组中使用消耗的entrymsg 而不是文件,因为我只将它作为一个字节数组。
    猜你喜欢
    • 1970-01-01
    • 2016-11-10
    • 2016-12-07
    • 1970-01-01
    • 2016-10-08
    • 1970-01-01
    • 1970-01-01
    • 2018-01-12
    • 2021-04-13
    相关资源
    最近更新 更多