Skip to content

SyclEvent deleter no longer waits #751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dpctl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ foreach(_cy_file ${_cython_sources})
build_dpctl_ext(${_trgt} ${_cy_file} "dpctl")
endforeach()

target_include_directories(_sycl_queue PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})

add_subdirectory(program)
add_subdirectory(memory)
add_subdirectory(tensor)
Expand Down
73 changes: 73 additions & 0 deletions dpctl/_host_task_util.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//===--- _host_task_util.hpp - Implements async DECREF =//
//
// Data Parallel Control (dpctl)
//
// Copyright 2020-2021 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//===----------------------------------------------------------------------===//
///
/// \file
/// This file implements a utility function to schedule host task to a sycl
/// queue depending on given array of sycl events to decrement reference counts
/// for the given array of Python objects.
///
/// N.B.: The host task attempts to acquire GIL, so queue wait, event wait and
/// other synchronization mechanisms should be called after releasing the GIL to
/// avoid deadlocks.
///
//===----------------------------------------------------------------------===//

#include "Python.h"
#include "syclinterface/dpctl_data_types.h"
#include <CL/sycl.hpp>

/// Schedule an asynchronous Py_DECREF of a set of Python objects.
///
/// Submits a host task to the queue behind \p QRef. The task is made to
/// depend on every event in \p ERefs and, when it runs, acquires the GIL and
/// decrements the reference count of each object in \p obj_array once.
///
/// The object pointers are copied before submission, so the caller may free
/// \p obj_array as soon as this function returns.
///
/// N.B.: the host task acquires the GIL, so a caller that waits on the queue
/// or on events must release the GIL first to avoid a deadlock.
///
/// \param QRef            Opaque reference to the sycl::queue to submit to.
/// \param obj_array       Array of Python objects to be decremented.
/// \param obj_array_size  Number of entries in \p obj_array.
/// \param ERefs           Array of opaque sycl::event references the host
///                        task must wait for.
/// \param nERefs          Number of entries in \p ERefs.
/// \return 0 if the host task was submitted; 1 if nothing was scheduled
///         (null queue, or an exception was raised while preparing or
///         submitting the task). On a non-zero return no reference count has
///         been changed and the caller remains responsible for the DECREFs.
int async_dec_ref(DPCTLSyclQueueRef QRef,
                  PyObject **obj_array,
                  size_t obj_array_size,
                  DPCTLSyclEventRef *ERefs,
                  size_t nERefs)
{
    sycl::queue *q = reinterpret_cast<sycl::queue *>(QRef);
    if (q == nullptr) {
        // Nothing to submit to; report failure so the caller falls back to
        // its synchronous path instead of dereferencing a null queue.
        return 1;
    }

    try {
        // Built inside the try block: the allocation may throw, and no C++
        // exception may escape into the (nogil, non-throwing) Cython caller.
        std::vector<PyObject *> obj_vec(obj_array,
                                        obj_array + obj_array_size);

        // The command-group lambda is invoked synchronously by submit(), so
        // capturing locals by reference is safe here. The host task itself
        // runs later and therefore owns a by-value copy of the pointers.
        q->submit([&](sycl::handler &cgh) {
            for (size_t ev_id = 0; ev_id < nERefs; ++ev_id) {
                cgh.depends_on(
                    *(reinterpret_cast<sycl::event *>(ERefs[ev_id])));
            }
            cgh.host_task([obj_vec]() {
                // The task may run after the interpreter was torn down;
                // touching the GIL or any PyObject then is invalid.
                if (!Py_IsInitialized()) {
                    return;
                }
                PyGILState_STATE gstate = PyGILState_Ensure();
                for (PyObject *obj : obj_vec) {
                    Py_DECREF(obj);
                }
                PyGILState_Release(gstate);
            });
        });
    } catch (const std::exception &) {
        return 1;
    }

    return 0;
}
1 change: 0 additions & 1 deletion dpctl/_sycl_event.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ cdef class _SyclEvent:

def __dealloc__(self):
if (self._event_ref):
with nogil: DPCTLEvent_Wait(self._event_ref)
DPCTLEvent_Delete(self._event_ref)
self._event_ref = NULL
self.args = None
Expand Down
30 changes: 27 additions & 3 deletions dpctl/_sycl_queue.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,17 @@ import ctypes
from .enum_types import backend_type

from cpython cimport pycapsule
from cpython.ref cimport Py_DECREF, Py_INCREF, PyObject
from libc.stdlib cimport free, malloc

import collections.abc
import logging


cdef extern from "_host_task_util.hpp":
int async_dec_ref(DPCTLSyclQueueRef, PyObject **, size_t, DPCTLSyclEventRef *, size_t) nogil


__all__ = [
"SyclQueue",
"SyclKernelInvalidRangeError",
Expand Down Expand Up @@ -714,12 +720,14 @@ cdef class SyclQueue(_SyclQueue):
cdef _arg_data_type *kargty = NULL
cdef DPCTLSyclEventRef *depEvents = NULL
cdef DPCTLSyclEventRef Eref = NULL
cdef int ret
cdef int ret = 0
cdef size_t gRange[3]
cdef size_t lRange[3]
cdef size_t nGS = len(gS)
cdef size_t nLS = len(lS) if lS is not None else 0
cdef size_t nDE = len(dEvents) if dEvents is not None else 0
cdef PyObject **arg_objects = NULL
cdef ssize_t i = 0

# Allocate the arrays to be sent to DPCTLQueue_Submit
kargs = <void**>malloc(len(args) * sizeof(void*))
Expand Down Expand Up @@ -820,8 +828,24 @@ cdef class SyclQueue(_SyclQueue):
raise SyclKernelSubmitError(
"Kernel submission to Sycl queue failed."
)

return SyclEvent._create(Eref, args)
# increment reference counts to each argument
arg_objects = <PyObject **>malloc(len(args) * sizeof(PyObject *))
for i in range(len(args)):
arg_objects[i] = <PyObject *>(args[i])
Py_INCREF(<object> arg_objects[i])

# schedule decrement
if async_dec_ref(self.get_queue_ref(), arg_objects, len(args), &Eref, 1):
# async task submission failed, decrement ref counts and wait
for i in range(len(args)):
arg_objects[i] = <PyObject *>(args[i])
Py_DECREF(<object> arg_objects[i])
with nogil: DPCTLEvent_Wait(Eref)

# free memory
free(arg_objects)

return SyclEvent._create(Eref, [])

cpdef void wait(self):
with nogil: DPCTLQueue_Wait(self._queue_ref)
Expand Down
98 changes: 97 additions & 1 deletion dpctl/tests/test_sycl_kernel_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import dpctl
import dpctl.memory as dpctl_mem
import dpctl.program as dpctl_prog
import dpctl.tensor as dpt


@pytest.mark.parametrize(
Expand Down Expand Up @@ -107,4 +108,99 @@ def test_create_program_from_source(ctype_str, dtype, ctypes_ctor):
ref_c = a * np.array(d, dtype=dtype) + b
host_dt, device_dt = timer.dt
assert type(host_dt) is float and type(device_dt) is float
assert np.allclose(c, ref_c), "Faled for {}, {}".formatg(r, lr)
assert np.allclose(c, ref_c), "Failed for {}, {}".formatg(r, lr)


def test_async_submit():
    """Check that kernel submission does not block and yields correct data.

    Three OpenCL kernels are submitted to one queue: two independent kernels
    fill rows 0 and 1 of a USM array, and a third, depending on the events of
    the first two, writes the element-wise minimum into row 2. Immediately
    after submission at least one of the returned events is expected not to
    be complete; after waiting on the last event the array content is
    compared against a host-side reference.

    The test is skipped when an OpenCL queue cannot be created.
    """
    try:
        q = dpctl.SyclQueue("opencl")
    except dpctl.SyclQueueCreationError:
        pytest.skip("OpenCL queue could not be created")
    oclSrc = (
        "kernel void kern1(global unsigned int *res, unsigned int mod) {"
        " size_t index = get_global_id(0);"
        " int ri = (index % mod);"
        " res[index] = (ri * ri) % mod;"
        "}"
        " "
        "kernel void kern2(global unsigned int *res, unsigned int mod) {"
        " size_t index = get_global_id(0);"
        " int ri = (index % mod);"
        " int ri2 = (ri * ri) % mod;"
        " res[index] = (ri2 * ri) % mod;"
        "}"
        " "
        "kernel void kern3("
        " global unsigned int *res, global unsigned int *arg1, "
        " global unsigned int *arg2)"
        "{"
        " size_t index = get_global_id(0);"
        " res[index] = "
        " (arg1[index] < arg2[index]) ? arg1[index] : arg2[index];"
        "}"
    )
    prog = dpctl_prog.create_program_from_source(q, oclSrc)
    kern1Kernel = prog.get_sycl_kernel("kern1")
    kern2Kernel = prog.get_sycl_kernel("kern2")
    kern3Kernel = prog.get_sycl_kernel("kern3")

    # Every kernel is checked, including kern3Kernel.
    for kern in (kern1Kernel, kern2Kernel, kern3Kernel):
        assert isinstance(kern, dpctl_prog.SyclKernel)

    n = 1024 * 1024
    X = dpt.empty((3, n), dtype="u4", usm_type="device", sycl_queue=q)
    first_row = dpctl_mem.as_usm_memory(X[0])
    second_row = dpctl_mem.as_usm_memory(X[1])
    third_row = dpctl_mem.as_usm_memory(X[2])

    e1 = q.submit(
        kern1Kernel,
        [
            first_row,
            ctypes.c_uint(17),
        ],
        [
            n,
        ],
    )
    e2 = q.submit(
        kern2Kernel,
        [
            second_row,
            ctypes.c_uint(27),
        ],
        [
            n,
        ],
    )
    # kern3 reads the rows written by kern1 and kern2, hence the explicit
    # dependency on both of their events.
    e3 = q.submit(
        kern3Kernel,
        [third_row, first_row, second_row],
        [
            n,
        ],
        None,
        [e1, e2],
    )
    status_complete = dpctl.event_status_type.complete
    # NOTE(review): this check is timing dependent; it presumes the device
    # has not finished all three kernels by the time the statuses are read.
    statuses = (
        e1.execution_status,
        e2.execution_status,
        e3.execution_status,
    )
    assert not all(status == status_complete for status in statuses)

    e3.wait()
    Xnp = dpt.asnumpy(X)

    # Host reference, vectorized. 64-bit indices keep i**3 (< 2**60) exact,
    # matching the arbitrary-precision arithmetic of a per-element loop.
    idx = np.arange(n, dtype=np.int64)
    Xref = np.empty((3, n), dtype="u4")
    Xref[0] = (idx * idx) % 17
    Xref[1] = (idx * idx * idx) % 27
    Xref[2] = np.minimum(Xref[0], Xref[1])

    assert np.array_equal(Xnp, Xref)