【发布时间】:2017-08-01 13:04:19
【问题描述】:
我尝试使用 akka-streams(对它的理解有限)和 Apache 的 pdfbox 构建一个小型 PDF 解析器。 我没有真正得到的一件事是,流恰好在 mapAsync 中给出的 parallelism 给定数量之后停止。 因此,如果 PDF 文档有 20 页并且并行度设置为 5,则前 5 页将被处理,其余的将被忽略,如果设置为 20,则一切正常。有人知道我做错了什么吗?
class PdfParser(ws: WSClient, conf: Configuration, parallelism: Int) {
implicit val system = ActorSystem("image-parser")
implicit val materializer = ActorMaterializer()
def documentPages(doc: PDDocument, key: String): Iterator[Page] = {
val pages: util.List[_] = doc.getDocumentCatalog.getAllPages
val pageList = (for {
i ← 0 until pages.size()
page = pages.get(i)
} yield Page(page, s"$key-$i.jpg")).toIterator
pageList
}
val pageToImage: Flow[Page, Image, NotUsed] = Flow[Page].map { p ⇒
val img = p.content.asInstanceOf[PDPage].convertToImage()
Image(img, p.name)
}
val imageToS3: Flow[Image, String, NotUsed] = Flow[Image].mapAsync(parallelism) { i ⇒
val s3 = S3.fromConfiguration(ws, conf)
val bucket = s3.getBucket("elsa-essays")
val baos = new ByteArrayOutputStream()
ImageIO.write(i.content, "jpg", baos)
val res = bucket add BucketFile(i.name, "image/jpeg", baos.toByteArray)
res.map { _ ⇒
"uploaded"
}.recover {
case e: S3Exception ⇒ e.message
}
}
val sink: Sink[String, Future[String]] = Sink.head[String]
def parse(path: Path, key: String): Future[String] = {
val stream: InputStream = new FileInputStream(path.toString)
val doc = PDDocument.load(stream)
val source = Source.fromIterator(() ⇒ documentPages(doc, key))
val runnable: RunnableGraph[Future[String]] = source.via(pageToImage).via(imageToS3).toMat(sink)(Keep.right)
val res = runnable.run()
res.map { s ⇒
doc.close()
stream.close()
s
}
}
}
【问题讨论】:
标签: scala playframework akka-stream