【问题标题】:Python tnsnames.ora parserPython tnsnames.ora 解析器
【发布时间】:2017-04-19 14:34:47
【问题描述】:

我需要一个包含来自tnsnames.ora 文件的所有数据库连接的字典。

我需要从这里开始:

(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=mydbserver.mydomain.com)(PORT=1521)))(CONNECT_DATA=(SID=CATAL)(SERVER=DEDICATED)(SERVICE_NAME=mydb.mydomain.com)))

到这里:

{'DESCRIPTION': [{'ADDRESS_LIST': [{'ADDRESS': [{'PROTOCOL': 'TCP'},
                                               {'HOST': 'mydbserver.mydomain.com'},
                                               {'PORT': '1521'}
                                               ]
                                   }]
                  },
                  {'CONNECT_DATA': [{'SID': 'CATAL'},
                                    {'SERVER': 'DEDICATED'},
                                    {'SERVICE_NAME': 'mydb.mydomain.com'}
                                   ]
                  }
                ]
}

到目前为止,我的代码是:

def get_param(param_string):
    print("get_param input:", param_string)
    if param_string.count("(") != param_string.count(")"):
        raise Exception("Number of '(' is not egal to number of ')' : " + str(param_string.count("(")) + " and " + str(param_string.count(")")))
    else:
        param_string = param_string[1:-1]
        splitted     = param_string.split("=")
        keywork      = splitted[0]
        if len(splitted) == 2:
            return {keywork: splitted[1]}
        else:
            splitted.remove(keywork)
            values       = "=".join(splitted)
            return {keywork: get_value_list(values)}

def get_value_list(value_string):
    print("get_value_list input:", value_string)
    to_return = list()
    if "=" not in value_string and "(" not in value_string and ")" not in value_string:
        to_return.append(value_string)
    elif value_string[0] != "(":
        raise Exception("[ERROR] Format error '(' is not the first char: " + repr(value_string))
    else:
        parenth_count = 0
        strlen        = len(value_string)
        current_value = ""
        for i in range(0,strlen):
            current_char = value_string[i]
            current_value += current_char
            if current_char == "(":
                parenth_count += 1
            elif current_char == ")":
                parenth_count += -1
                if parenth_count == 0:
                    to_return.append(get_param(current_value))
                    if i != (strlen - 1):
                        if value_string[i+1] == "(":
                           to_return += get_value_list(value_string[i+1:])
                        else:
                            raise Exception("Format error - Next char should be a '('. value_string[i+1]:" + repr(value_string[i+1]) )
                    break
    print("get_value_list return:", to_return)
    if len(to_return) == 0:
        to_return = ""
    elif len(to_return) == 1:
        to_return = to_return[0]
    return to_return

connection_infos   = "(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=mydbserver.mydomain.com)(PORT=1521)))(CONNECT_DATA=(SID=CATAL)(SERVER=DEDICATED)(SERVICE_NAME=mydb.mydomain.com)))"
current_connection = get_param(connection_infos)
print("current_connection:", current_connection)
pprint(current_connection)

我得到了这个:

{'DESCRIPTION': [{'ADDRESS_LIST': {'ADDRESS': [{'PROTOCOL': 'TCP'},
                                               {'HOST': 'mydbserver.mydomain.com'},
                                                'PORT']
                                   }
                  },
                  'CONNECT_DATA'
                ]
}

所以我做错了什么。而且我觉得我做的事情太复杂了。有人会指出我犯的一些错误或帮助我找到更简单的方法吗?

【问题讨论】:

  • 恕我直言,您可以为 tnsnames.ora 下载 ANTLR 语法,然后让 ANTLR 为您生成解析器源代码。
  • 此外,您的解析器似乎不支持“include”子句。
  • 查看这个预言机页面:Mastering Oracle+Python, Part 3: Data Parsing。寻找考虑解析一个 tnsnames.ora 文件

标签: python oracle python-3.x tnsnames


【解决方案1】:

我现在有一个工作代码,但我对它并不满意。它太长,不灵活,并且不适用于其他一些 tnsnames.ora 可能的格式:

