forked from logicalclocks/trino-hadoop-apache
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFileSystem.java
More file actions
5098 lines (4716 loc) · 177 KB
/
Copy pathFileSystem.java
File metadata and controls
5098 lines (4716 loc) · 177 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.lang.ref.ReferenceQueue;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.Stack;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Options.HandleOpt;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.DefaultBulkDeleteOperation;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.tracing.Tracer;
import org.apache.hadoop.tracing.TraceScope;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
/****************************************************************
* An abstract base class for a fairly generic filesystem. It
* may be implemented as a distributed filesystem, or as a "local"
* one that reflects the locally-connected disk. The local version
* exists for small Hadoop instances and for testing.
*
* <p>
*
* All user code that may potentially use the Hadoop Distributed
* File System should be written to use a FileSystem object or its
* successor, {@link FileContext}.
* </p>
* <p>
* The local implementation is {@link LocalFileSystem} and distributed
* implementation is DistributedFileSystem. There are other implementations
* for object stores and (outside the Apache Hadoop codebase),
* third party filesystems.
* </p>
* Notes
* <ol>
* <li>The behaviour of the filesystem is
* <a href="https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html">
* specified in the Hadoop documentation. </a>
* However, the normative specification of the behavior of this class is
* actually HDFS: if HDFS does not behave the way these Javadocs or
* the specification in the Hadoop documentations define, assume that
* the documentation is incorrect.
* </li>
* <li>The term {@code FileSystem} refers to an instance of this class.</li>
* <li>The acronym "FS" is used as an abbreviation of FileSystem.</li>
* <li>The term {@code filesystem} refers to the distributed/local filesystem
* itself, rather than the class used to interact with it.</li>
* <li>The term "file" refers to a file in the remote filesystem,
* rather than instances of {@code java.io.File}.</li>
* </ol>
*
* This is a carefully evolving class.
* New methods may be marked as Unstable or Evolving for their initial release,
* as a warning that they are new and may change based on the
* experience of use in applications.
* <p>
* <b>Important note for developers</b>
* </p>
* If you are making changes here to the public API or protected methods,
* you must review the following subclasses and make sure that
* they are filtering/passing through new methods as appropriate.
*
* {@link FilterFileSystem}: methods are passed through. If not,
* then {@code TestFilterFileSystem.MustNotImplement} must be
* updated with the unsupported interface.
* Furthermore, if the new API's support is probed for via
* {@link #hasPathCapability(Path, String)} then
* {@link FilterFileSystem#hasPathCapability(Path, String)}
* must return false, always.
* <p>
* {@link ChecksumFileSystem}: checksums are created and
* verified.
* </p>
* {@code TestHarFileSystem} will need its {@code MustNotImplement}
* interface updated.
*
* <p>
* There are some external places where your changes will break things.
* Do co-ordinate changes here.
* </p>
*
* HBase: HBoss
* <p>
* Hive: HiveShim23
* </p>
* {@code shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java}
*
*****************************************************************/
@SuppressWarnings("DeprecatedIsStillUsed")
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileSystem extends Configured
implements Closeable, DelegationTokenIssuer,
PathCapabilities, BulkDeleteSource {
/**
* Configuration key holding the default filesystem URI. Read by
* {@link #getDefaultUri(Configuration)} and written by
* {@link #setDefaultUri(Configuration, URI)}.
*/
public static final String FS_DEFAULT_NAME_KEY =
CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
/**
* Value used for {@link #FS_DEFAULT_NAME_KEY} when the configuration
* does not set it.
*/
public static final String DEFAULT_FS =
CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;
/**
* Configuration key for an alternative scheme which {@link #get(URI, Configuration)}
* and {@link #newInstance(URI, Configuration)} treat like the default
* filesystem's scheme when the URI has no authority.
*/
public static final String FS_DEFAULT_ALTERNATIVESCHEME_KEY = CommonConfigurationKeys.FS_DEFAULT_ALTERNATIVESCHEME_KEY;
/**
* Value used for {@link #FS_DEFAULT_ALTERNATIVESCHEME_KEY} when the
* configuration does not set it.
*/
public static final String DEFAULT_ALTERNATIVE_SCHEME
= CommonConfigurationKeys.FS_DEFAULT_ALTERNATIVESCHEME_DEFAULT;
/**
* This log is widely used in the org.apache.hadoop.fs code and tests,
* so must be considered something to only be changed with care.
*/
@InterfaceAudience.Private
public static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
/**
* The SLF4J logger to use in logging within the FileSystem class itself.
* It is bound to the same logger name as {@link #LOG}.
*/
private static final Logger LOGGER =
LoggerFactory.getLogger(FileSystem.class);
/**
* Priority of the FileSystem shutdown hook: {@value}.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 10;
/**
* Prefix for trash directory: {@value}.
*/
public static final String TRASH_PREFIX = ".Trash";
/**
* Path prefix {@value}. Presumably this is the parent of per-user home
* directories. NOTE(review): confirm against its users outside this view.
*/
public static final String USER_HOME_PREFIX = "/user";
/**
* FileSystem cache. May be replaced by {@link FileSystemManager}.
* {@link #get(URI, Configuration)} consults it unless
* {@code fs.<scheme>.impl.disable.cache} is true.
* {@link #newInstance(URI, Configuration)} uses its {@code getUnique}.
*/
static volatile Cache CACHE = new Cache(new Configuration());
/** The key this instance is stored under in the cache. */
private Cache.Key key;
/**
* Statistics recorded per FileSystem class. Because this is an
* IdentityHashMap, keys are compared by reference.
*/
private static final Map<Class<? extends FileSystem>, Statistics>
statisticsTable = new IdentityHashMap<>();
/**
* The statistics for this file system. Assigned in
* {@link #initialize(URI, Configuration)}.
*/
protected Statistics statistics;
/**
* A cache of files that should be deleted when the FileSystem is closed
* or the JVM is exited. Kept in a TreeSet, i.e. in sorted order.
*/
private final Set<Path> deleteOnExit = new TreeSet<>();
/**
* Should symbolic links be resolved by {@link FileSystemLinkResolver}.
* Set in {@link #initialize(URI, Configuration)} to the value of
* {@link CommonConfigurationKeysPublic#FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY}
*/
boolean resolveSymlinks;
/**
 * Register a FileSystem in the shared cache, so that later lookups for
 * the same URI and configuration return it. Only for use by tests.
 *
 * @param uri the URI to store the instance under
 * @param conf the configuration to store the instance under
 * @param fs the FileSystem to store
 * @throws IOException if the current user cannot be determined.
 */
@VisibleForTesting
static void addFileSystemForTesting(URI uri, Configuration conf,
    FileSystem fs) throws IOException {
  final Cache.Key cacheKey = new Cache.Key(uri, conf);
  CACHE.map.put(cacheKey, fs);
}
/**
 * Remove a test-registered FileSystem from the shared cache.
 * This uses the two-argument {@code remove}. For a {@link Map}, that
 * removes the entry only if the key currently maps to {@code fs}.
 * Only for use by tests.
 *
 * @param uri the URI the instance was stored under
 * @param conf the configuration the instance was stored under
 * @param fs the FileSystem expected under that key
 * @throws IOException propagated from building the cache key.
 */
@VisibleForTesting
static void removeFileSystemForTesting(URI uri, Configuration conf,
    FileSystem fs) throws IOException {
  final Cache.Key cacheKey = new Cache.Key(uri, conf);
  CACHE.map.remove(cacheKey, fs);
}
/**
 * Count the entries currently held in the shared FileSystem cache.
 * Only for use by tests.
 *
 * @return the number of cached FileSystem instances.
 */
@VisibleForTesting
static int cacheSize() {
  final int entries = CACHE.map.size();
  return entries;
}
/**
 * Get a FileSystem for the given URI, performing the lookup as a
 * specific user.
 * <p>
 * The user is resolved with {@code UserGroupInformation.getBestUGI}. That
 * call is passed the Kerberos ticket cache path from
 * {@code CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH}, if the path
 * is configured. {@link #get(URI, Configuration)} is then run as that user.
 *
 * @param uri of the filesystem
 * @param conf the configuration to use
 * @param user to perform the get as
 * @return the filesystem instance
 * @throws IOException failure to load
 * @throws InterruptedException If the {@code UGI.callAs()} call was
 * somehow interrupted.
 */
public static FileSystem get(final URI uri, final Configuration conf,
    final String user) throws IOException, InterruptedException {
  final UserGroupInformation ugi = UserGroupInformation.getBestUGI(
      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH), user);
  return ugi.callAs(() -> get(uri, conf));
}
/**
 * Get the FileSystem for the default filesystem URI of a configuration.
 * This is shorthand for {@code get(getDefaultUri(conf), conf)}.
 *
 * @param conf the configuration to use
 * @return FileSystem.
 * @throws IOException If an I/O error occurred.
 */
