【问题标题】:nodejs functional programming with generators and promises使用生成器和承诺的 nodejs 函数式编程
【发布时间】:2019-06-10 09:04:36
【问题描述】:

总结

node.js 中的函数式编程是否足够通用?它可以用来解决处理小批量数据库记录的实际问题,而无需使用toArray 将所有记录加载到内存中(因此内存不足)。你可以阅读this criticism for background。我们想通过异步生成器演示此类 node.js 库的 Mux and DeMux 和 fork/tee/join 功能。

上下文

我质疑在 node.js 中使用任何函数式编程工具(如 ramdalodashimlazy)甚至自定义函数式编程的有效性和通用性。

给定

来自 MongoDB 游标的数百万条记录可以使用 await cursor.next() 进行迭代

您可能想read more about async generators and for-await-of

对于可以使用的假数据(在节点 10 上)

function sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
}
async function* getDocs(n) {
  for(let i=0;i<n;++i) {
     await sleep(1);
     yield {i: i, t: Date.now()};
  }
}
let docs=getDocs(1000000);

通缉

我们需要

  • 第一个文档
  • 最后一个文档
  • 文档数量
  • 分成 n 个文档的批次/批量,并为该批量发出 socket.io 事件

确保第一个和最后一个文档包含在批次中并且没有被消耗。

约束

数以百万计的记录不应该加载到 ram 中,一个应该迭代它们并且最多只保存一批。

该要求可以使用通常的 nodejs 代码来完成,但可以使用诸如 here 中的 applyspec 之类的东西来完成。

R.applySpec({
  first: R.head(),
  last: R.last(),
  _: 
    R.pipe(
      R.splitEvery(n),
      R.map( (i)=> {return "emit "+JSON.stringify(i);})
    ) 
})(input)

【问题讨论】:

  • 您能否澄清实际问题?更多的是关于“这可以在功能上完成”还是“如何在功能上做到这一点”或“在功能上这样做不好”?
  • 也就是说,所提到的问题确实暗示了诸如流和惰性评估之类的词(我在这里故意含糊其辞),并且没有什么能阻止您以功能方式执行此操作。
  • 是否让 Javascript 中的函数式编程与在 clojure 或 Haskell 中一样有趣?不,这更费力,因为您无法使用丰富的工具集。如果你不喜欢这样,可以使用 JS 作为编译目标。
  • 无论您的范例是否有效,您打算如何执行此操作:“确保第一个和最后一个文档包含在批次中并且不被消耗?”获得第一个是合理的;你只需要在整个过程中保持参考,但最后一个要困难得多。如果你能在其他范式中展示你是如何做到这一点的,也许我们可以提出一个功能性的替代方案。
  • 即使是铁杆函数式程序员也会在情况允许时牺牲性能。将退化的情况弯曲成一个人为的函数解决方案可能是一个有趣的难题,但这不是我们在生产中所做的。

标签: node.js functional-programming lodash ramda.js


【解决方案1】:

为了展示如何使用 vanilla JS 对其进行建模,我们可以介绍折叠异步生成器的想法,该生成器生成可以组合在一起的东西。

const foldAsyncGen = (of, concat, empty) => (step, fin) => async asyncGen => {
  let acc = empty
  for await (const x of asyncGen) {
    acc = await step(concat(acc, of(x)))
  }
  return await fin(acc)
}

这里的论点分为三个部分:

  • (of, concat, empty) 期望一个函数产生“可组合”事物,该函数将结合两个“可组合”事物和一个“可组合”事物的空/初始实例
  • (step, fin) 期望一个函数在每一步都采用“可组合”事物,并产生一个用于下一步的“可组合”事物的 Promise,以及一个将在生成器已用尽并产生最终结果的Promise
  • async asyncGen 是要处理的异步生成器

在 FP 中,“可组合”事物的概念被称为 Monoid,它定义了一些详细说明将两者组合在一起的预期行为的规律。

然后我们可以创建一个 Monoid,用于在单步执行生成器时传递第一个、最后一个和一批值。

const Accum = (first, last, batch) => ({
  first,
  last,
  batch,
})

