bhan5

【04】第三方API调用

一、第三方API

1. JIRA API

#!/usr/bin/env python
# _*_ coding:utf-8 _*_

import json
import requests

# Suppress the urllib3 warnings raised while contacting JIRA (the requests in
# this module are sent with verify=False).
requests.packages.urllib3.disable_warnings()

class JiraAPI(object):
    """Thin client for the JIRA REST API v2 using HTTP basic authentication.

    All requests are sent through the configured HTTPS proxy with certificate
    verification disabled and a 30 second timeout.  Every public method
    returns a dict holding ``exitcode`` (the HTTP status code); all except
    ``update_api`` also hold ``retval`` (the decoded JSON body, or the raw
    text when the body is empty).
    """

    def __init__(self, host, username, password, proxy_host, proxy_port):
        """Store connection settings; no request is sent here.

        :param host: host name used to build ``https://<host>/rest/api/2``.
        :param username: basic-auth user name.
        :param password: basic-auth password.
        :param proxy_host: host of the HTTPS proxy.
        :param proxy_port: port of the HTTPS proxy.
        """
        # NOTE(review): an earlier revision used "https://%s/jira/rest/api/2";
        # presumably for servers that publish JIRA under a /jira context path.
        self.base_uri = "https://%s/rest/api/2" % host
        self.headers = {'Content-type': 'application/json'}
        self.auth = (username, password)
        self.proxy = {'https': 'https://{0}:{1}'.format(proxy_host, proxy_port)}
        self.timeout = 30

    def _build_url(self, prefix, aid=None, subitem=None):
        """Join the base URI with ``prefix`` and the optional id / sub-item.

        ``subitem`` is only used when ``aid`` is given, and ``aid`` only when
        ``prefix`` is truthy; otherwise the URL is just ``<base>/<prefix>``.
        """
        if prefix and aid and subitem:
            return '%s/%s/%s/%s' % (self.base_uri, prefix, aid, subitem)
        if prefix and aid:
            return '%s/%s/%s' % (self.base_uri, prefix, aid)
        return '%s/%s' % (self.base_uri, prefix)

    def _options(self):
        """Return the keyword arguments shared by every request."""
        return {
            'verify': False,
            'auth': self.auth,
            'headers': self.headers,
            'proxies': self.proxy,
            # Without a timeout requests may block forever on a dead peer.
            'timeout': self.timeout,
        }

    @staticmethod
    def _decode(response):
        """Return the JSON-decoded body, or the raw text when it is empty.

        An empty body (for example a "no content" reply) is not valid JSON,
        so it is handed back as-is instead of being passed to ``json.loads``.
        """
        body = response.text
        if not body:
            return body
        return json.loads(body)

    def query_api(self, prefix, aid=None):
        """GET ``<prefix>`` or ``<prefix>/<aid>``.

        :returns: ``{'exitcode': status, 'retval': decoded body}``.
        :raises ValueError: if a non-empty body is not valid JSON.
        """
        response = requests.get(self._build_url(prefix, aid), **self._options())
        return {'exitcode': response.status_code, 'retval': self._decode(response)}

    def check_api(self, prefix, aid=None, subitem=None):
        """GET ``<prefix>``, ``<prefix>/<aid>`` or ``<prefix>/<aid>/<subitem>``.

        :returns: ``{'exitcode': status, 'retval': decoded body}``.
        :raises ValueError: if a non-empty body is not valid JSON.
        """
        response = requests.get(self._build_url(prefix, aid, subitem), **self._options())
        return {'exitcode': response.status_code, 'retval': self._decode(response)}

    def create_api(self, prefix, params=None):
        """POST ``params`` (JSON-encoded) to ``<prefix>``.

        :returns: ``{'exitcode': status, 'retval': decoded body or raw text}``.
        """
        url = '%s/%s' % (self.base_uri, prefix)
        response = requests.post(url, data=json.dumps(params), **self._options())
        return {'exitcode': response.status_code, 'retval': self._decode(response)}

    def update_api(self, prefix, aid=None, params=None):
        """PUT ``params`` (JSON-encoded) to ``<prefix>/<aid>``.

        :returns: ``{'exitcode': status}``; the response body is not read.
        """
        url = '%s/%s/%s' % (self.base_uri, prefix, aid)
        response = requests.put(url, data=json.dumps(params), **self._options())
        return {'exitcode': response.status_code}

    def delete_api(self, prefix, aid=None):
        """DELETE ``<prefix>/<aid>``.

        :returns: ``{'exitcode': status, 'retval': decoded body or raw text}``.
        """
        url = '%s/%s/%s' % (self.base_uri, prefix, aid)
        response = requests.delete(url, **self._options())
        return {'exitcode': response.status_code, 'retval': self._decode(response)}
