|
5 | 5 | import struct
|
6 | 6 | import sys
|
7 | 7 | import unittest
|
| 8 | +from multiprocessing import Process |
8 | 9 | from test.support import (verbose, TESTFN, unlink, run_unittest, import_module,
|
9 | 10 | cpython_only)
|
10 | 11 |
|
@@ -137,34 +138,33 @@ def test_flock(self):
|
137 | 138 | self.assertRaises(ValueError, fcntl.flock, -1, fcntl.LOCK_SH)
|
138 | 139 | self.assertRaises(TypeError, fcntl.flock, 'spam', fcntl.LOCK_SH)
|
139 | 140 |
|
140 |
| - def test_lockf(self): |
141 |
| - from multiprocessing import Process |
| 141 | + def test_lockf_exclusive(self): |
142 | 142 | self.f = open(TESTFN, 'wb+')
|
143 |
| - self.f.write(b'testpython') |
144 |
| - ex_cmd = fcntl.LOCK_EX | fcntl.LOCK_NB |
145 |
| - sh_cmd = fcntl.LOCK_SH | fcntl.LOCK_NB |
146 |
| - def try_lock_f_on_other_process(cmd): |
147 |
| - if cmd == ex_cmd: |
148 |
| - self.assertRaises(BlockingIOError, fcntl.lockf, self.f, cmd, 1, 0) |
149 |
| - if cmd == sh_cmd: |
150 |
| - fcntl.lockf(self.f, cmd, 1, 0) |
151 |
| - fcntl.lockf(self.f, cmd, 1, 1) |
152 |
| - |
153 |
| - fcntl.lockf(self.f, ex_cmd, 1, 0) |
154 |
| - p = Process(target=try_lock_f_on_other_process, args=(ex_cmd,)) |
| 143 | + cmd = fcntl.LOCK_EX | fcntl.LOCK_NB |
| 144 | + def try_lockf_on_other_process(): |
| 145 | + self.assertRaises(BlockingIOError, fcntl.lockf, self.f, cmd) |
| 146 | + |
| 147 | + fcntl.lockf(self.f, cmd) |
| 148 | + p = Process(target=try_lockf_on_other_process) |
155 | 149 | p.start()
|
156 | 150 | p.join()
|
157 | 151 | fcntl.lockf(self.f, fcntl.LOCK_UN)
|
158 | 152 | self.assertEqual(p.exitcode, 0)
|
159 | 153 |
|
160 |
| - fcntl.lockf(self.f, sh_cmd, 1, 0) |
161 |
| - p = Process(target=try_lock_f_on_other_process, args=(sh_cmd,)) |
| 154 | + def test_lockf_share(self): |
| 155 | + self.f = open(TESTFN, 'wb+') |
| 156 | + cmd = fcntl.LOCK_SH | fcntl.LOCK_NB |
| 157 | + def try_lockf_on_other_process(): |
| 158 | + fcntl.lockf(self.f, cmd) |
| 159 | + fcntl.lockf(self.f, fcntl.LOCK_UN) |
| 160 | + |
| 161 | + fcntl.lockf(self.f, cmd) |
| 162 | + p = Process(target=try_lockf_on_other_process) |
162 | 163 | p.start()
|
163 | 164 | p.join()
|
164 | 165 | fcntl.lockf(self.f, fcntl.LOCK_UN)
|
165 | 166 | self.assertEqual(p.exitcode, 0)
|
166 | 167 |
|
167 |
| - |
168 | 168 | @cpython_only
|
169 | 169 | def test_flock_overflow(self):
|
170 | 170 | import _testcapi
|
|
0 commit comments