-
_execute_command
Both ConnectionError and TimeoutError are caught, and the handler reinitializes the cluster topology.
|
connection = get_connection(valkey_node, *args, **kwargs) |
|
if asking: |
|
connection.send_command("ASKING") |
|
valkey_node.parse_response(connection, "ASKING", **kwargs) |
|
asking = False |
|
response_from_cache = connection._get_from_local_cache(args) |
|
if response_from_cache is not None: |
|
return response_from_cache |
|
else: |
|
connection.send_command(*args) |
|
response = valkey_node.parse_response(connection, command, **kwargs) |
|
if command in self.cluster_response_callbacks: |
|
response = self.cluster_response_callbacks[command]( |
|
response, **kwargs |
|
) |
|
if keys: |
|
connection._add_to_local_cache(args, response, keys) |
|
return response |
|
except AuthenticationError: |
|
raise |
|
except (ConnectionError, TimeoutError) as e: |
|
# Connection retries are being handled in the node's |
|
# Retry object. |
|
# ConnectionError can also be raised if we couldn't get a |
|
# connection from the pool before timing out, so check that |
|
# this is an actual connection before attempting to disconnect. |
|
if connection is not None: |
|
connection.disconnect() |
|
|
|
# Remove the failed node from the startup nodes before we try |
|
# to reinitialize the cluster |
|
self.nodes_manager.startup_nodes.pop(target_node.name, None) |
-
_send_cluster_commands
It only catches ConnectionError, so the retry with a cluster topology update does not work for TimeoutError.
|
retry_attempts = self.cluster_error_retry_attempts |
|
while True: |
|
try: |
|
return self._send_cluster_commands( |
|
stack, |
|
raise_on_error=raise_on_error, |
|
allow_redirections=allow_redirections, |
|
) |
|
except (ClusterDownError, ConnectionError) as e: |
|
if retry_attempts > 0: |
|
# Try again with the new cluster setup. All other errors |
|
# should be raised. |
|
retry_attempts -= 1 |
|
pass |
|
else: |
|
raise e |
|
try: |
|
connection = get_connection(valkey_node, c.args) |
|
except ConnectionError: |
|
for n in nodes.values(): |
|
n.connection_pool.release(n.connection) |
I think ClusterPipeline.ERRORS_ALLOW_RETRY should be considered.
_execute_command
Both ConnectionError and TimeoutError are caught, and the handler reinitializes the cluster topology.
valkey-py/valkey/cluster.py
Lines 1149 to 1180 in a4d180b
_send_cluster_commands
It only catches ConnectionError, so the retry with a cluster topology update does not work for TimeoutError.
valkey-py/valkey/cluster.py
Lines 2106 to 2121 in a4d180b
valkey-py/valkey/cluster.py
Lines 2177 to 2181 in a4d180b
I think ClusterPipeline.ERRORS_ALLOW_RETRY should be considered.