From e1189e509ec1c6510f20e40d4c126cd30af9344d Mon Sep 17 00:00:00 2001 From: Oleksandr Pavlyk Date: Wed, 26 Jan 2022 18:55:26 -0600 Subject: [PATCH 1/5] SyclEvent deleter no longer waits Added _host_task_util.hpp file that provides a function to submit a host task, gated by the passed event references, which decrements the reference count of each Python object passed to it. The host task is submitted to the sycl::queue behind the given QRef. SyclQueue.submit changes the mechanism by which it ensures that Python objects (and the memory they hold) that the kernel works on are not garbage-collected before the kernel completes its execution. We used to store a reference to the arguments in the SyclEvent object returned by SyclQueue.submit. To avoid GC-ing kernel arguments before the kernel completes execution, the deleter of SyclEvent had to call `SyclEvent.wait`. With this PR `SyclQueue.submit` increments reference counts of arguments after submission, then schedules a host task on the same queue, dependent on the kernel event, which acquires the GIL and decrements the reference counts. After this change SyclEvent.__dealloc__ no longer has to wait. It also no longer needs to store the args attribute. This attribute is now deprecated and will be removed. 
--- dpctl/CMakeLists.txt | 2 ++ dpctl/_host_task_util.hpp | 42 +++++++++++++++++++++++++++++++++++++++ dpctl/_sycl_event.pyx | 2 +- dpctl/_sycl_queue.pyx | 24 +++++++++++++++++++--- 4 files changed, 66 insertions(+), 4 deletions(-) create mode 100644 dpctl/_host_task_util.hpp diff --git a/dpctl/CMakeLists.txt b/dpctl/CMakeLists.txt index 69292f897e..3ca3c9a79d 100644 --- a/dpctl/CMakeLists.txt +++ b/dpctl/CMakeLists.txt @@ -161,6 +161,8 @@ foreach(_cy_file ${_cython_sources}) build_dpctl_ext(${_trgt} ${_cy_file} "dpctl") endforeach() +target_include_directories(_sycl_queue PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) + add_subdirectory(program) add_subdirectory(memory) add_subdirectory(tensor) diff --git a/dpctl/_host_task_util.hpp b/dpctl/_host_task_util.hpp new file mode 100644 index 0000000000..9671ed1547 --- /dev/null +++ b/dpctl/_host_task_util.hpp @@ -0,0 +1,42 @@ +#include "Python.h" +#include "syclinterface/dpctl_data_types.h" +#include + +int async_dec_ref(DPCTLSyclQueueRef QRef, + PyObject **obj_array, + size_t obj_array_size, + DPCTLSyclEventRef *ERefs, + size_t nERefs) +{ + + sycl::queue *q = reinterpret_cast(QRef); + + std::vector obj_vec; + obj_vec.reserve(obj_array_size); + for (size_t obj_id = 0; obj_id < obj_array_size; ++obj_id) { + obj_vec.push_back(obj_array[obj_id]); + } + + try { + q->submit([&](sycl::handler &cgh) { + for (size_t ev_id = 0; ev_id < nERefs; ++ev_id) { + cgh.depends_on( + *(reinterpret_cast(ERefs[ev_id]))); + } + cgh.host_task([obj_array_size, obj_vec]() { + { + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + for (size_t i = 0; i < obj_array_size; ++i) { + Py_DECREF(obj_vec[i]); + } + PyGILState_Release(gstate); + } + }); + }); + } catch (const std::exception &e) { + return 1; + } + + return 0; +} diff --git a/dpctl/_sycl_event.pyx b/dpctl/_sycl_event.pyx index bd680eadf7..77f3e7a0f0 100644 --- a/dpctl/_sycl_event.pyx +++ b/dpctl/_sycl_event.pyx @@ -98,7 +98,7 @@ cdef class _SyclEvent: def __dealloc__(self): if 
(self._event_ref): - with nogil: DPCTLEvent_Wait(self._event_ref) + # with nogil: DPCTLEvent_Wait(self._event_ref) DPCTLEvent_Delete(self._event_ref) self._event_ref = NULL self.args = None diff --git a/dpctl/_sycl_queue.pyx b/dpctl/_sycl_queue.pyx index 1e1b15813b..896c568061 100644 --- a/dpctl/_sycl_queue.pyx +++ b/dpctl/_sycl_queue.pyx @@ -65,11 +65,17 @@ import ctypes from .enum_types import backend_type from cpython cimport pycapsule +from cpython.ref cimport Py_INCREF, PyObject from libc.stdlib cimport free, malloc import collections.abc import logging + +cdef extern from "_host_task_util.hpp": + int async_dec_ref(DPCTLSyclQueueRef, PyObject **, size_t, DPCTLSyclEventRef *, size_t) nogil + + __all__ = [ "SyclQueue", "SyclKernelInvalidRangeError", @@ -714,12 +720,14 @@ cdef class SyclQueue(_SyclQueue): cdef _arg_data_type *kargty = NULL cdef DPCTLSyclEventRef *depEvents = NULL cdef DPCTLSyclEventRef Eref = NULL - cdef int ret + cdef int ret = 0 cdef size_t gRange[3] cdef size_t lRange[3] cdef size_t nGS = len(gS) cdef size_t nLS = len(lS) if lS is not None else 0 cdef size_t nDE = len(dEvents) if dEvents is not None else 0 + cdef PyObject **arg_objects = NULL + cdef ssize_t i = 0 # Allocate the arrays to be sent to DPCTLQueue_Submit kargs = malloc(len(args) * sizeof(void*)) @@ -820,8 +828,18 @@ cdef class SyclQueue(_SyclQueue): raise SyclKernelSubmitError( "Kernel submission to Sycl queue failed." 
) - - return SyclEvent._create(Eref, args) + # increment reference counts to each argument + arg_objects = malloc(len(args) * sizeof(PyObject *)) + for i in range(len(args)): + arg_objects[i] = (args[i]) + Py_INCREF( arg_objects[i]) + + # schedule decrement + async_dec_ref(self.get_queue_ref(), arg_objects, len(args), &Eref, 1) + # free memory + free(arg_objects) + + return SyclEvent._create(Eref, []) cpdef void wait(self): with nogil: DPCTLQueue_Wait(self._queue_ref) From 68616129775d1753d21ddf5e938ee763d80d236b Mon Sep 17 00:00:00 2001 From: Oleksandr Pavlyk Date: Fri, 28 Jan 2022 18:32:19 -0600 Subject: [PATCH 2/5] added license header and explanatory comment --- dpctl/_host_task_util.hpp | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/dpctl/_host_task_util.hpp b/dpctl/_host_task_util.hpp index 9671ed1547..5208e11d2c 100644 --- a/dpctl/_host_task_util.hpp +++ b/dpctl/_host_task_util.hpp @@ -1,3 +1,34 @@ +//===--- _host_task_util.hpp - Implements async DECREF =// +// +// Data Parallel Control (dpctl) +// +// Copyright 2020-2021 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//===----------------------------------------------------------------------===// +/// +/// \file +/// This file implements a utility function to schedule host task to a sycl +/// queue depending on given array of sycl events to decrement reference counts +/// for the given array of Python objects. 
+/// +/// N.B.: The host task attempts to acquire GIL, so queue wait, event wait and +/// other synchronization mechanisms should be called after releasing the GIL to +/// avoid deadlocks. +/// +//===----------------------------------------------------------------------===// + #include "Python.h" #include "syclinterface/dpctl_data_types.h" #include From f60faceca1fed0a90fd060fcfa2db66ce47897ae Mon Sep 17 00:00:00 2001 From: Oleksandr Pavlyk Date: Mon, 31 Jan 2022 09:47:09 -0600 Subject: [PATCH 3/5] removed commented out code line --- dpctl/_sycl_event.pyx | 1 - 1 file changed, 1 deletion(-) diff --git a/dpctl/_sycl_event.pyx b/dpctl/_sycl_event.pyx index 77f3e7a0f0..c64478d3c6 100644 --- a/dpctl/_sycl_event.pyx +++ b/dpctl/_sycl_event.pyx @@ -98,7 +98,6 @@ cdef class _SyclEvent: def __dealloc__(self): if (self._event_ref): - # with nogil: DPCTLEvent_Wait(self._event_ref) DPCTLEvent_Delete(self._event_ref) self._event_ref = NULL self.args = None From fad31cce53e7d012929b3553702b3c30cb9e5308 Mon Sep 17 00:00:00 2001 From: Oleksandr Pavlyk Date: Mon, 31 Jan 2022 09:48:16 -0600 Subject: [PATCH 4/5] Handle error in async_dec_ref: decrement ref counts and wait for the job to complete --- dpctl/_sycl_queue.pyx | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/dpctl/_sycl_queue.pyx b/dpctl/_sycl_queue.pyx index 896c568061..30a64fec4c 100644 --- a/dpctl/_sycl_queue.pyx +++ b/dpctl/_sycl_queue.pyx @@ -65,7 +65,7 @@ import ctypes from .enum_types import backend_type from cpython cimport pycapsule -from cpython.ref cimport Py_INCREF, PyObject +from cpython.ref cimport Py_DECREF, Py_INCREF, PyObject from libc.stdlib cimport free, malloc import collections.abc @@ -835,7 +835,13 @@ cdef class SyclQueue(_SyclQueue): Py_INCREF( arg_objects[i]) # schedule decrement - async_dec_ref(self.get_queue_ref(), arg_objects, len(args), &Eref, 1) + if async_dec_ref(self.get_queue_ref(), arg_objects, len(args), &Eref, 1): + # async task submission failed, decrement 
ref counts and wait + for i in range(len(args)): + arg_objects[i] = (args[i]) + Py_DECREF( arg_objects[i]) + with nogil: DPCTLEvent_Wait(Eref) + # free memory free(arg_objects) From 8effe7762ea76d02d8db82426f568410ea207ffe Mon Sep 17 00:00:00 2001 From: Oleksandr Pavlyk Date: Mon, 31 Jan 2022 09:49:41 -0600 Subject: [PATCH 5/5] Add test to check asynchronicity of submit --- dpctl/tests/test_sycl_kernel_submit.py | 98 +++++++++++++++++++++++++- 1 file changed, 97 insertions(+), 1 deletion(-) diff --git a/dpctl/tests/test_sycl_kernel_submit.py b/dpctl/tests/test_sycl_kernel_submit.py index 3f3a03bb97..8a60294dd5 100644 --- a/dpctl/tests/test_sycl_kernel_submit.py +++ b/dpctl/tests/test_sycl_kernel_submit.py @@ -25,6 +25,7 @@ import dpctl import dpctl.memory as dpctl_mem import dpctl.program as dpctl_prog +import dpctl.tensor as dpt @pytest.mark.parametrize( @@ -107,4 +108,99 @@ def test_create_program_from_source(ctype_str, dtype, ctypes_ctor): ref_c = a * np.array(d, dtype=dtype) + b host_dt, device_dt = timer.dt assert type(host_dt) is float and type(device_dt) is float - assert np.allclose(c, ref_c), "Faled for {}, {}".formatg(r, lr) + assert np.allclose(c, ref_c), "Failed for {}, {}".formatg(r, lr) + + +def test_async_submit(): + try: + q = dpctl.SyclQueue("opencl") + except dpctl.SyclQueueCreationError: + pytest.skip("OpenCL queue could not be created") + oclSrc = ( + "kernel void kern1(global unsigned int *res, unsigned int mod) {" + " size_t index = get_global_id(0);" + " int ri = (index % mod);" + " res[index] = (ri * ri) % mod;" + "}" + " " + "kernel void kern2(global unsigned int *res, unsigned int mod) {" + " size_t index = get_global_id(0);" + " int ri = (index % mod);" + " int ri2 = (ri * ri) % mod;" + " res[index] = (ri2 * ri) % mod;" + "}" + " " + "kernel void kern3(" + " global unsigned int *res, global unsigned int *arg1, " + " global unsigned int *arg2)" + "{" + " size_t index = get_global_id(0);" + " res[index] = " + " (arg1[index] < arg2[index]) ? 
arg1[index] : arg2[index];" + "}" + ) + prog = dpctl_prog.create_program_from_source(q, oclSrc) + kern1Kernel = prog.get_sycl_kernel("kern1") + kern2Kernel = prog.get_sycl_kernel("kern2") + kern3Kernel = prog.get_sycl_kernel("kern3") + + assert isinstance(kern1Kernel, dpctl_prog.SyclKernel) + assert isinstance(kern2Kernel, dpctl_prog.SyclKernel) + assert isinstance(kern2Kernel, dpctl_prog.SyclKernel) + + n = 1024 * 1024 + X = dpt.empty((3, n), dtype="u4", usm_type="device", sycl_queue=q) + first_row = dpctl_mem.as_usm_memory(X[0]) + second_row = dpctl_mem.as_usm_memory(X[1]) + third_row = dpctl_mem.as_usm_memory(X[2]) + + e1 = q.submit( + kern1Kernel, + [ + first_row, + ctypes.c_uint(17), + ], + [ + n, + ], + ) + e2 = q.submit( + kern2Kernel, + [ + second_row, + ctypes.c_uint(27), + ], + [ + n, + ], + ) + e3 = q.submit( + kern3Kernel, + [third_row, first_row, second_row], + [ + n, + ], + None, + [e1, e2], + ) + status_complete = dpctl.event_status_type.complete + assert not all( + [ + e == status_complete + for e in ( + e1.execution_status, + e2.execution_status, + e3.execution_status, + ) + ] + ) + + e3.wait() + Xnp = dpt.asnumpy(X) + Xref = np.empty((3, n), dtype="u4") + for i in range(n): + Xref[0, i] = (i * i) % 17 + Xref[1, i] = (i * i * i) % 27 + Xref[2, i] = min(Xref[0, i], Xref[1, i]) + + assert np.array_equal(Xnp, Xref)