【问题标题】:Approach to filtering and validating return values with rxjs使用 rxjs 过滤和验证返回值的方法
【发布时间】:2016-03-14 18:57:18
【问题描述】:

所以这是我试图弄清楚如何使用 rxjs 实现的场景:

  1. 从文件/数据库/等加载一些元数据集。元数据中的每个元素都有一个 id 和附加信息——比如实际数据的位置。目前,我在应用程序启动时异步加载所有这些元数据。加载此数据后,Observable 调用完成。最终我可能会添加刷新功能

  2. 稍后在应用程序中,我需要根据元数据中可用的内容加载特定的数据集。我目前正在尝试使用 fetchData(ids:string[]):Observable 之类的函数来执行此操作。这是我不清楚如何在 rxjs 范式下进行的地方。我同样不确定如何使用 fetchDatum(id:string):Observable

  3. 之类的函数请求单个项目

我当然可以使用过滤器仅对从 IMetadata Observable 发出的与列表中的名称之一匹配的 IMetadata 项目进行操作 - 但我还需要确认所有请求的项目都在 IMetadata Observable 排放中找到,并且如果不是我需要出错。

因此,如果有人使用 id = "Bob" 请求 IMetadata - 但源 Observable 没有发出这样的 IMetadata,那么它需要出错。或者如果他们请求 { "Shirley", "Rex", "Samantha" } 并且没有 "Rex" 的数据,那么它应该出错。

我考虑过在这里使用 Rx.Subject,但从我读到的内容来看,这在 rxjs 范式下通常是不可取的。请告知在 rxjs 范式下哪些方法适用于这种情况。谢谢!

【问题讨论】:

  • 你与 Rx 抗争是因为你并不真正想要一个流,你想要一个对象,metadata = {}(或者更好的是,Immutable.Map())。您的元数据随时间变化的唯一意义是您更新它。您可以使用持有元数据的 Rx 主题(作为单个对象,而不是其组成记录)。依赖于metadata 内容的程序的任何部分都可以订阅,并且每次更改时,他们都有机会进行相应的更新。

标签: rxjs rxjs5


【解决方案1】:

这是我想出的解决方案。这个函数创建一个依赖于 IBufferEvaluator 的 Observable,告诉它如何处理源 Observable 发出的每个项目。它可以将项目附加到缓冲区、跳过发出的项目、清除缓冲区、将缓冲区刷新给订阅者等。如果你找到更好的方法来做到这一点,请告诉我,特别是如果它是一个不合时宜的方法盒子 rxjs 解决方案。谢谢。

import Rx from 'rxjs/Rx';

export enum BufferAction {    
    APPEND, /** Append the current emission to the buffer and continue  **/
    SKIP, /** Do nothing, ignoring the current emission if applicable  **/
    FLUSH, /** This will ignore the current emission, if applicable, and flush the existing buffer contents */
    CLEAR, /** Clear the buffer contents. Ignore the current emission, if applicable */
    COMPLETE, /** Mark the Observable as Complete. The buffer will be cleared upon completion. **/
    APPEND_THEN_FLUSH,   /** Append the current emission to the buffer prior to flushing it  **/
    APPEND_THEN_COMPLETE, /** Append the current emission to the buffer and then complete **/
    CLEAR_THEN_APPEND, /** Clear the buffer contents and then append the current emission to it */
    FLUSH_THEN_APPEND, /** Flush the buffer contents and then append the current emission to it */
    FLUSH_THEN_COMPLETE, /** Flush the buffer contents and then mark the Observable as complete */
    APPEND_FLUSH_COMPLETE /** Append the current emission, flush the buffer, and then complete  */
}

export function bufferActionToString(action: BufferAction):string
{
    switch(action)
    {
        case BufferAction.APPEND: return "APPEND";
        case BufferAction.SKIP: return "SKIP";
        case BufferAction.FLUSH: return "FLUSH";
        case BufferAction.CLEAR: return "CLEAR";
        case BufferAction.COMPLETE: return "COMPLETE";
        case BufferAction.APPEND_THEN_FLUSH: return "APPEND_THEN_FLUSH";
        case BufferAction.APPEND_THEN_COMPLETE: return "APPEND_THEN_COMPLETE";
        case BufferAction.CLEAR_THEN_APPEND: return "CLEAR_THEN_APPEND";
        case BufferAction.FLUSH_THEN_APPEND: return "FLUSH_THEN_APPEND";
        case BufferAction.FLUSH_THEN_COMPLETE: return "FLUSH_THEN_COMPLETE";
        case BufferAction.APPEND_FLUSH_COMPLETE: return "APPEND_FLUSH_COMPLETE";
        default: return "Unrecognized Buffer Action [" + action + "]";
    }
}

