【Question title】: Data structure to manage a maximum number of requests per minute for an API
【Posted】: 2020-12-19 02:22:02
【Question】:

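I need to send data to an external API, but this API limits the number of requests per endpoint (for example, 60 requests per minute).

The data comes from Kafka, and every message then goes to Redis (because I can send a single request containing 200 items). So I use a simple cache to help me, and I can guarantee that I will not lose any messages if my server goes down.

The problem is that sometimes Kafka starts sending a lot of messages, Redis starts to grow (more than 1 million messages waiting to be sent to the API), and we cannot make requests as fast as the messages come in. We then end up with a large delay.

My first code was simple: ExecutorService executor = Executors.newFixedThreadPool(1);
This works very well when there are few messages, and the delay is minimal.

So the first thing I did was change the executor to: ExecutorService executor = Executors.newCachedThreadPool();
That way I can get new threads whenever I need to make requests to the external API faster, but then I ran into the requests-per-minute limit.

For some endpoints I can make 300 requests per minute, for others 500, for others 30, and so on.

The code I wrote is not very good, and it is for the company I work for, so I really need to make it better.

So every time I am about to call the external API, I call the makeRequest method. The method is synchronized; I know I could use a synchronized list, but I think a synchronized method fits this case better.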

// This is an inner class
private static class IntegrationType {

    final Queue<Long> requests; // This queue is used to store the timestamp of the requests
    final int maxRequestsPerMinute; // How many requests I can make per minute

    public IntegrationType(final int maxRequestsPerMinute) {
        this.maxRequestsPerMinute = maxRequestsPerMinute;
        this.requests = new LinkedList<>();
    }

    synchronized void makeRequest() {
        final long current = System.currentTimeMillis();
        requests.add(current);
        if (requests.size() >= maxRequestsPerMinute) {
            long first = requests.poll(); // gets the first request

            // The difference between the current request and the first request of the queue
            final int differenceInSeconds = (int) (current - first) / 1000;
           
            // if the difference is less than the maximum allowed
            if (differenceInSeconds <= 60) {
                // seconds to sleep.
                final int secondsToSleep = 60 - differenceInSeconds;
                sleep(secondsToSleep);
            }
        }
    }

     void sleep( int seconds){
        try {
            Thread.sleep(seconds * 1000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
     }
}

So, is there a data structure I could use? What should I take into account?

Thanks in advance.

【Comments】:

    Tags: java multithreading data-structures apache-kafka synchronization


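    【Answer 1】:

    If I understand your problem correctly, you can use a BlockingQueue together with a ScheduledExecutorService, as follows.

    BlockingQueues have the put method, which adds the given element to the queue only if there is available space; otherwise the call waits (until space becomes available). They also have the take method, which removes an element from the queue only if there is one; otherwise the call waits (until there is at least one element to take).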
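The blocking behaviour described above can be observed without hanging a thread by using the timed variants offer and poll, which give up after a timeout where put and take would keep waiting. The following is only a minimal sketch; the class name BlockingQueueBasics and its helper methods are made up for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueBasics {

    /** Fills a queue of the given capacity, then reports whether one more timed offer succeeds. */
    static boolean offerWhenFull(final int capacity) {
        final BlockingQueue<String> q = new ArrayBlockingQueue<>(capacity);
        try {
            for (int i = 0; i < capacity; ++i)
                q.put("request-" + i); // Does not block: there is still room.
            // put() would wait indefinitely here, so the timed offer is used to observe the full queue.
            return q.offer("one-too-many", 50, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException ix) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ix);
        }
    }

    /** Polls an empty queue with a timeout; take() would wait instead of returning null. */
    static String pollWhenEmpty() {
        final BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        try {
            return q.poll(50, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException ix) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ix);
        }
    }

    public static void main(final String[] args) {
        System.out.println("Timed offer on a full queue succeeded: " + offerWhenFull(2));
        System.out.println("Timed poll on an empty queue returned: " + pollWhenEmpty());
    }
}
```

In the real pipeline put and take are the right calls, because waiting is exactly what throttles the producers and the consumer.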

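    Specifically, you can use a LinkedBlockingQueue or an ArrayBlockingQueue, both of which can be given a fixed capacity, i.e. a maximum number of elements held at any given time. This fixed capacity means that you can submit as many requests as you like with put, but you will only take and process a request once per second or so (for example, to make 60 requests per minute).

    To instantiate a LinkedBlockingQueue with a fixed capacity, just use the corresponding constructor (which accepts the size as an argument). According to its documentation, a LinkedBlockingQueue orders elements FIFO, so take returns them in insertion order.

    To instantiate an ArrayBlockingQueue with a fixed capacity, use the constructor that accepts the size along with a boolean flag named fair. An ArrayBlockingQueue also orders its elements FIFO; when fair is true, threads blocked on put or take are additionally granted access in FIFO order.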
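As a small illustration of the two constructors just mentioned, the sketch below builds both kinds of bounded queue and checks that elements come out in insertion order. The class BoundedQueueFactory and its method names are invented for this example only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BoundedQueueFactory {

    /** Bounded LinkedBlockingQueue: the constructor argument is the capacity. */
    static BlockingQueue<Integer> linked(final int capacity) {
        return new LinkedBlockingQueue<>(capacity);
    }

    /** Bounded ArrayBlockingQueue with fair = true, so blocked threads are served in FIFO order. */
    static BlockingQueue<Integer> fairArray(final int capacity) {
        return new ArrayBlockingQueue<>(capacity, true);
    }

    /** Adds 1..n and drains the queue again; both implementations return insertion order. */
    static List<Integer> roundTrip(final BlockingQueue<Integer> q, final int n) {
        for (int i = 1; i <= n; ++i)
            q.add(i); // add() throws IllegalStateException if the queue is already full.
        final List<Integer> out = new ArrayList<>();
        q.drainTo(out);
        return out;
    }

    public static void main(final String[] args) {
        System.out.println(roundTrip(linked(3), 3));
        System.out.println(roundTrip(fairArray(3), 3));
        System.out.println("Remaining capacity of a fresh queue: " + fairArray(60).remainingCapacity());
    }
}
```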

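    Then, instead of waiting in a loop, you can have a ScheduledExecutorService to which you submit a Runnable that takes a request from the queue, communicates with the external API, and then waits the required delay between communications.

    A simple demonstration of the above follows: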

    import java.util.Objects;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
        
        public static class RequestSubmitter implements Runnable {
            private final BlockingQueue<Request> q;
            
            public RequestSubmitter(final BlockingQueue<Request> q) {
                this.q = Objects.requireNonNull(q);
            }
            
            @Override
            public void run() {
                try {
                    q.put(new Request()); //Will block until available capacity.
                }
                catch (final InterruptedException ix) {
                    System.err.println("Interrupted!"); //Not expected to happen under normal use.
                }
            }
        }
        
        public static class Request {
            public void make() {
                try {
                    //Let's simulate the communication with the external API:
                    TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 100));
                }
                catch (final InterruptedException ix) {
                    //Let's say here we failed to communicate with the external API...
                }
            }
        }
        
        public static class RequestImplementor implements Runnable {
            private final BlockingQueue<Request> q;
            
            public RequestImplementor(final BlockingQueue<Request> q) {
                this.q = Objects.requireNonNull(q);
            }
            
            @Override
            public void run() {
                try {
                    q.take().make(); //Will block until there is at least one element to take.
                    System.out.println("Request made.");
                }
                catch (final InterruptedException ix) {
                    //Here the 'taking' from the 'q' is interrupted.
                }
            }
        }
        
        public static void main(final String[] args) throws InterruptedException {
            
            /*The following initialization parameters specify that we
            can communicate with the external API 60 times per 1 minute.*/
            final int maxRequestsPerTime = 60;
            final TimeUnit timeUnit = TimeUnit.MINUTES;
            final long timeAmount = 1;
            
            final BlockingQueue<Request> q = new ArrayBlockingQueue<>(maxRequestsPerTime, true);
            //final BlockingQueue<Request> q = new LinkedBlockingQueue<>(maxRequestsPerTime);
            
            //Submit some RequestSubmitters to the pool...
            final ExecutorService pool = Executors.newFixedThreadPool(100);
            for (int i = 0; i < 50_000; ++i)
                pool.submit(new RequestSubmitter(q));
            
            System.out.println("Serving...");
            
            //Find out the period between communications with the external API:
            final long delayMicroseconds = TimeUnit.MICROSECONDS.convert(timeAmount, timeUnit) / maxRequestsPerTime;
            //We could do the same with NANOSECONDS for more accuracy, but that would be overkill I think.
            
            //The most important line probably:
            Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new RequestImplementor(q), 0L, delayMicroseconds, TimeUnit.MICROSECONDS);
        }
    }
    

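    Note that I used scheduleWithFixedDelay rather than scheduleAtFixedRate. As their documentation explains, the first waits the given delay between the end of one run of the submitted Runnable and the start of the next, while the second does not wait for the run to end and simply schedules the Runnable every period time units. But we do not know how long the communication with the external API will take. What if we scheduleAtFixedRate with a period of one minute, but the request takes more than a minute to complete? The next run then becomes due while the first request is still in progress. According to the scheduleAtFixedRate documentation, such late runs do not execute concurrently; they start late, right after the previous run ends, so requests can go out back to back with no pause between them. That is why I used scheduleWithFixedDelay instead of scheduleAtFixedRate.

    But there is more: I used a single-threaded scheduled executor service. Does that mean a second run cannot start while the first has not finished? Yes: successive executions of the same periodic task never overlap, whatever the pool size, so the delay between the end of one request and the start of the next is always honored.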
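To make the difference between the two scheduling methods concrete, here is a rough timing sketch. It runs a task that sleeps for a while (standing in for the call to the external API) three times and measures the gaps between consecutive starts. The class SchedulingGapDemo and its parameters are hypothetical, and the exact numbers depend on the machine.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SchedulingGapDemo {

    /** Runs a task lasting about workMillis three times; returns the two start-to-start gaps in ms. */
    static long[] startGaps(final boolean fixedDelay, final long workMillis, final long periodMillis) {
        final List<Long> starts = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(3);
        final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        final Runnable task = () -> {
            if (done.getCount() == 0)
                return; // Ignore runs that fire after the measurement is complete.
            starts.add(System.nanoTime());
            try {
                TimeUnit.MILLISECONDS.sleep(workMillis); // Simulated call to the external API.
            } catch (final InterruptedException ix) {
                Thread.currentThread().interrupt();
            }
            done.countDown();
        };
        try {
            if (fixedDelay)
                ses.scheduleWithFixedDelay(task, 0L, periodMillis, TimeUnit.MILLISECONDS);
            else
                ses.scheduleAtFixedRate(task, 0L, periodMillis, TimeUnit.MILLISECONDS);
            done.await();
        } catch (final InterruptedException ix) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ix);
        } finally {
            ses.shutdownNow();
        }
        return new long[] {
            (starts.get(1) - starts.get(0)) / 1_000_000L,
            (starts.get(2) - starts.get(1)) / 1_000_000L
        };
    }

    public static void main(final String[] args) {
        final long[] delayGaps = startGaps(true, 50L, 100L);
        final long[] rateGaps = startGaps(false, 50L, 100L);
        System.out.println("Fixed delay gaps (ms): " + delayGaps[0] + ", " + delayGaps[1]);
        System.out.println("Fixed rate gaps (ms): " + rateGaps[0] + ", " + rateGaps[1]);
    }
}
```

With a 50 ms task and a 100 ms setting, the fixed-delay gaps should come out at roughly 150 ms (work plus delay), while the fixed-rate gaps should stay near 100 ms.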

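    Another reason I used scheduleWithFixedDelay is request underflow. For example, what if the queue is empty? Then the scheduling should also wait rather than submit the Runnable again.

    On the other hand, if we use scheduleWithFixedDelay with a delay of, say, 1/60 of a minute (one second) between runs, and more than 60 requests are submitted within a minute, then our throughput to the external API will certainly fall below the limit. With scheduleWithFixedDelay we can guarantee that at most 60 requests per minute are made to the external API. It can be fewer than that, because the time each request takes is added on top of the delay, but we do not want it to be: we would like to reach the limit every time. If this is not a concern for you, you can use the above implementation as it is.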
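The throughput loss can be estimated with simple arithmetic: each cycle lasts the fixed delay plus the duration of the request itself, so about window / (delay + request time) cycles fit into one window. The helper below is a hypothetical sketch of that calculation; the class and method names are not part of the answer's code, and the 100 ms request duration is just an assumed figure.

```java
import java.util.concurrent.TimeUnit;

class FixedDelayMath {

    /** Delay between requests, in microseconds, that spreads maxRequests evenly over the window. */
    static long delayMicros(final long windowAmount, final TimeUnit windowUnit, final int maxRequests) {
        return TimeUnit.MICROSECONDS.convert(windowAmount, windowUnit) / maxRequests;
    }

    /** Number of complete request-plus-delay cycles that fit into one window. */
    static long cyclesPerWindow(final long windowAmount, final TimeUnit windowUnit,
                                final int maxRequests, final long requestMicros) {
        final long windowMicros = TimeUnit.MICROSECONDS.convert(windowAmount, windowUnit);
        final long cycleMicros = delayMicros(windowAmount, windowUnit, maxRequests) + requestMicros;
        return windowMicros / cycleMicros;
    }

    public static void main(final String[] args) {
        // 60 requests per minute gives a delay of one second between requests.
        System.out.println(delayMicros(1, TimeUnit.MINUTES, 60));
        // If each request takes an assumed 100 ms, only 54 full cycles fit into a minute.
        System.out.println(cyclesPerWindow(1, TimeUnit.MINUTES, 60, 100_000L));
    }
}
```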

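    But suppose you want to stay as close to the limit as possible every time. In that case, as far as I know, you can do it with a custom scheduler, which would be less clean than the first solution but more accurate in timing.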
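One possible direction for such a custom mechanism, not necessarily what this answer has in mind, is a sliding-window limiter that remembers the timestamps of the most recent requests and only makes a caller wait when the window is actually full. The sketch below assumes a hypothetical class SlidingWindowLimiter; the current time is passed in so that the logic can be checked without real waiting.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class SlidingWindowLimiter {

    private final int maxRequests;
    private final long windowMillis;
    private final Deque<Long> sent = new ArrayDeque<>(); // Timestamps of requests still inside the window.

    SlidingWindowLimiter(final int maxRequests, final long windowMillis) {
        this.maxRequests = maxRequests;
        this.windowMillis = windowMillis;
    }

    /**
     * Records the request and returns 0 if it may be sent at nowMillis;
     * otherwise returns how many milliseconds the caller should wait.
     */
    synchronized long tryAcquire(final long nowMillis) {
        // Forget requests that have already left the window.
        while (!sent.isEmpty() && nowMillis - sent.peekFirst() >= windowMillis)
            sent.pollFirst();
        if (sent.size() < maxRequests) {
            sent.addLast(nowMillis);
            return 0L;
        }
        // The oldest request leaves the window at (its timestamp + windowMillis).
        return sent.peekFirst() + windowMillis - nowMillis;
    }

    /** Blocks the calling thread until a request may be sent. */
    void acquire() throws InterruptedException {
        long wait;
        while ((wait = tryAcquire(System.currentTimeMillis())) > 0)
            Thread.sleep(wait);
    }
}
```

A worker would call acquire() right before each request to the external API. Unlike the fixed delay, this lets requests go out in bursts up to the limit, so it only fits if the external API counts requests per window rather than enforcing a minimum spacing.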

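    All in all, with the above implementation you need to make sure that the communication with the external API that serves each request is as fast as possible.

    Finally, I should warn you that I could not find out what happens if the BlockingQueue implementations I suggested do not put in FIFO order. I mean, what if two requests arrive at almost the same time while the queue is full? Both will wait, but will the one that arrived first also be put first, or will the second one? In general I do not know. (For an ArrayBlockingQueue created with fair set to true, as in the code above, the documentation states that waiting threads are granted access in FIFO order; LinkedBlockingQueue documents no such guarantee.) If you do not care about some requests reaching the external API out of order, then do not worry and use the code as it stands. If you do care, however, and you are able to put, for example, a sequence number on each request, then you can use a PriorityQueue after the BlockingQueue, or even experiment with PriorityBlockingQueue (which is unfortunately unbounded). That would complicate things much more, so I did not post the corresponding code with the PriorityQueue. At least I did my best, and I hope I have contributed some good ideas. I am not saying this post is a complete solution to all your problems, but it offers some considerations to start with.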
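For readers who want to try the sequence-number idea, here is a minimal sketch using PriorityBlockingQueue. The class SequencedRequest and its fields are assumptions made for this illustration; the answer itself does not include such code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class SequencedRequest implements Comparable<SequencedRequest> {

    final long sequence;  // Hypothetical sequence number assigned when the request is created.
    final String payload;

    SequencedRequest(final long sequence, final String payload) {
        this.sequence = sequence;
        this.payload = payload;
    }

    @Override
    public int compareTo(final SequencedRequest other) {
        return Long.compare(sequence, other.sequence); // Lowest sequence number is served first.
    }

    /** Enqueues requests in the given arrival order and returns the order in which they are served. */
    static List<Long> drainInOrder(final long... arrivalOrder) {
        final PriorityBlockingQueue<SequencedRequest> q = new PriorityBlockingQueue<>();
        for (final long seq : arrivalOrder)
            q.put(new SequencedRequest(seq, "payload-" + seq)); // Never blocks: the queue is unbounded.
        final List<Long> served = new ArrayList<>();
        SequencedRequest next;
        while ((next = q.poll()) != null)
            served.add(next.sequence);
        return served;
    }

    public static void main(final String[] args) {
        System.out.println(drainInOrder(3L, 1L, 2L));
    }
}
```

The queue can only reorder requests that are waiting in it at the same time; a request with a lower sequence number that arrives after a higher one has already been taken will still be sent late.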

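    【Comments】:

    • Thank you for your time. I am going to implement it, and afterwards I will come back here.
    • I did it another way, but I really think your answer is better.
    【Answer 2】:

    I implemented something different from what @gthanop suggested.

    I found out that the limits may change, so I might need to grow or shrink the blocking queue. Another reason is that adapting our current code to that approach would not be easy. And one more: we may run more than one instance, so we would need a distributed lock.

    So I implemented something easier, though not as efficient as @gthanop's answer.

    Here is my code (adapted, because I cannot show the company code):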

    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Semaphore;
    
    public class Teste {
        
        private static enum ExternalApi {    
            A, B, C;
        }
    
        private static class RequestManager {
    
            private long firstRequest; // Start of the current one-minute window
        
            // How many requests we have made in the current window
            private int requestsCount = 0;
        
            // A timer thread that runs every minute and resets the request count and the window start time
            private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        
            RequestManager() {
                final long initialDelay = 0L;
                final long fixedRate = 60;
        
                executor.scheduleAtFixedRate(this::reset, initialDelay, fixedRate, TimeUnit.SECONDS);
            }
        
            // The methods are synchronized because the timer thread writes these fields while request threads read them.
            private synchronized void reset() {
                System.out.println("Clearing the current count!");
                requestsCount = 0;
                firstRequest = System.currentTimeMillis();
            }
        
            synchronized void incrementRequest() {
                requestsCount++;
            }
        
            synchronized long getFirstRequest() {
                return firstRequest;
            }
        
        
            synchronized boolean requestsExceeded(final int requestLimit) {
                return requestsCount >= requestLimit;
            }
        
        }
    
        public static class RequestHelper {
    
            private static final byte SECONDS_IN_MINUTE = 60;
            private static final short MILLISECONDS_IN_SECOND = 1000;
            private static final byte ZERO_SECONDS = 0;
        
            // Table to support the time, and count of the requests
            private final Map<Integer, RequestManager> requests;
        
            // Table that contains the limits of each type of request
            private final Map<Integer, Integer> requestLimits;
        
            /**
             * We need an array of semaphores, because, we might lock the requests for ONE, but not for TWO
             */
            private final Semaphore[] semaphores;
        
            private RequestHelper(){
        
                // one semaphore for type
                semaphores = new Semaphore[ExternalApi.values().length];
                requests = new ConcurrentHashMap<>();
                requestLimits = new ConcurrentHashMap<>(); // updateLimits may run while other threads read the limits
        
                for (final ExternalApi type : ExternalApi.values()) {
    
                    // Binary semaphore, must be fair, because we are updating things.
                    semaphores[type.ordinal()] = new Semaphore(1, true);
                }
            }
        
            /**
             * When my token expire, I must update this, because the limits might change.
             * @param limits the new api limits
             */
            protected void updateLimits(final Map<ExternalApi, Integer> limits) {
                limits.forEach((key, value) -> requestLimits.put(key.ordinal(), value));
            }
        
        
            /**
             * Increments the counter for the type of the request,
             * Using the mutual exclusion lock, we can handle and block other threads that are trying to
             * do a request to the api.
             * If the incoming requests are going to exceed the maximum, we will make the thread sleep for N seconds ( 60 - time since first request)
             * since we are using a Binary Semaphore, it will block incoming requests until the thread that is sleeping, wakeup and release the semaphore lock.
             *
             * @param type of the integration, Supp, List, PET etc ...
             */
            protected final void addRequest(final ExternalApi type) {
        
                // the index of this request
                final int requestIndex = type.ordinal();
        
                // we get the permit for the semaphore of the type
                final Semaphore semaphore = semaphores[requestIndex];
        
                // Acquire the permit; if none is available, block until one is released.
                semaphore.acquireUninterruptibly();
                try {
                    // gets the requestManager for the type
                    final RequestManager requestManager = getRequest(requestIndex);
        
                    // increments the number of requests
                    requestManager.incrementRequest();
        
                    final int limit = requestLimits.get(requestIndex);
                    if (requestManager.requestsExceeded(limit)) {
        
                        // seconds left in the current minute: one minute minus the time we needed to reach the maximum
                        final int secondsToSleep = SECONDS_IN_MINUTE - (int) ((System.currentTimeMillis() - requestManager.getFirstRequest()) / MILLISECONDS_IN_SECOND);
        
                        // We reached the maximum in less than a minute
                        if (secondsToSleep > ZERO_SECONDS) {
                            System.out.printf("We reached the maximum of: %d per minute by: %s. We must wait for: %d seconds before making a new request!\n", limit, type.name(), secondsToSleep);
                            sleep((long) secondsToSleep * MILLISECONDS_IN_SECOND);
                        }
                    }
                } finally {
                    // always release the semaphore, even if something above throws
                    semaphore.release();
                }
            }
        
        
            private final void sleep(final long time) {
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                    // restore the interrupt flag so callers can still see the interruption
                    Thread.currentThread().interrupt();
                }
            }
        
            /**
             * Gets the first Request Manager, if it is the first request, it will create the
             * RequestManager object
             * @param index
             * @return a RequestManager instance
             */
            private RequestManager getRequest(final int index) {
                // atomically creates the RequestManager on first use
                return requests.computeIfAbsent(index, key -> new RequestManager());
            }
        }
    
        public static void main(String[] args) {
            
            final RequestHelper requestHelper = new RequestHelper();
            
            final Map<ExternalApi, Integer> apiLimits = Map.of(ExternalApi.A, 30, ExternalApi.B, 60, ExternalApi.C, 90);
            
            // update the limits
            requestHelper.updateLimits(apiLimits);
    
            final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
            executor.scheduleWithFixedDelay(() -> {
                System.out.println("A new request is going to happen");
                requestHelper.addRequest(ExternalApi.A);
                sleep(65);
            }, 0, 100, TimeUnit.MILLISECONDS);
    
            executor.scheduleWithFixedDelay(() -> {
                System.out.println("B new request is going to happen");
                requestHelper.addRequest(ExternalApi.B);
                sleep(50);
            }, 0, 200, TimeUnit.MILLISECONDS);
    
            executor.scheduleWithFixedDelay(() -> {
                System.out.println("C new request is going to happen");
                requestHelper.addRequest(ExternalApi.C);
                sleep(30);
            }, 0, 300, TimeUnit.MILLISECONDS);
    
        }
        
        
        private static final void sleep(final long time) {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                // restore the interrupt flag so the executor can still see the interruption
                Thread.currentThread().interrupt();
            }
        } 
    }
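
The core of this approach, a per-endpoint counter that starts over every minute and whose limits can be replaced at runtime, can also be sketched without a timer thread by resetting the window lazily on the first request after it has expired. This is a simplified variation for illustration, not the code used above: the class FixedWindowLimiter is hypothetical, the time is passed in explicitly, and it does not address the multi-instance (distributed lock) concern.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class FixedWindowLimiter<K> {

    private static final long WINDOW_MILLIS = 60_000L;

    /** Start time and request count of the current window for one key. */
    private static final class Window {
        long start;
        int count;

        Window(final long start) {
            this.start = start;
        }
    }

    private final Map<K, Integer> limits = new ConcurrentHashMap<>();
    private final Map<K, Window> windows = new ConcurrentHashMap<>();

    /** Replaces or adds limits; takes effect on the next call to tryAcquire. */
    void updateLimits(final Map<K, Integer> newLimits) {
        limits.putAll(newLimits);
    }

    /** Returns 0 if a request for key may be sent now, else the milliseconds until the window ends. */
    long tryAcquire(final K key, final long nowMillis) {
        final Integer limit = limits.get(key);
        if (limit == null)
            throw new IllegalArgumentException("No limit configured for " + key);
        final Window w = windows.computeIfAbsent(key, k -> new Window(nowMillis));
        synchronized (w) {
            if (nowMillis - w.start >= WINDOW_MILLIS) {
                // The previous window has expired: start a new one at this request.
                w.start = nowMillis;
                w.count = 0;
            }
            if (w.count < limit) {
                w.count++;
                return 0L;
            }
            return w.start + WINDOW_MILLIS - nowMillis;
        }
    }
}
```

A caller would sleep for the returned number of milliseconds and then try again, which plays the role of the sleep inside addRequest.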
    

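    【Comments】:

    • Since I am using Kafka, messages arrive from it much faster than my integration can handle them. I put the messages into a cache (Redis), and once they reach the maximum number of rows I can send to the external API (for example 200), I retrieve N (200) messages from Redis, build the payload and make the request. Doing this is much faster than the actual integration, which is why I am using the cached thread pool.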