Skip to content

Commit

Permalink
feat: wrapperCreate
Browse files Browse the repository at this point in the history
  • Loading branch information
whybeyoung committed Sep 14, 2022
1 parent 1365bea commit 5135cb6
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 16 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

<!-- markdownlint-restore -->

## 愿景
全领域内让AI能力极速落地到生产应用,让AI模型到服务触手可达;让ASF成为AI推理服务框架的事实标准。

**注意**

本项目为 aiges的配套项目之一,用于支持用户态 Python代码推理服务化。
Expand Down
27 changes: 27 additions & 0 deletions doc/stream_support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# python加载器流式请求支持设计

## 流程时序图

```mermaid
sequenceDiagram
aiges ->> aiges: create sid first
aiges ->> wrapper.so: call Initialize
wrapper.so ->> wrapper.py: call wrapperInit at frist
wrapper.py -->> wrapper.py: create model/engine pool with some capcacity.
loop stream Call stage
aiges ->> wrapper.so: call WrapperCreate with sid
wrapper.so ->> wrapper.py: call Python WrapperCreate implement
wrapper.py ->> wrapper.so: return handle(string)
wrapper.so -->> wrapper.so: maintaint the handle and sid with map structure.
aiges ->> wrapper.so: call C WrapperWrite with handle and sid
wrapper.so ->> wrapper.py: call Python WrapperWrite with handle and sid
wrapper.py -->> wrapper.py: get session by handle
wrapper.py ->> wrapper.so: call Python WrapperRead with handle and sid
end
aiges ->> wrapper.so: call C Fin
wrapper.so ->> wrapper.py: call Python Fin
```


79 changes: 68 additions & 11 deletions pyWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ const char *WrapperFile = "wrapper";
const char *WrapperClass = "Wrapper";
const char *PythonSo = "libpython3.so";

PYBIND11_EMBEDDED_MODULE(aiges_embed, module
) {
std::mutex RECORD_MUTEX;
std::map <std::string, std::string> SID_RECORD;

PYBIND11_EMBEDDED_MODULE(aiges_embed, module) {

py::class_<ResponseData> responseData(module, "ResponseData");
responseData.def(py::init<>())
Expand Down Expand Up @@ -92,7 +94,12 @@ PyWrapper::PyWrapper() {
_wrapperFini = _obj.attr("wrapperFini");
_wrapperOnceExec = _obj.attr("wrapperOnceExec");
_wrapperError = _obj.attr("wrapperError");
// stream support
_wrapperCreate = _obj.attr("wrapperCreate");
_wrapperWrite = _obj.attr("wrapperWrite");
_wrapperRead = _obj.attr("wrapperRead");
_wrapperTest = _obj.attr("wrapperTestFunc");

py::gil_scoped_release release;
StartMonitorWrapperClass(_wrapper_abs);

Expand All @@ -118,6 +125,9 @@ PyWrapper::~PyWrapper() {
_wrapperFini.release();
_wrapperOnceExec.release();
_wrapperTest.release();
_wrapperCreate.release();
_wrapperWrite.release();
_wrapperRead.release();
pybind11::gil_scoped_release release;
}

Expand All @@ -134,6 +144,11 @@ void PyWrapper::ReloadWrapper() {
_wrapperOnceExec = _obj.attr("wrapperOnceExec");
_wrapperError = _obj.attr("wrapperError");
_wrapperTest = _obj.attr("wrapperTestFunc");
// stream support

_wrapperCreate = _obj.attr("wrapperCreate");
_wrapperWrite = _obj.attr("wrapperWrite");
_wrapperRead = _obj.attr("wrapperRead");
pybind11::gil_scoped_release release;
}

Expand Down Expand Up @@ -175,11 +190,11 @@ int PyWrapper::wrapperInit(std::map <std::string, std::string> config) {
return ret;
}
catch (py::cast_error &e) {
spdlog::error("_wrapperInit cast error: {}", e.what());
spdlog::get("stderr_console")->error("_wrapperInit cast error: {}", e.what());
return -1;
}
catch (py::error_already_set &e) {
spdlog::error("_wrapperInit error_already_set error: {}", e.what());
spdlog::get("stderr_console")->error("_wrapperInit error_already_set error: {}", e.what());
return -1;
}
}
Expand All @@ -190,11 +205,11 @@ int PyWrapper::wrapperFini() {
return _wrapperFini().cast<int>();
}
catch (py::cast_error &e) {
spdlog::error("Fini cast error: {}", e.what());
spdlog::get("stderr_console")->error("Fini cast error: {}", e.what());
return -1;
}
catch (py::error_already_set &e) {
spdlog::error("Fini error_already_set error: {}", e.what());
spdlog::get("stderr_console")->error("Fini error_already_set error: {}", e.what());
return -1;
}
}
Expand All @@ -213,13 +228,13 @@ int PyWrapper::wrapperOnceExec(std::map <std::string, std::string> params, DataL
pDataList curPtr;
// 先判断python有没有抛出错误. response中的 errorCode
if (resp->errCode != 0) {
spdlog::error("find error from python: {}", resp->errCode);
spdlog::get("stderr_console")->error("find error from python: {}", resp->errCode);
return resp->errCode;
}

int dataSize = resp->list.size();
if (dataSize == 0) {
spdlog::error("error, not find any data from resp");
spdlog::get("stderr_console")->error("error, not find any data from resp");
return -1;
}
for (int idx = 0; idx < dataSize; idx++) {
Expand All @@ -236,7 +251,7 @@ int PyWrapper::wrapperOnceExec(std::map <std::string, std::string> params, DataL
pr = malloc(itemData.len);
if (pr == nullptr) {
int ret = -1;
spdlog::error("can't malloc memory for data, sid:{}", sid);
spdlog::get("stderr_console")->error("can't malloc memory for data, sid:{}", sid);
return ret;
}
memcpy(pr, itemData.data.data(), itemData.len);
Expand All @@ -258,11 +273,11 @@ int PyWrapper::wrapperOnceExec(std::map <std::string, std::string> params, DataL
*respData = headPtr;
}
catch (py::cast_error &e) {
spdlog::error("cast error: {}", e.what());
spdlog::get("stderr_console")->error("cast error: {}", e.what());
return -1;
}
catch (py::error_already_set &e) {
spdlog::error("error_already_set error: {}", e.what());
spdlog::get("stderr_console")->error("error_already_set error: {}", e.what());
return -1;
}

Expand All @@ -288,6 +303,27 @@ std::string PyWrapper::wrapperError(int err) {

}

std::string
PyWrapper::wrapperCreate(const char *usrTag, std::map <std::string, std::string> params, int *errNum, std::string sid) {
try {
py::gil_scoped_acquire acquire;

std::string handle = _wrapperCreate(params, errNum, sid).cast<std::string>();
if (*errNum != 0) {
spdlog::get("stderr_console")->error("errNum: {}", errNum);
}

}
catch (py::cast_error &e) {
spdlog::error("cast error: {}", e.what());
return e.what();
}
catch (py::error_already_set &e) {
spdlog::error("error_already_set error: {}", e.what());
return e.what();
}

}

int PyWrapper::wrapperTest() {
py::gil_scoped_acquire acquire;
Expand Down Expand Up @@ -327,3 +363,24 @@ int PyWrapper::wrapperTest() {

}


void SetHandleSid(char *handle, std::string sid) {
RECORD_MUTEX.lock();
SID_RECORD[std::string(handle)] = sid;
RECORD_MUTEX.unlock();
}

std::string GetHandleSid(char *handle) {
std::string rlt;
RECORD_MUTEX.lock();
rlt = SID_RECORD[std::string(handle)];
RECORD_MUTEX.unlock();
return rlt;
}

//暂时没用上
void DelHandleSid(char *handle) {
RECORD_MUTEX.lock();

RECORD_MUTEX.unlock();
}
10 changes: 10 additions & 0 deletions pyWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class PyWrapper {

int wrapperFini();

std::string wrapperCreate(const char *usrTag, std::map <std::string, std::string> params, int *errNum, std::string sid);

int wrapperTest();

private:
Expand All @@ -100,9 +102,17 @@ class PyWrapper {
py::object _wrapperFini;
py::object _wrapperOnceExec;
py::object _wrapperError;
py::object _wrapperCreate;
py::object _wrapperWrite;
py::object _wrapperRead;
py::object _wrapperTest;

};

void SetHandleSid(char *handle, std::string sid);

std::string GetHandleSid(char *handle);

void DelHandleSid(char *handle);

#endif
42 changes: 37 additions & 5 deletions wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ void so_deinit(void) {
printf("libwrapper so deinit.\n");
}

static void init_threads()
{
if (!PyEval_ThreadsInitialized())
{
static void init_threads() {
if (!PyEval_ThreadsInitialized()) {
{
py::gil_scoped_acquire acquire;
}
Expand All @@ -56,6 +54,9 @@ void initlog() {
printf("log目录已创建\n");
}
auto file_logger = spdlog::rotating_logger_mt("mspper", wrapperLogFile, 1048576 * 10, 50);
// Console logger
auto console_logger = spdlog::stdout_color_mt("stdout_console");
auto err_logger = spdlog::stderr_color_mt("stderr_console");
spdlog::set_default_logger(file_logger);
spdlog::flush_on(spdlog::level::err);
spdlog::flush_every(std::chrono::seconds(5));
Expand Down Expand Up @@ -129,7 +130,38 @@ int WrapperAPI wrapperUnloadRes(unsigned int resId) {
const void *
WrapperAPI wrapperCreate(const char *usrTag, pParamList params, wrapperCallback cb, unsigned int psrIds[], int psrCnt,
int *errNum) {
return NULL;
std::string sid = "";
for (pParamList sidP = params; sidP != NULL; sidP = sidP->next) {
if (NULL == sidP->key) {
continue;
}
if (std::string("sid") == sidP->key) {
sid = sidP->value;
break;
}
}
spdlog::debug("Crate Session tid is:{},sid:{}", gettid(), sid);
//构建请求参数
std::map <std::string, std::string> pyParams;

for (pParamList p = params; p != NULL; p = p->next) {
if (NULL == p->key) {
continue;
}
pyParams.insert({p->key, p->value});
spdlog::debug("wrapper exec param, key:{},value:{},sid:{}", p->key, p->value, sid);
}

std::string handle = pyWrapper->wrapperCreate(usrTag, pyParams, errNum, sid);
void *handlePtr = handle.c_str();
if (*errNum != 0) {
spdlog::debug("wrapper exec Error, errNum:{}, sid:{}", *errNum, sid);

return NULL;
}
SetHandleSid(handle, sid);
return static_cast<const void *>(handlePtr);

}

int WrapperAPI wrapperWrite(const void *handle, pDataList reqData) {
Expand Down

0 comments on commit 5135cb6

Please sign in to comment.