25
25
import com .facebook .presto .spi .TableNotFoundException ;
26
26
import com .fasterxml .jackson .databind .node .ObjectNode ;
27
27
import com .google .common .base .Splitter ;
28
+ import com .google .common .base .Throwables ;
28
29
import com .google .common .collect .Lists ;
29
30
import com .google .common .collect .Maps ;
31
+ import com .google .common .util .concurrent .Futures ;
32
+ import com .google .common .util .concurrent .ListenableFuture ;
30
33
import com .netflix .metacat .common .QualifiedName ;
31
34
import com .netflix .metacat .common .dto .HasMetadata ;
32
35
import com .netflix .metacat .common .dto .PartitionDto ;
35
38
import com .netflix .metacat .common .monitoring .DynamicGauge ;
36
39
import com .netflix .metacat .common .monitoring .LogConstants ;
37
40
import com .netflix .metacat .common .usermetadata .UserMetadataService ;
41
+ import com .netflix .metacat .common .util .ThreadServiceManager ;
38
42
import com .netflix .metacat .converters .PrestoConverters ;
39
43
import com .netflix .metacat .main .presto .split .SplitManager ;
40
44
import com .netflix .metacat .main .services .CatalogService ;
52
56
import java .util .List ;
53
57
import java .util .Map ;
54
58
import java .util .Optional ;
59
+ import java .util .concurrent .TimeUnit ;
55
60
import java .util .stream .Collectors ;
56
61
57
62
public class PartitionServiceImpl implements PartitionService {
@@ -68,6 +73,8 @@ public class PartitionServiceImpl implements PartitionService {
68
73
UserMetadataService userMetadataService ;
69
74
@ Inject
70
75
SessionProvider sessionProvider ;
76
+ @ Inject
77
+ ThreadServiceManager threadServiceManager ;
71
78
72
79
private ConnectorPartitionResult getPartitionResult (QualifiedName name , String filter , List <String > partitionNames , Sort sort , Pageable pageable , boolean includePartitionDetails ) {
73
80
ConnectorPartitionResult result = null ;
@@ -95,13 +102,23 @@ public List<PartitionDto> list(QualifiedName name, String filter, List<String> p
95
102
})
96
103
.collect (Collectors .toList ());
97
104
if (includeUserDefinitionMetadata || includeUserDataMetadata ){
98
- Map <String ,ObjectNode > dataMetadataMap = includeUserDataMetadata ?userMetadataService .getDataMetadataMap (uris ):
99
- Maps .newHashMap ();
100
- Map <String ,ObjectNode > definitionMetadataMap = includeUserDefinitionMetadata ?userMetadataService .getDefinitionMetadataMap (names ):
101
- Maps .newHashMap ();
102
- result .stream ().forEach (partitionDto -> userMetadataService .populateMetadata (partitionDto
103
- , definitionMetadataMap .get (partitionDto .getName ().toString ())
104
- , dataMetadataMap .get (partitionDto .getDataUri ())));
105
+ List <ListenableFuture <Map <String ,ObjectNode >>> futures = Lists .newArrayList ();
106
+ futures .add (threadServiceManager .getExecutor ().submit (() -> includeUserDefinitionMetadata ?
107
+ userMetadataService .getDefinitionMetadataMap (names ) :
108
+ Maps .newHashMap ()));
109
+ futures .add (threadServiceManager .getExecutor ().submit (() -> includeUserDataMetadata ?
110
+ userMetadataService .getDataMetadataMap (uris ):
111
+ Maps .newHashMap ()));
112
+ try {
113
+ List <Map <String ,ObjectNode >> metadataResults = Futures .successfulAsList (futures ).get (1 , TimeUnit .HOURS );
114
+ Map <String ,ObjectNode > definitionMetadataMap = metadataResults .get (0 );
115
+ Map <String ,ObjectNode > dataMetadataMap = metadataResults .get (1 );
116
+ result .stream ().forEach (partitionDto -> userMetadataService .populateMetadata (partitionDto
117
+ , definitionMetadataMap .get (partitionDto .getName ().toString ())
118
+ , dataMetadataMap .get (partitionDto .getDataUri ())));
119
+ } catch (Exception e ) {
120
+ Throwables .propagate (e );
121
+ }
105
122
}
106
123
}
107
124
TagList tags = BasicTagList .of ("catalog" , name .getCatalogName (), "database" , name .getDatabaseName (), "table" , name .getTableName ());
@@ -154,12 +171,12 @@ public PartitionsSaveResponseDto save(QualifiedName name, List<PartitionDto> par
154
171
155
172
// Save metadata
156
173
log .info ("Saving user metadata for partitions for {}" , name );
157
- userMetadataService .saveMetadatas (session .getUser (), partitionDtos , true );
158
174
// delete metadata
159
175
if ( !deletePartitions .isEmpty ()) {
160
176
log .info ("Deleting user metadata for partitions with names {} for {}" , partitionIdsForDeletes , name );
161
177
userMetadataService .deleteMetadatas (deletePartitions , false );
162
178
}
179
+ userMetadataService .saveMetadatas (session .getUser (), partitionDtos , true );
163
180
164
181
result .setUpdated (savePartitionResult .getUpdated ());
165
182
result .setAdded (savePartitionResult .getAdded ());
@@ -204,18 +221,27 @@ public void delete(QualifiedName name, List<String> partitionIds) {
204
221
205
222
@ Override
206
223
public List <QualifiedName > getQualifiedNames (String uri , boolean prefixSearch ){
207
- List <QualifiedName > result = Lists .newArrayList ();
208
-
224
+ List <QualifiedName > result = Lists .newCopyOnWriteArrayList ();
225
+ List < ListenableFuture < Void >> futures = Lists . newArrayList ();
209
226
catalogService .getCatalogNames ().stream ().forEach (catalog -> {
210
227
Session session = sessionProvider .getSession (QualifiedName .ofCatalog (catalog .getCatalogName ()));
211
- List <SchemaTablePartitionName > schemaTablePartitionNames = splitManager .getPartitionNames ( session , uri , prefixSearch );
212
- List <QualifiedName > qualifiedNames = schemaTablePartitionNames .stream ().map (
213
- schemaTablePartitionName -> QualifiedName .ofPartition ( catalog .getConnectorName ()
214
- , schemaTablePartitionName .getTableName ().getSchemaName ()
215
- , schemaTablePartitionName .getTableName ().getTableName ()
216
- , schemaTablePartitionName .getPartitionId ())).collect (Collectors .toList ());
217
- result .addAll (qualifiedNames );
228
+ futures .add (threadServiceManager .getExecutor ().submit (() -> {
229
+ List <SchemaTablePartitionName > schemaTablePartitionNames = splitManager
230
+ .getPartitionNames (session , uri , prefixSearch );
231
+ List <QualifiedName > qualifiedNames = schemaTablePartitionNames .stream ().map (
232
+ schemaTablePartitionName -> QualifiedName .ofPartition (catalog .getConnectorName ()
233
+ , schemaTablePartitionName .getTableName ().getSchemaName ()
234
+ , schemaTablePartitionName .getTableName ().getTableName ()
235
+ , schemaTablePartitionName .getPartitionId ())).collect (Collectors .toList ());
236
+ result .addAll (qualifiedNames );
237
+ return null ;
238
+ }));
218
239
});
240
+ try {
241
+ Futures .allAsList (futures ).get (1 , TimeUnit .HOURS );
242
+ } catch (Exception e ) {
243
+ Throwables .propagate (e );
244
+ }
219
245
return result ;
220
246
}
221
247
0 commit comments