Skip to content

Commit 47a7a06

Browse files
Reading from a Spark table mapped to an explicit BigQuery table works
1 parent 726196d commit 47a7a06

File tree

9 files changed

+685
-45
lines changed

9 files changed

+685
-45
lines changed

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java

Lines changed: 74 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,24 @@ public static SparkBigQueryConfig from(
268268
SparkSession spark,
269269
Optional<StructType> schema,
270270
boolean tableIsMandatory) {
271+
return from(
272+
options,
273+
customDefaults,
274+
dataSourceVersion,
275+
spark,
276+
schema,
277+
tableIsMandatory,
278+
Optional.empty());
279+
}
280+
281+
public static SparkBigQueryConfig from(
282+
Map<String, String> options,
283+
ImmutableMap<String, String> customDefaults,
284+
DataSourceVersion dataSourceVersion,
285+
SparkSession spark,
286+
Optional<StructType> schema,
287+
boolean tableIsMandatory,
288+
Optional<TableId> overrideTableId) {
271289
Map<String, String> optionsMap = new HashMap<>(options);
272290
dataSourceVersion.updateOptionsMap(optionsMap);
273291
return SparkBigQueryConfig.from(
@@ -279,7 +297,8 @@ public static SparkBigQueryConfig from(
279297
spark.sqlContext().conf(),
280298
spark.version(),
281299
schema,
282-
tableIsMandatory);
300+
tableIsMandatory,
301+
overrideTableId);
283302
}
284303

