一、说明

此客户端使用python3编写

此客户端实现了RTSP的OPTIONS、DESCRIBE、SETUP、PLAY、GET_PARAMETER、TEARDOWN方法,未实现ANNOUNCE、PAUSE、SET_PARAMETER、REDIRECT、RECORD方法

RTSP就是针对一个URL,或是依次或是有选择地执行以上11种请求;要做渗透测试,没很多可测的,也就测试URL中的参数和各请求方法的请求头参数有没有溢出

此客户端自动依次执行请求:OPTIONS--DESCRIBE(2次)--SETUP(2次)--PLAY--GET_PARAMETER(5次)--TEARDOWN

RTSP有Basic和Digest两种验证方法,此客户端默认使用Digest方法;如果想换用Basic方法,请将config_dict['auth_method']的值由'Digest'修改为'Basic'

总体执行效果如下图所示

python3实现的rtsp客户端脚本

 

二、客户端源代码

V1.0版本(20180525):

import base64
import socket
import hashlib
import time

# Settings shared by every request builder and by exec_full_request().
# NOTE(review): the credentials and address below are hard-coded sample
# values; adjust them for the server under test.
config_dict = {
    # Credentials used to build the Basic / Digest Authorization header.
    'server_username': 'admin',
    'server_password': '1234567',
    # Target server; the request URL is rtsp://<ip>:<port><path>.
    'server_ip': '10.10.6.94',
    'server_port': 554,
    'server_path': '/chIP=1&streamType=main/',
    # Base CSeq value; each request builder adds its own fixed offset to it.
    'cseq': 2,
    'user_agent': 'LibVLC/3.0.2 (LIVE555 Streaming Media v2016.11.28)',
    # Number of bytes requested per socket recv() call.
    'buffer_len': 1024,
    # 'Digest' or 'Basic'; any value other than 'Basic' selects Digest.
    'auth_method': 'Digest',
    # Master switch: the per-request switches below only take effect when
    # this is True.  When both are True the request is extended by
    # add_header_according_to_protocol().
    'header_modify_allow': False,
    'options_header_modify': False,
    'describe_header_modify': False,
    'describe_auth_header_modify': False,
    'setup_header_modify': False,
    'setup_session_header_modify': False,
    'play_header_modify': False,
    'get_parameter_header_modify': False,
    'teardown_header_modify': False,
}


def gen_response_value(url, public_method, realm, nonce, username=None, password=None):
    """Compute the Digest ``response`` value for one request.

    The value is ``MD5(HA1:nonce:HA2)`` where ``HA1 = MD5(user:realm:password)``
    and ``HA2 = MD5(method:url)``.  No ``qop``/``cnonce`` variant is computed.

    Args:
        url: the URI string that is also sent in the ``uri="..."`` field.
        public_method: RTSP method name, e.g. ``'DESCRIBE'``.
        realm: realm value taken from the server challenge.
        nonce: nonce value taken from the server challenge.
        username: optional user name; defaults to
            ``config_dict['server_username']`` when ``None``.
        password: optional password; defaults to
            ``config_dict['server_password']`` when ``None``.

    Returns:
        str: 32-character lowercase hexadecimal MD5 digest.
    """
    # Fall back to the shared configuration only for values not supplied,
    # so existing four-argument callers behave exactly as before.
    if username is None:
        username = config_dict['server_username']
    if password is None:
        password = config_dict['server_password']

    credentials_hash = hashlib.md5((username + ':' + realm + ':' + password).encode()).hexdigest()
    request_hash = hashlib.md5((public_method + ':' + url).encode()).hexdigest()
    combined = credentials_hash + ':' + nonce + ':' + request_hash
    return hashlib.md5(combined.encode()).hexdigest()


def gen_options_header():
    """Build the OPTIONS request text from the shared configuration.

    The request carries the base CSeq value and the configured User-Agent,
    and ends with the blank line that closes the header section.

    Returns:
        str: the complete CRLF-delimited request.
    """
    target = ''.join((
        'rtsp://',
        config_dict['server_ip'],
        ':',
        str(config_dict['server_port']),
        config_dict['server_path'],
    ))
    request_lines = (
        'OPTIONS ' + target + ' RTSP/1.0',
        'CSeq: ' + str(config_dict['cseq']),
        'User-Agent: ' + config_dict['user_agent'],
        # Two empty entries yield the last header's CRLF plus the blank line.
        '',
        '',
    )
    return '\r\n'.join(request_lines)


