【问题标题】:Copy data from blob storage to sqlDatabase (into multiple tables)将数据从 blob 存储复制到 sqlDatabase(复制到多个表中)
【发布时间】:2019-10-22 13:57:59
【问题描述】:

我对 azure 比较陌生,但我发现自己陷入了困境!我正在尝试使用 Azure DataFactory 将 Blob 存储中的数据读取到 SQL 数据库中。我使用复制活动使这个过程正常工作,现在我试图将数据插入到多个表中,这些表以某种方式相互关联(privateKey,foreignKey)。 例如,要更新 Table CAR,我需要知道所有者是否存在于 Table Owner 中。而且我无法找到有关如何进行的详细说明!哪位有经验的可以给我一些指导?谢谢

【问题讨论】:

  • 你考虑过使用存储过程吗?

标签: azure azure-data-factory


【解决方案1】:

我会采取不同的方法来解决这个问题。使用下面的代码,我们可以将来自多个文件的数据合并到一个数据框中,并将整个数据推送到 SQL Server 中。这是 Scala,因此需要在 Azure Databricks 环境中运行。

# merge files with similar names into a single dataframe
val DF = spark.read.format("csv")
   .option("sep","|")
   .option("inferSchema","true")
   .option("header","false")
   .load("mnt/rawdata/corp/ABC*.gz")


DF.count()


# rename headers in dataframe
val newNames = Seq("ID", "FName", "LName", "Address", "ZipCode", "file_name")
val dfRenamed = df.toDF(newNames: _*)

dfRenamed.printSchema


# push the dataframe to sql server
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Aquire a DataFrame collection (val collection)

val config = Config(Map(
  "url"            -> "my_sql_server.database.windows.net",
  "databaseName"   -> "my_db_name",
  "dbTable"        -> "dbo.my_table",
  "user"           -> "xxxxx",
  "password"       -> "xxxxx",
  "connectTimeout" -> "5", //seconds
  "queryTimeout"   -> "5"  //seconds
))

import org.apache.spark.sql.SaveMode
DF.write.mode(SaveMode.Append).sqlDB(config)

上面的代码将读取每个文件的每一行。如果标题在第一行,这很好用。如果标题不在第一行,请使用下面的代码创建一个特定的模式,然后再次读取每个文件的每一行。

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
import org.apache.spark.sql.functions.input_file_name

val customSchema = StructType(Array(
    StructField("field1", StringType, true),
    StructField("field2", StringType, true),
    StructField("field3", StringType, true),
    StructField("field4", StringType, true),
    StructField("field5", StringType, true),
    StructField("field6", StringType, true),
    StructField("field7", StringType, true)))

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("sep", "|")
    .schema(customSchema)
    .load("mnt/rawdata/corp/ABC*.gz")
    .withColumn("file_name", input_file_name())


import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._



val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "databaseName"      -> "MyDatabase",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.write.mode(SaveMode.Append).
//df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.

【讨论】:

    【解决方案2】:

    您需要使用映射数据流执行联接或查找,然后使用条件拆分转换将行重定向到适当的数据库表。

    【讨论】:

      【解决方案3】:

      据我了解,您需要使用查找活动来获取表名,然后使用 forEach 活动来遍历表,然后使用映射数据流或 Databricks 来应用过滤器和连接

      【讨论】:

        【解决方案4】:

        我可以补充 Shivar 所说的内容。我们将不得不使用查找活动,但您应该确保在处理 CAR 表之前先复制表“所有者”,为此,您应该在 foreach 活动中启用顺序选项,以便遵循顺序,否则会出现很多FK违规错误。

        【讨论】:

          猜你喜欢
          • 2020-09-24
          • 2019-03-30
          • 1970-01-01
          • 1970-01-01
          • 2020-11-20
          • 2016-02-12
          • 2020-04-25
          • 2020-04-16
          • 2014-05-10
          相关资源
          最近更新 更多