【问题标题】:Akka Streams, source items as another source?Akka Streams,源项目作为另一个来源?
【发布时间】:2017-06-07 06:40:07
【问题描述】:

我正在使用Alpakka-FTP,但也许我正在寻找一个通用的 akka-stream 模式。 FTP 连接器可以列出文件或检索它们:

def ls(host: String): Source[FtpFile, NotUsed]
def fromPath(host: String, path: Path): Source[ByteString, Future[IOResult]]

理想情况下,我想创建一个这样的流:

LIST
  .FETCH_ITEM
  .FOREACH(do something)

但我无法使用上面编写的两个函数创建这样的流。我觉得我应该可以使用Flow 到达那里,比如

Ftp.ls
  .via(some flow that uses the Ftp.fromPath above)
  .runWith(Sink.foreach(do something))

这可能吗,只考虑上面的lsfromPath 函数?

编辑:

我可以使用一个演员和mapAsync 来解决这个问题,但我仍然觉得应该更直接。

class Downloader extends Actor {
  override def receive = {
    case ftpFile: FtpFile =>
      Ftp.fromPath(Paths.get(ftpFile.path), settings)
        .toMat(FileIO.toPath(Paths.get("testHDF.txt")))(Keep.right)
        .run() pipeTo sender
  }
}

val downloader = as.actorOf(Props(new Downloader))

Ftp.ls("test_path", settings)
  .mapAsync(1)(ftpFile => (downloader ? ftpFile) (3.seconds).mapTo[IOResult])
  .runWith(Sink.foreach(res => println("got it!" + res)))

【问题讨论】:

    标签: scala ftp akka-stream akka-camel alpakka


    【解决方案1】:

    您应该能够为此目的使用flatMapConcat。您的具体示例可以重写为

    Ftp.ls("test_path", settings).flatMapConcat{ ftpFile =>
      Ftp.fromPath(Paths.get(ftpFile.path), settings)
    }.runWith(FileIO.toPath(Paths.get("testHDF.txt")))
    

    文档here

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-11-27
      • 2020-12-20
      • 2011-10-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多