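[Posted]: 2021-12-20 18:41:43
[Question]:
My goal is basically to implement global top-k subsampling. Gradient sparsification is quite simple, and I have already built it on top of the stateful clients example, but now I would like to use encoders, as you recommended here at page 28. In addition, I want to average only the non-zero gradients. Say we have 10 clients, but only 4 of them have a non-zero gradient at a given position in a communication round: I want to divide the sum of those gradients by 4, not 10. I hope to achieve this by summing the gradients in the numerator and the masks (1s and 0s) in the denominator. Going forward I will also add randomness to the gradient selection, so I have to create these masks at the same time as I select the gradients. The code I have right now is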
import tensorflow as tf
from tensorflow_model_optimization.python.core.internal import tensor_encoding as te


@te.core.tf_style_adaptive_encoding_stage
class GrandienrSparsificationEncodingStage(te.core.AdaptiveEncodingStageInterface):
    """An example custom implementation of an `EncodingStageInterface`.

    Note: This is likely not what one would want to use in practice. Rather, this
    serves as an illustration of how a custom compression algorithm can be
    provided to `tff`.

    This encoding stage is expected to be run in an iterative manner, and
    alternatively zeroes out values corresponding to odd and even indices. Given
    the determinism of the non-zero indices selection, the encoded structure does
    not need to be represented as a sparse vector, but only the non-zero values
    are necessary. In the decode method, the state (i.e., params derived from the
    state) is used to reconstruct the corresponding indices.

    Thus, this example encoding stage can realize representation saving of 2x.
    """

    ENCODED_VALUES_KEY = 'stateful_topk_values'
    INDICES_KEY = 'indices'
    SHAPES_KEY = 'shapes'
    ERROR_COMPENSATION_KEY = 'error_compensation'

    def encode(self, x, encode_params):
        # Flatten every tensor in x and concatenate into one vector.
        shapes_list = [tf.shape(y) for y in x]
        flattened = tf.nest.map_structure(lambda y: tf.reshape(y, [-1]), x)
        gradients = tf.concat(flattened, axis=0)
        error_compensation = encode_params[self.ERROR_COMPENSATION_KEY]
        gradients_and_error_compensation = tf.math.add(gradients, error_compensation)

        # Keep the top 10% of entries by magnitude.
        percentage = tf.constant(0.1, dtype=tf.float32)
        k_float = tf.multiply(
            percentage,
            tf.cast(tf.size(gradients_and_error_compensation), tf.float32))
        k_int = tf.cast(tf.math.round(k_float), dtype=tf.int32)
        values, indices = tf.math.top_k(
            tf.math.abs(gradients_and_error_compensation), k=k_int, sorted=False)
        indices = tf.expand_dims(indices, 1)

        # Whatever is not sent this round is carried over as error compensation.
        sparse_gradients_and_error_compensation = tf.scatter_nd(
            indices, values, tf.shape(gradients_and_error_compensation))
        new_error_compensation = tf.math.subtract(
            gradients_and_error_compensation,
            sparse_gradients_and_error_compensation)
        state_update_tensors = {self.ERROR_COMPENSATION_KEY: new_error_compensation}

        encoded_x = {self.ENCODED_VALUES_KEY: values,
                     self.INDICES_KEY: indices,
                     self.SHAPES_KEY: shapes_list}
        return encoded_x, state_update_tensors

    def decode(self,
               encoded_tensors,
               decode_params,
               num_summands=None,
               shape=None):
        del num_summands, decode_params, shape  # Unused.
        flat_shape = tf.math.reduce_sum(
            [tf.math.reduce_prod(shape) for shape in encoded_tensors[self.SHAPES_KEY]])
        sizes_list = [tf.math.reduce_prod(shape)
                      for shape in encoded_tensors[self.SHAPES_KEY]]
        scatter_tensor = tf.scatter_nd(
            indices=encoded_tensors[self.INDICES_KEY],
            updates=encoded_tensors[self.ENCODED_VALUES_KEY],
            shape=[flat_shape])
        # 1.0 where a value was selected, 0.0 elsewhere.
        nonzero_locations = tf.nest.map_structure(
            lambda x: tf.cast(tf.where(tf.math.greater(x, 0), 1, 0), tf.float32),
            scatter_tensor)
        reshaped_tensor = [
            tf.reshape(flat_tensor, shape=shape) for flat_tensor, shape in
            zip(tf.split(scatter_tensor, sizes_list), encoded_tensors[self.SHAPES_KEY])]
        reshaped_nonzero = [
            tf.reshape(flat_tensor, shape=shape) for flat_tensor, shape in
            zip(tf.split(nonzero_locations, sizes_list), encoded_tensors[self.SHAPES_KEY])]
        return reshaped_tensor, reshaped_nonzero

    def initial_state(self):
        return {self.ERROR_COMPENSATION_KEY: tf.constant(0, dtype=tf.float32)}

    def update_state(self, state, state_update_tensors):
        return {self.ERROR_COMPENSATION_KEY: state_update_tensors[self.ERROR_COMPENSATION_KEY]}

    def get_params(self, state):
        encode_params = {self.ERROR_COMPENSATION_KEY: state[self.ERROR_COMPENSATION_KEY]}
        decode_params = {}
        return encode_params, decode_params

    @property
    def name(self):
        return 'gradient_sparsification_encoding_stage'

    @property
    def compressible_tensors_keys(self):
        return False

    @property
    def commutes_with_sum(self):
        return False

    @property
    def decode_needs_input_shape(self):
        return False

    @property
    def state_update_aggregation_modes(self):
        return {}
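The flatten, select, and restore steps that `encode` and `decode` perform can be illustrated outside TensorFlow. What follows is a minimal NumPy sketch, not the tensor_encoding API: the function name `topk_with_error_feedback`, its `fraction` argument, and the sample arrays are all assumptions made for illustration. It reshapes each array to 1-D before concatenating, so differently shaped inputs are never stacked, and it keeps the signed values at the selected positions.

```python
import numpy as np


def topk_with_error_feedback(tensors, residual, fraction=0.1):
    """Global top-k over a list of arrays, with error feedback (hypothetical helper).

    `tensors` may have different shapes. `residual` is the flat carry-over
    from the previous round, with one entry per model parameter.
    """
    shapes = [t.shape for t in tensors]
    sizes = [t.size for t in tensors]
    # Flatten each array, then concatenate: shapes do not need to match.
    flat = np.concatenate([t.reshape(-1) for t in tensors]) + residual
    k = max(1, int(round(fraction * flat.size)))
    # Indices of the k largest magnitudes across the whole model.
    top_idx = np.argpartition(np.abs(flat), -k)[-k:]
    sparse = np.zeros_like(flat)
    sparse[top_idx] = flat[top_idx]   # signed values at the selected positions
    new_residual = flat - sparse      # everything that was not sent this round
    # Split the flat vector back into the original shapes.
    pieces = np.split(sparse, np.cumsum(sizes)[:-1])
    restored = [p.reshape(s) for p, s in zip(pieces, shapes)]
    return restored, new_residual


# Two "layers" with different shapes, 100 parameters in total.
grads = [np.arange(40.0).reshape(2, 20),
         -np.arange(40.0, 100.0).reshape(6, 10)]
residual = np.zeros(sum(g.size for g in grads))
sparse_grads, residual = topk_with_error_feedback(grads, residual)
```

With `fraction=0.1`, only 10 of the 100 entries survive, and the rest end up in `residual`, which would be passed back in on the next round.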
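I manually ran some simple tests following the steps you listed here at page 45. It works, but I have some questions/problems.
- When I use a list of tensors of the same shape (e.g. two 2x25 tensors) as the input x of encode, it works fine, but when I try a list of tensors of different shapes (2x20 and 6x10), it gives an error saying
  InvalidArgumentError: Shapes of all inputs must match: values[0].shape = [2,20] != values[1].shape = [6,10] [Op:Pack] name: packed
  How can I solve this? As I said, I want to use global top-k, so I have to encode the entire trainable model weights at once. Take the cnn model used here as an example: all of its tensors have different shapes.
- How can I do the averaging I described at the beginning? For example, here you have done
  mean_factory = tff.aggregators.MeanFactory(
      tff.aggregators.EncodedSumFactory(mean_encoder_fn),  # numerator
      tff.aggregators.EncodedSumFactory(mean_encoder_fn),  # denominator
  )
  Is there a way to repeat this with one output of decode going to the numerator and the other going to the denominator? How should I handle dividing 0 by 0? TensorFlow has a divide_no_nan function; can I use it, or do I need to add an eps to each?
- How is partitioning handled when using encoders? Does each client get a unique encoder that holds a unique state for it? As you discussed here at page 6, client states are used in the cross-silo setting, but what happens if the client ordering changes?
- Here you recommended using the stateful clients example. Can you explain this a bit further? I mean, in run_one_round, where exactly do the encoders go, and how are they used/combined with the client update and aggregation?
- I have some additional information, such as the sparsity, that I want to pass to encode. What is the suggested way of doing that?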
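The averaging described in the question (sum of gradients in the numerator, sum of 0/1 masks in the denominator, with 0/0 treated as 0) can be written down as plain arithmetic. The block below is a minimal NumPy sketch of that arithmetic only; it does not show how to wire it into `tff.aggregators`, and the name `masked_mean` and the sample data are assumptions for illustration. In TensorFlow, `tf.math.divide_no_nan` offers the same zero-safe division, returning 0 wherever the denominator is 0.

```python
import numpy as np


def masked_mean(sparse_updates):
    """Average each position over the clients that sent a non-zero value."""
    stacked = np.stack(sparse_updates)         # shape: (clients, params)
    numerator = stacked.sum(axis=0)            # sum of gradients
    # Sum of 0/1 masks. Assumption: a selected gradient is never exactly 0,
    # so "non-zero" and "selected" mean the same thing.
    denominator = (stacked != 0).sum(axis=0)
    # Zero-safe division: positions nobody selected stay 0 instead of NaN.
    return np.divide(numerator, denominator,
                     out=np.zeros_like(numerator), where=denominator != 0)


# 10 clients, 3 parameters.
updates = [np.zeros(3) for _ in range(10)]
for client, value in zip(range(4), [1.0, 2.0, 3.0, 6.0]):
    updates[client][0] = value   # position 0: only 4 clients are non-zero
for u in updates:
    u[2] = 0.5                   # position 2: all 10 clients are non-zero
mean = masked_mean(updates)      # position 1 is zero for every client
```

Position 0 is divided by 4 rather than 10, and position 1 comes out as 0 without any eps term.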
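[Discussion]:
- Can you try to clarify the question? What exactly are you trying to achieve?
- Hi Jakub. I have updated the question. Hopefully I managed to explain what I want to achieve. I would appreciate it if you could take another look. Thanks in advance.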