【问题标题】:Frequent concurrent method calls in Java data-loggerJava数据记录器中频繁的并发方法调用
【发布时间】:2017-07-05 08:52:40
【问题描述】:

我正在实现一个 Java 数据记录器,它以精确的时间间隔读取来自不同生产机器的一些数据。为了避免一个调用阻塞以下调用,我正在考虑为解析器类的每次调用创建一个新线程

但是,这需要创建许多线程,然后每 10 秒(这是我的阅读间隔)停止它们。当解析器出现异常时(由于我正在使用的物联网设备可能超时),非并发方法会导致我有很多延迟,从而延迟下一次调用。

while(!error){

//JDBC connections and other calls here
//Queryresult is a ResultSet that returns all the machine addresses needing to be read

    while(queryresult.next()){
                        //Parser.ParseSpeedV is the method I need to call concurrently
                        Double v = Parser.ParseSpeedV(..Params..);
                        Double s = v*queryresult.getDouble("const");
                        st = conn.createStatement();
                        st.executeUpdate("INSERT INTO ...");
                    }
st.close();
Thread.sleep(10000);
}

什么是实现并发方法调用(对方法 ParseSpeedV)而不产生每天启动的数千个线程造成的开销的最佳方法?

【问题讨论】:

    标签: java multithreading logging concurrency


    【解决方案1】:

    您要使用的是ScheduledExecutorService。它允许您添加以固定速率或固定延迟重复的任务。所以你可以即添加一个每 10 秒从设备获取数据的任务。然后,Executor 服务会确保它在该时间间隔内以合理的低偏差运行。

    final ScheduledExecutorService myScheduledExecutor = Executors.newScheduledThreadPool(16);
    myScheduledExecutor.scheduleAtFixedRate(myTask, 0L, 10L, TimeUnit.SECONDS);
    

    【讨论】:

    • 这个很有趣,我想知道的是:它如何处理线程终止?它会阻止调用者线程结束吗?
    • 只要至少有一个非恶魔线程处于活动状态,Java VM 就会处于活动状态。因此,只要您没有将调度程序线程显式设置为恶魔类型,VM 就应该保持活动状态。但是,根据您的用例,您可能希望有一个“主”线程等待用户输入以终止,除非通过命令行/任务管理器杀死 JVM 就足够了。
    【解决方案2】:

    您的情况是Thread Pool 的完美用例。这部分 Java 库构建在简单的 Threads 之上,允许您创建固定大小的线程池并反复重用它们:

    ExecutorService executor = Executors.newFixedThreadPool(5);
    

    任何时候你想做一些工作,你把它添加到执行器中

    executor.execute(new Runnable() {
        @Override
        public void run() {
            // Do some work
        }
    });
    

    如果您调用 execute 超过 5 次,多余的可运行对象将保留在队列中,直到有空间为止。

    现在,如果您需要从这些正在运行的任务中接收信息,您需要编写一个实现 Runnable 并接受某种对象的类,该对象希望拥有您的 runnable 所具有的信息:

    public class Worker implements Runnable {
       Consumer consumer;
    
       public Worker(Consumer consumer) {
           this.consumer = consumer;
       }
    
       @Override public void run() {
           // Do work
           value = // get value
           consumer.put(value);
       }
    }
    

    现在您所要做的就是定义一个对值进行操作的 Consumer 类(具有 put() 方法或其他方法)并像这样创建您的 Workers:

    Consumer consumer = new Consumer();
    Worker worker = new Worker(myConsumer);
    executor.execute(worker);
    

    【讨论】:

    • 非常感谢!我会看看这个文档
    • 冷静并确保查看不同的类型(例如,池不必是固定大小的)。此外,如果答案最终解决了您的问题,请接受它,以便在搜索结果中清晰可见。
    • 谢谢,我现在将其标记为已解决。还有一个问题:execute 命令如何返回我期望的值?我是否需要为副作用设置一些东西,或者我可以简单地从我的调用中获得一个返回值?我想我得看看这个文档
    • 为此,您需要扩展 Runnable 并让它接受要更新的对象。这样,当 runnable 需要返回一个值时,它可以简单地调用该对象的方法。我会更新我的答案
    • 您描述的是 Executor 服务而不是 ThreadPool (这是相关但不同的东西)。还有一个 ScheduledExecutorService,它是专门为重复任务而设计的。
    猜你喜欢
    • 1970-01-01
    • 2012-08-13
    • 2022-12-10
    • 1970-01-01
    • 2012-07-20
    • 1970-01-01
    • 2020-07-06
    • 2011-09-28
    • 2011-01-17
    相关资源
    最近更新 更多