View Code

2. Foreman API

#!/usr/bin/env python
# _*_ coding:utf-8 _*_

import json
import requests

# Suppress the urllib3 warnings raised while contacting Foreman (the requests
# in this module are sent with verify=False).
requests.packages.urllib3.disable_warnings()


class ForemanAPI(object):
    """Thin client for the Foreman REST API v2 using HTTP basic authentication.

    Every public method returns a dict with:

    * ``exitcode`` - the HTTP status code, or ``''`` when an exception occurred;
    * ``retval``   - the decoded JSON body on success, otherwise ``[]``;
    * ``msg``      - present only on failure, a short description;
    * ``error``    - present only when an exception occurred, its text.

    No exception raised while sending or decoding escapes these methods.
    """

    def __init__(self, host, username, password, timeout=30):
        """Store connection settings; no request is sent here.

        :param host: host name used to build ``https://<host>/api/v2``.
        :param username: basic-auth user name.
        :param password: basic-auth password.
        :param timeout: seconds passed to requests as ``timeout``; pass
            ``None`` to wait indefinitely.
        """
        self.base_uri = "https://%s/api/v2" % host
        self.headers = {'Content-type': 'application/json'}
        self.auth = (username, password)
        self.timeout = timeout

    def _call(self, send, url, ok_status, has_body=False, params=None, results_only=False):
        """Send one request and fold the outcome into the result dict.

        :param send: the requests function to use (``requests.get`` ...).
        :param url: absolute URL to call.
        :param ok_status: the status code treated as success.
        :param has_body: when true, ``params`` is JSON-encoded as the body.
        :param params: object to serialise when ``has_body`` is true.
        :param results_only: when true, return only the ``results`` entry of
            the decoded body.
        """
        ret = {}
        try:
            options = {
                'verify': False,
                'auth': self.auth,
                'headers': self.headers,
                'timeout': self.timeout,
            }
            if has_body:
                options['data'] = json.dumps(params)
            response = send(url, **options)
            ret['exitcode'] = response.status_code
            if ret['exitcode'] == ok_status:
                payload = json.loads(response.text)
                ret['retval'] = payload['results'] if results_only else payload
            else:
                ret['retval'] = []
                ret['msg'] = 'Bad request with exit code %s' % ret['exitcode']
        except Exception as exc:
            # "Exception" rather than a bare except, so KeyboardInterrupt and
            # SystemExit still propagate; the cause is kept for the caller.
            ret['exitcode'] = ''
            ret['retval'] = []
            ret['msg'] = 'Exception occurred, please double check'
            ret['error'] = str(exc)

        return ret

    def query_api(self, prefix, aid=None):
        """GET ``<prefix>/<aid>``, or list ``<prefix>`` with ``per_page=500``.

        The listing form returns only the ``results`` entry of the body.
        """
        if prefix and aid:
            url = '%s/%s/%s' % (self.base_uri, prefix, aid)
            return self._call(requests.get, url, 200)
        url = '%s/%s?per_page=500' % (self.base_uri, prefix)
        return self._call(requests.get, url, 200, results_only=True)

    def search_api(self, prefix, aid=None, subitem=None):
        """GET ``<prefix>``, ``<prefix>/<aid>`` or ``<prefix>/<aid>/<subitem>``.

        Only the bare ``<prefix>`` form unwraps the ``results`` entry.
        """
        if prefix and aid and subitem:
            url = '%s/%s/%s/%s' % (self.base_uri, prefix, aid, subitem)
        elif prefix and aid:
            url = '%s/%s/%s' % (self.base_uri, prefix, aid)
        else:
            url = '%s/%s' % (self.base_uri, prefix)
        return self._call(requests.get, url, 200, results_only=not (prefix and aid))

    def create_api(self, prefix, params=None):
        """POST ``params`` (JSON-encoded) to ``<prefix>``; success is 201."""
        url = '%s/%s' % (self.base_uri, prefix)
        return self._call(requests.post, url, 201, has_body=True, params=params)

    def update_api(self, prefix, aid=None, params=None):
        """PUT ``params`` (JSON-encoded) to ``<prefix>/<aid>``; success is 200."""
        url = '%s/%s/%s' % (self.base_uri, prefix, aid)
        return self._call(requests.put, url, 200, has_body=True, params=params)

    def delete_api(self, prefix, aid=None):
        """DELETE ``<prefix>/<aid>``; success is 200."""
        url = '%s/%s/%s' % (self.base_uri, prefix, aid)
        return self._call(requests.delete, url, 200)
