Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed reserved GET/SET rpc method #1629

Merged
merged 1 commit into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 67 additions & 29 deletions thingsboard_gateway/connectors/ftp/ftp_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,45 +316,83 @@ def server_side_rpc_handler(self, content):

rpc_method = content['data']['method']

# check if RPC type is connector RPC (can be only 'get' or 'set')
# check if RPC type is connector RPC
try:
(connector_type, rpc_method_name) = rpc_method.split('_')
if connector_type == self._connector_type:
rpc_method = rpc_method_name
value_expression = content['data']['params']['valueExpression']
converted_data, success_sent = self.__process_rpc(rpc_method_name, value_expression)
self.__send_rpc_reply({}, content, converted_data, success_sent)
return
except ValueError:
pass

# check if RPC method is reserved get/set
if rpc_method == 'get' or rpc_method == 'set':
params = {}
for param in content['data']['params'].split(';'):
try:
(key, value) = param.split('=')
except ValueError:
continue

if key and value:
params[key] = value

rpc_method = 'write' if rpc_method == 'set' else 'read'

if rpc_method == 'read':
value_expression = params.get('filePath')
else:
value_expression = params.get('filePath') + ';' + params.get('value')

converted_data, success_sent = self.__process_rpc(rpc_method, value_expression)
self.__send_rpc_reply({}, content, converted_data, success_sent)
return

for rpc_request in self.__rpc_requests:
if fullmatch(rpc_request['methodFilter'], rpc_method):
with self.__ftp() as ftp:
if not self._connected or not ftp.sock:
self.__connect(ftp)

converted_data = None
success_sent = None
if content['data']['method'] == 'write':
try:
arr = re.sub("'", '', content['data']['params']['valueExpression']).split(';')
io_stream = self._get_io_stream(arr[1])
ftp.storbinary('STOR ' + arr[0], io_stream)
io_stream.close()
success_sent = True
except Exception as e:
self.__log.error(e)
converted_data = '{"error": "' + str(e) + '"}'
else:
handle_stream = io.BytesIO()
ftp.retrbinary('RETR ' + content['data']['params']['valueExpression'], handle_stream.write)
converted_data = str(handle_stream.getvalue(), 'UTF-8')
handle_stream.close()

if content.get('device') and fullmatch(rpc_request['deviceNameFilter'], content.get('device')):
self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"],
success_sent=success_sent, content=converted_data)
else:
return converted_data
value_expression = content['data']['params']['valueExpression']
converted_data, success_sent = self.__process_rpc(rpc_method, value_expression)

self.__send_rpc_reply(rpc_request, content, converted_data, success_sent)
except Exception as e:
self.__log.exception(e)

def __process_rpc(self, method, value_expression):
with self.__ftp() as ftp:
if not self._connected or not ftp.sock:
self.__connect(ftp)

converted_data = None
success_sent = None
if method == 'write':
try:
arr = re.sub("'", '', value_expression).split(';')
io_stream = self._get_io_stream(arr[1])
ftp.storbinary('STOR ' + arr[0], io_stream)
io_stream.close()
success_sent = True
except Exception as e:
self.__log.error(e)
converted_data = '{"error": "' + str(e) + '"}'
else:
handle_stream = io.BytesIO()
ftp.retrbinary('RETR ' + value_expression, handle_stream.write)
converted_data = str(handle_stream.getvalue(), 'UTF-8')
handle_stream.close()

return converted_data, success_sent

def __send_rpc_reply(self, rpc_request, content, converted_data, success_sent):
if content.get('device') and fullmatch(rpc_request.get('deviceNameFilter', ''), content.get('device')):
self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"],
success_sent=success_sent, content=converted_data)
elif content.get('device'):
self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"],
success_sent=success_sent, content=converted_data)
else:
return converted_data

def get_config(self):
return self.__config
2 changes: 1 addition & 1 deletion thingsboard_gateway/connectors/ftp/ftp_uplink_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def _get_key_or_value(key, arr):
first_val_index = split_val_arr[0] or 0
last_val_index = split_val_arr[1] or len(arr)

return arr[int(first_val_index):int(last_val_index)][0]
return arr[0][int(first_val_index):int(last_val_index)]
else:
return key

Expand Down
19 changes: 11 additions & 8 deletions thingsboard_gateway/connectors/ftp/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,17 @@ def __get_files(self, ftp, paths, file_name, file_ext):
folder_and_files = ftp.nlst()

