Add section to README describing PCollections #35012

Open: wants to merge 3 commits into master

Changes from all commits
README.md: 28 additions & 0 deletions

@@ -55,6 +55,34 @@ The key concepts in the Beam programming model are:
* `Pipeline`: manages a directed acyclic graph of PTransforms and PCollections that is ready for execution.
* `PipelineRunner`: specifies where and how the pipeline should execute.

## Working with PCollections

A `PCollection` represents a distributed dataset that your Beam pipeline operates on. The "P" stands for "parallel" to indicate that a PCollection can be processed by multiple workers in parallel.

PCollections can be created in two primary ways; a short Java sketch follows the list:

* **From in-memory data:** You can create a PCollection from a local collection (like a list or array) in your driver program. This is typically done using a `Create` transform (e.g., `beam.Create()` in Python, `Create.of()` in Java). This is useful for testing or for small, fixed datasets.
* **Reading from external sources:** More commonly, PCollections are created by reading data from an external storage system. Beam provides I/O connectors for a wide variety of systems, such as:
* Text files
* Apache Avro, Apache Parquet
* Databases (JDBC, Google Cloud Bigtable, Apache Cassandra, etc.)
* Messaging systems (Apache Kafka, Google Cloud Pub/Sub, etc.)
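
A minimal sketch of both approaches using the Java SDK (the class name, bucket, and file pattern are placeholders):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class CreatePCollections {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // From in-memory data: handy for tests and small, fixed datasets.
    PCollection<String> fromMemory = p.apply(Create.of("apple", "banana", "cherry"));

    // From an external source: here, text files matched by a glob.
    PCollection<String> fromFiles = p.apply(TextIO.read().from("gs://my-bucket/input/*.txt"));

    p.run().waitUntilFinish();
  }
}
```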

Key characteristics of PCollections include:

* **Immutability:** Once a PCollection is created, it cannot be changed. When you apply a transform to a PCollection, you get a new PCollection as output. This immutability lets Beam retry operations safely and enables optimizations.
* **Element Type:** Each PCollection has a specific data type for its elements. For SDKs with static typing (like Java), this is enforced at compile time. For dynamically typed SDKs (like Python), Beam often infers or can be told the type, which helps in choosing efficient processing strategies (e.g., selecting appropriate Coders).
* **Bounded vs. Unbounded:**
* A **bounded** PCollection represents a dataset of a known, fixed size. Data read from batch storage systems (like a file on HDFS or a table in BigQuery) typically forms a bounded PCollection.
    * An **unbounded** PCollection represents a dataset that is continuously growing and has no defined end. This is typical for streaming data sources, like messages from Kafka or events from Pub/Sub. Beam's model provides powerful tools, such as windowing, to process unbounded PCollections; a windowing sketch follows this list.
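
Aggregations over an unbounded PCollection are typically scoped by windows before results can be emitted. A sketch continuing the pipeline `p` above, assuming the Pub/Sub connector (`PubsubIO`) and a placeholder topic; note that `apply` returns a new PCollection and leaves `events` untouched:

```java
// Reading from Pub/Sub yields an unbounded PCollection.
PCollection<String> events =
    p.apply(PubsubIO.readStrings().fromTopic("projects/my-project/topics/events"));

// Slice the stream into fixed one-minute windows so downstream aggregations
// can produce output without waiting for the (nonexistent) end of the input.
PCollection<String> windowed =
    events.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
```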

PCollections are the inputs and outputs of every `PTransform` in your pipeline. Common operations performed through PTransforms include the following, combined in the sketch after the list:

* Element-wise transformations (e.g., formatting or converting each element).
* Filtering elements based on a condition.
* Grouping elements by a key.
* Aggregating elements within a group or across an entire PCollection.
* Combining data from multiple PCollections.
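
A sketch combining several of these operations to count words, reusing `fromFiles` from the first example (transform and value imports elided):

```java
PCollection<KV<String, Long>> wordCounts =
    fromFiles
        // Element-wise: split each line into individual words.
        .apply(FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("\\s+"))))
        // Filtering: drop empty tokens.
        .apply(Filter.by((String word) -> !word.isEmpty()))
        // Grouping and aggregation: count occurrences of each distinct word.
        .apply(Count.perElement());
```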

### SDKs

Beam supports multiple language-specific SDKs for writing pipelines against the Beam Model.
BigQueryAvroUtils.java

@@ -402,52 +402,103 @@ private static Object convertRequiredField(
unionTypes.get(1).getType(), unionTypes.get(1).getLogicalType(), fieldSchema, v);
}

  // Removed: the previous implementation, which converted fields directly and did
  // not sanitize names or track already-defined record types.

  static Schema toGenericAvroSchema(String schemaName, List<TableFieldSchema> fieldSchemas) {
    List<Field> avroFields = new ArrayList<>();
    for (TableFieldSchema bigQueryField : fieldSchemas) {
      avroFields.add(convertField(bigQueryField));
    }
    return Schema.createRecord(
        schemaName,
        "Translated Avro Schema for " + schemaName,
        "org.apache.beam.sdk.io.gcp.bigquery",
        false,
        avroFields);
  }

  @SuppressWarnings({
    "nullness" // Avro library not annotated
  })
  private static Field convertField(TableFieldSchema bigQueryField) {
    ImmutableCollection<Type> avroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType());
    if (avroTypes.isEmpty()) {
      throw new IllegalArgumentException(
          "Unable to map BigQuery field type " + bigQueryField.getType() + " to avro type.");
    }

    Type avroType = avroTypes.iterator().next();
    Schema elementSchema;
    if (avroType == Type.RECORD) {
      elementSchema = toGenericAvroSchema(bigQueryField.getName(), bigQueryField.getFields());
    } else {
      elementSchema = handleAvroLogicalTypes(bigQueryField, avroType);
    }
    Schema fieldSchema;
    if (bigQueryField.getMode() == null || "NULLABLE".equals(bigQueryField.getMode())) {
      fieldSchema = Schema.createUnion(Schema.create(Type.NULL), elementSchema);
    } else if ("REQUIRED".equals(bigQueryField.getMode())) {
      fieldSchema = elementSchema;
    } else if ("REPEATED".equals(bigQueryField.getMode())) {
      fieldSchema = Schema.createArray(elementSchema);
    } else {
      throw new IllegalArgumentException(
          String.format("Unknown BigQuery Field Mode: %s", bigQueryField.getMode()));
    }
    return new Field(
        bigQueryField.getName(),
        fieldSchema,
        bigQueryField.getDescription(),
        (Object) null /* Cast to avoid deprecated JsonNode constructor. */);
  }

  // Added: name sanitization plus a recursive conversion that threads a registry of
  // already-defined record schemas through the traversal.

  /** Replaces characters that are not legal in an Avro name with underscores. */
  private static String sanitizeNameForAvro(String name) {
    if (name == null || name.isEmpty()) {
      return "_empty_";
    }
    StringBuilder sanitized = new StringBuilder();
    // Avro names must start with a letter or underscore; prefix one otherwise.
    char firstChar = name.charAt(0);
    if (!((firstChar >= 'A' && firstChar <= 'Z')
        || (firstChar >= 'a' && firstChar <= 'z')
        || firstChar == '_')) {
      sanitized.append('_');
    }
    for (char ch : name.toCharArray()) {
      if ((ch >= 'A' && ch <= 'Z')
          || (ch >= 'a' && ch <= 'z')
          || (ch >= '0' && ch <= '9')
          || ch == '_') {
        sanitized.append(ch);
      } else {
        sanitized.append('_');
      }
    }
    return sanitized.toString();
  }

  static Schema toGenericAvroSchema(
      String topLevelRecordName, List<TableFieldSchema> fieldSchemas) {
    return toGenericAvroSchemaRecursive(
        sanitizeNameForAvro(topLevelRecordName),
        fieldSchemas,
        "org.apache.beam.sdk.io.gcp.bigquery",
        new org.apache.avro.Schema.Names());
  }

  private static Schema toGenericAvroSchemaRecursive(
      String recordName,
      List<TableFieldSchema> fieldSchemas,
      String currentNamespace,
      org.apache.avro.Schema.Names names) {

    String doc = "Translated Avro Schema for " + recordName + " in namespace " + currentNamespace;
    String fullName = currentNamespace.isEmpty() ? recordName : currentNamespace + "." + recordName;
    // Reuse a record schema that was already defined under this full name.
    if (names.contains(fullName)) {
      return names.get(fullName);
    }

    // Register the record before converting its fields so self-references resolve.
    Schema recordSchema = Schema.createRecord(recordName, doc, currentNamespace, false);
    names.add(recordSchema);

    List<Schema.Field> avroFields = new ArrayList<>();
    for (TableFieldSchema bigQueryField : fieldSchemas) {
      avroFields.add(convertFieldRecursive(bigQueryField, recordSchema.getFullName(), names));
    }
    recordSchema.setFields(avroFields);
    return recordSchema;
  }

  @SuppressWarnings({"nullness"})
  private static Schema.Field convertFieldRecursive(
      TableFieldSchema bigQueryField,
      String parentAvroRecordFullName,
      org.apache.avro.Schema.Names names) {

    String sanitizedFieldName = sanitizeNameForAvro(bigQueryField.getName());
    Schema elementSchema;
    String bqType = bigQueryField.getType();
    ImmutableCollection<Type> avroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
    if (avroTypes.isEmpty()) {
      throw new IllegalArgumentException(
          "Unable to map BigQuery field type " + bqType + " to avro type for field " + sanitizedFieldName);
    }
    Type avroType = avroTypes.iterator().next();

    if ("RECORD".equals(bqType) || "STRUCT".equals(bqType)) {
      // Nested records are namespaced under the parent record's full name.
      elementSchema = toGenericAvroSchemaRecursive(
          sanitizeNameForAvro(bigQueryField.getName()),
          bigQueryField.getFields(),
          parentAvroRecordFullName,
          names);
    } else {
      elementSchema = handleAvroLogicalTypes(bigQueryField, avroType);
    }

    Schema fieldSchema;
    String mode = bigQueryField.getMode();
    if (mode == null || "NULLABLE".equals(mode)) {
      fieldSchema = Schema.createUnion(Schema.create(Type.NULL), elementSchema);
    } else if ("REQUIRED".equals(mode)) {
      fieldSchema = elementSchema;
    } else if ("REPEATED".equals(mode)) {
      fieldSchema = Schema.createArray(elementSchema);
    } else {
      throw new IllegalArgumentException(
          String.format("Unknown BigQuery Field Mode: %s for field %s", mode, sanitizedFieldName));
    }

    Object defaultValue = null;
    return new Schema.Field(
        sanitizedFieldName,
        fieldSchema,
        bigQueryField.getDescription(),
        defaultValue);
  }
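
  // Naming sketch (derived from the conversion above): each nested record type is
  // namespaced by its parent's full name, so a table "root" with a RECORD field
  // "record" that itself holds a RECORD field "identifier" yields the type names:
  //   org.apache.beam.sdk.io.gcp.bigquery.root
  //   org.apache.beam.sdk.io.gcp.bigquery.root.record
  //   org.apache.beam.sdk.io.gcp.bigquery.root.record.identifier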

private static Schema handleAvroLogicalTypes(TableFieldSchema bigQueryField, Type avroType) {
BigQueryAvroUtilsTest.java

@@ -20,6 +20,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
@@ -372,4 +373,50 @@ public Bird() {
associates[0] = new SubBird();
}
}

@Test
public void testToGenericAvroSchemaWithDuplicateFieldNamesInNestedRecords() {
TableFieldSchema stringSchema1 = new TableFieldSchema().setName("id1").setType("STRING");
TableFieldSchema stringSchema2 = new TableFieldSchema().setName("id2").setType("STRING");

TableFieldSchema identifier1Schema =
new TableFieldSchema()
.setName("identifier")
.setType("RECORD")
.setFields(java.util.Arrays.asList(stringSchema1));

TableFieldSchema identifier2Schema =
new TableFieldSchema()
.setName("identifier")
.setType("RECORD")
.setFields(java.util.Arrays.asList(stringSchema2));

TableFieldSchema recordSchema =
new TableFieldSchema()
.setName("record")
.setType("RECORD")
.setFields(java.util.Arrays.asList(identifier1Schema));

TableFieldSchema rootSchema =
new TableFieldSchema()
.setName("root")
.setType("RECORD")
.setFields(java.util.Arrays.asList(recordSchema, identifier2Schema));

    final Schema output = BigQueryAvroUtils.toGenericAvroSchema("root", rootSchema.getFields());

    // Avro does not allow two record types that share a full name to have different
    // definitions. Both nested RECORD fields are named "identifier", so if the
    // conversion produces clashing type names, rendering the schema should fail.
    assertThrows(
        org.apache.avro.SchemaParseException.class,
        () -> {
          // Serializing the schema to JSON forces Avro to validate its type names.
          output.toString();
        });
}
}