【问题标题】:Spring Shared Queue as a ServiceSpring 共享队列即服务
【发布时间】:2016-08-28 12:57:35
【问题描述】:

我想实现一个存储来自另一个队列 (SQS) 的数据的服务。 首先我定义了 Listener

@Component
public class MyListener implements MessageListener{


    @Override
    @JmsListener(destination = "my-queue")
    public void onMessage(Message message) {
        try{
            System.out.println(((TextMessage) message).getText());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

仅用于测试该方法仅打印消息的有效负载

现在我想将某种基于LinkedBlockingQueue 的内存队列定义为service

public interface IStoreService {
    void save(Order order);
    String get();
}

这个想法是其他服务将能够使用get() 方法来获取第一条消息并使用它。我认为这个解决方案可以通过创建一个端点来减少延迟,让服务可以获取一些信息或等待通知。这也只是我的假设,我可能会误解一些东西。

最后是一个问题:我应该如何正确地为一个生产者和多个消费者共享这个队列?

UPD1(有关应用程序的更多信息)。

它是从 Simple Queue Service (Amazon) 获取信息(比如说 JSON 对象)的独立应用程序。这个逻辑在MyListener 组件中实现。我还有另一个队列,它有Orders,该队列基于 ActiveMQ(是的,应用程序的架构令人作呕)。

这是工作流程: ActiveMQListener 从 ActiveMQ 队列中逐个获取订单,我希望 ActiveMQListener 从我的 Storage 中获取第一条消息,假设合并这两个对象。

唯一的问题是如何在从 SQS 获取数据并将其推送到存储的 MyListener 和从 ActiveMQ 获取数据的 ActiveMQListener 之间共享此存储。

还有StoreService的演示代码。

public class StoreService implements IStoreService {

    private final Queue<Order> queue = new LinkedBlockingQueue<>();
    @Override
    public void save(Order order) {
        this.queue.add(order);
    }

    @Override
    public String get() {
        Order order = queue.poll();
        return order.getId();
    }
}

UPD2 我在尝试解决问题时遇到了问题。正如我上面提到的,有三个队列:

+--------------+-----------------------------------------+
|request.queue | - get information from ActiveMQ         |
|response.queue| - pass data to another ActiveMQ queue   |
|sqs.queue     | - queue based on Amazon SQS             |
+--------------+-----------------------------------------+

我也有 StoreService

@Service
public class StoreService implements IStoreService {

    private final BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
    @Override
    public void save(Order order) {
        this.queue.add(order);
    }

    @Override
    public String get() {
        Order order = null;
        try {
            order = queue.take();
            return order.getId();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return null;
    }
}

带有方法receiveRequest的ActiveMQListener

    @JmsListener(destination = "request.queue")
    @SendTo("response.queue")
    public String receiveRequest(Order order){

        System.out.println("[INFO] REQUEST:" + order.getId());
        try{
            String test = storeService.get();
            System.out.println("[INFO] SQS:" + test);

        }catch (Exception e){
            e.printStackTrace();
        }

        return order.getId();
    }

对不起,System.out.prinln,我禁用了记录器 如您所见,此方法从名为request.queue 的队列中获取信息,并尝试从 storeSerivce 获取数据

线路:String test = storeService.get()

但问题是我不知道如何将 storeService 传递给它。我做了这样的事情

    private StoreService storeService;
    @Autowired
    public ActiveMQListener Listener(StoreService storeService){
        this.storeService = storeService;
    }

storeService 是空的,因为它只是storeService 的一个新实例。

有什么想法吗?

UPD3 我还把它上传到了 github repository。这有点混乱,但我认为理解这个概念是可以的。

【问题讨论】:

  • 您能否提供更多与您的问题相关的信息?您已经定义了一个接口。生产者和消费者可以简单地使用它。您的问题中的“分享”是什么意思?例如,您需要远程访问吗?您有什么要求?
  • @mm759,我添加了 UPD1 部分。请检查一下。
  • 我的候选答案(希望我理解您的需求):MyListener 在接收数据时调用 StoreService 上的 save。 ActiveMQListener 调用 StoreService.get 来检索数据。您需要在 StoreService 的方法中添加 synchronized 以避免并发问题。我不明白的是 StoreService.get 只返回 ID。您是否不需要整个对象,这样您就不必从其他地方获取它来实现您提到的减少延迟。请注意,在服务器崩溃等情况下,使用内存存储可能会导致数据丢失。
  • StorageServices 因为测试只返回ID。我稍后会实施。我试图解决这个问题,但遇到了我在 UPD2 部分中解释的问题。
  • @mm759,我还添加了 github repo。请在 UPD3 部分查看。

标签: java spring queue


【解决方案1】:

Spring 可以像这样使用依赖注入来注入 StoreService:

@Autowired
private StoreService storeService;

这必须在服务用于获取由 Spring 管理的唯一实例的任何地方完成。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-06-28
    • 2022-01-03
    • 2017-07-25
    • 1970-01-01
    • 2013-12-08
    • 2013-04-22
    • 2015-02-17
    • 1970-01-01
    相关资源
    最近更新 更多