for ff in folder_and_files:
cur_file_name, cur_file_ext = ff.split('.')
if cur_file_ext in COMPATIBLE_FILE_EXTENSIONS and self.__is_file(ftp, ff) and ftp.size(ff):
if (file_name == file_ext == '*') \
or pattern.fullmatch(cur_file_name) \
or (cur_file_ext == file_ext and file_name == cur_file_name) \
or (file_name != '*' and cur_file_name == file_name and (
file_ext == cur_file_ext or file_ext == '*')):
kwargs[ftp.voidcmd(f"MDTM {ff}")] = (item + '/' + ff)
try:
cur_file_name, cur_file_ext = ff.split('.')
if cur_file_ext in COMPATIBLE_FILE_EXTENSIONS and self.__is_file(ftp, ff) and ftp.size(ff):
if (file_name == file_ext == '*') \
or pattern.fullmatch(cur_file_name) \
or (cur_file_ext == file_ext and file_name == cur_file_name) \
or (file_name != '*' and cur_file_name == file_name and (
file_ext == cur_file_ext or file_ext == '*')):
kwargs[ftp.voidcmd(f"MDTM {ff}")] = (item + '/' + ff)
except ValueError:
continue

if self._with_sorting_files:
return [File(path_to_file=val, read_mode=self.__read_mode, max_size=self.__max_size) for (_, val) in
Expand Down
12 changes: 8 additions & 4 deletions thingsboard_gateway/connectors/rest/rest_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ def load_endpoints(self):

# configuring Attribute Request endpoints
if len(self.__config.get('attributeRequests', [])):
while self.__gateway.tb_client is None and not hasattr(self.__gateway.tb_client, 'client'):
self.__log.info('Waiting for ThingsBoard client to be initialized...')
sleep(1)

self.__attribute_type = {
'client': self.__gateway.tb_client.client.gw_request_client_attributes,
'shared': self.__gateway.tb_client.client.gw_request_shared_attributes
Expand Down Expand Up @@ -288,8 +292,8 @@ def server_side_rpc_handler(self, content):
params[key] = value

uplink_converter = self._default_uplink_converter
downlink_converter = self._default_downlink_converter
converted_data = downlink_converter.convert(params, content)
downlink_converter = self._default_downlink_converter(params, self.__log)
converted_data = downlink_converter.convert(config=params, data=content)

request_dict = {'config': {**params, **converted_data}, 'request': regular_request,
'converter': uplink_converter}
Expand All @@ -303,7 +307,7 @@ def server_side_rpc_handler(self, content):
for rpc_request in self.__rpc_requests:
if fullmatch(rpc_request["deviceNameFilter"], content["device"]) and \
fullmatch(rpc_request["methodFilter"], rpc_method):
converted_data = rpc_request["downlink_converter"].convert(rpc_request, content)
converted_data = rpc_request["downlink_converter"].convert(config=rpc_request, data=content)

request_dict = {"config": {**rpc_request,
**converted_data},
Expand Down Expand Up @@ -373,7 +377,7 @@ def __send_request(self, request_dict, converter_queue, logger, with_queue=True)
logger.debug(url)
security = None

if request_dict["config"]["security"]["type"].lower() == "basic":
if request_dict["config"].get('security', {}).get('type', 'anonymous').lower() == "basic":
security = HTTPBasicAuthRequest(request_dict["config"]["security"]["username"],
request_dict["config"]["security"]["password"])

Expand Down
8 changes: 4 additions & 4 deletions thingsboard_gateway/connectors/socket/socket_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ def get_id(self):
@CustomCollectStatistics(start_stat_type='allBytesSentToDevices')
def __write_value_via_tcp(self, address, port, value):
try:
self.__connections[(address, int(port))].sendall(value)
self.__connections[(address, int(port))].sendall(bytes(value, encoding='utf-8'))
return 'ok'
except KeyError:
try:
Expand Down Expand Up @@ -452,13 +452,13 @@ def server_side_rpc_handler(self, content):
else:
self.__write_value_via_udp(params['address'], int(params['port']), params['value'])
except KeyError:
self.__gateway.send_rpc_reply(device=device, req_id=content['data'].get('id'),
self.__gateway.send_rpc_reply(device=device['deviceName'], req_id=content['data'].get('id'),
content='Not enough params')
except ValueError:
self.__gateway.send_rpc_reply(device=device, req_id=content['data']['id'],
self.__gateway.send_rpc_reply(device=device['deviceName'], req_id=content['data']['id'],
content='Param "port" have to be int type')
else:
self.__gateway.send_rpc_reply(device=device, req_id=content['data'].get('id'), content=str(result))
self.__gateway.send_rpc_reply(device=device['deviceName'], req_id=content['data'].get('id'), content=str(result))
else:
for rpc_config in device['serverSideRpc']:
for (key, value) in content['data'].items():
Expand Down
2 changes: 1 addition & 1 deletion thingsboard_gateway/connectors/socket/socket_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ def inner(*args, **kwargs):
except ValueError:
pass

func(*args, **kwargs)
return func(*args, **kwargs)

return inner
Loading