【问题标题】:Scala parallel executionScala 并行执行
【发布时间】:2020-03-06 06:11:18
【问题描述】:

我正在处理一项要求,以获取有关使用 Scala 存储在 Linux 中的文件的统计信息。

我们将根目录作为输入传递,我们的代码将获得根目录的完整子目录列表。

然后对于列表中的每个目录,我将获得文件列表,对于每个文件,我将获得所有者、组、权限、lastmodifiedtime、createdtime、lastaccesstime。

问题是如何并行处理目录列表以获取存储在该目录中的文件的统计信息。

在生产环境中,我们在根文件夹中有 100000 多个文件夹。

所以我的列表有 100000 多个文件夹列表。

如何在可用列表上并行化我的操作(文件统计信息)。

由于我是 Scala 新手,请帮我解决这个要求。

对不起,没有代码sn-p。

谢谢。

【问题讨论】:

  • 这很可能是文件系统返回目录中文件列表的瓶颈,并且不能是多线程的。
  • 不使用并行集合库,这听起来像是 Future 的工作,每个子目录都在自己的 Future 中处理。这听起来像是一个有趣的程序,所以我明天会继续努力。
  • 您期待什么样的输出?我有两个问题:1)是否允许多个输出文件? 2) 输出是否必须反映文件层次结构的结构?如果这两个问题的答案是 1) 是和 2) 否,那么实施该解决方案会更容易。对于第二个问题,为了解释我的意思,我问的是子目录是否可以以不固定的顺序交错。我宁愿打印数据,以便仅将每个子目录的文件组合在一起,而不是对每个子目录所代表的“块”强加顺序。我可以打印子目录的路径...
  • ... 在该子目录的文件数据上方,以便用户知道他们正在查看的数据。但我更希望允许对数据进行加扰。这将使算法更快。但是,我所问问题的答案可能与我想要的不同。那只会让实现变得更慢或更不美观。
  • 另外,我想补充一点,如果问题的答案是 1) 否,2) 是,那么实施该解决方案将受到我可以在内存中保存的数据量的限制。这意味着我必须在遍历期间将所有数据存储在数据库中,并在最后检索所有数据。这将为您提供最干净的数据 - 单个日志文件中的所有内容,按层次结构排序 - 但它会更慢,因为虽然遍历使用多线程,但从数据库中检索数据以在最后生成日志文件不会'不会。

标签: scala scala-collections


【解决方案1】:

我最终使用了 Akka 演员。

我对您想要的输出做出了假设,以便程序简单快速。我所做的假设是输出是 JSON,不保留层次结构,并且可以接受多个文件。如果您不喜欢 JSON,可以将其替换为其他内容,但其他两个假设对于保持程序当前的速度和简单性很重要。

您可以设置一些命令行参数。如果您不设置它们,则将使用默认值。默认值包含在 Main.scala 中。

命令行参数如下:

(0) 你开始的根目录; (无默认)

(1) 该程序中所有超时的超时间隔(以秒为单位); (默认为 60)

(2) 要使用的打印机 actor 的数量;这将是创建的日志文件的数量; (默认为 50)

(3) 用于监视器actor的滴答间隔; (默认为 500)

对于超时,请记住这是程序完成时等待的时间间隔值。因此,如果您运行一个小作业并想知道为什么需要一分钟才能完成,那是因为它在关闭程序之前等待超时间隔过去。

由于您正在运行如此大的作业,可能默认超时时间 60 太小。如果您收到抱怨超时的异常,请增加超时值。

请注意,如果您的滴答间隔设置得太高,您的程序可能会提前关闭。

要运行,只需在项目文件夹中启动 sbt,然后键入

runMain Main <canonical path of root directory>

我不知道如何在 Java 中获取文件组。您需要对此进行研究并将相关代码添加到 Entity.scala 和 TraverseActor.scala。

此外,TraverseActor.scala 中的 f.list() 有时会返回为 null,这就是我将其包装在 Option 中的原因。您必须调试该问题以确保您不会在某些文件上静默失败。

现在,这里是所有文件的内容。

build.sbt

name := "stackoverflow20191110"

version := "0.1"

scalaVersion := "2.12.1"

