swoole 动态进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
<?php
/**
 * Created by PhpStorm.
 * User: LeoKim
 * Date: 2017/5/13
 * Time: 9:10
 */
 
class BaseProcess{
    private $process;
 
    private $process_list = [];
    private $process_use = [];
    private $min_worker_num = 3;
    private $max_worker_num = 6;
 
    private $current_num;
 
    public function __construct()
    {
        $this->process = new swoole_process(array($this'run'), false, 2);
        $this->process->start();
 
        swoole_process::wait();
    }
 
    public function run($worker)
    {
        $this->current_num = $this->min_worker_num;
 
        //创建初始进程
        for($i=0; $i $this->current_num; $i++){
            $process new swoole_process(array($this'task_run'), false, 2);
            $pid $process->start();
 
            echo $pid.': 我被初始创建了.'.date('H:i:s').PHP_EOL;
            $this->process_list[$pid] = $process;
            $this->process_use[$pid] = 0;
        }
 
        foreach($this->process_list as $process){
            //pipe管道被读操作的时候执行闭包的function?
            $this->bind_set_empty($process);
        }
 
        swoole_timer_tick(1000, function($timer_id){
            static $index=0;
            $index $index+1;
            $flag = true;
 
            //我们在前面定义过 当pid对应的值为0的时候表示该进程现在空闲
            foreach($this->process_use as $pid => $used){
                if($used == 0){
                    $flag = false;
                    //我们要使用空闲的进程,把进程标记成工作状态
                    $this->process_use[$pid] = 1;
                    $this->process_list[$pid]->write($index." Hi 我开始工作了.");
                    break;
                }
            }
 
            //如果没有进程是空闲的, 那么检查进程是否超过最大值,没超过的话创建新的进程
            if($flag && $this->current_num < $this->max_worker_num)
            {
                $process new swoole_process(array($this'task_run'), false, 2);
                $pid $process->start();
                $this->process_list[$pid] = $process;
                $this->process_use[$pid] = 1 ;
                $this->process_list[$pid]->write($index." Hi 我是新来的,我开始工作了.");
                $this->current_num++;
 
                $this->bind_set_empty($process);
            }
 
            //执行n次退出
            if($index==20){
                foreach($this->process_list as $process){
                    foreach($this->process_list as $process){
                        $process->write("任务完毕 我退出了.");
                    }
                    swoole_timer_clear($timer_id);
                    $this->process->exit();
                }
            }
        });
    }
 
    //进程在创建的时候被执行
    public function task_run($worker)
    {
        //为每个进程绑定回调,当进程执行write的时候触发
        swoole_event_add($worker->pipe, function($pipeuse($worker){
            $data $worker->read();
            var_dump($worker->pid.": ".$data.' -- '.date('H:i:s'));echo PHP_EOL;
            if($data == '任务完毕 我退出了.')
            {
                $worker->exit();
                exit;
            }
 
            sleep(5);
 
            //当worker进程执行write的时候会出发line:43的回调函数
            //把进程标记为空闲
            $worker->write($worker->pid);
        });
    }
 
    public function bind_set_empty($worker){
        swoole_event_add($worker->pipe, function($pipeuse($worker){
            $pid$worker->read();
            echo $pid.' 报告,我处理完了之前的任务现在空下来了.'.date('H:i:s').PHP_EOL;
            $this->process_use[$pid] =  0;
        });
    }
}
 
new BaseProcess();