# 单线程版-上传网络流 (single-threaded version: upload network streams)
import pymysql
import oss2
import requests
import logging
# Configure logging: timestamped records are appended to Error.log.
logging.basicConfig(
    level=logging.INFO,  # records at INFO severity or above are written
    format='%(asctime)s %(filename)s %(levelname)s : %(message)s',  # record layout
    datefmt='%Y-%m-%d %H:%M:%S',  # timestamp format used for %(asctime)s
    filename='Error.log',  # log file name
    filemode='a')  # append to the file instead of truncating it
class osss(object):
    """Copy product images referenced by ``cqyy_product_info`` into Aliyun OSS.

    Rows are read in id order, up to 1000 at a time.  For each row the ``big``
    and ``avatar`` columns (a single URL or a comma-separated list of URLs)
    are downloaded and stored under ``yxb-cqyy/drug_image/`` and
    ``yxb-cqyy/head_portrait/`` respectively.  The stored file names are
    written back to ``big_oss`` / ``avatar_oss``, and every uploaded name is
    recorded in ``cqyy_cqyy`` so that it is not uploaded twice.

    NOTE: constructing an instance runs the whole job and ends the process
    with ``SystemExit`` once no rows are left.
    """

    # (connect, read) timeout in seconds for image downloads, so that a
    # stalled server cannot hang the job forever.
    DOWNLOAD_TIMEOUT = (10, 60)

    def __init__(self):
        """Open the MySQL and OSS handles, then process batches until done."""
        # NOTE(review): host and password are empty in this file; they have to
        # be supplied before the script can connect.
        self.db = pymysql.connect(host='', port=3306, database='spider_yu', user='spider',
                                  password='', charset='utf8')
        self.cursor = self.db.cursor()
        # NOTE(security): the OSS access key pair is hard-coded. It should be
        # rotated and loaded from configuration or the environment instead of
        # living in source control.
        self.auth = oss2.Auth('LTAIjiyFNM8SukLq', 'fH83Q8o4JDMCpid7XxJSSDm4zPGxDW')
        self.bucket = oss2.Bucket(self.auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'yxb-dev')
        # id of the last row handled; the next batch starts after it.
        self.numbre = 0
        while True:
            self.ssh_fun()

    def ssh_fun(self):
        """Process the next batch of rows whose id is greater than ``self.numbre``.

        Raises:
            SystemExit: when no such rows remain (the database connection is
                closed first).
        """
        # ORDER BY makes the "id > last seen id" pagination reliable: without
        # it the server may return rows in any order and rows could be skipped.
        num = self.cursor.execute(
            'select id, big, avatar from cqyy_product_info where id>%s order by id limit 1000',
            (self.numbre,))
        if not num:
            self.cursor.close()
            self.db.close()
            raise SystemExit
        for row_id, big, avatar in self.cursor.fetchall():
            self.numbre = row_id
            big = self._strip_query(big)
            avatar = self._strip_query(avatar)
            logging.info('numbre:"{}", big:"{}", avatar:"{}"'.format(self.numbre, big, avatar))
            try:
                self._mirror_field(big, 'drug_image', 'big_oss')
                self._mirror_field(avatar, 'head_portrait', 'avatar_oss')
            except Exception as e:
                # Best effort: a failing row is logged and the job moves on.
                logging.error('oss存入失败 id:"{}" 错误类型:"{}"'.format(self.numbre, e))

    @staticmethod
    def _strip_query(url):
        """Return *url* without its query string; NULL columns become ''."""
        return (url or '').split('?', 1)[0]

    def _mirror_field(self, value, folder, column):
        """Upload every URL in *value* and store the file names in *column*.

        *value* is one URL or a comma-separated list; pieces without a '/'
        are ignored.  *column* is always one of the constants passed by
        ``ssh_fun``, never external input, so formatting it into the
        statement is safe.
        """
        if not value:
            return
        is_list = ',' in value
        names = []
        for url in value.split(','):
            if '/' not in url:
                continue
            name = url.split('/')[-1]
            names.append(name)
            self._upload_once(url, name, folder)
        # The column is written only after the uploads succeeded, so it never
        # points at objects that are missing from the bucket.  A comma list
        # without any usable URL still resets the column to ''.
        if names or is_list:
            self.cursor.execute(
                'update cqyy_product_info set {}=%s where id=%s'.format(column),
                (','.join(names), self.numbre))
            self.db.commit()

    def _upload_once(self, url, name, folder):
        """Download *url* and store it as *name* unless it was uploaded before."""
        if self.cursor.execute('select id from cqyy_cqyy where url=%s', (name,)):
            return
        response = requests.get(url, timeout=self.DOWNLOAD_TIMEOUT)
        # Do not store an HTTP error page as if it were the image.
        response.raise_for_status()
        self.bucket.put_object('yxb-cqyy/{}/{}'.format(folder, name), response)
        self.cursor.execute('insert into cqyy_cqyy(url) values (%s)', (name,))
        self.db.commit()
