yoyo1216

阿里云 OSS 存储图片

单线程版-上传网络流

import pymysql
import oss2
import requests
import logging

# Configure logging: every record at INFO level or above is appended to Error.log.
logging.basicConfig(
    filename='Error.log',  # log file, relative to the working directory
    filemode='a',  # 'a' appends across runs; 'w' would truncate the file
    level=logging.INFO,  # lowest level that is written
    format='%(asctime)s  %(filename)s  %(levelname)s : %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',  # timestamp format used by %(asctime)s
)


class osss(object):
    """Copy product images referenced by ``cqyy_product_info`` into Aliyun OSS.

    Creating an instance runs the whole job: rows are read in ``id`` order, one
    page at a time. For each row, every URL in the comma-separated ``big`` and
    ``avatar`` columns is downloaded and uploaded to OSS unless its file name is
    already recorded in ``cqyy_cqyy``. The comma-joined file names are then
    written to ``big_oss`` / ``avatar_oss``. A failing row is logged to the
    configured log and skipped. The database connection is closed once every
    row has been processed.
    """

    # Rows fetched per query.
    page_size = 1000
    # Seconds to wait for an image download before the row is logged as failed.
    request_timeout = 30
    # OSS key prefixes for the two image columns.
    big_prefix = 'yxb-cqyy/drug_image/'
    avatar_prefix = 'yxb-cqyy/head_portrait/'

    def __init__(self):
        import os  # local import: only needed for the optional credential overrides

        self.db = pymysql.connect(host='', port=3306, database='spider_yu', user='spider',
                                  password='', charset='utf8')
        self.cursor = self.db.cursor()
        # NOTE(review): this AccessKey pair is hard-coded and has been published with
        # the script; rotate it and supply the new pair via the environment instead.
        self.auth = oss2.Auth(
            os.environ.get('OSS_ACCESS_KEY_ID', 'LTAIjiyFNM8SukLq'),
            os.environ.get('OSS_ACCESS_KEY_SECRET', 'fH83Q8o4JDMCpid7XxJSSDm4zPGxDW'))
        self.bucket = oss2.Bucket(self.auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'yxb-dev')
        self.numbre = 0  # id of the last row processed; the next page starts after it
        try:
            while self.ssh_fun():
                pass
        finally:
            self.db.close()

    def ssh_fun(self):
        """Process the next page of rows whose ``id`` is greater than ``self.numbre``.

        Returns:
            True if at least one row was fetched (more may follow), False once the
            table is exhausted.
        """
        # ORDER BY makes the keyset pagination reliable: without it MySQL may return
        # the page in any order, and advancing to the last row seen could skip rows.
        found = self.cursor.execute(
            'select id, big, avatar from cqyy_product_info where id>%s order by id limit %s',
            (self.numbre, self.page_size))
        if not found:
            return False
        # fetchall() materialises the page, so self.cursor can be reused below.
        for row_id, big, avatar in self.cursor.fetchall():
            self.numbre = row_id
            logging.info('numbre:"{}", big:"{}", avatar:"{}"'.format(self.numbre, big, avatar))
            try:
                self._sync_column(big, 'big_oss', self.big_prefix)
                self._sync_column(avatar, 'avatar_oss', self.avatar_prefix)
            except Exception as e:
                # Best effort per row: record the failure (with traceback) and move on.
                logging.exception('oss存入失败 id:"{}"  错误类型:"{}"'.format(self.numbre, e))
        return True

    @staticmethod
    def _split_urls(raw):
        """Split a comma-separated URL field into URLs without ``?query``; NULL/'' -> []."""
        if not raw:
            return []
        # Strip the query per URL: stripping the whole field would drop every URL
        # after the first one that carries a query string.
        return [url.split('?')[0].strip() for url in raw.split(',')]

    def _sync_column(self, raw, column, prefix):
        """Upload every image URL in *raw* under *prefix*; store the file names in *column*.

        *column* is always one of the literals passed by ``ssh_fun`` (never data),
        so formatting it into the SQL text is safe; the values are parameterized.
        """
        names = []
        for url in self._split_urls(raw):
            name = url.split('/')[-1] if '/' in url else ''
            if not name:
                continue  # not a path-like URL, or it ends in '/'
            names.append(name)
            self._upload_once(url, name, prefix)
        if names:
            self.cursor.execute(
                'update cqyy_product_info set {}=%s where id=%s'.format(column),
                (','.join(names), self.numbre))
            self.db.commit()

    def _upload_once(self, url, name, prefix):
        """Download *url* and store it at ``prefix + name`` unless *name* is already recorded."""
        # NOTE(review): cqyy_cqyy stores bare file names shared by both prefixes, so a
        # name already uploaded as a drug image is never uploaded as an avatar.
        if self.cursor.execute('select id from cqyy_cqyy where url=%s', (name,)):
            return
        resp = requests.get(url, timeout=self.request_timeout)
        # Without this check an HTTP error page would be stored as the image and
        # the name marked as done.
        resp.raise_for_status()
        self.bucket.put_object(prefix + name, resp.content)
        self.cursor.execute('insert into cqyy_cqyy(url) values (%s)', (name,))
        self.db.commit()


