【Question title】: Importing a CSV file into a sqlite3 database table using Python
【Posted】: 2011-02-22 16:36:26
【Question】:

I have a CSV file and I want to bulk-import it into my sqlite3 database using Python. The command is ".import .....", but it does not seem to work like this. Can anyone give me an example of how to do it in sqlite3? I am using Windows, just in case. Thanks

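【Question comments】:

  • Please provide the actual command that didn't work and the actual error message. "import....." could be anything. "Cannot work" is too vague for us to guess at. Without details, we can't help.
  • The actual command, as I said, is ".import", and it says syntax error near ".import"
  • Please actually post the actual command in the question. Please actually post the actual error message in the question. Please do not add comments that simply repeat things. Please update the question with an actual copy and paste of what you actually did.

Tags: python database csv sqlite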


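【Answer 1】:

The .import command is a feature of the sqlite3 command-line tool. To do it in Python, you should simply load the data using whatever facilities Python has, such as the csv module, and insert the data as usual.

This way, you also have control over what types are inserted, rather than relying on sqlite3's seemingly undocumented behaviour.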
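
As a minimal sketch of that idea (the table name, column names and sample data here are made up for illustration, and an in-memory file stands in for a real CSV), the rows can be read with the csv module and converted explicitly before they are inserted:

```python
import csv
import io
import sqlite3

# Hypothetical sample data; in practice this would be open("data.csv", newline="")
sample = io.StringIO("name,age\nalice,30\nbob,25\n")

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE people (name TEXT, age INTEGER)")

reader = csv.DictReader(sample)
# Convert each field explicitly instead of leaving the conversion to SQLite
rows = [(r["name"], int(r["age"])) for r in reader]
con.executemany("INSERT INTO people (name, age) VALUES (?, ?)", rows)
con.commit()

result = con.execute("SELECT name, age FROM people ORDER BY age").fetchall()
```

Here the age values reach the database as Python integers because the conversion happens in Python before the insert.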

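【Comments】:

  • There is no need to prepare the insert. The source of SQL statements and the compiled results are kept in a cache.
  • @John Machin: Is there a link to how SQLite does this?
  • @Marcelo: If you are interested in HOW it is done (why?), look in the sqlite source or ask on the sqlite mailing list.
  • @John Machin: I'm interested because in all the SQLite documentation I have come across, there is not one word about automatic caching of unprepared statements. I don't think it is reasonable to have to read source code or probe mailing lists to discover something as basic as whether I should prepare my SQL statements or not. What is your source of information on this?
  • @Marcelo: Actually it is done in the Python sqlite3 wrapper module. docs.python.org/library/… says """The sqlite3 module internally uses a statement cache to avoid SQL parsing overhead. If you want to explicitly set the number of statements that are cached for the connection, you can set the cached_statements parameter. The currently implemented default is to cache 100 statements."""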
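
To illustrate the parameter quoted in the last comment, here is a small sketch. The cache size of 256 and the table layout are arbitrary choices made for the example; the only thing taken from the documentation is that sqlite3.connect() accepts a cached_statements argument.

```python
import sqlite3

# cached_statements is a documented keyword argument of sqlite3.connect();
# 256 is an arbitrary value picked for this example
con = sqlite3.connect(":memory:", cached_statements=256)
con.execute("CREATE TABLE t (col1, col2)")

insert_sql = "INSERT INTO t (col1, col2) VALUES (?, ?)"
for pair in [("a", "1"), ("b", "2"), ("c", "3")]:
    # The SQL text is identical on every call, so the module can reuse
    # the compiled statement from its cache
    con.execute(insert_sql, pair)
con.commit()

row_count = con.execute("SELECT COUNT(*) FROM t").fetchone()[0]
```

Keeping the SQL string constant and passing the values as parameters is what makes the cache useful; building a new SQL string per row would defeat it.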
【Answer 2】:
import csv, sqlite3

con = sqlite3.connect(":memory:") # change to 'your_filename.db' to store the database on disk
cur = con.cursor()
cur.execute("CREATE TABLE t (col1, col2);") # use your column names here

with open('data.csv', 'r', newline='') as fin: # newline='' lets the csv module handle line endings (Python 3)
    # csv.DictReader uses first line in file for column headings by default
    dr = csv.DictReader(fin) # comma is default delimiter
    to_db = [(i['col1'], i['col2']) for i in dr]

cur.executemany("INSERT INTO t (col1, col2) VALUES (?, ?);", to_db)
con.commit()
con.close()

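【Comments】:

  • In case you had the same problems that I did: make sure to change col1 and col2 to the column headers in the csv file. And close the connection to the database by calling con.close() at the end.
  • Thanks, @Jonas. Updated post.
  • I keep getting "not all arguments converted during string formatting" when I attempt this method.
  • I tried this method, but it doesn't work for me. Could you check out my datasets here (they are very normal, except that some columns have empty values) and try importing them with your code? stackoverflow.com/questions/46042623/…
  • This code is not optimized for very large csv files (on the order of GBs)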
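
On the last point, one possible adjustment (a sketch, not a benchmarked solution) is to hand executemany a generator instead of a fully built list, so the rows are pulled from the file as they are inserted. The helper name iter_rows and the sample data are invented for this example.

```python
import csv
import io
import sqlite3

def iter_rows(fileobj):
    """Yield (col1, col2) tuples one at a time instead of building a list."""
    for record in csv.DictReader(fileobj):
        yield record["col1"], record["col2"]

# Hypothetical stand-in for open("data.csv", newline="")
fin = io.StringIO("col1,col2\n1,a\n2,b\n3,c\n")

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE t (col1, col2)")
# executemany accepts any iterable, so rows are consumed lazily from the generator
con.executemany("INSERT INTO t (col1, col2) VALUES (?, ?)", iter_rows(fin))
con.commit()

streamed = con.execute("SELECT col1, col2 FROM t ORDER BY col1").fetchall()
```

This keeps only one CSV record in Python memory at a time, although everything is still inserted in a single transaction.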
【Answer 3】:

Many thanks for bernie's answer! I had to tweak it a bit; here is what worked for me:

import csv, sqlite3
conn = sqlite3.connect("pcfc.sl3")
curs = conn.cursor()
curs.execute("CREATE TABLE PCFC (id INTEGER PRIMARY KEY, type INTEGER, term TEXT, definition TEXT);")
# Python 3: open() decodes the file, so the Python 2 unicode() calls are no longer needed
with open('PC.txt', 'r', encoding='utf-8', newline='') as f:
    reader = csv.reader(f, delimiter='|')
    for row in reader:
        # strip the spaces that surround each '|' in the file
        to_db = [field.strip() for field in row[:3]]
        curs.execute("INSERT INTO PCFC (type, term, definition) VALUES (?, ?, ?);", to_db)
conn.commit()

My text file (PC.txt) looks like this:

1 | Term 1 | Definition 1
2 | Term 2 | Definition 2
3 | Term 3 | Definition 3

【Comments】:

    【Answer 4】:
    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    
    import sys, csv, sqlite3
    
    def main():
        con = sqlite3.connect(sys.argv[1]) # database file input
        cur = con.cursor()
        cur.executescript("""
            DROP TABLE IF EXISTS t;
            CREATE TABLE t (COL1 TEXT, COL2 TEXT);
            """) # checks to see if table exists and makes a fresh table.
    
        # Python 3: text mode with an explicit encoding replaces "rb" plus unicode()
        with open(sys.argv[2], "r", encoding="utf8", newline="") as f: # CSV file input
            reader = csv.reader(f, delimiter=',') # the file has no header row
            for row in reader:
                to_db = [row[0], row[1]] # first two fields of each CSV row
                cur.execute("INSERT INTO t (COL1, COL2) VALUES(?, ?);", to_db) # table name matches the CREATE TABLE above
        con.commit() # commit once, after all rows are inserted
        con.close() # closes connection to database
    
    if __name__=='__main__':
        main()
    

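    【Comments】:

      【Answer 5】:

      Creating an sqlite connection to a file on disk is left as an exercise for the reader... but there is now a two-liner made possible by the pandas library: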

      df = pandas.read_csv(csvfile)
      df.to_sql(table_name, conn, if_exists='append', index=False)
      

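      【Comments】:

      • Use sep=';'. The pandas documentation clearly outlines how to deal with this.
      • Is there a way to use pandas without using RAM? I have a huge .csv (7 GB) that I can't import as a dataframe and then append to the DB.
      • Yes, there is a method in pandas which will read in chunks rather than all at once. I'm afraid I can't recall it exactly off the top of my head. I think you add chunksize=, and then you get back an iterator which you can use to append to a database piecewise. Let me know if you have trouble finding it and I can dig out a recipe.
      • Very nice, @TennesseeLeeuwenburg. I didn't need df, so I shortened your example to: pandas.read_csv(csvfile).to_sql(table_name, conn, if_exists='append', index=False)
      • I was like "Come on.... keep scrolling.... gotta be a pandas answer here........ Nice!"
      【Answer 6】:

      My 2 cents (more generic):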

      import csv, sqlite3
      import logging
      
      def _get_col_datatypes(fin):
          dr = csv.DictReader(fin) # comma is default delimiter
          fieldTypes = {}
          for entry in dr:
              feildslLeft = [f for f in dr.fieldnames if f not in fieldTypes.keys()]
              if not feildslLeft: break # We're done
              for field in feildslLeft:
                  data = entry[field]
      
                  # Need data to decide
                  if len(data) == 0:
                      continue
      
                  if data.isdigit():
                      fieldTypes[field] = "INTEGER"
                  else:
                      fieldTypes[field] = "TEXT"
              # TODO: Currently there's no support for DATE in sqlite
      
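          # Re-check after the loop: the list built inside the loop is stale once the last row fills in the remaining types
          if len(fieldTypes) < len(dr.fieldnames or []):
              raise Exception("Failed to find all the columns data types - Maybe some are empty?")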
      
          return fieldTypes
      
      
      def escapingGenerator(f):
          for line in f:
              yield line.encode("ascii", "xmlcharrefreplace").decode("ascii")
      
      
      def csvToDb(csvFile, outputToFile = False):
          # TODO: implement output to file
      
          with open(csvFile,mode='r', encoding="ISO-8859-1") as fin:
              dt = _get_col_datatypes(fin)
      
              fin.seek(0)
      
              reader = csv.DictReader(fin)
      
              # Keep the order of the columns name just as in the CSV
              fields = reader.fieldnames
              cols = []
      
              # Set field and type
              for f in fields:
                  cols.append("%s %s" % (f, dt[f]))
      
              # Generate create table statement:
              stmt = "CREATE TABLE ads (%s)" % ",".join(cols)
      
              con = sqlite3.connect(":memory:")
              cur = con.cursor()
              cur.execute(stmt)
      
              fin.seek(0)
      
      
              reader = csv.reader(escapingGenerator(fin))
              next(reader, None)  # skip the header row so it is not inserted as data
      
              # Generate insert statement:
              stmt = "INSERT INTO ads VALUES(%s);" % ','.join('?' * len(cols))
      
              cur.executemany(stmt, reader)
              con.commit()
      
          return con
      

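      【Comments】:

      • if len(feildslLeft) > 0: is always true, so it raises the exception. Please review and correct this.
      • Is there any way to do this without having to fseek(), so that it can be used on streams?
      • @mwag You can just skip the column type checking and import all the columns as text instead.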
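
A rough sketch of what the last comment suggests, assuming every column is stored as TEXT so the input only has to be read once and never rewound. The function name stream_to_table and the sample data are invented for the example.

```python
import csv
import io
import sqlite3

def stream_to_table(lines, con, table):
    """Import CSV from any iterable of lines, reading it exactly once."""
    reader = csv.reader(lines)
    header = next(reader)  # first row supplies the column names
    # Quote identifiers by doubling any embedded double quote
    quoted = ['"%s"' % name.replace('"', '""') for name in header]
    tbl = '"%s"' % table.replace('"', '""')
    con.execute("CREATE TABLE %s (%s)" % (tbl, ", ".join(c + " TEXT" for c in quoted)))
    placeholders = ", ".join("?" * len(header))
    # The reader is passed straight through, so no seek() is needed
    con.executemany("INSERT INTO %s VALUES (%s)" % (tbl, placeholders), reader)
    con.commit()

con = sqlite3.connect(":memory:")
stream_to_table(io.StringIO("id,name\n1,x\n2,y\n"), con, "ads")
imported = con.execute('SELECT id, name FROM "ads" ORDER BY rowid').fetchall()
```

Because nothing is rewound, the same function should also work on sys.stdin or another non-seekable stream, at the cost of losing the INTEGER/TEXT detection.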
      【Answer 7】:

      You can do this efficiently using blaze and odo

      import blaze as bz
      csv_path = 'data.csv'
      bz.odo(csv_path, 'sqlite:///data.db::data')
      

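      Odo will store the csv file to data.db (sqlite database) under the schema data

      Or you can use odo directly, without blaze. Either way is fine. Read this documentation

      【Comments】:

      • bz is not defined :P
      • It is probably a very old package, because of its inner error: AttributeError: 'SubDiGraph' object has no attribute 'edge'
      • Also getting the same attribute error; there seem to be comments on GitHub for it, though
      【Answer 8】:

      Based on Guy L's solution (love it), but this one can handle escaped fields.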

      import csv, sqlite3
      
      def _get_col_datatypes(fin):
          dr = csv.DictReader(fin) # comma is default delimiter
          fieldTypes = {}
          for entry in dr:
              feildslLeft = [f for f in dr.fieldnames if f not in fieldTypes.keys()]        
              if not feildslLeft: break # We're done
              for field in feildslLeft:
                  data = entry[field]
      
                  # Need data to decide
                  if len(data) == 0:
                      continue
      
                  if data.isdigit():
                      fieldTypes[field] = "INTEGER"
                  else:
                      fieldTypes[field] = "TEXT"
              # TODO: Currently there's no support for DATE in sqlite
      
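          # Re-check after the loop: the list built inside the loop is stale once the last row fills in the remaining types
          if len(fieldTypes) < len(dr.fieldnames or []):
              raise Exception("Failed to find all the columns data types - Maybe some are empty?")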
      
          return fieldTypes
      
      
      def escapingGenerator(f):
          for line in f:
              yield line.encode("ascii", "xmlcharrefreplace").decode("ascii")
      
      
      def csvToDb(csvFile,dbFile,tablename, outputToFile = False):
      
          # TODO: implement output to file
      
          with open(csvFile,mode='r', encoding="ISO-8859-1") as fin:
              dt = _get_col_datatypes(fin)
      
              fin.seek(0)
      
              reader = csv.DictReader(fin)
      
              # Keep the order of the columns name just as in the CSV
              fields = reader.fieldnames
              cols = []
      
              # Set field and type
              for f in fields:
                  cols.append("\"%s\" %s" % (f, dt[f]))
      
              # Generate create table statement:
              stmt = "create table if not exists \"" + tablename + "\" (%s)" % ",".join(cols)
              print(stmt)
              con = sqlite3.connect(dbFile)
              cur = con.cursor()
              cur.execute(stmt)
      
              fin.seek(0)
      
      
              reader = csv.reader(escapingGenerator(fin))
              next(reader, None)  # skip the header row so it is not inserted as data
      
              # Generate insert statement:
              stmt = "INSERT INTO \"" + tablename + "\" VALUES(%s);" % ','.join('?' * len(cols))
      
              cur.executemany(stmt, reader)
              con.commit()
              con.close()
      

      【Comments】:

        【Answer 9】:
        import csv, sqlite3
        
        def _get_col_datatypes(fin):
            dr = csv.DictReader(fin) # comma is default delimiter
            fieldTypes = {}
            for entry in dr:
                fieldsLeft = [f for f in dr.fieldnames if f not in fieldTypes]
                if not fieldsLeft: break # We're done
                for field in fieldsLeft:
                    data = entry[field]

                    # Need data to decide
                    if len(data) == 0:
                        continue

                    if data.isdigit():
                        fieldTypes[field] = "INTEGER"
                    else:
                        fieldTypes[field] = "TEXT"
                # TODO: Currently there's no support for DATE in sqlite

            # Re-check after the loop, so a type found in the last row still counts
            if len(fieldTypes) < len(dr.fieldnames or []):
                raise Exception("Failed to find all the columns data types - Maybe some are empty?")

            return fieldTypes
        
        
        def escapingGenerator(f):
            for line in f:
                yield line.encode("ascii", "xmlcharrefreplace").decode("ascii")
        
        
        def csvToDb(csvFile,dbFile,tablename, outputToFile = False):
        
            # TODO: implement output to file
        
            with open(csvFile,mode='r', encoding="ISO-8859-1") as fin:
                dt = _get_col_datatypes(fin)
        
                fin.seek(0)
        
                reader = csv.DictReader(fin)
        
                # Keep the order of the columns name just as in the CSV
                fields = reader.fieldnames
                cols = []
        
                # Set field and type
                for f in fields:
                    cols.append("\"%s\" %s" % (f, dt[f]))
        
                # Generate create table statement:
                stmt = "create table if not exists \"" + tablename + "\" (%s)" % ",".join(cols)
                print(stmt)
                con = sqlite3.connect(dbFile)
                cur = con.cursor()
                cur.execute(stmt)
        
                fin.seek(0)
        
        
                reader = csv.reader(escapingGenerator(fin))
                next(reader, None)  # skip the header row so it is not inserted as data
        
                # Generate insert statement:
                stmt = "INSERT INTO \"" + tablename + "\" VALUES(%s);" % ','.join('?' * len(cols))
        
                cur.executemany(stmt, reader)
                con.commit()
                con.close()
        

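        【Comments】:

        • Please format your code properly and add some explanation
        【Answer 10】:

        For simplicity, you could use the sqlite3 command-line tool from the Makefile of your project.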

        %.sql3: %.csv
            rm -f $@
            sqlite3 $@ -echo -cmd ".mode csv" ".import $< $*"
        %.dump: %.sql3
            sqlite3 $< "select * from $*"
        

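        make test.sql3 then creates the sqlite database from an existing test.csv file, with a single table "test". You can then run make test.dump to verify the contents.

        【Comments】:

          【Answer 11】:

          If the CSV file must be imported as part of a Python program, then for simplicity and efficiency you could use os.system along the lines suggested by the following: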

          import os
          
          cmd = """sqlite3 database.db <<< ".import input.csv mytable" """
          
          rc = os.system(cmd)
          
          print(rc)
          
          

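          The point is that by specifying the filename of the database, the data is saved automatically, assuming there are no errors reading it.

          【Comments】:

          【Answer 12】:

          .import is the way to go, but that is a command from the SQLite3 command-line program. A lot of the top answers to this question involve native Python loops, but if your files are large (mine are 10^6 to 10^7 records), you want to avoid reading everything into pandas or using a native Python list comprehension/loop (though I did not time them for comparison).

          For large files, I believe the best option is to use subprocess.run() to execute sqlite's import command. In the example below, I assume the table already exists, but the csv file has headers in the first row. See the .import docs for more info.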

          subprocess.run()

          import subprocess
          from pathlib import Path
          db_name = Path('my.db').resolve()
          csv_file = Path('file.csv').resolve()
          result = subprocess.run(['sqlite3',
                                   str(db_name),
                                   '-cmd',
                                   '.mode csv',
                                   '.import --skip 1 ' + str(csv_file).replace('\\','\\\\')
                                           +' <table_name>'],
                                  capture_output=True)
          

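          Edit note: sqlite3's .import command has improved so that it can treat the first row as header names or even skip the first x rows (requires version >= 3.32, as noted in this answer). If you have an older version of sqlite3, you may need to create the table first, then strip off the first row of the csv before importing. The --skip 1 argument will give an error prior to 3.32.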
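
One way to decide between the two paths is to check the CLI version first. The helper below is only a sketch: the name supports_skip is made up, and it assumes the version string starts with a dotted version number such as "3.36.0", which is how the sqlite3 shell normally reports it.

```python
import re

def supports_skip(version_output):
    """Return True if a sqlite3 CLI version string reports 3.32 or newer."""
    match = re.match(r"\s*(\d+)\.(\d+)", version_output)
    if not match:
        raise ValueError("unrecognised version string: %r" % version_output)
    major, minor = int(match.group(1)), int(match.group(2))
    # --skip was added to .import in 3.32, per the edit note above
    return (major, minor) >= (3, 32)

# The string would typically come from something like:
#   subprocess.run(["sqlite3", "--version"], capture_output=True, text=True).stdout
new_enough = supports_skip("3.36.0 2021-06-18 18:36:39")
too_old = supports_skip("3.31.1 2020-01-27 19:55:54")
```

Note that sqlite3.sqlite_version in Python reports the library linked into Python, which may differ from the command-line program on the same machine.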

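          Explanation
          From the command line, the command you are looking for is sqlite3 my.db -cmd ".mode csv" ".import file.csv table". subprocess.run() runs a command-line process. The argument to subprocess.run() is a sequence of strings which is interpreted as a command followed by all of its arguments.

          • sqlite3 my.db opens the database
          • the -cmd flag after the database allows you to pass multiple follow-on commands to the sqlite program. In the shell, each command has to be in quotes, but here they just need to be their own element of the sequence
          • '.mode csv' does what you'd expect
          • '.import --skip 1 '+str(csv_file).replace('\\','\\\\')+' <table_name>' is the import command.
            Unfortunately, since subprocess passes all follow-ons to -cmd as quoted strings, you need to double up your backslashes if you have a Windows directory path.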
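
The pieces listed above can be assembled by a small helper, sketched here. The function name build_import_args and its parameters are invented for illustration; it only builds the argument list and does not run sqlite3.

```python
from pathlib import Path

def build_import_args(db_path, csv_path, table, skip_header=True):
    """Build the argument list for the sqlite3 CLI import described above."""
    # The answer doubles backslashes in Windows paths; that is mirrored here
    csv_arg = str(Path(csv_path)).replace("\\", "\\\\")
    import_cmd = ".import "
    if skip_header:
        import_cmd += "--skip 1 "  # needs a sqlite3 CLI of version 3.32 or newer
    import_cmd += csv_arg + " " + table
    # Each dot-command is its own list element, so no shell quoting is needed
    return ["sqlite3", str(db_path), "-cmd", ".mode csv", import_cmd]

args = build_import_args("my.db", "file.csv", "mytable")
```

The resulting list can then be passed to subprocess.run(args, capture_output=True), and checking the returned object's returncode and stderr shows whether the import succeeded.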

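          Stripping headers

          Not really the main point of the question, but here is what I used. Again, I didn't want to read the whole file into memory at any point:

          import shutil

          # csv_file is the pathlib.Path defined in the subprocess.run() example
          with open(csv_file, "r") as source:
              source.readline()  # consume the header line so it is not copied
              with open(str(csv_file)+"_nohead", "w") as target:
                  shutil.copyfileobj(source, target)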
          
          

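          【Comments】:

          • Couldn't make --skip 1 work with 3.32.3 and 3.36.0
          • @roman from the command line or subprocess.run()?
          • I agree that this is the only way when you are dealing with large files.
          【Answer 13】:

          I've found that it can be necessary to break up the transfer of data from the csv to the database into chunks, so as not to run out of memory. This can be done like this: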

          import csv
          import sqlite3
          from operator import itemgetter
          
          # Establish connection
          conn = sqlite3.connect("mydb.db")
          
          # Create the table 
          conn.execute(
              """
              CREATE TABLE persons(
                  person_id INTEGER,
                  last_name TEXT, 
                  first_name TEXT, 
                  address TEXT
              )
              """
          )
          
          # These are the columns from the csv that we want
          cols = ["person_id", "last_name", "first_name", "address"]
          
          # If the csv file is huge, we instead add the data in chunks
          chunksize = 10000
          
          # Parse csv file and populate db in chunks
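          with conn, open("persons.csv", newline="") as f:
              reader = csv.DictReader(f)

              chunk = []
              for i, row in enumerate(reader):

                  if i % chunksize == 0 and i > 0:
                      conn.executemany(
                          """
                          INSERT INTO persons
                              VALUES(?, ?, ?, ?)
                          """, chunk
                      )
                      chunk = []

                  items = itemgetter(*cols)(row)
                  chunk.append(items)

              # Insert whatever is left in the final, partial chunk
              if chunk:
                  conn.executemany(
                      """
                      INSERT INTO persons
                          VALUES(?, ?, ?, ?)
                      """, chunk
                  )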
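
The batching logic can also be factored into a reusable helper. The sketch below uses itertools.islice; the helper name chunked, the two-column table and the generated rows are assumptions made for the example rather than part of the answer.

```python
import itertools
import sqlite3

def chunked(iterable, size):
    """Yield lists of at most `size` items until the iterable is exhausted."""
    iterator = iter(iterable)
    while True:
        chunk = list(itertools.islice(iterator, size))
        if not chunk:
            return
        yield chunk

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE persons (person_id INTEGER, last_name TEXT)")

# Hypothetical stand-in for rows coming out of a csv reader
rows = ((i, "name%d" % i) for i in range(25))
batch_sizes = []
for batch in chunked(rows, 10):
    with conn:  # one transaction per batch
        conn.executemany("INSERT INTO persons VALUES (?, ?)", batch)
    batch_sizes.append(len(batch))

total = conn.execute("SELECT COUNT(*) FROM persons").fetchone()[0]
```

With this shape the last, shorter batch is handled by the same code path as the full ones, so there is no separate flush step to forget.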
          
          

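          【Comments】:

            【Answer 14】:

            Here are solutions that will work if your CSV file is really big. Use to_sql as suggested by another answer, but set chunksize so it doesn't try to process the whole file at once.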

            import sqlite3
            import pandas as pd
            
            conn = sqlite3.connect('my_data.db')
            c = conn.cursor()
            # Read the CSV in chunks as well, so the whole file is never held in memory at once
            for chunk in pd.read_csv('users.csv', chunksize=10000):
                chunk.to_sql('users', conn, if_exists='append', index=False)
            

            You can also use Dask, as described here, to write a lot of Pandas DataFrames in parallel:

            dto_sql = dask.delayed(pd.DataFrame.to_sql)
            out = [dto_sql(d, 'table_name', db_url, if_exists='append', index=True)
                   for d in ddf.to_delayed()]
            dask.compute(*out)
            

            See here for more details

            【Comments】:

              【Answer 15】:

              The following can also add field names based on the CSV header:

              import csv
              import sqlite3

              def csv_sql(file_dir,table_name,database_name):
                  con = sqlite3.connect(database_name)
                  cur = con.cursor()
                  # Drop the current table by: 
                  # cur.execute("DROP TABLE IF EXISTS %s;" % table_name)

                  # csv.reader handles quoted fields and a missing final newline,
                  # which a plain split(',') does not
                  with open(file_dir, 'r', newline='') as fl:
                      reader = csv.reader(fl)
                      hd = next(reader)
                      db = [tuple(row) for row in reader]
              
                  header = ','.join(hd)
                  cur.execute("CREATE TABLE IF NOT EXISTS %s (%s);" % (table_name,header))
                  cur.executemany("INSERT INTO %s (%s) VALUES (%s);" % (table_name,header,('?,'*len(hd))[:-1]), db)
                  con.commit()
                  con.close()
              
              # Example:
              csv_sql('./surveys.csv','survey','eco.db')
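
Because the table and column names above are pasted into the SQL text, headers containing spaces, quotes or SQL keywords can break the statements. A possible safeguard, sketched here with a hypothetical helper called quote_identifier and made-up header values, is to wrap each name in double quotes:

```python
import sqlite3

def quote_identifier(name):
    """Wrap a name in double quotes, doubling any embedded double quote."""
    return '"' + name.replace('"', '""') + '"'

# Hypothetical awkward CSV headers: a space, a keyword, and embedded quotes
header = ["record id", "select", 'he said "hi"']
columns = ", ".join(quote_identifier(h) for h in header)

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE %s (%s)" % (quote_identifier("my survey"), columns))
con.execute('INSERT INTO "my survey" VALUES (?, ?, ?)', ("1", "a", "b"))

# PRAGMA table_info returns one row per column; index 1 is the column name
created = [row[1] for row in con.execute('PRAGMA table_info("my survey")')]
```

The data values still go through ? placeholders; quoting is only needed for identifiers, which cannot be bound as parameters.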
              

              【Comments】:

                【Answer 16】:

                This way you can do joins on CSVs as well:

                import sqlite3
                import os
                import pandas as pd
                from typing import List
                
                class CSVDriver:
                    def __init__(self, table_dir_path: str):
                        self.table_dir_path = table_dir_path  # where tables (ie. csv files) are located
                        self._con = None
                
                    @property
                    def con(self) -> sqlite3.Connection:
                        """Make a singleton connection to an in-memory SQLite database"""
                        if not self._con:
                            self._con = sqlite3.connect(":memory:")
                        return self._con
                    
                    def _exists(self, table: str) -> bool:
                        # fetchall() returns tuples, so look the name up directly
                        # with a bound parameter instead of testing membership
                        query = """
                        SELECT 1
                        FROM sqlite_master 
                        WHERE type ='table'
                        AND name = ?;
                        """
                        return self.con.execute(query, (table,)).fetchone() is not None
                
                    def _load_table_to_mem(self, table: str, sep: str = None) -> None:
                        """
                        Load a CSV into an in-memory SQLite database
                        sep is set to None in order to force pandas to auto-detect the delimiter
                        """
                        if self._exists(table):
                            return
                        file_name = table + ".csv"
                        path = os.path.join(self.table_dir_path, file_name)
                        if not os.path.exists(path):
                            raise ValueError(f"CSV table {table} does not exist in {self.table_dir_path}")
                        df = pd.read_csv(path, sep=sep, engine="python")  # set engine to python to skip pandas' warning
                        df.to_sql(table, self.con, if_exists='replace', index=False, chunksize=10000)
                
                    def query(self, query: str) -> List[tuple]:
                        """
                        Run an SQL query on CSV file(s). 
                        Tables are loaded from table_dir_path
                        """
                        tables = extract_tables(query)
                        for table in tables:
                            self._load_table_to_mem(table)
                        cursor = self.con.cursor()
                        cursor.execute(query)
                        records = cursor.fetchall()
                        return records
                

                extract_tables():

                import sqlparse
                from sqlparse.sql import IdentifierList, Identifier,  Function
                from sqlparse.tokens import Keyword, DML
                from collections import namedtuple
                import itertools
                
                class Reference(namedtuple('Reference', ['schema', 'name', 'alias', 'is_function'])):
                    __slots__ = ()
                
                    def has_alias(self):
                        return self.alias is not None
                
                    @property
                    def is_query_alias(self):
                        return self.name is None and self.alias is not None
                
                    @property
                    def is_table_alias(self):
                        return self.name is not None and self.alias is not None and not self.is_function
                
                    @property
                    def full_name(self):
                        if self.schema is None:
                            return self.name
                        else:
                            return self.schema + '.' + self.name
                
                def _is_subselect(parsed):
                    if not parsed.is_group:
                        return False
                    for item in parsed.tokens:
                        if item.ttype is DML and item.value.upper() in ('SELECT', 'INSERT',
                                                                        'UPDATE', 'CREATE', 'DELETE'):
                            return True
                    return False
                
                
                def _identifier_is_function(identifier):
                    return any(isinstance(t, Function) for t in identifier.tokens)
                
                
                def _extract_from_part(parsed):
                    tbl_prefix_seen = False
                    for item in parsed.tokens:
                        if item.is_group:
                            for x in _extract_from_part(item):
                                yield x
                        if tbl_prefix_seen:
                            if _is_subselect(item):
                                for x in _extract_from_part(item):
                                    yield x
                            # An incomplete nested select won't be recognized correctly as a
                            # sub-select. eg: 'SELECT * FROM (SELECT id FROM user'. This causes
                            # the second FROM to trigger this elif condition resulting in a
                            # StopIteration. So we need to ignore the keyword if the keyword
                            # FROM.
                            # Also 'SELECT * FROM abc JOIN def' will trigger this elif
                            # condition. So we need to ignore the keyword JOIN and its variants
                            # INNER JOIN, FULL OUTER JOIN, etc.
                            elif item.ttype is Keyword and (
                                    not item.value.upper() == 'FROM') and (
                                    not item.value.upper().endswith('JOIN')):
                                tbl_prefix_seen = False
                            else:
                                yield item
                        elif item.ttype is Keyword or item.ttype is Keyword.DML:
                            item_val = item.value.upper()
                            if (item_val in ('COPY', 'FROM', 'INTO', 'UPDATE', 'TABLE') or
                                    item_val.endswith('JOIN')):
                                tbl_prefix_seen = True
                        # 'SELECT a, FROM abc' will detect FROM as part of the column list.
                        # So this check here is necessary.
                        elif isinstance(item, IdentifierList):
                            for identifier in item.get_identifiers():
                                if (identifier.ttype is Keyword and
                                        identifier.value.upper() == 'FROM'):
                                    tbl_prefix_seen = True
                                    break
                
                
                def _extract_table_identifiers(token_stream):
                    for item in token_stream:
                        if isinstance(item, IdentifierList):
                            for ident in item.get_identifiers():
                                try:
                                    alias = ident.get_alias()
                                    schema_name = ident.get_parent_name()
                                    real_name = ident.get_real_name()
                                except AttributeError:
                                    continue
                                if real_name:
                                    yield Reference(schema_name, real_name,
                                                    alias, _identifier_is_function(ident))
                        elif isinstance(item, Identifier):
                            yield Reference(item.get_parent_name(), item.get_real_name(),
                                            item.get_alias(), _identifier_is_function(item))
                        elif isinstance(item, Function):
                            yield Reference(item.get_parent_name(), item.get_real_name(),
                                            item.get_alias(), _identifier_is_function(item))
                
                
                def extract_tables(sql):
                    # let's handle multiple statements in one sql string
                    extracted_tables = []
                    statements = list(sqlparse.parse(sql))
                    for statement in statements:
                        stream = _extract_from_part(statement)
                        extracted_tables.append([ref.name for ref in _extract_table_identifiers(stream)])
                    return list(itertools.chain(*extracted_tables))
                

                Example (assuming account.csv and tojoin.csv exist in /path/to/files):

                db_path = r"/path/to/files"
                driver = CSVDriver(db_path)
                query = """
                SELECT tojoin.col_to_join 
                FROM account
                LEFT JOIN tojoin
                ON account.a = tojoin.a
                """
                driver.query(query)
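
For comparison, the same kind of join can be sketched with only the standard library, without pandas or sqlparse. The helper load_csv, the column names and the inline data below are invented stand-ins for account.csv and tojoin.csv; every column is stored as TEXT.

```python
import csv
import io
import sqlite3

def load_csv(con, table, fileobj):
    """Create `table` from a CSV file object, storing every column as TEXT."""
    reader = csv.reader(fileobj)
    header = next(reader)
    name = table.replace('"', '""')
    cols = ", ".join('"%s" TEXT' % h.replace('"', '""') for h in header)
    con.execute('CREATE TABLE "%s" (%s)' % (name, cols))
    marks = ", ".join("?" * len(header))
    con.executemany('INSERT INTO "%s" VALUES (%s)' % (name, marks), reader)

con = sqlite3.connect(":memory:")
# Hypothetical contents standing in for account.csv and tojoin.csv
load_csv(con, "account", io.StringIO("a,owner\n1,ann\n2,bob\n"))
load_csv(con, "tojoin", io.StringIO("a,col_to_join\n1,red\n"))

joined = con.execute(
    """
    SELECT account.owner, tojoin.col_to_join
    FROM account
    LEFT JOIN tojoin ON account.a = tojoin.a
    ORDER BY account.a
    """
).fetchall()
```

The trade-off is that the tables have to be loaded explicitly, whereas the class above works out which CSV files to load from the query text.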
                

                【Comments】:

                  【Answer 17】:
                  """
                  cd Final_Codes
                  python csv_to_db.py
                  CSV to SQL DB
                  """
                  
                  import csv
                  import sqlite3
                  import os
                  import fnmatch
                  
                  UP_FOLDER = os.path.dirname(os.getcwd())
                  DATABASE_FOLDER = os.path.join(UP_FOLDER, "Databases")
                  DBNAME = "allCompanies_database.db"
                  
                  
                  def getBaseNameNoExt(givenPath):
                      """Returns the basename of the file without the extension"""
                      filename = os.path.splitext(os.path.basename(givenPath))[0]
                      return filename
                  
                  
                  def find(pattern, path):
                      """Utility to find files matching a glob pattern (fnmatch), searching recursively"""
                      result = []
                      for root, dirs, files in os.walk(path):
                          for name in files:
                              if fnmatch.fnmatch(name, pattern):
                                  result.append(os.path.join(root, name))
                      return result
                  
                  
                  if __name__ == "__main__":
                      Database_Path = os.path.join(DATABASE_FOLDER, DBNAME)
                      # DATABASE_FOLDER is searched recursively for CSV files to import
                      csv_files = find('*.csv', DATABASE_FOLDER)
                  
                      con = sqlite3.connect(Database_Path)
                      cur = con.cursor()
                      for each in csv_files:
                          with open(each, 'r') as fin:  # `with` statement available in 2.5+
                              # csv.DictReader uses first line in file for column headings by default
                              dr = csv.DictReader(fin)  # comma is default delimiter
                              TABLE_NAME = getBaseNameNoExt(each)
                              Cols = dr.fieldnames
                              numCols = len(Cols)
                              """
                              for i in dr:
                                  print(i.values())
                              """
                              to_db = [tuple(i.values()) for i in dr]
                              print(TABLE_NAME)
                              # use your column names here
                              ColString = ','.join(Cols)
                              QuestionMarks = ["?"] * numCols
                              ToAdd = ','.join(QuestionMarks)
                              cur.execute(f"CREATE TABLE {TABLE_NAME} ({ColString});")
                              cur.executemany(
                                  f"INSERT INTO {TABLE_NAME} ({ColString}) VALUES ({ToAdd});", to_db)
                              con.commit()
                      con.close()
                      print("Execution Complete!")
                  
                  

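                  This should come in handy when you have a lot of csv files in a folder that you wish to convert to a single .db file in one go!

                  Notice that you don't have to know the filenames, table names or field names (column names) beforehand!

                  Cool, huh?!

                  【Comments】: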
