【问题标题】:Iterate data source asynchronously in batch and stop while remote return no data in Scala批量异步迭代数据源并在Scala中远程不返回数据时停止
【发布时间】:2017-08-30 19:08:36
【问题描述】:

假设我们有一个假数据源,它将批量返回它保存的数据

class DataSource(size: Int) {
    private var s = 0
    implicit val g = scala.concurrent.ExecutionContext.global
    def getData(): Future[List[Int]] = {
        s = s + 1
        Future {
        Thread.sleep(Random.nextInt(s * 100))
        if (s <= size) {
            List.fill(100)(s)
        } else {
            List()
        }
    }

}
object Test extends App {
    val source = new DataSource(100)
    implicit val g = scala.concurrent.ExecutionContext.global

    def process(v: List[Int]): Unit = {
        println(v)
    }

    def next(f: (List[Int]) => Unit): Unit = {
        val fut = source.getData()
        fut.onComplete {
            case Success(v) => {
                f(v)
                v match {
                    case h :: t => next(f)
                }
            }
        }
    }

    next(process)

    Thread.sleep(1000000000)
}

我有我的,这里的问题是有些部分更不纯净。理想情况下,我想将每个批次的 Future 包装成一个大的未来,并且当最后一批返回 0 大小列表时,包装器未来是否成功?我的情况有点来自this 的帖子,next() 有同步调用,而我的也是异步的。

或者有没有可能做我想做的事?下一批只有在上一个最终解决后才会获取下一批是否获取下一批取决于返回的大小?

浏览此类数据源的最佳方法是什么?是否有任何现有的 Scala 框架提供我正在寻找的功能? play 的 Iteratee, Enumerator, Enumeratee 是正确的工具吗?如果是这样,任何人都可以提供一个示例来说明如何使用这些设施来实现我正在寻找的东西吗?

编辑---- 在 chunjef 的帮助下,我刚刚尝试过。它确实对我有用。不过,我根据他的回答做了一些小改动。

Source.fromIterator(()=>Iterator.continually(source.getData())).mapAsync(1)    (f=>f.filter(_.size > 0))
    .via(Flow[List[Int]].takeWhile(_.nonEmpty))
    .runForeach(println)

但是,有人可以比较 Akka Stream 和 Play Iteratee 吗?是否值得我也试试 Iteratee?


代码片段 1:

Source.fromIterator(() => Iterator.continually(ds.getData)) // line 1
    .mapAsync(1)(identity) // line 2
    .takeWhile(_.nonEmpty) // line 3
    .runForeach(println)   // line 4

代码片段 2:假设 getData 依赖于另一个流的其他输出,我想将它与以下流连接起来。但是,它会产生太多文件打开错误。不确定是什么导致了这个错误,如果我理解正确的话,mapAsync 的吞吐量被限制为 1。

Flow[Int].mapConcat[Future[List[Int]]](c => {
  Iterator.continually(ds.getData(c)).to[collection.immutable.Iterable]
}).mapAsync(1)(identity).takeWhile(_.nonEmpty).runForeach(println)

【问题讨论】:

  • 该代码有效,对吧?
  • 有点,是的。我正在寻找更好的版本。我粘贴的代码我不认为太 scala 风格。 :)
  • 使用 Iteratee 或 Akka Stream,而不是重新发明轮子
  • @cchantep 这就是我想知道的,怎么做?官方文档不好理解。

标签: scala


【解决方案1】:

以下是使用 Akka Streams 实现相同行为的一种方法,使用您的 DataSource 类:

import scala.concurrent.Future
import scala.util.Random

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._

object StreamsExample extends App {
  implicit val system = ActorSystem("Sandbox")
  implicit val materializer = ActorMaterializer()

  val ds = new DataSource(100)

  Source.fromIterator(() => Iterator.continually(ds.getData)) // line 1
        .mapAsync(1)(identity) // line 2
        .takeWhile(_.nonEmpty) // line 3
        .runForeach(println)   // line 4
}

class DataSource(size: Int) {
  ...
}

