Preface
This post crawls WeChat article content with Python. It uses the Selenium browser-automation tool to drive a real browser to the article page, extracts the article body, and uploads the resource files (images, audio, and video) to a Qiniu server.
Installing Python
Official download page: https://www.python.org/downloads/release/python-370/
A personal netdisk mirror is below; the version installed here is 3.7.0.
File to prepare | Download link |
---|---|
python-3.7.0-amd64 | Link: https://pan.baidu.com/s/1dlUjcLjemTcm7jnc0HDiwQ Extraction code: typ |
Installing Selenium
Once Python is installed, the pip command is available by default.
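Selenium itself is then a single pip install away (the package name is simply selenium):

```
pip install selenium
```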
Installing ChromeDriver
Official download page: http://chromedriver.storage.googleapis.com/index.html
A personal netdisk mirror is below; the version installed here is 71.0.3578.33 (the ChromeDriver version must match your installed Chrome version). Download it, unzip it into the Python installation directory (here, the Scripts subdirectory), and then add that directory to the PATH environment variable.
File to prepare | Download link |
---|---|
chromedriver_win32_71.0.3578.33 | Link: https://pan.baidu.com/s/1dlUjcLjemTcm7jnc0HDiwQ Extraction code: typ |
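To confirm the driver is correctly on the PATH, run it with the --version flag; it should print the installed version:

```
chromedriver --version
```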
Installing Python Requests

```
pip install requests
```
Installing the Python MySQL connector

```
pip install mysql-connector
```
Installing Python bs4

```
pip install bs4
```
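A quick sanity check that BeautifulSoup parses as expected (html.parser is the same parser the crawler uses later):

```python
from bs4 import BeautifulSoup

soup = BeautifulSoup('<p class="intro">hello <b>world</b></p>', 'html.parser')
print(soup.select("p.intro")[0].getText())  # prints: hello world
```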
Installing Python urllib
urllib is part of the Python 3 standard library, so no pip install is required (running `pip install urllib` will actually fail); it can be imported directly.
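For reference, the urllib approach tried first (see the summary at the end) looks like this minimal sketch; it can fetch static HTML, but it cannot trigger click events or render iframes:

```python
import urllib.request

# Some sites reject the default Python user agent, so send a browser-like one
req = urllib.request.Request(
    "https://example.com",
    headers={"User-Agent": "Mozilla/5.0"}
)
with urllib.request.urlopen(req) as resp:
    html = resp.read().decode("utf-8")
print(html[:200])
```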
Crawling WeChat article content
Configuration file
```python
# -*- coding: utf-8 -*-
import mysql.connector
from qiniu import Auth
from qiniu import BucketManager

# URL templates: the article page and the voice-resource endpoint
wechat_article_url = "https://mp.weixin.qq.com/s/{}"
wechat_video_url = "https://res.wx.qq.com/voice/getvoice?mediaid={}"

# MySQL connection shared by the spiders
db = mysql.connector.connect(
    host="192.168.0.134",
    user="root",
    passwd="123456q",
    database="smart_trip",
    charset='utf8'
)

# Qiniu access keys
qiniu_access_key = "xxxxxxxxxxxxx"
qiniu_secret_key = "xxxxxxxxxxx"
# Target bucket for uploads
qiniu_bucket_name = 'xxxxx'
qiniu_auth = Auth(qiniu_access_key, qiniu_secret_key)
# Generate an upload token; the expiry (here 36000 s) and other options can be set
token = qiniu_auth.upload_token(qiniu_bucket_name, None, 36000)
qiniu_bucket = BucketManager(qiniu_auth)

# Crawl batch number
batch = 1

# Initialize a Chrome browser in headless (windowless) mode
from selenium.webdriver.chrome.options import Options
from selenium import webdriver

def get_chrom():
    chrome_options = Options()
    chrome_options.add_argument('--headless')
    chrome_options.add_argument('--disable-gpu')
    chrome = webdriver.Chrome(chrome_options=chrome_options)
    return chrome
```
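The spider below also imports a common/qiniu_utils.py helper whose spider_source function downloads a resource and uploads it to Qiniu, returning the object key. That module is not shown in this post; here is a minimal hypothetical sketch of it, assuming requests for the download and the SDK's put_data for the upload:

```python
# -*- coding: utf-8 -*-
# common/qiniu_utils.py -- hypothetical sketch, not the original module
import hashlib
import requests
from qiniu import put_data
from common import config as conf

def spider_source(source_url):
    # Download the resource (image or audio) from the WeChat CDN
    resp = requests.get(source_url, timeout=30)
    resp.raise_for_status()
    # Derive a deterministic object key from the source URL
    key = hashlib.md5(source_url.encode('utf-8')).hexdigest()
    # Upload the raw bytes to the Qiniu bucket using the token from config
    ret, info = put_data(conf.token, key, resp.content)
    if ret is None:
        raise RuntimeError("qiniu upload failed: {}".format(info))
    return key
```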
Program entry

```python
# -*- coding: utf-8 -*-
# Crawl the content of a WeChat official account article
from wechat import wechat_article as spider
from common import config as conf

chrome = conf.get_chrom()
spider.spider_article("G1FvfYtcd5FVCJbaFShEGA", chrome)
chrome.close()
print("done")
```

The id passed to spider_article is the trailing path segment of the article's share URL; conf.wechat_article_url fills it into https://mp.weixin.qq.com/s/{}.
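To crawl several articles in one run, the same browser instance can be reused across ids; a small sketch (the id list is illustrative):

```python
# -*- coding: utf-8 -*-
from wechat import wechat_article as spider
from common import config as conf

article_ids = [
    "G1FvfYtcd5FVCJbaFShEGA",
    # ...more article ids...
]

chrome = conf.get_chrom()
try:
    for article_id in article_ids:
        # spider_article() skips ids that already exist in the database
        spider.spider_article(article_id, chrome)
finally:
    # Always release the browser, even if one article fails
    chrome.quit()
print("done")
```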
WeChat article content processing
```python
# -*- coding: utf-8 -*-
# Crawl and process the article body
from common import config as conf
from common import article_dao as dao
from common import qiniu_utils as qiniu
import bs4
from bs4 import BeautifulSoup

def handle_p(item, article_id, chrome, db_connect_cur):
    # Embedded video: stored as the raw HTML of its iframe
    if len(item.select("iframe")) > 0:
        item_frame = chrome.find_elements_by_tag_name("iframe")[0]
        chrome.switch_to.frame(item_frame)
        iframe_text = str(chrome.page_source)
        print(iframe_text)
        # Switch back so later driver calls see the full page again
        chrome.switch_to.default_content()
        dao.save_article_p(article_id=article_id, type='video', content=iframe_text, db_connect_cur=db_connect_cur)
    # Audio
    elif len(item.select("mpvoice")) > 0:
        audio_source = item.select("mpvoice")[0].get("voice_encode_fileid")
        print(audio_source)
        # Fetch the audio and upload it to Qiniu
        audio_key = qiniu.spider_source(conf.wechat_video_url.format(audio_source))
        # Save the paragraph
        dao.save_article_p(article_id=article_id, type='audio', content=audio_key, db_connect_cur=db_connect_cur)
    # Image
    elif len(item.select("img")) > 0:
        # data-src already holds the full image URL
        source_img = item.select("img")[0].get("data-src")
        print(source_img)
        # Fetch the image and upload it to Qiniu
        img_key = qiniu.spider_source(source_img)
        # Save the paragraph
        dao.save_article_p(article_id=article_id, type='img', content=img_key, db_connect_cur=db_connect_cur)
        return img_key
    # Plain text
    else:
        tag_str = ''
        for tag in item.children:
            tag_str += str(tag)
        print(tag_str)
        dao.save_article_p(article_id=article_id, type='text', content=tag_str, db_connect_cur=db_connect_cur)

def spider_article(article_id, chrome):
    db_connect_cur = conf.db.cursor(buffered=True)
    # Skip articles that have already been crawled
    if dao.check_article_exists(article_id, db_connect_cur):
        print("article_id already exists", article_id)
        return article_id
    # Build the article URL and load the page
    article_url = conf.wechat_article_url.format(article_id)
    chrome.get(article_url)
    # Click the publish time element so lazily rendered content appears
    chrome.find_element_by_id("publish_time").click()
    content_html = BeautifulSoup(chrome.page_source, 'html.parser')
    # print(content_html)
    content_div = content_html.find(name="div", attrs={"class": "rich_media_content "})
    try:
        thumbnail = None
        for item in content_div.children:
            if isinstance(item, bs4.element.Tag):
                if item.name == "p":
                    thumbnail_tmp = handle_p(item, article_id, chrome, db_connect_cur)
                    # The first image found becomes the article thumbnail
                    if thumbnail is None:
                        thumbnail = thumbnail_tmp
                elif item.name == "section":
                    for item_p in item.select("p"):
                        thumbnail_tmp = handle_p(item_p, article_id, chrome, db_connect_cur)
                        if thumbnail is None:
                            thumbnail = thumbnail_tmp
                else:
                    tag_str = ''
                    for tag in item.children:
                        tag_str += str(tag)
                    print(tag_str)
                    dao.save_article_p(article_id=article_id, type='text', content=tag_str, db_connect_cur=db_connect_cur)
            else:
                print(str(item))
                dao.save_article_p(article_id=article_id, type='text', content=str(item), db_connect_cur=db_connect_cur)
        title = content_html.select(".rich_media_title")[0].getText()
        wechat = content_html.select(".rich_media_meta_nickname a")[0].getText()
        content_html.select(".rich_media_meta_list")[0].find(name="em", attrs={"id": "publish_time"})
        article = {"id": article_id, "title": str(title).strip(), "url": "", "createdAt": "", "wechat": str(wechat).strip()}
        dao.save_wechat_article(article, thumbnail, db_connect_cur)
    except BaseException as e:
        print("except :", e)
        conf.db.rollback()
        return 0
    else:
        conf.db.commit()
        return article_id
```
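The paragraphs are saved row by row, so an article can later be reassembled by reading them back in insertion order. A hypothetical sketch, assuming st_cms_article_tmp_p_tmp has an auto-increment primary key named id (not shown in this post) and that your-bucket-domain stands in for the bucket's real CDN domain:

```python
from common import config as conf

def render_article(article_id):
    cur = conf.db.cursor(buffered=True)
    cur.execute(
        "select type, content from st_cms_article_tmp_p_tmp "
        "where article_id = %s order by id",
        (article_id,)
    )
    parts = []
    for type_, content in cur.fetchall():
        if type_ == 'img':
            # content holds the Qiniu object key
            parts.append('<img src="http://your-bucket-domain/{}" />'.format(content))
        elif type_ == 'audio':
            parts.append('<audio src="http://your-bucket-domain/{}" controls></audio>'.format(content))
        else:
            # 'text' and 'video' rows already contain raw HTML
            parts.append(content)
    return "\n".join(parts)
```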
Content persistence DAO
```python
import time
from common import config as conf

# Save one crawled paragraph
def save_article_p(article_id, type, content, db_connect_cur):
    sql = "insert into st_cms_article_tmp_p_tmp (article_id, type, content) " \
          "values (%s, %s, %s)"
    val = (article_id, type, content)
    db_connect_cur.execute(sql, val)

# Save an article crawled from the Huoban source
def save_huoban_article(article={}, img_key='', db_connect_cur=None):
    now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    sql = "insert into st_cms_article_tmp (id, title, url, category_id, categories, thumbnail, description, createdAt, " \
          "updatedAt, viewCount, shareCount, creatorId, creatorName, creatorAvatar, batch, spiderTime, source, sourcePostTime, weChat) " \
          "values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    val = (article["id"], article["title"], article["url"], conf.categories[article["categories"]], article["categories"],
           img_key, article["description"], article["createdAt"], article["updatedAt"], article["viewCount"], article["shareCount"],
           article["creatorId"], article["creatorName"], article["creatorAvatar"], conf.batch, now, 'HUOBAN', article["createdAt"], '')
    print("article ", article)
    db_connect_cur.execute(sql, val)

# Save an article crawled from WeChat
def save_wechat_article(article={}, img_key='', db_connect_cur=None):
    now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    sql = "insert into st_cms_article_tmp (id, title, url, category_id, categories, thumbnail, description, createdAt, " \
          "updatedAt, viewCount, shareCount, creatorId, creatorName, creatorAvatar, batch, spiderTime, source, sourcePostTime, weChat) " \
          "values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    val = (article["id"], article["title"], '', '0', '', img_key, '', '', '', '0', '0', '', '', '', "0", now, 'WECHAT', '', article["wechat"])
    print("article ", article)
    db_connect_cur.execute(sql, val)

def check_article_exists(article_id, db_connect_cur):
    # The buffered cursor is what makes rowcount usable after a SELECT
    sql = "select * from st_cms_article_tmp where id = %s"
    db_connect_cur.execute(sql, (article_id,))
    # True if the article has already been crawled
    return db_connect_cur.rowcount > 0

def save_checkin(video, content, ref_id, db_connect_cur=None):
    now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    sql = "insert into st_cms_checkin (video, content, ref_id, collect_time) " \
          "values (%s, %s, %s, %s)"
    val = (video, content, ref_id, now)
    db_connect_cur.execute(sql, val)

def check_checkin_exists(ref_id, db_connect_cur):
    sql = "select * from st_cms_checkin where ref_id = %s"
    db_connect_cur.execute(sql, (ref_id,))
    # True if the check-in has already been crawled
    return db_connect_cur.rowcount > 0
```
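None of these DAO functions commit on their own; the caller owns the transaction, as spider_article does with conf.db.commit() and conf.db.rollback(). A minimal usage sketch:

```python
from common import config as conf
from common import article_dao as dao

cur = conf.db.cursor(buffered=True)
try:
    dao.save_article_p(article_id="demo-id", type='text',
                       content='<p>hello</p>', db_connect_cur=cur)
    conf.db.commit()    # nothing is persisted until the caller commits
except Exception:
    conf.db.rollback()  # discard the whole batch on any failure
    raise
```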
Summary
- Problems encountered: the first attempt crawled pages with urllib.request.Request, but that approach cannot handle iframes or content that only renders after a click event. Switching to Selenium, which drives a real browser, solved both problems: chrome.find_element_by_id("publish_time").click() fires the click event, and chrome.switch_to.frame(item_frame) enters the iframe.
- For a simple content crawler like this, most of the work is document processing: analyze the structure of the article's DOM and extract exactly the pieces you need. Bear in mind that if the site's structure changes, the crawler must be updated to match.