Skip to content

Commit

Permalink
Reduced count of requested attributes, on the gateway initialization,…
Browse files Browse the repository at this point in the history
… to avoid a lot of unnecessary requests
  • Loading branch information
imbeacon committed Feb 20, 2024
1 parent 9dda62b commit e2206d8
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 54 deletions.
3 changes: 3 additions & 0 deletions thingsboard_gateway/gateway/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,6 @@
RPC_METHOD_PARAMETER = "method"
RPC_PARAMS_PARAMETER = "params"
DATA_PARAMETER = "data"

# Attribute constants
ATTRIBUTES_FOR_REQUEST = ["RemoteLoggingLevel", "general_configuration", "storage_configuration", "grpc_configuration", "logs_configuration", "active_connectors"]
100 changes: 46 additions & 54 deletions thingsboard_gateway/gateway/tb_gateway_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from thingsboard_gateway.gateway.constant_enums import DeviceActions, Status
from thingsboard_gateway.gateway.constants import CONNECTED_DEVICES_FILENAME, CONNECTOR_PARAMETER, \
PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME, RENAMING_PARAMETER, CONNECTOR_NAME_PARAMETER, DEVICE_TYPE_PARAMETER, \
CONNECTOR_ID_PARAMETER
CONNECTOR_ID_PARAMETER, ATTRIBUTES_FOR_REQUEST
from thingsboard_gateway.gateway.device_filter import DeviceFilter
from thingsboard_gateway.gateway.duplicate_detector import DuplicateDetector
from thingsboard_gateway.gateway.shell.proxy import AutoProxy
Expand Down Expand Up @@ -191,8 +191,8 @@ def __init__(self, config_file=None):

self._config_dir = path.dirname(path.abspath(config_file)) + path.sep
if config_file is None:
config_file = path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.json'.replace('/',
path.sep)
config_file = (path.dirname(path.dirname(path.abspath(__file__))) +
'/config/tb_gateway.json'.replace('/', path.sep))

logging_error = None
try:
Expand Down Expand Up @@ -468,8 +468,8 @@ def _watchers(self):
if result == 256:
log.warning("Error on RPC command: 256. Permission denied.")

if (
self.__rpc_requests_in_progress or not self.__rpc_register_queue.empty()) and self.tb_client.is_connected():
if ((self.__rpc_requests_in_progress or not self.__rpc_register_queue.empty())
and self.tb_client.is_connected()):
new_rpc_request_in_progress = {}
if self.__rpc_requests_in_progress:
for rpc_in_progress, data in self.__rpc_requests_in_progress.items():
Expand All @@ -493,21 +493,21 @@ def _watchers(self):
log.exception(e)
break

if not self.__request_config_after_connect and self.tb_client.is_connected() and not self.tb_client.client.get_subscriptions_in_progress():
if (not self.__request_config_after_connect and self.tb_client.is_connected()
and not self.tb_client.client.get_subscriptions_in_progress()):
self.__request_config_after_connect = True
self.__check_shared_attributes()
self._check_shared_attributes()

if cur_time - gateway_statistic_send > self.__statistics[
'statsSendPeriodInSeconds'] * 1000 and self.tb_client.is_connected():
if (cur_time - gateway_statistic_send > self.__statistics['statsSendPeriodInSeconds'] * 1000
and self.tb_client.is_connected()):
summary_messages = self.__form_statistics()
# with self.__lock:
self.tb_client.client.send_telemetry(summary_messages)
gateway_statistic_send = time() * 1000
# self.__check_shared_attributes()

if cur_time - connectors_configuration_check_time > self.__config["thingsboard"].get(
"checkConnectorsConfigurationInSeconds", 60) * 1000 and not (
self.__remote_configurator is not None and self.__remote_configurator.in_process):
if (cur_time - connectors_configuration_check_time > self.__config["thingsboard"].get("checkConnectorsConfigurationInSeconds", 60) * 1000
and not (self.__remote_configurator is not None and self.__remote_configurator.in_process)):
self.check_connector_configuration_updates()
connectors_configuration_check_time = time() * 1000

Expand Down Expand Up @@ -561,7 +561,7 @@ def __init_remote_configuration(self, force=False):
try:
self.__remote_configurator = RemoteConfigurator(self, self.__config)
if self.tb_client.is_connected() and not self.tb_client.client.get_subscriptions_in_progress():
self.__check_shared_attributes()
self._check_shared_attributes()
except Exception as e:
log.exception(e)
if self.__remote_configurator is not None:
Expand Down Expand Up @@ -660,7 +660,8 @@ def __process_renamed_gateway_devices(self, renamed_device: dict):
self.__load_persistent_devices()
log.debug("Current renamed_devices dict: %s", self.__renamed_devices)
else:
log.debug("Received renamed device notification %r, but device renaming handle is disabled", renamed_device)
log.debug("Received renamed device notification %r, but device renaming handle is disabled",
renamed_device)

