Skip to content

Commit

Permalink
aggregation design
Browse files Browse the repository at this point in the history
Signed-off-by: Shoham Elias <[email protected]>
  • Loading branch information
shohamazon committed Dec 19, 2024
1 parent 815d31d commit 83ba370
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 208 deletions.
239 changes: 38 additions & 201 deletions python/python/glide/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,29 @@ class PeriodicChecksStatus(Enum):
"""


class AdvancedBaseClientConfiguration:
"""
Represents the advanced configuration settings for a base Glide client.
Args:
connection_timeout (Optional[int]):The duration in milliseconds to wait for a TCP/TLS connection to complete.
This applies both during initial client creation and any reconnections that may occur during request processing.
**Note**: A high connection timeout may lead to prolonged blocking of the entire command pipeline.
If the client cannot establish a connection within the specified duration, a timeout error will occur.
If not set, a default value will be used.
"""

def __init__(self, connection_timeout: Optional[int] = None):
self.connection_timeout = connection_timeout

def _create_a_protobuf_conn_request(
self, request: ConnectionRequest
) -> ConnectionRequest:
if self.connection_timeout:
request.connection_timeout = self.connection_timeout
return request


class BaseClientConfiguration:
def __init__(
self,
Expand All @@ -141,6 +164,7 @@ def __init__(
protocol: ProtocolVersion = ProtocolVersion.RESP3,
inflight_requests_limit: Optional[int] = None,
client_az: Optional[str] = None,
advanced_config: Optional[AdvancedBaseClientConfiguration] = None,
):
"""
Represents the configuration settings for a Glide client.
Expand Down Expand Up @@ -170,7 +194,6 @@ def __init__(
If not set, a default value will be used.
client_az (Optional[str]): Availability Zone of the client.
If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits.
"""
self.addresses = addresses
self.use_tls = use_tls
Expand All @@ -181,6 +204,7 @@ def __init__(
self.protocol = protocol
self.inflight_requests_limit = inflight_requests_limit
self.client_az = client_az
self.advanced_config = advanced_config

if read_from == ReadFrom.AZ_AFFINITY and not client_az:
raise ValueError(
Expand Down Expand Up @@ -220,6 +244,8 @@ def _create_a_protobuf_conn_request(
request.inflight_requests_limit = self.inflight_requests_limit
if self.client_az:
request.client_az = self.client_az
if self.advanced_config:
self.advanced_config._create_a_protobuf_conn_request(request)

return request

Expand All @@ -232,25 +258,9 @@ def _get_pubsub_callback_and_context(
return None, None


class AdvancedBaseClientConfiguration:
"""
Represents the advanced configuration settings for a base Glide client.
Args:
connection_timeout (Optional[int]):The duration in milliseconds to wait for a TCP/TLS connection to complete.
This applies both during initial client creation and any reconnections that may occur during request processing.
**Note**: A high connection timeout may lead to prolonged blocking of the entire command pipeline.
If the client cannot establish a connection within the specified duration, a timeout error will occur.
If not set, a default value will be used.
"""

class AdvancedGlideClientConfiguration(AdvancedBaseClientConfiguration):
def __init__(self, connection_timeout: Optional[int] = None):
self.connection_timeout = connection_timeout

def _create_a_protobuf_conn_request(self, request) -> ConnectionRequest:
if self.connection_timeout:
request.connection_timeout = self.connection_timeout
return request
super().__init__(connection_timeout)


class GlideClientConfiguration(BaseClientConfiguration):
Expand Down Expand Up @@ -286,7 +296,7 @@ class GlideClientConfiguration(BaseClientConfiguration):
If not set, a default value will be used.
client_az (Optional[str]): Availability Zone of the client.
If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits.
advanced_config (Optional[AdvancedGlideClientConfiguration]) : Advanced configuration, see `AdvancedGlideClientConfiguration`.
"""

class PubSubChannelModes(IntEnum):
Expand Down Expand Up @@ -333,6 +343,7 @@ def __init__(
pubsub_subscriptions: Optional[PubSubSubscriptions] = None,
inflight_requests_limit: Optional[int] = None,
client_az: Optional[str] = None,
advanced_config: Optional[AdvancedGlideClientConfiguration] = None,
):
super().__init__(
addresses=addresses,
Expand All @@ -344,6 +355,7 @@ def __init__(
protocol=protocol,
inflight_requests_limit=inflight_requests_limit,
client_az=client_az,
advanced_config=advanced_config,
)
self.reconnect_strategy = reconnect_strategy
self.database_id = database_id
Expand Down Expand Up @@ -400,94 +412,9 @@ def _get_pubsub_callback_and_context(
return None, None


class AdvancedGlideClientConfiguration(
GlideClientConfiguration, AdvancedBaseClientConfiguration
):
"""
Represents the advanced configuration settings for a Standalone Glide client.
Args:
addresses (List[NodeAddress]): DNS Addresses and ports of known nodes in the cluster.
Only nodes whose addresses were provided will be used by the client.
For example:
[
{address:sample-address-0001.use1.cache.amazonaws.com, port:6379},
{address: sample-address-0002.use2.cache.amazonaws.com, port:6379}
].
use_tls (bool): True if communication with the cluster should use Transport Level Security.
credentials (ServerCredentials): Credentials for authentication process.
If none are set, the client will not authenticate itself with the server.
read_from (ReadFrom): If not set, `PRIMARY` will be used.
request_timeout (Optional[int]): The duration in milliseconds that the client should wait for a request to complete.
This duration encompasses sending the request, awaiting for a response from the server, and any required reconnections or retries.
If the specified timeout is exceeded for a pending request, it will result in a timeout error.
If not set, a default value will be used.
reconnect_strategy (Optional[BackoffStrategy]): Strategy used to determine how and when to reconnect, in case of
connection failures.
If not set, a default backoff strategy will be used.
database_id (Optional[int]): index of the logical database to connect to.
client_name (Optional[str]): Client name to be used for the client. Will be used with CLIENT SETNAME command during connection establishment.
protocol (ProtocolVersion): The version of the RESP protocol to communicate with the server.
pubsub_subscriptions (Optional[GlideClientConfiguration.PubSubSubscriptions]): Pubsub subscriptions to be used for the client.
Will be applied via SUBSCRIBE/PSUBSCRIBE commands during connection establishment.
inflight_requests_limit (Optional[int]): The maximum number of concurrent requests allowed to be in-flight (sent but not yet completed).
This limit is used to control the memory usage and prevent the client from overwhelming the server or getting stuck in case of a queue backlog.
If not set, a default value will be used.
client_az (Optional[str]): Availability Zone of the client.
If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits.
connection_timeout (Optional[int]):The duration in milliseconds to wait for a TCP/TLS connection to complete.
This applies both during initial client creation and any reconnections that may occur during request processing.
**Note**: A high connection timeout may lead to prolonged blocking of the entire command pipeline.
If the client cannot establish a connection within the specified duration, a timeout error will occur.
If not set, a default value will be used.
"""

def __init__(
self,
addresses: List[NodeAddress],
use_tls: bool = False,
credentials: Optional[ServerCredentials] = None,
read_from: ReadFrom = ReadFrom.PRIMARY,
request_timeout: Optional[int] = None,
reconnect_strategy: Optional[BackoffStrategy] = None,
database_id: Optional[int] = None,
client_name: Optional[str] = None,
protocol: ProtocolVersion = ProtocolVersion.RESP3,
pubsub_subscriptions: Optional[
GlideClientConfiguration.PubSubSubscriptions
] = None,
inflight_requests_limit: Optional[int] = None,
client_az: Optional[str] = None,
connection_timeout: Optional[int] = None,
):
GlideClientConfiguration.__init__(
self,
addresses=addresses,
use_tls=use_tls,
credentials=credentials,
read_from=read_from,
request_timeout=request_timeout,
reconnect_strategy=reconnect_strategy,
database_id=database_id,
client_name=client_name,
protocol=protocol,
pubsub_subscriptions=pubsub_subscriptions,
inflight_requests_limit=inflight_requests_limit,
client_az=client_az,
)
AdvancedBaseClientConfiguration.__init__(self, connection_timeout)

def _create_a_protobuf_conn_request(
self, cluster_mode: bool = False
) -> ConnectionRequest:
request = GlideClientConfiguration._create_a_protobuf_conn_request(
self, cluster_mode
)
request = AdvancedBaseClientConfiguration._create_a_protobuf_conn_request(
self, request
)
return request
class AdvancedGlideClusterClientConfiguration(AdvancedBaseClientConfiguration):
def __init__(self, connection_timeout: Optional[int] = None):
super().__init__(connection_timeout)


class GlideClusterClientConfiguration(BaseClientConfiguration):
Expand Down Expand Up @@ -521,7 +448,7 @@ class GlideClusterClientConfiguration(BaseClientConfiguration):
If not set, a default value will be used.
client_az (Optional[str]): Availability Zone of the client.
If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits.
advanced_config (Optional[AdvancedGlideClusterClientConfiguration]) : Advanced configuration, see `AdvancedGlideClusterClientConfiguration`.
Notes:
Expand Down Expand Up @@ -576,6 +503,7 @@ def __init__(
pubsub_subscriptions: Optional[PubSubSubscriptions] = None,
inflight_requests_limit: Optional[int] = None,
client_az: Optional[str] = None,
advanced_config: Optional[AdvancedGlideClusterClientConfiguration] = None,
):
super().__init__(
addresses=addresses,
Expand All @@ -587,6 +515,7 @@ def __init__(
protocol=protocol,
inflight_requests_limit=inflight_requests_limit,
client_az=client_az,
advanced_config=advanced_config,
)
self.periodic_checks = periodic_checks
self.pubsub_subscriptions = pubsub_subscriptions
Expand Down Expand Up @@ -636,95 +565,3 @@ def _get_pubsub_callback_and_context(
if self.pubsub_subscriptions:
return self.pubsub_subscriptions.callback, self.pubsub_subscriptions.context
return None, None


class AdvancedGlideClusterClientConfiguration(
GlideClusterClientConfiguration, AdvancedBaseClientConfiguration
):
"""
Represents the advanced configuration settings for a Cluster Glide client.
Args:
addresses (List[NodeAddress]): DNS Addresses and ports of known nodes in the cluster.
The list can be partial, as the client will attempt to map out the cluster and find all nodes.
For example:
[
{address:configuration-endpoint.use1.cache.amazonaws.com, port:6379}
].
use_tls (bool): True if communication with the cluster should use Transport Level Security.
credentials (ServerCredentials): Credentials for authentication process.
If none are set, the client will not authenticate itself with the server.
read_from (ReadFrom): If not set, `PRIMARY` will be used.
request_timeout (Optional[int]): The duration in milliseconds that the client should wait for a request to complete.
This duration encompasses sending the request, awaiting for a response from the server, and any required reconnections or retries.
If the specified timeout is exceeded for a pending request, it will result in a timeout error. If not set, a default value will be used.
client_name (Optional[str]): Client name to be used for the client. Will be used with CLIENT SETNAME command during connection establishment.
protocol (ProtocolVersion): The version of the RESP protocol to communicate with the server.
periodic_checks (Union[PeriodicChecksStatus, PeriodicChecksManualInterval]): Configure the periodic topology checks.
These checks evaluate changes in the cluster's topology, triggering a slot refresh when detected.
Periodic checks ensure a quick and efficient process by querying a limited number of nodes.
Defaults to PeriodicChecksStatus.ENABLED_DEFAULT_CONFIGS.
pubsub_subscriptions (Optional[GlideClusterClientConfiguration.PubSubSubscriptions]): Pubsub subscriptions to be used for the client.
Will be applied via SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE commands during connection establishment.
inflight_requests_limit (Optional[int]): The maximum number of concurrent requests allowed to be in-flight (sent but not yet completed).
This limit is used to control the memory usage and prevent the client from overwhelming the server or getting stuck in case of a queue backlog.
If not set, a default value will be used.
client_az (Optional[str]): Availability Zone of the client.
If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits.
connection_timeout (Optional[int]):The duration in milliseconds to wait for a TCP/TLS connection to complete.
This applies both during initial client creation and any reconnections that may occur during request processing.
**Note**: A high connection timeout may lead to prolonged blocking of the entire command pipeline.
If the client cannot establish a connection within the specified duration, a timeout error will occur.
If not set, a default value will be used.
Notes:
Currently, the reconnection strategy in cluster mode is not configurable, and exponential backoff
with fixed values is used.
"""

def __init__(
self,
addresses: List[NodeAddress],
use_tls: bool = False,
credentials: Optional[ServerCredentials] = None,
read_from: ReadFrom = ReadFrom.PRIMARY,
request_timeout: Optional[int] = None,
client_name: Optional[str] = None,
protocol: ProtocolVersion = ProtocolVersion.RESP3,
periodic_checks: Union[
PeriodicChecksStatus, PeriodicChecksManualInterval
] = PeriodicChecksStatus.ENABLED_DEFAULT_CONFIGS,
pubsub_subscriptions: Optional[
GlideClusterClientConfiguration.PubSubSubscriptions
] = None,
inflight_requests_limit: Optional[int] = None,
client_az: Optional[str] = None,
connection_timeout: Optional[int] = None,
):
GlideClusterClientConfiguration.__init__(
self,
addresses=addresses,
use_tls=use_tls,
credentials=credentials,
read_from=read_from,
request_timeout=request_timeout,
client_name=client_name,
protocol=protocol,
inflight_requests_limit=inflight_requests_limit,
client_az=client_az,
periodic_checks=periodic_checks,
pubsub_subscriptions=pubsub_subscriptions,
)
AdvancedBaseClientConfiguration.__init__(self, connection_timeout)

def _create_a_protobuf_conn_request(
self, cluster_mode: bool = False
) -> ConnectionRequest:
request = GlideClusterClientConfiguration._create_a_protobuf_conn_request(
self, cluster_mode
)
request = AdvancedBaseClientConfiguration._create_a_protobuf_conn_request(
self, request
)
return request
7 changes: 4 additions & 3 deletions python/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,23 +263,23 @@ async def create_client(
assert database_id == 0
k = min(3, len(pytest.valkey_cluster.nodes_addr))
seed_nodes = random.sample(pytest.valkey_cluster.nodes_addr, k=k)
cluster_config = AdvancedGlideClusterClientConfiguration(
cluster_config = GlideClusterClientConfiguration(
addresses=seed_nodes if addresses is None else addresses,
use_tls=use_tls,
credentials=credentials,
client_name=client_name,
protocol=protocol,
request_timeout=timeout,
connection_timeout=connection_timeout,
pubsub_subscriptions=cluster_mode_pubsub,
inflight_requests_limit=inflight_requests_limit,
read_from=read_from,
client_az=client_az,
advanced_config=AdvancedGlideClusterClientConfiguration(connection_timeout),
)
return await GlideClusterClient.create(cluster_config)
else:
assert type(pytest.standalone_cluster) is ValkeyCluster
config = AdvancedGlideClientConfiguration(
config = GlideClientConfiguration(
addresses=(
pytest.standalone_cluster.nodes_addr if addresses is None else addresses
),
Expand All @@ -294,6 +294,7 @@ async def create_client(
inflight_requests_limit=inflight_requests_limit,
read_from=read_from,
client_az=client_az,
advanced_config=AdvancedGlideClientConfiguration(connection_timeout),
)
return await GlideClient.create(config)

Expand Down
1 change: 1 addition & 0 deletions python/python/tests/test_api_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def _get_export_rename_map():
"FtSearchKeywords", # ClassDef
"FtAggregateKeywords", # ClassDef
"FtProfileKeywords", # ClassDef
"AdvancedBaseClientConfiguration", # ClassDef
]


Expand Down
Loading

0 comments on commit 83ba370

Please sign in to comment.