Skip to content

Commit 6a971a6

Browse files
authored
Merge pull request clue#13 from clue-labs/windows
Support Windows by using temporary network socket for process I/O
2 parents c51bb81 + b0dfef0 commit 6a971a6

File tree

6 files changed

+319
-34
lines changed

6 files changed

+319
-34
lines changed

.travis.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,18 @@ matrix:
1717
include:
1818
- php: hhvm
1919
install: composer require phpunit/phpunit:^5 --dev --no-interaction
20+
- name: "Windows"
21+
os: windows
22+
language: shell # no built-in php support
23+
before_install:
24+
- choco install php
25+
- choco install composer
26+
- export PATH="$(powershell -Command '("Process", "Machine" | % { [Environment]::GetEnvironmentVariable("PATH", $_) -Split ";" -Replace "\\$", "" } | Select -Unique | % { cygpath $_ }) -Join ":"')"
27+
- php -r "file_put_contents(php_ini_loaded_file(),'extension_dir=ext'.PHP_EOL,FILE_APPEND);"
28+
- php -r "file_put_contents(php_ini_loaded_file(),'extension=sqlite3'.PHP_EOL,FILE_APPEND);"
2029
allow_failures:
2130
- php: hhvm
31+
- os: windows
2232

2333
install:
2434
- composer install --no-interaction

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ This method returns a promise that will resolve with a `DatabaseInterface` on
7575
success or will reject with an `Exception` on error. The SQLite extension
7676
is inherently blocking, so this method will spawn an SQLite worker process
7777
to run all SQLite commands and queries in a separate process without
78-
blocking the main process.
78+
blocking the main process. On Windows, it uses a temporary network socket
79+
for this communication, on all other platforms it communicates over
80+
standard process I/O pipes.
7981