libraryDependencies ++= Seq(
  "io.circe" %% "circe-core",
  "io.circe" %% "circe-generic",
  "io.circe" %% "circe-parser"
).map(_ % "0.12.2")

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.16"

Entity.scala

import io.circe.Encoder
import io.circe.generic.semiauto._

sealed trait Entity {
  def path: String
  def owner: String
  def permissions: String
  def lastModifiedTime: String
  def creationTime: String
  def lastAccessTime: String
  def hashCode: Int
}

object Entity {
  implicit val entityEncoder: Encoder[Entity] = deriveEncoder
}

case class FileEntity(path: String, owner: String, permissions: String, lastModifiedTime: String, creationTime: String, lastAccessTime: String) extends Entity

object fileentityEncoder {
  implicit val fileentityEncoder: Encoder[FileEntity] = deriveEncoder
}

case class DirectoryEntity(path: String, owner: String, permissions: String, lastModifiedTime: String, creationTime: String, lastAccessTime: String) extends Entity

object DirectoryEntity {
  implicit val directoryentityEncoder: Encoder[DirectoryEntity] = deriveEncoder
}

case class Contents(path: String, files: IndexedSeq[Entity])

object Contents {
  implicit val contentsEncoder: Encoder[Contents] = deriveEncoder
}

Main.scala

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import java.io.{BufferedWriter, File, FileWriter}

import ShutDownActor.ShutDownYet

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object Main {

  val defaultNumPrinters = 50

  val defaultMonitorTickInterval = 500

  val defaultTimeoutInS = 60

  def main(args: Array[String]): Unit = {
    val timeoutInS = Try(args(1).toInt).toOption.getOrElse(defaultTimeoutInS)

    val system = ActorSystem("SearchHierarchy")

    val shutdown = system.actorOf(ShutDownActor.props)

    val monitor = system.actorOf(MonitorActor.props(shutdown, timeoutInS))

    val refs = (0 until Try(args(2).toInt).toOption.getOrElse(defaultNumPrinters)).map{x =>
      val name = "logfile" + x
      (name, system.actorOf(PrintActor.props(name, Try(args(3).toInt).toOption.getOrElse(defaultMonitorTickInterval), monitor)))
    }

    val root = system.actorOf(TraverseActor.props(new File(args(0)), refs))

    implicit val askTimeout = Timeout(timeoutInS seconds)

    var isTimedOut = false

    while(!isTimedOut){
      Thread.sleep(30000)
      val fut = (shutdown ? ShutDownYet).mapTo[Boolean]
      isTimedOut = Await.result(fut, timeoutInS seconds)
    }

    refs.foreach{ x =>
      val fw = new BufferedWriter(new FileWriter(new File(x._1), true))
      fw.write("{}\n]")
      fw.close()
    }

    system.terminate
  }

}

MonitorActor.scala

import MonitorActor.ShutDown
import akka.actor.{Actor, ActorRef, Props, ReceiveTimeout, Stash}
import io.circe.syntax._

import scala.concurrent.duration._

class MonitorActor(shutdownActor: ActorRef, timeoutInS: Int) extends Actor with Stash {

  context.setReceiveTimeout(timeoutInS seconds)

  override def receive: Receive = {
    case ReceiveTimeout =>
      shutdownActor ! ShutDown
  }

}

object MonitorActor {
  def props(shutdownActor: ActorRef, timeoutInS: Int) = Props(new MonitorActor(shutdownActor, timeoutInS))

  case object ShutDown
}

PrintActor.scala

import java.io.{BufferedWriter, File, FileWriter, PrintWriter}

import akka.actor.{Actor, ActorRef, Props, Stash}
import PrintActor.{Count, HeartBeat}

class PrintActor(name: String, interval: Int, monitorActor: ActorRef) extends Actor with Stash {

  val file = new File(name)

  override def preStart = {
    val fw = new BufferedWriter(new FileWriter(file, true))
    fw.write("[\n")
    fw.close()

    self ! Count(0)
  }

  override def receive: Receive = {
    case Count(c) =>
      context.become(withCount(c))
      unstashAll()

    case _ =>
      stash()
  }

