Skip to content

Modified IteratorEnvironment to no longer throw UOE #5490

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientServiceEnvironmentImpl;
import org.apache.accumulo.core.clientImpl.ScannerImpl;
import org.apache.accumulo.core.clientImpl.ScannerOptions;
import org.apache.accumulo.core.data.ArrayByteSequence;
Expand All @@ -49,10 +48,10 @@
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.ClientIteratorEnvironment;
import org.apache.accumulo.core.iteratorsImpl.IteratorBuilder;
import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
import org.apache.hadoop.io.Text;

/**
Expand Down Expand Up @@ -87,70 +86,6 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner
private final Supplier<ClientContext> context;
private final Supplier<TableId> tableId;

private class ClientSideIteratorEnvironment implements IteratorEnvironment {

private final SamplerConfiguration samplerConfig;
private final boolean sampleEnabled;

ClientSideIteratorEnvironment(boolean sampleEnabled, SamplerConfiguration samplerConfig) {
this.sampleEnabled = sampleEnabled;
this.samplerConfig = samplerConfig;
}

@Override
public IteratorScope getIteratorScope() {
return IteratorScope.scan;
}

@Override
public boolean isFullMajorCompaction() {
// The javadocs state this method will throw an ISE when scope is not majc
throw new IllegalStateException(
"Asked about major compaction type when scope is " + getIteratorScope());
}

@Override
public boolean isUserCompaction() {
return false;
}

@Override
public Authorizations getAuthorizations() {
return ClientSideIteratorScanner.this.getAuthorizations();
}

@Override
public IteratorEnvironment cloneWithSamplingEnabled() {
return new ClientSideIteratorEnvironment(true, samplerConfig);
}

@Override
public boolean isSamplingEnabled() {
return sampleEnabled;
}

@Override
public SamplerConfiguration getSamplerConfiguration() {
return samplerConfig;
}

@Deprecated(since = "2.1.0")
@Override
public ServiceEnvironment getServiceEnv() {
return new ClientServiceEnvironmentImpl(context.get());
}

@Override
public PluginEnvironment getPluginEnv() {
return new ClientServiceEnvironmentImpl(context.get());
}

@Override
public TableId getTableId() {
return tableId.get();
}
}

/**
* A class that wraps a Scanner in a SortedKeyValueIterator so that other accumulo iterators can
* use it as a source.
Expand Down Expand Up @@ -295,9 +230,14 @@ public Iterator<Entry<Key,Value>> iterator() {

SortedKeyValueIterator<Key,Value> skvi;
try {
IteratorEnvironment iterEnv = new ClientSideIteratorEnvironment(
getSamplerConfiguration() != null, getIteratorSamplerConfigurationInternal());

ClientIteratorEnvironment.Builder builder = new ClientIteratorEnvironment.Builder()
.withClient(context.get()).withAuthorizations(getAuthorizations())
.withScope(IteratorScope.scan).withTableId(tableId.get())
.withSamplerConfiguration(getIteratorSamplerConfigurationInternal());
if (getSamplerConfiguration() != null) {
builder.withSamplingEnabled();
}
IteratorEnvironment iterEnv = builder.build();
IteratorBuilder ib =
IteratorBuilder.builder(tm.values()).opts(serverSideIteratorOptions).env(iterEnv).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.clientImpl.ClientServiceEnvironmentImpl;
import org.apache.accumulo.core.clientImpl.ScannerOptions;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
Expand All @@ -42,6 +43,7 @@
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
Expand All @@ -55,6 +57,7 @@
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.ClientIteratorEnvironment;
import org.apache.accumulo.core.iteratorsImpl.IteratorBuilder;
import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil;
import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
Expand All @@ -66,6 +69,7 @@
import org.apache.accumulo.core.spi.cache.CacheType;
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.util.ConfigurationImpl;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.Text;
Expand All @@ -74,8 +78,46 @@

class RFileScanner extends ScannerOptions implements Scanner {

private static class RFileScannerEnvironmentImpl extends ClientServiceEnvironmentImpl {

private final Configuration conf;
private final Configuration tableConf;

public RFileScannerEnvironmentImpl(Opts opts) {
super(null);
conf = new ConfigurationImpl(new ConfigurationCopy(DefaultConfiguration.getInstance()));
ConfigurationCopy tableCC = new ConfigurationCopy(DefaultConfiguration.getInstance());
if (opts.tableConfig != null) {
opts.tableConfig.forEach(tableCC::set);
}
tableConf = new ConfigurationImpl(tableCC);
}

@Override
public String getTableName(TableId tableId) throws TableNotFoundException {
Preconditions.checkArgument(tableId == TABLE_ID, "Expected " + TABLE_ID + " obtained"
+ " from IteratorEnvironment.getTableId(), but got: " + tableId);
return TABLE_NAME;
}
Comment on lines +96 to +101
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think getTableName should be supported here. There is no need for a user to call this. Should probably just throw an exception


@Override
public Configuration getConfiguration() {
return conf;
}

@Override
public Configuration getConfiguration(TableId tableId) {
Preconditions.checkArgument(tableId == TABLE_ID, "Expected " + TABLE_ID + " obtained"
+ " from IteratorEnvironment.getTableId(), but got: " + tableId);
return tableConf;
}

}

private static final byte[] EMPTY_BYTES = new byte[0];
private static final Range EMPTY_RANGE = new Range();
private static final String TABLE_NAME = "rfileScanner";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be deleted if getTableName is removed

private static final TableId TABLE_ID = TableId.of(TABLE_NAME);
Comment on lines +119 to +120
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discussed in more detail here but I'm not sure about a dummy table id/name here. Both null and a dummy id have their drawbacks though, so may be up to preference.

In my opinion, a null table id makes sense since there is no table in this context while still allowing for env.getPluginEnv().getConfiguration(env.getTableId()) to work (getConfiguration just ensures the passed in value is null)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could spin this off in a separate issue since it was a preexisting pattern in the code.


private Range range;
private BlockCacheManager blockCacheManager = null;
Expand Down Expand Up @@ -225,33 +267,6 @@ public void updateScanIteratorOption(String iteratorName, String key, String val
super.updateScanIteratorOption(iteratorName, key, value);
}

private class IterEnv implements IteratorEnvironment {
@Override
public IteratorScope getIteratorScope() {
return IteratorScope.scan;
}

@Override
public boolean isFullMajorCompaction() {
return false;
}

@Override
public Authorizations getAuthorizations() {
return opts.auths;
}

@Override
public boolean isSamplingEnabled() {
return RFileScanner.this.getSamplerConfiguration() != null;
}

@Override
public SamplerConfiguration getSamplerConfiguration() {
return RFileScanner.this.getSamplerConfiguration();
}
}

@Override
public Iterator<Entry<Key,Value>> iterator() {
try {
Expand Down Expand Up @@ -292,15 +307,23 @@ public Iterator<Entry<Key,Value>> iterator() {
EMPTY_BYTES, tableConf);
}

ClientIteratorEnvironment.Builder iterEnvBuilder = new ClientIteratorEnvironment.Builder()
.withEnvironment(new RFileScannerEnvironmentImpl(opts)).withAuthorizations(opts.auths)
.withScope(IteratorScope.scan).withTableId(TABLE_ID);
if (getSamplerConfiguration() != null) {
iterEnvBuilder.withSamplerConfiguration(getSamplerConfiguration());
iterEnvBuilder.withSamplingEnabled();
}
IteratorEnvironment iterEnv = iterEnvBuilder.build();
try {
if (opts.tableConfig != null && !opts.tableConfig.isEmpty()) {
var ibEnv = IteratorConfigUtil.loadIterConf(IteratorScope.scan, serverSideIteratorList,
serverSideIteratorOptions, tableConf);
var iteratorBuilder = ibEnv.env(new IterEnv()).build();
var iteratorBuilder = ibEnv.env(iterEnv).build();
iterator = IteratorConfigUtil.loadIterators(iterator, iteratorBuilder);
} else {
var iteratorBuilder = IteratorBuilder.builder(serverSideIteratorList)
.opts(serverSideIteratorOptions).env(new IterEnv()).build();
.opts(serverSideIteratorOptions).env(iterEnv).build();
iterator = IteratorConfigUtil.loadIterators(iterator, iteratorBuilder);
}
} catch (IOException e) {
Expand Down
Loading