Skip to content

Commit

Permalink
Merge pull request #1632 from samson0v/master
Browse files Browse the repository at this point in the history
Added reserved get/set rpc methods
  • Loading branch information
imbeacon authored Jan 6, 2025
2 parents ed4c2db + 1758e56 commit 8428c63
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 96 deletions.
24 changes: 16 additions & 8 deletions tests/blackbox/connectors/modbus/test_modbus_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ def test_input_registers_reading_rpc_little(self):
"params": rpc,
"timeout": 5000
})
self.assertEqual(result, expected_values[rpc_tag], f'Value is not equal for the next rpc: {rpc_tag}')
self.assertEqual(result, {'result': expected_values[rpc_tag]},
f'Value is not equal for the next rpc: {rpc_tag}')

def test_input_registers_reading_rpc_big(self):
(config, _) = self.change_connector_configuration(
Expand All @@ -150,7 +151,8 @@ def test_input_registers_reading_rpc_big(self):
"params": rpc,
"timeout": 5000
})
self.assertEqual(result, expected_values[rpc_tag], f'Value is not equal for the next rpc: {rpc_tag}')
self.assertEqual(result, {'result': expected_values[rpc_tag]},
f'Value is not equal for the next rpc: {rpc_tag}')

def test_holding_registers_reading_rpc_little(self):
(config, _) = self.change_connector_configuration(
Expand All @@ -167,7 +169,8 @@ def test_holding_registers_reading_rpc_little(self):
"params": rpc,
"timeout": 5000
})
self.assertEqual(result, expected_values[rpc_tag], f'Value is not equal for the next rpc: {rpc_tag}')
self.assertEqual(result, {'result': expected_values[rpc_tag]},
f'Value is not equal for the next rpc: {rpc_tag}')

def test_holding_registers_reading_rpc_big(self):
(config, _) = self.change_connector_configuration(
Expand All @@ -184,7 +187,8 @@ def test_holding_registers_reading_rpc_big(self):
"params": rpc,
"timeout": 5000
})
self.assertEqual(result, expected_values[rpc_tag], f'Value is not equal for the next rpc: {rpc_tag}')
self.assertEqual(result, {'result': expected_values[rpc_tag]},
f'Value is not equal for the next rpc: {rpc_tag}')

def test_coils_reading_rpc_little(self):
(config, _) = self.change_connector_configuration(
Expand All @@ -201,7 +205,8 @@ def test_coils_reading_rpc_little(self):
"params": rpc,
"timeout": 5000
})
self.assertEqual(result, expected_values[rpc_tag], f'Value is not equal for the next rpc: {rpc_tag}')
self.assertEqual(result, {'result': expected_values[rpc_tag]},
f'Value is not equal for the next rpc: {rpc_tag}')

def test_coils_reading_rpc_big(self):
(config, _) = self.change_connector_configuration(
Expand All @@ -218,7 +223,8 @@ def test_coils_reading_rpc_big(self):
"params": rpc,
"timeout": 5000
})
self.assertEqual(result, expected_values[rpc_tag], f'Value is not equal for the next rpc: {rpc_tag}')
self.assertEqual(result, {'result': expected_values[rpc_tag]},
f'Value is not equal for the next rpc: {rpc_tag}')

def test_discrete_inputs_reading_rpc_little(self):
(config, _) = self.change_connector_configuration(
Expand All @@ -235,7 +241,8 @@ def test_discrete_inputs_reading_rpc_little(self):
"params": rpc,
"timeout": 5000
})
self.assertEqual(result, expected_values[rpc_tag], f'Value is not equal for the next rpc: {rpc_tag}')
self.assertEqual(result, {'result': expected_values[rpc_tag]},
f'Value is not equal for the next rpc: {rpc_tag}')

def test_discrete_inputs_reading_rpc_big(self):
(config, _) = self.change_connector_configuration(
Expand All @@ -252,7 +259,8 @@ def test_discrete_inputs_reading_rpc_big(self):
"params": rpc,
"timeout": 5000
})
self.assertEqual(result, expected_values[rpc_tag], f'Value is not equal for the next rpc: {rpc_tag}')
self.assertEqual(result, {'result': expected_values[rpc_tag]},
f'Value is not equal for the next rpc: {rpc_tag}')


