swoole 动态进程池

<?php
/**
 * Created by PhpStorm.
 * User: LeoKim
 * Date: 2017/5/13
 * Time: 9:10
 */

class BaseProcess{
    private $process;

    private $process_list = [];
    private $process_use = [];
    private $min_worker_num = 3;
    private $max_worker_num = 6;

    private $current_num;

    public function __construct()
    {
        $this->process = new swoole_process(array($this, 'run'), false, 2);
        $this->process->start();

        swoole_process::wait();
    }

    public function run($worker)
    {
        $this->current_num = $this->min_worker_num;

        //创建初始进程
        for($i=0; $i < $this->current_num; $i++){
            $process = new swoole_process(array($this, 'task_run'), false, 2);
            $pid = $process->start();

            echo $pid.': 我被初始创建了.'.date('H:i:s').PHP_EOL;
            $this->process_list[$pid] = $process;
            $this->process_use[$pid] = 0;
        }

        foreach($this->process_list as $process){
            //pipe管道被读操作的时候执行闭包的function?
            $this->bind_set_empty($process);
        }

        swoole_timer_tick(1000, function($timer_id){
            static $index=0;
            $index = $index+1;
            $flag = true;

            //我们在前面定义过 当pid对应的值为0的时候表示该进程现在空闲
            foreach($this->process_use as $pid => $used){
                if($used == 0){
                    $flag = false;
                    //我们要使用空闲的进程,把进程标记成工作状态
                    $this->process_use[$pid] = 1;
                    $this->process_list[$pid]->write($index." Hi 我开始工作了.");
                    break;
                }
            }

            //如果没有进程是空闲的, 那么检查进程是否超过最大值,没超过的话创建新的进程
            if($flag && $this->current_num < $this->max_worker_num)
            {
                $process = new swoole_process(array($this, 'task_run'), false, 2);
                $pid = $process->start();
                $this->process_list[$pid] = $process;
                $this->process_use[$pid] = 1 ;
                $this->process_list[$pid]->write($index." Hi 我是新来的,我开始工作了.");
                $this->current_num++;

                $this->bind_set_empty($process);
            }

            //执行n次退出
            if($index==20){
                foreach($this->process_list as $process){
                    foreach($this->process_list as $process){
                        $process->write("任务完毕 我退出了.");
                    }
                    swoole_timer_clear($timer_id);
                    $this->process->exit();
                }
            }
        });
    }

    //进程在创建的时候被执行
    public function task_run($worker)
    {
        //为每个进程绑定回调,当进程执行write的时候触发
        swoole_event_add($worker->pipe, function($pipe) use($worker){
            $data = $worker->read();
            var_dump($worker->pid.": ".$data.' -- '.date('H:i:s'));echo PHP_EOL;
            if($data == '任务完毕 我退出了.')
            {
                $worker->exit();
                exit;
            }

            sleep(5);

            //当worker进程执行write的时候会出发line:43的回调函数
            //把进程标记为空闲
            $worker->write($worker->pid);
        });
    }

    public function bind_set_empty($worker){
        swoole_event_add($worker->pipe, function($pipe) use($worker){
            $pid= $worker->read();
            echo $pid.' 报告,我处理完了之前的任务现在空下来了.'.date('H:i:s').PHP_EOL;
            $this->process_use[$pid] =  0;
        });
    }
}

new BaseProcess();