Skip to content

Commit 8e11e08

Browse files
committed
Add workTaskflowExample
workTaskflowExample is an example custom task scheduling backend based on the taskflow library. (Internal change: 2371697) (Internal change: 2371781) (Internal change: 2371733) (Internal change: 2371697)
1 parent d60da5e commit 8e11e08

File tree

15 files changed

+837
-0
lines changed

15 files changed

+837
-0
lines changed
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
cmake_minimum_required(VERSION 3.26)
2+
3+
project(workTaskflowExample
4+
LANGUAGES CXX
5+
)
6+
7+
include(CMakePackageConfigHelpers)
8+
include(FetchContent)
9+
include(GNUInstallDirs)
10+
11+
set(CMAKE_CXX_STANDARD 17)
12+
set(CMAKE_CXX_STANDARD_REQUIRED True)
13+
14+
# Fetch and install Taskflow unless the user has already supplied that
15+
# dependency and it can be found via a call to find_package. This would
16+
# typically happen if the user made Taskflow's config file package visible
17+
# to cmake, e.g. by setting Taskflow_DIR to the directory where Taskflow's
18+
# config files are installed.
19+
#
20+
# Note that by default FetchContent doesn't provide any diagnostic output,
21+
# but users can specify FETCHCONTENT_QUIET=OFF when running cmake to get
22+
# more information.
23+
FetchContent_Declare(
24+
Taskflow
25+
GIT_REPOSITORY https://github.com/taskflow/taskflow.git
26+
GIT_TAG 2dfa50a567d48b8439807f5da8a041ba64d4fb63 # v3.10.0
27+
FIND_PACKAGE_ARGS CONFIG
28+
)
29+
30+
# Disable tests and examples since we don't need them and they can
31+
# be slow-ish to build.
32+
set(TF_BUILD_EXAMPLES OFF CACHE INTERNAL BOOL)
33+
set(TF_BUILD_TESTS OFF CACHE INTERNAL BOOL)
34+
35+
FetchContent_MakeAvailable(Taskflow)
36+
37+
# Make sure that Taskflow_DIR is set to the location of the _installed_
38+
# Taskflow package config file used to build this library. We'll use this
39+
# below when setting up our package config file.
40+
#
41+
# If we found an externally-supplied Taskflow package above, Taskflow_FOUND
42+
# will be set and Taskflow_DIR will already be set to the location we want.
43+
#
44+
# If we built Taskflow ourselves, Taskflow_FOUND will not be set and
45+
# Taskflow_DIR will be set to some internal build location. We want to
46+
# reset that to the location where Taskflow will be installed.
47+
if (NOT Taskflow_FOUND)
48+
set(Taskflow_DIR "${CMAKE_INSTALL_LIBDIR}/cmake/Taskflow")
49+
endif()
50+
51+
# Set up library target.
52+
add_library(workTaskflowExample SHARED)
53+
54+
target_link_libraries(
55+
workTaskflowExample
56+
PUBLIC
57+
Taskflow::Taskflow
58+
)
59+
60+
target_sources(
61+
workTaskflowExample
62+
PRIVATE
63+
detachedTask.cpp
64+
dispatcher.cpp
65+
PUBLIC
66+
FILE_SET HEADERS
67+
FILES
68+
api.h
69+
detachedTask.h
70+
dispatcher.h
71+
executorStack.h
72+
impl.h
73+
loops.h
74+
reduce.h
75+
sort.h
76+
threadLimits.h
77+
withScopedParallelism.h
78+
)
79+
80+
target_include_directories(
81+
workTaskflowExample
82+
PUBLIC
83+
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/..>
84+
$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
85+
)
86+
87+
target_compile_definitions(
88+
workTaskflowExample
89+
PRIVATE
90+
"WORK_TASKFLOW_EXAMPLE_EXPORT"
91+
)
92+
93+
# Install artifacts and set up package config file
94+
install(
95+
TARGETS workTaskflowExample
96+
EXPORT workTaskflowExampleTargets
97+
FILE_SET HEADERS
98+
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/workTaskflowExample"
99+
)
100+
101+
install(
102+
EXPORT workTaskflowExampleTargets
103+
FILE workTaskflowExampleTargets.cmake
104+
NAMESPACE workTaskflowExample::
105+
DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/workTaskflowExample"
106+
)
107+
108+
configure_package_config_file(
109+
workTaskflowExampleConfig.cmake.in
110+
"${CMAKE_CURRENT_BINARY_DIR}/workTaskflowExampleConfig.cmake"
111+
INSTALL_DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/workTaskflowExample"
112+
PATH_VARS Taskflow_DIR
113+
)
114+
115+
install(
116+
FILES "${CMAKE_CURRENT_BINARY_DIR}/workTaskflowExampleConfig.cmake"
117+
DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/workTaskflowExample"
118+
)
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
workTaskflowExample
2+
===================
3+
`workTaskflowExample` is an example demonstrating how to create and build USD
4+
against a custom task management implementation. It is an example only and is
5+
unsupported for general use.
6+
7+
`workTaskflowExample` is based on the open-source `Taskflow` library available
8+
at https://github.com/taskflow/taskflow.
9+
10+
## Building `workTaskflowExample`
11+
12+
Run cmake to configure and build the library. With the example command line
13+
below, its headers and shared library will be installed under
14+
`/path/to/install`.
15+
16+
```
17+
cmake --install-prefix /path/to/install -B /path/to/build -S OpenUSD/pxr/extras/usd/examples/workTaskflowExample
18+
cmake --build /path/to/build --target install --config Release
19+
```
20+
21+
By default, the `Taskflow` library will automatically be downloaded and
22+
installed under the same location as `workTaskflowExample` itself. This can
23+
be overridden to use a pre-existing installation by setting the
24+
`Taskflow_DIR` option to point to the directory containing its cmake package
25+
config file.
26+
27+
## Building USD with `workTaskflowExample`
28+
29+
Specify `workTaskflowExample` as the custom task management implementation
30+
by setting the `PXR_WORK_IMPL` option to `workTaskflowExample` when running
31+
cmake. The `workTaskflowExample_DIR` option must also be set to the directory
32+
containing the package config file named `workTaskflowExampleConfig.cmake`
33+
that was installed as part of the build above.
34+
35+
```
36+
cmake -DPXR_WORK_IMPL=workTaskflowExample -DworkTaskflowExample_DIR=/path/to/install/lib64/cmake/workTaskflowExample ...
37+
```
38+
39+
If you are building USD with build_usd.py instead of using cmake directly,
40+
these arguments can be passed through using `--build-args`:
41+
42+
```
43+
build_usd.py --build-args USD,"-DPXR_WORK_IMPL=workTaskflowExample -DworkTaskflowExample_DIR=/path/to/install/lib64/cmake/workTaskflowExample" ...
44+
```
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
//
2+
// Copyright 2025 Pixar
3+
//
4+
// Licensed under the terms set forth in the LICENSE.txt file available at
5+
// https://openusd.org/license.
6+
//
7+
#ifndef PXR_EXTRAS_USD_EXAMPLES_WORK_TASKFLOW_EXAMPLE_API_H
8+
#define PXR_EXTRAS_USD_EXAMPLES_WORK_TASKFLOW_EXAMPLE_API_H
9+
10+
#if defined(_WIN32)
11+
#if defined(WORK_TASKFLOW_EXAMPLE_EXPORT)
12+
#define WORK_TASKFLOW_EXAMPLE_API __declspec(dllexport)
13+
#else
14+
#define WORK_TASKFLOW_EXAMPLE_API __declspec(dllimport)
15+
#endif
16+
#else
17+
#define WORK_TASKFLOW_EXAMPLE_API
18+
#endif
19+
20+
#endif // PXR_EXTRAS_USD_EXAMPLES_WORK_TASKFLOW_EXAMPLE_API_H
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
//
2+
// Copyright 2025 Pixar
3+
//
4+
// Licensed under the terms set forth in the LICENSE.txt file available at
5+
// https://openusd.org/license.
6+
//
7+
#include "workTaskflowExample/detachedTask.h"
8+
9+
#include <atomic>
10+
#include <chrono>
11+
#include <thread>
12+
13+
static std::atomic<std::thread *> detachedWaiter { nullptr };
14+
15+
WorkImpl_Dispatcher &
16+
WorkTaskflow_GetDetachedDispatcher()
17+
{
18+
// Deliberately leak this in case there are tasks still using it after we
19+
// exit from main().
20+
static WorkImpl_Dispatcher *theDispatcher = new WorkImpl_Dispatcher;
21+
return *theDispatcher;
22+
}
23+
24+
void
25+
WorkTaskflow_EnsureDetachedTaskProgress()
26+
{
27+
// Check to see if there's a waiter thread already. If not, try to create
28+
// one.
29+
std::thread *c = detachedWaiter.load();
30+
if (!c) {
31+
std::thread *newThread = new std::thread;
32+
if (detachedWaiter.compare_exchange_strong(c, newThread)) {
33+
// We won the race, so start the waiter thread.
34+
WorkImpl_Dispatcher &dispatcher =
35+
WorkTaskflow_GetDetachedDispatcher();
36+
*newThread =
37+
std::thread([&dispatcher]() {
38+
while (true) {
39+
// Process detached tasks.
40+
dispatcher.Wait();
41+
// Now sleep for a bit, and try again.
42+
using namespace std::chrono_literals;
43+
std::this_thread::sleep_for(50ms);
44+
}
45+
});
46+
newThread->detach();
47+
}
48+
else {
49+
// We lost the race, so delete our temporary thread.
50+
delete newThread;
51+
}
52+
}
53+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
//
2+
// Copyright 2025 Pixar
3+
//
4+
// Licensed under the terms set forth in the LICENSE.txt file available at
5+
// https://openusd.org/license.
6+
//
7+
#ifndef PXR_EXTRAS_USD_EXAMPLES_WORK_TASKFLOW_EXAMPLE_DETACHED_TASK_H
8+
#define PXR_EXTRAS_USD_EXAMPLES_WORK_TASKFLOW_EXAMPLE_DETACHED_TASK_H
9+
10+
#include "workTaskflowExample/api.h"
11+
#include "workTaskflowExample/dispatcher.h"
12+
13+
WORK_TASKFLOW_EXAMPLE_API
14+
WorkImpl_Dispatcher &
15+
WorkTaskflow_GetDetachedDispatcher();
16+
17+
WORK_TASKFLOW_EXAMPLE_API
18+
void
19+
WorkTaskflow_EnsureDetachedTaskProgress();
20+
21+
template <typename Fn>
22+
void
23+
WorkImpl_RunDetachedTask(Fn &&fn)
24+
{
25+
WorkImpl_Dispatcher &dispatcher = WorkTaskflow_GetDetachedDispatcher();
26+
dispatcher.RunWithGlobal(std::forward<Fn>(fn));
27+
WorkTaskflow_EnsureDetachedTaskProgress();
28+
};
29+
30+
#endif // PXR_EXTRAS_USD_EXAMPLES_WORK_TASKFLOW_EXAMPLE_DETACHED_TASK_H
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
//
2+
// Copyright 2025 Pixar
3+
//
4+
// Licensed under the terms set forth in the LICENSE.txt file available at
5+
// https://openusd.org/license.
6+
//
7+
#include "dispatcher.h"
8+
9+
WorkImpl_Dispatcher::WorkImpl_Dispatcher()
10+
{
11+
_jobCount.store(0);
12+
}
13+
14+
WorkImpl_Dispatcher::~WorkImpl_Dispatcher() noexcept
15+
{
16+
Wait();
17+
}
18+
19+
void
20+
WorkImpl_Dispatcher::Wait()
21+
{
22+
if ( WorkImpl_GetConcurrencyLimit() == 1){
23+
_GetGlobalSerialExecutor().run(_serialTasks).wait();
24+
} else {
25+
tf::Executor *e;
26+
WorkTaskflow_Executor * s = WorkTaskflow_LocalStack::Get().head;
27+
if (s == nullptr || s->executor == nullptr) {
28+
e = &_GetGlobalExecutor();
29+
} else {
30+
e = s->executor;
31+
}
32+
33+
// Each dispatcher needs to have its own task grouping, but each
34+
// executor can have multiple tasks from each dispatcher. We cannot call
35+
// tf:::Executor::wait_for_all() since that wouid wait on all the tasks
36+
// assigned to that executor. Instead we manually keep track of jobs and
37+
// spin until they are complete.
38+
tf::Taskflow taskflow;
39+
taskflow.emplace([&](){
40+
while(_jobCount.load() != 0){
41+
std::this_thread::yield();
42+
}
43+
});
44+
45+
e->run(taskflow).wait();
46+
}
47+
}
48+
49+
void
50+
WorkImpl_Dispatcher::Reset()
51+
{
52+
return;
53+
}
54+
55+
void
56+
WorkImpl_Dispatcher::Cancel()
57+
{
58+
return;
59+
}
60+
61+
tf::Executor &
62+
WorkImpl_Dispatcher::_GetGlobalExecutor()
63+
{
64+
// Creating Executors has a non-negible overhead. Limiting to 4 threads
65+
// so we don't try to spawn 31 threads per executor.
66+
static tf::Executor globalExecutor(4);
67+
return globalExecutor;
68+
}
69+
70+
tf::Executor &
71+
WorkImpl_Dispatcher::_GetGlobalSerialExecutor()
72+
{
73+
static tf::Executor globalSerialExecutor(1);
74+
return globalSerialExecutor;
75+
}

0 commit comments

Comments
 (0)