Skip to content

Commit

Permalink
fix signal dont triggered bug in collectAll & emplace (#414)
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle authored Jan 23, 2025
1 parent 05f432e commit 18f3882
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 19 deletions.
11 changes: 7 additions & 4 deletions async_simple/Signal.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,17 @@ class Slot {
"we dont allow emplace an empty signal handler");
logicAssert(std::popcount(static_cast<uint64_t>(type)) == 1,
"It's not allow to emplace for multiple signals");
// trigger-once signal has already been triggered
auto handler = std::make_unique<detail::SignalSlotSharedState::Handler>(
std::forward<Args>(args)...);
auto oldHandlerPtr = loadHandler<true>(type);
// check trigger-once signal has already been triggered
// if signal has already been triggered, return false
if (!detail::SignalSlotSharedState::isMultiTriggerSignal(type) &&
(signal()->state() & type)) {
return false;
}
auto handler = std::make_unique<detail::SignalSlotSharedState::Handler>(
std::forward<Args>(args)...);
auto oldHandlerPtr = loadHandler<true>(type);
// if signal triggered later, we will found it by atomic handler CAS
// failed.
auto oldHandler = oldHandlerPtr->load(std::memory_order_acquire);
if (oldHandler ==
&detail::SignalSlotSharedState::HandlerManager::emittedTag) {
Expand Down
22 changes: 8 additions & 14 deletions async_simple/coro/Collect.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ struct CollectAnyAwaiter {
_slot, [c = continuation, e = event, size = input.size()](
SignalType type, Signal*) mutable {
auto count = e->downCount();
if (count > size + 1) {
if (count == size + 1) {
c.resume();
}
})) { // has canceled
Expand All @@ -186,7 +186,7 @@ struct CollectAnyAwaiter {
assert(e != nullptr);
auto count = e->downCount();
// n+1: n coro + 1 cancel handler
if (count > size + 1) {
if (count == size + 1) {
_result = std::make_unique<ResultType>();
_result->_idx = i;
_result->_value = std::move(result);
Expand Down Expand Up @@ -268,7 +268,7 @@ struct CollectAnyVariadicAwaiter {
_slot, [c = continuation, e = event](SignalType type,
Signal*) mutable {
auto count = e->downCount();
if (count > std::tuple_size<InputType>() + 1) {
if (count == std::tuple_size<InputType>() + 1) {
c.resume();
}
})) { // has canceled
Expand All @@ -290,7 +290,7 @@ struct CollectAnyVariadicAwaiter {
res) mutable {
auto count = e->downCount();
// n+1: n coro + 1 cancel handler
if (count > std::tuple_size<InputType>() + 1) {
if (count == std::tuple_size<InputType>() + 1) {
_result = std::make_unique<ResultType>(
std::in_place_index_t<index>(), std::move(res));
if (auto ptr = local->getSlot(); ptr) {
Expand Down Expand Up @@ -388,7 +388,10 @@ struct CollectAllAwaiter {
_slot->chainedSignal(_signal.get());

auto executor = promise_type._executor;
for (size_t i = 0; i < _input.size(); ++i) {

_event.setAwaitingCoro(continuation);
auto size = _input.size();
for (size_t i = 0; i < size; ++i) {
auto& exec = _input[i]._coro.promise()._executor;
if (exec == nullptr) {
exec = executor;
Expand Down Expand Up @@ -422,11 +425,6 @@ struct CollectAllAwaiter {
}
func();
}
_event.setAwaitingCoro(continuation);
auto awaitingCoro = _event.down();
if (awaitingCoro) {
awaitingCoro.resume();
}
}
inline auto await_resume() { return std::move(_output); }

Expand Down Expand Up @@ -602,10 +600,6 @@ struct CollectAllVariadicAwaiter {
}
}(std::get<index>(_inputs), std::get<index>(_results)),
...);

if (auto awaitingCoro = _event.down(); awaitingCoro) {
awaitingCoro.resume();
}
}

void await_suspend(std::coroutine_handle<> continuation) {
Expand Down
2 changes: 1 addition & 1 deletion async_simple/coro/CountEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace detail {
// The last 'down' will resume the awaiting coroutine on this event.
class CountEvent {
public:
CountEvent(size_t count) : _count(count + 1) {}
CountEvent(size_t count) : _count(count) {}
CountEvent(const CountEvent&) = delete;
CountEvent(CountEvent&& other)
: _count(other._count.exchange(0, std::memory_order_relaxed)),
Expand Down

0 comments on commit 18f3882

Please sign in to comment.