【问题标题】:how to apply custom encoders to multiple clients at once? how to use custom encoders in run_one_round?如何一次将自定义编码器应用于多个客户端?如何在 run_one_round 中使用自定义编码器?
【发布时间】:2021-12-20 18:41:43
【问题描述】:

所以我的目标基本上是实现global top-k subsampling。梯度稀疏化非常简单,我已经在stateful clients example 上完成了这个构建,但现在我想使用你推荐的编码器here at page 28。此外,我只想平均非零梯度,所以假设我们有 10 个客户端,但在通信轮次的给定位置只有 4 个客户端具有非零梯度,那么我想将这些梯度的总和除以 4,而不是 10。我希望通过在分母处对分子和掩码、1 和 0 处的梯度求和来实现这一点。同样向前迈进,我将为渐变选择添加随机性,因此我必须与渐变选择同时创建这些蒙版。我现在的代码是

import tensorflow as tf

from tensorflow_model_optimization.python.core.internal import tensor_encoding as te


@te.core.tf_style_adaptive_encoding_stage
class GrandienrSparsificationEncodingStage(te.core.AdaptiveEncodingStageInterface):
  """An example custom implementation of an `EncodingStageInterface`.
  Note: This is likely not what one would want to use in practice. Rather, this
  serves as an illustration of how a custom compression algorithm can be
  provided to `tff`.
  This encoding stage is expected to be run in an iterative manner, and
  alternatively zeroes out values corresponding to odd and even indices. Given
  the determinism of the non-zero indices selection, the encoded structure does
  not need to be represented as a sparse vector, but only the non-zero values
  are necessary. In the decode mehtod, the state (i.e., params derived from the
  state) is used to reconstruct the corresponding indices.
  Thus, this example encoding stage can realize representation saving of 2x.
  """

  ENCODED_VALUES_KEY = 'stateful_topk_values'
  INDICES_KEY = 'indices'
  SHAPES_KEY = 'shapes'
  ERROR_COMPENSATION_KEY = 'error_compensation'

  def encode(self, x, encode_params):
    shapes_list = [tf.shape(y) for y in x]
    flattened = tf.nest.map_structure(lambda y: tf.reshape(y, [-1]), x)
    gradients = tf.concat(flattened, axis=0)
    error_compensation = encode_params[self.ERROR_COMPENSATION_KEY]
    
    gradients_and_error_compensation = tf.math.add(gradients, error_compensation)

    percentage = tf.constant(0.1, dtype=tf.float32)
    k_float = tf.multiply(percentage, tf.cast(tf.size(gradients_and_error_compensation), tf.float32))
    k_int = tf.cast(tf.math.round(k_float), dtype=tf.int32)

    values, indices = tf.math.top_k(tf.math.abs(gradients_and_error_compensation), k = k_int, sorted = False)
    indices = tf.expand_dims(indices, 1)
    sparse_gradients_and_error_compensation = tf.scatter_nd(indices, values, tf.shape(gradients_and_error_compensation))

    new_error_compensation = tf.math.subtract(gradients_and_error_compensation, sparse_gradients_and_error_compensation)
    state_update_tensors = {self.ERROR_COMPENSATION_KEY: new_error_compensation}
    
    encoded_x = {self.ENCODED_VALUES_KEY: values,
                 self.INDICES_KEY: indices,
                 self.SHAPES_KEY: shapes_list}

    return encoded_x, state_update_tensors

  def decode(self,
             encoded_tensors,
             decode_params,
             num_summands=None,
             shape=None):
    del num_summands, decode_params, shape  # Unused.
    flat_shape = tf.math.reduce_sum([tf.math.reduce_prod(shape) for shape in encoded_tensors[self.SHAPES_KEY]])
    sizes_list = [tf.math.reduce_prod(shape) for shape in encoded_tensors[self.SHAPES_KEY]]
    scatter_tensor = tf.scatter_nd(
        indices=encoded_tensors[self.INDICES_KEY],
        updates=encoded_tensors[self.ENCODED_VALUES_KEY],
        shape=[flat_shape])
    nonzero_locations = tf.nest.map_structure(lambda x: tf.cast(tf.where(tf.math.greater(x, 0), 1, 0), tf.float32) , scatter_tensor)
    reshaped_tensor = [tf.reshape(flat_tensor, shape=shape) for flat_tensor, shape in
            zip(tf.split(scatter_tensor, sizes_list), encoded_tensors[self.SHAPES_KEY])]
    reshaped_nonzero = [tf.reshape(flat_tensor, shape=shape) for flat_tensor, shape in
            zip(tf.split(nonzero_locations, sizes_list), encoded_tensors[self.SHAPES_KEY])]
    return  reshaped_tensor, reshaped_nonzero


  def initial_state(self):
    return {self.ERROR_COMPENSATION_KEY: tf.constant(0, dtype=tf.float32)}

  def update_state(self, state, state_update_tensors):
    return {self.ERROR_COMPENSATION_KEY: state_update_tensors[self.ERROR_COMPENSATION_KEY]}

  def get_params(self, state):
    encode_params = {self.ERROR_COMPENSATION_KEY: state[self.ERROR_COMPENSATION_KEY]}
    decode_params = {}
    return encode_params, decode_params

  @property
  def name(self):
    return 'gradient_sparsification_encoding_stage'

  @property
  def compressible_tensors_keys(self):
    return False

  @property
  def commutes_with_sum(self):
    return False

  @property
  def decode_needs_input_shape(self):
    return False

  @property
  def state_update_aggregation_modes(self):
    return {}

