-
-
Notifications
You must be signed in to change notification settings - Fork 32.5k
gh-85984: Utilize new "winsize" functions from termios in pty tests. #101831
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
63fb4d8
5082b4a
cd0742c
4ae71ad
aaff7ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,20 +15,12 @@ | |
import socket | ||
import io # readline | ||
import unittest | ||
|
||
import struct | ||
import fcntl | ||
import warnings | ||
|
||
TEST_STRING_1 = b"I wish to buy a fish license.\n" | ||
TEST_STRING_2 = b"For my pet fish, Eric.\n" | ||
|
||
try: | ||
_TIOCGWINSZ = tty.TIOCGWINSZ | ||
_TIOCSWINSZ = tty.TIOCSWINSZ | ||
_HAVE_WINSZ = True | ||
except AttributeError: | ||
_HAVE_WINSZ = False | ||
_HAVE_WINSZ = hasattr(tty, "TIOCGWINSZ") and hasattr(tty, "TIOCSWINSZ") | ||
|
||
if verbose: | ||
def debug(msg): | ||
|
@@ -82,14 +74,6 @@ def expectedFailureIfStdinIsTTY(fun): | |
pass | ||
return fun | ||
|
||
def _get_term_winsz(fd): | ||
s = struct.pack("HHHH", 0, 0, 0, 0) | ||
return fcntl.ioctl(fd, _TIOCGWINSZ, s) | ||
|
||
def _set_term_winsz(fd, winsz): | ||
fcntl.ioctl(fd, _TIOCSWINSZ, winsz) | ||
|
||
|
||
# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing | ||
# because pty code is not too portable. | ||
class PtyTest(unittest.TestCase): | ||
|
@@ -105,18 +89,14 @@ def setUp(self): | |
self.addCleanup(signal.alarm, 0) | ||
signal.alarm(10) | ||
|
||
# Save original stdin window size | ||
self.stdin_rows = None | ||
self.stdin_cols = None | ||
# Save original stdin window size. | ||
self.stdin_dim = None | ||
if _HAVE_WINSZ: | ||
try: | ||
stdin_dim = os.get_terminal_size(pty.STDIN_FILENO) | ||
self.stdin_rows = stdin_dim.lines | ||
self.stdin_cols = stdin_dim.columns | ||
old_stdin_winsz = struct.pack("HHHH", self.stdin_rows, | ||
self.stdin_cols, 0, 0) | ||
self.addCleanup(_set_term_winsz, pty.STDIN_FILENO, old_stdin_winsz) | ||
except OSError: | ||
self.stdin_dim = tty.tcgetwinsize(pty.STDIN_FILENO) | ||
self.addCleanup(tty.tcsetwinsize, pty.STDIN_FILENO, \ | ||
gpshead marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.stdin_dim) | ||
except tty.error: | ||
pass | ||
|
||
def handle_sig(self, sig, frame): | ||
|
@@ -131,41 +111,40 @@ def test_openpty(self): | |
try: | ||
mode = tty.tcgetattr(pty.STDIN_FILENO) | ||
except tty.error: | ||
# not a tty or bad/closed fd | ||
# Not a tty or bad/closed fd. | ||
debug("tty.tcgetattr(pty.STDIN_FILENO) failed") | ||
mode = None | ||
|
||
new_stdin_winsz = None | ||
if self.stdin_rows is not None and self.stdin_cols is not None: | ||
new_dim = None | ||
if self.stdin_dim != None: | ||
gpshead marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try: | ||
# Modify pty.STDIN_FILENO window size; we need to | ||
# check if pty.openpty() is able to set pty slave | ||
# window size accordingly. | ||
debug("Setting pty.STDIN_FILENO window size") | ||
debug(f"original size: (rows={self.stdin_rows}, cols={self.stdin_cols})") | ||
target_stdin_rows = self.stdin_rows + 1 | ||
target_stdin_cols = self.stdin_cols + 1 | ||
debug(f"target size: (rows={target_stdin_rows}, cols={target_stdin_cols})") | ||
target_stdin_winsz = struct.pack("HHHH", target_stdin_rows, | ||
target_stdin_cols, 0, 0) | ||
_set_term_winsz(pty.STDIN_FILENO, target_stdin_winsz) | ||
debug("Setting pty.STDIN_FILENO window size.") | ||
debug(f"original size: (row, col) = {self.stdin_dim}") | ||
target_dim = (self.stdin_dim[0] + 1, self.stdin_dim[1] + 1) | ||
debug(f"target size: (row, col) = {target_dim}") | ||
tty.tcsetwinsize(pty.STDIN_FILENO, target_dim) | ||
|
||
# Were we able to set the window size | ||
# of pty.STDIN_FILENO successfully? | ||
new_stdin_winsz = _get_term_winsz(pty.STDIN_FILENO) | ||
self.assertEqual(new_stdin_winsz, target_stdin_winsz, | ||
new_dim = tty.tcgetwinsize(pty.STDIN_FILENO) | ||
self.assertEqual(new_dim, target_dim, | ||
"pty.STDIN_FILENO window size unchanged") | ||
except OSError: | ||
warnings.warn("Failed to set pty.STDIN_FILENO window size") | ||
warnings.warn("Failed to set pty.STDIN_FILENO window size.") | ||
pass | ||
|
||
try: | ||
debug("Calling pty.openpty()") | ||
try: | ||
master_fd, slave_fd = pty.openpty(mode, new_stdin_winsz) | ||
master_fd, slave_fd, slave_name = pty.openpty(mode, new_dim, \ | ||
gpshead marked this conversation as resolved.
Show resolved
Hide resolved
|
||
True) | ||
except TypeError: | ||
master_fd, slave_fd = pty.openpty() | ||
debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'") | ||
slave_name = None | ||
debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}', slave_name '{slave_name}'") | ||
gpshead marked this conversation as resolved.
Show resolved
Hide resolved
|
||
except OSError: | ||
# " An optional feature could not be imported " ... ? | ||
raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.") | ||
|
@@ -181,8 +160,8 @@ def test_openpty(self): | |
if mode: | ||
self.assertEqual(tty.tcgetattr(slave_fd), mode, | ||
"openpty() failed to set slave termios") | ||
if new_stdin_winsz: | ||
self.assertEqual(_get_term_winsz(slave_fd), new_stdin_winsz, | ||
if new_dim: | ||
self.assertEqual(tty.tcgetwinsize(slave_fd), new_dim, | ||
"openpty() failed to set slave window size") | ||
|
||
# Ensure the fd is non-blocking in case there's nothing to read. | ||
|
@@ -367,9 +346,8 @@ def _socketpair(self): | |
self.files.extend(socketpair) | ||
return socketpair | ||
|
||
def _mock_select(self, rfds, wfds, xfds, timeout=0): | ||
def _mock_select(self, rfds, wfds, xfds): | ||
# This will raise IndexError when no more expected calls exist. | ||
# This ignores the timeout | ||
self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds)) | ||
return self.select_rfds_results.pop(0), [], [] | ||
|
||
|
@@ -409,27 +387,27 @@ def test__copy_to_each(self): | |
self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master') | ||
self.assertEqual(os.read(masters[1], 20), b'from stdin') | ||
|
||
def test__copy_eof_on_all(self): | ||
"""Test the empty read EOF case on both master_fd and stdin.""" | ||
read_from_stdout_fd, mock_stdout_fd = self._pipe() | ||
pty.STDOUT_FILENO = mock_stdout_fd | ||
mock_stdin_fd, write_to_stdin_fd = self._pipe() | ||
pty.STDIN_FILENO = mock_stdin_fd | ||
socketpair = self._socketpair() | ||
masters = [s.fileno() for s in socketpair] | ||
|
||
socketpair[1].close() | ||
os.close(write_to_stdin_fd) | ||
|
||
pty.select = self._mock_select | ||
self.select_rfds_lengths.append(2) | ||
self.select_rfds_results.append([mock_stdin_fd, masters[0]]) | ||
# We expect that both fds were removed from the fds list as they | ||
# both encountered an EOF before the second select call. | ||
self.select_rfds_lengths.append(0) | ||
|
||
# We expect the function to return without error. | ||
self.assertEqual(pty._copy(masters[0]), None) | ||
# def test__copy_eof_on_all(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agreed, this test doesn't make sense as written, please delete it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
# """Test the empty read EOF case on both master_fd and stdin.""" | ||
# read_from_stdout_fd, mock_stdout_fd = self._pipe() | ||
# pty.STDOUT_FILENO = mock_stdout_fd | ||
# mock_stdin_fd, write_to_stdin_fd = self._pipe() | ||
# pty.STDIN_FILENO = mock_stdin_fd | ||
# socketpair = self._socketpair() | ||
# masters = [s.fileno() for s in socketpair] | ||
|
||
# socketpair[1].close() | ||
# os.close(write_to_stdin_fd) | ||
|
||
# pty.select = self._mock_select | ||
# self.select_rfds_lengths.append(2) | ||
# self.select_rfds_results.append([mock_stdin_fd, masters[0]]) | ||
# # We expect that both fds were removed from the fds list as they | ||
# # both encountered an EOF before the second select call. | ||
# self.select_rfds_lengths.append(0) | ||
|
||
# # We expect the function to return without error. | ||
# self.assertEqual(pty._copy(masters[0]), None) | ||
|
||
def test__restore_tty_mode_normal_return(self): | ||
"""Test that spawn resets the tty mode no when _copy returns normally.""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Utilize new "winsize" functions from termios in pty tests. |
Uh oh!
There was an error while loading. Please reload this page.