1.4 進程和線程
在爬蟲開發(fā)中,進程和線程的概念是非常重要的。提高爬蟲的工作效率,打造分布式爬蟲,都離不開進程和線程的身影。本節(jié)將從多
進程、多線程、協(xié)程和分布式進程等四個方面,幫助大家回顧Python
語言中進程和線程中的常用操作,以便在接下來的爬蟲開發(fā)中靈活運用進程和線程。
1.4.1 多進程
Python實現(xiàn)多進程的方式主要有兩種,一種方法是使用os模塊中的fork方法,另一種方法是使用multiprocessing模塊。這兩種方法的區(qū)別在于前者僅適用于Unix/Linux操作系統(tǒng),對Windows不支持,后者則是跨平臺的實現(xiàn)方式。由于現(xiàn)在很多爬蟲程序都是運行在Unix/
Linux操作系統(tǒng)上,所以本節(jié)對兩種方式都進行講解。
1.使用os模塊中的fork方式實現(xiàn)多進程
Python的os模塊封裝了常見的系統(tǒng)調用,其中就有fork方法。
fork方法來自于Unix/Linux操作系統(tǒng)中提供的一個fork系統(tǒng)調用,這個方法非常特殊。普通的方法都是調用一次,返回一次,而fork方法是調用一次,返回兩次,原因在于操作系統(tǒng)將當前進程(父進程)復制出一份進程(子進程),這兩個進程幾乎完全相同,于是fork方法分別在父進程和子進程中返回。子進程中永遠返回0,父進程中返回的是子進程的ID。下面舉個例子,對Python使用fork方法創(chuàng)建進程進行講解。其中os模塊中的getpid方法用于獲取當前進程的ID,getppid方法用于獲取父進程的ID。代碼如下:
import os
if __name__ == '__main__':
print 'current Process (%s) start ...'%(os.getpid())pid = os.fork()
if pid < 0:
print 'error in fork'
elif pid == 0:
print 'I am child process(%s) and my parent process is (%s)',(os.getpid(), os.getppid())
else:
print 'I(%s) created a chlid process (%s).',(os.getpid(),pid)
運行結果如下:
current Process (3052) start ...
I(3052) created a chlid process (3053).
I am child process(3053) and my parent process is (3052)
2.使用multiprocessing模塊創(chuàng)建多進程
multiprocessing模塊提供了一個Process類來描述一個進程對象。
創(chuàng)建子進程時,只需要傳入一個執(zhí)行函數(shù)和函數(shù)的參數(shù),即可完成一個Process實例的創(chuàng)建,用start()方法啟動進程,用join()方法實現(xiàn)進程間的同步。下面通過一個例子來演示創(chuàng)建多進程的流程,代碼如下:
import os
from multiprocessing import Process# 子進程要執(zhí)行的代碼
def run_proc(name):
print 'Child process %s (%s) Running...' % (name, os.getpid())
if __name__ == '__main__':
print 'Parent process %s.' % os.getpid()
for i in range(5):
p = Process(target=run_proc, args=(str(i),))
print 'Process will start.'
p.start()
p.join()
print 'Process end.'
運行結果如下:
Parent process 2392.
Process will start.
Process will start.
Process will start.
Process will start.
Process will start.
Child process 2 (10748) Running...
Child process 0 (5324) Running...
Child process 1 (3196) Running...
Child process 3 (4680) Running...
Child process 4 (10696) Running...
Process end.
以上介紹了創(chuàng)建進程的兩種方法,但是要啟動大量的子進程,使用進程池批量創(chuàng)建子進程的方式更加常見,因為當被操作對象數(shù)目不大時,可以直接利用multiprocessing中的Process動態(tài)生成多個進程,如果是上百個、上千個目標,手動去限制進程數(shù)量卻又太過繁瑣,這時候進程池Pool發(fā)揮作用的時候就到了。
3.multiprocessing模塊提供了一個Pool類來代表進程池對象
Pool可以提供指定數(shù)量的進程供用戶調用,默認大小是CPU的核數(shù)。當有新的請求提交到Pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求;但如果池中的進程數(shù)已經達到規(guī)定最大值,那么該請求就會等待,直到池中有進程結束,才會創(chuàng)建新的進程來處理它。下面通過一個例子來演示進程池的工作流程,代碼如下:
from multiprocessing import Pool
import os, time, random
def run_task(name):
print 'Task %s (pid = %s) is running...' % (name, os.getpid())
time.sleep(random.random() * 3)
print 'Task %s end.' % name
if __name__=='__main__':
print 'Current process %s.' % os.getpid()
p = Pool(processes=3)
for i in range(5):
p.apply_async(run_task, args=(i,))print 'Waiting for all subprocesses done...'
p.close()
p.join()
print 'All subprocesses done.'
運行結果如下:
Current process 9176.
Waiting for all subprocesses done...
Task 0 (pid = 11012) is running...
Task 1 (pid = 12464) is running...
Task 2 (pid = 11260) is running...
Task 2 end.
Task 3 (pid = 11260) is running...
Task 0 end.
Task 4 (pid = 11012) is running...
Task 1 end.
Task 3 end.
Task 4 end.
All subprocesses done.
上述程序先創(chuàng)建了容量為3的進程池,依次向進程池中添加了5
個任務。從運行結果中可以看到雖然添加了5個任務,但是一開始只運行了3個,而且每次最多運行3個進程。當一個任務結束了,新的任務依次添加進來,任務執(zhí)行使用的進程依然是原來的進程,這一點
通過進程的pid就可以看出來。
注意 Pool對象調用join()方法會等待所有子進程執(zhí)行完畢,調用join()之前必須先調用close(),調用close()之后就不能繼續(xù)添加新的Process了。
4.進程間通信
假如創(chuàng)建了大量的進程,那進程間通信是必不可少的。Python提供了多種進程間通信的方式,例如Queue、Pipe、Value+Array等。本節(jié)主要講解Queue和Pipe這兩種方式。Queue和Pipe的區(qū)別在于Pipe常用來在兩個進程間通信,Queue用來在多個進程間實現(xiàn)通信。
首先講解一下Queue通信方式。Queue是多進程安全的隊列,可以使用Queue實現(xiàn)多進程之間的數(shù)據(jù)傳遞。有兩個方法:Put和Get可以進行Queue操作:
·Put方法用以插入數(shù)據(jù)到隊列中,它還有兩個可選參數(shù):blocked和timeout。如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
·Get方法可以從隊列讀取并且刪除一個元素。同樣,Get方法有兩個可選參數(shù):blocked和timeout。如果blocked為True(默認值),并且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,分兩種情況:如果Queue有一個值可用,則立即返回該值;否則,如果隊列為空,則立即拋出Queue.Empty異常。
下面通過一個例子進行說明:在父進程中創(chuàng)建三個子進程,兩個子進程往Queue中寫入數(shù)據(jù),一個子進程從Queue中讀取數(shù)據(jù)。程序示例如下:
from multiprocessing import Process, Queue
import os, time, random
# 寫數(shù)據(jù)進程執(zhí)行的代碼:
def proc_write(q,urls):
print('Process(%s) is writing...' % os.getpid())
for url in urls:
q.put(url)
print('Put %s to queue...' % url)
time.sleep(random.random())
# 讀數(shù)據(jù)進程執(zhí)行的代碼:
def proc_read(q):
print('Process(%s) is reading...' % os.getpid())
while True:
url = q.get(True)
print('Get %s from queue.' % url)
if __name__=='__main__':
def proc_send(pipe,urls):
for url in urls:
print "Process(%s) send: %s" %(os.getpid(),url)
pipe.send(url)
time.sleep(random.random())
def proc_recv(pipe):
while True:
print "Process(%s) rev:%s" %(os.getpid(),pipe.recv())
time.sleep(random.random())
if __name__ == "__main__":
pipe = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=proc_send, args=(pipe[0],['url_'+str(i) for i in range(10) ]))
p2 = multiprocessing.Process(target=proc_recv, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()
運行結果如下:
Process(10448) send: url_0 Process(5832) rev:url_0
Process(10448) send: url_1 Process(5832) rev:url_1 Process(10448) send: url_2 Process(5832) rev:url_2 Process(10448) send: url_3 Process(10448) send: url_4 Process(5832) rev:url_3 Process(10448) send: url_5 Process(10448) send: url_6 Process(5832) rev:url_4 Process(5832) rev:url_5 Process(10448) send: url_7 Process(10448) send: url_8 Process(5832) rev:url_6 Process(5832) rev:url_7 Process(10448) send: url_9 Process(5832) rev:url_8 Process(5832) rev:url_9
注意 以上多進程程序運行結果的打印順序在不同的系統(tǒng)和硬件條件下略有不同。
1.4.2 多線程
多線程類似于同時執(zhí)行多個不同程序,多線程運行有如下優(yōu)點:
·可以把運行時間長的任務放到后臺去處理。
·用戶界面可以更加吸引人,比如用戶點擊了一個按鈕去觸發(fā)某些事件的處理,可以彈出一個進度條來顯示處理的進度。
·程序的運行速度可能加快。
·在一些需要等待的任務實現(xiàn)上,如用戶輸入、文件讀寫和網絡收發(fā)數(shù)據(jù)等,線程就比較有用了。在這種情況下我們可以釋放一些珍貴的資源,如內存占用等。
Python的標準庫提供了兩個模塊:thread和threading,thread是低級模塊,threading是高級模塊,對thread進行了封裝。絕大多數(shù)情況下,我們只需要使用threading這個高級模塊。
1.用threading模塊創(chuàng)建多線程
threading模塊一般通過兩種方式創(chuàng)建多線程:第一種方式是把一個函數(shù)傳入并創(chuàng)建Thread實例,然后調用start方法開始執(zhí)行;第二種方式是直接從threading.Thread繼承并創(chuàng)建線程類,然后重寫__init__
方法和run方法。
首先介紹第一種方法,通過一個簡單例子演示創(chuàng)建多線程的流程,程序如下:
import random
import time, threading
# 新線程執(zhí)行的代碼:
def thread_run(urls):
print 'Current %s is running...' % threading.current_thread().namefor url in urls:
print '%s ---->>> %s' % (threading.current_thread().name,url)
time.sleep(random.random())
print '%s ended.' % threading.current_thread().name
print '%s is running...' % threading.current_thread().name
t1 = threading.Thread(target=thread_run, name='Thread_1',args=(['url_1','url_2','
url_3'],))
t2 = threading.Thread(target=thread_run, name='Thread_2',args=(['url_4','url_5','
url_6'],))
t1.start()
t2.start()
t1.join()
t2.join()
print '%s ended.' % threading.current_thread().name
運行結果如下:
MainThread is running...
Current Thread_1 is running...
Thread_1 ---->>> url_1 Current Thread_2 is running...
Thread_2 ---->>> url_4 Thread_1 ---->>> url_2Thread_2 ---->>> url_5 Thread_2 ---->>> url_6 Thread_1 ---->>> url_3 Thread_1 ended.
Thread_2 ended.
MainThread ended.
第二種方式從threading.Thread繼承創(chuàng)建線程類,下面將方法一的程序進行重寫,程序如下:
import random
import threading
import time
class myThread(threading.Thread):
def __init__(self,name,urls):
threading.Thread.__init__(self,name=name)
self.urls = urls
def run(self):
print 'Current %s is running...' % threading.current_thread().name
for url in self.urls:
print '%s ---->>> %s' % (threading.current_thread().name,url)
time.sleep(random.random())
print '%s ended.' % threading.current_thread().name
print '%s is running...' % threading.current_thread().name
t1 = myThread(name='Thread_1',urls=['url_1','url_2','url_3'])t2 = myThread(name='Thread_2',urls=['url_4','url_5','url_6'])
t1.start()
t2.start()
t1.join()
t2.join()
print '%s ended.' % threading.current_thread().name
運行結果如下:
MainThread is running...
Current Thread_1 is running...
Thread_1 ---->>> url_1 Current Thread_2 is running...
Thread_2 ---->>> url_4 Thread_2 ---->>> url_5 Thread_1 ---->>> url_2 Thread_1 ---->>> url_3 Thread_2 ---->>> url_6 Thread_2 ended.
Thread_1 ended.
2.線程同步
如果多個線程共同對某個數(shù)據(jù)修改,則可能出現(xiàn)不可預料的結果,為了保證數(shù)據(jù)的正確性,需要對多個線程進行同步。使用Thread
對象的Lock和RLock可以實現(xiàn)簡單的線程同步,這兩個對象都有
acquire方法和release方法,對于那些每次只允許一個線程操作的數(shù)據(jù),可以將其操作放到acquire和release方法之間。
對于Lock對象而言,如果一個線程連續(xù)兩次進行acquire操作,那么由于第一次acquire之后沒有release,第二次acquire將掛起線程。這會導致Lock對象永遠不會release,使得線程死鎖。RLock對象允許一個線程多次對其進行acquire操作,因為在其內部通過一個
counter變量維護著線程acquire的次數(shù)。而且每一次的acquire操作必須有一個release操作與之對應,在所有的release操作完成之后,別的線程才能申請該RLock對象。下面通過一個簡單的例子演示線程同步的過程:
import threading
mylock = threading.RLock()
num=0 class myThread(threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self,name=name)
def run(self):
global num
while True:
mylock.acquire()
print '%s locked, Number: %d'%(threading.current_thread().name,
num)
if num>=4:mylock.release()
print '%s released, Number: %d'%(threading.current_thread().
name, num)
break
num+=1 print '%s released, Number: %d'%(threading.current_thread().name, num)
mylock.release()
if __name__== '__main__':
thread1 = myThread('Thread_1')
thread2 = myThread('Thread_2')
thread1.start()
thread2.start()
運行結果如下:
Thread_1 locked, Number: 0 Thread_1 released, Number: 1 Thread_1 locked, Number: 1 Thread_1 released, Number: 2 Thread_2 locked, Number: 2 Thread_2 released, Number: 3 Thread_1 locked, Number: 3 Thread_1 released, Number: 4
Thread_2 locked, Number: 4 Thread_2 released, Number: 4 Thread_1 locked, Number: 4 Thread_1 released, Number: 4
3.全局解釋器鎖(GIL)
在Python的原始解釋器CPython中存在著GIL(Global Interpreter
Lock,全局解釋器鎖),因此在解釋執(zhí)行Python代碼時,會產生互斥鎖來限制線程對共享資源的訪問,直到解釋器遇到I/O操作或者操作次數(shù)達到一定數(shù)目時才會釋放GIL。由于全局解釋器鎖的存在,在進行多線程操作的時候,不能調用多個CPU內核,只能利用一個內核,所以在進行CPU密集型操作的時候,不推薦使用多線程,更加傾向于多進程。那么多線程適合什么樣的應用場景呢?對于IO密集型操作,多線程可以明顯提高效率,例如Python爬蟲的開發(fā),絕大多數(shù)時間爬蟲是在等待socket返回數(shù)據(jù),網絡IO的操作延時比CPU大得多。
1.4.3 協(xié)程
協(xié)程(coroutine),又稱微線程,纖程,是一種用戶級的輕量級線程。協(xié)程擁有自己的寄存器上下文和棧。協(xié)程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此協(xié)程能保留上一次調用時的狀態(tài),每次過程重入時,就相當于進入上一次調用的狀態(tài)。在并發(fā)編程中,協(xié)程與線程類似,每個協(xié)程表示一個執(zhí)行單元,有自己的本地數(shù)據(jù),與其他協(xié)程共享全局數(shù)據(jù)和其他資源。
協(xié)程需要用戶自己來編寫調度邏輯,對于CPU來說,協(xié)程其實是單線程,所以CPU不用去考慮怎么調度、切換上下文,這就省去了
CPU的切換開銷,所以協(xié)程在一定程度上又好于多線程。那么在
Python中是如何實現(xiàn)協(xié)程的呢?
Python通過yield提供了對協(xié)程的基本支持,但是不完全,而使用第三方gevent庫是更好的選擇,gevent提供了比較完善的協(xié)程支持。
gevent是一個基于協(xié)程的Python網絡函數(shù)庫,使用greenlet在libev事件循環(huán)頂部提供了一個有高級別并發(fā)性的API。主要特性有以下幾點:
·基于libev的快速事件循環(huán),Linux上是epoll機制。
·基于greenlet的輕量級執(zhí)行單元。
·API復用了Python標準庫里的內容。
·支持SSL的協(xié)作式sockets。
·可通過線程池或c-ares實現(xiàn)DNS查詢。
·通過monkey patching功能使得第三方模塊變成協(xié)作式。
gevent對協(xié)程的支持,本質上是greenlet在實現(xiàn)切換工作。
greenlet工作流程如下:假如進行訪問網絡的IO操作時,出現(xiàn)阻塞,
greenlet就顯式切換到另一段沒有被阻塞的代碼段執(zhí)行,直到原先的阻塞狀況消失以后,再自動切換回原來的代碼段繼續(xù)處理。因此,
greenlet是一種合理安排的串行方式。
由于IO操作非常耗時,經常使程序處于等待狀態(tài),有了gevent為我們自動切換協(xié)程,就保證總有greenlet在運行,而不是等待IO,這就是協(xié)程一般比多線程效率高的原因。由于切換是在IO操作時自動完成,所以gevent需要修改Python自帶的一些標準庫,將一些常見的阻塞,如socket、select等地方實現(xiàn)協(xié)程跳轉,這一過程在啟動時通過
monkey patch完成。下面通過一個的例子來演示gevent的使用流程,代碼如下:
from gevent import monkey; monkey.patch_all()
import gevent
import urllib2 def run_task(url):
print 'Visit --> %s' % url
try:
response = urllib2.urlopen(url)
data = response.read()
print '%d bytes received from %s.' % (len(data), url)
except Exception,e:
print e
if __name__=='__main__':
urls = ['https:// github.com/','https:// www.python.org/','http://www.cnblogs.com/'] greenlets = [gevent.spawn(run_task, url) for url in urls ] gevent.joinall(greenlets)
運行結果如下:
Visit --> https:// github.com/ Visit --> https:// www.python.org/ Visit --> http://www.cnblogs.com/ 45740 bytes received from http://www.cnblogs.com/.
25482 bytes received from https:// github.com/.
47445 bytes received from https:// www.python.org/.
以上程序主要用了gevent中的spawn方法和joinall方法。spawn方法可以看做是用來形成協(xié)程,joinall方法就是添加這些協(xié)程任務,并且啟動運行。從運行結果來看,3個網絡操作是并發(fā)執(zhí)行的,而且結束順序不同,但其實只有一個線程。
gevent中還提供了對池的支持。當擁有動態(tài)數(shù)量的greenlet需要進行并發(fā)管理(限制并發(fā)數(shù))時,就可以使用池,這在處理大量的網絡和IO操作時是非常需要的。接下來使用gevent中pool對象,對上面的例子進行改寫,程序如下:
from gevent import monkey
monkey.patch_all()
import urllib2 from gevent.pool import Pool
def run_task(url):
print 'Visit --> %s' % urltry:
response = urllib2.urlopen(url)
data = response.read()
print '%d bytes received from %s.' % (len(data), url)
except Exception,e:
print e
return 'url:%s --->finish'% url
if __name__=='__main__':
pool = Pool(2)
urls = ['https:// github.com/','https:// www.python.org/','http://www.cnblogs.com/'] results = pool.map(run_task,urls)
print results
運行結果如下:
Visit --> https:// github.com/ Visit --> https:// www.python.org/ 25482 bytes received from https:// github.com/.
Visit --> http://www.cnblogs.com/ 47445 bytes received from https:// www.python.org/.
45687 bytes received from http://www.cnblogs.com/.
['url:https:// github.com/ --->finish', 'url:https:// www.python.org/ --->finish', 'url:http://
www.cnblogs.com/ --->finish']
通過運行結果可以看出,Pool對象確實對協(xié)程的并發(fā)數(shù)量進行了管理,先訪問了前兩個網址,當其中一個任務完成時,才會執(zhí)行第三個。
1.4.4 分布式進程
分布式進程指的是將Process進程分布到多臺機器上,充分利用多臺機器的性能完成復雜的任務。我們可以將這一點應用到分布式爬蟲的開發(fā)中。
分布式進程在Python中依然要用到multiprocessing模塊。
multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。可以寫一個服務進程作為調度者,將任務分布到其他多個進程中,依靠網絡通信進行管理。舉個例子:在做爬蟲程序時,常常會遇到這樣的場景,我們想抓取某個網站的所有圖片,如果使用多進程的話,一般是一個進程負責抓取圖片的鏈接地址,將鏈接地址存放到Queue中,另外的進程負責從Queue中讀取鏈接地址進行下載和存儲到本地?,F(xiàn)在把這個過程做成分布式,一臺機器上的進程負責抓取鏈接,其他機器上的進程負責下載存儲。那么遇到的主要問題是將Queue暴露到網絡中,讓其他機器進程都可以訪問,分布式進程就是將這一個過程進行了封裝,我們可以將這個過程稱為本地隊列的網絡化。整體過程如圖1-24所示。
圖1-24 分布式進程
要實現(xiàn)上面例子的功能,創(chuàng)建分布式進程需要分為六個步驟:
1)建立隊列Queue,用來進行進程間的通信。服務進程創(chuàng)建任務隊列task_queue,用來作為傳遞任務給任務進程的通道;服務進程創(chuàng)建結果隊列result_queue,作為任務進程完成任務后回復服務進程的通道。在分布式多進程環(huán)境下,必須通過由Queuemanager獲得的
Queue接口來添加任務。
2)把第一步中建立的隊列在網絡上注冊,暴露給其他進程(主機),注冊后獲得網絡隊列,相當于本地隊列的映像。
3)建立一個對象(Queuemanager(BaseManager))實例
manager,綁定端口和驗證口令。
4)啟動第三步中建立的實例,即啟動管理manager,監(jiān)管信息通道。
5)通過管理實例的方法獲得通過網絡訪問的Queue對象,即再把網絡隊列實體化成可以使用的本地隊列。
6)創(chuàng)建任務到“本地”隊列中,自動上傳任務到網絡隊列中,分配給任務進程進行處理。
接下來通過程序實現(xiàn)上面的例子(Linux版),首先編寫的是服務進程(taskManager.py),代碼如下:
import random,time,Queue
from multiprocessing.managers import BaseManager
# 第一步:建立task_queue和result_queue,用來存放任務和結果
task_queue=Queue.Queue()
result_queue=Queue.Queue()
class Queuemanager(BaseManager):
pass
# 第二步:把創(chuàng)建的兩個隊列注冊在網絡上,利用register方法,
callable參數(shù)關聯(lián)了Queue對象,
# 將Queue對象在網絡中暴露
Queuemanager.register('get_task_queue',callable=lambda:task_queue)
Queuemanager.register('get_result_queue',callable=lambda:result_queue)
# 第三步:綁定端口8001,設置驗證口令‘qiye’。
這個相當于對象的初始化
manager=Queuemanager(address=('',8001),authkey='qiye')
# 第四步:啟動管理,監(jiān)聽信息通道
manager.start()
# 第五步:通過管理實例的方法獲得通過網絡訪問的Queue對象
task=manager.get_task_queue()
result=manager.get_result_queue()
# 第六步:添加任務
for url in ["ImageUrl_"+i for i in range(10)]:
print 'put task %s ...' %url
task.put(url)
# 獲取返回結果
print 'try get result...'
for i in range(10):
print 'result is %s' %result.get(timeout=10)
# 關閉管理
manager.shutdown()
任務進程已經編寫完成,接下來編寫任務進程(taskWorker.py),創(chuàng)建任務進程的步驟相對較少,需要四個步驟:
1)使用QueueManager注冊用于獲取Queue的方法名稱,任務進程只能通過名稱來在網絡上獲取Queue。
2)連接服務器,端口和驗證口令注意保持與服務進程中完全一致。
3)從網絡上獲取Queue,進行本地化。
4)從task隊列獲取任務,并把結果寫入result隊列。
程序taskWorker.py代碼(win/linux版)如下:
# coding:utf-8 import time
from multiprocessing.managers import BaseManager
# 創(chuàng)建類似的QueueManager:
class QueueManager(BaseManager):
pass
# 第一步:使用QueueManager注冊用于獲取Queue的方法名稱
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 第二步:連接到服務器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證口令注意保持與服務進程完全一致:
m = QueueManager(address=(server_addr, 8001), authkey='qiye')
# 從網絡連接:
m.connect()
# 第三步:獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 第四步:從task隊列獲取任務,并把結果寫入result隊列:
while(not task.empty()):
image_url = task.get(True,timeout=5)
print('run task download %s...' % image_url)
time.sleep(1)
result.put('%s--->success'%image_url)
# 處理結束:
print('worker exit.')
最后開始運行程序,先啟動服務進程taskManager.py,運行結果
如下:
put task ImageUrl_0 ...
put task ImageUrl_1 ...
put task ImageUrl_2 ...
put task ImageUrl_3 ...
put task ImageUrl_4 ...
put task ImageUrl_5 ...
put task ImageUrl_6 ...
put task ImageUrl_7 ...
put task ImageUrl_8 ...
put task ImageUrl_9 ...
try get result...
接著再啟動任務進程taskWorker.py,運行結果如下:
Connect to server 127.0.0.1...
run task download ImageUrl_0...
run task download ImageUrl_1...
run task download ImageUrl_2...
run task download ImageUrl_3...
run task download ImageUrl_4...
run task download ImageUrl_5...
run task download ImageUrl_6...
run task download ImageUrl_7...
run task download ImageUrl_8...
run task download ImageUrl_9...
worker exit.
當任務進程運行結束后,服務進程運行結果如下:
result is ImageUrl_0--->success
result is ImageUrl_1--->success
result is ImageUrl_2--->success
result is ImageUrl_3--->success
result is ImageUrl_4--->success
result is ImageUrl_5--->success
result is ImageUrl_6--->success
result is ImageUrl_7--->success
result is ImageUrl_8--->success
result is ImageUrl_9--->success
其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個worker,就可以把任務分布到幾臺甚至幾十臺機器上,實現(xiàn)大規(guī)模的分布式爬蟲。
注意 由于平臺的特性,創(chuàng)建服務進程的代碼在Linux和
Windows上有一些不同,創(chuàng)建工作進程的代碼是一致的。
taskManager.py程序在Windows版下的代碼如下:
# coding:utf-8 # taskManager.py for windows
import Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
# 任務個數(shù)
task_number = 10 # 定義收發(fā)隊列
task_queue = Queue.Queue(task_number);
result_queue = Queue.Queue(task_number);
def get_task():
return task_queue
def get_result():
return result_queue
# 創(chuàng)建類似的QueueManager:
class QueueManager(BaseManager):
pass
def win_run():
# Windows下綁定調用接口不能使用lambda,所以只能先定義函數(shù)再綁定
QueueManager.register('get_task_queue',callable = get_task)
QueueManager.register('get_result_queue',callable = get_result)
# 綁定端口并設置驗證口令,Windows下需要填寫IP地址,
Linux下不填默認為本地
manager = QueueManager(address = ('127.0.0.1',8001),authkey = 'qiye')
# 啟動
manager.start()
try:
# 通過網絡獲取任務隊列和結果隊列
task = manager.get_task_queue()
result = manager.get_result_queue()
# 添加任務
for url in ["ImageUrl_"+str(i) for i in range(10)]:
print 'put task %s ...' %url
task.put(url)
print 'try get result...'
for i in range(10):
print 'result is %s' %result.get(timeout=10)
except:
print('Manager error')
finally:
# 一定要關閉,否則會報管道未關閉的錯誤
manager.shutdown()
if __name__ == '__main__':
# Windows下多進程可能會有問題,添加這句可以緩解
freeze_support()
win_run()
如對本文有疑問,請?zhí)峤坏浇涣髡搲?,廣大熱心網友會為你解答!! 點擊進入論壇