diff --git a/backends/qnnpack/QNNPackBackend.cpp b/backends/qnnpack/QNNPackBackend.cpp index 8bcfcb73186..eb33c7fd94d 100644 --- a/backends/qnnpack/QNNPackBackend.cpp +++ b/backends/qnnpack/QNNPackBackend.cpp @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/backends/qnnpack/targets.bzl b/backends/qnnpack/targets.bzl index 3916cf21842..f0fc478ae8c 100644 --- a/backends/qnnpack/targets.bzl +++ b/backends/qnnpack/targets.bzl @@ -83,7 +83,7 @@ def define_common_targets(): "//executorch/runtime/core/exec_aten/util:scalar_type_util", "//executorch/runtime/core/exec_aten/util:tensor_util", "//executorch/runtime/backend:backend_registry", - "//executorch/extension/fb/threadpool:threadpool", + "//executorch/backends/xnnpack/threadpool:threadpool", "//executorch/util:memory_utils", "//{prefix}caffe2/aten/src/ATen/native/quantized/cpu/qnnpack:pytorch_qnnpack".format( prefix = ( diff --git a/backends/xnnpack/runtime/XNNCompiler.cpp b/backends/xnnpack/runtime/XNNCompiler.cpp index 5e83f07890a..0c1c9e6d42c 100644 --- a/backends/xnnpack/runtime/XNNCompiler.cpp +++ b/backends/xnnpack/runtime/XNNCompiler.cpp @@ -7,8 +7,8 @@ */ #include +#include #include -#include #include #include diff --git a/backends/xnnpack/targets.bzl b/backends/xnnpack/targets.bzl index 4780c60a0fb..571cf4681f0 100644 --- a/backends/xnnpack/targets.bzl +++ b/backends/xnnpack/targets.bzl @@ -54,7 +54,7 @@ def define_common_targets(): ":xnnpack_schema", "//executorch/runtime/backend:backend_registry", "//executorch/backends/qnnpack:qnnpack_utils", # TODO Use (1) portable for choose_qparams(), (2) xnnpack for quantize_per_tensor() - "//executorch/extension/fb/threadpool:threadpool", + "//executorch/backends/xnnpack/threadpool:threadpool", "//executorch/util:memory_utils", "//executorch/runtime/core/exec_aten/util:tensor_util", ], diff --git a/backends/xnnpack/third-party/cpuinfo-wrappers/arm/mach/init.c 
b/backends/xnnpack/third-party/cpuinfo-wrappers/arm/mach/init.c index 47a71a6ba72..6316d8efd4f 100644 --- a/backends/xnnpack/third-party/cpuinfo-wrappers/arm/mach/init.c +++ b/backends/xnnpack/third-party/cpuinfo-wrappers/arm/mach/init.c @@ -4,6 +4,6 @@ #include #endif /* __APPLE__ */ -#if (defined(__arm__) || defined(__aarch64__)) && defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE +#if (defined(__arm__) || defined(__aarch64__)) && defined(TARGET_OS_MAC) && TARGET_OS_MAC #include -#endif /* (defined(__arm__) || defined(__aarch64__)) && defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE */ +#endif /* (defined(__arm__) || defined(__aarch64__)) && defined(TARGET_OS_MAC) && TARGET_OS_MAC */ diff --git a/backends/xnnpack/third-party/generate-cpuinfo-wrappers.py b/backends/xnnpack/third-party/generate-cpuinfo-wrappers.py index 4d5f2a1dce4..a04ef9e1dcf 100644 --- a/backends/xnnpack/third-party/generate-cpuinfo-wrappers.py +++ b/backends/xnnpack/third-party/generate-cpuinfo-wrappers.py @@ -63,7 +63,7 @@ "(defined(__arm__) || defined(__aarch64__)) && defined(__ANDROID__)": [ "arm/android/properties.c", ], - "(defined(__arm__) || defined(__aarch64__)) && defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE": [ + "(defined(__arm__) || defined(__aarch64__)) && defined(TARGET_OS_MAC) && TARGET_OS_MAC": [ "arm/mach/init.c", ],} diff --git a/backends/xnnpack/threadpool/TARGETS b/backends/xnnpack/threadpool/TARGETS new file mode 100644 index 00000000000..30d971558f3 --- /dev/null +++ b/backends/xnnpack/threadpool/TARGETS @@ -0,0 +1,6 @@ +# Any targets that should be shared between fbcode and xplat must be defined in +# targets.bzl. This file can contain fbcode-only targets. 
# ---- backends/xnnpack/threadpool/TARGETS (tail) ----
load(":targets.bzl", "define_common_targets")

# Consistency fix: the sibling test/TARGETS declares an oncall; mirror it here.
oncall("executorch")

define_common_targets()

# ---- backends/xnnpack/threadpool/targets.bzl ----
load("@fbsource//xplat/executorch/backends/xnnpack/third-party:third_party_libs.bzl", "third_party_dep")
load("@fbsource//xplat/executorch/build:runtime_wrapper.bzl", "runtime")

def define_common_targets():
    """Defines targets that should be shared between fbcode and xplat.

    The directory containing this targets.bzl file should also contain both
    TARGETS and BUCK files that call this function.
    """

    # The use-N-threads variants are internal-only, so they live under fb/
    # and are excluded from OSS builds.
    _THREADPOOL_SRCS = [
        "threadpool.cpp",
        "threadpool_guard.cpp",
    ] + (["fb/threadpool_use_n_threads.cpp"] if not runtime.is_oss else [])

    _THREADPOOL_HEADERS = [
        "threadpool.h",
        "threadpool_guard.h",
    ] + (["fb/threadpool_use_n_threads.h"] if not runtime.is_oss else [])

    runtime.cxx_library(
        name = "threadpool",
        srcs = _THREADPOOL_SRCS,
        deps = [
            "//executorch/runtime/core:core",
        ],
        exported_headers = _THREADPOOL_HEADERS,
        exported_deps = [
            third_party_dep("pthreadpool"),
        ],
        external_deps = ["cpuinfo"],
        exported_preprocessor_flags = [
            "-DET_USE_THREADPOOL",
        ],
        visibility = [
            "//executorch/...",
            "//executorch/backends/...",
            # Fix: the tests moved together with this library; the previous
            # //executorch/extension/threadpool/test/... entry was a stale
            # pre-move path (harmless only because //executorch/... already
            # covers everything, but misleading).
            "//executorch/backends/xnnpack/threadpool/test/...",
            "//executorch/runtime/backend/...",
            "@EXECUTORCH_CLIENTS",
        ],
    )
# ---- backends/xnnpack/threadpool/test/TARGETS (tail) ----
load(":targets.bzl", "define_common_targets")

oncall("executorch")

define_common_targets()

# ---- backends/xnnpack/threadpool/test/targets.bzl ----
load("@fbsource//xplat/executorch/build:runtime_wrapper.bzl", "runtime")

def define_common_targets():
    """Defines targets that should be shared between fbcode and xplat.

    The directory containing this targets.bzl file should also contain both
    TARGETS and BUCK files that call this function.
    """

    # The use-N-threads test exercises internal-only code, so it lives under
    # fb/ and is excluded from OSS builds.
    _THREADPOOL_TESTS = [
        "threadpool_test.cpp",
    ] + (["fb/threadpool_use_n_threads_test.cpp"] if not runtime.is_oss else [])

    runtime.cxx_test(
        name = "threadpool_test",
        srcs = _THREADPOOL_TESTS,
        deps = [
            "//executorch/backends/xnnpack/threadpool:threadpool",
        ],
    )
+ */ + +#include +#include +#include +#include + +#include +#include + +using namespace ::testing; + +namespace { + +size_t div_round_up(const size_t divident, const size_t divisor) { + return (divident + divisor - 1) / divisor; +} + +void resize_and_fill_vector(std::vector& a, const size_t size) { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> distrib(1, size * 2); + a.resize(size); + auto generator = [&distrib, &gen]() { return distrib(gen); }; + std::generate(a.begin(), a.end(), generator); +} + +void generate_add_test_inputs( + std::vector& a, + std::vector& b, + std::vector& c_ref, + std::vector& c, + size_t vector_size) { + resize_and_fill_vector(a, vector_size); + resize_and_fill_vector(b, vector_size); + resize_and_fill_vector(c, vector_size); + resize_and_fill_vector(c_ref, vector_size); + for (size_t i = 0, size = a.size(); i < size; ++i) { + c_ref[i] = a[i] + b[i]; + } +} + +void generate_reduce_test_inputs( + std::vector& a, + int32_t& c_ref, + size_t vector_size) { + resize_and_fill_vector(a, vector_size); + c_ref = 0; + for (size_t i = 0, size = a.size(); i < size; ++i) { + c_ref += a[i]; + } +} + +void run_lambda_with_size( + std::function f, + size_t range, + size_t grain_size) { + size_t num_grains = div_round_up(range, grain_size); + + auto threadpool = torch::executorch::threadpool::get_threadpool(); + threadpool->run(f, range); +} +} // namespace + +TEST(ThreadPoolTest, ParallelAdd) { + std::vector a, b, c, c_ref; + size_t vector_size = 100; + size_t grain_size = 10; + + auto add_lambda = [&](size_t i) { + size_t start_index = i * grain_size; + size_t end_index = start_index + grain_size; + end_index = std::min(end_index, vector_size); + for (size_t j = start_index; j < end_index; ++j) { + c[j] = a[j] + b[j]; + } + }; + + auto threadpool = torch::executorch::threadpool::get_threadpool(); + EXPECT_GT(threadpool->get_thread_count(), 1); + + generate_add_test_inputs(a, b, c_ref, c, vector_size); + 
run_lambda_with_size(add_lambda, vector_size, grain_size); + EXPECT_EQ(c, c_ref); + + // Try smaller grain size + grain_size = 5; + generate_add_test_inputs(a, b, c_ref, c, vector_size); + run_lambda_with_size(add_lambda, vector_size, grain_size); + EXPECT_EQ(c, c_ref); + + vector_size = 7; + generate_add_test_inputs(a, b, c_ref, c, vector_size); + run_lambda_with_size(add_lambda, vector_size, grain_size); + EXPECT_EQ(c, c_ref); + + vector_size = 7; + grain_size = 5; + generate_add_test_inputs(a, b, c_ref, c, vector_size); + run_lambda_with_size(add_lambda, vector_size, grain_size); + EXPECT_EQ(c, c_ref); +} + +// Test parallel reduction where we acquire lock within lambda +TEST(ThreadPoolTest, ParallelReduce) { + std::vector a; + int32_t c = 0, c_ref = 0; + size_t vector_size = 100; + size_t grain_size = 11; + std::mutex m; + + auto reduce_lambda = [&](size_t i) { + size_t start_index = i * grain_size; + size_t end_index = start_index + grain_size; + end_index = std::min(end_index, vector_size); + std::lock_guard lock(m); + for (size_t j = start_index; j < end_index; ++j) { + c += a[j]; + } + }; + + auto threadpool = torch::executorch::threadpool::get_threadpool(); + EXPECT_GT(threadpool->get_thread_count(), 1); + + generate_reduce_test_inputs(a, c_ref, vector_size); + run_lambda_with_size(reduce_lambda, vector_size, grain_size); + EXPECT_EQ(c, c_ref); + + vector_size = 7; + c = c_ref = 0; + generate_reduce_test_inputs(a, c_ref, vector_size); + run_lambda_with_size(reduce_lambda, vector_size, grain_size); + EXPECT_EQ(c, c_ref); +} + +// Copied from +// caffe2/aten/src/ATen/test/test_thread_pool_guard.cp +TEST(TestNoThreadPoolGuard, TestThreadPoolGuard) { + auto threadpool_ptr = torch::executorch::threadpool::get_pthreadpool(); + + ASSERT_NE(threadpool_ptr, nullptr); + { + torch::executorch::threadpool::NoThreadPoolGuard g1; + auto threadpool_ptr1 = torch::executorch::threadpool::get_pthreadpool(); + ASSERT_EQ(threadpool_ptr1, nullptr); + + { + 
torch::executorch::threadpool::NoThreadPoolGuard g2; + auto threadpool_ptr2 = torch::executorch::threadpool::get_pthreadpool(); + ASSERT_EQ(threadpool_ptr2, nullptr); + } + + // Guard should restore prev value (nullptr) + auto threadpool_ptr3 = torch::executorch::threadpool::get_pthreadpool(); + ASSERT_EQ(threadpool_ptr3, nullptr); + } + + // Guard should restore prev value (pthreadpool_) + auto threadpool_ptr4 = torch::executorch::threadpool::get_pthreadpool(); + ASSERT_NE(threadpool_ptr4, nullptr); + ASSERT_EQ(threadpool_ptr4, threadpool_ptr); +} + +TEST(TestNoThreadPoolGuard, TestRunWithGuard) { + const std::vector array = {1, 2, 3}; + + auto pool = torch::executorch::threadpool::get_threadpool(); + int64_t inner = 0; + { + // Run on same thread + torch::executorch::threadpool::NoThreadPoolGuard g1; + auto fn = [&array, &inner](const size_t task_id) { + inner += array[task_id]; + }; + pool->run(fn, 3); + + // confirm the guard is on + auto threadpool_ptr = torch::executorch::threadpool::get_pthreadpool(); + ASSERT_EQ(threadpool_ptr, nullptr); + } + ASSERT_EQ(inner, 6); +} diff --git a/backends/xnnpack/threadpool/threadpool.cpp b/backends/xnnpack/threadpool/threadpool.cpp new file mode 100644 index 00000000000..22489c4d1f9 --- /dev/null +++ b/backends/xnnpack/threadpool/threadpool.cpp @@ -0,0 +1,130 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include +#include +#include +#include + +#include + +#include + +namespace torch { +namespace executorch { +namespace threadpool { + +#if !(defined(WIN32)) +namespace { +// After fork, the child process inherits the data-structures of the parent +// process' thread-pool, but since those threads don't exist, the thread-pool +// is corrupt. It's leaked in order to prevent segfaults. 
+// Ref: https://github.com/pytorch/pytorch/issues/54752#issuecomment-810315302 +bool leak_corrupted_threadpool = false; + +void child_atfork() { + leak_corrupted_threadpool = true; +} + +} // namespace +#endif + +ThreadPool::ThreadPool(size_t thread_count) + : threadpool_(pthreadpool_create(thread_count), pthreadpool_destroy) {} + +size_t ThreadPool::get_thread_count() const { + std::lock_guard lock{mutex_}; + + ET_CHECK_MSG(threadpool_.get(), "Invalid threadpool!"); + return pthreadpool_get_threads_count(threadpool_.get()); +} + +void ThreadPool::run( + const std::function& fn, + const size_t range) { + // Run on same thread if NoThreadPoolGuard guard is enabled + if (NoThreadPoolGuard::is_enabled()) { + for (size_t i = 0; i < range; ++i) { + fn(i); + } + return; + } + + std::lock_guard lock{mutex_}; + + ET_CHECK_MSG(!NoThreadPoolGuard::is_enabled(), "Inside a threadpool guard!"); + ET_CHECK_MSG(threadpool_.get(), "Invalid threadpool!"); + + struct Context final { + const std::function& fn; + } context{ + fn, + }; + + pthreadpool_parallelize_1d( + threadpool_.get(), + // Note: pthreadpool_parallelize_1d() is a blocking function. The + // function pointer to this lambda passed on to + // pthreadpool_parallelize_1d() cannot go out of scope until + // pthreadpool_parallelize_1d() returns. + [](void* const context, const size_t item) { + reinterpret_cast(context)->fn(item); + }, + &context, + range, + 0u); +} + +// get_threadpool is not thread safe due to leak_corrupted_threadpool +// Make this part threadsafe: TODO(kimishpatel) +ThreadPool* get_threadpool() { + ET_CHECK_MSG(cpuinfo_initialize(), "cpuinfo initialization failed"); + int num_threads = cpuinfo_get_processors_count(); + /* + * For llvm-tsan, holding limit for the number of locks for a single thread + * is 63 (because of comparison < 64 instead of <=). pthreadpool's worst + * case is the number of threads in a pool. So we want to limit the threadpool + * size to 64 when running with tsan. 
However, sometimes it is tricky to + * detect if we are running under tsan, for now capping the default + * threadcount to the tsan limit unconditionally. + */ + constexpr int tsan_thread_limit = 63; + num_threads = std::min(num_threads, tsan_thread_limit); + static auto threadpool = std::make_unique(num_threads); + +// Inheriting from old threadpool to get around segfault issue +// commented above at child_atfork +#if !(defined(WIN32)) + // @lint-ignore CLANGTIDY facebook-hte-std::once_flag + static std::once_flag flag; + // @lint-ignore CLANGTIDY facebook-hte-std::call_once + std::call_once( + flag, []() { pthread_atfork(nullptr, nullptr, child_atfork); }); + if __ET_UNLIKELY (leak_corrupted_threadpool) { + leak_corrupted_threadpool = false; + if (auto leaked = threadpool.release()) { + auto t = leaked->get_thread_count(); + threadpool = std::make_unique(t); + } + } +#endif + return threadpool.get(); +} + +pthreadpool_t get_pthreadpool() { + if (NoThreadPoolGuard::is_enabled()) { + return nullptr; + } + ThreadPool* const threadpool = get_threadpool(); + ET_CHECK_MSG(threadpool, "Failed to acquire an instance of ThreadPool!"); + return threadpool->threadpool_.get(); +} + +} // namespace threadpool +} // namespace executorch +} // namespace torch diff --git a/backends/xnnpack/threadpool/threadpool.h b/backends/xnnpack/threadpool/threadpool.h new file mode 100644 index 00000000000..90604cbd753 --- /dev/null +++ b/backends/xnnpack/threadpool/threadpool.h @@ -0,0 +1,69 @@ +#pragma once + +#include + +// @nolint PATTERNLINT Ok to use stdlib for this optional library +#include +// @nolint PATTERNLINT Ok to use stdlib for this optional library +#include +// @nolint PATTERNLINT Ok to use stdlib for this optional library +#include + +namespace torch { +namespace executorch { +namespace threadpool { + +class ThreadPool final { + public: + explicit ThreadPool(size_t thread_count = 0); + ~ThreadPool() = default; + + // Make threadpool non copyable + // Non-copyable: 
threadpool cannot be copied because it will + // effectively require cloning of threadpool. + // Cloning can be done by just calling create_thread_pool. + ThreadPool(const ThreadPool&) = delete; + ThreadPool& operator=(const ThreadPool&) = delete; + + // Make threadpool non-movable. + // For now this is non-movable, but if we want to have clients + // such as say torch::executorch::Executor, to be able to own + // threadpool, then we will have to make this movable. + ThreadPool(ThreadPool&&) = delete; + ThreadPool& operator=(ThreadPool&&) = delete; + + size_t get_thread_count() const; + + // Run, in parallel, function fn(task_id) over task_id in range [0, range). + // This function is blocking. All input is processed by the time it returns. + // NoThreadPoolGuard (see threadpool_guard.h) can used to disable + // use of multiple threads with the scope of the guard + // When NoThreadPoolGuard is not used all calls to run method are serialized. + void run(const std::function& fn, size_t range); + + private: + friend pthreadpool_t get_pthreadpool(); + + private: + // This mutex is used inside get_thread_count API but it is not + // really needed. Since data members of ThreadPool objects are not + // really mutable. + // Figure out if we will allow set_num_threads API, in which mutex + // will be useful. Otherwise remove it. + // TODO(kimishpatel) + mutable std::mutex mutex_; + std::unique_ptr threadpool_; +}; + +// Return a singleton instance of ThreadPool for ATen/TH multithreading. +ThreadPool* get_threadpool(); + +// Exposes the underlying implementation of ThreadPool. +// Only for use in external libraries so as to unify threading across +// internal (i.e. ATen, etc.) and external (e.g. NNPACK, QNNPACK, XNNPACK) +// use cases. 
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

// ---- backends/xnnpack/threadpool/threadpool_guard.h ----
// (the real header begins with #pragma once)

namespace torch {
namespace executorch {
namespace threadpool {

// A RAII, thread local (!) guard that enables the "no threadpool" mode upon
// construction, and sets it back to the original value upon destruction.
// Nesting works because each guard saves the state it observed.
struct NoThreadPoolGuard {
  static bool is_enabled();
  static void set_enabled(bool enabled);

  NoThreadPoolGuard() : prev_mode_(NoThreadPoolGuard::is_enabled()) {
    NoThreadPoolGuard::set_enabled(true);
  }
  ~NoThreadPoolGuard() {
    NoThreadPoolGuard::set_enabled(prev_mode_);
  }

 private:
  const bool prev_mode_;
};

// ---- backends/xnnpack/threadpool/threadpool_guard.cpp ----
// (the real .cpp restores its stripped include:
//  #include <executorch/backends/xnnpack/threadpool/threadpool_guard.h>)

namespace {
// Per-thread guard state.
// Fix: this flag previously sat at namespace scope with external linkage even
// though it is purely an implementation detail; the anonymous namespace gives
// it internal linkage. (Belongs in the .cpp, not the header.)
thread_local bool NoThreadPoolGuard_enabled = false;
} // namespace

bool NoThreadPoolGuard::is_enabled() {
  return NoThreadPoolGuard_enabled;
}

void NoThreadPoolGuard::set_enabled(bool enabled) {
  NoThreadPoolGuard_enabled = enabled;
}

} // namespace threadpool
} // namespace executorch
} // namespace torch