diff --git a/sycl/include/CL/sycl/detail/cg.hpp b/sycl/include/CL/sycl/detail/cg.hpp index 0e639b85da34c..9716980d02f67 100644 --- a/sycl/include/CL/sycl/detail/cg.hpp +++ b/sycl/include/CL/sycl/detail/cg.hpp @@ -307,9 +307,16 @@ class CGInteropTask : public CG { class CGHostTask : public CG { public: std::unique_ptr MHostTask; + // queue used to build the interop_handle of a host-interop task + shared_ptr_class MQueue; + // context of the queue above, stored for host-interop tasks + shared_ptr_class MContext; vector_class MArgs; - CGHostTask(std::unique_ptr HostTask, vector_class Args, + CGHostTask(std::unique_ptr HostTask, + std::shared_ptr Queue, + std::shared_ptr Context, + vector_class Args, std::vector> ArgsStorage, std::vector AccStorage, std::vector> SharedPtrStorage, @@ -319,7 +326,8 @@ class CGHostTask : public CG { : CG(Type, std::move(ArgsStorage), std::move(AccStorage), std::move(SharedPtrStorage), std::move(Requirements), std::move(Events), std::move(loc)), - MHostTask(std::move(HostTask)), MArgs(std::move(Args)) {} + MHostTask(std::move(HostTask)), MQueue(std::move(Queue)), + MContext(std::move(Context)), MArgs(std::move(Args)) {} }; class CGBarrier : public CG { diff --git a/sycl/include/CL/sycl/handler.hpp b/sycl/include/CL/sycl/handler.hpp index c7b4ea5565526..d919c83bff526 100644 --- a/sycl/include/CL/sycl/handler.hpp +++ b/sycl/include/CL/sycl/handler.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -870,6 +871,21 @@ class __SYCL_EXPORT handler { MCGType = detail::CG::CODEPLAY_HOST_TASK; } + template + typename std::enable_if< + detail::check_fn_signature::type, + void(interop_handle)>::value>::type + codeplay_host_task(FuncT &&Func) { + throwIfActionIsCreated(); + + MNDRDesc.set(range<1>(1)); + MArgs = std::move(MAssociatedAccesors); + + MHostTask.reset(new detail::HostTask(std::forward<FuncT>(Func))); + + MCGType = detail::CG::CODEPLAY_HOST_TASK; + } + /// Defines and invokes a SYCL kernel function for the specified range and /// offset.
/// diff --git a/sycl/source/CMakeLists.txt b/sycl/source/CMakeLists.txt index 004b596e84955..019a6f3a276c9 100644 --- a/sycl/source/CMakeLists.txt +++ b/sycl/source/CMakeLists.txt @@ -153,6 +153,7 @@ set(SYCL_SOURCES "sampler.cpp" "stream.cpp" "spirv_ops.cpp" + "interop_handle.cpp" "$<$:detail/windows_pi.cpp>" "$<$,$>:detail/posix_pi.cpp>" ) diff --git a/sycl/source/detail/scheduler/commands.cpp b/sycl/source/detail/scheduler/commands.cpp index c1b8063d6a3c7..529f0e2eb30aa 100644 --- a/sycl/source/detail/scheduler/commands.cpp +++ b/sycl/source/detail/scheduler/commands.cpp @@ -159,6 +159,7 @@ getPiEvents(const std::vector &EventImpls) { class DispatchHostTask { ExecCGCommand *MThisCmd; + std::vector MReqToMem; void waitForEvents() const { std::map> @@ -187,7 +188,9 @@ class DispatchHostTask { } public: - DispatchHostTask(ExecCGCommand *ThisCmd) : MThisCmd{ThisCmd} {} + DispatchHostTask(ExecCGCommand *ThisCmd, + std::vector ReqToMem) + : MThisCmd{ThisCmd}, MReqToMem(std::move(ReqToMem)) {} void operator()() const { waitForEvents(); @@ -197,7 +200,17 @@ class DispatchHostTask { CGHostTask &HostTask = static_cast(MThisCmd->getCG()); // we're ready to call the user-defined lambda now - HostTask.MHostTask->call(); + if (HostTask.MHostTask->isInteropTask()) { + auto Queue = HostTask.MQueue->get(); + auto DeviceId = HostTask.MQueue->get_device().get(); + auto Context = HostTask.MQueue->get_context().get(); + + interop_handle IH{MReqToMem, Queue, DeviceId, Context}; + + HostTask.MHostTask->call(IH); + } else + HostTask.MHostTask->call(); + HostTask.MHostTask.reset(); // unblock user empty command here @@ -1943,7 +1956,8 @@ cl_int ExecCGCommand::enqueueImp() { } } - MQueue->getThreadPool().submit(DispatchHostTask(this)); + MQueue->getThreadPool().submit(DispatchHostTask( + this, std::move(ReqToMem))); MShouldCompleteEventIfPossible = false; diff --git a/sycl/source/detail/scheduler/graph_builder.cpp b/sycl/source/detail/scheduler/graph_builder.cpp index cd588cf3c3742..9fdbaa3258bbd 100644 --- 
a/sycl/source/detail/scheduler/graph_builder.cpp +++ b/sycl/source/detail/scheduler/graph_builder.cpp @@ -927,10 +927,11 @@ void Scheduler::GraphBuilder::connectDepEvent(Command *const Cmd, { std::unique_ptr HT(new detail::HostTask); std::unique_ptr ConnectCG(new detail::CGHostTask( - std::move(HT), /* Args = */ {}, /* ArgsStorage = */ {}, - /* AccStorage = */ {}, /* SharedPtrStorage = */ {}, - /* Requirements = */ {}, /* DepEvents = */ {DepEvent}, - CG::CODEPLAY_HOST_TASK, /* Payload */ {})); + std::move(HT), /* Queue = */ {}, /* Context = */ {}, /* Args = */ {}, + /* ArgsStorage = */ {}, /* AccStorage = */ {}, + /* SharedPtrStorage = */ {}, /* Requirements = */ {}, + /* DepEvents = */ {DepEvent}, CG::CODEPLAY_HOST_TASK, + /* Payload = */ {})); ConnectCmd = new ExecCGCommand( std::move(ConnectCG), Scheduler::getInstance().getDefaultHostQueue()); } diff --git a/sycl/source/handler.cpp b/sycl/source/handler.cpp index 9939638cc851b..7ad8e2bf88b76 100644 --- a/sycl/source/handler.cpp +++ b/sycl/source/handler.cpp @@ -85,9 +85,10 @@ event handler::finalize() { break; case detail::CG::CODEPLAY_HOST_TASK: CommandGroup.reset(new detail::CGHostTask( - std::move(MHostTask), std::move(MArgs), std::move(MArgsStorage), - std::move(MAccStorage), std::move(MSharedPtrStorage), - std::move(MRequirements), std::move(MEvents), MCGType, MCodeLoc)); + std::move(MHostTask), MQueue, MQueue->getContextImplPtr(), + std::move(MArgs), std::move(MArgsStorage), std::move(MAccStorage), + std::move(MSharedPtrStorage), std::move(MRequirements), + std::move(MEvents), MCGType, MCodeLoc)); break; case detail::CG::BARRIER: case detail::CG::BARRIER_WAITLIST: diff --git a/sycl/test/host-interop-task/interop-task-dependency.cpp b/sycl/test/host-interop-task/interop-task-dependency.cpp new file mode 100644 index 0000000000000..2fb273eebb31b --- /dev/null +++ b/sycl/test/host-interop-task/interop-task-dependency.cpp @@ -0,0 +1,202 @@ +// RUN: %clangxx -fsycl -fsycl-targets=%sycl_triple %s -o %t.out 
%threads_lib +// RUN: %CPU_RUN_PLACEHOLDER SYCL_PI_TRACE=-1 %t.out 2>&1 %CPU_CHECK_PLACEHOLDER +// RUN: %GPU_RUN_PLACEHOLDER SYCL_PI_TRACE=-1 %t.out 2>&1 %GPU_CHECK_PLACEHOLDER +// RUN: %ACC_RUN_PLACEHOLDER SYCL_PI_TRACE=-1 %t.out 2>&1 %ACC_CHECK_PLACEHOLDER + +#include +#include +#include +#include +#include + +#include + +namespace S = cl::sycl; + +struct Context { + std::atomic_bool Flag; + S::queue &Queue; + S::buffer Buf1; + S::buffer Buf2; + S::buffer Buf3; + std::mutex Mutex; + std::condition_variable CV; +}; + +void Thread1Fn(Context *Ctx) { + // 0. initialize all three buffers with a priori wrong results + { + S::accessor + Acc(Ctx->Buf1); + + for (size_t Idx = 0; Idx < Acc.get_count(); ++Idx) + Acc[Idx] = -1; + } + + { + S::accessor + Acc(Ctx->Buf2); + + for (size_t Idx = 0; Idx < Acc.get_count(); ++Idx) + Acc[Idx] = -2; + } + + { + S::accessor + Acc(Ctx->Buf3); + + for (size_t Idx = 0; Idx < Acc.get_count(); ++Idx) + Acc[Idx] = -3; + } + + // 1. submit task writing to buffer 1 + Ctx->Queue.submit([&](S::handler &CGH) { + S::accessor + GeneratorAcc(Ctx->Buf1, CGH); + + auto GeneratorKernel = [GeneratorAcc]() { + for (size_t Idx = 0; Idx < GeneratorAcc.get_count(); ++Idx) + GeneratorAcc[Idx] = Idx; + }; + + CGH.single_task(GeneratorKernel); + }); + + // 2. 
submit host task writing from buf 1 to buf 2 + auto HostTaskEvent = Ctx->Queue.submit([&](S::handler &CGH) { + S::accessor + CopierSrcAcc(Ctx->Buf1, CGH); + S::accessor + CopierDstAcc(Ctx->Buf2, CGH); + + auto CopierHostTask = [CopierSrcAcc, CopierDstAcc, Ctx](S::interop_handle IH) { + // TODO write through interop handle objects + //(void)IH.get_native_mem(CopierSrcAcc); + //(void)IH.get_native_mem(CopierDstAcc); + (void)IH.get_native_queue(); + (void)IH.get_native_device(); + (void)IH.get_native_context(); + for (size_t Idx = 0; Idx < CopierDstAcc.get_count(); ++Idx) + CopierDstAcc[Idx] = CopierSrcAcc[Idx]; + + bool Expected = false; + if (!Ctx->Flag.compare_exchange_strong(Expected, true)) + assert(false && "Flag must be set exactly once"); + + { + std::lock_guard Lock(Ctx->Mutex); + Ctx->CV.notify_all(); + } + }; + + CGH.codeplay_host_task(CopierHostTask); + }); + + // 3. submit simple task to move data between two buffers + Ctx->Queue.submit([&](S::handler &CGH) { + S::accessor + SrcAcc(Ctx->Buf2, CGH); + S::accessor + DstAcc(Ctx->Buf3, CGH); + + CGH.depends_on(HostTaskEvent); + + auto CopierKernel = [SrcAcc, DstAcc]() { + for (size_t Idx = 0; Idx < DstAcc.get_count(); ++Idx) + DstAcc[Idx] = SrcAcc[Idx]; + }; + + CGH.single_task(CopierKernel); + }); + + // 4. check data in buffer #3 + { + S::accessor + Acc(Ctx->Buf3); + + bool Failure = false; + + for (size_t Idx = 0; Idx < Acc.get_count(); ++Idx) { + fprintf(stderr, "Third buffer [%3zu] = %i\n", Idx, Acc[Idx]); + + Failure |= (Acc[Idx] != Idx); + } + + assert(!Failure && "Invalid data in third buffer"); + } +} + +void Thread2Fn(Context *Ctx) { + std::unique_lock Lock(Ctx->Mutex); + + // T2.1. Wait until flag F is set eq true. + Ctx->CV.wait(Lock, [&Ctx] { return Ctx->Flag.load(); }); + + assert(Ctx->Flag.load()); +} + +void test() { + auto EH = [](S::exception_list EL) { + for (const std::exception_ptr &E : EL) { + std::rethrow_exception(E); + } + }; + + S::queue Queue(EH); + + Context Ctx{{false}, Queue, {10}, {10}, {10}, {}, {}}; + + // 0. 
setup: thread 1 T1: exec smth; thread 2 T2: waits; init flag F = false + auto A1 = std::async(std::launch::async, Thread1Fn, &Ctx); + auto A2 = std::async(std::launch::async, Thread2Fn, &Ctx); + + A1.get(); + A2.get(); + + assert(Ctx.Flag.load()); + + // 3. check via host accessor that buf 2 contains valid data + { + S::accessor + ResultAcc(Ctx.Buf2); + + bool failure = false; + for (size_t Idx = 0; Idx < ResultAcc.get_count(); ++Idx) { + fprintf(stderr, "Second buffer [%3zu] = %i\n", Idx, ResultAcc[Idx]); + + failure |= (ResultAcc[Idx] != Idx); + } + + assert(!failure && "Invalid data in result buffer"); + } +} + +int main() { + test(); + + return 0; +} + +// launch of GeneratorTask kernel +// CHECK:---> piKernelCreate( +// CHECK: GeneratorTask +// CHECK:---> piEnqueueKernelLaunch( +// prepare for host task +// CHECK:---> piEnqueueMemBufferMap( +// launch of CopierTask kernel +// CHECK:---> piKernelCreate( +// CHECK: CopierTask +// CHECK:---> piEnqueueKernelLaunch( +// TODO need to check for piEventsWait as "wait on dependencies of host task". +// At the same time this piEventsWait may occur anywhere after +// piEnqueueMemBufferMap ("prepare for host task").