Python爬虫多线程



Python爬虫多线程

Python 多线程详细教程

Python多线程

爬虫如果要高效,肯定会用到多线程,多线程和单线程相比相当于把任务分解到多个人同时执行,如今几乎大多数的电脑和服务器CPU都是多核的,所以多线程解决了很大的效率问题。
但是和Java不同的是,Python代码在解释器执行中,同一时刻只能有一个主线程执行,像在单CPU的电脑上运行多个进程那样。内存还是可以存放多个程序的。这是由于Python的访问由全局解释器锁(GIL)控制的,这个锁保证同时只有一个线程在运行。在多线程环境中,Python虚拟机执行流程如下:

1、设置GIL。
2、切换到一个线程去执行。
3、运行。
4、把线程设置为睡眠状态。
5、解锁GIL。
6、再次重复以上步骤。

所以从某种意义上说Python的多线程很鸡肋,其实就是因为Python多线程用到了全局解释器锁(GIL锁)。不管如何我们下面来讲解下,Python多线程在爬虫中的应用。
推荐阅读:
Python多线程教程

线程安全的队列 Queue

队列是先进先出,在Python的标准库中提供了一个线程安全的队列可用于多线程的先进先出,可以用来在生产者消费者线程之间安全地传递消息或其他数据。Queue 的大小(其中包含的元素个数)可能要受限,以限制内存使用或处理。
在Python3中要引入Queue和Python2中引入Queue是不同,引入方式如下:
 # Filename : example.py
# Copyright : 2020 By Lidihuo
# Author by : www.lidihuo.com
# Date : 2020-09-11
#python 2
import Queue
# python 3
from queue import Queue

因为是线程安全的,很自然就可以利用Queue来实现一个多线程爬虫咯,而Queue的一些常见操作如下:
 # Filename : example.py
# Copyright : 2020 By Lidihuo
# Author by : www.lidihuo.com
# Date : 2020-09-11
# 实例化一个队列,可以在指定队列大小
q = Queue.Queue()
q_50 = Queue.Queue(50) # 指定一个长度为50的队列
# 入队一个数据data
q.put(data)
# 出队并赋值给item
item = q.get()
# 判断队列是否为空,是否满
if q.empty():
    print('队列为空')
if q.full():
    print('队列满')

 # Filename : example.py
# Copyright : 2020 By Lidihuo
# Author by : www.lidihuo.com
# Date : 2020-08-21
from pyquery import PyQuery as pq
doc = pq(filename='example.html')
print doc.html()
print type(doc)
li = doc('li')
print type(li)
print li.text()

除了普通队列,标准库中还有优先队列和后进先出队列这两个队列,分别为LifoQueue和PriorityQueue,其引用方式与Queue类似。

三、基于多线程爬虫爬取糗事百科的段子

下面进入实战的一个代码,代码的理解也相对简单,相信经过这个代码,大家也可以自行写出一个多线程爬虫。
实现思路和整体流程

1、构造任务队列pageQueue ,存放所有要爬取的页面url。
2、用多线程爬虫从糗事百科上抓取糗事,然后将抓取的页面内容存放到data_queue中
3、用多线程程序对data_queue中的页面内容进行解析,分别提取 糗事的图片url,糗事的题目和糗事内容,然后存放到的json文件中(一个时间点只有一个线程可以写文件IO,注意到Python的多线程机制使用了GIL锁)

 # Filename : example.py