class Tnsnames():
    def __init__(self, file_path, file_name='tnsnames.ora'):
        self.file_path  = file_path
        self.file_name  = file_name
        self.load_file()
    def load_file(self):
        try:
            fhd = open(os.path.join(self.file_path, self.file_name), 'rt', encoding='utf-8')
        except:
            raise
        else:
            #Oracle doc : https://docs.oracle.com/cd/B28359_01/network.111/b28317/tnsnames.htm#NETRF007
            file_content      = list()
            for l in fhd:
                l = " ".join(l.split()).strip(" \n")
                if len(l) > 0:
                    if "#" not in l:
                        file_content.append(l)
            fhd.close()
            file_content           = " ".join(file_content)
            connections_list       = dict()
            current_depth          = 0
            current_word           = ""
            current_keyword        = ""
            name_to_register       = ""
            is_in_add_list         = False
            current_addr           = dict()
            connections_aliases    = dict()
            stop_registering       = False
            connections_duplicates = list()
            for c in file_content:
                if c == " ":
                    pass
                elif c == "=":
                    current_keyword = str(current_word)
                    current_word    = ""
                    if current_keyword == "ADDRESS_LIST":
                        is_in_add_list = True
                elif c == "(":
                    if current_depth == 0:
                        current_keyword = current_keyword.upper()
                        names_list      = current_keyword.replace(" ","").split(",")
                        if len(names_list) == 1:
                            name_to_register = names_list[0]
                        else:
                            name_to_register = None
                            # We use either the first name with at least 
                            # a dot in it, or the longest one.
                            for n in names_list:
                                if "." in n:
                                    name_to_register = n
                                    break
                            else:
                                name_to_register = max(names_list, key=len)
                            names_list.remove(name_to_register)
                            for n in names_list:
                                if n in connections_aliases.keys():
                                    print("[ERROR] already registered alias:", n, 
                                          ". Registered to:", connections_aliases[n], 
                                          ". New:", name_to_register, 
                                          ". This possible duplicate will not be registered.")
                                    connections_duplicates.append(n) 
                                    stop_registering = True
                                else:
                                    connections_aliases[n] = name_to_register
                        if not stop_registering:
                            connections_list[name_to_register] = {"ADDRESS_LIST": list(), 
                                                                  "CONNECT_DATA": dict(),
                                                                  "LAST_TEST_TS": None}
                        current_depth += 1
                    elif current_depth in [1,2,3]:
                        current_depth += 1
                    else:
                        print("[ERROR] Incorrect depth:", repr(current_depth), ". Current connection will not be registered" )
                        del connections_list[name_to_register]
                        stop_registering = True
                elif c == ")":
                    if current_depth == 1:
                        if stop_registering:
                            stop_registering = False
                        else:
                            # Before moving to next connection,
                            # we check that current connection 
                            # have at least a HOST, and a SID or
                            # SERVICE_NAME
                            connection_is_valid = True
                            if isinstance(connections_list[name_to_register]["ADDRESS_LIST"], dict):
                                if "HOST" not in connections_list[name_to_register]["ADDRESS_LIST"].keys():
                                    print("[ERROR] Only one address defined, and no HOST defined. Current connection will not be registered:", name_to_register)
                                    connection_is_valid = False
                            elif isinstance(connections_list[name_to_register]["ADDRESS_LIST"], list):
                                for current_address in connections_list[name_to_register]["ADDRESS_LIST"]:
                                    if "HOST" in current_address.keys():
                                        break
                                else:
                                    print("[ERROR] Multiple addresses but none with HOST. Current connection will not be registered:", name_to_register)
                                    connection_is_valid = False
                            else:
                                print("[ERROR] Incorrect address format:", connections_list[name_to_register]["ADDRESS_LIST"], " Connection:", name_to_register)
                                connection_is_valid = False
                            if not connection_is_valid:
                                del connections_list[name_to_register]
                            else:
                                if "SERVICE_NAME" not in connections_list[name_to_register]["CONNECT_DATA"].keys() and \
                                   "SID" not in connections_list[name_to_register]["CONNECT_DATA"].keys():
                                    print("[ERROR] Missing SERVICE_NAME / SID for connection:", name_to_register)
                                    del connections_list[name_to_register]
                    elif current_depth == 2:
                        if is_in_add_list:
                            is_in_add_list = False
                            if not stop_registering:
                                if len(connections_list[name_to_register]["ADDRESS_LIST"]) == 1:
                                    connections_list[name_to_register]["ADDRESS_LIST"] = connections_list[name_to_register]["ADDRESS_LIST"][0]
                    elif current_depth == 3:
                        if is_in_add_list:
                            if not stop_registering:
                                connections_list[name_to_register]["ADDRESS_LIST"].append(current_addr)
                            current_addr   = dict()
                        elif current_keyword.upper() in ["SID", "SERVER", "SERVICE_NAME"]:
                            if not stop_registering:
                                connections_list[name_to_register]["CONNECT_DATA"][current_keyword.upper()] = current_word.upper()
                    elif current_depth == 4:
                        if is_in_add_list:
                            if not stop_registering:
                                current_addr[current_keyword.upper()] = current_word.upper()
                    current_keyword = ""
                    current_word    = ""
                    current_depth  += -1
                else:
                    current_word += c
        self.connections = connections_list
        self.aliases     = connections_aliases
        self.duplicates  = connections_duplicates