def gen_describe_header():
    """Build the unauthenticated DESCRIBE request text.

    Uses CSeq = base + 1 and asks for an SDP description.

    Returns:
        str: the complete CRLF-delimited request.
    """
    target = ''.join((
        'rtsp://',
        config_dict['server_ip'],
        ':',
        str(config_dict['server_port']),
        config_dict['server_path'],
    ))
    request_lines = (
        'DESCRIBE ' + target + ' RTSP/1.0',
        'CSeq: ' + str(config_dict['cseq'] + 1),
        'User-Agent: ' + config_dict['user_agent'],
        'Accept: application/sdp',
        # Two empty entries yield the last header's CRLF plus the blank line.
        '',
        '',
    )
    return '\r\n'.join(request_lines)


def gen_describe_auth_header(url, realm, nonce):
    """Build the authenticated DESCRIBE request text (CSeq = base + 2).

    Args:
        url: URI placed in the Digest ``uri`` field and hashed into the
            Digest response.
        realm: realm value from the server challenge (Digest only).
        nonce: nonce value from the server challenge (Digest only).

    Returns:
        str: the complete CRLF-delimited request.
    """
    user = config_dict['server_username']
    target = ''.join((
        'rtsp://',
        config_dict['server_ip'],
        ':',
        str(config_dict['server_port']),
        config_dict['server_path'],
    ))
    request_lines = [
        'DESCRIBE ' + target + ' RTSP/1.0',
        'CSeq: ' + str(config_dict['cseq'] + 2),
    ]

    if config_dict['auth_method'] != 'Basic':
        # Anything other than 'Basic' is treated as Digest.
        digest = gen_response_value(url, 'DESCRIBE', realm, nonce)
        request_lines.append(''.join((
            'Authorization: Digest username="', user,
            '", realm="', realm,
            '", nonce="', nonce,
            '", uri="', url,
            '", response="', digest, '"',
        )))
    else:
        pair = user + ":" + config_dict['server_password']
        token = base64.b64encode(pair.encode("utf-8")).decode()
        # The Basic line keeps a single space before its CRLF.
        request_lines.append('Authorization: Basic ' + token + ' ')

    request_lines.append('User-Agent: ' + config_dict['user_agent'])
    request_lines.append('Accept: application/sdp')
    # Two empty entries yield the last header's CRLF plus the blank line.
    request_lines.extend(('', ''))
    return '\r\n'.join(request_lines)


def gen_setup_header(url, realm, nonce):
    """Build the first SETUP request text (track 0, CSeq = base + 3).

    The track URL is ``<server_path>/trackID=0``.  Exactly one ``/`` is
    placed between the configured path and the track name, so the result is
    the same whether or not ``server_path`` ends with a slash.

    Args:
        url: URI placed in the Digest ``uri`` field and hashed into the
            Digest response.
        realm: realm value from the server challenge (Digest only).
        nonce: nonce value from the server challenge (Digest only).

    Returns:
        str: the complete CRLF-delimited request.
    """
    public_method = 'SETUP'
    # Normalise the join so a path without a trailing slash does not fuse
    # with the track name ("...maintrackID=0").
    track_path = config_dict['server_path'].rstrip('/') + '/trackID=0'
    str_setup_header = ('SETUP rtsp://' + config_dict['server_ip'] + ':'
                        + str(config_dict['server_port']) + track_path + ' RTSP/1.0\r\n')
    str_setup_header += 'CSeq: ' + str(config_dict['cseq'] + 3) + '\r\n'
    if config_dict['auth_method'] == 'Basic':
        credentials = config_dict['server_username'] + ":" + config_dict['server_password']
        auth_64 = base64.b64encode(credentials.encode("utf-8")).decode()
        str_setup_header += 'Authorization: Basic ' + auth_64 + ' \r\n'
    else:
        # Anything other than 'Basic' is treated as Digest.
        response_value = gen_response_value(url, public_method, realm, nonce)
        str_setup_header += ('Authorization: Digest username="' + config_dict['server_username']
                             + '", realm="' + realm + '", nonce="' + nonce
                             + '", uri="' + url + '", response="' + response_value + '"\r\n')
    str_setup_header += 'User-Agent: ' + config_dict['user_agent'] + '\r\n'
    str_setup_header += 'Transport: RTP/AVP;unicast;client_port=50166-50167\r\n'
    str_setup_header += '\r\n'
    return str_setup_header


