Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

去除一些编译警告、优化listener代码 #152

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ else()
message (" enable_debugger: no")
endif()

option(ENABLE_LISTENER "enable listener" ${ENABLE_DEBUGGER})
if (ENABLE_LISTENER)
set(ENABLE_LISTENER 1)
message (" enable_listener: yes")
else()
set(ENABLE_LISTENER 0)
message (" enable_listener: no")
endif()

option(DISABLE_HOOK "disable hook" OFF)
if (DISABLE_HOOK)
set(ENABLE_HOOK 0)
Expand Down
2 changes: 2 additions & 0 deletions libgo/common/cmake_config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@

#define ENABLE_DEBUGGER ${ENABLE_DEBUGGER}

#define ENABLE_LISTENER ${ENABLE_LISTENER}

#define ENABLE_HOOK ${ENABLE_HOOK}
45 changes: 37 additions & 8 deletions libgo/debug/listener.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <exception>
#include "../task/task.h"

namespace co
{
Expand All @@ -13,6 +15,20 @@ class Listener
*/
class TaskListener {
public:
/**
* 协程准备初始化、即将被创建的时候被调用,可以进行对协程的任务进行封装或者拦截
* (注意此时并未运行在协程中)
*
* @prarm task_id 协程ID
* @prarm fn 协程任务,可以赋值修改此参数对协程任务进行二次封装
* @param opt 协程创建的参数,可以赋值修改此参数值
*
* @return 返回true,正常创建该任务;返回false,放弃此任务
*/
virtual bool onInit(uint64_t task_id, co::TaskF& fn, co::TaskOpt& opt) noexcept {
return true;
}

/**
* 协程被创建时被调用
* (注意此时并未运行在协程中)
Expand Down Expand Up @@ -88,17 +104,17 @@ class Listener
// s: Scheduler,表示该方法运行在调度器上下文中
// c: Coroutine,表示该方法运行在协程上下文中
//
// -->[c]onCompleted->
// | |
// [s]onCreated-->[s]onSwapIn-->[c]onStart->*--->[c]onSwapOut-- -->[c]onFinished-->[c]onSwapOut
// |\ | |
// | \<-[s]onSwapIn--V |
// | |
// -->[c]onException->
// -->[c]onCompleted->
// | |
// [s]onInit-->[s]onCreated-->[s]onSwapIn-->[c]onStart->*--->[c]onSwapOut-- -->[c]onFinished-->[c]onSwapOut
// |\ | |
// | \<-[s]onSwapIn--V |
// | |
// -->[c]onException->
};

public:
#if ENABLE_DEBUGGER
#if ENABLE_LISTENER
ALWAYS_INLINE static TaskListener*& GetTaskListener() {
static TaskListener* task_listener = nullptr;
return task_listener;
Expand All @@ -108,6 +124,19 @@ class Listener
GetTaskListener() = listener;
}
#endif

#if ENABLE_LISTENER
#define SAFE_CALL_LISTENER(listener, method, ...) \
do { \
auto* __listener = (listener); \
if (__listener) { \
__listener->method(__VA_ARGS__); \
} \
} while(0)

#else
#define SAFE_CALL_LISTENER(...) do {} while(0)
#endif
};

} // namespace co
18 changes: 9 additions & 9 deletions libgo/netio/unix/hook.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,9 +516,9 @@ struct hostent* gethostbyname(const char* name)
int & host_errno = CLS(int);

int ret = -1;
while (ret = gethostbyname_r(name, host, &buf[0],
buf.size(), &result, &host_errno) == ERANGE &&
host_errno == NETDB_INTERNAL )
while (( ret = (gethostbyname_r(name, host, &buf[0],
buf.size(), &result, &host_errno) == ERANGE)
&& (host_errno == NETDB_INTERNAL) ) )
{
if (buf.size() < 1024)
buf.resize(1024);
Expand Down Expand Up @@ -568,9 +568,9 @@ struct hostent* gethostbyname2(const char* name, int af)
int & host_errno = CLS(int);

int ret = -1;
while (ret = gethostbyname2_r(name, af, host, &buf[0],
buf.size(), &result, &host_errno) == ERANGE &&
host_errno == NETDB_INTERNAL )
while (( ret = (gethostbyname2_r(name, af, host, &buf[0],
buf.size(), &result, &host_errno) == ERANGE)
&& (host_errno == NETDB_INTERNAL) ))
{
if (buf.size() < 1024)
buf.resize(1024);
Expand Down Expand Up @@ -618,9 +618,9 @@ struct hostent *gethostbyaddr(const void *addr, socklen_t len, int type)
int & host_errno = CLS(int);

int ret = -1;
while (ret = gethostbyaddr_r(addr, len, type,
host, &buf[0], buf.size(), &result, &host_errno) == ERANGE &&
host_errno == NETDB_INTERNAL )
while (( ret = (gethostbyaddr_r(addr, len, type,
host, &buf[0], buf.size(), &result, &host_errno) == ERANGE) &&
(host_errno == NETDB_INTERNAL) ) )
{
if (buf.size() < 1024)
buf.resize(1024);
Expand Down
3 changes: 1 addition & 2 deletions libgo/scheduler/processer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,8 @@ void Processer::Process()

#if ENABLE_DEBUGGER
DebugPrint(dbg_switch, "enter task(%s)", runningTask_->DebugInfo());
if (Listener::GetTaskListener())
Listener::GetTaskListener()->onSwapIn(runningTask_->id_);
#endif
SAFE_CALL_LISTENER(Listener::GetTaskListener(), onSwapIn, runningTask_->id_);

++switchCount_;

Expand Down
7 changes: 2 additions & 5 deletions libgo/scheduler/processer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
#include "../common/clock.h"
#include "../task/task.h"
#include "../common/ts_queue.h"

#if ENABLE_DEBUGGER
#include "../debug/listener.h"
#endif
#include <condition_variable>
#include <mutex>
#include <atomic>
Expand Down Expand Up @@ -179,10 +176,10 @@ ALWAYS_INLINE void Processer::CoYield()

#if ENABLE_DEBUGGER
DebugPrint(dbg_yield, "yield task(%s) state = %s", tk->DebugInfo(), GetTaskStateName(tk->state_));
if (Listener::GetTaskListener())
Listener::GetTaskListener()->onSwapOut(tk->id_);
#endif

SAFE_CALL_LISTENER(Listener::GetTaskListener(), onSwapOut, tk->id_);

tk->SwapOut();
}

Expand Down
29 changes: 21 additions & 8 deletions libgo/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "scheduler.h"
#include "../common/error.h"
#include "../common/clock.h"
#include "../debug/listener.h"
#include <stdio.h>
#include <system_error>
#include <unistd.h>
Expand All @@ -11,7 +12,7 @@
namespace co
{

inline atomic_t<unsigned long long> & GetTaskIdFactory()
static inline atomic_t<unsigned long long> & GetTaskIdFactory()
{
static atomic_t<unsigned long long> factory;
return factory;
Expand Down Expand Up @@ -71,22 +72,34 @@ Scheduler::~Scheduler()
Stop();
}

void Scheduler::CreateTask(TaskF const& fn, TaskOpt const& opt)
void Scheduler::CreateTask(TaskF const& _fn, TaskOpt const& _opt)
{
uint64_t id = ++GetTaskIdFactory();

#if ENABLE_LISTENER
TaskF fn = _fn;
TaskOpt opt = _opt;
auto* listener = Listener::GetTaskListener();
if (listener && !listener->onInit(id, fn, opt)) {
return;
}
#else
auto& fn = _fn;
auto& opt = _opt;
#endif

Task* tk = new Task(fn, opt.stack_size_ ? opt.stack_size_ : CoroutineOptions::getInstance().stack_size);

// printf("new tk = %p impl = %p\n", tk, tk->impl_);
tk->SetDeleter(Deleter(&Scheduler::DeleteTask, this));
tk->id_ = ++GetTaskIdFactory();
tk->id_ = id;
TaskRefAffinity(tk) = opt.affinity_;
TaskRefLocation(tk).Init(opt.file_, opt.lineno_);
++taskCount_;

DebugPrint(dbg_task, "task(%s) created in scheduler(%p).", TaskDebugInfo(tk), (void*)this);
#if ENABLE_DEBUGGER
if (Listener::GetTaskListener()) {
Listener::GetTaskListener()->onCreated(tk->id_);
}
#endif

SAFE_CALL_LISTENER(Listener::GetTaskListener(), onCreated, tk->id_);

AddTask(tk);
}
Expand Down
9 changes: 0 additions & 9 deletions libgo/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,11 @@
#include "../common/spinlock.h"
#include "../common/timer.h"
#include "../task/task.h"
#include "../debug/listener.h"
#include "processer.h"
#include <mutex>

namespace co {

struct TaskOpt
{
bool affinity_ = false;
int lineno_ = 0;
std::size_t stack_size_ = 0;
const char* file_ = nullptr;
};

// 协程调度器
// 负责管理1到N个调度线程, 调度从属协程.
// 可以调用Create接口创建更多额外的调度器
Expand Down
29 changes: 7 additions & 22 deletions libgo/task/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,12 @@ const char* GetTaskStateName(TaskState state)
void Task::Run()
{
auto call_fn = [this]() {
#if ENABLE_DEBUGGER
if (Listener::GetTaskListener()) {
Listener::GetTaskListener()->onStart(this->id_);
}
#endif
SAFE_CALL_LISTENER(Listener::GetTaskListener(), onStart, this->id_);

this->fn_();
this->fn_ = TaskF(); //让协程function对象的析构也在协程中执行

#if ENABLE_DEBUGGER
if (Listener::GetTaskListener()) {
Listener::GetTaskListener()->onCompleted(this->id_);
}
#endif
SAFE_CALL_LISTENER(Listener::GetTaskListener(), onCompleted, this->id_);
};

if (CoroutineOptions::getInstance().exception_handle == eCoExHandle::immedaitely_throw) {
Expand All @@ -52,22 +44,14 @@ void Task::Run()
} catch (...) {
this->fn_ = TaskF();

std::exception_ptr eptr = std::current_exception();
this->eptr_ = std::current_exception();
DebugPrint(dbg_exception, "task(%s) catched exception.", DebugInfo());

#if ENABLE_DEBUGGER
if (Listener::GetTaskListener()) {
Listener::GetTaskListener()->onException(this->id_, eptr);
}
#endif
SAFE_CALL_LISTENER(Listener::GetTaskListener(), onException, this->id_, this->eptr_);
}
}

#if ENABLE_DEBUGGER
if (Listener::GetTaskListener()) {
Listener::GetTaskListener()->onFinished(this->id_);
}
#endif
SAFE_CALL_LISTENER(Listener::GetTaskListener(), onFinished, this->id_);

state_ = TaskState::done;
Processer::StaticCoYield();
Expand Down Expand Up @@ -95,7 +79,8 @@ Task::~Task()

const char* Task::DebugInfo()
{
if (reinterpret_cast<void*>(this) == nullptr) return "nil";
char& thiz = *reinterpret_cast<char*>(this);
if (std::addressof(thiz) == nullptr) return "nil";

return TaskDebugInfo(this);
}
Expand Down
8 changes: 8 additions & 0 deletions libgo/task/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ const char* GetTaskStateName(TaskState state);

typedef std::function<void()> TaskF;

struct TaskOpt
{
bool affinity_ = false;
int lineno_ = 0;
std::size_t stack_size_ = 0;
const char* file_ = nullptr;
};

struct TaskGroupKey {};
typedef Anys<TaskGroupKey> TaskAnys;

Expand Down
41 changes: 31 additions & 10 deletions tutorial/sample12_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,40 @@ using namespace std;
//协程监听器的调用过程:
// s: Scheduler,表示该方法运行在调度器上下文中
// c: Coroutine,表示该方法运行在协程上下文中
// (正常运行完成)
// -->[c]onCompleted->
// | |
// [s]onCreated-->[s]onSwapIn-->[c]onStart->*--->[c]onSwapOut-- -->[c]onFinished-->[c]onSwapOut
// |\ | |
// | \<-[s]onSwapIn--V |
// | |
// -->[c]onException->
// (运行时抛出未捕获的异常)
// (正常运行完成)
// -->[c]onCompleted->
// | |
// [s]onInit-->[s]onCreated-->[s]onSwapIn-->[c]onStart->*--->[c]onSwapOut-- -->[c]onFinished-->[c]onSwapOut
// |\ | |
// | \<-[s]onSwapIn--V |
// | |
// -->[c]onException->
// (运行时抛出未捕获的异常)
//
//!!注意协程监听器回调方法均不能抛出异常,如果可能有异常抛出,请在回调方法内自行 try...catch 消化掉

//覆盖 co::co_listener 的虚函数实现回调方法
class CoListenerSample: public co::Listener::TaskListener {
public:
/**
* 协程准备初始化、即将被创建的时候被调用,可以进行对协程的任务进行封装或者拦截
* (注意此时并未运行在协程中)
*
* @prarm task_id 协程ID
* @prarm fn 协程任务,可以赋值修改此参数对协程任务进行二次封装
* @param opt 协程创建的参数,可以赋值修改此参数值
*
* @return 返回true,正常创建该任务;返回false,放弃此任务
*/
virtual bool onInit(uint64_t task_id, co::TaskF& fn, co::TaskOpt& opt) noexcept {
cout << "onInit task_id=" << task_id << endl;
fn = [fn]() {
cout << "haha, I'm coming. " << endl;
fn();
};
return true;
}

/**
* 协程被创建时被调用
* (注意此时并未运行在协程中)
Expand Down Expand Up @@ -99,6 +118,8 @@ class CoListenerSample: public co::Listener::TaskListener {
} catch (...) {
cout << "unknow exception." << endl;
}

eptr = nullptr;
}

/**
Expand All @@ -125,7 +146,7 @@ class CoListenerSample: public co::Listener::TaskListener {
};

int main(int argc, char** argv) {
#if ENABLE_DEBUGGER
#if ENABLE_LISTENER
CoListenerSample listener;

//设置协程监听器,如果设置为NULL则为取消监听
Expand Down
Loading