让我们一步一步来。如果下面的解释回答了您的问题,请告诉我们。
- 让我们从一个 TF(非联合,仅本地)代码示例开始,该代码获取数据集并对其进行处理,例如添加数字:
@tff.tf_computation(tff.SequenceType(tf.int32))
def process_data(ds):
return ds.reduce(np.int32(0), lambda x, y: x + y)
此代码在输入处获取一个整数数据集,并在输出处返回一个带有总和的整数。
您可以通过查看类型签名来确认这一点,如下所示:
str(process_data.type_signature)
你应该看到这个:
(int32* -> int32)
所以,process_data 接受一组整数,并返回一个整数。
- 现在,使用 TFF 的联合运算符,我们可以创建在多个客户端上执行此操作的联合计算,如下所示:
@tff.federated_computation(tff.FederatedType(tff.SequenceType(tf.int32), tff.CLIENTS))
def process_data_on_clients(federated_ds):
return tff.federated_map(process_data, federated_ds)
如果你查看这个新计算的类型签名(就像上面一样),你会看到:
({int32*}@CLIENTS -> {int32}@CLIENTS)
这意味着process_data_on_clients 接受一组联合整数(每个客户端一个集合),并返回一个联合整数(每个客户端一个整数和总和)。
上面发生的情况是,process_data 中的 TF 逻辑将在每个客户端上执行一次。这就是federated_map 运算符的工作原理。
- 现在,
process_data_on_clients 有点像您正在使用的迭代过程。它希望您提供一个联合数据集作为参数。
让我们看看如何按照与上述相同的模式制作一个。
这里有一些 TF 代码创建一个带有整数的单个数据集,假设您提供一个整数 n 并希望创建一个数字从 1 到 n 的数据集,即 {1, 2, ... , n}:
@tff.tf_computation(tf.int32)
def make_data(n):
return tf.data.Dataset.range(tf.cast(n, tf.int64)).map(lambda x: tf.cast(x + 1, tf.int32))
这显然是一个愚蠢的例子,您可以根据需要做更多的事情(例如,从由名称指定的文件中读取数据等)。
这是它的类型签名的样子:
(int32 -> int32*)
你可以看到与process_data的相似之处。
而且,就像处理数据一样,现在我们可以使用 federated_map 运算符在所有客户端上生成数据:
@tff.federated_computation(tff.FederatedType(tf.int32, tff.CLIENTS))
def make_data_on_clients(federated_n):
return tff.federated_map(make_data, federated_n)
这是类型签名:
({int32}@CLIENTS -> {int32*}@CLIENTS)
太好了,所以make_data_on_clients 接受一个联合整数(告诉我们在每个客户端上生成多少数据项),并返回一个联合数据集,就像 process_data_on_clients 想要的那样。
您可以检查两者是否按预期协同工作:
federated_n = [2, 3, 4]
federated_ds = make_data_on_clients(federated_n)
result = process_data_on_clients(federated_ds)
result
你应该得到 1+2、1+2+3 和 1+2+3+4 的和 3 个参与此计算的客户端(请注意,上面的联合整数中有 3 个数字,所以有 3客户在这里):
[<tf.Tensor: shape=(), dtype=int32, numpy=3>,
<tf.Tensor: shape=(), dtype=int32, numpy=6>,
<tf.Tensor: shape=(), dtype=int32, numpy=10>]
请注意,到目前为止,您看到的所有 TF 代码,包括数据集创建和数据集缩减,都在客户端上执行(使用 federated_map)。
- 现在,您可以将两者放在一起:
@tff.federated_computation(tff.FederatedType(tf.int32, tff.CLIENTS))
def make_and_process_data_on_clients(federated_n):
federated_ds = make_data_on_clients(federated_n)
return process_data_on_clients(federated_ds)
现在,您可以一次性调用制造和工艺数据组合:
make_and_process_data_on_clients(federated_n)
同样,这里的所有 TF 代码都在客户端上执行,就像上面一样。
- 那么这会给您带来什么影响?
回到 Keith 的解释,您从 TFF 获得的迭代过程需要一个联合数据集作为输入,就像 process_data_on_clients。
Keith 示例中的函数 get_dataset_for_client_id 与我们的 make_data 类似,假设它包含您希望在每个客户端上运行的 TensorFlow 代码,以便在该客户端上物理构建数据集。
在愚蠢的例子中,数据集构造逻辑使用range,但它可以是任何东西。例如,您可以从同一本地文件 my_data 或使用自定义 TF 操作或通过任何其他方式在每个客户端上加载数据。就像在我们的示例中一样,您可以将参数传递给该函数以提供更集中的控制(类似于上面对联合整数所做的任何事情)。
Keith 示例中的代码剪辑器 new_next 就像我们的 make_and_process_data_on_clients 一样,它结合了两种联合计算:一种在客户端上生成联合数据(由您提供,正如这里所讨论的那样),另一种用于处理该数据(来自 tff.learning,迭代过程)。
这有帮助吗?
如果仍然不清楚,我建议您尝试我在上面的分布式设置中包含的示例,因为您已经有了一个。您可以向该代码注入一些 TF 打印操作,以确认您编写的 TF 代码正在系统中的客户端计算机上执行。
获得该部分后,只需进行简单的调整即可将 make_data 中愚蠢的数据集构造逻辑替换为从您使用的任何本地数据源加载每个客户端上的数据集的逻辑。
编辑:
Re:如何打印,任何出现在 @tff.tf_computation 正文中的 TensorFlow 代码都会以 Eager 模式执行,您可以使用标准 TensorFlow 机制(例如 tf.print)从 TensorFlow 内部进行打印。
tensorflow.org/api_docs/python/tf/print
关于如何配置具有多个工作节点的多机系统,请参阅 Kubernetes 教程。请注意,驱动进程的机器连接到工作节点,而不是相反。
https://www.tensorflow.org/federated/tutorials/high_performance_simulation_with_kubernetes