【问题标题】:Subscribing to ReactiveCocoa signals in parallel, on a limited number of threads在有限数量的线程上并行订阅 ReactiveCocoa 信号
【发布时间】:2013-09-26 01:37:42
【问题描述】:

我订阅了这样创建的信号:

RACSignal *signal = [[RACSignal createSignal:^(... subscriber) {
    for (int i = 0; i < 100; i++) {
        [subscriber sendNext:[[RACSignal createSignal:^(... subscriber2) {
            NSString *string = someFunctionThatTakesALongTime(i);
            [subscriber2 sendNext:string];
            [subscriber2 sendComplete];
            return nil;
        }] setNameWithFormat:@"inside signal"]];
    }

    [subscriber sendComplete];
    return nil;
}] setNameWithFormat:@"outside signal"];

int n = 4;
[[signal flatten:n] subscribeNext:^(NSString *string) { ... }];

我希望-flatten: 并行订阅n 信号。我用[RACScheduler scheduler] 尝试了-startLazilyWithScheduler:block:[RACScheduler scheduler] 的“内部信号”,但这让我的电脑停了下来。在 Instruments 中,它似乎正在为每个信号创建一个新线程。

此代码的先前版本作为 NSOperations 添加到 NSOperationQueue,该队列设置为并行运行 n 操作。它有效,但我可以使用 RAC 使其更容易理解。

我如何从我的信号信号中一次-flatten: n 信号,以便内部信号每个都在相同的n 线程上运行?

======================================

更新:我找错树了;我的性能问题是由于物理内存不足。我猜有些物体的寿命太长了,导致我的记忆问题。我偶然在某个时候解决了我的内存使用问题,同时进行了重构以更频繁地使用 RAC。我不知道人们是否会从我的代码中受益,但这里是:

我使用这段代码从使用外部信号开始:

[[[self drawRects] flatten:self.maxProcesses] subscribeNext:^(NSDictionary *result) {
    @strongify(self);

    NSString *keyString = result[kDrawRectsResultsKeyKey];
    self.imagesByLocation[keyString] = result[kDrawRectsResultsImageKey];
    self.repsByLocation[keyString] = result[kDrawRectsResultsRepKey];

    [self setNeedsDisplayInRect:[result[kDrawRectsResultsRectKey] rectValue]];
}];

改为使用更多 RAC 操作(也替换同一类中的其他命令式代码):

// Get the latest zoomed drawing bounds and get the latest imageProvider's latest imagesByLocation
// Skip one of each signal to avoid firing immediately
RACSignal *zoomedDrawingBounds = [RACChannelTo(self, zoomedDrawingBounds) skip:1];
RACSignal *imagesFromImageProvider = [[[RACChannelTo(self, imageProvider) skip:1]
                                       map:^(id<PTWImageProvider> imageProvider) {
                                           return RACChannelTo(imageProvider, imagesByLocation);
                                       }]
                                      switchToLatest];

// Lift the drawing method, getting a signal of signals on each call
RACSignal *drawingSignals = [[self rac_liftSelector:@selector(drawingSignalsForRect:givenImagesByLocations:)
                               withSignalsFromArray:@[ zoomedDrawingBounds, imagesFromImageProvider, ]]
                             switchToLatest];

@weakify(self);

// Lift flatten: using maxProcesses so that if maxProcesses changes, the number of signals being
// flatten:ed can change almost immediately.
RACSignal *drawnRectanglesZoomed = [[[[drawingSignals
                                       rac_liftSelector:@selector(flatten:) withSignalsFromArray:@[ RACChannelTo(self, maxProcesses) ]]
                                      switchToLatest]
                                     doNext:^(NSDictionary *result) {
                                         @strongify(self);

                                         // side effects! store the rendered image and its associated image rep
                                         NSString *keyString = result[kDrawRectsResultsKeyKey];
                                         self.imagesByLocation[keyString] = result[kDrawRectsResultsImageKey];
                                         self.repsByLocation[keyString] = result[kDrawRectsResultsRepKey];
                                     }]
                                    map:^(NSDictionary *result) {
                                        // Extract the drawn rect from the results
                                        return result[kDrawRectsResultsRectKey];
                                    }];

RACSignal *drawnRectangles = [[drawnRectanglesZoomed
                               combineLatestWith:RACChannelTo(self, zoomLevel)]
                              map:^(RACTuple *tuple) {
                                  // Convert between zoomed and unzoomed coordinates
                                  CGRect zoomedRect = [[tuple first] rectValue];
                                  CGFloat zoomLevel = [[tuple second] floatValue];
                                  CGAffineTransform zoomTransform = CGAffineTransformMakeScale(zoomLevel, zoomLevel);
                                  return [NSValue valueWithRect:CGRectApplyAffineTransform(zoomedRect, zoomTransform)];
                              }];

// Lift setNeedsDisplayInRect: with results from the drawing signals, so setNeedsDisplayInRect: is called
// as tiles are rendered.
[self rac_liftSelector:@selector(setNeedsDisplayInRect:)
  withSignalsFromArray:@[ [drawnRectangles deliverOn:[RACScheduler mainThreadScheduler]] ]];

现在,如果我更新我的工作方法以在后台调度程序上返回冷信号,flatten: 会导致多个信号同时运行,没有问题:

RACSignal *signal = [[RACSignal createSignal:^(... subscriber) {
    for (int i = 0; i < 100; i++) {
        [subscriber sendNext:[[RACSignal startLazilyWithScheduler:[RACScheduler scheduler] block:^(... subscriber2) {
            NSString *string = someFunctionThatTakesALongTime(i);
            [subscriber2 sendNext:string];
            [subscriber2 sendComplete];
        }] setNameWithFormat:@"inside signal"]];
    }

    [subscriber sendComplete];
    return nil;
}] setNameWithFormat:@"outside signal"];

【问题讨论】:

    标签: objective-c reactive-cocoa


    【解决方案1】:

    +[RACScheduler scheduler] 每次调用时都会创建一个新的串行 GCD 队列,但仅此一项不会导致任何问题,因为 GCD 队列与操作系统线程没有直接关系。

    相反,问题可能在于+flatten: 在之前的信号完全完成之前订阅了新信号(即,这是由旧信号传递的事件发生的)。

    您可以通过延迟订阅内部信号来解决这个问题:

    RACSignal *workSignal = [[[[RACSignal
        // Wait for one scheduler iteration,
        return:RACUnit.defaultUnit]
        delay:0]
        // then actually do the work.
        then:^{
            return [[RACSignal
                createSignal:^(id<RACSubscriber> subscriber2) {
                    NSString *string = someFunctionThatTakesALongTime(i);
                    [subscriber2 sendNext:string];
                    [subscriber2 sendComplete];
                    return nil;
                }]
                // Invokes the above block on a new background scheduler.
                subscribeOn:[RACScheduler scheduler]];
        }]
        setNameWithFormat:@"inside signal"];
    
    [subscriber sendNext:workSignal];
    

    但是,这似乎是不必要的复杂。当队列结束时,GCD 会自动降低你的线程数,所以我怀疑这种改变是否真的值得。

    【讨论】:

    • 我接受了这个答案,但我的最终解决方案完全不同。我用更多信息更新了我的帖子。
    猜你喜欢
    • 2016-06-26
    • 1970-01-01
    • 1970-01-01
    • 2016-02-10
    • 2017-10-28
    • 2021-09-02
    • 1970-01-01
    • 2011-01-15
    • 2013-07-11
    相关资源
    最近更新 更多