【问题标题】:Pre-initializing a pool of worker threads to reuse connection objects (sockets)预初始化工作线程池以重用连接对象(套接字)
【发布时间】:2013-05-16 19:34:57
【问题描述】:

我需要在 Java 中构建一个工人池,其中每个工人都有自己的连接套接字;当工作线程运行时,它使用套接字但保持打开以供以后重用。我们之所以决定采用这种方法,是因为与在 ad-hoc 基础上创建、连接和销毁套接字相关的开销需要太多开销,因此我们需要一种方法,通过该方法,工作池可以通过其套接字连接进行预初始化,准备好在保持套接字资源不受其他线程影响的同时继续工作(套接字不是线程安全的),所以我们需要一些类似的东西......

public class SocketTask implements Runnable {
  Socket socket;
  public SocketTask(){
    //create + connect socket here
  }

  public void run(){
    //use socket here
  }

}

在应用程序启动时,我们希望初始化工作线程,并希望以某种方式初始化套接字连接......

MyWorkerPool pool = new MyWorkerPool();
for( int i = 0; i < 100; i++)
   pool.addWorker( new WorkerThread());

当应用程序请求工作时,我们将任务发送到工作池以立即执行...

pool.queueWork( new SocketTask(..));


使用工作代码更新
基于来自 Gray 和 jontejj 的有用 cmets,我得到了以下代码工作......

SocketTask

public class SocketTask implements Runnable {
    private String workDetails;
    private static final ThreadLocal<Socket> threadLocal = 
           new ThreadLocal<Socket>(){
        @Override
        protected Socket initialValue(){
            return new Socket();
        }           
    };

    public SocketTask(String details){              
        this.workDetails = details;
    }

    public void run(){      
        Socket s = getSocket(); //gets from threadlocal
        //send data on socket based on workDetails, etc.
    }

    public static Socket getSocket(){
        return threadLocal.get();
    }
}

ExecutorService

ExecutorService threadPool = 
    Executors.newFixedThreadPool(5, Executors.defaultThreadFactory());

    int tasks = 15;  
    for( int i = 1; i <= tasks; i++){
        threadPool.execute(new SocketTask("foobar-" + i));
    }   

我喜欢这种方法有几个原因...

  • 套接字是可用于正在运行的任务的本地对象(通过 ThreadLocal),从而消除了并发问题。
  • 套接字创建一次并保持打开,重复使用 当新任务排队时,消除了套接字对象创建/销毁的开销。

【问题讨论】:

  • 感谢工作代码 sn-p,对我帮助很大!我想知道:当线程池关闭时如何关闭数据库连接?
  • 搜索orderly shutdown的执行线程,希望对你有帮助。
  • 我检查过了,谢谢!不过,我并没有完全帮助我……我正在寻找一种方法来为存储在 ThreadLocal 中的所有数据库连接调用 dbconnection.close()。我真的不知道如何处理这个......我的意思是,一旦线程池被关闭,数据库连接不会自动关闭,对吧?
  • 我想我找到了另一个解决方案:我使用一个线程池和一个相同大小的单独向量来存储数据库连接。实际上,在我的情况下,ThreadLocal 并不是真正必要的,因为我并不关心哪个线程获得哪个连接,只要它有一个。无论如何感谢您的灵感!

标签: java concurrency threadpool thread-local threadpoolexecutor


【解决方案1】:

我认为你应该使用ThreadLocal

package com.stackoverflow.q16680096;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main
{
    public static void main(String[] args)
    {
        ExecutorService pool = Executors.newCachedThreadPool();
        int nrOfConcurrentUsers = 100;
        for(int i = 0; i < nrOfConcurrentUsers; i++)
        {
            pool.submit(new InitSocketTask());
        }

        // do stuff...

        pool.submit(new Task());
    }
}

package com.stackoverflow.q16680096;

import java.net.Socket;

public class InitSocketTask implements Runnable
{
    public void run()
    {
        Socket socket = SocketPool.get();
        // Do initial setup here
    }

}

package com.stackoverflow.q16680096;

import java.net.Socket;

public final class SocketPool
{
    private static final ThreadLocal<Socket> SOCKETS = new ThreadLocal<Socket>(){
        @Override
        protected Socket initialValue()
        {
            return new Socket(); // Pass in suitable arguments here...
        }
    };

    public static Socket get()
    {
        return SOCKETS.get();
    }
}

package com.stackoverflow.q16680096;

import java.net.Socket;

