sss4

前言

本文主要记录了一些比较实用的运维自动化小工具

  

Oauth2.0认证(第三方授权登录)

 

1.Oauth2.0协议是什么?

我去逛慕课网 如果不注册账号,我还可以用QQ登录,我在慕课网点击QQ登录,然后页面弹出QQ的授权界面,我点击授权慕课网就可以拿到我的个人信息,头像信息,完成快速登录;省去了用户注册、输入密码的时间;

 

 Oauth2.0验证流程分析

 

 

def get_auth_url():
    weibo_authurl=\'https://api.weibo.com/oauth2/authorize\'
    client_id=439057412
    redirect_uri=\'http://127.0.0.01:8001/comepele/weibo/\'
    auth_url=\'%s?client_id=%s&redirect_uri=%s\'%(weibo_authurl,client_id,redirect_uri)
    return auth_url


print(get_auth_url())
我的web应用授权url
def weibo_auth (request):  #1.用户访问进入该视图https://api.weibo.com/oauth2/authorize?client_id=439057412&redirect_uri=http://127.0.0.01:8001/comepele/weibo/
    code=request.GET.get(\'code\')
    if code:
        access_tocken_url=\'https://api.weibo.com/oauth2/access_token\'
        data={"client_id":439057412,
              "client_secret":\'d2b81e254e0dbf705f53d98ee5fe0fc9\',
              \'grant_type\':\'authorization_code\',
              \'code\':code,
               \'redirect_uri\':\'http://127.0.0.01:8001/comepele/weibo/\'
               }

        #{\'access_token\': \'2.009tC8pF0KtOiT2e83c56652wB8j3B\', \'remind_in\': \'157679999\', \'expires_in\': 157679999, \'uid\': \'5340703278\', \'isRealName\': \'true\'}
        response_info=requests.post(url=access_tocken_url,data=data)
        access_token=json.loads(response_info.text).get(\'access_token\')
        uid=json.loads(response_info.text).get(\'uid\')
        #https://api.weibo.com/2/users/show.json #微博获取用户信息接口
        get_user_info_api=\'https://api.weibo.com/2/users/show.json?access_token={0}&uid={1}\'.format(access_token,uid)
        response_user_info=requests.get(url=get_user_info_api)
        print(response_user_info.text)




    return HttpResponse(\'OK\')
python实现Oauth2.0协议
import requests
import json,datetime

def session_permission(request,obj,models,init_permission):
    if obj:
        username=obj.username
        request.session[\'username\'] = username
        user_obj = models.UserInfo.objects.filter(username=username).first()
        imgsrc = str(user_obj.image)
        request.session[\'Avatar\'] = imgsrc

        work_order_seach = Q(Q(initiator__icontains=username) | Q(agent__icontains=username))
        ret = models.Worker_order.objects.filter(work_order_seach).count()
        request.session[\'Wcount\'] = ret  # 和当前用户相关的工单数量

        work_order_seach1 = Q(Q(status=0) & Q(agent__icontains=username))
        ret1 = models.Worker_order.objects.filter(work_order_seach1).values_list("seach", "alarm_time", "initiator",
                                                                                 "title", "agent", "responsible_person",
                                                                                 "status", "fault_category",
                                                                                 )
        request.session[\'Wuntreated_count\'] = ret1.count()  # 当前用户待处理工单数量

        enddate_list = models.Worker_order.objects.filter(agent=username).filter(level__in=[1, 2]).values_list(
            \'enddate\')
        time_out_count = 0
        for date in enddate_list:
            enddate = datetime.datetime.strptime(date[0], \'%Y年%m月%d日%H时%M分%S秒\')
            if enddate < datetime.datetime.now():
                time_out_count += 1
        else:
            request.session[\'Work_list_timeout_Count\'] = time_out_count  # 当前用户超时工单数量

        department_obj = obj.department.all().first()
        # print(department_obj.title)
        ret2 = models.ZabbixMsg.objects.filter(department=department_obj.title).all()

        request.session[\'ZabbixMsg_Count\'] = ret2.count()  # 当前用户部门的zabbix MSG的总数量
        request.session[\'ZabbixMsg_Off-line_Count\'] = ret2.filter(available=0).count()  # 当前用户部门的zabbix MSG的离线数量

        if department_obj.title == \'OPS\':
            ret2 = models.ZabbixMsg.objects.all().count()
            request.session[\'ZabbixMsg_Count\'] = ret2

            ret3 = models.ZabbixMsg.objects.all().filter(available=0).count()
            request.session[\'ZabbixMsg_Off-line_Count\'] = ret3

        request.session[\'user_info\'] = {"id": models.UserInfo.objects.get(username=username).id,
                                        "username": username,
                                        "image": models.UserInfo.objects.get(username=username).image.url}
        department_list = [d.title for d in obj.department.all()]
        department = department_list[0]
        request.session[\'departments\'] = department_list
        init_permission(obj.auth, department, request)
        return True
    else:
        return False





#用户首先访问,https://api.weibo.com/oauth2/authorize?client_id=3863976085&redirect_uri=http://127.0.0.1:8000/test/


class Oauth2_App_Info(object):  #生成APP信息
    sina_oauth_setings={
        \'oauth_urls\' :
                 {
                 \'access_tocken_url\': \'https://api.weibo.com/oauth2/access_token\',
                    \'user_info_api\': \'https://api.weibo.com/2/users/show.json\',
                },
        \'oauth_apps\':
                {
                \'/login/\': {\'app_key\':439057412,#439057412
                \'callback_uri\':\'http://172.17.10.112:8001/login/\',
                \'secret_key\':\'d2b81e254e0dbf705f53d98ee5fe0fc9\',
                             },
                \'/oauth2/sina/\':  {\'app_key\':334415239,
                \'callback_uri\':\'http://172.17.10.112:8001/oauth2/sina/\',
                \'secret_key\':\'cfee03f62a9c5ccd4f10620fa06be3c6\',
                             },

                \'/test/\': {\'app_key\':3863976085,
                                  \'callback_uri\': \'http://127.0.0.1:8000/test/\',
                                  \'secret_key\': \'213ebf44c86bc013aba5d3cf2ea47820\',
                                  },

                           }

        }
    def __init__(self,request,oauth2_type=\'sina_oauth_setings\'):
        self.request=request
        self.seting_dict=getattr(self,oauth2_type)
        self.app_info=self.seting_dict[\'oauth_apps\'][self.request.path_info]
        self.oauth_urls_info=self.seting_dict[\'oauth_urls\']
        self.app_key =self.app_info[\'app_key\']
        self.callback_uri = self.app_info[\'callback_uri\']
        self.secret_key = self.app_info[\'secret_key\']
        self.code=self.request.GET.get(\'code\')
        self.access_uri=self.oauth_urls_info[\'access_tocken_url\']
        self.user_info_api = self.oauth_urls_info[\'user_info_api\']



class Oauth2(object): #Oauth逻辑
    def __init__(self,Oauth2_App):
        self.outh2_app=Oauth2_App

    def get_tocken(self):
        data = {"client_id": self.outh2_app.app_key,
                "client_secret": self.outh2_app.secret_key,
                \'grant_type\': \'authorization_code\',
                \'code\':self.outh2_app.code,
                \'redirect_uri\':self.outh2_app.callback_uri
                }
        response_info = requests.post(url=self.outh2_app.access_uri,data=data)
        data= json.loads(response_info.text)
# {\'access_token\': \'2.009tC8pFvgqUNEf9bc678379M6CG3C\', \'remind_in\': \'157679999\', \'expires_in\': 157679999, \'uid\': \'5340703278\', \'isRealName\': \'true\'}
        return data


#{\'id\': 5340703278, \'idstr\': \'5340703278\', \'class\': 1,
    #  \'screen_name\': \'Martin--------\', \'name\': \'Martin--------\', \'province\': \'11\', \'city\': \'1\',
    # \'location\': \'北京 东城区\', \'description\': \'\',
    # \'url\': \'http://www.cnblogs.com/sss4/\',
    #  \'profile_image_url\': \'http://tvax3.sinaimg.cn/crop.0.0.400.400.50/005Pr2Tsly8fwtkd9o55jj30b40b4qbg.jpg\',
    #  \'profile_url\': \'u/5340703278\', \'domain\': \'\', \'weihao\': \'\', \'gender\': \'m\', \'followers_count\': 1, \'friends_count\': 3, \'pagefriends_count\': 0, \'statuses_count\': 0, \'video_status_count\': 0, \'favourites_count\': 0, \'created_at\': \'Mon Oct 20 20:31:53 +0800 2014\', \'following\': False, \'allow_all_act_msg\': False, \'geo_enabled\': True, \'verified\': False, \'verified_type\': -1, \'remark\': \'\', \'insecurity\': {\'sexual_content\': False}, \'ptype\': 0, \'allow_all_comment\': True, \'avatar_large\': \'http://tvax3.sinaimg.cn/crop.0.0.400.400.180/005Pr2Tsly8fwtkd9o55jj30b40b4qbg.jpg\', \'avatar_hd\': \'http://tvax3.sinaimg.cn/crop.0.0.400.400.1024/005Pr2Tsly8fwtkd9o55jj30b40b4qbg.jpg\', \'verified_reason\': \'\', \'verified_trade\': \'\', \'verified_reason_url\': \'\', \'verified_source\': \'\', \'verified_source_url\': \'\', \'follow_me\': False, \'like\': False, \'like_me\': False, \'online_status\': 0, \'bi_followers_count\': 0, \'lang\': \'zh-cn\', \'star\': 0, \'mbtype\': 0, \'mbrank\': 0, \'block_word\': 0,
    # \'block_app\': 0, \'credit_score\': 80, \'user_ability\': 0, \'urank\': 3, \'story_read_state\': -1, \'vclub_member\': 0}
    def get_user_info(self):
        \'\'\'
        user_info_api: \'https://api.weibo.com/2/users/show.json?access_token={0}&uid={1}\'.format(access_token,uid)
        \'\'\'
        tocken_data=self.get_tocken()
        access_token=tocken_data.get(\'access_token\')
        uid=tocken_data.get(\'uid\')
        param=\'?access_token={0}&uid={1}\'.format(access_token,uid)
        user_info_api=self.outh2_app.user_info_api+param
        response_user_info = requests.get(url=user_info_api)
        user_info = json.loads(response_user_info.text)
        return user_info

    def is_grant(self, models):
        sina_uid = self.get_tocken().get(\'uid\')
        obj = models.UserInfo.objects.filter(sina_blog_name=sina_uid).first()
        if obj:
            return obj
        return False

    def grant(self, models):
        curent_sina_user = self.get_user_info()
        sina_uid = curent_sina_user[\'idstr\']
        avatar_url = curent_sina_user[\'profile_image_url\']
        curent_user = self.outh2_app.request.session.get(\'username\')
        obj = models.UserInfo.objects.filter(username=curent_user).first()
        obj.sina_blog_name = sina_uid
        obj.image=avatar_url
        obj.save()
DjangoOauth2

 

1.用户GET 请求访问慕课网,点击了QQ登录,慕课网

2.腾讯QQ认证服务器response用户1个页面,询问用户是否授权?

3.假设用户给予授权,认证服务器将用户导向 客户端事先指定的"重定向URI"(redirection URI),同时附上一个授权码。

4.客户端收到授权码,附上早先的"重定向URI",向认证服务器申请令牌。这一步是在客户端的后台的服务器上完成的,对用户不可见。

5.认证服务器核对了授权码和重定向URI,确认无误后,向客户端发送访问令牌(access token)和更新令牌(refresh token)。

 

 

 

自动化运维工具Fabric

 

Fabric是什么?

Fabric是基于SSH协议和paramiko实现的快速自动化部署的运维工具python第三方模块

我们在大型企业里面需要批量部署一些 web服务、mysql服务....如果是服务器数量少可以1台1台部署,如果成千上万服务器就需要 更加快捷、高效的部署工具了!

 

Fabric的功能?

上传、下载远程服务器文件

执行远程命令

用户提示

错误处理

pip install fabric3  #python3安装fabric

 

使用Fabirc

使用fabric分为2个步骤:

1.创建fabfile.py文件是 fabric在执行时默认读取的文件,所以我们要在这个文件中编写执行的任务;

from fabric.api import *

hosts1 = \'root@172.17.10.112\'
# hosts2 = \'root@172.16.22.1\'

env.hosts=[hosts1]
env.password=\'xxxxxx1234\' #设置密码!

def hello():  #任务1
    run(\'echo hello world\')

def check(): #任务2
    run(\'ls /Users/\')

def remote(): #任务3
    run(\'ping www.baidu.com\')

 

 2.执行fabfile文件中的任务内容

D:\版本\cmdb_rbac_arya>fab -l
Available commands: check hello remote D:\版本\cmdb_rbac_arya
>fab remote [root@172.17.10.112] Executing task \'remote\' [root@172.17.10.112] run: ping www.baidu.com [root@172.17.10.112] out: PING www.a.shifen.com (220.181.112.244) 56(84) bytes of data. [root@172.17.10.112] out: 64 bytes from 220.181.112.244: icmp_seq=1 ttl=52 time=5.41 ms

 

 

Fabric使用参数

指定执行的主机

D:\版本\cmdb_rbac_arya>fab hello -H root@172.17.10.112:22
[root@172.17.10.112:22] Executing task \'hello\'
[root@172.17.10.112:22] run: echo hello world
[root@172.17.10.112:22] out: hello world
[root@172.17.10.112:22] out:


Done.
Disconnecting from root@172.17.10.112... done.

 

指定文件

D:\版本\cmdb_rbac_arya>fab -f zhanggen.py hello -H root@172.17.10.112:22
[root@172.17.10.112:22] Executing task \'hello\'
[root@172.17.10.112:22] run: echo hello world
[root@172.17.10.112:22] out: hello world
[root@172.17.10.112:22] out:


Done.
Disconnecting from root@172.17.10.112... done.

 

fab -p并行执行

[root@cmdb fabric]# fab -l
Available commands:

    hello
[root@cmdb fabric]# fab -p hello
Usage: fab [options] <command>[:arg1,arg2=val2,host=foo,hosts=\'h1;h2\',...] ...

Options:
  -h, --help            show this help message and exit
  -d NAME, --display=NAME
                        print detailed info about command NAME
  -F FORMAT, --list-format=FORMAT
                        formats --list, choices: short, normal, nested
  -I, --initial-password-prompt
                        Force password prompt up-front
  --initial-sudo-password-prompt
                        Force sudo password prompt up-front
  -l, --list            print list of possible commands and exit
  --set=KEY=VALUE,...   comma separated KEY=VALUE pairs to set Fab env vars
  --shortlist           alias for -F short --list
  -V, --version         show program\'s version number and exit
  -a, --no_agent        don\'t use the running SSH agent
  -A, --forward-agent   forward local agent to remote end
  --abort-on-prompts    abort instead of prompting (for password, host, etc)
  -c PATH, --config=PATH
                        specify location of config file to use
  --colorize-errors     Color error output
  -D, --disable-known-hosts
                        do not load user known_hosts file
  -e, --eagerly-disconnect
                        disconnect from hosts as soon as possible
  -f PATH, --fabfile=PATH
                        python module file to import, e.g. \'../other.py\'
  -g HOST, --gateway=HOST
                        gateway host to connect through
  --gss-auth            Use GSS-API authentication
  --gss-deleg           Delegate GSS-API client credentials or not
  --gss-kex             Perform GSS-API Key Exchange and user authentication
  --hide=LEVELS         comma-separated list of output levels to hide
  -H HOSTS, --hosts=HOSTS
                        comma-separated list of hosts to operate on
  -i PATH               path to SSH private key file. May be repeated.
  -k, --no-keys         don\'t load private key files from ~/.ssh/
  --keepalive=N         enables a keepalive every N seconds
  --linewise            print line-by-line instead of byte-by-byte
  -n M, --connection-attempts=M
                        make M attempts to connect before giving up
  --no-pty              do not use pseudo-terminal in run/sudo
  -p PASSWORD, --password=PASSWORD
                        password for use with authentication and/or sudo
  -P, --parallel        default to parallel execution method
  --port=PORT           SSH connection port
  -r, --reject-unknown-hosts
                        reject unknown hosts
  --sudo-password=SUDO_PASSWORD
                        password for use with sudo only
  --system-known-hosts=SYSTEM_KNOWN_HOSTS
                        load system known_hosts file before reading user
                        known_hosts
  -R ROLES, --roles=ROLES
                        comma-separated list of roles to operate on
  -s SHELL, --shell=SHELL
                        specify a new shell, defaults to \'/bin/bash -l -c\'
  --show=LEVELS         comma-separated list of output levels to show
  --skip-bad-hosts      skip over hosts that can\'t be reached
  --skip-unknown-tasks  skip over unknown tasks
  --ssh-config-path=PATH
                        Path to SSH config file
  -t N, --timeout=N     set connection timeout to N seconds
  -T N, --command-timeout=N
                        set remote command timeout to N seconds
  -u USER, --user=USER  username to use when connecting to remote hosts
  -w, --warn-only       warn, instead of abort, when commands fail
  -x HOSTS, --exclude-hosts=HOSTS
                        comma-separated list of hosts to exclude
  -z INT, --pool-size=INT
                        number of concurrent processes to use in parallel mode
[root@cmdb fabric]# fab -P hello
[root@172.17.10.112:22] Executing task \'hello\'
[root@172.17.10.112:22] run: ls
[root@172.17.10.112:22] out: anaconda-ks.cfg         ecdsa-0.11          nohup.out           root                模板
[root@172.17.10.112:22] out: ansible-2.6.2         ecdsa-0.11.tar.gz      paramiko-1.15.1       setuptools-7.0        视频
[root@172.17.10.112:22] out: ansible-2.6.2.tar.gz     install.log          paramiko-1.15.1.tar.gz   setuptools-7.0.tar.gz    图片
[root@172.17.10.112:22] out: ansible_api         install.log.syslog      pycrypto-2.6.1       simplejson-3.6.5        文档
[root@172.17.10.112:22] out: ansible-cmdb-1.17     Jinja2-2.7.3          pycrypto-2.6.1.tar.gz    simplejson-3.6.5.tar.gz  下载
[root@172.17.10.112:22] out: ansible-cmdb-1.17.tar     Jinja2-2.7.3.tar.gz      pycrypto-2.6.1.tar.gz.1  untitled folder        音乐
[root@172.17.10.112:22] out: cmdb_rbac_arya         MarkupSafe-0.9.3      Python-3.6.1           v1.7.2.tar.gz        桌面
[root@172.17.10.112:22] out: cmdb_rbac_arya.4_28.bak  MarkupSafe-0.9.3.tar.gz  Python-3.6.1.tgz       web.sql
[root@172.17.10.112:22] out: cmdb_rbac_arya.51.bak     node-v0.10.30          PyYAML-3.11           zhanggen.txt
[root@172.17.10.112:22] out: cmdb_rbac_arya.bak     node-v0.10.30.tar.gz      PyYAML-3.11.tar.gz       公共的
[root@172.17.10.112:22] out: 


Done.
[root@cmdb fabric]# 

 

 

 

Fabric常用AIP

def hello():  #任务1
    lcd(path=\'/\')                #切换本地目录
    cd(\'/\')                      #切换远程目录
    local(command=\'fuck\')        #执行本地命令
    run(\'echo hello world\')      #执行远程命令
    sudo(\'rm -rf /\')             #执行远程sudo命令

 

Fabric错误处理 try 

判断

def hello():  #任务1
    with cd(\'/sb\'):#当cd(\'/sb\')执行成功之后!
        run(\'ls\')   #执行(\'ls\')

捕捉异常

def hello():  #任务1
    with settings(warn_only=True): #忽略  cd(\'/SB\')
        cd(\'/SB\')
    run(\'ls\')#执行run(\'ls\')

 

Fabric的装饰器

from fabric.api import *

@task
def nginx():
    run(\'hostname\')
nginx.py
from nginx import nginx
#@hosts(hosts1) #指定被装饰的任务函数在哪台服务器上运行
#@parallel          #强制被装饰的函数并行执行(只能在Linux执行)
#@serial             #强制串行执行
#@roles(\'db\')      #指定执行的角色!
@task()              #导入其他模块中的任务(定义新任务)
def hello():  #任务
    run(\'ls\')#执行run(\'ls\')
from fabric.api import *  #加载fabric模块
hosts1 = \'root@172.17.10.112:22\'
hosts2 = \'root@192.168.1.18:22\'

env.hosts=[hosts1,hosts2]
#env.password=\'xxxxxx1234\' #全局设置密码!
env.passwords={
    hosts1:\'xxxxxx1234\',
    hosts2:\'xxxxxx123\'}


## @hosts(hosts1) #指定被装饰的任务函数在哪台服务器上运行
#@parallel      #强制被装饰的函数并行执行(只能在Linux执行)
#@serial         #强制串行执行
#@roles(\'db\')     #指定执行的角色!

          #导入其他模块中的任务(定义新任务)
def hello():  #任务
    run(\'ls\')#执行run(\'ls\')
设置不同密码登录

 

 

 

 

覆盖重写Django的auth_user表

1.用户表

创建Django项目可以把APP全部放在1个文件夹里,这样的目录结构会更加清晰;

 

 

覆盖扩展Django的auth_user表

AUTH_USER_MODEL=\'users.UserProfile\' #需要在setting中重载AUTH_USER_MODE
setings.py
from django.contrib.auth.models import AbstractUser #自定制扩展Django自带表格

class UserProfile(AbstractUser):
    pass
models.py

 

设置media上传

 

image=models.FileField(verbose_name=\'头像\', upload_to=\'upload/avatar/\')
models.py
from django.views.static import serve
urlpatterns = [
    url(r\'^media/(?P<path>.*)$\',serve,{\'document_root\': settings.MEDIA_ROOT}),
]
urls.py
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "cmdb_rbac_arya.settings")
django.setup()  # 在Django视图之外,调用Django功能设置环境变量!

from cmdb import models
import xlwt

# 创建一个文件对象
wb = xlwt.Workbook(encoding=\'utf8\')
# 创建一个sheet对象
sheet = wb.add_sheet(\'order-sheet\',cell_overwrite_ok=True)

# 设置文件头的样式,这个不是必须的可以根据自己的需求进行更改
style_heading = xlwt.easyxf("""
        font:
            name Arial,
            colour_index white,
            bold on,
            height 0xA0;
        align:
            wrap off,
            vert center,
            horiz center;
        pattern:
            pattern solid,
            fore-colour 0x19;
        borders:
            left THIN,
            right THIN,
            top THIN,
            bottom THIN;
        """
    )
style_body = xlwt.easyxf("""
    font:
        name Arial,
        bold off,
        height 0XA0;
    align:
        wrap on,
        vert center,
        horiz left;
    borders:
        left THIN,
        right THIN,
        top THIN,
        bottom THIN;
    """
)
style_green = xlwt.easyxf(" pattern: pattern solid,fore-colour 0x11;")
style_red = xlwt.easyxf(" pattern: pattern solid,fore-colour 0x0A;")
fmts = [
    \'M/D/YY\',
    \'D-MMM-YY\',
    \'D-MMM\',
    \'MMM-YY\',
    \'h:mm AM/PM\',
    \'h:mm:ss AM/PM\',
    \'h:mm\',
    \'h:mm:ss\',
    \'M/D/YY h:mm\',
    \'mm:ss\',
    \'[h]:mm:ss\',
    \'mm:ss.0\',
]
style_body.num_format_str = fmts[0]

# 写入文件标题
sheet.write(0,0,\'工单标题\',style_heading)
sheet.write(0,1,\'工单发送方\',style_heading)
sheet.write(0,2,\'工单处理方\',style_heading)
sheet.write(0,3,\'相关责任人\',style_heading)
sheet.write(0,4,\'办理日期\',style_heading)
sheet.write(0,5,\'结束日期\',style_heading)


# 写入数据
data_row = 1
# UserTable.objects.all()这个是查询条件,可以根据自己的实际需求做调整.
for i in models.Worker_order.objects.all() :
    # 格式化datetime
    sheet.write(data_row,0,i.title,style_body)
    sheet.write(data_row,1,i.initiator,style_body)
    sheet.write(data_row,2,i.agent,style_body)
    sheet.write(data_row,3,i.responsible_person,style_body)
    sheet.write(data_row,4,i.startdate,style_body)
    sheet.write(data_row,5,i.enddate,style_body)

# 写出到IO

wb.save(\'张根.xls\')
# 重新定位到开始
使用xlwt导出数据库数据

 

 

 

psutil模块

psutil模块是什么?

psutil可以跨平台获取系统运行的进程和系统利用率(CP、内存、硬盘、网络信息的python扩展库),跨平台这就意味着Windows系统主机的信息也可以被采集到!

一专多能:实现了ps、top、lsof、netstat、ifconfig、who、df、kill、free、nice、ionice、iostat、iotop、uptime、pidof、tty、taskset、pmap等命令工具的功能;

pip install psutil

 

1. 使用用psutil获取系统相关信息

import psutil
#-----------------------获取CPU相关-------------------------------------
print(psutil.cpu_times())               #查看CPU的完整信息,获取单项信息 .user .system .idel.....
print(psutil.cpu_count())               #查看CPU的逻辑个数
print(psutil.cpu_count(logical=False)) #查看CPU的物理个数
print(psutil.cpu_percent(interval=0))   #查看单个CPU的使用率
print(psutil.cpu_percent(percpu=True))  #查看每个CPU的使用率

#------------------------获取内存相关的信息---------------------------------
print(psutil.virtual_memory())          #查看内存的完整信息,获取单项信息 .total .available .percent .used .free
print(psutil.virtual_memory().percent)  #查看内存使用百分比
print(psutil.swap_memory())             #查看swap分区的完整信息sswap(total=16075837440, used=4880826368, free=11195011072, percent=30.4, sin=0, sout=0)

#-----------------------获取硬盘分区相关信息--------------------------------------
print(psutil.disk_usage(\'D:\\\'))         #获取硬盘的使用情况
print(psutil.disk_io_counters(perdisk=True)) #查看硬盘的读写状况

#----------------------获取网络相关信息-----------------------
print(psutil.net_io_counters())           #获取网络的总体信息
print(psutil.net_io_counters(pernic=True))#获取每个网卡的IO信息
print(psutil.net_if_addrs())              #获取网卡相关地址信息
print(psutil.net_connections())            #获取所有接口的连接信息

#-------------------获取系统其他相关信息-------------------------
print(psutil.users())                      #获取当前登录信息
print(datetime.datetime.fromtimestamp(psutil.boot_time()).strftime(\'%Y-%m-%d %H:%M:%S\')) #获取开机时间

 

#!/usr/bin/env python
# -*- coding: utf-8 -*-

try:
    import psutil
except ImportError:
    print(\'错误: psutil模块没有发现!\')
    exit()
import platform
import datetime
import time

def get_osinfo():
    \'\'\'获取操作系统相关信息\'\'\'
    osType = platform.system()
    osVersion = platform.version()
    osArchitecture = platform.architecture()
    hostName = platform.node()
    return osType, osVersion, osArchitecture,hostName

def get_processor():
    \'\'\'获取物理CPU个数\'\'\'
    return psutil.cpu_count(logical=False)


def get_cores():
    \'\'\'获取逻辑CPU个数\'\'\'
    return psutil.cpu_count()


def get_boot_time():
    \'\'\'获取开机时间\'\'\'
    return datetime.datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H:%M:%S")


def get_disk_root():
    \'\'\'获取根分区磁盘空间\'\'\'
    return psutil.disk_usage(\'D:\')


def get_mem_total():
    \'\'\'获取内存容量\'\'\'
    return psutil.virtual_memory()[0] / 1024 / 1024


def get_mem_free():
    \'\'\'获取可用内存大小\'\'\'
    return psutil.virtual_memory()[4] / 1024 / 1024


def get_key():
    \'\'\'函数获取各网卡发送、接收字节数\'\'\'

    key_info = psutil.net_io_counters(pernic=True).keys()  # 获取网卡名称

    recv = {}
    sent = {}

    for key in key_info:
        recv.setdefault(key, psutil.net_io_counters(
            pernic=True).get(key).bytes_recv)  # 各网卡接收的字节数
        sent.setdefault(key, psutil.net_io_counters(
            pernic=True).get(key).bytes_sent)  # 各网卡发送的字节数

    return key_info, recv, sent


def get_rate(func):
    \'\'\'函数计算每1秒网卡速率\'\'\'
    key_info, old_recv, old_sent = func()  # 上1秒收集的数据

    time.sleep(1)

    key_info, now_recv, now_sent = func()  # 当前所收集的数据

    net_in = {}
    net_out = {}

    for key in key_info:
        net_in.setdefault(key, (now_recv.get(key) - old_recv.get(key)) / 1024)  # 每秒接收速率
        net_out.setdefault(key, (now_sent.get(key) - old_sent.get(key)) / 1024)  # 每秒发送速率

    return key_info, net_in, net_out


def main():
    \'\'\'程序入口函数\'\'\'
    ostype, osversion, osarchitecture,hostname = get_osinfo()

    print(\'操作系统类型:\',ostype)
    print(\'操作系统版本:\', osversion)
    print(\'操作系统位数:\', osarchitecture[0])
    print(\'主机名:\', hostname)
    print(\'物理CPU个数:\', get_processor())
    print(\'逻辑CPU个数:\', get_cores())
    print(\'开机时间:\', get_boot_time())
    print(\'根分区可用空间(单位为MB):\', get_disk_root()[2] / 1024 / 1024)
    print(\'内存总量(单位为MB):\', get_mem_total())
    print(\'可用内存大小(单位为MB):\', get_mem_free())

    i = 0
    while i < 3:                                                            #去获取每秒每块网卡的 速率
        key_info, net_in, net_out = get_rate(get_key)
        for key in key_info:
            print(\'%s\nInput:\t %-5sKB/s\nOutput:\t %-5sKB/s\n\' %
                  (key, net_in.get(key), net_out.get(key)))
        i += 1


if __name__ == \'__main__\':
    main()
获取本机的基本信息和 3秒的网卡速率

 

2.使用psutil管理系统进程

A.process类的作用

依据进程ID来获取单个进程的名称、执行路径、执行状态、系统资源利用率等信息;

查看系统中所有运行的进程

import psutil
pid_list=psutil.pids() #获取当前系统进程ID列表
for i in pid_list:
    p=psutil.Process(i)
    print(\'我是进程:{0},我的进程ID:{1} 俺爹的进程ID:{2}\'.format(p.name(),str(p.pid),str(p.ppid()  )))#获取所有进程的名称

pidList = psutil.pids()
for pid in pidList: #查看系统中进行的相关信息
proc = psutil.Process(pid)
pidDictionary=proc.as_dict(attrs=[\'pid\', \'name\', \'username\', \'exe\', \'create_time\', \'status\', \'num_threads\'])#获取所有进程的相关信息以字典显示
tempText = \'\'
for keys in pidDictionary.keys():
tempText += keys + \':\' + str(pidDictionary[keys]) + \'\n\'
print(tempText)
print(\'*********************\n\')
 

查看单个进程的详细信息

p = psutil.Process(pid=7160)
print(p.exe())           #获取进程的工作目录
print(p.cwd())           #获取进程的工作目录的绝对路径
print(p.cpu_times())     #c查看进程CPU时间信息
print(p.cpu_affinity())  #查看CPU占用情况
print(p.username())            #开启该进程的用户
print(p.num_threads())         #查看进程包含的线程数量
print(p.memory_percent())      #查看进程的内存使用率
print(p.memory_info())         #查看进程rss、vms信息
print(p.connections())         #获取进程的namedutples列表、fs、family、laddr等信息
print(p.io_counters())         #查看进程的IO
print(p.status())              #查看进程状态
print(datetime.datetime.fromtimestamp(p.create_time()).strftime(\'%Y-%m-%d %H:%M:%S\') )#查看进程的创建时间
p.suspend()                    #挂起进程
p.resume()                     #恢复
p.kill()                       #杀死进程 

 

B.使用Popen方法启动进程;

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import psutil
from subprocess import PIPE
p=psutil.Popen([\'python\',\'nginx.py\'])  #开启1个进程
p.communicate()                        #打印开启进程的输出

 

 

ipy模块

ipy模块主要对IP地址进制处理和转换;

from IPy import IP
private_ipaddr=IP(\'192.168.0.0/24\')
public_ipaddr=IP(\'123.150.204.166\')
print(public_ipaddr.make_net(\'255.255.0.0\')) #查询IP所在的网段
print(public_ipaddr.reverseNames())       #查看该IP地址的反向解析
print(private_ipaddr.version())          #查看IP地址的版本
print(public_ipaddr.iptype())            #查看是 私网地址?公网地址?
########################ip地址格式转换#####################

print(private_ipaddr.int()) #转换成整型格式 3232235520 print(private_ipaddr.strBin()) #把IP地址转换成二进制格式 11000000101010000000000000000000 print(private_ipaddr.strHex()) #把IP地址转换成16进制格式 0xc0a80000

 

        kafka消息中间件

kafka简介

kafka是一款高性能:以O(1)系统开销下把消息写到磁盘)、高吞吐、分布式(方便架构扩展)的发布订阅消息系统,数据流(区别于redis队列《---------》可以重复消费 消费之后数据不销毁)处理平台;

 

 

 

安装kafka

wget https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
tar -zxf kafka_2.11-0.10.2.0.tgz 
cd kafka_2.11-0.10.2.0/
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &           #后台启动zookeeper
nohup bin/kafka-server-start.sh config/server.properties &          #后台启动kafka服
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1              #前台启动生产者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning #前台启动消费者

 

kafka的配置文件

broker.id=0                    #在集群中的唯一标识
num.network.threads=3          #处理忘了请求的最大线程(CPU个数)
num.io.threads=8          #处理磁盘IO的线程数
socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs       #日志路径
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168         #保存消息的时间
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181     #集群地址多个,分割
zookeeper.connection.timeout.ms=6000
advertised.host.name=192.168.226.137 #切记不要写0.0.0.0
advertised.listeners=PLAINTEXT://192.168.226.137:9092
 

 

kafka的python API (pykafka)

pip install pykafka

from pykafka import KafkaClient
import json

client = KafkaClient(hosts=\'192.168.226.137:9092\')
print(client.topics)
topic = client.topics[b\'test\']
producer = topic.get_producer(sync=True)        #创建1个生产者
producer.start()
data=json.dumps({"topic0":"Zhang Gen is very handsome."}).encode(\'utf-8\')

with topic.get_sync_producer() as producer:  #在某个topic中创建1个生产者
    producer.produce(message=data)            #生产者 生产消息
    print(\'插入成功\')
pykafka生产者
from pykafka import KafkaClient
import json

client = KafkaClient(hosts="192.168.226.137:9092")
print(client.topics)
topic = client.topics[b\'test\']    #topic名称
consumer = topic.get_simple_consumer()
for record in consumer:
    if record is not None:
        valuestr = record.value.decode()   #从bytes转为string类型
        print(valuestr)
pykafka消费者

 

 

安装logstash

wget https://artifacts.elastic.co/downloads/logstash/logstash-5.5.2.zip
unzip logstash-5.5.2.zip
cd logstash-5.5.2/
bin/logstash -e \'input{stdin{}}output{stdout{codec=>rubydebu}\'}

 json格式输出

{
    "@timestamp" => 2018-11-15T03:16:14.366Z,
      "@version" => "1",
          "host" => "localhost.localdomain",
       "message" => ""
}

 

 

Django项目结合极验滑动验证

 

 

简介

下载极验pythonSDK

pip install geetest #安装jeetest包

 

from django.conf.urls import url
from django.contrib import admin
from app01 import views
urlpatterns = [
    url(r\'^admin/\', admin.site.urls),
    url(r\'^login/\', views.login,name=\'login\'),
    url(r\'^index/\', views.index,name=\'index\'),
    url(r\'^pc-geetest/register\', views.get_geetest, name=\'get_geetest\'),#极验获取验证码url
]
url.py
from django.shortcuts import render,HttpResponse,redirect
import json
from app01 import models
from geetest import GeetestLib

pc_geetest_id = "254cb3cd9d98d2635352609157931960"
pc_geetest_key = "2d3dc172309e33998c5eba085be1cf48"

def login(request):
    if request.method==\'GET\':
        return render(request,\'login.html\')
    else:
        response_info={\'status\':200,\'msg\':\'success\'}
        gt = GeetestLib(pc_geetest_id, pc_geetest_key)
        challenge = request.POST.get(gt.FN_CHALLENGE, \'\')
        validate = request.POST.get(gt.FN_VALIDATE, \'\')
        seccode = request.POST.get(gt.FN_SECCODE, \'\')
        status = request.session[gt.GT_STATUS_SESSION_KEY]
        user_id = request.session["user_id"]
        username = request.POST.get("username")
        password = request.POST.get("password")
        if status:
            result = gt.success_validate(challenge, validate, seccode, user_id)
        else:
            result = gt.failback_validate(challenge, validate, seccode)
        if result:
            user_obj=models.User_info.objects.filter(username=username,password=password).first()
            if not user_obj:
                response_info = {\'status\': 404, \'msg\': \'not find it\'}
        print(response_info)
        return HttpResponse(json.dumps(response_info,ensure_ascii=False))


def index(request):
    return render(request,\'index.html\')


def get_geetest(request): #极验验证
    user_id = \'test\'
    gt = GeetestLib(pc_geetest_id, pc_geetest_key)
    status = gt.pre_process(user_id)
    request.session[gt.GT_STATUS_SESSION_KEY] = status
    request.session["user_id"] = user_id
    response_str = gt.get_response_str()
    return HttpResponse(response_str)
views.py
{% load staticfiles %}
<!DOCTYPE html>
<html lang="en" class="no-js">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>绫致-运维管理平台</title>
    <link rel="icon" href="{% static \'img/demo-1-bg.jpg\' %}">

    <link rel="stylesheet" type="text/css" href="{% static \'bootstrap/css/normalize.css\' %}">
    <link rel="stylesheet" type="text/css" href="{% static \'bootstrap/css/demo.css\' %}">
    <!--必要样式-->
    <link rel="stylesheet" type="text/css" href="{% static \'bootstrap/css/component.css\' %}">
    <link rel="stylesheet" href="/static/bootstrap/css/style.css" media="screen" type="text/css"/>
    <script src="http://static.geetest.com/static/tools/gt.js"></script>
    <!--[if IE]>
{#<script src="js/html5.js"></script>#}
{#<script src="{% static \'bootstrap/js/html5.js\' %}"></script>#}

<![endif]-->
    <style>
        .error_div {
            width: 300px;
            height: 20px;
            color: white;
            border-color: #ebccd1;
            margin-left: 10px;

        }

        .error_div1 {
            margin-top: 45px;
            width: 300px;
            height: 20px;
            color: white;
            border-color: #ebccd1;
            margin-left: 10px;

        }

        .error_div2 {
            width: 300px;
            height: 20px;
            color: red;
            border-color: #c9302c;
            margin-left: 10px;
            text-align: center;

        }
    </style>
</head>
<body>
<div class="container demo-1">
    <div class="content">
        <div id="large-header" class="large-header" style="height: 1094px;">
            <canvas id="demo-canvas" width="1360" height="1094"
                    style="background-image:url(\'{{ request.session.url_photo }}\')"></canvas>
            <div class="logo_box">

                <p style="height: 30px"><a href="#"
                                           style="font-size: 40px;vertical-align: 80px;color: yellow;font-family:Microsoft YaHei"><strong>IT运维管理平台</strong></a>
                </p>
                <p style="height: 30px"><a href="#"
                                           style="font-size: 20px;vertical-align: 80px;color: white;font-family: \'Microsoft YaHei\'">IT
                    Management Center</a></p>
                <div class="input_outer">
                    <span class="u_user"></span>
                    <input id=\'username\' name="user" class="text" style="color: #FFFFFF !important" type="text"
                           placeholder="请输入账户">
                </div>
                <div class="input_outer">
                    <span class="us_uer"></span>
                    <input id=\'password\' name="pass" class="text"
                           style="color: #FFFFFF !important; position:absolute; z-index:100;" value="" type="password"
                           placeholder="请输入密码">
                </div>
                {#                <div class="input_outer"  style="width: 200px">#}
                {#                    <span class="us_uer"></span>#}
                {#                    <input id=\'code\' name="code" class="text"#}
                {#                           style="color: #FFFFFF !important; position:absolute; z-index:100;width: 100px" value=""#}
                {#                           type="text" placeholder="请输入验证码">#}
                {#                    <img src="/check_code/" alt="验证码" onclick="changeImg(this)"#}
                {#                         style="position: absolute;top:1px;right: 0;margin-right: -125px">#}
                {#                </div>#}
                <div id="captcha-box">
                    <div id="loading-tip" style="color:red">正在为您火速加载中....</div>
                </div>
                <div id="popup-captcha">
                    <!--存放极验验证码区域!! -->
                </div>

                <div class="mb2" id="login_zone" hidden="hidden">
                    <button id=\'mybtn\' class="act-but submit"
                            style="color: #FFFFFF;width: 330px;border: none;opacity:0.9;font-size: 26px; ">登录
                    </button>
                </div>
                <a title="首次使用该功能需要提前在运维平台做用户认证"
                   href="https://api.weibo.com/oauth2/authorize?client_id=439057412&redirect_uri=http://172.17.10.112:8001/login/">
                    <img src="/static/weibo.png" alt="">
                </a>
            </div>
        </div>
    </div>
</div><!-- /container -->

<script src="{% static "bootstrap/js/TweenLite.min.js" %}"></script>
<script src="{% static "bootstrap/js/EasePack.min.js" %}"></script>
<script src="{% static "bootstrap/js/rAF.js" %}"></script>
<script src="{% static "bootstrap/js/demo-1.js" %}"></script>
<script src="{% static "bootstrap/js/jquery.js" %}"></script>

<script>
    function changeImg(ths) {
        ths.src = ths.src + "?";
    }

    {#    $(\'#mybtn\').click(function () {#}
    {#        $(\'#username\').next().remove();#}
    {#        $(\'#password\').next().remove();#}
    {#        $(\'#code\').siblings(\'.error_div1\').remove();#}
    {#        $(\'#username\').parent().parent().find(".error_div2").remove();#}
    {#        var flag = true;#}
    {#        var user_name = $(\'#username\').val();#}
    {#        var user_pwd = $(\'#password\').val();#}
    {#        var check_code = $(\'#code\').val();#}
    {#        if (flag) {#}
    {#            $.ajaxSetup({#}
    {#                data: {csrfmiddlewaretoken: \'{{ csrf_token }}\'}#}
    {#            });#}
    {#            $.ajax({#}
    {#                url: \'/login/\',#}
    {#                type: \'POST\',#}
    {#                data: {"username": user_name, "password": user_pwd, "code": check_code},#}
    {#                dataType: "JSON",#}
    {#                success: function (data) {#}
    {#                    if (data.status) {#}
    {#                        location.href = \'/login_first/\'#}
    {#                    }#}
    {#    else#}
    {#    {#}
    {#        console.log(data);#}
    {#        $.each(data.error, function (k, v) {#}
    {#            if (k == \'username\') {#}
    {#                var $msg = "<div class=\'error_div\'>" + v[0] + \'</div>\';#}
    {#                $(\'#\' + k).after($msg)#}
    {#            } else if (k == \'password\') {#}
    {#                var $msg = "<div class=\'error_div1\'>" + v[0] + \'</div>\';#}
    {#                $(\'#\' + k).after($msg)#}
    {#            } else if (k == \'code\') {#}
    {#                var $msg = "<div class=\'error_div1\'>" + v[0] + \'</div>\';#}
    {#                $(\'#\' + k).after($msg)#}
    {#            }#}
    {#            else {#}
    {#                var $msg = "<div class=\'error_div2\'>" + v + \'</div>\';#}
    {#                $(\'#username\').parent().before($msg);#}
    {#            }#}
    {##}
    {#        });#}
    {#    }#}
    {#                }#}
    {#            })#}
    {#        }#}
    {##}
    {#    });#}


    var handlerPopup = function (captchaObj) {
        captchaObj.onReady(function () {        //1.极验加载/准备时回调函数
            $("#loading-tip").hide();
            $(\'#login_zone\').show();
        });
        
        captchaObj.onSuccess(function () {   //2.极验加载成功的回调
            $(\'#login_zone\').hide();
            var validate = captchaObj.getValidate();
            $.ajax({                          //3.发送用户输入到后台
                url: "{% url \'login\'%}", 
                type: "post",
                dataType: "json",
                data: {
                    username: $(\'#username\').val(),
                    password: $(\'#password\').val(),
                    csrfmiddlewaretoken: \'{{ csrf_token }}\',
                    geetest_challenge: validate.geetest_challenge,
                    geetest_validate: validate.geetest_validate,
                    geetest_seccode: validate.geetest_seccode
                },
                success: function (data) {
                    if (data.status) {           //4.后端验证成功跳转
                        location.href = \'/login_first/\'
                    }
                    else {
                        $(\'#login_zone\').show();
                        captchaObj.reset();         //4.后端验证失败,从新加载极致验证插件;
                        $(\'#popup-captcha\').hide();
                        $(\'#username\').next().remove();
                        $(\'#password\').next().remove();
                        $(\'#code\').siblings(\'.error_div1\').remove();
                        $(\'#username\').parent().parent().find(".error_div2").remove();
                        $.each(data.error, function (k, v) {
                            if (k == \'username\') {
                                var $msg = "<div class=\'error_div\'>" + v[0] + \'</div>\';
                                $(\'#\' + k).after($msg)
                            } else if (k == \'password\') {
                                var $msg = "<div class=\'error_div1\'>" + v[0] + \'</div>\';
                                $(\'#\' + k).after($msg)
                            } else if (k == \'code\') {
                                var $msg = "<div class=\'error_div1\'>" + v[0] + \'</div>\';
                                $(\'#\' + k).after($msg)
                            }
                            else {
                                var $msg = "<div class=\'error_div2\'>" + v + \'</div>\';
                                $(\'#username\').parent().before($msg);
                            }

                        });
                    }
                }
            });
        });

        $("#mybtn").click(function () {
            captchaObj.appendTo("#popup-captcha");
            $(\'#popup-captcha\').show()
        });
    };
 
    $.ajax({   //0.加载极验插件
        url: "/pc-geetest/register?t=" + (new Date()).getTime(), // 加随机数防止缓存
        type: "get",
        dataType: "json",
        success: function (data) {
            // 使用initGeetest接口
            // 参数1:配置参数
            // 参数2:回调,回调的第一个参数验证码对象,之后可以使用它做appendTo之类的事件
            initGeetest({
                gt: data.gt,
                challenge: data.challenge,
                product: "popup", // 产品形式,包括:float,embed,popup。注意只对PC版验证码有效
                offline: !data.success // 表示用户后台检测极验服务器是否宕机,一般不需要关注
                // 更多配置参数请参见:http://www.geetest.com/install/sections/idx-client-sdk.html#config
            }, handlerPopup);
        }
    });


</script>


</body>
</html>
login.html

 

百度语音提示

安装百度文字装语音模块

pip install baidu-aip 

播放audio的标签

 <embed height="1" width="1" src="/media/audio/{{ request.session.username }}/audio.mp3">
import datetime
import os,time
from aip import AipSpeech
from django.conf import settings
import threading

class Speak_timeout(threading.Thread):
    APP_ID = \'14895770\'
    API_KEY = \'PDpuxPZ3pLo9Lx7wsjFXZlAB\'
    SECRET_KEY = \'1OpDz7xyc6xnWSWUUIGnxT87zEkWtj73\'
    client = AipSpeech(APP_ID, API_KEY, SECRET_KEY)
    member = {\'van\': \'刘浩\', \'markguo\': \'郭光\', \'owen.wu\': \'吴诗萌\',
              \'benson\': \'曹立林\', \'zhanggen\': \'张根\', \'lvhongfang\': \'吕洪芳\'}

    def __init__(self, request, models):
        super(Speak_timeout,self).__init__()
        self.request = request
        self.curent_user = request.session.get(\'username\')  # session
        self.audio_path = os.path.join(settings.MEDIA_ROOT, \'audio\', self.curent_user)
        self.file_name = os.path.join(self.audio_path, \'audio.mp3\')
        self.models = models

    def timeout_count(self):
        temp = self.models.Worker_order.objects.filter(level__in=[1, 2, 3], agent=self.curent_user,
                                                       status__in=[0,1,3])

        if temp:  # 就是有未处理的工单
            self.count = 0
            for w in temp:
                enddate = datetime.datetime.strptime(w.enddate, \'%Y年%m月%d日%H时%M分%S秒\')
                if enddate < datetime.datetime.now():  # 代表:未处理&超时 工单
                    self.count += 1

            if self.count:
                chinese_name = self.member.get(self.curent_user)
                self.curent_user = chinese_name if chinese_name else self.curent_user
                content = \'HI:%s 你有%s条超时工单!\' % (self.curent_user,self.count)
                self.speak(content=content)


        if os.path.exists(self.audio_path) is False:
            os.makedirs(self.audio_path)

        else:
            if self.count==0 and os.path.isfile(self.file_name):
                os.remove(self.file_name)

    def speak(self, content):
        if not os.path.exists(self.audio_path):
            os.makedirs(self.audio_path)
        result = self.client.synthesis(content, \'zh\', 1, {
            \'vol\': 5, \'per\': 4
        })
        # 识别正确返回语音二进制 错误则返回dict 参照下面错误码
        if not isinstance(result,dict):
            with open(self.file_name,\'wb\') as f:
                f.write(result)

    def run(self):
        self.timeout_count()
百度文字转语音
class WorOrder_Middleware(RbacMiddleware,MiddlewareMixin):
    def process_request(self,request):
        if request.path_info not in [\'/login/\',\'/pc-geetest/register\',\'/arya/cmdb/worker_order/handle/\']:
            speak_obj=Speak_timeout(request,models) #工单语言
            speak_obj.start()
Django中间件

 

 

python3调用zabbix3API

zabbix提供着丰富的API功能,但在使用它们的前提是你需要得到1个授权码;

zabbix文档

 

import requests
import json
class Smart_select(object):
    members = {
        \'owen.wu\': {
            "mediatypeid": "10", #原来 \'mediatypeid\': \'5\', \'type\': \'1\', \'description\': \'微信_DB\'
            "sendto": "owen.wu@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
        \'zhanggen\': {
            "mediatypeid": "10", #
            "sendto": "zhanggen@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
        \'markguo\': {
            "mediatypeid": "10",
            "sendto": "markguo@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
        \'lvhongfang\': {
            "mediatypeid": "10",
            "sendto": "lvhongfang@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
    }
    zabbix_api= \'http://10.150.22.211/zabbix/api_jsonrpc.php\'
    headers = {\'content-type\':\'application/json-rpc\'}
    auth_data = json.dumps(
        {"jsonrpc": "2.0",\'method\': \'user.login\', \'params\': {"user": \'admin\', "password": \'123123xxx\'}, "auth": None,
         \'id\': 0})

    def __init__(self,current_user):
        self.current_user=current_user
        self.auth = requests.post(url=self.zabbix_api, headers=self.headers, data=self.auth_data).json()[\'result\']

    def select(self):
        self.updatemedia_data = json.dumps({
            "jsonrpc": "2.0",
            "method": "user.updatemedia",
            "params": {
                "users": [
                    {
                        "userid": "1"  #admin组
                    }
                ],
                "medias":self.members[self.current_user]
            },
            "auth": self.auth,
            "id": 1
        })
        response=requests.post(url=self.zabbix_api,data=self.updatemedia_data,headers=self.headers).json()
        print(response)
        return response
调用zabbix api 每周自动更新admin用户的媒介

 

 

 触发报警

 

 

动作发送给用户

 

用户关联媒介

 zabbix目前使用Admin 用户调用了zabbix报警推送到运维平台  媒介       

Zabbix报警推送到运维平台 markguo@bestseller.com.cn 1-7,00:00-24:00 已启用

 

 

纪念2次zabbix报警无法调用微信接口的问题

多个media媒介同时调用微信接口冲突

解决:zabbix报警流程分析,重新创建报警流程

 新用户--->关联新媒介-->调用不同文件名python脚本 

新动作---->发送新用户

 

 

class Smart_select(object):
    members = {
        \'owen.wu\': {
            "mediatypeid": "13",  # 原来 \'mediatypeid\': \'5\', \'type\': \'1\', \'description\': \'微信_DB\'
            "sendto": "owen.wu@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
        \'zhanggen\': {
            "mediatypeid": "13",  #
            "sendto": "zhanggen@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
        \'markguo\': {
            "mediatypeid": "13",
            "sendto": "markguo@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
        \'lvhongfang\': {
            "mediatypeid": "13",
            "sendto": "lvhongfang@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
    }
    zabbix_api = \'http://10.150.22.211/zabbix/api_jsonrpc.php\'
    headers = {\'content-type\': \'application/json-rpc\'}
    auth_data = json.dumps(
        {"jsonrpc": "2.0", \'method\': \'user.login\', \'params\': {"user": \'admin\', "password": \'123123xxx\'}, "auth": None,
         \'id\': 0})

    def __init__(self, current_user):
        self.current_user = current_user
        self.auth = requests.post(url=self.zabbix_api, headers=self.headers, data=self.auth_data).json()[\'result\']

    def select(self):
        self.updatemedia_data = json.dumps({
            "jsonrpc": "2.0",
            "method": "user.updatemedia",
            "params": {
                "users": [
                    {
                        "userid": "21"  # admin组
                    }
                ],
                "medias": self.members[self.current_user]
            },
            "auth": self.auth,
            "id": 1
        })
        response = requests.post(url=self.zabbix_api, data=self.updatemedia_data, headers=self.headers).json()[\'result\']
        return response

    def get_media_type(self):
        self.get_media_type_data = json.dumps({
            "jsonrpc": "2.0",
            "method":"mediatype.get",
            "params": {
                "output": "extend"
            },
            "auth": self.auth,
            "id": 1
        })
        response = requests.get(url=self.zabbix_api, data=self.get_media_type_data, headers=self.headers).json()[\'result\']
        for i in response:
            print(i)
        return response
    def get_users(self):
        self.get_user_data = json.dumps({
            "jsonrpc": "2.0",
            "method": "user.get",
            "params": {
                "output": "extend"
            },
            "auth": self.auth,
            "id": 1
        })
        response = requests.get(url=self.zabbix_api, data=self.get_user_data, headers=self.headers).json()[\'result\']
        return response

    def get_usermedia(self):
        self.get_user_data = json.dumps(
            {
                "jsonrpc": "2.0",
                "method": "usermedia.get",
                "params": {
                    "output": "extend",
                    "userids": "1"
                },
                "auth": self.auth,
                "id": 1
            }

        )
        response = requests.get(url=self.zabbix_api, data=self.get_user_data, headers=self.headers).json()[\'result\']
        return response
python 调用zabbix api示例代码

 

 

https://blog.csdn.net/weixin_33875839/article/details/88226753

 

 

Django-jet

 

Django-jet的功能就是对Django的 Django-admin后台管理插件进行美化

 

参考博客

GitHub地址

                        yagmail发送邮件

 

yagmail模块是1个极其好用的邮件发送模块

 

0.安装yagmail模块

[root@cmdb /]# pip install yagmail

 

1.配置邮箱

 

 2.设置授权码

 

 

 3.使用yagmail模块

注意 password="授权码”,如果按照以上的步骤操作之后仍然报错

smtplib.SMTPDataError: (554, b\'DT:SPM 163 smtp1,C9GowAD32Bi_sxhcdfRjAw--.682S2 1545122751,please see http://mail.163.com/help/help_spam_16.htm?ip=219.142.140.226&hostid=smtp1&time=1545122751\')

Process finished with exit code 1

就换个IP地址!!

import yagmail

# 连接邮箱服务器
yag = yagmail.SMTP(user="13220198866@163.com", password="步骤2设置的授权码", host="smtp.163.com")

# 邮箱正文
contents = [\'韦哥,我正在用yagmail这个库给你发邮件,感觉贼鸡巴简单\']

# # 发送邮件,给一个人
# yag.send(\'645172205@qq.com\', \'subject\', contents)

#给多个人发送邮件,并发送附件
yag.send([\'645172205@qq.com\', \'454381958@qq.com\'], \'发送附件\', conte

 

python-nmap模块

nmap是一款功能强大网络扫描、嗅探工具,正好用它去自动发现主机,做CMDB信息采集工作;

 

0.Linux安装

 wget http://nmap.org/dist/nmap-6.40-1.x86_64.rpm 
 rpm -Uvh nmap-6.40-1.x86_64.rpm

 

2.pip安装python-nmap模块

pip3.6 install python-nmap

 

3.简单使用

import nmap
nm=nmap.PortScanner()
result=nm.scan(\'192.168.1.0/24\',arguments="-sP").get(\'scan\') # 

for k,v in result.items(): #result返回1个大字典{\'nmap\': \'本次扫描元信息描述\',\'scan\':\'扫描结果\'}
    print(k,v)
\'\'\'
(\'192.168.1.0\',
 {\'nmap\': {\'command_line\': \'nmap -oX - -p 22,80,8888,8080,443 -sS 192.168.1.0\',           #本次扫描元信息描述
           \'scaninfo\': {\'tcp\': {\'method\': \'syn\', \'services\': \'22,80,443,8080,8888\'}},
           \'scanstats\': {\'timestr\': \'Thu Dec 20 09: 49:40 2018\',
           \'elapsed\': \'9.16\',
           \'uphosts\': \'0\',
           \'downhosts\': \'1\',
           \'totalhosts\': \'1\'}
 },
 \'scan\': {}})                                                                        #扫描结果

 

192.168.112.1 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.1\', \'mac\': \'1C:6A:7A:0B:1D:FF\'}, \'vendor\': {\'1C:6A:7A:0B:1D:FF\': \'Cisco Systems\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.2 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.2\', \'mac\': \'00:6B:F1:08:30:65\'}, \'vendor\': {\'00:6B:F1:08:30:65\': \'Cisco Systems\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.4 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.4\', \'mac\': \'90:60:F1:39:DC:03\'}, \'vendor\': {\'90:60:F1:39:DC:03\': \'Apple\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.5 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.5\', \'mac\': \'B0:19:C6:A8:25:50\'}, \'vendor\': {\'B0:19:C6:A8:25:50\': \'Apple\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.8 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.8\', \'mac\': \'E4:B3:18:39:6C:0A\'}, \'vendor\': {\'E4:B3:18:39:6C:0A\': \'Intel Corporate\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.9 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.9\', \'mac\': \'48:45:20:47:07:66\'}, \'vendor\': {\'48:45:20:47:07:66\': \'Intel Corporate\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.10 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.10\', \'mac\': \'24:F6:77:23:A3:C1\'}, \'vendor\': {\'24:F6:77:23:A3:C1\': \'Apple\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.12 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.12\', \'mac\': \'D8:1D:72:81:C3:72\'}, \'vendor\': {\'D8:1D:72:81:C3:72\': \'Apple\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.13 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.13\', \'mac\': \'30:B4:9E:C7:63:0B\'}, \'vendor\': {\'30:B4:9E:C7:63:0B\': \'Tp-link Technologies\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.15 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.15\', \'mac\': \'A0:D7:95:62:95:4B\'}, \'vendor\': {\'A0:D7:95:62:95:4B\': \'Apple\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.16 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.16\', \'mac\': \'38:53:9C:3B:C0:74\'}, \'vendor\': {}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.21 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.21\', \'mac\': \'3C:2E:F9:88:08:86\'}, \'vendor\': {\'3C:2E:F9:88:08:86\': \'Apple\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.22 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.22\', \'mac\': \'20:F7:7C:73:D4:29\'}, \'vendor\': {}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.23 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.23\', \'mac\': \'5C:1D:D9:46:46:88\'}, \'vendor\': {}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.24 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.24\', \'mac\': \'70:77:81:3B:85:51\'}, \'vendor\': {\'70:77:81:3B:85:51\': \'Hon Hai Precision Ind.\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.25 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.25\', \'mac\': \'70:48:0F:4D:53:47\'}, \'vendor\': {\'70:48:0F:4D:53:47\': \'Apple\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.26 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.26\', \'mac\': \'90:B0:ED:69:07:94\'}, \'vendor\': {\'90:B0:ED:69:07:94\': \'Apple\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.27 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.27\', \'mac\': \'E4:A7:A0:34:E3:9C\'}, \'vendor\': {\'E4:A7:A0:34:E3:9C\': \'Intel Corporate\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.29 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.29\', \'mac\': \'90:DD:5D:82:C5:D9\'}, \'vendor\': {}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.30 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.30\', \'mac\': \'EC:D0:9F:97:D1:4D\'}, \'vendor\': {\'EC:D0:9F:97:D1:4D\': \'Xiaomi Communications\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.31 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.31\', \'mac\': \'38:89:2C:65:26:47\'}, \'vendor\': {}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.32 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.32\', \'mac\': \'24:F0:94:C0:EB:61\'}, \'vendor\': {\'24:F0:94:C0:EB:61\': \'Apple\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.33 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.33\', \'mac\': \'04:B1:67:60:79:B0\'}, \'vendor\': {}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.35 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.35\', \'mac\': \'34:AB:37:F1:6F:06\'}, \'vendor\': {\'34:AB:37:F1:6F:06\': \'Apple\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.38 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.38\', \'mac\': \'E0:33:8E:AE:60:62\'}, \'vendor\': {}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.40 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.40\', \'mac\': \'38:53:9C:EC:9F:FF\'}, \'vendor\': {}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.42 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.42\', \'mac\': \'90:94:97:16:50:35\'}, \'vendor\': {}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.46 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.46\', \'mac\': \'E4:9A:DC:B6:BF:B8\'}, \'vendor\': {\'E4:9A:DC:B6:BF:B8\': \'Apple\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.47 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.47\', \'mac\': \'70:48:0F:37:A0:71\'}, \'vendor\': {\'70:48:0F:37:A0:71\': \'Apple\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.49 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.49\', \'mac\': \'B0:E5:ED:B0:E5:11\'}, \'vendor\': {\'B0:E5:ED:B0:E5:11\': \'Huawei Technologies\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}
192.168.112.50 {\'hostnames\': [{\'name\': \'\', \'type\': \'\'}], \'addresses\': {\'ipv4\': \'192.168.112.50\', \'mac\': \'30:B4:9E:A9:25:D3\'}, \'vendor\': {\'30:B4:9E:A9:25:D3\': \'Tp-link Technologies\'}, \'status\': {\'state\': \'up\', \'reason\': \'arp-response\'}}

4.nmap使用参数

-sP :进行ping扫描 这个命令可以用于探测局域网有哪些机器 打印出对ping扫描做出响应的主机,不做进一步测试(如端口扫描或者操作系统探测):
-sn:  Ping Scan - disable port scan  #ping探测扫描主机,不进行端口扫描 (测试过对方主机把icmp包都丢弃掉,依然能检测到对方开机状态)
-sA (发送tcp的ack包进行探测,可以探测主机是否存活)

 

5.高级用法

-sS :半开放扫描(非3次握手的tcp扫描)

使用频率最高的扫描选项:SYN扫描,又称为半开放扫描,它不打开一个完全的TCP连接,执行得很快,效率高

一个完整的tcp连接需要3次握手,而-sS选项不需要3次握手) 缺点:它需要root/administrator权限执行

 

 sT:3次握手方式tcp的扫描

 

sU:udp端口的扫描

Udp scan(sU) 顾名思义,这种扫描技术用来寻找目标主机打开的UDP端口.它不需要发送任何的SYN包,因为这种技术是针对UDP端口的。UDP扫描发送UDP数据包到目标主机,并等待响应,

 

sW:窗口扫描  

 

sV:版本检测(sV)
版本检测是用来扫描目标主机和端口上运行的软件的版本,如下扫描,多出了ssh的版本信息 

 

更多nmap 参数参考

 

telnetlib模块

 

telnet是Linux中测试远程主机端口是否开启、登录远程主机、交互机的工具,python也是有模块与之对应的,它就是telnetlib模块;

其实telnetlib模块 通过telnet协议登录服务器,然并卵 不是有paramiko模块吗?其实是有telnetlib做一个登录交换机的程序还可以哈哈哈~;

 

我只是用telnetlib模块 过滤主机列表中的服务器IP 是否为Linux服务器?在ssh服务开启在22号端口的前提下;

import  telnetlib
#判定是否为Linux系统(约定开启22号端口)
tn=telnetlib.Telnet(host=\'172.17.10.112\',port=\'22\',timeout=4)
result=tn.read_until(b"\n") #读取截止到 /n的返回结果; b\'SSH-2.0-OpenSSH_5.3\r\n\'
print(result)

 

        pexpect模块      

1.pexpect简介

pexpect是通过

  • 0.启动子进程
  • 1.去连接Linux主机
  • 2.执行shell相关命令
  • 3.通过正则匹配 缓冲区执行结果
  • 4.返回不同响应结果

的方式实现了于Linux主机自动交互的 Python 模块。

 

2.简单使用

#!/usr/bin/python3
import pexpect
#spawn类启动一个子程序,有丰富的方法实现对 子程序的控制
ssh_k=pexpect.spawn(\'ssh root@172.16.22.1 -p22\')#生成子程序,去连接192.168.1.18

#ssh_k.expect(\'[p,P]assword:\')
#expect 已正则表达式的方式:匹配缓冲区执行结果包含password:的内容;
# 正则匹配成功 返回0,正则匹配不成功一直等待返回,直到子程序到达超时时间(30秒);

ret=ssh_k.expect([pexpect.EOF,pexpect.TIMEOUT,\'password\'])
print(ret)
#匹配3种情况:错误输出:pexpect.EOF 超时:pexpect.TIMEOUT 正确:password
#正则匹配pexpect.EOF返回0 ,pexpect.TIMEOUT 返回1 , password成功 返回2,

 

3.向子程序发送指令

send():            发送执行指令时会 自动加回车符 /n
sendline():        发送执行指令 不会自动加回车符
sendcontrol(char):  发送控制符 ctrl c

 

4.自动登录Linux主机并切换到Oracle用户;

import pexpect
import sys
password=\'EC_history&LINGzhi\'
ssh=pexpect.spawn(\'ssh root@10.102.6.38 -p 22\')
status=ssh.expect([\'password:\',\'continue connecting (yes/no)?\',pexpect.TIMEOUT,pexpect.EOF],timeout=50)
if status==0:
    ssh.sendline(password)  #注意输入密码 不要使用send 不需要\n回车!
elif status ==1:
    ssh.send(\'yes\')
    ssh.expect(\'password: \')
    ssh.sendline(password)
elif status==2:
    print(\'贱婢做了好久~做不到\')
elif status==3:
    print(\'贱婢死爹了,您请回!\')
    sys.exit(\'Bye...\')
index=ssh.expect([\'#\',pexpect.EOF,pexpect.TIMEOUT])
if index==0:
    print(\'You were logged in as root.\')
    ssh.sendline(\'su - oracle\')
    ssh.sendline(\'whoami\')
    status=ssh.expect([\'oracle\', pexpect.EOF, pexpect.TIMEOUT])
    if status==0:
        print(\'切换到oracle用户!\')

 

5.pexpect的nteract() 和远程主机交互

import pexpect
import sys
password=\'EC_history&LINGzhi\'
ssh=pexpect.spawn(\'ssh root@10.102.6.38 -p 22\')
status=ssh.expect([\'password:\',\'continue connecting (yes/no)?\',pexpect.TIMEOUT,pexpect.EOF],timeout=50)
if status==0:
    ssh.sendline(password)  #注意输入密码 不要使用send 不需要\n回车!
elif status ==1:
    ssh.send(\'yes\')
    ssh.expect(\'password: \')
    ssh.sendline(password)
elif status==2:
    print(\'贱婢做了好久~做不到\')
elif status==3:
    print(\'贱婢死爹了,您请回!\')
    sys.exit(\'Bye...\')
index=ssh.expect([\'#\',pexpect.EOF,pexpect.TIMEOUT])
if index==0:
    print(\'You were logged in as root.\')
    ssh.interact()  #直接 进入远程主机交互式模式

 

6.python远程登录Linux各模块小结

fabric:方便与shell脚本结合,擅长批量部署,任务管理。

paramiko:方便嵌套系统平台中,擅长远程执行命令,文件传输,自身不执行多进程;

pexpect擅长自动交互,比如ssh、ftp、telnet,原生支持多线程

参考

 

 

ASE加密解密

有时候我们不能把一些敏感信息直接存储到文件或者数据库里,所有就需要加密之后再存储;

ASE对称加密算法

加密/解密双方 约定好 对称的加密/解密规则 (以什么规则加密的 就以什么规则解密)

 

pip install pycryptodome  #安装pycryptodome模块
import base64
from Crypto.Cipher import AES

# str不是16的倍数那就补足为16的倍数
def add_to_16(text):
    while len(text) % 16 != 0:
        text += \'\0\' #\0 就是加1个长度,直到加到长度为16
    return str.encode(text)  # 返回bytes

key = \'woshinibaba!\'  # 密码
text = \'zhanggen\'  # 待加密文本


#对称加密算法:加密/解密双方 约定好 对称的加密/解密规则 (以什么规则加密的 就以什么规则解密)
aes = AES.new(add_to_16(key), AES.MODE_ECB)  # 初始化加密器接收 1个16位长度的字符串
#使用add_to_16  加密
encrypted_text = str(base64.encodebytes(aes.encrypt(add_to_16(text))), encoding=\'utf8\').replace(\'\n\', \'\')
#使用add_to_16  解密
text_decrypted = str(aes.decrypt(base64.decodebytes(bytes(encrypted_text, encoding=\'utf8\'))).rstrip(b\'\0\').decode("utf8"))

print(\'加密值:\', encrypted_text)
print(\'解密值:\', text_decrypted)

在线AES加密解密

 

ssh无密码登录

如果你想不通过密码的方式登录1个Linux主机,A------>B ;

A:本机  B:目标主机

1.本机生成一对公、私钥

ssh-keygen -t rsa            #一组公私钥在~/.ssh/

2.本机将公钥拷贝到 目标主机

 scp ~/.ssh/id_rsa.pub root@192.168.1.18:~/.ssh/

#-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------#

3.目标主机创建authorized_keys文件

cat id_rsa.pub >> ~/.ssh/authorized_keys

4.设置目标主机权限 

sudo chmod 700 ~/.ssh
sudo chmod 700 /home/当前用户
sudo chmod 600 ~/.ssh/authorized_keys

5.目标主机修改sshd服务配置文件

vi /etc/ssh/sshd_config
StrictModes no

6.目标主机重启sshd服务

[root@svnnew .ssh]# /etc/init.d/sshd restart
停止 sshd:                                                [确定]
正在启动 sshd:                                            [确定]

 

 

 

Python的ConfigParser模块

 

 

Parser模块用于读取配置文件内容

from configparser import ConfigParser
\'\'\'
[db]
db_port = 3306
db_user = root
db_host = 127.0.0.1
db_pass = xgmtest

[concurrent]
processor = 20
thread = 10


[zabbix]
host= 3306
port = root
username=admin
password = 127.0.0.1

\'\'\'
cp = ConfigParser()
cp.read("test.config")#读取配置文件
section = cp.sections() #获取所有的secetions         [\'db\', \'concurrent\', \'zabbix\']
items=cp.items(\'zabbix\')#获取当个secetion中的item  [(\'host\', \'3306\'), (\'port\', \'root\'), (\'username\', \'admin\'), (\'password\', \'127.0.0.1\')]
str_port=cp.get(\'zabbix\',"host") #获取item中的值(str类型)   \'3306\'
int_port=cp.get(\'zabbix\',"host")#获取item中的值(int类型)  3306
print(int_port)
from configparser import ConfigParser

 


Python操作crontab

crontab是运维工作里不可缺少的一环,如何通过自动化的方式让它使用起来更加快捷,简单呢?Python中有1个模块对其进行了封装;

pip install python-crontab

 

PS:

当我下载完了python-crontab模块之后,想获取1个job的执行日志,死活获取不到,还好我没有死心,

发现了python-crontab模块源码中的正则表达式 和 我的crontable日志 match不上,竟然少了1个D

MATCHER = r\'(?P<date>\w+ +\d+ +\d\d:\d\d:\d\d) (?P<host>\w+) \' + \
        r\'CRON\[(?P<pid>\d+)\]: \((?P<user>\w+)\) CMD \((?P<cmd>.*)\)\'

                                         查询每个job的日志就是通过这种正则匹配出来的

Jun  6 15:43:01 cmdb CROND[11986]: (root) CMD (python3 /root/cmdb_rbac_arya/check_late_work_order.py)
Jun  6 15:44:01 cmdb CROND[11997]: (root) CMD (echo date >> ~/time.log # zhanggen)
Jun  6 15:44:01 cmdb CROND[11998]: (root) CMD (echo date >> ~/time.log # zhanggen)
Jun  6 15:44:01 cmdb CROND[12001]: (root) CMD (echo date >> ~/time.log # zhanggen)
Jun  6 15:44:01 cmdb CROND[11999]: (root) CMD (echo date >> ~/time.log # zhanggen)
Jun  6 15:44:01 cmdb CROND[12000]: (root) CMD (python3 /root/cmdb_rbac_arya/check_late_work_order.py)
Jun  6 15:45:01 cmdb CROND[12009]: (root) CMD (python3 /root/cmdb_rbac_arya/check_late_work_order.py)
Jun  6 15:46:01 cmdb CROND[12021]: (root) CMD (python3 /root/cmdb_rbac_arya/check_late_work_order.py)
Jun  6 15:46:01 cmdb CROND[12022]: (root) CMD (echo date >> ~/time.log # zhanggen)
Jun  6 15:46:01 cmdb CROND[12023]: (root) CMD (echo date >> ~/time.log # zhanggen)
Jun  6 15:46:01 cmdb CROND[12024]: (root) CMD (echo date >> ~/time.log # zhanggen)
Jun  6 15:46:01 cmdb CROND[12025]: (root) CMD (echo date >> ~/time.log # zhanggen)
from crontab import CronTab

cron  = CronTab(user=True,log=\'/var/log/cron\') # 代表 crontab -l 中的所有 job ,log为crontable 的日志路径


##创建job
# job = my_user_cron.new(command=\'echo date >> ~/time.log\')#在crontab -l 列表中创建 job
# job.setall(\'*/2 * * * *\')                                  #设置执行时间
# job.set_comment("zhanggen")                             #设置任务描述
# job.enable()                                            #启用/禁用
# my_user_cron.write()

# #通过_comment、_command、_time 3种形式查找job
# job1=list(cron.find_comment(\'zhanggen\'))[0] #获取1条crontable记录的最近1条日志,注意日志排列为倒序
# job2=list(cron.find_command(\'echo date\'))[0] # matches foobar1
# job3=list(cron.find_time(\'*/2\'))[0]
#
# #查看job的日志记录
# latest_log1=list(job1.log)[1]
# latest_log2=list(job2.log)[1]
# latest_log3=list(job3.log)[1]


#删除job对象
# cron.remove(job)
# cron.remove_all(command= \'echo\')
cron.remove_all(comment=\'zhanggen\')
# cron.remove_all(time=\'*/2\')

#提交执行结果
cron.write() #最后写入才会生效
代码 

                   定时任务框架APScheduler

1.APScheduler结合Django

from django.shortcuts import render,HttpResponse
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")
scheduler.start()
def time_task(**task):
    print(task)
from django_apscheduler import models
from datetime import datetime

register_events(scheduler)

def task(request):
    # obj=scheduler.get_job(job_id=\'zhanggen\')

    #
    # scheduler.remove_job(job_id=\'zhanggen\')#s删除
    scheduler.add_job(name=\'您好\',func=time_task,trigger="interval",seconds=3,id=\'七点\',kwargs={\'name\':\'zhanggen\'})
    scheduler.add_job(name=\'您好\', func=time_task, trigger="interval", seconds=3, id=\'八点\', kwargs={\'name\': \'张根\'})
    # scheduler.add_job(name=\'您好\',id=\'六点\',func=time_task,trigger=\'date\',run_date=datetime(2019,6,27,17,58,5),kwargs={\'name\':\'zhanggen\'})
    objs = models.DjangoJob.objects.filter(name__endswith=\'六点\')
    print(objs)
    # obj = scheduler.get_job(job_id=\'benson\')
    # obj.remove()#删除
    # obj.pause()#暂停
    #obj.resume()#恢复
    return HttpResponse(\'OK\')
views.py

 

 

ldap3

使用ldap用户认证:

0.让管理ldap服务器的运维人员给你开通账号

2.使用Python登录验证

from ldap3 import Server, Connection
def Ldap_auth(username,pasword):
    \'\'\'

    :param username:你的用户名
    :param pasword: 你的密码
    :return: 验证成功:success 验证失败:invalidCredentials
    \'\'\'
    ldaphost = ("ldap://10.10.82.222:10389")
    s = Server(ldaphost)
    conn2 = Connection(s,user=\'uid={0},ou=people,dc=example,dc=com\'.format(username) , password=pasword,check_names=True, lazy=False,raise_exceptions=False)
    conn2.bind()
    return conn2.result["description"]

ret=Ldap_auth(\'zhanggen\',\'zhanggen123.com\')
print(ret)
ldap3验证

 ldap获取所有用户信息

import ldap
def get_userinfo_from_ldap():
    try:
        server = "ldap://10.10.82.222:10389"
        membersDN ="ou=people,dc=example,dc=com"
        groups_DN="ou=groups,dc=example,dc=com"

        conn = ldap.initialize(server)
        conn.set_option(ldap.OPT_REFERRALS, 0)
        conn.protocol_version = ldap.VERSION3
        filterstr = \'(objectClass=organizationalPerson)\'
        # 获取的字段
        attrlist = [\'uid\', \'displayName\']
        conn.search(membersDN, ldap.SCOPE_ONELEVEL,
                    filterstr=filterstr, attrlist=attrlist)
        code, members = conn.result(timeout=3)

        name_map = {}
        for member in members:
            data = member[1]
            uid = data.get(\'uid\', [\'\'])[0].decode(
                encoding=\'utf-8\')
            displayName = data.get(\'displayName\', [\'\'])[
                0].decode(encoding=\'utf-8\')
            if uid:
                name_map[uid] = displayName
        conn.unbind_s()

        return True, name_map
    except ldap.LDAPError as e:

        return False
    pass

print(get_userinfo_from_ldap())
获取所有用户信息

 

 

 调用Ucloud API

import base64
import hashlib
import json
import os
import sys
import urllib.parse
import urllib.request

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import conf


class UCloudHelper():
    def __init__(self):
        pass

    def get_token(self, uhost_id, uhost_name):
        return hashlib.md5((\'%s%s\' % (uhost_id, uhost_name)).encode(\'utf8\')).hexdigest()[:16]

    def __generate_url(self, params):
        # 1. 把排好序的k,v+private key作为明文
        params[\'PublicKey\'] = conf.ucloud_public_key
        s = \'\'.join([\'\'.join((k, params[k])) for k in sorted(params.keys())])
        s = \'%s%s\' % (s, conf.ucloud_private_key)
        # 2. sha1加密
        params[\'Signature\'] = hashlib.sha1(s.encode(\'utf8\')).hexdigest()
        # 3. 请求ucloud
        url = \'https://api.ucloud.cn/?%s\' % urllib.parse.urlencode(params)
        # print(\'params: %s\' % params)
        # print(\'url: %s\' % url)
        return url

    def describe_all_uhosts(self, limit):
        \'\'\'返回一坨实例信息\'\'\'
        params = {
            \'Action\': \'DescribeUHostInstance\',
            \'Region\': conf.ucloud_region,
            \'Tag\': conf.ucloud_tag,
            \'Limit\': str(limit),
        }
        return self.__generate_url(params)

    def describe_uhost_instance(self, uhost_id):
        \'\'\'返回一个实例的信息\'\'\'
        params = {
            \'Action\': \'DescribeUHostInstance\',
            \'Region\': conf.ucloud_region,
            \'Tag\': conf.ucloud_tag,
            \'UHostIds.0\': uhost_id,
        }
        return self.__generate_url(params)

    def describe_image(self):
        \'\'\'返回镜像\'\'\'
        params = {
            \'Action\': \'DescribeImage\',
            \'Region\': \'cn-bj2\',
            # \'Zone\':\'cn-bj2\',
            \'ImageType\': \'Base\'
        }
        return self.__generate_url(params)

    def parse_image(self, result):
        \'\'\'describe_image结果解析\'\'\'
        prefix = \'sensors-analytics-saas-\'
        # prefix = \'sensors-analytics-standalone\'
        ret = []
        for image in result[\'ImageSet\']:
            if image[\'ImageName\'].startswith(prefix):
                image[\'Version\'] = tuple([int(x) for x in image[\'ImageName\'].replace(prefix, \'\').split(\'.\')])
                ret.append(image)
        if not ret:
            raise Exception(\'cannot find %s image: %s!\' % (prefix, str(result)))
        return sorted(ret, key=lambda x: x[\'Version\'])[-1]

    #def create_uhost_instance(self, name, cpu, memory, disk, host_type, image_id,MachineType=False):
    def create_uhost_instance(self, image_id):
        \'\'\'创建一个新的实例\'\'\'
        # image = self.describe_image() -> image[\'ImageId\']
        params = {
            \'Action\': \'CreateUHostInstance\',
            \'Region\': conf.ucloud_region,
            \'Zone\': conf.ucloud_zone,
            \'Tag\': conf.ucloud_tag,
            \'ImageId\': \'uimage-5fhmo0\',#image_id,
            \'LoginMode\': \'Password\',
            \'Password\': base64.b64encode(conf.ucloud_default_password.encode(\'utf8\')).decode(\'utf8\'),
            \'CPU\': str(4),
            \'Memory\': str(1024 * 16),
            \'StorageType\': \'LocalDisk\',
            \'DiskSpace\': str(200),
            \'Name\': \'cloud-TEST\',
            "MachineType": \'O\',
            # \'UHostType\': host_type,
            # \'HostType\': \'N2\',
            \'ChargeType\': \'Month\',
        }
        if image_id==\'111\':
           pass

        print(\'创建主机的参数:\',params,\'\r\n\')
        return self.__generate_url(params)

    def delete_uhost_instance(self, uhost_id):
        \'\'\'关闭实例\'\'\'
        params = {
            \'Action\': \'TerminateUHostInstance\',
            \'UHostId\': uhost_id,
            \'Region\': conf.ucloud_region,
        }
        return self.__generate_url(params)

    def stop_uhost_instance(self, uhost_id):
        \'\'\'暂停实例\'\'\'
        params = {
            \'Action\': \'StopUHostInstance\',
            \'UHostId\': uhost_id,
            \'Region\': conf.ucloud_region,
        }
        return self.__generate_url(params)

    def start_uhost_instance(self, uhost_id):
        \'\'\'启动实例\'\'\'
        params = {
            \'Action\': \'StartUHostInstance\',
            \'UHostId\': uhost_id,
            \'Region\': conf.ucloud_region,
        }
        return self.__generate_url(params)

    def reinstall_uhost_instance(self, image_id, uhost_id, reserve_disk=False):
        \'\'\'重装实例 返回重装后的版本号\'\'\'
        # image = self.describe_image() -> image[\'ImageId\']
        params = {
            \'Action\': \'ReinstallUHostInstance\',
            \'UHostId\': uhost_id,
            \'Region\': conf.ucloud_region,
            \'Password\': base64.b64encode(conf.ucloud_default_password.encode(\'utf8\')).decode(\'utf8\'),
            \'ReserveDisk\': {True: \'Yes\', False: \'No\'}[reserve_disk],
            \'ImageId\': image_id,
        }
        return self.__generate_url(params)
if __name__ == \'__main__\':
    h=UCloudHelper()




\'\'\'
创建主机的参数: {\'Action\': \'CreateUHostInstance\', \'Region\': \'cn-bj2\', \'Zone\': \'cn-bj2-05\', \'Tag\': \'云试用版本\', \'ImageId\': \'uimage-5fhmo0\', \'LoginMode\': \'Password\', \'Password\': \'TWh4ektobDIwMTU=\', \'CPU\': \'4\', \'Memory\': \'16384\', \'StorageType\': \'LocalDisk\', \'DiskSpace\': \'200\', \'Name\': \'cloud-TEST\', \'MachineType\': \'O\', \'ChargeType\': \'Month\'} 

https://api.ucloud.cn/?Action=CreateUHostInstance&Region=cn-bj2&Zone=cn-bj2-05&Tag=%E4%BA%91%E8%AF%95%E7%94%A8%E7%89%88%E6%9C%AC&ImageId=uimage-5fhmo0&LoginMode=Password&Password=TWh4ektobDIwMTU%3D&CPU=4&Memory=16384&StorageType=LocalDisk&DiskSpace=200&Name=cloud-TEST&MachineType=O&ChargeType=Month&PublicKey=ucloudsangwf%40qq.com14284190000002005024391&Signature=ed8dad59f0dbf70bc5c5575854f2a3b77d174d51
{\'RetCode\': 0, \'Action\': \'CreateUHostInstanceResponse\', \'UHostIds\': [\'uhost-snd4rh2n\'], \'IPs\': [\'10.42.175.77\']}

\'\'\'
ucloud_helper.py

 

 

 调用阿里云发短信API

 

from django.shortcuts import render,HttpResponse
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from django.views.decorators.csrf import csrf_exempt
import json

class Ali_SMS_API(object):
    def __init__(self,AccessKeyId,AccessKeySecret,message):
        self.AccessKeyId=AccessKeyId
        self.AccessKeySecret=AccessKeySecret
        self.message=message
        self.client = AcsClient(self.AccessKeyId,self.AccessKeySecret,\'cn-hangzhou\')
        self.request = CommonRequest()
        self.request.set_accept_format(\'json\')
        self.request.set_domain(\'dysmsapi.aliyuncs.com\')
        self.request.set_protocol_type(\'https\')
        self.sign_name=[\'AA\',\'AAAA\' ]
        self.sms_template_id_list = [\'SMS_167195485\', \'SMS_144455004\', \'SMS_142951120\', \'SMS_142946117\', \'SMS_142946048\']


    def AllTemplates(self):#扩展方法1:查询所有短信模板
        self.request.set_method(\'POST\')
        self.request.set_version(\'2017-05-25\')
        self.request.set_action_name(self.message.pop(\'Action\'))
        self.request.add_query_param(\'RegionId\', "cn-hangzhou")
        ret = []
        for i in self.sms_template_id_list:
            self.request.add_query_param(\'TemplateCode\',i)
            response = self.client.do_action(self.request)
            ret.append(json.loads(str(response, encoding=\'utf-8\')))
        return ret

    def is_extended(self):#判断是否为扩展方法
        flag=False
        TemplateCode=self.message.get(\'TemplateCode\')
        if TemplateCode==\'AllTemplates\':
            flag=\'AllTemplates\'
        return flag

    def add_query_parama(self):
        self.request.set_method(\'POST\')
        self.request.set_action_name(self.message.pop(\'Action\'))
        self.request.set_version(\'2017-05-25\')
        for k, v in self.message.items():
            self.request.add_query_param(k,v)
        self.request.add_query_param(\'RegionId\', "cn-hangzhou")

    def get_results(self):
        self.compatible_with_get()
        if self.is_extended():
            return getattr(self,self.is_extended())()
        self.add_query_parama()
        response = self.client.do_action(self.request)
        ret = str(response, encoding=\'utf-8\')
        # print(ret)
        return ret
    def compatible_with_get(self):#兼容Django的get 方法 [值]
        if self.message.get(\'method\')==\'get\':
            self.message={k:v[0]  for k, v in self.message.items()}
            self.message.pop(\'method\')
            self.message.pop(\'token\')


@csrf_exempt
def sms_handler(request):
    ret=\'错误\'
    token=request.GET.get(\'token\')
    if token==\'dfjdyr892jd730t8h\':
        message={"token":token}
        if request.method==\'GET\':#/sd/ali/sms?templates=templates
            message=dict(request.GET)
            message[\'method\']=\'get\'
        if request.method==\'POST\':
            message = json.loads(request.body.decode(\'utf-8\'))
        obj = Ali_SMS_API(\'LTAswUpAAPhT9bim\', \'DS8AmhuUzFY\',message=message)
        ret=obj.get_results()
    return HttpResponse(json.dumps(ret,ensure_ascii=False))

#Sendsorsdata2020
调用阿里云短信接口

 

 python查询目录

使用Python的glob模块可以快速的路径的具体位置。

    def scan_wechat_path(self):
        for disk in psutil.disk_partitions():
            WeChatFiles = glob.glob(r"{0}\\*\\Documents\\WeChat Files".format(disk.device))
            if WeChatFiles:
                self.wechat_path=WeChatFiles[0]
                break
        if self.wechat_path:
            self.scan_path(self.wechat_path)
查询微信缓存所在路径

 

Python操作Word文档

使用Python查询Word文档中的内容时,把Word文档转成Html文档,再使用BeautifulSoup去查询标签是一种不错的方法

import re

from docx import Document

document = Document("./file.docx")

# 1.获取所有段落
all_paragraphs = document.paragraphs
for paragraph in all_paragraphs:
    string_paragraph = paragraph.text
    # 匹配标题
    if re.match(r"^\d+", string_paragraph) and len(string_paragraph) < 10:
        print(paragraph.text)

# 2.按表格读取全部数据
document = Document("./file.docx")
all_tables = document.tables
for table in all_tables:
    for row in table.rows:
        for cell in row.cells:
            print("---->", cell.text)

# 3.把word文档后缀转换.zip结尾
import zipfile

word = zipfile.ZipFile("./file.docx")
xml = word.read("word/document.xml").decode("utf-8")
xml_list = xml.split("<w:t>")
print(xml_list)

# 4.把word文档转换成html文件
from bs4 import BeautifulSoup
from pydocx import PyDocX

html = PyDocX.to_html("file.docx")
soup = BeautifulSoup(html, \'html.parser\')
key_word = "4.2.1.2"
tag2 = soup.find(name=\'h%s\' % (len(key_word.split("."))), text=re.compile(r\'^%s\' % (key_word)))
print(tag2.find_next_sibling(name="table"))

 Word---->table标签----》二维数组

import re

from bs4 import BeautifulSoup
from pydocx import PyDocX


class DocxSearchService(object):
    @classmethod
    def search(cls, html_document, search_type, search_title, first_index, second_index):
        html_document = PyDocX.to_html(html_document)
        soup = BeautifulSoup(html_document, \'html.parser\')
        result = ""
        # 获取h标题标签作为参照点
        h_tag = soup.find(name=\'h%s\' % (len(search_title.split("."))), text=re.compile(r\'^%s\' % (search_title)))
        # 根据h标签,找到P标签并获取其内容
        if search_type == "p":
            result = cls.find_content(h_tag, search_type, first_index, second_index)
        # 根据h标签,找到Table标签并获取其内容
        elif search_type == "table":
            result = cls.find_table(h_tag, search_type, first_index)
        return result

    @classmethod
    def find_content(cls, h_tag, search_type, first_index, second_index):
        p_tags = h_tag.find_next_siblings(name=search_type)[first_index:second_index]
        return [i.text for i in p_tags]

    @classmethod
    def find_table(cls, h_tag, search_type, first_index):
        # 根据H(标题)标签获取全部table标签
        table_tags = h_tag.find_next_siblings(name=search_type)
        # 从所有table标签中获取客户指定的table
        table_tag = table_tags[first_index]
        # 从table表格中提取数据
        table_list = cls.extract_table_data(table_tag)
        # 从table表格中提取单元格合并情况
        rowspan_info, colspan_info = cls.extract_table_span(table_tag)
        # 按列拆分单元格合并
        table_list = cls.align_column(colspan_info, table_list)
        # 按行拆分单元格合并
        table_list = cls.align_row(rowspan_info, table_list)
        # 返回最终数据
        return table_list

    @classmethod
    def extract_table_data(cls, table_tag):
        table_list = []
        all_trs = table_tag.find_all("tr")
        for row in all_trs:
            row_list = [cell.text for cell in row.find_all("td")]
            table_list.append(row_list)
        return table_list

    @classmethod
    def extract_table_span(cls, table_tag):
        rowspan_info = []
        colspan_info = []
        table_trs = table_tag.find_all("tr")
        for i in range(0, len(table_trs)):
            tr_tds = table_trs[i].find_all("td")
            for j in range(0, len(tr_tds)):
                if tr_tds[j].attrs.get("rowspan"):
                    rowspan_info.append([i, j, int(tr_tds[j].attrs.get("rowspan")), tr_tds[j].text])
                if tr_tds[j].attrs.get("colspan"):
                    colspan_info.append([i, j, int(tr_tds[j].attrs.get("colspan")), tr_tds[j].text])
        return rowspan_info, colspan_info

    @classmethod
    def align_head(cls, table_list):
        max_length = max([len(l) for l in table_list])
        header_length = len(table_list[0])
        if header_length < max_length:
            table_list[0].extend([table_list[0][-1] * (max_length - header_length)])

    @classmethod
    def align_column(cls, colspan_info: list, table_list: list):
        for item in colspan_info:
            for i in range(0, item[2] - 1):
                table_list[item[0]].insert(item[1], item[3])
        return table_list

    @classmethod
    def align_row(cls, rowspan_info: list, table_list: list):
        for item in rowspan_info:
            for row in range(item[0] + 1, item[0] + item[2]):
                table_list[row].insert(item[1], item[3])
        return table_list
docx_service.py

 

 

 

 
 

 

分类:

技术点:

相关文章: