13
13
14
14
package com .netflix .metacat .hive .connector ;
15
15
16
+ import com .facebook .presto .exception .InvalidMetaException ;
16
17
import com .facebook .presto .exception .PartitionAlreadyExistsException ;
17
18
import com .facebook .presto .exception .PartitionNotFoundException ;
19
+ import com .facebook .presto .hadoop .shaded .com .google .common .collect .Maps ;
18
20
import com .facebook .presto .hive .DirectoryLister ;
19
21
import com .facebook .presto .hive .ForHiveClient ;
20
22
import com .facebook .presto .hive .HdfsEnvironment ;
46
48
import com .google .common .collect .Sets ;
47
49
import com .netflix .metacat .common .partition .util .PartitionUtil ;
48
50
import com .netflix .metacat .hive .connector .util .ConverterUtil ;
51
+ import org .apache .hadoop .hive .metastore .Warehouse ;
49
52
import org .apache .hadoop .hive .metastore .api .AlreadyExistsException ;
50
53
import org .apache .hadoop .hive .metastore .api .FieldSchema ;
51
54
import org .apache .hadoop .hive .metastore .api .NoSuchObjectException ;
61
64
import java .util .Optional ;
62
65
import java .util .Set ;
63
66
import java .util .concurrent .ExecutorService ;
67
+ import java .util .function .Consumer ;
68
+ import java .util .function .Predicate ;
64
69
import java .util .stream .Collectors ;
65
70
66
71
import static com .facebook .presto .hive .HiveUtil .schemaTableName ;
@@ -98,28 +103,55 @@ private List<ConnectorPartition> getPartitions(SchemaTableName schemaTableName,
98
103
List <String > partitionIds ,
99
104
Sort sort , Pageable pageable ,
100
105
boolean includePartitionDetails ) {
106
+ List <ConnectorPartition > result = getPartitions ( schemaTableName , filterExpression , partitionIds );
107
+ if ( pageable != null && pageable .isPageable ()){
108
+ int limit = pageable .getOffset () + pageable .getLimit ();
109
+ if ( result .size () < limit ){
110
+ limit = result .size ();
111
+ }
112
+ if ( pageable .getOffset () > limit ) {
113
+ result = Lists .newArrayList ();
114
+ } else {
115
+ result = result .subList (pageable .getOffset (), limit );
116
+ }
117
+ }
118
+ return result ;
119
+ }
120
+
121
+ private List <ConnectorPartition > getPartitions (SchemaTableName schemaTableName , String filterExpression ,
122
+ List <String > partitionIds ) {
101
123
List <ConnectorPartition > result = Lists .newArrayList ();
102
124
List <String > queryPartitionIds = Lists .newArrayList ();
125
+ Table table = metastore .getTable ( schemaTableName .getSchemaName (), schemaTableName .getTableName ())
126
+ .orElseThrow (() -> new TableNotFoundException (schemaTableName ));
127
+ Map <String ,Partition > partitionMap = null ;
103
128
if (!Strings .isNullOrEmpty (filterExpression )) {
104
- queryPartitionIds = metastore
105
- .getPartitionNamesByParts (schemaTableName .getSchemaName (), schemaTableName .getTableName (),
106
- Lists .newArrayList (PartitionUtil .getPartitionKeyValues (filterExpression ).values ())).orElse (Lists .newArrayList ());
107
- }
108
- if (partitionIds != null ) {
109
- queryPartitionIds .addAll (partitionIds );
129
+ Map <String ,Partition > filteredPartitionMap = Maps .newHashMap ();
130
+ List <Partition > partitions = ((MetacatHiveMetastore )metastore ).getPartitions ( schemaTableName .getSchemaName (), schemaTableName .getTableName (), filterExpression );
131
+ partitions .forEach (partition -> {
132
+ String partitionName = null ;
133
+ try {
134
+ partitionName = Warehouse .makePartName (table .getPartitionKeys (), partition .getValues ());
135
+ } catch (Exception e ) {
136
+ throw new InvalidMetaException ("One or more partition names are invalid." , e );
137
+ }
138
+ if (partitionIds == null || partitionIds .contains (partitionName )) {
139
+ filteredPartitionMap .put (partitionName , partition );
140
+ }
141
+ });
142
+ partitionMap = filteredPartitionMap ;
110
143
} else {
111
- queryPartitionIds .addAll (metastore .getPartitionNames (schemaTableName .getSchemaName (),
112
- schemaTableName .getTableName ()).orElse (Lists .newArrayList ()));
144
+ partitionMap = getPartitionsByNames (
145
+ schemaTableName .getSchemaName (), schemaTableName .getTableName (),
146
+ partitionIds );
113
147
}
114
- Map <String ,Partition > partitionMap = getPartitionsByNames (
115
- schemaTableName .getSchemaName (), schemaTableName .getTableName (),
116
- queryPartitionIds );
117
148
Map <ColumnHandle , Comparable <?>> domainMap = ImmutableMap .of (new ColumnHandle (){}, "ignore" );
118
149
TupleDomain <ColumnHandle > tupleDomain = TupleDomain .withFixedValues (domainMap );
150
+ final List <ConnectorPartition > finalResult = result ;
119
151
partitionMap .forEach ((s , partition ) -> {
120
152
StorageDescriptor sd = partition .getSd ();
121
153
StorageInfo storageInfo = null ;
122
- if ( sd != null ){
154
+ if ( sd != null ) {
123
155
storageInfo = new StorageInfo ();
124
156
storageInfo .setUri (sd .getLocation ());
125
157
storageInfo .setInputFormat (sd .getInputFormat ());
@@ -132,9 +164,10 @@ private List<ConnectorPartition> getPartitions(SchemaTableName schemaTableName,
132
164
}
133
165
}
134
166
AuditInfo auditInfo = new AuditInfo ();
135
- auditInfo .setCreatedDate ((long )partition .getCreateTime ());
167
+ auditInfo .setCreatedDate ((long ) partition .getCreateTime ());
136
168
auditInfo .setLastUpdatedDate ((long ) partition .getLastAccessTime ());
137
- result .add ( new ConnectorPartitionDetailImpl (s , tupleDomain , storageInfo , partition .getParameters (), auditInfo ));
169
+ finalResult .add (new ConnectorPartitionDetailImpl (s , tupleDomain , storageInfo , partition .getParameters (),
170
+ auditInfo ));
138
171
});
139
172
return result ;
140
173
}
@@ -327,4 +360,34 @@ public Integer getPartitionCount(ConnectorTableHandle connectorHandle) {
327
360
SchemaTableName schemaTableName = HiveUtil .schemaTableName (connectorHandle );
328
361
return metastore .getPartitionNames (schemaTableName .getSchemaName (), schemaTableName .getTableName ()).orElse (Lists .newArrayList ()).size ();
329
362
}
363
+
364
+ public List <String > getPartitionKeys (ConnectorTableHandle tableHandle , String filterExpression , List <String > partitionNames , Sort sort , Pageable pageable ){
365
+ List <String > result = null ;
366
+ SchemaTableName schemaTableName = HiveUtil .schemaTableName (tableHandle );
367
+ if ( filterExpression != null || (partitionNames != null && !partitionNames .isEmpty ())){
368
+ result = getPartitions (schemaTableName , filterExpression , partitionNames ).stream ().map (
369
+ ConnectorPartition ::getPartitionId ).collect (Collectors .toList ());
370
+ } else {
371
+ result = metastore .getPartitionNames (schemaTableName .getSchemaName (), schemaTableName .getTableName ())
372
+ .orElse (Lists .newArrayList ());
373
+ }
374
+ if ( pageable != null && pageable .isPageable ()){
375
+ int limit = pageable .getOffset () + pageable .getLimit ();
376
+ if ( result .size () < limit ){
377
+ limit = result .size ();
378
+ }
379
+ if ( pageable .getOffset () > limit ) {
380
+ result = Lists .newArrayList ();
381
+ } else {
382
+ result = result .subList (pageable .getOffset (), limit );
383
+ }
384
+ }
385
+ return result ;
386
+ }
387
+
388
+ public List <String > getPartitionUris (ConnectorTableHandle table , String filterExpression , List <String > partitionNames , Sort sort , Pageable pageable ){
389
+ SchemaTableName schemaTableName = HiveUtil .schemaTableName (table );
390
+ return getPartitions (schemaTableName , filterExpression , partitionNames , sort , pageable , true ).stream ().map (
391
+ partition -> ((ConnectorPartitionDetail ) partition ).getStorageInfo ().getUri ()).collect (Collectors .toList ());
392
+ }
330
393
}
0 commit comments