【问题标题】:java concurrency - keep order between certain tasksjava并发 - 保持某些任务之间的顺序
【发布时间】:2020-05-05 11:27:17
【问题描述】:

我的应用收到来自多个用户的消息。每个用户的消息必须按顺序执行,否则消息可以并行执行。如何实现这样的逻辑?

例如消息按以下顺序到达:u1:m1u1:m2u2:m1u1:m3u2:m2。执行应该像这样并行:

  • 线程1:u1:m1u1:m2u1:m3
  • 线程2:u2:m1u2:m2

用户数量可能很大,因此我不能只为每个用户创建一个线程执行器。

private ExcutorService executorService = newFixedThreadPool(10);

public void onMessage(String user, String message) {
  // TODO schedule tasks per user in order
  executorService.schedule(() -> processMessage(message));
}

【问题讨论】:

  • 可能是个愚蠢的想法,但是如何将消息排队并将它们放入每个用户的队列中呢?无论是 java 队列还是任何其他队列机制

标签: java concurrency


【解决方案1】:

你可以这样做:

        //CREATE EXECUTORS
        int numberOfThreads = 10;
        ExecutorService[] executors = new ExecutorService[numberOfThreads];
        for (int i = 0; i < numberOfThreads; i++) {
            executors[i] = Executors.newSingleThreadExecutor();
        }

然后在您的方法中,借助 hashCode 和模数为用户使用特定的执行器:

public void onMessage(String user, String message) {
    //same user will always get the same executor, hashCode will evenly distribute the load among the executors
    int executorToUse = Math.abs(user.hashCode()) % numberOfThreads; 
    ExecutorService executorService = executors[executorToUse];
    executorService.execute(() -> processMessage(message));
}

【讨论】:

  • 将动作大致均匀地分布在可用线程上,并确保每个用户的动作都进入同一个线程。不错。
【解决方案2】:

您可以通过分组队列来解耦消息生产者和处理器:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class GroupingBlockingQueue<G, E> {

    private static final int UNBOUNDED = -1;

    private final Lock lock = new ReentrantLock();

    private final Condition notFull  = lock.newCondition();

    private final Condition notEmpty = lock.newCondition();

    private final Map<G, List<E>> map = new LinkedHashMap<>();

    private final int bound;

    public GroupingBlockingQueue() {
        this(UNBOUNDED);
    }

    public GroupingBlockingQueue(int bound) {
        this.bound = bound;
    }

    public void put(G group, E element) throws InterruptedException {
        lock.lock();
        try {
            if (bound > 0) {
                while (bound == map.keySet().size()) {
                    notFull.await();
                }
            }
            map.computeIfAbsent(group, k -> new ArrayList<>()).add(element);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Item<G, E> take() throws InterruptedException {
        lock.lock();
        try {
            Set<G> keys = map.keySet();
            while (keys.isEmpty()) {
                notEmpty.await();
            }
            G group = keys.iterator().next();
            List<E> elements = map.remove(group);
            notFull.signal();
            return new Item<>(group, elements);
        } finally {
            lock.unlock();
        }
    }

    public static class Item<G, E> {
        private final G group;
        private final List<E> elements;

        public Item(G group, List<E> elements) {
            this.group = group;
            this.elements = elements;
        }

        public G getGroup() {
            return group;
        }

        public List<E> getElements() {
            return elements;
        }
    }
}

那么消息处理器可能看起来像:

package ru.dkovalev;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.Executors.newFixedThreadPool;

public class UserMessageProcessor implements AutoCloseable {

    private GroupingBlockingQueue<String, String> queue = new GroupingBlockingQueue<>();
    private ExecutorService executorService;

    public UserMessageProcessor(int threadCount) {
        executorService = newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount; i++) {
            executorService.submit((Runnable) this::processMessages);
        }
    }

    private void processMessages() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                GroupingBlockingQueue.Item<String, String> item = queue.take();
                processMessages(item.getGroup(), item.getElements());
            }
        } catch (InterruptedException ignore) {}
    }

    private void processMessages(String user, List<String> messages) throws InterruptedException {
        for (String message : messages) {
            System.out.println(String.format("[%s] %s:%s", Thread.currentThread().getName(), user, message));
            Thread.sleep(TimeUnit.SECONDS.toMillis(1));
        }
    }

    public void onMessage(String user, String message) throws InterruptedException {
        queue.put(user, message);
    }

    @Override
    public void close() {
        try {
            executorService.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ignore) {
        }
        executorService.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        try (UserMessageProcessor processor = new UserMessageProcessor(10)) {
            processor.onMessage("u1", "m1");
            processor.onMessage("u2", "m1");
            processor.onMessage("u1", "m2");
            processor.onMessage("u2", "m2");
            processor.onMessage("u1", "m3");
        }
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-10-13
    • 1970-01-01
    • 1970-01-01
    • 2017-03-08
    • 1970-01-01
    • 2015-11-29
    • 2010-10-15
    • 2018-10-15
    相关资源
    最近更新 更多