简化的逐行概述:

  • line 1:如果有下游需求,则创建一个持续调用ds.getData 的流源。
  • line 2mapAsync 是一种处理Futures 的流元素的方法。在这种情况下,流元素的类型为Future[List[Int]]。参数1 是并行级别:我们在这里指定1,因为DataSource 内部使用可变变量,大于1 的并行级别可能会产生意想不到的结果。 identityx =&gt; x 的简写,这基本上意味着对于每个 Future,我们将其结果传递到下游而不进行转换。
  • line 3:本质上,只要Future 的结果是非空的List[Int],就会调用ds.getData。如果遇到空的List,则终止处理。
  • line 4runForeach 在这里接受一个函数 List[Int] =&gt; Unit 并为每个流元素调用该函数。

【讨论】:

  • 如果数据到达末尾,getData 将返回 0 大小的列表。在这种情况下,身份应该是一个过滤器吗?或者getData返回第一个0列表时如何停止流?
  • @TodoChen:正如我在答案中提到的,如果遇到空的List.takeWhile(_.nonEmpty) 将完成流。 identity 不应替换为过滤器。
  • 你能看看我在问题中的编辑吗?这两个代码片段有什么区别?如何避免迭代器运行太快?我认为演员系统中的某个地方可以设置开始获取数据的演员数量?
【解决方案2】:

理想情况下,我想将每个批次的 Future 包装成一个大的未来,并且当最后一批返回 0 大小列表时包装器未来成功?

我想你正在寻找Promise

您将在开始第一次迭代之前设置Promise

这会给你promise.futureFuture,然后你可以用它来跟踪所有事情的完成。

在您的onComplete 中,添加一个case _ =&gt; promise.success()

类似

def loopUntilDone(f: (List[Int]) => Unit): Future[Unit] = {
  val promise = Promise[Unit]

  def next(): Unit = source.getData().onComplete {
        case Success(v) => 
            f(v)
            v match {
                case h :: t => next()
                case _ => promise.success()
            }      
        case Failure(e) => promise.failure(e)
  }


  // get going
  next(f)

  // return the Future for everything
  promise.future
}


// future for everything, this is a `Future[Unit]`
// its `onComplete` will be triggered when there is no more data
val everything = loopUntilDone(process)

【讨论】:

  • 是的,我想到了。但我认为必须有一些更优雅的方式。这样客户端代码就可以像调用 IteratorLike 的 foreach 或 next 一样调用遍历?
  • 您的意思是您想要一个Iterator[Future[Int]] 来为您提供未批处理的单个元素?我认为您不能这样做,因为您必须等待下一批到达,然后迭代器才能知道是否应该有另一个元素。
  • 您可能希望了解反应式流处理的替代方法。
  • 我想要 i = new MyIteratorLike(s:Datasource) Val l: List[Futuer[List[Int]]] = i.foreach(f:()=>Unit)
  • 那行不通,因为您需要能够在不阻塞的情况下遍历List,并且您的List 不知道在该批次到达之前是否还有另一批次。
【解决方案3】:

您可能正在寻找反应式流库。我个人最喜欢的(也是我最熟悉的)是Monix。这就是它在 DataSource 不变的情况下的工作方式

import scala.concurrent.duration.Duration
import scala.concurrent.Await

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global

object Test extends App {
    val source = new DataSource(100)
    val completed = // <- this is Future[Unit], completes when foreach is done
        Observable.repeat(Observable.fromFuture(source.getData()))
            .flatten // <- Here it's Observable[List[Int]], it has collection-like methods
            .takeWhile(_.nonEmpty)
            .foreach(println)

    Await.result(completed, Duration.Inf)
}

【讨论】:

  • 非常感谢。为什么选择 monic 而不是 Akka Stream?
  • @TododoChen 个人偏好,主要是。我使用 monix Task 作为 cats 的 IO monad 和 Future 的替换,所以更方便。另外,我发现 monix 在某种程度上更容易理解。
【解决方案4】:

