【问题标题】:Thread is not waiting for new data using Condition线程不使用 Condition 等待新数据
【发布时间】:2014-01-16 18:01:57
【问题描述】:

我正在尝试为实验室编写一些程序,该实验室基本上是一个将数据存储在文件夹中的网络缓存,因此如果客户想要打开网页,如果该页面不存在,则该页面将存储在缓存文件夹中,并将显示给客户。我能够下载和显示页面,但是当线程必须等待时出现问题,因为请求的资源正在被下载并存储在缓存文件夹中。

下载类如下:

public class Downloads {
private ArrayList<DownloadsNode> nodes;
private int debug,readersCount=0,writersCount=0;
private Lock readLock=new ReentrantReadWriteLock().readLock();
private Lock writeLock=new ReentrantReadWriteLock().writeLock();    

public Downloads ( int debug )
{   nodes = new ArrayList<DownloadsNode>();
    this.debug = debug;
}


public synchronized DownloadsNode findNode ( URL url )
{   Iterator<DownloadsNode> i = nodes.iterator();
    while ( i.hasNext() )
    {   DownloadsNode node = i.next();
        if (node.getUrl().equals(url))
            return node;
    }
    return null;
}

public synchronized DownloadsNode addNode ( URL url )
{   DownloadsNode node = new DownloadsNode ( url );
    nodes.add ( node );
    return node;
}

public synchronized void deleteNode ( DownloadsNode node )
{   nodes.remove ( node );
}

public synchronized void accessAsAReader()
{
    while(writersCount>0)
    {
        try 
        {
            this.wait();
        } 
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }

    }
    readLock.lock();
    readersCount++;
}

public synchronized void accessAsAWriter()
{
    while(readersCount>0||writersCount>0)
    {
        try 
        {
            this.wait();
        } 
        catch (InterruptedException e) 
        {
            e.printStackTrace();
        }
    }
    writeLock.lock();
    writersCount++;
}

public synchronized void outAsAReader()
{
    readLock.unlock();
    readersCount--;
    this.notifyAll();
}

public synchronized void outAsAWriter()
{
    writeLock.unlock();
    writersCount--;
    this.notifyAll();
}

public Lock getreadLock()
{
    return this.readLock;
}
}

DownloadsNode 类如下:

public class DownloadsNode {
private int waiting;
private URL url;
private cacheNode node;
private Lock readLock=new ReentrantReadWriteLock().readLock();
private Lock writeLock=new ReentrantReadWriteLock().writeLock();    
private int readersCount=0,writersCount=0;

public DownloadsNode ( URL url )
{   waiting = 1;
    this.url = url;
}

public URL getUrl()
{   return url;
}

public synchronized void setnodeC ( cacheNode node )
{   this.node = node;
}

public synchronized cacheNode getnodeC()
{   return node;
}

public synchronized int getwaiting()
{   return waiting;
}

public synchronized void addWaiting()
{   waiting++;
}


public synchronized void quitWaiting()
{   waiting--;
}

    public synchronized void accessAsAReader()
{
    while(writersCount>0)
    {
        try 
        {
            this.wait();
        } 
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }

    }
    readLock.lock();
    readersCount++;
}

public synchronized void accessAsAWriter()
{
    while(readersCount>0||writersCount>0)
    {
        try 
        {
            this.wait();
        } 
        catch (InterruptedException e) 
        {
            e.printStackTrace();
        }
    }
    writeLock.lock();
    writersCount++;
}

public synchronized void outAsAReader()
{
    readLock.unlock();
    readersCount--;
    this.notifyAll();
}

public synchronized void outAsAWriter()
{
    writeLock.unlock();
    writersCount--;
    this.notifyAll();
}

public Lock getreadLock()
{
    return this.readLock;
}
}

缓存类如下:

public class Cache {
private HashMap<URL,cacheNode> map; 
private Lock readLock=new ReentrantReadWriteLock().readLock();
private Lock writeLock=new ReentrantReadWriteLock().writeLock();    
private boolean added=false;

public Cache( int up_edge, int lim_inf, int debug )
{   
       //The content of this doesn't matter
}

public synchronized cacheNode findNode ( URL url )
{   
    cacheNode cacheNode= map.get ( url );
    return cacheNode;
}


public synchronized cacheNode addNode ( resourceWeb resource, AtomicInteger referencias )
{   

    cacheNode node = new cacheNode ( resource, referencias );
    map.put ( resource.getUrl(), node );
    added=true;
    return node;
}

public boolean getadded()
{
    return added;
}

    public synchronized void accessAsAReader()
{
    while(writersCount>0)
    {
        try 
        {
            this.wait();
        } 
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }

    }
    readLock.lock();
    readersCount++;
}

public synchronized void accessAsAWriter()
{
    while(readersCount>0||writersCount>0)
    {
        try 
        {
            this.wait();
        } 
        catch (InterruptedException e) 
        {
            e.printStackTrace();
        }
    }
    writeLock.lock();
    writersCount++;
}

public synchronized void outAsAReader()
{
    readLock.unlock();
    readersCount--;
    this.notifyAll();
}

public synchronized void outAsAWriter()
{
    writeLock.unlock();
    writersCount--;
    this.notifyAll();
}

public Lock getreadLock()
{
    return this.readLock;
}
}

之前的所有类都来自以下一个:

public class MainThread extends Thread{
private ArrayBlockingQueue<Petition> petitionQueue;
private Cache cache;
private int debug;

//constructor goes here but its content doesn't matter

public void run()
{
    webResource webResource = null;
    while(true)
    {
        try {
            CacheNode cNode;
            do
            {

                Petition petition = PetitionQueue.take();


                //Searchs the web resource inside the cache
                cache.accessAsAReader();
                synchronized(this)
                {
                    cNode = cache.findNode ( petition.getURL() );
                }
                if ( cNode != null )    // If it's found
                {   
                    cache.outAsAReader();
                    webResource = cNode.webResource;
                }                   

                else // if it's not found
                {
                    cache.outAsAReader();

                    Downloads downloads=new Downloads(debug);
                    DownloadsNode node;
                    downloads.accessAsAReader();
                    synchronized(this)
                    {
                        node=downloads.findNode(petition.getURL());
                    }

                    **if(node!=null)
                    {
                        downloads.outAsAReader();                           
                        /*Another thread is downloading to store in the cache*/
                        /*We indicate that we are waiting. Here is my problem if I'm not wrong*/
                        node.accessAsAWriter();                     
                        synchronized(this)
                        {
                            node.incrementWaiting();
                        }
                        Condition waitingCondition=node.getReadLock().newCondition();
                        while(cache!=null&&!cache.getAdded())
                        {
                            waitingCondition.await();   
                        }
                        /*Delete from count of waiting threads*/

                        synchronized(this)
                        {
                            nodo.decrementWaiting();
                        }
                        node.outAsAWriter();
                        /*If there aren't wating threads, delete the entry from downloads list*/
                        if(node.getWaiting()==0)
                        {
                            downloads.accessAsAWriter();
                            synchronized(this)
                            {
                                downloads.delete(node);
                            }
                            downloads.outAsAWriter();
                        }
                    }
                    else
                    {
                        downloads.outAsAReader();
                        downloads.accessAsAWriter();
                        synchronized(this)
                        {
                            node=downloads.addNode(petition.getURL());
                        }
                        downloads.outAsAWriter();
                        // Download the web resource.
                        webResource = petition.download();


                        cache.accessAsAWriter();
                        synchronized(this)
                        {
                            cNode = cache.addNode ( webResource, new AtomicInteger(1) );

                        }
                        cache.outAsAWriter();
                        /*Check if other threads are waiting*/
                        if(node.getWaiting()>0)
                        {
                            Condition condition2=nodo.getWriteLock().newCondition();
                            condition2.signalAll();
                        }
                        else
                        {
                            downloads.accessAsAWriter();
                            synchronized(this)
                            {
                                downloads.delete(node);
                            }
                            downloads.outAsAWriter();
                        }
                    }

                }
                //Sends response to the client
            } while (true);

        } catch (InterruptedException e) {
             // TODO: poner una forma de terminar.
        } catch (Exception e) {

        }
    }
}

}

