【问题标题】:stream analytics 'Invalid Avro Format, drop invalid record.'流分析“无效的 Avro 格式,删除无效记录。”
【发布时间】:2015-10-05 11:36:28
【问题描述】:

我正在尝试使用“Microsoft Avro 库”将我的 C# 类序列化为“Avro”并将其发送到事件中心。但是,当我尝试通过流分析读取数据时,它会在日志中显示此错误“无效的 Avro 格式,删除无效记录”

更多细节.. 使用https://azure.microsoft.com/en-in/documentation/articles/hdinsight-dotnet-avro-serialization/中所示的反射方法序列化为avro格式并发送到事件中心

//Create a new AvroSerializer instance and specify a custom serialization strategy AvroDataContractResolver
        //for serializing only properties attributed with DataContract/DateMember
        var avroSerializer = AvroSerializer.Create<SensorData>();

        //Create a memory stream buffer
        using (var buffer = new MemoryStream())
        {
            //Create a data set by using sample class and struct
            var expected = new SensorData { Value = new byte[] { 1, 2, 3, 4, 5 }, Position = new Location { Room = 243, Floor = 1 } };

            //Serialize the data to the specified stream
            avroSerializer.Serialize(buffer, expected);
            var bytes = buffer.ToArray();
            var data = new EventData(bytes) {PartitionKey = "deviceId"};
            // send to event hub client
            eventHubClient.Send(data);
        }

事件可以很好地发布到事件中心。我创建了一个工作角色,可以使用这些事件并能够反序列化它们。

但是,当我将此事件中心设置为流分析的输入并将事件序列化格式设置为“avro”时,它会出现以下错误..

消息:无效的 Avro 格式,删除无效记录。

消息:IncorrectSerializationFormat 错误发生得太快。 他们被暂时压制了

我想我也必须包含 Avro Schema。谁能指导我正确的方法将 C# 类序列化为“avro”以便流分析可以理解它?

感谢您的宝贵时间。

【问题讨论】:

    标签: serialization stream analytics avro azure-stream-analytics


    【解决方案1】:

    您必须包含架构。下面是一个如何与 Schema 一起发送事件的示例。这使用了 AvroContainer。

            var eventHubClient = EventHubClient.CreateFromConnectionString("ReplaceConnectionString","ReplaceEventHubPath");
            int numberOfEvents = 10;
            using (var memoryStream = new MemoryStream())
            using (var avroWriter = AvroContainer.CreateWriter<SensorData>(memoryStream, Codec.Null))
            using (var sqWriter = new SequentialWriter<SensorData>(avroWriter, numberOfEvents))
            {
                Enumerable.Range(0, numberOfEvents)
                    .Select(i => new SensorData() { Id = "DeviceId", Value = i })
                    .ToList()
                    .ForEach(data => sqWriter.Write(data));
                memoryStream.Seek(0, SeekOrigin.Begin);
                var eventData = new EventData(memoryStream.ToArray());
                eventHubClient.Send(eventData);
            }
    

    【讨论】:

      猜你喜欢
      • 2018-06-14
      • 1970-01-01
      • 1970-01-01
      • 2012-08-05
      • 1970-01-01
      • 2014-10-26
      • 2023-04-01
      • 1970-01-01
      • 2018-11-16
      相关资源
      最近更新 更多