五月综合缴情婷婷六月,色94色欧美sute亚洲线路二,日韩制服国产精品一区,色噜噜一区二区三区,香港三级午夜理伦三级三

您現(xiàn)在的位置: 365建站網(wǎng) > 365文章 > PHP中fsockopen函數(shù)多線程的實(shí)現(xiàn)方法

PHP中fsockopen函數(shù)多線程的實(shí)現(xiàn)方法

文章來(lái)源:365jz.com     點(diǎn)擊數(shù):530    更新時(shí)間:2017-12-17 09:07   參與評(píng)論

PHP本身是不是支持多線程的,不過(guò)我們可以借助其他的方法來(lái)實(shí)現(xiàn)多線程,比如 shell 服務(wù),比如 web 服務(wù)器,本文我們來(lái)講講這兩個(gè)方法如何實(shí)現(xiàn)。需要的朋友可以來(lái)參考一下。

多線程是java中一個(gè)很不錯(cuò)的東西,很多朋友說(shuō)在php中不可以使用PHP多線程了,其實(shí)那是錯(cuò)誤的說(shuō)法PHP多線程實(shí)現(xiàn)方法和fsockopen函數(shù)有關(guān),下面我們來(lái)介紹具體實(shí)現(xiàn)程序代碼,有需要了解的同學(xué)可參考。

當(dāng)有人想要實(shí)現(xiàn)并發(fā)功能時(shí),他們通常會(huì)想到用fork或者spawn threads,但是當(dāng)他們發(fā)現(xiàn)php不支持多線程的時(shí)候,大概會(huì)轉(zhuǎn)換思路去用一些不夠好的語(yǔ)言,比如perl。

其實(shí)的是大多數(shù)情況下,你大可不必使用 fork 或者線程,并且你會(huì)得到比用 fork 或 thread 更好的性能。

假設(shè)你要建立一個(gè)服務(wù)來(lái)檢查正在運(yùn)行的n臺(tái)服務(wù)器,以確定他們還在正常運(yùn)轉(zhuǎn)。你可能會(huì)寫下面這樣的代碼:

代碼如下:

<?php
$hosts = array("host1.sample.com", "host2.sample.com", "host3.sample.com");
$timeout = 15;
$status = array();
foreach ($hosts as $host) {
$errno = 0;
 $errstr = "";
 $s = fsockopen($host, 80, $errno, $errstr, $timeout);
 if ($s) {
 $status[$host] = "Connectedn";
 fwrite($s, "HEAD / HTTP/1.0rnHost: $hostrnrn");
 do {
 $data = fread($s, 8192);
 if (strlen($data) == 0) {
 break;
 }
 $status[$host] .= $data;
 } while (true);
 fclose($s);
 } else {
 $status[$host] = "Connection failed: $errno $errstrn";
 }
}
print_r($status);
?> 

它運(yùn)行的很好,但是在fsockopen()分析完hostname并且建立一個(gè)成功的連接(或者延時(shí)$timeout秒)之前,擴(kuò)充這段代碼來(lái)管理大量服務(wù)器將耗費(fèi)很長(zhǎng)時(shí)間。

因此我們必須放棄這段代碼;我們可以建立異步連接-不需要等待fsockopen返回連接狀態(tài)。PHP仍然需要解析hostname(所以直接使用ip更加明智),不過(guò)將在打開一個(gè)連接之后立刻返回,繼而我們就可以連接下一臺(tái)服務(wù)器。

有兩種方法可以實(shí)現(xiàn);PHP5中可以使用新增的stream_socket_client()函數(shù)直接替換掉fsocketopen()。PHP5之前的版本,你需要自己動(dòng)手,用sockets擴(kuò)展解決問(wèn)題。

下面是PHP5中的解決方法:

它運(yùn)行的很好,但是在fsockopen()分析完hostname并且建立一個(gè)成功的連接(或者延時(shí)$timeout秒)之前,擴(kuò)充這段代碼來(lái)管理大量服務(wù)器將耗費(fèi)很長(zhǎng)時(shí)間。

因此我們必須放棄這段代碼;我們可以建立異步連接-不需要等待fsockopen返回連接狀態(tài)。PHP仍然需要解析hostname(所以直接使用ip更加明智),不過(guò)將在打開一個(gè)連接之后立刻返回,繼而我們就可以連接下一臺(tái)服務(wù)器。

