【问题标题】:parallel file downloads using nio without creating threads per file download使用 nio 进行并行文件下载,无需为每个文件下载创建线程
【发布时间】:2015-05-20 15:08:40
【问题描述】:
I have tried a program which download files parallely using java.nio by creating a thread per file download.

    package com.java.tftp.nio;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 * This class is used to download files concurrently from tftp server by
 * configuring the filenames, no of files.
 * 
 * @author SHRIRAM
 * 
 */
public class TFTP_NIO_Client {

    /**
     * destination folder
     * */
    private String destinationFolder;

    /**
     * list of files names to download
     * */
    private List<String> fileNames;
    /**
     * integer indicates the number of files to download concurrently
     * */
    private int noOfFilesToDownload;

    public TFTP_NIO_Client(List<String> fileNames, String destinationFolder,
            int noOfFilesToDownload) {
        this.destinationFolder = destinationFolder;
        this.fileNames = fileNames;
        this.noOfFilesToDownload = noOfFilesToDownload;
        initializeHandlers();
    }

    /**
     * This method creates threads to register the channel to process download
     * files concurrently.
     * 
     * @param noOfFilesToDownload
     *            - no of files to download
     */
    private void initializeHandlers() {
        for (int i = 0; i < noOfFilesToDownload; i++) {
            try {
                Selector aSelector = Selector.open();
                SelectorHandler theSelectionHandler = new SelectorHandler(
                        aSelector, fileNames.get(i));
                theSelectionHandler.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Setup RRQ/WRQ packet Packet : | Opcode | FileName | 0 | mode | 0 |
     * Filename -> Filename in array of bytes. 0 -> indicates end of file mode
     * -> string in byte array 'netascii' or 'octet'
     * 
     * @param aOpcode
     * @param aMode
     * @param aFileName
     * @throws IOException
     */
    private void sendRequest(int aOpcode, int aMode, String aFileName,
            DatagramChannel aChannel, InetSocketAddress aAddress)
            throws IOException {
        // Read request packet
        TFTPReadRequestPacket theRequestPacket = new TFTPReadRequestPacket();
        aChannel.send(
                theRequestPacket.constructReadRequestPacket(aFileName, aMode),
                aAddress);
    }

    /**
     * sends TFTP ACK Packet Packet : | opcode | Block# | opcode -> 4 -> 2 bytes
     * Block -> block number -> 2bytes
     * 
     * @param aBlock
     */
    private ByteBuffer sendAckPacket(int aBlockNumber) {
        // acknowledge packet
        TFTPAckPacket theAckPacket = new TFTPAckPacket();
        return theAckPacket.getTFTPAckPacket(aBlockNumber);
    }

    /**
     * This class is used to handle concurrent downloads from the server.
     * 
     * */
    public class SelectorHandler extends Thread {
        private Selector selector;
        private String fileName;

        /**
         * flag to indicate the file completion.
         * */
        private boolean isFileReadFinished = false;

        public SelectorHandler(Selector aSelector, String aFileName)
                throws IOException {
            this.selector = aSelector;
            this.fileName = aFileName;
            registerChannel();
        }

        private void registerChannel() throws IOException {
            DatagramChannel theChannel = DatagramChannel.open();
            theChannel.configureBlocking(false);
            selector.wakeup();
            theChannel.register(selector, SelectionKey.OP_READ);
            sendRequest(Constants.OP_READ, Constants.ASCII_MODE, fileName,
                    theChannel, new InetSocketAddress(Constants.HOST,
                            Constants.TFTP_PORT));
        }

        @Override
        public void run() {
            process();
        }

        private void process() {
            System.out.println("Download started for " + fileName + "  ");
            File theFile = new File(destinationFolder
                    + fileName.substring(fileName.lastIndexOf("/")));
            FileOutputStream theFout = null;
            try {
                theFout = new FileOutputStream(theFile);
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }

            while (!isFileReadFinished) {
                try {
                    if (selector.select() == 0) {
                        try {
                            // sleep 2sec was introduced because selector is
                            // thread safe but keys are not thread safe
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            continue;
                        }
                        continue;
                    }
                    Set<SelectionKey> theSet = selector.selectedKeys();
                    Iterator<SelectionKey> theSelectedKeys = theSet.iterator();
                    synchronized (theSelectedKeys) {
                        while (theSelectedKeys.hasNext()) {
                            SelectionKey theKey = theSelectedKeys.next();
                            theSelectedKeys.remove();
                            if (theKey.isReadable()) {
                                isFileReadFinished = read(theKey, theFout,
                                        fileName);
                                if (!isFileReadFinished) {
                                    theKey.interestOps(SelectionKey.OP_READ);
                                }
                            } else if (theKey.isWritable()) {
                                // there is no implementation for file write to
                                // server.
                                theKey.interestOps(SelectionKey.OP_READ);
                            }
                        }
                    }
                } catch (IOException ie) {
                    ie.printStackTrace();
                }
            }
            System.out.println("Download finished for " + fileName);
            try {
                if (selector.isOpen()) {
                    selector.close();
                }
                if (theFout != null) {
                    theFout.close();
                }
            } catch (IOException ie) {

            }
        }
    }

    /**
     * @param aKey
     *            registered key for the selector
     * @param aOutStream
     *            - file output stream to write the file contents.
     * @return boolean
     * @throws IOException
     */
    private boolean read(SelectionKey aKey, OutputStream aOutStream,
            String aFileName) throws IOException {
        DatagramChannel theChannel = (DatagramChannel) aKey.channel();

        // data packet
        TFTPDataPacket theDataPacket = new TFTPDataPacket();
        ByteBuffer theReceivedBuffer = theDataPacket.constructTFTPDataPacket();
        SocketAddress theSocketAddress = theChannel.receive(theReceivedBuffer);
        theReceivedBuffer.flip();
        byte[] theBuffer = theReceivedBuffer.array();
        byte[] theDataBuffer = theDataPacket.getDataBlock();
        if (theDataPacket.getOpCode() == Constants.OP_DATA) {
            int theLimit = theDataPacket.getLimit();
            // checks the limit of the buffer because a packet with data less
            // than 512 bytes of content signals that it is the last packet in
            // transmission for this particular file
            if (theLimit != Constants.MAX_BUFFER_SIZE
                    && theLimit < Constants.MAX_BUFFER_SIZE) {
                byte[] theLastBlock = new byte[theLimit];
                System.arraycopy(theBuffer, 0, theLastBlock, 0, theLimit);
                // writes the lastblock
                aOutStream.write(theLastBlock);

                // sends an acknowledgment to the server using TFTP packet
                // block number
                theChannel
                        .send(sendAckPacket((((theBuffer[2] & 0xff) << 8) | (theBuffer[3] & 0xff))),
                                theSocketAddress);
                if (theChannel.isOpen()) {
                    theChannel.close();
                }
                return true;
            } else {
                aOutStream.write(theDataBuffer);
                // sends an acknowledgment to the server using TFTP packet
                // block number
                theChannel
                        .send(sendAckPacket((((theBuffer[2] & 0xff) << 8) | (theBuffer[3] & 0xff))),
                                theSocketAddress);
                return false;
            }
        } else if (Integer.valueOf(theBuffer[1]) == Constants.OP_ERROR) {
            System.out.println("File : " + aFileName + "  not found  ");
            handleError(theReceivedBuffer);
        }
        return false;
    }

    /**
     * This method handles the error packet received from Server.
     * 
     * @param aBuffer
     */
    private void handleError(ByteBuffer aBuffer) {

        // Error packet
        new TFTPErrorPacket(aBuffer);
    }
}

    Is it possible to download multiple files in parallel using java.nio by not creating a thread per file download? If yes can anybody suggest a solution to proceed further.

【问题讨论】:

  • 是的。这是。但请展示你的作品。
  • @rmalchow 我已附上文件..

标签: java io nio java.util.concurrent


【解决方案1】:

我会提供一种方法来实现您的目标:

  1. L列出要下载的文件。
  2. 创建一个映射M,它将保存要下载的文件名和相应的选择器实例的映射。

  3. 对于 L 中的每个文件 F 从M中获取F对应的Selector SK 通过检查任何准备好的事件来处理选择器的状态。 如果处理完成,则将 F 对应的 Selector 设置为 null。这将有助于识别文件 谁的 处理完成。或者,您可以从中删除 F L;以便下次循环时只处理尚未完全下载的文件。

说了这么多,我很想知道你为什么要尝试这样的壮举?如果这个要求背后的思考过程是将线程数减少到 1,那么它是不正确的。请记住,您最终会对运行的单线程造成负担,并且可以肯定您的吞吐量不一定是最佳的,因为单线程将同时处理网络和磁盘 I/O。此外,考虑在将几个文件之一写入磁盘时遇到异常的情况 - 您最终会中止所有文件的传输;我确定你不想要的东西。

一种更好且更具可扩展性的方法是在单个线程上轮询选择器,但将任何 I/O 活动移交给工作线程。更好的方法仍然是阅读Doug Lea's paper 中介绍的技术并实施它们。事实上Netty library 已经实现了这种模式,并且在生产中被广泛使用。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-28
    • 1970-01-01
    相关资源
    最近更新 更多