1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_ASYNC_EVENT_HPP
10  
#ifndef BOOST_CAPY_ASYNC_EVENT_HPP
11  
#define BOOST_CAPY_ASYNC_EVENT_HPP
11  
#define BOOST_CAPY_ASYNC_EVENT_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/intrusive.hpp>
14  
#include <boost/capy/detail/intrusive.hpp>
15  
#include <boost/capy/concept/executor.hpp>
15  
#include <boost/capy/concept/executor.hpp>
16  
#include <boost/capy/error.hpp>
16  
#include <boost/capy/error.hpp>
17  
#include <boost/capy/ex/io_env.hpp>
17  
#include <boost/capy/ex/io_env.hpp>
18  
#include <boost/capy/io_result.hpp>
18  
#include <boost/capy/io_result.hpp>
19  

19  

20  
#include <stop_token>
20  
#include <stop_token>
21  

21  

22  
#include <atomic>
22  
#include <atomic>
23  
#include <coroutine>
23  
#include <coroutine>
24  
#include <new>
24  
#include <new>
25  
#include <utility>
25  
#include <utility>
26  

26  

27  
/*  async_event implementation notes
    =================================

    Same cancellation pattern as async_mutex (see that file for the
    full discussion on claimed_, stop_cb lifetime, member ordering,
    and threading assumptions).

    Key difference: set() wakes ALL waiters (broadcast), not one.
    It pops every waiter from the list and posts the ones it
    claims. Waiters already claimed by a stop callback are skipped.

    Because set() pops all waiters, a canceled waiter may have been
    removed from the list by set() before its await_resume runs.
    This requires a separate in_list_ flag (unlike async_mutex where
    active_ served double duty). await_resume only calls remove()
    when in_list_ is true.
*/
44  

44  

