【问题标题】:Fast IPC/Socket communication in Java/PythonJava/Python 中的快速 IPC/Socket 通信
【发布时间】:2012-03-04 06:31:53
【问题描述】:

两个进程(Java 和 Python)需要在我的应用程序中进行通信。我注意到套接字通信占用了 93% 的运行时间。为什么通讯这么慢?我应该寻找套接字通信的替代方案还是可以更快?

更新:我发现了一个简单的修复方法。由于某种未知原因,缓冲的输出流似乎没有真正缓冲。因此,我现在将所有数据放入客户端/服务器进程中的字符串缓冲区中。我在flush方法中将它写入套接字。

我仍然对使用共享内存在进程之间快速交换数据的示例感兴趣。

一些附加信息:

  1. 应用程序中的消息大小大部分时间都在 64kb 以下。
  2. 服务器用Java编写,客户端用Python编写。
  3. Socket IPC 实现如下:发送 200 个字节需要 50 个周期!这一定是太高了。如果我在 5000 个周期内发送 2 个字节,则需要的时间会少很多。
  4. 两个进程都在一台 Linux 机器上运行。
  5. 在实际应用程序中,每个周期对客户端的 iFid.write() 进行大约 10 次调用。
  6. 这是在 Linux 系统上完成的。

这是服务器端:

public class FastIPC{
    public PrintWriter out;
    BufferedReader in;
    Socket socket = null;
    ServerSocket serverSocket = null;


    public FastIPC(int port) throws Exception{
        serverSocket = new ServerSocket(port);
        socket = serverSocket.accept();
        out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())), true);
        in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    }

    public void send(String msg){
        out.println(msg); // send price update to socket
    }

    public void flush(){
        out.flush();
    }

    public String recv() throws Exception{
        return in.readLine();
    }

    public static void main(String[] args){
        int port = 32000;
        try{
            FastIPC fip = new FastIPC(port);
            long start = new Date().getTime();
            System.out.println("Connected.");
            for (int i=0; i<50; i++){
                for(int j=0; j<100; j++)
                    fip.send("+");
                fip.send(".");
                fip.flush();
                String msg = fip.recv();
            }
            long stop = new Date().getTime();
            System.out.println((double)(stop - start)/1000.);
        }catch(Exception e){
            System.exit(1);
        }
    }
}

而客户端是:

import sys
import socket

class IPC(object):
    def __init__(self):
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.connect(("localhost", 32000))
        self.fid = self.s.makefile() # file wrapper to read lines
        self.listenLoop() # wait listening for updates from server

    def listenLoop(self):
        fid = self.fid
        print "connected"
        while True:
            while True:
                line = fid.readline()
                if line[0]=='.':
                    break
            fid.write('.\n')
            fid.flush()

if __name__ == '__main__':
    st = IPC()

【问题讨论】:

  • Linux ...修改了问题。
  • 你需要缓存数据。一个接一个(或以其他小块)发送字节是完全无效的,无论它是否是本地计算机。
  • out 是这里的缓冲流。
  • ZeroMQ 非常适合。

标签: java python sockets ipc


【解决方案1】:

您有多种选择。由于您使用的是 Linux,因此您可以使用 UNIX 域套接字。或者,您可以将数据序列化为 ASCII 或 JSon 或其他格式,并通过管道、SHM(共享内存段)、消息队列、DBUS 或类似方式提供数据。值得考虑您拥有什么样的数据,因为这些 IPC 机制具有不同的性能特征。有一个draft USENIX paper 对各种权衡进行了很好的分析,值得一读。

既然您说(在此答案的 cmets 中)您更喜欢使用 SHM,这里有一些代码示例可以帮助您入门。使用 Python posix_ipc 库:

import posix_ipc # POSIX-specific IPC
import mmap      # From Python stdlib

class SharedMemory(object):
    """Python interface to shared memory. 
    The create argument tells the object to create a new SHM object,
    rather than attaching to an existing one.
    """

    def __init__(self, name, size=posix_ipc.PAGE_SIZE, create=True):
        self.name = name
        self.size = size
        if create:
            memory = posix_ipc.SharedMemory(self.name, posix_ipc.O_CREX,
                                            size=self.size)
        else:
            memory = posix_ipc.SharedMemory(self.name)
        self.mapfile = mmap.mmap(memory.fd, memory.size)
        os.close(memory.fd)
        return

    def put(self, item):
        """Put item in shared memory.
        """
        # TODO: Deal with the case where len(item) > size(self.mapfile)
        # TODO: Guard this method with a named semaphore
        self.mapfile.seek(0)
        pickle.dump(item, self.mapfile, protocol=2)
        return

    def get(self):
        """Get a Python object from shared memory.
        """
        # TODO: Deal with the case where len(item) > size(self.mapfile)
        # TODO: Guard this method with a named semaphore
        self.mapfile.seek(0)
        return pickle.load(self.mapfile)

    def __del__(self):
        try:
            self.mapfile.close()
            memory = posix_ipc.SharedMemory(self.name)
            memory.unlink()
        except:
            pass
        return    

