Skip to content

[Process] Fix pipes handling #18066

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions src/Symfony/Component/Process/Pipes/AbstractPipes.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ abstract class AbstractPipes implements PipesInterface
/** @var bool */
private $blocked = true;

public function __construct($input)
{
if (is_resource($input)) {
$this->input = $input;
} elseif (is_string($input)) {
$this->inputBuffer = $input;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the string already fits in memory, there is no gain in putting it in a php://temp stream

} else {
$this->inputBuffer = (string) $input;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the (string) casting operator duplicates strings (at least on PHP 5), we use it only for non-strings.

}
}

/**
* {@inheritdoc}
*/
Expand Down Expand Up @@ -71,4 +82,64 @@ protected function unblock()

$this->blocked = false;
}

/**
* Writes input to stdin.
*/
protected function write()
{
if (!isset($this->pipes[0])) {
return;
}

$e = array();
$r = null !== $this->input ? array($this->input) : $e;
$w = array($this->pipes[0]);

// let's have a look if something changed in streams
if (false === $n = @stream_select($r, $w, $e, 0, 0)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to wait here as this is done in readAndWrite

return;
}

foreach ($w as $stdin) {
if (isset($this->inputBuffer[0])) {
$written = fwrite($stdin, $this->inputBuffer);
$this->inputBuffer = substr($this->inputBuffer, $written);
if (isset($this->inputBuffer[0])) {
return array($this->pipes[0]);
}
}

foreach ($r as $input) {
for (;;) {
$data = fread($input, self::CHUNK_SIZE);
if (!isset($data[0])) {
break;
}
$written = fwrite($stdin, $data);
$data = substr($data, $written);
if (isset($data[0])) {
$this->inputBuffer = $data;

return array($this->pipes[0]);
}
}
if (!isset($data[0]) && feof($input)) {
// no more data to read on input resource
// use an empty buffer in the next reads
$this->input = null;
}
}
}

// no input to read on resource, buffer is empty
if (null === $this->input && !isset($this->inputBuffer[0])) {
fclose($this->pipes[0]);
unset($this->pipes[0]);
}

if (!$w) {
return array($this->pipes[0]);
}
}
}
81 changes: 17 additions & 64 deletions src/Symfony/Component/Process/Pipes/UnixPipes.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,7 @@ public function __construct($ttyMode, $ptyMode, $input, $disableOutput)
$this->ptyMode = (bool) $ptyMode;
$this->disableOutput = (bool) $disableOutput;

if (is_resource($input)) {
$this->input = $input;
} else {
$this->input = fopen('php://temp', 'w+');
fwrite($this->input, $input);
fseek($this->input, 0);
}
parent::__construct($input);
}

