【问题标题】:Is there a way to read previously enqueued messages in ActiveMQ using spark structured streaming?有没有办法使用火花结构化流读取 ActiveMQ 中先前排队的消息?
【发布时间】:2021-04-02 08:49:59
【问题描述】:

我有一个生产者正在向 ActiveMQ 的一个主题和一个具有以下属性的客户端发送消息:

val optionsMap: Map[String, String] =
      Map[String, String]("brokerUrl" -> brokerUrl,
                          "topic" -> topicName, 
                          "persistence" -> "memory", 
                          "username" -> username, 
                          "password" -> password, 
                          "clientId" -> "something")

现在,当我的客户端应用程序未运行并且我向ActiveMQ topic 发送消息时,Enqueued Messages 的数量增加了 1,Dequeued Messages 的数量保持不变。 但是一旦我启动我的客户端,Dequeued Messages 的数量就等于Enqueued Messages 的数量,尽管我的客户端应用程序不产生任何输出。如何解决这个问题?我希望我的客户端应用程序输出所有以前排队的消息。

【问题讨论】:

    标签: apache-spark activemq spark-structured-streaming


    【解决方案1】:

    除非您要发送到具有已注册持久订阅的主题,否则代理将立即丢弃所有发送的消息,因为这是定义主题的目的。只有活跃的 Topic 订阅者或持久订阅者才会收到发送到 Topic 的消息。

    【讨论】:

    • 我指定一个clientId,这不会让我的订阅者持久吗?
    • 不,当然在生产者方面这无论如何也行不通。订阅者之前必须使用订阅名称和客户端 ID 进行订阅才能创建持久订阅。
    • 能否请您指出如何在 spark-structured-streaming 中创建持久订阅...我的意思是我已经定义了 clientId,如何定义订阅名称?
    猜你喜欢
    • 2019-09-20
    • 1970-01-01
    • 2020-10-15
    • 2020-08-07
    • 1970-01-01
    • 2021-09-11
    • 2021-05-31
    • 2015-09-13
    • 1970-01-01
    相关资源
    最近更新 更多