【问题标题】:Quarkus AMQP send message to queue after request business logicQuarkus AMQP 在请求业务逻辑后将消息发送到队列
【发布时间】:2019-07-23 19:13:43
【问题描述】:

一旦我收到 HTTP Get/Post,我必须坚持并反对,然后将消息发送到其他服务正在侦听的队列以开始执行其他复杂工作

我当前的问题是我不能只调用带有 @Outgoing("channel") 注释的方法,我尝试过并继续执行该方法而不调用

有没有办法使用 Quarkus 框架调用方法将 JSON 有效负载发送到队列?

PS:我也在尝试使用rabbitMQ并切换回ActiveMQ

我已经按照 Quarkus tuturial 关于响应式消息传递并尝试在已实现的资源中注册某些内容,但没有运气

@Path("/part")
class PartService : PanacheRepository<PartDao>, Logging {

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Transactional
    fun fetchParts(): List<PartDao> {

        val partDao = PartDao(label = "Test", status = PartStatus.INBANK, creatorId = "ghost-007")

        partDao.persist()

        if (partDao.isPersistent) {
            // Send a message to a queue -> PoC
            send(partDao)
        }

        return findAll().list()
    }

    @Outgoing("part-persisted")
    @Transactional
    fun send(partDao: PartDao): CompletionStage<AmqpMessage<*>> {
        val future = CompletableFuture<AmqpMessage<*>>()
        val message = "hello from sender"

        // Debug proposes
        println("Sending (data): $message")
        logger.debug(partDao.toString())

        future.complete(AmqpMessage(message))
        return future
    }

}

预期:

完成后在队列中注册消息“来自发件人的你好”:

curl http://localhost/part

实际结果:

send 方法继续执行

【问题讨论】:

  • 嗨,是否可以将各种关注点分成不同的类?很难弄清楚目前的情况
  • 你好,最好解释一下发生了什么:1. fetchParts 方法在执行 http 请求时执行(例如:curl localhost/parts) 2. 当 fetchParts() 方法执行时我需要发送将消息发送到队列,这就是它调用 send() 方法的原因 3.“send()”方法会将输出返回到在 @Outgoing 注释中配置的消息队列问题是 send() 方法正在不间断地执行,而不是何时fetchParts 被执行
  • 我担心将所有问题都封装到同一个类中可能会导致奇怪的行为
  • 这只是为了测试porpuses
  • 当然,我理解,但我担心这种仅用于测试的简单方法本身会导致问题

标签: kotlin quarkus


【解决方案1】:

如果我理解正确,您想调用一个将某些内容放入流中的方法。

据我所知,您必须使用 Emitter 来执行此操作,请参见例如https://github.com/michalszynkiewicz/devoxxpl-demo/blob/master/search/src/main/java/com/example/search/SearchEndpoint.java#L23

请参阅https://smallrye.io/smallrye-reactive-messaging/#_stream 文档。

【讨论】:

  • 非常感谢!这正是我所需要的
猜你喜欢
  • 2016-03-03
  • 1970-01-01
  • 1970-01-01
  • 2021-06-13
  • 2014-03-15
  • 1970-01-01
  • 2012-08-10
  • 2012-05-20
  • 2016-05-02
相关资源
最近更新 更多