对于 Java 端,您希望创建相同的类,尽管我在 cmets 中所说的 JTux 似乎提供了等效的功能,并且您需要的 API 在 UPosixIPC 类中。

下面的代码是你需要实现的东西的大纲。但是,缺少一些东西——异常处理是显而易见的,还有一些标志(在UConstant 中找到它们),您需要添加一个信号量来保护put / get 方法。但是,这应该使您走上正确的轨道。请记住,mmap 或内存映射文件是 RAM 段的类似文件的接口。因此,您可以像使用普通文件的fd 一样使用它的文件描述符。

import jtux.*;

class SHM {

    private String name;
    private int size;
    private long semaphore;
    private long mapfile; // File descriptor for mmap file

    /* Lookup flags and perms in your system docs */
    public SHM(String name, int size, boolean create, int flags, int perms) {
        this.name = name;
        this.size = size;
        int shm;
        if (create) {
            flags = flags | UConstant.O_CREAT;
            shm = UPosixIPC.shm_open(name, flags, UConstant.O_RDWR);
        } else {
            shm = UPosixIPC.shm_open(name, flags, UConstant.O_RDWR);
        }
        this.mapfile = UPosixIPC.mmap(..., this.size, ..., flags, shm, 0);
        return;
    }


    public void put(String item) {
        UFile.lseek(this.mapfile(this.mapfile, 0, 0));
        UFile.write(item.getBytes(), this.mapfile);
        return;
    }


    public String get() {    
        UFile.lseek(this.mapfile(this.mapfile, 0, 0));
        byte[] buffer = new byte[this.size];
        UFile.read(this.mapfile, buffer, buffer.length);
        return new String(buffer);
    }


    public void finalize() {
        UPosix.shm_unlink(this.name);
        UPosix.munmap(this.mapfile, this.size);
    }

}

【讨论】:

  • 共享内存似乎最快。但是,如何在工作程序中使用这些想法?
  • 以某种 Java 和 Python 都可以读取的格式(ASCII、XML、ctypes,任何对您来说最简单的格式)读取/写入 SHM 段的数据。对于 Python 部分,您可以使用这个库:semanchuk.com/philip/posix_ipc 对于 Java 有这个:java.sun.com/docs/hotspot/ism.htmlHTH
  • ASCII 足够简单。如果你很好理解这些,你能写一个简单的例子,以便我可以将性能与我的问题中的套接字实现进行比较?
  • 另外,我认为 ISM 仅适用于 Solaris ...我正在使用 Linux。
  • 对我来说很难的两件事是使一个进程快速意识到另一个进程已经完成写入的机制。其次,您能否也给我一个 Java 方面的工作示例?我已经能够找到很多关于 python 共享内存的信息,但是在 Java 上,这个概念似乎不太受支持。
【解决方案2】:

一些想法

  • 服务器用Java编写,客户端用Python编写。

一个奇怪的组合,但有什么理由不能通过标准输入、标准输出调用另一个发送?

  • Socket IPC 实现如下:发送 200 个字节需要 50 个周期!这一定是太高了。如果我在 5000 个周期内发送 2 个字节,则需要的时间会少很多。

对操作系统的任何调用都会相对较慢(延迟方面)。使用共享内存可以绕过内核。如果吞吐量是您的问题,我发现如果延迟对您来说不是这样的问题,您可以使用套接字达到 1-2 GB/s。

  • 两个进程都在一台 Linux 机器上运行。

使共享内存变得理想。

  • 在实际应用程序中,每个周期对客户端的 iFid.write() 进行大约 10 次调用。

不知道为什么会这样。为什么不构建单个结构/缓冲区并编写一次。我会使用直接缓冲区是 NIO 来最小化延迟。使用字符翻译非常昂贵,尤其是如果您只需要 ASCII。

  • 这是在 Linux 系统上完成的。

应该很容易优化。

我通过内存映射文件使用共享内存。这是因为我需要记录每条消息以进行审核。对于数百万条消息,我的平均往返延迟约为 180 ns,而在实际应用中约为 490 ns。

这种方法的一个优点是,如果有短暂的延迟,读者可以很快赶上作者。它还支持轻松重新启动和复制。

这只是在Java中实现,但原理很简单,我相信它也可以在python中工作。

https://github.com/peter-lawrey/Java-Chronicle

【讨论】:

  • 你能给我一个简单的服务器/客户端示例吗?就像我在问题中提供的那样?
  • 仅适用于 Java(请参阅我的链接)我不太了解 python。
猜你喜欢
  • 1970-01-01
  • 2010-11-12
  • 1970-01-01
  • 2011-03-22
  • 1970-01-01
  • 2017-07-02
  • 2015-05-24
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多