【问题标题】:passing a parameter to threadpoolexecutor将参数传递给线程池执行程序
【发布时间】:2019-02-22 14:10:59
【问题描述】:

我查看了多线程并尝试实现一个应用程序,该应用程序创建单独的线程来运行收集进程。该过程中的主要方法需要一个变量arraylist,我正在尝试找出一种将arraylist传递给每个线程的方法。

ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
    ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor) context.getBean("taskExecutor");

    MainTask mxTask = (MainTask) context.getBean("MainTask");
    mxTask.setName("Thread 1");
    taskExecutor.execute(mxTask);

    MainTask mxTask2 = (MainTask) context.getBean("MainTask");
    mxTask2.setName("Thread 2");
    taskExecutor.execute(mxTask2);

上面是声明线程的类方法,其中 MainTask 是执行 run() 方法的类,然后调用其他主要方法。

@Override
public void run() {
    System.out.println(name + " is running.");
    getConfigurations();
    try {
            mainRun();


    } catch (MessagingException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

getconfigurations() 方法读取所有必要的配置变量。下面是每个线程需要运行的主进程。

public static void mainRun() throws IOException {

    ArrayList<String> filterList = new ArrayList<String>();
    ArrayList<String> brokerList = new ArrayList<String>();


    if(kafkaServers1 != null) {
        brokerList.add(kafkaServers1);
    } else if(!kafkaServers2.isEmpty()) {
        brokerList.add(kafkaServers2);
    } else if(!kafkaServers3.isEmpty()) {
        brokerList.add(kafkaServers3);
    }

    ArrayList<String> serverList = new ArrayList<String>();

    for(int x=0;x<brokerList.size();x++){
        String[] serverBrokers = brokerList.get(x).split(",");
        serverList.add(serverBrokers[0]);
        serverList.add(serverBrokers[1]);
        serverList.add(serverBrokers[2]);

    }


    try {

        while(true){
            for (String temp : serverList) {
                kafkaServer = temp;
                hostName = kafkaServer;
                InetAddress addr = InetAddress.getByName(hostName);
                hostName = addr.getHostName();
                kafkaServer= hostName;
                retrieveData(hostName);

...

变量 kafkaServers1 包含 3 个 ip 的列表,这些 ip 被拆分并添加到 serverList 数组列表中。我要做的只是为每个线程分配一个 ip。这可能吗?有人可以请教吗?

【问题讨论】:

  • 如果你已经在使用 Spring,为什么不使用 Spring Kafka?
  • 对不起;你什么意思?我没有运行 kafka 消费者;我只需要从服务器读取指标。
  • 啊,好吧,那更有意义。我以为你正在尝试从某个主题或其他内容中检索数据。

标签: java spring applicationcontext threadpoolexecutor


【解决方案1】:

您可以检索 Kafka 配置,然后为每个主机创建一个新的 Runnable,而不是让您的 Runnable 对象成为一个 bean。提交给执行人。

MyRunnable infoGetter = new MyRunnable(hostInfo);
taskExecutor.execute(infoGetter);

如果您需要对结果做一些事情,CompletableFuture 可能是更好的选择,或者如果您需要合并它们,则使用 fork/join RecursiveTask&lt;V&gt;

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2011-06-01
    • 2011-02-18
    • 1970-01-01
    • 2015-03-08
    • 2014-01-05
    • 2020-09-18
    相关资源
    最近更新 更多