【问题标题】:How to use python loop to through file and execute queries using parameter from text file如何使用python循环通过文件并使用文本文件中的参数执行查询
【发布时间】:2018-07-12 12:55:46
【问题描述】:

我正在尝试使脚本正常工作,该脚本从文件中获取每一行并使用该行作为输入来运行 SQL 查询。具体来说,我正在尝试使用具有域列表的文件并使用这些域名来查询 postgresql 数据库。任何帮助将不胜感激!

from __future__ import print_function

try:
    import psycopg2
except ImportError:
    raise ImportError('\n\033[33mpsycopg2 library missing. pip install psycopg2\033[1;m\n')
    sys.exit(1)
import re
import sys
import json

DB_HOST = 'crt.sh'
DB_NAME = 'certwatch'
DB_USER = 'guest'


def connect_to_db(domain_name):
    try:
        conn = psycopg2.connect("dbname={0} user={1} host={2}".format(DB_NAME, DB_USER, DB_HOST))
        cursor = conn.cursor()
        cursor.execute("SELECT ci.NAME_VALUE NAME_VALUE FROM certificate_identity ci WHERE ci.NAME_TYPE = 'emailAddress' AND reverse(lower(ci.NAME_VALUE)) LIKE reverse(lower('%{}'));".format(domain_name))
    except:
        print("\n\033[1;31m[!] Unable to connect to the database\n\033[1;m")
    return cursor

def get_unique_emails(cursor, domain_name):
    unique_emails = []
    for result in cursor.fetchall():
        matches=re.findall(r"\'(.+?)\'",str(result))
        for email in matches:
            #print(email)
            if email not in unique_emails:
                if "{}".format(domain_name) in email:
                    unique_emails.append(email)
    return unique_emails

def print_unique_emails(unique_emails):
    print("\033[1;32m[+] Total unique emails found: {}\033[1;m".format(len(unique_emails)))
    for unique_email in sorted(unique_emails):
        print(unique_email)

def write_unique_emails(unique_emails):
    with open('unique_emails.json', 'w') as outfile:
        json.dump(unique_emails, outfile, sort_keys=True, indent=4)

def get_domain_name():
    filepath = 'file.txt'  
    with open(filepath) as fp:  
    for cnt, line in enumerate(fp):
        print("Line {}: {}".format(cnt, line))
    return line

if __name__ == '__main__':
    domain_name = get_domain_name()                                             
    cursor = connect_to_db(domain_name)
    unique_emails = get_unique_emails(cursor, domain_name)
    print_unique_emails(unique_emails)
    write_unique_emails(unique_emails)

以下代码使用 sys.argv

from __future__ import print_function

try:
    import psycopg2
except ImportError:
    raise ImportError('\n\033[33mpsycopg2 library missing. pip install psycopg2\033[1;m\n')
    sys.exit(1)
import re
import sys
import json

DB_HOST = 'crt.sh'
DB_NAME = 'certwatch'
DB_USER = 'guest'

def connect_to_db(domain_name):
    try:
        conn = psycopg2.connect("dbname={0} user={1} host={2}".format(DB_NAME, DB_USER, DB_HOST))
        cursor = conn.cursor()
        cursor.execute("SELECT ci.NAME_VALUE NAME_VALUE FROM certificate_identity ci WHERE ci.NAME_TYPE = 'emailAddress' AND reverse(lower(ci.NAME_VALUE)) LIKE reverse(lower('%{}'));".format(domain_name))
    cursor.execute("SELECT ci.NAME_VALUE NAME_VALUE FROM certificate_identity ci WHERE ci.NAME_TYPE = 'serialNumber' AND reverse(lower(ci.NAME_VALUE)) LIKE reverse(lower('%{}'));".format(domain_name))
    except:
        print("\n\033[1;31m[!] Unable to connect to the database\n\033[1;m")
    return cursor

def get_unique_emails(cursor, domain_name):
    unique_emails = []
    for result in cursor.fetchall():
        matches=re.findall(r"\'(.+?)\'",str(result))
        for email in matches:
            #print(email)
            if email not in unique_emails:
                if "{}".format(domain_name) in email:
                    unique_emails.append(email)
    return unique_emails

def get_unique_serialNumber(cursor, domains):
    unique_domains = []
    for result in cursor.fetchall():
        matches=re.findall(r"\'(.+?)\'",str(result))
        for serialNumber in matches:
            if serialNumber not in unique_serialNumber:
                if ".{}".format(domain_name) in serialNumber:
                    unique_serialNumber.append(serialNumber)
    return unique_serialNumber

