diff --git a/thingsboard_gateway/connectors/ftp/ftp_connector.py b/thingsboard_gateway/connectors/ftp/ftp_connector.py index cb8720c2..51d4bdb4 100644 --- a/thingsboard_gateway/connectors/ftp/ftp_connector.py +++ b/thingsboard_gateway/connectors/ftp/ftp_connector.py @@ -316,45 +316,83 @@ def server_side_rpc_handler(self, content): rpc_method = content['data']['method'] - # check if RPC type is connector RPC (can be only 'get' or 'set') + # check if RPC type is connector RPC try: (connector_type, rpc_method_name) = rpc_method.split('_') if connector_type == self._connector_type: - rpc_method = rpc_method_name + value_expression = content['data']['params']['valueExpression'] + converted_data, success_sent = self.__process_rpc(rpc_method_name, value_expression) + self.__send_rpc_reply({}, content, converted_data, success_sent) + return except ValueError: pass + # check if RPC method is reserved get/set + if rpc_method == 'get' or rpc_method == 'set': + params = {} + for param in content['data']['params'].split(';'): + try: + (key, value) = param.split('=') + except ValueError: + continue + + if key and value: + params[key] = value + + rpc_method = 'write' if rpc_method == 'set' else 'read' + + if rpc_method == 'read': + value_expression = params.get('filePath') + else: + value_expression = params.get('filePath') + ';' + params.get('value') + + converted_data, success_sent = self.__process_rpc(rpc_method, value_expression) + self.__send_rpc_reply({}, content, converted_data, success_sent) + return + for rpc_request in self.__rpc_requests: if fullmatch(rpc_request['methodFilter'], rpc_method): - with self.__ftp() as ftp: - if not self._connected or not ftp.sock: - self.__connect(ftp) - - converted_data = None - success_sent = None - if content['data']['method'] == 'write': - try: - arr = re.sub("'", '', content['data']['params']['valueExpression']).split(';') - io_stream = self._get_io_stream(arr[1]) - ftp.storbinary('STOR ' + arr[0], io_stream) - io_stream.close() - success_sent = True - except Exception as e: - self.__log.error(e) - converted_data = '{"error": "' + str(e) + '"}' - else: - handle_stream = io.BytesIO() - ftp.retrbinary('RETR ' + content['data']['params']['valueExpression'], handle_stream.write) - converted_data = str(handle_stream.getvalue(), 'UTF-8') - handle_stream.close() - - if content.get('device') and fullmatch(rpc_request['deviceNameFilter'], content.get('device')): - self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"], - success_sent=success_sent, content=converted_data) - else: - return converted_data + value_expression = content['data']['params']['valueExpression'] + converted_data, success_sent = self.__process_rpc(rpc_method, value_expression) + + self.__send_rpc_reply(rpc_request, content, converted_data, success_sent) except Exception as e: self.__log.exception(e) + def __process_rpc(self, method, value_expression): + with self.__ftp() as ftp: + if not self._connected or not ftp.sock: + self.__connect(ftp) + + converted_data = None + success_sent = None + if method == 'write': + try: + arr = re.sub("'", '', value_expression).split(';') + io_stream = self._get_io_stream(arr[1]) + ftp.storbinary('STOR ' + arr[0], io_stream) + io_stream.close() + success_sent = True + except Exception as e: + self.__log.error(e) + converted_data = '{"error": "' + str(e) + '"}' + else: + handle_stream = io.BytesIO() + ftp.retrbinary('RETR ' + value_expression, handle_stream.write) + converted_data = str(handle_stream.getvalue(), 'UTF-8') + handle_stream.close() + + return converted_data, success_sent + + def __send_rpc_reply(self, rpc_request, content, converted_data, success_sent): + if content.get('device') and fullmatch(rpc_request.get('deviceNameFilter', ''), content.get('device')): + self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"], + success_sent=success_sent, content=converted_data) + elif content.get('device'): + self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"], + success_sent=success_sent, content=converted_data) + else: + return converted_data + def get_config(self): return self.__config diff --git a/thingsboard_gateway/connectors/ftp/ftp_uplink_converter.py b/thingsboard_gateway/connectors/ftp/ftp_uplink_converter.py index 6809bca3..ab7c922d 100644 --- a/thingsboard_gateway/connectors/ftp/ftp_uplink_converter.py +++ b/thingsboard_gateway/connectors/ftp/ftp_uplink_converter.py @@ -127,7 +127,7 @@ def _get_key_or_value(key, arr): first_val_index = split_val_arr[0] or 0 last_val_index = split_val_arr[1] or len(arr) - return arr[int(first_val_index):int(last_val_index)][0] + return arr[0][int(first_val_index):int(last_val_index)] else: return key diff --git a/thingsboard_gateway/connectors/ftp/path.py b/thingsboard_gateway/connectors/ftp/path.py index 213ac156..b7837252 100644 --- a/thingsboard_gateway/connectors/ftp/path.py +++ b/thingsboard_gateway/connectors/ftp/path.py @@ -60,14 +60,17 @@ def __get_files(self, ftp, paths, file_name, file_ext): folder_and_files = ftp.nlst() for ff in folder_and_files: - cur_file_name, cur_file_ext = ff.split('.') - if cur_file_ext in COMPATIBLE_FILE_EXTENSIONS and self.__is_file(ftp, ff) and ftp.size(ff): - if (file_name == file_ext == '*') \ - or pattern.fullmatch(cur_file_name) \ - or (cur_file_ext == file_ext and file_name == cur_file_name) \ - or (file_name != '*' and cur_file_name == file_name and ( - file_ext == cur_file_ext or file_ext == '*')): - kwargs[ftp.voidcmd(f"MDTM {ff}")] = (item + '/' + ff) + try: + cur_file_name, cur_file_ext = ff.split('.') + if cur_file_ext in COMPATIBLE_FILE_EXTENSIONS and self.__is_file(ftp, ff) and ftp.size(ff): + if (file_name == file_ext == '*') \ + or pattern.fullmatch(cur_file_name) \ + or (cur_file_ext == file_ext and file_name == cur_file_name) \ + or (file_name != '*' and cur_file_name == file_name and ( + file_ext == cur_file_ext or file_ext == '*')): + kwargs[ftp.voidcmd(f"MDTM {ff}")] = (item + '/' + ff) + except ValueError: + continue if self._with_sorting_files: return [File(path_to_file=val, read_mode=self.__read_mode, max_size=self.__max_size) for (_, val) in diff --git a/thingsboard_gateway/connectors/rest/rest_connector.py b/thingsboard_gateway/connectors/rest/rest_connector.py index bd245d69..9735de50 100644 --- a/thingsboard_gateway/connectors/rest/rest_connector.py +++ b/thingsboard_gateway/connectors/rest/rest_connector.py @@ -103,6 +103,10 @@ def load_endpoints(self): # configuring Attribute Request endpoints if len(self.__config.get('attributeRequests', [])): + while self.__gateway.tb_client is None and not hasattr(self.__gateway.tb_client, 'client'): + self.__log.info('Waiting for ThingsBoard client to be initialized...') + sleep(1) + self.__attribute_type = { 'client': self.__gateway.tb_client.client.gw_request_client_attributes, 'shared': self.__gateway.tb_client.client.gw_request_shared_attributes @@ -288,8 +292,8 @@ def server_side_rpc_handler(self, content): params[key] = value uplink_converter = self._default_uplink_converter - downlink_converter = self._default_downlink_converter - converted_data = downlink_converter.convert(params, content) + downlink_converter = self._default_downlink_converter(params, self.__log) + converted_data = downlink_converter.convert(config=params, data=content) request_dict = {'config': {**params, **converted_data}, 'request': regular_request, 'converter': uplink_converter} @@ -303,7 +307,7 @@ def server_side_rpc_handler(self, content): for rpc_request in self.__rpc_requests: if fullmatch(rpc_request["deviceNameFilter"], content["device"]) and \ fullmatch(rpc_request["methodFilter"], rpc_method): - converted_data = rpc_request["downlink_converter"].convert(rpc_request, content) + converted_data = rpc_request["downlink_converter"].convert(config=rpc_request, data=content) request_dict = {"config": {**rpc_request, **converted_data}, @@ -373,7 +377,7 @@ def __send_request(self, request_dict, converter_queue, logger, with_queue=True) logger.debug(url) security = None - if request_dict["config"]["security"]["type"].lower() == "basic": + if request_dict["config"].get('security', {}).get('type', 'anonymous').lower() == "basic": security = HTTPBasicAuthRequest(request_dict["config"]["security"]["username"], request_dict["config"]["security"]["password"]) diff --git a/thingsboard_gateway/connectors/socket/socket_connector.py b/thingsboard_gateway/connectors/socket/socket_connector.py index 0b1db57c..f4b45f54 100644 --- a/thingsboard_gateway/connectors/socket/socket_connector.py +++ b/thingsboard_gateway/connectors/socket/socket_connector.py @@ -379,7 +379,7 @@ def get_id(self): @CustomCollectStatistics(start_stat_type='allBytesSentToDevices') def __write_value_via_tcp(self, address, port, value): try: - self.__connections[(address, int(port))].sendall(value) + self.__connections[(address, int(port))].sendall(bytes(value, encoding='utf-8')) return 'ok' except KeyError: try: @@ -452,13 +452,13 @@ def server_side_rpc_handler(self, content): else: self.__write_value_via_udp(params['address'], int(params['port']), params['value']) except KeyError: - self.__gateway.send_rpc_reply(device=device, req_id=content['data'].get('id'), + self.__gateway.send_rpc_reply(device=device['deviceName'], req_id=content['data'].get('id'), content='Not enough params') except ValueError: - self.__gateway.send_rpc_reply(device=device, req_id=content['data']['id'], + self.__gateway.send_rpc_reply(device=device['deviceName'], req_id=content['data']['id'], content='Param "port" have to be int type') else: - self.__gateway.send_rpc_reply(device=device, req_id=content['data'].get('id'), content=str(result)) + self.__gateway.send_rpc_reply(device=device['deviceName'], req_id=content['data'].get('id'), content=str(result)) else: for rpc_config in device['serverSideRpc']: for (key, value) in content['data'].items(): diff --git a/thingsboard_gateway/connectors/socket/socket_decorators.py b/thingsboard_gateway/connectors/socket/socket_decorators.py index b7c5233a..94ebb1b7 100644 --- a/thingsboard_gateway/connectors/socket/socket_decorators.py +++ b/thingsboard_gateway/connectors/socket/socket_decorators.py @@ -10,6 +10,6 @@ def inner(*args, **kwargs): except ValueError: pass - func(*args, **kwargs) + return func(*args, **kwargs) return inner