【Question title】:Low InnoDB Writes per Second - AWS EC2 to MySQL RDS using Python
【Posted】:2016-03-05 13:40:06
【Question description】:

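I have about 60GB of JSON files that I am parsing with Python and then inserting into a MySQL database using the Python-MySQL connector. Each JSON file is about 500MB.

I have been using an AWS r3.xlarge EC2 instance with a secondary volume to hold the 60GB of JSON data.

I am then using an AWS RDS r3.xlarge MySQL instance. Both instances are in the same region and availability zone. The EC2 instance uses the following Python script to load the JSON, parse it, and insert it into the MySQL RDS instance. My Python script: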

import json
import mysql.connector
from mysql.connector import errorcode
from pprint import pprint
import glob
import os

os.chdir("./json_data")

for file in glob.glob("*.json"):
    with open(file, 'rU') as data_file:
        results = json.load(data_file)
        print('working on file:', file)

    cnx = mysql.connector.connect(user='', password='',
        host='')

    cursor = cnx.cursor(buffered=True)

    DB_NAME = 'DB'

    def create_database(cursor):
        try:
            cursor.execute(
                "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME))
        except mysql.connector.Error as err:
            print("Failed creating database: {}".format(err))
            exit(1)

    try:
        cnx.database = DB_NAME    
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_BAD_DB_ERROR:
            create_database(cursor)
            cnx.database = DB_NAME
        else:
            print(err)
            exit(1)

    add_overall_data = ("INSERT INTO master" 
        "(_sent_time_stamp, dt, ds, dtf, O_l, O_ln, O_Ls, O_a, D_l, D_ln, d_a)"
        "VALUES (%(_sent_time_stamp)s, %(dt)s, %(ds)s, %(dtf)s, %(O_l)s, %(O_ln)s, %(O_Ls)s, %(O_a)s, %(D_l)s, %(D_ln)s, %(d_a)s)")

    add_polyline = ("INSERT INTO polyline"
        "(Overview_polyline, request_no)"
        "VALUES (%(Overview_polyline)s, %(request_no)s)")

    add_summary = ("INSERT INTO summary"
        "(summary, request_no)"
        "VALUES (%(summary)s, %(request_no)s)")

    add_warnings = ("INSERT INTO warnings"
        "(warnings, request_no)"
        "VALUES (%(warnings)s, %(request_no)s)")

    add_waypoint_order = ("INSERT INTO waypoint_order"
        "(waypoint_order, request_no)"
        "VALUES (%(waypoint_order)s, %(request_no)s)")

    add_leg_data = ("INSERT INTO leg_data"
        "(request_no, leg_dt, leg_ds, leg_O_l, leg_O_ln, leg_D_l, leg_D_ln, leg_html_inst, leg_polyline, leg_travel_mode)" 
        "VALUES (%(request_no)s, %(leg_dt)s, %(leg_ds)s, %(leg_O_l)s, %(leg_O_ln)s, %(leg_D_l)s, %(leg_D_ln)s, %(leg_html_inst)s, %(leg_polyline)s, %(leg_travel_mode)s)")
    error_messages = []
    for result in results:
        if result["status"] == "OK":
            for leg in result['routes'][0]['legs']:
                try: 
                    params = {
                    "_sent_time_stamp": leg['_sent_time_stamp'],
                    "dt": leg['dt']['value'],
                    "ds": leg['ds']['value'],
                    "dtf": leg['dtf']['value'],
                    "O_l": leg['start_location']['lat'],
                    "O_ln": leg['start_location']['lng'],
                    "O_Ls": leg['O_Ls'],
                    "O_a": leg['start_address'],
                    "D_l": leg['end_location']['lat'],
                    "D_ln": leg['end_location']['lng'],
                    "d_a": leg['end_address']
                    }
                    cursor.execute(add_overall_data, params)
                    query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                    O_l = leg['start_location']['lat']
                    O_ln = leg['start_location']['lng']
                    D_l = leg['end_location']['lat']
                    D_ln = leg['end_location']['lng']
                    _sent_time_stamp = leg['_sent_time_stamp']
                    cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                    request_no = cursor.fetchone()[0]
                except KeyError as e:
                    error_messages.append(e)
                    params = {
                    "_sent_time_stamp": leg['_sent_time_stamp'],
                    "dt": leg['dt']['value'],
                    "ds": leg['ds']['value'],
                    "dtf": "000",
                    "O_l": leg['start_location']['lat'],
                    "O_ln": leg['start_location']['lng'],
                    "O_Ls": leg['O_Ls'],
                    "O_a": 'unknown',
                    "D_l": leg['end_location']['lat'],
                    "D_ln": leg['end_location']['lng'],
                    "d_a": 'unknown'
                    }
                    cursor.execute(add_overall_data, params)
                    query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                    O_l = leg['start_location']['lat']
                    O_ln = leg['start_location']['lng']
                    D_l = leg['end_location']['lat']
                    D_ln = leg['end_location']['lng']
                    _sent_time_stamp = leg['_sent_time_stamp']
                    cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                    request_no = cursor.fetchone()[0]
            for overview_polyline in result['routes']:
                params = {
                "request_no": request_no,
                "Overview_polyline": overview_polyline['overview_polyline']['points']
                }
                cursor.execute(add_polyline, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for summary in result['routes']:
                params = {
                "request_no": request_no,
                "summary": summary['summary']
                }
                cursor.execute(add_summary, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for warnings in result['routes']:
                params = {
                "request_no": request_no,
                "warnings": str(warnings['warnings'])
                }
                cursor.execute(add_warnings, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for waypoint_order in result['routes']:
                params = {
                "request_no": request_no,
                "waypoint_order": str(waypoint_order['waypoint_order'])
                }
                cursor.execute(add_waypoint_order, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for steps in result['routes'][0]['legs'][0]['steps']:
                params = {
                "request_no": request_no,
                "leg_dt": steps['dt']['value'],
                "leg_ds": steps['ds']['value'],
                "leg_O_l": steps['start_location']['lat'],
                "leg_O_ln": steps['start_location']['lng'],
                "leg_D_l": steps['end_location']['lat'],
                "leg_D_ln": steps['end_location']['lng'],
                "leg_html_inst": steps['html_instructions'],
                "leg_polyline": steps['polyline']['points'],
                "leg_travel_mode": steps['travel_mode']
                }
                cursor.execute(add_leg_data, params)
        cnx.commit()
    print('error messages:', error_messages)
    cursor.close()
    cnx.close()
    print('finished' + file)

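Using htop on the Linux instance I can see the following (screenshot not included):

On the MySQL database side, using MySQL Workbench I can see (screenshot not included):

This Python script has been running for several days, but only about 20% of the data has been inserted into MySQL so far.

My question: how do I identify the bottleneck? Is it the Python script? It appears to use only a small amount of memory; can I increase this? Following (How to improve the speed of InnoDB writes per second of MySQL DB), I checked the InnoDB buffer pool size and found it to be large: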

SELECT @@innodb_buffer_pool_size;
+---------------------------+
| @@innodb_buffer_pool_size |
+---------------------------+
|               11674845184 |
+---------------------------+

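Since the RDS and EC2 instances are in the same region, I don't believe there is a network bottleneck. Pointers on where I should look for the biggest savings would be very welcome!

EDIT

I think I may have stumbled on the problem. For efficiency during parsing, I write each level of the JSON separately. However, I then have to run a query to match each nested part of the JSON with its higher level. With a small database, this query has low overhead. I have noticed that insert speed on this database has dropped drastically, because the query has to search a larger, ever-growing table to join the JSON data correctly.

Apart from waiting, I don't know how to fix this...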

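【Comments】:

  • You mention that EC2 and RDS are in the same region; are they also in the same availability zone? If not, that could be an easy way to see further improvement.
  • Yes, I took that into account. They are both in the same availability zone.
  • Have you tried using provisioned IOPS on the RDS instance?
  • I am using Provisioned IOPS (SSD) with IOPS = 1000 and 200GB of storage. I am not sure whether the IOPS can be increased, but I will look into it now...
  • Please use explicit transactions; otherwise it is hard to understand.

Tags: python mysql amazon-web-services amazon-ec2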


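【Solution 1】:

I can't see any table definitions in the Python script... But when we perform large data operations, we always disable any database indexes while loading into MySQL. If you have any constraints or foreign keys enforced, these should also be disabled during the load.

Autocommit is disabled by default when connecting through Connector/Python.

But I can't see any commit option in the code you provided.

Summary

Disable/remove (for the load)

-- Indexes
-- Constraints
-- Foreign keys
-- Triggers

In your loader program

-- Disable autocommit
-- Commit every n records (n depends on your available buffer size)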
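
The "commit every n records" advice could look roughly like the sketch below. It is only an illustration: `chunked` and `insert_in_batches` are hypothetical helper names, the batch size of 1000 is an arbitrary starting point rather than a tuned value, and `cnx`/`cursor` are assumed to be a DB-API connection and cursor such as those returned by `mysql.connector.connect()` and `cnx.cursor()`.

```python
from itertools import islice


def chunked(rows, size):
    """Yield successive lists of at most `size` items from any iterable."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def insert_in_batches(cnx, cursor, sql, rows, batch_size=1000):
    """Insert rows with executemany() and commit once per batch.

    Assumption: autocommit is off, so nothing is made durable until
    cnx.commit() is called. Returns the number of rows sent.
    """
    total = 0
    for batch in chunked(rows, batch_size):
        cursor.executemany(sql, batch)  # one call for the whole batch
        cnx.commit()                    # one commit per batch, not per row
        total += len(batch)
    return total
```

With the question's script this might be called as `insert_in_batches(cnx, cursor, add_summary, summary_rows)`, where `summary_rows` is a list of parameter dicts collected while parsing a file (a name assumed here for illustration).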

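【Comments】:

    【Solution 2】:

    My English is poor.

    If I were doing this job, I would:

    1. Use Python to convert the JSON to txt

    2. Use a MySQL import tool (e.g. mysqlimport) to import the txt into MySQL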
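
Step 1 above might be sketched as follows. This is a minimal illustration, not the answerer's code: `legs_to_rows` and `write_tsv` are hypothetical names, only a few of the `master` columns from the question are included, and the key names are taken from the question's script.

```python
import csv


def legs_to_rows(results):
    """Yield one flat tuple per leg of every result whose status is "OK"."""
    for result in results:
        if result.get("status") != "OK":
            continue
        for leg in result["routes"][0]["legs"]:
            yield (
                leg["_sent_time_stamp"],
                leg["dt"]["value"],
                leg["ds"]["value"],
                leg["start_location"]["lat"],
                leg["start_location"]["lng"],
                leg["end_location"]["lat"],
                leg["end_location"]["lng"],
            )


def write_tsv(rows, out):
    """Write rows as tab-separated lines to an open text file; return the count."""
    writer = csv.writer(out, delimiter="\t", lineterminator="\n")
    count = 0
    for row in rows:
        writer.writerow(row)
        count += 1
    return count
```

The resulting file could then be loaded in bulk in step 2. Free-text columns such as addresses may contain tabs or quotes, so the quoting and escaping options used on import would need to match what `csv.writer` produces.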

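    If you must do it all in one with Python + MySQL, I suggest using multi-row inserts:

    INSERT INTO table_name VALUES (1), (2), ..., (xxx)
    

    Why does 'SELECT request_no FROM master' appear so many times? It should be read from the JSON.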
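
One way to avoid the repeated `SELECT request_no FROM master` lookups is sketched below, under a loud assumption: `request_no` is an auto-increment key on `master` (the question never shows the table definitions). In that case the cursor's `lastrowid` attribute, an optional DB-API extension that Connector/Python provides, already holds the new id after the INSERT. `insert_with_children` is a hypothetical helper name.

```python
def insert_with_children(cursor, parent_sql, parent_params, child_sql, child_rows):
    """Insert one parent row, then its child rows keyed by the new id.

    Assumes the parent table's key (request_no in the question) is an
    auto-increment column, so cursor.lastrowid holds the id generated by
    the INSERT and no follow-up SELECT is needed.
    """
    cursor.execute(parent_sql, parent_params)
    request_no = cursor.lastrowid  # read before any further statement runs

    # Attach the new id to every child row without mutating the caller's dicts.
    rows = [dict(row, request_no=request_no) for row in child_rows]
    if rows:
        cursor.executemany(child_sql, rows)
    return request_no
```

With the question's statements, a call might look like `insert_with_children(cursor, add_overall_data, params, add_summary, [{"summary": s["summary"]} for s in result["routes"]])`. This removes a lookup whose cost grows with the size of `master`.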

    My English is poor. So..

    【Comments】:

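      【Solution 3】:

      Given this information, it looks like both the script and the database are mostly idle. It is premature to tune anything at the MySQL level.

      You need more visibility into what your program is doing.

      Start by logging how much time each query takes, how many errors occur, and so on.

      Those SELECTs may need an index added in order to perform properly, if that turns out to be a problem.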
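
A minimal sketch of that kind of logging, assuming any DB-API cursor: `QueryTimer` is a hypothetical helper that wraps `cursor.execute()` and accumulates call counts, error counts and wall-clock time per query label, so the slowest statements stand out.

```python
import time
from collections import defaultdict


class QueryTimer:
    """Accumulate call counts, errors and wall-clock time per query label."""

    def __init__(self):
        self.stats = defaultdict(lambda: {"calls": 0, "errors": 0, "seconds": 0.0})

    def execute(self, cursor, label, sql, params=()):
        """Run cursor.execute(sql, params) and record how long it took."""
        entry = self.stats[label]
        start = time.perf_counter()
        try:
            return cursor.execute(sql, params)
        except Exception:
            entry["errors"] += 1
            raise  # record the failure but do not hide it
        finally:
            entry["calls"] += 1
            entry["seconds"] += time.perf_counter() - start

    def report(self):
        """Return (label, calls, errors, seconds) tuples, slowest first."""
        rows = [(k, v["calls"], v["errors"], v["seconds"]) for k, v in self.stats.items()]
        return sorted(rows, key=lambda r: r[3], reverse=True)
```

In the question's script, `cursor.execute(query, (...))` could become `timer.execute(cursor, "lookup_request_no", query, (...))`, with `timer.report()` printed after each file. If the lookup dominates, that would support adding an index on the columns in its WHERE clause.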

      【Comments】:
