Skip to content

Commit 26e7066

Browse files
adelapenadriftx
authored andcommitted
CNDB-16123: Add slow query logger execution info to non-SAI read commands (#2147)
Add information about fetched/returned/deleted partitions and rows to the logging reports for non-SAI slow queries. There is a new system property named `cassandra.monitoring_execution_info_enabled` to disable this feature, which is enabled by default.
1 parent c00cfa8 commit 26e7066

File tree

9 files changed

+577
-78
lines changed

9 files changed

+577
-78
lines changed

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,11 @@ public enum CassandraRelevantProperties
589589
MEMTABLE_TRIE_SIZE_LIMIT("cassandra.trie_size_limit_mb"),
590590
MIGRATION_DELAY("cassandra.migration_delay_ms", "60000"),
591591
MMAPPED_MAX_SEGMENT_SIZE_IN_MB("cassandra.mmapped_max_segment_size"),
592+
/**
593+
* Whether to log detailed execution info when logging slow non-SAI queries.
594+
* For SAI queries, see {@link #SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED}.
595+
*/
596+
MONITORING_EXECUTION_INFO_ENABLED("cassandra.monitoring_execution_info_enabled", "true"),
592597
/** Defines the maximum number of unique timed out queries that will be reported in the logs. Use a negative number to remove any limit. */
593598
MONITORING_MAX_OPERATIONS("cassandra.monitoring_max_operations", "50"),
594599
/** Defines the interval for reporting any operations that have timed out. */
@@ -793,6 +798,7 @@ public enum CassandraRelevantProperties
793798
/**
794799
* Whether to log SAI-specific detailed execution info when logging slow SAI queries.
795800
* This execution info includes the query metrics and the query plan of the slow queries.
801+
* For non-SAI queries, see {@link #MONITORING_EXECUTION_INFO_ENABLED}.
796802
*/
797803
SAI_SLOW_QUERY_LOG_EXECUTION_INFO_ENABLED("cassandra.sai.slow_query_log.execution_info_enabled", "true"),
798804

src/java/org/apache/cassandra/db/ReadCommand.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public abstract class ReadCommand extends AbstractReadQuery
138138
@Nullable
139139
protected final Index.QueryPlan indexQueryPlan;
140140

141-
private volatile Supplier<ExecutionInfo> executionInfoSupplier = ExecutionInfo.EMPTY_SUPPLIER;
141+
private Supplier<ExecutionInfo> executionInfoSupplier = ExecutionInfo.EMPTY_SUPPLIER;
142142

143143
protected static abstract class SelectionDeserializer
144144
{
@@ -516,8 +516,8 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
516516
Context context = Context.from(this);
517517
var storageTarget = (null == searcher) ? queryStorage(cfs, executionController)
518518
: searchStorage(searcher, executionController);
519-
if (searcher != null)
520-
executionInfoSupplier = searcher.monitorableExecutionInfo();
519+
// Prepare the monitorable execution info, which will be null if it's deferred to the index
520+
ReadCommandExecutionInfo executionInfo = setupExecutionInfo(searcher);
521521

522522
UnfilteredPartitionIterator iterator = Transformation.apply(storageTarget, new TrackingRowIterator(context));
523523
iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);
@@ -530,6 +530,8 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
530530
iterator = withReadObserver(iterator);
531531
iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
532532
iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
533+
if (executionInfo != null)
534+
iterator = executionInfo.countFetched(iterator, nowInSec());
533535

534536
// If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
535537
// no point in checking it again.
@@ -562,8 +564,13 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
562564
iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
563565
}
564566

565-
// because of the above, we need to append an aritifical end bound if the source iterator was stopped short by a counter.
566-
return RTBoundCloser.close(iterator);
567+
// because of the above, we need to append an artifical end bound if the source iterator was stopped short by a counter.
568+
iterator = RTBoundCloser.close(iterator);
569+
570+
if (executionInfo != null)
571+
iterator = executionInfo.countReturned(iterator, nowInSec());
572+
573+
return iterator;
567574
}
568575
catch (RuntimeException | Error e)
569576
{
@@ -1372,4 +1379,28 @@ public long serializedSize(ReadCommand command, int version)
13721379
+ command.indexSerializedSize(version);
13731380
}
13741381
}
1382+
1383+
@Nullable
1384+
private ReadCommandExecutionInfo setupExecutionInfo(Index.Searcher searcher)
1385+
{
1386+
// if we have a searcher, it may use its own custom execution info instead of the generic one
1387+
if (searcher != null)
1388+
{
1389+
Supplier<ExecutionInfo> searcherExecutionInfoSupplier = searcher.monitorableExecutionInfo();
1390+
if (searcherExecutionInfoSupplier != null)
1391+
{
1392+
executionInfoSupplier = searcherExecutionInfoSupplier;
1393+
return null;
1394+
}
1395+
}
1396+
1397+
// if execution info is disabled, return null so we will keep using the default empty supplier
1398+
if (!CassandraRelevantProperties.MONITORING_EXECUTION_INFO_ENABLED.getBoolean())
1399+
return null;
1400+
1401+
// otherwise, create and use the generic execution info
1402+
ReadCommandExecutionInfo commandExecutionInfo = new ReadCommandExecutionInfo();
1403+
executionInfoSupplier = () -> commandExecutionInfo;
1404+
return commandExecutionInfo;
1405+
}
13751406
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.apache.cassandra.db;
18+
19+
import javax.annotation.concurrent.NotThreadSafe;
20+
21+
import org.apache.cassandra.db.monitoring.Monitorable;
22+
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
23+
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
24+
import org.apache.cassandra.db.rows.Row;
25+
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
26+
import org.apache.cassandra.db.transform.Transformation;
27+
import org.apache.cassandra.index.Index;
28+
29+
/**
30+
* A custom {@link Monitorable.ExecutionInfo} implementation for {@link ReadCommand}, to be used unless there is an
31+
* {@link Index.Searcher} with its own custom implementation.
32+
* </p>
33+
* It holds and prints the number of partitions, rows and tombstones fetched and returned by the command.
34+
* </p>
35+
* Deleted partitions are considered as a partition tombstone.
36+
* Deleted rows and range tombstone markers are considered as row tombstones.
37+
*/
38+
@NotThreadSafe
39+
class ReadCommandExecutionInfo implements Monitorable.ExecutionInfo
40+
{
41+
private long partitionsFetched = 0;
42+
private long partitionsReturned = 0;
43+
private long partitionTombstones = 0;
44+
private long rowsFetched = 0;
45+
private long rowsReturned = 0;
46+
private long rowTombstones = 0;
47+
48+
/**
49+
* Counts the number of fetched partitions and rows in the specified iterator.
50+
*
51+
* @param partitions the iterator of fetched partitions to count
52+
* @param nowInSec the command's time in seconds, used to evaluate whether a partition/row is alive
53+
* @return the same iterator
54+
*/
55+
UnfilteredPartitionIterator countFetched(UnfilteredPartitionIterator partitions, long nowInSec)
56+
{
57+
Transformation<UnfilteredRowIterator> rowCounter = new Transformation<>() {
58+
@Override
59+
protected Row applyToRow(Row row)
60+
{
61+
if (row.hasLiveData(nowInSec, false))
62+
rowsFetched++;
63+
return row;
64+
}
65+
};
66+
return Transformation.apply(partitions, new Transformation<>() {
67+
@Override
68+
protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
69+
{
70+
if (!partition.partitionLevelDeletion().deletes(nowInSec))
71+
partitionsFetched++;
72+
return Transformation.apply(partition, rowCounter);
73+
}
74+
});
75+
}
76+
77+
/**
78+
* Counts the number of fetched partitions, rows and tombstones in the specified iterator.
79+
*
80+
* @param partitions the iterator of returned partitions to count
81+
* @param nowInSec the command's time in seconds, used to evaluate whether a partition/row is alive
82+
* @return the same iterator
83+
*/
84+
UnfilteredPartitionIterator countReturned(UnfilteredPartitionIterator partitions, long nowInSec)
85+
{
86+
Transformation<UnfilteredRowIterator> rowCounter = new Transformation<>() {
87+
@Override
88+
protected Row applyToRow(Row row)
89+
{
90+
if (row.hasLiveData(nowInSec, false))
91+
rowsReturned++;
92+
else
93+
rowTombstones++;
94+
return row;
95+
}
96+
97+
@Override
98+
protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
99+
{
100+
rowTombstones++;
101+
return marker;
102+
}
103+
};
104+
return Transformation.apply(partitions, new Transformation<>() {
105+
@Override
106+
protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
107+
{
108+
if (partition.partitionLevelDeletion().deletes(nowInSec))
109+
partitionTombstones++;
110+
else
111+
partitionsReturned++;
112+
return Transformation.apply(partition, rowCounter);
113+
}
114+
});
115+
}
116+
117+
@Override
118+
public String toLogString(boolean unique)
119+
{
120+
StringBuilder sb = new StringBuilder("\n");
121+
sb.append(INDENT);
122+
sb.append(unique ? "Fetched/returned/tombstones:" : "Slowest fetched/returned/tombstones:");
123+
append(sb, "partitions",
124+
partitionsFetched,
125+
partitionsReturned,
126+
partitionTombstones);
127+
append(sb, "rows",
128+
rowsFetched,
129+
rowsReturned,
130+
rowTombstones);
131+
return sb.toString();
132+
}
133+
134+
private static void append(StringBuilder sb, String name, long fetched, long returned, long tombstones)
135+
{
136+
sb.append('\n')
137+
.append(DOUBLE_INDENT)
138+
.append(name)
139+
.append(": ")
140+
.append(fetched)
141+
.append('/')
142+
.append(returned)
143+
.append('/')
144+
.append(tombstones);
145+
}
146+
}

