【问题标题】:In Rx instead of only getting the last debounced object, can I get the complete sequence?在 Rx 中,我可以得到完整的序列,而不是只得到最后一个去抖动的对象吗?
【发布时间】:2018-05-07 00:41:55
【问题描述】:

我想知道其中一个去抖动的物体是否是一个绿球。在去抖之前或之后仅过滤绿球会导致错误的行为。

【问题讨论】:

  • 看起来您需要一个“分组依据”运算符。
  • 我可以回答这个问题,但您已将您的问题标记为rxjsrx-java。它是哪一个? JavaScript 还是 Java?
  • Rxjava 具体来说,我标记了两者,因为我认为答案是相同的,只是语法不同。
  • 你想用那些绿色的做什么?

标签: rxjs rx-java reactive


【解决方案1】:

您可以将缓冲区运算符与去抖动运算符一起使用。这是一个非常基本的例子:

// This is our event stream. In this example we only track mouseup events on the document
const move$ = Observable.fromEvent(document, 'mouseup');

// We want to create a debounced version of the initial stream
const debounce$ = move$.debounceTime(1000);

// Now create the buffered stream from the initial move$ stream. 
// The debounce$ stream can be used to emit the values that are in the buffer
const buffered$ = move$.buffer(debounce$);

// Subscribe to your buffered stream
buffered$.subscribe(res => console.log('Buffered Result: ', res));

【讨论】:

  • 我刚刚看到您的问题是针对 rx java 的。我的示例专门针对 rxjs,但它应该与 java 的工作方式相同。
  • 非常感谢!这成功了!我还发现the documentation 写了 RxJava 版本,这是RxJava 1.x buffer 下拉列表中的最后一个示例
【解决方案2】:

如果我正确理解您想要实现的目标,您可能需要构建一个 Observable ,它发出某种对象,其中包含源值(即在您的情况下为蓝色、红色、绿色)以及指示的标志去抖动值中是否有绿色。

如果是这样,您可以尝试按照这些思路编写代码

const s = new Subject<string>();

setTimeout(() => s.next('B'), 100);
setTimeout(() => s.next('G'), 1100);
setTimeout(() => s.next('B'), 1200);
setTimeout(() => s.next('G'), 1300);
setTimeout(() => s.next('R'), 1400);
setTimeout(() => s.next('B'), 2400);

let hasGreen = false;

s
.do(data => hasGreen =  hasGreen || data === 'G')
.debounceTime(500)
.map(data => ({data, hasGreen})) // this map has to come before the following do
.do(() => hasGreen =  false)
.subscribe(data => console.log(data))

注意顺序。特别是,您必须将创建要发出的对象的 map 运算符放在重置变量的 do 之前。

【讨论】:

    【解决方案3】:

    这可以通过一组重要的运算符来完成,并通过引入额外的通道对流程产生副作用:

    import java.util.Queue;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicLong;
    
    import org.junit.Test;
    
    import io.reactivex.*;
    import io.reactivex.functions.Consumer;
    import io.reactivex.schedulers.*;
    import io.reactivex.subjects.PublishSubject;
    
    public class DebounceTimeDrop {
    
        @Test
        public void test() {
            PublishSubject<Integer> source = PublishSubject.create();
    
            TestScheduler scheduler = new TestScheduler();
    
            source.compose(debounceTime(10, TimeUnit.MILLISECONDS, scheduler, v -> {
                System.out.println(
                        "Dropped: " + v + " @ T=" + scheduler.now(TimeUnit.MILLISECONDS));
            }))
            .subscribe(v -> System.out.println(
                    "Passed: " + v + " @ T=" + scheduler.now(TimeUnit.MILLISECONDS)),
                    Throwable::printStackTrace, 
                    () -> System.out.println(
                            "Done "  + " @ T=" + scheduler.now(TimeUnit.MILLISECONDS)));
    
            source.onNext(1);
            scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
    
            scheduler.advanceTimeBy(20, TimeUnit.MILLISECONDS);
    
            source.onNext(2);
            scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
            source.onNext(3);
            scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
            source.onNext(4);
            scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
            source.onNext(5);
            scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
    
            scheduler.advanceTimeBy(20, TimeUnit.MILLISECONDS);
    
            source.onNext(6);
            scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
    
            scheduler.advanceTimeBy(20, TimeUnit.MILLISECONDS);
    
            source.onComplete();
        }
    
        public static <T> ObservableTransformer<T, T> debounceTime(
                long time, TimeUnit unit, Scheduler scheduler, 
                Consumer<? super T> dropped) {
            return o -> Observable.<T>defer(() -> {
                AtomicLong index = new AtomicLong();
                Queue<Timed<T>> queue = new ConcurrentLinkedQueue<>();
    
                return o.map(v -> {
                    Timed<T> t = new Timed<>(v, 
                        index.getAndIncrement(), TimeUnit.NANOSECONDS);
                    queue.offer(t);
                    return t;
                })
                .debounce(time, unit, scheduler)
                .map(v -> {
                    while (!queue.isEmpty()) {
                        Timed<T> t = queue.peek();
                        if (t.time() < v.time()) {
                            queue.poll();
                            dropped.accept(t.value());
                        } else
                        if (t == v) {
                            queue.poll();
                            break;
                        }
                    }
                    return v.value();
                })
                .doOnComplete(() -> {
                    while (!queue.isEmpty()) {
                        dropped.accept(queue.poll().value());
                    }
                });
            });
        }
    }
    

    打印

    Passed: 1 @ T=10
    Dropped: 2 @ T=43
    Dropped: 3 @ T=43
    Dropped: 4 @ T=43
    Passed: 5 @ T=43
    Passed: 6 @ T=73
    Done  @ T=93
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-07-14
      • 2023-04-02
      • 1970-01-01
      • 1970-01-01
      • 2018-04-11
      • 2021-04-24
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多