【问题标题】:TensorFlowFederated: Passing tensor to tff.federated_computationTensorFlowFederated:将张量传递给 tff.federated_computation
【发布时间】:2020-09-03 12:17:49
【问题描述】:

我已经在我的单机上试用了 TFF 教程 (MNIST),现在我正在尝试使用 MNIST 数据执行多机进程。

显然,我不能使用create_tf_dataset_for_client,所以我使用 GRPC 来学习如何将数据从一台机器传递到另一台机器。

我的场景是服务器将初始模型(带有零)分派给模型将在本地数据上运行的所有参与客户端。每个客户端都会将新的权重分派给执行 federated_mean 的服务器。

我正在考虑使用tff.learning.build_federated_averaging_process,希望可以自定义next 函数(第二个参数),但我失败了...我什至不确定我们是否使用这种方法发送模型并取回权重来自远程客户端。

然后我想我可以在@tff.federated_computation 装饰器下使用tff.federated_mean。但是,由于权重是数组并且我有它们的列表(因为我有很多客户),我无法理解如何创建一个指向该列表列表的tff.FederatedType。任何对分布式数据集进行联邦建模的人的帮助都会很容易理解。

问候, 开发。

【问题讨论】:

    标签: tensorflow-federated


    【解决方案1】:

    TFF 计算设计为与平台/运行时无关;单个计算可以由多个不同的后端执行。

    TFF 的类型系统在这里有助于推理数据在计算中的流动方式。请参阅custom federated algorithms part 1 tutorial,了解 TFF 如何看待类型。

    build_federated_averaging_process 的结果需要放置在客户端的数据集参数;对于元素类型 T 的数据集,在 TFF 的通常表示法中,这将表示为 {T*}@C。这个签名细节对于如何数据集到达客户端,或者实际上如何表示客户端本身是不可知的。

    具体化数据并表示客户端实际上是运行时的工作。 TFF 在这里提供了一些所谓的native 选项。

    例如,在本地 Python 运行时客户端由本地机器上的线程表示。数据集只是渴望的tf.data.Dataset 对象,线程在训练期间从数据集中提取数据。

    在远程 Python 运行时中,客户端由远程工作人员(上的线程)表示,因此单个远程工作人员可以运行多个客户端。在这种情况下,正如您所注意到的,数据必须在远程工作人员上具体化才能进行培训。

    有几种方法可以实现这一点。

    第一,TFF 实际上会在这个 RPC 连接上为您处理急切数据集的序列化和反序列化,因此您可以像在本地运行时一样使用 identical 模式来指定数据,它应该“只是工作”。通过使用tf.raw_ops.DatasetToGraphV2,这种模式实际上在 2021 年 3 月得到了显着改善。

    然而,也许更好地映射到联邦计算的概念是使用一些库函数来简单地在工作人员上实例化数据集

    假设您有一个迭代过程ip,它接受statedata 参数,其中data 的类型为{T*}@C。进一步假设我们有一个 TFF 计算 get_dataset_for_client_id,它接受一个字符串并返回一个适当类型的数据集(IE,它的 TFF 类型签名是 tf.str -> T*)。

    然后我们可以将这两个计算组合成另一个:

    @tff.federated_computation(STATE_TYPE, tff.FederatedType(tf.string, tff.CLIENTS))
    def new_next(state, client_ids):
      datasets_on_clients = tff.federated_map(get_dataset_for_client_id, client_ids)
      return ip.next(state, datasets_on_clients)
    

    new_next 现在要求控制器仅指定要训练的客户端的ids,并将指向数据存储的责任委托给代表客户端的任何人。

    我认为这种模式很可能是你想要的; TFF 提供了一些 helper,例如 tff.simulation.ClientDatatff.simulation.compose_dataset_computation_with_iterative_process 上的 dataset_computation 属性,它们或多或少会执行我们上面为您所做的接线。

    【讨论】:

    • 感谢基思的回复。
    • 我了解您提到的新 API 应该用于实现我的目标。但是,我有点卡在迭代过程的“下一个”功能上,它如何指向远程数据集(在单独的 GPU 上)?更具体地说,我认为您的函数“get_dataset_for_client_id”会将数据集从客户端带到服务器,而我们正在尝试将模型分派到数据所在的客户端 GPU。一旦我解决了这个问题,我就可以使用您提到的 API,但仍然无法解决上述问题。
    【解决方案2】:

    让我们一步一步来。如果下面的解释回答了您的问题,请告诉我们。

    1. 让我们从一个 TF(非联合,仅本地)代码示例开始,该代码获取数据集并对其进行处理,例如添加数字:
    @tff.tf_computation(tff.SequenceType(tf.int32))
    def process_data(ds):
      return ds.reduce(np.int32(0), lambda x, y: x + y)
    

    此代码在输入处获取一个整数数据集,并在输出处返回一个带有总和的整数。

    您可以通过查看类型签名来确认这一点,如下所示:

    str(process_data.type_signature)
    

    你应该看到这个:

    (int32* -> int32)
    

    所以,process_data 接受一组整数,并返回一个整数。

    1. 现在,使用 TFF 的联合运算符,我们可以创建在多个客户端上执行此操作的联合计算,如下所示:
    @tff.federated_computation(tff.FederatedType(tff.SequenceType(tf.int32), tff.CLIENTS))
    def process_data_on_clients(federated_ds):
      return tff.federated_map(process_data, federated_ds)
    

    如果你查看这个新计算的类型签名(就像上面一样),你会看到:

    ({int32*}@CLIENTS -> {int32}@CLIENTS)
    

    这意味着process_data_on_clients 接受一组联合整数(每个客户端一个集合),并返回一个联合整数(每个客户端一个整数和总和)。

    上面发生的情况是,process_data 中的 TF 逻辑将在每个客户端上执行一次。这就是federated_map 运算符的工作原理。

    1. 现在,process_data_on_clients 有点像您正在使用的迭代过程。它希望您提供一个联合数据集作为参数。

    让我们看看如何按照与上述相同的模式制作一个。

    这里有一些 TF 代码创建一个带有整数的单个数据集,假设您提供一个整数 n 并希望创建一个数字从 1 到 n 的数据集,即 {1, 2, ... , n}:

    @tff.tf_computation(tf.int32)
    def make_data(n):
      return tf.data.Dataset.range(tf.cast(n, tf.int64)).map(lambda x: tf.cast(x + 1, tf.int32))
    

    这显然是一个愚蠢的例子,您可以根据需要做更多的事情(例如,从由名称指定的文件中读取数据等)。

    这是它的类型签名的样子:

    (int32 -> int32*)
    

    你可以看到与process_data的相似之处。

    而且,就像处理数据一样,现在我们可以使用 federated_map 运算符在所有客户端上生成数据:

    @tff.federated_computation(tff.FederatedType(tf.int32, tff.CLIENTS))
    def make_data_on_clients(federated_n):
      return tff.federated_map(make_data, federated_n)
    

    这是类型签名:

    ({int32}@CLIENTS -> {int32*}@CLIENTS)
    

    太好了,所以make_data_on_clients 接受一个联合整数(告诉我们在每个客户端上生成多少数据项),并返回一个联合数据集,就像 process_data_on_clients 想要的那样。

    您可以检查两者是否按预期协同工作:

    federated_n = [2, 3, 4]
    federated_ds = make_data_on_clients(federated_n)
    result = process_data_on_clients(federated_ds)
    result
    

    你应该得到 1+2、1+2+3 和 1+2+3+4 的和 3 个参与此计算的客户端(请注意,上面的联合整数中有 3 个数字,所以有 3客户在这里):

    [<tf.Tensor: shape=(), dtype=int32, numpy=3>,
     <tf.Tensor: shape=(), dtype=int32, numpy=6>,
     <tf.Tensor: shape=(), dtype=int32, numpy=10>]
    

    请注意,到目前为止,您看到的所有 TF 代码,包括数据集创建和数据集缩减,都在客户端上执行(使用 federated_map)。

    1. 现在,您可以将两者放在一起:
    @tff.federated_computation(tff.FederatedType(tf.int32, tff.CLIENTS))
    def make_and_process_data_on_clients(federated_n):
      federated_ds = make_data_on_clients(federated_n)
      return process_data_on_clients(federated_ds)
    

    现在,您可以一次性调用制造和工艺数据组合:

    make_and_process_data_on_clients(federated_n)
    

    同样,这里的所有 TF 代码都在客户端上执行,就像上面一样。

    1. 那么这会给您带来什么影响?

    回到 Keith 的解释,您从 TFF 获得的迭代过程需要一个联合数据集作为输入,就像 process_data_on_clients

    Keith 示例中的函数 get_dataset_for_client_id 与我们的 make_data 类似,假设它包含您希望在每个客户端上运行的 TensorFlow 代码,以便在该客户端上物理构建数据集。

    在愚蠢的例子中,数据集构造逻辑使用range,但它可以是任何东西。例如,您可以从同一本地文件 my_data 或使用自定义 TF 操作或通过任何其他方式在每个客户端上加载数据。就像在我们的示例中一样,您可以将参数传递给该函数以提供更集中的控制(类似于上面对联合整数所做的任何事情)。

    Keith 示例中的代码剪辑器 new_next 就像我们的 make_and_process_data_on_clients 一样,它结合了两种联合计算:一种在客户端上生成联合数据(由您提供,正如这里所讨论的那样),另一种用于处理该数据(来自 tff.learning,迭代过程)。

    这有帮助吗?

    如果仍然不清楚,我建议您尝试我在上面的分布式设置中包含的示例,因为您已经有了一个。您可以向该代码注入一些 TF 打印操作,以确认您编写的 TF 代码正在系统中的客户端计算机上执行。

    获得该部分后,只需进行简单的调整即可将 make_data 中愚蠢的数据集构造逻辑替换为从您使用的任何本地数据源加载每个客户端上的数据集的逻辑。

    编辑:

    Re:如何打印,任何出现在 @tff.tf_computation 正文中的 TensorFlow 代码都会以 Eager 模式执行,您可以使用标准 TensorFlow 机制(例如 tf.print)从 TensorFlow 内部进行打印。

    tensorflow.org/api_docs/python/tf/print

    关于如何配置具有多个工作节点的多机系统,请参阅 Kubernetes 教程。请注意,驱动进程的机器连接到工作节点,而不是相反。

    https://www.tensorflow.org/federated/tutorials/high_performance_simulation_with_kubernetes

    【讨论】:

    • 感谢@Krzys 的回复。根据您的反馈,我们现在尝试在 3 台不同的机器上设置一个简单的场景,然后再尝试更复杂的东西:1 台服务器 (CPU) 和 2 台客户端 (GPU)。服务器托管一个 grpc 服务器,通过在端口 8080 上运行建议的标准 docker 命令。这使用 LocalExecutor。客户端发送新通道请求并启动 RemoteExecutor。我们如何在另一台机器上打印的服务器或客户端上打印一些东西?我们只是想证明查询可以来回发送。
    猜你喜欢
    • 1970-01-01
    • 2019-03-12
    • 2021-08-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多