【问题标题】:in ReactiveX, how do I pass other parameters to Observer.create?在 ReactiveX 中,如何将其他参数传递给 Observer.create?
【发布时间】:2017-10-22 16:30:51
【问题描述】:

使用 RxPY 进行说明。

我想从一个函数创建一个 observable,但该函数必须带参数。这个特定示例必须以随机间隔返回我要发送给它的许多预定义代码之一。到目前为止,我的解决方案是使用闭包:

from __future__ import print_function

from rx import Observable
import random
import string
import time

def make_tickers(n = 300, s = 123):
    """ generates up to n unique 3-letter strings geach makde up of uppsercase letters"""
    random.seed(s)
    tickers = [''.join(random.choice(string.ascii_uppercase) for _ in range(3)) for y in range(n)]
    tickers = list(set(tickers)) # unique
    print(len(tickers))
    return(tickers)

def spawn_prices_fn(tickers):
    """ returns a function that will return a random element 
    out of tickers every 20-100 ms, and takes an observable parameter """

    def spawner(observer):
        while True:
            next_tick = random.choice(tickers)
            observer.on_next(next_tick)
            time.sleep(random.randint(20, 100)/1000.0)

    return(spawner)


if __name__ == "__main__":
    spawned = spawn_prices_fn(make_tickers())
    xx = Observable.create(spawned)
    xx.subscribe(lambda s: print(s))

有没有更简单的方法?是否可以将更多参数发送到不需要闭包的 Observable.create 的第一个参数函数?什么是规范建议?

【问题讨论】:

  • 您期待什么样的模式?任何伪代码

标签: python reactivex rx-py


【解决方案1】:

它可以通过多种方式完成,这是一种不会过多更改代码的解决方案。 请注意,代码生成也可以分解为一个生成单个字符串的函数,并结合一些rx 魔术,使其更像rx-like

我还稍微调整了代码让flake8开心

from __future__ import print_function

import random
import string
import time

from rx import Observable


def make_tickers(n=300, s=123):
    """
    Generates up to n unique 3-letter strings each made up of uppercase letters
    """
    random.seed(s)
    tickers = [''.join(random.choice(string.ascii_uppercase) for _ in range(3))
               for y in range(n)]
    tickers = list(set(tickers))  # unique
    print(len(tickers))
    return(tickers)


def random_picker(tickers):
    ticker = random.choice(tickers)
    time.sleep(random.randint(20, 100) / 1000.0)
    return ticker


if __name__ == "__main__":
    xx = Observable\
        .repeat(make_tickers())\
        .map(random_picker)\
        .subscribe(lambda s: print(s))

或者没有make_tickers的解决方案:

from __future__ import print_function

import random
import string
import time

from rx import Observable


def random_picker(tickers):
    ticker = random.choice(tickers)
    time.sleep(random.randint(20, 100) / 1000.0)
    return ticker


if __name__ == "__main__":
    random.seed(123)
    Observable.range(1, 300)\
        .map(lambda _: ''.join(random.choice(string.ascii_uppercase)
                               for _ in range(3)))\
        .reduce(lambda x, y: x + [y], [])\
        .do_while(lambda _: True)\
        .map(random_picker)\
        .subscribe(lambda s: print(s))

time.sleep 可以从random_picker 移开,但代码会变得有点棘手

【讨论】:

    【解决方案2】:

    您也可以使用“partials”来包装您的订阅方法。它允许您定义其他参数,但在仅等待 Observer 和 Scheduler 的方法上调用 rx.create:

    def my_subscription_with_arguments(observer, scheduler, arg1):
        observer.on_next(arg1)
    
    my_subscription_wrapper = functools.partial(my_subscription_with_arguments, arg1='hello')
    source = rx.create(my_subscription_wrapper)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-06-27
      • 2012-06-30
      • 1970-01-01
      • 1970-01-01
      • 2014-06-04
      • 2022-08-21
      相关资源
      最近更新 更多