我刚刚发现通过使用 flatMapConcat 可以实现我想要实现的目标。没有必要再开始另一个问题,因为我已经有了答案。将我的示例代码放在这里,以防有人正在寻找类似的答案。

这种类型的 API 对于传统企业应用程序之间的某些集成非常常见。 DataSource 用于模拟 API,而对象 App 用于演示客户端代码如何利用 Akka Stream 来使用 API。

在我的小项目中,API 是在 SOAP 中提供的,我使用 scalaxb 将 SOAP 转换为 Scala 异步样式。通过对象 App 中演示的客户端调用,我们可以使用 AKKA Stream 使用 API。感谢大家的帮助。

class DataSource(size: Int) {
    private var transactionId: Long = 0
    private val transactionCursorMap: mutable.HashMap[TransactionId, Set[ReadCursorId]] = mutable.HashMap.empty
    private val cursorIteratorMap: mutable.HashMap[ReadCursorId, Iterator[List[Int]]] = mutable.HashMap.empty
    implicit val g = scala.concurrent.ExecutionContext.global

    case class TransactionId(id: Long)

    case class ReadCursorId(id: Long)

    def startTransaction(): Future[TransactionId] = {
        Future {
            synchronized {
                transactionId += transactionId
            }
            val t = TransactionId(transactionId)
            transactionCursorMap.update(t, Set(ReadCursorId(0)))
            t
        }
    }

    def createCursorId(t: TransactionId): ReadCursorId = {
        synchronized {
            val c = transactionCursorMap.getOrElseUpdate(t, Set(ReadCursorId(0)))
            val currentId = c.foldLeft(0l) { (acc, a) => acc.max(a.id) }
            val cId = ReadCursorId(currentId + 1)
            transactionCursorMap.update(t, c + cId)
            cursorIteratorMap.put(cId, createIterator)
            cId
        }
    }

    def createIterator(): Iterator[List[Int]] = {
        (for {i <- 1 to 100} yield List.fill(100)(i)).toIterator
    }

    def startRead(t: TransactionId): Future[ReadCursorId] = {
        Future {

            createCursorId(t)
        }
    }

    def getData(cursorId: ReadCursorId): Future[List[Int]] = {

        synchronized {
            Future {
                Thread.sleep(Random.nextInt(100))
                cursorIteratorMap.get(cursorId) match {
                    case Some(i) => i.next()
                    case _ => List()
                }
            }
        }
    }


}


object Test extends App {
    val source = new DataSource(10)
    implicit val system = ActorSystem("Sandbox")
    implicit val materializer = ActorMaterializer()
    implicit val g = scala.concurrent.ExecutionContext.global
    //
    //  def process(v: List[Int]): Unit = {
    //    println(v)
    //  }
    //
    //  def next(f: (List[Int]) => Unit): Unit = {
    //    val fut = source.getData()
    //    fut.onComplete {
    //      case Success(v) => {
    //        f(v)
    //        v match {
    //
    //          case h :: t => next(f)
    //
    //        }
    //      }
    //
    //    }
    //
    //  }
    //
    //  next(process)
    //
    //  Thread.sleep(1000000000)

    val s = Source.fromFuture(source.startTransaction())
      .map { e =>
          source.startRead(e)
      }
      .mapAsync(1)(identity)
      .flatMapConcat(
          e => {
              Source.fromIterator(() => Iterator.continually(source.getData(e)))
          })
      .mapAsync(5)(identity)
      .via(Flow[List[Int]].takeWhile(_.nonEmpty))
      .runForeach(println)


    /*
      val done = Source.fromIterator(() => Iterator.continually(source.getData())).mapAsync(1)(identity)
        .via(Flow[List[Int]].takeWhile(_.nonEmpty))
        .runFold(List[List[Int]]()) { (acc, r) =>
          //      println("=======" + acc + r)
          r :: acc
        }

      done.onSuccess {

        case e => {
          e.foreach(println)
        }

      }
      done.onComplete(_ => system.terminate())
    */
}

【讨论】:

    猜你喜欢
    • 2015-08-24
    • 1970-01-01
    • 2021-05-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多