|
30 | 30 |
|
31 | 31 | import org.apache.accumulo.core.client.IteratorSetting;
|
32 | 32 | import org.apache.accumulo.core.client.Scanner;
|
| 33 | +import org.apache.accumulo.core.client.TableNotFoundException; |
33 | 34 | import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs;
|
34 |
| -import org.apache.accumulo.core.client.sample.SamplerConfiguration; |
| 35 | +import org.apache.accumulo.core.clientImpl.ClientServiceEnvironmentImpl; |
35 | 36 | import org.apache.accumulo.core.clientImpl.ScannerOptions;
|
36 | 37 | import org.apache.accumulo.core.conf.AccumuloConfiguration;
|
37 | 38 | import org.apache.accumulo.core.conf.ConfigurationCopy;
|
|
42 | 43 | import org.apache.accumulo.core.data.Column;
|
43 | 44 | import org.apache.accumulo.core.data.Key;
|
44 | 45 | import org.apache.accumulo.core.data.Range;
|
| 46 | +import org.apache.accumulo.core.data.TableId; |
45 | 47 | import org.apache.accumulo.core.data.Value;
|
46 | 48 | import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
|
47 | 49 | import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
|
|
55 | 57 | import org.apache.accumulo.core.iterators.IteratorEnvironment;
|
56 | 58 | import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
|
57 | 59 | import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
|
| 60 | +import org.apache.accumulo.core.iteratorsImpl.ClientIteratorEnvironment; |
58 | 61 | import org.apache.accumulo.core.iteratorsImpl.IteratorBuilder;
|
59 | 62 | import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil;
|
60 | 63 | import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
|
|
66 | 69 | import org.apache.accumulo.core.spi.cache.CacheType;
|
67 | 70 | import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
|
68 | 71 | import org.apache.accumulo.core.spi.crypto.CryptoService;
|
| 72 | +import org.apache.accumulo.core.util.ConfigurationImpl; |
69 | 73 | import org.apache.accumulo.core.util.LocalityGroupUtil;
|
70 | 74 | import org.apache.hadoop.fs.FSDataInputStream;
|
71 | 75 | import org.apache.hadoop.io.Text;
|
|
74 | 78 |
|
75 | 79 | class RFileScanner extends ScannerOptions implements Scanner {
|
76 | 80 |
|
| 81 | + private static class RFileScannerEnvironmentImpl extends ClientServiceEnvironmentImpl { |
| 82 | + |
| 83 | + private final Configuration conf; |
| 84 | + private final Configuration tableConf; |
| 85 | + |
| 86 | + public RFileScannerEnvironmentImpl(Opts opts) { |
| 87 | + super(null); |
| 88 | + conf = new ConfigurationImpl(new ConfigurationCopy(DefaultConfiguration.getInstance())); |
| 89 | + ConfigurationCopy tableCC = new ConfigurationCopy(DefaultConfiguration.getInstance()); |
| 90 | + if (opts.tableConfig != null) { |
| 91 | + opts.tableConfig.forEach(tableCC::set); |
| 92 | + } |
| 93 | + tableConf = new ConfigurationImpl(tableCC); |
| 94 | + } |
| 95 | + |
| 96 | + @Override |
| 97 | + public String getTableName(TableId tableId) throws TableNotFoundException { |
| 98 | + Preconditions.checkArgument(tableId == TABLE_ID, "Expected " + TABLE_ID + " obtained" |
| 99 | + + " from IteratorEnvironment.getTableId(), but got: " + tableId); |
| 100 | + return TABLE_NAME; |
| 101 | + } |
| 102 | + |
| 103 | + @Override |
| 104 | + public Configuration getConfiguration() { |
| 105 | + return conf; |
| 106 | + } |
| 107 | + |
| 108 | + @Override |
| 109 | + public Configuration getConfiguration(TableId tableId) { |
| 110 | + Preconditions.checkArgument(tableId == TABLE_ID, "Expected " + TABLE_ID + " obtained" |
| 111 | + + " from IteratorEnvironment.getTableId(), but got: " + tableId); |
| 112 | + return tableConf; |
| 113 | + } |
| 114 | + |
| 115 | + } |
| 116 | + |
77 | 117 | private static final byte[] EMPTY_BYTES = new byte[0];
|
78 | 118 | private static final Range EMPTY_RANGE = new Range();
|
| 119 | + private static final String TABLE_NAME = "rfileScanner"; |
| 120 | + private static final TableId TABLE_ID = TableId.of(TABLE_NAME); |
79 | 121 |
|
80 | 122 | private Range range;
|
81 | 123 | private BlockCacheManager blockCacheManager = null;
|
@@ -225,33 +267,6 @@ public void updateScanIteratorOption(String iteratorName, String key, String val
|
225 | 267 | super.updateScanIteratorOption(iteratorName, key, value);
|
226 | 268 | }
|
227 | 269 |
|
228 |
| - private class IterEnv implements IteratorEnvironment { |
229 |
| - @Override |
230 |
| - public IteratorScope getIteratorScope() { |
231 |
| - return IteratorScope.scan; |
232 |
| - } |
233 |
| - |
234 |
| - @Override |
235 |
| - public boolean isFullMajorCompaction() { |
236 |
| - return false; |
237 |
| - } |
238 |
| - |
239 |
| - @Override |
240 |
| - public Authorizations getAuthorizations() { |
241 |
| - return opts.auths; |
242 |
| - } |
243 |
| - |
244 |
| - @Override |
245 |
| - public boolean isSamplingEnabled() { |
246 |
| - return RFileScanner.this.getSamplerConfiguration() != null; |
247 |
| - } |
248 |
| - |
249 |
| - @Override |
250 |
| - public SamplerConfiguration getSamplerConfiguration() { |
251 |
| - return RFileScanner.this.getSamplerConfiguration(); |
252 |
| - } |
253 |
| - } |
254 |
| - |
255 | 270 | @Override
|
256 | 271 | public Iterator<Entry<Key,Value>> iterator() {
|
257 | 272 | try {
|
@@ -292,15 +307,23 @@ public Iterator<Entry<Key,Value>> iterator() {
|
292 | 307 | EMPTY_BYTES, tableConf);
|
293 | 308 | }
|
294 | 309 |
|
| 310 | + ClientIteratorEnvironment.Builder iterEnvBuilder = new ClientIteratorEnvironment.Builder() |
| 311 | + .withEnvironment(new RFileScannerEnvironmentImpl(opts)).withAuthorizations(opts.auths) |
| 312 | + .withScope(IteratorScope.scan).withTableId(TABLE_ID); |
| 313 | + if (getSamplerConfiguration() != null) { |
| 314 | + iterEnvBuilder.withSamplerConfiguration(getSamplerConfiguration()); |
| 315 | + iterEnvBuilder.withSamplingEnabled(); |
| 316 | + } |
| 317 | + IteratorEnvironment iterEnv = iterEnvBuilder.build(); |
295 | 318 | try {
|
296 | 319 | if (opts.tableConfig != null && !opts.tableConfig.isEmpty()) {
|
297 | 320 | var ibEnv = IteratorConfigUtil.loadIterConf(IteratorScope.scan, serverSideIteratorList,
|
298 | 321 | serverSideIteratorOptions, tableConf);
|
299 |
| - var iteratorBuilder = ibEnv.env(new IterEnv()).build(); |
| 322 | + var iteratorBuilder = ibEnv.env(iterEnv).build(); |
300 | 323 | iterator = IteratorConfigUtil.loadIterators(iterator, iteratorBuilder);
|
301 | 324 | } else {
|
302 | 325 | var iteratorBuilder = IteratorBuilder.builder(serverSideIteratorList)
|
303 |
| - .opts(serverSideIteratorOptions).env(new IterEnv()).build(); |
| 326 | + .opts(serverSideIteratorOptions).env(iterEnv).build(); |
304 | 327 | iterator = IteratorConfigUtil.loadIterators(iterator, iteratorBuilder);
|
305 | 328 | }
|
306 | 329 | } catch (IOException e) {
|
|
0 commit comments