【Question Title】: How to read data from RabbitMQ using Apache Beam
【Posted】: 2019-05-02 15:44:30
【Question】:

I have a prototype Apache Beam pipeline in which I try to read data from RabbitMQ using the following configuration:

        p.apply("read_from_rabbit", RabbitMqIO.read()
                .withUri(options.getRabbitMQUri())
                .withQueue(options.getRabbitMQQueue()))
            .apply("extract_json_data", MapElements.via(new RabbitMessageToKafkaMessage()));

When I try to run it, I always get:

Exception in thread "main" java.lang.NoClassDefFoundError: com/rabbitmq/client/QueueingConsumer$Delivery
    at java.lang.Class.getDeclaredConstructors0(Native Method)
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
    at java.lang.Class.getDeclaredConstructors(Class.java:2020)
    at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1793)
    at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:72)
    at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:253)
    at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:251)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:250)
    at java.io.ObjectStreamClass.writeNonProxy(ObjectStreamClass.java:735)
    at java.io.ObjectOutputStream.writeClassDescriptor(ObjectOutputStream.java:668)
    at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1282)
    at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
    at java.io.ObjectOutputStream.writeClass(ObjectOutputStream.java:1213)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1120)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.CoderTranslation.toCustomCoder(CoderTranslation.java:119)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:83)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:250)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PCollectionTranslation.toProto(PCollectionTranslation.java:35)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.SdkComponents.registerPCollection(SdkComponents.java:205)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation.translateAppliedPTransform(PTransformTranslation.java:369)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:120)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:149)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:651)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:666)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:269)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:280)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:258)
    at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:208)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:154)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at myCompany.myProject.RabbitToKafka.runTransformer(RabbitToKafka.java:54)
    at myCompany.myProject.RabbitToKafka.main(RabbitToKafka.java:61)
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.QueueingConsumer$Delivery
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 48 more

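My understanding is that my RabbitMessageToKafkaMessage class reads RabbitMQ messages, and these messages in turn contain RabbitMQ data, in particular the delivery information injected into the constructor: public RabbitMqMessage(String routingKey, QueueingConsumer.Delivery delivery) { Is that correct?

If so, how can I transform my RabbitMqMessage into a KV during the read operation?

EDIT 1: The error occurs when running my pipeline from Eclipse.

EDIT 2: The project is a Maven project run from Eclipse. All my Apache Beam dependencies are at version 2.12.0 (the latest).

My dependency tree is as follows (at least the part concerning RabbitMQ):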

myCompany:myProject:jar:0.1.5-SNAPSHOT
+- org.apache.beam:beam-sdks-java-io-rabbitmq:jar:2.12.0:compile
|  \- com.rabbitmq:amqp-client:jar:5.4.3:compile

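【Comments】:

  • How are you running your pipeline? Did you build an uberjar and run it from the command line, or are you running it directly from your IDE? Have you checked that amqp-client is on the runtime classpath?
  • @ihji I have edited the question: this code is run from Eclipse.

Tags: java serialization rabbitmq apache-beam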


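【Solution 1】:

It turns out that merging my previous pom with the Apache Beam-compatible pom was not a good idea, because it created a conflict between two different versions of amqp-client. Removing the Spring Boot parent pom fixed the error by getting rid of the wrong amqp-client version.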
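A version conflict like this can be hard to see from the source alone, so here is a minimal sketch of one way to find out which jar a class was actually loaded from at runtime. The class name ClassOrigin and the method locate are hypothetical helpers made up for illustration, and com.rabbitmq.client.ConnectionFactory is used only as an example of a class that ships in amqp-client. The jar file name in the printed location usually contains the version.

```java
import java.security.CodeSource;

// Hypothetical helper (not part of Beam or amqp-client): reports which
// classpath entry a class was loaded from.
class ClassOrigin {

    static String locate(String className) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // Classes loaded by the bootstrap loader have no code source.
            if (source == null || source.getLocation() == null) {
                return "(bootstrap or unknown location)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not loadable: " + e + ")";
        }
    }

    public static void main(String[] args) {
        // Assumption: a class name from the amqp-client jar; pass another as an argument.
        String name = args.length > 0 ? args[0] : "com.rabbitmq.client.ConnectionFactory";
        System.out.println(name + " -> " + locate(name));
    }
}
```

Run it with the same classpath as the pipeline (for example, as a second main class in the same Eclipse project). If the printed jar is not the amqp-client version that the Beam RabbitMQ module was built against, another pom is probably overriding it.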

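【Comments】:

【Solution 2】:

It looks like there are some rough edges in dependency management. The following steps may help track down the problem:

1. Confirm that the missing class (or the amqp-client jar) is on the runtime classpath
2. Build an uberjar and try running it from the command line
3. Try a different runner (FlinkRunner, DataflowRunner)
4. Try the latest version of Beam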
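Step 1 can be checked with a few lines of plain Java. The sketch below is illustrative only: ClasspathCheck and isLoadable are made-up names, and the first class name is simply the one reported in the stack trace of the question. As far as I know, QueueingConsumer was dropped from amqp-client in the 5.x line, so a result of false for it together with true for ConnectionFactory would point to a version mismatch rather than a missing jar.

```java
// Hypothetical diagnostic (names are illustrative): checks whether classes
// are visible to the application class loader at runtime.
class ClasspathCheck {

    static boolean isLoadable(String className) {
        try {
            // 'false' avoids running static initializers of the probed class.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] names = {
            // Binary name of the nested class from the stack trace.
            "com.rabbitmq.client.QueueingConsumer$Delivery",
            // A class assumed to be present in any amqp-client version.
            "com.rabbitmq.client.ConnectionFactory"
        };
        for (String name : names) {
            System.out.println(name + " loadable: " + isLoadable(name));
        }
        // The effective runtime classpath, useful when comparing IDE and command-line runs.
        System.out.println("java.class.path = " + System.getProperty("java.class.path"));
    }
}
```

Running this from Eclipse and again from the command line against the uberjar (step 2) shows whether the two environments resolve the same amqp-client.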

【Comments】:
