diff --git a/dpctl/CMakeLists.txt b/dpctl/CMakeLists.txt index 69292f897e..3ca3c9a79d 100644 --- a/dpctl/CMakeLists.txt +++ b/dpctl/CMakeLists.txt @@ -161,6 +161,8 @@ foreach(_cy_file ${_cython_sources}) build_dpctl_ext(${_trgt} ${_cy_file} "dpctl") endforeach() +target_include_directories(_sycl_queue PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) + add_subdirectory(program) add_subdirectory(memory) add_subdirectory(tensor) diff --git a/dpctl/_host_task_util.hpp b/dpctl/_host_task_util.hpp new file mode 100644 index 0000000000..5208e11d2c --- /dev/null +++ b/dpctl/_host_task_util.hpp @@ -0,0 +1,73 @@ +//===--- _host_task_util.hpp - Implements async DECREF =// +// +// Data Parallel Control (dpctl) +// +// Copyright 2020-2021 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//===----------------------------------------------------------------------===// +/// +/// \file +/// This file implements a utility function to schedule host task to a sycl +/// queue depending on given array of sycl events to decrement reference counts +/// for the given array of Python objects. +/// +/// N.B.: The host task attempts to acquire GIL, so queue wait, event wait and +/// other synchronization mechanisms should be called after releasing the GIL to +/// avoid deadlocks. 
+///
+//===----------------------------------------------------------------------===//
+
+#include "Python.h"
+#include "syclinterface/dpctl_data_types.h"
+#include <CL/sycl.hpp>
+
+int async_dec_ref(DPCTLSyclQueueRef QRef,   // queue the host task is submitted to
+                  PyObject **obj_array,     // objects whose refcounts get decremented
+                  size_t obj_array_size,    // number of entries in obj_array
+                  DPCTLSyclEventRef *ERefs, // events the host task depends on
+                  size_t nERefs)            // number of entries in ERefs
+{ // returns 0 when the host task was submitted, 1 when submission threw
+
+    sycl::queue *q = reinterpret_cast<sycl::queue *>(QRef);
+
+    std::vector<PyObject *> obj_vec; // own a copy: the task runs after we return
+    obj_vec.reserve(obj_array_size);
+    for (size_t obj_id = 0; obj_id < obj_array_size; ++obj_id) {
+        obj_vec.push_back(obj_array[obj_id]);
+    }
+
+    try {
+        q->submit([&](sycl::handler &cgh) {
+            for (size_t ev_id = 0; ev_id < nERefs; ++ev_id) {
+                cgh.depends_on(
+                    *(reinterpret_cast<sycl::event *>(ERefs[ev_id])));
+            }
+            cgh.host_task([obj_array_size, obj_vec]() { // by-value capture
+                {
+                    PyGILState_STATE gstate;
+                    gstate = PyGILState_Ensure(); // Py_DECREF requires the GIL
+                    for (size_t i = 0; i < obj_array_size; ++i) {
+                        Py_DECREF(obj_vec[i]);
+                    }
+                    PyGILState_Release(gstate);
+                }
+            });
+        });
+    } catch (const std::exception &e) {
+        return 1;
+    }
+
+    return 0;
+}
diff --git a/dpctl/_sycl_event.pyx b/dpctl/_sycl_event.pyx index bd680eadf7..c64478d3c6 100644 --- a/dpctl/_sycl_event.pyx +++ b/dpctl/_sycl_event.pyx @@ -98,7 +98,6 @@ cdef class _SyclEvent: def __dealloc__(self): if (self._event_ref): - with nogil: DPCTLEvent_Wait(self._event_ref) DPCTLEvent_Delete(self._event_ref) self._event_ref = NULL self.args = None diff --git a/dpctl/_sycl_queue.pyx b/dpctl/_sycl_queue.pyx index 1e1b15813b..30a64fec4c 100644 --- a/dpctl/_sycl_queue.pyx +++ b/dpctl/_sycl_queue.pyx @@ -65,11 +65,17 @@ import ctypes from .enum_types import backend_type from cpython cimport pycapsule +from cpython.ref cimport Py_DECREF, Py_INCREF, PyObject from libc.stdlib cimport free, malloc import collections.abc import logging + + +cdef extern from "_host_task_util.hpp": + int async_dec_ref(DPCTLSyclQueueRef, PyObject **, size_t, DPCTLSyclEventRef *, size_t) nogil + + __all__ = [ "SyclQueue", "SyclKernelInvalidRangeError", @@ -714,12 +720,14 @@ cdef class SyclQueue(_SyclQueue): cdef 
_arg_data_type *kargty = NULL cdef DPCTLSyclEventRef *depEvents = NULL cdef DPCTLSyclEventRef Eref = NULL - cdef int ret + cdef int ret = 0 cdef size_t gRange[3] cdef size_t lRange[3] cdef size_t nGS = len(gS) cdef size_t nLS = len(lS) if lS is not None else 0 cdef size_t nDE = len(dEvents) if dEvents is not None else 0 + cdef PyObject **arg_objects = NULL + cdef ssize_t i = 0 # Allocate the arrays to be sent to DPCTLQueue_Submit kargs = <void**>malloc(len(args) * sizeof(void*)) @@ -820,8 +828,24 @@ cdef class SyclQueue(_SyclQueue): raise SyclKernelSubmitError( "Kernel submission to Sycl queue failed." ) - - return SyclEvent._create(Eref, args) + # increment reference counts to each argument + arg_objects = <PyObject **>malloc(len(args) * sizeof(PyObject *)) + for i in range(len(args)): + arg_objects[i] = <PyObject *>(args[i]) + Py_INCREF(<object> arg_objects[i]) + + # schedule decrement + if async_dec_ref(self.get_queue_ref(), arg_objects, len(args), &Eref, 1): + # async task submission failed, decrement ref counts and wait + for i in range(len(args)): + arg_objects[i] = <PyObject *>(args[i]) + Py_DECREF(<object> arg_objects[i]) + with nogil: DPCTLEvent_Wait(Eref) + + # free memory + free(arg_objects) + + return SyclEvent._create(Eref, []) cpdef void wait(self): with nogil: DPCTLQueue_Wait(self._queue_ref) diff --git a/dpctl/tests/test_sycl_kernel_submit.py b/dpctl/tests/test_sycl_kernel_submit.py index 3f3a03bb97..8a60294dd5 100644 --- a/dpctl/tests/test_sycl_kernel_submit.py +++ b/dpctl/tests/test_sycl_kernel_submit.py @@ -25,6 +25,7 @@ import dpctl import dpctl.memory as dpctl_mem import dpctl.program as dpctl_prog +import dpctl.tensor as dpt @pytest.mark.parametrize( @@ -107,4 +108,99 @@ def test_create_program_from_source(ctype_str, dtype, ctypes_ctor): ref_c = a * np.array(d, dtype=dtype) + b host_dt, device_dt = timer.dt assert type(host_dt) is float and type(device_dt) is float - assert np.allclose(c, ref_c), "Faled for {}, {}".formatg(r, lr) + assert np.allclose(c, ref_c), "Failed for {}, {}".format(r, lr) + + 
+def test_async_submit():  # three kernels chained through events on one queue
+    try:
+        q = dpctl.SyclQueue("opencl")
+    except dpctl.SyclQueueCreationError:
+        pytest.skip("OpenCL queue could not be created")
+    oclSrc = (
+        "kernel void kern1(global unsigned int *res, unsigned int mod) {"
+        " size_t index = get_global_id(0);"
+        " int ri = (index % mod);"
+        " res[index] = (ri * ri) % mod;"
+        "}"
+        " "
+        "kernel void kern2(global unsigned int *res, unsigned int mod) {"
+        " size_t index = get_global_id(0);"
+        " int ri = (index % mod);"
+        " int ri2 = (ri * ri) % mod;"
+        " res[index] = (ri2 * ri) % mod;"
+        "}"
+        " "
+        "kernel void kern3("
+        " global unsigned int *res, global unsigned int *arg1, "
+        " global unsigned int *arg2)"
+        "{"
+        " size_t index = get_global_id(0);"
+        " res[index] = "
+        " (arg1[index] < arg2[index]) ? arg1[index] : arg2[index];"
+        "}"
+    )
+    prog = dpctl_prog.create_program_from_source(q, oclSrc)
+    kern1Kernel = prog.get_sycl_kernel("kern1")
+    kern2Kernel = prog.get_sycl_kernel("kern2")
+    kern3Kernel = prog.get_sycl_kernel("kern3")
+
+    assert isinstance(kern1Kernel, dpctl_prog.SyclKernel)
+    assert isinstance(kern2Kernel, dpctl_prog.SyclKernel)
+    assert isinstance(kern3Kernel, dpctl_prog.SyclKernel)
+
+    n = 1024 * 1024
+    X = dpt.empty((3, n), dtype="u4", usm_type="device", sycl_queue=q)
+    first_row = dpctl_mem.as_usm_memory(X[0])
+    second_row = dpctl_mem.as_usm_memory(X[1])
+    third_row = dpctl_mem.as_usm_memory(X[2])
+
+    e1 = q.submit(  # row 0: (i * i) % 17
+        kern1Kernel,
+        [
+            first_row,
+            ctypes.c_uint(17),
+        ],
+        [
+            n,
+        ],
+    )
+    e2 = q.submit(  # row 1: (i * i * i) % 27
+        kern2Kernel,
+        [
+            second_row,
+            ctypes.c_uint(27),
+        ],
+        [
+            n,
+        ],
+    )
+    e3 = q.submit(  # row 2: elementwise min of rows 0 and 1
+        kern3Kernel,
+        [third_row, first_row, second_row],
+        [
+            n,
+        ],
+        None,
+        [e1, e2],  # kern3 must not start before kern1 and kern2 finish
+    )
+    status_complete = dpctl.event_status_type.complete
+    assert not all(  # NOTE(review): racy if kernels finish early - confirm
+        [
+            e == status_complete
+            for e in (
+                e1.execution_status,
+                e2.execution_status,
+                e3.execution_status,
+            )
+        ]
+    )
+
+    e3.wait()
+    Xnp = dpt.asnumpy(X)
+    Xref = np.empty((3, n), dtype="u4")
+    idx = np.arange(n, dtype="u8")  # u8: idx**3 < 2**60, no overflow
+    Xref[0] = (idx * idx) % 17
+    Xref[1] = (idx * idx * idx) % 27
+    Xref[2] = np.minimum(Xref[0], Xref[1])
+
+    assert np.array_equal(Xnp, Xref)