Accum.empty = Accum(null, null, []) // an initial instance of `Accum`

Accum.of = x => Accum(x, x, [x])    // an `Accum` instance of a single value

Accum.concat = (a, b) =>            // how to combine two `Accum` instances together
  Accum(a.first == null ? b.first : a.first, b.last, a.batch.concat(b.batch))

为了捕捉刷新累积批次的想法,我们可以创建另一个函数,该函数接受一个 onFlush 函数,该函数将在返回的 Promise 中执行一些操作,其中值被刷新,大小 n刷新批次。

Accum.flush = onFlush => n => acc =>
  acc.batch.length < n ? Promise.resolve(acc)
                       : onFlush(acc.batch.slice(0, n))
                           .then(_ => Accum(acc.first, acc.last, acc.batch.slice(n)))

我们现在还可以定义如何折叠Accum 实例。

Accum.foldAsyncGen = foldAsyncGen(Accum.of, Accum.concat, Accum.empty)

定义了上述实用程序后,我们现在可以使用它们来为您的特定问题建模。

const emit = batch => // This is an analog of where you would emit your batches
  new Promise((resolve) => resolve(console.log(batch)))

const flushEmit = Accum.flush(emit)

// flush and emit every 10 items, and also the remaining batch when finished
const fold = Accum.foldAsyncGen(flushEmit(10), flushEmit(0))

最后运行你的例子。

fold(getDocs(100))
  .then(({ first, last })=> console.log('done', first, last))

【讨论】:

  • 谢谢你的建议,我是问FP包的完整性和通用性,不是自己构建包。
  • 也许您应该更新您的问题以删除“上下文”部分下的“自定义”工具请求。
  • 你有一个出色的答案。我的意思是一个自定义的 npm 包,而不是命名的包,但我希望它是通用的,以便我们质疑它的通用性。我的问题是,如果你以npm包的形式发布上面的代码,你认为它会通用吗?
  • 大部分是完全通用的,以至于Accum 对象的 Monoid 实例可以从 First、Last 和 Batch 三个独立的 Monoid 实例派生。此示例特有的唯一逻辑是为调用flushEmit 函数的foldSyncGen 定义step 函数。
【解决方案2】:

我不确定是否应该暗示函数式编程在处理大量数据时会比命令式编程在性能方面提供任何优势。

我认为您需要在工具包中添加另一个工具,可能是 RxJS

RxJS 是一个使用可观察序列编写异步和基于事件的程序的库。

如果您不熟悉 RxJS 或一般的响应式编程,我的示例肯定会看起来很奇怪,但我认为熟悉这些概念是一项不错的投资

在您的情况下,可观察序列是您的 MongoDB 实例,它会随着时间的推移发出记录。

我要伪造你的数据库:

var db = range(1, 5);

range 函数是一个 RxJS 的东西,它会在提供的范围内发出一个值。

db.subscribe(n => {
  console.log(`record ${n}`);
});

//=> record 1
//=> record 2
//=> record 3
//=> record 4
//=> record 5

现在我只对第一条和最后一条记录感兴趣。

我可以创建一个只发出第一条记录的 observable,然后创建另一个只发出最后一条记录的 observable:

var db = range(1, 5);
var firstRecord = db.pipe(first());
var lastRecord = db.pipe(last());

merge(firstRecord, lastRecord).subscribe(n => {
  console.log(`record ${n}`);
});
//=> record 1
//=> record 5

但是我还需要批量处理所有记录:(在本例中,我将创建每批 10 条记录)

var db = range(1, 100);
var batches = db.pipe(bufferCount(10))
var firstRecord = db.pipe(first());
var lastRecord = db.pipe(last());

merge(firstRecord, batches, lastRecord).subscribe(n => {
  console.log(`record ${n}`);
});

//=> record 1
//=> record 1,2,3,4,5,6,7,8,9,10
//=> record 11,12,13,14,15,16,17,18,19,20
//=> record 21,22,23,24,25,26,27,28,29,30
//=> record 31,32,33,34,35,36,37,38,39,40
//=> record 41,42,43,44,45,46,47,48,49,50
//=> record 51,52,53,54,55,56,57,58,59,60
//=> record 61,62,63,64,65,66,67,68,69,70
//=> record 71,72,73,74,75,76,77,78,79,80
//=> record 81,82,83,84,85,86,87,88,89,90
//=> record 91,92,93,94,95,96,97,98,99,100
//=> record 100