export interface IBufferEvaluator<T>
{
    evalOnNext(next:T, buffer: T[]):BufferAction;
    evalOnComplete(buffer: T[]):BufferAction;
}

/** bufferWithEval.ts
 *  An Operator that buffers the emissions from the source Observable. As each emission is recieved,
 *  it and the buffered emissions are evaluated to determine what BufferAction to APPEND. You can APPEND
 *  the current emission value to the end of the buffered emissions, you can FLUSH the buffered emissions
 *  before or after appending the current emission value, you can SKIP the current emission value and then
 *  (optionally) FLUSH the buffer, and you can CLEAR the buffer before or after appending the current emission.
 *   
 *  The evalOnNext and evalOnComplete are expected to return a BufferAction to indicate
 *  which action to take. If no evalOnNext is supplied, it will default to APPENDing each emission. The evalOnComplete
 *  will default to FLUSH_THEN_COMPLETE. If evalOnNext or evalOnComplete throw an exception, the Observable will emit 
 *  the exception and cease.
 */
export function bufferWithEval<T>
    (   source: Rx.Observable<T>, 
        evaluatorFactory?: () => IBufferEvaluator<T>
    ) : Rx.Observable<T[]> 
{   
    /** if no evaluatorFactory supplied, use the default evaluatorFactory **/
    if(!evaluatorFactory)
    {
        evaluatorFactory = () => {
            return {
                evalOnNext : function(next: T, buffer: T[]) { return BufferAction.APPEND; },
                evalOnComplete : function(buffer: T[]) { return BufferAction.FLUSH; }
            };
        }
    }

    return new Rx.Observable<T[]>((subscriber: Rx.Subscriber<T[]>) => 
    {
        var _buffer = new Array<T>();          
        var _evaluator = evaluatorFactory();
        var _subscription: Rx.Subscription = null;

        function append(next: T)
        {
            _buffer.push(next);
        }

        function flush()
        {
            try
            {
                subscriber.next(_buffer);
            }
            finally
            {
                // Ignore any exceptions that come from subscriber.next()
                clear();
            }          
        }

        function clear()
        {
            _buffer = new Array<T>();
        }

        function next(next: T)
        {
            try
            {
                var action = _evaluator.evalOnNext(next, _buffer.slice(0));                
                switch(action)
                {
                    case BufferAction.APPEND: { append(next); break; }
                    case BufferAction.SKIP: { break; }
                    case BufferAction.FLUSH: { flush(); break; }
                    case BufferAction.CLEAR: { clear(); break; }
                    case BufferAction.COMPLETE: { complete(); break; }
                    case BufferAction.APPEND_THEN_FLUSH: { append(next); flush(); break; }
                    case BufferAction.APPEND_THEN_COMPLETE: { append(next); complete(); break; }
                    case BufferAction.APPEND_FLUSH_COMPLETE: { append(next); flush(); complete(); break; }
                    case BufferAction.CLEAR_THEN_APPEND: { clear(); append(next); break; }
                    case BufferAction.FLUSH_THEN_APPEND: { flush(); append(next); break; }
                    case BufferAction.FLUSH_THEN_COMPLETE: { flush(); complete(); break; }

                    default: throw new Error("next(): Invalid BufferAction '" + bufferActionToString(action) + "'");
                }   
            }
            catch(e)
            {
                error(e);
            }          
        }    

        function complete()
        {
            try
            {            
                var action = _evaluator.evalOnComplete(_buffer.slice(0));
                switch(action)
                {
                    case BufferAction.FLUSH_THEN_COMPLETE:
                    case BufferAction.FLUSH:  { flush(); } 

                    case BufferAction.CLEAR: 
                    case BufferAction.COMPLETE: { break; }                   

                    case BufferAction.APPEND:
                    case BufferAction.APPEND_THEN_FLUSH:
                    case BufferAction.APPEND_THEN_COMPLETE:
                    case BufferAction.APPEND_FLUSH_COMPLETE:
                    case BufferAction.SKIP: 
                    case BufferAction.CLEAR_THEN_APPEND: 
                    case BufferAction.FLUSH_THEN_APPEND: 
                    default: throw new Error("complete(): Invalid BufferAction '" + bufferActionToString(action) + "'");
                }

                clear();
                subscriber.complete();
                _subscription.unsubscribe();        
            }
            catch(e)
            {
                error(e);
            }           
        }        

        function error(err: any)
        {                  
            try
            {
                subscriber.error(err);
            }
            finally
            {   
                _subscription.unsubscribe();
            }
        }          

        _subscription = source.subscribe(next, error, complete);
        return _subscription;
    });
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-28
    • 2014-12-15
    • 1970-01-01
    • 2022-09-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多