【问题标题】:How to get the pivot lines from two pairs of files in Python?如何从 Python 中的两对文件中获取轴心线?
【发布时间】:2021-06-08 18:53:35
【问题描述】:

来自How to get the pivot lines from two tab-separated files?,有一种快速的方法可以使用 unix 命令来旋转两个文件中的行。

如果我们有两对文件:

  • f1af1b
  • f2af2b

目标是提供一个以制表符分隔的 3 列文件,其中包括:

  • f1a / f2a
  • f1b
  • f2b

其中f1a / f2a 是文件中出现在f1af1b 中的行:

我尝试了以下方法,但如果文件非常大,存储f1f2 字典将占用大量内存。例如。数十亿行的文件。

import sys
from tqdm import tqdm 

f1a, f1b, f2a, f2b = sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]


# Read first pair of file into memory.
with open(f1a) as fin_f1a, open(f1a) as fin_f1b:
  f1 = {s.strip().replace('\t', ' ') :t.strip().replace('\t', ' ') for s, t in tqdm(zip(fin_f1a, fin_f1b))}

with open(s2) as fin_f2a, open(t2) as fin_f2b:
  f2 = {s.strip().replace('\t', ' ') :t.strip().replace('\t', ' ') for s, t in tqdm(zip(fin_f2a, fin_f2b))}


with open('pivoted.tsv', 'w') as fout:
  for s in tqdm(f1.keys() & f2.keys()):
    print('\t'.join([s, f1[s], f2[s]]), end='\n', file=fout)

是否有更快/更好/更简单的方法在 Python 中实现相同的 3 列制表符分隔文件?是否有库可以有效地对大文件执行此类操作?


使用turicreate.SFrame,我也可以这样做:

from turicreate import SFrame

f1a, f1b, f2a, f2b = sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]

sf1a = SFrame.read_csv(f1a, delimited='\0', header=False)
sf1b = SFrame.read_csv(f1b, delimited='\0', header=False) 

sf2a = SFrame.read_csv(f2a, delimited='\0', header=False)
sf2b = SFrame.read_csv(f2b, delimited='\0', header=False)

sf1 = sf1a.join(sf1b) 
sf2 = sf2a.join(sf2b)

sf = sf1.join(sf2, on='X1', how='left') 
sf.save('pivoted')

