【问题标题】:How to listen for incoming emails in python 3?如何在 python 3 中收听传入的电子邮件?
【发布时间】:2019-12-28 17:06:56
【问题描述】:

目标是让应用程序在 python 3 中运行并读取特定 gmail 帐户上的传入电子邮件,如何监听这些电子邮件的接收?

它应该做的是等到收件箱收到新邮件,从电子邮件中读取主题和正文,然后从正文中获取文本(无格式)。

这是我目前得到的:

import imaplib
import email
import datetime
import time

mail = imaplib.IMAP4_SSL('imap.gmail.com', 993)
mail.login(user, password)
mail.list()
mail.select('inbox')

status, data = mail.search(None, 'ALL')
for num in data[0].split():
    status, data = mail.fetch(num, '(RFC822)')
    email_msg = data[0][1]
    email_msg = email.message_from_bytes(email_msg)
    maintype = email_msg.get_content_maintype()
    if maintype == 'multipart':
        for part in email_msg.get_payload():
            if part.get_content_maintype() == 'text':
                print(part.get_payload())
    elif maintype == 'text':
        print(email_msg.get_payload())

但这有几个问题:当消息是多部分时,每个部分都会被打印出来,有时最后一部分基本上是整个消息,但是是 html 格式。

此外,这会打印收件箱中的所有消息,如何使用 imaplib 收听新电子邮件?或与其他图书馆。

【问题讨论】:

    标签: python-3.x email


    【解决方案1】:

    我不确定这样做的同步方式,但如果您不介意使用异步循环并将未读电子邮件定义为目标,那么它可以工作。
    (我没有实现 IMAP 轮询循环,只有电子邮件获取循环)

    我的改变

    1. 将 IMAP 搜索过滤器从 'ALL' 替换为 '(UNSEEN)' 以获取未读电子邮件。
    2. serializing policy 从默认的 policy.Compat32 更改为 policy.SMTP
    3. 使用 email.message.walk() 方法(新 API)运行和过滤消息部分。
    4. the docs 中描述的新 API 调用替换旧的 email API 调用,
      并在 these examples 中演示。

    结果代码

    import imaplib, email, getpass
    from email import policy
    
    imap_host = 'imap.gmail.com'
    imap_user = 'example@gmail.com'
    
    # init imap connection
    mail = imaplib.IMAP4_SSL(imap_host, 993)
    rc, resp = mail.login(imap_user, getpass.getpass())
    
    # select only unread messages from inbox
    mail.select('Inbox')
    status, data = mail.search(None, '(UNSEEN)')
    
    # for each e-mail messages, print text content
    for num in data[0].split():
        # get a single message and parse it by policy.SMTP (RFC compliant)
        status, data = mail.fetch(num, '(RFC822)')
        email_msg = data[0][1]
        email_msg = email.message_from_bytes(email_msg, policy=policy.SMTP)
    
        print("\n----- MESSAGE START -----\n")
    
        print("From: %s\nTo: %s\nDate: %s\nSubject: %s\n\n" % ( \
            str(email_msg['From']), \
            str(email_msg['To']), \
            str(email_msg['Date']), \
            str(email_msg['Subject'] )))
    
        # print only message parts that contain text data
        for part in email_msg.walk():
            if part.get_content_type() == "text/plain":
                for line in part.get_content().splitlines():
                    print(line)
    
        print("\n----- MESSAGE END -----\n")
    

    【讨论】:

    • 我认为将范围缩小到仅未读的电子邮件可能会导致问题。
    【解决方案2】:

    您是否检查了 git 用户 nickoala 发布的 here 的以下脚本 (3_emailcheck.py)?它是一个 python 2 脚本,在 Python3 中,您需要先用电子邮件内容解码字节。

    import time
    from itertools import chain
    import email
    import imaplib
    
    imap_ssl_host = 'imap.gmail.com'  # imap.mail.yahoo.com
    imap_ssl_port = 993
    username = 'USERNAME or EMAIL ADDRESS'
    password = 'PASSWORD'
    
    # Restrict mail search. Be very specific.
    # Machine should be very selective to receive messages.
    criteria = {
        'FROM':    'PRIVILEGED EMAIL ADDRESS',
        'SUBJECT': 'SPECIAL SUBJECT LINE',
        'BODY':    'SECRET SIGNATURE',
    }
    uid_max = 0
    
    
    def search_string(uid_max, criteria):
        c = list(map(lambda t: (t[0], '"'+str(t[1])+'"'), criteria.items())) + [('UID', '%d:*' % (uid_max+1))]
        return '(%s)' % ' '.join(chain(*c))
        # Produce search string in IMAP format:
        #   e.g. (FROM "me@gmail.com" SUBJECT "abcde" BODY "123456789" UID 9999:*)
    
    
    def get_first_text_block(msg):
        type = msg.get_content_maintype()
    
        if type == 'multipart':
            for part in msg.get_payload():
                if part.get_content_maintype() == 'text':
                    return part.get_payload()
        elif type == 'text':
            return msg.get_payload()
    
    
    server = imaplib.IMAP4_SSL(imap_ssl_host, imap_ssl_port)
    server.login(username, password)
    server.select('INBOX')
    
    result, data = server.uid('search', None, search_string(uid_max, criteria))
    
    uids = [int(s) for s in data[0].split()]
    if uids:
        uid_max = max(uids)
        # Initialize `uid_max`. Any UID less than or equal to `uid_max` will be ignored subsequently.
    
    server.logout()
    
    
    # Keep checking messages ...
    # I don't like using IDLE because Yahoo does not support it.
    while 1:
        # Have to login/logout each time because that's the only way to get fresh results.
    
        server = imaplib.IMAP4_SSL(imap_ssl_host, imap_ssl_port)
        server.login(username, password)
        server.select('INBOX')
    
        result, data = server.uid('search', None, search_string(uid_max, criteria))
    
        uids = [int(s) for s in data[0].split()]
        for uid in uids:
            # Have to check again because Gmail sometimes does not obey UID criterion.
            if uid > uid_max:
                result, data = server.uid('fetch', uid, '(RFC822)')  # fetch entire message
                msg = email.message_from_string(data[0][1])
    
                uid_max = uid
    
                text = get_first_text_block(msg)
                print 'New message :::::::::::::::::::::'
                print text
    
        server.logout()
    time.sleep(1)
    

    【讨论】:

      猜你喜欢
      • 2021-11-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-11-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-08-03
      相关资源
      最近更新 更多