1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 | <?php /** * Created by PhpStorm. * User: LeoKim * Date: 2017/5/13 * Time: 9:10 */ class BaseProcess{ private $process ; private $process_list = []; private $process_use = []; private $min_worker_num = 3; private $max_worker_num = 6; private $current_num ; public function __construct() { $this ->process = new swoole_process( array ( $this , 'run' ), false, 2); $this ->process->start(); swoole_process::wait(); } public function run( $worker ) { $this ->current_num = $this ->min_worker_num; //创建初始进程 for ( $i =0; $i < $this ->current_num; $i ++){ $process = new swoole_process( array ( $this , 'task_run' ), false, 2); $pid = $process ->start(); echo $pid . ': 我被初始创建了.' . date ( 'H:i:s' ).PHP_EOL; $this ->process_list[ $pid ] = $process ; $this ->process_use[ $pid ] = 0; } foreach ( $this ->process_list as $process ){ //pipe管道被读操作的时候执行闭包的function? $this ->bind_set_empty( $process ); } swoole_timer_tick(1000, function ( $timer_id ){ static $index =0; $index = $index +1; $flag = true; //我们在前面定义过 当pid对应的值为0的时候表示该进程现在空闲 foreach ( $this ->process_use as $pid => $used ){ if ( $used == 0){ $flag = false; //我们要使用空闲的进程,把进程标记成工作状态 $this ->process_use[ $pid ] = 1; $this ->process_list[ $pid ]->write( $index . " Hi 我开始工作了." ); break ; } } //如果没有进程是空闲的, 那么检查进程是否超过最大值,没超过的话创建新的进程 if ( $flag && $this ->current_num < $this ->max_worker_num) { $process = new swoole_process( array ( $this , 'task_run' ), false, 2); $pid = $process ->start(); $this ->process_list[ $pid ] = $process ; $this ->process_use[ $pid ] = 1 ; $this ->process_list[ $pid ]->write( $index . " Hi 我是新来的,我开始工作了." ); $this ->current_num++; $this ->bind_set_empty( $process ); } //执行n次退出 if ( $index ==20){ foreach ( $this ->process_list as $process ){ foreach ( $this ->process_list as $process ){ $process ->write( "任务完毕 我退出了." ); } swoole_timer_clear( $timer_id ); $this ->process-> exit (); } } }); } //进程在创建的时候被执行 public function task_run( $worker ) { //为每个进程绑定回调,当进程执行write的时候触发 swoole_event_add( $worker ->pipe, function ( $pipe ) use ( $worker ){ $data = $worker ->read(); var_dump( $worker ->pid. ": " . $data . ' -- ' . date ( 'H:i:s' )); echo PHP_EOL; if ( $data == '任务完毕 我退出了.' ) { $worker -> exit (); exit ; } sleep(5); //当worker进程执行write的时候会出发line:43的回调函数 //把进程标记为空闲 $worker ->write( $worker ->pid); }); } public function bind_set_empty( $worker ){ swoole_event_add( $worker ->pipe, function ( $pipe ) use ( $worker ){ $pid = $worker ->read(); echo $pid . ' 报告,我处理完了之前的任务现在空下来了.' . date ( 'H:i:s' ).PHP_EOL; $this ->process_use[ $pid ] = 0; }); } } new BaseProcess(); |