public static FileSystem get(Configuration conf) throws IOException {
  final URI defaultFsUri = getDefaultUri(conf);
  return get(defaultFsUri, conf);
}
/**
 * Get the default FileSystem URI from a configuration.
 * <p>
 * The trimmed value of {@link #FS_DEFAULT_NAME_KEY} is read, falling back
 * to {@link #DEFAULT_FS}. Legacy names are normalized ({@code "local"}
 * and bare authorities) before the value is parsed.
 *
 * @param conf the configuration to use
 * @return the uri of the default filesystem
 * @throws IllegalArgumentException if the value cannot be parsed as a
 * URI, or the resulting URI has no scheme.
 */
public static URI getDefaultUri(Configuration conf) {
  final String configured = conf.getTrimmed(FS_DEFAULT_NAME_KEY, DEFAULT_FS);
  final URI defaultUri = URI.create(fixName(configured));
  if (defaultUri.getScheme() != null) {
    return defaultUri;
  }
  throw new IllegalArgumentException("No scheme in default FS: " + defaultUri);
}
/**
 * Get the alternative default filesystem scheme from a configuration.
 *
 * @param conf the configuration to use
 * @return the value of {@link #FS_DEFAULT_ALTERNATIVESCHEME_KEY}, or
 * {@link #DEFAULT_ALTERNATIVE_SCHEME} when that key is unset.
 */
public static String getAlternativeDefaultScheme(Configuration conf) {
  final String alternativeScheme =
      conf.get(FS_DEFAULT_ALTERNATIVESCHEME_KEY, DEFAULT_ALTERNATIVE_SCHEME);
  return alternativeScheme;
}
/**
 * Set the default FileSystem URI in a configuration, by storing the
 * URI's string form under {@link #FS_DEFAULT_NAME_KEY}.
 *
 * @param conf the configuration to alter
 * @param uri the new default filesystem uri
 */
