PHP本身是不是支持多線程的,不過(guò)我們可以借助其他的方法來(lái)實(shí)現(xiàn)多線程,比如 shell 服務(wù),比如 web 服務(wù)器,本文我們來(lái)講講這兩個(gè)方法如何實(shí)現(xiàn)。需要的朋友可以來(lái)參考一下。
多線程是java中一個(gè)很不錯(cuò)的東西,很多朋友說(shuō)在php中不可以使用PHP多線程了,其實(shí)那是錯(cuò)誤的說(shuō)法PHP多線程實(shí)現(xiàn)方法和fsockopen函數(shù)有關(guān),下面我們來(lái)介紹具體實(shí)現(xiàn)程序代碼,有需要了解的同學(xué)可參考。
當(dāng)有人想要實(shí)現(xiàn)并發(fā)功能時(shí),他們通常會(huì)想到用fork或者spawn threads,但是當(dāng)他們發(fā)現(xiàn)php不支持多線程的時(shí)候,大概會(huì)轉(zhuǎn)換思路去用一些不夠好的語(yǔ)言,比如perl。
其實(shí)的是大多數(shù)情況下,你大可不必使用 fork 或者線程,并且你會(huì)得到比用 fork 或 thread 更好的性能。
假設(shè)你要建立一個(gè)服務(wù)來(lái)檢查正在運(yùn)行的n臺(tái)服務(wù)器,以確定他們還在正常運(yùn)轉(zhuǎn)。你可能會(huì)寫下面這樣的代碼:
代碼如下:
<?php $hosts = array("host1.sample.com", "host2.sample.com", "host3.sample.com"); $timeout = 15; $status = array(); foreach ($hosts as $host) { $errno = 0; $errstr = ""; $s = fsockopen($host, 80, $errno, $errstr, $timeout); if ($s) { $status[$host] = "Connectedn"; fwrite($s, "HEAD / HTTP/1.0rnHost: $hostrnrn"); do { $data = fread($s, 8192); if (strlen($data) == 0) { break; } $status[$host] .= $data; } while (true); fclose($s); } else { $status[$host] = "Connection failed: $errno $errstrn"; } } print_r($status); ?>
它運(yùn)行的很好,但是在fsockopen()分析完hostname并且建立一個(gè)成功的連接(或者延時(shí)$timeout秒)之前,擴(kuò)充這段代碼來(lái)管理大量服務(wù)器將耗費(fèi)很長(zhǎng)時(shí)間。
因此我們必須放棄這段代碼;我們可以建立異步連接-不需要等待fsockopen返回連接狀態(tài)。PHP仍然需要解析hostname(所以直接使用ip更加明智),不過(guò)將在打開一個(gè)連接之后立刻返回,繼而我們就可以連接下一臺(tái)服務(wù)器。
有兩種方法可以實(shí)現(xiàn);PHP5中可以使用新增的stream_socket_client()函數(shù)直接替換掉fsocketopen()。PHP5之前的版本,你需要自己動(dòng)手,用sockets擴(kuò)展解決問(wèn)題。
下面是PHP5中的解決方法:
它運(yùn)行的很好,但是在fsockopen()分析完hostname并且建立一個(gè)成功的連接(或者延時(shí)$timeout秒)之前,擴(kuò)充這段代碼來(lái)管理大量服務(wù)器將耗費(fèi)很長(zhǎng)時(shí)間。
因此我們必須放棄這段代碼;我們可以建立異步連接-不需要等待fsockopen返回連接狀態(tài)。PHP仍然需要解析hostname(所以直接使用ip更加明智),不過(guò)將在打開一個(gè)連接之后立刻返回,繼而我們就可以連接下一臺(tái)服務(wù)器。
有兩種方法可以實(shí)現(xiàn);PHP5中可以使用新增的stream_socket_client()函數(shù)直接替換掉fsocketopen()。PHP5之前的版本,你需要自己動(dòng)手,用sockets擴(kuò)展解決問(wèn)題。
代碼如下:
<?php $hosts = array("host1.sample.com", "host2.sample.com", "host3.sample.com"); $timeout = 15; $status = array(); $sockets = array(); /* Initiate connections to all the hosts simultaneously */ foreach ($hosts as $id => $host) { $s = stream_socket_client(" $ $host:80", $errno, $errstr, $timeout, STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT); if ($s) { $sockets[$id] = $s; $status[$id] = "in progress"; } else { $status[$id] = "failed, $errno $errstr"; } } /* Now, wait for the results to come back in */ while (count($sockets)) { $read = $write = $sockets; /* This is the magic function - explained below */ $n = stream_select($read, $write, $e = null, $timeout); if ($n > 0) { /* readable sockets either have data for us, or are failed * connection attempts */ foreach ($read as $r) { $id = array_search($r, $sockets); $data = fread($r, 8192); if (strlen($data) == 0) { if ($status[$id] == "in progress") { $status[$id] = "failed to connect"; } fclose($r); unset($sockets[$id]); } else { $status[$id] .= $data; } } /* writeable sockets can accept an HTTP request */ foreach ($write as $w) { $id = array_search($w, $sockets); fwrite($w, "HEAD / HTTP/1.0rnHost: " . $hosts[$id] . "rnrn"); $status[$id] = "waiting for response"; } } else { /* timed out waiting; assume that all hosts associated * with $sockets are faulty */ foreach ($sockets as $id => $s) { $status[$id] = "timed out " . $status[$id]; } break; } } foreach ($hosts as $id => $host) { echo "Host: $hostn"; echo "Status: " . $status[$id] . "nn"; } ?>
我們用stream_select()等待sockets打開的連接事件。stream_select()調(diào)用系統(tǒng)的select(2)函數(shù)來(lái)工作:前面三個(gè)參數(shù)是你要使用的streams的數(shù)組;你可以對(duì)其讀取,寫入和獲取異常(分別針對(duì)三個(gè)參數(shù))。stream_select()可以通過(guò)設(shè)置$timeout(秒)參數(shù)來(lái)等待事件發(fā)生-事件發(fā)生時(shí),相應(yīng)的sockets數(shù)據(jù)將寫入你傳入的參數(shù)。
下面是PHP4.1.0之后版本的實(shí)現(xiàn),如果你已經(jīng)在編譯PHP時(shí)包含了sockets(ext/sockets)支持,你可以使用根上面類似的代碼,只是需要將上面的streams/filesystem函數(shù)的功能用ext/sockets函數(shù)實(shí)現(xiàn)。主要的不同在于我們用下面的函數(shù)代替stream_socket_client()來(lái)建立連接:
代碼如下:
<?php // This value is correct for Linux, other systems have other values define('EINPROGRESS', 115); function non_blocking_connect($host, $port, &$errno, &$errstr, $timeout) { $ip = gethostbyname($host); $s = socket_create(AF_INET, SOCK_STREAM, 0); if (socket_set_nonblock($s)) { $r = @socket_connect($s, $ip, $port); if ($r || socket_last_error() == EINPROGRESS) { $errno = EINPROGRESS; return $s; } } $errno = socket_last_error($s); $errstr = socket_strerror($errno); socket_close($s); return false; } ?>
現(xiàn)在用socket_select()替換掉stream_select(),用socket_read()替換掉fread(),用socket_write()替換掉fwrite(),用socket_close()替換掉fclose()就可以執(zhí)行腳本了!
PHP5的先進(jìn)之處在于,你可以用stream_select()處理幾乎所有的stream-例如你可以通過(guò)include STDIN用它接收鍵盤輸入并保存進(jìn)數(shù)組,你還可以接收通過(guò)proc_open()打開的管道中的數(shù)據(jù)。
下面來(lái)分享一個(gè)PHP多線程類
代碼如下:
class thread { var $hooks = array(); var $args = array(); function thread() { } function addthread($func) { $args = array_slice(func_get_args(), 1); $this->hooks[] = $func; $this->args[] = $args; return true; } function runthread() { if(isset($_GET['flag'])) { $flag = intval($_GET['flag']); } if($flag || $flag === 0) { call_user_func_array($this->hooks[$flag], $this->args[$flag]); } else { for($i = 0, $size = count($this->hooks); $i < $size; $i++) { $fp=fsockopen($_SERVER['HTTP_HOST'],$_SERVER['SERVER_PORT']); if($fp) { $out = "GET {$_SERVER['PHP_SELF']}?flag=$i HTTP/1.1rn"; $out .= "Host: {$_SERVER['HTTP_HOST']}rn"; $out .= "Connection: Closernrn"; fputs($fp,$out); fclose($fp); } } } } }
項(xiàng)目情況:
要同步300W+的用戶數(shù)據(jù)到qcloud,只能每次一個(gè)curl同步,大概每秒同步3個(gè),算下來(lái)同步完300W數(shù)據(jù)估計(jì)要10天+,所以想到用多線程解決。
方案1:
用c++寫多進(jìn)程方案,fork多個(gè)進(jìn)程出來(lái)解決。主線程負(fù)責(zé)讀取/存儲(chǔ)數(shù)據(jù),子線程負(fù)責(zé)curl。
方案2:
用php寫多進(jìn)程方案,同方案1.主線程讀取,子線程curl。
但是由于php沒有多進(jìn)程,不能直接操控線程/進(jìn)程。所以只能依賴于linux來(lái)實(shí)現(xiàn)多進(jìn)程。
php函數(shù)pcntl_fork()可以創(chuàng)建進(jìn)程,等同于linux的fork。
和fork不同的是,pcntl_fork返回的0是子進(jìn)程,返回的id是子進(jìn)程的pid(而fork是父進(jìn)程返回0,子進(jìn)程返回ppid).
注意pcntl_fork()函數(shù)必須要在linux上面才行,據(jù)說(shuō)要加載pcntl.so模塊,但貌似不加載也行。
方案3:
php+shell模擬多線程
例如:test.php文件實(shí)現(xiàn)了項(xiàng)目所需功能(包含數(shù)據(jù)庫(kù)讀寫和curl)
再寫一份shell如下,保存為start.sh:
#!/bin/bash //指定bash,必須
for(( i=0; i<20; i++))
do
php test.php & //執(zhí)行test.php,&符號(hào)的意思是把該操作放在后臺(tái)執(zhí)行,這樣shell就可以繼續(xù)執(zhí)行下一步sleep命令了。如果沒有這個(gè)符號(hào),shell會(huì)阻塞在這里。
sleep 1s
done
最后執(zhí)行該shell,sudo ./start.sh
shell會(huì)啟動(dòng)20個(gè)進(jìn)程同時(shí)分別執(zhí)行test.php
但是由于每個(gè)進(jìn)程都相當(dāng)于主進(jìn)程,所以共同的資源不好控制,這個(gè)case中,共同的資源只有數(shù)據(jù)庫(kù),所以每次操作數(shù)據(jù)庫(kù)我都加鎖詳見我另一篇文章。
因?yàn)橄鄬?duì)于數(shù)據(jù)庫(kù)操作基本都是每秒能解決,而curl連續(xù)發(fā)個(gè)幾百個(gè)需要幾十秒甚至更多,所以數(shù)據(jù)庫(kù)加鎖影響不大。
我們?cè)谧鲰?xiàng)目的時(shí)候,有些需求,特別是數(shù)據(jù)的響應(yīng)處理需要花費(fèi)大量的時(shí)間,由于php是一個(gè)短生命周期的腳本語(yǔ)言,到了默認(rèn)的30秒,php的數(shù)據(jù)處理還沒完成,php的生命周期就結(jié)束了。這時(shí)需要使用異步并發(fā)處理策略,也就是說(shuō),一次php調(diào)用可以發(fā)出的多個(gè)請(qǐng)求,這些請(qǐng)求不是按照順序執(zhí)行,而是可以異步并發(fā)執(zhí)行的,一些請(qǐng)求用于在后臺(tái)處理數(shù)據(jù),一些請(qǐng)求用于接受后臺(tái)響應(yīng)狀態(tài),根據(jù)狀態(tài),與用戶做一些簡(jiǎn)單的交互。但是問(wèn)題來(lái)了,我們都知道php本身是不支持多線程的,那么應(yīng)該怎么實(shí)現(xiàn)php的多線程呢?
1、linux下的php多線程
下面所講的東西是源自php的pcntl_fork函數(shù).因?yàn)檫@個(gè)函數(shù)依賴操作系統(tǒng)fork的實(shí)現(xiàn),所以本文所講的東西只適用于linux/unix。那么先看看這個(gè)函數(shù)的用法吧.php手冊(cè)上是這么說(shuō)的:
<?php
$pid = pcntl_fork();
if ($pid == -1) {
die('could not fork');
} else if ($pid) {
// we are the parent
pcntl_wait($status); //Protect against Zombie children
} else {
// we are the child
}
?>
通過(guò)pcntl_fork創(chuàng)建一個(gè)子進(jìn)程,如果返回值是-1的話,那么說(shuō)明子進(jìn)程創(chuàng)建失敗.創(chuàng)建成功的進(jìn)程id會(huì)返回給父進(jìn)程,0返回給子進(jìn)程.不好理解吧,所以應(yīng)該這樣寫:
<?php
$pid = pcntl_fork();
if($pid == -1){
//創(chuàng)建失敗咱就退出唄,沒啥好說(shuō)的
die('could not fork');
}
else{
if($pid){
//從這里開始寫的代碼是父進(jìn)程的,因?yàn)閷懙氖窍到y(tǒng)程序,記得退出的時(shí)候給個(gè)返回值
exit(0);
}
else{
//從這里開始寫的代碼都是在新的進(jìn)程里執(zhí)行的,同樣正常退出的話,最好也給一個(gè)返回值
exit(0);
}
}
?>
這樣一改好理解多了,如果你父進(jìn)程希望知道子進(jìn)程正常退出的話,可以加上前面的pcntl_wait。
2.通過(guò)stream_socket_client 方式
function sendStream() {
$english_format_number = number_format($number, 4, '.', '');
echo $english_format_number;
exit();
$timeout = 10;
$result = array();
$sockets = array();
$convenient_read_block = 8192;
$host = "test.local.com";
$sql = "select waybill_id,order_id from xm_waybill where status>40 order by update_time desc limit 1 ";
$data = Yii::app()->db->createCommand($sql)->queryAll();
$id = 0;
foreach ($data as $k => $v) {
if ($k % 2 == 0) {
$send_data[$k]['body'] = NoticeOrder::getSendData($v['waybill_id']);
} else {
$send_data[$k]['body'] = array($v['order_id'] => array('extra' => 16));
}
$data = json_encode($send_data[$k]['body']);
$s = stream_socket_client($host . ":80", $errno, $errstr, $timeout, STREAM_CLIENT_ASYNC_CONNECT | STREAM_CLIENT_CONNECT);
if ($s) {
$sockets[$id++] = $s;
$http_message = "GET /php/test.php?data=" . $data . " HTTP/1.0\r\nHost:" . $host . "\r\n\r\n";
fwrite($s, $http_message);
} else {
echo "Stream " . $id . " failed to open correctly.";
}
}
while (count($sockets)) {
$read = $sockets;
stream_select($read, $w = null, $e = null, $timeout);
if (count($read)) {
/* stream_select generally shuffles $read, so we need to
compute from which socket(s) we're reading. */
foreach ($read as $r) {
$id = array_search($r, $sockets);
$data = fread($r, $convenient_read_block);
if (strlen($data) == 0) {
echo "Stream " . $id . " closes at " . date('h:i:s') . ".<br> ";
fclose($r);
unset($sockets[$id]);
} else {
$result[$id] = $data;
}
}
} else {
/* A time-out means that *all* streams have failed
to receive a response. */
echo "Time-out!\n";
break;
}
}
print_r($result);
}
3、通過(guò)多進(jìn)程代替多線程
function daemon($func_name,$args,$number){
while(true){
$pid=pcntl_fork();
if($pid==-1){
echo "fork process fail";
exit();
}elseif($pid){//創(chuàng)建的子進(jìn)程
static $num=0;
$num++;
if($num>=$number){
//當(dāng)進(jìn)程數(shù)量達(dá)到一定數(shù)量時(shí)候,就對(duì)子進(jìn)程進(jìn)行回收。
pcntl_wait($status);
$num--;
}
}else{ //為0 則代表是子進(jìn)程創(chuàng)建的,則直接進(jìn)入工作狀態(tài)
if(function_exists($func_name)){
while (true) {
$ppid=posix_getpid();
var_dump($ppid);
call_user_func_array($func_name,$args);
sleep(2);
}
}else{
echo "function is not exists";
}
exit();
}
}
}
function worker($args){
//do something
}
daemon('worker',array(1),2);
下一頁(yè)找到版本2的
下載下來(lái),這個(gè)v2 才是php5才可以使用的
下載下來(lái),安裝:
或者,您直接這樣下載:
cd /tools
wget https://github.com/krakjoe/pthreads/archive/v2.0.10.zip
unzip v2.0.10.zip
cd pthreads-2.0.10
/usr/local/php/bin/phpize
./configure --with-php-config=/usr/local/php/bin/php-config
make
make install
注意:您的php 在編譯的時(shí)候需要開啟 –enable-maintainer-zts
./configure --prefix=/usr/local/php --disable-fileinfo --enable-fpm --with-config-file-path=/etc --with-config-file-scan-dir=/etc/php.d --with-openssl --with-zlib --with-curl --enable-ftp --with-gd --with-xmlrpc --with-jpeg-dir --with-png-dir --with-freetype-dir --enable-gd-native-ttf --enable-mbstring --with-mcrypt=/usr/local/libmcrypt --enable-zip --with-mysql=/usr/local/mysql --without-pear --enable-maintainer-zts
vim /etc/php.ini
添加
extension=pthreads.so
重啟php
/etc/init.d/php-fpm restart
如對(duì)本文有疑問(wèn),請(qǐng)?zhí)峤坏浇涣髡搲?,廣大熱心網(wǎng)友會(huì)為你解答??! 點(diǎn)擊進(jìn)入論壇