def gen_setup_session_header(url, realm, nonce, session):
    """Build the second SETUP request text (track 1, CSeq = base + 4).

    The track URL is ``<server_path>/trackID=1``.  Exactly one ``/`` is
    placed between the configured path and the track name; previously a
    ``server_path`` ending in ``/`` produced ``//trackID=1``.

    Args:
        url: URI placed in the Digest ``uri`` field and hashed into the
            Digest response.
        realm: realm value from the server challenge (Digest only).
        nonce: nonce value from the server challenge (Digest only).
        session: session identifier sent in the ``Session`` header.

    Returns:
        str: the complete CRLF-delimited request.
    """
    public_method = 'SETUP'
    # Strip any trailing slash before appending so the URL never contains
    # a doubled separator in front of the track name.
    track_path = config_dict['server_path'].rstrip('/') + '/trackID=1'
    str_setup_session_header = ('SETUP rtsp://' + config_dict['server_ip'] + ':'
                                + str(config_dict['server_port']) + track_path + ' RTSP/1.0\r\n')
    str_setup_session_header += 'CSeq: ' + str(config_dict['cseq'] + 4) + '\r\n'
    if config_dict['auth_method'] == 'Basic':
        credentials = config_dict['server_username'] + ":" + config_dict['server_password']
        auth_64 = base64.b64encode(credentials.encode("utf-8")).decode()
        str_setup_session_header += 'Authorization: Basic ' + auth_64 + ' \r\n'
    else:
        # Anything other than 'Basic' is treated as Digest.
        response_value = gen_response_value(url, public_method, realm, nonce)
        str_setup_session_header += ('Authorization: Digest username="' + config_dict['server_username']
                                     + '", realm="' + realm + '", nonce="' + nonce
                                     + '", uri="' + url + '", response="' + response_value + '"\r\n')
    str_setup_session_header += 'User-Agent: ' + config_dict['user_agent'] + '\r\n'
    str_setup_session_header += 'Transport: RTP/AVP;unicast;client_port=50168-50169\r\n'
    str_setup_session_header += 'Session: ' + session + '\r\n'
    str_setup_session_header += '\r\n'
    return str_setup_session_header


def gen_play_header(url, realm, nonce, session):
    """Build the PLAY request text (CSeq = base + 5).

    Args:
        url: URI placed in the Digest ``uri`` field and hashed into the
            Digest response.
        realm: realm value from the server challenge (Digest only).
        nonce: nonce value from the server challenge (Digest only).
        session: session identifier sent in the ``Session`` header.

    Returns:
        str: the complete CRLF-delimited request.
    """
    user = config_dict['server_username']
    target = ''.join((
        'rtsp://',
        config_dict['server_ip'],
        ':',
        str(config_dict['server_port']),
        config_dict['server_path'],
    ))
    request_lines = [
        'PLAY ' + target + ' RTSP/1.0',
        'CSeq: ' + str(config_dict['cseq'] + 5),
    ]

    if config_dict['auth_method'] != 'Basic':
        # Anything other than 'Basic' is treated as Digest.
        digest = gen_response_value(url, 'PLAY', realm, nonce)
        request_lines.append(''.join((
            'Authorization: Digest username="', user,
            '", realm="', realm,
            '", nonce="', nonce,
            '", uri="', url,
            '", response="', digest, '"',
        )))
    else:
        pair = user + ":" + config_dict['server_password']
        token = base64.b64encode(pair.encode("utf-8")).decode()
        # The Basic line keeps a single space before its CRLF.
        request_lines.append('Authorization: Basic ' + token + ' ')

    request_lines.append('User-Agent: ' + config_dict['user_agent'])
    request_lines.append('Session: ' + session)
    request_lines.append('Range: npt=0.000-')
    # Two empty entries yield the last header's CRLF plus the blank line.
    request_lines.extend(('', ''))
    return '\r\n'.join(request_lines)