有兩種方法可以實(shí)現(xiàn);PHP5中可以使用新增的stream_socket_client()函數(shù)直接替換掉fsocketopen()。PHP5之前的版本,你需要自己動(dòng)手,用sockets擴(kuò)展解決問(wèn)題。

 代碼如下: 

<?php
$hosts = array("host1.sample.com", "host2.sample.com", "host3.sample.com");
$timeout = 15;
$status = array();
$sockets = array();
/* Initiate connections to all the hosts simultaneously */
foreach ($hosts as $id => $host) {
 $s = stream_socket_client("
$
$host:80", $errno, $errstr, $timeout,
 STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
 if ($s) {
 $sockets[$id] = $s;
 $status[$id] = "in progress";
 } else {
 $status[$id] = "failed, $errno $errstr";
 }
}
/* Now, wait for the results to come back in */
while (count($sockets)) {
 $read = $write = $sockets;
 /* This is the magic function - explained below */
 $n = stream_select($read, $write, $e = null, $timeout);
 if ($n > 0) {
 /* readable sockets either have data for us, or are failed
 * connection attempts */
 foreach ($read as $r) {
  $id = array_search($r, $sockets);
  $data = fread($r, 8192);
  if (strlen($data) == 0) {
 if ($status[$id] == "in progress") {
 $status[$id] = "failed to connect";
 }
 fclose($r);
 unset($sockets[$id]);
  } else {
 $status[$id] .= $data;
  }
 }
 /* writeable sockets can accept an HTTP request */
 foreach ($write as $w) {
 $id = array_search($w, $sockets);
 fwrite($w, "HEAD / HTTP/1.0rnHost: "
 . $hosts[$id] . "rnrn");
 $status[$id] = "waiting for response";
 }
 } else {
 /* timed out waiting; assume that all hosts associated
 * with $sockets are faulty */
 foreach ($sockets as $id => $s) {
 $status[$id] = "timed out " . $status[$id];
 }
 break;
 }
}
foreach ($hosts as $id => $host) {
 echo "Host: $hostn";
 echo "Status: " . $status[$id] . "nn";
} 
?> 

我們用stream_select()等待sockets打開的連接事件。stream_select()調(diào)用系統(tǒng)的select(2)函數(shù)來(lái)工作:前面三個(gè)參數(shù)是你要使用的streams的數(shù)組;你可以對(duì)其讀取,寫入和獲取異常(分別針對(duì)三個(gè)參數(shù))。stream_select()可以通過(guò)設(shè)置$timeout(秒)參數(shù)來(lái)等待事件發(fā)生-事件發(fā)生時(shí),相應(yīng)的sockets數(shù)據(jù)將寫入你傳入的參數(shù)。

下面是PHP4.1.0之后版本的實(shí)現(xiàn),如果你已經(jīng)在編譯PHP時(shí)包含了sockets(ext/sockets)支持,你可以使用根上面類似的代碼,只是需要將上面的streams/filesystem函數(shù)的功能用ext/sockets函數(shù)實(shí)現(xiàn)。主要的不同在于我們用下面的函數(shù)代替stream_socket_client()來(lái)建立連接:

代碼如下:

<?php
// This value is correct for Linux, other systems have other values
define('EINPROGRESS', 115);
function non_blocking_connect($host, $port, &$errno, &$errstr, $timeout) {
 $ip = gethostbyname($host);
 $s = socket_create(AF_INET, SOCK_STREAM, 0);
 if (socket_set_nonblock($s)) {
 $r = @socket_connect($s, $ip, $port);
 if ($r || socket_last_error() == EINPROGRESS) {
 $errno = EINPROGRESS;
 return $s;
 }
 }
 $errno = socket_last_error($s);
 $errstr = socket_strerror($errno);
 socket_close($s);
 return false;
}
?> 

現(xiàn)在用socket_select()替換掉stream_select(),用socket_read()替換掉fread(),用socket_write()替換掉fwrite(),用socket_close()替換掉fclose()就可以執(zhí)行腳本了!

PHP5的先進(jìn)之處在于,你可以用stream_select()處理幾乎所有的stream-例如你可以通過(guò)include STDIN用它接收鍵盤輸入并保存進(jìn)數(shù)組,你還可以接收通過(guò)proc_open()打開的管道中的數(shù)據(jù)。

下面來(lái)分享一個(gè)PHP多線程類

代碼如下:

class thread {
 
 var $hooks = array();
 var $args = array();
 
 function thread() {
 }
 
 function addthread($func)
 {
  $args = array_slice(func_get_args(), 1);
  $this->hooks[] = $func;
  $this->args[] = $args;
  return true;
 }
 
 function runthread()
 {
  if(isset($_GET['flag']))
  {
   $flag = intval($_GET['flag']);
  }
  if($flag || $flag === 0)
  {
   call_user_func_array($this->hooks[$flag], $this->args[$flag]);
  }
  else
  {
   for($i = 0, $size = count($this->hooks); $i < $size; $i++)
   {
    $fp=fsockopen($_SERVER['HTTP_HOST'],$_SERVER['SERVER_PORT']);
    if($fp)
    {
     $out = "GET {$_SERVER['PHP_SELF']}?flag=$i HTTP/1.1rn";
     $out .= "Host: {$_SERVER['HTTP_HOST']}rn";
     $out .= "Connection: Closernrn";
     fputs($fp,$out);
     fclose($fp);
    }
   }
  }
 }
}

 

項(xiàng)目情況:

要同步300W+的用戶數(shù)據(jù)到qcloud,只能每次一個(gè)curl同步,大概每秒同步3個(gè),算下來(lái)同步完300W數(shù)據(jù)估計(jì)要10天+,所以想到用多線程解決。

方案1:

用c++寫多進(jìn)程方案,fork多個(gè)進(jìn)程出來(lái)解決。主線程負(fù)責(zé)讀取/存儲(chǔ)數(shù)據(jù),子線程負(fù)責(zé)curl。

 

方案2:

用php寫多進(jìn)程方案,同方案1.主線程讀取,子線程curl。

但是由于php沒有多進(jìn)程,不能直接操控線程/進(jìn)程。所以只能依賴于linux來(lái)實(shí)現(xiàn)多進(jìn)程。

php函數(shù)pcntl_fork()可以創(chuàng)建進(jìn)程,等同于linux的fork。

和fork不同的是,pcntl_fork返回的0是子進(jìn)程,返回的id是子進(jìn)程的pid(而fork是父進(jìn)程返回0,子進(jìn)程返回ppid).

注意pcntl_fork()函數(shù)必須要在linux上面才行,據(jù)說(shuō)要加載pcntl.so模塊,但貌似不加載也行。

 

方案3:

php+shell模擬多線程

例如:test.php文件實(shí)現(xiàn)了項(xiàng)目所需功能(包含數(shù)據(jù)庫(kù)讀寫和curl)

再寫一份shell如下,保存為start.sh:

#!/bin/bash  //指定bash,必須

for(( i=0; i<20; i++))
do
    php test.php &    //執(zhí)行test.php,&符號(hào)的意思是把該操作放在后臺(tái)執(zhí)行,這樣shell就可以繼續(xù)執(zhí)行下一步sleep命令了。如果沒有這個(gè)符號(hào),shell會(huì)阻塞在這里。
    sleep 1s
done

最后執(zhí)行該shell,sudo ./start.sh

shell會(huì)啟動(dòng)20個(gè)進(jìn)程同時(shí)分別執(zhí)行test.php

但是由于每個(gè)進(jìn)程都相當(dāng)于主進(jìn)程,所以共同的資源不好控制,這個(gè)case中,共同的資源只有數(shù)據(jù)庫(kù),所以每次操作數(shù)據(jù)庫(kù)我都加鎖詳見我另一篇文章。

 因?yàn)橄鄬?duì)于數(shù)據(jù)庫(kù)操作基本都是每秒能解決,而curl連續(xù)發(fā)個(gè)幾百個(gè)需要幾十秒甚至更多,所以數(shù)據(jù)庫(kù)加鎖影響不大。
 

我們?cè)谧鲰?xiàng)目的時(shí)候,有些需求,特別是數(shù)據(jù)的響應(yīng)處理需要花費(fèi)大量的時(shí)間,由于php是一個(gè)短生命周期的腳本語(yǔ)言,到了默認(rèn)的30秒,php的數(shù)據(jù)處理還沒完成,php的生命周期就結(jié)束了。這時(shí)需要使用異步并發(fā)處理策略,也就是說(shuō),一次php調(diào)用可以發(fā)出的多個(gè)請(qǐng)求,這些請(qǐng)求不是按照順序執(zhí)行,而是可以異步并發(fā)執(zhí)行的,一些請(qǐng)求用于在后臺(tái)處理數(shù)據(jù),一些請(qǐng)求用于接受后臺(tái)響應(yīng)狀態(tài),根據(jù)狀態(tài),與用戶做一些簡(jiǎn)單的交互。但是問(wèn)題來(lái)了,我們都知道php本身是不支持多線程的,那么應(yīng)該怎么實(shí)現(xiàn)php的多線程呢?

 

一、php模擬實(shí)現(xiàn)多線程的三種方法

 

1、linux下的php多線程

下面所講的東西是源自php的pcntl_fork函數(shù).因?yàn)檫@個(gè)函數(shù)依賴操作系統(tǒng)fork的實(shí)現(xiàn),所以本文所講的東西只適用于linux/unix。那么先看看這個(gè)函數(shù)的用法吧.php手冊(cè)上是這么說(shuō)的:

 

<?php
$pid = pcntl_fork();
if ($pid == -1) {
         die('could not fork');
} else if ($pid) {
         // we are the parent
         pcntl_wait($status); //Protect against Zombie children
} else {
         // we are the child
}
?>

 

 

通過(guò)pcntl_fork創(chuàng)建一個(gè)子進(jìn)程,如果返回值是-1的話,那么說(shuō)明子進(jìn)程創(chuàng)建失敗.創(chuàng)建成功的進(jìn)程id會(huì)返回給父進(jìn)程,0返回給子進(jìn)程.不好理解吧,所以應(yīng)該這樣寫:

 

<?php
$pid = pcntl_fork();
if($pid == -1){
         //創(chuàng)建失敗咱就退出唄,沒啥好說(shuō)的
         die('could not fork');
}
else{
        if($pid){
                //從這里開始寫的代碼是父進(jìn)程的,因?yàn)閷懙氖窍到y(tǒng)程序,記得退出的時(shí)候給個(gè)返回值
                exit(0);
        }
        else{
                //從這里開始寫的代碼都是在新的進(jìn)程里執(zhí)行的,同樣正常退出的話,最好也給一個(gè)返回值
                exit(0);
        }
}
?>

這樣一改好理解多了,如果你父進(jìn)程希望知道子進(jìn)程正常退出的話,可以加上前面的pcntl_wait。

 

 

2.通過(guò)stream_socket_client 方式

 

function sendStream() { 
    $english_format_number = number_format($number, 4, '.', ''); 
  
    echo $english_format_number;  
    exit(); 
    $timeout = 10; 
    $result = array(); 
    $sockets = array(); 
    $convenient_read_block = 8192; 
    $host = "test.local.com"; 
    $sql = "select waybill_id,order_id from xm_waybill where status>40 order by update_time desc limit 1 ";  
    $data = Yii::app()->db->createCommand($sql)->queryAll(); 
    $id = 0; 
  
    foreach ($data as $k => $v) { 
      if ($k % 2 == 0) { 
        $send_data[$k]['body'] = NoticeOrder::getSendData($v['waybill_id']); 
  
      } else { 
        $send_data[$k]['body'] = array($v['order_id'] => array('extra' => 16));  
      }  
      $data = json_encode($send_data[$k]['body']); 
      $s = stream_socket_client($host . ":80", $errno, $errstr, $timeout, STREAM_CLIENT_ASYNC_CONNECT | STREAM_CLIENT_CONNECT); 
      if ($s) {  
        $sockets[$id++] = $s; 
        $http_message = "GET /php/test.php?data=" . $data . " HTTP/1.0\r\nHost:" . $host . "\r\n\r\n";  
        fwrite($s, $http_message); 
      } else {  
        echo "Stream " . $id . " failed to open correctly."; 
      }  
    } 
  
    while (count($sockets)) { 
  
      $read = $sockets; 
  
      stream_select($read, $w = null, $e = null, $timeout); 
       if (count($read)) {  
        /* stream_select generally shuffles $read, so we need to 
         compute from which socket(s) we're reading. */
        foreach ($read as $r) { 
  
          $id = array_search($r, $sockets); 
          $data = fread($r, $convenient_read_block); 
          if (strlen($data) == 0) { 
            echo "Stream " . $id . " closes at " . date('h:i:s') . ".<br>  "; 
            fclose($r); 
             unset($sockets[$id]); 
          } else { 
            $result[$id] = $data; 
          } 
        } 
      } else {  
        /* A time-out means that *all* streams have failed 
         to receive a response. */
        echo "Time-out!\n"; 
        break; 
      }  
    }  
    print_r($result); 
  
  }

 

 

