爬虫-多线程爬虫

线程回顾

引入

多任务,多个任务同时进行,如何解决该问题?(2种方式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time


def sing():
for x in range(1, 6):
print('我在唱什么')
time.sleep(1)


def dance():
for x in range(1, 6):
print('我在跳hiphop舞')
time.sleep(1)


def main():
sing()
dance()


if __name__ == '__main__':
main()

  1. 多进程:电脑上同时打开sublime、录屏、vnc服务器
  2. 多线程:
    • 在word文档中同时编辑、检查(多线程)
    • 在qq中同时语音、视频、发送消息(多线程)

创建线程Thread(2种方式)

  1. 面向过程

    1
    2
    3
    4
    5
    6
    7
    t = threading.Thread(target=xxx, name=xxx, args=(xx, xx))
    target: 线程启动之后要执行的函数
    name: 线程的名字
    获取线程名字: threading.current_thread().name
    args: 主线程向子线程传递参数
    t.start(): 启动线程
    t.join(): 让主线程等待子线程结束
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    import time
    import threading


    def sing(a):
    print(f'线程为:{threading.current_thread().name};接收过来的参数为:{a}')
    for x in range(1, 6):
    print('我在唱舞娘')
    time.sleep(1)


    def dance(b):
    print(f'线程为:{threading.current_thread().name};接收过来的参数为:{b}')
    for x in range(1, 6):
    print('我在跳钢管舞')
    time.sleep(1)


    # 一个主线程、两个子线程(唱歌线程、跳舞线程)
    def main():
    a, b = '孙悟空', '猪八戒'
    # 创建唱歌线程
    tsing = threading.Thread(target=sing, name='唱歌', args=(a,))
    # 创建跳舞线程
    tdance = threading.Thread(target=dance, name='跳舞', args=(b,))
    # 启动线程
    tsing.start()
    tdance.start()
    # 让主线程等待子线程结束之后在结束
    tsing.join()
    tdance.join()
    # 这里是主线程在运行
    print(f'这里是主线程:{threading.current_thread().name}')


    if __name__ == '__main__':
    main()

  2. 面向对象

    定义一个类,继承自threading.Thread,重写一个方法run方法,需要线程名字、传递参数,重写构造方法,在重写构造方法的时候,一定要注意手动调用父类的构造方法。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    import time
    import threading


    # 写一个类,继承自threading.Thread
    class SingThread(threading.Thread):
    def __init__(self, name, a):
    super().__init__()
    self.name = name
    self.a = a

    def run(self):
    print(f'线程的名字是:{self.name},接收过来的参数为:{self.a}')
    for x in range(1, 6):
    print('我在唱七里香')
    time.sleep(1)


    class DanceThread(threading.Thread):
    def __init__(self, name, b):
    super().__init__()
    self.name = name
    self.b = b

    def run(self):
    print(f'线程的名字是:{self.name},接收过来的参数为:{self.b}')
    for x in range(1, 6):
    print('我在跳广场舞')
    time.sleep(1)


    def main():
    a, b = '孙悟空', '猪八戒'
    # 创建线程
    tsing = SingThread('唱歌', a)
    tdance = DanceThread('跳舞', b)

    # 启动线程
    tsing.start()
    tdance.start()

    # 让主线程等待子线程结束后再结束
    tsing.join()
    tdance.join()

    print('主线程和子线程全部结束!')


    if __name__ == '__main__':
    main()

线程同步

  • 线程之间共享全局变量,很容易发生数据的紊乱问题,这个时候要使用线程锁;抢,谁抢到,谁先上锁之后,谁就先使用
  • 创建锁:suo = threading.Lock()
  • 上锁:suo.acquire()
  • 释放锁:suo.release()

队列(queue)

  • 下载线程
  • 解析线程,通过队列进行交互
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    # 创建对列
    q = Queue(5)

    # 存储数据
    q.put('xxx') 如果队列已满,程序卡在这里等待
    q.put(xxx, False) 如果队列已满,程序直接报错
    q.put(xxx, True, 3) 如果队列已满,程序等待3s再报错

    # 取数据,先进先出
    q.get() 如果队列为空,程序卡在这里等待
    q.get(False) 如果队列为空,程序直接报错
    q.get(True, 3) 如果队列为空,程序等待3s报错

    q.empty() 判断队列是否为空
    q.full() 判断队列是否已满
    q.qsize() 获取队列长度
  • 示例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    from queue import Queue


    def main():
    # 创建对列
    q = Queue(5)
    # 判断队列是否为空
    print(q.empty()) # True

    # 存储数据
    q.put('科比')
    q.put('勒布朗')
    q.put('JR')
    q.put('汤普森')
    # 获取队列长度
    print(q.qsize()) # 4
    q.put('love')
    # 判断队列是否已满
    print(q.full()) # True
    # 如果队列已满,程序等待3s再报错
    # q.put('乔治希尔', True, 3) # queue.Full
    print(q)

    # 取数据,先进先出
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    # 如果队列为空,程序等待3s报错
    # print(q.get(True, 3)) # queue.Empty


    if __name__ == '__main__':
    main()

多线程爬虫

分析

  • 两类线程:下载(3)、解析(3)
  • 内容队列:下载线程往队列中put数据,解析线程从队列get数据
  • url队列:下载线程从url队列get数据
  • 写数据:上锁

图示

示例:爬取贱图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import time
import json
import requests
import threading
from lxml import etree
from queue import Queue

# 用来存放采集线程
g_crawl_list = []
# 用来存放解析线程
g_parse_list = []


class CrawlThread(threading.Thread):
def __init__(self, name, page_queue, data_queue):
super().__init__()
self.name = name
self.page_queue = page_queue
self.data_queue = data_queue
self.url = 'http://www.ifanjian.net/jiantu-{}'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36'
}