【问题讨论】:

    标签: python performance csv dictionary memory-efficient


    【解决方案1】:

    通用合并

    zip 函数不会存储可迭代对象的完整副本。这样我们就可以放心使用了。

    假设您有两个按第一列升序排序的可迭代对象,您可以按如下方式连接这两个表。

    def merge(t1, t2):
        end = object()
        end_ = end, None
        a1, b1 = next(t1, end_)
        a2, b2 = next(t2, end_)
        while a1 is not end and a2 is not end:
            if a1 < a2:
                a1, b1 = next(t1, end_)
            elif a1 > a2:
                a2, b2 = next(t2, end_)
            else:
                yield a1, b1, b2
                a1, b1 = next(t1, end_)
                a2, b2 = next(t2, end_)
    

    Merge 使用两个迭代器调用并产生第三个迭代器,每个迭代器一次只需要存储一个元素。

    list(merge(iter([(0, 1), (1, 1), (3, 2)]), 
      iter([(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e')])))
    
    [(0, 1, 'a'), (1, 1, 'b'), (3, 2, 'd')]
    

    扫描写字

    为了防止整个文件被存储,我有一个扫描方法,每个文件一次读取一行。

    def scan(fa, fb):
        for a, b in zip(fa, fb):
            a = a.strip().replace('\t', ' ')
            b = b.strip().replace('\t', ' ')
            yield a, b
    def scan_by_name(fa, fb):
        with open(fa) as fha, open(fb) as fhb:
            yield from scan(fha, fhb)
    

    那么你可以通过这种方式来解决你的问题(未经测试,我没有你的文件)

    with open('pivoted.tsv', 'w') as fout:
        t1 = scan_by_name(f1a, f1b)
        t2 = scan_by_name(f2a, f2b)
        for row in merge(t1, t2):
            print('\t'.join(row), end='\n', file=fout)
    

    【讨论】:

    • tqdm 和 zip 不会加载文件。它被通读并产生项目
    • 你可能是对的。但在这种情况下,包只能说明执行了多少次迭代,而不是实际进度的百分比。
    【解决方案2】:

    正如leangaurav 建议的那样,可以使用Dask 完成。

    优点是我们可以创建一个在线程池(或进程)中运行并读取文件块(使用少量 RAM)的解决方案,而不必担心这一点。

    例如,我们创建一些测试数据:

    from string import ascii_lowercase
    from random import choices
    
    def rand_str(k=3):
        return " ".join("".join(choices(ascii_lowercase, k=k)) for _ in range(2))
    
    N = 2_000
    
    for file_name in ["example_a.txt", "example_b.txt"]:
        with open(file_name, "w") as f:
            for _ in range(N):
                line = f"{rand_str()} \t {rand_str()}\n"
                f.write(line)
    
    

    我们用 dask 读取数据,我们指出哪一列将成为索引,然后我们进行合并:

    from dask import compute
    import dask.dataframe as dd
    
    # this does not process anything yet 
    df_a = dd.read_csv("example_a.txt", sep="\t", names=["pivot", "data"]).set_index("pivot")
    df_b = dd.read_csv("example_b.txt", sep="\t", names=["pivot", "data"]).set_index("pivot")
    
    # this is the heavy part
    result = dask.compute(dd.merge(df_a, df_b, left_index=True, right_index=True))
    # save the output
    result[0].to_csv("out.txt", sep="\t", header=False)
    

    在旧笔记本上针对不同 N 的一些测试(仅考虑计算的步骤):

    • N = 500_000 -> 11s
    • N = 1_000_000 -> 25s
    • N = 2_000_000 -> 44s
    • N = 4_000_000 -> 1m33s

    【讨论】:

    • 看起来很整洁!
    【解决方案3】:

    将立即评估字典推导。

    如果您不喜欢流式传输,请尝试对数据进行分段。

    def oneSegment(first_letter)
    # Read first pair of file into memory.
    with open(f1a) as fin_f1a, open(f1a) as fin_f1b:
    f1 = {s.strip().replace('\t', ' ') :t.strip().replace('\t', ' ') for s, t in tqdm(zip(fin_f1a, fin_f1b)) if s.strip().replace('\t', ' ').startwith(first_letter)}
    
    with open(s2) as fin_f2a, open(t2) as fin_f2b:
    f2 = {s.strip().replace('\t', ' ') :t.strip().replace('\t', ' ') for s, t in tqdm(zip(fin_f2a, fin_f2b)) if s.strip().replace('\t', ' ').startwith(first_letter)}
    
    with open('pivoted.tsv', 'a') as fout:
    for s in tqdm(f1.keys() & f2.keys()):
        print('\t'.join([s, f1[s], f2[s]]), end='\n', file=fout)
    oneSegment("a")
    

    【讨论】:

      【解决方案4】:

      假设您在 linux 上运行 python 代码,此解决方案有效,您可以通过os.system 运行一系列 linux 命令来完成此操作。一些步骤可以组合成一个命令,也可以作为单独的进程并行运行以提高速度。
      同一组中的每个任务都可以并行运行。 (1,2,3,4), (5,6), (7,8), (9,10,11)

      import os
      
      f1a = "f1a"
      f1b = "f1b"
      f2a = "f2a"
      f2b = "f2b"
      
      # replace \t with space
      os.system(f"sed -i 's/\t/ /g' {f1a}") #1
      os.system(f"sed -i 's/\t/ /g' {f1b}") #2
      os.system(f"sed -i 's/\t/ /g' {f2a}") #3
      os.system(f"sed -i 's/\t/ /g' {f2b}") #4
      
      # join the pair of files with \t
      os.system(f"paste {f1a} {f1b} > f1_t") #5 join f1a and f1b with \t delimiter
      os.system(f"paste {f2a} {f2b} > f2_t") #6 join f1a and f1b with \t delimiter
      
      # prepare data for join: sort the files
      os.system(f"sort f1_t > f1") #7 
      os.system(f"sort f2_t > f2") #8 
      
      os.system(f"join f1 f2 -j 1 -t '\t'> f12") #9 lines common to both f1, f2: -j 1
      os.system(f"join f1 f2 -v 1 -j 1 -t '\t'> f11") #10 lines present only in f1 : -v 1
      os.system(f"join f1 f2 -v 2 -j 1 -t '\t'> f22") #11 lines present only in f2 : -v 2
      
      os.system(f"cat f12  f11 f22 > f") #12 join into final result f
      

      由于问题提到了 python 而不是平台,对于平台中立方法,请查看dask

      answer 解决了部分问题,即合并大文件。但是仍然需要弄清楚加入文件集等,这应该也可以使用 Dask。或者可以通过上面的代码(甚至在代码之外)完成预处理,并且可以使用 Dask 完成合并。另请检查this 答案。

      在处理大型数据块时,排序主要是使事情变得更好。
      this dask doc 中查看有关indexes 的详细信息。

      【讨论】:

        猜你喜欢
        • 2021-05-13
        • 1970-01-01
        • 1970-01-01
        • 2020-07-21
        • 2021-12-03
        • 1970-01-01
        • 2017-10-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多