  def withCount(c: Int): Receive = {
    case s: String =>
      val fw = new BufferedWriter(new FileWriter(file, true))
      fw.write(s)
      fw.write(",\n")
      fw.close()

      if (c == interval) {
        monitorActor ! HeartBeat
        context.become(withCount(0))
      } else {
        context.become(withCount(c+1))
      }
  }

}

object PrintActor {
  def props(name: String, interval: Int, monitorActor: ActorRef) = Props(new PrintActor(name, interval, monitorActor))

  case class Count(count: Int)

  case object HeartBeat
}

ShutDownActor.scala

import MonitorActor.ShutDown
import ShutDownActor.ShutDownYet
import akka.actor.{Actor, Props, Stash}

class ShutDownActor() extends Actor with Stash {

  override def receive: Receive = {
    case ShutDownYet => sender ! false
    case ShutDown => context.become(canShutDown())
  }

  def canShutDown(): Receive = {
    case ShutDownYet => sender ! true
  }

}

object ShutDownActor {
  def props = Props(new ShutDownActor())

  case object ShutDownYet
}

TraverseActor.scala

import java.io.File

import akka.actor.{Actor, ActorRef, PoisonPill, Props, ReceiveTimeout}
import io.circe.syntax._

import scala.collection.JavaConversions
import scala.concurrent.duration._
import scala.util.Try

class TraverseActor(start: File, printers: IndexedSeq[(String, ActorRef)]) extends Actor{

  val hash = start.hashCode()
  val mod = hash % printers.size
  val idx = if (mod < 0) -mod else mod
  val myPrinter = printers(idx)._2

  override def preStart = {
    self ! start
  }

  override def receive: Receive = {
    case f: File =>
      val path = f.getCanonicalPath
      val files = Option(f.list()).map(_.toIndexedSeq.map(x =>new File(path + "/" + x)))

      val directories = files.map(_.filter(_.isDirectory))

      directories.foreach(ds => processDirectories(ds))

      val entities = files.map{fs =>
        fs.map{ f =>
          val path = f.getCanonicalPath
          val owner = Try(java.nio.file.Files.getOwner(f.toPath).toString).toOption.getOrElse("")
          val permissions = Try(java.nio.file.Files.getPosixFilePermissions(f.toPath).toString).toOption.getOrElse("")
          val attributes = Try(java.nio.file.Files.readAttributes(f.toPath, "lastModifiedTime,creationTime,lastAccessTime"))
          val lastModifiedTime = attributes.flatMap(a => Try(a.get("lastModifiedTime").toString)).toOption.getOrElse("")
          val creationTime = attributes.flatMap(a => Try(a.get("creationTime").toString)).toOption.getOrElse("")
          val lastAccessTime = attributes.flatMap(a => Try(a.get("lastAccessTime").toString)).toOption.getOrElse("")

          if (f.isDirectory) FileEntity(path, owner, permissions, lastModifiedTime, creationTime, lastAccessTime)
          else DirectoryEntity(path, owner, permissions, lastModifiedTime, creationTime, lastAccessTime)
        }
      }

      directories match {
        case Some(seq) =>
          seq match {
            case x+:xs =>
            case IndexedSeq() => self ! PoisonPill
          }
        case None => self ! PoisonPill
      }

      entities.foreach(e => myPrinter ! Contents(f.getCanonicalPath, e).asJson.toString)
  }

  def processDirectories(directories: IndexedSeq[File]): Unit = {
    def inner(fs: IndexedSeq[File]): Unit = {
      fs match {
        case x +: xs =>
          context.actorOf(TraverseActor.props(x, printers))
          processDirectories(xs)
        case IndexedSeq() =>
      }

    }

    directories match {
      case x +: xs =>
        self ! x
        inner(xs)
      case IndexedSeq() =>
    }
  }

}

object TraverseActor {
  def props(start: File, printers: IndexedSeq[(String, ActorRef)]) = Props(new TraverseActor(start, printers))
}

我只测试了一个小例子,所以这个程序在运行你的工作时可能会遇到问题。如果发生这种情况,请随时提出问题。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-02-14
    • 1970-01-01
    • 2017-09-18
    • 2013-06-04
    • 2015-12-03
    • 1970-01-01
    • 2015-08-25
    • 1970-01-01
    相关资源
    最近更新 更多