【发布时间】:2019-01-18 14:07:16
【问题描述】:
我是 pytorch 的新手,我正在尝试探索它与 spark 一起使用的可行性(目前我正在独立使用 spark)。
至于现在,我在一个非常具体的话题上苦苦挣扎。 让我们从一个非常简单的模型开始:
# linmodel.py
import torch
import torch.nn as nn
import numpy as np
def standardize(x):
return (x - np.mean(x)) / np.std(x)
def add_noise(y):
rnd = np.random.randn(y.shape[0])
return y + rnd
def cost(target, predicted):
cost = torch.sum((torch.t(target) - predicted) ** 2)
return cost
class LinModel(nn.Module):
def __init__(self, in_size, out_size):
super(LinModel, self).__init__() # always call parent's init
self.linear = nn.Linear(in_size, out_size, bias=False) # layer parameters
def forward(self, x):
return self.linear(x)
它实例化了一个基本的线性模型,以及一些实用函数。 目标是逼近一个目标矩阵,并跟踪 渐变行为。
我正在努力实现以下目标:
- 创建我的目标矩阵
- 在工作人员上拆分输入
- 在工作器上实例化模型和优化器
- 计算输入子集的近似值
- 检索梯度以进行进一步分析
在第 5 点之前一切正常。 代码如下:
#test.py
import torch
import torch.nn as nn
import numpy as np
import torch.optim
from torch.autograd import Variable
from pyspark import SparkContext
import linmodel
def prepare_input(nsamples=400):
Xold = np.linspace(0, 1000, nsamples).reshape([nsamples, 1])
X = linmodel.standardize(Xold)
W = np.random.randint(1, 10, size=(5, 1))
Y = W.dot(X.T) # target
for i in range(Y.shape[1]):
Y[:, i] = linmodel.add_noise(Y[:, i])
x = Variable(torch.from_numpy(X), requires_grad=False).type(torch.FloatTensor)
y = Variable(torch.from_numpy(Y), requires_grad=False).type(torch.FloatTensor)
print("created torch variables {} {}".format(x.size(), y.size()))
return x, y, W
def initialize(tup):
x, y = tup[0] # data
m, o = tup[1] # model and optimizer
model, optimizer = torch_step(x, y, m, o)
# here we have the gradients
print('gradient: {}'.format([param.grad.data for param in model.parameters()]))
return (x, y), (model, optimizer)
def create_model():
model = linmodel.LinModel(1, 5)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
return model, optimizer
def torch_step(x, y, model, optimizer):
prediction = model(x)
loss = linmodel.cost(y, prediction)
optimizer.zero_grad()
loss.backward()
optimizer.step()
return model, optimizer
def main(sc, num_partitions=4):
x, y, W = prepare_input()
parts_x = list(torch.split(x, int(x.size()[0] / num_partitions)))
parts_y = list(torch.split(y, int(x.size()[0] / num_partitions), 1))
rdd_models = sc.parallelize([create_model() for _ in range(num_partitions)]).repartition(num_partitions)
rdd_x = sc.parallelize(parts_x).repartition(num_partitions)
rdd_y = sc.parallelize(parts_y).repartition(num_partitions)
parts = rdd_x.zip(rdd_y) # [((100x1), (5x100)), ...]
full = parts.zip(rdd_models).map(initialize).cache()
models_out = full.map(lambda x: x[1][0]).collect()
test_model = models_out[0]
print(type(test_model))
print('gradient: {}'.format([param.grad.data for param in test_model.parameters()]))
if __name__ == '__main__':
sc = SparkContext(appName='test')
main(sc)
正如您在 cmets 中看到的,当函数 initialize 映射到完整的 rdd 时,如果您检查执行程序的日志,您会发现要计算的梯度。
当我收集结果并尝试访问驱动程序上的相同属性时,我收到AttributeError: 'NoneType' object has no attribute 'data'
这意味着所有model.grad 属性都设置为None。
我确定我在这里遗漏了一些重要的东西,但我看不到它。
感谢任何提示。
非常感谢。
【问题讨论】:
-
听起来无法完全序列化。常见问题,如果您使用依赖本机代码的对象。
-
是的。你是对的。试过
torch.save、json和pickle。没运气。最好往其他方向前进。非常感谢。 -
我可能错了,但
save/load应该可以正常工作。
标签: python-3.x pyspark pytorch