3、通過(guò)多進(jìn)程代替多線程

 

function daemon($func_name,$args,$number){ 
  while(true){ 
    $pid=pcntl_fork(); 
    if($pid==-1){ 
      echo "fork process fail"; 
      exit(); 
    }elseif($pid){//創(chuàng)建的子進(jìn)程 
  
      static $num=0; 
      $num++; 
      if($num>=$number){ 
        //當(dāng)進(jìn)程數(shù)量達(dá)到一定數(shù)量時(shí)候,就對(duì)子進(jìn)程進(jìn)行回收。 
        pcntl_wait($status); 
  
        $num--; 
      }  
    }else{ //為0 則代表是子進(jìn)程創(chuàng)建的,則直接進(jìn)入工作狀態(tài) 
  
      if(function_exists($func_name)){ 
        while (true) { 
          $ppid=posix_getpid(); 
          var_dump($ppid); 
          call_user_func_array($func_name,$args); 
          sleep(2); 
        } 
      }else{ 
        echo "function is not exists"; 
      } 
      exit();   
    } 
  } 
}  
function worker($args){  
  //do something 
  
}  
daemon('worker',array(1),2); 

 

 

二、真正實(shí)現(xiàn)php多線程的方法

php真正的多線程實(shí)現(xiàn)方式,通過(guò)安裝php的擴(kuò)展 pthread 可以做到。
 
 
但是這個(gè)下載的是 版本3 也就是php 7 才能用的,我們需要使的是 版本2
 
php擴(kuò)展

 
然后刷新的頁(yè)面如下,拖到最底部:

php多線程
 
下載文件

下一頁(yè)找到版本2的

下載下來(lái),這個(gè)v2 才是php5才可以使用的

下載下來(lái),安裝:

或者,您直接這樣下載:

cd /tools  
   wget https://github.com/krakjoe/pthreads/archive/v2.0.10.zip  
   unzip   v2.0.10.zip  
   cd pthreads-2.0.10  
   /usr/local/php/bin/phpize  
   ./configure --with-php-config=/usr/local/php/bin/php-config    
   make  
   make install
注意:您的php 在編譯的時(shí)候需要開啟 –enable-maintainer-zts
./configure --prefix=/usr/local/php --disable-fileinfo   --enable-fpm --with-config-file-path=/etc --with-config-file-scan-dir=/etc/php.d --with-openssl --with-zlib --with-curl --enable-ftp --with-gd --with-xmlrpc  --with-jpeg-dir --with-png-dir --with-freetype-dir --enable-gd-native-ttf --enable-mbstring --with-mcrypt=/usr/local/libmcrypt --enable-zip --with-mysql=/usr/local/mysql --without-pear --enable-maintainer-zts 
vim /etc/php.ini 
添加
extension=pthreads.so
重啟php  
/etc/init.d/php-fpm restart
 

如對(duì)本文有疑問(wèn),請(qǐng)?zhí)峤坏浇涣髡搲?,廣大熱心網(wǎng)友會(huì)為你解答??! 點(diǎn)擊進(jìn)入論壇

發(fā)表評(píng)論 (530人查看,0條評(píng)論)
請(qǐng)自覺遵守互聯(lián)網(wǎng)相關(guān)的政策法規(guī),嚴(yán)禁發(fā)布色情、暴力、反動(dòng)的言論。
昵稱:
最新評(píng)論
------分隔線----------------------------

其它欄目

· 建站教程
· 365學(xué)習(xí)

業(yè)務(wù)咨詢

· 技術(shù)支持
· 服務(wù)時(shí)間:9:00-18:00
365建站網(wǎng)二維碼

Powered by 365建站網(wǎng) RSS地圖 HTML地圖

copyright © 2013-2024 版權(quán)所有 鄂ICP備17013400號(hào)