if __name__ == '__main__':
    # Constructing the uploader runs the whole job. Bind the instance to its own
    # name instead of rebinding ``osss``, which would shadow the class.
    uploader = osss()

多线程版-上传网络流

import paramiko, threading
import queue
import pymysql
import oss2
import requests
import logging

# Configure logging: every record at INFO level or above is appended to Error.log.
logging.basicConfig(
    filename='Error.log',  # log file, relative to the working directory
    filemode='a',  # 'a' appends across runs; 'w' would truncate the file
    level=logging.INFO,  # lowest level that is written
    format='%(asctime)s  %(filename)s  %(levelname)s : %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',  # timestamp format used by %(asctime)s
)


class ThreadPool(object):
    """Bounded supply of ``threading.Thread`` references used as a concurrency limit.

    The internal queue holds up to ``maxsize`` references to the
    ``threading.Thread`` class. ``getThread`` blocks until one is available, and
    a finished worker hands its slot back with ``addThread``.
    """

    def __init__(self, maxsize):
        self.maxsize = maxsize
        self._q = queue.Queue(maxsize=maxsize)
        # Pre-fill every slot so up to ``maxsize`` threads can start right away.
        for _ in range(maxsize):
            self.addThread()

    def getThread(self):
        """Take a slot, blocking while none is free; returns ``threading.Thread``."""
        thread_cls = self._q.get()
        return thread_cls

    def addThread(self):
        """Give a slot back to the pool (blocks if the pool is already full)."""
        self._q.put(threading.Thread)


# Module-level connection used by ssh_fun in every worker thread.
# NOTE(review): PyMySQL connections are not thread-safe (DB-API threadsafety = 1),
# so concurrent use from several workers must be serialized by the caller.
dbs = pymysql.connect(
    host='',
    port=3306,
    database='spider_yu',
    user='spider',
    password='',
    charset='utf8',
)


# Every worker shares the module-level ``dbs`` connection, and PyMySQL connections
# must not be used from several threads at once, so all access goes through this lock.
_DB_LOCK = threading.Lock()
# Seconds to wait for an image download before the row is logged as failed.
_HTTP_TIMEOUT = 30


def _db_execute(sql, args=None, commit=False):
    """Run one parameterized statement on ``dbs`` under ``_DB_LOCK``; return the row count."""
    with _DB_LOCK:
        cursor = dbs.cursor()
        try:
            num = cursor.execute(sql, args)
            if commit:
                dbs.commit()
            return num
        finally:
            cursor.close()


def _split_urls(raw):
    """Split a comma-separated URL field into URLs without ``?query``; NULL/'' -> []."""
    if not raw:
        return []
    return [url.split('?')[0].strip() for url in raw.split(',')]


def _upload_once(url, name, prefix, bucket):
    """Download *url* and store it as ``prefix + name`` unless *name* is already recorded."""
    # NOTE(review): two workers can pass this check for the same name at once; the
    # result is a repeated upload (same object overwritten) and a duplicate row.
    # cqyy_cqyy also stores bare names shared by both prefixes, so a name uploaded
    # as a drug image is never uploaded as an avatar.
    if _db_execute('select id from cqyy_cqyy where url=%s', (name,)):
        return
    resp = requests.get(url, timeout=_HTTP_TIMEOUT)
    # Without this check an HTTP error page would be stored as the image.
    resp.raise_for_status()
    bucket.put_object(prefix + name, resp.content)
    _db_execute('insert into cqyy_cqyy(url) values (%s)', (name,), commit=True)


