首先引入相关的模块
import time
import requests
from lxml import etree
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import pymongo
定义一个 Jingdong 类用来抓取京东的商品信息,既然要抓取商品,就一定需要商品的关键字信息,所以在 init 方法中需要加入 keyword 作为关键字进行搜索。将要用到的webdriver 和 WebDriverWait 实例都作为实例属性在 init 方法中进行初始化。
有了关键字参数之后需要进行搜索,将搜索的内容写入一个初始化函数中,并在创建 Jingdong 类实例的时候调用。
def __init__(self, keyword):
# keyword 是商品的关键字信息
self.keyword = keyword
self.url = 'https://www.jd.com/'
self.try_count = 0
self.browser = webdriver.Chrome()
self.wait = WebDriverWait(self.browser, 10)
# 在创建实例的时候执行初始化操作
self._initialize()
def _initialize(self):
# 访问京东首页
self.browser.get(self.url)
try:
# 获取顶部搜索框以及确定按钮,为了防止元素还未出现引起获取失败,这里使用了 WebDriverWait
# presence_of_element_located 是等待到定位元素出现
# element_to_be_clickable 是等待到定位元素可点击
search_input = self.wait.until(EC.presence_of_element_located(
(By.XPATH, '//div[@class="form"]/input')
))
search_confirm = self.wait.until(EC.element_to_be_clickable(
(By.XPATH, '//div[@class="form"]/button')
))
# 清空搜索框中的文本信息
search_input.clear()
# 将要查询的商品信息输入搜索框
search_input.send_keys(self.keyword)
# 点击搜索
search_confirm.click()
except TimeoutException:
# 如果超时,先等待 5 秒,在重新操作
time.sleep(5)
self.try_count += 1
# 如果失败了 3 次,终止操作
if self.try_count <= 3:
self._initialize()
else:
print("访问超时")
搜索关键字完成之后,需要获取页面源代码,然后通过 xpath 等解析库进行解析,获取需要的信息,在一个页面获取完成之后需要进行翻页,将这些操作写入一个 page 方法中。
def page(self, page):
"""
Used to get the source code of a page
@params:
page - the target page number
"""
# 打印当前正在抓取的是第几个页面
print('正在抓取第 {} 页的 {} 信息:'.format(page, self.keyword))
# 京东因为滚动条滑动到页面底部时才会显示完整的页面信息,所以执行 javascript 代码
# 是滚动条滑动到页面底部
self.browser.execute_script('window.scrollTo(0, document.body.scrollHeight)')
time.sleep(0.5)
self.browser.execute_script('window.scrollTo(0, document.body.scrollHeight)')
try:
# 如果当前抓取的页码不是第一页
if page > 1:
# 获取底部分页中的页码输入框和确定按钮
page_input = self.wait.until(EC.presence_of_element_located(
(By.XPATH, '//div[@id="J_bottomPage"]//input')
))
page_confirm = self.wait.until(EC.element_to_be_clickable(
(By.XPATH, '//*[@id="J_bottomPage"]/span[2]/a')
))
# 清空输入框并输入当前页码,然后点击确定翻页
page_input.clear()
page_input.send_keys(page)
page_confirm.click()
# 为了确保翻页成功(获取的是指定页面的信息),需要等到 curr 类所在的 a 标签中的文本
# 刚好是当前需要抓取的页码
self.wait.until(EC.text_to_be_present_in_element(
(By.XPATH, '//span[@class="p-num"]/a[@class="curr"]'), str(page)
))
# 等待商品信息出现
self.wait.until(EC.presence_of_element_located(
(By.XPATH, '//div[@id="J_goodsList"]/ul/li')
))
# 获取当前页面的商品信息
self.products()
except TimeoutException:
time.sleep(5)
self.try_count += 1
if self.try_count <= 3:
self.page(page)
else:
print("访问超时")
然后就是解析页面,获取当前页所有的商品信息,将其写入一个 products 方法中
def products(self):
html = etree.HTML(self.browser.page_source)
items = html.xpath('//div[@id="J_goodsList"]/ul/li')
for item in items:
iPhone = {}
iPhone['price'] = item.xpath('.//div[@class="p-price"]/strong/i/text()')[0]
iPhone['intro'] = item.xpath('.//div[contains(@class, "p-name")]/a/@title')[0]
iPhone['reviews_num'] = item.xpath('.//div[@class="p-commit"]/strong/a/text()')[0]
iPhone['shop'] = process_item(item.xpath('.//div[@class="p-shop"]//a/text()'))
print(iPhone)
# save_product(iPhone, 'localhost', 'shop', 'jingdong')
最后就是保存到数据库了,这个操作不是必须的,有时候我们并不需要将获取的信息存入数据库,所以保存到数据库的操作没有封装到 Jingdong 类中,而是写到了一个外部函数 save_product 中。
def save_product(product, mongo_url, mongo_db, collection):
client = pymongo.MongoClient(mongo_url)
collection = client[mongo_db][collection]
collection.insert_one(product)
完整代码
import time
import requests
from lxml import etree
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import pymongo
"""
使用 selenium + xpath 抓取京东 iPhone 前 100 页
"""
class Jingdong(object):
"""
A class for crawling goods infomation in jingdong
Main Methods:
_initialize - perform some _initialization operations
page - get a page source code
products - get goods infomation from the current page
"""
def __init__(self, keyword):
# keyword 是商品的关键字信息
self.keyword = keyword
self.url = 'https://www.jd.com/'
self.try_count = 0
self.browser = webdriver.Chrome()
self.wait = WebDriverWait(self.browser, 10)
# 在创建实例的时候执行初始化操作
self._initialize()
def _initialize(self):
# 访问京东首页
self.browser.get(self.url)
try:
# 获取顶部搜索框以及确定按钮,为了防止元素还未出现引起获取失败,这里使用了 WebDriverWait
# presence_of_element_located 是等待到定位元素出现
# element_to_be_clickable 是等待到定位元素可点击
search_input = self.wait.until(EC.presence_of_element_located(
(By.XPATH, '//div[@class="form"]/input')
))
search_confirm = self.wait.until(EC.element_to_be_clickable(
(By.XPATH, '//div[@class="form"]/button')
))
# 清空搜索框中的文本信息
search_input.clear()
# 将要查询的商品信息输入搜索框
search_input.send_keys(self.keyword)
# 点击搜索
search_confirm.click()
except TimeoutException:
# 如果超时,先等待 5 秒,在重新操作
time.sleep(5)
self.try_count += 1
# 如果失败了 3 次,终止操作
if self.try_count <= 3:
self._initialize()
else:
print("访问超时")
def page(self, page):
"""
Used to get the source code of a page
@params:
page - the target page number
"""
# 打印当前正在抓取的是第几个页面
print('正在抓取第 {} 页的 {} 信息:'.format(page, self.keyword))
# 京东因为滚动条滑动到页面底部时才会显示完整的页面信息,所以执行 javascript 代码
# 是滚动条滑动到页面底部
self.browser.execute_script('window.scrollTo(0, document.body.scrollHeight)')
time.sleep(0.5)
self.browser.execute_script('window.scrollTo(0, document.body.scrollHeight)')
try:
# 如果当前抓取的页码不是第一页
if page > 1:
# 获取底部分页中的页码输入框和确定按钮
page_input = self.wait.until(EC.presence_of_element_located(
(By.XPATH, '//div[@id="J_bottomPage"]//input')
))
page_confirm = self.wait.until(EC.element_to_be_clickable(
(By.XPATH, '//*[@id="J_bottomPage"]/span[2]/a')
))
# 清空输入框并输入当前页码,然后点击确定翻页
page_input.clear()
page_input.send_keys(page)
page_confirm.click()
# 为了确保翻页成功(获取的是指定页面的信息),需要等到 curr 类所在的 a 标签中的文本
# 刚好是当前需要抓取的页码
self.wait.until(EC.text_to_be_present_in_element(
(By.XPATH, '//span[@class="p-num"]/a[@class="curr"]'), str(page)
))
# 等待商品信息出现
self.wait.until(EC.presence_of_element_located(
(By.XPATH, '//div[@id="J_goodsList"]/ul/li')
))
# 获取当前页面的商品信息
self.products()
except TimeoutException:
time.sleep(5)
self.try_count += 1
if self.try_count <= 3:
self.page(page)
else:
print("访问超时")
def products(self):
html = etree.HTML(self.browser.page_source)
items = html.xpath('//div[@id="J_goodsList"]/ul/li')
for item in items:
iPhone = {}
iPhone['price'] = item.xpath('.//div[@class="p-price"]/strong/i/text()')[0]
iPhone['intro'] = item.xpath('.//div[contains(@class, "p-name")]/a/@title')[0]
iPhone['reviews_num'] = item.xpath('.//div[@class="p-commit"]/strong/a/text()')[0]
iPhone['shop'] = process_item(item.xpath('.//div[@class="p-shop"]//a/text()'))
print(iPhone)
# save_product(iPhone, 'localhost', 'shop', 'jingdong')
# 不知道为什么,在测试的时候中间有一个商品没有查询到店铺信息而报错,所以添加了这个函数用来检测
def process_item(item):
return item[0] if len(item) > 0 else ''
def save_product(product, mongo_url, mongo_db, collection):
client = pymongo.MongoClient(mongo_url)
collection = client[mongo_db][collection]
collection.insert_one(product)
def main():
jingdong = Jingdong('iPhone')
for i in range(1, 101):
jingdong.page(i)
if __name__ == "__main__":
main()
代码执行情况:
数据库中的保存情况: