【问题标题】:How to do a aggregation transformation in a kiba etl script (kiba gem)?如何在 kiba etl 脚本(kiba gem)中进行聚合转换?
【发布时间】:2015-06-30 18:26:31
【问题描述】:

我想编写一个 Kiba Etl 脚本,该脚本具有从 CSV 到目标 CSV 的源,其中包含转换规则列表,其中第二个转换器是聚合,其中的操作如选择名称、sum(euro) 按名称分组

Kiba ETL 脚本文件

source CsvSource, 'users.csv', col_sep: ';', headers: true, header_converters: :symbol

transform VerifyFieldsPresence, [:name, :euro]

transform AggregateFields, { sum: :euro, group_by: :name}

transform RenameField,from: :euro, to: :total_amount

destination CsvDestination, 'result.csv', [:name, :total_amount]

users.csv

date;euro;name
7/3/2015;10;Jack
7/3/2015;85;Jill
8/3/2015;6;Jack
8/3/2015;12;Jill
9/3/2015;99;Mack

result.csv(预期结果)

total_amount;name
16;Jack
97;Jill
99;Mack

由于 etl 转换器一次在一行上一个接一个地执行,但我的第二个转换器行为取决于我无法在传递给转换方法的类中访问它的整个行集合。

transform AggregateFields, { sum: :euro, group_by: :name }

是否有可能使用 kiba gem 实现此行为
提前谢谢你

【问题讨论】:

    标签: ruby etl kiba-etl


    【解决方案1】:

    编辑:现在是 2020 年,Kiba ETL v3 包含一个更好的方法来做到这一点。查看这篇文章https://thibautbarrere.com/2020/03/05/new-in-kiba-etl-v3 了解所有相关信息。

    木场作者在这里!您可以通过多种不同的方式实现这一点,主要取决于数据大小和您的实际需求。以下是几种可能性。

    在 Kiba 脚本中使用变量进行聚合

    require 'awesome_print'
    
    transform do |r|
      r[:amount] = BigDecimal.new(r[:amount])
      r
    end
    
    total_amounts = Hash.new(0)
    
    transform do |r|
      total_amounts[r[:name]] += r[:amount]
      r
    end
    
    post_process do
      # pretty print here, but you could save to a CSV too
      ap total_amounts
    end
    

    这是最简单的方法,但也很灵活。

    不过,它会将您的聚合保存在内存中,因此这可能足够好,也可能不够好,具体取决于您的场景。请注意,目前 Kiba 是单线程的(但“Kiba Pro”将是多线程的),因此目前不需要为聚合添加锁或使用线程安全结构。

    从 post_process 块调用 TextQL

    另一种快速简便的聚合方法是先生成一个非聚合的 CSV 文件,然后利用 TextQl 实际进行聚合,如下所示:

    destination CsvSource, 'non-aggregated-output.csv', [:name, :amount]
    
    post_process do
      query = <<SQL
        select
          name,
          /* apparently sqlite has reduced precision, round to 2 for now */
          round(sum(amount), 2) as total_amount
        from tbl group by name
    SQL
    
      textql('non-aggregated-output.csv', query, 'aggregated-output.csv')
    end
    

    定义了以下助手:

    def system!(cmd)
      raise "Failed to run command #{command}" unless system(command)
    end
    
    def textql(source_file, query, output_file)
      system! "cat #{source_file} | textql -header -output-header=true -sql \"#{query}\" > #{output_file}"
      # this one uses csvfix to pretty print the table
      system! "cat #{output_file} | csvfix ascii_table"
    end
    

    在进行计算时要注意精度。

    编写内存聚合目的地

    在这里可以使用的一个有用技巧是用一个类包装给定的目的地来进行聚合。下面是它的样子:

    class InMemoryAggregate
      def initialize(sum:, group_by:, destination:)
        @aggregate = Hash.new(0)
        @sum = sum
        @group_by = group_by
        # this relies a bit on the internals of Kiba, but not too much
        @destination = destination.shift.new(*destination)
      end
    
      def write(row)
        # do not write, but count here instead
        @aggregate[row[@group_by]] += row[@sum]
      end
    
      def close
        # use close to actually do the writing
        @aggregate.each do |k,v|
          # reformat BigDecimal additions here
          value = '%0.2f' % v
          @destination.write(@group_by => k, @sum => value)
        end
        @destination.close
      end
    end
    

    你可以这样使用:

    # convert your string into an actual number
    transform do |r|
      r[:amount] = BigDecimal.new(r[:amount])
      r
    end
    
    destination CsvDestination, 'non-aggregated.csv', [:name, :amount]
    
    destination InMemoryAggregate,
      sum: :amount, group_by: :name,
      destination: [
        CsvDestination, 'aggregated.csv', [:name, :amount]
      ]
    
    post_process do
      system!("cat aggregated.csv | csvfix ascii_table")
    end
    

    这个版本的好处是您可以将聚合器重复用于不同的目的地(如数据库或其他任何地方)。

    请注意,这会将所有聚合保留在内存中,就像第一个版本一样。

    插入具有聚合功能的商店

    另一种方法(如果您的数据量很大,则特别有用)是将生成的数据发送到能够为您聚合数据的东西中。它可以是常规的 SQL 数据库、Redis 或任何更花哨的东西,然后您可以根据需要进行查询。

    正如我所说,实施将在很大程度上取决于您的实际需求。希望您能在这里找到适合您的东西!

    【讨论】:

    • 有什么方法可以继续对聚合集进行进一步的转换。 'transform AggregateFields, { sum: :euro, group_by: :name}' 'transform RenameField,from: :euro, to: :total_amount' 即重命名字段或聚合集上的任何变换
    • 你还不能那样写,但你可以做的是将聚合发送到 CSV 文件,然后使用辅助 Kiba 脚本,使用你的聚合作为源。从那里你可以随心所欲地变换。
    • 一个技巧是您可以在post_process 块中从第一个脚本启动辅助 Kiba 脚本。这样一来,一切都将在一次运行中被链接起来。希望这会有所帮助!
    • 根据您将聚合发送到 CSV 文件的建议,我得到了以下链接中提到的问题的解决方案。 gist.github.com/umar-webonise/ce46679c111854a164c8 谢谢 :)
    • 对不起,我发布的上述解决方案似乎不起作用。将尝试使用您的 post_process 块方法
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多