【问题标题】:Need to merge 2 large csv files row by row in python需要在python中逐行合并2个大csv文件
【发布时间】:2016-04-19 10:00:59
【问题描述】:

我正在为一个项目处理 2 个大型数据集文件。我逐行管理清理文件。但是,在尝试应用相同的逻辑来合并基于公共列的 2 个文件时,它会失败。问题是第二个循环完全运行,然后顶部循环运行(不知道为什么会这样)。我尝试使用 numpy

buys = np.genfromtxt('buys_dtsep.dat',delimiter=",",dtype='str')
clicks = np.genfromtxt('clicks_dtsep.dat',delimiter=",",dtype='str')
f = open('combined.dat', 'w')
for s in clicks:
    for s2 in buys:
      #process data

但是由于内存限制以及将数据加载到数组然后处理它所花费的时间,将具有 3300 万个条目的文件加载到数组中是不可行的。我正在尝试逐行处理文件以避免内存不足。

buys = open('buys_dtsep.dat')
clicks = open('clicks_dtsep.dat')
f = open('combined.dat', 'w')

csv_buys = csv.reader(buys)
csv_clicks = csv.reader(clicks)



for s in csv_clicks:
     print 'file 1 row x'#to check when it loops
     for s2 in csv_buys:
        print s2[0] #check looped data  
          #do merge op

打印的输出应该是

file 1 row 0
file 2 row 0
 ...
file 2 row x
file 1 row 1
and so on

我得到的输出是

file 2 row 0
file 2 row 1
...
file 2 row x
file 1 row 0
...
file 1 row z

如果上述循环问题可以解决,将无法逐行合并文件。

更新:示例数据

购买文件样本

420374,2014-04-06,18:44:58.314,214537888,12462,1
420374,2014-04-06,18:44:58.325,214537850,10471,1
281626,2014-04-06,09:40:13.032,214535653,1883,1
420368,2014-04-04,06:13:28.848,214530572,6073,1
420368,2014-04-04,06:13:28.858,214835025,2617,1
140806,2014-04-07,09:22:28.132,214668193,523,1
140806,2014-04-07,09:22:28.176,214587399,1046,1

点击文件示例

420374,2014-04-06,18:44:58,214537888,0
420374,2014-04-06,18:41:50,214537888,0
420374,2014-04-06,18:42:33,214537850,0
420374,2014-04-06,18:42:38,214537850,0
420374,2014-04-06,18:43:02,214537888,0
420374,2014-04-06,18:43:10,214537888,0
420369,2014-04-07,19:39:43,214839373,0
420369,2014-04-07,19:39:56,214684513,0

【问题讨论】:

  • 你可以使用pandas吗?如果是,您可以考虑 read_csvchunks 参数,例如 that 示例
  • 您能从两个 dat 文件中添加一些示例行吗?

标签: python csv numpy memory-management


【解决方案1】:

编辑:OP 想要遍历第二个文件,所以我改变了答案

您正在循环第一个文件中的第一行,然后循环整个第二行。 您的内部循环只会工作一次,因为 csv_buys 迭代器将在第一次循环的第一次运行中被消耗。

for s in csv_clicks:  # <--- looping over the 1st file works fine
    print 'file 1 row x'#to check when it loops
    for s2 in csv_buys: #<--- loops all over the 2nd one and finish the iterator! this loop will ONLY work once!
        print s2[0] #check looped data  
         #do merge op

你需要做的是:

for s in csv_clicks:  # <--- stays the same - works fine
    print 'file 1 row x'#to check when it loops
    for s2 in open('buys_dtsep.dat'): #<---- Now you loop from the start each time :) yay
        print s2[0] #check looped data  
         #do merge op

警告:上面的代码复杂度为 O^2。

如果您的脚本会很慢(而且会),您将不得不考虑不同的解决方案

【讨论】:

  • 我需要检查是否在文件 2 的任何行中找到文件 1 中第 i 行的第 1 列的匹配项,以便生成合并文件。这就是我嵌套循环的原因。我会尝试使用上述方法来做同样的事情
  • @duckvader - 将我的答案更改为每次循环第二个文件
  • 新解决方案允许我访问我需要的方式。谢谢你。虽然它很慢,但我会继续努力,直到找到更好的解决方案。
【解决方案2】:

以下方法有望有所帮助。它旨在提高速度并减少您的内存需求:

from heapq import merge
from itertools import groupby, ifilter

def get_click_entries(key):
    with open('clicks.csv', 'rb') as f_clicks:
        for entry in ifilter(lambda x: int(x[0]) == key, csv.reader(f_clicks)):
            entry.insert(4, '')  # add empty missing column
            yield entry

# First create a set holding all column 0 click entries

with open('clicks.csv', 'rb') as f_clicks:
    csv_clicks = csv.reader(f_clicks)
    click_keys = {int(cols[0]) for cols in csv_clicks}