public static void setDefaultUri(Configuration conf, URI uri) {
  final String value = uri.toString();
  conf.set(FS_DEFAULT_NAME_KEY, value);
}
/**
 * Set the default FileSystem URI in a configuration from a string.
 * Legacy names ({@code "local"}, bare authorities) are normalized before
 * the value is parsed.
 *
 * @param conf the configuration to alter
 * @param uri the new default filesystem uri
 * @throws IllegalArgumentException if the value is not a valid URI.
 */
public static void setDefaultUri(Configuration conf, String uri) {
  final URI parsed = URI.create(fixName(uri));
  setDefaultUri(conf, parsed);
}
/**
 * Initialize a FileSystem.
 * <p>
 * This is called after the new FileSystem instance is constructed and
 * before it is ready for use. The base implementation does two things:
 * <ul>
 *   <li>binds {@link #statistics} to the statistics for the URI's scheme.
 *   If the URI has no scheme, the scheme of the configured default
 *   filesystem is used.</li>
 *   <li>reads the remote-symlink resolution flag.</li>
 * </ul>
 * <p>
 * FileSystem implementations overriding this method MUST forward it to
 * their superclass. The order in which that is done, and whether to alter
 * the configuration before the invocation, are left to the subclass.
 *
 * @param name a URI whose authority section names the host, port, etc.
 * for this FileSystem
 * @param conf the configuration
 * @throws IOException on any failure to initialize this instance.
 * @throws IllegalArgumentException if the URI is considered invalid.
 */
public void initialize(URI name, Configuration conf) throws IOException {
  final String nameScheme = name.getScheme();
  final boolean hasScheme = nameScheme != null && !nameScheme.isEmpty();
  final String scheme =
      hasScheme ? nameScheme : getDefaultUri(conf).getScheme();
  statistics = getStatistics(scheme, getClass());
  resolveSymlinks = conf.getBoolean(
      CommonConfigurationKeysPublic.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
      CommonConfigurationKeysPublic.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
}
/**
 * Return the protocol scheme for this FileSystem.
 * <p>
 * The base class does not know its scheme. Subclasses are expected to
 * override this method; this implementation always throws.
 *
 * @return the protocol scheme for this FileSystem.
 * @throws UnsupportedOperationException if the operation is unsupported
 * (default).
 */
public String getScheme() {
  final String implementation = getClass().getSimpleName();
  throw new UnsupportedOperationException("Not implemented by the "
      + implementation + " FileSystem implementation");
}
/**
 * Return an alternative protocol scheme for the FileSystem.
 * <p>
 * {@link #checkPath(Path)} accepts paths whose scheme matches either
 * the canonical scheme or this one, compared case-insensitively. The base
 * implementation returns the empty string, meaning "no alternative".
 * Overrides should not return {@code null}, because {@code checkPath}
 * calls {@code equalsIgnoreCase} on the result.
 *
 * @return the alternative protocol scheme, or "" if there is none.
 */
public String getAlternativeScheme() {
  // Empty string: no URI scheme can match it.
  return "";
}
/**
* Returns a URI which identifies this FileSystem.
*
* Every implementation must supply this. Within this class it is used by,
* among others, {@link #getCanonicalUri()}, {@link #makeQualified(Path)},
* {@link #getCanonicalServiceName()} and the "Wrong FS" message raised by
* {@link #checkPath(Path)}.
*
* @return the URI of this filesystem.
*/
public abstract URI getUri();
/**
 * Return a canonicalized form of this FileSystem's URI.
 * <p>
 * This simply applies {@link #canonicalizeUri(URI)} to {@link #getUri()}.
 * Subclasses therefore usually only need to override
 * {@code canonicalizeUri}.
 *
 * @see #canonicalizeUri(URI)
 * @return the URI of this filesystem.
 */
protected URI getCanonicalUri() {
  final URI ownUri = getUri();
  return canonicalizeUri(ownUri);
}
/**
 * Canonicalize the given URI.
 * <p>
 * What this means is implementation-dependent. For example, it may
 * consist of canonicalizing the hostname using DNS and adding the default
 * port if none is specified.
 * <p>
 * The default implementation only fills in the port: it does so when the
 * URI has no port and {@link #getDefaultPort()} returns a positive value.
 * Otherwise the URI is returned unchanged.
 *
 * @param uri url.
 * @return URI the URI with the default port filled in where applicable.
 * @see NetUtils#getCanonicalUri(URI, int)
 */
protected URI canonicalizeUri(URI uri) {
  if (uri.getPort() != -1) {
    // An explicit port is already present.
    return uri;
  }
  final int defaultPort = getDefaultPort();
  if (defaultPort <= 0) {
    // This filesystem has no default port to add.
    return uri;
  }
  try {
    // Rebuild the URI from its components with the default port set.
    return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(),
        defaultPort, uri.getPath(), uri.getQuery(), uri.getFragment());
  } catch (URISyntaxException e) {
    // Should never happen. Keep the cause so the failure is diagnosable.
    throw new AssertionError("Valid URI became unparseable: " + uri, e);
  }
}
/**
 * Get the default port for this FileSystem.
 * <p>
 * {@link #canonicalizeUri(URI)} adds this port to URIs that lack one, but
 * only when it is positive. {@link #getCanonicalServiceName()} also uses it.
 * The base class has no default port.
 *
 * @return the default port or 0 if there isn't one
 */
