Expose BigQuery schema autodetect in Java SDK #25632

Closed
wants to merge 3 commits into from
BatchLoads.java
@@ -167,6 +167,8 @@ class BatchLoads<DestinationT, ElementT>
// The maximum number of times to retry failed load or copy jobs.
private int maxRetryJobs = DEFAULT_MAX_RETRY_JOBS;

private boolean allowNullSchema = false;

BatchLoads(
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
@@ -181,7 +183,8 @@ class BatchLoads<DestinationT, ElementT>
@Nullable String kmsKey,
boolean clusteringEnabled,
boolean useAvroLogicalTypes,
String tempDataset) {
String tempDataset,
boolean allowNullSchema) {
bigQueryServices = new BigQueryServicesImpl();
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
@@ -208,6 +211,7 @@ class BatchLoads<DestinationT, ElementT>
this.tempDataset = tempDataset;
this.tableDestinationCoder =
clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of();
this.allowNullSchema = allowNullSchema;
}

void setSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions) {
@@ -781,7 +785,8 @@ private PCollection<KV<DestinationT, WriteTables.Result>> writeTempTables(
// tables. They also shouldn't be needed. See
// https://github.com/apache/beam/issues/21105 for additional details.
schemaUpdateOptions,
tempDataset))
tempDataset,
allowNullSchema))
.setCoder(KvCoder.of(destinationCoder, WriteTables.ResultCoder.INSTANCE));
}

@@ -821,7 +826,8 @@ PCollection<TableDestination> writeSinglePartition(
rowWriterFactory.getSourceFormat(),
useAvroLogicalTypes,
schemaUpdateOptions,
null))
null,
allowNullSchema))
.setCoder(KvCoder.of(destinationCoder, WriteTables.ResultCoder.INSTANCE));

BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
BigQueryIO.java
@@ -2094,6 +2094,7 @@ public static <T> Write<T> write() {
.setAutoSchemaUpdate(false)
.setDeterministicRecordIdFn(null)
.setMaxRetryJobs(1000)
.setAllowNullSchema(false)
.build();
}

@@ -2235,6 +2236,8 @@ public enum Method {

abstract int getMaxRetryJobs();

abstract Boolean getAllowNullSchema();

abstract @Nullable String getKmsKey();

abstract Boolean getOptimizeWrites();
@@ -2340,6 +2343,8 @@ abstract Builder<T> setAvroSchemaFactory(

abstract Builder<T> setMaxRetryJobs(int maxRetryJobs);

abstract Builder<T> setAllowNullSchema(Boolean allowNullSchema);

abstract Builder<T> setPropagateSuccessful(Boolean propagateSuccessful);

abstract Builder<T> setAutoSchemaUpdate(Boolean autoSchemaUpdate);
@@ -2854,6 +2859,11 @@ public Write<T> withMaxRetryJobs(int maxRetryJobs) {
return toBuilder().setMaxRetryJobs(maxRetryJobs).build();
}

/** If true, a null table schema is allowed; BigQuery may then autodetect the schema during load. */
public Write<T> withAllowNullSchema(Boolean allowNullSchema) {
return toBuilder().setAllowNullSchema(allowNullSchema).build();
}

/**
* If true, it enables the propagation of the successfully inserted TableRows on BigQuery as
* part of the {@link WriteResult} object when using {@link Method#STREAMING_INSERTS}. By
@@ -3325,7 +3335,8 @@ private <DestinationT> WriteResult continueExpandTyped(
getKmsKey(),
getClustering() != null,
getUseAvroLogicalTypes(),
getWriteTempDataset());
getWriteTempDataset(),
getAllowNullSchema());
batchLoads.setTestServices(getBigQueryServices());
if (getSchemaUpdateOptions() != null) {
batchLoads.setSchemaUpdateOptions(getSchemaUpdateOptions());
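
For reference, a rough usage sketch of the new option (not taken from this PR's diff): AutodetectDestinations is a hypothetical DynamicDestinations subclass, sketched further below after the WriteTables.java section, and `rows` stands for an existing PCollection<TableRow>.

```java
// Hypothetical usage; assumes the standard Beam GCP imports and an existing
// PCollection<TableRow> named `rows`.
rows.apply(
    "WriteWithNullSchema",
    BigQueryIO.<TableRow>write()
        .to(new AutodetectDestinations()) // hypothetical destinations class, sketched below
        .withFormatFunction(r -> r) // elements are already TableRows
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withAllowNullSchema(true)); // builder option added by this PR
```

With allowNullSchema left at its default of false, the existing precondition in WriteTables still rejects a null schema, so current pipelines keep their behavior.
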
WriteTables.java
@@ -147,6 +147,8 @@ public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
private @Nullable JobService jobService;
private final @Nullable String tempDataset;

private final Boolean allowNullSchema;

private class WriteTablesDoFn
extends DoFn<KV<ShardedKey<DestinationT>, WritePartition.Result>, KV<DestinationT, Result>> {

@@ -206,16 +208,20 @@ public void processElement(
BigQueryHelpers.fromJsonString(jsonSchemas.get(destination), TableSchema.class);
} else {
tableSchema = dynamicDestinations.getSchema(destination);
Preconditions.checkArgumentNotNull(
tableSchema,
"Unless create disposition is %s, a schema must be specified, i.e. "
+ "DynamicDestinations.getSchema() may not return null. "
+ "However, create disposition is %s, and %s returned null for destination %s",
CreateDisposition.CREATE_NEVER,
firstPaneCreateDisposition,
dynamicDestinations,
destination);
jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema));
if (!allowNullSchema) {
Preconditions.checkArgumentNotNull(
tableSchema,
"Unless create disposition is %s, a schema must be specified, i.e. "
+ "DynamicDestinations.getSchema() may not return null. "
+ "However, create disposition is %s, and %s returned null for destination %s",
CreateDisposition.CREATE_NEVER,
firstPaneCreateDisposition,
dynamicDestinations,
destination);
}
if (tableSchema != null) {
jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema));
}
}

TableDestination tableDestination = dynamicDestinations.getTable(destination);
@@ -404,7 +410,8 @@ public WriteTables(
String sourceFormat,
boolean useAvroLogicalTypes,
Set<SchemaUpdateOption> schemaUpdateOptions,
@Nullable String tempDataset) {
@Nullable String tempDataset,
boolean allowNullSchema) {

this.tempTable = tempTable;
this.bqServices = bqServices;
@@ -423,6 +430,7 @@ public WriteTables(
this.useAvroLogicalTypes = useAvroLogicalTypes;
this.schemaUpdateOptions = schemaUpdateOptions;
this.tempDataset = tempDataset;
this.allowNullSchema = allowNullSchema;
}

@Override
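
The new allowNullSchema guard above only skips the null-schema precondition and the JSON-schema caching; it is the caller's DynamicDestinations that actually hands back no schema. A minimal sketch of such a destinations class follows (the class name, table spec, and description string are illustrative assumptions, not part of this PR):

```java
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Routes every row to a single table and declares no schema up front. */
class AutodetectDestinations extends DynamicDestinations<TableRow, String> {
  @Override
  public String getDestination(ValueInSingleWindow<TableRow> element) {
    return "my-project:my_dataset.my_table"; // placeholder table spec
  }

  @Override
  public TableDestination getTable(String destination) {
    return new TableDestination(destination, "destination with no pre-declared schema");
  }

  @Override
  public @Nullable TableSchema getSchema(String destination) {
    // Returning null is only accepted by this sink when withAllowNullSchema(true) is set.
    return null;
  }
}
```
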
FakeJobService.java
@@ -381,10 +381,6 @@ private JobStatus runLoadJob(JobReference jobRef, JobConfigurationLoad load)
CreateDisposition createDisposition = CreateDisposition.valueOf(load.getCreateDisposition());

Table existingTable = datasetService.getTable(destination);
if (schema == null) {
schema = existingTable.getSchema();
}
checkArgument(schema != null, "No schema specified");

Review comment (Contributor): is it possible to preserve this check?

if (!validateDispositions(existingTable, createDisposition, writeDisposition)) {
return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
BigQueryIOWriteTest.java
@@ -2233,6 +2233,20 @@ public void testMaxRetryJobs() {
assertEquals(500, write.getMaxRetryJobs());
}

@Test
public void testAllowNullSchema() {
BigQueryIO.Write<TableRow> write =
BigQueryIO.writeTableRows()
.to("dataset.table")
.withSchema(new TableSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withSchemaUpdateOptions(
EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
.withAllowNullSchema(true);
assertEquals(true, write.getAllowNullSchema());
}

@Test
public void testWritePartitionEmptyData() throws Exception {
long numFiles = 0;
@@ -2380,6 +2394,12 @@ private void testWritePartition(
}

static class IdentityDynamicTables extends DynamicDestinations<String, String> {
private boolean nullSchema;

public IdentityDynamicTables(boolean nullSchema) {
this.nullSchema = nullSchema;
}

@Override
public String getDestination(ValueInSingleWindow<String> element) {
throw new UnsupportedOperationException("getDestination not expected in this test.");
@@ -2392,7 +2412,11 @@ public TableDestination getTable(String destination) {

@Override
public TableSchema getSchema(String destination) {
return new TableSchema();
if (nullSchema) {
return null;
} else {
return new TableSchema();
}
}
}

@@ -2448,7 +2472,7 @@ public void testWriteTables() throws Exception {
p.apply("CreateJobId", Create.of("jobId")).apply(View.asSingleton());
List<PCollectionView<?>> sideInputs = ImmutableList.of(jobIdTokenView);

DynamicDestinations<String, String> dynamicDestinations = new IdentityDynamicTables();
DynamicDestinations<String, String> dynamicDestinations = new IdentityDynamicTables(false);

fakeJobService.setNumFailuresExpected(3);
WriteTables<String> writeTables =
@@ -2467,7 +2491,119 @@ public void testWriteTables() throws Exception {
"NEWLINE_DELIMITED_JSON",
false,
Collections.emptySet(),
null);
null,
false);

PCollection<KV<TableDestination, WriteTables.Result>> writeTablesOutput =
writeTablesInput
.apply(writeTables)
.setCoder(KvCoder.of(StringUtf8Coder.of(), WriteTables.ResultCoder.INSTANCE))
.apply(
ParDo.of(
new DoFn<
KV<String, WriteTables.Result>,
KV<TableDestination, WriteTables.Result>>() {
@ProcessElement
public void processElement(
@Element KV<String, WriteTables.Result> e,
OutputReceiver<KV<TableDestination, WriteTables.Result>> o) {
o.output(KV.of(dynamicDestinations.getTable(e.getKey()), e.getValue()));
}
}));

PAssert.thatMultimap(writeTablesOutput)
.satisfies(
input -> {
assertEquals(expectedTempTables.keySet(), input.keySet());
for (Map.Entry<TableDestination, Iterable<WriteTables.Result>> entry :
input.entrySet()) {
Iterable<String> tableNames =
StreamSupport.stream(entry.getValue().spliterator(), false)
.map(Result::getTableName)
.collect(Collectors.toList());
@SuppressWarnings("unchecked")
String[] expectedValues =
Iterables.toArray(expectedTempTables.get(entry.getKey()), String.class);
assertThat(tableNames, containsInAnyOrder(expectedValues));
}
return null;
});
p.run();
}

@Test
public void testWriteTablesNullSchema() throws Exception {
long numTables = 3;
long numPartitions = 3;
long numFilesPerPartition = 10;
String jobIdToken = "jobId";
final Multimap<TableDestination, String> expectedTempTables = ArrayListMultimap.create();

List<KV<ShardedKey<String>, WritePartition.Result>> partitions = Lists.newArrayList();
for (int i = 0; i < numTables; ++i) {
String tableName = String.format("project-id:dataset-id.table%05d", i);
TableDestination tableDestination = new TableDestination(tableName, tableName);
for (int j = 0; j < numPartitions; ++j) {
String tempTableId =
BigQueryResourceNaming.createJobIdWithDestination(jobIdToken, tableDestination, j, 0);
List<String> filesPerPartition = Lists.newArrayList();
for (int k = 0; k < numFilesPerPartition; ++k) {
String filename =
Paths.get(
testFolder.getRoot().getAbsolutePath(),
String.format("files0x%08x_%05d", tempTableId.hashCode(), k))
.toString();
TableRowWriter<TableRow> writer =
new TableRowWriter<>(filename, SerializableFunctions.identity());
try (TableRowWriter<TableRow> ignored = writer) {
TableRow tableRow = new TableRow().set("name", tableName);
writer.write(tableRow);
}
filesPerPartition.add(writer.getResult().resourceId.toString());
}
partitions.add(
KV.of(
ShardedKey.of(tableDestination.getTableSpec(), j),
new AutoValue_WritePartition_Result(filesPerPartition, true)));

String json =
String.format(
"{\"datasetId\":\"dataset-id\",\"projectId\":\"project-id\",\"tableId\":\"%s\"}",
tempTableId);
expectedTempTables.put(tableDestination, json);
}
}

PCollection<KV<ShardedKey<String>, WritePartition.Result>> writeTablesInput =
p.apply(
Create.of(partitions)
.withCoder(
KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), ResultCoder.INSTANCE)));
PCollectionView<String> jobIdTokenView =
p.apply("CreateJobId", Create.of("jobId")).apply(View.asSingleton());
List<PCollectionView<?>> sideInputs = ImmutableList.of(jobIdTokenView);

DynamicDestinations<String, String> dynamicDestinations = new IdentityDynamicTables(true);

fakeJobService.setNumFailuresExpected(3);
WriteTables<String> writeTables =
new WriteTables<>(
true,
fakeBqServices,
jobIdTokenView,
BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED,
sideInputs,
dynamicDestinations,
null,
4,
false,
null,
"NEWLINE_DELIMITED_JSON",
false,
Collections.emptySet(),
null,
true);

PCollection<KV<TableDestination, WriteTables.Result>> writeTablesOutput =
writeTablesInput