【发布时间】:2016-08-28 12:57:35
【问题描述】:
我想实现一个存储来自另一个队列 (SQS) 的数据的服务。 首先我定义了 Listener
@Component
public class MyListener implements MessageListener{
@Override
@JmsListener(destination = "my-queue")
public void onMessage(Message message) {
try{
System.out.println(((TextMessage) message).getText());
}catch (Exception e){
e.printStackTrace();
}
}
}
仅用于测试该方法仅打印消息的有效负载
现在我想将某种基于LinkedBlockingQueue 的内存队列定义为service
public interface IStoreService {
void save(Order order);
String get();
}
这个想法是其他服务将能够使用get() 方法来获取第一条消息并使用它。我认为这个解决方案可以通过创建一个端点来减少延迟,让服务可以获取一些信息或等待通知。这也只是我的假设,我可能会误解一些东西。
最后是一个问题:我应该如何正确地为一个生产者和多个消费者共享这个队列?
UPD1(有关应用程序的更多信息)。
它是从 Simple Queue Service (Amazon) 获取信息(比如说 JSON 对象)的独立应用程序。这个逻辑在MyListener 组件中实现。我还有另一个队列,它有Orders,该队列基于 ActiveMQ(是的,应用程序的架构令人作呕)。
这是工作流程:
ActiveMQListener 从 ActiveMQ 队列中逐个获取订单,我希望 ActiveMQListener 从我的 Storage 中获取第一条消息,假设合并这两个对象。
唯一的问题是如何在从 SQS 获取数据并将其推送到存储的 MyListener 和从 ActiveMQ 获取数据的 ActiveMQListener 之间共享此存储。
还有StoreService的演示代码。
public class StoreService implements IStoreService {
private final Queue<Order> queue = new LinkedBlockingQueue<>();
@Override
public void save(Order order) {
this.queue.add(order);
}
@Override
public String get() {
Order order = queue.poll();
return order.getId();
}
}
UPD2 我在尝试解决问题时遇到了问题。正如我上面提到的,有三个队列:
+--------------+-----------------------------------------+
|request.queue | - get information from ActiveMQ |
|response.queue| - pass data to another ActiveMQ queue |
|sqs.queue | - queue based on Amazon SQS |
+--------------+-----------------------------------------+
我也有 StoreService
@Service
public class StoreService implements IStoreService {
private final BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
@Override
public void save(Order order) {
this.queue.add(order);
}
@Override
public String get() {
Order order = null;
try {
order = queue.take();
return order.getId();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
带有方法receiveRequest的ActiveMQListener
@JmsListener(destination = "request.queue")
@SendTo("response.queue")
public String receiveRequest(Order order){
System.out.println("[INFO] REQUEST:" + order.getId());
try{
String test = storeService.get();
System.out.println("[INFO] SQS:" + test);
}catch (Exception e){
e.printStackTrace();
}
return order.getId();
}
对不起,System.out.prinln,我禁用了记录器
如您所见,此方法从名为request.queue 的队列中获取信息,并尝试从 storeSerivce 获取数据
线路:String test = storeService.get()
但问题是我不知道如何将 storeService 传递给它。我做了这样的事情
private StoreService storeService;
@Autowired
public ActiveMQListener Listener(StoreService storeService){
this.storeService = storeService;
}
而storeService 是空的,因为它只是storeService 的一个新实例。
有什么想法吗?
UPD3 我还把它上传到了 github repository。这有点混乱,但我认为理解这个概念是可以的。
【问题讨论】:
-
您能否提供更多与您的问题相关的信息?您已经定义了一个接口。生产者和消费者可以简单地使用它。您的问题中的“分享”是什么意思?例如,您需要远程访问吗?您有什么要求?
-
@mm759,我添加了 UPD1 部分。请检查一下。
-
我的候选答案(希望我理解您的需求):MyListener 在接收数据时调用 StoreService 上的 save。 ActiveMQListener 调用 StoreService.get 来检索数据。您需要在 StoreService 的方法中添加 synchronized 以避免并发问题。我不明白的是 StoreService.get 只返回 ID。您是否不需要整个对象,这样您就不必从其他地方获取它来实现您提到的减少延迟。请注意,在服务器崩溃等情况下,使用内存存储可能会导致数据丢失。
-
StorageServices 因为测试只返回ID。我稍后会实施。我试图解决这个问题,但遇到了我在 UPD2 部分中解释的问题。
-
@mm759,我还添加了 github repo。请在 UPD3 部分查看。