View Code

3. Provision API

#!/usr/bin/env python
# _*_ coding:utf-8 _*_

import json
import requests
from libs.utils.tools import read_file_to_json
from libs.utils.config_parsers import get_config

# Suppress the urllib3 warnings raised by the requests in this module, which
# are sent with verify=False.
requests.packages.urllib3.disable_warnings()

# Project configuration; the token helpers below read its ('global', 'base_dir') entry.
config = get_config()


def get_v2api_token(host, uname, passwd, proxy_host, proxy_port):
    """Request a token from ``https://<host>/v2/tokens`` with user credentials.

    The request body is the JSON template ``<base_dir>/etc/token.json`` with
    the ``auth.passwordCredentials`` user name and password filled in.

    :param host: host serving the ``/v2/tokens`` endpoint.
    :param uname: user name; when empty no request is sent.
    :param passwd: password; when empty no request is sent.
    :param proxy_host: host of the HTTPS proxy.
    :param proxy_port: port of the HTTPS proxy.
    :returns: dict with ``exitcode`` (HTTP status, or 400 when credentials are
        missing), ``retval`` (decoded body, or ``[]``), ``token`` and
        ``tenant`` (ids taken from ``access.token``, or ``''``).
    """
    ret = {'retval': [], 'token': '', 'tenant': ''}

    # Check the credentials first so a call that cannot succeed does not read
    # the template file or touch the network.
    if not (uname and passwd):
        ret['exitcode'] = 400
        return ret

    timeout = 10
    base_uri = "https://%s/v2/tokens" % host
    headers = {'Content-type': 'application/json'}
    proxy = {'https': 'https://{0}:{1}'.format(proxy_host, proxy_port)}
    json_file = '%s/etc/token.json' % config.get('global', 'base_dir')
    # NOTE(review): assumes the template loads to a dict that already holds
    # auth -> passwordCredentials; confirm against read_file_to_json.
    params = read_file_to_json(json_file)
    params['auth']['passwordCredentials']['username'] = uname
    params['auth']['passwordCredentials']['password'] = passwd

    token_request = requests.post(base_uri, verify=False, headers=headers,
                                  data=json.dumps(params),
                                  timeout=timeout,
                                  proxies=proxy)
    ret['exitcode'] = token_request.status_code
    if ret['exitcode'] == 200:
        ret['retval'] = json.loads(token_request.text)
        issued = ret['retval']['access']['token']
        ret['token'] = issued['id']
        ret['tenant'] = issued['tenant']['id']

    return ret


def get_keystone_token(host, port, proxy_host, proxy_port):
    """Request a token from ``https://<host>:<port>/v3/auth/tokens``.

    The request body is the JSON file ``<base_dir>/etc/keystone.json``, sent
    unchanged.  The request is made directly, not through the proxy.

    :param host: host serving the ``/v3/auth/tokens`` endpoint.
    :param port: port of that endpoint.
    :param proxy_host: accepted for interface compatibility; currently unused.
    :param proxy_port: accepted for interface compatibility; currently unused.
    :returns: the ``X-Subject-Token`` response header when the status is 201,
        otherwise ``''``.
    """
    timeout = 20
    base_uri = "https://%s:%s/v3/auth/tokens" % (host, port)
    headers = {'Content-type': 'application/json'}
    json_file = '%s/etc/keystone.json' % config.get('global', 'base_dir')
    params = read_file_to_json(json_file)

    token_request = requests.post(base_uri, verify=False, headers=headers,
                                  data=json.dumps(params), timeout=timeout)
    if token_request.status_code != 201:
        return ''

    # A 201 reply without the header is treated like any other failure
    # instead of raising KeyError.
    return token_request.headers.get('X-Subject-Token', '')


class EsAPI(object):
    """Thin client for a token-authenticated REST service under ``/v2``.

    The token is sent as the ``X-Auth-Token`` header.  All requests go through
    the configured HTTPS proxy with certificate verification disabled and a
    30 second timeout.  Every public method returns a dict holding
    ``exitcode`` (the HTTP status code); all except ``update_api`` also hold
    ``retval`` (the decoded JSON body when the status is 200, otherwise the
    raw response text).
    """

    def __init__(self, host, token, proxy_host, proxy_port):
        """Store connection settings; no request is sent here.

        :param host: host name used to build ``https://<host>/v2``.
        :param token: value of the ``X-Auth-Token`` header.
        :param proxy_host: host of the HTTPS proxy.
        :param proxy_port: port of the HTTPS proxy.
        """
        self.base_uri = "https://%s/v2" % host
        self.headers = {'Content-type': 'application/json', 'X-Auth-Token': token}
        self.proxy = {'https': 'https://{0}:{1}'.format(proxy_host, proxy_port)}
        self.timeout = 30

    def _build_url(self, prefix, aid=None, subitem=None):
        """Join the base URI with ``prefix`` and the optional id / sub-item.

        ``subitem`` is only used when ``aid`` is given, and ``aid`` only when
        ``prefix`` is truthy; otherwise the URL is just ``<base>/<prefix>``.
        """
        if prefix and aid and subitem:
            return '%s/%s/%s/%s' % (self.base_uri, prefix, aid, subitem)
        if prefix and aid:
            return '%s/%s/%s' % (self.base_uri, prefix, aid)
        return '%s/%s' % (self.base_uri, prefix)

    def _options(self):
        """Return the keyword arguments shared by every request."""
        return {
            'verify': False,
            'headers': self.headers,
            # Without a timeout requests may block forever on a dead peer.
            'timeout': self.timeout,
            'proxies': self.proxy,
        }

    @staticmethod
    def _result(response):
        """Build the result dict: JSON on status 200, raw text otherwise."""
        if response.status_code == 200:
            body = json.loads(response.text)
        else:
            body = response.text
        return {'exitcode': response.status_code, 'retval': body}

    def query_api(self, prefix, aid=None):
        """GET ``<prefix>`` or ``<prefix>/<aid>``."""
        return self._result(requests.get(self._build_url(prefix, aid), **self._options()))

    def search_api(self, prefix, aid=None, subitem=None):
        """GET ``<prefix>``, ``<prefix>/<aid>`` or ``<prefix>/<aid>/<subitem>``."""
        url = self._build_url(prefix, aid, subitem)
        return self._result(requests.get(url, **self._options()))

    def create_api(self, prefix, params=None):
        """POST ``params`` (JSON-encoded) to ``<prefix>``."""
        url = '%s/%s' % (self.base_uri, prefix)
        return self._result(requests.post(url, data=json.dumps(params), **self._options()))

    def update_api(self, prefix, aid=None, params=None):
        """PUT ``params`` (JSON-encoded) to ``<prefix>/<aid>``.

        :returns: ``{'exitcode': status}``; the response body is not read.
        """
        url = '%s/%s/%s' % (self.base_uri, prefix, aid)
        response = requests.put(url, data=json.dumps(params), **self._options())
        return {'exitcode': response.status_code}

    def delete_api(self, prefix, aid=None):
        """DELETE ``<prefix>/<aid>``."""
        url = '%s/%s/%s' % (self.base_uri, prefix, aid)
        return self._result(requests.delete(url, **self._options()))