正如您在输出中看到的,它已经发出:

  1. 第一条记录
  2. 十批,每批 10 条记录
  3. 最后一条记录

我不会尝试为你解决你的练习,而且我对 RxJS 不太熟悉,无法对此进行过多扩展。

我只是想向您展示另一种方式,让您知道可以将其与函数式编程结合起来。

希望对你有帮助

【讨论】:

  • 这不是作业。我质疑一般性,因为有人可能会建议在数百万条记录上使用 resolve toArray() ,而只需要一小部分(将其乘以许多请求)。不,您的答案无效,因为如果一个管道使用文档,另一个管道将看不到它。 read this
  • 我将编辑问题以提供有效的假数据。
  • 抱歉,我知道这不是家庭作业。在内存中处理数百万条记录是行不通的,因此我想让您了解 RxJS。请注意,在 RxJS 中,pipe 在订阅 observable 之前不会执行任何操作。我不知道你对 RxJS 了解多少,所以我故意让我的回答尽可能简单。另外,我自己也不是 RxJS 专家。我认为答案可能不是您想要的,但我认为它确实提供了一些见解。
  • 我添加了假数据。并感谢您建议 RxJs 我需要调查一下。
  • 你可以 see here, first() 确实使用了批处理中的文档
【解决方案3】:

我想我前段时间可能已经为您开发了一个答案,名为scramjet。它是轻量级的(node_modules 中没有数千个依赖项),易于使用,并且确实使您的代码非常易于理解和阅读。

让我们从你的案例开始:

DataStream
    .from(getDocs(10000))
    .use(stream => {
        let counter = 0;

        const items = new DataStream();
        const out = new DataStream();

        stream
            .peek(1, async ([first]) => out.whenWrote(first))
            .batch(100)
            .reduce(async (acc, result) => {
                await items.whenWrote(result);

                return result[result.length - 1];
            }, null)
            .then((last) => out.whenWrote(last))
            .then(() => items.end());

        items
            .setOptions({ maxParallel: 1 })
            .do(arr => counter += arr.length)
            .each(batch => writeDataToSocketIo(batch))
            .run()
            .then(() => (out.end(counter)))
        ;

        return out;
    })
    .toArray()
    .then(([first, last, count]) => ({ first, count, last }))
    .then(console.log)
;

所以我真的不同意 javascript FRP 是一种反模式,我认为我没有唯一的答案,但是在开发第一个提交时,我发现 ES6 箭头语法和 async/await 写在一个链式方式使代码易于理解。

这是来自OpenAQ 特别是this line in their fetch process 的超燃冲压发动机代码示例:

return DataStream.fromArray(Object.values(sources))
  // flatten the sources
  .flatten()
  // set parallel limits
  .setOptions({maxParallel: maxParallelAdapters})
  // filter sources - if env is set then choose only matching source,
  //   otherwise filter out inactive sources.
  // * inactive sources will be run if called by name in env.
  .use(chooseSourcesBasedOnEnv, env, runningSources)
  // mark sources as started
  .do(markSourceAs('started', runningSources))
  // get measurements object from given source
  // all error handling should happen inside this call
  .use(fetchCorrectedMeasurementsFromSourceStream, env)
  // perform streamed save to DB and S3 on each source.
  .use(streamMeasurementsToDBAndStorage, env)
  // mark sources as finished
  .do(markSourceAs('finished', runningSources))
  // convert to measurement report format for storage
  .use(prepareCompleteResultsMessage, fetchReport, env)
  // aggregate to Array
  .toArray()
  // save fetch log to DB and send a webhook if necessary.
  .then(
    reportAndRecordFetch(fetchReport, sources, env, apiURL, webhookKey)
  );

它描述了每个数据源发生的一切。所以这是我的建议,以供提问。 :)