# Copyright : 2020 By Lidihuo
# Author by : www.lidihuo.com
# Date : 2020-09-11
import requests
from lxml import etree
from queue import Queue
import threading
import json
'''
Queue.qsize(队列名) #返回队列的大小
Queue.empty(队列名) # 队列为空返回true,否则为false
Queue.full(队列名) # 队列满返回true
Queue.get(队列名,值) # 出队
Queue.put(队列名,值) # 入队
FIFO 先进先出
'''
class Crawl_thread(threading.Thread):
    '''
    抓取线程类,注意需要继承线程类Thread
    '''
    def __init__(self,thread_id,queue):
        threading.Thread.__init__(self) # 需要对父类的构造函数进行初始化
        self.thread_id = thread_id
        self.queue = queue # 任务队列
    def run(self):
        '''
        线程在调用过程中就会调用对应的run方法
        :return:
        '''
        print('启动线程:',self.thread_id)
        self.crawl_spider()
        print('退出了该线程:',self.thread_id)
    def crawl_spider(self):
        while True:
            if self.queue.empty(): #如果队列为空,则跳出
                break
            else:
                page = self.queue.get()
                print('当前工作的线程为:',self.thread_id," 正在采集:",page)
                url = 'https://www.qiushibaike.com/Shr/page/{}/'.format(str(page))
                headers = {
                    'User-Agent':'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3371.0 Safari/537.36'
                }
                try:
                    content = requests.get(url,headers=headers)
                    data_queue.put(content.text) # 将采集的结果放入data_queue中
                except Exception as e:
                    print('采集线程错误',e)
class Parser_thread(threading.Thread):
    '''
    解析网页的类,就是对采集结果进行解析,也是多线程方式进行解析
    '''
    def __init__(self,thread_id,queue,file):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.queue = queue
        self.file = file
    def run(self):
        print('启动线程:', self.thread_id)
        while not flag:
            try:
                item = self.queue.get(False) # get参数为false时队列为空,会抛出异常
                if not item:
                    pass
                self.parse_data(item)
                self.queue.task_done() # 每当发出一次get操作,就会提示是否堵塞
            except Exception as e:
                pass
        print('退出了该线程:', self.thread_id)
    def parse_data(self,item):
        '''
        解析网页内容的函数
        :param item:
        :return:
        '''
        try:
            html = etree.HTML(item)
            result = html.xpath('//div[contains(@id,"qiushi_tag")]') # 匹配所有段子内容
            for site in result:
                try:
                    img_url = site.xpath('.//img/@src')[0] # 糗事图片
                    title = site.xpath('.//h2')[0].text # 糗事题目
                    content = site.xpath('.//div[@class="content"]/span')[0].text.strip() # 糗事内容
                    response={
                        'img_url':img_url,
                        'title':title,
                        'content':content
                    } #构造json
                    json.dump(response,fp=self.file,ensure_ascii=False) # 存放json文件
                except Exception as e:
                    print('parse 2: ', e)
        except Exception as e:
            print('parse 1: ',e)
data_queue = Queue() # 存放解析数据的queue
flag = False
def main():
    output = open('qiushi.json','a',encoding='utf-8') # 将结果保存到一个json文件中
    pageQueue = Queue(50) # 任务队列,存放网页的队列
    for page in range(1,11):
        pageQueue.put(page) # 构造任务队列
    # 初始化采集线程
    crawl_threads = []
    crawl_name_list = ['crawl_1','crawl_2','crawl_3'] # 总共构造3个爬虫线程
    for thread_id in crawl_name_list:
        thread = Crawl_thread(thread_id,pageQueue) # 启动爬虫线程
        thread.start() # 启动线程
        crawl_threads.append(thread)
    # 初始化解析线程
    parse_thread = []
    parser_name_list = ['parse_1','parse_2','parse_3']
    for thread_id in parser_name_list: #
        thread = Parser_thread(thread_id,data_queue,output)
        thread.start() # 启动线程
        parse_thread.append(thread)
    # 等待队列情况,先进行网页的抓取
    while not pageQueue.empty(): # 判断是否为空
        pass # 不为空,则继续阻塞
    # 等待所有线程结束
    for t in crawl_threads:
        t.join()
    # 等待队列情况,对采集的页面队列中的页面进行解析,等待所有页面解析完成
    while not data_queue.empty():
        pass
    # 通知线程退出
    global flag
    flag = True
    for t in parse_thread:
        t.join() # 等待所有线程执行到此处再继续往下执行
    print('退出主线程')
    output.close()
if __name__ == '__main__':
    main()