【问题标题】:In rxjs, how do I chain mapping through arrays of data received from different API's?在 rxjs 中,如何通过从不同 API 接收的数据数组链接映射?
【发布时间】:2019-02-04 14:36:03
【问题描述】:

我正在调用一个 API 并接收一组结果,我正在检查分页,如果存在更多页面,我将调用下一页,重复直到没有更多页面。

对于每个结果数组,我调用另一个端点并执行完全相同的操作:我收到一个结果数组,检查另一个页面并再次调用端点。洗涤,冲洗重复。

例如:

我想获取一个可能是分页响应的国家/地区列表,然后对于每个国家/地区我想获取一个城市列表,这也可能是分页的。对于每个城市,我执行一组转换,然后存储在数据库中。

我已经试过了,但是卡住了:


const grabCountries = Observable.create(async (observer) => {
    const url = 'http://api.com/countries'
    let cursor = url
    do {

        const results = fetch(cursor)

        // results = { 
        //   data: [ 'Canada', 'France', 'Spain' ],
        //   next: '47asd8f76358df8f4058898fd8fab'
        // }

        results.data.forEach(country => { observer.next(country) })

        cursor = results.next ? `${url}/${results.next}` : undefined

    } while(cursor)

})


const getCities = {
    next: (country) => {
        const url = 'http://api.com/cities'
        let cursor = url
        do {

            const results = fetch(cursor)

            // results = {
            //     data: [ 
            //         'Montreal', 'Toronto', 
            //         'Paris', 'Marseilles', 
            //         'Barcelona', 'Madrid' 
            //     ],
            //     next: '89ghjg98nd8g8sdfg98gs9h868hfoig'
            // }

            results.data.forEach(city => { 
                `**** What do I do here?? ****` 
            })

            cursor = results.next ? `${url}/${results.next}` : undefined

        } while(cursor)
    }
}

我尝试了几种方法:

制作主题(有时我需要根据“grabCountries”的结果进行并行处理。例如,我可能希望在抓取城市的同时将国家/地区存储在数据库中。)

const intermediateSubject = new Subject()

intermediateSubject.subscribe(storeCountriesInDatabase)
intermediateSubject.subscribe(getCities)

我也尝试过管道和映射,但似乎基本上是一样的。

在我写这篇文章的时候,我想到了这个解决方案,它似乎工作正常,我只是想知道我是否把它弄得太复杂了。在某些情况下,我需要连续进行几次 API 调用。 (想象一下,国家 => 州 => 城市 => 面包店 => 评论 => 评论 => 回复)所以这个奇怪的映射到另一个观察者回调模式可能会变得讨厌。

所以这就是我现在基本上拥有的:

// grabCountries stays the same as above, but the rest is as follows:

const grabCities = (country) =>
  Observable.create(async (observer) => {
    const url = `http://api.com/${country}/cities`
      let cursor = url
      do {
       const results = fetch(cursor)

       // results = {
       //     data: [
       //         'Montreal', 'Toronto',
       //         'Paris', 'Marseilles',
       //         'Barcelona', 'Madrid'
       //     ],
       //     next: '89ghjg98nd8g8sdfg98gs9h868hfoig'
       // }

       results.data.forEach(city => {
         observer.next(city)
       })

    cursor = results.next ? `${url}/${results.next}` : undefined

    } while (cursor)
})

const multiCaster = new Subject()

grabCountries.subscribe(multiCaster)
multiCaster.pipe(map((country) => {
    grabCities(country).pipe(map(saveCityToDB)).subscribe()
})).subscribe()
multiCaster.pipe(map(saveCountryToDB)).subscribe()

tl;dr - 我调用了一个 API,该 API 接收数组中的一组分页结果,我需要映射每个项目并调用另一个 api 接收另一组分页结果,每个结果集也在一个数组中。

将一个 observable 嵌套在另一个 observable 中并通过 'callApiForCountries.pipe(map(forEachCountryCallApiForCities))' 映射结果是最好的方法还是您有任何其他建议?

【问题讨论】:

  • 这是您的全部用例吗?我非常喜欢 RXJS,但也许更简单的 Promise.all() 更适合这里。
  • @RandyCasburn 这不是用例的全部。最后,这将执行各种并行和顺序并动态构建的转换和/或副作用(日志记录、存储、更新外部数据库、队列等)。这是一个大项目的开始。我只是一次解决这件事情。
  • 令人着迷 - 听起来很有趣!

标签: javascript rxjs


【解决方案1】:

下面是应该与下一个 url 的顺序抓取一起使用的代码。 您从 {next:url} 开始,直到 res.next 不可用。

of({next:http://api.com/cities}).pipe(
    expand(res=>results.next ? `${url}/${results.next}` : undefined
    takeWhile(res=>res.next!==undefined)
).subscribe()

【讨论】:

    【解决方案2】:

    好的,所以我为此花费了很多脑力,并提出了两个似乎可行的解决方案。

    const nestedFlow = () => {
    	fetchAccountIDs.pipe(map(accountIds => {
    		getAccountPostIDs(accountIds) // Has the do loop for paging inside
    			.pipe(
    				map(fetchPostDetails),
    				map(mapToDBFormat),
    				map(storeInDB)
    			).subscribe()
    	})).subscribe()
    }
    
    
    const expandedflow = () => {
    	fetchAccountIDs.subscribe((accountId) => {
    		// accountId { accountId: '345367geg55sy'}
    		getAccountPostIDs(accountId).pipe(
    			expand((results) => {
    				/*
    				results : {
    					postIDs: [
    						131424234,
    						247345345,
    					],
    					cursor: '374fg8v0ggfgt94',
    				}
    				*/
    				const { postIDs, cursor } = results
    				if (cursor) return getAccountPostIDs({...accountId, cursor})
    				return { postIDs, cursor }
    			}),
    			takeWhile(hasCursor, true), // recurs until cursor is undefined
    			concatMap(data => data.postIDs), 
    			map(data => ({ post_id: data })), 
    			map(fetchPostDetails), 
    			map(mapToDBFormat), 
    			map(storeInDB) 
    		).subscribe()
    	})
    }

    两者似乎都具有相似的性能。我读到一些离开数据流是一种不好的做法,你应该管道所有的东西,但我不知道如何消除“expandedFlow”中的第一个出口,因为“expand”需要回调一个可观察的,但也许可以的。

    现在我只需要解决从getAccountPostIDs 中调用“完成”到最后一条记录存储在数据库中的竞争条件问题。目前在我的测试中,observer.complete 在 3 个 upsert 操作之前完成。

    感谢任何 cmets,我希望这对将来的人有所帮助。

    【讨论】:

      【解决方案3】:

      您需要的是expand operator。它以递归方式运行,因此符合分页结果的想法。

      【讨论】:

      • 感谢您的建议,这看起来可行。我会努力解决的。
      猜你喜欢
      • 1970-01-01
      • 2019-09-09
      • 2020-10-11
      • 1970-01-01
      • 2021-11-30
      • 1970-01-01
      • 1970-01-01
      • 2018-11-02
      • 1970-01-01
      相关资源
      最近更新 更多