45  
namespace boost {
45  
namespace boost {
46  
namespace capy {
46  
namespace capy {
47  

47  

48  
/** An asynchronous event for coroutines.
48  
/** An asynchronous event for coroutines.
49  

49  

50  
    This event provides a way to notify multiple coroutines that some
50  
    This event provides a way to notify multiple coroutines that some
51  
    condition has occurred. When a coroutine awaits an unset event, it
51  
    condition has occurred. When a coroutine awaits an unset event, it
52  
    suspends and is added to a wait queue. When the event is set, all
52  
    suspends and is added to a wait queue. When the event is set, all
53  
    waiting coroutines are resumed.
53  
    waiting coroutines are resumed.
54  

54  

55  
    @par Cancellation
55  
    @par Cancellation
56  

56  

57  
    When a coroutine is suspended waiting for the event and its stop
57  
    When a coroutine is suspended waiting for the event and its stop
58  
    token is triggered, the waiter completes with `error::canceled`
58  
    token is triggered, the waiter completes with `error::canceled`
59  
    instead of waiting for `set()`.
59  
    instead of waiting for `set()`.
60  

60  

61  
    Cancellation only applies while the coroutine is suspended in the
61  
    Cancellation only applies while the coroutine is suspended in the
62  
    wait queue. If the event is already set when `wait()` is called,
62  
    wait queue. If the event is already set when `wait()` is called,
63  
    the wait completes immediately even if the stop token is already
63  
    the wait completes immediately even if the stop token is already
64  
    signaled.
64  
    signaled.
65  

65  

66  
    @par Zero Allocation
66  
    @par Zero Allocation
67  

67  

68  
    No heap allocation occurs for wait operations.
68  
    No heap allocation occurs for wait operations.
69  

69  

70  
    @par Thread Safety
70  
    @par Thread Safety
71  

71  

72  
    Distinct objects: Safe.@n
72  
    Distinct objects: Safe.@n
73  
    Shared objects: Unsafe.
73  
    Shared objects: Unsafe.
74  

74  

75  
    The event operations are designed for single-threaded use on one
75  
    The event operations are designed for single-threaded use on one
76  
    executor. The stop callback may fire from any thread.
76  
    executor. The stop callback may fire from any thread.
77  

77  

78  
    This type is non-copyable and non-movable because suspended
78  
    This type is non-copyable and non-movable because suspended
79  
    waiters hold intrusive pointers into the event's internal list.
79  
    waiters hold intrusive pointers into the event's internal list.
80  

80  

81  
    @par Example
81  
    @par Example
82  
    @code
82  
    @code
83  
    async_event event;
83  
    async_event event;
84  

84  

85  
    task<> waiter() {
85  
    task<> waiter() {
86  
        auto [ec] = co_await event.wait();
86  
        auto [ec] = co_await event.wait();
87  
        if(ec)
87  
        if(ec)
88  
            co_return;
88  
            co_return;
89  
        // ... event was set ...
89  
        // ... event was set ...
90  
    }
90  
    }
91  

91  

92  
    task<> notifier() {
92  
    task<> notifier() {
93  
        // ... do some work ...
93  
        // ... do some work ...
94  
        event.set();  // Wake all waiters
94  
        event.set();  // Wake all waiters
95  
    }
95  
    }
96  
    @endcode
96  
    @endcode
97  
*/
97  
*/
98  
class async_event
98  
class async_event
99  
{
99  
{
100  
public:
100  
public:
101  
    class wait_awaiter;
101  
    class wait_awaiter;
102  

102  

103  
private:
103  
private:
104  
    bool set_ = false;
104  
    bool set_ = false;
105  
    detail::intrusive_list<wait_awaiter> waiters_;
105  
    detail::intrusive_list<wait_awaiter> waiters_;
106  

106  

107  
public:
107  
public:
108  
    /** Awaiter returned by wait().
108  
    /** Awaiter returned by wait().
109  
    */
109  
    */
110  
    class wait_awaiter
110  
    class wait_awaiter
111  
        : public detail::intrusive_list<wait_awaiter>::node
111  
        : public detail::intrusive_list<wait_awaiter>::node
112  
    {
112  
    {
113  
        friend class async_event;
113  
        friend class async_event;
114  

114  

115  
        async_event* e_;
115  
        async_event* e_;
116  
        std::coroutine_handle<> h_;
116  
        std::coroutine_handle<> h_;
117  
        executor_ref ex_;
117  
        executor_ref ex_;
118  

118  

119  
        // Declared before stop_cb_buf_: the callback
119  
        // Declared before stop_cb_buf_: the callback
120  
        // accesses these members, so they must still be
120  
        // accesses these members, so they must still be
121  
        // alive if the stop_cb_ destructor blocks.
121  
        // alive if the stop_cb_ destructor blocks.
122  
        std::atomic<bool> claimed_{false};
122  
        std::atomic<bool> claimed_{false};
123  
        bool canceled_ = false;
123  
        bool canceled_ = false;
124  
        bool active_ = false;
124  
        bool active_ = false;
125  
        bool in_list_ = false;
125  
        bool in_list_ = false;
126  

126  

127  
        struct cancel_fn
127  
        struct cancel_fn
128  
        {
128  
        {
129  
            wait_awaiter* self_;
129  
            wait_awaiter* self_;
130  

130  

131  
            void operator()() const noexcept
131  
            void operator()() const noexcept
132  
            {
132  
            {
133  
                if(!self_->claimed_.exchange(
133  
                if(!self_->claimed_.exchange(
134  
                    true, std::memory_order_acq_rel))
134  
                    true, std::memory_order_acq_rel))
135  
                {
135  
                {
136  
                    self_->canceled_ = true;
136  
                    self_->canceled_ = true;
137  
                    self_->ex_.post(self_->h_);
137  
                    self_->ex_.post(self_->h_);
138  
                }
138  
                }
139  
            }
139  
            }
140  
        };
140  
        };
141  

141  

142  
        using stop_cb_t =
142  
        using stop_cb_t =
143  
            std::stop_callback<cancel_fn>;
143  
            std::stop_callback<cancel_fn>;
144  

144  

145  
        // Aligned storage for stop_cb_t. Declared last:
145  
        // Aligned storage for stop_cb_t. Declared last:
146  
        // its destructor may block while the callback
146  
        // its destructor may block while the callback
147  
        // accesses the members above.
147  
        // accesses the members above.
148  
        BOOST_CAPY_MSVC_WARNING_PUSH
148  
        BOOST_CAPY_MSVC_WARNING_PUSH
149  
        BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas
149  
        BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas
150  
        alignas(stop_cb_t)
150  
        alignas(stop_cb_t)
151  
            unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
151  
            unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
152  
        BOOST_CAPY_MSVC_WARNING_POP
152  
        BOOST_CAPY_MSVC_WARNING_POP
153  

153  

154  
        stop_cb_t& stop_cb_() noexcept
154  
        stop_cb_t& stop_cb_() noexcept
155  
        {
155  
        {
156  
            return *reinterpret_cast<stop_cb_t*>(
156  
            return *reinterpret_cast<stop_cb_t*>(
157  
                stop_cb_buf_);
157  
                stop_cb_buf_);
158  
        }
158  
        }
159  

159  

160  
    public:
160  
    public:
161  
        ~wait_awaiter()
161  
        ~wait_awaiter()
162  
        {
162  
        {
163  
            if(active_)
163  
            if(active_)
164  
                stop_cb_().~stop_cb_t();
164  
                stop_cb_().~stop_cb_t();
165  
            if(in_list_)
165  
            if(in_list_)
166  
                e_->waiters_.remove(this);
166  
                e_->waiters_.remove(this);
167  
        }
167  
        }
168  

168  

169  
        explicit wait_awaiter(async_event* e) noexcept
169  
        explicit wait_awaiter(async_event* e) noexcept
170  
            : e_(e)
170  
            : e_(e)
171  
        {
171  
        {
172  
        }
172  
        }
173  

173  

174  
        wait_awaiter(wait_awaiter&& o) noexcept
174  
        wait_awaiter(wait_awaiter&& o) noexcept
175  
            : e_(o.e_)
175  
            : e_(o.e_)
176  
            , h_(o.h_)
176  
            , h_(o.h_)
177  
            , ex_(o.ex_)
177  
            , ex_(o.ex_)
178  
            , claimed_(o.claimed_.load(
178  
            , claimed_(o.claimed_.load(
179  
                std::memory_order_relaxed))
179  
                std::memory_order_relaxed))
180  
            , canceled_(o.canceled_)
180  
            , canceled_(o.canceled_)
181  
            , active_(std::exchange(o.active_, false))
181  
            , active_(std::exchange(o.active_, false))
182  
            , in_list_(std::exchange(o.in_list_, false))
182  
            , in_list_(std::exchange(o.in_list_, false))
183  
        {
183  
        {
184  
        }
184  
        }
185  

185  

186  
        wait_awaiter(wait_awaiter const&) = delete;
186  
        wait_awaiter(wait_awaiter const&) = delete;
187  
        wait_awaiter& operator=(wait_awaiter const&) = delete;
187  
        wait_awaiter& operator=(wait_awaiter const&) = delete;
188  
        wait_awaiter& operator=(wait_awaiter&&) = delete;
188  
        wait_awaiter& operator=(wait_awaiter&&) = delete;
189  

189  

190  
        bool await_ready() const noexcept
190  
        bool await_ready() const noexcept
191  
        {
191  
        {
192  
            return e_->set_;
192  
            return e_->set_;
193  
        }
193  
        }
194  

194  

195  
        /** IoAwaitable protocol overload. */
195  
        /** IoAwaitable protocol overload. */
196  
        std::coroutine_handle<>
196  
        std::coroutine_handle<>
197  
        await_suspend(
197  
        await_suspend(
198  
            std::coroutine_handle<> h,
198  
            std::coroutine_handle<> h,
199  
            io_env const* env) noexcept
199  
            io_env const* env) noexcept
200  
        {
200  
        {
201  
            if(env->stop_token.stop_requested())
201  
            if(env->stop_token.stop_requested())
202  
            {
202  
            {
203  
                canceled_ = true;
203  
                canceled_ = true;
204  
                return h;
204  
                return h;
205  
            }
205  
            }
206  
            h_ = h;
206  
            h_ = h;
207  
            ex_ = env->executor;
207  
            ex_ = env->executor;
208  
            e_->waiters_.push_back(this);
208  
            e_->waiters_.push_back(this);
209  
            in_list_ = true;
209  
            in_list_ = true;
210  
            ::new(stop_cb_buf_) stop_cb_t(
210  
            ::new(stop_cb_buf_) stop_cb_t(
211  
                env->stop_token, cancel_fn{this});
211  
                env->stop_token, cancel_fn{this});
212  
            active_ = true;
212  
            active_ = true;
213  
            return std::noop_coroutine();
213  
            return std::noop_coroutine();
214  
        }
214  
        }
215  

215  

216  
        io_result<> await_resume() noexcept
216  
        io_result<> await_resume() noexcept
217  
        {
217  
        {
218  
            if(active_)
218  
            if(active_)
219  
            {
219  
            {
220  
                stop_cb_().~stop_cb_t();
220  
                stop_cb_().~stop_cb_t();
221  
                active_ = false;
221  
                active_ = false;
222  
            }
222  
            }
223  
            if(canceled_)
223  
            if(canceled_)
224  
            {
224  
            {
225  
                if(in_list_)
225  
                if(in_list_)
226  
                {
226  
                {
227  
                    e_->waiters_.remove(this);
227  
                    e_->waiters_.remove(this);
228  
                    in_list_ = false;
228  
                    in_list_ = false;
229  
                }
229  
                }
230  
                return {make_error_code(
230  
                return {make_error_code(
231  
                    error::canceled)};
231  
                    error::canceled)};
232  
            }
232  
            }
233  
            return {{}};
233  
            return {{}};
234  
        }
234  
        }
235  
    };
235  
    };
236  

236  

237  
    /// Construct an unset event.
237  
    /// Construct an unset event.
238  
    async_event() = default;
238  
    async_event() = default;
239  

239  

240  
    /// Copy constructor (deleted).
240  
    /// Copy constructor (deleted).
241  
    async_event(async_event const&) = delete;
241  
    async_event(async_event const&) = delete;
242  

242  

243  
    /// Copy assignment (deleted).
243  
    /// Copy assignment (deleted).
244  
    async_event& operator=(async_event const&) = delete;
244  
    async_event& operator=(async_event const&) = delete;
245  

245  

246  
    /// Move constructor (deleted).
246  
    /// Move constructor (deleted).
247  
    async_event(async_event&&) = delete;
247  
    async_event(async_event&&) = delete;
248  

248  

249  
    /// Move assignment (deleted).
249  
    /// Move assignment (deleted).
250  
    async_event& operator=(async_event&&) = delete;
250  
    async_event& operator=(async_event&&) = delete;
251  

251  

252  
    /** Returns an awaiter that waits until the event is set.
252  
    /** Returns an awaiter that waits until the event is set.
253  

253  

254  
        If the event is already set, completes immediately.
254  
        If the event is already set, completes immediately.
255  

255  

256  
        @return An awaitable that await-returns `(error_code)`.
256  
        @return An awaitable that await-returns `(error_code)`.
257  
    */
257  
    */
258  
    wait_awaiter wait() noexcept
258  
    wait_awaiter wait() noexcept
259  
    {
259  
    {
260  
        return wait_awaiter{this};
260  
        return wait_awaiter{this};
261  
    }
261  
    }
262  

262  

263  
    /** Sets the event.
263  
    /** Sets the event.
264  

264  

265  
        All waiting coroutines are resumed. Canceled waiters
265  
        All waiting coroutines are resumed. Canceled waiters
266  
        are skipped. Subsequent calls to wait() complete
266  
        are skipped. Subsequent calls to wait() complete
267  
        immediately until clear() is called.
267  
        immediately until clear() is called.
268  
    */
268  
    */
269  
    void set()
269  
    void set()
270  
    {
270  
    {
271  
        set_ = true;
271  
        set_ = true;
272  
        for(;;)
272  
        for(;;)
273  
        {
273  
        {
274  
            auto* w = waiters_.pop_front();
274  
            auto* w = waiters_.pop_front();
275  
            if(!w)
275  
            if(!w)
276  
                break;
276  
                break;
277  
            w->in_list_ = false;
277  
            w->in_list_ = false;
278  
            if(!w->claimed_.exchange(
278  
            if(!w->claimed_.exchange(
279  
                true, std::memory_order_acq_rel))
279  
                true, std::memory_order_acq_rel))
280  
            {
280  
            {
281  
                w->ex_.post(w->h_);
281  
                w->ex_.post(w->h_);
282  
            }
282  
            }
283  
        }
283  
        }
284  
    }
284  
    }
285  

285  

286  
    /** Clears the event.
286  
    /** Clears the event.
287  

287  

288  
        Subsequent calls to wait() will suspend until
288  
        Subsequent calls to wait() will suspend until
289  
        set() is called again.
289  
        set() is called again.
290  
    */
290  
    */
291  
    void clear() noexcept
291  
    void clear() noexcept
292  
    {
292  
    {
293  
        set_ = false;
293  
        set_ = false;
294  
    }
294  
    }
295  

295  

296  
    /** Returns true if the event is currently set.
296  
    /** Returns true if the event is currently set.
297  
    */
297  
    */
298  
    bool is_set() const noexcept
298  
    bool is_set() const noexcept
299  
    {
299  
    {
300  
        return set_;
300  
        return set_;
301  
    }
301  
    }
302  
};
302  
};
303  

303  

304  
} // namespace capy
304  
} // namespace capy
305  
} // namespace boost
305  
} // namespace boost
306  

306  

307  
#endif
307  
#endif