【问题标题】:do we have a java queue object or mechanism to handle batch treatment?我们有处理批处理的 java 队列对象或机制吗?
【发布时间】:2012-10-08 08:51:30
【问题描述】:

Java 是否支持任何队列对象或机制来处理批处理?

ex:我们有一个队列(或任何想要的队列对象),一些生产者将项目一一推送到队列中,我的目标是当我们在这个队列中有 10 个项目或超过 10 个项目时,我们可以触发一些处理程序一批处理。

或者它不是自动触发的,我们需要找到一种方法在处理程序端优雅地循环队列。

我们是否有典型的高性能对象或库来处理这个问题?

谢谢, 埃姆雷

【问题讨论】:

    标签: java queue


    【解决方案1】:

    队列中的批处理可以通过等待/通知来实现,就像你会阻止对资源的线程调用,直到它可用或不可用。

    public class MyQueue implements Queue<Object>{
            public synchronized List<Object> peek() {
            if(this.list.size()>=10)
                      this.list.wait();
            return Collections.subList(0,10);
        }
            @Override
        public boolean add(Object e) {
            this.list.add(e);
                    if(this.list.size()>=10)
                      this.list.notifyAll(); 
            return false;
        }
    }
    

    不会自动触发

    在这种情况下,您可以在指定超时后调用等待。

    【讨论】:

      【解决方案2】:

      您可以使用BlockingQueue.drainTo()自动获取批量要执行的任务。这适用于每秒超过 100K 的任务。

      如果您需要更高性能的队列,您可以使用更复杂的 DisruptorJava Chronicle,它们每秒可以排队处理数百万个任务,都支持自动批处理。

      【讨论】:

      • 所以你的意思是,我们需要在方法端实现它,对吧?我们添加一个循环来运行 BlockingQueue.drainTo() 以获取包含队列项的列表,然后调用处理器来处理它。
      • 你可以这样做。生产者和消费者都有方法;)
      • 我已经构建了一个库来处理固定超时的批处理和刷新:github.com/fulmicotone/io.fulmicotone.fqueue
      【解决方案3】:

      以下是批量处理对象的快速尝试,使用后台线程收集和处理由其他线程推送到队列中的对象:

      public abstract class Batcher<E> implements Runnable {
      
          public static interface BatchProcessor<E> {
              public void processBatch(List<E> batch);
          }
      
          private final BlockingQueue<E> queue;
          private final BatchProcessor<E> processor;
      
          private Batcher(BlockingQueue<E> queue, BatchProcessor<E> processor) {
              this.queue = queue;
              this.processor = processor;
          }
      
          @Override
          public void run() {
              try {
                  while (true) {
                      List<E> batch = new ArrayList<E>();
                      for (int i = 0; i < 10; i++) {
                          batch.add(queue.take());
                      }
                      processor.processBatch(batch);
                  }
              } catch (InterruptedException e) {
                  return;
              }
          }
      
      }
      

      要使用它,您创建一个BlockingQueue 并在其上放置对象,创建一个BatchProcessor 实现的实例来处理批次,然后创建一个Batcher 的实例以将对象从前者泵送到后者。

      【讨论】:

        【解决方案4】:

        查看接口java.util.Queue的API文档,它有几个实现。

        还有一个标准 API,Java Message Service (JMS) 来处理队列系统,以便在不同进程之间交换消息。

        【讨论】:

        • 这如何解决批处理问题?我认为不会。
        【解决方案5】:

        我认为CountDownLatch 是您需要的,或者可能是CyclicBarrier。这将允许您设置一个同步点,该点将在发生一定数量的操作后触发消费者,并且您可以使用标准队列作为容器对象。

        【讨论】:

        • 能否请您提供一些代码sn-p,会很好理解的想法。
        • CountDownLatch 在有多个线程产生或多个线程消耗(或两者兼有)时特别有用。目前尚不清楚这里的情况。
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2015-05-05
        • 2012-11-12
        • 1970-01-01
        • 1970-01-01
        • 2020-07-13
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多