protected int getDefaultPort() {
  // 0 means "no default port".
  return 0;
}
/**
 * Get the FileSystem that owns an absolute or fully qualified path.
 * <p>
 * Relative paths are rejected by {@code Path.checkNotSchemeWithRelative()}
 * and {@code Path.checkNotRelative()}. The path's URI is then resolved
 * through {@link #get(URI, Configuration)}. That call falls back to the
 * default FileSystem when the URI has neither a scheme nor an authority.
 *
 * @param absOrFqPath an absolute path, optionally with scheme and authority
 * @param conf the configuration used to look up the FileSystem
 * @return the FileSystem for the path
 * @throws UnsupportedFileSystemException declared for unsupported schemes
 * @throws IOException on failure to obtain the FileSystem
 */
protected static FileSystem getFSofPath(final Path absOrFqPath,
    final Configuration conf)
    throws UnsupportedFileSystemException, IOException {
  absOrFqPath.checkNotSchemeWithRelative();
  absOrFqPath.checkNotRelative();
  final URI pathUri = absOrFqPath.toUri();
  return get(pathUri, conf);
}
/**
 * Get a canonical service name for this FileSystem.
 * <p>
 * The token cache is the only user of the canonical service name. It uses
 * the name to look up this FileSystem's service tokens. A file system that
 * provides a token of its own must have a canonical name; otherwise the
 * name may be null.
 * <p>
 * Default implementation:
 * <ul>
 *   <li>If {@link #getChildFileSystems()} is non-null, this FS is assumed
 *   to have no tokens of its own, and {@code null} is returned.</li>
 *   <li>Otherwise the name is built from {@link #getUri()} and
 *   {@link #getDefaultPort()}.</li>
 * </ul>
 *
 * @return a service string that uniquely identifies this file system, null
 * if the filesystem does not implement tokens
 * @see SecurityUtil#buildDTServiceName(URI, int)
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
@Override
public String getCanonicalServiceName() {
  if (getChildFileSystems() != null) {
    // Tokens come from the children, not from this FS.
    return null;
  }
  return SecurityUtil.buildDTServiceName(getUri(), getDefaultPort());
}
/**
 * Get the string form of this FileSystem's URI.
 *
 * @return uri to string.
 * @deprecated call {@link #getUri()} instead.
 */
@Deprecated
public String getName() {
  final URI fsUri = getUri();
  return fsUri.toString();
}
/**
 * Get a FileSystem by a possibly legacy-format name. The name is first
 * normalized (see {@code fixName}) and then parsed as a URI.
 *
 * @deprecated call {@link #get(URI, Configuration)} instead.
 *
 * @param name name.
 * @param conf configuration.
 * @return file system.
 * @throws IOException If an I/O error occurred.
 */
@Deprecated
public static FileSystem getNamed(String name, Configuration conf)
    throws IOException {
  final String normalized = fixName(name);
  return get(URI.create(normalized), conf);
}
/**
 * Translate old-format filesystem names into URI form, for backward
 * compatibility.
 * <ul>
 *   <li>{@code "local"} becomes {@code "file:///"}.</li>
 *   <li>A name containing no {@code '/'} is taken to be an HDFS authority
 *   and is prefixed with {@code "hdfs://"}.</li>
 *   <li>Anything else is returned unchanged.</li>
 * </ul>
 * A deprecation warning is logged whenever a translation is applied.
 * This should eventually be replaced with a check that throws an
 * exception for old-format names.
 *
 * @param name the filesystem name; must not be null
 * @return the name in URI form
 */
private static String fixName(String name) {
  if (name.equals("local")) {
    LOGGER.warn("\"local\" is a deprecated filesystem name."
        +" Use \"file:///\" instead.");
    return "file:///";
  }
  if (!name.contains("/")) {
    LOGGER.warn("\""+name+"\" is a deprecated filesystem name."
        +" Use \"hdfs://"+name+"/\" instead.");
    return "hdfs://" + name;
  }
  return name;
}
/**
 * Get the (possibly cached) local FileSystem.
 * <p>
 * If the instance registered for {@code LocalFileSystem.NAME} is not a
 * {@link LocalFileSystem}, it is returned wrapped in one.
 *
 * @param conf the configuration to configure the FileSystem with
 * if it is newly instantiated.
 * @return a LocalFileSystem
 * @throws IOException if somehow the local FS cannot be instantiated.
 */
public static LocalFileSystem getLocal(Configuration conf)
    throws IOException {
  final FileSystem localFs = get(LocalFileSystem.NAME, conf);
  return ensureLocalFileSystem(localFs);
}
/**
 * Get a FileSystem for this URI's scheme and authority.
 * <ol>
 * <li>
 * If the URI has neither scheme nor authority, the default FileSystem
 * of the configuration is returned.
 * </li>
 * <li>
 * If the URI has a scheme but no authority, and that scheme is either the
 * default filesystem's scheme or the alternative default scheme, the
 * default FileSystem is returned, provided the default URI has an
 * authority.
 * </li>
 * <li>
 * If the configuration has the property
 * {@code "fs.$SCHEME.impl.disable.cache"} set to true,
 * a new instance will be created, initialized with the supplied URI and
 * configuration, then returned without being cached.
 * </li>
 * <li>
 * Otherwise, the cached instance matching the URI is returned. If none
 * exists, a new one is created, initialized, cached and returned.
 * </li>
 * </ol>
 * @param uri uri of the filesystem.
 * @param conf configuration.
 * @return filesystem instance.
 * @throws IOException if the FileSystem cannot be instantiated.
 */
