【问题标题】:How do I batch sql statements with package database/sql如何使用包 database/sql 批处理 sql 语句
【发布时间】:2021-07-27 21:19:31
【问题描述】:

如何使用 Go 的 database/sql 包批处理 sql 语句?

在 Java 中我会这样做:

// Create a prepared statement
String sql = "INSERT INTO my_table VALUES(?)";
PreparedStatement pstmt = connection.prepareStatement(sql);

// Insert 10 rows of data
for (int i=0; i<10; i++) {
    pstmt.setString(1, ""+i);
    pstmt.addBatch();
}

// Execute the batch
int [] updateCounts = pstmt.executeBatch();

我如何在 Go 中实现同样的目标?

【问题讨论】:

  • 我不相信sql包中存在这样的东西。

标签: sql go


【解决方案1】:

由于db.Exec 函数是variadic,一个选项(实际上只进行一次网络往返)是自己构造语句并分解参数并传递它们。

示例代码:

func BulkInsert(unsavedRows []*ExampleRowStruct) error {
    valueStrings := make([]string, 0, len(unsavedRows))
    valueArgs := make([]interface{}, 0, len(unsavedRows) * 3)
    for _, post := range unsavedRows {
        valueStrings = append(valueStrings, "(?, ?, ?)")
        valueArgs = append(valueArgs, post.Column1)
        valueArgs = append(valueArgs, post.Column2)
        valueArgs = append(valueArgs, post.Column3)
    }
    stmt := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s", 
                        strings.Join(valueStrings, ","))
    _, err := db.Exec(stmt, valueArgs...)
    return err
}

在我运行的一个简单测试中,该解决方案在插入 10,000 行时比另一个答案中提出的 Begin、Prepare、Commit 快了大约 4 倍 - 尽管实际改进很大程度上取决于您的个人设置、网络延迟、等等

【讨论】:

  • 嗯,这是一个有趣的方法。我有点喜欢它,你能发布你的测试来展示你所做的基准测试吗?我对这种解决方案感兴趣,因为它可以减少启动的事务数量。
  • 公平地说,您应该使用准备好的语句和事务然后再试一次,而不是每次都开始/准备/提交。这样比较会更准确。
  • @Xeoncross 我在这里写的方式,你说得对,它们是不同的交易。但我相信您可以轻松地通过db.Begin(); &lt;code from above but with tx.Exec&gt;; db.Commit() 将其作为一个事务运行。此外,这个对于 SQL 注入是安全的,因为它使用了? 占位符。
  • 我使用这个答案构建了一些东西,发现在撰写本文时,MySQL 中的占位符似乎有 2^16-1 (65,535) 的限制。为了安全起见,我最终在一个循环中运行了多个插入(一次大约插入 10,000 行)。
  • 以防有人遇到这个答案并且没有意识到(像我一样)(?, ?, ?) 的语法是 MySql 特定的,需要将 ($1, $2, $3), ... ($n, $n+1 $n+2) 更改为 Postgresql
【解决方案2】:

如果您使用的是 PostgreSQL,那么 pq 支持 bulk imports

