From 45b1cd9e04d16fabc8eb3a40f1601161f9e8951e Mon Sep 17 00:00:00 2001 From: Kai-Zhang Date: Thu, 10 Sep 2015 20:34:38 +0800 Subject: [PATCH 1/6] Add python 2.x sdk interface using ctypes --- Makefile | 15 +++- sdk/ins_sdk.py | 216 +++++++++++++++++++++++++++++++++++++++++++++ sdk/ins_wrapper.cc | 210 +++++++++++++++++++++++++++++++++++++++++++ ubuntu_build.sh | 2 +- 4 files changed, 441 insertions(+), 2 deletions(-) create mode 100755 sdk/ins_sdk.py create mode 100755 sdk/ins_wrapper.cc diff --git a/Makefile b/Makefile index 610e9db..2fe7bc8 100755 --- a/Makefile +++ b/Makefile @@ -28,6 +28,13 @@ LDFLAGS = -L$(PREFIX)/lib -L$(PROTOBUF_PATH)/lib \ -L$(LEVELDB_PATH)/lib -lleveldb \ -lz -lpthread +LDFLAGS_SO = -L$(PREFIX)/lib -L$(PROTOBUF_PATH)/lib \ + -L$(PBRPC_PATH)/lib -Wl,--whole-archive -lsofa-pbrpc -lprotobuf -Wl,--no-whole-archive \ + -L$(SNAPPY_PATH)/lib -lsnappy \ + -L$(GFLAGS_PATH)/lib -Wl,--whole-archive -lgflags -Wl,--no-whole-archive \ + -L$(LEVELDB_PATH)/lib -lleveldb \ + -lz -lpthread + CXXFLAGS += $(OPT) PROTO_FILE = $(wildcard proto/*.proto) @@ -61,6 +68,7 @@ TEST_OBJ = $(patsubst %.cc, %.o, $(TEST_SRC)) TESTS = test_binlog test_storage_manager test_user_manager BIN = ins ins_cli sample LIB = libins_sdk.a +PY_LIB = libins_py.so all: $(BIN) cp $(LIB) @@ -90,7 +98,7 @@ $(LIB): $(SDK_OBJ) $(PROTOC) --proto_path=./proto/ --proto_path=/usr/local/include --cpp_out=./proto/ $< clean: - rm -rf $(BIN) $(LIB) $(TESTS) + rm -rf $(BIN) $(LIB) $(TESTS) $(PY_LIB) rm -rf $(INS_OBJ) $(INS_CLI_OBJ) $(SAMPLE_OBJ) $(SDK_OBJ) $(TEST_OBJ) $(UTIL_OBJ) rm -rf $(PROTO_SRC) $(PROTO_HEADER) rm -rf output/ @@ -110,6 +118,11 @@ sdk: $(LIB) mkdir -p output/lib cp sdk/ins_sdk.h output/include cp libins_sdk.a output/lib + +python: $(SDK_OBJ) sdk/ins_wrapper.o + $(CXX) -shared -fPIC -Wl,-soname,$(PY_LIB) -o $(PY_LIB) $(LDFLAGS_SO) $^ + mkdir -p output/python + cp $(PY_LIB) sdk/ins_sdk.py output/python install: $(LIB) cp sdk/ins_sdk.h $(PREFIX)/include diff --git a/sdk/ins_sdk.py b/sdk/ins_sdk.py new file mode 100755 index 0000000..2488e5b --- /dev/null +++ b/sdk/ins_sdk.py @@ -0,0 +1,216 @@ +#!/usr/bin/python + +from ctypes import CDLL, byref, CFUNCTYPE, POINTER, Structure +from ctypes import c_void_p, c_char_p, c_long, c_int, c_bool, py_object +from collections import Iterable +import threading +import time + +SDKError = ('OK', 'ClusterDown', 'NoSuchKey', 'Timeout', 'LockFail', + 'CleanBinlogFail', 'UserExists', 'PermissionDenied', 'PasswordError', + 'UnknownUser') +NodeStatus = ('Leader', 'Candidate', 'Follower', 'Offline') +ClusterInfo = ('server_id', 'status', 'term', 'last_log_index', 'last_log_term', + 'commit_index', 'last_applied') + +class InsSDK: + class _ClusterNodeInfo(Structure): + _fields_ = [('server_id', c_char_p), + ('status', c_int), + ('term', c_long), + ('last_log_index', c_long), + ('last_log_term', c_long), + ('commit_index', c_long), + ('last_applied', c_long)] + class _WatchParam(Structure): + _fields_ = [('key', c_char_p), + ('value', c_char_p), + ('deleted', c_bool), + ('context', c_void_p)] + + def __init__(self, members): + if isinstance(members, basestring): + self.sdk = _ins.GetSDK(members) + else: + self.sdk = _ins.GetSDKFromArray(members) + self.local = threading.local() + self.local.errno = c_int() + + def __del__(self): + if _ins != None: + _ins.DeleteSDK(self.sdk) + + def error(self): + return SDKError[self.local.errno] + + def show(self): + count = c_int() + cluster_ptr = _ins.SDKShowCluster(self.sdk, byref(count)) + count = count.value + ClusterArray = self._ClusterNodeInfo * count + clusters = ClusterArray.from_address(cluster_ptr) + cluster_list = [] + for i in xrange(count): + cluster_list.append({ + 'server_id' : str(clusters[i].server_id), + 'status' : NodeStatus[clusters[i].status], + 'term' : clusters[i].term, + 'last_log_index' : clusters[i].last_log_index, + 'last_log_term' : clusters[i].last_log_term, + 'commit_index' : clusters[i].commit_index, + 'last_applied' : clusters[i].last_applied + }) + _ins.DeleteClusterArray(cluster_ptr) + return cluster_list + + def get(self, key): + return _ins.SDKGet(self.sdk, key, byref(self.local.errno)) + + def put(self, key, value): + return _ins.SDKPut(self.sdk, key, value, byref(self.local.errno)) + + def delete(self, key): + return _ins.SDKDelete(self.sdk, key, byref(self.local.errno)) + + def scan(self, start_key, end_key): + self.local.errno = c_int(0) + return ScanResult(_ins.SDKScan(self.sdk, start_key, end_key)) + + # TODO unable to transfer the function pointer + def watch(self, key, callback, context): + WatchCallback = CFUNCTYPE(self._WatchParam, c_int) + ctx = py_object(context) + return _ins.SDKWatch(self.sdk, key, WatchCallback(callback), ctx, byref(self.local.errno)) + + def lock(self, key): + return _ins.SDKLock(self.sdk, key, byref(self.local.errno)) + + def trylock(self, key): + return _ins.SDKTryLock(self.sdk, key, byref(self.local.errno)) + + def unlock(self, key): + return _ins.SDKUnLock(self.sdk, key, byref(self.local.errno)) + + def login(self, username, password): + return _ins.SDKLogin(self.sdk, username, password, byref(self.local.errno)) + + def logout(self): + return _ins.SDKLogout(self.sdk, byref(self.local.errno)) + + def register(self, username, password): + return _ins.SDKRegister(self.sdk, username, password, byref(self.local.errno)) + + def get_session_id(self): + return _ins.SDKGetSessionID(self.sdk) + + def get_current_user_id(self): + return _ins.SDKGetCurrentUserID(self.sdk) + + def is_logged_in(self): + return _ins.SDKIsLoggedIn(self.sdk) + + def register_session_timeout(self, callback, context): + SessionTimeoutCallback = CFUNCTYPE(c_void_p) + ctx = py_object(context) + _ins.SDKRegisterSessionTimeout(self.sdk, SessionTimeoutCallback(callback), byref(ctx)) + +# <-- InsSDK class definition ends here + +class ScanResult: + def __init__(self, res_ptr): + self.scanner = res_ptr + + def __del__(self): + if _ins != None: + _ins.DeleteScanResult(self.scanner) + + def __iter__(self): + return self + + def done(self): + return _ins.ScanResultDone(self.scanner) + + def error(self): + return SDKError[_ins.ScanResultError(self.scanner)] + + def key(self): + return _ins.ScanResultKey(self.scanner) + + def value(self): + return _ins.ScanResultValue(self.scanner) + + def pair(self): + return self.key(), self.value() + + def next(self): + if self.done(): + raise StopIteration() + else: + key, value = self.pair() + _ins.ScanResultNext(self.scanner) + return key, value + +# <-- ScanResult class definition ends here + +_ins = CDLL('./libins_py.so') +def _set_function_sign(): + _ins.SDKShowCluster.argtypes = [c_void_p, POINTER(c_int)] + _ins.SDKShowCluster.restype = c_void_p + _ins.SDKGet.argtypes = [c_void_p, c_char_p, POINTER(c_int)] + _ins.SDKGet.restype = c_char_p + _ins.SDKPut.argtypes = [c_void_p, c_char_p, c_char_p, POINTER(c_int)] + _ins.SDKPut.restype = c_bool + _ins.SDKDelete.argtypes = [c_void_p, c_char_p, POINTER(c_int)] + _ins.SDKDelete.restype = c_bool + _ins.SDKScan.argtypes = [c_void_p, c_char_p, c_char_p] + _ins.SDKScan.restype = c_void_p + _ins.SDKWatch.argtypes = [c_void_p, c_char_p, c_char_p, c_void_p] + _ins.SDKWatch.restype = c_bool + _ins.SDKLock.argtypes = [c_void_p, c_char_p, POINTER(c_int)] + _ins.SDKLock.restype = c_bool + _ins.SDKTryLock.argtypes = [c_void_p, c_char_p, POINTER(c_int)] + _ins.SDKTryLock.restype = c_bool + _ins.SDKUnLock.argtypes = [c_void_p, c_char_p, POINTER(c_int)] + _ins.SDKUnLock.restype = c_bool + _ins.SDKLogin.argtypes = [c_void_p, c_char_p, c_char_p, POINTER(c_int)] + _ins.SDKLogin.restype = c_bool + _ins.SDKLogout.argtypes = [c_void_p, POINTER(c_int)] + _ins.SDKLogout.restype = c_bool + _ins.SDKRegister.argtypes = [c_void_p, c_char_p, c_char_p, POINTER(c_int)] + _ins.SDKRegister.restype = c_bool + _ins.SDKGetSessionID.argtypes = [c_void_p] + _ins.SDKGetSessionID.restype = c_char_p + _ins.SDKGetCurrentUserID.argtypes = [c_void_p] + _ins.SDKGetCurrentUserID.restype = c_char_p + _ins.SDKIsLoggedIn.argtypes = [c_void_p] + _ins.SDKIsLoggedIn.restype = c_bool + _ins.SDKRegisterSessionTimeout.argtypes = [c_void_p, CFUNCTYPE(c_void_p), c_void_p] + _ins.ScanResultDone.argtypes = [c_void_p] + _ins.ScanResultDone.restype = c_bool + _ins.ScanResultError.argtypes = [c_void_p] + _ins.ScanResultError.restype = c_int + _ins.ScanResultKey.argtypes = [c_void_p] + _ins.ScanResultKey.restype = c_char_p + _ins.ScanResultValue.argtypes = [c_void_p] + _ins.ScanResultValue.restype = c_char_p +_set_function_sign() + +g_done = False +def default_callback(error): + print 'status: %s' % SDKError[error] + #print 'key: %s' % param.key + #print 'value: %s' % param.value + #print 'deleted: %s' % param.deleted + #print 'context: %s' % param.context + g_done = True + +if __name__ == '__main__': + FLAG_members = 'john-ubuntu:8868,john-ubuntu:8869,john-ubuntu:8870,john-ubuntu:8871,john-ubuntu:8872' + result = sdk.scan('','') + while not result.done(): + print result.key(), result.value() + result.next() + sdk.watch('test', default_callback, 'context') + while not g_done: + time.sleep(1) + diff --git a/sdk/ins_wrapper.cc b/sdk/ins_wrapper.cc new file mode 100755 index 0000000..34b9cc0 --- /dev/null +++ b/sdk/ins_wrapper.cc @@ -0,0 +1,210 @@ +#include "ins_sdk.h" + +#include +#include + +using namespace galaxy::ins::sdk; + +extern "C" { + +typedef void (*SessionTimeoutCallback)(void*); + +// ----- InsSDK Wrappers ----- +InsSDK* GetSDK(const char* server_list) { + return new InsSDK(server_list); +} + +InsSDK* GetSDKFromArray(int count, const char* members[]) { + std::vector member_vec; + for (int i = 0; i < count; ++i) { + member_vec.push_back(members[i]); + } + return new InsSDK(member_vec); +} + +void DeleteSDK(InsSDK* sdk) { + if (sdk != NULL) { + delete sdk; + } +} + +void DeleteClusterArray(ClusterNodeInfo* pointer) { + if (pointer != NULL) { + delete[] pointer; + } +} + +ClusterNodeInfo* SDKShowCluster(InsSDK* sdk, int* count) { + if (sdk == NULL) { + return NULL; + } + std::vector cluster_info; + sdk->ShowCluster(&cluster_info); + *count = cluster_info.size(); + ClusterNodeInfo* cluster_arr = new ClusterNodeInfo[*count]; + std::copy(cluster_info.begin(), cluster_info.end(), cluster_arr); + return cluster_arr; +} + +bool SDKPut(InsSDK* sdk, const char* key, const char* value, SDKError* error) { + if (sdk == NULL) { + *error = kPermissionDenied; + return false; + } + return sdk->Put(key, value, error); +} + +const char* SDKGet(InsSDK* sdk, const char* key, SDKError* error) { + if (sdk == NULL) { + *error = kPermissionDenied; + return ""; + } + std::string value; + sdk->Get(key, &value, error); + return value.c_str(); +} + +bool SDKDelete(InsSDK* sdk, const char* key, SDKError* error) { + if (sdk == NULL) { + *error = kPermissionDenied; + return false; + } + return sdk->Delete(key, error); +} + +ScanResult* SDKScan(InsSDK* sdk, const char* start_key, const char* end_key) { + if (sdk == NULL) { + return NULL; + } + return sdk->Scan(start_key, end_key); +} + +bool SDKWatch(InsSDK* sdk, const char* key, WatchCallback user_callback, + void* context, SDKError* error) { + if (sdk == NULL) { + return false; + } + return sdk->Watch(key, user_callback, context, error); +} + +bool SDKLock(InsSDK* sdk, const char* key, SDKError* error) { + if (sdk == NULL) { + *error = kPermissionDenied; + return false; + } + return sdk->Lock(key, error); +} + +bool SDKTryLock(InsSDK* sdk, const char* key, SDKError* error) { + if (sdk == NULL) { + *error = kPermissionDenied; + return false; + } + return sdk->TryLock(key, error); +} + +bool SDKUnLock(InsSDK* sdk, const char* key, SDKError* error) { + if (sdk == NULL) { + *error = kPermissionDenied; + return false; + } + return sdk->UnLock(key, error); +} + +bool SDKLogin(InsSDK* sdk, const char* username, const char* password, SDKError* error) { + if (sdk == NULL) { + *error = kPermissionDenied; + return false; + } + return sdk->Login(username, password, error); +} + +bool SDKLogout(InsSDK* sdk, SDKError* error) { + if (sdk == NULL) { + *error = kPermissionDenied; + return false; + } + return sdk->Logout(error); +} + +bool SDKRegister(InsSDK* sdk, const char* username, const char* password, SDKError* error) { + if (sdk == NULL) { + *error = kPermissionDenied; + return false; + } + return sdk->Register(username, password, error); +} + +const char* SDKGetSessionID(InsSDK* sdk) { + if (sdk == NULL) { + return ""; + } + return sdk->GetSessionID().c_str(); +} + +const char* SDKGetCurrentUserID(InsSDK* sdk) { + if (sdk == NULL) { + return ""; + } + return sdk->GetCurrentUserID().c_str(); +} + +bool SDKIsLoggedIn(InsSDK* sdk) { + if (sdk == NULL) { + return false; + } + return sdk->IsLoggedIn(); +} + +void SDKRegisterSessionTimeout(InsSDK* sdk, SessionTimeoutCallback handle_session_timeout, + void* ctx) { + if (sdk == NULL) { + return; + } + sdk->RegisterSessionTimeout(handle_session_timeout, ctx); +} + +// ----- ScanResult Wrappers ----- +void DeleteScanResult(ScanResult* result) { + if (result != NULL) { + delete result; + } +} + +bool ScanResultDone(ScanResult* result) { + if (result == NULL) { + return false; + } + return result->Done(); +} + +SDKError ScanResultError(ScanResult* result) { + if (result == NULL) { + return kPermissionDenied; + } + return result->Error(); +} + +const char* ScanResultKey(ScanResult* result) { + if (result == NULL) { + return ""; + } + return result->Key().c_str(); +} + +const char* ScanResultValue(ScanResult* result) { + if (result == NULL) { + return ""; + } + return result->Value().c_str(); +} + +void ScanResultNext(ScanResult* result) { + if (result == NULL) { + return; + } + result->Next(); +} + +} + diff --git a/ubuntu_build.sh b/ubuntu_build.sh index 58faed1..32c9321 100755 --- a/ubuntu_build.sh +++ b/ubuntu_build.sh @@ -16,7 +16,7 @@ cd - wget https://github.com/gflags/gflags/archive/v2.1.2.tar.gz tar xf v2.1.2.tar.gz -cd gflags-2.1.2 && cmake -DGFLAGS_NAMESPACE=google && make -j4 && sudo make install +cd gflags-2.1.2 && cmake -DGFLAGS_NAMESPACE=google -DCMAKE_CXX_FLAGS=-fPIC && make -j4 && sudo make install cd - make -j4 From 212200ee9464f317e7bc63eda246ab3390b95411 Mon Sep 17 00:00:00 2001 From: Kai-Zhang Date: Mon, 14 Sep 2015 15:35:15 +0800 Subject: [PATCH 2/6] Add watch and timeout handler to python sdk --- sdk/ins_sdk.py | 108 ++++++++++++++++++++++++++++----------------- sdk/ins_wrapper.cc | 48 ++++++++++++++++++-- 2 files changed, 112 insertions(+), 44 deletions(-) diff --git a/sdk/ins_sdk.py b/sdk/ins_sdk.py index 2488e5b..5c6297e 100755 --- a/sdk/ins_sdk.py +++ b/sdk/ins_sdk.py @@ -1,6 +1,6 @@ #!/usr/bin/python -from ctypes import CDLL, byref, CFUNCTYPE, POINTER, Structure +from ctypes import CDLL, byref, CFUNCTYPE, POINTER, Structure, cast, addressof from ctypes import c_void_p, c_char_p, c_long, c_int, c_bool, py_object from collections import Iterable import threading @@ -13,6 +13,13 @@ ClusterInfo = ('server_id', 'status', 'term', 'last_log_index', 'last_log_term', 'commit_index', 'last_applied') +class WatchParam: + def __init__(self, key, value, deleted, context): + self.key = key + self.value = value + self.deleted = deleted + self.context = context + class InsSDK: class _ClusterNodeInfo(Structure): _fields_ = [('server_id', c_char_p), @@ -28,24 +35,27 @@ class _WatchParam(Structure): ('deleted', c_bool), ('context', c_void_p)] + WatchCallback = CFUNCTYPE(None, POINTER(_WatchParam), c_long, c_int) + SessionTimeoutCallback = CFUNCTYPE(None, c_long, c_void_p) + def __init__(self, members): if isinstance(members, basestring): - self.sdk = _ins.GetSDK(members) + self._sdk = _ins.GetSDK(members) else: - self.sdk = _ins.GetSDKFromArray(members) - self.local = threading.local() - self.local.errno = c_int() + self._sdk = _ins.GetSDKFromArray(members) + self._local = threading.local() + self._local.errno = c_int() def __del__(self): if _ins != None: - _ins.DeleteSDK(self.sdk) + _ins.DeleteSDK(self._sdk) def error(self): - return SDKError[self.local.errno] + return SDKError[self._local.errno] def show(self): count = c_int() - cluster_ptr = _ins.SDKShowCluster(self.sdk, byref(count)) + cluster_ptr = _ins.SDKShowCluster(self._sdk, byref(count)) count = count.value ClusterArray = self._ClusterNodeInfo * count clusters = ClusterArray.from_address(cluster_ptr) @@ -64,55 +74,73 @@ def show(self): return cluster_list def get(self, key): - return _ins.SDKGet(self.sdk, key, byref(self.local.errno)) + return _ins.SDKGet(self._sdk, key, byref(self._local.errno)) def put(self, key, value): - return _ins.SDKPut(self.sdk, key, value, byref(self.local.errno)) + return _ins.SDKPut(self._sdk, key, value, byref(self._local.errno)) def delete(self, key): - return _ins.SDKDelete(self.sdk, key, byref(self.local.errno)) + return _ins.SDKDelete(self._sdk, key, byref(self._local.errno)) def scan(self, start_key, end_key): - self.local.errno = c_int(0) - return ScanResult(_ins.SDKScan(self.sdk, start_key, end_key)) + self._local.errno = c_int(0) + return ScanResult(_ins.SDKScan(self._sdk, start_key, end_key)) - # TODO unable to transfer the function pointer def watch(self, key, callback, context): - WatchCallback = CFUNCTYPE(self._WatchParam, c_int) ctx = py_object(context) - return _ins.SDKWatch(self.sdk, key, WatchCallback(callback), ctx, byref(self.local.errno)) + self._contexts[addressof(ctx)] = ctx + self._callback[id(callback)] = callback + def _watch_wrapper(param, cb, error): + param = param.contents + context = cast(c_void_p(param.context), POINTER(py_object)).contents + pm = WatchParam(param.key, param.value, param.deleted, context.value) + InsSDK._callback[cb](pm, SDKError[error]) + del InsSDK._callback[cb] + del InsSDK._contexts[addressof(context)] + return _ins.SDKWatch(self._sdk, key, self.WatchCallback(_watch_wrapper), \ + id(callback), byref(ctx), byref(self._local.errno)) def lock(self, key): - return _ins.SDKLock(self.sdk, key, byref(self.local.errno)) + return _ins.SDKLock(self._sdk, key, byref(self._local.errno)) def trylock(self, key): - return _ins.SDKTryLock(self.sdk, key, byref(self.local.errno)) + return _ins.SDKTryLock(self._sdk, key, byref(self._local.errno)) def unlock(self, key): - return _ins.SDKUnLock(self.sdk, key, byref(self.local.errno)) + return _ins.SDKUnLock(self._sdk, key, byref(self._local.errno)) def login(self, username, password): - return _ins.SDKLogin(self.sdk, username, password, byref(self.local.errno)) + return _ins.SDKLogin(self._sdk, username, password, byref(self._local.errno)) def logout(self): - return _ins.SDKLogout(self.sdk, byref(self.local.errno)) + return _ins.SDKLogout(self._sdk, byref(self._local.errno)) def register(self, username, password): - return _ins.SDKRegister(self.sdk, username, password, byref(self.local.errno)) + return _ins.SDKRegister(self._sdk, username, password, byref(self._local.errno)) def get_session_id(self): - return _ins.SDKGetSessionID(self.sdk) + return _ins.SDKGetSessionID(self._sdk) def get_current_user_id(self): - return _ins.SDKGetCurrentUserID(self.sdk) + return _ins.SDKGetCurrentUserID(self._sdk) def is_logged_in(self): - return _ins.SDKIsLoggedIn(self.sdk) + return _ins.SDKIsLoggedIn(self._sdk) def register_session_timeout(self, callback, context): - SessionTimeoutCallback = CFUNCTYPE(c_void_p) ctx = py_object(context) - _ins.SDKRegisterSessionTimeout(self.sdk, SessionTimeoutCallback(callback), byref(ctx)) + self._contexts[addressof(context)] = context + self._callback[id(callback)] = callback + def _timeout_wrapper(cb, ctx): + context = cast(ctx, POINTER(py_object)).contents + InsSDK._callback[cb](context.value) + del InsSDK._callback[cb] + del InsSDK._contexts[addressof(context)] + _ins.SDKRegisterSessionTimeout(self._sdk, self.SessionTimeoutCallback(wrapper), \ + id(callback), byref(ctx)) + + _contexts = {} + _callback = {} # <-- InsSDK class definition ends here @@ -164,7 +192,8 @@ def _set_function_sign(): _ins.SDKDelete.restype = c_bool _ins.SDKScan.argtypes = [c_void_p, c_char_p, c_char_p] _ins.SDKScan.restype = c_void_p - _ins.SDKWatch.argtypes = [c_void_p, c_char_p, c_char_p, c_void_p] + _ins.SDKWatch.argtypes = [c_void_p, c_char_p, InsSDK.WatchCallback, \ + c_long, c_void_p, c_void_p] _ins.SDKWatch.restype = c_bool _ins.SDKLock.argtypes = [c_void_p, c_char_p, POINTER(c_int)] _ins.SDKLock.restype = c_bool @@ -184,7 +213,8 @@ def _set_function_sign(): _ins.SDKGetCurrentUserID.restype = c_char_p _ins.SDKIsLoggedIn.argtypes = [c_void_p] _ins.SDKIsLoggedIn.restype = c_bool - _ins.SDKRegisterSessionTimeout.argtypes = [c_void_p, CFUNCTYPE(c_void_p), c_void_p] + _ins.SDKRegisterSessionTimeout.argtypes = [c_void_p, InsSDK.SessionTimeoutCallback, \ + c_long, c_void_p] _ins.ScanResultDone.argtypes = [c_void_p] _ins.ScanResultDone.restype = c_bool _ins.ScanResultError.argtypes = [c_void_p] @@ -196,21 +226,19 @@ def _set_function_sign(): _set_function_sign() g_done = False -def default_callback(error): - print 'status: %s' % SDKError[error] - #print 'key: %s' % param.key - #print 'value: %s' % param.value - #print 'deleted: %s' % param.deleted - #print 'context: %s' % param.context +def default_callback(param, error): + print 'status: %s' % error + print 'key: %s' % param.key + print 'value: %s' % param.value + print 'deleted: %s' % param.deleted + print 'context: %s' % repr(param.context) + global g_done g_done = True if __name__ == '__main__': FLAG_members = 'john-ubuntu:8868,john-ubuntu:8869,john-ubuntu:8870,john-ubuntu:8871,john-ubuntu:8872' - result = sdk.scan('','') - while not result.done(): - print result.key(), result.value() - result.next() - sdk.watch('test', default_callback, 'context') + sdk = InsSDK(FLAG_members) + sdk.watch('test', default_callback, WatchParam('key', 'value', True, 123)) while not g_done: time.sleep(1) diff --git a/sdk/ins_wrapper.cc b/sdk/ins_wrapper.cc index 34b9cc0..779d0a3 100755 --- a/sdk/ins_wrapper.cc +++ b/sdk/ins_wrapper.cc @@ -8,6 +8,34 @@ using namespace galaxy::ins::sdk; extern "C" { typedef void (*SessionTimeoutCallback)(void*); +typedef void (*CWatchCallback)(WatchParam*, long, SDKError); +typedef void (*CTimeoutCallback)(long, void*); + +struct CallbackPack { + void* callback_wrapper; + long callback_id; + void* ctx; +}; + +void WatchCallbackWrapper(const WatchParam& param, SDKError error) { + CallbackPack* pack = static_cast(param.context); + CWatchCallback cb = (CWatchCallback)pack->callback_wrapper; + long callback_id = pack->callback_id; + void* ctx = pack->ctx; + delete pack; + WatchParam p = param; + p.context = ctx; + cb(&p, callback_id, error); +} + +void TimeoutWrapper(void* ctx) { + CallbackPack* pack = static_cast(ctx); + CTimeoutCallback cb = (CTimeoutCallback)pack->callback_wrapper; + long callback_id = pack->callback_id; + void* context = pack->ctx; + delete pack; + cb(callback_id, context); +} // ----- InsSDK Wrappers ----- InsSDK* GetSDK(const char* server_list) { @@ -79,12 +107,18 @@ ScanResult* SDKScan(InsSDK* sdk, const char* start_key, const char* end_key) { return sdk->Scan(start_key, end_key); } -bool SDKWatch(InsSDK* sdk, const char* key, WatchCallback user_callback, - void* context, SDKError* error) { +// NOTE: This interface is customized for python sdk. +// For other purpose, please implement another watch interface +bool SDKWatch(InsSDK* sdk, const char* key, CWatchCallback wrapper, + long callback_id, void* context, SDKError* error) { if (sdk == NULL) { return false; } - return sdk->Watch(key, user_callback, context, error); + CallbackPack* pack = new CallbackPack; + pack->callback_wrapper = reinterpret_cast(wrapper); + pack->callback_id = callback_id; + pack->ctx = context; + return sdk->Watch(key, WatchCallbackWrapper, pack, error); } bool SDKLock(InsSDK* sdk, const char* key, SDKError* error) { @@ -156,11 +190,17 @@ bool SDKIsLoggedIn(InsSDK* sdk) { return sdk->IsLoggedIn(); } +// NOTE: This interface is customized for python sdk. +// For other purpose, please implement another interface void SDKRegisterSessionTimeout(InsSDK* sdk, SessionTimeoutCallback handle_session_timeout, - void* ctx) { + long callback_id, void* ctx) { if (sdk == NULL) { return; } + CallbackPack* pack = new CallbackPack(); + pack->callback_wrapper = reinterpret_cast(handle_session_timeout); + pack->callback_id = callback_id; + pack->ctx = ctx; sdk->RegisterSessionTimeout(handle_session_timeout, ctx); } From ab458ea3417361a97cdd7d97e8e41c9b119c0509 Mon Sep 17 00:00:00 2001 From: Kai-Zhang Date: Mon, 14 Sep 2015 15:58:39 +0800 Subject: [PATCH 3/6] Strip useless test code in python sdk --- sdk/ins_sdk.py | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/sdk/ins_sdk.py b/sdk/ins_sdk.py index 5c6297e..4372604 100755 --- a/sdk/ins_sdk.py +++ b/sdk/ins_sdk.py @@ -2,9 +2,7 @@ from ctypes import CDLL, byref, CFUNCTYPE, POINTER, Structure, cast, addressof from ctypes import c_void_p, c_char_p, c_long, c_int, c_bool, py_object -from collections import Iterable import threading -import time SDKError = ('OK', 'ClusterDown', 'NoSuchKey', 'Timeout', 'LockFail', 'CleanBinlogFail', 'UserExists', 'PermissionDenied', 'PasswordError', @@ -225,20 +223,3 @@ def _set_function_sign(): _ins.ScanResultValue.restype = c_char_p _set_function_sign() -g_done = False -def default_callback(param, error): - print 'status: %s' % error - print 'key: %s' % param.key - print 'value: %s' % param.value - print 'deleted: %s' % param.deleted - print 'context: %s' % repr(param.context) - global g_done - g_done = True - -if __name__ == '__main__': - FLAG_members = 'john-ubuntu:8868,john-ubuntu:8869,john-ubuntu:8870,john-ubuntu:8871,john-ubuntu:8872' - sdk = InsSDK(FLAG_members) - sdk.watch('test', default_callback, WatchParam('key', 'value', True, 123)) - while not g_done: - time.sleep(1) - From a8414bffeb93970b40fcce5c0f2861b23f0065df Mon Sep 17 00:00:00 2001 From: Kai-Zhang Date: Mon, 14 Sep 2015 16:06:17 +0800 Subject: [PATCH 4/6] Avoid mem leak in truncate database --- server/user_manage.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/user_manage.cc b/server/user_manage.cc index bc20311..12be87b 100755 --- a/server/user_manage.cc +++ b/server/user_manage.cc @@ -253,8 +253,10 @@ bool UserManager::TruncateDatabase() { batch.Delete(it->key().ToString()); } if (!it->status().ok()) { + delete it; return false; } + delete it; leveldb::Status status = user_db_->Write(leveldb::WriteOptions(), &batch); return status.ok(); } From 32e91b5ce23c225f33cfd6bfa2a9fe12afad9605 Mon Sep 17 00:00:00 2001 From: Kai-Zhang Date: Mon, 14 Sep 2015 19:33:40 +0800 Subject: [PATCH 5/6] Update COMAKE to build internal --- COMAKE | 4 ++++ sdk/ins_wrapper.cc | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/COMAKE b/COMAKE index 0de8c46..7b71ad2 100755 --- a/COMAKE +++ b/COMAKE @@ -40,6 +40,8 @@ ins_sources = 'server/ins_main.cc server/ins_node_impl.cc server/flags.cc server ins_sdk_sources = 'sdk/ins_sdk.cc common/logging.cc proto/ins_node.proto server/flags.cc' ins_sdk_headers = 'sdk/ins_sdk.h' +ins_python_sources = 'sdk/ins_sdk.cc sdk/ins_wrapper.cc common/logging.cc proto/ins_node.proto server/flags.cc' + ins_cli_sources = 'sdk/ins_sdk.cc proto/ins_node.proto common/logging.cc sdk/ins_cli.cc server/flags.cc' sample_sources = 'sdk/sample.cc' @@ -47,8 +49,10 @@ sample_sources = 'sdk/sample.cc' binlog_test_sources = 'storage/binlog.cc storage/binlog_test.cc common/logging.cc proto/ins_node.proto' Application('ins', Sources(ins_sources)) Application('ins_cli', Sources(ins_cli_sources)) +SharedLibrary('ins_py', Sources(ins_python_sources), LinkDeps(True)) SharedLibrary('ins_sdk', Sources(ins_sdk_sources), LinkDeps(True)) StaticLibrary('ins_sdk', Sources(ins_sdk_sources), HeaderFiles(ins_sdk_headers)) +TARGET('py_sdk_copy', ShellCommands('cp sdk/ins_sdk.py output/so/'), Prefixes('libins_py.so')) Application('binlog_test', Sources(binlog_test_sources)) Application('sample', Sources(sample_sources), Libraries('libins_sdk.a')) diff --git a/sdk/ins_wrapper.cc b/sdk/ins_wrapper.cc index 779d0a3..40d3ae2 100755 --- a/sdk/ins_wrapper.cc +++ b/sdk/ins_wrapper.cc @@ -115,7 +115,7 @@ bool SDKWatch(InsSDK* sdk, const char* key, CWatchCallback wrapper, return false; } CallbackPack* pack = new CallbackPack; - pack->callback_wrapper = reinterpret_cast(wrapper); + pack->callback_wrapper = (void*)(wrapper); pack->callback_id = callback_id; pack->ctx = context; return sdk->Watch(key, WatchCallbackWrapper, pack, error); @@ -198,7 +198,7 @@ void SDKRegisterSessionTimeout(InsSDK* sdk, SessionTimeoutCallback handle_sessio return; } CallbackPack* pack = new CallbackPack(); - pack->callback_wrapper = reinterpret_cast(handle_session_timeout); + pack->callback_wrapper = (void*)(handle_session_timeout); pack->callback_id = callback_id; pack->ctx = ctx; sdk->RegisterSessionTimeout(handle_session_timeout, ctx); From f4af5fa37bf12f93b201b432cbf9a853134fea6e Mon Sep 17 00:00:00 2001 From: Kai-Zhang Date: Mon, 14 Sep 2015 19:46:05 +0800 Subject: [PATCH 6/6] Use abstract function pointer instead of c-style casting --- sdk/ins_wrapper.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdk/ins_wrapper.cc b/sdk/ins_wrapper.cc index 40d3ae2..0e8ecfa 100755 --- a/sdk/ins_wrapper.cc +++ b/sdk/ins_wrapper.cc @@ -10,9 +10,10 @@ extern "C" { typedef void (*SessionTimeoutCallback)(void*); typedef void (*CWatchCallback)(WatchParam*, long, SDKError); typedef void (*CTimeoutCallback)(long, void*); +typedef void (*AbstractFunc)(); struct CallbackPack { - void* callback_wrapper; + AbstractFunc callback_wrapper; long callback_id; void* ctx; }; @@ -115,7 +116,7 @@ bool SDKWatch(InsSDK* sdk, const char* key, CWatchCallback wrapper, return false; } CallbackPack* pack = new CallbackPack; - pack->callback_wrapper = (void*)(wrapper); + pack->callback_wrapper = reinterpret_cast(wrapper); pack->callback_id = callback_id; pack->ctx = context; return sdk->Watch(key, WatchCallbackWrapper, pack, error); @@ -198,7 +199,7 @@ void SDKRegisterSessionTimeout(InsSDK* sdk, SessionTimeoutCallback handle_sessio return; } CallbackPack* pack = new CallbackPack(); - pack->callback_wrapper = (void*)(handle_session_timeout); + pack->callback_wrapper = reinterpret_cast(handle_session_timeout); pack->callback_id = callback_id; pack->ctx = ctx; sdk->RegisterSessionTimeout(handle_session_timeout, ctx);