我按照您在here at page 45 列出的步骤手动运行了一些简单的测试。它有效,但我有一些疑问/问题。

  1. 当我使用相同形状的张量列表(例如:2 个 2x25 张量)作为编码的输入 x 时,它可以正常工作,但是当我尝试使用不同形状的张量列表(2x20 和 6x10)时,它给出和错误提示

InvalidArgumentError:所有输入的形状必须匹配:values[0].shape = [2,20] != values1.shape = [6,10] [Op:Pack] 名称:packed

我该如何解决这个问题?正如我所说,我想使用全局 top-k,所以我必须一次编码整个可训练模型的权重。以cnn model used here为例,所有的张量都有不同的形状。

  1. 如何进行我在开始时描述的平均?比如here你已经完成了

mean_factory = tff.aggregators.MeanFactory( tff.aggregators.EncodedSumFactory(mean_encoder_fn), # 分子 tff.aggregators.EncodedSumFactory(mean_encoder_fn), # denominator )

有没有一种方法可以重复此操作,将 decode 的一个输出用于分子,而将另一个输出用于分母?如何处理 0 除以 0? tensorflow 有 divide_no_nan 函数,我可以使用它还是需要在每个函数中添加 eps?

  1. 使用编码器时如何处理分区?每个客户端是否都有一个唯一的编码器,为它保存一个唯一的状态?正如您所讨论的,here at page 6 客户端状态用于跨筒仓设置,但如果客户端顺序发生变化会发生什么情况?

  2. Here 你推荐使用stateful clients example。你能再解释一下吗?我的意思是在 run_one_round 中,编码器的确切位置以及它们如何与客户端更新和聚合一起使用/结合?

  3. 我有一些额外的信息,例如我想传递给编码的稀疏性。这样做的建议方法是什么?

【问题讨论】:

  • 你能试着澄清一下这个问题吗?您到底想实现/实现什么?
  • 嗨,Jakub。我更新了问题。希望我设法解释了我想要实现的目标。如果您能再看一遍,我将不胜感激。提前致谢

标签: tensorflow-federated


【解决方案1】:

这里有一些答案,希望对你有帮助:

  1. 如果要将所有聚合结构视为单个张量,请使用concat_factory 作为最外层聚合器。这会将整个结构连接到客户端的 rank-1 张量,然后在最后解包回原始结构。使用示例:tff.aggregators.concat_factory(tff.aggregators.MeanFactory(...))

请注意,编码阶段对象旨在使用单个张量,因此您使用相同张量描述的内容可能只是偶然的。

  1. 有两种选择。

    一个。修改客户端训练代码,以便传递给加权聚合器的权重已经是您想要的(零/一 面具)。在您链接的有状态客户端示例中,这将是here。然后你会得到你需要的默认值(通过对分子求和)。

    b.修改UnweightedMeanFactory 以准确地执行您描述和使用的平均变体。开始将修改this

  2. (和 4.)我认为这是您需要实现的。与示例 here 中初始化现有客户端状态的方式相同,您需要扩展它以包含聚合器状态,并确保将这些状态与客户端一起采样,就像 here 所做的那样。然后,要在示例中集成聚合器,您需要替换this 硬编码tff.federated_mean。这种集成的一个例子是在tff.learning.build_federated_averaging_process的实现中,主要是here

  3. 我不确定问题是什么。也许得到以前的工作(对我来说似乎是一个先决条件),然后在新帖子中澄清并提问?

【讨论】:

  • 感谢您富有洞察力的回答。毫无疑问有很多工作要做,但我不太明白你在第三点的意思。如何在编码器结构之外初始化编码器状态?初始化后,如何将这些状态用作编码器的输入?我实际上尝试过使用 aggregatorFactory,但它要求所有状态都在服务器上,这迫使我使用 federated_collect 将 client_state 位置更改为 @SERVER 并通过测量 MeasuredProcessOutput 传递它。我想我可以通过使用保持自己状态的 AdaptiveEncoder 来避免这种情况
猜你喜欢
  • 2020-05-24
  • 1970-01-01
  • 2021-01-01
  • 2020-09-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-09-12
  • 1970-01-01
相关资源
最近更新 更多