【讨论】:

    【解决方案3】:

    为不支持?占位符的PostgreSQL适配Andrew's solution,以下工作:

    func BulkInsert(unsavedRows []*ExampleRowStruct) error {
        valueStrings := make([]string, 0, len(unsavedRows))
        valueArgs := make([]interface{}, 0, len(unsavedRows) * 3)
        i := 0
        for _, post := range unsavedRows {
            valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", i*3+1, i*3+2, i*3+3))
            valueArgs = append(valueArgs, post.Column1)
            valueArgs = append(valueArgs, post.Column2)
            valueArgs = append(valueArgs, post.Column3)
            i++
        }
        stmt := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s", strings.Join(valueStrings, ","))
        _, err := db.Exec(stmt, valueArgs...)
        return err
    }
    

    【讨论】:

    • 不错的sn-p!这不是对 SQL 注入开放吗?
    • 我没有看到风险,字符串插值仅使用递增的i 参数(valueStrings 将是($1, $2, $3),($4, $5, $6),...)。 ExampleRowStructs 仅传递给 db.Exec,数据库驱动程序负责替换占位符。
    【解决方案4】:

    扩展 Avi Flax 的答案,我需要在我的 INSERT 中添加一个 ON CONFLICT DO UPDATE 子句。

    解决办法是先COPY到临时表(设置为在事务结束时删除),然后从临时表INSERT到永久表。

    这是我确定的代码:

    func (fdata *FDataStore) saveToDBBulk(items map[fdataKey][]byte) (err error) {
        tx, err := fdata.db.Begin()
        if err != nil {
            return errors.Wrap(err, "begin transaction")
        }
        txOK := false
        defer func() {
            if !txOK {
                tx.Rollback()
            }
        }()
    
        // The ON COMMIT DROP clause at the end makes sure that the table
        // is cleaned up at the end of the transaction.
        // While the "for{..} state machine" goroutine in charge of delayed
        // saving ensures this function is not running twice at any given time.
        _, err = tx.Exec(sqlFDataMakeTempTable)
        // CREATE TEMPORARY TABLE fstore_data_load
        // (map text NOT NULL, key text NOT NULL, data json)
        // ON COMMIT DROP
        if err != nil {
            return errors.Wrap(err, "create temporary table")
        }
    
        stmt, err := tx.Prepare(pq.CopyIn(_sqlFDataTempTableName, "map", "key", "data"))
        for key, val := range items {
            _, err = stmt.Exec(string(key.Map), string(key.Key), string(val))
            if err != nil {
                return errors.Wrap(err, "loading COPY data")
            }
        }
    
        _, err = stmt.Exec()
        if err != nil {
            return errors.Wrap(err, "flush COPY data")
        }
        err = stmt.Close()
        if err != nil {
            return errors.Wrap(err, "close COPY stmt")
        }
    
        _, err = tx.Exec(sqlFDataSetFromTemp)
        // INSERT INTO fstore_data (map, key, data)
        // SELECT map, key, data FROM fstore_data_load
        // ON CONFLICT DO UPDATE SET data = EXCLUDED.data
        if err != nil {
            return errors.Wrap(err, "move from temporary to real table")
        }
    
        err = tx.Commit()
        if err != nil {
            return errors.Wrap(err, "commit transaction")
        }
        txOK = true
        return nil
    }
    

    【讨论】:

      【解决方案5】:

      如果您使用 Postgres,这里是 @Debasish Mitra 的解决方案。

      功能示例:https://play.golang.org/p/dFFD2MrEy3J

      替代示例:https://play.golang.org/p/vUtW0K4jVMd

      data := []Person{{"John", "Doe", 27}, {"Leeroy", "Jenkins", 19}}
      
      vals := []interface{}{}
      for _, row := range data {
          vals = append(vals, row.FirstName, row.LastName, row.Age)
      }
      
      sqlStr := `INSERT INTO test(column1, column2, column3) VALUES %s`
      sqlStr = ReplaceSQL(sqlStr, "(?, ?, ?)", len(data))
      
      //Prepare and execute the statement
      stmt, _ := db.Prepare(sqlStr)
      res, _ := stmt.Exec(vals...)
      

      函数替换SQL

      func ReplaceSQL(stmt, pattern string, len int) string {
          pattern += ","
          stmt = fmt.Sprintf(stmt, strings.Repeat(pattern, len))
          n := 0
          for strings.IndexByte(stmt, '?') != -1 {
              n++
              param := "$" + strconv.Itoa(n)
              stmt = strings.Replace(stmt, "?", param, 1)
          }
          return strings.TrimSuffix(stmt, ",")
      }
      

      【讨论】:

      【解决方案6】:

      如果有人使用 pgx(Golang 中最好的 Postgres 驱动程序),请参阅以下解决方案: https://github.com/jackc/pgx/issues/764#issuecomment-685249471

      【讨论】:

        【解决方案7】:

        借鉴 Andrew C 的想法,并使用 sql 标量变量根据我的工作需要对其进行调整。它非常适合我工作中的特定要求。也许它对某人有用,因为它在 golang 中模拟 sql 的批处理事务很有用。就是这样。

        func BulkInsert(unsavedRows []*ExampleRowStruct) error {
            valueStrings := make([]string, 0, len(unsavedRows))
            valueArgs := make([]interface{}, 0, len(unsavedRows) * 3)
            i := 0
            for _, post := range unsavedRows {
                valueStrings = append(valueStrings, fmt.Sprintf("(@p%d, @p%d, @p%d)", i*3+1, i*3+2, i*3+3))
                valueArgs = append(valueArgs, post.Column1)
                valueArgs = append(valueArgs, post.Column2)
                valueArgs = append(valueArgs, post.Column3)
                i++
            }
            sqlQuery := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s", strings.Join(valueStrings, ","))
        
            var params []interface{}
        
            for i := 0; i < len(valueArgs); i++ {
                var param sql.NamedArg
                param.Name = fmt.Sprintf("p%v", i+1)
                param.Value = valueArgs[i]
                params = append(params, param)
            }
        
            _, err := db.Exec(sqlQuery, params...)
            return err
        }
        

        【讨论】:

          【解决方案8】:

          对于 Postgres lib pq 支持批量插入:https://godoc.org/github.com/lib/pq#hdr-Bulk_imports

          但同样可以通过下面的代码实现,但真正有用的是尝试执行批量条件更新(相应地更改查询)。

          对于 Postgres 执行类似的批量插入,您可以使用以下函数。

          // ReplaceSQL replaces the instance occurrence of any string pattern with an increasing $n based sequence
          func ReplaceSQL(old, searchPattern string) string {
             tmpCount := strings.Count(old, searchPattern)
             for m := 1; m <= tmpCount; m++ {
                old = strings.Replace(old, searchPattern, "$"+strconv.Itoa(m), 1)
             }
             return old
          }
          

          所以上面的示例变成了

          sqlStr := "INSERT INTO test(n1, n2, n3) VALUES "
          vals := []interface{}{}
          
          for _, row := range data {
             sqlStr += "(?, ?, ?)," // Put "?" symbol equal to number of columns
             vals = append(vals, row["v1"], row["v2"], row["v3"]) // Put row["v{n}"] blocks equal to number of columns
          }
          
          //trim the last ,
          sqlStr = strings.TrimSuffix(sqlStr, ",")
          
          //Replacing ? with $n for postgres
          sqlStr = ReplaceSQL(sqlStr, "?")
          
          //prepare the statement
          stmt, _ := db.Prepare(sqlStr)
          
          //format all vals at once
          res, _ := stmt.Exec(vals...)
          

          【讨论】:

            【解决方案9】:

            我让 pq.CopyIn 工作了,它实际上比字符串值/参数方法快 2.4 倍(顺便说一句,这是一个非常有用和优雅的解决方案,所以谢谢!)

            我将 1000 万个 int、varchar 的测试值插入到一个结构中,并使用以下函数加载它。我对 GoLang 有点陌生,所以请耐心等待......

            func copyData(client *client.DbClient, dataModels []*dataModel) error{
                db := *client.DB
                txn, err := db.Begin()
                if err != nil {
                    return err
                }
                defer txn.Commit()
            
                stmt, err := txn.Prepare(pq.CopyIn("_temp", "a", "b"))
                if err != nil {
                    return(err)
                }
            
                for _, model := range dataModels{
                    _, err := stmt.Exec(model.a, model.b)
                    if err != nil {
                        txn.Rollback()
                        return err
                    }
                }
            
                _, err = stmt.Exec()
                if err != nil {
                    return err
                }
            
                err = stmt.Close()
                if err != nil {
                    return err
                }
            
                return nil
                }
            

            `

            经过(字符串值/参数):1m30.60s。

            经过(复制):37.57 秒。

            【讨论】:

              【解决方案10】:

              无法通过 database/sql 中可用的接口进行批处理。但是,特定的数据库驱动程序可能会单独支持它。例如,https://github.com/ziutek/mymysql 似乎支持使用 MySQL 进行批处理。

              【讨论】:

              【解决方案11】:

              go-pggo-pg 是另一个使用链式语法的好库

              https://github.com/go-pg/pg/wiki/Writing-Queries#insert

              使用单个查询插入多本书:

              err := db.Model(book1, book2).Insert()
              

              【讨论】:

                【解决方案12】:

                这是一个更通用的版本,用于根据 @andrew-c 和 @mastercarl 的答案生成查询和值参数:

                //bulk/insert.go

                import (
                    "strconv"
                    "strings"
                )
                
                type ValueExtractor = func(int) []interface{}
                
                func Generate(tableName string, columns []string, numRows int, postgres bool, valueExtractor ValueExtractor) (string, []interface{}) {
                    numCols := len(columns)
                    var queryBuilder strings.Builder
                    queryBuilder.WriteString("INSERT INTO ")
                    queryBuilder.WriteString(tableName)
                    queryBuilder.WriteString("(")
                    for i, column := range columns {
                        queryBuilder.WriteString("\"")
                        queryBuilder.WriteString(column)
                        queryBuilder.WriteString("\"")
                        if i < numCols-1 {
                            queryBuilder.WriteString(",")
                        }
                    }
                    queryBuilder.WriteString(") VALUES ")
                    var valueArgs []interface{}
                    valueArgs = make([]interface{}, 0, numRows*numCols)
                    for rowIndex := 0; rowIndex < numRows; rowIndex++ {
                        queryBuilder.WriteString("(")
                        for colIndex := 0; colIndex < numCols; colIndex++ {
                            if postgres {
                                queryBuilder.WriteString("$")
                                queryBuilder.WriteString(strconv.Itoa(rowIndex*numCols + colIndex + 1))
                            } else {
                                queryBuilder.WriteString("?")
                            }
                            if colIndex < numCols-1 {
                                queryBuilder.WriteString(",")
                            }
                        }
                        queryBuilder.WriteString(")")
                        if rowIndex < numRows-1 {
                            queryBuilder.WriteString(",")
                        }
                        valueArgs = append(valueArgs, valueExtractor(rowIndex)...)
                    }
                    return queryBuilder.String(), valueArgs
                }
                

                // 批量/insert_test.go

                import (
                    "fmt"
                    "strconv"
                )
                
                func valueExtractor(index int) []interface{} {
                    return []interface{}{
                        "trx-" + strconv.Itoa(index),
                        "name-" + strconv.Itoa(index),
                        index,
                    }
                }
                
                func ExampleGeneratePostgres() {
                    query, valueArgs := Generate("tbl_persons", []string{"transaction_id", "name", "age"}, 3, true, valueExtractor)
                    fmt.Println(query)
                    fmt.Println(valueArgs)
                    // Output:
                    // INSERT INTO tbl_persons("transaction_id","name","age") VALUES ($1,$2,$3),($4,$5,$6),($7,$8,$9)
                    // [[trx-0 name-0 0] [trx-1 name-1 1] [trx-2 name-2 2]]
                }
                
                func ExampleGenerateOthers() {
                    query, valueArgs := Generate("tbl_persons", []string{"transaction_id", "name", "age"}, 3, false, valueExtractor)
                    fmt.Println(query)
                    fmt.Println(valueArgs)
                    // Output:
                    // INSERT INTO tbl_persons("transaction_id","name","age") VALUES (?,?,?),(?,?,?),(?,?,?)
                    // [[trx-0 name-0 0] [trx-1 name-1 1] [trx-2 name-2 2]]
                }
                

                【讨论】:

                  猜你喜欢
                  • 1970-01-01
                  • 1970-01-01
                  • 2013-11-26
                  • 2012-12-14
                  • 2014-10-23
                  • 2014-01-09
                  • 2011-02-24
                  • 2015-10-25
                  • 1970-01-01
                  相关资源
                  最近更新 更多