Skip to content
Open
1 change: 1 addition & 0 deletions clickhouse/changelog.d/24266.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Collect standard system table metrics per node via clusterAllReplicas in single endpoint mode so per-node counters no longer produce phantom failures (e.g. clickhouse.query.failed.count).
23 changes: 14 additions & 9 deletions clickhouse/datadog_checks/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from .statement_samples import ClickhouseStatementSamples
from .statements import ClickhouseStatementMetrics
from .table_metrics import ClickhouseTableMetrics
from .utils import ErrorSanitizer
from .utils import ErrorSanitizer, cluster_aware_query

try:
import datadog_agent
Expand Down Expand Up @@ -289,14 +289,19 @@ def check(self, _):

def get_queries(self) -> list[dict]:
query_list = []
single = self._config.single_endpoint_mode

def pick(query: dict) -> dict:
"""In single endpoint mode, read all replicas and tag each row per node."""
return cluster_aware_query(query) if single else query

if self._config.use_legacy_queries:
query_list.extend(
[
queries.SystemMetrics,
queries.SystemEventsToDeprecate,
queries.SystemEvents,
queries.SystemAsynchronousMetrics,
pick(queries.SystemMetrics),
pick(queries.SystemEventsToDeprecate),
pick(queries.SystemEvents),
pick(queries.SystemAsynchronousMetrics),
queries.SystemParts,
queries.SystemReplicas,
queries.SystemDictionaries,
Expand All @@ -306,13 +311,13 @@ def get_queries(self) -> list[dict]:
if self._config.use_advanced_queries:
query_list.extend(
[
advanced_queries.SystemMetrics,
advanced_queries.SystemEvents,
advanced_queries.SystemAsynchronousMetrics,
pick(advanced_queries.SystemMetrics),
pick(advanced_queries.SystemEvents),
pick(advanced_queries.SystemAsynchronousMetrics),
]
)
if self.version_ge('21.3'):
query_list.append(advanced_queries.SystemErrors)
query_list.append(pick(advanced_queries.SystemErrors))

return query_list

Expand Down
22 changes: 22 additions & 0 deletions clickhouse/datadog_checks/clickhouse/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,27 @@ def compact_query(query):
return re.sub(r'\n\s+', ' ', query.strip())


# Tag added to per-node metrics when collecting from all replicas in single endpoint mode.
CLUSTER_NODE_TAG = 'clickhouse_node'


def cluster_aware_query(base: dict) -> dict:
"""Build a cluster-aware variant that reads all replicas and tags each row per node.

Derives the SELECT list and table from the base query, whose shape is always
``SELECT <cols> FROM system.<table>[ <trailing clause>]``.
"""
select, _, tail = base['query'].partition(' FROM system.')
table, sep, trailing = tail.partition(' ')
return {
'name': base['name'],
'query': (
f"{select}, hostName() AS {CLUSTER_NODE_TAG} "
f"FROM clusterAllReplicas('default', system.{table}){sep}{trailing}"
),
'columns': [*base['columns'], {'name': CLUSTER_NODE_TAG, 'type': 'tag'}],
}


def parse_version(version: str) -> list[int]:
return [int(v) for v in version.split('.')]
76 changes: 75 additions & 1 deletion clickhouse/tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from clickhouse_connect.driver.exceptions import Error, OperationalError

from datadog_checks.base import ConfigurationError
from datadog_checks.clickhouse import ClickhouseCheck, advanced_queries
from datadog_checks.clickhouse import ClickhouseCheck, advanced_queries, queries
from datadog_checks.clickhouse.utils import cluster_aware_query

from .utils import ensure_csv_safe, parse_described_metrics, raise_error

Expand Down Expand Up @@ -381,3 +382,76 @@ def test_database_hostname_ignores_reported_hostname_override(reported_hostname,
# reported_hostname honors the override when configured, otherwise the resolved host
assert check.reported_hostname == expected_reported_hostname
mock_resolve.assert_called_with(BASE_INSTANCE['server'])


def test_cluster_aware_query_bulk_match_query():
"""The cluster-aware variant reads all replicas and tags system.events per node."""
variant = cluster_aware_query(advanced_queries.SystemEvents)

assert variant['query'] == (
"SELECT value, event, hostName() AS clickhouse_node FROM clusterAllReplicas('default', system.events)"
)
assert variant['columns'][-1] == {'name': 'clickhouse_node', 'type': 'tag'}
# The base query dict must not be mutated by building the variant.
assert advanced_queries.SystemEvents['query'] == 'SELECT value, event FROM system.events'
assert all(column['name'] != 'clickhouse_node' for column in advanced_queries.SystemEvents['columns'])


def test_cluster_aware_query_preserves_where_clause():
"""system.errors carries a WHERE clause that must survive in the cluster-aware variant."""
variant = cluster_aware_query(advanced_queries.SystemErrors)

assert variant['query'] == (
"SELECT value, name, code, remote, hostName() AS clickhouse_node "
"FROM clusterAllReplicas('default', system.errors) WHERE value > 0"
)
assert variant['columns'][-1] == {'name': 'clickhouse_node', 'type': 'tag'}


def test_cluster_aware_query_legacy_query():
"""The helper builds a cluster-aware variant for a legacy query too."""
variant = cluster_aware_query(queries.SystemMetrics)

assert variant['query'] == (
"SELECT value, metric, hostName() AS clickhouse_node FROM clusterAllReplicas('default', system.metrics)"
)
assert variant['columns'][-1] == {'name': 'clickhouse_node', 'type': 'tag'}
assert queries.SystemMetrics['query'] == 'SELECT value, metric FROM system.metrics'


@pytest.mark.parametrize('use_advanced_queries', [True, False])
def test_get_queries_tags_system_tables_per_node_in_single_endpoint_mode(instance, use_advanced_queries):
instance = {
**instance,
'single_endpoint_mode': True,
'use_advanced_queries': use_advanced_queries,
'use_legacy_queries': not use_advanced_queries,
}
check = ClickhouseCheck('clickhouse', {}, [instance])
check._server_version = '24.8'

cluster_aware = [q for q in check.get_queries() if 'clusterAllReplicas' in q['query']]

# system.events, system.metrics, system.asynchronous_metrics (+ system.errors / events-to-deprecate)
assert cluster_aware
for query in cluster_aware:
assert 'hostName() AS clickhouse_node' in query['query']
assert query['columns'][-1] == {'name': 'clickhouse_node', 'type': 'tag'}

# system.parts/replicas/dictionaries use GROUP BY and are intentionally left untouched here.
if not use_advanced_queries:
assert any(q is queries.SystemParts for q in check.get_queries())


@pytest.mark.parametrize('use_advanced_queries', [True, False])
def test_get_queries_uses_base_queries_for_direct_connection(instance, use_advanced_queries):
instance = {
**instance,
'single_endpoint_mode': False,
'use_advanced_queries': use_advanced_queries,
'use_legacy_queries': not use_advanced_queries,
}
check = ClickhouseCheck('clickhouse', {}, [instance])
check._server_version = '24.8'

assert all('clusterAllReplicas' not in q['query'] for q in check.get_queries())
Loading