3232import org .apache .hadoop .hive .conf .HiveConf ;
3333import org .apache .hadoop .hive .metastore .HiveMetaStoreClient ;
3434import org .apache .hadoop .hive .metastore .api .AlreadyExistsException ;
35- import org .apache .hadoop .hive .metastore .api .MetaException ;
3635import org .apache .hadoop .hive .metastore .api .Table ;
3736import org .apache .thrift .TException ;
3837
3938import lombok .NonNull ;
4039import lombok .extern .slf4j .Slf4j ;
4140
41+ import java .io .Closeable ;
4242import java .io .File ;
4343import java .io .IOException ;
44+ import java .io .Serializable ;
4445import java .net .MalformedURLException ;
4546import java .nio .file .Files ;
47+ import java .nio .file .Path ;
4648import java .nio .file .Paths ;
4749import java .util .List ;
4850import java .util .Objects ;
4951
5052@ Slf4j
51- public class HiveMetaStoreProxy {
52- private HiveMetaStoreClient hiveMetaStoreClient ;
53- private static volatile HiveMetaStoreProxy INSTANCE = null ;
53+ public class HiveMetaStoreProxy implements Closeable , Serializable {
5454 private static final List <String > HADOOP_CONF_FILES = ImmutableList .of ("hive-site.xml" );
5555
56- private HiveMetaStoreProxy (ReadonlyConfig readonlyConfig ) {
57- String metastoreUri = readonlyConfig .get (HiveOptions .METASTORE_URI );
58- String hiveHadoopConfigPath = readonlyConfig .get (HiveConfig .HADOOP_CONF_PATH );
59- String hiveSitePath = readonlyConfig .get (HiveConfig .HIVE_SITE_PATH );
60- HiveConf hiveConf = new HiveConf ();
61- hiveConf .set ("hive.metastore.uris" , metastoreUri );
62- try {
63- if (StringUtils .isNotBlank (hiveHadoopConfigPath )) {
64- HADOOP_CONF_FILES .forEach (
65- confFile -> {
66- java .nio .file .Path path = Paths .get (hiveHadoopConfigPath , confFile );
67- if (Files .exists (path )) {
68- try {
69- hiveConf .addResource (path .toUri ().toURL ());
70- } catch (IOException e ) {
71- log .warn (
72- "Error adding Hadoop resource {}, resource was not added" ,
73- path ,
74- e );
75- }
76- }
77- });
78- }
56+ private final String metastoreUri ;
57+ private final String hadoopConfDir ;
58+ private final String hiveSitePath ;
59+ private final boolean kerberosEnabled ;
60+ private final boolean remoteUserEnabled ;
7961
80- if (StringUtils .isNotBlank (hiveSitePath )) {
81- hiveConf .addResource (new File (hiveSitePath ).toURI ().toURL ());
82- }
62+ private final String krb5Path ;
63+ private final String principal ;
64+ private final String keytabPath ;
65+ private final String remoteUser ;
8366
84- log .info ("hive client conf:{}" , hiveConf );
85- if (HiveMetaStoreProxyUtils .enableKerberos (readonlyConfig )) {
86- // login Kerberos
87- Configuration authConf = new Configuration ();
88- authConf .set ("hadoop.security.authentication" , "kerberos" );
89- this .hiveMetaStoreClient =
90- HadoopLoginFactory .loginWithKerberos (
91- authConf ,
92- readonlyConfig .get (HdfsSourceConfigOptions .KRB5_PATH ),
93- readonlyConfig .get (HdfsSourceConfigOptions .KERBEROS_PRINCIPAL ),
94- readonlyConfig .get (HdfsSourceConfigOptions .KERBEROS_KEYTAB_PATH ),
95- (conf , userGroupInformation ) -> {
96- return new HiveMetaStoreClient (hiveConf );
97- });
98- return ;
67+ private HiveMetaStoreClient hiveClient ;
68+
69+ public HiveMetaStoreProxy (ReadonlyConfig config ) {
70+ this .metastoreUri = config .get (HiveOptions .METASTORE_URI );
71+ this .hadoopConfDir = config .get (HiveConfig .HADOOP_CONF_PATH );
72+ this .hiveSitePath = config .get (HiveConfig .HIVE_SITE_PATH );
73+ this .kerberosEnabled = HiveMetaStoreProxyUtils .enableKerberos (config );
74+ this .remoteUserEnabled = HiveMetaStoreProxyUtils .enableRemoteUser (config );
75+ this .krb5Path = config .get (HdfsSourceConfigOptions .KRB5_PATH );
76+ this .principal = config .get (HdfsSourceConfigOptions .KERBEROS_PRINCIPAL );
77+ this .keytabPath = config .get (HdfsSourceConfigOptions .KERBEROS_KEYTAB_PATH );
78+ this .remoteUser = config .get (HdfsSourceConfigOptions .REMOTE_USER );
79+ }
80+
81+ private synchronized HiveMetaStoreClient getClient () {
82+ if (hiveClient == null ) {
83+ hiveClient = initializeClient ();
84+ }
85+ return hiveClient ;
86+ }
87+
88+ private HiveMetaStoreClient initializeClient () {
89+ HiveConf hiveConf = buildHiveConf ();
90+ try {
91+ if (kerberosEnabled ) {
92+ return loginWithKerberos (hiveConf );
9993 }
100- if (HiveMetaStoreProxyUtils .enableRemoteUser (readonlyConfig )) {
101- this .hiveMetaStoreClient =
102- HadoopLoginFactory .loginWithRemoteUser (
103- new Configuration (),
104- readonlyConfig .get (HdfsSourceConfigOptions .REMOTE_USER ),
105- (conf , userGroupInformation ) -> {
106- return new HiveMetaStoreClient (hiveConf );
107- });
108- return ;
94+ if (remoteUserEnabled ) {
95+ return loginWithRemoteUser (hiveConf );
10996 }
110- this .hiveMetaStoreClient = new HiveMetaStoreClient (hiveConf );
111- } catch (MetaException e ) {
112- String errorMsg =
113- String .format (
114- "Using this hive uris [%s] to initialize "
115- + "hive metastore client instance failed" ,
116- metastoreUri );
117- throw new HiveConnectorException (
118- HiveConnectorErrorCode .INITIALIZE_HIVE_METASTORE_CLIENT_FAILED , errorMsg , e );
119- } catch (MalformedURLException e ) {
120- String errorMsg =
121- String .format (
122- "Using this hive uris [%s], hive conf [%s] to initialize "
123- + "hive metastore client instance failed" ,
124- metastoreUri , readonlyConfig .get (HiveOptions .HIVE_SITE_PATH ));
125- throw new HiveConnectorException (
126- HiveConnectorErrorCode .INITIALIZE_HIVE_METASTORE_CLIENT_FAILED , errorMsg , e );
97+ return new HiveMetaStoreClient (hiveConf );
12798 } catch (Exception e ) {
99+ String errMsg =
100+ String .format (
101+ "Failed to initialize HiveMetaStoreClient [uris=%s, hiveSite=%s]" ,
102+ metastoreUri , hiveSitePath );
128103 throw new HiveConnectorException (
129- HiveConnectorErrorCode .INITIALIZE_HIVE_METASTORE_CLIENT_FAILED ,
130- "Login form kerberos failed" ,
131- e );
104+ HiveConnectorErrorCode .INITIALIZE_HIVE_METASTORE_CLIENT_FAILED , errMsg , e );
132105 }
133106 }
134107
135- public static HiveMetaStoreProxy getInstance (ReadonlyConfig readonlyConfig ) {
136- if (INSTANCE == null ) {
137- synchronized (HiveMetaStoreProxy .class ) {
138- if (INSTANCE == null ) {
139- INSTANCE = new HiveMetaStoreProxy (readonlyConfig );
108+ private HiveConf buildHiveConf () {
109+ HiveConf hiveConf = new HiveConf ();
110+ hiveConf .set ("hive.metastore.uris" , metastoreUri );
111+
112+ if (StringUtils .isNotBlank (hadoopConfDir )) {
113+ for (String fileName : HADOOP_CONF_FILES ) {
114+ Path path = Paths .get (hadoopConfDir , fileName );
115+ if (Files .exists (path )) {
116+ try {
117+ hiveConf .addResource (path .toUri ().toURL ());
118+ } catch (IOException e ) {
119+ log .warn ("Error adding Hadoop config {}" , path , e );
120+ }
140121 }
141122 }
142123 }
143- return INSTANCE ;
124+ if (StringUtils .isNotBlank (hiveSitePath )) {
125+ try {
126+ hiveConf .addResource (new File (hiveSitePath ).toURI ().toURL ());
127+ } catch (MalformedURLException e ) {
128+ log .warn ("Invalid hiveSitePath {}" , hiveSitePath , e );
129+ }
130+ }
131+ log .info ("Hive client configuration: {}" , hiveConf );
132+ return hiveConf ;
133+ }
134+
135+ private HiveMetaStoreClient loginWithKerberos (HiveConf hiveConf ) throws Exception {
136+ Configuration authConf = new Configuration ();
137+ authConf .set ("hadoop.security.authentication" , "kerberos" );
138+ return HadoopLoginFactory .loginWithKerberos (
139+ authConf ,
140+ krb5Path ,
141+ principal ,
142+ keytabPath ,
143+ (conf , ugi ) -> new HiveMetaStoreClient (hiveConf ));
144+ }
145+
146+ private HiveMetaStoreClient loginWithRemoteUser (HiveConf hiveConf ) throws Exception {
147+ return HadoopLoginFactory .loginWithRemoteUser (
148+ new Configuration (), remoteUser , (conf , ugi ) -> new HiveMetaStoreClient (hiveConf ));
144149 }
145150
146151 public Table getTable (@ NonNull String dbName , @ NonNull String tableName ) {
147152 try {
148- return hiveMetaStoreClient .getTable (dbName , tableName );
153+ return getClient () .getTable (dbName , tableName );
149154 } catch (TException e ) {
150- String errorMsg =
151- String .format ("Get table [%s.%s] information failed" , dbName , tableName );
155+ String msg = String .format ("Failed to get table %s.%s" , dbName , tableName );
152156 throw new HiveConnectorException (
153- HiveConnectorErrorCode .GET_HIVE_TABLE_INFORMATION_FAILED , errorMsg , e );
157+ HiveConnectorErrorCode .GET_HIVE_TABLE_INFORMATION_FAILED , msg , e );
154158 }
155159 }
156160
@@ -159,9 +163,9 @@ public void addPartitions(
159163 throws TException {
160164 for (String partition : partitions ) {
161165 try {
162- hiveMetaStoreClient .appendPartition (dbName , tableName , partition );
163- } catch (AlreadyExistsException e ) {
164- log .warn ("The partition {} are already exists" , partition );
166+ getClient () .appendPartition (dbName , tableName , partition );
167+ } catch (AlreadyExistsException ae ) {
168+ log .warn ("Partition {} already exists" , partition );
165169 }
166170 }
167171 }
@@ -170,14 +174,14 @@ public void dropPartitions(
170174 @ NonNull String dbName , @ NonNull String tableName , List <String > partitions )
171175 throws TException {
172176 for (String partition : partitions ) {
173- hiveMetaStoreClient .dropPartition (dbName , tableName , partition , false );
177+ getClient () .dropPartition (dbName , tableName , partition , false );
174178 }
175179 }
176180
181+ @ Override
177182 public synchronized void close () {
178- if (Objects .nonNull (hiveMetaStoreClient )) {
179- hiveMetaStoreClient .close ();
180- HiveMetaStoreProxy .INSTANCE = null ;
183+ if (Objects .nonNull (hiveClient )) {
184+ hiveClient .close ();
181185 }
182186 }
183187}
0 commit comments