public static FileSystem get(URI uri, Configuration conf) throws IOException {
  final String scheme = uri.getScheme();
  final String authority = uri.getAuthority();
  if (scheme == null && authority == null) {
    // Completely unqualified: use the default FS.
    return get(conf);
  }
  if (scheme != null && authority == null) {
    final URI defaultUri = getDefaultUri(conf);
    final boolean isDefaultScheme = scheme.equals(defaultUri.getScheme())
        || scheme.equals(getAlternativeDefaultScheme(conf));
    if (isDefaultScheme && defaultUri.getAuthority() != null) {
      // Scheme-only URI for the default FS: use the default FS URI.
      return get(defaultUri, conf);
    }
  }
  final String disableCacheName =
      String.format("fs.%s.impl.disable.cache", scheme);
  if (!conf.getBoolean(disableCacheName, false)) {
    return CACHE.get(uri, conf);
  }
  LOGGER.debug("Bypassing cache to create filesystem {}", uri);
  return createFileSystem(uri, conf);
}
/**
 * Create a new, uncached FileSystem for this URI's scheme and authority,
 * acting as the given user.
 * <p>
 * The user is resolved with {@code UserGroupInformation.getBestUGI}. That
 * call is passed the Kerberos ticket cache path, if one is configured.
 * {@link #newInstance(URI, Configuration)} is then run as that user.
 *
 * @param uri uri of the filesystem.
 * @param conf the configuration to use
 * @param user to perform the get as
 * @return filesystem instance
 * @throws IOException if the FileSystem cannot be instantiated.
 * @throws InterruptedException If the {@code UGI.callAs()} call was
 * somehow interrupted.
 */
public static FileSystem newInstance(final URI uri, final Configuration conf,
    final String user) throws IOException, InterruptedException {
  final UserGroupInformation ugi = UserGroupInformation.getBestUGI(
      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH), user);
  return ugi.callAs(() -> newInstance(uri, conf));
}
/**
 * Create a new FileSystem for this URI's scheme and authority.
 * The entire URI is passed to the FileSystem instance's initialize
 * method, and a new FileSystem object is always returned.
 * <ul>
 *   <li>A URI without a scheme resolves to the default filesystem.</li>
 *   <li>A URI with a scheme but no authority resolves to the default
 *   filesystem URI when two conditions hold: the scheme is the default
 *   or the alternative default scheme, and the default URI has an
 *   authority.</li>
 * </ul>
 *
 * @param uri FS URI
 * @param config configuration to use
 * @return the new FS instance
 * @throws IOException FS creation or initialization failure.
 */
public static FileSystem newInstance(URI uri, Configuration config)
    throws IOException {
  final String scheme = uri.getScheme();
  if (scheme == null) {
    return newInstance(config);
  }
  if (uri.getAuthority() == null) {
    final URI defaultUri = getDefaultUri(config);
    final boolean isDefaultScheme = scheme.equals(defaultUri.getScheme())
        || scheme.equals(getAlternativeDefaultScheme(config));
    if (isDefaultScheme && defaultUri.getAuthority() != null) {
      return newInstance(defaultUri, config);
    }
  }
  return CACHE.getUnique(uri, config);
}
/**
 * Create a new, unique FileSystem for the default filesystem of the
 * supplied configuration. A new FileSystem object is always returned.
 *
 * @param conf the configuration to use
 * @return the new FS instance
 * @throws IOException FS creation or initialization failure.
 */
public static FileSystem newInstance(Configuration conf) throws IOException {
  final URI defaultFsUri = getDefaultUri(conf);
  return newInstance(defaultFsUri, conf);
}
/**
 * Create a new, unique local FileSystem object.
 * <p>
 * If the new instance is not a {@link LocalFileSystem}, it is returned
 * wrapped in one.
 *
 * @param conf the configuration to configure the FileSystem with
 * @return a new LocalFileSystem object.
 * @throws IOException FS creation or initialization failure.
 */
public static LocalFileSystem newInstanceLocal(Configuration conf)
    throws IOException {
  final FileSystem localFs = newInstance(LocalFileSystem.NAME, conf);
  return ensureLocalFileSystem(localFs);
}
/**
 * Present a FileSystem as a {@link LocalFileSystem}.
 *
 * @param fileSystem the filesystem obtained for the local FS URI
 * @return the same instance if it already is a LocalFileSystem; otherwise
 * a {@code LocalFileSystemWrapper} around it.
 */
private static LocalFileSystem ensureLocalFileSystem(FileSystem fileSystem) {
  return (fileSystem instanceof LocalFileSystem)
      ? (LocalFileSystem) fileSystem
      : new LocalFileSystemWrapper(fileSystem);
}
/**
 * Close every FileSystem instance held in the shared cache. The closed
 * instances must not be used for any further operations.
 * <p>
 * The call to {@code debugLogFileSystemClose} must stay direct, because
 * that method locates this method's caller by stack depth.
 *
 * @throws IOException a problem arose closing one or more filesystem.
 */