class ModbusRpcWritingTest(ModbusRpcTest):
Expand Down
66 changes: 47 additions & 19 deletions thingsboard_gateway/connectors/bacnet/bacnet_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,34 +323,62 @@ def server_side_rpc_handler(self, content):
self.__log.error('Method name not found in RPC request: %r', content)
return

# check if RPC method is reserved get/set
self.__check_and_process_reserved_rpc(rpc_method_name, device, content)

for rpc_config in device.server_side_rpc:
if rpc_config['method'] == rpc_method_name:
try:
object_id = Device.get_object_id(rpc_config)
result = {}
value = content.get('data', {}).get('params')
self.__create_task(self.__process_rpc_request,
(Address(device.details.address),
object_id,
rpc_config['propertyId']),
{'value': value, 'result': result})
self.__log.info('Processed RPC request with result: %r', result)
self.__gateway.send_rpc_reply(device=device.device_info.device_name,
req_id=content['data'].get('id'),
content=str(result))
except Exception as e:
self.__log.error('Error processing RPC request %s: %s', rpc_method_name, e)
self.__gateway.send_rpc_reply(device=device.device_info.device_name,
req_id=content['data'].get('id'),
content={rpc_method_name: str(e)},
success_sent=False)
self.__process_rpc(rpc_method_name, rpc_config, content, device)

def __process_rpc(self, rpc_method_name, rpc_config, content, device):
try:
object_id = Device.get_object_id(rpc_config)
result = {}
value = content.get('data', {}).get('params')
self.__create_task(self.__process_rpc_request,
(Address(device.details.address),
object_id,
rpc_config['propertyId']),
{'value': value, 'result': result})
self.__log.info('Processed RPC request with result: %r', result)
self.__gateway.send_rpc_reply(device=device.device_info.device_name,
req_id=content['data'].get('id'),
content={'result': str(result.get('response'))},)
except Exception as e:
self.__log.error(
'Error processing RPC request %s: %s', rpc_method_name, e)
self.__gateway.send_rpc_reply(device=device.device_info.device_name,
req_id=content['data'].get('id'),
content={'result': str(e)},
success_sent=False)

async def __process_rpc_request(self, address, object_id, property_id, value=None, result={}):
if value is None:
result['response'] = await self.__read_property(address, object_id, property_id)
else:
result['response'] = await self.__write_property(address, object_id, property_id, value)

def __check_and_process_reserved_rpc(self, rpc_method_name, device, content):
params = {}
for param in content['data']['params'].split(';'):
try:
(key, value) = param.split('=')
except ValueError:
continue

if key and value:
params[key] = value

if rpc_method_name == 'get':
params['requestType'] = 'readProperty'
content['data'].pop('params')
elif rpc_method_name == 'set':
params['requestType'] = 'writeProperty'
content['data'].pop('params')
content['data']['params'] = params['value']

self.__process_rpc(rpc_method_name, params, content, device)

def get_id(self):
return self.__id

Expand Down
4 changes: 2 additions & 2 deletions thingsboard_gateway/connectors/ftp/ftp_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,10 +387,10 @@ def __process_rpc(self, method, value_expression):
def __send_rpc_reply(self, rpc_request, content, converted_data, success_sent):
if content.get('device') and fullmatch(rpc_request.get('deviceNameFilter', ''), content.get('device')):
self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"],
success_sent=success_sent, content=converted_data)
success_sent=success_sent, content={'result': converted_data})
elif content.get('device'):
self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"],
success_sent=success_sent, content=converted_data)
success_sent=success_sent, content={'result': converted_data})
else:
return converted_data

Expand Down
8 changes: 4 additions & 4 deletions thingsboard_gateway/connectors/modbus/modbus_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ def server_side_rpc_handler(self, content):
content[DEVICE_SECTION_PARAMETER] = device.device_name
result = {}
self.__create_task(self.__process_rpc_request,
(device, content, content),
{'with_response': True, 'result': result})
(device, content, content),
{'with_response': True, 'result': result})
results.append(result['response'])