有几点需要考虑:

-我必须在每个节点中创建条件变量,以便线程会因此而等待并实现方法来打开/关闭锁并保证互斥。

-如果一个线程正在下载其他线程感兴趣的资源,我使用条件变量来等待,并在资源下载并存储在缓存中时说“停止等待”。我在那部分做错了什么? (全部在 MainThread 类中)

-在保证互斥的情况下,我是否必须将同步放在正确的方法中,执行以下方式: -锁 -同步(这个)并做我必须做的任何事情 -开锁 还是我在这两种方式上都做同样的事情?在管理下载列表和缓存时,我必须建立批评区域,所以我认为有些部分在不需要的方法名称中添加同步,或者我错误地使用了读写锁。

注意:由于我翻译了变量和方法的名称以便于理解,我可能写了一些不同的东西,但在我的程序中是正确的

非常感谢您

【问题讨论】:

  • 你需要发一个SSCCE sscce.org
  • 虽然我还没有通读所有这些代码,但我立即想到的一件事是您将synchronized 与锁的使用混合在一起。使用其中之一,而不是两者。如果您打算使用锁,您应该阅读the javadoc for the Lock class 以了解如何正确使用它们。或者,我怀疑您可以同时取消锁定和同步,并使用像 LinkedBlockingQueue 或 LinkedBlockingDeque 这样的并发集合,或者在每次下载时使用带有 Callable/Future 的 ExecutorService。

标签: java concurrency conditional-statements mutual-exclusion reentrantreadwritelock


【解决方案1】:

首先,阅读该代码非常困难。在您的 MainThread 中,您嵌套了几次 if 语句,并且没有深入的代码分析,很难判断出了什么问题。

但很少有事情引起我的注意。

Downloads 类中,您从两个不同的ReentrantReadWriteLock 对象中获取读写锁的引用!

private Lock readLock=new ReentrantReadWriteLock().readLock();
private Lock writeLock=new ReentrantReadWriteLock().writeLock(); 

这行不通!您应该为Download 对象创建一个ReentrantReadWriteLock 实例,并从中获取readLockwriteLock 锁的引用。读和写锁是相连的,所以当你锁定writeLock 时,你会阻止读者执行受保护的部分代码。请阅读有关ReadWriteLock 的更多信息,您可以查看使用ReentrantReadWriteLock in mbassador EventBus 的示例。

更重要的是,您也不需要锁定DownloadsDownloadsNode!因为您为每个线程创建了这些类的实例,并且您不在线程之间共享它。因此,您可以简单地从这些类中删除所有负责锁定的代码。线程之间共享的唯一对象是Cache 类的实例。

此外,您在MainThread 中不需要synchronized(this)!从此类创建的每个新线程都将在不同的对象 (this) 上同步。因此,添加所有这些 synchronized 块是没有意义的。最糟糕的是,我在您附加的代码中只看到一个线程。

我认为您应该删除所有负责锁定和同步的代码,然后分析女巫对象在线程之间共享并且应该防止并发访问。对我来说,唯一需要锁定的对象是 Cache 实例。

如果您发布工作 SSCCE,也许有人可以帮助您重构此代码。 VGR 在他的评论中指出了一些好的提示。

【讨论】:

  • 非常感谢 Marcin。我会检查一下,如果可行,再试一次。
猜你喜欢
  • 1970-01-01
  • 2020-04-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-05-01
  • 1970-01-01
  • 2010-12-16
  • 2011-02-04
相关资源
最近更新 更多