def run(self):
print(f'{self.name}------线程启动')
while 1:
# 判断采集线程何时退出
if self.page_queue.empty():
break
# 从队列中取出页码
page = self.page_queue.get()
# 拼接url,发送请求
url = self.url.format(page)
r = requests.get(url, headers=self.headers)
# 将响应内容存放在data_queue中
self.data_queue.put(r.text)
print(f'{self.name}======线程结束')


class ParserThread(threading.Thread):
def __init__(self, name, page_queue, data_queue, fp, lock):
super().__init__()
self.name = name
self.page_queue = page_queue
self.data_queue = data_queue
self.fp = fp
self.lock = lock

def parse_content(self, data):
tree = etree.HTML(data)
# 先查找所有的li,在从li里边找自己的标题和url
li_list = tree.xpath('//ul[@class="cont-list"]/li')
items = []
for oli in li_list:
print('*' * 60)
# 获取标题
title = oli.xpath('./h2/a/text()')[0]
print(title)
# 获取图片url,懒加载
try:
image_url = oli.xpath('./div[@class="cont-list-main"]/p[2]/img/@data-src')[0]
except IndexError as e:
print('#' * 60)
image_url = oli.xpath('./div[@class="cont-list-main"]/p[2]/img/@src')[0]
print(image_url)
else:
print(image_url)
item = {
'标题': title,
'链接': image_url,
}
items.append(item)
# 写到文件中
self.lock.acquire()
self.fp.write(json.dumps(items, ensure_ascii=False) + '\n')
self.lock.release()

def run(self):
print(f'{self.name}------线程启动')
while 1:
# 判断解析线程何时退出
if self.page_queue.empty():
time.sleep(5)
if self.data_queue.empty():
break
# 从data_queue中取出一页数据
data = self.data_queue.get()
# print(data)
# 解析内容
self.parse_content(data)
print(f'{self.name}======线程结束')


def create_queue():
# 创建页码对列
page_queue = Queue()
for page in range(1, 51):
page_queue.put(page)

# 创建内容对列
data_queue = Queue()
return page_queue, data_queue


# 创建采集线程
def create_crawl_thread(page_queue, data_queue):
crawl_name = ['采集线程1号', '采集线程2号', '采集线程3号']
for name in crawl_name:
# 创建一个采集线程
tcrawl = CrawlThread(name, page_queue, data_queue)
# 保存到列表中
g_crawl_list.append(tcrawl)


# 创建解析线程
def create_parse_thread(page_queue, data_queue, fp, lock):
parse_name = ['解析线程1号', '解析线程2号', '解析线程3号']
for name in parse_name:
# 创建一个解析线程
tparse = ParserThread(name, page_queue, data_queue, fp, lock)
# 保存到列表中
g_parse_list.append(tparse)


def main():
# 创建对列函数
page_queue, data_queue = create_queue()
# 打开文件
fp = open('jian.json', 'a', encoding='utf8')
# 创建锁
lock = threading.Lock()
# 创建采集线程
create_crawl_thread(page_queue, data_queue)
# 创建解析线程
create_parse_thread(page_queue, data_queue, fp, lock)
# 启动所有采集线程和解析线程
for tcrawl, tparse in zip(g_crawl_list, g_parse_list):
tcrawl.start()
tparse.start()
# 让主线程等待子线程结束再结束
for tcrawl, tparse in zip(g_crawl_list, g_parse_list):
tcrawl.join()
tparse.join()
# 关闭文件
fp.close()
print('主线程和子线程全部结束')


if __name__ == '__main__':
main()