public class Task implements Runnable
{
    public void run()
    {
        Socket socket = SocketPool.get();
        // Do stuff with socket...
    }
}

每个线程都有自己的套接字。

【讨论】:

  • 使 Socket 成为 ThreadLocal 使其属于线程而不是工作人员,这使您可以直接在任务中访问套接字。只要确保使用启动套接字的 InitTasks 来预热 ThreadPool。然后当执行新任务时,线程池中的所有线程都会准备好套接字。
  • 我用您示例中的工作代码更新了我的原始问题。你称之为SocketPool 我在我的代码中命名为SocketTask,但它们都有一个静态ThreadLocal&lt;Socket&gt; 对象来获取本地套接字对象,这对你来说是否有效?
  • WorkerThreadFactory 似乎是多余的,Executors#defaultThreadFactory() 有什么问题?
  • 我认为你是对的...我最初是从 Gray 那里得到的,他在 run() 中建议自定义代码,但我删除了该代码,所以现在 WorkerThreadFactory 是通用的,可能不需要,谢谢。
  • +1 疯狂地组织起来,哈哈 :) package com.stackoverflow.q16680096;
【解决方案2】:

一个想法是将Sockets 放在BlockingQueue 中。然后,当您需要 Socket 时,您的线程可以从队列中 take(),当它们完成 Socket 时,它们将 put() 回到队列中。

public void run() {
    Socket socket = socketQueue.take();
    try {
       // use the socket ...
    } finally {
       socketQueue.put(socket);
    }
}

这有额外的好处:

  • 您可以返回使用ExecutorService 代码。
  • 您可以将套接字通信与结果处理分开。
  • 您不需要一对一的对应关系来处理线程和套接字。但是套接字通信可能占了 98% 的工作量,所以可能没有任何收获。
  • 当您完成并且您的ExecutorService 完成后,您可以通过将它们出列并关闭它们来关闭您的套接字。

这确实增加了另一个 BlockingQueue 的额外开销,但如果您正在进行 Socket 通信,您不会注意到它。

我们认为 ThreadFactory 无法满足我们的需求...

我认为如果您使用线程局部变量,您可以完成这项工作。您的线程工厂将创建一个线程,该线程首先打开套接字,将其存储在线程本地,然后调用 Runnable arg,它使用套接字完成所有工作,从 ExecutorService 内部队列中取出作业。完成后,arg.run() 方法将完成,您可以从线程本地获取套接字并关闭它。

类似于以下内容。这有点乱,但你应该明白。

ExecutorService threadPool =
    Executors.newFixedThreadPool(10,
      new ThreadFactory() {
        public Thread newThread(final Runnable r) {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    openSocketAndStoreInThreadLocal();
                    // our tasks would then get the socket from the thread-local
                    r.run();
                    getSocketFromThreadLocalAndCloseIt();
                }
            });
            return thread;
        }
      }));

所以你的任务将实现Runnable,看起来像:

public SocketWorker implements Runnable {
    private final ThreadLocal<Socket> threadLocal;
    public SocketWorker(ThreadLocal<Socket> threadLocal) {
       this.threadLocal = threadLocal;
    }
    public void run() {
        Socket socket = threadLocal.get();
        // use the socket ...
    }
}

【讨论】:

  • 没有。 ThreadFactory 每个线程只调用一次。因此,如果您的线程池有 10 个线程,则只有在初始化池并启动线程时才会创建套接字。不是每个任务@SAFX。
  • 记住@SAFX,newThread 方法中的run() 方法不是你的任务运行。这是ExecutorService 用来使任务出列并运行它们的run 方法。
  • 是的,有点@SAFX。 r.run() 是并发代码内部的可运行文件。它将您的任务从内部任务阻塞队列中取出并调用您的任务的run()(捕获异常等)。我将在我的答案中添加一个示例任务类。
  • 1) 所以你会说executor.execute(new SocketWorker(socktThreadLocal));。这允许您在启动任务的类中定义线程本地。或者您可以在@jontejj 的回答中使用static thread-local。 2)哎呀,这是一个错误。修复了@SAFX。
  • 让它工作@Gray,没有将threadLocal传递给SocketWorker,我只是定义了一个private static final ThreadLocal&lt;Socket&gt;并覆盖initialValue()以在SocketWorker中第一次创建套接字;发送给执行程序的任何任务都只需重用下一个可用线程及其本地套接字,就像一个魅力一样,谢谢。
猜你喜欢
  • 2018-04-25
  • 1970-01-01
  • 2016-07-01
  • 2013-01-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-05-15
相关资源
最近更新 更多