if __name__ == '__main__':
    # Constructing the class runs the whole job.  The instance is not bound
    # to a name so that the class ``osss`` is not shadowed by it.
    osss()
# 多线程版-上传网络流 (multi-threaded version: upload network streams)
import paramiko, threading
import queue
import pymysql
import oss2
import requests
import logging
# Configure logging: timestamped records are appended to Error.log.
logging.basicConfig(
    level=logging.INFO,  # records at INFO severity or above are written
    format='%(asctime)s %(filename)s %(levelname)s : %(message)s',  # record layout
    datefmt='%Y-%m-%d %H:%M:%S',  # timestamp format used for %(asctime)s
    filename='Error.log',  # log file name
    filemode='a')  # append to the file instead of truncating it
class ThreadPool(object):
    """Bounded pool of thread "slots".

    The pool is a queue pre-filled with ``maxsize`` references to the
    ``threading.Thread`` class.  Taking one with ``getThread`` claims a slot
    (blocking while none is free) and ``addThread`` gives a slot back, which
    caps the number of workers running at the same time.
    """

    def __init__(self, maxsize):
        """Create a pool with *maxsize* free slots.

        Raises:
            ValueError: if *maxsize* is smaller than 1.  Such a pool would
                start empty and unbounded, so ``getThread`` would block
                forever.
        """
        if maxsize < 1:
            raise ValueError('maxsize must be at least 1')
        self.maxsize = maxsize
        self._q = queue.Queue(self.maxsize)
        for _ in range(self.maxsize):
            self._q.put(threading.Thread)

    def getThread(self):
        """Claim a slot and return the ``threading.Thread`` class; blocks while the pool is empty."""
        return self._q.get()

    def addThread(self):
        """Return a slot to the pool; blocks if the pool is already full."""
        self._q.put(threading.Thread)
# Connection used by ssh_fun for all of its statements; it is created once at
# import time and shared by every worker thread.
# NOTE(review): host and password are empty in this file; they have to be
# supplied before the script can connect.
dbs = pymysql.connect(host='', port=3306, database='spider_yu', user='spider',
                      password='', charset='utf8')
# Every worker thread shares the module-level ``dbs`` connection, and a
# PyMySQL connection must not be used by two threads at the same time, so all
# statements on it are serialized through this lock.
_DBS_LOCK = threading.Lock()

# (connect, read) timeout in seconds for image downloads, so that a stalled
# server cannot hold a pool slot forever.
_DOWNLOAD_TIMEOUT = (10, 60)


def _run_sql(sql, params, commit=False):
    """Run one statement on ``dbs`` under the lock and return its row count."""
    with _DBS_LOCK:
        with dbs.cursor() as cursor:
            affected = cursor.execute(sql, params)
        if commit:
            dbs.commit()
        return affected


def _upload_once(url, name, folder, bucket):
    """Download *url* and store it as *name* unless it was uploaded before."""
    if _run_sql('select id from cqyy_cqyy where url=%s', (name,)):
        return
    # The lock is not held here, so downloads and uploads of different
    # workers still overlap.
    response = requests.get(url, timeout=_DOWNLOAD_TIMEOUT)
    # Do not store an HTTP error page as if it were the image.
    response.raise_for_status()
    bucket.put_object('yxb-cqyy/{}/{}'.format(folder, name), response)
    _run_sql('insert into cqyy_cqyy(url) values (%s)', (name,), commit=True)


