【问题标题】:How to get response SSL certificate from requests in python?如何从 python 中的请求中获取响应 SSL 证书?
【发布时间】:2013-05-30 00:06:41
【问题描述】:

尝试从requests 的响应中获取 SSL 证书。

有什么好的方法可以做到这一点?

【问题讨论】:

    标签: python http https request python-requests


    【解决方案1】:

    requests 故意包装这样的低级内容。通常,您唯一想做的就是verify that the certs are valid。为此,只需传递verify=True。如果你想使用非标准的 cacert 包,你也可以通过它。例如:

    resp = requests.get('https://example.com', verify=True, cert=['/path/to/my/ca.crt'])
    

    另外,requests 主要是一组围绕其他库的包装器,主要是 urllib3 和 stdlib 的 http.client(或者,对于 2.x,httplib)和 ssl

    有时,答案只是获取较低级别的对象(例如,resp.rawurllib3.response.HTTPResponse),但在许多情况下这是不可能的。

    这就是其中一种情况。唯一能看到证书的对象是http.client.HTTPSConnection(或urllib3.connectionpool.VerifiedHTTPSConnection,但这只是前者的子类)和ssl.SSLSocket,当请求返回时,它们都不存在了。 (正如名称 connectionpool 所暗示的那样,HTTPSConnection 对象存储在池中,一旦完成就可以重用;SSLSocketHTTPSConnection 的成员。)

    因此,您需要修补一些东西,以便将数据复制到链上。可能就这么简单:

    HTTPResponse = requests.packages.urllib3.response.HTTPResponse
    orig_HTTPResponse__init__ = HTTPResponse.__init__
    def new_HTTPResponse__init__(self, *args, **kwargs):
        orig_HTTPResponse__init__(self, *args, **kwargs)
        try:
            self.peercert = self._connection.sock.getpeercert()
        except AttributeError:
            pass
    HTTPResponse.__init__ = new_HTTPResponse__init__
    
    HTTPAdapter = requests.adapters.HTTPAdapter
    orig_HTTPAdapter_build_response = HTTPAdapter.build_response
    def new_HTTPAdapter_build_response(self, request, resp):
        response = orig_HTTPAdapter_build_response(self, request, resp)
        try:
            response.peercert = resp.peercert
        except AttributeError:
            pass
        return response
    HTTPAdapter.build_response = new_HTTPAdapter_build_response
    

    这是未经测试的,所以不能保证;你可能需要修补更多。

    此外,子类化和覆盖可能比猴子补丁更干净(尤其是因为 HTTPAdapter 被设计为子类化)。

    或者,更好的是,分叉 urllib3requests,修改你的分叉,并(如果你认为这合法有用的话)向上游提交拉取请求。

    无论如何,现在,从您的代码中,您可以这样做:

    resp.peercert
    

    这将为您提供带有'subject''subjectAltName' 键的字典,由pyopenssl.WrappedSocket.getpeercert 返回。如果您想了解有关证书的更多信息,请尝试使用Christophe Vandeplas's variant of this answer,它可以让您获得OpenSSL.crypto.X509 对象。如果您想获取整个对等证书链,请参阅GoldenStake's answer

    当然,您可能还想传递验证证书所需的所有信息,但这更容易,因为它已经通过了顶层。

    【讨论】:

    • 哇,这很完整。非常感谢!我会处理它,看看它是如何进行的。再次感谢!
    • 谢谢,这真的很有帮助。不幸的是,当服务器使用Connection: close 并在数据传输后立即关闭连接时,我仍然无法获得“peercert”。我也必须修补 requests.packages.urllib3.connection.HTTPSConnection.connect 才能首先调用原始函数,然后立即获取 self.sock.getpeercert(),然后将其向上传递。
    • 现代请求从 2.24.0 开始为具有可用 ssl 的系统中断此补丁:github.com/psf/requests/pull/5443/files
    【解决方案2】:

    首先,abarnert's answer 非常完整。在追查Kalkran 提出的connection-close 问题时,我实际上发现peercert 没有包含有关SSL 证书的详细信息。

    我深入挖掘了连接和套接字信息并提取了 self.sock.connection.get_peer_certificate() 函数,其中包含以下很棒的函数:

    • get_subject() CN
    • get_notAfter()get_notBefore() 到期日期
    • get_serial_number()get_signature_algorithm() 了解加密相关技术细节
    • ...

    请注意,这些仅在您的系统上安装了pyopenssl 时可用。在后台,urllib3 使用 pyopenssl(如果可用),否则使用标准库的 ssl 模块。下面显示的self.sock.connection 属性仅在self.sockurllib3.contrib.pyopenssl.WrappedSocket 时存在,而不是在ssl.SSLSocket 时存在。您可以使用pip install pyopenssl 安装pyopenssl

    完成后,代码变为:

    import requests
    
    HTTPResponse = requests.packages.urllib3.response.HTTPResponse
    orig_HTTPResponse__init__ = HTTPResponse.__init__
    def new_HTTPResponse__init__(self, *args, **kwargs):
        orig_HTTPResponse__init__(self, *args, **kwargs)
        try:
            self.peer_certificate = self._connection.peer_certificate
        except AttributeError:
            pass
    HTTPResponse.__init__ = new_HTTPResponse__init__
    
    HTTPAdapter = requests.adapters.HTTPAdapter
    orig_HTTPAdapter_build_response = HTTPAdapter.build_response
    def new_HTTPAdapter_build_response(self, request, resp):
        response = orig_HTTPAdapter_build_response(self, request, resp)
        try:
            response.peer_certificate = resp.peer_certificate
        except AttributeError:
            pass
        return response
    HTTPAdapter.build_response = new_HTTPAdapter_build_response
    
    HTTPSConnection = requests.packages.urllib3.connection.HTTPSConnection
    orig_HTTPSConnection_connect = HTTPSConnection.connect
    def new_HTTPSConnection_connect(self):
        orig_HTTPSConnection_connect(self)
        try:
            self.peer_certificate = self.sock.connection.get_peer_certificate()
        except AttributeError:
            pass
    HTTPSConnection.connect = new_HTTPSConnection_connect
    

    您将能够轻松访问结果:

    r = requests.get('https://yourdomain.tld', timeout=0.1)
    print('Expires on: {}'.format(r.peer_certificate.get_notAfter()))
    print(dir(r.peer_certificate))
    

    如果您像我一样想忽略 SSL 证书警告,只需在文件顶部添加以下内容并且不进行 SSL 验证:

    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    
    r = requests.get('https://yourdomain.tld', timeout=0.1, verify=False)
    print(dir(r.peer_certificate))
    

    【讨论】:

    • 非常感谢您对答案的精彩补充! +1。
    【解决方案3】:

    感谢大家的精彩回答。

    它帮助我过度设计了这个问题的答案:

    How to add a custom CA Root certificate to the CA Store used by Python in Windows?

    2019 年 2 月 12 日更新

    请查看Cert Human: SSL Certificates for Humans,了解lifehackjim 对我的https://github.com/neozenith/get-ca-py 项目的令人印象深刻的重写。

    我现在已经存档了原始存储库。

    独立sn-p

    #! /usr/bin/env python
    # -*- coding: utf-8 -*-
    """
    Get Certificates from a request and dump them.
    """
    
    import argparse
    import sys
    
    import requests
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    
    """
    Inspired by the answers from this Stackoverflow question:
    https://stackoverflow.com/questions/16903528/how-to-get-response-ssl-certificate-from-requests-in-python
    
    What follows is a series of patching the low level libraries in requests.
    """
    
    """
    https://stackoverflow.com/a/47931103/622276
    """
    
    sock_requests = requests.packages.urllib3.contrib.pyopenssl.WrappedSocket
    
    
    def new_getpeercertchain(self, *args, **kwargs):
        x509 = self.connection.get_peer_cert_chain()
        return x509
    
    
    sock_requests.getpeercertchain = new_getpeercertchain
    
    """
    https://stackoverflow.com/a/16904808/622276
    """
    
    HTTPResponse = requests.packages.urllib3.response.HTTPResponse
    orig_HTTPResponse__init__ = HTTPResponse.__init__
    
    
    def new_HTTPResponse__init__(self, *args, **kwargs):
        orig_HTTPResponse__init__(self, *args, **kwargs)
        try:
            self.peercertchain = self._connection.sock.getpeercertchain()
        except AttributeError:
            pass
    
    
    HTTPResponse.__init__ = new_HTTPResponse__init__
    
    HTTPAdapter = requests.adapters.HTTPAdapter
    orig_HTTPAdapter_build_response = HTTPAdapter.build_response
    
    
    def new_HTTPAdapter_build_response(self, request, resp):
        response = orig_HTTPAdapter_build_response(self, request, resp)
        try:
            response.peercertchain = resp.peercertchain
        except AttributeError:
            pass
        return response
    
    
    HTTPAdapter.build_response = new_HTTPAdapter_build_response
    
    """
    Attempt to wrap in a somewhat usable CLI
    """
    
    
    def cli(args):
        parser = argparse.ArgumentParser(description="Request any URL and dump the certificate chain")
        parser.add_argument("url", metavar="URL", type=str, nargs=1, help="Valid https URL to be handled by requests")
    
        verify_parser = parser.add_mutually_exclusive_group(required=False)
        verify_parser.add_argument("--verify", dest="verify", action="store_true", help="Explicitly set SSL verification")
        verify_parser.add_argument(
            "--no-verify", dest="verify", action="store_false", help="Explicitly disable SSL verification"
        )
        parser.set_defaults(verify=True)
    
        return vars(parser.parse_args(args))
    
    
    def dump_pem(cert, outfile="ca-chain.crt"):
        """Use the CN to dump certificate to PEM format"""
        PyOpenSSL = requests.packages.urllib3.contrib.pyopenssl
        pem_data = PyOpenSSL.OpenSSL.crypto.dump_certificate(PyOpenSSL.OpenSSL.crypto.FILETYPE_PEM, cert)
        issuer = cert.get_issuer().get_components()
    
        print(pem_data.decode("utf-8"))
    
        with open(outfile, "a") as output:
            for part in issuer:
                output.write(part[0].decode("utf-8"))
                output.write("=")
                output.write(part[1].decode("utf-8"))
                output.write(",\t")
            output.write("\n")
            output.write(pem_data.decode("utf-8"))
    
    
    if __name__ == "__main__":
        cli_args = cli(sys.argv[1:])
    
        url = cli_args["url"][0]
        req = requests.get(url, verify=cli_args["verify"])
        for cert in req.peercertchain:
            dump_pem(cert)
    

    【讨论】:

      【解决方案4】:

      这个,虽然一点也不漂亮,但很有效:

      import requests
      
      req = requests.get('https://httpbin.org')
      pool = req.connection.poolmanager.connection_from_url('https://httpbin.org')
      conn = pool.pool.get()
      # get() removes it from the pool, so put it back in
      pool.pool.put(conn)
      print(conn.sock.getpeercert())
      

      【讨论】:

      • 这仅在连接仍然存在的情况下有效,不能保证。例如,如果对方发送Connection: close。或者,如果您发送了其他请求(特别是如果您在异步或线程上下文中使用它)。或者,如果 urllib3 只是无缘无故地喜欢它。
      【解决方案5】:

      开始,abarnert's answer很完整

      但我想补充一点,如果您正在寻找对等证书链,则需要修补另一段代码

      import requests
      sock_requests = requests.packages.urllib3.contrib.pyopenssl.WrappedSocket
      def new_getpeercertchain(self,*args, **kwargs):
          x509 = self.connection.get_peer_cert_chain()
          return x509
      sock_requests.getpeercertchain = new_getpeercertchain
      

      之后,您可以以与接受的答案非常相似的方式调用它

      HTTPResponse = requests.packages.urllib3.response.HTTPResponse
      orig_HTTPResponse__init__ = HTTPResponse.__init__
      def new_HTTPResponse__init__(self, *args, **kwargs):
          orig_HTTPResponse__init__(self, *args, **kwargs)
          try:
              self.peercertchain = self._connection.sock.getpeercertchain()
          except AttributeError:
              pass
      HTTPResponse.__init__ = new_HTTPResponse__init__
      
      HTTPAdapter = requests.adapters.HTTPAdapter
      orig_HTTPAdapter_build_response = HTTPAdapter.build_response
      def new_HTTPAdapter_build_response(self, request, resp):
          response = orig_HTTPAdapter_build_response(self, request, resp)
          try:
              response.peercertchain = resp.peercertchain
          except AttributeError:
              pass
          return response
      HTTPAdapter.build_response = new_HTTPAdapter_build_response
      

      您将获得resp.peercertchain,其中包含tupleOpenSSL.crypto.X509 对象

      【讨论】:

        【解决方案6】:

        为了检索证书的详细信息,例如 CN 和到期日期,改编自 example 的以下脚本运行良好。它还避免了我得到的一些错误,我认为这些错误是由于请求和 urllib3 的不正确/不兼容版本造成的:“AttributeError: 'SSLSocket' object has no attribute 'connection'”和“AttributeError: 'VerifiedHTTPSConnection' object has no attribute 'peer_certificate' "

        from OpenSSL.SSL import Connection, Context, SSLv3_METHOD, TLSv1_2_METHOD
        from datetime import datetime, time
        import socket
        host = 'www.google.com'
        try:
            try:
                ssl_connection_setting = Context(SSLv3_METHOD)
            except ValueError:
                ssl_connection_setting = Context(TLSv1_2_METHOD)
            ssl_connection_setting.set_timeout(5)
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect((host, 443))
                c = Connection(ssl_connection_setting, s)
                c.set_tlsext_host_name(str.encode(host))
                c.set_connect_state()
                c.do_handshake()
                cert = c.get_peer_certificate()
                print("Is Expired: ", cert.has_expired())
                print("Issuer: ", cert.get_issuer())
                subject_list = cert.get_subject().get_components()
                cert_byte_arr_decoded = {}
                for item in subject_list:
                    cert_byte_arr_decoded.update({item[0].decode('utf-8'): item[1].decode('utf-8')})
                print(cert_byte_arr_decoded)
                if len(cert_byte_arr_decoded) > 0:
                    print("Subject: ", cert_byte_arr_decoded)
                if cert_byte_arr_decoded["CN"]:
                    print("Common Name: ", cert_byte_arr_decoded["CN"])
                end_date = datetime.strptime(str(cert.get_notAfter().decode('utf-8')), "%Y%m%d%H%M%SZ")
                print("Not After (UTC Time): ", end_date)
                diff = end_date - datetime.now()
                print('Summary: "{}" SSL certificate expires on {} i.e. {} days.'.format(host, end_date, diff.days))
                c.shutdown()
                s.close()
        except:
            print("Connection to {} failed.".format(host))  
        

        此脚本需要 Python 3 和 pyOpenSSL。

        【讨论】:

          【解决方案7】:

          更清洁(-ish)的解决方案,基于以前非常好的答案!

          1. 需要在覆盖之前修补 requests.Adapter 源文件 HTTPResponse 类(待处理的拉取请求:https://github.com/psf/requests/pull/6039):
            • 将静态类变量添加到 class HTTPAdapter(BaseAdapter)_clsHTTPResponse = HTTPResponse
            • 修改 send() 方法以使用 _clsHTTPResponse 而不是直接创建 HTTPResponse 对象:resp = _clsHTTPResponse.from_httplib(...
          2. 使用此代码:
          """
          Subclassing HTTP / requests to get peer_certificate back from lower levels
          """
          from typing import Optional, Mapping, Any
          from http.client import HTTPSConnection
          from requests.adapters import HTTPAdapter, DEFAULT_POOLBLOCK
          from urllib3.poolmanager import PoolManager,key_fn_by_scheme
          from urllib3.connectionpool import HTTPSConnectionPool,HTTPConnectionPool
          from urllib3.connection import HTTPSConnection,HTTPConnection
          from urllib3.response import HTTPResponse as URLLIB3_HTTPResponse
          
          #force urllib3 to use pyopenssl
          import urllib3.contrib.pyopenssl
          urllib3.contrib.pyopenssl.inject_into_urllib3()  
          
          class HTTPSConnection_withcert(HTTPSConnection):
              def __init__(self, *args, **kw):
                  self.peer_certificate = None
                  super().__init__(*args, **kw)
              def connect(self):
                  res = super().connect() 
                  self.peer_certificate = self.sock.connection.get_peer_certificate()
                  return res
          
          class HTTPResponse_withcert(URLLIB3_HTTPResponse):
              def __init__(self, *args, **kwargs):
                  self.peer_certificate = None
                  res = super().__init__( *args, **kwargs)
                  self.peer_certificate = self._connection.peer_certificate
                  return res
                 
          class HTTPSConnectionPool_withcert(HTTPSConnectionPool):
              ConnectionCls   = HTTPSConnection_withcert
              ResponseCls     = HTTPResponse_withcert
              
          class PoolManager_withcert(PoolManager): 
              def __init__(
                  self,
                  num_pools: int = 10,
                  headers: Optional[Mapping[str, str]] = None,
                  **connection_pool_kw: Any,
              ) -> None:   
                  super().__init__(num_pools,headers,**connection_pool_kw)
                  self.pool_classes_by_scheme = {"http": HTTPConnectionPool, "https": HTTPSConnectionPool_withcert}
                  self.key_fn_by_scheme = key_fn_by_scheme.copy()
                          
          class HTTPAdapter_withcert(HTTPAdapter):
              _clsHTTPResponse = HTTPResponse_withcert
              def build_response(self, request, resp):
                  response = super().build_response( request, resp)
                  response.peer_certificate = resp.peer_certificate
                  return response
          
              def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs):
                  #do not call super() to not initialize PoolManager twice
                  # save these values for pickling
                  self._pool_connections  = connections
                  self._pool_maxsize      = maxsize
                  self._pool_block        = block
          
                  self.poolmanager        = PoolManager_withcert(num_pools=connections, 
                                                             maxsize=maxsize,
                                                             block=block, 
                                                             strict=True, 
                                                             **pool_kwargs)
          class Session_withcert(Session):
              def __init__(self):
                  super().__init__()
                  self.mount('https://', HTTPAdapter_withcert())
          
          1. 仅此而已! 您现在可以像使用基础会话一样使用新会话 Session_withcert(),但您也可以这样做:
          ss= Session_withcert()
          resp=ss.get("https://www.google.fr")
          resp.peer_certificate.get_subject()
          print(resp.peer_certificate.get_subject())
          

          将输出:

          <X509Name object '/CN=*.google.fr'>
          

          【讨论】:

            猜你喜欢
            • 2022-12-22
            • 2014-06-16
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2015-11-26
            • 1970-01-01
            相关资源
            最近更新 更多