實(shí)戰(zhàn)項(xiàng)目:簡單分布式爬蟲
本章繼續(xù)實(shí)戰(zhàn)項(xiàng)目,介紹如何打造分布式爬蟲,這對(duì)初學(xué)者來說是一個(gè)不小的挑戰(zhàn),也是一次有意義的嘗試。這次打造的分布式爬蟲采用比較簡單的主從模式,完全手工打造,不使用成熟框架,基本上涵蓋了前六章的主要知識(shí)點(diǎn),其中涉及的分布式知識(shí)點(diǎn)是分布式進(jìn)程和進(jìn)程間通信的內(nèi)容,算是對(duì)Python爬蟲基礎(chǔ)篇的總結(jié)。
目前,大型的爬蟲系統(tǒng)都采取分布式爬取結(jié)構(gòu),通過此次實(shí)戰(zhàn)項(xiàng)目,大家會(huì)對(duì)分布式爬蟲有一個(gè)比較清晰的了解,為之后系統(tǒng)地學(xué)習(xí)分布式爬蟲打下基礎(chǔ)。實(shí)戰(zhàn)目標(biāo):爬取2000個(gè)百度百科網(wǎng)絡(luò)爬蟲詞條以及相關(guān)詞條的標(biāo)題、摘要和鏈接等信息,采用分布式結(jié)構(gòu)改寫的基礎(chǔ)爬蟲,使其功能更加強(qiáng)大。
簡單分布式爬蟲結(jié)構(gòu)
本次分布式爬蟲采用主從模式。主從模式是指由一臺(tái)主機(jī)作為控制節(jié)點(diǎn),負(fù)責(zé)管理所有運(yùn)行網(wǎng)絡(luò)爬蟲的主機(jī),爬蟲只需要從控制節(jié)點(diǎn)那里接收任務(wù),并把新生成任務(wù)提交給控制節(jié)點(diǎn)就可以了,在這個(gè)過程中不必與其他爬蟲通信,這種方式實(shí)現(xiàn)簡單、利于管理。而控制節(jié)點(diǎn)則需要與所有爬蟲進(jìn)行通信,因此可以看到主從模式是有缺陷的,控制節(jié)點(diǎn)會(huì)成為整個(gè)系統(tǒng)的瓶頸,容易導(dǎo)致整個(gè)分布式網(wǎng)絡(luò)爬蟲系統(tǒng)性能下降。
此次使用三臺(tái)主機(jī)進(jìn)行分布式爬取,一臺(tái)主機(jī)作為控制節(jié)點(diǎn),另外兩臺(tái)主機(jī)作為爬蟲節(jié)點(diǎn)。爬蟲結(jié)構(gòu)如圖7-1所示。
控制節(jié)點(diǎn)
控制節(jié)點(diǎn)(ControlNode)主要分為URL管理器、數(shù)據(jù)存儲(chǔ)器和控制調(diào)度器??刂普{(diào)度器通過三個(gè)進(jìn)程來協(xié)調(diào)URL管理器和數(shù)據(jù)存儲(chǔ)器的工作:一個(gè)是URL管理進(jìn)程,負(fù)責(zé)URL的管理和將URL傳遞給爬蟲節(jié)點(diǎn);一個(gè)是數(shù)據(jù)提取進(jìn)程,負(fù)責(zé)讀取爬蟲節(jié)點(diǎn)返回的數(shù)據(jù),將返回?cái)?shù)據(jù)中的URL交給URL管理進(jìn)程,將標(biāo)題和摘要等數(shù)據(jù)交給數(shù)據(jù)存儲(chǔ)進(jìn)程;最后一個(gè)是數(shù)據(jù)存儲(chǔ)進(jìn)程,負(fù)責(zé)將數(shù)據(jù)提取進(jìn)程中提交的數(shù)據(jù)進(jìn)行本地存儲(chǔ)。執(zhí)行流程如圖7-2所示。
圖7-1 主從爬蟲結(jié)構(gòu)圖7-2 控制節(jié)點(diǎn)執(zhí)行流程
URL管理器
參考(Python)基礎(chǔ)爬蟲架構(gòu)及運(yùn)行流程,我們對(duì)URL管理器做了一些優(yōu)化。我們采用set內(nèi)存去重的方式,如果直接存儲(chǔ)大量的URL鏈接,尤其是URL鏈接很長的時(shí)候,很容易造成內(nèi)存溢出,所以我們將爬取過的URL進(jìn)行MD5處理。字符串經(jīng)過MD5處理后的信息摘要長度為128位,將生成的MD5摘要存儲(chǔ)到set后,可以減少好幾倍的內(nèi)存消耗,不過Python中的MD5算法生成的是256位,取中間的128位即可。我們同時(shí)添加了save_progress和load_progress方法進(jìn)行序列化的操作,將未爬取URL集合和已爬取的URL集合序列化到本地,保存當(dāng)前的進(jìn)度,以便下次恢復(fù)狀態(tài)。URL管理器URLManager.py代碼如下:
# coding:utf-8 import cPickle
import hashlib
class UrlManager(object):
def __init__(self):
self.new_urls = self.load_progress('new_urls.txt')# 未爬取URL集合self.old_urls = self.load_progress('old_urls.txt')# 已爬取URL集合
def has_new_url(self):
'''
判斷是否有未爬取的URL :return:
'''
return self.new_url_size()!=0 def get_new_url(self):
'''
獲取一個(gè)未爬取的URL :return:
'''
new_url = self.new_urls.pop()
m = hashlib.md5()
m.update(new_url)
self.old_urls.add(m.hexdigest()[8:-8])
return new_url
def add_new_url(self,url):
'''
'''
return len(self.new_urls)
def old_url_size(self):
'''
獲取已經(jīng)爬取URL集合的大小:return:
'''
return len(self.old_urls)
def save_progress(self,path,data):
'''
保存進(jìn)度:param path:文件路徑
:param data:數(shù)據(jù)
:return:
'''
with open(path, 'wb') as f:
cPickle.dump(data, f)
def load_progress(self,path):
'''
從本地文件加載進(jìn)度:param path:文件路徑
:return:返回set集合
'''
print '[+] 從文件加載進(jìn)度: %s' % pathtry:
with open(path, 'rb') as f:
tmp = cPickle.load(f)
return tmp
except:
print '[!] 無進(jìn)度文件, 創(chuàng)建: %s' % path
return set()
數(shù)據(jù)存儲(chǔ)器
數(shù)據(jù)存儲(chǔ)器的內(nèi)容基本上和第6章的一樣,不過生成的文件按照當(dāng)前時(shí)間進(jìn)行命名,以避免重復(fù),同時(shí)對(duì)文件進(jìn)行緩存寫入。代碼如下:
# coding:utf-8 import codecs
import time
class DataOutput(object):
def __init__(self):
self.filepath='baike_%s.html'%(time.strftime("%Y_%m_%d_%H_%M_%
S", time. localtime()) )
self.output_head(self.filepath)
self.datas=[] def store_data(self,data):
if data is None:
returnself.datas.append(data)
if len(self.datas)>10:
self.output_html(self.filepath)
def output_head(self,path):
'''
將HTML頭寫進(jìn)去:return:
'''
fout=codecs.open(path,'w',encoding='utf-8')
fout.write("<html>")
fout.write("<body>")
fout.write("<table>")
fout.close()
def output_html(self,path):
'''
將數(shù)據(jù)寫入HTML文件中:param path: 文件路徑
:return:
'''
fout=codecs.open(path,'a',encoding='utf-8')
for data in self.datas:
fout.write("<tr>")
fout.write("<td>%s</td>"%data['url'])
fout.write("<td>%s</td>"%data['title'])fout.write("<td>%s</td>"%data['summary'])
fout.write("</tr>")
self.datas.remove(data)
fout.close()
def ouput_end(self,path):
'''
輸出HTML結(jié)束:param path: 文件存儲(chǔ)路徑
:return:
'''
fout=codecs.open(path,'a',encoding='utf-8')
fout.write("</table>")
fout.write("</body>")
fout.write("</html>")
fout.close()
控制調(diào)度器
控制調(diào)度器主要是產(chǎn)生并啟動(dòng)URL管理進(jìn)程、數(shù)據(jù)提取進(jìn)程和數(shù)據(jù)存儲(chǔ)進(jìn)程,同時(shí)維護(hù)4個(gè)隊(duì)列保持進(jìn)程間的通信,分別為url_queue、result_queue、conn_q、store_q。4個(gè)隊(duì)列說明如下:
·url_q隊(duì)列是URL管理進(jìn)程將URL傳遞給爬蟲節(jié)點(diǎn)的通道。
·result_q隊(duì)列是爬蟲節(jié)點(diǎn)將數(shù)據(jù)返回給數(shù)據(jù)提取進(jìn)程的通道。
·conn_q隊(duì)列是數(shù)據(jù)提取進(jìn)程將新的URL數(shù)據(jù)提交給URL管理進(jìn)程的通道。
·store_q隊(duì)列是數(shù)據(jù)提取進(jìn)程將獲取到的數(shù)據(jù)交給數(shù)據(jù)存儲(chǔ)進(jìn)程的通道。
因?yàn)橐凸ぷ鞴?jié)點(diǎn)進(jìn)行通信,所以分布式進(jìn)程必不可少。創(chuàng)建一個(gè)分布式管理器,定義為start_manager方法。方法代碼如下:
def start_Manager(self,url_q,result_q):
'''
創(chuàng)建一個(gè)分布式管理器:param url_q: url隊(duì)列
:param result_q: 結(jié)果隊(duì)列
:return:
'''
# 把創(chuàng)建的兩個(gè)隊(duì)列注冊(cè)在網(wǎng)絡(luò)上,利用register方法,callable參數(shù)關(guān)聯(lián)了Queue對(duì)象,
# 將Queue對(duì)象在網(wǎng)絡(luò)中暴露
BaseManager.register('get_task_queue',callable=lambda:url_q)
BaseManager.register('get_result_queue',callable=lambda:result_q)
# 綁定端口8001,設(shè)置驗(yàn)證口令“baike”。這個(gè)相當(dāng)于對(duì)象的初始化manager=BaseManager(address=('',8001),authkey='baike')
# 返回manager對(duì)象return manager
URL管理進(jìn)程將從conn_q隊(duì)列獲取到的新URL提交給URL管理器,經(jīng)過去重之后,取出URL放入url_queue隊(duì)列中傳遞給爬蟲節(jié)點(diǎn),代碼如下:
def url_manager_proc(self,url_q,conn_q,root_url):
url_manager = UrlManager()
url_manager.add_new_url(root_url)
while True:
while(url_manager.has_new_url()):
# 從URL管理器獲取新的URL new_url = url_manager.get_new_url()
# 將新的URL發(fā)給工作節(jié)點(diǎn)url_q.put(new_url)
print 'old_url=',url_manager.old_url_size()
# 加一個(gè)判斷條件,當(dāng)爬取2000個(gè)鏈接后就關(guān)閉,并保存進(jìn)度if(url_manager.old_url_size()>2000):
# 通知爬行節(jié)點(diǎn)工作結(jié)束url_q.put('end')
print '控制節(jié)點(diǎn)發(fā)起結(jié)束通知!'
# 關(guān)閉管理節(jié)點(diǎn),同時(shí)存儲(chǔ)set狀態(tài)url_manager.save_progress('new_urls.txt',url_manager.new_urls)
url_manager.save_progress('old_urls.txt',url_manager.old_urls)
return
# 將從result_solve_proc獲取到的URL添加到URL管理器try:
if not conn_q.empty():
urls = conn_q.get()url_manager.add_new_urls(urls)
except BaseException,e:
time.sleep(0.1)# 延時(shí)休息
數(shù)據(jù)提取進(jìn)程從result_queue隊(duì)列讀取返回的數(shù)據(jù),并將數(shù)據(jù)中的URL添加到conn_q隊(duì)列交給URL管理進(jìn)程,將數(shù)據(jù)中的文章標(biāo)題和摘要添加到store_q隊(duì)列交給數(shù)據(jù)存儲(chǔ)進(jìn)程。代碼如下:
def result_solve_proc(self,result_q,conn_q,store_q):
while(True):
try:
if not result_q.empty():
content = result_q.get(True)
if content['new_urls']=='end':
# 結(jié)果分析進(jìn)程接收通知然后結(jié)束print '結(jié)果分析進(jìn)程接收通知然后結(jié)束!'
store_q.put('end')
return
conn_q.put(content['new_urls'])# url 為set 類型store_q.put(content['data'])# 解析出來的數(shù)據(jù)為dict 類型
else:
time.sleep(0.1)# 延時(shí)休息except BaseException,e:
time.sleep(0.1)# 延時(shí)休息
數(shù)據(jù)存儲(chǔ)進(jìn)程從store_q隊(duì)列中讀取數(shù)據(jù),并調(diào)用數(shù)據(jù)存儲(chǔ)器進(jìn)行數(shù)據(jù)存儲(chǔ)。代碼如下:
def store_proc(self,store_q):
output = DataOutput()
while True:
if not store_q.empty():
data = store_q.get()
if data=='end':
print '存儲(chǔ)進(jìn)程接受通知然后結(jié)束!'
output.ouput_end(output.filepath)
return
output.store_data(data)
else:
time.sleep(0.1)
最后啟動(dòng)分布式管理器、URL管理進(jìn)程、數(shù)據(jù)提取進(jìn)程和數(shù)據(jù)存儲(chǔ)進(jìn)程,并初始化4個(gè)隊(duì)列。代碼如下:
if __name__=='__main__':
# 初始化4個(gè)隊(duì)列url_q = Queue()
result_q = Queue()
store_q = Queue()
conn_q = Queue()
# 創(chuàng)建分布式管理器node = NodeManager()
manager = node.start_Manager(url_q,result_q)
# 創(chuàng)建URL管理進(jìn)程、 數(shù)據(jù)提取進(jìn)程和數(shù)據(jù)存儲(chǔ)進(jìn)程url_manager_proc = Process(target=node.url_manager_proc, args=(url_q,
conn_q, 'http://baike.baidu.com/view/284853.htm',))
result_solve_proc = Process(target=node.result_solve_proc, args=(result_q, conn_q,store_q,))
store_proc = Process(target=node.store_proc, args=(store_q,))
# 啟動(dòng)3個(gè)進(jìn)程和分布式管理器url_manager_proc.start()
result_solve_proc.start()
store_proc.start()
manager.get_server().serve_forever()
爬蟲節(jié)點(diǎn)
爬蟲節(jié)點(diǎn)(SpiderNode)相對(duì)簡單,主要包含HTML下載器、HTML解析器和爬蟲調(diào)度器。執(zhí)行流程如下:
·爬蟲調(diào)度器從控制節(jié)點(diǎn)中的url_q隊(duì)列讀取URL。
·爬蟲調(diào)度器調(diào)用HTML下載器、HTML解析器獲取網(wǎng)頁中新的URL和標(biāo)題摘要。
·爬蟲調(diào)度器將新的URL和標(biāo)題摘要傳入result_q隊(duì)列交給控制節(jié)點(diǎn)。
HTML下載器
HTML下載器的代碼和第6章的一致,只要注意網(wǎng)頁編碼即可。
代碼如下:
# coding:utf-8 import requests
class HtmlDownloader(object):
def download(self,url):
if url is None:
return None
user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
headers={'User-Agent':user_agent} r = requests.get(url,headers=headers)
if r.status_code==200:
r.encoding='utf-8'
return r.text
return None
HTML解析器
HTML解析器的代碼和第6章的一致,詳細(xì)的網(wǎng)頁分析過程可以回顧第6章。代碼如下:
# coding:utf-8 import re
import urlparse
from bs4 import BeautifulSoup
class HtmlParser(object):
def parser(self,page_url,html_cont):
'''
用于解析網(wǎng)頁內(nèi)容,抽取URL和數(shù)據(jù):param page_url: 下載頁面的URL :param html_cont: 下載的網(wǎng)頁內(nèi)容
:return:返回URL和數(shù)據(jù)
'''
if page_url is None or html_cont is None:
return
soup = BeautifulSoup(html_cont,'html.parser',from_encoding='utf-8')
new_urls = self._get_new_urls(page_url,soup)
new_data = self._get_new_data(page_url,soup)
return new_urls,new_data
def _get_new_urls(self,page_url,soup):
'''
抽取新的URL集合:param page_url: 下載頁面的URL :param soup:soup
:return: 返回新的URL集合'''
new_urls = set()
# 抽取符合要求的a標(biāo)記links = soup.find_all('a',href=re.compile(r'/view/\d+\.htm'))
for link in links:
# 提取href屬性new_url = link['href'] # 拼接成完整網(wǎng)址
new_full_url = urlparse.urljoin(page_url,new_url)
new_urls.add(new_full_url)
return new_urls
def _get_new_data(self,page_url,soup):
'''
抽取有效數(shù)據(jù):param page_url:下載頁面的URL :param soup:
:return:返回有效數(shù)據(jù)'''
data={} data['url']=page_url
title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1')
data['title']=title.get_text()
summary = soup.find('div',class_='lemma-summary')
# 獲取tag中包含的所有文本內(nèi)容,包括子孫tag中的內(nèi)容,并將結(jié)果作為Unicode字符串返回
data['summary']=summary.get_text()
return data
爬蟲調(diào)度器
爬蟲調(diào)度器需要用到分布式進(jìn)程中工作進(jìn)程的代碼,具體內(nèi)容可以參考第1章的分布式進(jìn)程章節(jié)。爬蟲調(diào)度器需要先連接上控制節(jié)點(diǎn),然后從url_q隊(duì)列中獲取URL,下載并解析網(wǎng)頁,接著將獲取的數(shù)據(jù)交給result_q隊(duì)列并返回給控制節(jié)點(diǎn),代碼如下:
class SpiderWork(object):
def __init__(self):
# 初始化分布式進(jìn)程中工作節(jié)點(diǎn)的連接工作# 實(shí)現(xiàn)第一步:使用BaseManager注冊(cè)用于獲取Queue的方法名稱
BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')
# 實(shí)現(xiàn)第二步:連接到服務(wù)器server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 注意保持端口和驗(yàn)證口令與服務(wù)進(jìn)程設(shè)置的完全一致self.m = BaseManager(address=(server_addr, 8001), authkey='baike')
# 從網(wǎng)絡(luò)連接self.m.connect()
# 實(shí)現(xiàn)第三步:獲取Queue的對(duì)象self.task = self.m.get_task_queue()
self.result = self.m.get_result_queue()
# 初始化網(wǎng)頁下載器和解析器self.downloader = HtmlDownloader()
self.parser = HtmlParser()
print 'init finish'def crawl(self):
while(True):
try:
if not self.task.empty():
url = self.task.get()
if url =='end':
print '控制節(jié)點(diǎn)通知爬蟲節(jié)點(diǎn)停止工作...'
# 接著通知其他節(jié)點(diǎn)停止工作self.result.put({'new_urls':'end','data':'end'})
return
print '爬蟲節(jié)點(diǎn)正在解析:%s'%url.encode('utf-8')
content = self.downloader.download(url)
new_urls,data = self.parser.parser(url,content)
self.result.put({"new_urls":new_urls,"data":data})
except EOFError,e:
print "連接工作節(jié)點(diǎn)失敗"
return
except Exception,e:
print e
print 'Crawl fali '
if __name__=="__main__":
spider = SpiderWork()
spider.crawl()
在爬蟲調(diào)度器中設(shè)置了一個(gè)本地IP 127.0.0.1,大家可以在一臺(tái)機(jī)器上測(cè)試代碼的正確性。當(dāng)然也可以使用三臺(tái)VPS服務(wù)器,兩臺(tái)運(yùn)行爬蟲節(jié)點(diǎn)程序,將IP改為控制節(jié)點(diǎn)主機(jī)的公網(wǎng)IP,一臺(tái)運(yùn)行控制節(jié)點(diǎn)程序,進(jìn)行分布式爬取,這樣更貼近真實(shí)的爬取環(huán)境。圖7-3為最終爬取的數(shù)據(jù),圖7-4為new_urls.txt的內(nèi)容,圖7-5為old_urls.txt的內(nèi)容,大家可以進(jìn)行對(duì)比測(cè)試,這個(gè)簡單的分布式爬蟲還有很大的發(fā)揮空間,希望大家發(fā)揮自己的聰明才智進(jìn)一步完善。
圖7-3 最終爬取的數(shù)據(jù)圖7-4 new_urls.txt
圖7-5 old_urls.txt
小結(jié)
本章講解了一個(gè)簡單的分布式爬蟲結(jié)構(gòu),主要目的是幫助大家對(duì)Python爬蟲基礎(chǔ)篇的知識(shí)進(jìn)行總結(jié)和強(qiáng)化,開拓思維,同時(shí)也讓大家知道分布式爬蟲并不是高不可攀。不過當(dāng)你親手打造一個(gè)分布式爬蟲后,就會(huì)知道分布式爬蟲的難點(diǎn)在于節(jié)點(diǎn)的調(diào)度,什么樣的結(jié)構(gòu)能讓各個(gè)節(jié)點(diǎn)穩(wěn)定高效地運(yùn)作才是分布式爬蟲要考慮的核心內(nèi)容。到本章為止,Python爬蟲基礎(chǔ)篇已經(jīng)結(jié)束,這個(gè)時(shí)候大家基本上可以編寫簡單的爬蟲,爬取一些靜態(tài)網(wǎng)站的內(nèi)容,但是Python爬蟲開發(fā)不僅如此,大家接著往下學(xué)習(xí)吧。
如對(duì)本文有疑問,請(qǐng)?zhí)峤坏浇涣髡搲?,廣大熱心網(wǎng)友會(huì)為你解答!! 點(diǎn)擊進(jìn)入論壇