diff --git a/Lib/test/_test_eintr.py b/Lib/test/_test_eintr.py index ae799808ca067e6..3ac852f65e8d80d 100644 --- a/Lib/test/_test_eintr.py +++ b/Lib/test/_test_eintr.py @@ -91,7 +91,7 @@ class OSEINTRTest(EINTRBaseTest): """ EINTR tests for the os module. """ def new_sleep_process(self): - code = 'import time; time.sleep(%r)' % self.sleep_time + code = f'import time; time.sleep({self.sleep_time!r})' return self.subprocess(code) def _test_wait_multiple(self, wait_func): @@ -130,26 +130,26 @@ def test_read(self): # the payload below are smaller than PIPE_BUF, hence the writes will be # atomic - datas = [b"hello", b"world", b"spam"] + data = [b"hello", b"world", b"spam"] code = '\n'.join(( 'import os, sys, time', '', 'wr = int(sys.argv[1])', - 'datas = %r' % datas, - 'sleep_time = %r' % self.sleep_time, + f'data = {data!r}', + f'sleep_time = {self.sleep_time!r}', '', - 'for data in datas:', + 'for item in data:', ' # let the parent block on read()', ' time.sleep(sleep_time)', - ' os.write(wr, data)', + ' os.write(wr, item)', )) proc = self.subprocess(code, str(wr), pass_fds=[wr]) with kill_on_error(proc): os.close(wr) - for data in datas: - self.assertEqual(data, os.read(rd, len(data))) + for item in data: + self.assertEqual(item, os.read(rd, len(item))) self.assertEqual(proc.wait(), 0) def test_readinto(self): @@ -159,28 +159,28 @@ def test_readinto(self): # the payload below are smaller than PIPE_BUF, hence the writes will be # atomic - datas = [b"hello", b"world", b"spam"] + data = [b"hello", b"world", b"spam"] code = '\n'.join(( 'import os, sys, time', '', 'wr = int(sys.argv[1])', - 'datas = %r' % datas, - 'sleep_time = %r' % self.sleep_time, + f'data = {data!r}', + f'sleep_time = {self.sleep_time!r}', '', - 'for data in datas:', + 'for item in data:', ' # let the parent block on read()', ' time.sleep(sleep_time)', - ' os.write(wr, data)', + ' os.write(wr, item)', )) proc = self.subprocess(code, str(wr), pass_fds=[wr]) with kill_on_error(proc): os.close(wr) - for data in datas: - buffer = bytearray(len(data)) - self.assertEqual(os.readinto(rd, buffer), len(data)) - self.assertEqual(buffer, data) + for item in data: + buffer = bytearray(len(item)) + self.assertEqual(os.readinto(rd, buffer), len(item)) + self.assertEqual(buffer, item) self.assertEqual(proc.wait(), 0) def test_write(self): @@ -195,8 +195,8 @@ def test_write(self): 'import io, os, sys, time', '', 'rd = int(sys.argv[1])', - 'sleep_time = %r' % self.sleep_time, - 'data = b"x" * %s' % support.PIPE_MAX_SIZE, + f'sleep_time = {self.sleep_time!r}', + f'data = b"x" * {support.PIPE_MAX_SIZE}', 'data_len = len(data)', '', '# let the parent block on write()', @@ -209,8 +209,8 @@ def test_write(self): '', 'value = read_data.getvalue()', 'if value != data:', - ' raise Exception("read error: %s vs %s bytes"', - ' % (len(value), data_len))', + ' raise Exception(f"read error: {len(value)}' + ' vs {data_len} bytes")', )) proc = self.subprocess(code, str(rd), pass_fds=[rd]) @@ -233,33 +233,33 @@ def _test_recv(self, recv_func): # wr closed explicitly by parent # single-byte payload guard us against partial recv - datas = [b"x", b"y", b"z"] + data = [b"x", b"y", b"z"] code = '\n'.join(( 'import os, socket, sys, time', '', 'fd = int(sys.argv[1])', - 'family = %s' % int(wr.family), - 'sock_type = %s' % int(wr.type), - 'datas = %r' % datas, - 'sleep_time = %r' % self.sleep_time, + f'family = {int(wr.family)}', + f'sock_type = {int(wr.type)}', + f'data = {data!r}', + f'sleep_time = {self.sleep_time!r}', '', 'wr = socket.fromfd(fd, family, sock_type)', 'os.close(fd)', '', 'with wr:', - ' for data in datas:', + ' for item in data:', ' # let the parent block on recv()', ' time.sleep(sleep_time)', - ' wr.sendall(data)', + ' wr.sendall(item)', )) fd = wr.fileno() proc = self.subprocess(code, str(fd), pass_fds=[fd]) with kill_on_error(proc): wr.close() - for data in datas: - self.assertEqual(data, recv_func(rd, len(data))) + for item in data: + self.assertEqual(item, recv_func(rd, len(item))) self.assertEqual(proc.wait(), 0) def test_recv(self): @@ -281,10 +281,10 @@ def _test_send(self, send_func): 'import os, socket, sys, time', '', 'fd = int(sys.argv[1])', - 'family = %s' % int(rd.family), - 'sock_type = %s' % int(rd.type), - 'sleep_time = %r' % self.sleep_time, - 'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3), + f'family = {int(rd.family)}', + f'sock_type = {int(rd.type)}', + f'sleep_time = {self.sleep_time!r}', + f'data = b"xyz" * {support.SOCK_MAX_SIZE // 3}', 'data_len = len(data)', '', 'rd = socket.fromfd(fd, family, sock_type)', @@ -300,8 +300,8 @@ def _test_send(self, send_func): ' n += rd.recv_into(memoryview(received_data)[n:])', '', 'if received_data != data:', - ' raise Exception("recv error: %s vs %s bytes"', - ' % (len(received_data), data_len))', + ' raise Exception(f"recv error: {len(received_data)}' + ' vs {data_len} bytes")', )) fd = rd.fileno() @@ -333,9 +333,9 @@ def test_accept(self): code = '\n'.join(( 'import socket, time', '', - 'host = %r' % socket_helper.HOST, - 'port = %s' % port, - 'sleep_time = %r' % self.sleep_time, + f'host = {socket_helper.HOST!r}', + f'port = {port}', + f'sleep_time = {self.sleep_time!r}', '', '# let parent block on accept()', 'time.sleep(sleep_time)', @@ -363,15 +363,15 @@ def _test_open(self, do_open_close_reader, do_open_close_writer): os_helper.unlink(filename) try: os.mkfifo(filename) - except PermissionError as e: - self.skipTest('os.mkfifo(): %s' % e) + except PermissionError as exc: + self.skipTest(f'os.mkfifo(): {exc!r}') self.addCleanup(os_helper.unlink, filename) code = '\n'.join(( 'import os, time', '', - 'path = %a' % filename, - 'sleep_time = %r' % self.sleep_time, + f'path = {filename!a}', + f'sleep_time = {self.sleep_time!r}', '', '# let the parent block', 'time.sleep(sleep_time)', @@ -427,21 +427,20 @@ class SignalEINTRTest(EINTRBaseTest): def check_sigwait(self, wait_func): signum = signal.SIGUSR1 - pid = os.getpid() old_handler = signal.signal(signum, lambda *args: None) self.addCleanup(signal.signal, signum, old_handler) code = '\n'.join(( 'import os, time', - 'pid = %s' % os.getpid(), - 'signum = %s' % int(signum), - 'sleep_time = %r' % self.sleep_time, + f'pid = {os.getpid()}', + f'signum = {int(signum)}', + f'sleep_time = {self.sleep_time!r}', 'time.sleep(sleep_time)', 'os.kill(pid, signum)', )) - old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) self.addCleanup(signal.pthread_sigmask, signal.SIG_UNBLOCK, [signum]) proc = self.subprocess(code)