【问题标题】:python read file non blocking on windowspython在Windows上读取文件非阻塞
【发布时间】:2016-07-09 21:22:34
【问题描述】:

我在 Windows (Win7) 上有一个程序,它每 x 秒写入一个 txt 文件。 现在我有一个 python 脚本,它每 x 秒读取一次这个 txt 文件。 当 python 脚本读取文件并且同时另一个程序想要写入该文件时 - 写入程序崩溃(并显示权限错误)。由于我无法修改程序写入txt文件的方式,所以我必须尝试在不阻塞写入程序的情况下打开txt文件。 有人知道我在这种情况下可以做什么(不阻塞阅读) 对于这个主题的每一个提示,我都会非常高兴!

尝试读取文件的程序代码如下所示:

    with codecs.open(datapath, "r", 'utf-16') as raw_data:

         raw_data_x = raw_data.readlines()

我必须用“编解码器”打开文件,因为它是 unicode。

【问题讨论】:

  • 感谢您的快速回答!我已经看过那个帖子了。我认为它不适用于 win 系统。
  • 查看 win32file。它允许更多特定于 Windows 的标志。
  • 有几种方法可以做到这一点,包括 OVERLAPPED IO(参见 Win32 API CreateFile)和“IO Completion Ports”stackoverflow.com/questions/1176477/…。您将需要一个 Win32 扩展模块(我在 C++ 中完成了此操作,但在 Python 中没有完成)。这是不平凡的。
  • 在 Windows 上写入由另一个进程打开的文件的能力似乎与如何使用异步 I/O(如 iocp)是一个单独的问题(看看 multiprocessingasyncio 如何使用它)。

标签: python windows windows-7


【解决方案1】:

经过很长时间,我设法在 ctypes 中为您创建了一个函数。请记住,这仅在进程未获得“独占”访问权限时才有效。如果是这样,那么您就不走运了,需要使用像 here 所示或实施 here 这样的卷影复制服务。
无论如何,给你:

import ctypes
from ctypes import wintypes
import os
import msvcrt

GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000

OPEN_EXISTING = 3
OPEN_ALWAYS = 4

ACCESS_MODES = {
    "r": GENERIC_READ,
    "w": GENERIC_WRITE,
    "r+": (GENERIC_READ|GENERIC_WRITE)
}

OPEN_MODES = {
    "r": OPEN_EXISTING,
    "w": OPEN_ALWAYS,
    "r+": OPEN_ALWAYS,
}


def open_file_nonblocking(filename, access):
    # Removes the b for binary access.
    internal_access = access.replace("b", "")
    access_mode = ACCESS_MODES[internal_access]
    open_mode = OPEN_MODES[internal_access]
    handle = wintypes.HANDLE(ctypes.windll.kernel32.CreateFileW(
        wintypes.LPWSTR(filename),
        wintypes.DWORD(access_mode),
        wintypes.DWORD(2|1),  # File share read and write
        ctypes.c_void_p(0),
        wintypes.DWORD(open_mode),
        wintypes.DWORD(0),
        wintypes.HANDLE(0)
    ))

    try:
        fd = msvcrt.open_osfhandle(handle.value, 0)
    except OverflowError as exc:
        # Python 3.X
        raise OSError("Failed to open file.") from None
        # Python 2
        # raise OSError("Failed to open file.")

    return os.fdopen(fd, access)

该函数在共享读写句柄的同时打开文件,允许多次访问。然后它将句柄转换为普通的 python 文件对象。
完成后请务必关闭文件。