class GlanceAPI(object):
    """Thin client for a token-authenticated REST service at ``<host>:<port>/v2``.

    The token is sent as the ``X-Auth-Token`` header, certificate verification
    is disabled and every request uses a 30 second timeout.  Every public
    method returns a dict holding ``exitcode`` (the HTTP status code); all
    except ``update_api`` also hold ``retval`` (the decoded JSON body when the
    status is 200, otherwise the raw response text).
    """

    def __init__(self, host, port, token, proxy_host, proxy_port):
        """Store connection settings; no request is sent here.

        :param host: host name of the service.
        :param port: port of the service.
        :param token: value of the ``X-Auth-Token`` header.
        :param proxy_host: host of the HTTPS proxy.
        :param proxy_port: port of the HTTPS proxy.
        """
        self.base_uri = "https://%s:%s/v2" % (host, port)
        self.headers = {'Content-type': 'application/json', 'X-Auth-Token': token}
        self.proxy = {'https': 'https://{0}:{1}'.format(proxy_host, proxy_port)}
        self.timeout = 30

    def _build_url(self, prefix, aid=None, subitem=None):
        """Join the base URI with ``prefix`` and the optional id / sub-item.

        ``subitem`` is only used when ``aid`` is given, and ``aid`` only when
        ``prefix`` is truthy; otherwise the URL is just ``<base>/<prefix>``.
        """
        if prefix and aid and subitem:
            return '%s/%s/%s/%s' % (self.base_uri, prefix, aid, subitem)
        if prefix and aid:
            return '%s/%s/%s' % (self.base_uri, prefix, aid)
        return '%s/%s' % (self.base_uri, prefix)

    def _options(self, use_proxy=False):
        """Return the request keyword arguments, optionally with the proxy."""
        options = {
            'verify': False,
            'headers': self.headers,
            # Without a timeout requests may block forever on a dead peer.
            'timeout': self.timeout,
        }
        if use_proxy:
            options['proxies'] = self.proxy
        return options

    @staticmethod
    def _result(response):
        """Build the result dict: JSON on status 200, raw text otherwise."""
        if response.status_code == 200:
            body = json.loads(response.text)
        else:
            body = response.text
        return {'exitcode': response.status_code, 'retval': body}

    def query_api(self, prefix, aid=None):
        """GET ``<prefix>`` or ``<prefix>/<aid>``."""
        # NOTE(review): the by-id lookup is the only call in this class that
        # is routed through the proxy; every other call connects directly.
        # Kept as found - confirm which of the two is intended.
        by_id = bool(prefix and aid)
        response = requests.get(self._build_url(prefix, aid), **self._options(use_proxy=by_id))
        return self._result(response)

    def search_api(self, prefix, aid=None, subitem=None):
        """GET ``<prefix>``, ``<prefix>/<aid>`` or ``<prefix>/<aid>/<subitem>``."""
        url = self._build_url(prefix, aid, subitem)
        return self._result(requests.get(url, **self._options()))

    def create_api(self, prefix, params=None):
        """POST ``params`` (JSON-encoded) to ``<prefix>``."""
        url = '%s/%s' % (self.base_uri, prefix)
        return self._result(requests.post(url, data=json.dumps(params), **self._options()))

    def update_api(self, prefix, aid=None, params=None):
        """PUT ``params`` (JSON-encoded) to ``<prefix>/<aid>``.

        :returns: ``{'exitcode': status}``; the response body is not read.
        """
        url = '%s/%s/%s' % (self.base_uri, prefix, aid)
        response = requests.put(url, data=json.dumps(params), **self._options())
        return {'exitcode': response.status_code}

    def delete_api(self, prefix, aid=None):
        """DELETE ``<prefix>/<aid>``."""
        url = '%s/%s/%s' % (self.base_uri, prefix, aid)
        return self._result(requests.delete(url, **self._options()))
View Code

4. Qualys API

#!/usr/bin/env python
# _*_ coding:utf-8 _*_

import json
import requests
import xml.etree.ElementTree as et

# Suppress the urllib3 warnings that requests would otherwise raise while
# contacting the Qualys API.
requests.packages.urllib3.disable_warnings()