测试 tnsnames.ora :

########################################
# This is a sample tnsnames.ora        #
########################################


###################################################
#  PRODDB
###################################################

proddb.mydbs.domain.com, PRODDB =
  (DESCRIPTION =
    (ADDRESS_LIST =
      (ADDRESS = (PROTOCOL = TCP)(HOST = proddb1.mydbs.domain.com)(PORT = 1522))
      (ADDRESS = (PROTOCOL = TCP)(HOST = proddb2.mydbs.domain.com)(PORT = 1522))
      (ADDRESS = (PROTOCOL = TCP)(HOST = proddb3.mydbs.domain.com)(PORT = 1522))
      (ADDRESS = (PROTOCOL = TCP)(HOST = proddb4.mydbs.domain.com)(PORT = 1522))
    )
    (CONNECT_DATA =
      (SID = PRODDB)
      (SERVER = DEDICATED)
      (SERVICE_NAME = proddb.mydbs.domain.com)
    )
  )

###################################################
#  DEVDBA : Test database for DBA usage 
###################################################

devdba.mydbs.domain.com, DEVDBA =
  (DESCRIPTION =
    (ADDRESS_LIST =
      (ADDRESS = (PROTOCOL = TCP)(HOST = devdba.mydbs.domain.com)(PORT = 1521))
    )
    (CONNECT_DATA =
      (SID = DEVDBA)
    )
  )

测试代码:

from pprint       import pprint
from lib_database import Tnsnames

tnsnnames = Tnsnames('/usr/lib/oracle/12.2/client64/network/admin')
print('Connexions:')
pprint(tnsnnames.connections)
print('Aliases:')
pprint(tnsnnames.aliases)
print('Duplicates:')
pprint(tnsnnames.duplicates)

输出:

Connexions:
    {'DEVDBA.MYDBS.DOMAIN.COM': {'ADDRESS_LIST': {'HOST': 'DEVDBA.MYDBS.DOMAIN.COM',
                                                  'PORT': '1521',
                                                  'PROTOCOL': 'TCP'},
                                 'CONNECT_DATA': {'SID': 'DEVDBA'},
     'PRODDB.MYDBS.DOMAIN.COM': {'ADDRESS_LIST': [{'HOST': 'PRODDB1.MYDBS.DOMAIN.COM',
                                                   'PORT': '1522',
                                                   'PROTOCOL': 'TCP'},
                                                  {'HOST': 'PRODDB2.MYDBS.DOMAIN.COM',
                                                   'PORT': '1522',
                                                   'PROTOCOL': 'TCP'},
                                                  {'HOST': 'PRODDB3.MYDBS.DOMAIN.COM',
                                                   'PORT': '1522',
                                                   'PROTOCOL': 'TCP'},
                                                  {'HOST': 'PRODDB4.MYDBS.DOMAIN.COM',
                                                   'PORT': '1522',
                                                   'PROTOCOL': 'TCP'}],
                                 'CONNECT_DATA': {'SERVER': 'DEDICATED',
                                                  'SERVICE_NAME': 'PRODDB.MYDBS.DOMAIN.COM',
                                                  'SID': 'PRODDB'}}
Aliases:
    {'DEVDBA': 'DEVDBA.MYDBS.DOMAIN.COM', 'PRODDB': 'PRODDB.MYDBS.DOMAIN.COM'}
Duplicates:
    []

我找不到其他用于 tnsnames.ora 文件的 Python 解析器。如果你知道一个,请指点我。

【讨论】:

    【解决方案2】:

    您可以使用pyparsing

    import pyparsing as pp
    
    # 1. Literals
    VAR = pp.Word(pp.alphas + "_", pp.alphanums + "_").setName('variable')
    SPACE = pp.Suppress(pp.Optional(pp.White()))
    EQUALS = SPACE + pp.Suppress('=') + SPACE
    OPEN = pp.Suppress('(') + SPACE
    CLOSE = pp.Suppress(')') + SPACE
    
    INTEGER = pp.Optional('-') + pp.Word(pp.nums) + ~pp.Char(".")
    INTEGER.setParseAction(lambda t: int(t[0]))
    
    FLOAT = pp.Optional('-') + pp.Word(pp.nums) + pp.Char('.') + pp.Optional(pp.Word(pp.nums))
    FLOAT.setParseAction(lambda t: float(t[0]))
    
    STRING = pp.Word(pp.alphanums + r'_.-')
    
    # 2. Literal assignment expressions: (IDENTIFIER = VALUE)
    INTEGER_ASSIGNMENT = pp.Group(OPEN + VAR + EQUALS + INTEGER + CLOSE)
    FLOAT_ASSIGNMENT = pp.Group(OPEN + VAR + EQUALS + FLOAT + CLOSE)
    STRING_ASSIGNMENT = pp.Group(OPEN + VAR + EQUALS + STRING + CLOSE)
    
    # 3. Nested object assignment
    ASSIGNMENT = pp.Forward()
    NESTED_ASSIGNMENT = pp.Group(OPEN + VAR + EQUALS + ASSIGNMENT + CLOSE)
    ASSIGNMENT << pp.OneOrMore(INTEGER_ASSIGNMENT |
                               FLOAT_ASSIGNMENT |
                               STRING_ASSIGNMENT |
                               NESTED_ASSIGNMENT)
    
    # 4. Net service name(s): NAME(.DOMAIN)[, NAME(.DOMAIN)...]
    NET_SERVICE_NAME = pp.OneOrMore(pp.Word(pp.alphas + '_' + '.', pp.alphanums + '_' + '.')
                                    + pp.Optional(pp.Suppress(',')))
    
    # 5. Full TNS entry
    TNS_ENTRY = NET_SERVICE_NAME + EQUALS + ASSIGNMENT
    

    以下是一些示例数据:

    TNS_NAMES_ORA = """
    MYDB =
      (DESCRIPTION =
        (ADDRESS_LIST =
          (ADDRESS = (PROTOCOL = TCP)(HOST = server01)(PORT = 25881))
        )
        (CONNECT_DATA =
          (SID = MYDB01)
        )
      )
    
    OTHERDB.DOMAIN, ALIAS_FOR_OTHERDB.DOMAIN = 
      (DESCRIPTION_LIST =
        (DESCRIPTION =
          (ADDRESS_LIST = (ADDRESS = (PROTOCOL = TCP)
                                     (HOST = server02)
                                     (PORT = 25881)
          ))
          (CONNECT_DATA = (SID = MYDB02))
        )
      )
    """
    

    这部分有点一次性,但这里有一个在此之上添加一些额外解析以提取所有数据的示例:

    def _parse_addresses(tns_entry, addresses):
        """
        Parse ADDRESS keywords from the a TNS entry
        :param definition: Unparsed part of the TNS entry
        :param addresses: List of addresses parsed
        """
        keyword = tns_entry[0]
        # Base Case: We found an ADDRESS, so extract the data 
        # and do not recurse into it
        if keyword.upper() == 'ADDRESS':
            port = None
            host = None
            for k, v in tns_entry[1:]:
                if k == 'PORT':
                    port = v
                elif k == 'HOST':
                    host = v
            if port is None:
                print('WARNING: Ignoring ADDRESS due to missing PORT')
            elif host is None:
                print('WARNING: Ignoring ADDRESS due to missing HOST')
            addresses.append({'host': host, 'port': port})
        # Else recursively descend through the definition
        for d in tns_entry[1:]:
            # Only parse sub-lists, not literals
            if isinstance(d, list):
                _parse_addresses(d, addresses)
    
    def _parse_connect_data(tns_entry, sids):
        """
        Parse CONNECT_DATA keywords from the a TNS entry
        :param definition: Unparsed part of the TNS entry
        :param sids: List of Oracle SIDs
        """
        keyword = tns_entry[0]
        # Base Case: We found a CONNECT_DATA, so extract the data 
        # and do not recurse into it
        if keyword.upper() == 'CONNECT_DATA':
            sid = None
            for k, v in tns_entry[1:]:
                if k == 'SID':
                    sid = v
            if sid is None:
                print('WARNING: Ignoring CONNECT_DATA due to missing SID')
            sids.append(sid)
        for d in tns_entry[1:]:
            # Only parse sub-lists, not literals
            if isinstance(d, list):
                _parse_connect_data(d, sids)
            
    def get_connection_info(net_service_name: str, tns_string: str):
        """
        Generator over all simple connections inferred from a TNS entry
        :param net_service_name: Net service name to return connection info for
        :param tns_string: tnsnames.ora file contents
        """
        # Parse the TNS entries and keep the requested definition
        definition = None
        for tokens, _start, _end in TNS_ENTRY.scanString(tns_string):
            if net_service_name in tokens.asList()[0]:
                definition = tokens.asList()[1]
                break
        # Check if we found a definition
        if definition is None:
            raise KeyError(f'No net service named {net_service_name}')
        # Look for all the ADDRESS keywords
        addresses = []
        _parse_addresses(definition, addresses)
        # Look for all CONNECT_DATA keywords
        sids = []
        _parse_connect_data(definition, sids)
        # Emit all combinations
        for address in addresses:
            for sid in sids:
                yield {'sid': sid, **address}
    
    # Try it out!
    for connection_info in get_connection_info('MYDB', TNS_NAMES_ORA):
        print(connection_info)
    

    为了“好玩”,我在这里写了一篇关于它的博文: https://unparameterized.blogspot.com/2021/02/parsing-oracle-tns-files-in-python.html

    【讨论】:

      【解决方案3】:
      import re
      
      def find_match(tns_regex, y):
          x1 = re.match(tns_regex, y, re.M + re.I + re.S)
          if x1 is not None:
              x1 = x1.groups(1)[0] # Only first match is returned
              x1 = x1.strip('\n')
          return(x1)
      
      
      # Removing commented text
      with open("C:\\Oracle\\product\\11.2.0\\client_1\\network\\admin\\tnsnames.ora") as tns_file:
          with open("test_tns.ora", 'w+') as output:
              lines =tns_file.readlines()
              for line in lines:
                  if not line.startswith('#'):
                      output.write(line)
      
      
      with open('test_tns.ora') as tns_file:
          tnsnames = tns_file.read()
      
      
      tnsnames1 = re.split(r"\){3,}\n\n", tnsnames)
      
      # Regex matches
      tns_name = '^(.+?)\s?\=\s+\(DESCRIPTION.*'
      tns_host = '.*?HOST\s?=\s?(.+?)\)'
      tns_port = '.*?PORT\s?=\s?(\d+?)\)'
      tns_sname = '.*?SERVICE_NAME\s?=\s?(.+?)\)'
      tns_sid = '.*?SID\s?=\s?(.+?)\)'
      
      easy_connects = []
      
      for y in tnsnames1:
          y = '%s))' % y
      
          l = [find_match(x, y) for x in [tns_name, tns_host, tns_port, tns_sname, tns_sid]]
      
          d = {
              'name': l[0],
              'host': l[1],
              'port': l[2],
              'service_name': l[3],
              'sid': l[4]
          }
      
          easy_connects.append(d)
      
      
      print(easy_connects)
      

      【讨论】:

        【解决方案4】:

        我写了这个小代码。它解析 tnsnames.ora。它速度快,效果很好。

        def parse_ora_tns_file(fpath,tnskey=None,return_all_keys=False,view_file=False,logger=None)->str:
         
            """
            This function parse oracle tns file
            parms: fpath : full file path like
            fpath=filepath\tnsnames.ora
            param: tnskey: find tns entry for given tns key like tnskey='ABC.WORLD'
            param: return_all_keys: if True it will return all tns key names
            param: view_file : if True it returns tnsnames.ora as str
            """
            clean_tns_file=''
            if logger:
                logger.info('Reading tnsnames ora file at {} ...'.format(fpath))
            with open(fpath,mode='r') as tns_file:
         
                lines =tns_file.readlines()
                for line in lines:
                    if not line.startswith('#'):
         
                        clean_tns_file=clean_tns_file+line
         
         
         
         
         
            #clean file
            clean_str = clean_tns_file.replace('\n','')
            clean_str = clean_str.replace('\t','')
         
            #replace = with ' ' so later I can split with ' '
            #with below it becomes like ABC.WORLD = (DESCRIPTION) to ABC.WORLD ' ' (DESCRIPTION)
            clean_str = clean_str.replace('=',' ')
         
            #Below one output to ['ABC.WORLD','',' (DESCRIPTION)']
            lstresult= clean_str.split(" ")
            #Below code remove extra space from char list like it becomes ['ABC.WORLD','','(DESCRIPTION)']
            lstresult = [txt.strip() for txt in lstresult]
         
            #Below code to replace () chars to '' from list so output as ['ABC.WORLD','','DESCRIPTION']
            removetable = str.maketrans('', '', '()')
            out_list = [s.translate(removetable) for s in lstresult]
         
            #Below code remove any empty list items so output as ['ABC.WORLD','DESCRIPTION']
            out_list = list(filter(None, out_list))
         
         
         
            #find index of DESCRIPTION words
            indices = [i for i, x in enumerate(out_list ) if x.upper() == "DESCRIPTION"]
            tns_keys= ""
         
            for i in indices:
                #use index of DESCRIPTION to tns keys which is required for regx pattern below.
                tns_keys=tns_keys+out_list[i-1]+"|"
                 
         
            if return_all_keys:
                return tns_keys.replace('|',',')[:-1]
         
            if logger:
                logger.info('Keys found in tnsnames ora: {}'.format(tns_keys))
            regex = r"\s+(?!^({}))".format(tns_keys)
            result = re.sub(regex, '', clean_tns_file, 0, re.MULTILINE)
         
            if view_file:
                return result
            if result:
                for match in re.finditer(r'^((?:{}))(.*)'.format(tns_keys), result, re.MULTILINE):
         
                    if match.group(1) == tnskey:
                         
                         
                        #removing = sign from start of entry
                        if logger:
                            logger.info('Found tns entry: {} {}'.format(match.group(1),match.group(2)))
                        return match.group(2)[1:]
         
            if logger:
                logger.info('No tns entry found for {}'.format(tnskey))
            return None
        

        【讨论】:

        • 虽然此代码可能会回答问题,但提供有关它如何和/或为什么解决问题的额外上下文将提高​​答案的长期价值。
        • 欢迎来到 SO!请使用tour 并阅读How to Answer。然后使用与该评论相关的内容更新您的答案。
        • 这个问题没有给出为什么需要在 dict 中转换 tns 条目的上下文。
        • 例如,您可以详细说明为什么原始代码不起作用,以及使您的代码起作用的不同之处。
        猜你喜欢
        • 2010-11-17
        • 1970-01-01
        • 2011-09-18
        • 2013-03-29
        • 2012-12-29
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多