【问题标题】:How to list folders and files in a directory using ReactiveX如何使用 ReactiveX 列出目录中的文件夹和文件
【发布时间】:2017-05-15 23:25:14
【问题描述】:

当将 Observables 用于涉及大量链接和大量异步操作的某些任务时,例如列出文件夹中的所有项目并检查其中的所有文件夹以查找特定文件时,我通常最终要么需要为每个任务构建复杂的链(return Observable.of(folder)...)或具有某种特殊值,该值被转发到末尾以表示批处理结束(每个操作员都以 if(res === false) return Observable.of(false) 开头)。

有点像你在结账时放在你的杂货和你面前的人的那些之间的那根棍子。

似乎应该有一种更好的方法,不涉及通过各种回调和运算符转发停止值。

那么什么是调用函数的好方法,该函数接受文件夹路径字符串并返回其中所有文件和文件夹的列表。它还指定文件是否为 HTML 文件,以及文件夹是否包含名为 tiddlywiki.json 的文件。

唯一的要求是它不能返回像Observable.of(...)... 这样的东西。它可能应该在链的顶部有一个主题,但这不是必需的。

function listFolders(folder) {
    return [
        { type: 'folder', name: 'folder1' },
        { type: 'datafolder', name: 'folder2' }, //contains "tiddlywiki.json" file
        { type: 'folder', name: 'folder3' },
        { type: 'htmlfile', name: 'test.html' },
        { type: 'other', name: 'mytest.txt' }
    ]
}

【问题讨论】:

  • 就目前而言,您的问题过于宽泛(从标题来看)。也就是说,如何递归搜索特定文件的文件夹目录结构的具体问题是一个很好的问题。我建议针对该特定目标重写问题,并详细说明(输入,预期输出)。您总是可以在问题的最后提及您的更大目标是批量等。
  • 好的,稍作编辑以更加强调这一点。
  • 如果您指定您正在寻找的功能会有所帮助。该函数的输入是什么?预期的输出是什么?你的方法是什么不起作用?你必须使用哪些函数来返回 observables?你的问题越具体,答案就越具体。据我所知,我只能告诉你concatMap 和一个递归函数应该可以相对简单地完成你想要实现的目标。
  • 好的,问一个更具体的场景。

标签: rxjs reactivex


【解决方案1】:

这是一个不遵循我制定的规则的规则(请参阅下面的规则),但使用第一个作为指导大约需要十分钟。

export function statFolder(subscriber, input: Observable<any>) {
    return input.mergeMap(([folder, tag]) => {
        return obs_readdir({ folder, tag })(folder);
    }).mergeMap(([err, files, { folder, tag }]) => {
        if (err) { return Observable.of({ error: err }) as any; }
        else return Observable.from(files).mergeMap(file => {
            return obs_stat([file,folder])(path.join(folder, file as string));
        }).map(statFolderEntryCB).mergeMap<any, any>((res) => {
            let [entry, [name, folder]] = res as [any, [string, string, number, any]];
            if (entry.type === 'folder')
                return obs_readdir([entry])(path.join(entry.folder, entry.name));
            else return Observable.of([true, entry]);
        }, 20).map((res) => {
            if (res[0] === true) return (res);
            let [err, files, [entry]] = res as [any, string[], [FolderEntry, number, any]];
            if (err) {
                entry.type = "error";
            } else if (files.indexOf('tiddlywiki.json') > -1)
                entry.type = 'datafolder';
            return ([true, entry]);
        }).reduce((n, [dud, entry]) => {
            n.push(entry);
            return n;
        }, []).map(entries => {
            return { entries, folder, tag };
        }) as Observable<{ entries: any, folder: any, tag: any }>;
    }).subscribe(subscriber);
}

原文:这花了几个小时来写...而且它可以工作...但是...它使用 concatMap,因此一次只能接受一个请求。它使用我为此目的编写的自定义运算符。

export function statFileBatch(subscriber, input: Observable<any>) {
    const signal = new Subject<number>();
    var count = 0;
    //use set timeout to fire after the buffer recieves this item
    const sendSignal = (item) => setTimeout(() => { count = 0; signal.next(item); });
    return input.concatMap(([folder, tag]) => {
        return obs_readdir({ folder, tag })(folder);
    }).lift({
        call: (subs: Subscriber<any>, source: Observable<any>) => {
            const signalFunction = (count) => signal.mapTo(1), forwardWhenEmpty = true;

            const waiting = [];
            const _output = new Subject();

            var _count = new Subject<number>()
            const countFactory = Observable.defer(() => {
                return Observable.create(subscriber => {
                    _count.subscribe(subscriber);
                })
            });

            var isEmpty = true;
            const sourceSubs = source.subscribe(item => {
                if (isEmpty && forwardWhenEmpty) {
                    _output.next(item);
                } else {
                    waiting.push(item)
                }
                isEmpty = false;
            })

            const pulse = new Subject<any>();
            const signalSubs = pulse.switchMap(() => {
                return signalFunction(countFactory)
            }).subscribe(count => {
                //act on the closing observable value
                var i = 0;
                while (waiting.length > 0 && i++ < count)
                    _output.next(waiting.shift());
                //if nothing was output, then we are empty
                //if something was output then we are not
                //this is meant to be used with bufferWhen
                if (i === 0) isEmpty = true;

                _count.next(i);
                _count.complete();
                _count = new Subject<number>();

                pulse.next();
            })
            pulse.next(); //prime the pump

            const outputSubs = Observable.create((subscriber) => {
                return _output.subscribe(subscriber);
            }).subscribe(subs) as Subscription;

            return function () {
                outputSubs.unsubscribe();
                signalSubs.unsubscribe();
                sourceSubs.unsubscribe();
            }
        }
    }).mergeMap(([err, files, { folder, tag }]) => {
        if (err) { sendSignal(err); return Observable.empty(); }
        return Observable.from(files.map(a => [a, folder, files.length, tag])) as any;
    }).mergeMap((res: any) => {
        let [file, folder, fileCount, tag] = res as [string, string, number, any];
        return obs_stat([file, folder, fileCount, tag])(path.join(folder, file))
    }, 20).map(statFolderEntryCB).mergeMap<any, any>((res) => {
        let [entry, [name, folder, fileCount, tag]] = res as [any, [string, string, number, any]];
        if (entry.type === 'folder')
            return obs_readdir([entry, fileCount, tag])(path.join(entry.folder, entry.name));
        else return Observable.of([true, entry, fileCount, tag]);
    }, 20).map((res) => {
        //if (res === false) return (false);
        if (res[0] === true) return (res);
        let [err, files, [entry, fileCount, tag]] = res as [any, string[], [FolderEntry, number, any]];
        if (err) {
            entry.type = "error";
        } else if (files.indexOf('tiddlywiki.json') > -1)
            entry.type = 'datafolder';
        return ([true, entry, fileCount, tag]);
    }).map(([dud, entry, fileCount, tag]) => {
        count++;
        if (count === fileCount) {
            sendSignal([count, tag]);
        }
        return entry;
    }).bufferWhen(() => signal).withLatestFrom(signal).map(([files, [sigResult, tag]]: any) => {
        return [
            typeof sigResult !== 'number' ? sigResult : null, //error object
            files,                                            //file list
            typeof sigResult === 'number' ? sigResult : null, //file count
            tag                                               //tag
        ];
    }).subscribe(subscriber);
}

【讨论】:

    猜你喜欢
    • 2011-10-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-12-22
    • 1970-01-01
    • 2011-06-29
    • 2013-12-25
    • 1970-01-01
    相关资源
    最近更新 更多