【Question title】: Multiple threads modifying the same command attributes
【Posted】: 2012-08-15 09:30:48
【Question description】:

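Problem statement:

In the program below I am using a ThreadPoolExecutor with an ArrayBlockingQueue.

Each thread needs to use a unique ID every time, and the program has to run for 60 minutes or more. Within those 60 minutes it is possible that all the IDs get used up, so I need to reuse them. That is why I am using an ArrayBlockingQueue here.

Two scenarios:

  1. If command.getDataCriteria() contains Previous, each thread always needs to use a unique ID between 1 and 1000 and release it for reuse.
  2. Otherwise, if command.getDataCriteria() contains New, each thread always needs to use a unique ID between 2000 and 3000 and release it for reuse.

The problem I am currently facing with the program below:

  • In the run method, if command.getDataCriteria() is Previous, execution also enters the else if block (which is for New), which should not happen. I am doing an .equals check as well, so why does this happen? Could it be that many threads start at the same time and one of them has already modified the command?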

else if(command.getDataCriteria().equals("New")) {

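If multiple threads are modifying it, how can I solve this? Whatever the problem is, it happens in the run method. Any suggestions would be a great help, as I have been stuck on this for a long time. Perhaps we need to synchronize the threads so that no other thread can modify the command while another thread is trying to execute it.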

public synchronized void runNextCommand() {

LinkedList<Integer> availableExistingIds = new LinkedList<Integer>();
LinkedList<Integer> availableNewIds = new LinkedList<Integer>();

executorService = new ThreadPoolExecutor(noOfThreads, noOfThreads, 500L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(noOfThreads), new ThreadPoolExecutor.CallerRunsPolicy());

    // If there are any free threads in the thread pool
    if (!(((ThreadPoolExecutor) executorService).getActiveCount() < noOfThreads))
    return;

    for (int i = 1; i <= 1000; i++) {
    availableExistingIds.add(i);
    }

    for (int n = 2000; n <= 3000; n++) {
    availableNewIds.add(n);
    }

    BlockingQueue<Integer> existIdPool = new ArrayBlockingQueue<Integer>(1000, false, availableExistingIds);
    BlockingQueue<Integer> newIdPool = new ArrayBlockingQueue<Integer>(1001, false, availableNewIds);

    // Running for particular duration of time
    while(System.currentTimeMillis() <= endTime) {
    Command nextCommand = getNextCommandToExecute();
    Task nextCommandExecutorRunnable = new Task(nextCommand, existIdPool, newIdPool);
    executorService.submit(nextCommandExecutorRunnable);
    }

    executorService.shutdown();
    if (!executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)) {
    executorService.shutdownNow();
    }
}

Implementation of the Runnable (the actual unit-level command executor):

private static final class Task implements Runnable {
private Command command;
private DPSclient m_DPSclient = null;
private DPSclient psc = null;
private BlockingQueue<Integer> existPool;
private BlockingQueue<Integer> newPool;
private int existId;
private int newId;
private static Object syncObject = new Object();  


public Task(Command command, BlockingQueue<Integer> pool1, BlockingQueue<Integer> pool2) {
    this.command = command;
    this.existPool = pool1;
    this.newPool = pool2;
}

public void run() {

    synchronized(syncObject) {
        if(command.getDataCriteria().equals("Previous")) {
            try {
                // Getting existing id from the existPool
                existId = existPool.take();
                attributeGetSetMethod(existId);
            } catch (Exception e) {
                getLogger().log(LogLevel.ERROR, e.getLocalizedMessage());
            } finally {
                // And releasing that existing ID for re-use
                existPool.offer(existId);
            }
        } else if(command.getDataCriteria().equals("New")) {
            try {
                // Getting new id from the newPool
                newId = newPool.take();
                attributeGetSetMethod(newId);
            } catch (Exception e) {
                getLogger().log(LogLevel.ERROR, e.getLocalizedMessage());
            } finally {
                // And releasing that new ID for re-use
                newPool.offer(newId);
            }
        }
    }
}
}

Thanks for your help with this.

Update: code for the getNextCommandToExecute method, as suggested by Matt

// Get the next command to execute based on percentages
    private synchronized Command getNextCommandToExecute() {
    int commandWithMaxNegativeOffset = 0; // To initiate, assume the first one has the max negative offset
    if (totalExecuted != 0) {
        // Manipulate that who has max negative offset from its desired execution
        double executedPercentage = ((double)executedFrequency[commandWithMaxNegativeOffset] / (double)totalExecuted) * 100;
        double offsetOfCommandWithMaxNegative = executedPercentage - commands.get(commandWithMaxNegativeOffset).getExecutionPercentage();

        for (int j=1; j < commands.size(); j++) {
        double executedPercentageOfCurrentCommand = ((double)executedFrequency[j] / (double)totalExecuted) * 100;
        double offsetOfCurrentCommand = executedPercentageOfCurrentCommand - commands.get(j).getExecutionPercentage();

        if (offsetOfCurrentCommand < offsetOfCommandWithMaxNegative) {
            offsetOfCommandWithMaxNegative = offsetOfCurrentCommand;
            commandWithMaxNegativeOffset = j;
        }
        }
    }

    // Next command to execute is the one with max negative offset
    executedFrequency[commandWithMaxNegativeOffset] ++;
    totalExecuted ++;

    // This is for User Logging/No User Logging and Data is Previous/New
    LinkedHashMap<String, Double> dataCriteriaMap = (LinkedHashMap<String, Double>) sortByValue(commands.get(commandWithMaxNegativeOffset).getDataUsageCriteria());
    Set<Map.Entry<String, Double>> entriesData = dataCriteriaMap.entrySet();
    Iterator<Map.Entry<String, Double>> itData = entriesData.iterator();
    Map.Entry<String, Double> firstEntryData = itData.next();
    Map.Entry<String, Double> secondEntryData = itData.next();

    LinkedHashMap<Boolean, Double> userCriteriaMap = (LinkedHashMap<Boolean, Double>) sortByValue(commands.get(commandWithMaxNegativeOffset).getUserLoggingCriteria());
    Set<Map.Entry<Boolean, Double>> entriesUser = userCriteriaMap.entrySet();
    Iterator<Map.Entry<Boolean, Double>> itUser = entriesUser.iterator();
    Map.Entry<Boolean, Double> firstEntryUser = itUser.next();
    Map.Entry<Boolean, Double> secondEntryUser = itUser.next();

    double percent = r.nextDouble() * 100;

    if (percent < secondEntryData.getValue().doubleValue()) {
        commands.get(commandWithMaxNegativeOffset).setDataCriteria(secondEntryData.getKey());
    } else {
        commands.get(commandWithMaxNegativeOffset).setDataCriteria(firstEntryData.getKey());
    }

    if (percent < secondEntryUser.getValue().doubleValue()) {
        commands.get(commandWithMaxNegativeOffset).setUserLogging(secondEntryUser.getKey());
    } else { 
        commands.get(commandWithMaxNegativeOffset).setUserLogging(firstEntryUser.getKey());
    }

    return commands.get(commandWithMaxNegativeOffset);
    }

And commands is declared at the top of the class as:

private static List<Command> commands;

Update with another method:

private synchronized void attributeGetSetMethod(int id_range) {

        requestlTransaction requestlTransaction = null;
        try {
        GUID_VALUES = new LinkedHashMap<Integer, String>();

        // I am not sure how CAL logging has to be done, it has to be at each attribute level or something else? So that is the reason I left this thing.

        if(!(command.getAttributeIDSet().isEmpty())) {

            requestlTransaction = requestlTransactionFactory.create("DPSLnPTest");
            m_DPSclient = setupDPS(command.getName(), getDPSAttributeKeys(command.getDataCriteria(), command.getUserLogging() , id_range));


            for(String attr: command.getAttributeIDSet()) {

            requestlTransaction.setName("DPSAttributeSet");
            requestlTransaction.setStatus("0");
            //requestlTransaction.addData("IpAddress", ipAddress);

            if(attr.contains("/")) {
                lengthOfString = Integer.parseInt(attr.split("/")[1]);
                attr = attr.split("/")[0];
            }
            DPSAttribute attr1 = new DPSAttribute();
            attr1.setRequestAttributeId(new DPSAttributeId(Integer.parseInt(attr)));
            DPSMetadataMgr mgr = DPSMetadataMgr.getInstance();
            DPSRequestAttributeMetadata metadata = mgr.getRequestAttributeMetadataById(Integer.parseInt(attr));
            int maxOccurs = metadata.getMaxOccurs();
            String dataType = metadata.getAttributeTypeAlias();

            DPSAttributeValue attrValue1 = getRequestAttribute(dataType, lengthOfString);

            if(maxOccurs>1) {
                DPSListAttributeValue listAttrValue = new DPSListAttributeValue();
                List<DPSAttributeValue> list = new ArrayList<DPSAttributeValue>();
                list.add(attrValue1);
                listAttrValue.setList(list);
                attr1.setRequestAttributeValue(listAttrValue);
                m_DPSclient.setDPSAttribute(attr1);
            } else {
                attr1.setRequestAttributeValue(attrValue1);         
                m_DPSclient.setDPSAttribute(attr1);
            }
            }

            List<DPSAttribute> idKeys = m_DPSclient.release(PersistenceEnum.COMMIT, false);

            // Iterating through the keys and storing into HashMap
            Iterator<DPSAttribute> i = idKeys.iterator();
            while (i.hasNext()) {
            DPSAttribute DPSAttribute = (DPSAttribute)(i.next());
            DPSAttributeId id = DPSAttribute.getAttributeId();
            DPSAttributeValue value = DPSAttribute.getRequestAttribute();

            if(id.getId() == DPSLnPConstants.CGUID_ID && (value)!= null) {
                DPSLnPConstants.CGUID_VALUE = ((DPSStringAttributeValue)value).getValue();
                GUID_VALUES.put(DPSLnPConstants.CGUID_ID, DPSLnPConstants.CGUID_VALUE);
            } else if(id.getId() == DPSLnPConstants.SGUID_ID && (value)!= null) {
                DPSLnPConstants.SGUID_VALUE = ((DPSStringAttributeValue)value).getValue();
                GUID_VALUES.put(DPSLnPConstants.SGUID_ID, DPSLnPConstants.SGUID_VALUE);
            } else if(id.getId() == DPSLnPConstants.PGUID_ID && (value)!= null) {
                DPSLnPConstants.PGUID_VALUE = ((DPSStringAttributeValue)value).getValue();
                GUID_VALUES.put(DPSLnPConstants.PGUID_ID, DPSLnPConstants.PGUID_VALUE);
            } else if(id.getId() == DPSLnPConstants.UID_ID && (value)!= null) {
                DPSLnPConstants.UID_VALUE = String.valueOf(((DPSLongAttributeValue)value).getValue());
                GUID_VALUES.put(DPSLnPConstants.UID_ID, DPSLnPConstants.UID_VALUE);
            } else if(id.getId() == DPSLnPConstants.SITE_ID && (value)!= null) {
                DPSLnPConstants.SITEID_VALUE = String.valueOf(((DPSIntAttributeValue)value).getValue());
                GUID_VALUES.put(DPSLnPConstants.SITE_ID, DPSLnPConstants.SITEID_VALUE);
            } else if(id.getId() == DPSLnPConstants.ALOC_ID && (value)!= null) {
                DPSLnPConstants.ALOC_VALUE = ((DPSStringAttributeValue)value).getValue();
                GUID_VALUES.put(DPSLnPConstants.ALOC_ID, DPSLnPConstants.ALOC_VALUE);
            } else if(id.getId() == DPSLnPConstants.ULOC_ID && (value)!= null) {
                DPSLnPConstants.ULOC_VALUE = ((DPSStringAttributeValue)value).getValue();
                GUID_VALUES.put(DPSLnPConstants.ULOC_ID, DPSLnPConstants.ULOC_VALUE);
            } else if(id.getId() == DPSLnPConstants.SLOC_ID && (value)!= null) {
                DPSLnPConstants.SLOC_VALUE = ((DPSStringAttributeValue)value).getValue();
                GUID_VALUES.put(DPSLnPConstants.SLOC_ID, DPSLnPConstants.SLOC_VALUE);
            } else if(id.getId() == DPSLnPConstants.PLOC_ID && (value)!= null) {
                DPSLnPConstants.PLOC_VALUE = ((DPSStringAttributeValue)value).getValue();
                GUID_VALUES.put(DPSLnPConstants.PLOC_ID, DPSLnPConstants.PLOC_VALUE);
            }
            }

            // Storing all the locators, guid in a map corresponding to an ID, then later on insert everything directly into db
            GUID_ID_MAPPING.put(id_range, GUID_VALUES);

            // Sleeping the command for particular milliseconds
            // One thing not sure, I should be sleeping the command here or I should put it above this comment line '// Iterating through the keys'
            Thread.sleep(command.getSleepTime());
        } 

        // for get attributes   
        // And also how CAL logging has to be done here too. And we can use same DPS Smart Client that got created above to get the attributes value?

        if(!(command.getAttributeIDGet().isEmpty())) {

            requestlTransaction.setName("DPSAttributeGet");
            requestlTransaction.setStatus("1");
            psc = setupDPS(command.getName(), getDPSAttributeKeys(command.getDataCriteria(), command.getUserLogging() , id_range));

            for(String attr: command.getAttributeIDGet()) {
            DPSAttribute attribute = new DPSAttribute();
            attribute = psc.getDPSAttribute(new DPSAttributeId(Integer.parseInt(attr)));
            //System.out.println(attribute.getAttributeId()+ " " +attribute.getRequestAttribute());
            }
        }
        } catch(Exception e) {
        getLogger().log(LogLevel.ERROR, e);
        } finally {
        requestlTransaction.completed();
        }

    }

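【Comments】:

  • It might be easier to figure out if you posted the code for the getNextCommandToExecute method. I don't immediately see any big problems (though I haven't dug in deeply), but if you are reusing Command objects, that could have an effect.
  • Thanks for the suggestion, Matt. I have updated the question with the code for the getNextCommandToExecute method. Let me know if you need anything else from me to find the actual cause of the problem.
  • Do you intend for multiple threads to access the same instance of Command?
  • @Brad, I guess I don't need that...

Tags: java multithreading thread-safety threadpool threadpoolexecutor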


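【Solution 1】:

As far as I can tell, if command.getDataCriteria() is only ever accessed by one thread, its value should not change because of thread synchronization problems. So let's assume we want to make sure that no two threads get a handle on the same object instance.

You have a synchronized getNextCommandToExecute(), but there is no evidence that your commands collection is synchronized, going by your definition private static List<Command> commands;. If the assumption above holds, we want to guarantee that no two threads can get() the same object instance from the collection.

Where else in the code is the commands collection accessed?

If commands.get() can be properly synchronized, there should be no thread contention on the returned instance. Whichever thread gets an object owns it.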
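
Note that this ownership only holds if each call hands out a distinct instance. Going by the posted getNextCommandToExecute(), the method calls setDataCriteria() on an element of the static list and then returns that same element, so a task that was created with "Previous" may see "New" by the time it runs. The sketch below reproduces that effect on a single thread; MutableCommand and SharedCommandDemo are hypothetical, stripped-down stand-ins and not the asker's real classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the mutable Command class in the question.
class MutableCommand {
    private String dataCriteria;
    String getDataCriteria() { return dataCriteria; }
    void setDataCriteria(String dataCriteria) { this.dataCriteria = dataCriteria; }
}

class SharedCommandDemo {
    // One shared instance in a static list, as in the question.
    private static final List<MutableCommand> commands = new ArrayList<>();
    static { commands.add(new MutableCommand()); }

    // Same shape as getNextCommandToExecute(): mutate the shared instance, then return it.
    // Being synchronized does not help, because the caller keeps the reference afterwards.
    static synchronized MutableCommand next(String criteria) {
        MutableCommand c = commands.get(0);
        c.setDataCriteria(criteria);
        return c;
    }

    static String observedByFirstTask() {
        MutableCommand first = next("Previous"); // task 1 is created with this reference
        next("New");                             // the submit loop moves on before task 1 runs
        return first.getDataCriteria();          // same object, so task 1 now reads "New"
    }

    public static void main(String[] args) {
        System.out.println(observedByFirstTask()); // prints New
    }
}
```

If this is what is happening, locking inside run() cannot fix it, because the value is changed by the submitting loop before the task ever starts.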

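...feel free to tell me I'm barking up the wrong tree

[Edit] Following your comments

I am having a hard time pinning down what is going wrong because I don't have all the code and have to make assumptions. It looks to me as though you have added the synchronized keyword to many methods in the hope that it will fix your problem. I think the best approach is to minimize the lines of code that really need to be synchronized, which requires a clearer understanding of what actually needs synchronizing.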
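
As one illustration of narrowing the synchronized region: a BlockingQueue is already thread-safe, so borrowing and returning an ID should not need the static syncObject lock in run(), which as posted lets only one task run at a time. The following is a minimal sketch under that assumption; IdPoolSketch, newPool and withId are hypothetical names, and the IntConsumer stands in for the call to attributeGetSetMethod.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.IntConsumer;

class IdPoolSketch {
    // Builds a pool holding every ID in [from, to].
    static BlockingQueue<Integer> newPool(int from, int to) {
        BlockingQueue<Integer> pool = new ArrayBlockingQueue<>(to - from + 1);
        for (int id = from; id <= to; id++) {
            pool.add(id);
        }
        return pool;
    }

    // Borrows one ID, runs the work with it, and returns the ID afterwards.
    // Returns false if the thread was interrupted before an ID could be taken.
    static boolean withId(BlockingQueue<Integer> pool, IntConsumer work) {
        int id;
        try {
            id = pool.take(); // blocks until an ID is free; no extra lock required
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;     // nothing was borrowed, so nothing is returned to the pool
        }
        try {
            work.accept(id);
        } finally {
            pool.offer(id);   // give back exactly the ID that was borrowed
        }
        return true;
    }
}
```

Keeping take() outside the try/finally also avoids a side effect of the posted run(): if take() fails there, the finally block still offers the default field value 0 into the pool.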

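  1. You don't want to share references to a Command object across threads, so make sure the get() and add() operations on your List<Command> commands are synchronized. You could use a synchronizedList or a ConcurrentLinkedQueue.

  2. Can you make the Command object immutable? (i.e. no setX() methods)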
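
A minimal sketch of what point 2 could look like, assuming only the three properties that appear in the posted code (name, data criteria, user logging). ImmutableCommand and its with(...) method are hypothetical; the real Command class has more fields that would need the same treatment.

```java
// Hypothetical immutable variant of Command: all fields final, no setters.
final class ImmutableCommand {
    private final String name;
    private final String dataCriteria;
    private final boolean userLogging;

    ImmutableCommand(String name, String dataCriteria, boolean userLogging) {
        this.name = name;
        this.dataCriteria = dataCriteria;
        this.userLogging = userLogging;
    }

    String getName() { return name; }
    String getDataCriteria() { return dataCriteria; }
    boolean getUserLogging() { return userLogging; }

    // Returns a new instance with the chosen values instead of mutating this one.
    ImmutableCommand with(String dataCriteria, boolean userLogging) {
        return new ImmutableCommand(name, dataCriteria, userLogging);
    }
}
```

With something like this, getNextCommandToExecute() could end with a call such as commands.get(index).with(chosenCriteria, chosenLogging) rather than the two setter calls, so every Task receives its own snapshot and later calls cannot change what an already-submitted task sees.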

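【Comments】:

  • Thanks for the suggestion. So you are saying I need to synchronize commands.get(), but I have already synchronized the getNextCommandToExecute method, so how can I synchronize the commands.get method? Basically I add each command's attributes to commands, and I have declared commands as private static List<Command> commands; at the top of the class. Is there anything else from the code you need me to provide to determine the root cause?
  • The commands collection is also accessed from attributeGetSetMethod, which is called in the run method, and I have made that synchronized too.
  • In attributeGetSetMethod I only pass an id, so I need to pass the command object to that method as well.
  • Possibly I need to make this list, private static List<Command> commands, thread safe?
  • Thanks for the detailed suggestions, Brad. Much appreciated. Let me try these things first. I will update if I find anything suspicious.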