【问题标题】:Java Get MulticastSocket.receive to Throw ClosedByInterruptExceptionJava 获取 MulticastSocket.receive 以抛出 ClosedByInterruptException
【发布时间】:2017-03-16 01:22:13
【问题描述】:

我有一些代码从多播套接字读取数据,直到用户定义的结束时间。如果线程通过调用Thread.interrupt(或您可以提出的任何其他用户启动的操作)中断,我也想停止读取数据。我不知道如何在线程中断时获得通知。现有代码如下:

// These are the constants I am given
final int         mcastPort = ...;
final InetAddress mcastIP   = ...;
final InetAddress ifaceIP   = ...; // Null indicates all interfaces should be used
final Instant     endTime   = ...; // Time at which to stop processing

// Initialize a datagram
final byte[] buffer = new byte[1000];
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

// Process incoming datagram packets
try (final MulticastSocket socket = new MulticastSocket(port)) {
    socket.joinGroup(mcastIP);
    if (ifaceIP != null)
        socket.setInterface(ifaceIP);

    do {
        final Duration soTimeout = Duration.between(Instant.now(), endTime);
        socket.setSoTimeout(soTimeout);
        socket.receive(packet);

        // Process packet
        ...
    } while (true);
} catch (final SocketTimeoutException e) {
    // Normal condition... the recording time has ended
} catch (final ClosedByInterruptException e) {
    // Uh-oh... this never happens
} ...

我看到有一个DatagramSocket.getChannel 方法返回一个DatagramChannel,所以我自然假设该类型用于读取/写入底层套接字。这个假设是不正确的,这意味着MulticastSocket 没有实现InterruptibleChannel。因此,MulticastSocket.receive 永远不会抛出 ClosedByInterruptException

我在网上搜索了示例,但不知道如何修改上述代码以使用DatagramChannel 而不是MulticastSocket。我需要帮助的问题是:

  1. 如何在 DatagramChannel 上设置 SO_TIMEOUT 参数?
  2. 如何将InetAddress 转换为NetworkInterface 对象?

以下是我对如何将我的实现从 MulticastSocket 转换为 DatagramChannel 以满足我的要求的最佳猜测:

// Initialize a buffer
final ByteBuffer buffer = ByteBuffer.allocate(1000);

try (final DatagramChannel mcastChannel = DatagramChannel.open()) {
    mcastChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    mcastChannel.connect(new InetSocketAddress(port));
    mcastChannel.join(mcastIP);
    if (ifaceIP != null)
        // HELP: this option requires an InterfaceAddress object,
        //       but I only have access to an InetAddress object
        mcastChannel.setOption(StandardSocketOptions.IP_MULTICAST_IF, ifaceIP);

    do {
        final Duration soTimeout = Duration.between(Instant.now(), endTime);
        // HELP: SO_TIMEOUT is not a member of StandardSocketOptions
        mcastChannel.setOption(SO_TIMEOUT, ???);
        mcastChannel.receive(buffer);

        // Process packet
        ...
    } while (true);
} ...

这种方法会奏效吗? DatagramChannel.receive 没有将 SocketTimeoutException 列为它能够抛出的异常之一。如果这可行,那么请让我知道我需要如何将第二个实现更改为与第一个实现等效,但能够在客户端调用Thread.interrupt 时抛出ClosedByInterruptException。如果没有,那么是否有人对我如何满足在预定义时间停止数据报接收的要求,同时还提供一种通过用户交互停止执行的方法有任何其他想法?

【问题讨论】:

  • 不要对未引用的文本使用引号格式。

标签: java sockets multicast multicastsocket


【解决方案1】:

如何在 DatagramChannel 上设置 SO_TIMEOUT 参数?

致电channel.socket().setSoTimeout()

如何将InetAddress 转换为NetworkInterface 对象?

您枚举网络接口,直到找到具有所需地址的网络接口。

DatagramChannel.receive() 未列出 SocketTimeoutException

没必要。它列出了IOException,并且SocketTimeoutException 扩展了IOException

你的第二段代码应该调用bind(),而不是connect(),并且它应该在调用join()之前设置接口,而不是之后。除此之外,一旦你修复了网络接口问题,它应该可以正常工作,如果被中断,它将抛出ClosedByInterruptException

【讨论】:

    【解决方案2】:

    最终的工作代码是:

    // Create a datagram packet used to read multicast data from the socket
    final byte[] buffer = new byte[1000];
    final DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
    
    // Process incoming datagram packets
    try (final DatagramChannel mcastChannel =
            DatagramChannel.open(StandardProtocolFamily.INET)) {
    
        // Set the appropriate parameters on the socket
        mcastChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        mcastChannel.bind(new InetSocketAddress(port));
        if (ifaceIP == null) {
            // Call join on each NetworkInterface that supports IPv4 multicast
        } else {
            mcastChannel.join(mcastIP, NetworkInterface.getByInetAddress(ifaceIP));
        }
    
        final DatagramSocket socket = mcastChannel.socket();
    
        do {
            final Duration timeToEnd = Duration.between(Instant.now(), endTime);
            if (timeToEnd.compareTo(Duration.ZERO) < 0) break;
    
            socket.setSoTimeout((int)timeToEnd.toMillis());
            socket.receive(packet);
    
            // Process packet
            ...
        } while (true);
    } catch (final SocketTimeoutException e) {
        // The end time has passed
    } catch (final ClosedByInterruptException e) {
        // The user initiated the closure
    } ...
    

    注意事项:

    1. 呼叫socket.receive。如果将其更改为mcastChannel.receive,您将永远无法获得SocketTimeoutException
    2. bind 的调用发生在对join 的调用之前。您可以在多个不同的网络接口上多次调用join

    【讨论】:

      猜你喜欢
      • 2011-04-02
      • 2011-11-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-08-03
      • 2020-06-28
      • 2015-06-24
      相关资源
      最近更新 更多