return results
Expand Down Expand Up @@ -589,7 +589,7 @@ async def __write_rpc_data(self, device, config, data):
@staticmethod
def __can_rpc_return_response(content):
return content.get(RPC_ID_PARAMETER) or (content.get(DATA_PARAMETER) is not None
and content[DATA_PARAMETER].get(RPC_ID_PARAMETER) is not None)
and content[DATA_PARAMETER].get(RPC_ID_PARAMETER) is not None)

def __send_rpc_response(self, content, response, with_response=False):
if isinstance(response, Exception) or isinstance(response, ExceptionResponse):
Expand All @@ -613,7 +613,7 @@ def __send_rpc_response(self, content, response, with_response=False):
if not with_response:
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
content=response)
content={'result': response})
else:
return {
'device': content[DEVICE_SECTION_PARAMETER],
Expand Down
2 changes: 2 additions & 0 deletions thingsboard_gateway/connectors/mqtt/mqtt_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,8 @@ def server_side_rpc_handler(self, content):
if key and value:
params[key] = value

params['valueExpression'] = params.pop('value', None)

return self.__process_rpc_request(content, params)
else:
# Check whether one of my RPC handlers can handle this request
Expand Down
60 changes: 42 additions & 18 deletions thingsboard_gateway/connectors/ocpp/ocpp_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,32 +315,56 @@ def server_side_rpc_handler(self, content):
self._log.error('Charge Point with name %s not found!', content['device'])
return

# check if RPC method is reserved get/set
self.__check_and_process_reserved_rpc(charge_point, content)

try:
for rpc in charge_point.config.get('serverSideRpc', []):
if rpc['methodRPC'] == content['data']['method']:
data_to_send_tags = TBUtility.get_values(rpc.get('valueExpression'), content['data'],
'params',
get_tag=True)
data_to_send_values = TBUtility.get_values(rpc.get('valueExpression'), content['data'],
'params',
expression_instead_none=True)
self.__process_rpc(charge_point, content, rpc)
except Exception as e:
self._log.exception(e)

data_to_send = rpc.get('valueExpression')
for (tag, value) in zip(data_to_send_tags, data_to_send_values):
data_to_send = data_to_send.replace('${' + tag + '}', dumps(value))
def __process_rpc(self, charge_point, content, rpc):
data_to_send_tags = TBUtility.get_values(rpc.get('valueExpression'), content['data'],
'params',
get_tag=True)
data_to_send_values = TBUtility.get_values(rpc.get('valueExpression'), content['data'],
'params',
expression_instead_none=True)

request = call.DataTransferPayload('1', data=data_to_send)
data_to_send = rpc.get('valueExpression')
for (tag, value) in zip(data_to_send_tags, data_to_send_values):
data_to_send = data_to_send.replace('${' + tag + '}', dumps(value))

task = self.__loop.create_task(self._send_request(charge_point, request))
while not task.done():
sleep(.2)
request = call.DataTransferPayload('1', data=data_to_send)

if rpc.get('withResponse', True):
self._gateway.send_rpc_reply(content["device"], content["data"]["id"], str(task.result()))
task = self.__loop.create_task(
self._send_request(charge_point, request))
while not task.done():
sleep(.2)

return
except Exception as e:
self._log.exception(e)
if rpc.get('withResponse', True):
self._gateway.send_rpc_reply(content["device"], content["data"]["id"], {
'result': str(task.result())})

return

def __check_and_process_reserved_rpc(self, charge_point, content):
params = {}
for param in content['data']['params'].split(';'):
try:
(key, value) = param.split('=')
except ValueError:
continue

if key and value:
params[key] = value

if content['data']['method'] == 'set':
params['valueExpression'] = params.pop('value', None)

self.__process_rpc(charge_point, content, params)

def get_config(self):
return {'CS': self._central_system_config, 'CP': self._charge_points_config}
2 changes: 1 addition & 1 deletion thingsboard_gateway/connectors/opcua/opcua_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,7 @@ def server_side_rpc_handler(self, content):

self.__gateway.send_rpc_reply(device=device,
req_id=content['data'].get('id'),
content={content['data']['method']: result})
content={'result': result})
else:
device = tuple(filter(lambda i: i.name == content['device'], self.__device_nodes))[0]

Expand Down
Loading

0 comments on commit 8428c63

Please sign in to comment.