【问题标题】:Why can't streaming connect to the Java socket client?为什么无法流式连接到 Java 套接字客户端?
【发布时间】:2014-06-01 10:45:59
【问题描述】:

我正在研究 Spark Streaming 处理实时数据,我构建了 Spark Streaming 的示例 wordCount,之后可以运行示例:

/bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999

如果我在另一个终端运行nc -L -p 9999,那么我可以在这个终端输入字母,示例可以接收字母并给出正确的结果。

但是我开发了一个 Java 套接字客户端来将内容发送到 9999 端口 - 为什么示例无法接收它?我认为该示例只是监视 9999 端口,并从该端口接收任何内容。

这是Java代码:

    File file = new File("D:\\OutputJson.dat");
    long l = file.length();
    socket = new Socket();
    boolean connected = false;
    while (!connected) {
        //not stop until send successful
        try {
            socket.connect(new InetSocketAddress("localhost", 9999));
            connected = true;
            System.out.println("connected success!");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("connected failed!");
            Thread.sleep(5000);
        }
    }
    dos = new DataOutputStream(socket.getOutputStream());
    fis = new FileInputStream(file);
    sendBytes = new byte[1024];
    while ((length = fis.read(sendBytes, 0, sendBytes.length)) > 0) {
        sumL += length;
        System.out.println("sent:" + ((sumL / l) * 100) + "%");
        dos.write(sendBytes, 0, length);
        dos.flush();
    }
    if (sumL == l) {
        bool = true;
    }

此 Java 函数总是返回以下错误:

java.net.SocketException: Socket closed

我开发了另一个 Java 类来从这个发送套接字接收数据,它工作正常,为什么不能用这个接收?

【问题讨论】:

  • JavaNetworkWordCount 假定记录由 \n 分隔。如果 OutputJson.data 中的数据没有被 \n 分隔,那么 spark 流接收器将找不到记录的结尾,因此无法正确接收任何内容。

标签: apache-spark


【解决方案1】:

从记忆中我想我使用了一个 ServerSocket。代码类似于:

public void sendMsg(String msg) throws IOException {
    ServerSocket serverSocket = null;
    Socket clientSocket = null;
    try {
        serverSocket = new ServerSocket(port);
        clientSocket = serverSocket.accept();
        PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
        out.write(msg);
        out.flush();
        out.close();
    } finally {
        try {
            clientSocket.close();
            serverSocket.close();
        } finally {
            clientSocket = null;
            serverSocket = null;
        }
    }
}

【讨论】:

  • 问题不能这样解决,因为spark不能从executors写入
猜你喜欢
  • 1970-01-01
  • 2013-02-14
  • 1970-01-01
  • 1970-01-01
  • 2019-10-05
  • 1970-01-01
  • 2013-12-05
  • 2014-05-21
  • 1970-01-01
相关资源
最近更新 更多