【问题标题】:Getting NotSerializableException error in storm topology在风暴拓扑中出现 NotSerializableException 错误
【发布时间】:2018-10-15 08:27:22
【问题描述】:

风暴版本:1.2.1, Java版本:8

我正在 scala 中编写风暴拓扑,并在集群模式下运行时开始出现以下错误。在LocalCluster 模式下,我也可以通过配置:conf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, Boolean.box( true)) 获得相同的结果。以下是痕迹:

2018-05-05 00:49:59,342 ERROR util [Thread-37-disruptor-executor[6 6]-send-queue] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: com.fasterxml.jackson.databind.node.ObjectNode
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$consume_loop_STAR_$fn__4492.invoke(disruptor.clj:84) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: com.fasterxml.jackson.databind.node.ObjectNode
    at org.apache.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:41) ~[storm-core-1.2.1.jar:1.2.1]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.worker$assert_can_serialize.invoke(worker.clj:133) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.worker$mk_transfer_fn$fn__5204.invoke(worker.clj:213) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__4882.invoke(executor.clj:314) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
    ... 6 more
Caused by: java.io.NotSerializableException: com.fasterxml.jackson.databind.node.ObjectNode
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_131]
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_131]
    at org.apache.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:38) ~[storm-core-1.2.1.jar:1.2.1]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.worker$assert_can_serialize.invoke(worker.clj:133) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.worker$mk_transfer_fn$fn__5204.invoke(worker.clj:213) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__4882.invoke(executor.clj:314) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
    ... 6 more

似乎storm正在尝试序列化ObjectNode,这是无法做到的并给出NotSerializableException

ObjectNode 不应该是serializable 吗?我看到一个关于这个here 的旧讨论,但觉得这应该是serializable

我尝试在风暴配置中添加以下内容,但没有帮助。

conf.registerSerialization(classOf[com.fasterxml.jackson.databind.node.ObjectNode])

我也尝试添加conf.setSkipMissingKryoRegistrations(false),但同样没有救援。

什么是合适的解决方案?

【问题讨论】:

    标签: java scala serialization apache-storm kryo


    【解决方案1】:

    ObjectNode 不可序列化(它没有实现 Serializable 接口)。

    conf.setSkipMissingKryoRegistrations(false) 是默认设置。请参阅https://storm.apache.org/releases/2.0.0-SNAPSHOT/Serialization.html,其中描述了此属性的作用。我不认为你想在你的情况下改变它。

    conf.registerSerialization(ObjectNode.class); 添加到拓扑配置应该可以工作,不知道为什么它不适合你。如果你不能让它工作,你可以通过序列化来解决它,例如在发出值之前映射或字符串。

    【讨论】:

      【解决方案2】:

      从@Stig 的answeranswer 中获得灵感,每当在螺栓而不是我的对象之间传递它时,我都会序列化对象的。所以现在我在螺栓中发送这样的字节数组:

      val messages = input.asInstanceOf[TupleImpl].get("Request").asInstanceOf[Array[Byte]].getObj[List[myObject]]
      val objMapper = new ObjectMapper()
      messages.foreach(message => collector.emit(new Values(objMapper.writeValueAsBytes(message))))
      

      编辑 1:

      另一种可能的解决方法似乎是(没有尝试过,我通过发送字节来解决)是为您从一个螺栓传递到另一个螺栓的对象编写一个序列化器类,如here 所述。以下是来自此链接的示例序列化程序:

      public class StockAvroSerializer extends Serializer<Stock> {
      
          private static final Logger LOG = LoggerFactory.getLogger(StockAvroSerializer.class);
          private Schema SCHEMA = Stock.getClassSchema();
      
          public void write(Kryo kryo, Output output, Stock object) {
              DatumWriter<Stock> writer = new SpecificDatumWriter<>(SCHEMA);
              ByteArrayOutputStream out = new ByteArrayOutputStream();
              BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
              try {
                  writer.write(object, encoder);
                  encoder.flush();
              } catch (IOException e) {
                  LOG.error(e.toString(), e);
              }
              IOUtils.closeQuietly(out);
              byte[] outBytes = out.toByteArray();
              output.writeInt(outBytes.length, true);
              output.write(outBytes);
          }
      
          public Stock read(Kryo kryo, Input input, Class<Stock> type) {
              byte[] value = input.getBuffer();
              SpecificDatumReader<Stock> reader = new SpecificDatumReader<>(SCHEMA);
              Stock record = null;
              try {
                  record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null));
              } catch (IOException e) {
                  LOG.error(e.toString(), e);
              }
              return record;
          }
      }
      

      编辑 2:

      Here是我找到ObjectNode不能序列化的原因了:

      JsonNode 不知道如何序列化自己,只有在序列化时可用的信息:没有 ObjectMapper 或 JsonGenerator 可以使用;后者是它必须序列化自己的组件(如果有的话,还有内容)。它不能也应该尝试实例化(它们应该如何配置?);而静态单例往往会在更大的系统中引起问题(一部分尝试以一种方式配置它们,另一部分尝试不同)

      但这是相当古老的通信方式,在新版本中,我相信应该有一些机制可以使其可序列化。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2013-08-06
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-08-21
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多