diff --git a/.travis.yml b/.travis.yml index 0af783a9..4e946173 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,6 +32,11 @@ cache: directories: - $HOME/.composer/cache/files +before_install: + - sudo add-apt-repository ppa:ondrej/php -y + - sudo apt-get update -q + - sudo apt-get install libuv1-dev || true + install: - ./travis-init.sh - composer install diff --git a/README.md b/README.md index 40cb822a..1d25bdac 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ single [`run()`](#run) call that is controlled by the user. * [ExtLibeventLoop](#extlibeventloop) * [ExtLibevLoop](#extlibevloop) * [ExtEvLoop](#extevloop) + * [ExtUvLoop](#extuvloop) * [LoopInterface](#loopinterface) * [run()](#run) * [stop()](#stop) @@ -208,6 +209,14 @@ provides an interface to `libev` library. This loop is known to work with PHP 5.4 through PHP 7+. +#### ExtUvLoop + +An `ext-uv` based event loop. + +This loop uses the [`uv` PECL extension](https://pecl.php.net/package/uv), that +provides an interface to `libuv` library. + +This loop is known to work with PHP 7+. #### ExtLibeventLoop diff --git a/composer.json b/composer.json index c8ff91ec..f6517df4 100644 --- a/composer.json +++ b/composer.json @@ -11,7 +11,8 @@ }, "suggest": { "ext-event": "~1.0 for ExtEventLoop", - "ext-pcntl": "For signal handling support when using the StreamSelectLoop" + "ext-pcntl": "For signal handling support when using the StreamSelectLoop", + "ext-uv": "* for ExtUvLoop" }, "autoload": { "psr-4": { diff --git a/src/ExtUvLoop.php b/src/ExtUvLoop.php new file mode 100644 index 00000000..aade9943 --- /dev/null +++ b/src/ExtUvLoop.php @@ -0,0 +1,316 @@ +uv = \uv_loop_new(); + $this->futureTickQueue = new FutureTickQueue(); + $this->timers = new SplObjectStorage(); + $this->streamListener = $this->createStreamListener(); + $this->signals = new SignalsHandler(); + } + + /** + * Returns the underlying ext-uv event loop. (Internal ReactPHP use only.) + * + * @internal + * + * @return resource + */ + public function getUvLoop() + { + return $this->uv; + } + + /** + * {@inheritdoc} + */ + public function addReadStream($stream, $listener) + { + if (isset($this->readStreams[(int) $stream])) { + return; + } + + $this->readStreams[(int) $stream] = $listener; + $this->addStream($stream); + } + + /** + * {@inheritdoc} + */ + public function addWriteStream($stream, $listener) + { + if (isset($this->writeStreams[(int) $stream])) { + return; + } + + $this->writeStreams[(int) $stream] = $listener; + $this->addStream($stream); + } + + /** + * {@inheritdoc} + */ + public function removeReadStream($stream) + { + if (!isset($this->streamEvents[(int) $stream])) { + return; + } + + unset($this->readStreams[(int) $stream]); + $this->removeStream($stream); + } + + /** + * {@inheritdoc} + */ + public function removeWriteStream($stream) + { + if (!isset($this->streamEvents[(int) $stream])) { + return; + } + + unset($this->writeStreams[(int) $stream]); + $this->removeStream($stream); + } + + /** + * {@inheritdoc} + */ + public function addTimer($interval, $callback) + { + $timer = new Timer($interval, $callback, false); + + $that = $this; + $timers = $this->timers; + $callback = function () use ($timer, $timers, $that) { + \call_user_func($timer->getCallback(), $timer); + + if ($timers->contains($timer)) { + $that->cancelTimer($timer); + } + }; + + $event = \uv_timer_init($this->uv); + $this->timers->attach($timer, $event); + \uv_timer_start( + $event, + (int) ($interval * 1000) + 1, + 0, + $callback + ); + + return $timer; + } + + /** + * {@inheritdoc} + */ + public function addPeriodicTimer($interval, $callback) + { + $timer = new Timer($interval, $callback, true); + + $callback = function () use ($timer) { + \call_user_func($timer->getCallback(), $timer); + }; + + $event = \uv_timer_init($this->uv); + $this->timers->attach($timer, $event); + \uv_timer_start( + $event, + (int) ($interval * 1000) + 1, + (int) ($interval * 1000) + 1, + $callback + ); + + return $timer; + } + + /** + * {@inheritdoc} + */ + public function cancelTimer(TimerInterface $timer) + { + if (isset($this->timers[$timer])) { + @\uv_timer_stop($this->timers[$timer]); + $this->timers->detach($timer); + } + } + + /** + * {@inheritdoc} + */ + public function futureTick($listener) + { + $this->futureTickQueue->add($listener); + } + + public function addSignal($signal, $listener) + { + $this->signals->add($signal, $listener); + + if (!isset($this->signalEvents[$signal])) { + $signals = $this->signals; + $this->signalEvents[$signal] = \uv_signal_init($this->uv); + \uv_signal_start($this->signalEvents[$signal], function () use ($signals, $signal) { + $signals->call($signal); + }, $signal); + } + } + + public function removeSignal($signal, $listener) + { + $this->signals->remove($signal, $listener); + + if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) { + \uv_signal_stop($this->signalEvents[$signal]); + unset($this->signalEvents[$signal]); + } + } + + /** + * {@inheritdoc} + */ + public function run() + { + $this->running = true; + + while ($this->running) { + $this->futureTickQueue->tick(); + + $hasPendingCallbacks = !$this->futureTickQueue->isEmpty(); + $wasJustStopped = !$this->running; + $nothingLeftToDo = !$this->readStreams + && !$this->writeStreams + && !$this->timers->count() + && $this->signals->isEmpty(); + + // Use UV::RUN_ONCE when there are only I/O events active in the loop and block until one of those triggers, + // otherwise use UV::RUN_NOWAIT. + // @link http://docs.libuv.org/en/v1.x/loop.html#c.uv_run + $flags = \UV::RUN_ONCE; + if ($wasJustStopped || $hasPendingCallbacks) { + $flags = \UV::RUN_NOWAIT; + } elseif ($nothingLeftToDo) { + break; + } + + \uv_run($this->uv, $flags); + } + } + + /** + * {@inheritdoc} + */ + public function stop() + { + $this->running = false; + } + + private function addStream($stream) + { + if (!isset($this->streamEvents[(int) $stream])) { + $this->streamEvents[(int)$stream] = \uv_poll_init_socket($this->uv, $stream); + } + + if ($this->streamEvents[(int) $stream] !== false) { + $this->pollStream($stream); + } + } + + private function removeStream($stream) + { + if (!isset($this->streamEvents[(int) $stream])) { + return; + } + + if (!isset($this->readStreams[(int) $stream]) + && !isset($this->writeStreams[(int) $stream])) { + \uv_poll_stop($this->streamEvents[(int) $stream]); + \uv_close($this->streamEvents[(int) $stream]); + unset($this->streamEvents[(int) $stream]); + return; + } + + $this->pollStream($stream); + } + + private function pollStream($stream) + { + if (!isset($this->streamEvents[(int) $stream])) { + return; + } + + $flags = 0; + if (isset($this->readStreams[(int) $stream])) { + $flags |= \UV::READABLE; + } + + if (isset($this->writeStreams[(int) $stream])) { + $flags |= \UV::WRITABLE; + } + + \uv_poll_start($this->streamEvents[(int) $stream], $flags, $this->streamListener); + } + + /** + * Create a stream listener + * + * @return callable Returns a callback + */ + private function createStreamListener() + { + $callback = function ($event, $status, $events, $stream) { + if (!isset($this->streamEvents[(int) $stream])) { + return; + } + + if (($events | 4) === 4) { + // Disconnected + return; + } + + if (isset($this->readStreams[(int) $stream]) && ($events & \UV::READABLE)) { + \call_user_func($this->readStreams[(int) $stream], $stream); + } + + if (isset($this->writeStreams[(int) $stream]) && ($events & \UV::WRITABLE)) { + \call_user_func($this->writeStreams[(int) $stream], $stream); + } + }; + + return $callback; + } +} diff --git a/src/Factory.php b/src/Factory.php index 763c077b..d1767bf8 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -24,14 +24,17 @@ final class Factory public static function create() { // @codeCoverageIgnoreStart - if (\class_exists('libev\EventLoop', false)) { + if (\function_exists('uv_loop_new')) { + // only use ext-uv on PHP 7 + return new ExtUvLoop(); + } elseif (\class_exists('libev\EventLoop', false)) { return new ExtLibevLoop(); } elseif (\class_exists('EvLoop', false)) { return new ExtEvLoop(); } elseif (\class_exists('EventBase', false)) { return new ExtEventLoop(); - } elseif (\function_exists('event_base_new') && \PHP_VERSION_ID < 70000) { - // only use ext-libevent on PHP < 7 for now + } elseif (\function_exists('event_base_new') && \PHP_MAJOR_VERSION === 5) { + // only use ext-libevent on PHP 5 for now return new ExtLibeventLoop(); } diff --git a/tests/ExtUvLoopTest.php b/tests/ExtUvLoopTest.php new file mode 100644 index 00000000..61a94a9f --- /dev/null +++ b/tests/ExtUvLoopTest.php @@ -0,0 +1,17 @@ +markTestSkipped('uv tests skipped because ext-uv is not installed.'); + } + + return new ExtUvLoop(); + } +} diff --git a/tests/Timer/ExtUvTimerTest.php b/tests/Timer/ExtUvTimerTest.php new file mode 100644 index 00000000..e0c70233 --- /dev/null +++ b/tests/Timer/ExtUvTimerTest.php @@ -0,0 +1,17 @@ +markTestSkipped('uv tests skipped because ext-uv is not installed.'); + } + + return new ExtUvLoop(); + } +} diff --git a/travis-init.sh b/travis-init.sh index 9ed7afaa..9ea4e6f9 100755 --- a/travis-init.sh +++ b/travis-init.sh @@ -42,4 +42,11 @@ if [[ "$TRAVIS_PHP_VERSION" != "hhvm" && echo "extension=libev.so" >> "$(php -r 'echo php_ini_loaded_file();')" fi + # install 'libuv' PHP extension (does not support php 5) + if [[ "$TRAVIS_PHP_VERSION" = "7.0" || + "$TRAVIS_PHP_VERSION" = "7.1" || + "$TRAVIS_PHP_VERSION" = "7.2" ]]; then + echo "yes" | pecl install uv-beta + fi + fi