标题:PHP curl实现多进程并发高效率采集爬虫 出处:沧海一粟 时间:Tue, 13 Dec 2016 15:44:58 +0000 作者:jed 地址:http://www.dzhope.com/post/1029/ 内容: PHP curl实现多进程并发抓取数据我们经常的用到了,今天我们来看一篇关于PHP curl实现多进程并发高效率采集爬虫的例子,具体的细节如下。 主要封装函数 multi_process(); 根据参数,创建指点数目的子进程。 亮点功能1:子进程各种异常退出,比如segment fault, Allowed memory size exhausted等,中断一个子进程后,父进程会重新fork一个新进程顶上去,保持子进程数量。如果子进程里完成任务(比如判断tid达到10000),可以在子进程里exit(9),父进程会收到这个退出状态(9),然后等待所有子进程退出,最后退出自身进程。 亮点功能2:与curl封装函数一起实现了一统计功能,在程序关闭后会显示出一些主要的统计信息(图2的底部)。 mp_counter(); 在父进程以及所有子进程之间通信,负责协调分配各子进程的任务,使用了锁机制。可以设置’init’参数重置计数,可以设置每次更新计数的值。 curl_get(); 对curl相关函数的封装,加入了大量的错误机制,支持POST,GET,Cookie,Proxy,下载。 mp_msg(); 实现规范之一就是,每条任务处理完,只输出一行信息。 亮点功能:这个函数会判断终端的高度和宽度,实现每一屏内容会显示一条统计信息(图1的紫色行),便于观察程序的执行情况,控制每一行输出的长度,保持一条信息不会超过一行。 rand_exit(); 众所周知,PHP存在内在泄露的问题,所以每一个子进程里执行一定次数的任务后就退出,由multi_process()负责自动建立新的子进程(如图1中的绿色行)。 curl.lib.php 'web_domain', 'e.hiphotos.baidu.com'=>'http://hi.baidu.com/'); */ $referer_config = array('img1.51cto.com'=>'blog.51cto.com', '360doc.com'=>'www.360doc.com'); /* 针对指定域名设置User-agent,优先于$curl_default_config 默认使用百度蜘蛛的UA,拒绝百度UA的网站极少 eg: $useragent_config = array( 'web_domain'=>'user agent', 'www.xxx.com'=>'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0)'); */ $useragent_config = array('hiphotos.baidu.com'=>'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0)'; /* * 如果机器有多个IP地址,可以改变默认的出口IP,每次调用会在数组中随机选择一个。考虑到可能会有需要排除的IP,所以这里不自动配置为所有的IP。 * eg: $curl_ip_config = array('11.11.11.11', '22.22.22.22'); */ $local_ip_config = array(); // cookie和临时文件目录 if((@file_exists('/dev/shm/') && @is_writable('/dev/shm/'))){ $cookie_dir = $tmpfile_dir = '/dev/shm/'; }else{ $cookie_dir = $tmpfile_dir = '/tmp/'; } // 清除过期的cookie文件和下载临时文件 if(php_sapi_name() == 'cli'){ clear_curl_file(); } /** * GET方式抓取网页 * * @param string $url 网页URL地址 * @param string $encode 返回的页面编码,默认为GBK,设置为空值则不转换 * @return string 网页HTML内容 */ function curl_get($url, $encode='gbk'){ return curl_func($url, 'GET', null, null, null, $encode); } /** * POST方式请求网页 * * @param string $url 请求的URL地址 * @param array $data 发送的POST数据 * @param string $encode 返回的页面编码,默认为GBK,设置为空值则不转换 * @return bool */ function curl_post($url, $data, $encode='gbk'){ return curl_func($url, 'POST', $data, null, null, $encode); } /** * 获取页面的HEADER信息 * * HTTP状态码并不是以“名称:值”的形式返回,这里以http_code作为它的名称,其他的值都有固定的名称,并且转成小写 * * @param string $url URL地址 * @return array 返回HEADER数组 */ function curl_header($url, $follow=true){ $header_text = curl_func($url, 'HEADER'); if(!$header_text){ // 获取HTTP头失败 return FALSE; } $header_array =explode("\r\n\r\n", trim($header_text)); if($follow){ $last_header = array_pop($header_array); }else{ $last_header = array_shift($header_array); } $lines = explode("\n", trim($last_header)); // 处理状态码 $status_line = trim(array_shift($lines)); preg_match("/(\d\d\d)/", $status_line, $preg); if(!empty($preg[1])){ $header['http_code'] = $preg[1]; }else{ $header['http_code'] = 0; } foreach ($lines as $line) { list($key, $val) = explode(':', $line, 2); $key = str_replace('-', '_', strtolower(trim($key))); $header[$key] = trim($val); } return $header; } /** * 下载文件 * * @param $url 文件地址 * @param $path 保存到的本地路径 * @return bool 下载是否成功 */ function curl_down($url, $path, $data=null, $proxy=null){ if(empty($data)){ $method = 'GET'; }else{ $method = 'POST'; } return curl_func($url, $method, $data, $path, $proxy); } /** * 使用代理发起GET请求 * * @param string $url 请求的URL地址 * @param string $proxy 代理地址 * @param string $encode 返回编码 * * @return string 网页内容 */ function curl_get_by_proxy($url, $proxy, $encode='gbk'){ return curl_func($url, 'GET', null, null, $proxy, $encode); } /** * 使用代理发起POST请求 * * @param string $url 请求的URL地址 * @param string $proxy 代理地址 * @param string $encode 返回编码 * * @return string 网页内容 */ function curl_post_by_proxy($url, $data, $proxy, $encode='gbk'){ return curl_func($url, 'POST', $data, null, $proxy, $encode); } /** * @param string $url 请求的URL地址 * @param string $encode 返回编码 * * @return string 网页内容 */ function img_down($url, $path_pre){ $img_tmp = '/tmp/curl_imgtmp_pid_'.getmypid(); $res = curl_down($url, $img_tmp); if(empty($res)){ return $res; } $ext = get_img_ext($img_tmp); if(empty($ext)){ return NULL; } $path = "{$path_pre}.{$ext}"; @mkdir(dirname($path), 0777, TRUE); // 转移临时的文件路径 rename($img_tmp, $path); return $path; } function get_img_ext($path){ $types = array( 1 => 'gif', 2 => 'jpg', 3 => 'png', 6 => 'bmp' ); $info = @getimagesize($path); if(isset($types[$info[2]])){ $ext = $info['type'] = $types[$info[2]]; $ext == 'jpeg' && $ext = 'jpg'; } else{ $ext = FALSE; } return $ext; } /** * 获取文件类型 * * @param string $filepath 文件路径 * @return array 返回数组,格式为array($type, $ext) */ function get_file_type($filepath){ } /** * 返回文件的大小,用于下载文件后判断与本地文件大小是否相同 * curl_getinfo()方式获得的size_download并不一定是文件的真实大小 * * @param string $url URL地址 * @return string 网络文件的大小 */ function get_file_size($url){ $header = curl_header($url); if(!empty($header['content_length'])){ return $header['content_length']; }else{ return FALSE; } } /** * 获取状态码 * * @param string $url URL地址 * @return string 状态码 */ function get_http_code($url, $follow=true){ $header = curl_header($url, $follow); if(!empty($header['http_code'])){ return $header['http_code']; }else{ return FALSE; } } /** * 获取URL文件后缀 * * @param string $url URL地址 * @return array 文件类型的后缀 */ function curl_get_ext($url){ $header = curl_header($url); if(!empty($header['content_type'])){ @list($type, $ext) = @explode('/', $header['content_type']); if(!empty($type) && !empty($ext)){ return array($type, $ext); }else{ return array('', ''); } }else{ return array('', ''); } } /** * 封装curl操作 * * @param string $url 请求的URL地址 * @param string $method 请求的方法(POST, GET, HEADER, DOWN) * @param mix $arg POST方式为POST数据,DOWN方式时为下载保存的路径 * @param string $return_encode 网页返回的编码 * @param string $proxy 代理 * @return mix 返回内容。4xx序列错误和空白页面会返回NULL,curl抓取错误返回False。结果正常则返回页面内容。 */ // 待改进,下载到临时文件,下载成功后再转移(已经有文件则覆盖),下载失败则删除。 // 待改进,参数形式改成curl_func($url, $method, $data=null, savepath=null, $proxy=null, $return_encode='gbk') function curl_func($url, $method, $data=null, $savepath=null, $proxy=null, $return_encode=null){ global $colors, $cookie_dir, $tmpfile_dir, $referer_config, $useragent_config, $local_ip_config, $curl_config; // 控制台输出颜色 extract($colors); // 去除URL中的/../ $url = get_absolute_path($url); // 去除实体转码 $url = htmlspecialchars_decode($url); // 统计数据 if(function_exists('mp_counter')){ if(!empty($savepath)){ mp_counter('down_total'); // 下载次数计数 }elseif($method == 'HEADER'){ mp_counter('header_total'); // 抓取HTTP头次数计数 }else{ mp_counter('fetch_total'); // 抓取网页次数计数 } } for($i = 0; $i < curl_config_get('retry'); $i ){ // 初始化 $ch = curl_init(); curl_setopt($ch, CURLOPT_URL, $url); // 设置超时 curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, curl_config_get('conntimeout')); // 连接超时 if(empty($savepath)){ curl_setopt($ch, CURLOPT_TIMEOUT, curl_config_get('fetchtimeout')); // 抓取网页(包括HEADER)超时 }else{ curl_setopt($ch, CURLOPT_TIMEOUT, curl_config_get('downtimeout')); // 下载文件超时 } // 接收网页内容到变量 curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE); // 忽略SSL验证 curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, 0); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, 0); // 设置referer, 在文件里配置的优先级最高 foreach($referer_config as $domain=>$ref){ if(stripos($url, $domain) !== FALSE){ $referer = $ref; break; } } // 检查是否有通过curl_set_referer()设置referer if(empty($referer) && !empty($curl_config[getmypid()]['referer'])){ $referer = $curl_config[getmypid()]['referer']; } if(!empty($referer)){ curl_setopt($ch, CURLOPT_REFERER, $referer); } // 设置HTTP请求标识,在文件里配置的优先级最高 foreach($useragent_config as $domain=>$ua){ if(stripos($url, $domain) !== FALSE){ $useragent = $ua; break; } } // 检查是否有通过curl_set_ua()设置useragent if(empty($useragent)){ $useragent = curl_config_get('ua'); } curl_setopt($ch, CURLOPT_USERAGENT, $useragent); // 出口IP if(!empty($local_ip_config)){ curl_setopt($ch, CURLOPT_INTERFACE, $local_ip_config[array_rand($local_ip_config)]); } // 设置代理 if(!empty($proxy)){ curl_setopt($ch, CURLOPT_PROXY, $proxy); curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_SOCKS5); } // 设置允许接收gzip压缩数据,以及解压,抓取HEADER时不使用(获取不到正确的文件大小,影响判断下载成功) if($method != 'HEADER') { curl_setopt($ch, CURLOPT_HTTPHEADER, array('Accept-Encoding: gzip, deflate')); curl_setopt($ch, CURLOPT_ENCODING, ""); } // 遇到301和302转向自动跳转继续抓取,如果用于WEB程序并且设置了open_basedir,这个选项无效 @curl_setopt($ch, CURLOPT_FOLLOWLOCATION, TRUE); // 最大转向次数,避免进入到死循环 curl_setopt($ch, CURLOPT_MAXREDIRS, 5); // 启用cookie $cookie_path = $cookie_dir . 'curl_cookie_pid_' . get_ppid(); curl_setopt($ch, CURLOPT_COOKIEFILE, $cookie_path); curl_setopt($ch, CURLOPT_COOKIEJAR, $cookie_path); // 设置post参数内容 if($method == 'POST'){ curl_setopt($ch, CURLOPT_HEADER, 0); curl_setopt($ch, CURLOPT_POSTFIELDS, $data); } // 设置用于下载的参数 if(!empty($savepath)){ $tmpfile = $tmpfile_dir . '/curl_tmpfile_pid_'.getmypid(); file_exists($tmpfile) && unlink($tmpfile); $fp = fopen($tmpfile, 'w'); curl_setopt($ch, CURLOPT_FILE, $fp); } // 仅获取header if($method == 'HEADER'){ curl_setopt($ch, CURLOPT_NOBODY, TRUE); curl_setopt($ch, CURLOPT_HEADER, TRUE); } // 抓取结果 $curl_res = curl_exec($ch); // curl info $info = curl_getinfo($ch); // 调试curl时间,记录连接时间,等待时间,传输时间,总时间。 // 测试方法,任何输出前设置sleep,输出中间设置sleep /* foreach($info as $key=>$val){ echo "$key:$val\n"; } exit(9); */ // 错误信息 $error_msg = curl_error($ch); $error_no = curl_errno($ch); // 关闭CURL句柄 curl_close($ch); // 如果CURL有错误信息则判断为抓取失败,重试 if(!empty($error_no) || !empty($error_msg)){ $error_msg = "{$error_msg}($error_no)"; curl_msg($error_msg, $method, $url, 'yellow'); continue; } // 统计流量 if(function_exists('mp_counter')){ if(!empty($info['size_download']) && $info['size_download'] > 0){ mp_counter('download_total', $info['size_download']); } } // 对结果进行处理 if($method == 'HEADER'){ // 返回header信息 return $curl_res; }else{ // 最终的状态码 $status_code = $info['http_code']; if(in_array($status_code, array_merge(range(400, 417), array(500, 444)))){ // 非服务器故障性的错误,直接退出,返回NULL $error_msg = $status_code; if(!empty($savepath)){ $method = "{$method}|DOWN"; } curl_msg($error_msg, $method, $url, 'red'); return NULL; }if($status_code != 200){ // 防止网站502等临时错误,排除了上面的情况后,非200就重试。这一条规则需要后续根据情况来改进。 // curl执行过程中会自动跳转,这里不会出现301和302,除非跳转次数超过CURLOPT_MAXREDIRS的值 $error_msg = $status_code; curl_msg($error_msg, $method, $url, 'yellow'); continue; } if(empty($savepath)){ // 抓取页面 if(empty($curl_res)){ // 空白页面 $error_msg = "blank page"; // 返回NULL值,调用处注意判断 return NULL; }else{ // 默认将页面以GBK编码返回 // 分析页面编码 preg_match_all("/ "SIGINT", SIGHUP => "SIGHUP", SIGQUIT => "SIGQUIT" ); // 命令行颜色输出 $colors['red'] = "\33[31m"; $colors['green'] = "\33[32m"; $colors['yellow'] = "\33[33m"; $colors['end'] = "\33[0m"; $colors['reverse'] = "\33[7m"; $colors['purple'] = "\33[35m"; $colors['cyan'] = "\33[36m"; // 程序开始运行时间 $start_time = time(); // 父进程PID $fpid = getmypid(); // 文件保存目录,/dev/shm/是内存空间映射到硬盘上,IO速度快。 // 有些环境上可能会没有这个目录,比如OpenVZ的VPS,这个路径实际是在硬盘上 if(file_exists('/dev/shm/') && is_dir('/dev/shm/')){ $process_file_dir = '/dev/shm/'; }else{ $process_file_dir = '/tmp/'; } // 清理过期资源(文件和SEM信号锁),每次程序执行都需要调用,清除掉之前执行时的残留文件。 clear_process_resource(); // 判断是否在子进程中 function is_subprocess(){ global $fpid; if(getmypid() != $fpid){ return true; }else{ return false; } } /** * 多进程计数 * * 1,用于多进程运行时的任务分配与计数,比如要采集某DZ论坛的帖子,则可以将计数器用于/thread-tid-1-1.html中 * 的tid,实现进程间的协调工作 * 2,由于shm_*系列函数的操作不够灵活,所以这里主要用于/proc/和/dev/shm/这二个目录来实现数据的读写(内存操 * 作,不受硬盘IO性能影响),用semaphore信号来实现锁定和互斥机制 * 3,编译PHP时需要使用参数--enable-sysvmsg安装所需的模块 * * @param string $countername 计数器名称 * @param mix $update 计数器的更新值,如果是'init',计数器则被初始化为0 * @return int 返回计数 */ function mp_counter($countername, $update=1){ global $process_file_dir; $time = date('Y-m-d H:i:s'); // 父进程PID或者自身PID $top_pid = get_ppid(); // 系统启动时间 $sysuptime = get_sysuptime(); // 进程启动时间 $ppuptime = get_ppuptime($top_pid); // 由父进程ID确定变量文件路径前缀 $path_pre = "{$process_file_dir}mp_counter_{$countername}_pid_{$top_pid}_"; // 由于系统启动时间和当前父进程启动时间(jiffies格式)确定计数使用的文件 $cur_path = "{$path_pre}btime_{$sysuptime}_ptime_{$ppuptime}"; // 更新计数,先锁定 $lock = sem_lock(); if(!file_exists($cur_path)){ // 调试代码。个别系统上启动时间会变化,造成文件路径跟随变化,最终导致计数归0。 // $log = "[{$time}] - {$countername}($cur_path) - init\n"; // file_put_contents('/tmp/process.log', $log, FILE_APPEND); $counter = 0; }else{ // 理论上在这里,文件是一定存在的 $counter = file_get_contents($cur_path); } // 更新记数, 继续研究下判断init不能用== if($update === 'init'){ // 如果接收到更新值为init,或者变量文件不存在,则将计数初始化为0。 $new_counter = 0; }else{ $new_counter = $counter $update; } // 写入计数,解锁 file_put_contents($cur_path, $new_counter); sem_unlock($lock); return $new_counter; } /** * 创建多进程 * * 1,通过mp_counter()函数实现进程间的任务协调 * 2,由于PHP进程可能会由于异常而退出(主要是segment fault),并且由于处理内存泄露的问题需要子进程主动退出,本函数可以实现自动建立 * 新的进程,使子进程数量始终保持在$num的数量 * 3,编译PHP时需要使用参数--enable-pcntl安装所需的模块 * 4,如果在子进程中调用了exit(9),那么主进程和所有子进程都将退出 * * @param int $num 进程数量 * @param bool $stat 结束后是否输出统计信息 */ function multi_process($num, $stat=FALSE){ global $colors, $signals; extract($colors); if(empty($num)){ $num = 1; } // 记录进程数量,统计用 mp_counter('process_num', 'init'); mp_counter('process_num', $num); // 子进程数量 $child = 0; // 任务完成标识 $task_finish = FALSE; while(TRUE) { // 清空子进程退出状态 unset($status); // 如果任务未完成,并且子进程数量没有达到最高,则创建 if ($task_finish == FALSE && $child < $num) { $pid = pcntl_fork(); if ($pid) { // 有PID,这里是父进程 $child ; // 注册父进程的信号处理函数 if($stat){ foreach ($signals as $signal => $name) { if (!pcntl_signal($signal, "signal_handler")) { die("Install signal handler for {$name} failed"); Generated by Bo-blog 2.1.1 Release