【讨论】:

  • 如何在不从批次流中消耗它们的情况下生成第一个和最后一个文档?在 Ramda 中,我使用了 'applySpec'
  • BTW 对我来说,多线程不是解决这个问题的。我们只是在循环文件而不是发明火箭
  • @MuayyadAlsadi 嗯...不是因为我没有足够清楚地阅读 Wanted 部分。让我重写一下。
  • 有趣 - @MuayyadAlsadi 你读过文档吗,scramjet 默认是多线程的?由于情况并非如此,我需要稍微重写文档和介绍。
  • @MuayyadAlsadi 似乎 scramjet 缺少将项目从流末尾切出的方法,因此应该在库中出现奇怪的代码。它可以满足您的需求,但是如果您有几分钟的空闲时间,也许我们可以使用聊天功能-我很乐意听到一些建设性的批评...
【解决方案4】:

这里有两个使用RxJsscramjet 的解决方案。

这是RxJs solution

诀窍是使用share(),这样first()last() 就不会从迭代器中消费,forkJoin 用于将它们组合起来以使用这些值发出 done 事件。

function ObservableFromAsyncGen(asyncGen) {
  return Rx.Observable.create(async function (observer) {
    for await (let i of asyncGen) {
      observer.next(i);
    }
    observer.complete();
  });  
}
async function main() {
  let o=ObservableFromAsyncGen(getDocs(100));
  let s = o.pipe(share());
  let f=s.pipe(first());
  let e=s.pipe(last());
  let b=s.pipe(bufferCount(13));
  let c=s.pipe(count());
  b.subscribe(log("bactch: "));
  Rx.forkJoin(c, f, e, b).subscribe(function(a){console.log(
    "emit done with count", a[0], "first", a[1], "last", a[2]);})
}

这是一个scramjet,但这不是纯粹的(函数有副作用)

async function main() {
  let docs = getDocs(100);
  let first, last, counter;
  let s0=Sj.DataStream
    .from(docs)
    .setOptions({ maxParallel: 1 })
    .peek(1, (item)=>first=item[0])
    .tee((s)=>{
        s.reduce((acc, item)=>acc+1, 0)
        .then((item)=>counter=item);
    })
    .tee((s)=>{
        s.reduce((acc, item)=>item)
        .then((item)=>last=item);
    })
    .batch(13)
    .map((batch)=>console.log("emit batch"+JSON.stringify(batch));
  await s0.run();
  console.log("emit done "+JSON.stringify({first: first, last:last, counter:counter}));
}

我将与 @michał-kapracki 合作开发它的纯版本。

【讨论】:

    【解决方案5】:

    针对这类问题,我制作了这个库:ramda-generators

    希望它是您正在寻找的:函数式 JavaScript 中流的惰性求值

    唯一的问题是我不知道如何在不重新运行生成器的情况下从流中获取最后一个元素和元素的数量

    在不解析内存中的整个数据库的情况下计算结果的可能实现可能是这样的:

    Try it on repl.it

    const RG = require("ramda-generators");
    const R  = require("ramda");
    
    const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
    
    const getDocs = amount => RG.generateAsync(async (i) => {
        await sleep(1);
        return { i, t: Date.now() };
    }, amount);
    
    const amount = 1000000000;
    
    (async (chunkSize) => {
        const first = await RG.headAsync(getDocs(amount).start());
        const last  = await RG.lastAsync(getDocs(amount).start()); // Without this line the print of the results would start immediately 
    
        const DbIterator = R.pipe(
            getDocs(amount).start,
            RG.splitEveryAsync(chunkSize),
            RG.mapAsync(i => "emit " + JSON.stringify(i)),
            RG.mapAsync(res => ({ first, last, res })),
        );
    
        for await (const el of DbIterator()) 
            console.log(el);
    
    })(100);
    

    【讨论】:

    • 您能否提供一个可能对原帖作者有所帮助的代码示例?
    • 正好是时候添加函数来跨多个函数分发同一个迭代器
    猜你喜欢
    • 2018-07-20
    • 2015-09-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-07
    • 2016-06-22
    相关资源
    最近更新 更多