def gen_get_parameter_header(url, realm, nonce, session, count):
    """Build a GET_PARAMETER request text (CSeq = base + 6 + count).

    Args:
        url: URI placed in the Digest ``uri`` field and hashed into the
            Digest response.
        realm: realm value from the server challenge (Digest only).
        nonce: nonce value from the server challenge (Digest only).
        session: session identifier sent in the ``Session`` header.
        count: index of this request; converted with ``int()`` and added to
            the CSeq offset.

    Returns:
        str: the complete CRLF-delimited request.
    """
    user = config_dict['server_username']
    target = ''.join((
        'rtsp://',
        config_dict['server_ip'],
        ':',
        str(config_dict['server_port']),
        config_dict['server_path'],
    ))
    sequence_number = config_dict['cseq'] + 6 + int(count)
    request_lines = [
        'GET_PARAMETER ' + target + ' RTSP/1.0',
        'CSeq: ' + str(sequence_number),
    ]

    if config_dict['auth_method'] != 'Basic':
        # Anything other than 'Basic' is treated as Digest.
        digest = gen_response_value(url, 'GET_PARAMETER', realm, nonce)
        request_lines.append(''.join((
            'Authorization: Digest username="', user,
            '", realm="', realm,
            '", nonce="', nonce,
            '", uri="', url,
            '", response="', digest, '"',
        )))
    else:
        pair = user + ":" + config_dict['server_password']
        token = base64.b64encode(pair.encode("utf-8")).decode()
        # The Basic line keeps a single space before its CRLF.
        request_lines.append('Authorization: Basic ' + token + ' ')

    request_lines.append('User-Agent: ' + config_dict['user_agent'])
    request_lines.append('Session: ' + session)
    # Two empty entries yield the last header's CRLF plus the blank line.
    request_lines.extend(('', ''))
    return '\r\n'.join(request_lines)


def gen_teardown_header(url, realm, nonce, session):
    """Build the TEARDOWN request text (CSeq = base + 11).

    Args:
        url: URI placed in the Digest ``uri`` field and hashed into the
            Digest response.
        realm: realm value from the server challenge (Digest only).
        nonce: nonce value from the server challenge (Digest only).
        session: session identifier sent in the ``Session`` header.

    Returns:
        str: the complete CRLF-delimited request.
    """
    user = config_dict['server_username']
    target = ''.join((
        'rtsp://',
        config_dict['server_ip'],
        ':',
        str(config_dict['server_port']),
        config_dict['server_path'],
    ))
    request_lines = [
        'TEARDOWN ' + target + ' RTSP/1.0',
        'CSeq: ' + str(config_dict['cseq'] + 11),
    ]

    if config_dict['auth_method'] != 'Basic':
        # Anything other than 'Basic' is treated as Digest.
        digest = gen_response_value(url, 'TEARDOWN', realm, nonce)
        request_lines.append(''.join((
            'Authorization: Digest username="', user,
            '", realm="', realm,
            '", nonce="', nonce,
            '", uri="', url,
            '", response="', digest, '"',
        )))
    else:
        pair = user + ":" + config_dict['server_password']
        token = base64.b64encode(pair.encode("utf-8")).decode()
        # The Basic line keeps a single space before its CRLF.
        request_lines.append('Authorization: Basic ' + token + ' ')

    request_lines.append('User-Agent: ' + config_dict['user_agent'])
    request_lines.append('Session: ' + session)
    # Two empty entries yield the last header's CRLF plus the blank line.
    request_lines.extend(('', ''))
    return '\r\n'.join(request_lines)


