【问题标题】:Pyarrow basic auth: How to prevent `Stream is closed`?Pyarrow 基本身份验证:如何防止“流已关闭”?
【发布时间】:2022-01-17 11:50:58
【问题描述】:

我是 Arrow Flight 和 pyarrow (v=6.0.1) 的新手,我正在尝试实现基本身份验证,但我总是遇到错误:

OSError: Stream is closed

我通过顺序运行以下两个文件(分别代表服务器和客户端)创建了一个最小的复制示例:

from typing import Dict, Union
from pyarrow.lib import tobytes
from pyarrow.flight import BasicAuth, FlightUnauthenticatedError, ServerAuthHandler, FlightServerBase
from pyarrow._flight import ServerAuthSender, ServerAuthReader


class ServerBasicAuthHandler(ServerAuthHandler):
    def __init__(self, creds: Dict[str, str]):
        self.creds = {user.encode(): pw.encode() for user, pw in creds.items()}

    def authenticate(self, outgoing: ServerAuthSender, incoming: ServerAuthReader):
        buf = incoming.read()  # this line raises "OSError: Stream is closed"
        auth = BasicAuth.deserialize(buf)
        if auth.username not in self.creds:
            raise FlightUnauthenticatedError("unknown user")
        if self.creds[auth.username] != auth.password:
            raise FlightUnauthenticatedError("wrong password")
        outgoing.write(tobytes(auth.username))

    def is_valid(self, token: bytes) -> Union[bytes, str]:
        if not token:
            raise FlightUnauthenticatedError("no basic auth provided")
        if token not in self.creds:
            raise FlightUnauthenticatedError("unknown user")
        return token

service = FlightServerBase(
    location=f"grpc://[::]:50051",
    auth_handler=ServerBasicAuthHandler({"user": "pw"}),
)

service.serve()
from pyarrow.flight import FlightClient

client = FlightClient(location=f"grpc://localhost:50051")
client.authenticate_basic_token("user", "pw")

我基本上是从their tests 复制了ServerAuthHandler 的实现,所以它被证明是有效的。但是,我无法让它工作。

错误消息Stream is closed 难以调试。我不知道它来自哪里,也无法追踪到 pyarrow 实现中的任何地方(无论是 Python 端还是 C++ 端)。我看不出它是从哪里来的。

任何有关如何防止此错误的帮助或提示将不胜感激。

【问题讨论】:

标签: windows stream pyarrow


【解决方案1】:

OP 中的示例混合了两种身份验证实现(这确实令人困惑)。 “BasicAuth”对象不是authenticate_basic_token 方法实现的实际HTTP 基本身份验证;这是因为贡献者多年来实施了多种身份验证方法。实际测试如下:

header_auth_server_middleware_factory = HeaderAuthServerMiddlewareFactory()
no_op_auth_handler = NoopAuthHandler()


def test_authenticate_basic_token():
    """Test authenticate_basic_token with bearer token and auth headers."""
    with HeaderAuthFlightServer(auth_handler=no_op_auth_handler, middleware={
        "auth": HeaderAuthServerMiddlewareFactory()
    }) as server:
        client = FlightClient(('localhost', server.port))
        token_pair = client.authenticate_basic_token(b'test', b'password')
        assert token_pair[0] == b'authorization'
        assert token_pair[1] == b'Bearer token1234'

即我们没有使用authenticate,而是使用“中间件”来实现。一个完整的例子如下:

import base64
import pyarrow.flight as flight

class BasicAuthServerMiddlewareFactory(flight.ServerMiddlewareFactory):
    def __init__(self, creds):
        self.creds = creds

    def start_call(self, info, headers):
        token = None
        for header in headers:
            if header.lower() == "authorization":
                token = headers[header]
                break

        if not token:
            raise flight.FlightUnauthenticatedError("No credentials supplied")

        values = token[0].split(' ', 1)
        if values[0] == 'Basic':
            decoded = base64.b64decode(values[1])
            pair = decoded.decode("utf-8").split(':')
            if pair[0] not in self.creds:
                raise flight.FlightUnauthenticatedError("No credentials supplied")
            if pair[1] != self.creds[pair[0]]:
                raise flight.FlightUnauthenticatedError("No credentials supplied")
            return BasicAuthServerMiddleware("BearerTokenValue")

        raise flight.FlightUnauthenticatedError("No credentials supplied")


class BasicAuthServerMiddleware(flight.ServerMiddleware):
    def __init__(self, token):
        self.token = token

    def sending_headers(self):
        return {'authorization': f'Bearer {self.token}'}


class NoOpAuthHandler(flight.ServerAuthHandler):
    def authenticate(self, outgoing, incoming):
        pass

    def is_valid(self, token):
        return ""


with flight.FlightServerBase(auth_handler=NoOpAuthHandler(), middleware={
    "basic": BasicAuthServerMiddlewareFactory({"test": "password"})
}) as server:
    client = flight.connect(('localhost', server.port))
    token_pair = client.authenticate_basic_token(b'test', b'password')
    print(token_pair)
    assert token_pair[0] == b'authorization'
    assert token_pair[1] == b'Bearer BearerTokenValue'

【讨论】:

    【解决方案2】:

    我认为这仅仅是因为 Windows 不支持此功能。

    仔细检查后,“证明它有效”的测试在 Windows 中被跳过。评论指的是this issue。不过(表面上)该问题已得到解决;没有任何关于为什么它不能与 Stream is closed 一起使用。

    【讨论】:

      猜你喜欢
      • 2013-01-12
      • 2017-11-29
      • 1970-01-01
      • 2016-02-08
      • 2018-07-12
      • 2016-04-28
      • 1970-01-01
      • 2016-10-17
      • 1970-01-01
      相关资源
      最近更新 更多