【问题标题】:Copy files from gen2 ADLS based on acknowledgement file根据确认文件从 gen2 ADLS 复制文件
【发布时间】:2021-06-26 03:03:06
【问题描述】:

我正在尝试使用数据工厂管道将数据从 gen2 ADLS 复制到另一个 ADLS。 此管道每天运行并仅复制该特定日期的数据。这是通过在复制活动中提供开始和结束时间来完成的。

有时源 ADLS 中的文件会延迟,以便管道运行,但不会复制任何数据。 为了跟踪这一点,我们计划在将数据复制到源 ADLS 后保留一个确认文件,以便在复制之前我们可以检查确认文件并仅在确认文件存在时继续数据复制。

所以检查应该每 10 分钟发生一次 在这 2 小时内,如果文件存在,则数据复制应继续,检查任务也应停止。 如果 2 小时后没有数据,则作业应该失败。

我尝试在 ADF 中执行验证任务。但一个问题是文件夹名称,因为我的文件夹将以数据和创建时间戳命名(例如:2021-03-30-02-19-33)。 我必须在提供文件夹名称时排除文件夹的时间戳部分。 这怎么可能。验证活动是否接受通配符路径?

任何线索如何实现这一点?

有没有办法在get matadata任务中实现10分钟2小时后的连续检查?我们可以通过获取元数据任务来实现上述场景吗?

【问题讨论】:

    标签: azure azure-data-factory azure-data-factory-2 azure-data-lake azure-data-lake-gen2


    【解决方案1】:

    据我所知,ADF validationmetadata 活动不支持文件夹/文件路径的通配符路径。

    【讨论】:

      【解决方案2】:

      如果我们确实需要使用通配符路径,我们必须在笔记本文件上编写 Scala/Python.... 脚本并从 ADF 执行。

      我在下面使用了从 ADF 获取输入参数的 scala 脚本。

      import java.io.File
      import java.util.Calendar
      
      dbutils.widgets.text("mainFolderPath", "","")
      dbutils.widgets.text("finalFolderStartName", "","")
      dbutils.widgets.text("fileName", "","")
      dbutils.widgets.text("noOfTry", "1","")
      val mainFolderPath = dbutils.widgets.get("mainFolderPath")
      val finalFolderStartName = dbutils.widgets.get("finalFolderStartName")
      val fileName = dbutils.widgets.get("fileName")
      val noOfTry = (dbutils.widgets.get("noOfTry")).toInt
      
      println("Main folder path : " + mainFolderPath)
      println("Final folder start name : " + finalFolderStartName)
      println("File name to be checked : " + fileName)
      println("Number of tries with a gap of 1 mins : " + noOfTry)
      
      if(mainFolderPath == "" || finalFolderStartName == "" || fileName == ""){
        dbutils.notebook.exit("Please pass input parameters and rerun!")
      }
      
      def getListOfSubDirectories(directoryName: String): Array[String] = {
          (new File(directoryName))
              .listFiles
              .filter(_.isDirectory)
              .map(_.getName)   
      }
      
      var counter = 0
      var folderFound = false
      var fileFound = false
      
      try{
        while (counter < noOfTry && !fileFound) {
          val folders = getListOfSubDirectories(mainFolderPath)
          if(folders.exists(firstName => firstName.startsWith(finalFolderStartName))){
            folders.foreach(fol => {
              if(fol.startsWith(finalFolderStartName)){
                val finalPath = mainFolderPath + "/" + fol + "/" + fileName
                println("Final file path : " + finalPath)
                folderFound = true
                if(new File(finalPath).exists) {
                  fileFound = true
                }else{
                  println("found the final folder but no file found!")
                  println("waiting for 10 mins! " + Calendar.getInstance().getTime())
                  counter = counter+1
                  Thread.sleep(1*60*1000)
                }
              }
            })
          }else{
          println("folder does not exists with name : " + mainFolderPath + "/" + finalFolderStartName + "*")
          println("waiting for 10 mins! " + Calendar.getInstance().getTime())
          counter = counter+1
          Thread.sleep(1*60*1000)
          }
        }
      }catch{
        case e : Throwable =>  throw e;
      }
      
      if(folderFound && fileFound){
        println("File Exists!")
      }else{
        throw new Exception("File does not exists!");
      }
      

      【讨论】:

        猜你喜欢
        • 2020-04-20
        • 2020-09-15
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2022-08-11
        • 2022-08-02
        • 2022-01-13
        • 2020-12-03
        相关资源
        最近更新 更多