【Question title】: IPython cluster and PicklingError
【Posted】: 2013-12-21 01:18:46
【Question】:

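My problem seems similar to This Thread. However, although I believe I am following the suggested approach, I still get a PicklingError. When I run my process locally, without sending it to the IPython cluster engines, the function works fine.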

I'm using zipline in an IPython notebook, so I first create a class based on zipline.TradingAlgorithm.

Cell [1]

from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()

Cell [2]

%%px --local
# --local runs this cell locally as well, so the class and modules exist here and on each engine
import zipline as zpl
import numpy as np

class Agent(zpl.TradingAlgorithm):  # must define initialize and handle_data methods
    def initialize(self):
        self.valueHistory = []  # a list, so testSystem can append to it

    def handle_data(self, data):
        for security in data.keys():
            ## Just randomly buy/sell/hold for each security
            coinflip = np.random.random()
            if coinflip < .25:
                self.order(security, 100)
            elif coinflip > .75:
                self.order(security, -100)
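
The reason the class must be defined on the engines is that pickle stores an instance's class by reference (module plus name), not by value. The receiving side must therefore already have that class. Below is a rough Python 3 standard-library simulation of this. `engine_ns` is a hypothetical module standing in for an engine's namespace, not an IPython API. IPython's own serializer adds extra handling for interactively defined objects, so treat this as an approximation only.

```python
import pickle
import sys
import types

# Hypothetical stand-in for an engine's namespace (not an IPython API).
engine_ns = types.ModuleType("engine_ns")
sys.modules["engine_ns"] = engine_ns


class Agent:
    def __init__(self):
        self.valueHistory = []


# Register Agent as if it had been defined inside engine_ns,
# which is what running the class definition on the engine achieves.
Agent.__module__ = "engine_ns"
Agent.__qualname__ = "Agent"
engine_ns.Agent = Agent

# The payload records only the name "engine_ns.Agent" plus the instance's __dict__.
payload = pickle.dumps(Agent())


def can_unpickle(data):
    """Return True if `data` can be rebuilt in the current process."""
    try:
        pickle.loads(data)
        return True
    except (AttributeError, pickle.UnpicklingError):
        # AttributeError is raised when the recorded class name is missing from its module.
        return False


with_class = can_unpickle(payload)
del engine_ns.Agent  # simulate an engine that never ran the class definition
without_class = can_unpickle(payload)
print(with_class, without_class)  # True False
```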

Cell [3]

from zipline.utils.factory import load_from_yahoo

start = '2013-04-01'
end   = '2013-06-01'
sidList = ['SPY','GOOG']
data = load_from_yahoo(stocks=sidList,start=start,end=end)

agentList = []
for i in range(3):
    agentList.append(Agent())

def testSystem(agent,data):
    results = agent.run(data)  #-- This is how the zipline based class is executed
    #-- next I'm just storing the final value of the test so I can plot later
    agent.valueHistory.append(results['portfolio_value'][len(results['portfolio_value'])-1])
    return agent

for i in range(10):
    tasks = []
    for agent in agentList:
        #agent = testSystem(agent,data)  ## On its own, this works!
        #-- To Test, uncomment the above line and comment out the next two 
        tasks.append(lview.apply_async(testSystem,agent,data))
    agentList = [ar.get() for ar in tasks]

for agent in agentList:
    plot(agent.valueHistory)

This is the error that results:

PicklingError                             Traceback (most recent call last)
/Library/Python/2.7/site-packages/IPython/kernel/zmq/serialize.pyc in serialize_object(obj, buffer_threshold, item_threshold)
    100         buffers.extend(_extract_buffers(cobj, buffer_threshold))
    101 
--> 102     buffers.insert(0, pickle.dumps(cobj,-1))
    103     return buffers
    104 

PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

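If I override the run() method of zipline.TradingAlgorithm with something like:

def run(self, data):
    return 1

then passing the agents to the engines works, but obviously the guts of the test are never executed. Trying something like this...

def run(self, data):
    return zpl.TradingAlgorithm.run(self, data)

results in the same PicklingError.

Since run is a method internal to zipline.TradingAlgorithm and I don't know everything it does, how can I make sure the agent can still be passed through?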
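
One way to see what run() leaves behind, without knowing zipline's internals, is to try pickling each instance attribute separately. `find_unpicklable_attrs` is a hypothetical helper, not part of zipline or IPython. It only inspects the instance's own `__dict__` and reports failures at the top-level attribute. `Demo` is a made-up object whose `callback` attribute mimics a function stored during a run.

```python
import pickle


def find_unpicklable_attrs(obj):
    """Return {attribute name: error message} for instance attributes pickle rejects."""
    problems = {}
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value, -1)
        except Exception as exc:  # PicklingError, TypeError or AttributeError, depending on the object
            problems[name] = "%s: %s" % (type(exc).__name__, exc)
    return problems


class Demo:
    """Hypothetical object; `callback` mimics a function attached during run()."""

    def __init__(self):
        self.valueHistory = [1.0, 2.0]
        self.callback = lambda x: x


print(sorted(find_unpicklable_attrs(Demo())))  # ['callback']
```

On a real agent you would call it after `agent.run(data)` to list the offending attribute names.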

【Discussion】:

    Tags: python ipython pickle ipython-notebook zipline


    【Answer 1】:

    zipline TradingAlgorithm objects do not appear to be picklable after they have been run:

    import pickle
    import zipline as zpl
    
    class Agent(zpl.TradingAlgorithm):  # must define initialize and handle_data methods
        def handle_data(self, data):
            pass
    
    agent = Agent()
    pickle.dumps(agent)[:32]  # ok
    
    agent.run(data)  # data as loaded with load_from_yahoo in the question
    pickle.dumps(agent)[:32]  # fails
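
A plausible mechanism is sketched below as an assumption, since the answer does not say which attribute zipline adds. Suppose run() stores a function object, such as a lambda or nested function, on the instance. Standard pickle serializes functions by module and qualified name, so such a function cannot be looked up and the whole object fails to pickle. This is the same family of error as `Can't pickle <type 'function'>`. `FakeAlgorithm` is hypothetical and contains no zipline code.

```python
import pickle


class FakeAlgorithm:
    """Hypothetical stand-in for a TradingAlgorithm subclass."""

    def __init__(self):
        self.valueHistory = []

    def run(self, data):
        # Assumption: running attaches a function object that pickle cannot
        # look up by name (here, a lambda defined inside the method).
        self.price_getter = lambda sid: data[sid]
        return {"portfolio_value": [100.0, 101.5]}


def is_picklable(obj):
    try:
        pickle.dumps(obj, -1)
        return True
    except Exception:  # PicklingError or AttributeError, depending on Python version
        return False


agent = FakeAlgorithm()
before = is_picklable(agent)
agent.run({"SPY": 1.0})
after = is_picklable(agent)
print(before, after)  # True False
```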
    

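    But this suggests to me that you should create the agents on the engines and only pass data/results back and forth (ideally not passing the data at all, or at most once).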

    Minimizing data transfer might look like this:

    Define the class:

    %%px
    import zipline as zpl
    import numpy as np
    
    class Agent(zpl.TradingAlgorithm):  # must define initialize and handle_data methods
        def initialize(self):
            self.valueHistory = []
    
        def handle_data(self, data):
            for security in data.keys():
                ## Just randomly buy/sell/hold for each security
                coinflip = np.random.random()
                if coinflip < .25:
                    self.order(security,100)
                elif coinflip > .75:
                    self.order(security,-100)
    

    Load the data:

    %%px
    from zipline.utils.factory import load_from_yahoo
    
    start = '2013-04-01'
    end   = '2013-06-01'
    sidList = ['SPY','GOOG']
    
    data = load_from_yahoo(stocks=sidList,start=start,end=end)
    agent = Agent()
    

    And run the code:

    from IPython import parallel
    
    def testSystem(agent, data):
        results = agent.run(data)  #-- This is how the zipline based class is executed
        #-- next I'm just storing the final value of the test so I can plot later
        agent.valueHistory.append(results['portfolio_value'][len(results['portfolio_value'])-1])
    
    # create references to the remote agent / data objects
    agent_ref = parallel.Reference('agent')
    data_ref = parallel.Reference('data')
    
    tasks = []
    for i in range(10):
        for j in range(len(rc)):
            tasks.append(lview.apply_async(testSystem, agent_ref, data_ref))
    # wait for the tasks to complete
    [t.get() for t in tasks]
    

    And plot the results, without ever fetching the agents themselves:

    %matplotlib inline
    import matplotlib.pyplot as plt
    
    for history in rc[:].apply_async(lambda : agent.valueHistory):
        plt.plot(history)
    

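    This isn't exactly the same as the code you shared: there, three agents bounce back and forth across all of your engines, whereas here each engine has its own agent. I don't know enough about zipline to say whether that works for you.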
