Python + Selenium + ChromeDriver: Crawling WeChat Articles

Preface

This post crawls WeChat article content with Python, using the Selenium automation tool to drive a browser that opens the article. The article body is extracted, and its resource files (images, audio, video) are uploaded to a Qiniu storage server.

Installing Python

Official download page: https://www.python.org/downloads/release/python-370/

Personal netdisk download link; the version installed here is 3.7.0.

File: python-3.7.0-amd64
Download: https://pan.baidu.com/s/1dlUjcLjemTcm7jnc0HDiwQ (extraction code: typ)

Installing Selenium

# pip is available by default once Python is installed
pip install selenium

Installing ChromeDriver

Official download page: http://chromedriver.storage.googleapis.com/index.html

Personal netdisk download link; the version installed here is 71.0.3578.33. Download and unzip it into the Python installation directory (here it is placed under the Scripts folder), then add that directory to the PATH environment variable.

File: chromedriver_win32_71.0.3578.33
Download: https://pan.baidu.com/s/1dlUjcLjemTcm7jnc0HDiwQ (extraction code: typ)
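
To confirm that ChromeDriver is reachable from the PATH, open a new terminal and run:

chromedriver --version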

Installing Requests

pip install requests

Installing the MySQL connector

pip install mysql-connector

Installing bs4 (BeautifulSoup)

pip install bs4

Python urllib

urllib is part of the Python 3 standard library, so it does not need to be installed with pip.
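
Installing the Qiniu SDK

The configuration file below also imports the Qiniu Python SDK (Auth, BucketManager), which the steps above do not cover. Assuming the official qiniu package on PyPI, it is installed the same way:

pip install qiniu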

Crawling WeChat Article Content

  • Configuration file (common/config.py)

    # -*- coding: utf-8 -*-
    # common/config.py: shared configuration (MySQL, Qiniu, headless Chrome)
    import mysql.connector
    from qiniu import Auth
    from qiniu import BucketManager
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options

    # URL templates for WeChat articles and voice resources
    wechat_article_url = "https://mp.weixin.qq.com/s/{}"
    wechat_video_url = "https://res.wx.qq.com/voice/getvoice?mediaid={}"

    # MySQL connection
    db = mysql.connector.connect(
        host="192.168.0.134",
        user="root",
        passwd="123456q",
        database="smart_trip",
        charset='utf8'
    )

    # Qiniu access keys
    qiniu_access_key = "xxxxxxxxxxxxx"
    qiniu_secret_key = "xxxxxxxxxxx"
    # Bucket to upload into
    qiniu_bucket_name = 'xxxxx'

    qiniu_auth = Auth(qiniu_access_key, qiniu_secret_key)
    # Generate an upload token; an expiry time (in seconds) can be specified
    token = qiniu_auth.upload_token(qiniu_bucket_name, None, 36000)
    qiniu_bucket = BucketManager(qiniu_auth)

    # Crawl batch number
    batch = 1

    # Initialise a Chrome browser in headless (windowless) mode
    def get_chrom():
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        chrome = webdriver.Chrome(chrome_options=chrome_options)
        return chrome
  • Entry point

    # -*- coding: utf-8 -*-
    # Entry point: crawl the content of a WeChat official-account article
    from wechat import wechat_article as spider
    from common import config as conf


    chrome = conf.get_chrom()
    spider.spider_article("G1FvfYtcd5FVCJbaFShEGA", chrome)
    chrome.close()
    print("done")
  • WeChat article content processing (wechat/wechat_article.py)

    # -*- coding: utf-8 -*-
    # Crawl the article body and save it paragraph by paragraph
    from common import config as conf
    from common import article_dao as dao
    from common import qiniu_utils as qiniu
    import bs4
    from bs4 import BeautifulSoup


    def handle_p(item, article_id, chrome, db_connect_cur):
        # Embedded video: read the iframe markup through Selenium
        if len(item.select("iframe")) > 0:
            item_frame = chrome.find_elements_by_tag_name("iframe")[0]
            chrome.switch_to.frame(item_frame)
            iframe_text = str(chrome.page_source)
            print(iframe_text)
            dao.save_article_p(article_id=article_id, type='video', content=iframe_text, db_connect_cur=db_connect_cur)
        # Audio: fetch the voice file and upload it to Qiniu
        elif len(item.select("mpvoice")) > 0:
            video_source = item.select("mpvoice")[0].get("voice_encode_fileid")
            print(video_source)
            # Crawl the audio; qiniu.spider_source downloads the resource and uploads it
            # to Qiniu (a sketch of this helper is given after the DAO section)
            video_key = qiniu.spider_source(conf.wechat_video_url.format(video_source))
            # Save the paragraph
            dao.save_article_p(article_id=article_id, type='audio', content=video_key, db_connect_cur=db_connect_cur)
        # Image: the data-src attribute already holds the absolute URL
        elif len(item.select("img")) > 0:
            source_img = item.select("img")[0].get("data-src")
            print(source_img)
            # Crawl the image
            img_key = qiniu.spider_source(source_img)
            # Save the paragraph
            dao.save_article_p(article_id=article_id, type='img', content=img_key, db_connect_cur=db_connect_cur)
            return img_key
        # Everything else is treated as a plain text paragraph
        else:
            tag_str = ''
            for tag in item.children:
                tag_str += str(tag)
            print(tag_str)
            dao.save_article_p(article_id=article_id, type='text', content=tag_str, db_connect_cur=db_connect_cur)


    def spider_article(article_id, chrome):
        db_connect_cur = conf.db.cursor(buffered=True)
        # Skip articles that have already been crawled
        if dao.check_article_exists(article_id, db_connect_cur):
            print("article_id already exists", article_id)
            return article_id

        # Build the article URL and open it in the headless browser
        article_url = conf.wechat_article_url.format(article_id)
        chrome.get(article_url)
        # The publish time (and some lazily rendered content) only appears after this click
        chrome.find_element_by_id("publish_time").click()
        content_html = BeautifulSoup(chrome.page_source, 'html.parser')
        content_div = content_html.find(name="div", attrs={"class": "rich_media_content "})
        try:
            thumbnail = None
            for item in content_div.children:
                if isinstance(item, bs4.element.Tag):
                    if item.name == "p":
                        thumbnail_tmp = handle_p(item, article_id, chrome, db_connect_cur)
                        if thumbnail is None:
                            thumbnail = thumbnail_tmp
                    elif item.name == "section":
                        for item_p in item.select("p"):
                            thumbnail_tmp = handle_p(item_p, article_id, chrome, db_connect_cur)
                            if thumbnail is None:
                                thumbnail = thumbnail_tmp
                    else:
                        tag_str = ''
                        for tag in item.children:
                            tag_str += str(tag)
                        print(tag_str)
                        dao.save_article_p(article_id=article_id, type='text', content=tag_str, db_connect_cur=db_connect_cur)
                else:
                    print(str(item))
                    dao.save_article_p(article_id=article_id, type='text', content=str(item), db_connect_cur=db_connect_cur)

            title = content_html.select(".rich_media_title")[0].getText()
            wechat = content_html.select(".rich_media_meta_nickname a")[0].getText()
            # Publish time lookup (the result is not used further here)
            content_html.select(".rich_media_meta_list")[0].find(name="em", attrs={"id": "publish_time"})
            article = {"id": article_id, "title": str(title).strip(), "url": "", "createdAt": "", "wechat": str(wechat).strip()}
            dao.save_wechat_article(article, thumbnail, db_connect_cur)
        except BaseException as e:
            print("except :", e)
            conf.db.rollback()
            return 0
        else:
            conf.db.commit()

        return article_id
  • Content persistence DAO (common/article_dao.py)

    import time
    from common import config as conf


    # Save one crawled paragraph (text / img / audio / video) of an article
    def save_article_p(article_id, type, content, db_connect_cur):
        sql = "insert into st_cms_article_tmp_p_tmp (article_id, type, content) " \
              "values (%s, %s, %s)"
        val = (article_id, type, content)
        db_connect_cur.execute(sql, val)


    def save_huoban_article(article={}, img_key='', db_connect_cur=None):
        now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        sql = "insert into st_cms_article_tmp (id, title, url, category_id, categories, thumbnail, description, createdAt, " \
              "updatedAt, viewCount, shareCount, creatorId, creatorName, creatorAvatar, batch, spiderTime, source, sourcePostTime, weChat) " \
              "values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        val = (article["id"], article["title"], article["url"], conf.categories[article["categories"]], article["categories"],
               img_key, article["description"], article["createdAt"], article["updatedAt"], article["viewCount"], article["shareCount"],
               article["creatorId"], article["creatorName"], article["creatorAvatar"], conf.batch, now, 'HUOBAN', article["createdAt"], '')
        print("article[creatorName] ", article)
        db_connect_cur.execute(sql, val)


    def save_wechat_article(article={}, img_key='', db_connect_cur=None):
        now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        sql = "insert into st_cms_article_tmp (id, title, url, category_id, categories, thumbnail, description, createdAt, " \
              "updatedAt, viewCount, shareCount, creatorId, creatorName, creatorAvatar, batch, spiderTime, source, sourcePostTime, weChat) " \
              "values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        val = (article["id"], article["title"], '', '0', '', img_key, '', '', '', '0', '0', '', '', '', "0", now, 'WECHAT', '', article["wechat"])
        print("article[creatorName] ", article)
        db_connect_cur.execute(sql, val)


    def check_article_exists(article_id, db_connect_cur):
        # Skip the article if it has already been crawled
        sql = "select * from st_cms_article_tmp where id = %s"
        db_connect_cur.execute(sql, (article_id,))
        return db_connect_cur.rowcount > 0


    def save_checkin(video, content, ref_id, db_connect_cur=None):
        now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        sql = "insert into st_cms_checkin (video, content, ref_id, collect_time) " \
              "values (%s, %s, %s, %s)"
        val = (video, content, ref_id, now)
        db_connect_cur.execute(sql, val)


    def check_checkin_exists(ref_id, db_connect_cur):
        # Skip the check-in if it has already been crawled
        sql = "select * from st_cms_checkin where ref_id = %s"
        db_connect_cur.execute(sql, (ref_id,))
        return db_connect_cur.rowcount > 0

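  • Qiniu upload helper (common/qiniu_utils.py)

    The handlers above call qiniu.spider_source(), but the post does not include that module. The following is a minimal sketch under stated assumptions: requests downloads the resource, the qiniu SDK's put_data() uploads it, and an MD5 of the source URL serves as the object key; the original helper may differ.

    # -*- coding: utf-8 -*-
    # common/qiniu_utils.py -- minimal sketch, not the original helper from the post
    import hashlib

    import requests
    from qiniu import put_data

    from common import config as conf


    def spider_source(source_url):
        # Download the resource (image or audio) from WeChat
        resp = requests.get(source_url, timeout=30)
        resp.raise_for_status()
        # Object key: MD5 of the source URL (assumption; any unique key would do)
        key = hashlib.md5(source_url.encode('utf-8')).hexdigest()
        # Upload the raw bytes to the Qiniu bucket with the token from the configuration file
        ret, info = put_data(conf.token, key, resp.content)
        if ret is None:
            raise RuntimeError("Qiniu upload failed: {}".format(info))
        return key
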
Summary

  • Problems encountered: I initially fetched the page with urllib.request.Request, but content inside iframes, and content that only renders after a click event is triggered, cannot be reached that way. Switching to Selenium, which drives a real browser, solved both problems: chrome.find_element_by_id("publish_time").click() handles the click event, and chrome.switch_to.frame(item_frame) handles the iframe (see the sketch below).
  • For a simple crawler like this, the work is mostly document processing: analyse the structure of the article and extract exactly the content you need. If the site's structure changes, the crawler has to be updated accordingly.
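
For reference, the core Selenium pattern that urllib could not reproduce looks roughly like this, stripped of the database code (a minimal sketch reusing get_chrom() from the configuration file; the article URL is a placeholder):

    from common import config as conf

    chrome = conf.get_chrom()
    # Open the article in the headless browser (placeholder article id)
    chrome.get("https://mp.weixin.qq.com/s/xxxxxxxx")
    # Trigger the click event so the publish time and lazily rendered content appear
    chrome.find_element_by_id("publish_time").click()
    # Switch into the first iframe to read embedded video markup
    frames = chrome.find_elements_by_tag_name("iframe")
    if frames:
        chrome.switch_to.frame(frames[0])
        print(chrome.page_source)
        # Switch back to the main document
        chrome.switch_to.default_content()
    chrome.close()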