public function __destruct()
Expand Down Expand Up @@ -100,36 +94,15 @@ public function getFiles()
*/
public function readAndWrite($blocking, $close = false)
{
// only stdin is left open, job has been done !
// we can now close it
if (1 === count($this->pipes) && array(0) === array_keys($this->pipes)) {
fclose($this->pipes[0]);
unset($this->pipes[0]);
}

if (empty($this->pipes)) {
return array();
}

$this->unblock();
$w = $this->write();

$read = array();

if (null !== $this->input) {
// if input is a resource, let's add it to stream_select argument to
// fill a buffer
$r = array_merge($this->pipes, array('input' => $this->input));
} else {
$r = $this->pipes;
}
// discard read on stdin
$read = $e = array();
$r = $this->pipes;
unset($r[0]);

$w = isset($this->pipes[0]) ? array($this->pipes[0]) : null;
$e = null;

// let's have a look if something changed in streams
if (false === $n = @stream_select($r, $w, $e, 0, $blocking ? Process::TIMEOUT_PRECISION * 1E6 : 0)) {
if ($r && false === $n = @stream_select($r, $w, $e, 0, $blocking ? Process::TIMEOUT_PRECISION * 1E6 : 0)) {
// if a system call has been interrupted, forget about it, let's try again
// otherwise, an error occurred, let's reset pipes
if (!$this->hasSystemCallBeenInterrupted()) {
Expand All @@ -139,44 +112,24 @@ public function readAndWrite($blocking, $close = false)
return $read;
}

// nothing has changed
if (0 === $n) {
return $read;
}

foreach ($r as $pipe) {
// prior PHP 5.4 the array passed to stream_select is modified and
// lose key association, we have to find back the key
$type = (false !== $found = array_search($pipe, $this->pipes)) ? $found : 'input';
$data = '';
if ($type !== 'input') {
while ('' !== $dataread = (string) fread($pipe, self::CHUNK_SIZE)) {
$data .= $dataread;
}
// Remove extra null chars returned by fread
if ('' !== $data) {
$read[$type] = rtrim($data, "\x00");
}
} elseif (isset($w[0])) {
stream_copy_to_stream($this->input, $w[0], 4096);
}
$read[$type = array_search($pipe, $this->pipes, true)] = '';

if (false === $data || (true === $close && feof($pipe) && '' === $data)) {
if ($type === 'input') {
// no more data to read on input resource
// use an empty buffer in the next reads
$this->input = null;
} else {
fclose($this->pipes[$type]);
unset($this->pipes[$type]);
}
do {
$data = fread($pipe, self::CHUNK_SIZE);
$read[$type] .= $data;
} while (isset($data[0]));

if (!isset($read[$type][0])) {
unset($read[$type]);
}
}

// no input to read on resource and stdin still open
if (null === $this->input && isset($this->pipes[0])) {
fclose($this->pipes[0]);
unset($this->pipes[0]);
if ($close && feof($pipe)) {
fclose($pipe);
unset($this->pipes[$type]);
}
}

return $read;
Expand Down
107 changes: 15 additions & 92 deletions src/Symfony/Component/Process/Pipes/WindowsPipes.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,13 @@ public function __construct($disableOutput, $input)
Process::STDERR => tempnam(sys_get_temp_dir(), 'err_sf_proc'),
);
foreach ($this->files as $offset => $file) {
if (false === $file || false === $this->fileHandles[$offset] = fopen($file, 'rb')) {
if (false === $file || false === $this->fileHandles[$offset] = @fopen($file, 'rb')) {
throw new RuntimeException('A temporary file could not be opened to write the process output to, verify that your TEMP environment variable is writable');
}
}
}

if (is_resource($input)) {
$this->input = $input;
} else {
$this->inputBuffer = $input;
}
parent::__construct($input);
}

public function __destruct()
Expand Down Expand Up @@ -109,28 +105,22 @@ public function getFiles()
*/
public function readAndWrite($blocking, $close = false)
{
$this->write($blocking, $close);
$this->unblock();
$this->write();

$read = array();
$fh = $this->fileHandles;
foreach ($fh as $type => $fileHandle) {
if (0 !== fseek($fileHandle, $this->readBytes[$type])) {
continue;
}
$data = '';
$dataread = null;
while (!feof($fileHandle)) {
if (false !== $dataread = fread($fileHandle, self::CHUNK_SIZE)) {
$data .= $dataread;
}
}
if (0 < $length = strlen($data)) {
$this->readBytes[$type] += $length;
if ($this->fileHandles && $blocking) {
usleep(Process::TIMEOUT_PRECISION * 1E6);
}
foreach ($this->fileHandles as $type => $fileHandle) {
$data = stream_get_contents($fileHandle, -1, $this->readBytes[$type]);

if (isset($data[0])) {
$this->readBytes[$type] += strlen($data);
$read[$type] = $data;
}

if (false === $dataread || (true === $close && feof($fileHandle) && '' === $data)) {
fclose($this->fileHandles[$type]);
if ($close) {
fclose($fileHandle);
unset($this->fileHandles[$type]);
}
}
Expand All @@ -143,7 +133,7 @@ public function readAndWrite($blocking, $close = false)
*/
public function areOpen()
{
return (bool) $this->pipes && (bool) $this->fileHandles;
return $this->pipes && $this->fileHandles;
}

/**
Expand Down Expand Up @@ -183,71 +173,4 @@ private function removeFiles()
}
$this->files = array();
}

/**
* Writes input to stdin.
*
* @param bool $blocking
* @param bool $close
*/
private function write($blocking, $close)
{
if (empty($this->pipes)) {
return;
}

$this->unblock();

$r = null !== $this->input ? array('input' => $this->input) : null;
$w = isset($this->pipes[0]) ? array($this->pipes[0]) : null;
$e = null;

// let's have a look if something changed in streams
if (false === $n = @stream_select($r, $w, $e, 0, $blocking ? Process::TIMEOUT_PRECISION * 1E6 : 0)) {
// if a system call has been interrupted, forget about it, let's try again
// otherwise, an error occurred, let's reset pipes
if (!$this->hasSystemCallBeenInterrupted()) {
$this->pipes = array();
}

return;
}

// nothing has changed
if (0 === $n) {
return;
}

if (null !== $w && 0 < count($r)) {
$data = '';
while ($dataread = fread($r['input'], self::CHUNK_SIZE)) {
$data .= $dataread;
}

$this->inputBuffer .= $data;

if (false === $data || (true === $close && feof($r['input']) && '' === $data)) {
// no more data to read on input resource
// use an empty buffer in the next reads
$this->input = null;
}
}

if (null !== $w && 0 < count($w)) {
while (strlen($this->inputBuffer)) {
$written = fwrite($w[0], $this->inputBuffer, 2 << 18);
if ($written > 0) {
$this->inputBuffer = (string) substr($this->inputBuffer, $written);
} else {
break;
}
}
}

// no input to read on resource, buffer is empty and stdin still open
if ('' === $this->inputBuffer && null === $this->input && isset($this->pipes[0])) {
fclose($this->pipes[0]);
unset($this->pipes[0]);
}
}
}
18 changes: 6 additions & 12 deletions src/Symfony/Component/Process/Tests/ProcessTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,6 @@ public function testProcessPipes($code, $size)
*/
public function testSetStreamAsInput($code, $size)
{
if ('\\' === DIRECTORY_SEPARATOR) {
$this->markTestIncomplete('This test fails with a timeout on Windows, can someone investigate please?');
}
$expected = str_repeat(str_repeat('*', 1024), $size).'!';
$expectedLength = (1024 * $size) + 1;

Expand Down Expand Up @@ -791,26 +788,23 @@ public function testIdleTimeout()

public function testIdleTimeoutNotExceededWhenOutputIsSent()
{
if ('\\' === DIRECTORY_SEPARATOR) {
$this->markTestIncomplete('This test fails with a timeout on Windows, can someone investigate please?');
}
$process = $this->getProcess(sprintf('%s -r %s', self::$phpBin, escapeshellarg('while (true) {echo "foo\n"; usleep(10000);}')));
$process = $this->getProcess(sprintf('%s -r %s', self::$phpBin, escapeshellarg('while (true) {echo \'foo \'; usleep(1000);}')));
$process->setTimeout(1);
$process->start();

while (false === strpos($process->getOutput(), 'foo')) {
usleep(1000);
}

$process->setIdleTimeout(0.1);
$process->setIdleTimeout(0.5);

try {
$process->wait();
$this->fail('A timeout exception was expected.');
} catch (ProcessTimedOutException $ex) {
$this->assertTrue($ex->isGeneralTimeout(), 'A general timeout is expected.');
$this->assertFalse($ex->isIdleTimeout(), 'No idle timeout is expected.');
$this->assertEquals(1, $ex->getExceededTimeout());
} catch (ProcessTimedOutException $e) {
$this->assertTrue($e->isGeneralTimeout(), 'A general timeout is expected.');
$this->assertFalse($e->isIdleTimeout(), 'No idle timeout is expected.');
$this->assertEquals(1, $e->getExceededTimeout());
}
}

Expand Down