What happened?
Found as part of the analysis in #34102 (comment).
The problematic line is in `StorageApiWriteUnshardedRecords`.
If a row has known fields (name:string, surname:string) and one unknown field (foo:string), this line concatenates the unknown field onto the row without causing issues.
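Here is a minimal sketch of the case that works. It assumes that "concat" means a shallow map merge (`putAll`) of the unknown-field payload into the known-field row. It is a simplified model, not Beam's code, and the class and method names are hypothetical. The unknown key `foo` does not collide with any known key, so nothing is lost:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model (not Beam's code): a row is a Map, and the
// unknown-field payload is assumed to be merged back with a shallow putAll.
public class TopLevelUnknownFieldMerge {

  // Shallow merge: copies every top-level entry of `unknown` into a copy of `known`.
  static Map<String, Object> shallowMerge(Map<String, Object> known, Map<String, Object> unknown) {
    Map<String, Object> merged = new LinkedHashMap<>(known);
    merged.putAll(unknown);
    return merged;
  }

  public static void main(String[] args) {
    // Fields present in the table schema.
    Map<String, Object> known = new LinkedHashMap<>();
    known.put("name", "Ada");
    known.put("surname", "Lovelace");

    // Field that is not in the schema.
    Map<String, Object> unknown = new LinkedHashMap<>();
    unknown.put("foo", "bar");

    // Keys are disjoint, so the shallow merge loses nothing.
    // prints {name=Ada, surname=Lovelace, foo=bar}
    System.out.println(shallowMerge(known, unknown));
  }
}
```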
If the unknown field is part of a repeated structure, e.g. (name:string, phone:(repeated struct type:string, num:string)), and the unknown value sits inside the repeated struct (phone: [ null, {favourite:yes} ]), the concatenation does not work correctly.
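The sketch below uses the same simplified model and the same assumption that "concat" is a shallow `putAll`. All names are hypothetical. It illustrates why the repeated case could break. The unknown-field payload carries its own `phone` list, so a shallow merge replaces the entire known `phone` list with `[null, {favourite=yes}]` and drops the known `type`/`num` values. A recursive, element-wise merge keeps both:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model (not Beam's code) of merging an unknown-field
// payload back into a row whose repeated record holds the unknown value.
public class RepeatedUnknownFieldMerge {

  // Builds an ordered map from alternating key/value arguments.
  static Map<String, Object> row(Object... kv) {
    Map<String, Object> m = new LinkedHashMap<>();
    for (int i = 0; i < kv.length; i += 2) {
      m.put((String) kv[i], kv[i + 1]);
    }
    return m;
  }

  // Shallow merge (assumed "concat"): a top-level key in `unknown` replaces the known value.
  static Map<String, Object> shallowMerge(Map<String, Object> known, Map<String, Object> unknown) {
    Map<String, Object> merged = new LinkedHashMap<>(known);
    merged.putAll(unknown);
    return merged;
  }

  // Recursive merge: maps are merged key by key, lists element by element.
  static Map<String, Object> deepMerge(Map<String, Object> known, Map<String, Object> unknown) {
    Map<String, Object> merged = new LinkedHashMap<>(known);
    for (Map.Entry<String, Object> e : unknown.entrySet()) {
      merged.put(e.getKey(), deepMergeValue(merged.get(e.getKey()), e.getValue()));
    }
    return merged;
  }

  @SuppressWarnings("unchecked")
  static Object deepMergeValue(Object known, Object unknown) {
    if (unknown == null) return known; // e.g. the `null` slot for an element with no unknowns
    if (known == null) return unknown; // key exists only in the unknown payload
    if (known instanceof Map && unknown instanceof Map) {
      return deepMerge((Map<String, Object>) known, (Map<String, Object>) unknown);
    }
    if (known instanceof List && unknown instanceof List) {
      List<Object> k = (List<Object>) known;
      List<Object> u = (List<Object>) unknown;
      List<Object> out = new ArrayList<>();
      for (int i = 0; i < Math.max(k.size(), u.size()); i++) {
        out.add(deepMergeValue(i < k.size() ? k.get(i) : null, i < u.size() ? u.get(i) : null));
      }
      return out;
    }
    return known; // scalar present on both sides: keep the known value
  }

  public static void main(String[] args) {
    Map<String, Object> known = row(
        "name", "Ada",
        "phone", Arrays.asList(row("type", "home", "num", "123"), row("type", "work", "num", "456")));
    // Unknown payload: only the second phone element has an unknown field.
    Map<String, Object> unknown = row("phone", Arrays.asList(null, row("favourite", "yes")));

    // prints {name=Ada, phone=[null, {favourite=yes}]}
    System.out.println(shallowMerge(known, unknown));
    // prints {name=Ada, phone=[{type=home, num=123}, {type=work, num=456, favourite=yes}]}
    System.out.println(deepMerge(known, unknown));
  }
}
```

If the real code does behave like the shallow merge, the lost `type` values would be consistent with the REQUIRED-mode note in the reproduction. Without REQUIRED, the records would end up empty.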
To reproduce:
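```java
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class UnknownFieldInRepeatedRecordRepro {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

    final List<String> elements = Arrays.asList("1234, Foo", "1111, Bar", "3456, Baz");

    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("id").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("name").setType("STRING"));
    fields.add(
        new TableFieldSchema()
            .setName("phone_numbers")
            .setMode("REPEATED")
            .setType("RECORD")
            .setFields(
                ImmutableList.of(
                    new TableFieldSchema()
                        .setName("type")
                        .setType("STRING")
                        // If REQUIRED mode is removed, BigQueryIO inserts empty records instead.
                        .setMode("REQUIRED"),
                    new TableFieldSchema().setName("number").setType("STRING"))));
    TableSchema schema = new TableSchema().setFields(fields);

    Pipeline p = Pipeline.create(options);
    WriteResult result =
        p
            // Add a source that uses the hard-coded records.
            .apply(Create.of(elements))
            // Convert each record (a comma-separated string) to a TableRow.
            .apply(
                "to TableRow",
                ParDo.of(
                    new DoFn<String, TableRow>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        List<String> columns = Splitter.on(", ").splitToList(c.element());
                        TableRow row = new TableRow();
                        row.set("id", Integer.parseInt(columns.get(0)));
                        row.set("name", columns.get(1));
                        if (columns.get(0).equals("1111")) {
                          row.set(
                              "phone_numbers",
                              ImmutableList.of(
                                  new TableCell().set("type", "home").set("number", "88888"),
                                  new TableCell()
                                      .set("type", "work")
                                      .set("number", "99999")
                                      // "foo" is not in the schema: unknown field inside a repeated record.
                                      .set("foo", "bar")));
                        } else {
                          row.set(
                              "phone_numbers",
                              ImmutableList.of(
                                  new TableCell().set("type", "home").set("number", "12345"),
                                  new TableCell().set("type", "work").set("number", "67890")));
                        }
                        c.output(row);
                      }
                    }))
            .apply(
                "bq",
                BigQueryIO.writeTableRows()
                    .withSchema(schema)
                    .to("<project>:<dataset>.<table>")
                    .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                    .withAutoSchemaUpdate(true)
                    .ignoreUnknownValues());

    p.run().waitUntilFinish();
  }
}
```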
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner