2121import org .apache .fluss .config .Configuration ;
2222import org .apache .fluss .fs .FileSystem ;
2323import org .apache .fluss .fs .FileSystemPlugin ;
24+ import org .apache .fluss .fs .abfs .token .AzureDelegationTokenReceiver ;
2425
2526import org .apache .hadoop .fs .azurebfs .AzureBlobFileSystem ;
2627import org .slf4j .Logger ;
2728import org .slf4j .LoggerFactory ;
2829
2930import java .io .IOException ;
3031import java .net .URI ;
32+ import java .util .Objects ;
33+
34+ import static org .apache .fluss .fs .abfs .token .AzureDelegationTokenProvider .CLIENT_ID ;
35+ import static org .apache .fluss .fs .abfs .token .AzureDelegationTokenReceiver .PROVIDER_CONFIG_NAME ;
3136
3237/** Simple factory for the Azure File System. */
3338public class AbfsFileSystemPlugin implements FileSystemPlugin {
@@ -47,12 +52,36 @@ public String getScheme() {
4752 public FileSystem create (URI fsUri , Configuration flussConfig ) throws IOException {
4853 org .apache .hadoop .conf .Configuration hadoopConfig = getHadoopConfiguration (flussConfig );
4954
55+ setCredentialProvider (hadoopConfig );
56+
5057 // create the Google Hadoop FileSystem
5158 org .apache .hadoop .fs .FileSystem fs = new AzureBlobFileSystem ();
5259 fs .initialize (getInitURI (fsUri , hadoopConfig ), hadoopConfig );
5360 return new AzureFileSystem (getScheme (), fs , hadoopConfig );
5461 }
5562
63+ private void setCredentialProvider (org .apache .hadoop .conf .Configuration hadoopConfig ) {
64+ if (hadoopConfig .get (CLIENT_ID ) == null ) {
65+ if (Objects .equals (getScheme (), "abfs" )) {
66+ AzureDelegationTokenReceiver .updateHadoopConfig (hadoopConfig );
67+ } else if (Objects .equals (getScheme (), "abfss" )) {
68+ AzureDelegationTokenReceiver .updateHadoopConfig (hadoopConfig );
69+ } else if (Objects .equals (getScheme (), "wasb" )) {
70+ AzureDelegationTokenReceiver .updateHadoopConfig (hadoopConfig );
71+ } else if (Objects .equals (getScheme (), "wasbs" )) {
72+ AzureDelegationTokenReceiver .updateHadoopConfig (hadoopConfig );
73+ } else {
74+ throw new IllegalArgumentException ("Unsupported scheme: " + getScheme ());
75+ }
76+ LOG .info (
77+ "{} is not set, using credential provider {}." ,
78+ CLIENT_ID ,
79+ hadoopConfig .get (PROVIDER_CONFIG_NAME ));
80+ } else {
81+ LOG .info ("{} is set, using provided access key id and secret." , CLIENT_ID );
82+ }
83+ }
84+
5685 org .apache .hadoop .conf .Configuration getHadoopConfiguration (Configuration flussConfig ) {
5786 org .apache .hadoop .conf .Configuration conf = new org .apache .hadoop .conf .Configuration ();
5887 if (flussConfig == null ) {
0 commit comments