Implementing Asynchronous Non-blocking IO with Swoole and PHP Coroutines

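This article was published by neuSnail on SegmentFault; reproduction without permission is strictly prohibited (see the original article on SegmentFault).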

The coroutine scheduling in this article is modeled on Youzan's zanphp implementation: http://zanphp.io/

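I wrote a simple Swoole + PHP coroutine framework, Pcs: https://github.com/neuSnail/Pcs
Take a look if you're interested; it is still quite immature, and suggestions are welcome.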

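Pcs is my graduation project, built with zanphp as a reference. The difference is that zan wrote its own zan extension to replace swoole and provide features swoole lacks, such as an event loop, whereas Pcs keeps using swoole and achieves asynchrony through async tasks. Compared with zan, it is less complex and easier to understand.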


I won't explain in detail what a coroutine is or how PHP's generator-based coroutines work; if you're not familiar with them, see this article by laruence: http://www.laruence.com/2015/...
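For readers who only need the minimum for what follows: a PHP generator pauses at each yield, hands the yielded value to whoever is driving it, and resumes when that driver calls send(), with the sent value becoming the result of the yield expression. A small sketch (greeter() is a made-up example, not part of Pcs):

```php
<?php
// Calling a generator function returns a Generator object; the body does not run yet.
function greeter(): Generator
{
    // Hand 'ready' to the driver and pause; the value passed to send() lands in $name.
    $name = yield 'ready';
    yield "hello, $name";
}

$gen = greeter();
$first = $gen->current();        // runs the body up to the first yield
$second = $gen->send('swoole');  // resumes with $name = 'swoole', runs to the next yield
```

Everything the scheduler below does is built on this current()/send() round trip.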

Many articles contain descriptions like this:

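    “When a coroutine hits a blocking operation, it can suspend itself and voluntarily give up resources, and the scheduler picks another coroutine to run, thereby achieving non-blocking IO.”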

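However, PHP has no native coroutines. Unless a blocking operation is handed off to an asynchronous process, yielding around it is pointless: the code still executes synchronously, as shown below: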

function foo()
{
    $db = new Db();
    // query() runs, and blocks, right here, before yield hands anything to the scheduler
    $result = (yield $db->query());
    yield $result;
}

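The database query above is blocking. When the scheduler advances this coroutine to this point and finds a blocking operation, what should it do? Switch to another coroutine? Then when would the blocked query run, and who would run it? Worse, $db->query() is evaluated before yield hands control back, so the worker process is blocked no matter what the scheduler decides. That is why, with PHP coroutines, talking about non-blocking IO without asynchronous calls is just hand-waving.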
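The point about blocking can be checked with plain PHP. In this sketch, blockingQuery() is a hypothetical stand-in for $db->query() that simply sleeps for 200 ms; the call to current() plays the role of the scheduler advancing the coroutine, and it is that call that blocks, before the scheduler ever sees the yielded value.

```php
<?php
// Hypothetical stand-in for the blocking $db->query() call.
function blockingQuery(): string
{
    usleep(200000); // simulate a 200 ms synchronous database round trip
    return 'row';
}

function foo(): Generator
{
    // blockingQuery() runs to completion before yield hands anything out.
    $result = yield blockingQuery();
    yield $result;
}

$start = microtime(true);
$gen = foo();                // creating the generator runs nothing yet
$yielded = $gen->current();  // advancing it executes blockingQuery(): this line blocks
$elapsed = microtime(true) - $start;
```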
Swoole's async tasks provide one way to achieve real asynchrony; for swoole_task, see the official documentation: https://wiki.swoole.com/wiki/...
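The idea behind using task workers is that the coroutine yields a description of the blocking job instead of running it, the job runs elsewhere, and the coroutine is resumed from a completion callback. The sketch below simulates that flow in plain PHP. FakeTaskPool is a toy class invented for illustration and is not Swoole's API; in Swoole, the job would be dispatched to a task worker and the result would come back through the finish callback.

```php
<?php
// Toy simulation of handing a blocking job to another worker. NOT Swoole's API.
final class FakeTaskPool
{
    private $queue = [];

    // Like dispatching a task: record the job and its completion callback, return at once.
    public function task(callable $job, callable $onFinish)
    {
        $this->queue[] = [$job, $onFinish];
    }

    // Plays the task worker plus the finish event: run each job, deliver its result.
    public function drain()
    {
        while ($this->queue) {
            [$job, $onFinish] = array_shift($this->queue);
            $onFinish($job());
        }
    }
}

$pool = new FakeTaskPool();
$log = [];

$coroutine = (function () use (&$log) {
    $log[] = 'request started';
    $row = yield 'SELECT 1';  // hand out the query instead of running it here
    $log[] = "got $row";
})();

$sql = $coroutine->current();  // runs the coroutine up to its first yield
$pool->task(
    function () use ($sql) { return "result of $sql"; },            // would run in a task worker
    function ($result) use ($coroutine) { $coroutine->send($result); } // resumes the coroutine
);
$log[] = 'worker free for other requests';
$pool->drain();
```

The log order shows the point: the worker is free again before the query result arrives, and the coroutine picks up exactly where it yielded.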

Core implementation

First, create a Swoole HTTP server (swoole_http_server) and register its callbacks:

class HttpServer implements Server
{
    private $swooleHttpServer;

    public function __construct(swoole_http_server $swooleHttpServer)
    {
        $this->swooleHttpServer = $swooleHttpServer;
    }

    public function start()
    {
        $this->swooleHttpServer->on('start', [$this, 'onStart']);
        $this->swooleHttpServer->on('shutdown', [$this, 'onShutdown']);

        $this->swooleHttpServer->on('workerStart', [$this, 'onWorkerStart']);
        $this->swooleHttpServer->on('workerStop', [$this, 'onWorkerStop']);
        $this->swooleHttpServer->on('workerError', [$this, 'onWorkerError']);
        $this->swooleHttpServer->on('task', [$this, 'onTask']);
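        $this->swooleHttpServer->on('finish', [$this, 'onFinish']);
        // task workers (and hence onTask/onFinish) only run if task_worker_num is set via set()

        $this->swooleHttpServer->on('request', [$this, 'onRequest']);

        $this->swooleHttpServer->start();
    }

    // ... the other callback methods are omitted; onRequest is listed separately below
}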

The onRequest method:

    public function onRequest(swoole_http_request $request, swoole_http_response $response)
    {
        // every HTTP request gets its own handler
        $requestHandler = new RequestHandler($request, $response);
        $requestHandler->handle();
    }

RequestHandler::handle() parses the request route, creates the controller, and calls the corresponding action; those details are not covered here.

    public function handle()
    {
        $this->context = new Context($this->request, $this->response, $this->getFd());
        $this->router = new Router($this->request);

        try {
            if (false === $this->router->parse()) {
                // no matching route: send an empty response
                $this->response->output('');
                return;
            }
            // wrap the whole request in a coroutine and hand it to a Task for scheduling
            $coroutine = $this->doRun();
            $task = new Task($coroutine, $this->context);
            $task->run();
        } catch (\Exception $e) {
            PcsExceptionHandler::handle($e, $this->response);
        }
    }

    private function doRun()
    {
        // $ret receives the result of $controller->$action()
        $ret = (yield $this->dispatch());
        yield $this->response->send($ret);
    }

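In the code above, $coroutine is the coroutine (a Generator object) that wraps one request and contains the whole request flow. In doRun(), $ret is the result of calling $controller->$action(), and yield $this->response->send($ret); sends the response back to the client.

Next, this coroutine has to be scheduled to obtain the actual execution result.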
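Before looking at the real Task and Scheduler, it may help to see the simplest possible driver. This sketch assumes the coroutine yields only plain values (no sub-coroutines, no async jobs) and follows the same convention the Scheduler uses for plain yielded values: each one is sent back in as the result of its yield. runToCompletion() and handleRequest() are hypothetical names.

```php
<?php
// Hypothetical minimal driver: handles plain yielded values only.
function runToCompletion(Generator $coroutine)
{
    $last = null;
    // valid() starts the generator if needed and reports whether it is still running
    while ($coroutine->valid()) {
        $last = $coroutine->current();
        // send the yielded value back as the result of the yield expression
        $coroutine->send($last);
    }
    // the last yielded value stands in for the coroutine's result
    return $last;
}

// Hypothetical request coroutine shaped like doRun()
function handleRequest(): Generator
{
    $ret = yield 'controller output';
    yield "response: $ret";
}

$out = runToCompletion(handleRequest());
```

A doRun()-style coroutine that yields another Generator (such as the result of dispatch()) would break this loop, because the Generator object itself would be sent back instead of being run; handling that case is what the stack in the Scheduler is for.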

namespace Pcs\Coroutine;

use Pcs\Network\Context\Context;

class Task
{
    private $coroutine;   // the Generator currently being driven (may be a sub-coroutine)
    private $context;
    private $status;
    private $scheduler;
    private $sendValue;   // last value sent into a coroutine

    public function __construct(\Generator $coroutine, Context $context)
    {
        $this->coroutine = $coroutine;
        $this->context = $context;
        // one Scheduler per Task, i.e. per request
        $this->scheduler = new Scheduler($this);
    }

    public function run()
    {
        while (true) {
            try {
                $this->status = $this->scheduler->schedule();
                switch ($this->status) {
                    case TaskStatus::TASK_WAIT:
                        // an async job is in flight; its callback calls run() again
                        echo "task status: TASK_WAIT\n";
                        return null;

                    case TaskStatus::TASK_DONE:
                        echo "task status: TASK_DONE\n";
                        return null;

                    case TaskStatus::TASK_CONTINUE:
                        echo "task status: TASK_CONTINUE\n";
                        break; // leaves the switch; the while loop schedules again
                }
            } catch (\Exception $e) {
                $this->scheduler->throwException($e);
            }
        }
    }

    public function setCoroutine($coroutine)
    {
        $this->coroutine = $coroutine;
    }

    public function getCoroutine()
    {
        return $this->coroutine;
    }

    public function valid()
    {
        return $this->coroutine->valid();
    }

    public function send($value)
    {
        $this->sendValue = $value;
        return $this->coroutine->send($value);
    }

    public function getSendVal()
    {
        return $this->sendValue;
    }
}
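Task::run() compares the scheduler's result against TaskStatus constants that the article does not show. A hypothetical definition, together with a cut-down version of the run loop, might look like the sketch below; the one constraint visible in the code is that the values must differ from Scheduler::SCHEDULE_CONTINUE (10), which means "not handled here, try the next handler".

```php
<?php
// Hypothetical values: the article does not define TaskStatus.
final class TaskStatus
{
    const TASK_WAIT = 1;      // async job in flight; its callback will call run() again
    const TASK_DONE = 2;      // the whole coroutine stack has finished
    const TASK_CONTINUE = 3;  // schedule again immediately
}

// Cut-down version of Task::run(): keep calling schedule() until it returns WAIT or DONE.
function runLoop(callable $schedule): int
{
    while (true) {
        $status = $schedule();
        if ($status === TaskStatus::TASK_WAIT || $status === TaskStatus::TASK_DONE) {
            return $status;
        }
    }
}

// A fake scheduler that continues twice and then waits on an async job.
$statuses = [TaskStatus::TASK_CONTINUE, TaskStatus::TASK_CONTINUE, TaskStatus::TASK_WAIT];
$calls = 0;
$result = runLoop(function () use (&$statuses, &$calls) {
    $calls++;
    return array_shift($statuses);
});
```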

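Task wraps the Generator object $coroutine. Besides some getters and setters, it proxies a few Generator methods (valid() and send()). Task::run() drives the scheduling of the coroutine: the actual work is done by the Scheduler, and every scheduling step returns the status of that step. In laruence's article and most material online, multiple coroutines share one scheduler; here, by contrast, every Task creates its own Scheduler in its constructor. The reason is that each coroutine is one client request, and a dedicated scheduler reduces interference between requests; besides, the order in which different coroutines run is handled by swoole, so this scheduler does not need to care about it. The scheduler code is as follows: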

<?php
namespace Pcs\Coroutine;

class Scheduler
{
    private $task;
    private $stack;                 // callers waiting for a sub-coroutine to finish
    const SCHEDULE_CONTINUE = 10;   // "not handled here, try the next handler"

    public function __construct(Task $task)
    {
        $this->task = $task;
        $this->stack = new \SplStack();
    }

    public function schedule()
    {
        $coroutine = $this->task->getCoroutine();
        // value yielded at the current interruption point
        $value = $coroutine->current();

        $status = $this->handleSystemCall($value);
        if ($status !== self::SCHEDULE_CONTINUE) return $status;

        $status = $this->handleStackPush($value);
        if ($status !== self::SCHEDULE_CONTINUE) return $status;

        $status = $this->handleAsyncJob($value);
        if ($status !== self::SCHEDULE_CONTINUE) return $status;

        $status = $this->handleYieldValue($value);
        if ($status !== self::SCHEDULE_CONTINUE) return $status;

        $status = $this->handleStackPop();
        if ($status !== self::SCHEDULE_CONTINUE) return $status;

        // nothing left to run: the top-level coroutine has finished
        return TaskStatus::TASK_DONE;
    }

    public function isStackEmpty()
    {
        return $this->stack->isEmpty();
    }

    private function handleSystemCall($value)
    {
        if (!$value instanceof SystemCall) {
            return self::SCHEDULE_CONTINUE;
        }
        // executing the system call (e.g. killTask) is not shown in the article; it must
        // return a TaskStatus value, otherwise Task::run() keeps rescheduling the same value
    }

    private function handleStackPush($value)
    {
        if (!$value instanceof \Generator) {
            return self::SCHEDULE_CONTINUE;
        }

        // the yielded value is a sub-coroutine: save the caller and switch to the callee
        $coroutine = $this->task->getCoroutine();
        $this->stack->push($coroutine);
        $this->task->setCoroutine($value);

        return TaskStatus::TASK_CONTINUE;
    }

    private function handleAsyncJob($value)
    {
        // instanceof (unlike is_subclass_of) safely accepts null, ints, arrays, ...
        if (!$value instanceof Async) {
            return self::SCHEDULE_CONTINUE;
        }

        // start the async job; asyncCallback resumes the task when it completes
        $value->execute([$this, 'asyncCallback']);

        return TaskStatus::TASK_WAIT;
    }

    public function asyncCallback($response, $exception = null)
    {
        if ($exception instanceof \Exception) {
            $this->throwException($exception, true);
        } else {
            // the async result becomes the value of the pending yield; then keep scheduling
            $this->task->send($response);
            $this->task->run();
        }
    }

    private function handleYieldValue($value)
    {
        if (!$this->task->valid()) {
            return self::SCHEDULE_CONTINUE;
        }

        // plain value: send it straight back as the result of the yield expression
        $this->task->send($value);
        return TaskStatus::TASK_CONTINUE;
    }

    private function handleStackPop()
    {
        if ($this->isStackEmpty()) {
            return self::SCHEDULE_CONTINUE;
        }

        // the sub-coroutine has finished: resume its caller with the last value sent in
        $coroutine = $this->stack->pop();
        $this->task->setCoroutine($coroutine);

        $value = $this->task->getSendVal();
        $this->task->send($value);

        return TaskStatus::TASK_CONTINUE;
    }

    public function throwException($e, $isFirstCall = false)
    {
        if ($this->isStackEmpty()) {
            $this->task->getCoroutine()->throw($e);
            return;
        }

        try {
            // from asyncCallback: throw into the current coroutine; otherwise the
            // current coroutine has already died, so throw into its caller
            if ($isFirstCall) {
                $coroutine = $this->task->getCoroutine();
            } else {
                $coroutine = $this->stack->pop();
            }

            $this->task->setCoroutine($coroutine);
            $coroutine->throw($e);

            $this->task->run();
        } catch (\Exception $e) {
            $this->throwException($e);
        }
    }
}
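handleAsyncJob only relies on the yielded object having an execute() method that accepts a callback with the signature ($response, $exception = null); the Async type itself is not shown in the article. The hypothetical Async interface and DeferredJob class below sketch that contract and replay what the Scheduler does with such a value: start the job, stop with TASK_WAIT, and resume the coroutine from the callback. In Pcs, completion would presumably come from a Swoole task finishing rather than from calling complete() by hand.

```php
<?php
// Hypothetical contract inferred from $value->execute([$this, 'asyncCallback']).
interface Async
{
    public function execute(callable $callback);
}

// Toy job: remembers the callback and completes only when complete() is called.
final class DeferredJob implements Async
{
    private $payload;
    private $callback;

    public function __construct(string $payload)
    {
        $this->payload = $payload;
    }

    public function execute(callable $callback)
    {
        $this->callback = $callback;
    }

    public function complete()
    {
        // report a result and no exception, like asyncCallback($response, null)
        ($this->callback)(strtoupper($this->payload), null);
    }
}

$job = new DeferredJob('select');
$coroutine = (function () use ($job) {
    $rows = yield $job;  // suspend until the job finishes
    yield "rows: $rows";
})();

$status = 'continue';
$value = $coroutine->current();
if ($value instanceof Async) {
    // what handleAsyncJob does: start the job and stop scheduling for now
    $value->execute(function ($response, $exception = null) use ($coroutine) {
        $coroutine->send($response);  // what asyncCallback does on success
    });
    $status = 'wait';
}

$job->complete();                // later: the job finishes and the callback fires
$final = $coroutine->current();
```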

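Scheduler::schedule() gets the current Task's coroutine, uses current() to read the value yielded at the current interruption point, and then passes that value through five handlers in turn:
1: handleSystemCall
If the value is a SystemCall object, the system call (such as killTask) is executed; system calls have the highest priority.
2: handleStackPush
When function A calls function B, B is called a subroutine of A. In a coroutine, however, a sub-coroutine cannot be called like an ordinary function: calling a generator function only creates a Generator object, and none of its code runs until something drives it. So when the yielded value is itself a Generator, handleStackPush pushes the current (calling) coroutine onto the stack and makes the sub-coroutine the Task's current coroutine, so the next schedule() call runs the sub-coroutine.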

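3: handleAsyncJob
If the value is an Async object, its execute() method is called with asyncCallback as the callback, and the Task returns TASK_WAIT. When the job finishes, asyncCallback sends the result into the coroutine (or throws the exception into it) and calls Task::run() again.
4: handleYieldValue
If the coroutine is still running, any other value is sent straight back as the result of the yield expression.
5: handleStackPop
If the current coroutine has finished and the stack is not empty, its caller is popped off the stack and resumed with the last value that was sent in, which acts as the sub-coroutine's return value. If the stack is empty as well, schedule() returns TASK_DONE.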
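The stack mechanism of handleStackPush and handleStackPop can be seen in isolation in the sketch below, which strips out Task, SystemCall and Async handling. runNested(), outer() and inner() are hypothetical names; as in the Scheduler, a plain yielded value is sent straight back, and a finished sub-coroutine's last sent value is passed to its caller.

```php
<?php
// Hypothetical sketch of the stack technique used by handleStackPush/handleStackPop.
function runNested(Generator $coroutine)
{
    $stack = new SplStack();  // callers waiting on a sub-coroutine
    $sendValue = null;        // last value sent into any coroutine

    while (true) {
        if ($coroutine->valid()) {
            $value = $coroutine->current();
            if ($value instanceof Generator) {
                // like handleStackPush: park the caller, switch to the callee
                $stack->push($coroutine);
                $coroutine = $value;
            } else {
                // like handleYieldValue: send the plain value straight back
                $sendValue = $value;
                $coroutine->send($value);
            }
            continue;
        }
        if ($stack->isEmpty()) {
            // like returning TASK_DONE: the top-level coroutine has finished
            return $sendValue;
        }
        // like handleStackPop: resume the caller with the callee's last value
        $coroutine = $stack->pop();
        $coroutine->send($sendValue);
    }
}

function inner(): Generator
{
    $x = yield 20;  // plain value, sent straight back
    yield $x + 1;   // the last yielded value acts as the result
}

function outer(): Generator
{
    $r = yield inner();  // "calling" a sub-coroutine
    yield "inner returned $r";
}

$result = runNested(outer());
```

Since PHP 7.0, `yield from` together with Generator::getReturn() provides built-in delegation to a sub-generator and access to its return value; the explicit stack shown here does the same job by hand and also works on PHP 5.5/5.6, which have neither.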
