【问题标题】:Executor framework abnormal behaviourExecutor框架异常行为
【发布时间】:2015-06-16 13:56:30
【问题描述】:

您好,我正在使用 Executors 来并行加载一些数据。我的应用程序正在从数据库中获取一些具有父子关系的数据,例如:

parent 1 -> [child11, child12,..., child1N]
parent 2 -> [child21, childy22,..., child2N]
.....
parent N -> [childN1, childyN2,..., childNN]

现在我想要并行处理。我现在正在做的是加载所有数据 一次从数据库中设置一个父子,并调用执行器服务来映射关系中的那些并存储在我的数据结构中。

现在我有这个代码:

父子关系如下:

public class Post implements Serializable {

    private static final long serialVersionUID = 1L;

    private Integer postId;
    private String  postText;
    private String  postType;
    private Integer menuItemId;
    private boolean parentPost;
    private Integer parentPostId;
    // Contains all the Child of this Post
    private List<Post> answers = new ArrayList<Post>();
    ....
    //getters and setters
}

现在我有一个用于同步的 Post 类的包装器

public class PostList {

    private List<Post> postList;

    public PostList() {
        super();
        this.postList = new ArrayList<Post>();
    }

    public List<Post> getPostList() {
        return postList;
    }

    public synchronized boolean add(Post post) {
        return postList.add(post);
    }

    public synchronized boolean addAnswer(Post answer) {
        for(Post post : postList)
        {
            if(post.getPostId() == answer.getParentPostId())
            {
                post.getAnswers().add(answer);
                break;
            }
        }
        return true;
    }
}

现在我从数据库加载代码是:

/* This is called to load each parent-child set at a time, when the 
first set is fetched from DB then call to executor to store those in  
internal data structure. */

List<Post> posts = null;
PostList postList = null;
Integer args[] ={menuItemId};
// Fetch all Posts which are in parent child relation
posts = getDataFromDB(...)
if(posts != null && posts.size() >0)
{
    postList = new PostList();
    ExecutorService executor = Executors.newFixedThreadPool(10);
    for(Post post : posts)
    {
         executor.execute(new PostProcessor(post, postList));
    }
    logger.debug("Starting executor shutdown...");
    executor.shutdown();
    while (!executor.isTerminated()) {
        try {
            executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            logger.error("Interrupted executor >>", ex.getMessage(), ex);
        }
    }
    logger.debug("All post loading done ...");
    logger.debug("PostList >> " + postList);
    if(postList.getPostList() != null)
        return postList.getPostList();
}

在后处理器中我有

public class PostProcessor implements Runnable {

    private Post post;
    private PostList postList;


    public PostProcessor(Post post, PostList postList) {
        super();
        this.post = post;
        this.postList = postList;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        Post answer = null;
        try
        {
            // if Post is parent / is a question
            if ("Q".equalsIgnoreCase(post.getPostType())) 
            {
                // do some operation
                postList.add(post);
            }
            // Post is an Answer, so add the answer to proper Question
            else {
                answer = post;
                postList.addAnswer(answer);
            }
            Thread.sleep(1000);
        }
        catch(Throwable throwable)
        {
            logger.error(throwable.getMessage(),throwable);
        }
    }
}

但它表现异常,有时它会加载所有问题帖子但不是所有答案,有时它根本没有加载父帖子。请帮助我在哪里做错了。

【问题讨论】:

  • 为什么不使用像 Hibernate 这样的 ORM?
  • 多线程意味着没有顺序保证。但是您的代码依赖于排序,如果尚未加载问题,它会默默地添加答案。
  • 谢谢,是的。这种方法一定会有所帮助。但是后来我需要集成hibernate。但目前我正在寻找我对这段代码所做的错误。
  • @Hogler。但是我在我的数据库查询中通过 post id 排序,这是一个自动递增的键。事实上,任何类型问题的帖子都将在其任何子项之前添加到 db 中。因此将出现在列表中它的孩子之前。并且列表保持迭代顺序
  • 顺便说一句,目前还不清楚为什么要使用线程池。您所做的所有工作就是在synchronized 代码块中向ArrayLists 添加项目。多线程在这里没有任何好处,而且您似乎没有理解使用多线程的含义,所以最好的解决方案就是不使用它。

标签: java multithreading concurrency executorservice java.util.concurrent


【解决方案1】:

如果 addAnswer 失败,那么它应该返回 false 或抛出异常;这表明相应的问题尚未加载或不存在。两种选择:

  1. 首先处理所有问题,如果答案与问题不匹配,则抛出异常。
  2. 当您查询数据库时,获取问题计数并在每次处理问题时递减(在将问题添加到帖子列表后进行递减,否则您可能会以question_count == 0,但尚未将问题添加到列表中);如果答案与问题和question_count &gt; 0 不匹配,则将答案放回队列中,否则抛出异常。

更多的是效率而不是正确性,我建议您消除synchronized 方法并改用java.util.concurrent 中的线程安全数据结构 - 这将减少锁争用。这看起来像

public class PostList {

    private AtomicInteger questionCount;
    private ConcurrentLinkedQueue<Post> questions;
    private ConcurrentHashMap<String, ConcurrentLinkedQueue<Post>> answers;

    public boolean addQuestion(Post post) {
        questions.offer(post);
        if(answers.putIfAbsent(post.getPostId(), new ConcurrentLinkedQueue<>()) 
             != null) {
            questionCount.decrementAndGet();
            return true;
        } else throw new IllegalArgumentException("duplicate question id");
    }

    public boolean addAnswer(Post answer) {
        ConcurrentLinkedQueue<Post> queue = answers.get(answer.getParentPostId());
        if(queue != null) {
          queue.offer(answer);
          return true;
        } else if(questionCount.get() > 0) {
          return false;
        } else {
          throw new IllegalArgumentException("answer has no question");
        }
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多