Skip to content

React socket change #51

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.idea/
/vendor
/composer.lock
11 changes: 6 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@
],
"require": {
"php": ">=5.3",
"react/promise": "^2.0 || ^1.1",
"react/socket-client": "^0.5 || ^0.4 || ^0.3",
"react/event-loop": "0.3.*|0.4.*",
"react/promise": "^2.0",
"react/socket": "^0.8",
"react/event-loop": "0.4.*",
"clue/redis-protocol": "0.3.*",
"evenement/evenement": "~1.0|~2.0"
"evenement/evenement": "~2.0"
},
"autoload": {
"psr-4": { "Clue\\React\\Redis\\": "src/" }
},
"require-dev": {
"clue/block-react": "^1.1"
"clue/block-react": "^1.1",
"phpunit/phpunit": "~4.8"
}
}
26 changes: 16 additions & 10 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,30 @@

namespace Clue\React\Redis;

use React\SocketClient\ConnectorInterface;
use React\Stream\Stream;
use Clue\React\Redis\StreamingClient;
use React\Socket\ConnectorInterface;
use React\Stream\DuplexStreamInterface;
use Clue\Redis\Protocol\Factory as ProtocolFactory;
use React\SocketClient\Connector;
use React\Dns\Resolver\Factory as ResolverFactory;
use React\Socket\Connector;
use InvalidArgumentException;
use React\EventLoop\LoopInterface;
use React\Promise;

class Factory
{
private $connector;
private $protocol;
/**
* @var ConnectorInterface
*/
protected $connector;

/**
* @var ProtocolFactory
*/
protected $protocol;

public function __construct(LoopInterface $loop, ConnectorInterface $connector = null, ProtocolFactory $protocol = null)
{
if ($connector === null) {
$resolverFactory = new ResolverFactory();
$connector = new Connector($loop, $resolverFactory->create('8.8.8.8', $loop));
$connector = new Connector($loop);
}

if ($protocol === null) {
Expand All @@ -48,7 +52,9 @@ public function createClient($target = null)

$protocol = $this->protocol;

$promise = $this->connector->create($parts['host'], $parts['port'])->then(function (Stream $stream) use ($protocol) {
$uri = $parts['host'] . ':' . $parts['port'];

$promise = $this->connector->connect($uri)->then(function (DuplexStreamInterface $stream) use ($protocol) {
return new StreamingClient($stream, $protocol->createResponseParser(), $protocol->createSerializer());
});

Expand Down
8 changes: 2 additions & 6 deletions src/StreamingClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
namespace Clue\React\Redis;

use Evenement\EventEmitter;
use React\Stream\Stream;
use React\Stream\DuplexStreamInterface;
use Clue\Redis\Protocol\Parser\ParserInterface;
use Clue\Redis\Protocol\Parser\ParserException;
use Clue\Redis\Protocol\Model\ErrorReplyException;
use Clue\Redis\Protocol\Serializer\SerializerInterface;
use Clue\Redis\Protocol\Factory as ProtocolFactory;
use UnderflowException;
Expand All @@ -18,9 +17,6 @@
use Clue\Redis\Protocol\Model\MultiBulkReply;
use Clue\Redis\Protocol\Model\StatusReply;

/**
* @internal
*/
class StreamingClient extends EventEmitter implements Client
{
private $stream;
Expand All @@ -34,7 +30,7 @@ class StreamingClient extends EventEmitter implements Client
private $psubscribed = 0;
private $monitoring = false;

public function __construct(Stream $stream, ParserInterface $parser = null, SerializerInterface $serializer = null)
public function __construct(DuplexStreamInterface $stream, ParserInterface $parser = null, SerializerInterface $serializer = null)
{
if ($parser === null || $serializer === null) {
$factory = new ProtocolFactory();
Expand Down
22 changes: 11 additions & 11 deletions tests/FactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class FactoryTest extends TestCase
public function setUp()
{
$this->loop = $this->getMock('React\EventLoop\LoopInterface');
$this->connector = $this->getMock('React\SocketClient\ConnectorInterface');
$this->connector = $this->getMock('React\Socket\ConnectorInterface');
$this->factory = new Factory($this->loop, $this->connector);
}

Expand All @@ -25,54 +25,54 @@ public function testCtor()

public function testWillConnectToLocalIpWithDefaultPortIfTargetIsNotGiven()
{
$this->connector->expects($this->once())->method('create')->with('127.0.0.1', 6379)->willReturn(Promise\reject(new \RuntimeException()));
$this->connector->expects($this->once())->method('connect')->with('127.0.0.1:6379')->willReturn(Promise\reject(new \RuntimeException()));
$this->factory->createClient();
}

public function testWillConnectWithDefaultPort()
{
$this->connector->expects($this->once())->method('create')->with('redis.example.com', 6379)->willReturn(Promise\reject(new \RuntimeException()));
$this->connector->expects($this->once())->method('connect')->with('redis.example.com:6379')->willReturn(Promise\reject(new \RuntimeException()));
$this->factory->createClient('redis.example.com');
}

public function testWillConnectToLocalIpWhenTargetIsLocalhost()
{
$this->connector->expects($this->once())->method('create')->with('127.0.0.1', 1337)->willReturn(Promise\reject(new \RuntimeException()));
$this->connector->expects($this->once())->method('connect')->with('127.0.0.1:1337')->willReturn(Promise\reject(new \RuntimeException()));
$this->factory->createClient('tcp://localhost:1337');
}

public function testWillResolveIfConnectorResolves()
{
$stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->getMock();
$stream = $this->getMockBuilder('React\Stream\DuplexStreamInterface')->disableOriginalConstructor()->getMock();
$stream->expects($this->never())->method('write');

$this->connector->expects($this->once())->method('create')->willReturn(Promise\resolve($stream));
$this->connector->expects($this->once())->method('connect')->willReturn(Promise\resolve($stream));
$promise = $this->factory->createClient();

$this->expectPromiseResolve($promise);
}

public function testWillWriteSelectCommandIfTargetContainsPath()
{
$stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->getMock();
$stream = $this->getMockBuilder('React\Stream\DuplexStreamInterface')->disableOriginalConstructor()->getMock();
$stream->expects($this->once())->method('write')->with("*2\r\n$6\r\nselect\r\n$4\r\ndemo\r\n");

$this->connector->expects($this->once())->method('create')->willReturn(Promise\resolve($stream));
$this->connector->expects($this->once())->method('connect')->willReturn(Promise\resolve($stream));
$this->factory->createClient('tcp://127.0.0.1/demo');
}

public function testWillWriteAuthCommandIfTargetContainsUserInfo()
{
$stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->getMock();
$stream = $this->getMockBuilder('React\Stream\DuplexStreamInterface')->disableOriginalConstructor()->getMock();
$stream->expects($this->once())->method('write')->with("*2\r\n$4\r\nauth\r\n$11\r\nhello:world\r\n");

$this->connector->expects($this->once())->method('create')->willReturn(Promise\resolve($stream));
$this->connector->expects($this->once())->method('connect')->willReturn(Promise\resolve($stream));
$this->factory->createClient('tcp://hello:[email protected]');
}

public function testWillRejectIfConnectorRejects()
{
$this->connector->expects($this->once())->method('create')->with('127.0.0.1', 2)->willReturn(Promise\reject(new \RuntimeException()));
$this->connector->expects($this->once())->method('connect')->with('127.0.0.1:2')->willReturn(Promise\reject(new \RuntimeException()));
$promise = $this->factory->createClient('tcp://127.0.0.1:2');

$this->expectPromiseReject($promise);
Expand Down
6 changes: 3 additions & 3 deletions tests/FunctionalTest.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
<?php

use React\Stream\Stream;
use React\Stream\ReadableStream;
use React\Stream\DuplexResourceStream;
use Clue\React\Redis\Factory;
use Clue\React\Redis\StreamingClient;
use React\Promise\Deferred;
Expand Down Expand Up @@ -171,7 +170,8 @@ protected function createClientResponse($response)
fwrite($fp, $response);
fseek($fp, 0);

$stream = new Stream($fp, $this->loop);
//$stream = new Stream($fp, $this->loop);
$stream = new DuplexResourceStream($fp, $this->loop);

return new StreamingClient($stream);
}
Expand Down
2 changes: 1 addition & 1 deletion tests/StreamingClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class StreamingClientTest extends TestCase

public function setUp()
{
$this->stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->setMethods(array('write', 'close', 'resume', 'pause'))->getMock();
$this->stream = $this->getMockBuilder('React\Stream\DuplexResourceStream')->disableOriginalConstructor()->setMethods(array('write', 'close', 'resume', 'pause'))->getMock();
$this->parser = $this->getMock('Clue\Redis\Protocol\Parser\ParserInterface');
$this->serializer = $this->getMock('Clue\Redis\Protocol\Serializer\SerializerInterface');

Expand Down