public static void closeAll() throws IOException {
  debugLogFileSystemClose("closeAll", "");
  CACHE.closeAll();
}
/**
 * Close every cached FileSystem instance belonging to the given UGI.
 * Make sure those filesystems are no longer in use.
 * <p>
 * The call to {@code debugLogFileSystemClose} must stay direct, because
 * that method locates this method's caller by stack depth.
 *
 * @param ugi user group info to close
 * @throws IOException a problem arose closing one or more filesystem.
 */
public static void closeAllForUGI(UserGroupInformation ugi)
    throws IOException {
  debugLogFileSystemClose("closeAllForUGI", "UGI: " + ugi);
  CACHE.closeAll(ugi);
}
/**
 * Record which caller invoked one of the static cache-closing methods.
 * <p>
 * At debug level, one line names the caller. At trace level, the full
 * stack trace is logged as well.
 * <p>
 * This method must be called directly from the public close method. The
 * caller is taken from frame 2 of a fresh stack trace: frame 0 is this
 * method and frame 1 is the close method. The JVM is allowed to omit
 * frames, so a short trace is tolerated. A debug log statement must never
 * abort the close operation with an ArrayIndexOutOfBoundsException.
 *
 * @param methodName name of the invoking close method
 * @param additionalInfo extra text appended to the debug line
 */
private static void debugLogFileSystemClose(String methodName,
    String additionalInfo) {
  if (!LOGGER.isDebugEnabled()) {
    return;
  }
  // The Throwable constructor already captures the current stack.
  final Throwable throwable = new Throwable();
  final StackTraceElement[] frames = throwable.getStackTrace();
  final Object caller = frames.length > 2 ? frames[2] : "(unknown)";
  LOGGER.debug("FileSystem.{}() by method: {}; {}", methodName,
      caller, additionalInfo);
  if (LOGGER.isTraceEnabled()) {
    LOGGER.trace("FileSystem.{}() full stack trace:", methodName,
        throwable);
  }
}
/**
 * Qualify a path against this FileSystem. The result carries this
 * filesystem's scheme and authority, and a relative path is resolved
 * against the working directory.
 *
 * @param path to qualify.
 * @return this path if it contains a scheme and authority and is absolute, or
 * a new path that includes a path and authority and is fully qualified
 * @see Path#makeQualified(URI, Path)
 * @throws IllegalArgumentException if the path has a schema/URI different
 * from this FileSystem.
 */
public Path makeQualified(Path path) {
  checkPath(path);
  final URI fsUri = this.getUri();
  final Path workingDir = this.getWorkingDirectory();
  return path.makeQualified(fsUri, workingDir);
}
/**
 * Get a new delegation token for this FileSystem.
 * <p>
 * This is an internal method that should have been declared protected
 * but historically was not. Callers should use
 * {@link #addDelegationTokens(String, Credentials)} instead.
 * The base implementation issues no tokens.
 *
 * @param renewer the account name that is allowed to renew the token.
 * @return a new delegation token or null if the FS does not support tokens.
 * @throws IOException on any problem obtaining a token
 */
@InterfaceAudience.Private
@Override
public Token<?> getDelegationToken(String renewer) throws IOException {
  // No token support in the base class.
  return null;
}
/**
 * List the immediate child FileSystems embedded in this FileSystem.
 * <p>
 * Grandchildren are not included: the list does not recurse. A FileSystem
 * with several children must return each of them only once. The base
 * implementation has no children.
 *
 * @return FileSystems that are direct children of this FileSystem,
 * or null for "no children"
 */
@InterfaceAudience.LimitedPrivate({ "HDFS" })
@VisibleForTesting
public FileSystem[] getChildFileSystems() {
  // null, not an empty array, signals "no children".
  return null;
}
/**
 * Additional delegation token issuers for this FileSystem: its
 * immediate child FileSystems.
 *
 * @return the result of {@link #getChildFileSystems()}, which is
 * {@code null} in the base implementation.
 * @throws IOException declared for overriding implementations; this
 * implementation does not throw it.
 */
@InterfaceAudience.Private
@Override
public DelegationTokenIssuer[] getAdditionalTokenIssuers()
    throws IOException {
  final FileSystem[] children = getChildFileSystems();
  return children;
}
/**
 * Create a file with the provided permission.
 * <p>
 * The permission of the file is set to be the provided permission as in
 * setPermission, not permission{@literal &~}umask.
 * <p>
 * This takes two calls: {@code fs.create(file)} followed by
 * {@code fs.setPermission(file, permission)}. In HDFS that means two RPCs.
 * It is understood that this is inefficient, but the implementation is
 * thread-safe. The other option is to change the value of umask in the
 * configuration to be 0, but that is not thread-safe.
 * <p>
 * If setting the permission fails, the output stream that was just opened
 * is closed before the failure is rethrown. Any close failure is attached
 * as a suppressed exception. The created file itself is not deleted.
 *
 * @param fs FileSystem
 * @param file the name of the file to be created
 * @param permission the permission of the file
 * @return an output stream
 * @throws IOException IO failure
 */
public static FSDataOutputStream create(FileSystem fs,
    Path file, FsPermission permission) throws IOException {
  // create the file with default permission
  final FSDataOutputStream out = fs.create(file);
  try {
    // set its permission to the supplied one
    fs.setPermission(file, permission);
  } catch (IOException | RuntimeException e) {
    // The caller never receives the stream on failure, so close it here
    // instead of leaking it.
    try {
      out.close();
    } catch (IOException | RuntimeException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
  return out;
}
/**
 * Create a directory with the provided permission.
 * <p>
 * The permission of the directory is set to be the provided permission as
 * in setPermission, not permission{@literal &~}umask. Note that
 * {@code setPermission} is invoked even when {@code mkdirs} reports false.
 *
 * @see #create(FileSystem, Path, FsPermission)
 *
 * @param fs FileSystem handle
 * @param dir the name of the directory to be created
 * @param permission the permission of the directory
 * @return true if the directory creation succeeds; false otherwise
 * @throws IOException A problem creating the directories.
 */
public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
    throws IOException {
  // First create with the default permission...
  final boolean created = fs.mkdirs(dir);
  // ...then apply the requested permission explicitly, bypassing the umask.
  fs.setPermission(dir, permission);
  return created;
}
///////////////////////////////////////////////////////////////
// FileSystem
///////////////////////////////////////////////////////////////
/**
 * Constructor for subclasses. The {@link Configured} superclass is given
 * a null configuration. Real setup happens later, in
 * {@link #initialize(URI, Configuration)}.
 */
protected FileSystem() {
  super((Configuration) null);
}
/**
 * Check that a Path belongs to this FileSystem.
 * <p>
 * Paths without a scheme are always accepted. Otherwise the path's scheme
 * must match this filesystem's canonical scheme, or its alternative scheme.
 * The path's authority must also match this filesystem's canonical
 * authority. All comparisons ignore case.
 * <p>
 * Sometimes the path carries no authority while this filesystem has one.
 * In that case the default filesystem URI is used in place of the path,
 * provided this filesystem's scheme is the default scheme. Otherwise the
 * path is rejected. Subclasses may implement slightly different checks.
 *
 * @param path to check
 * @throws IllegalArgumentException if the path is not considered to be
 * part of this FileSystem.
 */
protected void checkPath(Path path) {
  Preconditions.checkArgument(path != null, "null path");
  final URI pathUri = path.toUri();
  final String pathScheme = pathUri.getScheme();
  if (pathScheme == null) {
    // A path without a scheme is relative to this FS.
    return;
  }
  final URI fsUri = getCanonicalUri();
  final String fsScheme = fsUri.getScheme();
  final String fsAlternativeScheme = getAlternativeScheme();
  final boolean schemeMatches = fsScheme.equalsIgnoreCase(pathScheme)
      || fsAlternativeScheme.equalsIgnoreCase(pathScheme);
  if (schemeMatches) {
    final String fsAuthority = fsUri.getAuthority();
    URI candidate = pathUri;
    if (pathUri.getAuthority() == null && fsAuthority != null) {
      // The path has no authority of its own: borrow the default FS URI
      // if it is for our scheme, otherwise the authority is undeterminable.
      final URI defaultUri = getDefaultUri(getConf());
      candidate = fsScheme.equalsIgnoreCase(defaultUri.getScheme())
          ? defaultUri
          : null;
    }
    if (candidate != null) {
      // Canonicalize before comparing, e.g. to fill in a default port.
      final String candidateAuthority =
          canonicalizeUri(candidate).getAuthority();
      // Reference equality handles the case where both are null.
      if (fsAuthority == candidateAuthority
          || (fsAuthority != null
              && fsAuthority.equalsIgnoreCase(candidateAuthority))) {
        return;
      }
    }
  }
  throw new IllegalArgumentException("Wrong FS: " + path +
      ", expected: " + this.getUri());
}
/**
 * Return an array containing hostnames, offset and size of
 * portions of the given file.
 *
 * If {@code file} is {@code null}, {@code null} is returned. If
 * {@code start} is at or beyond the end of the file, an empty array is
 * returned.
 *
 * <pre>
 * if f == null :
 *   result = null
 * elif f.getLen() {@literal <=} start:
 *   result = []
 * else result = [ locations(FS, b) for b in blocks(FS, p, s, s+l)]
 * </pre>
 * This call is most helpful with a distributed filesystem
 * where the hostnames of machines that contain blocks of the given file
 * can be determined.
 *
 * The default implementation returns an array containing one element
 * that covers the whole file:
 * <pre>
 * BlockLocation( { "localhost:9866" }, { "localhost" }, 0, file.getLen())
 * </pre>
 *
 * In HDFS, if file is three-replicated, the returned array contains
 * elements like:
 * <pre>
 * BlockLocation(offset: 0, length: BLOCK_SIZE,
 *   hosts: {"host1:9866", "host2:9866", "host3:9866"})
 * BlockLocation(offset: BLOCK_SIZE, length: BLOCK_SIZE,
 *   hosts: {"host2:9866", "host3:9866", "host4:9866"})
 * </pre>
 *
 * And if a file is erasure-coded, the returned BlockLocations are logical
 * block groups.
 *
 * Suppose we have a RS_3_2 coded file (3 data units and 2 parity units).
 * 1. If the file size is less than one stripe size, say 2 * CELL_SIZE, then
 * there will be one BlockLocation returned, with 0 offset, actual file size
 * and 4 hosts (2 data blocks and 2 parity blocks) hosting the actual blocks.
 * 2. If the file size is less than one group size but greater than one
 * stripe size, then there will be one BlockLocation returned, with 0 offset,
 * actual file size with 5 hosts (3 data blocks and 2 parity blocks) hosting
 * the actual blocks.
 * 3. If the file size is greater than one group size, 3 * BLOCK_SIZE + 123
 * for example, then the result will be like:
 * <pre>
 * BlockLocation(offset: 0, length: 3 * BLOCK_SIZE, hosts: {"host1:9866",
 *   "host2:9866","host3:9866","host4:9866","host5:9866"})
 * BlockLocation(offset: 3 * BLOCK_SIZE, length: 123, hosts: {"host1:9866",
 *   "host4:9866", "host5:9866"})
 * </pre>
 *
 * @param file FileStatus to get data from
 * @param start offset into the given file
 * @param len length for which to get locations for
 * @throws IllegalArgumentException if {@code start} or {@code len} is
 *         negative
 * @throws IOException IO failure
 * @return block location array.
 */
public BlockLocation[] getFileBlockLocations(FileStatus file,
    long start, long len) throws IOException {
  if (file == null) {
    return null;
  }
  if (start < 0 || len < 0) {
    throw new IllegalArgumentException("Invalid start or len parameter");
  }
  final long fileLength = file.getLen();
  if (start >= fileLength) {
    return new BlockLocation[0];
  }
  // The base implementation reports a single block spanning the whole file
  // on localhost, independent of the requested range.
  final BlockLocation wholeFile = new BlockLocation(
      new String[] {"localhost:9866"},
      new String[] {"localhost"},
      0,
      fileLength);
  return new BlockLocation[] {wholeFile};
}
/**
 * Return an array containing hostnames, offset and size of
 * portions of the file at the given path.
 *
 * This call is most helpful with location-aware distributed
 * filesystems, where it returns hostnames of machines that
 * contain the given file.
 *
 * The base implementation looks up the path's {@code FileStatus} and
 * delegates to {@link #getFileBlockLocations(FileStatus, long, long)}.
 * A FileSystem will normally return the equivalent result.
 *
 * @param p path is used to identify an FS since an FS could have
 *          another FS that it could be delegating the call to
 * @param start offset into the given file
 * @param len length for which to get locations for
 * @throws NullPointerException if {@code p} is null
 * @throws FileNotFoundException when the path does not exist
 * @throws IOException IO failure
 * @return block location array.
 */
public BlockLocation[] getFileBlockLocations(Path p,
    long start, long len) throws IOException {
  if (p == null) {
    throw new NullPointerException();
  }
  return getFileBlockLocations(getFileStatus(p), start, len);
}
/**
 * Return a set of server default configuration values, derived on the
 * client side from this filesystem's configuration and defaults.
 *
 * CRC32 is reported as the checksum type, and the client's trash
 * configuration is not consulted: the trash interval is always
 * {@code FS_TRASH_INTERVAL_DEFAULT}.
 *
 * @return server default configuration values
 * @throws IOException IO failure
 * @deprecated use {@link #getServerDefaults(Path)} instead
 */
@Deprecated
public FsServerDefaults getServerDefaults() throws IOException {
  final Configuration conf = getConf();
  // Evaluated in the same order as the constructor arguments below.
  final long blockSize = getDefaultBlockSize();
  final int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
  final short replication = getDefaultReplication();
  final int bufferSize =
      conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT);
  // CRC32 is chosen as default as it is available in all
  // releases that support checksum.
  // The client trash configuration is ignored.
  return new FsServerDefaults(blockSize, bytesPerChecksum, 64 * 1024,
      replication, bufferSize, false, FS_TRASH_INTERVAL_DEFAULT,
      DataChecksum.Type.CRC32, "");
}
/**
 * Return a set of server default configuration values for the given path.
 *
 * The base implementation does not use {@code p}. It returns the same
 * values as the deprecated no-argument {@link #getServerDefaults()}.
 * Subclasses that delegate to other filesystems may use the path to pick
 * the right one.
 *
 * @param p path is used to identify an FS since an FS could have
 *          another FS that it could be delegating the call to
 * @return server default configuration values
 * @throws IOException IO failure
 */
public FsServerDefaults getServerDefaults(Path p) throws IOException {
  // Path-independent in the base class; deliberately ignore p.
  return getServerDefaults();
}
/**
 * Return the fully-qualified path of {@code p}, resolving the path
 * through any symlinks or mount point.
 *
 * The path is first validated against this filesystem with
 * {@link #checkPath(Path)}. The result is the path reported by
 * {@link #getFileStatus(Path)}.
 *
 * @param p path to be resolved
 * @return fully qualified path
 * @throws IllegalArgumentException if the path does not belong to this
 *         filesystem
 * @throws FileNotFoundException if the path is not present
 * @throws IOException for any other error
 */
public Path resolvePath(final Path p) throws IOException {
  checkPath(p);
  final FileStatus status = getFileStatus(p);
  return status.getPath();
}
/**
* Opens an FSDataInputStream at the indicated Path.