From b9f8137d150c15bfbae9db1f2a09cf9897cb737b Mon Sep 17 00:00:00 2001 From: mlkt <365690226@qq.com> Date: Fri, 15 Mar 2019 20:15:01 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=8E=BB=E9=99=A4=E7=BC=96=E8=AF=91?= =?UTF-8?q?=E8=AD=A6=E5=91=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libgo/netio/unix/hook.cpp | 18 +++++++++--------- libgo/task/task.cpp | 3 ++- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/libgo/netio/unix/hook.cpp b/libgo/netio/unix/hook.cpp index 6e6a935e..ffa7a613 100644 --- a/libgo/netio/unix/hook.cpp +++ b/libgo/netio/unix/hook.cpp @@ -516,9 +516,9 @@ struct hostent* gethostbyname(const char* name) int & host_errno = CLS(int); int ret = -1; - while (ret = gethostbyname_r(name, host, &buf[0], - buf.size(), &result, &host_errno) == ERANGE && - host_errno == NETDB_INTERNAL ) + while (( ret = (gethostbyname_r(name, host, &buf[0], + buf.size(), &result, &host_errno) == ERANGE) + && (host_errno == NETDB_INTERNAL) ) ) { if (buf.size() < 1024) buf.resize(1024); @@ -568,9 +568,9 @@ struct hostent* gethostbyname2(const char* name, int af) int & host_errno = CLS(int); int ret = -1; - while (ret = gethostbyname2_r(name, af, host, &buf[0], - buf.size(), &result, &host_errno) == ERANGE && - host_errno == NETDB_INTERNAL ) + while (( ret = (gethostbyname2_r(name, af, host, &buf[0], + buf.size(), &result, &host_errno) == ERANGE) + && (host_errno == NETDB_INTERNAL) )) { if (buf.size() < 1024) buf.resize(1024); @@ -618,9 +618,9 @@ struct hostent *gethostbyaddr(const void *addr, socklen_t len, int type) int & host_errno = CLS(int); int ret = -1; - while (ret = gethostbyaddr_r(addr, len, type, - host, &buf[0], buf.size(), &result, &host_errno) == ERANGE && - host_errno == NETDB_INTERNAL ) + while (( ret = (gethostbyaddr_r(addr, len, type, + host, &buf[0], buf.size(), &result, &host_errno) == ERANGE) && + (host_errno == NETDB_INTERNAL) ) ) { if (buf.size() < 1024) buf.resize(1024); diff --git a/libgo/task/task.cpp b/libgo/task/task.cpp index 1f9a6c14..af4fe4d6 100644 --- a/libgo/task/task.cpp +++ b/libgo/task/task.cpp @@ -95,7 +95,8 @@ Task::~Task() const char* Task::DebugInfo() { - if (reinterpret_cast(this) == nullptr) return "nil"; + char& thiz = *reinterpret_cast(this); + if (std::addressof(thiz) == nullptr) return "nil"; return TaskDebugInfo(this); } From 109958f0fc3ee38a1d85aafb561734659885ad2b Mon Sep 17 00:00:00 2001 From: mlkt <365690226@qq.com> Date: Fri, 15 Mar 2019 21:06:54 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96=E7=9B=91=E5=90=AC=EF=BC=8C=E4=BC=98=E5=8C=96listener?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 9 +++++++ libgo/common/cmake_config.h.in | 2 ++ libgo/debug/listener.h | 45 ++++++++++++++++++++++++++++------ libgo/scheduler/processer.cpp | 3 +-- libgo/scheduler/processer.h | 7 ++---- libgo/scheduler/scheduler.cpp | 29 ++++++++++++++++------ libgo/scheduler/scheduler.h | 9 ------- libgo/task/task.cpp | 26 ++++---------------- libgo/task/task.h | 8 ++++++ tutorial/sample12_listener.cpp | 41 +++++++++++++++++++++++-------- tutorial/sample13_cls.cpp | 3 +-- 11 files changed, 117 insertions(+), 65 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 3e13e997..e5c2d673 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,6 +24,15 @@ else() message (" enable_debugger: no") endif() +option(ENABLE_LISTENER "enable listener" ${ENABLE_DEBUGGER}) +if (ENABLE_LISTENER) + set(ENABLE_LISTENER 1) + message (" enable_listener: yes") +else() + set(ENABLE_LISTENER 0) + message (" enable_listener: no") +endif() + option(DISABLE_HOOK "disable hook" OFF) if (DISABLE_HOOK) set(ENABLE_HOOK 0) diff --git a/libgo/common/cmake_config.h.in b/libgo/common/cmake_config.h.in index 19f55f88..7253baea 100644 --- a/libgo/common/cmake_config.h.in +++ b/libgo/common/cmake_config.h.in @@ -2,4 +2,6 @@ #define ENABLE_DEBUGGER ${ENABLE_DEBUGGER} +#define ENABLE_LISTENER ${ENABLE_LISTENER} + #define ENABLE_HOOK ${ENABLE_HOOK} diff --git a/libgo/debug/listener.h b/libgo/debug/listener.h index 2b2efe62..77d18418 100644 --- a/libgo/debug/listener.h +++ b/libgo/debug/listener.h @@ -1,5 +1,7 @@ #pragma once + #include +#include "../task/task.h" namespace co { @@ -13,6 +15,20 @@ class Listener */ class TaskListener { public: + /** + * 协程准备初始化、即将被创建的时候被调用,可以进行对协程的任务进行封装或者拦截 + * (注意此时并未运行在协程中) + * + * @prarm task_id 协程ID + * @prarm fn 协程任务,可以赋值修改此参数对协程任务进行二次封装 + * @param opt 协程创建的参数,可以赋值修改此参数值 + * + * @return 返回true,正常创建该任务;返回false,放弃此任务 + */ + virtual bool onInit(uint64_t task_id, co::TaskF& fn, co::TaskOpt& opt) noexcept { + return true; + } + /** * 协程被创建时被调用 * (注意此时并未运行在协程中) @@ -88,17 +104,17 @@ class Listener // s: Scheduler,表示该方法运行在调度器上下文中 // c: Coroutine,表示该方法运行在协程上下文中 // - // -->[c]onCompleted-> - // | | - // [s]onCreated-->[s]onSwapIn-->[c]onStart->*--->[c]onSwapOut-- -->[c]onFinished-->[c]onSwapOut - // |\ | | - // | \<-[s]onSwapIn--V | - // | | - // -->[c]onException-> + // -->[c]onCompleted-> + // | | + // [s]onInit-->[s]onCreated-->[s]onSwapIn-->[c]onStart->*--->[c]onSwapOut-- -->[c]onFinished-->[c]onSwapOut + // |\ | | + // | \<-[s]onSwapIn--V | + // | | + // -->[c]onException-> }; public: -#if ENABLE_DEBUGGER +#if ENABLE_LISTENER ALWAYS_INLINE static TaskListener*& GetTaskListener() { static TaskListener* task_listener = nullptr; return task_listener; @@ -108,6 +124,19 @@ class Listener GetTaskListener() = listener; } #endif + +#if ENABLE_LISTENER +#define SAFE_CALL_LISTENER(listener, method, ...) \ + do { \ + auto* __listener = (listener); \ + if (__listener) { \ + __listener->method(__VA_ARGS__); \ + } \ + } while(0) + +#else +#define SAFE_CALL_LISTENER(...) do {} while(0) +#endif }; } // namespace co diff --git a/libgo/scheduler/processer.cpp b/libgo/scheduler/processer.cpp index cd128484..a1c3e368 100644 --- a/libgo/scheduler/processer.cpp +++ b/libgo/scheduler/processer.cpp @@ -98,9 +98,8 @@ void Processer::Process() #if ENABLE_DEBUGGER DebugPrint(dbg_switch, "enter task(%s)", runningTask_->DebugInfo()); - if (Listener::GetTaskListener()) - Listener::GetTaskListener()->onSwapIn(runningTask_->id_); #endif + SAFE_CALL_LISTENER(Listener::GetTaskListener(), onSwapIn, runningTask_->id_); ++switchCount_; diff --git a/libgo/scheduler/processer.h b/libgo/scheduler/processer.h index d61f5d14..b1e637c2 100644 --- a/libgo/scheduler/processer.h +++ b/libgo/scheduler/processer.h @@ -3,10 +3,7 @@ #include "../common/clock.h" #include "../task/task.h" #include "../common/ts_queue.h" - -#if ENABLE_DEBUGGER #include "../debug/listener.h" -#endif #include #include #include @@ -179,10 +176,10 @@ ALWAYS_INLINE void Processer::CoYield() #if ENABLE_DEBUGGER DebugPrint(dbg_yield, "yield task(%s) state = %s", tk->DebugInfo(), GetTaskStateName(tk->state_)); - if (Listener::GetTaskListener()) - Listener::GetTaskListener()->onSwapOut(tk->id_); #endif + SAFE_CALL_LISTENER(Listener::GetTaskListener(), onSwapOut, tk->id_); + tk->SwapOut(); } diff --git a/libgo/scheduler/scheduler.cpp b/libgo/scheduler/scheduler.cpp index f4c87b3a..196d51ea 100644 --- a/libgo/scheduler/scheduler.cpp +++ b/libgo/scheduler/scheduler.cpp @@ -1,6 +1,7 @@ #include "scheduler.h" #include "../common/error.h" #include "../common/clock.h" +#include "../debug/listener.h" #include #include #include @@ -11,7 +12,7 @@ namespace co { -inline atomic_t & GetTaskIdFactory() +static inline atomic_t & GetTaskIdFactory() { static atomic_t factory; return factory; @@ -71,22 +72,34 @@ Scheduler::~Scheduler() Stop(); } -void Scheduler::CreateTask(TaskF const& fn, TaskOpt const& opt) +void Scheduler::CreateTask(TaskF const& _fn, TaskOpt const& _opt) { + uint64_t id = ++GetTaskIdFactory(); + +#if ENABLE_LISTENER + TaskF fn = _fn; + TaskOpt opt = _opt; + auto* listener = Listener::GetTaskListener(); + if (listener && !listener->onInit(id, fn, opt)) { + return; + } +#else + auto& fn = _fn; + auto& opt = _opt; +#endif + Task* tk = new Task(fn, opt.stack_size_ ? opt.stack_size_ : CoroutineOptions::getInstance().stack_size); + // printf("new tk = %p impl = %p\n", tk, tk->impl_); tk->SetDeleter(Deleter(&Scheduler::DeleteTask, this)); - tk->id_ = ++GetTaskIdFactory(); + tk->id_ = id; TaskRefAffinity(tk) = opt.affinity_; TaskRefLocation(tk).Init(opt.file_, opt.lineno_); ++taskCount_; DebugPrint(dbg_task, "task(%s) created in scheduler(%p).", TaskDebugInfo(tk), (void*)this); -#if ENABLE_DEBUGGER - if (Listener::GetTaskListener()) { - Listener::GetTaskListener()->onCreated(tk->id_); - } -#endif + + SAFE_CALL_LISTENER(Listener::GetTaskListener(), onCreated, tk->id_); AddTask(tk); } diff --git a/libgo/scheduler/scheduler.h b/libgo/scheduler/scheduler.h index e4e75416..94923df5 100644 --- a/libgo/scheduler/scheduler.h +++ b/libgo/scheduler/scheduler.h @@ -4,20 +4,11 @@ #include "../common/spinlock.h" #include "../common/timer.h" #include "../task/task.h" -#include "../debug/listener.h" #include "processer.h" #include namespace co { -struct TaskOpt -{ - bool affinity_ = false; - int lineno_ = 0; - std::size_t stack_size_ = 0; - const char* file_ = nullptr; -}; - // 协程调度器 // 负责管理1到N个调度线程, 调度从属协程. // 可以调用Create接口创建更多额外的调度器 diff --git a/libgo/task/task.cpp b/libgo/task/task.cpp index af4fe4d6..5c9aa8cc 100644 --- a/libgo/task/task.cpp +++ b/libgo/task/task.cpp @@ -28,20 +28,12 @@ const char* GetTaskStateName(TaskState state) void Task::Run() { auto call_fn = [this]() { -#if ENABLE_DEBUGGER - if (Listener::GetTaskListener()) { - Listener::GetTaskListener()->onStart(this->id_); - } -#endif + SAFE_CALL_LISTENER(Listener::GetTaskListener(), onStart, this->id_); this->fn_(); this->fn_ = TaskF(); //让协程function对象的析构也在协程中执行 -#if ENABLE_DEBUGGER - if (Listener::GetTaskListener()) { - Listener::GetTaskListener()->onCompleted(this->id_); - } -#endif + SAFE_CALL_LISTENER(Listener::GetTaskListener(), onCompleted, this->id_); }; if (CoroutineOptions::getInstance().exception_handle == eCoExHandle::immedaitely_throw) { @@ -52,22 +44,14 @@ void Task::Run() } catch (...) { this->fn_ = TaskF(); - std::exception_ptr eptr = std::current_exception(); + this->eptr_ = std::current_exception(); DebugPrint(dbg_exception, "task(%s) catched exception.", DebugInfo()); -#if ENABLE_DEBUGGER - if (Listener::GetTaskListener()) { - Listener::GetTaskListener()->onException(this->id_, eptr); - } -#endif + SAFE_CALL_LISTENER(Listener::GetTaskListener(), onException, this->id_, this->eptr_); } } -#if ENABLE_DEBUGGER - if (Listener::GetTaskListener()) { - Listener::GetTaskListener()->onFinished(this->id_); - } -#endif + SAFE_CALL_LISTENER(Listener::GetTaskListener(), onFinished, this->id_); state_ = TaskState::done; Processer::StaticCoYield(); diff --git a/libgo/task/task.h b/libgo/task/task.h index a8b78672..ce2eac2a 100644 --- a/libgo/task/task.h +++ b/libgo/task/task.h @@ -19,6 +19,14 @@ const char* GetTaskStateName(TaskState state); typedef std::function TaskF; +struct TaskOpt +{ + bool affinity_ = false; + int lineno_ = 0; + std::size_t stack_size_ = 0; + const char* file_ = nullptr; +}; + struct TaskGroupKey {}; typedef Anys TaskAnys; diff --git a/tutorial/sample12_listener.cpp b/tutorial/sample12_listener.cpp index d638a480..c092aac3 100644 --- a/tutorial/sample12_listener.cpp +++ b/tutorial/sample12_listener.cpp @@ -13,21 +13,40 @@ using namespace std; //协程监听器的调用过程: // s: Scheduler,表示该方法运行在调度器上下文中 // c: Coroutine,表示该方法运行在协程上下文中 -// (正常运行完成) -// -->[c]onCompleted-> -// | | -// [s]onCreated-->[s]onSwapIn-->[c]onStart->*--->[c]onSwapOut-- -->[c]onFinished-->[c]onSwapOut -// |\ | | -// | \<-[s]onSwapIn--V | -// | | -// -->[c]onException-> -// (运行时抛出未捕获的异常) +// (正常运行完成) +// -->[c]onCompleted-> +// | | +// [s]onInit-->[s]onCreated-->[s]onSwapIn-->[c]onStart->*--->[c]onSwapOut-- -->[c]onFinished-->[c]onSwapOut +// |\ | | +// | \<-[s]onSwapIn--V | +// | | +// -->[c]onException-> +// (运行时抛出未捕获的异常) // //!!注意协程监听器回调方法均不能抛出异常,如果可能有异常抛出,请在回调方法内自行 try...catch 消化掉 //覆盖 co::co_listener 的虚函数实现回调方法 class CoListenerSample: public co::Listener::TaskListener { public: + /** + * 协程准备初始化、即将被创建的时候被调用,可以进行对协程的任务进行封装或者拦截 + * (注意此时并未运行在协程中) + * + * @prarm task_id 协程ID + * @prarm fn 协程任务,可以赋值修改此参数对协程任务进行二次封装 + * @param opt 协程创建的参数,可以赋值修改此参数值 + * + * @return 返回true,正常创建该任务;返回false,放弃此任务 + */ + virtual bool onInit(uint64_t task_id, co::TaskF& fn, co::TaskOpt& opt) noexcept { + cout << "onInit task_id=" << task_id << endl; + fn = [fn]() { + cout << "haha, I'm coming. " << endl; + fn(); + }; + return true; + } + /** * 协程被创建时被调用 * (注意此时并未运行在协程中) @@ -99,6 +118,8 @@ class CoListenerSample: public co::Listener::TaskListener { } catch (...) { cout << "unknow exception." << endl; } + + eptr = nullptr; } /** @@ -125,7 +146,7 @@ class CoListenerSample: public co::Listener::TaskListener { }; int main(int argc, char** argv) { -#if ENABLE_DEBUGGER +#if ENABLE_LISTENER CoListenerSample listener; //设置协程监听器,如果设置为NULL则为取消监听 diff --git a/tutorial/sample13_cls.cpp b/tutorial/sample13_cls.cpp index 2c328b0b..2e7f11b8 100644 --- a/tutorial/sample13_cls.cpp +++ b/tutorial/sample13_cls.cpp @@ -17,8 +17,7 @@ * 块作用域中推荐使用第一种写法, 免掉了一次隐士转换, 更便于使用 * 第一种写法一定注意不要忘记引用符& * - * 全局作用域\类成员变量只能使用第二种写法, 并且会有编译warning, - * 请勿开启-Werror选项! + * 全局作用域\类成员变量只能使用第二种写法 * * co_cls_ref(int)定义了一个可以隐式转换成int&的模板类, * 如果此处不是int而是自定义类, 要访问类的成员或函数,