Skip to content

Make dppl rt threadlocal #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ __pycache__/
# CMake build and local install directory
build
install
build_cmake

# Emacs temp files
*~

# Eclipse project files
.project
.pydevproject

# Distribution / packaging
.Python
env/
Expand Down
88 changes: 57 additions & 31 deletions dppl/oneapi_interface.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ cdef extern from "dppl_oneapi_interface.hpp" namespace "dppl":
int64_t activateQueue (void **Q, _device_type DTy,
size_t device_num) except -1
int64_t deactivateCurrentQueue () except -1
int64_t number_of_activated_queues (size_t &num) except -1
int64_t dump () except -1
int64_t dump_queue (const void *Q) except -1

Expand All @@ -73,19 +74,12 @@ cdef void delete_queue (object cap):
deleteQueue(PyCapsule_GetPointer(cap, NULL))


cdef class DpplRuntime:
cdef class _DpplRuntime:
cdef DpplOneAPIRuntime rt

def __cinit__ (self):
self.rt = DpplOneAPIRuntime()

def get_num_platforms (self):
''' Returns the number of available SYCL/OpenCL platforms.
'''
cdef size_t num_platforms = 0
self.rt.getNumPlatforms(&num_platforms)
return num_platforms

def _activate_queue (self, device_ty, device_id):
cdef void *queue_ptr
if device_ty == device_type.gpu:
Expand All @@ -101,37 +95,70 @@ cdef class DpplRuntime:
def _deactivate_current_queue (self):
self.rt.deactivateCurrentQueue()

def get_current_queue (self):
''' Returns the activated SYCL queue as a PyCapsule.
'''
cdef void* queue_ptr = NULL;
self.rt.getCurrentQueue(&queue_ptr);
return PyCapsule_New(queue_ptr, NULL, &delete_queue)

# def set_global_queue (self, device_ty, device_id):
# if device_ty == device_type.gpu:
# self.rt.setGlobalContextWithGPU(device_id)
# elif device_ty == device_type.cpu:
# self.rt.setGlobalContextWithCPU(device_id)
# else:
# e = UnsupportedDeviceTypeError("Device can only be cpu or gpu")
# raise e

def dump (self):
''' Prints information about the Runtime object.
'''
return self.rt.dump()

def dump_queue (self, queue_cap):
def dump_queue_info (self, queue_cap):
''' Prints information about the SYCL queue object.
'''
if PyCapsule_IsValid(queue_cap, NULL):
self.rt.dump_queue(PyCapsule_GetPointer(queue_cap, NULL))
return self.rt.dump_queue(PyCapsule_GetPointer(queue_cap, NULL))
else:
raise ValueError("Expected a PyCapsule encapsulating a SYCL queue")

# Global runtime object
runtime = DpplRuntime()
def get_current_queue (self):
''' Returns the activated SYCL queue as a PyCapsule.
'''
cdef void* queue_ptr = NULL;
self.rt.getCurrentQueue(&queue_ptr);
return PyCapsule_New(queue_ptr, NULL, &delete_queue)

def get_num_platforms (self):
''' Returns the number of available SYCL/OpenCL platforms.
'''
cdef size_t num_platforms = 0
self.rt.getNumPlatforms(&num_platforms)
return num_platforms

def set_default_queue (self, device_ty, device_id):
if device_ty == device_type.gpu:
self.rt.resetGlobalQueue(_device_type._GPU, device_id)
elif device_ty == device_type.cpu:
self.rt.resetGlobalQueue(_device_type._CPU, device_id)
else:
e = UnsupportedDeviceTypeError("Device can only be cpu or gpu")
raise e

def is_in_dppl_ctxt (self):
cdef size_t num = 0
self.rt.number_of_activated_queues(num)
if num:
return True
else:
return False


# thread-local storage
from threading import local as threading_local

# Initialize a thread local instance of _DpplRuntime
_tls = threading_local()
_tls._runtime = _DpplRuntime()


################################################################################
#--------------------------------- Public API ---------------------------------#
################################################################################


dump = _tls._runtime.dump
dump_queue_info = _tls._runtime.dump_queue_info
get_current_queue = _tls._runtime.get_current_queue
get_num_platforms = _tls._runtime.get_num_platforms
set_default_queue = _tls._runtime.set_default_queue
is_in_dppl_ctxt = _tls._runtime.is_in_dppl_ctxt
Comment on lines +147 to +161

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the entire runtime thread-local? The only thing which is thread-local is the stack of contexts. Making shared data thread-local is calling for trouble.
I think the runtime class should encapsulate the thread-local business and return the thread-local values in its member functions like get_current_queue. Maybe this needs to be done in C++ (e.g. your std::deque be thread-local).


from contextlib import contextmanager

Expand All @@ -146,12 +173,11 @@ def device_context (dev=device_type.gpu, device_num=0):
# If set_context is unable to create a new context an exception is raised.
try:
ctxt = None
ctxt = runtime._activate_queue(dev, device_num)
ctxt = _tls._runtime._activate_queue(dev, device_num)
yield ctxt
finally:
# Code to release resource
if ctxt:
print("Removing the context from the deque of active contexts")
runtime._deactivate_current_queue()
_tls._runtime._deactivate_current_queue()
else:
print("No context was created so nothing to do")
87 changes: 87 additions & 0 deletions dppl/tests/dppl_tests/t_dump_fns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
##===---------- t_is_in_dppl_ctxt.py - dppl -------------*- Python -*-----===##
##
## Python Data Parallel Processing Library (PyDPPL)
##
## Copyright 2020 Intel Corporation
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
##===----------------------------------------------------------------------===##
###
### \file
### This file has unit test cases to check the dump functions provided by dppl.
##===----------------------------------------------------------------------===##

from contextlib import contextmanager
import ctypes
import dppl
import io
import os, sys
import tempfile
import unittest


libc = ctypes.CDLL(None)
c_stdout = ctypes.c_void_p.in_dll(libc, 'stdout')

# Sourced from
# https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
@contextmanager
def stdout_redirector (stream):
# The original fd stdout points to. Usually 1 on POSIX systems.
original_stdout_fd = sys.stdout.fileno()

def _redirect_stdout(to_fd):
"""Redirect stdout to the given file descriptor."""
# Flush the C-level buffer stdout
libc.fflush(c_stdout)
# Flush and close sys.stdout - also closes the file descriptor (fd)
sys.stdout.close()
# Make original_stdout_fd point to the same file as to_fd
os.dup2(to_fd, original_stdout_fd)
# Create a new sys.stdout that points to the redirected fd
sys.stdout = io.TextIOWrapper(os.fdopen(original_stdout_fd, 'wb'))

# Save a copy of the original stdout fd in saved_stdout_fd
saved_stdout_fd = os.dup(original_stdout_fd)
try:
# Create a temporary file and redirect stdout to it
tfile = tempfile.TemporaryFile(mode='w+b')
_redirect_stdout(tfile.fileno())
# Yield to caller, then redirect stdout back to the saved fd
yield
_redirect_stdout(saved_stdout_fd)
# Copy contents of temporary file to the given stream
tfile.flush()
tfile.seek(0, io.SEEK_SET)
if stream:
stream.write(tfile.read())
finally:
tfile.close()
os.close(saved_stdout_fd)


class TestDPPLDumpFns(unittest.TestCase):

def test_dppl_dump_runtime(self):
with stdout_redirector(None):
self.assertEqual(dppl.dump(), 0)

def test_dppl_dump_queue_info(self):
with stdout_redirector(None):
q = dppl.get_current_queue()
self.assertEqual(dppl.dump_queue_info(q), 0)


suite = unittest.TestLoader().loadTestsFromTestCase(TestDPPLDumpFns)
unittest.TextTestRunner(verbosity=2).run(suite)
46 changes: 46 additions & 0 deletions dppl/tests/dppl_tests/t_is_in_dppl_ctxt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
##===---------- t_is_in_dppl_ctxt.py - dppl -------------*- Python -*-----===##
##
## Python Data Parallel Processing Library (PyDPPL)
##
## Copyright 2020 Intel Corporation
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
##===----------------------------------------------------------------------===##
###
### \file
### This file has unit test cases to check the value returned by the
### dppl.is_in_dppl_ctxt function.
##===----------------------------------------------------------------------===##

import dppl
import unittest

class TestDPPLIsInDPPLCtxt (unittest.TestCase):

def test_is_in_dppl_ctxt_outside_device_ctxt (self):
self.assertFalse(dppl.is_in_dppl_ctxt())

def test_is_in_dppl_ctxt_inside_device_ctxt (self):
with dppl.device_context(dppl.device_type.gpu):
self.assertTrue(dppl.is_in_dppl_ctxt())

def test_is_in_dppl_ctxt_inside_nested_device_ctxt (self):
with dppl.device_context(dppl.device_type.cpu):
with dppl.device_context(dppl.device_type.gpu):
self.assertTrue(dppl.is_in_dppl_ctxt())
self.assertTrue(dppl.is_in_dppl_ctxt())
self.assertFalse(dppl.is_in_dppl_ctxt())

suite = unittest.TestLoader().loadTestsFromTestCase(TestDPPLIsInDPPLCtxt)
unittest.TextTestRunner(verbosity=2).run(suite)
83 changes: 83 additions & 0 deletions dppl/tests/ocldrv_tests/t_ocldrv_dump_fns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
##===---------- t_ocldrv_dump_fns.py - dppl.ocldrv ------*- Python -*-----===##
##
## Python Data Parallel Processing Library (PyDPPL)
##
## Copyright 2020 Intel Corporation
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
##===----------------------------------------------------------------------===##
###
### \file
### This file has unit test cases to check the dump functions in dppl.ocldrv.
##===----------------------------------------------------------------------===##

from contextlib import contextmanager
import ctypes
import io
import os, sys
import tempfile
import unittest

import dppl.ocldrv as drv

libc = ctypes.CDLL(None)
c_stdout = ctypes.c_void_p.in_dll(libc, 'stdout')

# Sourced from
# https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
@contextmanager
def stdout_redirector (stream):
# The original fd stdout points to. Usually 1 on POSIX systems.
original_stdout_fd = sys.stdout.fileno()

def _redirect_stdout(to_fd):
"""Redirect stdout to the given file descriptor."""
# Flush the C-level buffer stdout
libc.fflush(c_stdout)
# Flush and close sys.stdout - also closes the file descriptor (fd)
sys.stdout.close()
# Make original_stdout_fd point to the same file as to_fd
os.dup2(to_fd, original_stdout_fd)
# Create a new sys.stdout that points to the redirected fd
sys.stdout = io.TextIOWrapper(os.fdopen(original_stdout_fd, 'wb'))

# Save a copy of the original stdout fd in saved_stdout_fd
saved_stdout_fd = os.dup(original_stdout_fd)
try:
# Create a temporary file and redirect stdout to it
tfile = tempfile.TemporaryFile(mode='w+b')
_redirect_stdout(tfile.fileno())
# Yield to caller, then redirect stdout back to the saved fd
yield
_redirect_stdout(saved_stdout_fd)
# Copy contents of temporary file to the given stream
tfile.flush()
tfile.seek(0, io.SEEK_SET)
if stream:
stream.write(tfile.read())
finally:
tfile.close()
os.close(saved_stdout_fd)


class TestOcldrvDumpFns (unittest.TestCase):

def test_dppl_ocldrv_dump_runtime(self):
with stdout_redirector(None):
self.assertEqual(drv.runtime.dump(), 0)


suite = unittest.TestLoader().loadTestsFromTestCase(TestOcldrvDumpFns)
unittest.TextTestRunner(verbosity=2).run(suite)

Loading