【问题标题】:How to create multiple output files in Talend based on a column from an SQL Input如何根据 SQL 输入中的列在 Talend 中创建多个输出文件
【发布时间】:2014-09-05 11:02:23
【问题描述】:

我需要根据 Talend Open Studio 中 sql 输入的值(列)创建多个输出文件。

我的 tMSSQLInput 返回大约 50,000 行,其中一列是 building_name

A 楼 B楼 C楼 ....

因此,所有具有“Building A”值的行都应该在一个名为“buildingA.xls”的excel文件中,所有具有“Building B”的行都应该在一个名为“buildingB.xls”的excel文件中,依此类推。

我一直在尝试将 tLoop 或 tForEach 与 tIterateToFlow 一起使用,但我不确定我是否知道如何实现它。

提前致谢。

【问题讨论】:

  • 我会做一个选择不同的countName,然后像这样连接它:tMSSQLInput -> flowtoIterate -> subjob。现在子作业将过滤行。例如。子作业将根据输入参数创建 1 个文件。如果您有 10 座建筑物,则子作业将被调用 10 次。在子作业中,您可以使用 tMSSQLInput -> tFileOutputExcel。我会使用子作业,因为如果您将它们与迭代一起使用,动态更改文件名有时会导致问题。

标签: etl talend


【解决方案1】:

Gabriele's answer 在我看来还不错。

但是,如果您发现自己处于这样一种情况,即您在许多建筑物中拥有大量数据,以至于您可以将任何单个建筑物的行数据存储在内存中,但不能存储所有数据,那么我会倾向于使用稍微不同的方法。

在此示例作业中,我使用 MySQL 数据库组件只是因为我碰巧有一个本地 MySQL 数据库,但有关此作业的所有内容也适用于 Oracle 或 MS SQL Server:

在这种情况下,一开始我们使用 tMySqlConnection 组件打开到数据库的连接。剩下的 2 个数据库组件(tMySqlInput 和 tMySqlRow)然后使用共享连接详细信息。

我们首先使用 tMySqlInput 中的以下查询获取数据库中的建筑物列表:

"SELECT DISTINCT building
FROM filesplittest"

这会返回每个不同的建筑物。

然后我们遍历每个建筑物,这样我们就可以在内存中只保留该特定建筑物的记录,以便在接下来的工作中使用。

然后,我们使用 tMySqlRow 组件通过准备好的语句为特定的迭代构建拉取数据。我使用的示例查询如下所示:

"SELECT building, foo, bar
FROM FileSplitTest
WHERE building = ?"

然后我们在高级设置中配置prepared statement:

我说过第一个参数(参数索引 = 1)是我们之前检索到的建筑值,并且 tFlowToIterate 帮助我们推送到 globalMap,因此我们在这种情况下使用 ((String)globalMap.get("row6.building")) 从那里检索它(它是第 6 行流中的“建筑”列)。

当使用准备好的语句时,您需要将数据作为记录集对象检索,因此您需要像这样设置 tMySqlRow 的架构:

然后我们使用 tParseRecordSet 组件对其进行解析:

使用适合此示例的架构:

然后我们需要遍历这组数据,将其附加到适当命名的 CSV。为此,我们使用另一个 tFlowToIterate 组件,并通过 tFixedFlowInput 组件绕了一个稍微烦人的弯路,在将每条记录的数据从 globalMap 中读取出来,然后再将其传递给 tFileOutputDelimited:

最后我们将其追加到以建筑物命名的 CSV:

请注意附加复选框已选中,否则作业的每次迭代都会覆盖前一个。我们还通过 building 列中的值命名文件。


正如 Gabriele 所提到的,如果您的数据在任何时候都可以很好地放入内存中,您可以通过将数据读入 tHashOutput 组件然后过滤散列中的数据来简化工作:

我们首先将所有数据读入 tHashOutput 组件,然后在整个作业期间将数据保存在内存中。 Talend 有时会出于某种奇怪的原因隐藏这些组件,但您可以通过将它们重新添加到项目属性 -> 设计器 -> 调色板设置中来重新启用它们:

接下来,我们使用 tHashInput 组件(链接到前一个 tHashOutput 组件 - 不要忘记将相同的模式添加到 tHashInput 组件)从哈希中读回数据,然后使用 tAggregateRow 组件并通过“构建" 有效地区分建筑价值:

然后我们使用 tFlowToIterate 遍历“building”的不同值,然后通过当前正在迭代的 building 值过滤哈希(第二次读取):

最后,我们再次确保附加到以 building 列中的值命名的文件:

【讨论】:

    【解决方案2】:

    我认为最好在两步工作中详细完成

    • 首先获得要构建的文件列表
    • 然后在所属文件上路由行

    我会设计这样的工作

    tMSSSQL_Input_1------>tCacheOut_1
            |
            |
        OnSubjobOk
            |
            |
            v
        tCacheIn_1------->tAggregateRow------>tFlowToIterate
                                                   /
                                                  / 
                                             (iterate)
                                                /
                                               /
                                              /
           +---------------------------------+
           |
           |
           v
       tCacheIn_1------->tFilterRow-------->tFileOutDelimited
    

    让我解释一下发生了什么

    • 在第一个子作业中,您将表转储到内存缓冲区中(Talend Exchange 上提供的tCacheOut 是一个很好的组件,但开箱即用的tHashInput/tHashOutput 可以完成这项工作,也) - 这仅用于查询数据库一次,但如果性能不是必需的,您可以触发多个查询并避免使用内存缓冲区
    • 然后您第一次阅读转储以区分您的建筑物(在建筑物列上使用 tAggregateRow
    • 然后,您将切换到一个迭代流,将当前建筑值保存在一个全局变量中,我们称之为“my_building
    • 然后,您第二次读取您的转储并仅过滤当前建筑物的行。事实上,您可以在过滤条件中使用globalMap.get("my_building")
    • 最后,您要将这些行保存在适当的文件中,再次使用 globalMap.get("my_building") 参数化您的文件名。

    【讨论】:

      【解决方案3】:

      一种方法是使用如下流程:

      tMySqlInput-->tFlowToIterate-->tFixedFlowInput-->tFileOutputDelimited.

      在 tFlowToIterate 中(您可以添加键 - 例如 FileName,它将从 tMySqlInput 架构中的列中获取值)

      在 tFileOutputDelimited 中,您可以使用此 (String)glotbalMap.get("FileName") 构建一个文件名路径,该文件名来自 tMySqlInput 的每一行。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-05-19
        • 2015-06-22
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多