diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 75f31d858d3306..82b2298c6c4929 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -137,8 +137,12 @@ def _resource_unlink(name, rtype):
 
 WAIT_ACTIVE_CHILDREN_TIMEOUT = 5.0
 
-HAVE_GETVALUE = not getattr(_multiprocessing,
-                            'HAVE_BROKEN_SEM_GETVALUE', False)
+# Since gh-125828, HAVE_GETVALUE is no longer needed.
+# HAVE_BROKEN_SEM_GETVALUE should be removed from
+# Modules/_multiprocessing/multiprocessing.c when the cleanup is complete.
+# -------------------
+# HAVE_GETVALUE = not getattr(_multiprocessing,
+#                             'HAVE_BROKEN_SEM_GETVALUE', False)
 
 WIN32 = (sys.platform == "win32")
 
@@ -1664,10 +1668,8 @@ def test_semaphore(self):
     def test_bounded_semaphore(self):
         sem = self.BoundedSemaphore(2)
         self._test_semaphore(sem)
-        # Currently fails on OS/X
-        #if HAVE_GETVALUE:
-        #    self.assertRaises(ValueError, sem.release)
-        #    self.assertReturnsIfImplemented(2, get_value, sem)
+        self.assertRaises(ValueError, sem.release)
+        self.assertReturnsIfImplemented(2, get_value, sem)
 
     def test_timeout(self):
         if self.TYPE != 'processes':
@@ -6867,6 +6869,63 @@ def child():
         close_queue(q)
 
 
+#
+# Tests for the macOS semaphore workaround
+#
+
+ACQUIRE, RELEASE = range(2)
+
+@unittest.skipIf(sys.platform != "darwin", "macOS only")
+class _TestMacOSXSemaphore(BaseTestCase):
+
+    ALLOWED_TYPES = ('processes',)
+
+    @classmethod
+    def _run_thread(cls, sem, meth, ntime, delay):
+        if meth == ACQUIRE:
+            for _ in range(ntime):
+                sem.acquire()
+                time.sleep(delay)
+        else:
+            for _ in range(ntime):
+                sem.release()
+                time.sleep(delay)
+
+    @classmethod
+    def _run_process(cls, sem, sem_meth, nthread=1, ntime=10, delay=0.1):
+        ts = []
+        for _ in range(nthread):
+            t = threading.Thread(target=cls._run_thread,
+                                 args=(sem, sem_meth, ntime, delay))
+            ts.append(t)
+        for t in ts:
+            t.start()
+        for t in ts:
+            t.join()
+
+    def test_mix_several_acquire_release(self):
+        # processes, threads per process, and loops per thread
+        n_p_acq, n_th_acq, n_loop_acq = 15, 5, 20
+        n_p_rel, n_th_rel, n_loop_rel = 8, 8, 8
+
+        n_acq = n_p_acq*n_th_acq*n_loop_acq
+        n_rel = n_p_rel*n_th_rel*n_loop_rel
+        sem = self.Semaphore(n_acq)
+        ps = []
+        for _ in range(n_p_acq):
+            p = self.Process(target=self._run_process,
+                             args=(sem, ACQUIRE, n_th_acq, n_loop_acq, 0.01))
+            ps.append(p)
+
+        for _ in range(n_p_rel):
+            p = self.Process(target=self._run_process,
+                             args=(sem, RELEASE, n_th_rel, n_loop_rel, 0.005))
+            ps.append(p)
+
+        for p in ps:
+            p.start()
+        for p in ps:
+            p.join()
+        self.assertEqual(sem.get_value(), n_rel)
+
+
 #
 # Mixins
 #
diff --git a/Misc/NEWS.d/next/macOS/2025-03-06-18-52-05.gh-issue-125828.JkMjD2.rst b/Misc/NEWS.d/next/macOS/2025-03-06-18-52-05.gh-issue-125828.JkMjD2.rst
new file mode 100644
index 00000000000000..2a6e81a3cdc97c
--- /dev/null
+++ b/Misc/NEWS.d/next/macOS/2025-03-06-18-52-05.gh-issue-125828.JkMjD2.rst
@@ -0,0 +1,3 @@
+Fix the unimplemented ``get_value`` of :class:`multiprocessing.Semaphore` on macOS
+by adding a dedicated workaround in ``_multiprocessing.SemLock``.
+The changes are located in the ``semaphore.c`` file in :mod:`_multiprocessing`.
diff --git a/Modules/_multiprocessing/dump_shm_macosx/dump_shared_mem.c b/Modules/_multiprocessing/dump_shm_macosx/dump_shared_mem.c
new file mode 100644
index 00000000000000..c205cf1a391393
--- /dev/null
+++ b/Modules/_multiprocessing/dump_shm_macosx/dump_shared_mem.c
@@ -0,0 +1,102 @@
+#include <unistd.h>     // usleep, getpid, getppid
+#include <stdio.h>      // puts, printf, scanf
+#include <time.h>       // ctime, time
+#include <string.h>     // memcpy, memcmp
+
+#include <semaphore.h>  // sem_t
+typedef sem_t *SEM_HANDLE;
+
+#define MAX_SEMAPHORES_SHOW 32
+
+#include "../semaphore_macosx.h"
+#include "shared_mem.h"
+
+// Static data for each process.
+CountersWorkaround shm_semlock_counters = {
+    .state_this = THIS_NOT_OPEN,
+    .name_shm = "/shm_gh125828",
+    .handle_shm = (MEMORY_HANDLE)0,
+    .create_shm = 0,
+    .name_shm_lock = "/mp_gh125828",
+    .handle_shm_lock = (SEM_HANDLE)0,
+    .header = (HeaderObject *)NULL,
+    .counters = (CounterObject *)NULL,
+};
+
+HeaderObject *header = NULL;
+CounterObject *counter = NULL;
+
+static char *show_counter(char *p, CounterObject *counter) {
+    sprintf(p, "p:%p, n:%s, v:%d, u:%d, t:%s", counter,
+            counter->sem_name,
+            counter->internal_value,
+            counter->unlink_done,
+            ctime(&counter->ctimestamp));
+    return p;
+}
+
+static void dump_shm_semlock_counters(void) {
+    puts(__func__);
+
+    char buf[256];
+    int i = 0, j = 0;
+
+    if (shm_semlock_counters.state_this == THIS_AVAILABLE) {
+        CounterObject *counter = shm_semlock_counters.counters;
+        HeaderObject *header = shm_semlock_counters.header;
+        dump_shm_semlock_header_counters();
+        dump_shm_semlock_header();
+        int show_max = header->n_semlocks > MAX_SEMAPHORES_SHOW ? MAX_SEMAPHORES_SHOW : header->n_semlocks;
+        for (; i < header->n_slots && j < show_max; i++, counter++) {
+            if (counter->sem_name[0] != 0) {
+                printf("%s", show_counter(buf, counter));
+                ++j;
+            }
+        }
+        if (show_max < header->n_semlocks) {
+            printf("......\n--------- %d more semaphores ---------\n", header->n_semlocks-show_max);
+        }
+    }
+}
+
+int main(int argc, char *argv[]) {
+    int repeat = 0;
+    unsigned long udelay = 5000;
+    HeaderObject save = {0};
+    int unlink = 0;
+    int force_open = 1;
+    int release_lock = 1;
+
+    puts("--------");
+    printf("PID:%d, PPID:%d\n", getpid(), getppid());
+    connect_shm_semlock_counters(unlink, force_open, release_lock);
+    puts("+++++++++");
+    if (argc > 1) {
+        sscanf(argv[1], "%d", &repeat);
+        if (argc >= 3) {
+            puts(argv[2]);
+            sscanf(argv[2], "%lu", &udelay);
+        }
+    } else {
+        puts("usage: dump_shm repeat [udelay]\n"
+             "  repeat: number of dumps (-1 is infinite)\n"
+             "  udelay: delay (us) between two dumps\n");
+        return 1;
+    }
+
+    printf("Repeat:%d, udelay:%lu\n", repeat, udelay);
+
+    if (shm_semlock_counters.state_this == THIS_AVAILABLE) {
+        memset(&save, '\0', sizeof(save));
+        do {
+            if (memcmp(&save, shm_semlock_counters.header, sizeof(HeaderObject))) {
+                time_t timestamp = time(NULL);
+                puts(ctime(&timestamp));
+                dump_shm_semlock_counters();
+                memcpy(&save, shm_semlock_counters.header, sizeof(HeaderObject));
+                puts("==========");
+            }
+            usleep(udelay);
+        } while (repeat--);
+    }
+    return 0;
+}
diff --git a/Modules/_multiprocessing/dump_shm_macosx/make_all.sh b/Modules/_multiprocessing/dump_shm_macosx/make_all.sh
new file mode 100755
index 00000000000000..2a1d6ded19a3d7
--- /dev/null
+++ b/Modules/_multiprocessing/dump_shm_macosx/make_all.sh
@@ -0,0 +1,2 @@
+gcc -o ./dump_shm ./dump_shared_mem.c ./shared_mem.c
+gcc -o ./reset_shm ./reset_shared_mem.c ./shared_mem.c
diff --git a/Modules/_multiprocessing/dump_shm_macosx/readme.md b/Modules/_multiprocessing/dump_shm_macosx/readme.md
new file mode 100644
index 00000000000000..826579e11bc433
--- /dev/null
+++ b/Modules/_multiprocessing/dump_shm_macosx/readme.md
@@ -0,0 +1,41 @@
+**For macOS only**
+---
+
+This directory contains two programs:
+
+* dump_shared_mem: shows the content of the shared memory.
+* reset_shared_mem: erases all data stored in the shared memory.
+
+The `make_all.sh` script builds these two programs.
+
+# dump_shm
+
+`dump_shm` connects to the shared memory only if it exists.
+This program does not use a synchronization primitive to read the shared memory.
+To quit this program, press `Ctrl+C`.
+
+```zsh
+dump_shm -1 300
+```
+Runs the program forever, checking every 300 *us* whether the shared memory has changed.
+
+When the shared memory changes (only the semaphore counters are compared), the program prints the new content, as below:
+
+```zsh
+==========
+Tue Feb 25 17:04:05 2025
+
+dump_shm_semlock_counters
+header:0x1022b4000 - counter array:0x1022b4010
+n sems:2 - n sem_slots:87551, n procs:1, size_shm:2801664
+p:0x1022b4010, n:/mp-fwl20ahw, v:6, r:0, t:Tue Feb 25 17:04:05 2025
+p:0x1022b4030, n:/mp-z3635cdr, v:6, r:0, t:Tue Feb 25 17:04:04 2025
+
+```
+
+# reset_shm
+
+`reset_shm` connects to the shared memory only if it exists.
+This program uses a synchronization primitive to read the shared memory.
+
+On exit, this program calls `shm_unlink`.
\ No newline at end of file
diff --git a/Modules/_multiprocessing/dump_shm_macosx/reset_shared_mem.c b/Modules/_multiprocessing/dump_shm_macosx/reset_shared_mem.c
new file mode 100644
index 00000000000000..94aa5a73a776bf
--- /dev/null
+++ b/Modules/_multiprocessing/dump_shm_macosx/reset_shared_mem.c
@@ -0,0 +1,84 @@
+#include <unistd.h>
+#include <stdio.h>   // puts, printf, scanf
+#include <string.h>  // memcpy, memcmp, memset
+
+#include <semaphore.h>
+typedef sem_t *SEM_HANDLE;
+
+#include "../semaphore_macosx.h"
+#include "shared_mem.h"
+
+// Static data for each process.
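+// The names below must match those in the CountersWorkaround instance
+// defined in Modules/_multiprocessing/semaphore.c, so that this tool
+// attaches to the same shared memory segment and lock.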
+CountersWorkaround shm_semlock_counters = {
+    .state_this = THIS_NOT_OPEN,
+    .name_shm = "/shm_gh125828",
+    .handle_shm = (MEMORY_HANDLE)0,
+    .create_shm = 0,
+    .name_shm_lock = "/mp_gh125828",
+    .handle_shm_lock = (SEM_HANDLE)0,
+    .header = (HeaderObject *)NULL,
+    .counters = (CounterObject *)NULL,
+};
+
+HeaderObject *header = NULL;
+CounterObject *counter = NULL;
+
+static void reset_shm_semlock_counters(int size, int nb_slots) {
+    puts(__func__);
+
+    if (shm_semlock_counters.state_this == THIS_AVAILABLE) {
+        if (ACQUIRE_SHM_LOCK) {
+            CounterObject *counter = shm_semlock_counters.counters;
+            HeaderObject *header = shm_semlock_counters.header;
+            dump_shm_semlock_header_counters();
+            dump_shm_semlock_header();
+            long size_to_reset = header->size_shm - sizeof(HeaderObject);
+            printf("1 - size to reset:%ld\n", size_to_reset);
+            if (size && size <= size_to_reset) {
+                memset(counter, 0, size);
+            } else {
+                memset(counter, 0, size_to_reset);
+            }
+            puts("2 - Reset all header parameters");
+            if (nb_slots) {
+                header->n_slots = nb_slots;
+            } else {
+                header->n_slots = CALC_NB_SLOTS(header->size_shm);
+            }
+            header->n_semlocks = 0;
+            header->n_procs = 0;
+            dump_shm_semlock_header();
+            RELEASE_SHM_LOCK;
+        }
+    } else {
+        puts("No data");
+    }
+}
+
+int main(int argc, char *argv[]) {
+    char c;
+    int size = CALC_SIZE_SHM;
+    int nb_slots = CALC_NB_SLOTS(size);
+    int unlink = 1;
+    int force_open = 1;
+    int release_lock = 1;
+
+    if (argc >= 2) {
+        sscanf(argv[1], "%d", &size);
+        nb_slots = CALC_NB_SLOTS(size);
+    }
+    puts("--------");
+    printf("size:%d, sem slots:%d\n", size, nb_slots);
+    connect_shm_semlock_counters(unlink, force_open, release_lock);
+    puts("+++++++++");
+    dump_shm_semlock_header_counters();
+    dump_shm_semlock_header();
+    if (shm_semlock_counters.state_this == THIS_AVAILABLE) {
+        puts("confirm (Y/N):");
+        c = getchar();
+        if (c == 'Y' || c == 'y') {
+            reset_shm_semlock_counters(size, nb_slots);
+        }
+    }
+    return 0;
+}
diff --git a/Modules/_multiprocessing/dump_shm_macosx/shared_mem.c b/Modules/_multiprocessing/dump_shm_macosx/shared_mem.c
new file mode 100644
index 00000000000000..2a457bde5e8a9f
--- /dev/null
+++ b/Modules/_multiprocessing/dump_shm_macosx/shared_mem.c
@@ -0,0 +1,146 @@
+#include <fcntl.h>      /* O_CREAT and O_EXCL */
+#include <signal.h>     // signal
+#include <stdio.h>      // printf, puts
+#include <stdlib.h>     // atexit
+#include <errno.h>      // errno
+#include <unistd.h>     // sysconf
+
+#include <semaphore.h>  // sem_open
+typedef sem_t *SEM_HANDLE;
+
+#include "../semaphore_macosx.h"
+#include "shared_mem.h"
+
+void sigterm(int code) {
+    exit(EXIT_SUCCESS);
+}
+
+int acquire_lock(SEM_HANDLE sem) {
+    sem_wait(sem);
+    return 1;
+}
+
+int release_lock(SEM_HANDLE sem) {
+    sem_post(sem);
+    return 1;
+}
+
+void connect_shm_semlock_counters(int unlink, int force_open, int call_release_lock) {
+    puts(__func__);
+
+    int oflag = O_RDWR;
+    int shm = -1;
+    SEM_HANDLE sem = SEM_FAILED;
+    long size_shm_init = CALC_SIZE_SHM;
+    long size_shm = ALIGN_SHM_PAGE(size_shm_init);
+
+    // printf("size1: %ld vs size2:%ld\n", size_shm_init, size_shm);
+
+    // Install signal handlers.
+    signal(SIGTERM, &sigterm);
+    signal(SIGINT, &sigterm);
+
+    errno = 0;
+    if (sem == SEM_FAILED) {
+        errno = 0;
+        // The lock may already exist: just open it.
+        sem = sem_open(shm_semlock_counters.name_shm_lock, 0);
+        // It does not exist: create it.
+        if (force_open && sem == SEM_FAILED) {
+            sem = sem_open(shm_semlock_counters.name_shm_lock, O_CREAT, 0600, 1);
+        }
+    }
+    printf("sem:%p\n", sem);
+    shm_semlock_counters.handle_shm_lock = sem;
+
+    if (call_release_lock) {
+        RELEASE_SHM_LOCK;
+    }
+
+    // Acquire the shared memory lock.
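+    // Note: unlike create_shm_semlock_counters() in semaphore.c, this
+    // helper never creates the shared memory segment itself (shm_open is
+    // called without O_CREAT); it only attaches to an existing one.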
+    if (sem != SEM_FAILED && ACQUIRE_SHM_LOCK) {
+        printf("Shm Lock ok on %p\n", sem);
+        // Connect to the shared memory.
+        shm = shm_open(shm_semlock_counters.name_shm, oflag, 0);
+        if (shm != -1) {
+            shm_semlock_counters.handle_shm = shm;
+            printf("Shared Mem ok on '%d'\n", shm);
+            char *ptr = (char *)mmap(NULL,
+                                     size_shm,
+                                     (PROT_WRITE | PROT_READ),
+                                     MAP_SHARED,
+                                     shm_semlock_counters.handle_shm,
+                                     0L);
+            shm_semlock_counters.header = (HeaderObject *)ptr;
+            shm_semlock_counters.counters = (CounterObject *)(ptr + sizeof(HeaderObject));
+            printf("Shared memory size is %ld vs %d\n", size_shm,
+                   shm_semlock_counters.header->size_shm);
+            // Initialization is successful.
+            shm_semlock_counters.state_this = THIS_AVAILABLE;
+            header = shm_semlock_counters.header;
+            counter = shm_semlock_counters.counters;
+            if (unlink) {
+                atexit(delete_shm_semlock_counters);
+            } else {
+                atexit(delete_shm_semlock_counters_without_unlink);
+            }
+            puts("Ok....");
+        } else {
+            printf("The shared memory '%s' does not exist\n", shm_semlock_counters.name_shm);
+        }
+        RELEASE_SHM_LOCK;
+        printf("Shm Unlock ok on %p\n", sem);
+    } else {
+        puts("No semaphore opened!");
+    }
+}
+
+static void _delete_shm_semlock_counters(int unlink) {
+
+    puts("clean up...");
+    if (shm_semlock_counters.state_this == THIS_AVAILABLE) {
+        if (shm_semlock_counters.counters) {
+            if (ACQUIRE_SHM_LOCK) {
+                // Unmap the whole region, starting at the header.
+                munmap(shm_semlock_counters.header,
+                       shm_semlock_counters.header->size_shm);
+                if (unlink) {
+                    shm_unlink(shm_semlock_counters.name_shm);
+                }
+                shm_semlock_counters.state_this = THIS_CLOSED;
+                RELEASE_SHM_LOCK;
+            }
+        }
+        // Close and unlink the lock.
+        sem_close(shm_semlock_counters.handle_shm_lock);
+        sem_unlink(shm_semlock_counters.name_shm_lock);
+    }
+}
+
+void delete_shm_semlock_counters_without_unlink(void) {
+    puts(__func__);
+    _delete_shm_semlock_counters(0);
+}
+
+void delete_shm_semlock_counters(void) {
+    puts(__func__);
+    _delete_shm_semlock_counters(1);
+}
+
+void dump_shm_semlock_header(void) {
+    if (shm_semlock_counters.state_this == THIS_AVAILABLE) {
+        printf("n sems:%d - n sem_slots:%d, n procs:%d, size_shm:%d\n",
+               header->n_semlocks,
+               header->n_slots,
+               header->n_procs,
+               header->size_shm);
+    }
+}
+
+void dump_shm_semlock_header_counters(void) {
+    if (shm_semlock_counters.state_this == THIS_AVAILABLE) {
+        printf("header:%p - counter array:%p\n", header, counter);
+    }
+}
diff --git a/Modules/_multiprocessing/dump_shm_macosx/shared_mem.h b/Modules/_multiprocessing/dump_shm_macosx/shared_mem.h
new file mode 100644
index 00000000000000..ab3faf23c894db
--- /dev/null
+++ b/Modules/_multiprocessing/dump_shm_macosx/shared_mem.h
@@ -0,0 +1,18 @@
+#ifndef SHARED_MEM_H
+#define SHARED_MEM_H
+
+extern CountersWorkaround shm_semlock_counters;
+extern HeaderObject *header;
+extern CounterObject *counter;
+
+int acquire_lock(SEM_HANDLE sem);
+int release_lock(SEM_HANDLE sem);
+
+void connect_shm_semlock_counters(int unlink, int force_open, int call_release_lock);
+void delete_shm_semlock_counters_without_unlink(void);
+void delete_shm_semlock_counters(void);
+
+void dump_shm_semlock_header(void);
+void dump_shm_semlock_header_counters(void);
+
+#endif /* SHARED_MEM_H */
diff --git a/Modules/_multiprocessing/semaphore.c b/Modules/_multiprocessing/semaphore.c
index a4a2a866ccbfce..5e0bd05056674e 100644
--- a/Modules/_multiprocessing/semaphore.c
+++ b/Modules/_multiprocessing/semaphore.c
@@ -18,17 +18,6 @@
 // These match the values in Lib/multiprocessing/synchronize.py
 enum { RECURSIVE_MUTEX, SEMAPHORE };
 
-typedef struct {
-    PyObject_HEAD
-    SEM_HANDLE handle;
-    unsigned long last_tid;
-    int count;
-    int maxvalue;
-    int kind;
-    char *name;
-} SemLockObject;
-
-#define _SemLockObject_CAST(op)  ((SemLockObject *)(op))
 
 /*[python input]
 class SEM_HANDLE_converter(CConverter):
@@ -44,10 +33,25 @@ class _multiprocessing.SemLock "SemLockObject *" "&_PyMp_SemLockType"
 [clinic start generated code]*/
 /*[clinic end generated code: output=da39a3ee5e6b4b0d input=935fb41b7d032599]*/
 
+#ifndef HAVE_BROKEN_SEM_GETVALUE
+
+typedef struct {
+    PyObject_HEAD
+    SEM_HANDLE handle;
+    unsigned long last_tid;
+    int count;
+    int maxvalue;
+    int kind;
+    char *name;
+} SemLockObject;
+
+#define _SemLockObject_CAST(op)  ((SemLockObject *)(op))
+
 #include "clinic/semaphore.c.h"
 
 #define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid)
 
+#endif /* !HAVE_BROKEN_SEM_GETVALUE */
 
 #ifdef MS_WINDOWS
 
@@ -55,6 +59,7 @@ class _multiprocessing.SemLock "SemLockObject *" "&_PyMp_SemLockType"
  * Windows definitions
  */
 
+
 #define SEM_FAILED NULL
 
 #define SEM_CLEAR_ERROR() SetLastError(0)
@@ -252,7 +257,7 @@ sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save)
     tvdeadline.tv_sec = deadline->tv_sec;
     tvdeadline.tv_usec = deadline->tv_nsec / 1000;
 
-    for (delay = 0 ; ; delay += 1000) {
+    for (delay = 0;; delay += 1000) {
         /* poll */
         if (sem_trywait(sem) == 0)
             return 0;
@@ -301,6 +306,414 @@ sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save)
 
 #endif /* !HAVE_SEM_TIMEDWAIT */
 
+#ifdef HAVE_BROKEN_SEM_GETVALUE
+/*
+cf: https://github.com/python/cpython/issues/125828
+
+On macOS, `sem_getvalue` is not implemented. This workaround maintains an
+internal value for each Semaphore ((R)Locks are out of scope) in a shared
+memory segment available to every process.
+
+This internal value is stored in a structure named CounterObject with:
++ the referenced semaphore name,
++ the (internal) current value,
++ a flag allowing the counter to be reset on unlink/dealloc,
++ a creation timestamp.
+
+A header with 4 members is created to manage all the CounterObjects in the
+shared memory:
++ the count of stored CounterObjects,
++ the count of available slots,
++ the size of the shared memory,
++ the count of attached processes.
+
+For each Semaphore (SemLock), a mutex is created in order to avoid data races
+when the internal counter of its CounterObject is updated.
+
+When the impl/rebuild functions are called, a CounterObject and a mutex are
+associated with the Semaphore.
+When the acquire/release functions are called, the internal value of the
+CounterObject is updated.
+When the get_value function is called, the internal value is returned.
+When unlink is called, the Semaphore and its associated mutex are unlinked,
+and the CounterObject is reset.
+
+1 -> Structure of the shared memory:
+
+                  ----------- fixed array of 'N' counters -----------
+                 /                                                   \
++-----------------+----------------+---/ /---+-------------+-------------+
+|     Header      |   Counter 1    |         | Counter N-1 |  Counter N  |
+|-----------------|----------------|  ....   |-------------|-------------|
+|                 |                |         |             |             |
+|   n_semlocks    | sem_name       |         |             |             |
+|   n_slots       | internal_value |         |             |             |
+|   size_shm      | unlink_done    |         |             |             |
+|   n_procs       | ctimestamp     |         |             |             |
++-----------------+----------------+---/ /---+-------------+-------------+
+
+A dedicated lock is also created to serialize operations on the shared memory.
+Operations are:
++ create or connect to the shared memory,
++ look for a free slot,
++ store counter data in a free slot,
++ look for a stored counter,
++ clear counter data.
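+
+Worked example (assuming the 16 KiB page size of Apple Silicon): with
+NSEMS_MAX = 87381 and sizeof(CounterObject) = 32, CALC_SIZE_SHM is
+16 + 87381*32 = 2796208 bytes, which ALIGN_SHM_PAGE rounds up to
+size_shm = 2801664; CALC_NB_SLOTS(2801664) = (2801664 - 16) / 32 = 87551.
+These figures match the sample output in dump_shm_macosx/readme.md.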
+
+*/
+// ------------- list of structures --------------
+
+#include "semaphore_macosx.h"  // CounterObject, HeaderObject, CountersWorkaround
+
+/*
+Data for each process.
+*/
+CountersWorkaround shm_semlock_counters = {
+    .state_this = THIS_NOT_OPEN,
+    .name_shm = "/shm_gh125828",
+    .handle_shm = (MEMORY_HANDLE)0,
+    .create_shm = 0,
+    .name_shm_lock = "/mp_gh125828",
+    .handle_shm_lock = (SEM_HANDLE)0,
+    .header = (HeaderObject *)NULL,
+    .counters = (CounterObject *)NULL
+};
+
+/*
+SemLockObject with additional members:
++ a mutex to safely handle the associated CounterObject,
++ a pointer to a CounterObject (a slot of the array).
+*/
+typedef struct {
+    PyObject_HEAD
+    SEM_HANDLE handle;
+    unsigned long last_tid;
+    int count;
+    int maxvalue;
+    int kind;
+    char *name;
+    /* Additional data for the macOS semaphore workaround */
+    SEM_HANDLE handle_mutex;
+    CounterObject *counter;
+} SemLockObject;
+
+#define _SemLockObject_CAST(op)  ((SemLockObject *)(op))
+
+#define ISMINE(o) ((o)->count > 0 && PyThread_get_thread_ident() == (o)->last_tid)
+
+#include "clinic/semaphore.c.h"
+
+/*
+Release a mutex/lock.
+*/
+static int
+release_lock(SEM_HANDLE handle) {
+    int res = -1;
+
+    errno = 0;
+    res = sem_post(handle);
+    if (res < 0) {
+        PyErr_SetFromErrno(PyExc_OSError);
+    }
+    return res;
+}
+
+/*
+Acquire a mutex (see the _multiprocessing_SemLock_acquire_impl function).
+*/
+static int
+acquire_lock(SEM_HANDLE handle) {
+    int res = -1;
+    int err = 0;
+
+    /* Check whether we can acquire without releasing the GIL and blocking */
+    errno = 0;
+    do {
+        res = sem_trywait(handle);
+        err = errno;
+    } while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
+
+    errno = err;
+    if (res < 0 && errno == EAGAIN) {
+        /* Couldn't acquire immediately, need to block */
+        do {
+            Py_BEGIN_ALLOW_THREADS
+            res = sem_wait(handle);
+            Py_END_ALLOW_THREADS
+            err = errno;
+            if (res == MP_EXCEPTION_HAS_BEEN_SET)
+                break;
+        } while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
+    }
+    if (res < 0) {
+        errno = err;
+        PyErr_SetFromErrno(PyExc_OSError);
+        return -1;
+    }
+    return res;
+}
+
+/*
+Shared memory management.
+*/
+static void
+close_and_unlink_shm_lock(void) {
+    if (shm_semlock_counters.state_this == THIS_CLOSED) {
+        // Close and unlink the lock.
+        sem_close(shm_semlock_counters.handle_shm_lock);
+        sem_unlink(shm_semlock_counters.name_shm_lock);
+    }
+}
+
+/* A static function, so it can be passed to Py_AtExit(). */
+static void
+delete_shm_semlock_counters(void) {
+
+    if (shm_semlock_counters.handle_shm_lock != SEM_FAILED &&
+            shm_semlock_counters.state_this == THIS_AVAILABLE) {
+        if (shm_semlock_counters.counters) {
+            if (ACQUIRE_SHM_LOCK) {
+                // Decrease the attached-process counter and decide whether
+                // we are the last user, before unmapping the region.
+                --shm_semlock_counters.header->n_procs;
+
+                /*
+                When and how should the `shm_unlink` function be called?
+                Currently, these two tests do not always work.
+                */
+                int do_unlink = (!shm_semlock_counters.header->n_procs ||
+                                 shm_semlock_counters.create_shm == 1);
+
+                // Unmap the whole region, starting at the header.
+                munmap(shm_semlock_counters.header,
+                       shm_semlock_counters.header->size_shm);
+                if (do_unlink) {
+                    shm_unlink(shm_semlock_counters.name_shm);
+                }
+                shm_semlock_counters.state_this = THIS_CLOSED;
+
+                if (RELEASE_SHM_LOCK) {
+                    close_and_unlink_shm_lock();
+                }
+            }
+        }
+    }
+}
+
+static void
+create_or_connect_shm_lock(const char *from_sem_name) {
+    SEM_HANDLE sem = SEM_FAILED;
+
+    errno = 0;
+    sem = SEM_CREATE(shm_semlock_counters.name_shm_lock, 1, 1);
+    if (sem == SEM_FAILED) {
+        errno = 0;
+        // The lock already exists: just open it.
+        sem = sem_open(shm_semlock_counters.name_shm_lock, 0);
+    }
+    shm_semlock_counters.handle_shm_lock = sem;
+}
+
+static void
+create_shm_semlock_counters(const char *from_sem_name) {
+    int oflag = O_RDWR;
+    int shm = -1;
+    int res = -1;
+    char *datas = NULL;
+    HeaderObject *header = NULL;
+    long size_shm = CALC_SIZE_SHM;
+
+    // Already done.
+    if (shm_semlock_counters.state_this != THIS_NOT_OPEN) {
+        return;
+    }
+    // Create the lock, or connect to it if it already exists.
+    create_or_connect_shm_lock(from_sem_name);
+
+    // Acquire the shared memory lock so that only one process
+    // creates the shared memory.
+    if (ACQUIRE_SHM_LOCK) {
+        if (shm_semlock_counters.handle_shm == (MEMORY_HANDLE)0) {
+            // Round the size up to a multiple of SC_PAGESIZE.
+            size_shm = ALIGN_SHM_PAGE(size_shm);
+
+            shm = shm_open(shm_semlock_counters.name_shm, oflag, 0);
+            res = 0;
+            if (shm == -1) {
+                oflag |= O_CREAT;
+                shm = shm_open(shm_semlock_counters.name_shm, oflag, S_IRUSR | S_IWUSR);
+                // Set the size.
+                res = ftruncate(shm, size_shm);
+                shm_semlock_counters.create_shm = 1;
+            }
+            // mmap
+            if (res >= 0) {
+                shm_semlock_counters.handle_shm = shm;
+                datas = (char *)mmap(NULL,
+                                     size_shm,
+                                     (PROT_WRITE | PROT_READ),
+                                     (MAP_SHARED),
+                                     shm_semlock_counters.handle_shm,
+                                     0L);
+                if (datas != MAP_FAILED) {
+                    /* Header */
+                    shm_semlock_counters.header = (HeaderObject *)datas;
+                    /* First slot of the array */
+                    shm_semlock_counters.counters = (CounterObject *)(datas + sizeof(HeaderObject));
+                    header = shm_semlock_counters.header;
+                    /* When the mapping has just been created, initialize all members. */
+                    if (oflag & O_CREAT) {
+                        header->size_shm = size_shm;
+                        header->n_slots = CALC_NB_SLOTS(size_shm);
+                        header->n_semlocks = 0;
+                        header->n_procs = 0;
+                    }
+                    ++header->n_procs;
+
+                    /* Initialization is successful. */
+                    shm_semlock_counters.state_this = THIS_AVAILABLE;
+                    Py_AtExit(delete_shm_semlock_counters);
+                }
+            }
+        }
+        RELEASE_SHM_LOCK;
+    }
+}
+
+/*
+Build the name of the mutex associated with each Semaphore.
+The name is unique and is derived from the name created by the SemLock
+Python class.
+*/
+static char *gh_name = "_gh125828";
+
+static char *
+_build_sem_name(char *buf, const char *name) {
+    strcpy(buf, name);
+    strcat(buf, gh_name);
+    return buf;
+}
+
+/*
+Search whether the semaphore name is already stored in the array of
+CounterObjects kept in the shared memory.
+*/
+static CounterObject *
+_search_counter_from_sem_name(const char *sem_name) {
+    int i = 0, j = 0;
+    HeaderObject *header = shm_semlock_counters.header;
+    CounterObject *counter = shm_semlock_counters.counters;
+
+    while (i < header->n_slots && j < header->n_semlocks) {
+        if (!PyOS_stricmp(counter->sem_name, sem_name)) {
+            return counter;
+        }
+        if (counter->sem_name[0] != 0) {
+            ++j;
+        }
+        ++i;
+        ++counter;
+    }
+    return (CounterObject *)NULL;
+}
+
+/*
+Search for a free slot in the array of CounterObjects.
+*/
+static CounterObject *
+_search_counter_free_slot(void) {
+    int i = 0;
+    HeaderObject *header = shm_semlock_counters.header;
+    CounterObject *counter = shm_semlock_counters.counters;
+
+    while (i < header->n_slots) {
+        if (counter->sem_name[0] == 0) {
+            return counter;
+        }
+        ++counter;
+        ++i;
+    }
+
+    /*
+    Not enough memory: see NSEMS_MAX in semaphore_macosx.h.
+    */
+    return (CounterObject *)NULL;
+}
+
+/*
+Connect a Semaphore to an existing CounterObject, from `SemLock._rebuild`.
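+A SemLock is rebuilt when it is unpickled in a child process, so the
+CounterObject slot created by the originating process is expected to already
+be present in the shared memory; if it is not, a ValueError is raised.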
+*/
+static CounterObject *
+connect_counter(SemLockObject *self, const char *name) {
+    CounterObject *counter = NULL;
+
+    if (shm_semlock_counters.state_this == THIS_NOT_OPEN) {
+        create_shm_semlock_counters(name);
+    }
+
+    // error is set in ACQUIRE/RELEASE_* macros.
+    if (ACQUIRE_SHM_LOCK) {
+        counter = _search_counter_from_sem_name(name);
+        if (!counter) {
+            PyErr_SetString(PyExc_ValueError, "Can't find reference to this Semaphore");
+        }
+        RELEASE_SHM_LOCK;  // error set in the release_lock function
+    }
+    return counter;
+}
+
+/*
+Create a new CounterObject for a Semaphore, from `SemLock_impl`.
+*/
+static CounterObject *
+new_counter(SemLockObject *self, const char *name,
+            int value, int unlink_done) {
+    CounterObject *counter = NULL;
+
+    if (shm_semlock_counters.state_this == THIS_NOT_OPEN) {
+        create_shm_semlock_counters(name);
+    }
+
+    // error is set in ACQUIRE/RELEASE_* macros.
+    if (ACQUIRE_SHM_LOCK) {  // error set in the acquire_lock function
+        counter = _search_counter_free_slot();
+        if (counter) {
+            // Create a new counter.
+            strcpy(counter->sem_name, name);
+            counter->internal_value = value;
+            counter->unlink_done = unlink_done;
+            counter->ctimestamp = time(NULL);
+
+            // Update the header.
+            ++shm_semlock_counters.header->n_semlocks;
+        } else {
+            PyErr_SetString(PyExc_MemoryError, "Can't allocate more "
+                                               "shared memory for the macOS "
+                                               "Semaphore workaround");
+        }
+        if (!RELEASE_SHM_LOCK) {
+            if (counter) {
+                // Roll back the slot we just filled.
+                memset(counter, 0, sizeof(CounterObject));
+                --shm_semlock_counters.header->n_semlocks;
+            }
+            counter = NULL;
+        }
+    }
+    return counter;
+}
+
+/*
+Reset the CounterObject slot in the array, if unlink has been done.
+*/
+static int
+dealloc_counter(CounterObject *counter) {
+    int res = -1;
+
+    if (counter->unlink_done) {
+        // error is set in ACQUIRE/RELEASE_* macros.
+        if (ACQUIRE_SHM_LOCK) {
+            // Reset the counter.
+            memset(counter, 0, sizeof(CounterObject));
+            // Update the header.
+            --shm_semlock_counters.header->n_semlocks;
+            if (RELEASE_SHM_LOCK) {
+                return 0;
+            }
+        }
+    }
+    return res;
+}
+
+#endif /* HAVE_BROKEN_SEM_GETVALUE */
+
 /*[clinic input]
 @critical_section
 _multiprocessing.SemLock.acquire
@@ -318,12 +731,10 @@ _multiprocessing_SemLock_acquire_impl(SemLockObject *self, int blocking,
 {
     int res, err = 0;
     struct timespec deadline = {0};
-
     if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
         ++self->count;
         Py_RETURN_TRUE;
     }
-
     int use_deadline = (timeout_obj != Py_None);
     if (use_deadline) {
         double timeout = PyFloat_AsDouble(timeout_obj);
@@ -370,20 +781,32 @@ _multiprocessing_SemLock_acquire_impl(SemLockObject *self, int blocking,
                 break;
         } while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
     }
-
     if (res < 0) {
         errno = err;
-        if (errno == EAGAIN || errno == ETIMEDOUT)
+        if (errno == EAGAIN || errno == ETIMEDOUT) {
             Py_RETURN_FALSE;
-        else if (errno == EINTR)
+        }
+        if (errno == EINTR) {
             return NULL;
-        else
-            return PyErr_SetFromErrno(PyExc_OSError);
+        }
+        return PyErr_SetFromErrno(PyExc_OSError);
+    }
+#ifdef HAVE_BROKEN_SEM_GETVALUE
+    if (ISSEMAPHORE(self)) {
+        // error is set in ACQUIRE/RELEASE_* macros.
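+        // The per-semaphore mutex serializes updates to internal_value
+        // across processes: the decrement below pairs with the increment
+        // performed in the release path.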
+        if (ACQUIRE_COUNTER_MUTEX(self->handle_mutex)) {
+            --self->counter->internal_value;
+            if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) {
+                return NULL;
+            }
+        } else {
+            return NULL;
+        }
+    }
+#endif
 
     ++self->count;
     self->last_tid = PyThread_get_thread_ident();
-
     Py_RETURN_TRUE;
 }
 
@@ -401,8 +824,8 @@ _multiprocessing_SemLock_release_impl(SemLockObject *self)
     if (self->kind == RECURSIVE_MUTEX) {
         if (!ISMINE(self)) {
             PyErr_SetString(PyExc_AssertionError, "attempt to "
-                            "release recursive lock not owned "
-                            "by thread");
+                            "release recursive lock "
+                            "not owned by thread");
             return NULL;
         }
         if (self->count > 1) {
@@ -432,12 +855,29 @@ _multiprocessing_SemLock_release_impl(SemLockObject *self)
                             "times");
             return NULL;
         }
+    } else {
+        int sval = -1;
+        if (ISSEMAPHORE(self)) {
+            if (ACQUIRE_COUNTER_MUTEX(self->handle_mutex)) {
+                sval = self->counter->internal_value;
+                if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) {
+                    return NULL;
+                }
+            } else {
+                return NULL;
+            }
+
+            if (sval >= self->maxvalue) {
+                PyErr_SetString(PyExc_ValueError, "semaphore or lock "
+                                "released too many times");
+                return NULL;
+            }
+        }
     }
-#else
+#else /* HAVE_BROKEN_SEM_GETVALUE */
     int sval;
-
     /* This check is not an absolute guarantee that the semaphore
        does not rise above maxvalue. */
     if (sem_getvalue(self->handle, &sval) < 0) {
         return PyErr_SetFromErrno(PyExc_OSError);
     } else if (sval >= self->maxvalue) {
@@ -451,6 +891,20 @@ _multiprocessing_SemLock_release_impl(SemLockObject *self)
     if (sem_post(self->handle) < 0)
         return PyErr_SetFromErrno(PyExc_OSError);
 
+#ifdef HAVE_BROKEN_SEM_GETVALUE
+    if (ISSEMAPHORE(self)) {
+        // error is set in ACQUIRE/RELEASE_* macros.
+        if (ACQUIRE_COUNTER_MUTEX(self->handle_mutex)) {
+            ++self->counter->internal_value;
+            if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) {
+                return NULL;
+            }
+        } else {
+            return NULL;
+        }
+    }
+#endif
+
     --self->count;
     Py_RETURN_NONE;
 }
@@ -474,6 +928,10 @@ newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue,
     self->last_tid = 0;
     self->maxvalue = maxvalue;
     self->name = name;
+#ifdef HAVE_BROKEN_SEM_GETVALUE
+    self->handle_mutex = SEM_FAILED;
+    self->counter = NULL;
+#endif
     return (PyObject*)self;
 }
 
@@ -497,6 +955,11 @@ _multiprocessing_SemLock_impl(PyTypeObject *type, int kind, int value,
     SEM_HANDLE handle = SEM_FAILED;
     PyObject *result;
     char *name_copy = NULL;
+#ifdef HAVE_BROKEN_SEM_GETVALUE
+    char mutex_name[36];
+    SemLockObject *semlock = NULL;
+    SEM_HANDLE handle_mutex = SEM_FAILED;
+#endif
 
     if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) {
         PyErr_SetString(PyExc_ValueError, "unrecognized kind");
@@ -510,20 +973,43 @@ _multiprocessing_SemLock_impl(PyTypeObject *type, int kind, int value,
         }
         strcpy(name_copy, name);
     }
-
     SEM_CLEAR_ERROR();
     handle = SEM_CREATE(name, value, maxvalue);
     /* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */
     if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0)
         goto failure;
-
     if (unlink && SEM_UNLINK(name) < 0)
         goto failure;
 
+#ifdef HAVE_BROKEN_SEM_GETVALUE
+    if (ISSEMAPHORE2(maxvalue, kind)) {
+        _build_sem_name(mutex_name, name);
+        handle_mutex = SEM_CREATE(mutex_name, 1, 1);
+        if (handle_mutex == SEM_FAILED)
+            goto failure;
+        if (unlink && SEM_UNLINK(mutex_name) < 0)
+            goto failure;
+    }
+#endif
+
     result = newsemlockobject(type, handle, kind, maxvalue, name_copy);
     if (!result)
         goto failure;
 
+#ifdef HAVE_BROKEN_SEM_GETVALUE
+    semlock = _SemLockObject_CAST(result);
+    if (ISSEMAPHORE2(maxvalue, kind)) {
+        semlock->handle_mutex = handle_mutex;
+        semlock->counter = new_counter(semlock, name, value, unlink);
+        if (!semlock->counter) {
+            PyObject_GC_UnTrack(semlock);
+            type->tp_free(semlock);
+            Py_DECREF(type);
+            goto failure;
+        }
+    }
+#endif
+
     return result;
 
   failure:
@@ -532,6 +1018,14 @@ _multiprocessing_SemLock_impl(PyTypeObject *type, int kind, int value,
     }
     if (handle != SEM_FAILED)
         SEM_CLOSE(handle);
+#ifdef HAVE_BROKEN_SEM_GETVALUE
+    if (ISSEMAPHORE2(maxvalue, kind)) {
+        if (handle_mutex != SEM_FAILED) {
+            SEM_CLOSE(handle_mutex);
+        }
+    }
+#endif
+
     PyMem_Free(name_copy);
     return NULL;
 }
@@ -554,7 +1048,13 @@ _multiprocessing_SemLock__rebuild_impl(PyTypeObject *type, SEM_HANDLE handle,
                                        const char *name)
 /*[clinic end generated code: output=2aaee14f063f3bd9 input=f7040492ac6d9962]*/
 {
+    PyObject *result = NULL;
     char *name_copy = NULL;
+#ifdef HAVE_BROKEN_SEM_GETVALUE
+    char mutex_name[36];
+    SemLockObject *semlock = NULL;
+    SEM_HANDLE handle_mutex = SEM_FAILED;
+#endif
 
     if (name != NULL) {
         name_copy = PyMem_Malloc(strlen(name) + 1);
@@ -571,21 +1071,72 @@ _multiprocessing_SemLock__rebuild_impl(PyTypeObject *type, SEM_HANDLE handle,
             PyMem_Free(name_copy);
             return NULL;
         }
+#ifdef HAVE_BROKEN_SEM_GETVALUE
+        if (ISSEMAPHORE2(maxvalue, kind)) {
+            _build_sem_name(mutex_name, name);
+            handle_mutex = sem_open(mutex_name, 0);
+            if (handle_mutex == SEM_FAILED) {
+                if (handle != SEM_FAILED) {
+                    SEM_CLOSE(handle);
+                }
+                PyErr_SetFromErrno(PyExc_OSError);
+                PyMem_Free(name_copy);
+                return NULL;
+            }
+        }
+#endif /* HAVE_BROKEN_SEM_GETVALUE */
     }
-#endif
+#endif /* !MS_WINDOWS */
+
+    result = newsemlockobject(type, handle, kind, maxvalue, name_copy);
+
+#ifdef HAVE_BROKEN_SEM_GETVALUE
+    semlock = _SemLockObject_CAST(result);
+    if (result && ISSEMAPHORE(semlock)) {
+        semlock->handle_mutex = handle_mutex;
+        semlock->counter = connect_counter(semlock, name);
+        if (!semlock->counter) {
+            // connect_counter (or the lock macros) already set the error.
+            if (semlock->handle != SEM_FAILED) {
+                SEM_CLOSE(semlock->handle);
+            }
+            if (semlock->handle_mutex != SEM_FAILED) {
+                SEM_CLOSE(semlock->handle_mutex);
+            }
 
-    return newsemlockobject(type, handle, kind, maxvalue, name_copy);
+            PyObject_GC_UnTrack(semlock);
+            type->tp_free(semlock);
+            Py_DECREF(type);
+            PyMem_Free(name_copy);
+            return NULL;
+        }
+    }
+#endif /* HAVE_BROKEN_SEM_GETVALUE */
+    return result;
 }
 
 static void
-semlock_dealloc(PyObject *op)
+semlock_dealloc(SemLockObject *self)
 {
-    SemLockObject *self = _SemLockObject_CAST(op);
     PyTypeObject *tp = Py_TYPE(self);
     PyObject_GC_UnTrack(self);
-    if (self->handle != SEM_FAILED)
+    if (self->handle != SEM_FAILED) {
         SEM_CLOSE(self->handle);
-    PyMem_Free(self->name);
+    }
+#ifdef HAVE_BROKEN_SEM_GETVALUE
+    if (ISSEMAPHORE(self)) {
+        if (self->handle_mutex != SEM_FAILED) {
+            SEM_CLOSE(self->handle_mutex);
+        }
+        /* Case of a fork on macOS */
+        if (self->counter) {
+            dealloc_counter(self->counter);
+        }
+    }
+#endif /* HAVE_BROKEN_SEM_GETVALUE */
+    if (self->name) {
+        PyMem_Free(self->name);
+    }
     tp->tp_free(self);
     Py_DECREF(tp);
 }
@@ -618,6 +1169,8 @@ _multiprocessing_SemLock__is_mine_impl(SemLockObject *self)
     return PyBool_FromLong(ISMINE(self));
 }
 
+// Forward declaration: defined later in this file.
+PyObject *_multiprocessing_SemLock__is_zero_impl(SemLockObject *self);
+
 /*[clinic input]
 _multiprocessing.SemLock._get_value
 
@@ -628,17 +1181,35 @@ static PyObject *
 _multiprocessing_SemLock__get_value_impl(SemLockObject *self)
 /*[clinic end generated code: output=64bc1b89bda05e36 input=cb10f9a769836203]*/
 {
+    int sval = -1;
+
 #ifdef HAVE_BROKEN_SEM_GETVALUE
-    PyErr_SetNone(PyExc_NotImplementedError);
-    return NULL;
+    if (self->maxvalue <= 1) {
+        // For locks, derive the value from _is_zero():
+        // 0 when locked, 1 when unlocked.
+        PyObject *is_zero = _multiprocessing_SemLock__is_zero_impl(self);
+        if (is_zero == NULL) {
+            return NULL;
+        }
+        long unlocked = Py_IsFalse(is_zero);
+        Py_DECREF(is_zero);
+        return PyLong_FromLong(unlocked);
+    }
+
+    // error is set in ACQUIRE/RELEASE_* macros.
+    if (ACQUIRE_COUNTER_MUTEX(self->handle_mutex)) {
+        sval = self->counter->internal_value;
+        if (!RELEASE_COUNTER_MUTEX(self->handle_mutex)) {
+            return NULL;
+        }
+    } else {
+        return NULL;
+    }
+    if (sval < 0) {
+        sval = 0;
+    }
+    return PyLong_FromLong((long)sval);
 #else
-    int sval;
-    if (SEM_GETVALUE(self->handle, &sval) < 0)
+    if (SEM_GETVALUE(self->handle, &sval) < 0) {
         return _PyMp_SetError(NULL, MP_STANDARD_ERROR);
+    }
     /* some posix implementations use negative numbers to indicate
        the number of waiting threads */
-    if (sval < 0)
+    if (sval < 0) {
         sval = 0;
+    }
     return PyLong_FromLong((long)sval);
 #endif
 }
@@ -721,7 +1292,7 @@ _multiprocessing_SemLock___exit___impl(SemLockObject *self,
 }
 
 static int
-semlock_traverse(PyObject *s, visitproc visit, void *arg)
+semlock_traverse(SemLockObject *s, visitproc visit, void *arg)
 {
     Py_VISIT(Py_TYPE(s));
     return 0;
@@ -798,6 +1369,32 @@ _PyMp_sem_unlink(const char *name)
         _PyMp_SetError(NULL, MP_STANDARD_ERROR);
         return NULL;
     }
+#ifdef HAVE_BROKEN_SEM_GETVALUE
+    char mutex_name[36];
+    CounterObject *counter = NULL;
+    int res = -1;
+
+    /* Test whether unlink was called for a [Bounded]Semaphore,
+       not for a [R]Lock. */
+    if (shm_semlock_counters.state_this == THIS_AVAILABLE) {
+        counter = _search_counter_from_sem_name(name);
+        if (counter) {
+            counter->unlink_done = 1;
+            res = dealloc_counter(counter);
+
+            /* Unlink the associated mutex. */
+            _build_sem_name(mutex_name, name);
+            if (SEM_UNLINK(mutex_name) < 0) {
+                _PyMp_SetError(NULL, MP_STANDARD_ERROR);
+                return NULL;
+            }
+            if (res < 0) {
+                return NULL;
+            }
+        }
+    }
+#endif /* HAVE_BROKEN_SEM_GETVALUE */
 
     Py_RETURN_NONE;
 }
diff --git a/Modules/_multiprocessing/semaphore_macosx.h b/Modules/_multiprocessing/semaphore_macosx.h
new file mode 100644
index 00000000000000..3a0f52abf06cb2
--- /dev/null
+++ b/Modules/_multiprocessing/semaphore_macosx.h
@@ -0,0 +1,68 @@
+#ifndef SEMAPHORE_MACOSX_H
+#define SEMAPHORE_MACOSX_H
+
+#include <unistd.h>    // sysconf(_SC_PAGESIZE)
+#include <sys/mman.h>  // shm_open, shm_unlink, mmap, munmap
+#include <time.h>      // time_t, time
+
+/*
+On a macOS machine with an M4 Pro, sysconf(_SC_SEM_NSEMS_MAX) returns 87381.
+Perhaps this value is too high?
+*/
+#define NSEMS_MAX sysconf(_SC_SEM_NSEMS_MAX)
+
+#define CALC_SIZE_SHM ((NSEMS_MAX * sizeof(CounterObject)) + sizeof(HeaderObject))
+
+#define SC_PAGESIZE sysconf(_SC_PAGESIZE)
+#define ALIGN_SHM_PAGE(s) ((((int)((s) / SC_PAGESIZE)) + 1) * SC_PAGESIZE)
+
+#define CALC_NB_SLOTS(s) ((int)(((s) - sizeof(HeaderObject)) / sizeof(CounterObject)))
+
+/*
+Structures in the shared memory.
+*/
+typedef struct {
+    int n_semlocks;  // Current number of semaphores. Starts at 0.
+    int n_slots;     // Number of slots in the counter array.
+    int size_shm;    // Size of the allocated shared memory (header plus N counters).
+    int n_procs;     // Number of attached processes (used for checks).
+} HeaderObject;
+
+typedef struct {
+    char sem_name[16];   // Name of the semaphore.
+    int internal_value;  // Internal value of the semaphore, updated on each acquire/release.
+    int unlink_done;     // The counter can be reset once unlink is done.
+    time_t ctimestamp;   // Creation timestamp.
+} CounterObject;
+
+/*
+2 -> Structure of the static memory:
+*/
+
+typedef int MEMORY_HANDLE;
+enum _state {THIS_NOT_OPEN, THIS_AVAILABLE, THIS_CLOSED};
+
+typedef struct {
+    /*-- global data --*/
+    int state_this;            // State of this structure.
+    char *name_shm;
+    MEMORY_HANDLE handle_shm;  // Memory handle.
+    int create_shm;            // Did this process create the shared memory?
+    char *name_shm_lock;
+    SEM_HANDLE handle_shm_lock;  // Global lock protecting the shared memory.
+    /*-- Pointers into the shared memory --*/
+    HeaderObject *header;        // Pointer to the header (in shared memory).
+    CounterObject *counters;     // Pointer to the first item of the fixed array (in shared memory).
+} CountersWorkaround;
+
+#define ACQUIRE_SHM_LOCK (acquire_lock(shm_semlock_counters.handle_shm_lock) >= 0)
+#define RELEASE_SHM_LOCK (release_lock(shm_semlock_counters.handle_shm_lock) >= 0)
+
+#define ACQUIRE_COUNTER_MUTEX(s) (acquire_lock((s)) >= 0)
+#define RELEASE_COUNTER_MUTEX(s) (release_lock((s)) >= 0)
+
+#define ISSEMAPHORE2(m, k) ((m) > 1 && (k) == SEMAPHORE)
+#define ISSEMAPHORE(o) ((o)->maxvalue > 1 && (o)->kind == SEMAPHORE)
+
+#define NO_VALUE (-11111111)
+
+#endif /* SEMAPHORE_MACOSX_H */
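
---

For illustration, a minimal sketch of the behavior this patch enables (assuming a macOS build where ``HAVE_BROKEN_SEM_GETVALUE`` is defined); before the patch, ``Semaphore.get_value()`` raised ``NotImplementedError`` on such builds:

```python
import multiprocessing as mp

if __name__ == "__main__":
    sem = mp.Semaphore(3)   # internal counter starts at 3
    sem.acquire()
    sem.acquire()           # internal counter is now 1
    # Previously raised NotImplementedError on macOS; with the
    # shared-memory workaround it returns the tracked value.
    print(sem.get_value())  # 1
    sem.release()
    print(sem.get_value())  # 2
```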