def __process_remote_configuration(self, new_configuration):
if new_configuration is not None and self.__remote_configurator is not None:
Expand All @@ -687,8 +688,10 @@ def request_device_attributes(self, device_name, shared_keys, client_keys, callb
if shared_keys is not None:
self.tb_client.client.gw_request_shared_attributes(device_name, shared_keys, callback)

def __check_shared_attributes(self):
self.tb_client.client.request_attributes(callback=self._attributes_parse)
def _check_shared_attributes(self, shared_keys=None, client_keys=None):
if shared_keys is None:
shared_keys = ATTRIBUTES_FOR_REQUEST
self.tb_client.client.request_attributes(callback=self._attributes_parse, shared_keys=shared_keys, client_keys=client_keys)

def __register_connector(self, session_id, connector_key):
if (self.__grpc_connectors.get(connector_key) is not None
Expand All @@ -706,11 +709,12 @@ def __register_connector(self, session_id, connector_key):
log.error("[%r] GRPC connector with key: %s - already registered!", session_id, connector_key)
else:
self.__grpc_manager.registration_finished(Status.NOT_FOUND, session_id, None)
log.error("[%r] GRPC configuration for connector with key: %s - not found", session_id, connector_key)
log.error("[%r] GRPC configuration for connector with key: %s - not found", session_id,
connector_key)

def __unregister_connector(self, session_id, connector_key):
if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key][
'id'] in self.available_connectors_by_id:
if (self.__grpc_connectors.get(connector_key) is not None
and self.__grpc_connectors[connector_key]['id'] in self.available_connectors_by_id):
connector_id = self.__grpc_connectors[connector_key]['id']
target_connector: GrpcConnector = self.available_connectors_by_id.pop(connector_id)
self.__grpc_manager.unregister(Status.SUCCESS, session_id, target_connector)
Expand All @@ -721,8 +725,7 @@ def __unregister_connector(self, session_id, connector_key):
log.error("[%r] GRPC connector with key: %s - is not registered!", session_id, connector_key)
else:
self.__grpc_manager.unregister(Status.FAILURE, session_id, None)
log.error(
"[%r] GRPC configuration for connector with key: %s - not found in configuration and not registered",
log.error("[%r] GRPC configuration for connector with key: %s - not found and not registered",
session_id, connector_key)

@staticmethod
Expand Down Expand Up @@ -1087,8 +1090,8 @@ def __send_data_pack_to_storage(self, data, connector_name, connector_id=None):
data["deviceName"], connector_name)

def check_size(self, devices_data_in_event_pack):
if self.__get_data_size(devices_data_in_event_pack) >= self.__config["thingsboard"].get("maxPayloadSizeBytes",
400):
if (self.__get_data_size(devices_data_in_event_pack)
>= self.__config["thingsboard"].get("maxPayloadSizeBytes", 400)):
self.__send_data(devices_data_in_event_pack)
for device in devices_data_in_event_pack:
devices_data_in_event_pack[device]["telemetry"] = []
Expand All @@ -1097,7 +1100,8 @@ def check_size(self, devices_data_in_event_pack):
def __read_data_from_storage(self):
devices_data_in_event_pack = {}
log.debug("Send data Thread has been started successfully.")
log.debug("Maximal size of the client message queue is: %r", self.tb_client.client._client._max_queued_messages)
log.debug("Maximal size of the client message queue is: %r",
self.tb_client.client._client._max_queued_messages)

while not self.stopped:
try:
Expand All @@ -1122,22 +1126,18 @@ def __read_data_from_storage(self):
if isinstance(current_event["telemetry"], list):
for item in current_event["telemetry"]:
self.check_size(devices_data_in_event_pack)
devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(
item)
devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(item)
else:
self.check_size(devices_data_in_event_pack)
devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(
current_event["telemetry"])
devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(current_event["telemetry"])
if current_event.get("attributes"):
if isinstance(current_event["attributes"], list):
for item in current_event["attributes"]:
self.check_size(devices_data_in_event_pack)
devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(
item.items())
devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(item.items())
else:
self.check_size(devices_data_in_event_pack)
devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(
current_event["attributes"].items())
devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(current_event["attributes"].items())
if devices_data_in_event_pack:
if not self.tb_client.is_connected():
continue
Expand All @@ -1160,8 +1160,9 @@ def __read_data_from_storage(self):
range(min(self.__min_pack_size_to_send, self._published_events.qsize()))]
for event in events:
try:
if self.tb_client.is_connected() and (
self.__remote_configurator is None or not self.__remote_configurator.in_process):
if (self.tb_client.is_connected()
and (self.__remote_configurator is None
or not self.__remote_configurator.in_process)):
if self.tb_client.client.quality_of_service == 1:
success = event.get() == event.TB_ERR_SUCCESS
else:
Expand Down Expand Up @@ -1236,11 +1237,9 @@ def _rpc_request_handler(self, request_id, content):
log.debug("Connector \"%s\" for RPC request \"%s\" found", module, content["method"])
for connector_name in self.available_connectors_by_name:
if self.available_connectors_by_name[connector_name]._connector_type == module:
log.debug("Sending command RPC %s to connector %s", content["method"],
connector_name)
log.debug("Sending command RPC %s to connector %s", content["method"], connector_name)
content['id'] = request_id
result = self.available_connectors_by_name[connector_name].server_side_rpc_handler(
content)
result = self.available_connectors_by_name[connector_name].server_side_rpc_handler(content)
elif module == 'gateway' or module in self.__remote_shell.shell_commands:
result = self.__rpc_gateway_processing(request_id, content)
else:
Expand All @@ -1267,15 +1266,13 @@ def __rpc_gateway_processing(self, request_id, content):
method_to_call = content["method"].replace("gateway_", "")
result = None
if self.__remote_shell is not None:
method_function = self.__remote_shell.shell_commands.get(method_to_call,
self.__gateway_rpc_methods.get(method_to_call))
method_function = self.__remote_shell.shell_commands.get(method_to_call, self.__gateway_rpc_methods.get(method_to_call))
else:
log.info("Remote shell is disabled.")
method_function = self.__gateway_rpc_methods.get(method_to_call)
if method_function is None and method_to_call in self.__rpc_scheduled_methods_functions:
seconds_to_restart = arguments * 1000 if arguments and arguments != '{}' else 0
self.__scheduled_rpc_calls.append(
[time() * 1000 + seconds_to_restart, self.__rpc_scheduled_methods_functions[method_to_call]])
self.__scheduled_rpc_calls.append([time() * 1000 + seconds_to_restart, self.__rpc_scheduled_methods_functions[method_to_call]])
log.info("Gateway %s scheduled in %i seconds", method_to_call, seconds_to_restart / 1000)
result = {"success": True}
elif method_function is None:
Expand Down Expand Up @@ -1313,13 +1310,9 @@ def __rpc_update(self, *args):