class QualysAPI(object):
    """Session-based client for the Qualys API (``/api/2.0/fo``).

    All calls are form-encoded POSTs made on one ``requests.Session`` through
    the configured HTTPS proxy.  Every public method prints and returns a
    dict with:

    * ``exitcode`` - the HTTP status code, or 400 when an exception occurred;
    * ``retval``   - on status 200 the value extracted from the reply (``''``
      when the expected element is absent), on any other status the raw
      response text, and on an exception the exception object.
    """

    def __init__(self, host, username, password, proxy_host, proxy_port):
        """Store connection settings and open a ``requests.Session``.

        :param host: host name used to build ``https://<host>/api/2.0/fo``.
        :param username: account name sent by ``login``.
        :param password: account password sent by ``login``.
        :param proxy_host: host of the HTTPS proxy.
        :param proxy_port: port of the HTTPS proxy.
        """
        self.base_uri = "https://%s/api/2.0/fo" % host
        self.headers = {'X-Requested-With': 'VPC Qualys API'}
        self.auth = (username, password)
        self.username = username
        self.password = password
        self.proxy = {'https': 'https://{0}:{1}'.format(proxy_host, proxy_port)}
        self.timeout = 30
        self.session = requests.Session()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _post(self, path, payload, extract):
        """POST ``payload`` to ``<base_uri>/<path>`` and build the result dict.

        ``extract`` is called with the response only when the status is 200;
        its return value becomes ``retval``.  Any exception raised while
        sending or extracting is captured as ``exitcode`` 400.
        """
        retdata = dict()
        url = '%s/%s' % (self.base_uri, path)
        try:
            # The timeout keeps a stalled connection from blocking forever.
            r = self.session.post(url, headers=self.headers, data=payload,
                                  proxies=self.proxy, timeout=self.timeout)
            retdata['exitcode'] = r.status_code
            if r.status_code == 200:
                retdata['retval'] = extract(r)
            else:
                retdata['retval'] = r.text
        except Exception as e:
            retdata['exitcode'] = 400
            retdata['retval'] = e
        return retdata

    @staticmethod
    def _report(retdata):
        """Print the result dict and hand it back."""
        print(retdata)
        return retdata

    @staticmethod
    def _last_text(xml_text, path):
        """Return the text of the last element matching ``path`` ('' if none)."""
        found = ''
        for elem in et.fromstring(xml_text).findall(path):
            found = elem.text
        return found

    @staticmethod
    def _first_child_text(xml_text, path):
        """Return the first child's text of the last ``path`` match ('' if none)."""
        found = ''
        for elem in et.fromstring(xml_text).findall(path):
            found = elem[0].text
        return found

    @staticmethod
    def _item_value(xml_text, key):
        """Return the second child's text of the last ITEM whose first child is ``key``."""
        value = ''
        for elem in et.fromstring(xml_text).findall('.//ITEM'):
            if elem[0].text == key:
                value = elem[1].text
        return value

    @staticmethod
    def _policy_id(xml_text, os_type):
        """Return the id of the last production baseline policy for ``os_type``.

        A POLICY matches when its second child's text contains both
        ``[+ PROD]`` and ``DXC Baseline <os_type>``; the id is its first child.
        """
        wanted = 'DXC Baseline %s' % os_type
        policy_id = ''
        for elem in et.fromstring(xml_text).findall('.//POLICY'):
            title = elem[1].text
            if '[+ PROD]' in title and wanted in title:
                policy_id = elem[0].text
        return policy_id

    @staticmethod
    def _save(response, filename, message):
        """Write the response body to ``filename`` and return ``message``."""
        with open(filename, "wb") as report:
            report.write(response.content)
        return message

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def login(self):
        """Log on to the Qualys API server; ``retval`` is the reply's TEXT."""
        payload = {
            'action': 'login',
            'username': self.username,
            'password': self.password
        }
        print('Web URI: %s' % ('%s/%s' % (self.base_uri, 'session/')))
        return self._report(self._post(
            'session/', payload, lambda r: self._last_text(r.text, './/TEXT')))

    def logout(self):
        """Log out, then close the session; ``retval`` is the reply's TEXT."""
        payload = {
            'action': 'logout'
        }
        retdata = self._post(
            'session/', payload, lambda r: self._last_text(r.text, './/TEXT'))
        self.session.close()
        return self._report(retdata)

    def launch_scan(self, action, ip_address, scan_title, option_id, iscanner_name):
        """Launch a compliance scan (``action == 'compliance'``) or a plain scan.

        ``retval`` is the value of the reply's REFERENCE item.
        """
        payload = {
            'action': 'launch',
            'ip': ip_address,
            'iscanner_name': iscanner_name,
            'option_id': option_id,
            'scan_title': scan_title,
        }
        path = 'scan/compliance/' if action == 'compliance' else 'scan/'
        return self._report(self._post(
            path, payload, lambda r: self._item_value(r.text, 'REFERENCE')))

    def query_scan_status(self, action, scan_ref):
        """Query the status of scan ``scan_ref``; ``retval`` is the status text."""
        payload = {
            'action': 'list',
            'scan_ref': scan_ref,
        }
        path = 'scan/compliance/' if action == 'compliance' else 'scan/'
        return self._report(self._post(
            path, payload, lambda r: self._first_child_text(r.text, './/STATUS')))

    def download_vul_report(self, scan_ref, filename):
        """Fetch the ``csv_extended`` result of ``scan_ref`` into ``filename``."""
        payload = {
            'action': 'fetch',
            'scan_ref': scan_ref,
            'output_format': 'csv_extended'
        }
        return self._report(self._post(
            'scan/', payload,
            lambda r: self._save(r, filename, 'Download Vulnerability Scan Report Success')))

    def parse_policy(self, os_type):
        """List compliance policies; ``retval`` is the matching policy id."""
        payload = {
            'action': 'list',
        }
        return self._report(self._post(
            'compliance/policy/', payload, lambda r: self._policy_id(r.text, os_type)))

    def launch_report(self, template_id, report_title, ip_address, policy_id):
        """Launch a CSV policy report; ``retval`` is the value of the ID item."""
        payload = {
            'action': 'launch',
            'report_type': 'Policy',
            'template_id': template_id,
            'output_format': 'csv',
            'report_title': report_title,
            'policy_id': policy_id,
            'ips': ip_address
        }
        return self._report(self._post(
            'report/', payload, lambda r: self._item_value(r.text, 'ID')))

    def query_report_status(self, report_id):
        """Query the status of ``report_id``; ``retval`` is the status text."""
        payload = {
            'action': 'list',
            'id': report_id,
        }
        return self._report(self._post(
            'report/', payload, lambda r: self._first_child_text(r.text, './/STATUS')))

    def download_report(self, report_id, filename):
        """Fetch report ``report_id`` and write it to ``filename``."""
        payload = {
            'action': 'fetch',
            'id': report_id,
        }
        return self._report(self._post(
            'report/', payload,
            lambda r: self._save(r, filename, 'Download Compliance Scan Report Success')))
