diff --git a/python/python/glide/config.py b/python/python/glide/config.py index adf5ed77ac..f5d651ddff 100644 --- a/python/python/glide/config.py +++ b/python/python/glide/config.py @@ -129,6 +129,29 @@ class PeriodicChecksStatus(Enum): """ +class AdvancedBaseClientConfiguration: + """ + Represents the advanced configuration settings for a base Glide client. + + Args: + connection_timeout (Optional[int]):The duration in milliseconds to wait for a TCP/TLS connection to complete. + This applies both during initial client creation and any reconnections that may occur during request processing. + **Note**: A high connection timeout may lead to prolonged blocking of the entire command pipeline. + If the client cannot establish a connection within the specified duration, a timeout error will occur. + If not set, a default value will be used. + """ + + def __init__(self, connection_timeout: Optional[int] = None): + self.connection_timeout = connection_timeout + + def _create_a_protobuf_conn_request( + self, request: ConnectionRequest + ) -> ConnectionRequest: + if self.connection_timeout: + request.connection_timeout = self.connection_timeout + return request + + class BaseClientConfiguration: def __init__( self, @@ -141,6 +164,7 @@ def __init__( protocol: ProtocolVersion = ProtocolVersion.RESP3, inflight_requests_limit: Optional[int] = None, client_az: Optional[str] = None, + advanced_config: Optional[AdvancedBaseClientConfiguration] = None, ): """ Represents the configuration settings for a Glide client. @@ -170,7 +194,6 @@ def __init__( If not set, a default value will be used. client_az (Optional[str]): Availability Zone of the client. If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits. - """ self.addresses = addresses self.use_tls = use_tls @@ -181,6 +204,7 @@ def __init__( self.protocol = protocol self.inflight_requests_limit = inflight_requests_limit self.client_az = client_az + self.advanced_config = advanced_config if read_from == ReadFrom.AZ_AFFINITY and not client_az: raise ValueError( @@ -220,6 +244,8 @@ def _create_a_protobuf_conn_request( request.inflight_requests_limit = self.inflight_requests_limit if self.client_az: request.client_az = self.client_az + if self.advanced_config: + self.advanced_config._create_a_protobuf_conn_request(request) return request @@ -232,25 +258,9 @@ def _get_pubsub_callback_and_context( return None, None -class AdvancedBaseClientConfiguration: - """ - Represents the advanced configuration settings for a base Glide client. - - Args: - connection_timeout (Optional[int]):The duration in milliseconds to wait for a TCP/TLS connection to complete. - This applies both during initial client creation and any reconnections that may occur during request processing. - **Note**: A high connection timeout may lead to prolonged blocking of the entire command pipeline. - If the client cannot establish a connection within the specified duration, a timeout error will occur. - If not set, a default value will be used. - """ - +class AdvancedGlideClientConfiguration(AdvancedBaseClientConfiguration): def __init__(self, connection_timeout: Optional[int] = None): - self.connection_timeout = connection_timeout - - def _create_a_protobuf_conn_request(self, request) -> ConnectionRequest: - if self.connection_timeout: - request.connection_timeout = self.connection_timeout - return request + super().__init__(connection_timeout) class GlideClientConfiguration(BaseClientConfiguration): @@ -286,7 +296,7 @@ class GlideClientConfiguration(BaseClientConfiguration): If not set, a default value will be used. client_az (Optional[str]): Availability Zone of the client. If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits. - + advanced_config (Optional[AdvancedGlideClientConfiguration]) : Advanced configuration, see `AdvancedGlideClientConfiguration`. """ class PubSubChannelModes(IntEnum): @@ -333,6 +343,7 @@ def __init__( pubsub_subscriptions: Optional[PubSubSubscriptions] = None, inflight_requests_limit: Optional[int] = None, client_az: Optional[str] = None, + advanced_config: Optional[AdvancedGlideClientConfiguration] = None, ): super().__init__( addresses=addresses, @@ -344,6 +355,7 @@ def __init__( protocol=protocol, inflight_requests_limit=inflight_requests_limit, client_az=client_az, + advanced_config=advanced_config, ) self.reconnect_strategy = reconnect_strategy self.database_id = database_id @@ -400,94 +412,9 @@ def _get_pubsub_callback_and_context( return None, None -class AdvancedGlideClientConfiguration( - GlideClientConfiguration, AdvancedBaseClientConfiguration -): - """ - Represents the advanced configuration settings for a Standalone Glide client. - - Args: - addresses (List[NodeAddress]): DNS Addresses and ports of known nodes in the cluster. - Only nodes whose addresses were provided will be used by the client. - For example: - [ - {address:sample-address-0001.use1.cache.amazonaws.com, port:6379}, - {address: sample-address-0002.use2.cache.amazonaws.com, port:6379} - ]. - use_tls (bool): True if communication with the cluster should use Transport Level Security. - credentials (ServerCredentials): Credentials for authentication process. - If none are set, the client will not authenticate itself with the server. - read_from (ReadFrom): If not set, `PRIMARY` will be used. - request_timeout (Optional[int]): The duration in milliseconds that the client should wait for a request to complete. - This duration encompasses sending the request, awaiting for a response from the server, and any required reconnections or retries. - If the specified timeout is exceeded for a pending request, it will result in a timeout error. - If not set, a default value will be used. - reconnect_strategy (Optional[BackoffStrategy]): Strategy used to determine how and when to reconnect, in case of - connection failures. - If not set, a default backoff strategy will be used. - database_id (Optional[int]): index of the logical database to connect to. - client_name (Optional[str]): Client name to be used for the client. Will be used with CLIENT SETNAME command during connection establishment. - protocol (ProtocolVersion): The version of the RESP protocol to communicate with the server. - pubsub_subscriptions (Optional[GlideClientConfiguration.PubSubSubscriptions]): Pubsub subscriptions to be used for the client. - Will be applied via SUBSCRIBE/PSUBSCRIBE commands during connection establishment. - inflight_requests_limit (Optional[int]): The maximum number of concurrent requests allowed to be in-flight (sent but not yet completed). - This limit is used to control the memory usage and prevent the client from overwhelming the server or getting stuck in case of a queue backlog. - If not set, a default value will be used. - client_az (Optional[str]): Availability Zone of the client. - If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits. - connection_timeout (Optional[int]):The duration in milliseconds to wait for a TCP/TLS connection to complete. - This applies both during initial client creation and any reconnections that may occur during request processing. - **Note**: A high connection timeout may lead to prolonged blocking of the entire command pipeline. - If the client cannot establish a connection within the specified duration, a timeout error will occur. - If not set, a default value will be used. - - """ - - def __init__( - self, - addresses: List[NodeAddress], - use_tls: bool = False, - credentials: Optional[ServerCredentials] = None, - read_from: ReadFrom = ReadFrom.PRIMARY, - request_timeout: Optional[int] = None, - reconnect_strategy: Optional[BackoffStrategy] = None, - database_id: Optional[int] = None, - client_name: Optional[str] = None, - protocol: ProtocolVersion = ProtocolVersion.RESP3, - pubsub_subscriptions: Optional[ - GlideClientConfiguration.PubSubSubscriptions - ] = None, - inflight_requests_limit: Optional[int] = None, - client_az: Optional[str] = None, - connection_timeout: Optional[int] = None, - ): - GlideClientConfiguration.__init__( - self, - addresses=addresses, - use_tls=use_tls, - credentials=credentials, - read_from=read_from, - request_timeout=request_timeout, - reconnect_strategy=reconnect_strategy, - database_id=database_id, - client_name=client_name, - protocol=protocol, - pubsub_subscriptions=pubsub_subscriptions, - inflight_requests_limit=inflight_requests_limit, - client_az=client_az, - ) - AdvancedBaseClientConfiguration.__init__(self, connection_timeout) - - def _create_a_protobuf_conn_request( - self, cluster_mode: bool = False - ) -> ConnectionRequest: - request = GlideClientConfiguration._create_a_protobuf_conn_request( - self, cluster_mode - ) - request = AdvancedBaseClientConfiguration._create_a_protobuf_conn_request( - self, request - ) - return request +class AdvancedGlideClusterClientConfiguration(AdvancedBaseClientConfiguration): + def __init__(self, connection_timeout: Optional[int] = None): + super().__init__(connection_timeout) class GlideClusterClientConfiguration(BaseClientConfiguration): @@ -521,7 +448,7 @@ class GlideClusterClientConfiguration(BaseClientConfiguration): If not set, a default value will be used. client_az (Optional[str]): Availability Zone of the client. If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits. - + advanced_config (Optional[AdvancedGlideClusterClientConfiguration]) : Advanced configuration, see `AdvancedGlideClusterClientConfiguration`. Notes: @@ -576,6 +503,7 @@ def __init__( pubsub_subscriptions: Optional[PubSubSubscriptions] = None, inflight_requests_limit: Optional[int] = None, client_az: Optional[str] = None, + advanced_config: Optional[AdvancedGlideClusterClientConfiguration] = None, ): super().__init__( addresses=addresses, @@ -587,6 +515,7 @@ def __init__( protocol=protocol, inflight_requests_limit=inflight_requests_limit, client_az=client_az, + advanced_config=advanced_config, ) self.periodic_checks = periodic_checks self.pubsub_subscriptions = pubsub_subscriptions @@ -636,95 +565,3 @@ def _get_pubsub_callback_and_context( if self.pubsub_subscriptions: return self.pubsub_subscriptions.callback, self.pubsub_subscriptions.context return None, None - - -class AdvancedGlideClusterClientConfiguration( - GlideClusterClientConfiguration, AdvancedBaseClientConfiguration -): - """ - Represents the advanced configuration settings for a Cluster Glide client. - - Args: - addresses (List[NodeAddress]): DNS Addresses and ports of known nodes in the cluster. - The list can be partial, as the client will attempt to map out the cluster and find all nodes. - For example: - [ - {address:configuration-endpoint.use1.cache.amazonaws.com, port:6379} - ]. - use_tls (bool): True if communication with the cluster should use Transport Level Security. - credentials (ServerCredentials): Credentials for authentication process. - If none are set, the client will not authenticate itself with the server. - read_from (ReadFrom): If not set, `PRIMARY` will be used. - request_timeout (Optional[int]): The duration in milliseconds that the client should wait for a request to complete. - This duration encompasses sending the request, awaiting for a response from the server, and any required reconnections or retries. - If the specified timeout is exceeded for a pending request, it will result in a timeout error. If not set, a default value will be used. - client_name (Optional[str]): Client name to be used for the client. Will be used with CLIENT SETNAME command during connection establishment. - protocol (ProtocolVersion): The version of the RESP protocol to communicate with the server. - periodic_checks (Union[PeriodicChecksStatus, PeriodicChecksManualInterval]): Configure the periodic topology checks. - These checks evaluate changes in the cluster's topology, triggering a slot refresh when detected. - Periodic checks ensure a quick and efficient process by querying a limited number of nodes. - Defaults to PeriodicChecksStatus.ENABLED_DEFAULT_CONFIGS. - pubsub_subscriptions (Optional[GlideClusterClientConfiguration.PubSubSubscriptions]): Pubsub subscriptions to be used for the client. - Will be applied via SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE commands during connection establishment. - inflight_requests_limit (Optional[int]): The maximum number of concurrent requests allowed to be in-flight (sent but not yet completed). - This limit is used to control the memory usage and prevent the client from overwhelming the server or getting stuck in case of a queue backlog. - If not set, a default value will be used. - client_az (Optional[str]): Availability Zone of the client. - If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits. - connection_timeout (Optional[int]):The duration in milliseconds to wait for a TCP/TLS connection to complete. - This applies both during initial client creation and any reconnections that may occur during request processing. - **Note**: A high connection timeout may lead to prolonged blocking of the entire command pipeline. - If the client cannot establish a connection within the specified duration, a timeout error will occur. - If not set, a default value will be used. - - - Notes: - Currently, the reconnection strategy in cluster mode is not configurable, and exponential backoff - with fixed values is used. - """ - - def __init__( - self, - addresses: List[NodeAddress], - use_tls: bool = False, - credentials: Optional[ServerCredentials] = None, - read_from: ReadFrom = ReadFrom.PRIMARY, - request_timeout: Optional[int] = None, - client_name: Optional[str] = None, - protocol: ProtocolVersion = ProtocolVersion.RESP3, - periodic_checks: Union[ - PeriodicChecksStatus, PeriodicChecksManualInterval - ] = PeriodicChecksStatus.ENABLED_DEFAULT_CONFIGS, - pubsub_subscriptions: Optional[ - GlideClusterClientConfiguration.PubSubSubscriptions - ] = None, - inflight_requests_limit: Optional[int] = None, - client_az: Optional[str] = None, - connection_timeout: Optional[int] = None, - ): - GlideClusterClientConfiguration.__init__( - self, - addresses=addresses, - use_tls=use_tls, - credentials=credentials, - read_from=read_from, - request_timeout=request_timeout, - client_name=client_name, - protocol=protocol, - inflight_requests_limit=inflight_requests_limit, - client_az=client_az, - periodic_checks=periodic_checks, - pubsub_subscriptions=pubsub_subscriptions, - ) - AdvancedBaseClientConfiguration.__init__(self, connection_timeout) - - def _create_a_protobuf_conn_request( - self, cluster_mode: bool = False - ) -> ConnectionRequest: - request = GlideClusterClientConfiguration._create_a_protobuf_conn_request( - self, cluster_mode - ) - request = AdvancedBaseClientConfiguration._create_a_protobuf_conn_request( - self, request - ) - return request diff --git a/python/python/tests/conftest.py b/python/python/tests/conftest.py index bc8e7e2529..1cab4d96d6 100644 --- a/python/python/tests/conftest.py +++ b/python/python/tests/conftest.py @@ -263,23 +263,23 @@ async def create_client( assert database_id == 0 k = min(3, len(pytest.valkey_cluster.nodes_addr)) seed_nodes = random.sample(pytest.valkey_cluster.nodes_addr, k=k) - cluster_config = AdvancedGlideClusterClientConfiguration( + cluster_config = GlideClusterClientConfiguration( addresses=seed_nodes if addresses is None else addresses, use_tls=use_tls, credentials=credentials, client_name=client_name, protocol=protocol, request_timeout=timeout, - connection_timeout=connection_timeout, pubsub_subscriptions=cluster_mode_pubsub, inflight_requests_limit=inflight_requests_limit, read_from=read_from, client_az=client_az, + advanced_config=AdvancedGlideClusterClientConfiguration(connection_timeout), ) return await GlideClusterClient.create(cluster_config) else: assert type(pytest.standalone_cluster) is ValkeyCluster - config = AdvancedGlideClientConfiguration( + config = GlideClientConfiguration( addresses=( pytest.standalone_cluster.nodes_addr if addresses is None else addresses ), @@ -294,6 +294,7 @@ async def create_client( inflight_requests_limit=inflight_requests_limit, read_from=read_from, client_az=client_az, + advanced_config=AdvancedGlideClientConfiguration(connection_timeout), ) return await GlideClient.create(config) diff --git a/python/python/tests/test_api_export.py b/python/python/tests/test_api_export.py index 996a8821a3..e169950da0 100644 --- a/python/python/tests/test_api_export.py +++ b/python/python/tests/test_api_export.py @@ -55,6 +55,7 @@ def _get_export_rename_map(): "FtSearchKeywords", # ClassDef "FtAggregateKeywords", # ClassDef "FtProfileKeywords", # ClassDef + "AdvancedBaseClientConfiguration", # ClassDef ] diff --git a/python/python/tests/test_config.py b/python/python/tests/test_config.py index f4f930257c..2476d8ec0f 100644 --- a/python/python/tests/test_config.py +++ b/python/python/tests/test_config.py @@ -77,18 +77,18 @@ def test_convert_config_with_azaffinity_to_protobuf(): def test_connection_timeout_in_protobuf_request(): connection_timeout = 5000 # in milliseconds - config = AdvancedGlideClientConfiguration( + config = GlideClientConfiguration( [NodeAddress("127.0.0.1")], - connection_timeout=connection_timeout, + advanced_config=AdvancedGlideClientConfiguration(connection_timeout), ) request = config._create_a_protobuf_conn_request() assert isinstance(request, ConnectionRequest) assert request.connection_timeout == connection_timeout - config = AdvancedGlideClusterClientConfiguration( + config = GlideClusterClientConfiguration( [NodeAddress("127.0.0.1")], - connection_timeout=connection_timeout, + advanced_config=AdvancedGlideClusterClientConfiguration(connection_timeout), ) request = config._create_a_protobuf_conn_request(cluster_mode=True)