def _mirror_field(value, folder, column, numbre, bucket):
    """Upload every URL in *value* and store the file names in *column*.

    *value* is one URL or a comma-separated list; pieces without a '/' are
    ignored.  *column* is always one of the constants passed by ``ssh_fun``,
    never external input, so formatting it into the statement is safe.
    """
    if not value:
        return
    is_list = ',' in value
    names = []
    for url in value.split(','):
        if '/' not in url:
            continue
        name = url.split('/')[-1]
        names.append(name)
        _upload_once(url, name, folder, bucket)
    # The column is written only after the uploads succeeded, so it never
    # points at objects that are missing from the bucket.  A comma list
    # without any usable URL still resets the column to ''.
    if names or is_list:
        _run_sql('update cqyy_product_info set {}=%s where id=%s'.format(column),
                 (','.join(names), numbre), commit=True)


def ssh_fun(numbre, big, avatar, pool, db, bucket):
    """Upload the images of one ``cqyy_product_info`` row to OSS.

    Args:
        numbre: id of the row being processed.
        big: URL or comma-separated URLs stored under ``yxb-cqyy/drug_image/``;
            the resulting file names are written to ``big_oss``.
        avatar: URL or comma-separated URLs stored under
            ``yxb-cqyy/head_portrait/``; the resulting file names are written
            to ``avatar_oss``.
        pool: pool whose ``addThread()`` is called when the work is finished,
            whether it succeeded or not.
        db: unused; kept so existing callers keep working.  All statements go
            through the module-level ``dbs`` connection.
        bucket: OSS bucket that receives the objects via ``put_object``.

    Failures are logged and swallowed so that one bad row does not stop the
    other workers.
    """
    print('numbre:"{}", big:"{}", avatar:"{}"'.format(numbre, big, avatar))
    logging.info('numbre:"{}", big:"{}", avatar:"{}"'.format(numbre, big, avatar))
    try:
        _mirror_field(big, 'drug_image', 'big_oss', numbre, bucket)
        _mirror_field(avatar, 'head_portrait', 'avatar_oss', numbre, bucket)
    except Exception as e:
        logging.error('oss存入失败 id:"{}" 错误类型:"{}"'.format(numbre, e))
    finally:
        # Always give the slot back, otherwise the dispatcher would stall.
        pool.addThread()
if __name__ == '__main__':
    # Dispatcher: reads rows in id order and hands each one to a worker
    # thread, with at most 3 workers running at the same time.
    t_list = []
    pool = ThreadPool(3)
    numbre = 0  # id of the last row dispatched; the next batch starts after it
    # NOTE(review): host and password are empty in this file; they have to be
    # supplied before the script can connect.
    db = pymysql.connect(host='', port=3306, database='spider_yu', user='spider',
                         password='', charset='utf8')
    cursor = db.cursor()
    # NOTE(security): the OSS access key pair is hard-coded. It should be
    # rotated and loaded from configuration or the environment instead of
    # living in source control.
    auth = oss2.Auth('LTAIjiyFNM8SukLq', 'fH83Q8o4JDMCpid7XxJSSDm4zPGxDW')
    bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'yxb-dev')
    try:
        while True:
            # ORDER BY makes the "id > last seen id" pagination reliable:
            # without it rows may come back in any order and be skipped.
            num = cursor.execute(
                'select id, big, avatar from cqyy_product_info where id>%s order by id limit 1000',
                (numbre,))
            if not num:
                # Leave the loop instead of calling exit() so that the cleanup
                # below actually runs.
                break
            for numbre, big, avatar in cursor.fetchall():
                # Drop the query string; NULL columns are treated as ''.
                big = (big or '').split('?', 1)[0]
                avatar = (avatar or '').split('?', 1)[0]
                th = pool.getThread()  # blocks until a worker slot is free
                t = th(target=ssh_fun, args=(numbre, big, avatar, pool, db, bucket))
                t.start()
                t_list.append(t)
            # Forget finished workers so the list does not grow with the table.
            t_list = [t for t in t_list if t.is_alive()]
    finally:
        for t in t_list:
            t.join()
        cursor.close()
        db.close()
        # The workers' shared connection is no longer needed once all of them
        # have been joined.
        dbs.close()