【问题标题】:Delays while sending data with Java NIO使用 Java NIO 发送数据时出现延迟
【发布时间】:2014-11-06 14:04:13
【问题描述】:

我需要您对 Java NIO 包的建议。我在通过网络发送数据包时遇到延迟问题。原始代码实际上是我的port of the SFML book source code to Java,但在这里我将只向您展示一个最小的工作示例,其中重现了问题。尽管此代码确实包含 SFML 库中的一些片段(实际上是创建一个窗口和一个事件循环),但我相信这对问题没有影响。

这里我只展示部分代码,完整版可用here

所以,程序有两个实体:服务器和客户端。如果您以服务器模式启动应用程序,则会创建一个 Server,开始侦听新的连接,并自动创建一个新的 Client 并尝试连接到该 Server。在客户端模式下,只会创建一个客户端并连接到服务器。

该应用程序还会创建一个新的基本 GUI 窗口并启动一个事件循环,一切都会在其中发生。

客户端向服务器发送数据包。它只通过记录接受的事实来处理它们。客户端可以发送两种类型的数据包:定期数据包(带有递增 ID)和事件数据包(应用程序对按下 SPACE 或 M 按钮作出反应)。

客户端发送数据包:

public void update(Time dt) throws IOException {
    if (!isConnected) return;

    if (tickClock.getElapsedTime().compareTo(Time.getSeconds(1.f / 20.f)) > 0) {
        Packet intervalUpdatePacket = new Packet();
        intervalUpdatePacket.append(PacketType.INTERVAL_UPDATE);
        intervalUpdatePacket.append(intervalCounter++);

        PacketReaderWriter.send(socketChannel, intervalUpdatePacket);

        tickClock.restart();
    }
}

public void handleEvent(Event event) throws IOException {
    if (isConnected && (event.type == Event.Type.KEY_PRESSED)) {
        KeyEvent keyEvent = event.asKeyEvent();

        if (keyEvent.key == Keyboard.Key.SPACE) {
            LOGGER.info("press SPACE");
            Packet spacePacket = new Packet();
            spacePacket.append(PacketType.SPACE_BUTTON);
            PacketReaderWriter.send(socketChannel, spacePacket);
        }

        if (keyEvent.key == Keyboard.Key.M) {
            LOGGER.info("press M");
            Packet mPacket = new Packet();
            mPacket.append(PacketType.M_BUTTON);
            PacketReaderWriter.send(socketChannel, mPacket);
        }
    }
}

服务器接受数据包:

private void handleIncomingPackets() throws IOException {
    readSelector.selectNow();

    Set<SelectionKey> readKeys = readSelector.selectedKeys();
    Iterator<SelectionKey> it = readKeys.iterator();

    while (it.hasNext()) {
        SelectionKey key = it.next();
        it.remove();

        SocketChannel channel = (SocketChannel) key.channel();

        Packet packet = null;
        try {
            packet = PacketReaderWriter.receive(channel);
        } catch (NothingToReadException e) {
            e.printStackTrace();
        }

        if (packet != null) {
            // Interpret packet and react to it
            handleIncomingPacket(packet, channel);
        }
    }
}

private void handleIncomingPacket(Packet packet, SocketChannel channel) {
    PacketType packetType = (PacketType) packet.get();

    switch (packetType) {
        case INTERVAL_UPDATE:
            int intervalId = (int) packet.get();
            break;
        case SPACE_BUTTON:
            LOGGER.info("handling SPACE button");
            break;
        case M_BUTTON:
            LOGGER.info("handling M button");
            break;
    }
}

这是一个PacketReaderWriter 对象:

package server;

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class PacketReaderWriter {
    private static final int PACKET_SIZE_LENGTH = 4;
    private static final ByteBuffer packetSizeReadBuffer = ByteBuffer.allocate(PACKET_SIZE_LENGTH);
    private static ByteBuffer clientReadBuffer;

    private static byte[] encode(Packet packet) throws IOException {
        try (
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos)
        ) {
            oos.writeObject(packet);
            return baos.toByteArray();
        }
    }

    private static Packet decode(byte[] encodedPacket) throws IOException, ClassNotFoundException {
        try (ObjectInputStream oi = new ObjectInputStream(new ByteArrayInputStream(encodedPacket))) {
            return (Packet) oi.readObject();
        }
    }

    public static void send(SocketChannel channel, Packet packet) throws IOException {
        byte[] encodedPacket = encode(packet);

        ByteBuffer packetSizeBuffer = ByteBuffer.allocate(PACKET_SIZE_LENGTH).putInt(encodedPacket.length);
        packetSizeBuffer.flip();

        // Send packet size
        channel.write(packetSizeBuffer);

        // Send packet content
        ByteBuffer packetBuffer = ByteBuffer.wrap(encodedPacket);
        channel.write(packetBuffer);
    }

    public static Packet receive(SocketChannel channel) throws IOException, NothingToReadException {
        int bytesRead;

        // Read packet size
        packetSizeReadBuffer.clear();
        bytesRead = channel.read(packetSizeReadBuffer);

        if (bytesRead == -1) {
            channel.close();
            throw new NothingToReadException();
        }

        if (bytesRead == 0) return null;

        packetSizeReadBuffer.flip();
        int packetSize = packetSizeReadBuffer.getInt();

        // Read packet
        clientReadBuffer = ByteBuffer.allocate(packetSize);
        bytesRead = channel.read(clientReadBuffer);

        if (bytesRead == -1) {
            channel.close();
            throw new NothingToReadException();
        }

        if (bytesRead == 0) return null; 

        clientReadBuffer.flip();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        baos.write(clientReadBuffer.array(), 0, bytesRead);
        clientReadBuffer.clear();

        try {
            return decode(baos.toByteArray());
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        }
    }
}

这就是问题所在:在按下按钮(并从客户端发送相应的数据包)和在服务器上接受此数据包之间存在相当大的延迟。如果我在客户端模式下启动应用程序的新实例(简而言之,只需添加一个新客户端),延迟会变得更大。

我看不出为什么这些周期性数据包会产生如此多的网络负载,以至于其他数据包无法通过,但也许我只是遗漏了一些东西。在这里我不得不说我不是Java专家,所以不要太责怪我没有看到明显的东西:)

有人有什么想法吗?

谢谢!

【问题讨论】:

  • 定义“相当大的延迟”,并说明您认为这会造成“如此多的网络负载”的原因。我想说你在这里创建了太多的对象,尤其是太多的ByteBuffers。尝试在频道的整个生命周期中为每个频道使用一个,也许两个,一个用于阅读,一个用于写作。
  • 当我按“SPACE”时,大约 2/3 秒后,我看到一条关于在服务器端接受包的消息。我会尽量减少缓冲区对象。
  • 我们能看到PacketReaderWriter吗?
  • 这是我问题中的最后一段代码。
  • 因为没有工作代码(没有设置我们的小测试)来重现您的问题。这是一个猜测:这个int packetSize = packetSizeReadBuffer.getInt(); 可能是一个问题,当你在这个地方bytesRead = channel.read(packetSizeReadBuffer); 你读不到四个字节时。

标签: java sockets nio sfml


【解决方案1】:

我决定看看 Github 存储库。

你的 Server.run() 看起来像这样。

    public void run() {
    while (isRunning) {
        try {
            handleIncomingConnections();
            handleIncomingPackets();
        } catch (IOException e) {
            e.printStackTrace();
        }

        try {
            // Sleep to prevent server from consuming 100% CPU
            sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

sleep(100) 将导致每秒大约 10 次对 handleIncomingPackets() 的调用。反过来,handleIncomingPackets() 将选择一个客户端通道并在单个接收到的数据包上调用 handleIncomingPacket()。如果我理解正确的话,服务器总共可以为每个客户端处理 10 个数据包/秒。

另一方面,客户端尝试每秒发送 20 个 PacketType.INTERVAL_UPDATE 类型的数据包。客户端必须每秒发送更少的数据包,或者服务器需要能够每秒处理更多的数据包。

当前的 sleep(100) 意味着在服务器响应单个数据包之前总会有大约 100 毫秒的延迟,即使在非过载情况下也是如此。这可能没问题,但如果您确保您确实读取了频道上所有可用的数据包,而不是每次只读取一个数据包。

总而言之:为了缩短响应时间,您必须做的最小更改就是减少 sleep() 时间。 10毫秒就好了。但我也建议尝试检查每次迭代中是否有多个可用的数据包。

更新: 在您链接的 c++ 文件中,我的预感是每次迭代它读取的数据包不止一个。

<snip>
while (peer->socket.receive(packet) == sf::Socket::Done)
        {
            // Interpret packet and react to it
            handleIncomingPacket(packet, *peer, detectedTimeout);
</snip>

while 循环将读取 所有 个可用的数据包。与您的 Java 版本相比,您在每次服务器迭代中每个客户端读取一个数据包。

if (packet != null) {
    // Interpret packet and react to it
    handleIncomingPacket(packet, channel);
}

您需要确保您还读取了 Java 版本的所有可用数据包。

如果您只是想说服自己客户端代码发送的数据包比服务器代码可以处理的多,可以通过将 sleep() 临时设置为 10 毫秒来快速完成。

【讨论】:

  • 感谢您的回答,但我不确定问题出在哪里,因为我的代码只是原始 C++ 代码的直接移植,您可以找到 here。而且效果很好:)
  • 扫描 C++ 文件后更新答案。
  • 谢谢,我去看看。
猜你喜欢
  • 2013-05-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-01-28
  • 1970-01-01
  • 2012-03-20
  • 1970-01-01
相关资源
最近更新 更多