View Code

5. Confluence API

#!/usr/bin/env python
# _*_ coding:utf-8 _*_

import json
import requests

# Silence the urllib3 warnings raised because the requests below are sent with verify=False
requests.packages.urllib3.disable_warnings()
# Content id interpolated into the probe URL that validate_api requests
media_space = \'131269718\'

def validate_api(conf_host, uname, passwd, proxy_host, proxy_port):
    """Probe a Confluence host by listing the child pages of ``media_space``.

    Sends one GET to ``https://<conf_host>/rest/api/content/<media_space>/child/page``
    with ``(uname, passwd)`` as auth, certificate verification disabled and a
    10 second timeout. The request goes through an HTTPS proxy only when both
    *proxy_host* and *proxy_port* are given.

    :param conf_host: Confluence host name.
    :param uname: user name.
    :param passwd: password.
    :param proxy_host: proxy host, or a falsy value for no explicit proxy.
    :param proxy_port: proxy port, or a falsy value for no explicit proxy.
    :return: dict with ``exitcode`` (HTTP status, or 400 when host or
        credentials are missing) and ``retval`` (decoded JSON body on HTTP
        200, otherwise an empty list).
    """
    # Nothing is sent unless host and both credentials are present.
    if not (conf_host and uname and passwd):
        return {"exitcode": 400, "retval": []}

    request_options = {
        "verify": False,
        "auth": (uname, passwd),
        "headers": {"Content-type": "application/json"},
        "timeout": 10,
    }
    if proxy_host and proxy_port:
        request_options["proxies"] = {"https": "https://{0}:{1}".format(proxy_host, proxy_port)}

    target = "https://%s/rest/api/content/%s/child/page" % (conf_host, media_space)
    response = requests.get(target, **request_options)

    status = response.status_code
    body = json.loads(response.text) if status == 200 else []
    return {"exitcode": status, "retval": body}


def read_file_to_json(mapping_file):
    """Load the JSON document stored in *mapping_file*.

    :param mapping_file: path of the JSON file to read.
    :return: the decoded JSON value, or ``None`` when the file cannot be
        opened, read or decoded. A file containing the JSON literal ``null``
        also yields ``None``.
    """
    try:
        with open(mapping_file) as data_file:
            return json.load(data_file)
    except (IOError, OSError, ValueError, TypeError):
        # IOError/OSError: missing or unreadable file.
        # ValueError: malformed JSON or undecodable bytes.
        # TypeError: a path of an unsupported type such as None.
        # Anything else is a programming error and is allowed to surface.
        return None


