【问题标题】:Scoped interrupts, do they exist?范围中断,它们存在吗?
【发布时间】:2011-08-30 02:08:23
【问题描述】:

我需要一些系统给我范围内的中断,这样当 Thread.sleep(..) 没有执行时中断线程被禁用。我通常会使用可以正常工作的受保护块,但是在这种情况下我不能这样做有几个原因,主要是因为争用会阻止整个事情正常工作。模拟只是在几秒钟内冻结。范围中断将取代任何阻塞机制,并防止中断“泄漏”到正在测试中断的方法之外。在这种情况下,它将是Thread.sleep(...)。但是,我想知道这是否要求在 Java 中根本不可能的事情。也许守卫的阻塞机制是可以做到的最好的。

我已经创建了自己的 Selector 实现,用于网络模拟器(选择器与模拟器回调等交互以进行优化),基本上我目前正在使用 CountDownLatch。这在某些情况下很好,而在其他情况下,它会造成更糟糕的阻塞,同时不会冻结整个模拟本身(但是,它太慢以至于无法使用)。取决于使用的网络框架,代码的哪些部分比其他部分更受打击。我还认为在“好”模拟中可能会出现问题,很少会导致等待 13 毫秒,因此在获取内部 CountDownLatch 或内部发生任何魔法时。

如果您可以在没有上述任何建议的情况下解决问题,那就太棒了:) 无论如何,我的代码如下:

/**
 *
 */
package kokunet;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import kokuks.IConnectionSocket;
import kokuks.KKSAddress;
import kokuks.KKSSocket;
import kokuks.KKSSocketListener;

public class KSelector extends SelectorImpl {
    // True if this Selector has been closed
    private volatile boolean closed = false;

    // Lock for close and cleanup
    final class CloseLock {}
    private final Object closeLock = new CloseLock();

    private volatile boolean selecting = false;
    private volatile boolean wakeup = false;

    class SocketListener implements KKSSocketListener {
        protected volatile CountDownLatch latch = null;

        /**
         *
         */
        public SocketListener() {
            newLatch();
        }

        protected synchronized CountDownLatch newLatch() {
            return this.latch = new CountDownLatch(1);
        }

        protected synchronized void refreshReady(KKSSocket socket) {
            if (!selecting) return;

            synchronized (socketToChannel) {
                SelChImpl ch = socketToChannel.get(socket);
                if (ch == null) {
                    System.out.println("ks sendCB: channel not found for socket: " + socket);
                    return;
                }
                synchronized (channelToKey) {
                    SelectionKeyImpl sk = channelToKey.get(ch);
                    if (sk != null) {
                        if (handleSelect(sk)) {
                            latch.countDown();
                        }
                    }
                }
            }
        }
        @Override
        public void connectionSucceeded(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void connectionFailed(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void dataSent(KKSSocket socket, long bytesSent) {
            refreshReady(socket);
        }
        @Override
        public void sendCB(KKSSocket socket, long bytesAvailable) {
            refreshReady(socket);
        }
        @Override
        public void onRecv(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void newConnectionCreated(KKSSocket socket, KKSSocket newSocket, KKSAddress remoteaddress) {
            refreshReady(socket);
        }
        @Override
        public void normalClose(KKSSocket socket) {
            wakeup();
        }
        @Override
        public void errorClose(KKSSocket socket) {
            wakeup();
        }
    }

    protected final Map<KKSSocket, SelChImpl>        socketToChannel = new HashMap<KKSSocket, SelChImpl>();
    protected final Map<SelChImpl, SelectionKeyImpl> channelToKey    = new HashMap<SelChImpl, SelectionKeyImpl>();
    protected final SocketListener currListener = new SocketListener();

    SelChImpl getChannelForSocket(KKSSocket s) {
        synchronized (socketToChannel) {
            return socketToChannel.get(s);
        }
    }

    SelectionKeyImpl getSelKeyForChannel(KKSSocket s) {
        synchronized (channelToKey) {
            return channelToKey.get(s);
        }
    }

    protected boolean markRead(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_READ);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markWrite(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_WRITE);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markAccept(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_ACCEPT);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markConnect(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_CONNECT);
            return selectedKeys.add(impl);
        }
    }

    /**
     * @param provider
     */
    protected KSelector(SelectorProvider provider) {
        super(provider);
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implClose()
     */
    @Override
    protected void implClose() throws IOException {
        provider().getApp().printMessage("implClose: closed: " + closed);
        synchronized (closeLock) {
            if (closed) return;
            closed = true;
            for (SelectionKey sk : keys) {
                provider().getApp().printMessage("dereg1");
                deregister((AbstractSelectionKey)sk);
                provider().getApp().printMessage("dereg2");
                SelectableChannel selch = sk.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
            }
            implCloseInterrupt();
        }
    }

    protected void implCloseInterrupt() {
        wakeup();
    }

    private boolean handleSelect(SelectionKey k) {
        synchronized (k) {
            boolean notify = false;

            if (!k.isValid()) {
                k.cancel();
                ((SelectionKeyImpl)k).channel.socket().removeListener(currListener);
                return false;
            }

            SelectionKeyImpl ski = (SelectionKeyImpl)k;

            if ((ski.interestOps() & SelectionKeyImpl.OP_READ) != 0) {
                if (ski.channel.socket().getRxAvailable() > 0) {
                    notify |= markRead(ski);
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_WRITE) != 0) {
                if (ski.channel.socket().getTxAvailable() > 0) {
                    notify |= markWrite(ski);
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_CONNECT) != 0) {
                if (!ski.channel.socket().isConnectionless()) {
                    IConnectionSocket cs = (IConnectionSocket)ski.channel.socket();
                    if (!ski.channel.socket().isAccepting() && !cs.isConnecting() && !cs.isConnected()) {
                        notify |= markConnect(ski);
                    }
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_ACCEPT) != 0) {
                //provider().getApp().printMessage("accept check: ski: " + ski + ", connectionless: " + ski.channel.socket().isConnectionless() + ", listening: " + ski.channel.socket().isListening() + ", hasPendingConn: " + (ski.channel.socket().isConnectionless() ? "nope!" : ((IConnectionSocket)ski.channel.socket()).hasPendingConnections()));
                if (!ski.channel.socket().isConnectionless() && ski.channel.socket().isListening()) {
                    IConnectionSocket cs = (IConnectionSocket)ski.channel.socket();
                    if (cs.hasPendingConnections()) {
                        notify |= markAccept(ski);
                    }
                }
            }
            return notify;
        }
    }

    private boolean handleSelect() {
        boolean notify = false;

        // get initial status
        for (SelectionKey k : keys) {
            notify |= handleSelect(k);
        }

        return notify;
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#doSelect(long)
     */
    @Override
    protected int doSelect(long timeout) throws IOException {
        processDeregisterQueue();

        long timestartedms = System.currentTimeMillis();

        synchronized (selectedKeys) {
            wakeup = false;
            selecting = true;
            try {
                handleSelect();

                if (!selectedKeys.isEmpty() || timeout == 0) {
                    return selectedKeys.size();
                }

                //TODO: useless op if we have keys available
                for (SelectionKey key : keys) {
                    ((SelectionKeyImpl)key).channel.socket().addListener(currListener);
                }
                try {
                    while (!wakeup && isOpen() && selectedKeys.isEmpty()) {
                        CountDownLatch latch = null;
                        synchronized (currListener) {
                            if (wakeup || !isOpen() || !selectedKeys.isEmpty()) {
                                break;
                            }
                            latch = currListener.newLatch();
                        }
                        try {
                            if (timeout > 0) {
                                long currtimems = System.currentTimeMillis();
                                long remainingMS = (timestartedms + timeout) - currtimems;

                                if (remainingMS > 0) {
                                    latch.await(remainingMS, TimeUnit.MILLISECONDS);
                                } else {
                                    break;
                                }
                            } else {
                                latch.await();
                            }
                        } catch (InterruptedException e) {
                            wakeup();
                        }
                    }
                    return selectedKeys.size();
                } finally {
                    for (SelectionKey key : keys) {
                        ((SelectionKeyImpl)key).channel.socket().removeListener(currListener);
                    }
                    processDeregisterQueue();
                }
            } finally {
                selecting = false;
                wakeup = false;
            }
        }
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implRegister(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implRegister(SelectionKeyImpl ski) {
        synchronized (closeLock) {
            if (closed) throw new ClosedSelectorException();
            synchronized (channelToKey) {
                synchronized (socketToChannel) {
                    keys.add(ski);
                    socketToChannel.put(ski.channel.socket(), ski.channel);
                    channelToKey.put(ski.channel, ski);
                }
            }
        }

    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implDereg(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implDereg(SelectionKeyImpl ski) throws IOException {
        synchronized (channelToKey) {
            synchronized (socketToChannel) {
                keys.remove(ski);
                socketToChannel.remove(ski.channel.socket());
                channelToKey.remove(ski.channel);

                SelectableChannel selch = ski.channel();

                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
            }
        }
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#wakeup()
     */
    @Override
    public Selector wakeup() {
        synchronized (selectedKeys) {
            wakeup = true;
            selectedKeys.notifyAll();
        }
        return this;
    }
}

很抱歉在这种情况下没有发布 SCEE,但在这种情况下有点困难。任何建议都会有所帮助。

干杯,
克里斯

【问题讨论】:

    标签: java networking concurrency locking css-selectors


    【解决方案1】:

    作用域中断不存在。

    但是,当 sleepwait 或以 InterruptedException 终止的任何内容时,“中断”标志将在引发异常之前被清除。因此,如果您在某个块边界处捕获异常,您就可以确定范围。同样,Thread.interrupted() 测试并清除“中断”标志。


    在更仔细地阅读您的代码时,我意识到您(实际上)根本没有使用中断。相反,您使用的是notify,而notify 只会唤醒正在执行wait 的线程。

    【讨论】:

    • 问题在于中断时范围的粒度——它不断泄漏到父 try/catch 中,因为无法判断我是否在可中断代码中定义的范围。就像我说的,我需要避免使用受保护的块机制,因为它在这种情况下不起作用。这是一个真正的头脑风暴。涉及 LockSupport.park(..) 的东西可能会起作用,这是同步构造的骨架。
    • 我无法想象你的建议,虽然听起来不错:) 我想知道 interrupted() 标志是否真的可以帮助防止中断泄漏到父异常捕获器中。跨度>
    • @Chris - 查看 javadocs 中抛出 InterruptedException 的方法
    • @Chris - 我并不是说这是真正的范围界定:它不是。相反,我是说这是你可能得到的最接近的结果。
    猜你喜欢
    • 2017-03-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-30
    • 2012-01-25
    • 2019-03-22
    • 2021-12-09
    • 2018-06-27
    相关资源
    最近更新 更多