【问题标题】:How to use PySpark to stream data into MySQL database?如何使用 PySpark 将数据流式传输到 MySQL 数据库?
【发布时间】:2019-04-15 12:55:52
【问题描述】:

我目前正在开发一个单页 Web 应用程序,该应用程序允许用户将大型 CSV 文件(目前正在测试约 7GB 文件)上传到烧瓶服务器,然后将该数据集流式传输到数据库。上传大约需要一分钟,文件被完全保存到烧瓶服务器上的临时文件中。现在我需要能够流式传输此文件并将其存储到数据库中。我做了一些研究,发现 PySpark 非常适合流式传输数据,我选择 MySQL 作为将 CSV 数据流式传输到的数据库(但我对其他数据库和流式传输方法持开放态度)。我是一名初级开发人员,也是 PySpark 的新手,所以我不知道该怎么做。 Spark streaming guide 表示数据必须通过 Kafka、Flume、TCP 套接字等来源获取,所以我想知道是否必须使用这些方法中的任何一种将我的 CSV 文件导入 Spark。但是,我遇到了这个great example,他们将 csv 数据流式传输到 Azure SQL 数据库中,看起来他们只是使用 Spark 直接读取文件,而无需通过 Kafka 等流式源来摄取它。唯一的事情是该示例使我感到困惑的是,他们正在使用 HDInsight Spark 群集将数据流式传输到数据库中,而我不确定如何将所有这些与烧瓶服务器合并。我为缺少代码而道歉,但目前我只有一个烧瓶服务器文件,其中一条路由进行文件上传。任何示例、教程或建议将不胜感激。

【问题讨论】:

    标签: python mysql flask pyspark spark-streaming


    【解决方案1】:

    我不确定流式传输部分,但 spark 可以有效地处理大文件 - 并且存储到 db 表将并行完成,因此对您的详细信息没有太多了解,并且前提是您在服务器上有上传的文件,我会说:

    如果我想在表格中保存像 csv 这样的大型结构化文件,我会这样开始:

    # start with some basic spark configuration, e.g. we want the timezone to be UTC 
    conf = SparkConf()
    conf.set('spark.sql.session.timeZone', 'UTC')
    # this is important: you need to have the mysql connector jar for the right mysql version:
    conf.set('jars', 'path to mysql connector jar you can download from here: https://dev.mysql.com/downloads/connector/odbc/')
    # instantiate a spark session: the first time it will take a few seconds
    spark = SparkSession.builder \
        .config(conf=conf) \
        .appName('Huge File uploader') \
        .getOrCreate()
    
    # read the file first as a dataframe
    df = spark.read.csv('path to 7GB/ huge csv file')
    
    # optionally, add a filename column
    from pyspark.sql import functions as F
    df = df.withColumn('filename', F.lit('thecurrentfilename'))
    
    # write it to the table
    df.write.format('jdbc').options(
                url='e.g. localhost:port',
                driver='com.mysql.cj.jdbc.Driver',  # the driver for MySQL
                dbtable='the table name to save to',
                user='user',
                password='secret',
            ).mode('append').save()
    

    注意这里的“追加”模式:这里的问题是 spark 无法对表执行更新,它要么追加新行,要么替换表中的内容。

    所以,如果你的 csv 是这样的:

    id, name, address....
    

    您最终会得到一个包含相同字段的表格。

    这是我能想到的最基本的示例,因此您可以从 spark 开始,而无需考虑 spark 集群或其他任何相关内容。我建议你试一试,看看这是否适合你的需要:)

    另外,请记住,这可能需要几秒钟或更长时间,具体取决于您的数据、数据库所在的位置、您的机器和数据库负载,因此最好让事情与您的 api 保持异步,再说一次,我不知道你的任何其他细节。

    希望这会有所帮助。祝你好运!

    【讨论】:

    • 感谢您的详细回复! @mkaran 我试了一下,但在 df.write() 方法的最后一步出现错误,说 Row size too large (> 8126). Changing some columns to TEXT or BLOB may help. In current row format, BLOB prefix of 0 bytes is stored inline.
    • 看起来是 MySQL 错误。我使用的是 MySQL 8.0 版并尝试了on this post 和其他帖子中建议的所有内容,我还多次重新启动了我的计算机和 MySQL 服务器,但没有任何效果。我也降级到 MySQL 5.7 版,但仍然遇到同样的错误。
    • 我似乎无法写入我的 MySQL 数据库,所以我唯一的其他选择是使用 pyspark 可以连接到的另一个数据库。
    • 嗨@Mario,你能告诉我print(len(df.columns))print(df.count()) 在read.csv 之后的输出是什么吗?
    猜你喜欢
    • 1970-01-01
    • 2018-03-10
    • 1970-01-01
    • 1970-01-01
    • 2015-11-03
    • 2015-02-07
    • 1970-01-01
    • 1970-01-01
    • 2020-11-06
    相关资源
    最近更新 更多