def add_header_according_to_protocol(str_header):
    """Append a fixed set of additional RTSP header lines to a request.

    The blank line that terminates ``str_header`` is removed, the extra
    header lines are appended (each on its own CRLF-terminated line), and a
    new terminating blank line is added.

    Args:
        str_header: a request string, normally ending with the blank-line
            CRLF that closes the header section.

    Returns:
        str: the request with the extra header lines inserted before the
        terminating blank line.
    """
    # Header lines are reproduced verbatim, including empty values and
    # trailing spaces.
    # NOTE(review): 'Content-Length: 20' is sent without a body; presumably
    # intentional for header testing -- confirm.
    extra_lines = (
        'Accept: application/rtsl, application/sdp;level=2',
        'Accept-Encoding: ',
        'Accept-Language: ',
        'Authorization: ',
        'Bandwidth: 1*DIGIT ',
        'Blocksize: ',
        'Cache-Control: no-cache ',
        'Conference: 199702170042.SAA08642@obiwan.arl.wustl.edu%20Starr ',
        'Connection: ',
        'Content-Base: ',
        'Content-Encoding: ',
        'Content-Language: ',
        'Content-Length: 20 ',
        'Content-Location: ',
        'Content-Type: ',
        'Date: ',
        'Expires: Thu, 01 Dec 1994 16:00:00 GMT ',
        'From: ',
        'If-Modified-Since: Sat, 29 Oct 1994 19:43:31 GMT ',
        'Last-Modified: ',
        'Proxy-Require: ',
        'Referer: ',
        'Require: funky-feature ',
        'Scale: -3.5 ',
        'Speed: 2.5 ',
        'Transport: RTP/AVP;unicast;client_port=3456-3457;mode="PLAY" ',
        'User-Agent: ',
        'Via: ',
        'Range: npt=2',
    )

    # Drop only the terminating blank line; leave the input untouched when
    # it does not end with CRLF instead of chopping two arbitrary characters.
    if str_header.endswith('\r\n'):
        str_header = str_header[:-2]

    # Every line, including the first (Accept), gets its own CRLF so it no
    # longer runs into the following header.
    return str_header + ''.join(line + '\r\n' for line in extra_lines) + '\r\n'


def _status_line(response):
    """Return the first line of a decoded RTSP response."""
    return response.split('\r\n')[0]


def _quoted_param(response, name):
    """Return the double-quoted value that follows ``name``, or '' if absent."""
    name_pos = response.find(name)
    if name_pos == -1:
        return ''
    opening_quote = response.find('"', name_pos)
    if opening_quote == -1:
        return ''
    closing_quote = response.find('"', opening_quote + 1)
    if closing_quote == -1:
        return ''
    return response[opening_quote + 1:closing_quote]


def _session_id(response):
    """Return the Session header value without any ';' parameters, or ''."""
    for line in response.split('\r\n'):
        name, separator, value = line.partition(':')
        if separator and name.strip().lower() == 'session':
            return value.split(';')[0].strip()
    return ''


def _content_length(header_bytes):
    """Return the Content-Length announced in raw header bytes (0 if none)."""
    for line in header_bytes.split(b'\r\n')[1:]:
        name, _, value = line.partition(b':')
        if name.strip().lower() == b'content-length':
            try:
                return int(value.strip())
            except ValueError:
                return 0
    return 0


def _recv_response(sock):
    """Read one response: the header section plus any Content-Length body."""
    chunk_size = config_dict['buffer_len']
    data = b''
    expected_total = None  # unknown until the blank line after the headers arrives
    while expected_total is None or len(data) < expected_total:
        try:
            chunk = sock.recv(chunk_size)
        except socket.timeout:
            if not data:
                raise
            break  # keep the partial response rather than discarding it
        if not chunk:
            break  # peer closed the connection
        data += chunk
        if expected_total is None:
            header_end = data.find(b'\r\n\r\n')
            if header_end != -1:
                expected_total = header_end + 4 + _content_length(data[:header_end])
    return data.decode(errors='replace')


