【问题标题】:akka streams stops after parallelismakka 流在并行后停止
【发布时间】:2017-08-01 13:04:19
【问题描述】:

我尝试使用 akka-streams(对它的理解有限)和 Apache 的 pdfbox 构建一个小型 PDF 解析器。 我没有真正得到的一件事是,流恰好在 mapAsync 中给出的 parallelism 给定数量之后停止。 因此,如果 PDF 文档有 20 页并且并行度设置为 5,则前 5 页将被处理,其余的将被忽略,如果设置为 20,则一切正常。有人知道我做错了什么吗?

  class PdfParser(ws: WSClient, conf: Configuration, parallelism: Int) {

  implicit val system = ActorSystem("image-parser")

  implicit val materializer = ActorMaterializer()

  def documentPages(doc: PDDocument, key: String): Iterator[Page] = {

    val pages: util.List[_] = doc.getDocumentCatalog.getAllPages
    val pageList = (for {
      i ← 0 until pages.size()
      page = pages.get(i)
    } yield Page(page, s"$key-$i.jpg")).toIterator
    pageList
  }

  val pageToImage: Flow[Page, Image, NotUsed] = Flow[Page].map { p ⇒
    val img = p.content.asInstanceOf[PDPage].convertToImage()
    Image(img, p.name)
  }

  val imageToS3: Flow[Image, String, NotUsed] = Flow[Image].mapAsync(parallelism) { i ⇒

    val s3 = S3.fromConfiguration(ws, conf)
    val bucket = s3.getBucket("elsa-essays")
    val baos = new ByteArrayOutputStream()
    ImageIO.write(i.content, "jpg", baos)
    val res = bucket add BucketFile(i.name, "image/jpeg", baos.toByteArray)
    res.map { _ ⇒
      "uploaded"
    }.recover {
      case e: S3Exception ⇒ e.message
    }
  }

  val sink: Sink[String, Future[String]] = Sink.head[String]

  def parse(path: Path, key: String): Future[String] = {

    val stream: InputStream = new FileInputStream(path.toString)
    val doc = PDDocument.load(stream)
    val source = Source.fromIterator(() ⇒ documentPages(doc, key))

    val runnable: RunnableGraph[Future[String]] = source.via(pageToImage).via(imageToS3).toMat(sink)(Keep.right)
    val res = runnable.run()

    res.map { s ⇒
      doc.close()
      stream.close()
      s
    }

  }

}

【问题讨论】:

    标签: scala playframework akka-stream


    【解决方案1】:

    问题出在您的 Sink 中。 Sink.head 将从您的物化流中返回一个元素。所以问题是,为什么在流实现中使用 mapAsync(>1) 时会收到多个值?也许是因为它使用了不止一个参与者来推动价值。

    无论如何,请将您的水槽改为:

    val sink: Sink[String, Future[String]] = Sink.fold("")((a, b) => b ++ a)
    

    它会起作用的。

    【讨论】:

    • 像魅力一样工作,比我想象的更有意义。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-05-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多