Skip to content

Commit bd184af

Browse files
dlmarionctubbsii
andauthored
Added summary of queued and running compactions to coordinator (#5989)
This commit adds periodic logging of queued and running external compaction information to the coordinator. The logging is emitted by a new class, CoordinatorSummaryLogger, so that users can easily redirect this log to a new file in the logging configuration. At each interval this new class will log the number of compactions running for each table, and will log the number of compactors, queued compactions and running compactions for each compaction queue. The number of queued compactions is an estimate as each tablet server only reports up to 100 different compaction priorities to conserve memory space in the Coordinator (see ExternalCompactionExecutor.summarize). The metrics are a more accurate source of the number of queued external compactions, but that requires aggregating all of the METRICS_MAJC_QUEUED Meters from all of the TabletServers. Related to #5965 Co-authored-by: Christopher Tubbs <ctubbsii@apache.org>
1 parent 7d823f6 commit bd184af

3 files changed

Lines changed: 91 additions & 1 deletion

File tree

server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ static FailureCounts incrementSuccess(Object key, FailureCounts counts) {
168168
protected final AccumuloConfiguration aconf;
169169
protected CompactionFinalizer compactionFinalizer;
170170
protected LiveTServerSet tserverSet;
171+
private final CoordinatorSummaryLogger summaryLogger;
171172

172173
private ServiceLock coordinatorLock;
173174

@@ -187,8 +188,9 @@ protected CompactionCoordinator(ServerOpts opts, String[] args, AccumuloConfigur
187188
printStartupMsg();
188189
startCompactionCleaner();
189190
startRunningCleaner();
190-
compactorCounts = Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.SECONDS)
191+
compactorCounts = Caffeine.newBuilder().expireAfterWrite(2, TimeUnit.MINUTES)
191192
.build(queue -> ExternalCompactionUtil.countCompactors(queue, getContext()));
193+
summaryLogger = new CoordinatorSummaryLogger(super.getContext(), compactorCounts);
192194
}
193195

194196
@Override
@@ -345,6 +347,7 @@ public void run() {
345347

346348
tserverSet.startListeningForTabletServerChanges();
347349
startDeadCompactionDetector();
350+
startQueueRunningSummaryLogging();
348351
startFailureSummaryLogging();
349352

350353
LOG.info("Starting loop to check tservers for compaction summaries");
@@ -685,6 +688,12 @@ private void captureFailure(ExternalCompactionId ecid, KeyExtent extent) {
685688
failingTables.compute(extent.tableId(), FailureCounts::incrementFailure);
686689
}
687690

691+
protected void startQueueRunningSummaryLogging() {
692+
ScheduledFuture<?> future = getContext().getScheduledExecutor()
693+
.scheduleWithFixedDelay(summaryLogger::logSummary, 0, 1, TimeUnit.MINUTES);
694+
ThreadPools.watchNonCriticalScheduledTask(future);
695+
}
696+
688697
protected void startFailureSummaryLogging() {
689698
ScheduledFuture<?> future = getContext().getScheduledExecutor()
690699
.scheduleWithFixedDelay(this::printStats, 0, 5, TimeUnit.MINUTES);
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.accumulo.coordinator;
20+
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
import java.util.TreeMap;
24+
import java.util.TreeSet;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
27+
import org.apache.accumulo.core.data.TableId;
28+
import org.apache.accumulo.core.dataImpl.KeyExtent;
29+
import org.apache.accumulo.core.metadata.TServerInstance;
30+
import org.apache.accumulo.server.ServerContext;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
import com.github.benmanes.caffeine.cache.Cache;
35+
36+
public class CoordinatorSummaryLogger {
37+
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorSummaryLogger.class);
38+
39+
private static final TreeMap<Short,TreeSet<TServerInstance>> EMPTY = new TreeMap<>();
40+
private final ServerContext ctx;
41+
private final Cache<String,Integer> compactorCounts;
42+
43+
public CoordinatorSummaryLogger(ServerContext ctx, Cache<String,Integer> compactorCounts) {
44+
this.ctx = ctx;
45+
this.compactorCounts = compactorCounts;
46+
}
47+
48+
public void logSummary() {
49+
50+
final Map<TableId,String> tableMap = ctx.getTableIdToNameMap();
51+
final Map<String,AtomicLong> perQueueRunningCount = new HashMap<>();
52+
final Map<String,AtomicLong> perTableRunningCount = new HashMap<>();
53+
54+
CompactionCoordinator.RUNNING_CACHE.values().forEach(rc -> {
55+
TableId tid = KeyExtent.fromThrift(rc.getJob().getExtent()).tableId();
56+
String tableName = tableMap.getOrDefault(tid, "Unmapped table id: " + tid.canonical());
57+
perQueueRunningCount.computeIfAbsent(rc.getQueueName(), q -> new AtomicLong(0))
58+
.incrementAndGet();
59+
perTableRunningCount.computeIfAbsent(tableName, t -> new AtomicLong(0)).incrementAndGet();
60+
});
61+
62+
perQueueRunningCount.forEach((q, count) -> {
63+
LOG.info(
64+
"Queue {}: compactors: {}, queued majc (minimum, possibly higher): {}, running majc: {}",
65+
q, compactorCounts.asMap().getOrDefault(q, 0),
66+
// This map only contains the highest priority for each tserver. So when tservers have
67+
// other priorities that need to compact or have more than one compaction for a
68+
// priority level this count will be lower than the actual number of queued.
69+
CompactionCoordinator.QUEUE_SUMMARIES.QUEUES.getOrDefault(q, EMPTY).values().stream()
70+
.mapToLong(TreeSet::size).sum(),
71+
count.get());
72+
73+
});
74+
perTableRunningCount
75+
.forEach((t, count) -> LOG.info("Running compactions for table {}: {}", t, count));
76+
}
77+
78+
}

server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ protected TestCoordinator(CompactionFinalizer finalizer, LiveTServerSet tservers
117117
@Override
118118
protected void startDeadCompactionDetector() {}
119119

120+
@Override
121+
protected void startQueueRunningSummaryLogging() {}
122+
120123
@Override
121124
protected void startFailureSummaryLogging() {}
122125

0 commit comments

Comments
 (0)