with open('buys.csv', 'rb') as f_buys, \
    open('clicks.csv', 'rb') as f_clicks,   \
    open('merged.csv', 'wb') as f_merged:

    csv_buys = csv.reader(f_buys)
    csv_clicks = csv.reader(f_clicks)
    csv_merged = csv.writer(f_merged)

    for k, g in groupby(csv_buys, key=lambda x: int(x[0])):
        if k in click_keys:
            buys = sorted(g, key=lambda x: (x[1], x[2]))
            clicks = sorted(get_click_entries(k), key=lambda x: (x[1], x[2]))
            csv_merged.writerows(merge(buys, clicks))       # merge the two lists based on the timestamp
            click_keys.remove(k)
        csv_merged.writerows(g)

    # Write any remaining click entries

    for k in click_keys:
        csv_merged.writerows(get_click_entries(k))

对于您的两个示例文件,这将产生以下输出:

140806,2014-04-07,09:22:28.132,214668193,523,1
140806,2014-04-07,09:22:28.176,214587399,1046,1
281626,2014-04-06,09:40:13.032,214535653,1883,1
420368,2014-04-04,06:13:28.848,214530572,6073,1
420368,2014-04-04,06:13:28.858,214835025,2617,1
420374,2014-04-06,18:41:50,214537888,,0
420374,2014-04-06,18:42:33,214537850,,0
420374,2014-04-06,18:42:38,214537850,,0
420374,2014-04-06,18:43:02,214537888,,0
420374,2014-04-06,18:43:10,214537888,,0
420374,2014-04-06,18:44:58,214537888,,0
420374,2014-04-06,18:44:58.314,214537888,12462,1
420374,2014-04-06,18:44:58.325,214537850,10471,1
420369,2014-04-07,19:39:43,214839373,,0
420369,2014-04-07,19:39:56,214684513,,0

它的工作原理是首先创建一组所有第 0 列条目,这意味着如果知道该条目不存在,您可以避免重新读取整个点击文件。然后它尝试从buys 中读入一组匹配的第0 列条目,并从clicks 中读入相应的第0 列条目列表。然后根据时间戳对它们进行排序并按顺序合并在一起。然后从集合中删除该条目,因此它们不会被重新读取。

【讨论】:

  • 我喜欢你的方法,但是创建一个集合不会花费很长时间,因为我在一个数据集中有大约 3300 万个条目,而另一个数据集中有几百万?我也会尝试这个解决方案并发布结果。谢谢
  • 每次循环都需要重读点击,因此创建一次集不会花费太多时间,并且可以避免许多不必要的重读。我假设这两个文件都不适合内存。
  • 谢谢。我部分尝试了你的方法。效果很好:)
【解决方案3】:

例如,我已将文件替换为 StringIO。与文件对象代码看起来相同。

import  StringIO

file1 = StringIO.StringIO("""420374,2014-04-06,18:44:58.314,214537888,12462,1
420374,2014-04-06,18:44:58.325,214537850,10471,1
281626,2014-04-06,09:40:13.032,214535653,1883,1
420368,2014-04-04,06:13:28.848,214530572,6073,1
420368,2014-04-04,06:13:28.858,214835025,2617,1
140806,2014-04-07,09:22:28.132,214668193,523,1
140806,2014-04-07,09:22:28.176,214587399,1046,1""")

file2 = StringIO.StringIO("""420374,2014-04-06,18:44:58,214537888,0
420374,2014-04-06,18:41:50,214537888,0
420374,2014-04-06,18:42:33,214537850,0
420374,2014-04-06,18:42:38,214537850,0
420374,2014-04-06,18:43:02,214537888,0
420374,2014-04-06,18:43:10,214537888,0
420369,2014-04-07,19:39:43,214839373,0
420369,2014-04-07,19:39:56,214684513,0""")

outfile = StringIO.StringIO()

data1_iter, skip_1 = iter(file1), False
data2_iter, skip_2 = iter(file2), False

while True:
    out = []
    if not skip_1:
        try:
            out.append(next(data1_iter).split()[0])
        except StopIteration:
            skip_1 = True
    if not skip_2:
        try:
            out.append(next(data2_iter).split()[0])
        except StopIteration:
            skip_2 = True            

    outfile.write('\n'.join(out) + "\n")
    if skip_1 and skip_2:
        break

print(outfile.getvalue())

输出:

420374,2014-04-06,18:44:58.314,214537888,12462,1
420374,2014-04-06,18:44:58,214537888,0
420374,2014-04-06,18:44:58.325,214537850,10471,1
420374,2014-04-06,18:41:50,214537888,0
281626,2014-04-06,09:40:13.032,214535653,1883,1
420374,2014-04-06,18:42:33,214537850,0
420368,2014-04-04,06:13:28.848,214530572,6073,1
420374,2014-04-06,18:42:38,214537850,0
420368,2014-04-04,06:13:28.858,214835025,2617,1
420374,2014-04-06,18:43:02,214537888,0
140806,2014-04-07,09:22:28.132,214668193,523,1
420374,2014-04-06,18:43:10,214537888,0
140806,2014-04-07,09:22:28.176,214587399,1046,1
420369,2014-04-07,19:39:43,214839373,0
420369,2014-04-07,19:39:56,214684513,0

【讨论】:

    猜你喜欢
    • 2020-07-22
    • 2020-06-22
    • 2016-05-01
    • 1970-01-01
    • 2018-08-08
    • 2013-06-19
    • 2020-02-23
    • 2022-11-05
    • 1970-01-01
    相关资源
    最近更新 更多