Skip to content

Commit c5c7136

Browse files
Reverting View changes and Fixing V1 API header issues (#642)
* Revert "Adding common view support to the Polaris Connector (#636)" Will re-apply after adding in a V1 API Header fix for SBN3 This reverts commit 4fe3671. * Removing Consumes & Produces in Request Mapping to keep existing SBN2 behavior where content-type and accepts is not required in headers. * Removing unused imports of MediaType
1 parent 4fe3671 commit c5c7136

File tree

23 files changed

+56
-700
lines changed

23 files changed

+56
-700
lines changed

metacat-connector-polaris/src/functionalTest/resources/schema.sql

-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ create table TBLS (
1919
tbl_name varchar(255) not null,
2020
previous_metadata_location varchar(8192),
2121
metadata_location varchar(8192),
22-
params TEXT,
2322
constraint uniq_name unique(db_name, tbl_name),
2423
created_by STRING(255),
2524
created_date TIMESTAMP not null,

metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableService.java

+11-71
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@
1818
import com.netflix.metacat.common.server.connectors.exception.TablePreconditionFailedException;
1919
import com.netflix.metacat.common.server.connectors.model.TableInfo;
2020
import com.netflix.metacat.common.server.properties.Config;
21-
import com.netflix.metacat.common.server.util.MetacatUtils;
22-
import com.netflix.metacat.connector.hive.commonview.CommonViewHandler;
2321
import com.netflix.metacat.connector.hive.converters.HiveConnectorInfoConverter;
24-
import com.netflix.metacat.connector.hive.converters.HiveTypeConverter;
2522
import com.netflix.metacat.connector.hive.iceberg.IcebergTableHandler;
2623
import com.netflix.metacat.connector.hive.iceberg.IcebergTableWrapper;
2724
import com.netflix.metacat.connector.hive.sql.DirectSqlTable;
@@ -54,7 +51,6 @@ public class PolarisConnectorTableService implements ConnectorTableService {
5451
protected final HiveConnectorInfoConverter connectorConverter;
5552
protected final ConnectorContext connectorContext;
5653
protected final IcebergTableHandler icebergTableHandler;
57-
protected final CommonViewHandler commonViewHandler;
5854
protected final PolarisTableMapper polarisTableMapper;
5955
protected final String catalogName;
6056

@@ -66,7 +62,6 @@ public class PolarisConnectorTableService implements ConnectorTableService {
6662
* @param polarisConnectorDatabaseService connector database service
6763
* @param connectorConverter converter
6864
* @param icebergTableHandler iceberg table handler
69-
* @param commonViewHandler common view handler
7065
* @param polarisTableMapper polaris table polarisTableMapper
7166
* @param connectorContext the connector context
7267
*/
@@ -76,7 +71,6 @@ public PolarisConnectorTableService(
7671
final PolarisConnectorDatabaseService polarisConnectorDatabaseService,
7772
final HiveConnectorInfoConverter connectorConverter,
7873
final IcebergTableHandler icebergTableHandler,
79-
final CommonViewHandler commonViewHandler,
8074
final PolarisTableMapper polarisTableMapper,
8175
final ConnectorContext connectorContext
8276
) {
@@ -85,7 +79,6 @@ public PolarisConnectorTableService(
8579
this.connectorConverter = connectorConverter;
8680
this.connectorContext = connectorContext;
8781
this.icebergTableHandler = icebergTableHandler;
88-
this.commonViewHandler = commonViewHandler;
8982
this.polarisTableMapper = polarisTableMapper;
9083
this.catalogName = catalogName;
9184
}
@@ -103,13 +96,8 @@ public void create(final ConnectorRequestContext requestContext, final TableInfo
10396
}
10497
try {
10598
final PolarisTableEntity entity = polarisTableMapper.toEntity(tableInfo);
106-
if (HiveTableUtil.isCommonView(tableInfo)) {
107-
polarisStoreService.createTable(entity.getDbName(), entity.getTblName(),
108-
entity.getMetadataLocation(), entity.getParams(), createdBy);
109-
} else {
110-
polarisStoreService.createTable(entity.getDbName(), entity.getTblName(),
99+
polarisStoreService.createTable(entity.getDbName(), entity.getTblName(),
111100
entity.getMetadataLocation(), createdBy);
112-
}
113101
} catch (DataIntegrityViolationException | InvalidMetaException exception) {
114102
throw new InvalidMetaException(name, exception);
115103
} catch (Exception exception) {
@@ -160,10 +148,8 @@ public TableInfo get(final ConnectorRequestContext requestContext, final Qualifi
160148
final PolarisTableEntity polarisTableEntity = polarisStoreService
161149
.getTable(name.getDatabaseName(), name.getTableName())
162150
.orElseThrow(() -> new TableNotFoundException(name));
163-
final boolean isView = MetacatUtils.isCommonView(polarisTableEntity.getParams());
164-
final TableInfo info = polarisTableMapper.toInfo(polarisTableEntity, isView);
151+
final TableInfo info = polarisTableMapper.toInfo(polarisTableEntity);
165152
final String tableLoc = HiveTableUtil.getIcebergTableMetadataLocation(info);
166-
167153
// Return the iceberg table with just the metadata location included if requested.
168154
if (connectorContext.getConfig().shouldFetchOnlyMetadataLocationEnabled()
169155
&& requestContext.isIncludeMetadataLocationOnly()) {
@@ -172,12 +158,8 @@ public TableInfo get(final ConnectorRequestContext requestContext, final Qualifi
172158
.fields(Collections.emptyList())
173159
.build();
174160
}
175-
if (isView) {
176-
return getCommonView(name, tableLoc, info, connectorContext.getConfig().isIcebergCacheEnabled());
177-
} else {
178-
return getIcebergTable(name, tableLoc, info,
179-
requestContext.isIncludeMetadata(), connectorContext.getConfig().isIcebergCacheEnabled());
180-
}
161+
return getIcebergTable(name, tableLoc, info,
162+
requestContext.isIncludeMetadata(), connectorContext.getConfig().isIcebergCacheEnabled());
181163
} catch (TableNotFoundException | IllegalArgumentException exception) {
182164
log.error(String.format("Not found exception for polaris table %s", name), exception);
183165
throw exception;
@@ -235,19 +217,13 @@ public void update(final ConnectorRequestContext requestContext, final TableInfo
235217
final QualifiedName name = tableInfo.getName();
236218
final Config conf = connectorContext.getConfig();
237219
final String lastModifiedBy = PolarisUtils.getUserOrDefault(requestContext);
238-
final boolean isView = HiveTableUtil.isCommonView(tableInfo);
239-
if (isView) {
240-
commonViewHandler.update(tableInfo);
241-
} else {
242-
icebergTableHandler.update(tableInfo);
243-
}
220+
icebergTableHandler.update(tableInfo);
244221
try {
245222
final Map<String, String> newTableMetadata = tableInfo.getMetadata();
246223
if (MapUtils.isEmpty(newTableMetadata)) {
247224
log.warn("No parameters defined for iceberg table %s, no data update needed", name);
248225
return;
249226
}
250-
251227
final String prevLoc = newTableMetadata.get(DirectSqlTable.PARAM_PREVIOUS_METADATA_LOCATION);
252228
final String newLoc = newTableMetadata.get(DirectSqlTable.PARAM_METADATA_LOCATION);
253229
if (StringUtils.isBlank(prevLoc)) {
@@ -270,25 +246,10 @@ public void update(final ConnectorRequestContext requestContext, final TableInfo
270246
log.error(message);
271247
throw new InvalidMetaException(name, message, null);
272248
}
273-
274-
boolean updated = false;
275-
if (isView) {
276-
final Map<String, String> newTableParams = polarisTableMapper.filterMetadata(newTableMetadata);
277-
final Map<String, String> existingTableParams = polarisStoreService
278-
.getTable(name.getDatabaseName(), name.getTableName())
279-
.orElseThrow(() -> new TableNotFoundException(name))
280-
.getParams();
281-
// optimistically attempt to update metadata location and/or params
282-
updated = polarisStoreService.updateTableMetadataLocationAndParams(
283-
name.getDatabaseName(), name.getTableName(), prevLoc, newLoc,
284-
existingTableParams, newTableParams, lastModifiedBy);
285-
} else {
286-
// optimistically attempt to update metadata location
287-
updated = polarisStoreService.updateTableMetadataLocation(
288-
name.getDatabaseName(), name.getTableName(), prevLoc, newLoc, lastModifiedBy
289-
);
290-
}
291-
249+
// optimistically attempt to update metadata location
250+
final boolean updated = polarisStoreService.updateTableMetadataLocation(
251+
name.getDatabaseName(), name.getTableName(),
252+
prevLoc, newLoc, lastModifiedBy);
292253
// if succeeded then done, else try to figure out why and throw corresponding exception
293254
if (updated) {
294255
requestContext.setIgnoreErrorsAfterUpdate(true);
@@ -387,10 +348,7 @@ public List<TableInfo> list(
387348
ConnectorUtils.sort(tbls, sort, Comparator.comparing(t -> t.getTblName()));
388349
}
389350
return ConnectorUtils.paginate(tbls, pageable).stream()
390-
.map(
391-
t -> polarisTableMapper.toInfo(t, MetacatUtils.isCommonView(t.getParams()))
392-
)
393-
.collect(Collectors.toList());
351+
.map(t -> polarisTableMapper.toInfo(t)).collect(Collectors.toList());
394352
} catch (Exception exception) {
395353
final String msg = String.format("Failed polaris list tables %s using prefix %s", name, prefix);
396354
log.error(msg, exception);
@@ -402,7 +360,7 @@ public List<TableInfo> list(
402360
* Return the table metadata from cache if exists else make the iceberg call and refresh it.
403361
* @param tableName table name
404362
* @param tableMetadataLocation table metadata location
405-
* @param info table info stored in Polaris
363+
* @param info table info stored in hive metastore
406364
* @param includeInfoDetails if true, will include more details like the manifest file content
407365
* @param useCache true, if table can be retrieved from cache
408366
* @return TableInfo
@@ -418,24 +376,6 @@ public TableInfo getIcebergTable(final QualifiedName tableName,
418376
return connectorConverter.fromIcebergTableToTableInfo(tableName, icebergTable, tableMetadataLocation, info);
419377
}
420378

421-
/**
422-
* Return the view metadata from cache if exists else make the iceberg call and refresh it.
423-
* @param tableName table name
424-
* @param tableMetadataLocation table metadata location
425-
* @param info table info stored in Polaris
426-
* @param useCache true, if table can be retrieved from cache
427-
* @return TableInfo
428-
*/
429-
@Cacheable(key = "'iceberg.view.' + #tableMetadataLocation", condition = "#useCache")
430-
public TableInfo getCommonView(final QualifiedName tableName,
431-
final String tableMetadataLocation,
432-
final TableInfo info,
433-
final boolean useCache) {
434-
return commonViewHandler.getCommonViewTableInfo(
435-
tableName, tableMetadataLocation, info, new HiveTypeConverter()
436-
);
437-
}
438-
439379
@Override
440380
public List<QualifiedName> getTableNames(
441381
final ConnectorRequestContext context,

metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/configs/PolarisConnectorConfig.java

-14
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import com.google.common.collect.ImmutableMap;
44
import com.netflix.metacat.common.server.connectors.ConnectorContext;
55
import com.netflix.metacat.common.server.util.ThreadServiceManager;
6-
import com.netflix.metacat.connector.hive.commonview.CommonViewHandler;
76
import com.netflix.metacat.connector.hive.converters.HiveConnectorInfoConverter;
87
import com.netflix.metacat.connector.hive.iceberg.IcebergTableCriteria;
98
import com.netflix.metacat.connector.hive.iceberg.IcebergTableCriteriaImpl;
@@ -68,7 +67,6 @@ public PolarisConnectorDatabaseService polarisDatabaseService(
6867
* @param connectorConverter connector converter
6968
* @param connectorDatabaseService polaris database service
7069
* @param icebergTableHandler iceberg table handler
71-
* @param commonViewHandler common view handler
7270
* @param polarisTableMapper polaris table mapper
7371
* @param connectorContext connector context
7472
* @return PolarisConnectorTableService
@@ -80,7 +78,6 @@ public PolarisConnectorTableService polarisTableService(
8078
final HiveConnectorInfoConverter connectorConverter,
8179
final PolarisConnectorDatabaseService connectorDatabaseService,
8280
final IcebergTableHandler icebergTableHandler,
83-
final CommonViewHandler commonViewHandler,
8481
final PolarisTableMapper polarisTableMapper,
8582
final ConnectorContext connectorContext
8683
) {
@@ -90,7 +87,6 @@ public PolarisConnectorTableService polarisTableService(
9087
connectorDatabaseService,
9188
connectorConverter,
9289
icebergTableHandler,
93-
commonViewHandler,
9490
polarisTableMapper,
9591
connectorContext
9692
);
@@ -126,16 +122,6 @@ public IcebergTableHandler icebergTableHandler(final ConnectorContext connectorC
126122
icebergTableOpsProxy);
127123
}
128124

129-
/**
130-
* Create common view handler.
131-
* @param connectorContext server context
132-
* @return CommonViewHandler
133-
*/
134-
@Bean
135-
public CommonViewHandler commonViewHandler(final ConnectorContext connectorContext) {
136-
return new CommonViewHandler(connectorContext);
137-
}
138-
139125
/**
140126
* Create iceberg table criteria.
141127
* @param connectorContext server context

metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/mappers/PolarisTableMapper.java

+10-53
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.netflix.metacat.connector.polaris.mappers;
22

3-
import com.google.common.collect.ImmutableSet;
3+
import com.google.common.collect.ImmutableMap;
44
import com.netflix.metacat.common.QualifiedName;
55
import com.netflix.metacat.common.server.connectors.exception.InvalidMetaException;
66
import com.netflix.metacat.common.server.connectors.model.AuditInfo;
@@ -12,10 +12,7 @@
1212
import org.apache.commons.lang3.StringUtils;
1313

1414
import java.sql.Date;
15-
import java.util.HashMap;
1615
import java.util.Map;
17-
import java.util.Set;
18-
import java.util.stream.Collectors;
1916

2017
/**
2118
* Table object mapper implementations.
@@ -27,13 +24,6 @@ public class PolarisTableMapper implements
2724
private static final String PARAMETER_SPARK_SQL_PROVIDER = "spark.sql.sources.provider";
2825
private static final String PARAMETER_EXTERNAL = "EXTERNAL";
2926
private static final String PARAMETER_METADATA_PREFIX = "/metadata/";
30-
private static final Set<String> EXCLUDED_PARAM_KEYS = ImmutableSet.of(
31-
DirectSqlTable.PARAM_METADATA_LOCATION,
32-
DirectSqlTable.PARAM_PREVIOUS_METADATA_LOCATION,
33-
DirectSqlTable.PARAM_TABLE_TYPE,
34-
DirectSqlTable.PARAM_PARTITION_SPEC,
35-
DirectSqlTable.PARAM_METADATA_CONTENT
36-
);
3727
private final String catalogName;
3828

3929
/**
@@ -49,59 +39,28 @@ public PolarisTableMapper(final String catalogName) {
4939
*/
5040
@Override
5141
public TableInfo toInfo(final PolarisTableEntity entity) {
52-
return toInfo(entity, false);
53-
}
54-
55-
/**
56-
* Maps an Entity to the Info object.
57-
*
58-
* @param entity The entity to map from.
59-
* @param isView Whether the given entity represents a common view
60-
* @return The result info object.
61-
*/
62-
public TableInfo toInfo(final PolarisTableEntity entity, final boolean isView) {
6342
final int uriIndex = entity.getMetadataLocation().indexOf(PARAMETER_METADATA_PREFIX);
64-
65-
final HashMap<String, String> metadata = new HashMap<>();
66-
metadata.put(DirectSqlTable.PARAM_METADATA_LOCATION, entity.getMetadataLocation());
67-
68-
if (isView) {
69-
metadata.putAll(entity.getParams());
70-
} else {
71-
metadata.put(PARAMETER_EXTERNAL, "TRUE");
72-
metadata.put(PARAMETER_SPARK_SQL_PROVIDER, "iceberg");
73-
metadata.put(DirectSqlTable.PARAM_TABLE_TYPE, DirectSqlTable.ICEBERG_TABLE_TYPE);
74-
}
75-
7643
final TableInfo tableInfo = TableInfo.builder()
7744
.name(QualifiedName.ofTable(catalogName, entity.getDbName(), entity.getTblName()))
78-
.metadata(metadata)
45+
.metadata(ImmutableMap.of(
46+
DirectSqlTable.PARAM_METADATA_LOCATION, entity.getMetadataLocation(),
47+
PARAMETER_EXTERNAL, "TRUE", PARAMETER_SPARK_SQL_PROVIDER, "iceberg",
48+
DirectSqlTable.PARAM_TABLE_TYPE, DirectSqlTable.ICEBERG_TABLE_TYPE))
7949
.serde(StorageInfo.builder().inputFormat("org.apache.hadoop.mapred.FileInputFormat")
8050
.outputFormat("org.apache.hadoop.mapred.FileOutputFormat")
8151
.serializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
8252
.uri(uriIndex > 0 ? entity.getMetadataLocation().substring(0, uriIndex) : "")
8353
.build())
8454
.auditInfo(AuditInfo.builder()
85-
.createdBy(entity.getAudit().getCreatedBy())
86-
.createdDate(Date.from(entity.getAudit().getCreatedDate()))
87-
.lastModifiedBy(entity.getAudit().getLastModifiedBy())
88-
.lastModifiedDate(Date.from(entity.getAudit().getLastModifiedDate()))
89-
.build())
55+
.createdBy(entity.getAudit().getCreatedBy())
56+
.createdDate(Date.from(entity.getAudit().getCreatedDate()))
57+
.lastModifiedBy(entity.getAudit().getLastModifiedBy())
58+
.lastModifiedDate(Date.from(entity.getAudit().getLastModifiedDate()))
59+
.build())
9060
.build();
9161
return tableInfo;
9262
}
9363

94-
/**
95-
* Given TableInfo.metadata, filter out reserved metadata keys which should not be stored as table params.
96-
* @param metadata TableInfo.metadata
97-
* @return Map of table params which can be stored in PolarisTableEntity.params
98-
*/
99-
public Map<String, String> filterMetadata(final Map<String, String> metadata) {
100-
return metadata.entrySet()
101-
.stream().filter(entry -> !EXCLUDED_PARAM_KEYS.contains(entry.getKey()))
102-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
103-
}
104-
10564
/**
10665
* {@inheritDoc}.
10766
*/
@@ -117,12 +76,10 @@ public PolarisTableEntity toEntity(final TableInfo info) {
11776
final String message = String.format("No metadata location defined for iceberg table %s", info.getName());
11877
throw new InvalidMetaException(info.getName(), message, null);
11978
}
120-
12179
final PolarisTableEntity tableEntity = PolarisTableEntity.builder()
12280
.dbName(info.getName().getDatabaseName())
12381
.tblName(info.getName().getTableName())
12482
.metadataLocation(location)
125-
.params(filterMetadata(metadata))
12683
.build();
12784
return tableEntity;
12885
}

0 commit comments

Comments
 (0)