src/java/org/apache/cassandra/db/monitoring/Monitorable.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ default ExecutionInfo executionInfo()
5454
*/
5555
interface ExecutionInfo
5656
{
57+
String INDENT = " ";
58+
String DOUBLE_INDENT = INDENT + INDENT;
59+
5760
/**
5861
* An empty no-op implementation.
5962
*/

src/java/org/apache/cassandra/index/Index.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -836,15 +836,16 @@ interface Searcher
836836
UnfilteredPartitionIterator search(ReadExecutionController executionController);
837837

838838
/**
839-
* Returns a supplier for the {@link Monitorable.ExecutionInfo} for this query, to be used by
839+
* Returns a supplier for the custom {@link Monitorable.ExecutionInfo} for this query, to be used by
840840
* {@link ReadCommand#executionInfo()} at the end of the query to collect details about the query execution in
841841
* case it is considered too slow.
842842
*
843-
* @return a supplier for the execution info for this query
843+
* @return a supplier for the execution info for this query, or {@code null} if no custom execution info is available
844844
*/
845+
@Nullable
845846
default Supplier<Monitorable.ExecutionInfo> monitorableExecutionInfo()
846847
{
847-
return Monitorable.ExecutionInfo.EMPTY_SUPPLIER;
848+
return null;
848849
}
849850
}
850851

src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@
2828
*/
2929
public class QueryMonitorableExecutionInfo implements Monitorable.ExecutionInfo
3030
{
31-
private static final String INDENT = " ";
32-
private static final String DOUBLE_INDENT = INDENT + INDENT;
33-
3431
private final QueryContext.Snapshot metrics;
3532
private final String plan;
3633

0 commit comments

Comments
 (0)