【问题标题】:Avro Communication over TCP Sockets基于 TCP 套接字的 Avro 通信
【发布时间】:2014-08-07 21:33:01
【问题描述】:

我目前正在从事一个项目,该项目将涉及用 C 和 Java 编写的应用程序的通信。因此,我选择使用 Apache Avro。我在网站上看到 Avro 可以使用 DataFileWriter 类从文件中(反)序列化对象。

但是,就我而言,我想在我的应用程序之间使用 TCP 套接字。因此,DataFileWriter 类对我不起作用。阅读文档后,我没有找到任何关于如何通过 TCP 套接字发送对象的信息。

关于如何做到这一点的任何想法?我特别想知道我应该在 Java 客户端上使用哪种输入和输出流。

我为 Java 服务器开发了以下代码:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.HashMap;

import middleman.bigpeer.BigPeer;

import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

public class MiddleManWorker implements Runnable {

    private InputStream in;

    private OutputStream out;

    private Socket clientSocket;

    public MiddleManWorker(Socket clientSocket, HashMap<Integer, NodeType> dbNodesDirectory, 
            HashMap<Integer, NodeType>  workersDirectory) {
        this.clientSocket = clientSocket;
        try {
            this.in = clientSocket.getInputStream();
            this.out = clientSocket.getOutputStream();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        EncoderFactory encoderFactory = new EncoderFactory();
        DecoderFactory decoderFactory = new DecoderFactory();
    BinaryEncoder binaryEncoder = encoderFactory.binaryEncoder(out, null);
        BinaryDecoder binaryDecoder = decoderFactory.binaryDecoder(in, null);
        SpecificDatumReader<BigPeer> peerDatumReader = new SpecificDatumReader<BigPeer>(BigPeer.class);
        BigPeer bigPeer = null;
        SpecificDatumWriter<BigPeer> writer = new SpecificDatumWriter<BigPeer>();
        try {
            peerDatumReader.read(bigPeer, binaryDecoder);
            System.out.println("Received: " + bigPeer.getType());
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            writer.write(bigPeer, binaryEncoder);
        } catch (IOException e) {
            e.printStackTrace();
        }


    }

} 

Java 客户端示例如下:

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;

import middleman.bigpeer.BigPeer;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

public class SystemClient {

    public static void connect(String serverIPAddress, Integer serverPort) throws IOException, ClassNotFoundException {
        /**
         * Create Connection with the server
         */
        Socket socket = new Socket(serverIPAddress, serverPort);
        InputStream in = socket.getInputStream();
        OutputStream out = socket.getOutputStream();
        EncoderFactory encoderFactory = new EncoderFactory();
        DecoderFactory decoderFactory = new DecoderFactory();
        BinaryEncoder binaryEncoder = encoderFactory.binaryEncoder(out, null);
        BinaryDecoder binaryDecoder = decoderFactory.binaryDecoder(in, null);

        BigPeer bigPeer = new BigPeer();
        bigPeer.setType("test");
        SpecificDatumReader<BigPeer> reader = new SpecificDatumReader<BigPeer>(BigPeer.class);
        SpecificDatumWriter<BigPeer> writer = new SpecificDatumWriter<BigPeer>(BigPeer.class);

        System.out.println("Before: " +  bigPeer.getType());
        writer.write(bigPeer, binaryEncoder);
        System.out.println("Waiting for response...");
        reader.read(bigPeer, binaryDecoder);
        System.out.println("After: " + bigPeer.getType());
    }

}

服务器似乎在peerDatumReader.read(bigPeer, binaryDecoder); 代码行停止。有什么想法吗?

谢谢, 尼克

【问题讨论】:

    标签: java c sockets tcp avro


    【解决方案1】:

    出于性能原因,BinaryEncoder 使用内部缓冲区。您可能需要在编码器上调用 flush 以通过管道发送数据。

    有关此行为的更多信息,请参阅reference

    返回的 BinaryEncoder 实现可能会缓冲其输出。在调用 Flushable.flush() 之前,数据可能不会出现在底层 OutputStream 上。缓冲区大小由 configureBufferSize(int) 配置。

    如果不需要缓冲,并且可以接受较低的性能,请使用 directBinaryEncoder(OutputStream, BinaryEncoder)

    【讨论】:

      猜你喜欢
      • 2022-07-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-03-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-12-01
      相关资源
      最近更新 更多