src/ex/thread_pool.cpp

100.0% Lines (119/119) 100.0% List of functions (24/24) 85.2% Branches (46/54)
f(x) Functions (24)
Function Calls Lines Branches Blocks
boost::capy::thread_pool::impl::work::work(std::__n4861::coroutine_handle<void>) :55 0 100.0%
boost::capy::thread_pool::impl::work::run() :60 0 100.0% 66.7%
boost::capy::thread_pool::impl::work::destroy() :67 0 100.0% 77.8%
boost::capy::thread_pool::impl::~impl() :88 0 100.0% 100.0%
boost::capy::thread_pool::impl::impl(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :94 0 100.0% 100.0%
boost::capy::thread_pool::impl::post(std::__n4861::coroutine_handle<void>) :107 0 100.0% 100.0%
boost::capy::thread_pool::impl::on_work_started() :119 0 100.0%
boost::capy::thread_pool::impl::on_work_finished() :125 0 100.0% 100.0%
boost::capy::thread_pool::impl::join() :138 0 100.0% 100.0%
boost::capy::thread_pool::impl::join()::{lambda()#1}::operator()() const :154 0 100.0% 75.0%
boost::capy::thread_pool::impl::stop() :166 0 100.0%
boost::capy::thread_pool::impl::ensure_started() :177 0 100.0%
boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const :179 0 100.0% 100.0%
boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const::{lambda()#1}::operator()() const :182 0 100.0% 100.0%
boost::capy::thread_pool::impl::run(unsigned long) :187 0 100.0% 100.0%
boost::capy::thread_pool::impl::run(unsigned long)::{lambda()#1}::operator()() const :199 0 100.0% 90.0%
boost::capy::thread_pool::~thread_pool() :215 0 100.0% 50.0%
boost::capy::thread_pool::thread_pool(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :225 0 100.0% 60.0%
boost::capy::thread_pool::join() :233 0 100.0%
boost::capy::thread_pool::stop() :240 0 100.0%
boost::capy::thread_pool::get_executor() const :249 0 100.0%
boost::capy::thread_pool::executor_type::on_work_started() const :257 0 100.0%
boost::capy::thread_pool::executor_type::on_work_finished() const :264 0 100.0%
boost::capy::thread_pool::executor_type::post(std::__n4861::coroutine_handle<void>) const :271 0 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/detail/intrusive.hpp>
13 #include <boost/capy/test/thread_name.hpp>
14 #include <algorithm>
15 #include <atomic>
16 #include <condition_variable>
17 #include <cstdio>
18 #include <mutex>
19 #include <thread>
20 #include <vector>
21
22 /*
23 Thread pool implementation using a shared work queue.
24
25 Work items are coroutine handles wrapped in intrusive list nodes, stored
26 in a single queue protected by a mutex. Worker threads wait on a
27 condition_variable until work is available or stop is requested.
28
29 Threads are started lazily on first post() via std::call_once to avoid
30 spawning threads for pools that are constructed but never used. Each
31 thread is named with a configurable prefix plus index for debugger
32 visibility.
33
34 Work tracking: on_work_started/on_work_finished maintain an atomic
35 outstanding_work_ counter. join() blocks until this counter reaches
36 zero or stop() is called, then signals workers to stop and joins threads.
37
38 Shutdown paths:
39 - join(): waits for outstanding work to drain, then stops workers.
40 - stop(): immediately signals workers to exit; queued work is abandoned.
41 - Destructor: stop() then join() (abandon + wait for threads).
42 */
43
44 namespace boost {
45 namespace capy {
46
47 //------------------------------------------------------------------------------
48
49 class thread_pool::impl
50 {
51 struct work : detail::intrusive_queue<work>::node
52 {
53 std::coroutine_handle<> h_;
54
55 810x explicit work(std::coroutine_handle<> h) noexcept
56 810x : h_(h)
57 {
58 810x }
59
60 638x void run()
61 {
62 638x auto h = h_;
63
1/2
✓ Branch 0 taken 638 times.
✗ Branch 1 not taken.
638x delete this;
64
1/1
✓ Branch 1 taken 638 times.
638x h.resume();
65 638x }
66
67 172x void destroy()
68 {
69 172x auto h = h_;
70
1/2
✓ Branch 0 taken 172 times.
✗ Branch 1 not taken.
172x delete this;
71
5/6
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
✓ Branch 6 taken 119 times.
✓ Branch 7 taken 53 times.
✓ Branch 8 taken 119 times.
✓ Branch 9 taken 53 times.
172x if(h && h != std::noop_coroutine())
72
1/1
✓ Branch 1 taken 119 times.
119x h.destroy();
73 172x }
74 };
75
76 std::mutex mutex_;
77 std::condition_variable cv_;
78 detail::intrusive_queue<work> q_;
79 std::vector<std::thread> threads_;
80 std::atomic<std::size_t> outstanding_work_{0};
81 bool stop_{false};
82 bool joined_{false};
83 std::size_t num_threads_;
84 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
85 std::once_flag start_flag_;
86
87 public:
88 152x ~impl()
89 {
90
2/2
✓ Branch 1 taken 172 times.
✓ Branch 2 taken 152 times.
324x while(auto* w = q_.pop())
91 172x w->destroy();
92 152x }
93
94 152x impl(std::size_t num_threads, std::string_view thread_name_prefix)
95 152x : num_threads_(num_threads)
96 {
97
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 150 times.
152x if(num_threads_ == 0)
98 4x num_threads_ = std::max(
99 2x std::thread::hardware_concurrency(), 1u);
100
101 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
102
1/1
✓ Branch 1 taken 152 times.
152x auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
103 152x thread_name_prefix_[n] = '\0';
104 152x }
105
106 void
107 810x post(std::coroutine_handle<> h)
108 {
109 810x ensure_started();
110 810x auto* w = new work(h);
111 {
112
1/1
✓ Branch 1 taken 810 times.
810x std::lock_guard<std::mutex> lock(mutex_);
113 810x q_.push(w);
114 810x }
115 810x cv_.notify_one();
116 810x }
117
118 void
119 340x on_work_started() noexcept
120 {
121 340x outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
122 340x }
123
124 void
125 340x on_work_finished() noexcept
126 {
127 340x if(outstanding_work_.fetch_sub(
128
2/2
✓ Branch 0 taken 80 times.
✓ Branch 1 taken 260 times.
340x 1, std::memory_order_acq_rel) == 1)
129 {
130 80x std::lock_guard<std::mutex> lock(mutex_);
131
4/4
✓ Branch 0 taken 52 times.
✓ Branch 1 taken 28 times.
✓ Branch 2 taken 4 times.
✓ Branch 3 taken 48 times.
80x if(joined_ && !stop_)
132 4x stop_ = true;
133 80x cv_.notify_all();
134 80x }
135 340x }
136
137 void
138 163x join() noexcept
139 {
140 {
141 163x std::unique_lock<std::mutex> lock(mutex_);
142
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 152 times.
163x if(joined_)
143 11x return;
144 152x joined_ = true;
145
146 152x if(outstanding_work_.load(
147
2/2
✓ Branch 0 taken 102 times.
✓ Branch 1 taken 50 times.
152x std::memory_order_acquire) == 0)
148 {
149 102x stop_ = true;
150 102x cv_.notify_all();
151 }
152 else
153 {
154 50x cv_.wait(lock, [this]{
155 55x return stop_;
156 });
157 }
158 163x }
159
160
2/2
✓ Branch 5 taken 171 times.
✓ Branch 6 taken 152 times.
323x for(auto& t : threads_)
161
1/2
✓ Branch 1 taken 171 times.
✗ Branch 2 not taken.
171x if(t.joinable())
162 171x t.join();
163 }
164
165 void
166 154x stop() noexcept
167 {
168 {
169 154x std::lock_guard<std::mutex> lock(mutex_);
170 154x stop_ = true;
171 154x }
172 154x cv_.notify_all();
173 154x }
174
175 private:
176 void
177 810x ensure_started()
178 {
179
1/1
✓ Branch 1 taken 810 times.
810x std::call_once(start_flag_, [this]{
180 96x threads_.reserve(num_threads_);
181
2/2
✓ Branch 0 taken 171 times.
✓ Branch 1 taken 96 times.
267x for(std::size_t i = 0; i < num_threads_; ++i)
182
1/1
✓ Branch 2 taken 171 times.
342x threads_.emplace_back([this, i]{ run(i); });
183 96x });
184 810x }
185
186 void
187 171x run(std::size_t index)
188 {
189 // Build name; set_current_thread_name truncates to platform limits.
190 char name[16];
191 171x std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
192 171x set_current_thread_name(name);
193
194 for(;;)
195 {
196 809x work* w = nullptr;
197 {
198
1/1
✓ Branch 1 taken 809 times.
809x std::unique_lock<std::mutex> lock(mutex_);
199
1/1
✓ Branch 1 taken 809 times.
809x cv_.wait(lock, [this]{
200
2/2
✓ Branch 1 taken 286 times.
✓ Branch 2 taken 730 times.
1302x return !q_.empty() ||
201
2/2
✓ Branch 0 taken 79 times.
✓ Branch 1 taken 207 times.
1302x stop_;
202 });
203
2/2
✓ Branch 0 taken 171 times.
✓ Branch 1 taken 638 times.
809x if(stop_)
204 342x return;
205 638x w = q_.pop();
206 809x }
207
1/2
✓ Branch 0 taken 638 times.
✗ Branch 1 not taken.
638x if(w)
208
1/1
✓ Branch 1 taken 638 times.
638x w->run();
209 638x }
210 }
211 };
212
213 //------------------------------------------------------------------------------
214
215 152x thread_pool::
216 ~thread_pool()
217 {
218 152x impl_->stop();
219 152x impl_->join();
220 152x shutdown();
221 152x destroy();
222
1/2
✓ Branch 0 taken 152 times.
✗ Branch 1 not taken.
152x delete impl_;
223 152x }
224
225 152x thread_pool::
226 152x thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
227
2/4
✓ Branch 2 taken 152 times.
✓ Branch 5 taken 152 times.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
152x : impl_(new impl(num_threads, thread_name_prefix))
228 {
229
1/1
✓ Branch 1 taken 152 times.
152x this->set_frame_allocator(std::allocator<void>{});
230 152x }
231
232 void
233 11x thread_pool::
234 join() noexcept
235 {
236 11x impl_->join();
237 11x }
238
239 void
240 2x thread_pool::
241 stop() noexcept
242 {
243 2x impl_->stop();
244 2x }
245
246 //------------------------------------------------------------------------------
247
248 thread_pool::executor_type
249 158x thread_pool::
250 get_executor() const noexcept
251 {
252 158x return executor_type(
253 158x const_cast<thread_pool&>(*this));
254 }
255
256 void
257 340x thread_pool::executor_type::
258 on_work_started() const noexcept
259 {
260 340x pool_->impl_->on_work_started();
261 340x }
262
263 void
264 340x thread_pool::executor_type::
265 on_work_finished() const noexcept
266 {
267 340x pool_->impl_->on_work_finished();
268 340x }
269
270 void
271 810x thread_pool::executor_type::
272 post(std::coroutine_handle<> h) const
273 {
274 810x pool_->impl_->post(h);
275 810x }
276
277 } // capy
278 } // boost
279