def print_unique_serialNumber(unique_serialNumber):
    for unique_serialNumber in sorted(unique_serialNumber):
        print(unique_serialNumber)

def print_unique_emails(unique_emails):
    print("\033[1;32m[+] Total unique emails found: {}\033[1;m".format(len(unique_emails)))
    for unique_email in sorted(unique_emails):
        print(unique_email)

def write_unique_emails(unique_emails):
    with open('read.json', 'w') as outfile:
        json.dump(unique_emails, outfile, sort_keys=True, indent=4)

def get_domain_name():
    if len(sys.argv) <= 1:
        print("\n\033[33mUsage: python emails_from_ct_logs.py <target_domain>\033[1;m\n")
        sys.exit(1)
    else:
        return sys.argv[1]

if __name__ == '__main__':
    domain_name = get_domain_name()
    cursor = connect_to_db(domain_name)
    unique_emails = get_unique_emails(cursor, domain_name)
    print_unique_emails(unique_emails)
    write_unique_emails(unique_emails)
    unique_serialNumber = get_unique_serialNumber(cursor, domain_name)
    print_unique_serialNumber(unique_serialNumber)

【问题讨论】:

  • 如果有帮助,请作为快速指针:initd.org/psycopg/docs/usage.html 更具体地说:“警告永远不要,永远不要,永远不要使用 Python 字符串连接 (+) 或字符串参数插值 (%) 来传递变量到 SQL 查询字符串。即使在枪口下。“(即不要使用 .format... 在查询中插入参数)。
  • @Bruno 感谢您的评论。除了使用“.format”,将参数插入我的 SQL 查询的正确方法是什么?
  • 使用%s 占位符并将列表/元组中的参数作为cursor.execute(query, params) 的第二个参数传递(NOT cursor.execute(query % params)`)。这一切都记录在我上面评论中的链接中。
  • @Bruno 再次感谢您的评论。这就是您将列表/元组中的参数作为第二个参数传递的意思吗? cursor.execute("SELECT ci.NAME_VALUE NAME_VALUE FROM certificate_identity ci WHERE ci.NAME_TYPE = 'dNSName' AND reverse(lower(ci.NAME_VALUE)) LIKE reverse(lower('domain_name'))";
  • 没有。在这里,您在查询字符串中对值进行硬编码。您可能应该这样做:cursor.execute("SELECT ci.NAME_VALUE NAME_VALUE FROM certificate_identity ci WHERE ci.NAME_TYPE = 'dNSName' AND reverse(lower(ci.NAME_VALUE)) LIKE reverse(lower(%s))", [ domain_name ])。请注意,第二个问题是,您还需要在执行查询后以某种方式获取结果(您永远不会遍历游标或使用任何调用从中获取数据)。第三个问题是LIKE 的工作方式(这里似乎没有使用通配符)。

标签: python sql python-3.x postgresql psycopg2


【解决方案1】:

查看Psycopg2。如果不知道数据库的所有细节,就不可能进行“剪切和粘贴”代码转储。 here 涵盖了基础知识,希望这足以让您继续前进。何时或如果您有更具体的问题,请创建一个新线程。

【讨论】:

  • 感谢您的回复。如果我使用 sys.args 让用户在运行时指定域,则查询有效。仅当我尝试从 .txt 文件中读取时,它才不起作用。您知道获取域名并将其实现到查询中的有效方法吗?
  • 您的意思是,如果您执行domain_name = sys.argv[1] 之类的操作,它会起作用,但实际上,它不起作用? domain_name 在这两种情况下是什么样子的?
  • 嗨,我使用上面的 sys.argv 添加了工作代码,因为它不允许我将它放在这个窗口中。
  • 您可以添加类似print("domain_name: {}".format(domain_name)) 的内容吗?在domain_name = get_domain_name() 之后。然后显示哪些有效,哪些无效的输出。
  • 您好,感谢您回复我。在 domain_name = get_domain_name() 下面添加 print("domain_name: {}".format(domain_name)) 后,我只打印了我在 .txt 文件中拥有的域的名称。对于如何让他工作,您还有其他想法吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-12-30
  • 1970-01-01
  • 2012-09-07
  • 2017-05-12
  • 1970-01-01
  • 2021-07-06
  • 1970-01-01
相关资源
最近更新 更多