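【Question Title】: Running two threads at once, and subsequently communicating between them
【Posted】: 2013-11-20 10:01:55
【Question】:

I'm trying to implement stop-and-wait ARQ with a sender and a receiver that communicate on the same machine. My problem is getting the two threads to run at the same time and then communicating between them (possibly using Thread.notify()). At the moment my code works, without the flow control protocol implemented, when I run the two separate main methods in the two classes. However, when I try to run the code from a single separate main method, I can only start either the Receiver thread or the Sender thread first, and in both cases the code ends up waiting indefinitely. I'm new to threads in general, so any help is greatly appreciated!

Sender class: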

import java.net.DatagramSocket;
import java.net.DatagramPacket;
import java.net.InetSocketAddress;

import java.io.File;
import java.io.FileInputStream;
import tcdIO.*;

/**
 *
 * Sending side of a communication channel.
 *
 * The start method splits an image into a number of packets and sends them to a given receiver.
 * The main method acts as test for the class by filling the destination host and port number and the source port number.
 *
 */
public class Sender implements Runnable{
    static final int DEFAULT_SRC_PORT = 50000;
    static final int DEFAULT_DST_PORT = 50001;
    static final String DEFAULT_DST_HOST = "localhost";

    static final String FILENAME = "input.jpg";

    static final int MTU = 1500;

    static Terminal terminal;

    DatagramSocket socket;
    InetSocketAddress dstAddress;

    /**
     * Constructor
     * 
     */
    Sender() {
        this(DEFAULT_DST_HOST, DEFAULT_DST_PORT, DEFAULT_SRC_PORT);
    }


    /**
     * Constructor
     *   
     * Attempts to create socket at given port and create an InetSocketAddress for the destinations
     */
    Sender(String dstHost, int dstPort, int srcPort) {
        try {
            dstAddress= new InetSocketAddress(dstHost, dstPort);
            socket= new DatagramSocket(srcPort);
        }
        catch(java.lang.Exception e) {
            e.printStackTrace();
        }
    }

    synchronized void sleep() {
        try {this.wait(100);}catch(Exception e){e.printStackTrace();}
    }


    /**
     * Sender Method
     * 
     * Transmits a given image as a collection of packets; the first packet contains the size of the image as string.
     */
    public void run() {
        byte[] data= null;
        DatagramPacket packet= null;

        File file= null;
        FileInputStream fin= null;
        byte[] buffer= null;
        int size;
        int counter;

        try {   
            file= new File(FILENAME);               // Reserve buffer for length of file and read file
            buffer= new byte[(int) file.length()];
            fin= new FileInputStream(file);
            size= fin.read(buffer);
            if (size==-1) throw new Exception("Problem with File Access");
            terminal.println("File size: " + buffer.length + ", read: " + size);

            data= (Integer.toString(size)).getBytes();  // 1st packet contains the length only
            packet= new DatagramPacket(data, data.length, dstAddress);
            terminal.println("Please press any key");
            terminal.readChar();
            socket.send(packet);            

            counter= 0;
            do {
                data= new byte[(counter+MTU<size) ? MTU : size-counter];  // The length of the packet is either MTU or a remainder
                java.lang.System.arraycopy(buffer, counter, data, 0, data.length);
                terminal.println("Counter: " + counter + " - Payload size: " + data.length);

                packet= new DatagramPacket(data, data.length, dstAddress);
                socket.send(packet);
                this.sleep();   
                counter+= data.length;
            } while (counter<size);

            terminal.println("Send complete");
        }
        catch(java.lang.Exception e) {
            e.printStackTrace();
        }       
    }



    public static void main(String[] args) {
        Sender s;
        try {           
            String dstHost;
            int dstPort;
            int srcPort;

            //dstHost= args[0];
            //dstPort= Integer.parseInt(args[1]);
            //srcPort= Integer.parseInt(args[2]);
            dstHost= DEFAULT_DST_HOST;
            dstPort= DEFAULT_DST_PORT;
            srcPort= DEFAULT_SRC_PORT;

            terminal= new Terminal("Sender");

            s= new Sender(dstHost, dstPort, srcPort);
            s.run();

            terminal.println("Program completed");
        } catch(java.lang.Exception e) {
            e.printStackTrace();
        }
    }


}

Receiver class:

import java.io.File;
import java.io.FileOutputStream;
import java.net.DatagramSocket;
import java.net.DatagramPacket;

import tcdIO.*;

/**
 * Receiving side of a communication channel.
 *
 * The class provides the basic functionality to receive a datagram from a sender.
 * The main method acts as test for the class by filling the port number at which to receive the datagram.
 */
public class Receiver implements Runnable{
    static final String FILENAME = "output.jpg";
    static final int DEFAULT_PORT = 50001;
    static final int MTU = 1500;
    static Terminal terminal;

    DatagramSocket socket;

    /**
     * Constructor
     * 
     */
    Receiver() {
        this(DEFAULT_PORT);
    }


    /**
     * Constructor
     *   
     * Attempts to create socket at given port
     */
    Receiver(int port) {
        try {
            socket= new DatagramSocket(port);
        }
        catch(java.lang.Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * Receiver Method
     * 
     * Attempts to receive a number of packets that contain an image; the first packet contains the size of the image
     */
    public void run() {
        byte[] data;
        byte[] buffer;
        DatagramPacket packet;
        int counter;
        int size;

        File file;
        FileOutputStream fout;

        try {
            data= new byte[MTU];  // receive first packet with size of image as payload
            packet= new DatagramPacket(data, data.length);
            terminal.println("Waiting for incoming packets");
            socket.receive(packet);         

            data= packet.getData();   // reserve buffer to receive image
            size= (Integer.valueOf(new String(data, 0, packet.getLength()))).intValue();
            terminal.println("Filesize:" + size);
            buffer= new byte[size];

            counter= 0;         
            while(counter<size) {  // receive packet and store payload in array
                data= new byte[MTU];
                packet= new DatagramPacket(data, data.length);
                socket.receive(packet);
                terminal.println("Received packet - Port: " + packet.getPort() + " - Counter: " + counter + " - Payload: "+packet.getLength()); 

                System.arraycopy(data, 0, buffer, counter, packet.getLength());
                counter+= packet.getLength();
            }

            file= new File(FILENAME);               // Create file and write buffer into file
            fout= new FileOutputStream(file);
            fout.write(buffer, 0, buffer.length);
            fout.flush();
            fout.close();
        }
        catch(java.lang.Exception e) {
            e.printStackTrace();
        }       
    }


    /**
     * Test method
     * 
     * Creates an instance of the class Receiver
     * 
     * @param args arg[0] Port number to receive information on
     * /
    public static void main(String[] args) {
        Receiver r;

        try {
            terminal= new Terminal("Receiver");
            int port;

            //port= Integer.parseInt(args[0]);
            port= DEFAULT_PORT;
            r= new Receiver(port);  
            r.run();

            terminal.println("Program completed");
        } catch(java.lang.Exception e) {
            e.printStackTrace();
        }
    }
    */
}

And the main class, which just instantiates and runs them:

import tcdIO.Terminal;


public class FlowControlMain {

    /**
     * 
     *
     */
    public static void main(String[] args) {

        Sender s;
        Receiver r;
        try{
            String dstHost= "localhost";
            int dstPort= 50001;
            int srcPort= 50000;

            Sender.terminal= new Terminal("Sender");
            Receiver.terminal = new Terminal("Receiver");

            s= new Sender(dstHost, dstPort, srcPort);
            r = new Receiver(dstPort);
            s.run();
            r.run();

        }catch(Exception e){
            e.printStackTrace();

        }

    }


}

Sorry for the amount of code; I just wanted to give the full picture.

【Discussion】:

    Tags: java multithreading flow-control


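    【Solution 1】:

    You're not using threads at all; you're executing the run() methods in the main thread. Calling s.run() is an ordinary method call, so r.run() is not reached until s.run() has returned.

    The correct way to start a Runnable in its own Thread is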

    Thread t = new Thread(myRunnable);
    t.start();
    

    Or use an ExecutorService, which is somewhat higher level and supports things like thread pools.
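To make the difference concrete, here is a minimal sketch of the three ways of invoking a Runnable mentioned above: calling run() directly, starting a Thread, and submitting to an ExecutorService. The class names `StartThreadsSketch` and `NamedTask` and the helper `runOnNewThread` are made up for illustration; `NamedTask` merely stands in for the question's Sender and Receiver and records which thread executed it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class StartThreadsSketch {

    /** Hypothetical stand-in for Sender or Receiver: records the thread that ran it. */
    static class NamedTask implements Runnable {
        volatile String ranOn;   // name of the thread that executed run()

        @Override
        public void run() {
            ranOn = Thread.currentThread().getName();
        }
    }

    /** Runs the task on a new thread with the given name and waits for it to finish. */
    static String runOnNewThread(NamedTask task, String threadName) {
        Thread t = new Thread(task, threadName);
        t.start();               // start() spawns the thread; run() would stay on the caller
        try {
            t.join();            // wait until the new thread has finished run()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return task.ranOn;
    }

    public static void main(String[] args) throws InterruptedException {
        // What the question's main does: a plain method call on the calling thread.
        NamedTask direct = new NamedTask();
        direct.run();
        System.out.println("run():   " + direct.ranOn);

        // One dedicated thread per Runnable.
        System.out.println("start(): " + runOnNewThread(new NamedTask(), "sender-thread"));

        // The same idea with a small thread pool.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        NamedTask a = new NamedTask();
        NamedTask b = new NamedTask();
        pool.execute(a);
        pool.execute(b);
        pool.shutdown();                             // accept no new tasks
        pool.awaitTermination(5, TimeUnit.SECONDS);  // wait for the submitted ones
        System.out.println("pool:    " + a.ranOn + ", " + b.ranOn);
    }
}
```

Applied to the question's FlowControlMain, this would presumably mean replacing s.run() and r.run() with new Thread(r).start() and new Thread(s).start(), starting the receiver first so that it is already listening.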

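    【Comments】:

    • Thank you very much! As for communicating between them, would notify() work?
    • That depends on what you mean by communicating. The wait/notify mechanism lets you coordinate threads, making one thread (or several) wait until another thread has finished some task before continuing.
    • That's exactly what I need: the sender thread has to wait for the receiver to acknowledge a packet before sending the next one.
    • I assume it has something to do with the "synchronized" statement, but I'm not sure how to implement it.
    • Well, wait() makes a thread wait for a notify() from another thread, and yes, synchronization is involved (wait and notify must be called from a synchronized context on the same object). There are plenty of tutorials about it.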
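The wait/notify handshake discussed in these comments could look roughly like the sketch below. It is an illustration under assumptions, not the asker's code: `AckMonitor`, `ack`, `awaitAck` and `simulate` are hypothetical names, and a BlockingQueue stands in for the UDP socket so the example runs on its own. Sharing a monitor only works because both threads live in the same JVM; between separate processes the acknowledgement would have to travel back as a packet.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class AckMonitorSketch {

    /** Hypothetical shared object: the receiver signals acks, the sender waits for them. */
    static class AckMonitor {
        private int lastAcked = -1;   // highest sequence number acknowledged so far

        /** Called by the receiver thread after it has stored packet seq. */
        synchronized void ack(int seq) {
            if (seq > lastAcked) lastAcked = seq;
            notifyAll();              // wake a sender blocked in awaitAck
        }

        /** Called by the sender thread; returns false if no ack arrived in time. */
        synchronized boolean awaitAck(int seq, long timeoutMillis) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (lastAcked < seq) {              // loop guards against spurious wakeups
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) return false;  // timed out
                wait(remaining);                   // releases the lock while waiting
            }
            return true;
        }
    }

    /** Sends the given number of packets one at a time and returns how many were acknowledged. */
    static int simulate(int packets) {
        AckMonitor monitor = new AckMonitor();
        BlockingQueue<Integer> channel = new LinkedBlockingQueue<>();  // stands in for the socket

        Thread receiver = new Thread(() -> {
            try {
                for (int i = 0; i < packets; i++) {
                    int seq = channel.take();   // "receive" a packet
                    monitor.ack(seq);           // confirm it to the sender
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "receiver");
        receiver.setDaemon(true);               // do not keep the JVM alive if the sender gives up
        receiver.start();

        int acked = 0;
        try {
            for (int seq = 0; seq < packets; seq++) {
                channel.put(seq);                         // "send" the packet
                if (!monitor.awaitAck(seq, 1000)) break;  // a real sender would retransmit here
                acked++;
            }
            receiver.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acked;
    }

    public static void main(String[] args) {
        System.out.println("acked " + simulate(5) + " of 5 packets");
    }
}
```

The condition is re-checked in a while loop rather than a single if because wait() may return without a matching notify, and the timeout gives the sender a natural point at which stop-and-wait ARQ would resend the packet.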