285304
@VisibleForTesting
@@ -293,6 +312,31 @@ public static SparkBigQueryConfig from(
293312
String sparkVersion,
294313
Optional<StructType> schema,
295314
boolean tableIsMandatory) {
315+
return from(
316+
optionsInput,
317+
originalGlobalOptions,
318+
hadoopConfiguration,
319+
customDefaults,
320+
defaultParallelism,
321+
sqlConf,
322+
sparkVersion,
323+
schema,
324+
tableIsMandatory,
325+
Optional.empty());
326+
}
327+
328+
@VisibleForTesting
329+
public static SparkBigQueryConfig from(
330+
Map<String, String> optionsInput,
331+
ImmutableMap<String, String> originalGlobalOptions,
332+
Configuration hadoopConfiguration,
333+
ImmutableMap<String, String> customDefaults,
334+
int defaultParallelism,
335+
SQLConf sqlConf,
336+
String sparkVersion,
337+
Optional<StructType> schema,
338+
boolean tableIsMandatory,
339+
Optional<TableId> overrideTableId) {
296340
SparkBigQueryConfig config = new SparkBigQueryConfig();
297341

298342
ImmutableMap<String, String> options = toLowerCaseKeysMap(optionsInput);
@@ -314,9 +358,6 @@ public static SparkBigQueryConfig from(
314358
com.google.common.base.Optional.fromNullable(
315359
hadoopConfiguration.get(GCS_CONFIG_PROJECT_ID_PROPERTY))
316360
.toJavaUtil();
317-
Optional<String> tableParam =
318-
getOptionFromMultipleParams(options, ImmutableList.of("table", "path"), DEFAULT_FALLBACK)
319-
.toJavaUtil();
320361
Optional<String> datasetParam = getOption(options, "dataset").or(fallbackDataset).toJavaUtil();
321362
Optional<String> projectParam =
322363
firstPresent(getOption(options, "project").toJavaUtil(), fallbackProject);
@@ -327,28 +368,36 @@ public static SparkBigQueryConfig from(
327368
config.partitionRangeEnd = getOption(options, "partitionRangeEnd").transform(Long::parseLong);
328369
config.partitionRangeInterval =
329370
getOption(options, "partitionRangeInterval").transform(Long::parseLong);
330-
Optional<String> datePartitionParam = getOption(options, DATE_PARTITION_PARAM).toJavaUtil();
331-
datePartitionParam.ifPresent(
332-
date -> validateDateFormat(date, config.getPartitionTypeOrDefault(), DATE_PARTITION_PARAM));
333-
// checking for query
334-
if (tableParam.isPresent()) {
335-
String tableParamStr = tableParam.get().trim();
336-
if (isQuery(tableParamStr)) {
337-
// it is a query in practice
338-
config.query = com.google.common.base.Optional.of(tableParamStr);
339-
config.tableId = parseTableId("QUERY", datasetParam, projectParam, datePartitionParam);
340-
} else {
341-
config.tableId =
342-
parseTableId(tableParamStr, datasetParam, projectParam, datePartitionParam);
343-
}
371+
if (overrideTableId.isPresent()) {
372+
config.tableId = overrideTableId.get();
344373
} else {
345-
// no table has been provided, it is either a query or an error
346-
config.query = getOption(options, "query").transform(String::trim);
347-
if (config.query.isPresent()) {
348-
config.tableId = parseTableId("QUERY", datasetParam, projectParam, datePartitionParam);
349-
} else if (tableIsMandatory) {
350-
// No table nor query were set. We cannot go further.
351-
throw new IllegalArgumentException("No table has been specified");
374+
// checking for query
375+
Optional<String> tableParam =
376+
getOptionFromMultipleParams(options, ImmutableList.of("table", "path"), DEFAULT_FALLBACK)
377+
.toJavaUtil();
378+
Optional<String> datePartitionParam = getOption(options, DATE_PARTITION_PARAM).toJavaUtil();
379+
datePartitionParam.ifPresent(
380+
date ->
381+
validateDateFormat(date, config.getPartitionTypeOrDefault(), DATE_PARTITION_PARAM));
382+
if (tableParam.isPresent()) {
383+
String tableParamStr = tableParam.get().trim();
384+
if (isQuery(tableParamStr)) {
385+
// it is a query in practice
386+
config.query = com.google.common.base.Optional.of(tableParamStr);
387+
config.tableId = parseTableId("QUERY", datasetParam, projectParam, datePartitionParam);
388+
} else {
389+
config.tableId =
390+
parseTableId(tableParamStr, datasetParam, projectParam, datePartitionParam);
391+
}
392+
} else {
393+
// no table has been provided, it is either a query or an error
394+
config.query = getOption(options, "query").transform(String::trim);
395+
if (config.query.isPresent()) {
396+
config.tableId = parseTableId("QUERY", datasetParam, projectParam, datePartitionParam);
397+
} else if (tableIsMandatory) {
398+
// No table nor query were set. We cannot go further.
399+
throw new IllegalArgumentException("No table has been specified");
400+
}
352401
}
353402
}
354403

spark-bigquery-dsv2/spark-3.1-bigquery-lib/src/main/java/com/google/cloud/spark/bigquery/v2/Spark31BigQueryTable.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.cloud.spark.bigquery.v2;
1717

18+
import com.google.cloud.bigquery.TableId;
1819
import com.google.cloud.bigquery.connector.common.BigQueryUtil;
1920
import com.google.cloud.spark.bigquery.DataSourceVersion;
2021
import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
@@ -47,10 +48,12 @@ public class Spark31BigQueryTable implements Table, SupportsRead, SupportsWrite
4748

4849
protected Injector injector;
4950
protected Supplier<StructType> schemaSupplier;
51+
protected TableId tableId;
5052

5153
public Spark31BigQueryTable(Injector injector, Supplier<StructType> schemaSupplier) {
5254
this.injector = injector;
5355
this.schemaSupplier = schemaSupplier;
56+
this.tableId = injector.getInstance(SparkBigQueryConfig.class).getTableId();
5457
}
5558

5659
@Override
@@ -67,8 +70,9 @@ protected BigQueryDataSourceReaderContext createBigQueryDataSourceReaderContext(
6770
ImmutableMap.of(),
6871
injector.getInstance(DataSourceVersion.class),
6972
injector.getInstance(SparkSession.class),
70-
Optional.ofNullable(schemaSupplier.get()), /*tableIsMandatory*/
71-
true);
73+
Optional.ofNullable(schemaSupplier.get()),
74+
true /* tableIsMandatory */,
75+
Optional.of(tableId));
7276
Injector readerInjector =
7377
injector.createChildInjector(
7478
new BigQueryDataSourceReaderModule(Optional.of(tableScanConfig)));

spark-bigquery-dsv2/spark-3.5-bigquery-lib/src/main/java/com/google/cloud/spark/bigquery/v2/BigQueryTableCatalog.java

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.common.collect.ImmutableMap;
2929
import com.google.inject.Injector;
3030
import java.util.Arrays;
31+
import java.util.HashMap;
3132
import java.util.Map;
3233
import java.util.Optional;
3334
import java.util.ServiceLoader;
@@ -48,9 +49,11 @@ public class BigQueryTableCatalog implements TableCatalog {
4849
private static final Logger logger = LoggerFactory.getLogger(BigQueryTableCatalog.class);
4950
private static final String[] DEFAULT_NAMESPACE = {"default"};
5051

51-
TableProvider tableProvider;
52-
BigQueryClient bigQueryClient;
53-
SchemaConverters schemaConverters;
52+
private static Map<String, Table> identifierToTableMapping = new HashMap<>();
53+
54+
private TableProvider tableProvider;
55+
private BigQueryClient bigQueryClient;
56+
private SchemaConverters schemaConverters;
5457

5558
@Override
5659
public void initialize(String name, CaseInsensitiveStringMap caseInsensitiveStringMap) {
@@ -99,10 +102,14 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti
99102
@Override
100103
public Table loadTable(Identifier identifier) throws NoSuchTableException {
101104
logger.debug("loading table [{}])", format(identifier));
102-
ImmutableMap<String, String> properties =
103-
ImmutableMap.of("dataset", identifier.namespace()[0], "table", identifier.name());
104-
// TODO: reuse injector
105-
return Spark3Util.createBigQueryTableInstance(Spark35BigQueryTable::new, null, properties);
105+
return identifierToTableMapping.computeIfAbsent(
106+
identifier.toString(),
107+
ignored ->
108+
// TODO: reuse injector
109+
Spark3Util.createBigQueryTableInstance(
110+
Spark35BigQueryTable::new,
111+
null,
112+
ImmutableMap.of("dataset", identifier.namespace()[0], "table", identifier.name())));
106113
}
107114

108115
@Override
@@ -125,20 +132,40 @@ public Table createTable(
125132
Map<String, String> properties)
126133
throws TableAlreadyExistsException, NoSuchNamespaceException {
127134
logger.debug("creating table [{}])", format(identifier));
135+
if (tableExplicitlySet(properties)) {
136+
logger.debug("Mapping Spark table to BigQuery table)");
137+
// As the table is mapped to an actual table in BigQuery, we are relying on the BigQuery
138+
// schema
139+
return identifierToTableMapping.computeIfAbsent(
140+
identifier.toString(),
141+
ignored ->
142+
Spark3Util.createBigQueryTableInstance(
143+
Spark35BigQueryTable::new, /* schema */ null, properties));
144+
}
128145
Schema schema = schemaConverters.toBigQuerySchema(structType);
129146
bigQueryClient.createTable(
130147
toTableId(identifier),
131148
schema,
132149
BigQueryClient.CreateTableOptions.of(
133150
Optional.empty(), ImmutableMap.of(), Optional.empty()));
134-
ImmutableMap<String, String> getTableProperties =
135-
ImmutableMap.<String, String>builder()
136-
.put("dataset", identifier.namespace()[0])
137-
.put("table", identifier.name())
138-
.putAll(properties)
139-
.build();
151+
ImmutableMap.Builder<String, String> getTableProperties =
152+
ImmutableMap.<String, String>builder().putAll(properties);
153+
// if the user provided an alternative table we do not want to ignore it
154+
if (!tableExplicitlySet(properties)) {
155+
getTableProperties.put("dataset", identifier.namespace()[0]).put("table", identifier.name());
156+
}
140157
// TODO: Use the table constructor directly using the catalog's injector
141-
return tableProvider.getTable(structType, transforms, getTableProperties);
158+
return tableProvider.getTable(structType, transforms, getTableProperties.buildKeepingLast());
159+
}
160+
161+
private static boolean tableExplicitlySet(Map<String, String> properties) {
162+
if (properties.containsKey("table")) {
163+
return true;
164+
}
165+
if (properties.containsKey("path")) {
166+
return true;
167+
}
168+
return false;
142169
}
143170

144171
@Override
@@ -151,8 +178,12 @@ public Table alterTable(Identifier identifier, TableChange... tableChanges)
151178
public boolean dropTable(Identifier identifier) {
152179
logger.debug("dropping table [{}])", format(identifier));
153180
TableId tableId = toTableId(identifier);
154-
if (bigQueryClient.tableExists(tableId)) {
155-
return bigQueryClient.deleteTable(tableId);
181+
if (!bigQueryClient.tableExists(tableId)) {
182+
return false;
183+
}
184+
if (bigQueryClient.deleteTable(tableId)) {
185+
identifierToTableMapping.remove(identifier.toString());
186+
return true;
156187
}
157188
return false;
158189
}

0 commit comments

Comments
 (0)