class ConfluenceAPI(object):
    """Small client for a Confluence REST API rooted at ``https://<host>/rest/api``.

    Every public method returns a dict with two keys:

    * ``exitcode`` - the HTTP status code, or 400 when the call failed before
      a response was received.
    * ``retval`` - the decoded JSON body, the raw body text, or the error
      message when sending or decoding raised.

    All requests are sent with certificate verification disabled
    (``verify=False``) and with ``self.timeout`` as the timeout.
    """

    def __init__(self, host, username, password, proxy_host, proxy_port):
        """Store connection settings; no request is made here.

        :param host: Confluence host name.
        :param username: user name passed as the first auth element.
        :param password: password passed as the second auth element.
        :param proxy_host: HTTPS proxy host, or a falsy value for no explicit proxy.
        :param proxy_port: HTTPS proxy port, or a falsy value for no explicit proxy.
        """
        self.base_uri = "https://%s/rest/api" % host
        self.headers = {"Content-type": "application/json"}
        self.auth = (username, password)
        # Configure a proxy only when both parts are present; formatting
        # missing values would yield the unusable URL https://None:None.
        if proxy_host and proxy_port:
            self.proxy = {"https": "https://{0}:{1}".format(proxy_host, proxy_port)}
        else:
            self.proxy = None
        self.timeout = 30

    def _url(self, *segments):
        """Join ``base_uri`` and *segments* with single slashes."""
        url = self.base_uri
        for segment in segments:
            url = "%s/%s" % (url, segment)
        return url

    def _send(self, method, url, parse_only_ok=False, has_body=False, params=None):
        """Send one request and fold the outcome into the result dict.

        :param method: HTTP method name handed to ``requests.request``.
        :param url: absolute URL to call.
        :param parse_only_ok: when true, decode the body as JSON only on HTTP
            200 and return the raw text otherwise; when false, decode every
            non-empty body and return an empty body unchanged.
        :param has_body: when true, send ``json.dumps(params)`` as the body.
        :param params: JSON-serialisable payload used when *has_body* is true.
        :return: dict with ``exitcode`` and ``retval`` as described on the class.
        """
        ret = {}
        try:
            options = {
                "verify": False,
                "auth": self.auth,
                "headers": self.headers,
                "proxies": self.proxy,
                "timeout": self.timeout,
            }
            if has_body:
                # Serialised inside the try so an unserialisable payload is
                # reported through retval instead of raising to the caller.
                options["data"] = json.dumps(params)
            response = requests.request(method, url, **options)
            ret["exitcode"] = response.status_code
            text = response.text
            if parse_only_ok:
                should_parse = response.status_code == 200
            else:
                # Empty bodies (for example 204 No Content) are not JSON.
                should_parse = bool(text)
            ret["retval"] = json.loads(text) if should_parse else text
        except Exception as e:
            # Deliberately broad: this is the boundary that turns any
            # transport or decoding failure into a result dict. The status
            # code is kept when a response arrived; otherwise 400 is used.
            ret.setdefault("exitcode", 400)
            ret["retval"] = str(e)
        return ret

    def query_api(self, prefix, aid=None):
        """GET ``<base>/<prefix>/<aid>``, or ``<base>/<prefix>`` when *aid* is falsy.

        :return: result dict; the body is JSON-decoded only on HTTP 200.
        """
        if prefix and aid:
            url = self._url(prefix, aid)
        else:
            url = self._url(prefix)
        return self._send("get", url, parse_only_ok=True)

    def search_api(self, prefix, aid=None, subitem=None):
        """GET the deepest path for which all leading parts are truthy.

        ``<base>/<prefix>/<aid>/<subitem>`` when all three are given,
        ``<base>/<prefix>/<aid>`` when only *prefix* and *aid* are, and
        ``<base>/<prefix>`` otherwise.

        :return: result dict; the body is JSON-decoded only on HTTP 200.
        """
        if prefix and aid and subitem:
            url = self._url(prefix, aid, subitem)
        elif prefix and aid:
            url = self._url(prefix, aid)
        else:
            url = self._url(prefix)
        return self._send("get", url, parse_only_ok=True)

    def create_api(self, prefix, params=None):
        """POST *params* as JSON to ``<base>/<prefix>``.

        :return: result dict; a non-empty body is JSON-decoded, an empty one
            is returned unchanged.
        """
        return self._send("post", self._url(prefix), has_body=True, params=params)

    def update_api(self, prefix, aid=None, params=None):
        """PUT *params* as JSON to ``<base>/<prefix>/<aid>``.

        :return: result dict; a non-empty body is JSON-decoded, an empty one
            is returned unchanged.
        """
        return self._send("put", self._url(prefix, aid), has_body=True, params=params)

    def delete_api(self, prefix, aid=None):
        """DELETE ``<base>/<prefix>/<aid>``.

        :return: result dict; a non-empty body is JSON-decoded, an empty one
            is returned unchanged.
        """
        return self._send("delete", self._url(prefix, aid))


if __name__ == "__main__":
    # Placeholder connection settings; replace them before running the demo.
    conf_host = "confluence.xxx.com"
    conf_uname = "uname"
    conf_pass = "pass"
    proxy_host = "x.x.x.x"
    proxy_port = "8080"

    # Step 1: probe the host with the module-level helper.
    data = validate_api(conf_host, conf_uname, conf_pass, proxy_host, proxy_port)
    print(data)

    print("=================================")

    # Step 2: fetch one content item through the client class.
    conf_api = ConfluenceAPI(conf_host, conf_uname, conf_pass, proxy_host, proxy_port)
    get_data = conf_api.query_api("content", "15234")
    print(get_data)
View Code

二、API编写

分类:

技术点:

相关文章: