diff --git a/async_simple/Signal.h b/async_simple/Signal.h index 8f5dfd30..bb74b9bc 100644 --- a/async_simple/Signal.h +++ b/async_simple/Signal.h @@ -205,14 +205,17 @@ class Slot { "we dont allow emplace an empty signal handler"); logicAssert(std::popcount(static_cast(type)) == 1, "It's not allow to emplace for multiple signals"); - // trigger-once signal has already been triggered + auto handler = std::make_unique( + std::forward(args)...); + auto oldHandlerPtr = loadHandler(type); + // check trigger-once signal has already been triggered + // if signal has already been triggered, return false if (!detail::SignalSlotSharedState::isMultiTriggerSignal(type) && (signal()->state() & type)) { return false; } - auto handler = std::make_unique( - std::forward(args)...); - auto oldHandlerPtr = loadHandler(type); + // if signal triggered later, we will found it by atomic handler CAS + // failed. auto oldHandler = oldHandlerPtr->load(std::memory_order_acquire); if (oldHandler == &detail::SignalSlotSharedState::HandlerManager::emittedTag) { diff --git a/async_simple/coro/Collect.h b/async_simple/coro/Collect.h index cb224556..41f5487e 100644 --- a/async_simple/coro/Collect.h +++ b/async_simple/coro/Collect.h @@ -166,7 +166,7 @@ struct CollectAnyAwaiter { _slot, [c = continuation, e = event, size = input.size()]( SignalType type, Signal*) mutable { auto count = e->downCount(); - if (count > size + 1) { + if (count == size + 1) { c.resume(); } })) { // has canceled @@ -186,7 +186,7 @@ struct CollectAnyAwaiter { assert(e != nullptr); auto count = e->downCount(); // n+1: n coro + 1 cancel handler - if (count > size + 1) { + if (count == size + 1) { _result = std::make_unique(); _result->_idx = i; _result->_value = std::move(result); @@ -268,7 +268,7 @@ struct CollectAnyVariadicAwaiter { _slot, [c = continuation, e = event](SignalType type, Signal*) mutable { auto count = e->downCount(); - if (count > std::tuple_size() + 1) { + if (count == std::tuple_size() + 1) { c.resume(); } })) { // has canceled @@ -290,7 +290,7 @@ struct CollectAnyVariadicAwaiter { res) mutable { auto count = e->downCount(); // n+1: n coro + 1 cancel handler - if (count > std::tuple_size() + 1) { + if (count == std::tuple_size() + 1) { _result = std::make_unique( std::in_place_index_t(), std::move(res)); if (auto ptr = local->getSlot(); ptr) { @@ -388,7 +388,10 @@ struct CollectAllAwaiter { _slot->chainedSignal(_signal.get()); auto executor = promise_type._executor; - for (size_t i = 0; i < _input.size(); ++i) { + + _event.setAwaitingCoro(continuation); + auto size = _input.size(); + for (size_t i = 0; i < size; ++i) { auto& exec = _input[i]._coro.promise()._executor; if (exec == nullptr) { exec = executor; @@ -422,11 +425,6 @@ struct CollectAllAwaiter { } func(); } - _event.setAwaitingCoro(continuation); - auto awaitingCoro = _event.down(); - if (awaitingCoro) { - awaitingCoro.resume(); - } } inline auto await_resume() { return std::move(_output); } @@ -602,10 +600,6 @@ struct CollectAllVariadicAwaiter { } }(std::get(_inputs), std::get(_results)), ...); - - if (auto awaitingCoro = _event.down(); awaitingCoro) { - awaitingCoro.resume(); - } } void await_suspend(std::coroutine_handle<> continuation) { diff --git a/async_simple/coro/CountEvent.h b/async_simple/coro/CountEvent.h index ad33b378..2dae20da 100644 --- a/async_simple/coro/CountEvent.h +++ b/async_simple/coro/CountEvent.h @@ -34,7 +34,7 @@ namespace detail { // The last 'down' will resume the awaiting coroutine on this event. class CountEvent { public: - CountEvent(size_t count) : _count(count + 1) {} + CountEvent(size_t count) : _count(count) {} CountEvent(const CountEvent&) = delete; CountEvent(CountEvent&& other) : _count(other._count.exchange(0, std::memory_order_relaxed)),