20
20
21
21
import com .github .benmanes .caffeine .cache .Cache ;
22
22
import com .github .benmanes .caffeine .cache .Caffeine ;
23
- import com .github .benmanes .caffeine .cache .RemovalListener ;
24
23
import java .io .IOException ;
25
24
import java .io .InputStream ;
26
25
import java .net .URI ;
27
- import java .time .Duration ;
28
26
import java .util .Collections ;
29
27
import java .util .List ;
30
28
import java .util .Map ;
31
- import java .util .concurrent .ScheduledExecutorService ;
32
29
import java .util .concurrent .TimeUnit ;
33
30
import java .util .function .Consumer ;
34
31
import java .util .function .Supplier ;
42
39
import org .apache .iceberg .rest .HTTPClient ;
43
40
import org .apache .iceberg .rest .RESTClient ;
44
41
import org .apache .iceberg .rest .ResourcePaths ;
45
- import org .apache .iceberg .rest .auth .AuthConfig ;
42
+ import org .apache .iceberg .rest .auth .AuthManager ;
43
+ import org .apache .iceberg .rest .auth .AuthManagers ;
44
+ import org .apache .iceberg .rest .auth .AuthSession ;
46
45
import org .apache .iceberg .rest .auth .OAuth2Properties ;
47
46
import org .apache .iceberg .rest .auth .OAuth2Util ;
48
- import org .apache .iceberg .rest .auth .OAuth2Util .AuthSession ;
49
- import org .apache .iceberg .rest .responses .OAuthTokenResponse ;
50
47
import org .apache .iceberg .util .PropertyUtil ;
51
- import org .apache .iceberg .util .ThreadPools ;
52
48
import org .immutables .value .Value ;
53
49
import org .slf4j .Logger ;
54
50
import org .slf4j .LoggerFactory ;
64
60
65
61
@ Value .Immutable
66
62
public abstract class S3V4RestSignerClient
67
- extends AbstractAws4Signer <AwsS3V4SignerParams , Aws4PresignerParams > {
63
+ extends AbstractAws4Signer <AwsS3V4SignerParams , Aws4PresignerParams > implements AutoCloseable {
68
64
69
65
private static final Logger LOG = LoggerFactory .getLogger (S3V4RestSignerClient .class );
70
66
public static final String S3_SIGNER_URI = "s3.signer.uri" ;
@@ -81,13 +77,14 @@ public abstract class S3V4RestSignerClient
81
77
private static final String SCOPE = "sign" ;
82
78
83
79
@ SuppressWarnings ("immutables:incompat" )
84
- private static volatile ScheduledExecutorService tokenRefreshExecutor ;
80
+ private volatile AuthManager authManager ;
85
81
86
- @ SuppressWarnings ("immutables:incompat" )
87
- private static volatile RESTClient httpClient ;
82
+ @ SuppressWarnings ({"immutables:incompat" , "VisibilityModifier" })
83
+ @ VisibleForTesting
84
+ static volatile RESTClient httpClient ;
88
85
89
86
@ SuppressWarnings ("immutables:incompat" )
90
- private static volatile Cache < String , AuthSession > authSessionCache ;
87
+ private volatile AuthSession authSession ;
91
88
92
89
public abstract Map <String , String > properties ();
93
90
@@ -138,52 +135,6 @@ boolean keepTokenRefreshed() {
138
135
OAuth2Properties .TOKEN_REFRESH_ENABLED_DEFAULT );
139
136
}
140
137
141
- @ VisibleForTesting
142
- ScheduledExecutorService tokenRefreshExecutor () {
143
- if (!keepTokenRefreshed ()) {
144
- return null ;
145
- }
146
-
147
- if (null == tokenRefreshExecutor ) {
148
- synchronized (S3V4RestSignerClient .class ) {
149
- if (null == tokenRefreshExecutor ) {
150
- tokenRefreshExecutor = ThreadPools .newScheduledPool ("s3-signer-token-refresh" , 1 );
151
- }
152
- }
153
- }
154
-
155
- return tokenRefreshExecutor ;
156
- }
157
-
158
- private Cache <String , AuthSession > authSessionCache () {
159
- if (null == authSessionCache ) {
160
- synchronized (S3V4RestSignerClient .class ) {
161
- if (null == authSessionCache ) {
162
- long expirationIntervalMs =
163
- PropertyUtil .propertyAsLong (
164
- properties (),
165
- CatalogProperties .AUTH_SESSION_TIMEOUT_MS ,
166
- CatalogProperties .AUTH_SESSION_TIMEOUT_MS_DEFAULT );
167
-
168
- authSessionCache =
169
- Caffeine .newBuilder ()
170
- .expireAfterAccess (Duration .ofMillis (expirationIntervalMs ))
171
- .removalListener (
172
- (RemovalListener <String , AuthSession >)
173
- (id , auth , cause ) -> {
174
- if (null != auth ) {
175
- LOG .trace ("Stopping refresh for AuthSession" );
176
- auth .stopRefreshing ();
177
- }
178
- })
179
- .build ();
180
- }
181
- }
182
- }
183
-
184
- return authSessionCache ;
185
- }
186
-
187
138
private RESTClient httpClient () {
188
139
if (null == httpClient ) {
189
140
synchronized (S3V4RestSignerClient .class ) {
@@ -200,86 +151,40 @@ private RESTClient httpClient() {
200
151
return httpClient ;
201
152
}
202
153
203
- private AuthSession authSession () {
204
- String token = token ().get ();
205
- if (null != token ) {
206
- return authSessionCache ()
207
- .get (
208
- token ,
209
- id -> {
210
- // this client will be reused for token refreshes; it must contain an empty auth
211
- // session in order to avoid interfering with refreshed tokens
212
- RESTClient refreshClient =
213
- httpClient ().withAuthSession (org .apache .iceberg .rest .auth .AuthSession .EMPTY );
214
- return AuthSession .fromAccessToken (
215
- refreshClient ,
216
- tokenRefreshExecutor (),
217
- token ,
218
- expiresAtMillis (properties ()),
219
- new AuthSession (
220
- ImmutableMap .of (),
221
- AuthConfig .builder ()
222
- .token (token )
223
- .credential (credential ())
224
- .scope (SCOPE )
225
- .oauth2ServerUri (oauth2ServerUri ())
226
- .optionalOAuthParams (optionalOAuthParams ())
227
- .build ()));
228
- });
229
- }
230
-
231
- if (credentialProvided ()) {
232
- return authSessionCache ()
233
- .get (
234
- credential (),
235
- id -> {
236
- AuthSession session =
237
- new AuthSession (
238
- ImmutableMap .of (),
239
- AuthConfig .builder ()
240
- .credential (credential ())
241
- .scope (SCOPE )
242
- .oauth2ServerUri (oauth2ServerUri ())
243
- .optionalOAuthParams (optionalOAuthParams ())
244
- .build ());
245
- long startTimeMillis = System .currentTimeMillis ();
246
- // this client will be reused for token refreshes; it must contain an empty auth
247
- // session in order to avoid interfering with refreshed tokens
248
- RESTClient refreshClient =
249
- httpClient ().withAuthSession (org .apache .iceberg .rest .auth .AuthSession .EMPTY );
250
- OAuthTokenResponse authResponse =
251
- OAuth2Util .fetchToken (
252
- refreshClient ,
253
- session .headers (),
254
- credential (),
255
- SCOPE ,
256
- oauth2ServerUri (),
257
- optionalOAuthParams ());
258
- return AuthSession .fromTokenResponse (
259
- refreshClient , tokenRefreshExecutor (), authResponse , startTimeMillis , session );
260
- });
154
+ @ VisibleForTesting
155
+ AuthSession authSession () {
156
+ if (null == authSession ) {
157
+ synchronized (S3V4RestSignerClient .class ) {
158
+ if (null == authSession ) {
159
+ authManager = AuthManagers .loadAuthManager ("s3-signer" , properties ());
160
+ ImmutableMap .Builder <String , String > properties =
161
+ ImmutableMap .<String , String >builder ()
162
+ .putAll (properties ())
163
+ .putAll (optionalOAuthParams ())
164
+ .put (OAuth2Properties .OAUTH2_SERVER_URI , oauth2ServerUri ())
165
+ .put (OAuth2Properties .TOKEN_REFRESH_ENABLED , String .valueOf (keepTokenRefreshed ()))
166
+ .put (OAuth2Properties .SCOPE , SCOPE );
167
+ String token = token ().get ();
168
+ if (null != token ) {
169
+ properties .put (OAuth2Properties .TOKEN , token );
170
+ }
171
+
172
+ if (credentialProvided ()) {
173
+ properties .put (OAuth2Properties .CREDENTIAL , credential ());
174
+ }
175
+
176
+ authSession = authManager .tableSession (httpClient (), properties .buildKeepingLast ());
177
+ }
178
+ }
261
179
}
262
180
263
- return AuthSession . empty () ;
181
+ return authSession ;
264
182
}
265
183
266
184
private boolean credentialProvided () {
267
185
return null != credential () && !credential ().isEmpty ();
268
186
}
269
187
270
- private Long expiresAtMillis (Map <String , String > properties ) {
271
- if (properties .containsKey (OAuth2Properties .TOKEN_EXPIRES_IN_MS )) {
272
- long expiresInMillis =
273
- PropertyUtil .propertyAsLong (
274
- properties ,
275
- OAuth2Properties .TOKEN_EXPIRES_IN_MS ,
276
- OAuth2Properties .TOKEN_EXPIRES_IN_MS_DEFAULT );
277
- return System .currentTimeMillis () + expiresInMillis ;
278
- } else {
279
- return null ;
280
- }
281
- }
282
-
283
188
@ Value .Check
284
189
protected void check () {
285
190
Preconditions .checkArgument (
@@ -377,6 +282,12 @@ public SdkHttpFullRequest sign(
377
282
return mutableRequest .build ();
378
283
}
379
284
285
+ @ Override
286
+ public void close () throws Exception {
287
+ IoUtils .closeQuietlyV2 (authSession , null );
288
+ IoUtils .closeQuietlyV2 (authManager , null );
289
+ }
290
+
380
291
/**
381
292
* Only add body for DeleteObjectsRequest. Refer to
382
293
* https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_RequestSyntax
0 commit comments