【问题标题】:How to fetch from one stream first and then pass to another stream如何先从一个流中获取然后传递到另一个流
【发布时间】:2019-02-27 00:56:53
【问题描述】:

我需要调用 cassandra 来获取日期,然后将获取的日期传递给另一个将数据插入数据库的流。

def fetchDate: Future[Done] =
  readJournal(persistenceKey)
    .drop(3)
    .take(1)
    .map(l => l.mydate)
    .runWith(Sink.ignore)

def insertRowsToDb: Future[Done] = 
  readJournal(somePersistenceKey)
    .drop(4)
    .take(1)
    .map(data => MyClass(data))
    .mapAsync(1) { myData => 
      for {
        insert <- myRepository.insert(data.id, fetchDate) //error here because fetchDate is unavailable 
      }
    }

  class MyRepository(tableName: String) {
    def insert(id: String, fetchedDate: Long): Future[Int] =
      config.db.run {
        sqlu"""INSERT INTO #${tableName}
           VALUES (
            ${id},
            ${fetchedDate}
           )
          """
      }

问题

  • 如何先执行fetchDate,然后将其结果传递给myRepository.insert 行?

【问题讨论】:

    标签: scala akka akka-stream


    【解决方案1】:

    你有两个问题。首先是fetchDate 甚至没有日期!解决这个问题意味着使用Sink 运行,而不仅仅是“忽略”:

    def fetchDateFut: Future[Date] =
      readJournal(persistenceKey)
        .drop(3)
        .take(1)
        .map(l => l.mydate)
        .runWith(Sink.last)
    

    然后,您需要flatMap 您的Future 将该日期纳入范围:

    def insertRowsToDb: Future[Done] = fetchDateFut.flatMap { fetchDate: Date =>
      readJournal(somePersistenceKey)
        .drop(4)
        .take(1)
        .map(data => MyClass(data))
        .mapAsync(1) { myData => 
          for {
            insert <- myRepository.insert(data.id, fetchDate)
          }
        }
    }
    

    【讨论】:

    • 谢谢!有没有办法获取从readJournal 返回的最后一个元素?而不是做drop(4)take(1)。我想在没有预先知道会有多少的情况下抓住最后一个事件
    • @Anthony 我没有意识到这是你的情况。在这种情况下,只需完全删除 .drop(3).take(1)Sink.last 已经获得了流中的最后一个元素。
    猜你喜欢
    • 2010-12-24
    • 1970-01-01
    • 1970-01-01
    • 2021-05-26
    • 1970-01-01
    • 1970-01-01
    • 2011-06-03
    • 2023-02-01
    • 1970-01-01
    相关资源
    最近更新 更多