def _sync_column(numbre, raw, column, prefix, bucket):
    """Upload every image URL in *raw* under *prefix*; store the file names in *column*.

    *column* is always a literal supplied by ``ssh_fun`` (never data), so formatting
    it into the SQL text is safe; the values are parameterized.
    """
    names = []
    for url in _split_urls(raw):
        name = url.split('/')[-1] if '/' in url else ''
        if not name:
            continue  # not a path-like URL, or it ends in '/'
        names.append(name)
        _upload_once(url, name, prefix, bucket)
    if names:
        _db_execute('update cqyy_product_info set {}=%s where id=%s'.format(column),
                    (','.join(names), numbre), commit=True)


def ssh_fun(numbre, big, avatar, pool, db, bucket):
    """Worker: upload one product row's images to OSS and record their names.

    Args:
        numbre: ``cqyy_product_info.id`` of the row.
        big, avatar: comma-separated image URL fields (may be None or empty).
        pool: ThreadPool whose slot is returned when the worker finishes.
        db: unused; kept for call compatibility. The shared ``dbs`` connection is used.
        bucket: ``oss2.Bucket`` to upload into.

    Errors are logged and never propagate; the pool slot is always returned.
    """
    try:
        # Inside the try so that even a failing print/log still releases the slot.
        print('numbre:"{}", big:"{}", avatar:"{}"'.format(numbre, big, avatar))
        logging.info('numbre:"{}", big:"{}", avatar:"{}"'.format(numbre, big, avatar))
        _sync_column(numbre, big, 'big_oss', 'yxb-cqyy/drug_image/', bucket)
        _sync_column(numbre, avatar, 'avatar_oss', 'yxb-cqyy/head_portrait/', bucket)
    except Exception as e:
        logging.exception('oss存入失败 id:"{}"  错误类型:"{}"'.format(numbre, e))
    finally:
        pool.addThread()


if __name__ == '__main__':
    import os  # only needed for the optional credential overrides below

    def _strip_queries(field):
        """Remove the ``?query`` part of every URL in a comma-separated field.

        NULL (None) becomes '' instead of crashing the loop. Stripping each URL,
        rather than the whole field, keeps the URLs after the first one.
        """
        if not field:
            return ''
        return ','.join(url.split('?')[0] for url in field.split(','))

    pool = ThreadPool(3)
    numbre = 0  # id of the last row dispatched; the next page starts after it
    db = pymysql.connect(host='', port=3306, database='spider_yu', user='spider',
                         password='', charset='utf8')
    cursor = db.cursor()
    # NOTE(review): this AccessKey pair is hard-coded and has been published; rotate it
    # and set OSS_ACCESS_KEY_ID / OSS_ACCESS_KEY_SECRET instead.
    auth = oss2.Auth(os.environ.get('OSS_ACCESS_KEY_ID', 'LTAIjiyFNM8SukLq'),
                     os.environ.get('OSS_ACCESS_KEY_SECRET', 'fH83Q8o4JDMCpid7XxJSSDm4zPGxDW'))
    bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'yxb-dev')
    try:
        while True:
            # ORDER BY: without it the page order is unspecified, and advancing
            # ``numbre`` to the last row seen could skip rows.
            found = cursor.execute(
                'select id, big, avatar from cqyy_product_info '
                'where id>%s order by id limit 1000', (numbre,))
            if not found:
                break
            t_list = []  # threads of this page only, so the list does not grow forever
            for numbre, big, avatar in cursor.fetchall():
                th = pool.getThread()  # blocks while all pool slots are in use
                t = th(target=ssh_fun,
                       args=(numbre, _strip_queries(big), _strip_queries(avatar),
                             pool, db, bucket))
                t.start()
                t_list.append(t)
            # Finish the whole page before reading the next one.
            for t in t_list:
                t.join()
    finally:
        db.close()
        dbs.close()

  

发表于 2019-02-15 10:30  守护式等待  阅读(1703)  评论(0)  编辑  收藏  举报
 

分类:

技术点:

相关文章: