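【Posted】: 2019-01-05 20:11:58
【Question】:
I need to traverse an API that is shaped like a tree, for example a directory structure or a discussion thread. It can be modeled with the following flow: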
```scala
type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])

def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString

// 0 => [1, 9]
// 1 => [10, 19]
// 2 => [20, 29]
// ...
// 9 => [90, 99]
// _ => []
// NB. I don't have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
  if (id == 0) (1 to 9).toList
  else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
  else Nil

val itemFlow: Flow[ItemId, Item, NotUsed] =
  Flow.fromFunction(id => Item(randomData(), nested(id)))
```
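For reference, the tree this model describes has three levels: the root `0`, its nine kids `1` to `9`, and ninety leaves `10` to `99`, so a complete traversal should emit 100 items and then stop. The sketch below walks the model breadth-first with `nested` directly; it is only a way to check the expected shape, since the real API exposes just `itemFlow`. `TreeShape` and `levels` are hypothetical names.

```scala
object TreeShape {
  type ItemId = Int

  // Same model as `nested` in the question: 0 -> 1..9, n in 1..9 -> n*10..n*10+9, others are leaves.
  def nested(id: ItemId): List[ItemId] =
    if (id == 0) (1 to 9).toList
    else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
    else Nil

  // Breadth-first levels starting at `root`; stops at the first level with no ids.
  def levels(root: ItemId): List[List[ItemId]] =
    Iterator.iterate(List(root))(_.flatMap(nested)).takeWhile(_.nonEmpty).toList

  def main(args: Array[String]): Unit = {
    val ls = levels(0)
    println(ls.map(_.size))  // List(1, 9, 90)
    println(ls.flatten.size) // 100
  }
}
```

Every id from `0` to `99` appears exactly once, and the walk ends on its own because leaves return `Nil`; a stream-based traversal has to reproduce that natural end as stream completion.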
How can I traverse this data? I got the following working:
```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration.Duration

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val loop =
  GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val source = b.add(Flow[Int])
    val merge  = b.add(Merge[Int](2))
    val fetch  = b.add(itemFlow)
    val bcast  = b.add(Broadcast[Item](2))
    val kids   = b.add(Flow[Item].mapConcat(_.kids))
    val data   = b.add(Flow[Item].map(_.data))
    val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead)

    // ids -> fetch -> broadcast: data goes out, kids are fed back into merge through the buffer
    source ~> merge ~> fetch ~> bcast ~> data
              merge <~ buffer <~ kids <~ bcast

    FlowShape(source.in, data.out)
  }

val flow = Flow.fromGraph(loop)

Await.result(
  Source.single(0).via(flow).runWith(Sink.foreach(println)),
  Duration.Inf
)
system.terminate()
```
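However, because the flow contains a cycle with a buffer, the stream never completes. According to its documentation, `buffer` completes when upstream completes and buffered elements have been drained. Here the buffer's upstream is the cycle itself: `merge` waits for the buffer to complete, and the buffer waits for `merge`, so neither ever does.

I have read the Graph cycles, liveness, and deadlocks section several times, but I am still struggling to find an answer.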
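One cycle-free alternative (a swapped-in technique, not the poster's code) is to build the traversal recursively: fetch an id through `itemFlow`, emit the resulting item, then concatenate the traversals of its kids with `flatMapConcat`. Each sub-source completes once its own subtree is exhausted, so the outer source completes without any counting. This is a minimal sketch assuming Akka Streams 2.5/2.6 on the classpath; `traverse`, `collect`, the stand-in `itemFlow` built from the question's `nested` model, and the deterministic `item-<id>` data (used instead of `randomData()` so the order can be checked) are all hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object RecursiveTraversal {
  type ItemId = Int
  type Data = String
  case class Item(data: Data, kids: List[ItemId])

  // Stand-in for the real HTTP-backed itemFlow, reusing the question's tree model.
  def nested(id: ItemId): List[ItemId] =
    if (id == 0) (1 to 9).toList
    else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
    else Nil

  // Deterministic data instead of randomData(), so the emitted order can be checked.
  val itemFlow: Flow[ItemId, Item, NotUsed] =
    Flow.fromFunction((id: ItemId) => Item(s"item-$id", nested(id)))

  // Fetch `id`, emit its item, then traverse each kid in turn (depth-first, pre-order).
  // There is no cycle: each sub-source completes when its subtree is exhausted.
  def traverse(id: ItemId): Source[Item, NotUsed] =
    Source.single(id).via(itemFlow).flatMapConcat { item =>
      Source.single(item) ++ Source(item.kids).flatMapConcat(kid => traverse(kid))
    }

  // Runs the traversal to completion and returns every emitted item.
  def collect(root: ItemId): Seq[Item] = {
    implicit val system: ActorSystem = ActorSystem("traverse")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(traverse(root).runWith(Sink.seq), 30.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val items = collect(0)
    println(items.size)                       // 100
    println(items.take(3).map(_.data).toList) // List(item-0, item-1, item-10)
  }
}
```

`flatMapConcat` fetches one item at a time and preserves depth-first order; `flatMapMerge(breadth, f)` would allow several fetches in flight at the cost of ordering. Each call to `traverse` also materializes `itemFlow` anew, which may matter if the real flow wraps an HTTP connection.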
This creates a livelock:
```scala
import java.util.concurrent.atomic.AtomicInteger

def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = {
  // keep track of how many elements are in flight
  // (sent to `ref` but not yet past `takeWhile`)
  val remaining = new AtomicInteger(1) // 1 = seed

  // should be > max loop(x)
  val bufferSize = 10000

  val (ref, publisher) =
    Source.actorRef[S](bufferSize, OverflowStrategy.fail)
      .toMat(Sink.asPublisher(true))(Keep.both)
      .run()

  ref ! seed

  Source.fromPublisher(publisher)
    .via(flow)
    .map { x =>
      // feed every child back into the loop, counting it as in flight
      loop(x).foreach { c =>
        remaining.incrementAndGet()
        ref ! c
      }
      x
    }
    .takeWhile(_ => remaining.decrementAndGet > 0)
}
```
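The termination condition in `unfold` relies on `remaining` equalling the number of ids sent into the loop whose items have not yet reached `takeWhile`. The sequential, Akka-free model below replays only that arithmetic; it assumes each element passes through `map` and then `takeWhile` before the next one does, and it says nothing about the livelock itself. `PendingCounterModel` and `emittedByTakeWhile` are hypothetical names.

```scala
import scala.collection.mutable

object PendingCounterModel {
  type ItemId = Int

  // Same tree model as `nested` in the question.
  def nested(id: ItemId): List[ItemId] =
    if (id == 0) (1 to 9).toList
    else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
    else Nil

  // Replays the `remaining` bookkeeping of `unfold` one element at a time and returns
  // the ids that `takeWhile(_ => remaining.decrementAndGet > 0)` lets through.
  def emittedByTakeWhile(seed: ItemId): List[ItemId] = {
    val pending = mutable.Queue(seed) // stands in for the actorRef buffer
    var remaining = 1                 // 1 = seed, as in `unfold`
    val out = List.newBuilder[ItemId]
    var open = true
    while (open && pending.nonEmpty) {
      val x = pending.dequeue()
      // `map` stage: every kid is counted and sent back into the loop
      nested(x).foreach { c => remaining += 1; pending.enqueue(c) }
      // `takeWhile` stage: decrement, then test; a false result completes the stream
      remaining -= 1
      if (remaining > 0) out += x else open = false
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val out = emittedByTakeWhile(0)
    println(out.size)         // 99
    println(out.contains(99)) // false
  }
}
```

After each element, `remaining` equals the number of ids still queued, so the predicate turns false exactly on the last element and that element is dropped: with seed `0` the model emits 99 items and omits id `99`. Akka's `takeWhile` also has an overload with an `inclusive` flag that emits the element which made the predicate false; whether that alone lets `unfold` terminate in the real stream is not established here.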
Edit: I have added a git repo to test your solutions: https://github.com/MasseGuillaume/source-unfold
【Comments】:
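- The answer to the following question may help, especially the "no cycle in the flow" part: stackoverflow.com/questions/32459329/…
- That doesn't work for me: `itemFlow` comes from an HTTP call, and I don't have access to the `nested` function.
- There seem to be a few mistakes in the example code: `itemFlow` is not used anywhere, and `commentFlow` is not defined. Also, `itemFlow` appears to be of type `Flow[ItemId, Item, ...]` but is declared as `Flow[ItemId, Data, ...]`.
- @RamonJRomeroyVigil Indeed, edited.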
Tags: scala akka-stream