【讨论】:

    【解决方案2】:

    最近我不得不在具有跨平台兼容性的 python 中对 stdin、stdout 进行 I/O 读取。
    对于 linux
    对于 linux,我们可以使用 select 模块。它是 posix select 函数的包装器实现。它允许您传递多个文件描述符,等待它们准备好。一旦他们准备好,您将收到通知并可以执行read/write 操作。这里有一些小代码可以让你有个想法
    这里nodejs是一个带有nodejs镜像的docker环境

      stdin_buf = BytesIO(json.dumps(fn) + "\n")
      stdout_buf = BytesIO()
      stderr_buf = BytesIO()
    
      rselect = [nodejs.stdout, nodejs.stderr]  # type: List[BytesIO]
      wselect = [nodejs.stdin]  # type: List[BytesIO]
      while (len(wselect) + len(rselect)) > 0:
                rready, wready, _ = select.select(rselect, wselect, [])
                try:
                    if nodejs.stdin in wready:
                        b = stdin_buf.read(select.PIPE_BUF)
                        if b:
                            os.write(nodejs.stdin.fileno(), b)
                        else:
                            wselect = []
                    for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)):
                        if pipes[0] in rready:
                            b = os.read(pipes[0].fileno(), select.PIPE_BUF)
                            if b:
                                pipes[1].write(b)
                            else:
                                rselect.remove(pipes[0])
                    if stdout_buf.getvalue().endswith("\n"):
                        rselect = []
                except OSError as e:
                    break  
    

    适用于窗户 此代码示例具有涉及 stdin、stdout 的读取和写入操作。 现在此代码不适用于 Windows 操作系统,因为在 Windows 上选择实现不允许标准输入、标准输出作为参数传递。
    文档说:

    Windows 上的文件对象是不可接受的,但套接字是可接受的。在 Windows 上,底层的 select() 函数由 WinSock 库提供,并且不处理并非源自 WinSock 的文件描述符。

    首先我必须提到,有很多用于在 Windows 上读取非阻塞 I/O 的库,例如 asyncio(python 3)、gevent(for python 2.7)、msvcrt,然后是 @987654329 @'s win32event 如果您的套接字已准备好接收 read/write 数据,则会提醒您。但是他们都不允许我在stdin/stdout上读​​写,给出错误,例如
    An operation is performend on something that is not a socket
    Handles only expect integer values等。
    我还没有尝试过其他一些库,例如 twister

    现在,为了在 Windows 平台上实现上述代码中的功能,我使用了threads。这是我的代码:

        stdin_buf = BytesIO(json.dumps(fn) + "\n")
        stdout_buf = BytesIO()
        stderr_buf = BytesIO()
    
        rselect = [nodejs.stdout, nodejs.stderr]  # type: List[BytesIO]
        wselect = [nodejs.stdin]  # type: List[BytesIO]
        READ_BYTES_SIZE = 512
    
        # creating queue for reading from a thread to queue
        input_queue = Queue.Queue()
        output_queue = Queue.Queue()
        error_queue = Queue.Queue()
    
        # To tell threads that output has ended and threads can safely exit
        no_more_output = threading.Lock()
        no_more_output.acquire()
        no_more_error = threading.Lock()
        no_more_error.acquire()
    
        # put constructed command to input queue which then will be passed to nodejs's stdin
        def put_input(input_queue):
            while True:
                sys.stdout.flush()
                b = stdin_buf.read(READ_BYTES_SIZE)
                if b:
                    input_queue.put(b)
                else:
                    break
    
        # get the output from nodejs's stdout and continue till otuput ends
        def get_output(output_queue):
            while not no_more_output.acquire(False):
                b=os.read(nodejs.stdout.fileno(), READ_BYTES_SIZE)
                if b:
                    output_queue.put(b)
    
        # get the output from nodejs's stderr and continue till error output ends
        def get_error(error_queue):
            while not no_more_error.acquire(False):
                b = os.read(nodejs.stderr.fileno(), READ_BYTES_SIZE)
                if b:
                    error_queue.put(b)
    
        # Threads managing nodejs.stdin, nodejs.stdout and nodejs.stderr respectively
        input_thread = threading.Thread(target=put_input, args=(input_queue,))
        input_thread.start()
        output_thread = threading.Thread(target=get_output, args=(output_queue,))
        output_thread.start()
        error_thread = threading.Thread(target=get_error, args=(error_queue,))
        error_thread.start()
    
        # mark if output/error is ready
        output_ready=False
        error_ready=False
    
        while (len(wselect) + len(rselect)) > 0:
            try:
                if nodejs.stdin in wselect:
                    if not input_queue.empty():
                        os.write(nodejs.stdin.fileno(), input_queue.get())
                    elif not input_thread.is_alive():
                        wselect = []
                if nodejs.stdout in rselect:
                    if not output_queue.empty():
                        output_ready = True
                        stdout_buf.write(output_queue.get())
                    elif output_ready:
                        rselect = []
                        no_more_output.release()
                        no_more_error.release()
                        output_thread.join()
    
                if nodejs.stderr in rselect:
                    if not error_queue.empty():
                        error_ready = True
                        stderr_buf.write(error_queue.get())
                    elif error_ready:
                        rselect = []
                        no_more_output.release()
                        no_more_error.release()
                        output_thread.join()
                        error_thread.join()
                if stdout_buf.getvalue().endswith("\n"):
                    rselect = []
                    no_more_output.release()
                    no_more_error.release()
                    output_thread.join()
            except OSError as e:
                break
    

    所以对我来说最好的选择是线程。如果您想了解更多,这篇文章将是nice read

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-02-18
      • 2016-04-02
      • 1970-01-01
      • 2010-12-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多