Skip to content

Increase majc priority for tablets over file size threshold #5026

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.util.Collection;
import java.util.Map;

import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.spi.common.ServiceEnvironment;

Expand Down Expand Up @@ -79,6 +81,13 @@ public interface InitParameters {
*/
public interface PlanningParameters {

/**
* @return The id of the namespace that the table is assigned to
* @throws TableNotFoundException thrown when the namespace for a table cannot be calculated
* @since 2.1.4
*/
NamespaceId getNamespaceId() throws TableNotFoundException;

/**
* @return The id of the table that compactions are being planned for.
* @see ServiceEnvironment#getTableName(TableId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Objects;
import java.util.Set;

import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
Expand Down Expand Up @@ -250,6 +251,7 @@ private void determineMaxFilesToCompact(InitParameters params) {

@Override
public CompactionPlan makePlan(PlanningParameters params) {
int maxTabletFiles = 0;
try {

if (params.getCandidates().isEmpty()) {
Expand Down Expand Up @@ -322,7 +324,7 @@ public CompactionPlan makePlan(PlanningParameters params) {
} else if (params.getKind() == CompactionKind.SYSTEM
&& params.getRunningCompactions().isEmpty()
&& params.getAll().size() == params.getCandidates().size()) {
int maxTabletFiles = getMaxTabletFiles(
maxTabletFiles = getMaxTabletFiles(
params.getServiceEnvironment().getConfiguration(params.getTableId()));
if (params.getAll().size() > maxTabletFiles) {
// The tablet is above its max files, there are no compactions running, all files are
Expand All @@ -339,11 +341,13 @@ public CompactionPlan makePlan(PlanningParameters params) {
// determine which executor to use based on the size of the files
var ceid = getExecutor(group);

return params.createPlanBuilder().addJob(createPriority(params, group), ceid, group)
.build();
return params.createPlanBuilder()
.addJob(createPriority(params, group, maxTabletFiles), ceid, group).build();
}
} catch (RuntimeException e) {
throw e;
} catch (TableNotFoundException e) {
throw new RuntimeException("Error getting namespace for table: " + params.getTableId(), e);
}
}

Expand Down Expand Up @@ -415,10 +419,10 @@ private Collection<CompactableFile> findFilesToCompactWithLowerRatio(PlanningPar
return found;
}

private static short createPriority(PlanningParameters params,
Collection<CompactableFile> group) {
return CompactionJobPrioritizer.createPriority(params.getKind(), params.getAll().size(),
group.size());
private static short createPriority(PlanningParameters params, Collection<CompactableFile> group,
int maxTabletFiles) throws TableNotFoundException {
return CompactionJobPrioritizer.createPriority(params.getNamespaceId(), params.getTableId(),
params.getKind(), params.getAll().size(), group.size(), maxTabletFiles);
}

private long getMaxSizeToCompact(CompactionKind kind) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,136 @@
package org.apache.accumulo.core.util.compaction;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

import org.apache.accumulo.core.clientImpl.Namespace;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.util.Pair;
import org.apache.commons.lang3.Range;

import com.google.common.base.Preconditions;

public class CompactionJobPrioritizer {

public static final Comparator<CompactionJob> JOB_COMPARATOR =
Comparator.comparingInt(CompactionJob::getPriority)
.thenComparingInt(job -> job.getFiles().size()).reversed();

public static short createPriority(CompactionKind kind, int totalFiles, int compactingFiles) {

int prio = totalFiles + compactingFiles;

switch (kind) {
case USER:
case CHOP:
// user-initiated compactions will have a positive priority
// based on number of files
if (prio > Short.MAX_VALUE) {
return Short.MAX_VALUE;
}
return (short) prio;
case SELECTOR:
case SYSTEM:
// system-initiated compactions will have a negative priority
// starting at -32768 and increasing based on number of files
// maxing out at -1
if (prio > Short.MAX_VALUE) {
return -1;
} else {
return (short) (Short.MIN_VALUE + prio);
}
default:
throw new AssertionError("Unknown kind " + kind);
private static final Map<Pair<TableId,CompactionKind>,Range<Short>> SYSTEM_TABLE_RANGES =
new HashMap<>();
private static final Map<Pair<NamespaceId,CompactionKind>,
Range<Short>> ACCUMULO_NAMESPACE_RANGES = new HashMap<>();

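// Create ranges of possible priority values. The root table, metadata table,
// accumulo namespace, and over-size ranges each have 2000 possible values; the
// two user-table ranges cover the remaining values. Priority order, from
// highest to lowest, is: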
// root table user initiated
// root table system initiated
// metadata table user initiated
// metadata table system initiated
// other tables in accumulo namespace user initiated
// other tables in accumulo namespace system initiated
// user tables that have more files than configured system initiated
// user tables user initiated
// user tables system initiated
static final Range<Short> ROOT_TABLE_USER = Range.of((short) 30768, (short) 32767);
static final Range<Short> ROOT_TABLE_SYSTEM = Range.of((short) 28768, (short) 30767);

static final Range<Short> METADATA_TABLE_USER = Range.of((short) 26768, (short) 28767);
static final Range<Short> METADATA_TABLE_SYSTEM = Range.of((short) 24768, (short) 26767);

static final Range<Short> SYSTEM_NS_USER = Range.of((short) 22768, (short) 24767);
static final Range<Short> SYSTEM_NS_SYSTEM = Range.of((short) 20768, (short) 22767);

static final Range<Short> TABLE_OVER_SIZE = Range.of((short) 18768, (short) 20767);

static final Range<Short> USER_TABLE_USER = Range.of((short) 1, (short) 18767);
static final Range<Short> USER_TABLE_SYSTEM = Range.of((short) -32768, (short) 0);

static {
// root table
SYSTEM_TABLE_RANGES.put(new Pair<>(RootTable.ID, CompactionKind.USER), ROOT_TABLE_USER);
SYSTEM_TABLE_RANGES.put(new Pair<>(RootTable.ID, CompactionKind.SYSTEM), ROOT_TABLE_SYSTEM);

// metadata table
SYSTEM_TABLE_RANGES.put(new Pair<>(MetadataTable.ID, CompactionKind.USER), METADATA_TABLE_USER);
SYSTEM_TABLE_RANGES.put(new Pair<>(MetadataTable.ID, CompactionKind.SYSTEM),
METADATA_TABLE_SYSTEM);

// other tables in the accumulo namespace
ACCUMULO_NAMESPACE_RANGES.put(new Pair<>(Namespace.ACCUMULO.id(), CompactionKind.USER),
SYSTEM_NS_USER);
ACCUMULO_NAMESPACE_RANGES.put(new Pair<>(Namespace.ACCUMULO.id(), CompactionKind.SYSTEM),
SYSTEM_NS_SYSTEM);
}

public static short createPriority(final NamespaceId nsId, final TableId tableId,
final CompactionKind kind, final int totalFiles, final int compactingFiles,
final int maxFilesPerTablet) {

Objects.requireNonNull(nsId, "nsId cannot be null");
Objects.requireNonNull(tableId, "tableId cannot be null");
Preconditions.checkArgument(totalFiles >= 0, "totalFiles is negative %s", totalFiles);
Preconditions.checkArgument(compactingFiles >= 0, "compactingFiles is negative %s",
compactingFiles);

final Function<Range<Short>,Short> normalPriorityFunction = new Function<>() {
@Override
public Short apply(Range<Short> f) {
return (short) Math.min(f.getMaximum(), f.getMinimum() + totalFiles + compactingFiles);
}
};

final Function<Range<Short>,Short> tabletOverSizeFunction = new Function<>() {
@Override
public Short apply(Range<Short> f) {
return (short) Math.min(f.getMaximum(),
f.getMinimum() + compactingFiles + (totalFiles - maxFilesPerTablet));
}
};

// Normalize the kind used for the priority calculation: treat a CHOP
// compaction as a USER compaction and a SELECTOR compaction as a SYSTEM
// compaction.
CompactionKind calculationKind = kind;
if (kind == CompactionKind.CHOP) {
calculationKind = CompactionKind.USER;
} else if (kind == CompactionKind.SELECTOR) {
calculationKind = CompactionKind.SYSTEM;
}

Range<Short> range = null;
Function<Range<Short>,Short> func = normalPriorityFunction;
if (Namespace.ACCUMULO.id() == nsId) {
// Handle system tables
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be chop compactions when merging the metadata table, and these may end up with a null range, causing an exception. Handling needs to be added for the chop kind. This will have to be dropped in the 3.1 code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In 74d66b2 I added code to treat CHOP as USER compaction kind.

range = SYSTEM_TABLE_RANGES.get(new Pair<>(tableId, calculationKind));
if (range == null) {
range = ACCUMULO_NAMESPACE_RANGES.get(new Pair<>(nsId, calculationKind));
}
} else {
// Handle user tables
if (totalFiles > maxFilesPerTablet && calculationKind == CompactionKind.SYSTEM) {
range = TABLE_OVER_SIZE;
func = tabletOverSizeFunction;
} else if (calculationKind == CompactionKind.SYSTEM) {
range = USER_TABLE_SYSTEM;
} else {
range = USER_TABLE_USER;
}
}

if (range == null) {
throw new IllegalStateException(
"Error calculating compaction priority for table: " + tableId);
}
return func.apply(range);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.clientImpl.Namespace;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
import org.apache.accumulo.core.spi.common.ServiceEnvironment.Configuration;
Expand Down Expand Up @@ -741,6 +744,11 @@ private static CompactionPlanner.PlanningParameters createPlanningParams(Set<Com
CompactionKind kind, Configuration conf) {
return new CompactionPlanner.PlanningParameters() {

@Override
public NamespaceId getNamespaceId() throws TableNotFoundException {
return Namespace.ACCUMULO.id();
}

@Override
public TableId getTableId() {
return TableId.of("42");
Expand Down
Loading