From 5135cb605697b7ad73bd6994ea2183d3bb05a609 Mon Sep 17 00:00:00 2001 From: ybyang Date: Wed, 14 Sep 2022 10:59:33 +0800 Subject: [PATCH] feat: wrapperCreate --- README.md | 3 ++ doc/stream_support.md | 27 +++++++++++++++ pyWrapper.cpp | 79 +++++++++++++++++++++++++++++++++++++------ pyWrapper.h | 10 ++++++ wrapper.cpp | 42 ++++++++++++++++++++--- 5 files changed, 145 insertions(+), 16 deletions(-) create mode 100644 doc/stream_support.md diff --git a/README.md b/README.md index 5698c21a..2b86354f 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,9 @@ +## 愿景 +全领域内让AI能力极速落地到生产应用,让AI模型到服务触手可达;让ASF成为AI推理服务框架的事实标准。 + **注意** 本项目为 aiges的配套项目之一,用于支持用户态 Python代码推理服务化。 diff --git a/doc/stream_support.md b/doc/stream_support.md new file mode 100644 index 00000000..fcb78466 --- /dev/null +++ b/doc/stream_support.md @@ -0,0 +1,27 @@ +# python加载器流式请求支持设计 + +## 流程时序图 + +```mermaid +sequenceDiagram + + +aiges ->> aiges: create sid first +aiges ->> wrapper.so: call Initialize +wrapper.so ->> wrapper.py: call wrapperInit at frist +wrapper.py -->> wrapper.py: create model/engine pool with some capcacity. +loop stream Call stage +aiges ->> wrapper.so: call WrapperCreate with sid +wrapper.so ->> wrapper.py: call Python WrapperCreate implement +wrapper.py ->> wrapper.so: return handle(string) +wrapper.so -->> wrapper.so: maintaint the handle and sid with map structure. +aiges ->> wrapper.so: call C WrapperWrite with handle and sid +wrapper.so ->> wrapper.py: call Python WrapperWrite with handle and sid +wrapper.py -->> wrapper.py: get session by handle +wrapper.py ->> wrapper.so: call Python WrapperRead with handle and sid +end +aiges ->> wrapper.so: call C Fin +wrapper.so ->> wrapper.py: call Python Fin +``` + + diff --git a/pyWrapper.cpp b/pyWrapper.cpp index 7053477a..de62ba14 100644 --- a/pyWrapper.cpp +++ b/pyWrapper.cpp @@ -12,8 +12,10 @@ const char *WrapperFile = "wrapper"; const char *WrapperClass = "Wrapper"; const char *PythonSo = "libpython3.so"; -PYBIND11_EMBEDDED_MODULE(aiges_embed, module -) { +std::mutex RECORD_MUTEX; +std::map SID_RECORD; + +PYBIND11_EMBEDDED_MODULE(aiges_embed, module) { py::class_ responseData(module, "ResponseData"); responseData.def(py::init<>()) @@ -92,7 +94,12 @@ PyWrapper::PyWrapper() { _wrapperFini = _obj.attr("wrapperFini"); _wrapperOnceExec = _obj.attr("wrapperOnceExec"); _wrapperError = _obj.attr("wrapperError"); + // stream support + _wrapperCreate = _obj.attr("wrapperCreate"); + _wrapperWrite = _obj.attr("wrapperWrite"); + _wrapperRead = _obj.attr("wrapperRead"); _wrapperTest = _obj.attr("wrapperTestFunc"); + py::gil_scoped_release release; StartMonitorWrapperClass(_wrapper_abs); @@ -118,6 +125,9 @@ PyWrapper::~PyWrapper() { _wrapperFini.release(); _wrapperOnceExec.release(); _wrapperTest.release(); + _wrapperCreate.release(); + _wrapperWrite.release(); + _wrapperRead.release(); pybind11::gil_scoped_release release; } @@ -134,6 +144,11 @@ void PyWrapper::ReloadWrapper() { _wrapperOnceExec = _obj.attr("wrapperOnceExec"); _wrapperError = _obj.attr("wrapperError"); _wrapperTest = _obj.attr("wrapperTestFunc"); + // stream support + + _wrapperCreate = _obj.attr("wrapperCreate"); + _wrapperWrite = _obj.attr("wrapperWrite"); + _wrapperRead = _obj.attr("wrapperRead"); pybind11::gil_scoped_release release; } @@ -175,11 +190,11 @@ int PyWrapper::wrapperInit(std::map config) { return ret; } catch (py::cast_error &e) { - spdlog::error("_wrapperInit cast error: {}", e.what()); + spdlog::get("stderr_console")->error("_wrapperInit cast error: {}", e.what()); return -1; } catch (py::error_already_set &e) { - spdlog::error("_wrapperInit error_already_set error: {}", e.what()); + spdlog::get("stderr_console")->error("_wrapperInit error_already_set error: {}", e.what()); return -1; } } @@ -190,11 +205,11 @@ int PyWrapper::wrapperFini() { return _wrapperFini().cast(); } catch (py::cast_error &e) { - spdlog::error("Fini cast error: {}", e.what()); + spdlog::get("stderr_console")->error("Fini cast error: {}", e.what()); return -1; } catch (py::error_already_set &e) { - spdlog::error("Fini error_already_set error: {}", e.what()); + spdlog::get("stderr_console")->error("Fini error_already_set error: {}", e.what()); return -1; } } @@ -213,13 +228,13 @@ int PyWrapper::wrapperOnceExec(std::map params, DataL pDataList curPtr; // 先判断python有没有抛出错误. response中的 errorCode if (resp->errCode != 0) { - spdlog::error("find error from python: {}", resp->errCode); + spdlog::get("stderr_console")->error("find error from python: {}", resp->errCode); return resp->errCode; } int dataSize = resp->list.size(); if (dataSize == 0) { - spdlog::error("error, not find any data from resp"); + spdlog::get("stderr_console")->error("error, not find any data from resp"); return -1; } for (int idx = 0; idx < dataSize; idx++) { @@ -236,7 +251,7 @@ int PyWrapper::wrapperOnceExec(std::map params, DataL pr = malloc(itemData.len); if (pr == nullptr) { int ret = -1; - spdlog::error("can't malloc memory for data, sid:{}", sid); + spdlog::get("stderr_console")->error("can't malloc memory for data, sid:{}", sid); return ret; } memcpy(pr, itemData.data.data(), itemData.len); @@ -258,11 +273,11 @@ int PyWrapper::wrapperOnceExec(std::map params, DataL *respData = headPtr; } catch (py::cast_error &e) { - spdlog::error("cast error: {}", e.what()); + spdlog::get("stderr_console")->error("cast error: {}", e.what()); return -1; } catch (py::error_already_set &e) { - spdlog::error("error_already_set error: {}", e.what()); + spdlog::get("stderr_console")->error("error_already_set error: {}", e.what()); return -1; } @@ -288,6 +303,27 @@ std::string PyWrapper::wrapperError(int err) { } +std::string +PyWrapper::wrapperCreate(const char *usrTag, std::map params, int *errNum, std::string sid) { + try { + py::gil_scoped_acquire acquire; + + std::string handle = _wrapperCreate(params, errNum, sid).cast(); + if (*errNum != 0) { + spdlog::get("stderr_console")->error("errNum: {}", errNum); + } + + } + catch (py::cast_error &e) { + spdlog::error("cast error: {}", e.what()); + return e.what(); + } + catch (py::error_already_set &e) { + spdlog::error("error_already_set error: {}", e.what()); + return e.what(); + } + +} int PyWrapper::wrapperTest() { py::gil_scoped_acquire acquire; @@ -327,3 +363,24 @@ int PyWrapper::wrapperTest() { } + +void SetHandleSid(char *handle, std::string sid) { + RECORD_MUTEX.lock(); + SID_RECORD[std::string(handle)] = sid; + RECORD_MUTEX.unlock(); +} + +std::string GetHandleSid(char *handle) { + std::string rlt; + RECORD_MUTEX.lock(); + rlt = SID_RECORD[std::string(handle)]; + RECORD_MUTEX.unlock(); + return rlt; +} + +//暂时没用上 +void DelHandleSid(char *handle) { + RECORD_MUTEX.lock(); + + RECORD_MUTEX.unlock(); +} diff --git a/pyWrapper.h b/pyWrapper.h index 422a225c..efd225ad 100644 --- a/pyWrapper.h +++ b/pyWrapper.h @@ -90,6 +90,8 @@ class PyWrapper { int wrapperFini(); + std::string wrapperCreate(const char *usrTag, std::map params, int *errNum, std::string sid); + int wrapperTest(); private: @@ -100,9 +102,17 @@ class PyWrapper { py::object _wrapperFini; py::object _wrapperOnceExec; py::object _wrapperError; + py::object _wrapperCreate; + py::object _wrapperWrite; + py::object _wrapperRead; py::object _wrapperTest; }; +void SetHandleSid(char *handle, std::string sid); + +std::string GetHandleSid(char *handle); + +void DelHandleSid(char *handle); #endif diff --git a/wrapper.cpp b/wrapper.cpp index 50c06e5e..338ef3b6 100644 --- a/wrapper.cpp +++ b/wrapper.cpp @@ -29,10 +29,8 @@ void so_deinit(void) { printf("libwrapper so deinit.\n"); } -static void init_threads() -{ - if (!PyEval_ThreadsInitialized()) - { +static void init_threads() { + if (!PyEval_ThreadsInitialized()) { { py::gil_scoped_acquire acquire; } @@ -56,6 +54,9 @@ void initlog() { printf("log目录已创建\n"); } auto file_logger = spdlog::rotating_logger_mt("mspper", wrapperLogFile, 1048576 * 10, 50); + // Console logger + auto console_logger = spdlog::stdout_color_mt("stdout_console"); + auto err_logger = spdlog::stderr_color_mt("stderr_console"); spdlog::set_default_logger(file_logger); spdlog::flush_on(spdlog::level::err); spdlog::flush_every(std::chrono::seconds(5)); @@ -129,7 +130,38 @@ int WrapperAPI wrapperUnloadRes(unsigned int resId) { const void * WrapperAPI wrapperCreate(const char *usrTag, pParamList params, wrapperCallback cb, unsigned int psrIds[], int psrCnt, int *errNum) { - return NULL; + std::string sid = ""; + for (pParamList sidP = params; sidP != NULL; sidP = sidP->next) { + if (NULL == sidP->key) { + continue; + } + if (std::string("sid") == sidP->key) { + sid = sidP->value; + break; + } + } + spdlog::debug("Crate Session tid is:{},sid:{}", gettid(), sid); + //构建请求参数 + std::map pyParams; + + for (pParamList p = params; p != NULL; p = p->next) { + if (NULL == p->key) { + continue; + } + pyParams.insert({p->key, p->value}); + spdlog::debug("wrapper exec param, key:{},value:{},sid:{}", p->key, p->value, sid); + } + + std::string handle = pyWrapper->wrapperCreate(usrTag, pyParams, errNum, sid); + void *handlePtr = handle.c_str(); + if (*errNum != 0) { + spdlog::debug("wrapper exec Error, errNum:{}, sid:{}", *errNum, sid); + + return NULL; + } + SetHandleSid(handle, sid); + return static_cast(handlePtr); + } int WrapperAPI wrapperWrite(const void *handle, pDataList reqData) {