【04】第三方API调用
一、第三方API
1. JIRA API
#!/usr/bin/env python # _*_ coding:utf-8 _*_ import json import requests # Suppress SSL warnings generated when contacting Foreman requests.packages.urllib3.disable_warnings() class JiraAPI(object): def __init__(self, host, username, password, proxy_host, proxy_port): # self.base_uri = "https://%s/jira/rest/api/2" % host self.base_uri = "https://%s/rest/api/2" % host self.headers = {\'Content-type\': \'application/json\'} self.auth = (username, password) self.proxy = {\'https\': \'https://{0}:{1}\'.format(proxy_host, proxy_port)} self.timeout = 30 def query_api(self, prefix, aid=None): ret = {} if prefix and aid: url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers, proxies=self.proxy, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code ret[\'retval\'] = json.loads(query_request.text) else: url = \'%s/%s\' % (self.base_uri, prefix) query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers, proxies=self.proxy, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code ret[\'retval\'] = json.loads(query_request.text) return ret def check_api(self, prefix, aid=None, subitem=None): ret = {} if prefix and aid and subitem: url = \'%s/%s/%s/%s\' % (self.base_uri, prefix, aid, subitem) query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers, proxies=self.proxy, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code ret[\'retval\'] = json.loads(query_request.text) elif prefix and aid: url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers, proxies=self.proxy, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code ret[\'retval\'] = json.loads(query_request.text) else: url = \'%s/%s\' % (self.base_uri, prefix) query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers, proxies=self.proxy, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code ret[\'retval\'] = json.loads(query_request.text) return ret def create_api(self, prefix, params=None): ret = {} url = \'%s/%s\' % (self.base_uri, prefix) update_request = requests.post(url, verify=False, auth=self.auth, headers=self.headers, data=json.dumps(params), proxies=self.proxy) ret[\'exitcode\'] = update_request.status_code if update_request.text == \'\' or update_request.text is None: ret[\'retval\'] = update_request.text else: ret[\'retval\'] = json.loads(update_request.text) return ret def update_api(self, prefix, aid=None, params=None): ret = {} url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) update_request = requests.put(url, verify=False, auth=self.auth, headers=self.headers, data=json.dumps(params), proxies=self.proxy) ret[\'exitcode\'] = update_request.status_code return ret def delete_api(self, prefix, aid=None): ret = {} url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) delete_request = requests.delete(url, verify=False, auth=self.auth, headers=self.headers, proxies=self.proxy) ret[\'exitcode\'] = delete_request.status_code ret[\'retval\'] = json.loads(delete_request.text) return ret
2. Foreman API
#!/usr/bin/env python # _*_ coding:utf-8 _*_ import json import requests # Suppress SSL warnings generated when contacting Foreman requests.packages.urllib3.disable_warnings() class ForemanAPI(object): def __init__(self, host, username, password): self.base_uri = "https://%s/api/v2" % host self.headers = {\'Content-type\': \'application/json\'} self.auth = (username, password) def query_api(self, prefix, aid=None): ret = {} try: if prefix and aid: url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers) ret[\'exitcode\'] = query_request.status_code if ret[\'exitcode\'] == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = [] ret[\'msg\'] = \'Bad request with exit code %s\' % ret[\'exitcode\'] else: url = \'%s/%s?per_page=500\' % (self.base_uri, prefix) query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers) ret[\'exitcode\'] = query_request.status_code if ret[\'exitcode\'] == 200: ret[\'retval\'] = json.loads(query_request.text)[\'results\'] else: ret[\'retval\'] = [] ret[\'msg\'] = \'Bad request with exit code %s\' % ret[\'exitcode\'] except: ret[\'exitcode\'] = \'\' ret[\'retval\'] = [] ret[\'msg\'] = \'Exception occurred, please double check\' return ret def search_api(self, prefix, aid=None, subitem=None): ret = {} try: if prefix and aid and subitem: url = \'%s/%s/%s/%s\' % (self.base_uri, prefix, aid, subitem) query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers) ret[\'exitcode\'] = query_request.status_code if ret[\'exitcode\'] == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = [] ret[\'msg\'] = \'Bad request with exit code %s\' % ret[\'exitcode\'] elif prefix and aid: url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers) ret[\'exitcode\'] = query_request.status_code if ret[\'exitcode\'] == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = [] ret[\'msg\'] = \'Bad request with exit code %s\' % ret[\'exitcode\'] else: url = \'%s/%s\' % (self.base_uri, prefix) query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers) ret[\'exitcode\'] = query_request.status_code if ret[\'exitcode\'] == 200: ret[\'retval\'] = json.loads(query_request.text)[\'results\'] else: ret[\'retval\'] = [] ret[\'msg\'] = \'Bad request with exit code %s\' % ret[\'exitcode\'] except: ret[\'exitcode\'] = \'\' ret[\'retval\'] = [] ret[\'msg\'] = \'Exception occurred, please double check\' return ret def create_api(self, prefix, params=None): ret = {} try: url = \'%s/%s\' % (self.base_uri, prefix) update_request = requests.post(url, verify=False, auth=self.auth, headers=self.headers, data=json.dumps(params)) ret[\'exitcode\'] = update_request.status_code if ret[\'exitcode\'] == 201: ret[\'retval\'] = json.loads(update_request.text) else: ret[\'retval\'] = [] ret[\'msg\'] = \'Bad request with exit code %s\' % ret[\'exitcode\'] except: ret[\'exitcode\'] = \'\' ret[\'retval\'] = [] ret[\'msg\'] = \'Exception occurred, please double check\' return ret def update_api(self, prefix, aid=None, params=None): ret = {} try: url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) update_request = requests.put(url, verify=False, auth=self.auth, headers=self.headers, data=json.dumps(params)) ret[\'exitcode\'] = update_request.status_code if ret[\'exitcode\'] == 200: ret[\'retval\'] = json.loads(update_request.text) else: ret[\'retval\'] = [] ret[\'msg\'] = \'Bad request with exit code %s\' % ret[\'exitcode\'] except: ret[\'exitcode\'] = \'\' ret[\'retval\'] = [] ret[\'msg\'] = \'Exception occurred, please double check\' return ret def delete_api(self, prefix, aid=None): ret = {} try: url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) delete_request = requests.delete(url, verify=False, auth=self.auth, headers=self.headers) ret[\'exitcode\'] = delete_request.status_code if ret[\'exitcode\'] == 200: ret[\'retval\'] = json.loads(delete_request.text) else: ret[\'retval\'] = [] ret[\'msg\'] = \'Bad request with exit code %s\' % ret[\'exitcode\'] except: ret[\'exitcode\'] = \'\' ret[\'retval\'] = [] ret[\'msg\'] = \'Exception occurred, please double check\' return ret
3. Provision API
#!/usr/bin/env python # _*_ coding:utf-8 _*_ import json import requests from libs.utils.tools import read_file_to_json from libs.utils.config_parsers import get_config # Suppress SSL warnings generated when contacting Foreman requests.packages.urllib3.disable_warnings() config = get_config() def get_v2api_token(host, uname, passwd, proxy_host, proxy_port): ret = {} timeout = 10 base_uri = "https://%s/v2/tokens" % host headers = {\'Content-type\': \'application/json\'} proxy = {\'https\': \'https://{0}:{1}\'.format(proxy_host, proxy_port)} json_file = \'%s/etc/token.json\' % config.get(\'global\', \'base_dir\') params = read_file_to_json(json_file) if uname and passwd: params[\'auth\'][\'passwordCredentials\'][\'username\'] = uname params[\'auth\'][\'passwordCredentials\'][\'password\'] = passwd token_request = requests.post(base_uri, verify=False, headers=headers, data=json.dumps(params), timeout=timeout, proxies=proxy) ret[\'exitcode\'] = token_request.status_code if ret[\'exitcode\'] == 200: ret[\'retval\'] = json.loads(token_request.text) ret[\'token\'] = ret[\'retval\'][\'access\'][\'token\'][\'id\'] ret[\'tenant\'] = ret[\'retval\'][\'access\'][\'token\'][\'tenant\'][\'id\'] else: ret[\'retval\'] = [] ret[\'token\'] = \'\' ret[\'tenant\'] = \'\' else: ret[\'exitcode\'] = 400 ret[\'retval\'] = [] ret[\'token\'] = \'\' ret[\'tenant\'] = \'\' return ret def get_keystone_token(host, port, proxy_host, proxy_port): ret = {} timeout = 20 base_uri = "https://%s:%s/v3/auth/tokens" % (host, port) headers = {\'Content-type\': \'application/json\'} proxy = {\'https\': \'https://{0}:{1}\'.format(proxy_host, proxy_port)} json_file = \'%s/etc/keystone.json\' % config.get(\'global\', \'base_dir\') params = read_file_to_json(json_file) # token_request = requests.post(base_uri, verify=False, headers=headers, data=json.dumps(params), timeout=timeout, proxies=proxy) token_request = requests.post(base_uri, verify=False, headers=headers, data=json.dumps(params), timeout=timeout) ret[\'exitcode\'] = token_request.status_code if ret[\'exitcode\'] == 201: ret[\'retval\'] = token_request.headers token_string = ret[\'retval\'][\'X-Subject-Token\'] else: token_string = \'\' ret[\'retval\'] = [] ret[\'exitcode\'] = 400 return token_string class EsAPI(object): def __init__(self, host, token, proxy_host, proxy_port): self.base_uri = "https://%s/v2" % host self.headers = {\'Content-type\': \'application/json\', \'X-Auth-Token\': token} self.proxy = {\'https\': \'https://{0}:{1}\'.format(proxy_host, proxy_port)} self.timeout = 30 def query_api(self, prefix, aid=None): ret = {} if prefix and aid: url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout, proxies=self.proxy) ret[\'exitcode\'] = query_request.status_code if query_request.status_code == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text else: url = \'%s/%s\' % (self.base_uri, prefix) query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout, proxies=self.proxy) ret[\'exitcode\'] = query_request.status_code if query_request.status_code == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text return ret def search_api(self, prefix, aid=None, subitem=None): ret = {} if prefix and aid and subitem: url = \'%s/%s/%s/%s\' % (self.base_uri, prefix, aid, subitem) query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout, proxies=self.proxy) ret[\'exitcode\'] = query_request.status_code if query_request.status_code == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text elif prefix and aid: url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout, proxies=self.proxy) ret[\'exitcode\'] = query_request.status_code if query_request.status_code == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text else: url = \'%s/%s\' % (self.base_uri, prefix) query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout, proxies=self.proxy) ret[\'exitcode\'] = query_request.status_code if query_request.status_code == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text return ret def create_api(self, prefix, params=None): ret = {} url = \'%s/%s\' % (self.base_uri, prefix) update_request = requests.post(url, verify=False, headers=self.headers, data=json.dumps(params), proxies=self.proxy) ret[\'exitcode\'] = update_request.status_code if update_request.status_code == 200: ret[\'retval\'] = json.loads(update_request.text) else: ret[\'retval\'] = update_request.text return ret def update_api(self, prefix, aid=None, params=None): ret = {} url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) update_request = requests.put(url, verify=False, headers=self.headers, data=json.dumps(params), proxies=self.proxy) ret[\'exitcode\'] = update_request.status_code # ret[\'retval\'] = json.loads(update_request.text) return ret def delete_api(self, prefix, aid=None): ret = {} url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) delete_request = requests.delete(url, verify=False, headers=self.headers, proxies=self.proxy) ret[\'exitcode\'] = delete_request.status_code if delete_request.status_code == 200: ret[\'retval\'] = json.loads(delete_request.text) else: ret[\'retval\'] = delete_request.text return ret class GlanceAPI(object): def __init__(self, host, port, token, proxy_host, proxy_port): self.base_uri = "https://%s:%s/v2" % (host, port) self.headers = {\'Content-type\': \'application/json\', \'X-Auth-Token\': token} self.proxy = {\'https\': \'https://{0}:{1}\'.format(proxy_host, proxy_port)} self.timeout = 30 def query_api(self, prefix, aid=None): ret = {} if prefix and aid: url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout, proxies=self.proxy) # query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code if query_request.status_code == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text else: url = \'%s/%s\' % (self.base_uri, prefix) # query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout, # proxies=self.proxy) query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code if query_request.status_code == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text return ret def search_api(self, prefix, aid=None, subitem=None): ret = {} if prefix and aid and subitem: url = \'%s/%s/%s/%s\' % (self.base_uri, prefix, aid, subitem) # query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout, # proxies=self.proxy) query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code if query_request.status_code == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text elif prefix and aid: url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) # query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout, # proxies=self.proxy) query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code if query_request.status_code == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text else: url = \'%s/%s\' % (self.base_uri, prefix) # query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout, # proxies=self.proxy) query_request = requests.get(url, verify=False, headers=self.headers, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code if query_request.status_code == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text return ret def create_api(self, prefix, params=None): ret = {} url = \'%s/%s\' % (self.base_uri, prefix) # update_request = requests.post(url, verify=False, headers=self.headers, data=json.dumps(params), # proxies=self.proxy) update_request = requests.post(url, verify=False, headers=self.headers, data=json.dumps(params)) ret[\'exitcode\'] = update_request.status_code if update_request.status_code == 200: ret[\'retval\'] = json.loads(update_request.text) else: ret[\'retval\'] = update_request.text return ret def update_api(self, prefix, aid=None, params=None): ret = {} url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) # update_request = requests.put(url, verify=False, headers=self.headers, data=json.dumps(params), # proxies=self.proxy) update_request = requests.put(url, verify=False, headers=self.headers, data=json.dumps(params)) ret[\'exitcode\'] = update_request.status_code # ret[\'retval\'] = json.loads(update_request.text) return ret def delete_api(self, prefix, aid=None): ret = {} url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) # delete_request = requests.delete(url, verify=False, headers=self.headers, proxies=self.proxy) delete_request = requests.delete(url, verify=False, headers=self.headers) ret[\'exitcode\'] = delete_request.status_code if delete_request.status_code == 200: ret[\'retval\'] = json.loads(delete_request.text) else: ret[\'retval\'] = delete_request.text return ret
4. Qualys API
#!/usr/bin/env python # _*_ coding:utf-8 _*_ import json import requests import xml.etree.ElementTree as et # Suppress SSL warnings generated when contacting Foreman requests.packages.urllib3.disable_warnings() class QualysAPI(object): def __init__(self, host, username, password, proxy_host, proxy_port): self.base_uri = "https://%s/api/2.0/fo" % host self.headers = {\'X-Requested-With\': \'VPC Qualys API\'} self.auth = (username, password) self.username = username self.password = password self.proxy = {\'https\': \'https://{0}:{1}\'.format(proxy_host, proxy_port)} self.timeout = 30 self.session = requests.Session() # Logon the Qualys API server def login(self): retdata = dict() payload = { \'action\': \'login\', \'username\': self.username, \'password\': self.password } url = \'%s/%s\' % (self.base_uri, \'session/\') print(\'Web URI: %s\' % url) try: r = self.session.post(url, headers=self.headers, data=payload, proxies=self.proxy) # r = self.session.post(url, headers=self.headers, data=payload) retdata[\'exitcode\'] = r.status_code if r.status_code == 200: xml_ret = et.fromstring(r.text) for elem in xml_ret.findall(\'.//TEXT\'): retdata[\'retval\'] = elem.text else: retdata[\'retval\'] = r.text except Exception as e: retdata[\'exitcode\'] = 400 retdata[\'retval\'] = e print(retdata) return retdata # Logout the Qualys API server once task done def logout(self): retdata = dict() payload = { \'action\': \'logout\' } url = \'%s/%s\' % (self.base_uri, \'session/\') try: r = self.session.post(url, headers=self.headers, data=payload, proxies=self.proxy) # r = self.session.post(url, headers=self.headers, data=payload) retdata[\'exitcode\'] = r.status_code if r.status_code == 200: xml_ret = et.fromstring(r.text) for elem in xml_ret.findall(\'.//TEXT\'): retdata[\'retval\'] = elem.text else: retdata[\'retval\'] = r.text except Exception as e: retdata[\'exitcode\'] = 400 retdata[\'retval\'] = e self.session.close() print(retdata) return retdata # Trigger the Vulnerability and Compliance Scan def launch_scan(self, action, ip_address, scan_title, option_id, iscanner_name): retdata = dict() payload = { \'action\': \'launch\', \'ip\': ip_address, \'iscanner_name\': iscanner_name, \'option_id\': option_id, \'scan_title\': scan_title, } if action == \'compliance\': url = \'%s/%s\' % (self.base_uri, \'scan/compliance/\') else: url = \'%s/%s\' % (self.base_uri, \'scan/\') try: r = self.session.post(url, headers=self.headers, data=payload, proxies=self.proxy) retdata[\'exitcode\'] = r.status_code if r.status_code == 200: xml_ret = et.fromstring(r.text) for elem in xml_ret.findall(\'.//ITEM\'): if elem[0].text == \'REFERENCE\': retdata[\'retval\'] = elem[1].text else: retdata[\'retval\'] = r.text except Exception as e: retdata[\'exitcode\'] = 400 retdata[\'retval\'] = e print(retdata) return retdata # Query Security scan status def query_scan_status(self, action, scan_ref): retdata = dict() payload = { \'action\': \'list\', \'scan_ref\': scan_ref, } if action == \'compliance\': url = \'%s/%s\' % (self.base_uri, \'scan/compliance/\') else: url = \'%s/%s\' % (self.base_uri, \'scan/\') try: r = self.session.post(url, headers=self.headers, data=payload, proxies=self.proxy) retdata[\'exitcode\'] = r.status_code if r.status_code == 200: xml_ret = et.fromstring(r.text) for elem in xml_ret.findall(\'.//STATUS\'): status = elem[0].text retdata[\'retval\'] = status else: retdata[\'retval\'] = r.text except Exception as e: retdata[\'exitcode\'] = 400 retdata[\'retval\'] = e print(retdata) return retdata # Download Vulnerability Scan Report def download_vul_report(self, scan_ref, filename): retdata = dict() payload = { \'action\': \'fetch\', \'scan_ref\': scan_ref, \'output_format\': \'csv_extended\' } url = \'%s/%s\' % (self.base_uri, \'scan/\') try: r = self.session.post(url, headers=self.headers, data=payload, proxies=self.proxy) retdata[\'exitcode\'] = r.status_code if r.status_code == 200: with open(filename, "wb") as report: report.write(r.content) retdata[\'retval\'] = \'Download Vulnerability Scan Report Success\' else: retdata[\'retval\'] = r.text except Exception as e: retdata[\'exitcode\'] = 400 retdata[\'retval\'] = e print(retdata) return retdata # Parse Compliance Policy def parse_policy(self, os_type): retdata = dict() payload = { \'action\': \'list\', } url = \'%s/%s\' % (self.base_uri, \'compliance/policy/\') try: r = self.session.post(url, headers=self.headers, data=payload, proxies=self.proxy) retdata[\'exitcode\'] = r.status_code if r.status_code == 200: xml_ret = et.fromstring(r.text) for elem in xml_ret.findall(\'.//POLICY\'): if \'[+ PROD]\' in elem[1].text and \'DXC Baseline %s\' % os_type in elem[1].text: policy_id = elem[0].text retdata[\'retval\'] = policy_id # else: # retdata[\'retval\'] = \'\' else: retdata[\'retval\'] = r.text except Exception as e: retdata[\'exitcode\'] = 400 retdata[\'retval\'] = e print(retdata) return retdata # Launch Compliance Scan Report def launch_report(self, template_id, report_title, ip_address, policy_id): retdata = dict() payload = { \'action\': \'launch\', \'report_type\': \'Policy\', \'template_id\': template_id, \'output_format\': \'csv\', \'report_title\': report_title, \'policy_id\': policy_id, \'ips\': ip_address } url = \'%s/%s\' % (self.base_uri, \'report/\') try: r = self.session.post(url, headers=self.headers, data=payload, proxies=self.proxy) retdata[\'exitcode\'] = r.status_code if r.status_code == 200: xml_ret = et.fromstring(r.text) for elem in xml_ret.findall(\'.//ITEM\'): if elem[0].text == \'ID\': report_id = elem[1].text retdata[\'retval\'] = report_id else: retdata[\'retval\'] = r.text except Exception as e: retdata[\'exitcode\'] = 400 retdata[\'retval\'] = e print(retdata) return retdata # Query Compliance Report Status def query_report_status(self, report_id): retdata = dict() payload = { \'action\': \'list\', \'id\': report_id, } url = \'%s/%s\' % (self.base_uri, \'report/\') try: r = self.session.post(url, headers=self.headers, data=payload, proxies=self.proxy) retdata[\'exitcode\'] = r.status_code if r.status_code == 200: xml_ret = et.fromstring(r.text) for elem in xml_ret.findall(\'.//STATUS\'): status = elem[0].text retdata[\'retval\'] = status else: retdata[\'retval\'] = r.text except Exception as e: retdata[\'exitcode\'] = 400 retdata[\'retval\'] = e print(retdata) return retdata # Download Compliance Report def download_report(self, report_id, filename): retdata = dict() payload = { \'action\': \'fetch\', \'id\': report_id, } url = \'%s/%s\' % (self.base_uri, \'report/\') try: r = self.session.post(url, headers=self.headers, data=payload, proxies=self.proxy) retdata[\'exitcode\'] = r.status_code if r.status_code == 200: with open(filename, "wb") as report: report.write(r.content) retdata[\'retval\'] = \'Download Compliance Scan Report Success\' else: retdata[\'retval\'] = r.text except Exception as e: retdata[\'exitcode\'] = 400 retdata[\'retval\'] = e print(retdata) return retdata
5. Confluence API
#!/usr/bin/env python # _*_ coding:utf-8 _*_ import json import requests # Suppress SSL warnings generated when contacting Foreman requests.packages.urllib3.disable_warnings() media_space = \'131269718\' def validate_api(conf_host, uname, passwd, proxy_host, proxy_port): ret = {} timeout = 10 headers = {\'Content-type\': \'application/json\'} if conf_host and uname and passwd: auth = (uname, passwd) url = \'https://%s/rest/api/content/%s/child/page\' % (conf_host, media_space) if proxy_host and proxy_port: proxy = {\'https\': \'https://{0}:{1}\'.format(proxy_host, proxy_port)} query_request = requests.get(url, verify=False, auth=auth, headers=headers, proxies=proxy, timeout=timeout) else: query_request = requests.get(url, verify=False, auth=auth, headers=headers, timeout=timeout) ret[\'exitcode\'] = query_request.status_code if ret[\'exitcode\'] == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = [] else: ret[\'exitcode\'] = 400 ret[\'retval\'] = [] return ret def read_file_to_json(mapping_file): try: with open(mapping_file) as data_file: data = json.load(data_file) except Exception as e: data = None return data class ConfluenceAPI(object): def __init__(self, host, username, password, proxy_host, proxy_port): self.base_uri = "https://%s/rest/api" % host self.headers = {\'Content-type\': \'application/json\'} self.auth = (username, password) self.proxy = {\'https\': \'https://{0}:{1}\'.format(proxy_host, proxy_port)} self.timeout = 30 def query_api(self, prefix, aid=None): ret = {} if prefix and aid: url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) try: query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers, proxies=self.proxy, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code if ret[\'exitcode\'] == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text except Exception as e: ret[\'retval\'] = str(e) else: url = \'%s/%s\' % (self.base_uri, prefix) try: query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers, proxies=self.proxy, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code if ret[\'exitcode\'] == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text except Exception as e: ret[\'retval\'] = str(e) return ret def search_api(self, prefix, aid=None, subitem=None): ret = {} if prefix and aid and subitem: url = \'%s/%s/%s/%s\' % (self.base_uri, prefix, aid, subitem) try: query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers, proxies=self.proxy, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code if ret[\'exitcode\'] == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text except Exception as e: ret[\'retval\'] = str(e) elif prefix and aid: url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) try: query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers, proxies=self.proxy, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code if ret[\'exitcode\'] == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text except Exception as e: ret[\'retval\'] = str(e) else: url = \'%s/%s\' % (self.base_uri, prefix) try: query_request = requests.get(url, verify=False, auth=self.auth, headers=self.headers, proxies=self.proxy, timeout=self.timeout) ret[\'exitcode\'] = query_request.status_code if ret[\'exitcode\'] == 200: ret[\'retval\'] = json.loads(query_request.text) else: ret[\'retval\'] = query_request.text except Exception as e: ret[\'retval\'] = str(e) return ret def create_api(self, prefix, params=None): ret = {} url = \'%s/%s\' % (self.base_uri, prefix) try: update_request = requests.post(url, verify=False, auth=self.auth, headers=self.headers, data=json.dumps(params), proxies=self.proxy) ret[\'exitcode\'] = update_request.status_code if update_request.text == \'\' or update_request.text is None: ret[\'retval\'] = update_request.text else: ret[\'retval\'] = json.loads(update_request.text) except Exception as e: ret[\'retval\'] = str(e) return ret def update_api(self, prefix, aid=None, params=None): ret = {} url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) try: update_request = requests.put(url, verify=False, auth=self.auth, headers=self.headers, data=json.dumps(params), proxies=self.proxy) ret[\'exitcode\'] = update_request.status_code ret[\'retval\'] = json.loads(update_request.text) except Exception as e: ret[\'retval\'] = str(e) return ret def delete_api(self, prefix, aid=None): ret = {} url = \'%s/%s/%s\' % (self.base_uri, prefix, aid) try: delete_request = requests.delete(url, verify=False, auth=self.auth, headers=self.headers, proxies=self.proxy) ret[\'exitcode\'] = delete_request.status_code ret[\'retval\'] = json.loads(delete_request.text) except Exception as e: ret[\'retval\'] = str(e) return ret if __name__ == \'__main__\': proxy_host = \'x.x.x.x\' proxy_port = \'8080\' conf_host = \'confluence.xxx.com\' conf_uname = \'uname\' conf_pass = \'pass\' data = validate_api(conf_host, conf_uname, conf_pass, proxy_host, proxy_port) print(data) print(\'=================================\') conf_api = ConfluenceAPI(conf_host, conf_uname, conf_pass, proxy_host, proxy_port) get_data = conf_api.query_api(\'content\', \'15234\') print(get_data)
二、API编写