|
12 | 12 | fcntl = import_module('fcntl')
|
13 | 13 |
|
14 | 14 |
|
15 |
| -# TODO - Write tests for flock() and lockf(). |
16 | 15 |
|
17 | 16 | def get_lockdata():
|
18 | 17 | try:
|
@@ -138,6 +137,34 @@ def test_flock(self):
|
138 | 137 | self.assertRaises(ValueError, fcntl.flock, -1, fcntl.LOCK_SH)
|
139 | 138 | self.assertRaises(TypeError, fcntl.flock, 'spam', fcntl.LOCK_SH)
|
140 | 139 |
|
| 140 | + def test_lockf(self): |
| 141 | + from multiprocessing import Process |
| 142 | + self.f = open(TESTFN, 'wb+') |
| 143 | + self.f.write(b'testpython') |
| 144 | + ex_cmd = fcntl.LOCK_EX | fcntl.LOCK_NB |
| 145 | + sh_cmd = fcntl.LOCK_SH | fcntl.LOCK_NB |
| 146 | + def try_lock_f_on_other_process(cmd): |
| 147 | + if cmd == ex_cmd: |
| 148 | + self.assertRaises(BlockingIOError, fcntl.lockf, self.f, cmd, 1, 0) |
| 149 | + if cmd == sh_cmd: |
| 150 | + fcntl.lockf(self.f, cmd, 1, 0) |
| 151 | + fcntl.lockf(self.f, cmd, 1, 1) |
| 152 | + |
| 153 | + fcntl.lockf(self.f, ex_cmd, 1, 0) |
| 154 | + p = Process(target=try_lock_f_on_other_process, args=(ex_cmd,)) |
| 155 | + p.start() |
| 156 | + p.join() |
| 157 | + fcntl.lockf(self.f, fcntl.LOCK_UN) |
| 158 | + self.assertEqual(p.exitcode, 0) |
| 159 | + |
| 160 | + fcntl.lockf(self.f, sh_cmd, 1, 0) |
| 161 | + p = Process(target=try_lock_f_on_other_process, args=(sh_cmd,)) |
| 162 | + p.start() |
| 163 | + p.join() |
| 164 | + fcntl.lockf(self.f, fcntl.LOCK_UN) |
| 165 | + self.assertEqual(p.exitcode, 0) |
| 166 | + |
| 167 | + |
141 | 168 | @cpython_only
|
142 | 169 | def test_flock_overflow(self):
|
143 | 170 | import _testcapi
|
|
0 commit comments