Skip to content

Commit

Permalink
feat: add ResumeBySchedule interface (#357)
Browse files Browse the repository at this point in the history
  • Loading branch information
chloro-pn authored Jan 6, 2024
1 parent d97b636 commit 90f56d9
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 2 deletions.
82 changes: 82 additions & 0 deletions async_simple/coro/ResumeBySchedule.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2022, Alibaba Group Holding Limited;
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ASYNC_RESUME_BY_SCHEDULE_H
#define ASYNC_RESUME_BY_SCHEDULE_H

#include "async_simple/Executor.h"
#include "async_simple/Future.h"
#include "async_simple/coro/Lazy.h"
#include "async_simple/experimental/coroutine.h"

#include <type_traits>
#include <utility>

namespace async_simple::coro {

namespace detail {

template <typename T>
class FutureResumeByScheduleAwaiter {
public:
FutureResumeByScheduleAwaiter(Future<T>&& f) : _future(std::move(f)) {}

bool await_ready() { return _future.hasResult(); }

template <typename PromiseType>
void await_suspend(std::coroutine_handle<PromiseType> continuation) {
static_assert(std::is_base_of_v<LazyPromiseBase, PromiseType>,
"FutureResumeByScheduleAwaiter is only allowed to be "
"called by Lazy");
Executor* ex = continuation.promise()._executor;
_future.setContinuation([continuation, ex](Try<T>&& t) mutable {
if (ex != nullptr) {
ex->schedule(continuation);
} else {
continuation.resume();
}
});
}

auto await_resume() { return std::move(_future.value()); }

private:
Future<T> _future;
};

template <typename T>
class FutureResumeByScheduleAwaitable {
public:
explicit FutureResumeByScheduleAwaitable(Future<T>&& f)
: _future(std::move(f)) {}

auto coAwait(Executor*) {
return FutureResumeByScheduleAwaiter(std::move(_future));
}

private:
Future<T> _future;
};

} // namespace detail

template <typename T>
inline auto ResumeBySchedule(Future<T>&& future) {
return detail::FutureResumeByScheduleAwaitable<T>(std::move(future));
}

} // namespace async_simple::coro

#endif
137 changes: 137 additions & 0 deletions async_simple/coro/test/ResumeByScheduleTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright (c) 2022, Alibaba Group Holding Limited;
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "async_simple/coro/Lazy.h"
#include "async_simple/coro/ResumeBySchedule.h"
#include "async_simple/executors/SimpleExecutor.h"

#include "gtest/gtest.h"

#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

using namespace async_simple::executors;

namespace async_simple {
namespace coro {

class CallBackSystem {
public:
using Func = std::function<void()>;

CallBackSystem() : stop_(false) {
backend_ = std::thread([this]() {
while (true) {
std::vector<Func> tasks;
std::unique_lock<std::mutex> guard(this->mut_);
cv_.wait(guard, [&]() {
return this->tasks_.empty() == false || this->stop_ == true;
});
if (this->tasks_.empty() == true && this->stop_ == true) {
return;
}
tasks.swap(this->tasks_);
guard.unlock();
for (auto&& each : tasks) {
each();
}
}
});
}

void Call(Func func) {
std::unique_lock<std::mutex> guard(this->mut_);
tasks_.push_back(std::move(func));
cv_.notify_one();
}

void Stop() {
std::unique_lock<std::mutex> guard(this->mut_);
if (stop_ == true) {
return;
}
stop_ = true;
cv_.notify_one();
guard.unlock();
backend_.join();
}

private:
std::thread backend_;
std::mutex mut_;
std::condition_variable cv_;
bool stop_;
std::vector<Func> tasks_;
};

class MockExecutorForResumeBySchedule : public SimpleExecutor {
public:
using Base = SimpleExecutor;

explicit MockExecutorForResumeBySchedule(size_t thread_num)
: SimpleExecutor(thread_num), schedule_count_(0), checkin_count_(0) {}

bool schedule(Func func) override {
++schedule_count_;
return Base::schedule(std::move(func));
}

bool checkin(Func func, Base::Context ctx, ScheduleOptions opts) override {
++checkin_count_;
return Base::checkin(std::move(func), ctx, opts);
}

size_t schedule_count_;
size_t checkin_count_;
};

TEST(ResumeBySchedule, basic) {
MockExecutorForResumeBySchedule ex(2);
CallBackSystem cbs;

auto task = [&cbs]() -> Lazy<void> {
Promise<int> pr;
auto fu = pr.getFuture();
cbs.Call([pr = std::move(pr)]() mutable { pr.setValue(1); });
int v = co_await ResumeBySchedule(std::move(fu));
EXPECT_EQ(v, 1);
co_return;
};

std::mutex mut;
std::condition_variable cv;
size_t done_count = 0;

for (size_t i = 0; i < 100; ++i) {
task().via(&ex).start([&](auto&&) {
std::unique_lock guard(mut);
done_count += 1;
cv.notify_one();
});
}

std::unique_lock guard(mut);
cv.wait(guard, [&]() -> bool { return done_count == 100; });
cbs.Stop();
EXPECT_EQ(ex.checkin_count_, 0);
EXPECT_LE(ex.schedule_count_, 200);
}

} // namespace coro
} // namespace async_simple
2 changes: 2 additions & 0 deletions docs/docs.cn/Future.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ auto val = co_await std::move(fut);

如果 Future 已经设置过 Executor,那么此时当前协程何时 Resumption 由该调度器控制。注意这不会更改当前协程的 Executor 环境。

当绑定Executor时,默认情况下`co_await` Future会通过`Executor.checkin`接口恢复执行,用户可以通过`co_await ResumeBySchedule(std::move(future))`来显式指定future通过`Executor.schedule`接口恢复执行。

如果当前协程已设置了调度器,Future 未设置调度器且我们希望由该调度器介入,我们可以通过 `co_await CurrentExecutor{};` 来做到:

```cpp
Expand Down
5 changes: 3 additions & 2 deletions docs/docs.en/Future.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,9 @@ std::this_thread::sleep_for(std::chrono::milliseconds(500));
auto val = co_await std::move(fut);
```

If the Future has set Executor already, the Executor would decide when will the Lazy to be resumed. Note that it wouldn't change the executor
the Lazy lives in.
If the Future has set Executor already, the Executor would decide when will the Lazy to be resumed. Note that it wouldn't change the executor the Lazy lives in.

When binding Executor, `co_await` Future will resume Lazy through the `Executor.checkin` interface by default, and users can explicitly specify that Future resume Lazy through the `Executor.schedule` interface by `co_await ResumeBySchedule(std::move(future))`.

If the Lazy has set Executor already and we want to set that executor for the Future, we can make it by `co_await CurrentExecutor{};`.

Expand Down

0 comments on commit 90f56d9

Please sign in to comment.