8082
```php
8183
$factory->open('users.db')->then(function (DatabaseInterface $db) {

res/sqlite-worker.php

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,23 @@
11
<?php
22

3+
// This child worker process will be started by the main process to start communication over process pipe I/O
4+
//
5+
// Communication happens via newline-delimited JSON-RPC messages, see:
6+
// $ php res/sqlite-worker.php
7+
// < {"id":0,"method":"open","params":["test.db"]}
8+
// > {"id":0,"result":true}
9+
//
10+
// Or via socket connection (used for Windows, which does not support non-blocking process pipe I/O)
11+
// $ nc localhost 8080
12+
// $ php res/sqlite-worker.php localhost:8080
13+
14+
use Clue\React\NDJson\Decoder;
15+
use Clue\React\NDJson\Encoder;
316
use React\EventLoop\Factory;
17+
use React\Stream\DuplexResourceStream;
418
use React\Stream\ReadableResourceStream;
19+
use React\Stream\ThroughStream;
520
use React\Stream\WritableResourceStream;
6-
use Clue\React\NDJson\Decoder;
7-
use Clue\React\NDJson\Encoder;
821

922
if (file_exists(__DIR__ . '/../vendor/autoload.php')) {
1023
// local project development, go from /res to /vendor
@@ -15,8 +28,27 @@
1528
}
1629

1730
$loop = Factory::create();
18-
$in = new Decoder(new ReadableResourceStream(\STDIN, $loop));
19-
$out = new Encoder(new WritableResourceStream(\STDOUT, $loop));
31+
32+
if (isset($_SERVER['argv'][1])) {
33+
// socket address given, so try to connect through socket (Windows)
34+
$socket = stream_socket_client($_SERVER['argv'][1]);
35+
$stream = new DuplexResourceStream($socket, $loop);
36+
37+
// pipe input through a wrapper stream so that an error on the input stream
38+
// will not immediately close the output stream without a chance to report
39+
// this error through the output stream.
40+
$through = new ThroughStream();
41+
$stream->on('data', function ($data) use ($through) {
42+
$through->write($data);
43+
});
44+
45+
$in = new Decoder($through);
46+
$out = new Encoder($stream);
47+
} else {
48+
// no socket address given, use process I/O pipes
49+
$in = new Decoder(new ReadableResourceStream(\STDIN, $loop));
50+
$out = new Encoder(new WritableResourceStream(\STDOUT, $loop));
51+
}
2052

2153
// report error when input is invalid NDJSON
2254
$in->on('error', function (Exception $e) use ($out) {

src/Factory.php

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,16 @@
55
use React\ChildProcess\Process;
66
use React\EventLoop\LoopInterface;
77
use Clue\React\SQLite\Io\ProcessIoDatabase;
8+
use React\Stream\DuplexResourceStream;
9+
use React\Promise\Deferred;
10+
use React\Stream\ThroughStream;
811

912
class Factory
1013
{
1114
private $loop;
1215

16+
private $useSocket;
17+
1318
/**
1419
* The `Factory` is responsible for opening your [`DatabaseInterface`](#databaseinterface) instance.
1520
* It also registers everything with the main [`EventLoop`](https://github.com/reactphp/event-loop#usage).
@@ -24,6 +29,9 @@ class Factory
2429
public function __construct(LoopInterface $loop)
2530
{
2631
$this->loop = $loop;
32+
33+
// use socket I/O for Windows only, use faster process pipes everywhere else
34+
$this->useSocket = DIRECTORY_SEPARATOR === '\\';
2735
}
2836

2937
/**
@@ -33,7 +41,9 @@ public function __construct(LoopInterface $loop)
3341
* success or will reject with an `Exception` on error. The SQLite extension
3442
* is inherently blocking, so this method will spawn an SQLite worker process
3543
* to run all SQLite commands and queries in a separate process without
36-
* blocking the main process.
44+
* blocking the main process. On Windows, it uses a temporary network socket
45+
* for this communication, on all other platforms it communicates over
46+
* standard process I/O pipes.
3747
*
3848
* ```php
3949
* $factory->open('users.db')->then(function (DatabaseInterface $db) {
@@ -62,6 +72,11 @@ public function __construct(LoopInterface $loop)
6272
* @return PromiseInterface<DatabaseInterface> Resolves with DatabaseInterface instance or rejects with Exception
6373
*/
6474
public function open($filename, $flags = null)
75+
{
76+
return $this->useSocket ? $this->openSocketIo($filename, $flags) : $this->openProcessIo($filename, $flags);
77+
}
78+
79+
private function openProcessIo($filename, $flags = null)
6580
{
6681
$command = 'exec ' . \escapeshellarg(\PHP_BINARY) . ' ' . \escapeshellarg(__DIR__ . '/../res/sqlite-worker.php');
6782

@@ -121,4 +136,82 @@ public function open($filename, $flags = null)
121136
throw $e;
122137
});
123138
}
139+
140+
private function openSocketIo($filename, $flags = null)
141+
{
142+
$command = \escapeshellarg(\PHP_BINARY) . ' ' . \escapeshellarg(__DIR__ . '/../res/sqlite-worker.php');
143+
144+
// launch process without default STDIO pipes
145+
$null = \DIRECTORY_SEPARATOR === '\\' ? 'nul' : '/dev/null';
146+
$pipes = array(
147+
array('file', $null, 'r'),
148+
array('file', $null, 'w'),
149+
STDERR // array('file', $null, 'w'),
150+
);
151+
152+
// start temporary socket on random address
153+
$server = @stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr);
154+
if ($server === false) {
155+
return \React\Promise\reject(
156+
new \RuntimeException('Unable to start temporary socket I/O server: ' . $errstr, $errno)
157+
);
158+
}
159+
160+
// pass random server address to child process to connect back to parent process
161+
stream_set_blocking($server, false);
162+
$command .= ' ' . stream_socket_get_name($server, false);
163+
164+
$process = new Process($command, null, null, $pipes);
165+
$process->start($this->loop);
166+
167+
$deferred = new Deferred(function () use ($process, $server) {
168+
$this->loop->removeReadStream($server);
169+
fclose($server);
170+
$process->terminate();
171+
172+
throw new \RuntimeException('Opening database cancelled');
173+
});
174+
175+
// time out after a few seconds if we don't receive a connection
176+
$timeout = $this->loop->addTimer(5.0, function () use ($server, $deferred, $process) {
177+
$this->loop->removeReadStream($server);
178+
fclose($server);
179+
$process->terminate();
180+
181+
$deferred->reject(new \RuntimeException('No connection detected'));
182+
});
183+
184+
$this->loop->addReadStream($server, function () use ($server, $timeout, $filename, $flags, $deferred, $process) {
185+
// accept once connection on server socket and stop server socket
186+
$this->loop->cancelTimer($timeout);
187+
$peer = stream_socket_accept($server, 0);
188+
$this->loop->removeReadStream($server);
189+
fclose($server);
190+
191+
// use this one connection as fake process I/O streams
192+
$connection = new DuplexResourceStream($peer, $this->loop, -1);
193+
$process->stdin = $process->stdout = $connection;
194+
$connection->on('close', function () use ($process) {
195+
$process->terminate();
196+
});
197+
$process->on('exit', function () use ($connection) {
198+
$connection->close();
199+
});
200+
201+
$db = new ProcessIoDatabase($process);
202+
$args = array($filename);
203+
if ($flags !== null) {
204+
$args[] = $flags;
205+
}
206+
207+
$db->send('open', $args)->then(function () use ($deferred, $db) {
208+
$deferred->resolve($db);
209+
}, function ($e) use ($deferred, $db) {
210+
$db->close();
211+
$deferred->reject($e);
212+
});
213+
});
214+
215+
return $deferred->promise();
216+
}
124217
}

src/Io/ProcessIoDatabase.php

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,11 @@ public function quit()
9191
{
9292
$promise = $this->send('close', array());
9393

94-
$this->process->stdin->end();
94+
if ($this->process->stdin === $this->process->stdout) {
95+
$promise->then(function () { $this->process->stdin->close(); });
96+
} else {
97+
$this->process->stdin->end();
98+
}
9599

96100
return $promise;
97101
}
@@ -120,7 +124,7 @@ public function close()
120124
/** @internal */
121125
public function send($method, array $params)
122126
{
123-
if (!$this->process->stdin->isWritable()) {
127+
if ($this->closed || !$this->process->stdin->isWritable()) {
124128
return \React\Promise\reject(new \RuntimeException('Database closed'));
125129
}
126130

0 commit comments

Comments
 (0)