【问题标题】:Spark not deleting old data in MemSql when Overwrite mode is used使用 Overwrite 模式时 Spark 不会删除 MemSql 中的旧数据
【发布时间】:2018-05-29 09:59:40
【问题描述】:

我正在使用覆盖模式运行 Spark 作业。我期待它会删除表中的数据并插入新数据。但是它只是将数据附加到它。

我期待与在文件系统中使用保存鼠标覆盖时相同的行为,

object HiveToMemSQL {
def main(args: Array[String]) {

    val log = Logger.getLogger(HiveToMemSQL.getClass)

    //var options = getOptions()
    //val cmdLineArgs = new CommandLineOptions().validateArguments(args, options)

    //if (cmdLineArgs != null) {

    // Get command line options values
    var query = "select * from default.students"
    // Get destination DB details from command line
    val destHostName ="localhost"
    //val destUserName = cmdLineArgs.getOptionValue("destUserName")
    //val destPassword = cmdLineArgs.getOptionValue("destPassword")
    val destDBName ="tsg"
    val destTable = "ORC_POS_TEST"
    val destPort = 3308
    val destConnInfo = MemSQLConnectionInfo(destHostName, destPort, "root", "", destDBName)

    val spark = SparkSession.builder().appName("Hive To MemSQL")
    .config("maxRecordsPerBatch" ,"100")
    .config("spark.memsql.host", destConnInfo.dbHost)
    .config("spark.memsql.port", destConnInfo.dbPort.toString)
    .config("spark.memsql.user", destConnInfo.user)
    .config("spark.memsql.password", destConnInfo.password)
    .config("spark.memsql.defaultDatabase", destConnInfo.dbName)
    //          .config("org.apache.spark.sql.SaveMode" , SaveMode.Overwrite.toString())
    .config("spark.memsql.defaultSaveMode"  , "Overwrite")
    .config("maxRecordsPerBatch" ,"100").master("local[*]").enableHiveSupport().getOrCreate()

    import spark.implicits._
    import spark.sql

    // Queries are expressed in HiveQL
    val sqlDF = spark.sql("select* from tsg.v_pos_krogus_wk_test")
    log.info("Successfully read data from source")
    sqlDF.printSchema()
    sqlDF.printSchema()

    // MemSQL destination DB Master Aggregator, Port, Username and Password
    import spark.implicits._

    // Disabling writing to leaf nodes directly
    var saveConf = SaveToMemSQLConf(spark.memSQLConf,
    params = Map("useKeylessShardingOptimization" -> "false", 
                 "writeToMaster" -> "false" , 
                 "saveMode" -> SaveMode.Overwrite.toString()))

    log.info("Save mode before  :" + saveConf.saveMode )
    saveConf= saveConf.copy(saveMode=SaveMode.Overwrite)
    log.info("Save mode after  :" + saveConf.saveMode )

    val tableIdent = TableIdentifier(destDBName, destTable)
    sqlDF.saveToMemSQL(tableIdent, saveConf)

    log.info("Successfully completed writing to MemSQL DB")
}}

【问题讨论】:

    标签: apache-spark singlestore


    【解决方案1】:

    MemSQL Spark 连接器设置将写入 REPLACE 语句。 REPLACE 的工作方式与 INSERT 完全相同,只是如果表中的旧行与 PRIMARY KEY 的新行具有相同的值,则在插入新行之前删除旧行。见https://docs.memsql.com/sql-reference/v6.0/replace/

    【讨论】:

    • 有没有办法,我可以在插入之前截断表格,就像我如何在 spark 中使用 jdbc 写入格式一样?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-08-18
    • 2020-06-27
    • 2016-12-01
    • 1970-01-01
    • 2023-02-09
    相关资源
    最近更新 更多