def __rpc_version(self, *args):
try:
result = {"resp": self.__updater.get_version(),
"code": 200,
}
result = {"resp": self.__updater.get_version(), "code": 200}
except Exception as e:
result = {"error": str(e),
"code": 500
}
result = {"error": str(e), "code": 500}
return result

def is_rpc_in_progress(self, topic):
Expand Down Expand Up @@ -1353,8 +1346,7 @@ def __send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=
if success_sent:
rpc_response["success"] = True
if device is not None and success_sent is not None:
self.tb_client.client.gw_send_rpc_reply(device, req_id, dumps(rpc_response),
quality_of_service=quality_of_service)
self.tb_client.client.gw_send_rpc_reply(device, req_id, dumps(rpc_response), quality_of_service=quality_of_service)
elif device is not None and req_id is not None and content is not None:
self.tb_client.client.gw_send_rpc_reply(device, req_id, content, quality_of_service=quality_of_service)
elif device is None and success_sent is not None:
Expand Down Expand Up @@ -1391,10 +1383,10 @@ def __form_statistics(self):
for connector in self.available_connectors_by_name:
connector_camel_case = connector.replace(' ', '')
telemetry = {
(connector_camel_case + ' EventsProduced').replace(' ', ''): self.available_connectors_by_name[
connector].statistics.get('MessagesReceived', 0),
(connector_camel_case + ' EventsSent').replace(' ', ''): self.available_connectors_by_name[
connector].statistics.get('MessagesSent', 0)
(connector_camel_case + ' EventsProduced').replace(' ', ''):
self.available_connectors_by_name[connector].statistics.get('MessagesReceived', 0),
(connector_camel_case + ' EventsSent').replace(' ', ''):
self.available_connectors_by_name[connector].statistics.get('MessagesSent', 0)
}
summary_messages['eventsProduced'] += telemetry[
str(connector_camel_case + ' EventsProduced').replace(' ', '')]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,10 @@ def _handle_logs_configuration_update(self, config):
def _handle_active_connectors_update(self, config):
LOG.debug('Processing active connectors configuration update...')

for connector_name in config:
if self._gateway.connectors_configs.get(connector_name) is None:
self._gateway._check_shared_attributes(shared_keys=[connector_name])

has_changed = False
for_deletion = []
for active_connector_name in self._gateway.available_connectors_by_name:
Expand Down

0 comments on commit e2206d8

Please sign in to comment.