def _request(sock, str_header, switch_key):
    """Send one request, optionally extended with extra headers, and return the reply."""
    if config_dict['header_modify_allow'] and config_dict[switch_key]:
        str_header = add_header_according_to_protocol(str_header)
    # sendall() keeps sending until the whole request has been written.
    sock.sendall(str_header.encode())
    return _recv_response(sock)


def _run_request_sequence(sock, url):
    """Run OPTIONS, DESCRIBE x2, SETUP x2, PLAY, GET_PARAMETER x5, TEARDOWN on ``sock``."""
    print('now start to check options operation')
    msg_recv = _request(sock, gen_options_header(), 'options_header_modify')
    if '200 OK' in msg_recv:
        print('OPTIONS request is OK')
    else:
        print('OPTIONS request is BAD')

    msg_recv = _request(sock, gen_describe_header(), 'describe_header_modify')
    if '401 Unauthorized' in msg_recv:
        print('first DESCRIBE is ok,now we will execute second DESCRIBE for auth')
    else:
        # The original check here (`== -1 & False`) could never stop the
        # run, so the sequence still continues; the unexpected status is
        # now reported instead of being silently ignored.
        print('first DESCRIBE did not return 401 Unauthorized: ' + _status_line(msg_recv))
    realm_value = _quoted_param(msg_recv, 'realm')
    nonce_value = _quoted_param(msg_recv, 'nonce')

    msg_recv = _request(sock, gen_describe_auth_header(url, realm_value, nonce_value),
                        'describe_auth_header_modify')
    if '200 OK' not in msg_recv:
        print('second DESCRIBE request occur error: ')
        print(_status_line(msg_recv))
        return

    print('second DESCRIBE is ok,now we will execute first SETUP for session')
    msg_recv = _request(sock, gen_setup_header(url, realm_value, nonce_value),
                        'setup_header_modify')
    if '200 OK' not in msg_recv:
        print('first SETUP request occur error: ')
        print(_status_line(msg_recv))
        return

    print('first SETUP is ok,now we will execute second SETUP')
    session_value = _session_id(msg_recv)
    msg_recv = _request(sock, gen_setup_session_header(url, realm_value, nonce_value, session_value),
                        'setup_session_header_modify')
    if '200 OK' not in msg_recv:
        print('second SETUP request occur error: ')
        print(_status_line(msg_recv))
        return

    print('second SETUP is ok, now we wil execute PLAY')
    msg_recv = _request(sock, gen_play_header(url, realm_value, nonce_value, session_value),
                        'play_header_modify')
    if '200 OK' not in msg_recv:
        print('PLAY request occur error: ')
        print(_status_line(msg_recv))
        return

    print('PLAY is ok, we will execute GET_PARAMETER every 10 seconds and 5 times total')
    # Five requests use CSeq offsets 6..10, directly below TEARDOWN's offset 11.
    for i in range(5):
        msg_recv = _request(
            sock,
            gen_get_parameter_header(url, realm_value, nonce_value, session_value, str(i)),
            'get_parameter_header_modify')
        print(str(i) + '*10:' + _status_line(msg_recv))
        time.sleep(10)

    print('now we will execute TEARDOWN to disconnect with server')
    msg_recv = _request(sock, gen_teardown_header(url, realm_value, nonce_value, session_value),
                        'teardown_header_modify')
    print(msg_recv)
    print('program execute finished, thank you')


def exec_full_request():
    """Connect to the configured server and run the whole request sequence.

    Progress and the status line of any failing step are printed to stdout.
    The sequence stops at the first failing DESCRIBE(auth)/SETUP/PLAY step.

    Raises:
        OSError: if connecting, sending or receiving fails.  This includes
            ``socket.timeout`` when the server stays silent for 5 seconds.
            The socket is closed in every case.
    """
    url = 'rtsp://' + config_dict['server_ip'] + ':' + str(config_dict['server_port']) + config_dict['server_path']

    # The context manager closes the socket even when a call above raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_send:
        socket_send.settimeout(5)
        socket_send.connect((config_dict['server_ip'], config_dict['server_port']))
        _run_request_sequence(socket_send, url)


# Script entry: run the whole request sequence as soon as this module is executed.
exec_full_request()
View Code

相关文章: