[SYCL] Don't block execution when flushing a stream #2581

Merged: 15 commits, merged on Oct 14, 2020
Changes from 7 commits
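This PR makes stream flushing non-blocking: instead of reading the stream buffer through a host accessor (which forces the kernel to finish), flush() now enqueues a host task that prints the buffered output once the kernel completes. As a hypothetical user-side sketch (not part of the PR; the kernel name FillAndPrint, the 1024/256 stream sizes and the data are invented), this is roughly how a sycl::stream is used and where the synchronization points listed in the new Scheduler destructor comment come into play:

#include <CL/sycl.hpp>

int main() {
  cl::sycl::queue Q;
  {
    cl::sycl::buffer<int, 1> Buf{cl::sycl::range<1>(4)};
    Q.submit([&](cl::sycl::handler &CGH) {
      // 1024-byte stream buffer, 256-byte limit per statement (arbitrary sizes).
      cl::sycl::stream Out(1024, 256, CGH);
      auto Acc = Buf.get_access<cl::sycl::access::mode::write>(CGH);
      CGH.parallel_for<class FillAndPrint>(
          cl::sycl::range<1>(4), [=](cl::sycl::id<1> I) {
            Acc[I] = static_cast<int>(I[0]);
            Out << "item " << I[0] << cl::sycl::endl;
          });
    });
    // With this PR the submit call returns without waiting for the streamed
    // output: flushing only enqueues a host task that prints the stream
    // buffer once the kernel has finished.
  } // Buffer destruction is one of the sync points listed in the PR, so at
    // this point the kernel has completed and the streamed text is printed.
  Q.wait(); // queue::wait() (or event::wait()) would also serve as a sync point.
  return 0;
}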
1 change: 1 addition & 0 deletions sycl/include/CL/sycl/detail/cg.hpp
@@ -155,6 +155,7 @@ class CGExecKernel : public CG {
vector_class<shared_ptr_class<detail::stream_impl>> getStreams() const {
return MStreams;
}
void clearStreams() { MStreams.clear(); }
};

/// "Copy memory" command group class.
9 changes: 8 additions & 1 deletion sycl/source/detail/scheduler/commands.cpp
@@ -1202,7 +1202,14 @@ AllocaCommandBase *ExecCGCommand::getAllocaForReq(Requirement *Req) {
}

vector_class<StreamImplPtr> ExecCGCommand::getStreams() const {
return ((CGExecKernel *)MCommandGroup.get())->getStreams();
if (MCommandGroup->getType() == CG::KERNEL)
return ((CGExecKernel *)MCommandGroup.get())->getStreams();
return {};
}

void ExecCGCommand::clearStreams() {
if (MCommandGroup->getType() == CG::KERNEL)
((CGExecKernel *)MCommandGroup.get())->clearStreams();
}

cl_int UpdateHostRequirementCommand::enqueueImp() {
2 changes: 2 additions & 0 deletions sycl/source/detail/scheduler/commands.hpp
@@ -492,6 +492,8 @@ class ExecCGCommand : public Command {

vector_class<StreamImplPtr> getStreams() const;

void clearStreams();

void printDot(std::ostream &Stream) const final override;
void emitInstrumentationData() final override;

28 changes: 26 additions & 2 deletions sycl/source/detail/scheduler/graph_builder.cpp
@@ -823,7 +823,9 @@ void Scheduler::GraphBuilder::decrementLeafCountersForRecord(
}
}

void Scheduler::GraphBuilder::cleanupCommandsForRecord(MemObjRecord *Record) {
void Scheduler::GraphBuilder::cleanupCommandsForRecord(
MemObjRecord *Record,
std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
std::vector<AllocaCommandBase *> &AllocaCommands = Record->MAllocaCommands;
if (AllocaCommands.empty())
return;
@@ -872,6 +874,16 @@ void Scheduler::GraphBuilder::cleanupCommandsForRecord(MemObjRecord *Record) {
if (!markNodeAsVisited(Cmd, MVisitedCmds))
continue;

// Collect stream objects for a visited command.
if (Cmd->getType() == Command::CommandType::RUN_CG) {
auto ExecCmd = static_cast<ExecCGCommand *>(Cmd);
std::vector<std::shared_ptr<stream_impl>> Streams =
std::move(ExecCmd->getStreams());
ExecCmd->clearStreams();
StreamsToDeallocate.insert(StreamsToDeallocate.end(), Streams.begin(),
Streams.end());
}

for (Command *UserCmd : Cmd->MUsers)
if (UserCmd->getType() != Command::CommandType::ALLOCA)
MCmdsToVisit.push(UserCmd);
@@ -909,7 +921,9 @@ void Scheduler::GraphBuilder::cleanupCommandsForRecord(MemObjRecord *Record) {
handleVisitedNodes(MVisitedCmds);
}

void Scheduler::GraphBuilder::cleanupFinishedCommands(Command *FinishedCmd) {
void Scheduler::GraphBuilder::cleanupFinishedCommands(
Command *FinishedCmd,
std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
assert(MCmdsToVisit.empty());
MCmdsToVisit.push(FinishedCmd);
MVisitedCmds.clear();
@@ -922,6 +936,16 @@ void Scheduler::GraphBuilder::cleanupFinishedCommands(Command *FinishedCmd) {
if (!markNodeAsVisited(Cmd, MVisitedCmds))
continue;

// Collect stream objects for a visited command.
if (Cmd->getType() == Command::CommandType::RUN_CG) {
auto ExecCmd = static_cast<ExecCGCommand *>(Cmd);
std::vector<std::shared_ptr<stream_impl>> Streams =
std::move(ExecCmd->getStreams());
ExecCmd->clearStreams();
StreamsToDeallocate.insert(StreamsToDeallocate.end(), Streams.begin(),
Streams.end());
}

for (const DepDesc &Dep : Cmd->MDeps) {
if (Dep.MDepCommand)
MCmdsToVisit.push(Dep.MDepCommand);
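Both traversals above follow the same pattern: walk the command graph once, and for every RUN_CG node move its streams out (getStreams() plus clearStreams()) into a caller-provided vector so they can be released later, outside the graph lock. A minimal standalone sketch of that pattern, assuming made-up types Node and Resource in place of Command and stream_impl:

#include <memory>
#include <queue>
#include <set>
#include <vector>

struct Resource {};                      // stands in for detail::stream_impl

struct Node {                            // stands in for a scheduler Command
  bool IsKernel = false;                 // Command::CommandType::RUN_CG
  std::vector<std::shared_ptr<Resource>> Streams; // only kernel nodes own these
  std::vector<Node *> Users;             // dependent commands
};

// Breadth-first walk that mirrors cleanupCommandsForRecord: each node is
// visited once, and stream-like resources are moved out of kernel nodes into
// a caller-provided vector so they can be released after the traversal (and
// after the graph lock has been dropped).
void collectStreams(Node *Root,
                    std::vector<std::shared_ptr<Resource>> &ToDeallocate) {
  std::queue<Node *> ToVisit;
  std::set<Node *> Visited;
  ToVisit.push(Root);

  while (!ToVisit.empty()) {
    Node *Cmd = ToVisit.front();
    ToVisit.pop();
    if (!Visited.insert(Cmd).second)
      continue; // already processed, like markNodeAsVisited returning false

    if (Cmd->IsKernel) {
      // Equivalent of getStreams() followed by clearStreams() in the PR.
      ToDeallocate.insert(ToDeallocate.end(), Cmd->Streams.begin(),
                          Cmd->Streams.end());
      Cmd->Streams.clear();
    }

    for (Node *User : Cmd->Users)
      ToVisit.push(User);
  }
}

int main() {
  Node Kernel;
  Kernel.IsKernel = true;
  Kernel.Streams.push_back(std::make_shared<Resource>());

  Node Root;
  Root.Users.push_back(&Kernel);

  std::vector<std::shared_ptr<Resource>> ToDeallocate;
  collectStreams(&Root, ToDeallocate);
  return ToDeallocate.size() == 1 ? 0 : 1;
}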
109 changes: 77 additions & 32 deletions sycl/source/detail/scheduler/scheduler.cpp
@@ -13,6 +13,7 @@
#include <detail/stream_impl.hpp>

#include <chrono>
#include <cstdio>
#include <memory>
#include <mutex>
#include <set>
@@ -161,47 +162,77 @@ void Scheduler::waitForEvent(EventImplPtr Event) {
}

void Scheduler::cleanupFinishedCommands(EventImplPtr FinishedEvent) {
// Avoiding deadlock situation, where one thread is in the process of
// enqueueing (with a locked mutex) a currently blocked task that waits for
// another thread which is stuck at attempting cleanup.
std::unique_lock<std::shared_timed_mutex> Lock(MGraphLock, std::try_to_lock);
if (Lock.owns_lock()) {
Command *FinishedCmd = static_cast<Command *>(FinishedEvent->getCommand());
// The command might have been cleaned up (and set to nullptr) by another
// thread
if (FinishedCmd)
MGraphBuilder.cleanupFinishedCommands(FinishedCmd);
// We are going to traverse a graph of finished commands. Gather stream
// objects from these commands, if any, and deallocate buffers for these
// stream objects; this is needed to guarantee that streamed data is printed
// and resources are released.
std::vector<std::shared_ptr<stream_impl>> StreamsToDeallocate;
{
// Avoiding deadlock situation, where one thread is in the process of
// enqueueing (with a locked mutex) a currently blocked task that waits for
// another thread which is stuck at attempting cleanup.
std::unique_lock<std::shared_timed_mutex> Lock(MGraphLock,
std::try_to_lock);
if (Lock.owns_lock()) {
Command *FinishedCmd =
static_cast<Command *>(FinishedEvent->getCommand());
// The command might have been cleaned up (and set to nullptr) by another
// thread
if (FinishedCmd)
MGraphBuilder.cleanupFinishedCommands(FinishedCmd, StreamsToDeallocate);
}
}
// Deallocate buffers for stream objects of the finished commands. Iterate in
// reverse order because that is the order of command execution.
for (std::vector<std::shared_ptr<stream_impl>>::reverse_iterator
StreamImplPtr = StreamsToDeallocate.rbegin();
StreamImplPtr != StreamsToDeallocate.rend(); ++StreamImplPtr)
detail::Scheduler::getInstance().deallocateStreamBuffers(
StreamImplPtr->get());
}

void Scheduler::removeMemoryObject(detail::SYCLMemObjI *MemObj) {
MemObjRecord *Record = nullptr;
std::unique_lock<std::shared_timed_mutex> Lock(MGraphLock, std::defer_lock);

// We are going to traverse a graph of finished commands. Gather stream
// objects from these commands, if any, and deallocate buffers for these
// stream objects; this is needed to guarantee that streamed data is printed
// and resources are released.
std::vector<std::shared_ptr<stream_impl>> StreamsToDeallocate;
{
lockSharedTimedMutex(Lock);
MemObjRecord *Record = nullptr;
std::unique_lock<std::shared_timed_mutex> Lock(MGraphLock, std::defer_lock);

Record = MGraphBuilder.getMemObjRecord(MemObj);
if (!Record)
// No operations were performed on the mem object
return;
{
lockSharedTimedMutex(Lock);

Lock.unlock();
}
Record = MGraphBuilder.getMemObjRecord(MemObj);
if (!Record)
// No operations were performed on the mem object
return;

{
// This only needs a shared mutex as it only involves enqueueing and
// waiting for events
std::shared_lock<std::shared_timed_mutex> Lock(MGraphLock);
waitForRecordToFinish(Record);
}
Lock.unlock();
}

{
lockSharedTimedMutex(Lock);
MGraphBuilder.decrementLeafCountersForRecord(Record);
MGraphBuilder.cleanupCommandsForRecord(Record);
MGraphBuilder.removeRecordForMemObj(MemObj);
{
// This only needs a shared mutex as it only involves enqueueing and
// waiting for events
std::shared_lock<std::shared_timed_mutex> Lock(MGraphLock);
waitForRecordToFinish(Record);
}

{
lockSharedTimedMutex(Lock);
MGraphBuilder.decrementLeafCountersForRecord(Record);
MGraphBuilder.cleanupCommandsForRecord(Record, StreamsToDeallocate);
MGraphBuilder.removeRecordForMemObj(MemObj);
}
}
// Deallocate buffers for stream objects of the finished commands. Iterate in
// reverse order because that is the order of command execution.
for (std::vector<std::shared_ptr<stream_impl>>::reverse_iterator
StreamImplPtr = StreamsToDeallocate.rbegin();
StreamImplPtr != StreamsToDeallocate.rend(); ++StreamImplPtr)
detail::Scheduler::getInstance().deallocateStreamBuffers(
StreamImplPtr->get());
}

EventImplPtr Scheduler::addHostAccessor(Requirement *Req) {
Expand Down Expand Up @@ -251,11 +282,12 @@ void Scheduler::allocateStreamBuffers(stream_impl *Impl,
size_t FlushBufferSize) {
std::lock_guard<std::mutex> lock(StreamBuffersPoolMutex);
StreamBuffersPool.insert(
{Impl, StreamBuffers(StreamBufferSize, FlushBufferSize)});
{Impl, new StreamBuffers(StreamBufferSize, FlushBufferSize)});
}

void Scheduler::deallocateStreamBuffers(stream_impl *Impl) {
std::lock_guard<std::mutex> lock(StreamBuffersPoolMutex);
delete StreamBuffersPool[Impl];
StreamBuffersPool.erase(Impl);
}

@@ -266,6 +298,19 @@ Scheduler::Scheduler() {
/*PropList=*/{}));
}

Scheduler::~Scheduler() {
// By specification there are several possible sync points: buffer
// destruction and the wait() method of a queue or event. A stream doesn't
// introduce any synchronization point of its own. It is guaranteed that the
// stream is flushed and its resources are released only if one of the listed
// sync points was used for the kernel. Otherwise the stream resources will
// not be released; issue a warning in this case.
if (StreamBuffersPool.size() > 0)
printf("\nWARNING: Some commands may have not finished the execution and "
"not all resources were released. Please be sure that all kernels "
"have sycnhronization points.\n");
}

void Scheduler::lockSharedTimedMutex(
std::unique_lock<std::shared_timed_mutex> &Lock) {
#ifdef _WIN32
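The reworked cleanupFinishedCommands and removeMemoryObject share one discipline: take the graph lock (with try_to_lock on the cleanup path to avoid the deadlock described in the comment), gather everything that needs releasing into a local vector, drop the lock, and only then deallocate, in reverse order. A compact illustration of that idiom under invented names (SchedulerSketch and StreamLike are not SYCL types):

#include <memory>
#include <mutex>
#include <shared_mutex>
#include <vector>

struct StreamLike {};                 // placeholder for detail::stream_impl

class SchedulerSketch {
  std::shared_timed_mutex GraphLock;
  std::vector<std::shared_ptr<StreamLike>> Finished; // stands in for the graph

public:
  void noteFinished(std::shared_ptr<StreamLike> S) {
    std::unique_lock<std::shared_timed_mutex> Lock(GraphLock);
    Finished.push_back(std::move(S));
  }

  void cleanupFinished() {
    std::vector<std::shared_ptr<StreamLike>> ToDeallocate;
    {
      // try_to_lock: if another thread already holds the graph lock while
      // enqueueing a blocked task, skip cleanup instead of deadlocking.
      std::unique_lock<std::shared_timed_mutex> Lock(GraphLock,
                                                     std::try_to_lock);
      if (!Lock.owns_lock())
        return;
      // Gather under the lock...
      ToDeallocate.swap(Finished);
    }
    // ...and release outside the critical section, in reverse order,
    // mirroring the loop over StreamsToDeallocate in the PR.
    for (auto It = ToDeallocate.rbegin(); It != ToDeallocate.rend(); ++It)
      It->reset(); // deallocateStreamBuffers() would be called here
  }
};

int main() {
  SchedulerSketch S;
  S.noteFinished(std::make_shared<StreamLike>());
  S.cleanupFinished();
  return 0;
}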
20 changes: 15 additions & 5 deletions sycl/source/detail/scheduler/scheduler.hpp
@@ -435,6 +435,7 @@ class Scheduler {

protected:
Scheduler();
~Scheduler();
static Scheduler instance;

/// Provides exclusive access to std::shared_timed_mutex object with deadlock
@@ -490,7 +491,9 @@

/// Removes finished non-leaf non-alloca commands from the subgraph
/// (assuming that all its commands have been waited for).
void cleanupFinishedCommands(Command *FinishedCmd);
void cleanupFinishedCommands(
Command *FinishedCmd,
std::vector<std::shared_ptr<cl::sycl::detail::stream_impl>> &);

/// Reschedules the command passed using Queue provided.
///
@@ -513,7 +516,9 @@
void decrementLeafCountersForRecord(MemObjRecord *Record);

/// Removes commands that use the given MemObjRecord from the graph.
void cleanupCommandsForRecord(MemObjRecord *Record);
void cleanupCommandsForRecord(
MemObjRecord *Record,
std::vector<std::shared_ptr<cl::sycl::detail::stream_impl>> &);

/// Removes the MemObjRecord for the memory object passed.
void removeRecordForMemObj(SYCLMemObjI *MemObject);
@@ -728,7 +733,12 @@ class Scheduler {
: Data(StreamBufferSize, 0),
Buf(Data.data(), range<1>(StreamBufferSize),
{property::buffer::use_host_ptr()}),
FlushBuf(range<1>(FlushBufferSize)) {}
FlushBuf(range<1>(FlushBufferSize)) {
// Disable copy-back on buffer destruction. The copy is scheduled as a host
// task which runs as soon as the kernel has completed execution.
Buf.set_write_back(false);
FlushBuf.set_write_back(false);
}

// Vector on the host side which is used to initialize the stream
// buffer
@@ -745,12 +755,12 @@

// Protects stream buffers pool
std::mutex StreamBuffersPoolMutex;
std::map<stream_impl *, StreamBuffers> StreamBuffersPool;
std::map<stream_impl *, StreamBuffers *> StreamBuffersPool;

/// Allocate buffers in the pool for a provided stream
void allocateStreamBuffers(stream_impl *, size_t, size_t);

/// Deallocate buffers in the pool for a provided stream
/// Deallocate all stream buffers in the pool
void deallocateStreamBuffers(stream_impl *);
};

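StreamBuffers now disables write-back on both internal buffers, so destroying them never schedules an implicit copy back to the host; the streamed data is instead read out explicitly. A small sketch of that buffer configuration, with an invented message and sizes, and using a plain host accessor for the explicit read-back rather than the PR's host task:

#include <CL/sycl.hpp>
#include <cstdio>
#include <vector>

int main() {
  std::vector<char> Data(1024, 0);
  {
    // use_host_ptr: reuse Data's storage instead of allocating a separate
    // host copy, the same way StreamBuffers wraps its Data vector.
    cl::sycl::buffer<char, 1> Buf(Data.data(), cl::sycl::range<1>(Data.size()),
                                  {cl::sycl::property::buffer::use_host_ptr()});
    // Disable the implicit copy-back that would otherwise be scheduled when
    // the buffer is destroyed; the owner reads the data back explicitly.
    Buf.set_write_back(false);

    cl::sycl::queue Q;
    Q.submit([&](cl::sycl::handler &CGH) {
      auto Acc = Buf.get_access<cl::sycl::access::mode::write>(CGH);
      CGH.single_task<class WriteMsg>([=] {
        const char Msg[] = "hello from the device\n";
        for (size_t I = 0; I < sizeof(Msg); ++I)
          Acc[I] = Msg[I];
      });
    });

    // Explicit read-back through a host accessor instead of relying on the
    // buffer destructor's write-back.
    auto Host = Buf.get_access<cl::sycl::access::mode::read>();
    printf("%s", Host.get_pointer());
  } // No copy-back is scheduled here because write-back was disabled.
  return 0;
}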
53 changes: 34 additions & 19 deletions sycl/source/detail/stream_impl.cpp
@@ -6,6 +6,7 @@
//
//===----------------------------------------------------------------------===//

#include <CL/sycl/queue.hpp>
Contributor review comment (on the include above):
Should all these includes be using "" instead, since they are implementation details and not user-facing system includes? What happens if there is already an incompatible SYCL implementation on the system and you are testing this one at the same time without installing it?

#include <detail/scheduler/scheduler.hpp>
#include <detail/stream_impl.hpp>

@@ -33,24 +34,25 @@ stream_impl::stream_impl(size_t BufferSize, size_t MaxStatementSize,
GlobalBufAccessorT stream_impl::accessGlobalBuf(handler &CGH) {
return detail::Scheduler::getInstance()
.StreamBuffersPool.find(this)
->second.Buf.get_access<cl::sycl::access::mode::read_write>(
->second->Buf.get_access<cl::sycl::access::mode::read_write>(
CGH, range<1>(BufferSize_), id<1>(OffsetSize));
}

// Method to provide an accessor to the global flush buffer
GlobalBufAccessorT stream_impl::accessGlobalFlushBuf(handler &CGH) {
return detail::Scheduler::getInstance()
.StreamBuffersPool.find(this)
->second.FlushBuf.get_access<cl::sycl::access::mode::read_write>(
->second->FlushBuf.get_access<cl::sycl::access::mode::read_write>(
CGH, range<1>(MaxStatementSize_), id<1>(0));
}

// Method to provide an atomic access to the offset in the global stream
// buffer and offset in the flush buffer
GlobalOffsetAccessorT stream_impl::accessGlobalOffset(handler &CGH) {
auto OffsetSubBuf = buffer<char, 1>(
detail::Scheduler::getInstance().StreamBuffersPool.find(this)->second.Buf,
id<1>(0), range<1>(OffsetSize));
auto OffsetSubBuf = buffer<char, 1>(detail::Scheduler::getInstance()
.StreamBuffersPool.find(this)
->second->Buf,
id<1>(0), range<1>(OffsetSize));
auto ReinterpretedBuf = OffsetSubBuf.reinterpret<unsigned, 1>(range<1>(2));
return ReinterpretedBuf.get_access<cl::sycl::access::mode::atomic>(
CGH, range<1>(2), id<1>(0));
@@ -60,20 +62,33 @@ size_t stream_impl::get_size() const { return BufferSize_; }
size_t stream_impl::get_max_statement_size() const { return MaxStatementSize_; }

void stream_impl::flush() {
// Access the stream buffer on the host. This access guarantees that kernel is
// executed and buffer contains streamed data.
{
auto HostAcc = detail::Scheduler::getInstance()
.StreamBuffersPool.find(this)
->second.Buf.get_access<cl::sycl::access::mode::read>(
range<1>(BufferSize_), id<1>(OffsetSize));

printf("%s", HostAcc.get_pointer());
fflush(stdout);
}

// Flushed the stream, can deallocate the buffers now.
detail::Scheduler::getInstance().deallocateStreamBuffers(this);
// We don't want stream flushing to be a blocking operation, so we submit a
// host task to print the stream buffer. It will run as soon as the kernel
// finishes execution.
auto Q = detail::createSyclObjFromImpl<queue>(
cl::sycl::detail::Scheduler::getInstance().getDefaultHostQueue());
Q.submit([&](handler &cgh) {
auto BufHostAcc =
detail::Scheduler::getInstance()
.StreamBuffersPool.find(this)
->second->Buf
.get_access<access::mode::read_write, access::target::host_buffer>(
cgh, range<1>(BufferSize_), id<1>(OffsetSize));
// Create an accessor to the flush buffer even if it is not used yet.
// Otherwise the kernel will be a leaf for the flush buffer and the scheduler
// will not be able to clean up the kernel. TODO: get rid of the finalize
// method by using a host accessor to the flush buffer.
auto FlushBufHostAcc =
detail::Scheduler::getInstance()
.StreamBuffersPool.find(this)
->second->FlushBuf
.get_access<access::mode::read_write, access::target::host_buffer>(
cgh);
cgh.codeplay_host_task([=] {
printf("%s", BufHostAcc.get_pointer());
fflush(stdout);
});
});
}
} // namespace detail
} // namespace sycl
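The new flush() relies on the codeplay_host_task extension together with a host_buffer accessor taken with the handler, both of which appear in the diff above: the host task is ordered after any kernel that writes the same buffer, so printing happens after kernel completion without blocking the submitting thread. A reduced, hypothetical sketch of the same pattern outside stream_impl (the queues, the 64-byte buffer and the Producer kernel name are invented):

#include <CL/sycl.hpp>
#include <cstdio>

int main() {
  cl::sycl::queue DeviceQ;                          // runs the kernel
  cl::sycl::queue HostQ{cl::sycl::host_selector{}}; // runs the host task, like
                                                    // the default host queue
                                                    // used by flush() in the PR
  cl::sycl::buffer<char, 1> Buf{cl::sycl::range<1>(64)};

  DeviceQ.submit([&](cl::sycl::handler &CGH) {
    auto Acc = Buf.get_access<cl::sycl::access::mode::write>(CGH);
    CGH.single_task<class Producer>([=] {
      const char Msg[] = "printed by a host task\n";
      for (size_t I = 0; I < sizeof(Msg); ++I)
        Acc[I] = Msg[I];
    });
  });

  // The host task depends on Buf through its host_buffer accessor, so the
  // scheduler runs it only after the producer kernel has finished. submit()
  // itself returns immediately, which is the non-blocking behaviour the new
  // stream_impl::flush() relies on.
  HostQ.submit([&](cl::sycl::handler &CGH) {
    auto HostAcc =
        Buf.get_access<cl::sycl::access::mode::read_write,
                       cl::sycl::access::target::host_buffer>(CGH);
    CGH.codeplay_host_task([=] {
      printf("%s", HostAcc.get_pointer());
      fflush(stdout);
    });
  });

  DeviceQ.wait();
  HostQ.wait(); // Synchronization point: guarantees the host task has printed.
  return 0;
}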
2 changes: 1 addition & 1 deletion sycl/source/detail/stream_impl.hpp
@@ -35,7 +35,7 @@ class __SYCL_EXPORT stream_impl {
// buffer and offset in the flush buffer
GlobalOffsetAccessorT accessGlobalOffset(handler &CGH);

// Copy stream buffer to the host and print the contents
// Enqueue task to copy stream buffer to the host and print the contents
void flush();

size_t get_size() const;