Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ output:
dataset: "" # No default (required)
table: "" # No default (required)
message_format: json
max_in_flight: 64
max_in_flight: 4
batching:
count: 0
byte_size: 0
Expand All @@ -66,7 +66,7 @@ output:
dataset: "" # No default (required)
table: "" # No default (required)
message_format: json
max_in_flight: 64
max_in_flight: 4
batching:
count: 0
byte_size: 0
Expand All @@ -92,13 +92,15 @@ This provides higher throughput and lower latency than the legacy streaming API
Messages can be formatted as JSON (default) or raw protobuf bytes.
When using JSON format the component automatically fetches the table schema and converts each message to the corresponding proto representation.

WARNING: The proto3 JSON mapping encodes int64 and uint64 values as strings.
JSON messages with integer fields must use string values (e.g. `"age": "30"` not `"age": 30`).
Otherwise the write will fail with an unmarshalling error.
WARNING: protojson encodes int64 and uint64 values as strings, bytes as base64-encoded strings, and timestamps as RFC 3339 strings.
JSON messages must follow these conventions (e.g. `"age": "30"`, `"data": "aGVsbG8="`, `"created_at": "2026-01-02T15:04:05Z"`); otherwise the write will fail with an unmarshalling error.

When batching is enabled the table name is resolved from the first message in each batch.
All messages in the same batch are written to that table.

The interpolated table name is sanitized for BigQuery: dots, hyphens, slashes and whitespace are replaced with underscores, non-ASCII-alphanumeric characters are stripped, leading digits are prefixed with `_`, and the result is truncated to 1024 characters.
A name that sanitizes to the empty string is rejected as a permanent error.


== Fields

Expand Down Expand Up @@ -149,7 +151,7 @@ The maximum number of messages to have in flight at a given time. Increase this

*Type*: `int`

*Default*: `64`
*Default*: `4`

=== `batching`

Expand Down
184 changes: 171 additions & 13 deletions internal/impl/gcp/enterprise/bigquery/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/protobuf/reflect/protoreflect"

_ "github.com/redpanda-data/benthos/v4/public/components/io"
_ "github.com/redpanda-data/benthos/v4/public/components/pure"
Expand All @@ -31,14 +33,16 @@ import (
"github.com/redpanda-data/connect/v4/internal/license"
)

func TestIntegrationBigQueryWriteAPI(t *testing.T) {
integration.CheckSkip(t)
// bqEmulator holds connection details for a running BigQuery emulator container.
type bqEmulator struct {
	// httpEndpoint is the emulator's REST API endpoint (used by the bigquery client).
	httpEndpoint string
	// grpcEndpoint is the emulator's gRPC endpoint (used by the Storage Write API).
	grpcEndpoint string
	// bqClient is a client bound to httpEndpoint for test setup and row verification.
	bqClient *bigquery.Client
}

const (
projectID = "test-project"
datasetID = "test_dataset"
tableID = "test_table"
)
// startEmulator launches a BigQuery emulator container and returns connection details.
func startEmulator(t *testing.T, projectID, datasetID string) *bqEmulator {
t.Helper()

t.Log("Given a BigQuery emulator running with HTTP and gRPC ports")
ctr, err := testcontainers.Run(t.Context(),
Expand All @@ -54,7 +58,10 @@ func TestIntegrationBigQueryWriteAPI(t *testing.T) {
},
}),
testcontainers.WithWaitStrategy(
wait.ForListeningPort("9050/tcp").WithStartupTimeout(60*time.Second),
wait.ForAll(
wait.ForListeningPort("9050/tcp"),
wait.ForListeningPort("9060/tcp"),
).WithDeadline(60*time.Second),
),
)
testcontainers.CleanupContainer(t, ctr)
Expand All @@ -70,19 +77,50 @@ func TestIntegrationBigQueryWriteAPI(t *testing.T) {
httpEndpoint := fmt.Sprintf("http://%s:%s", host, httpPort.Port())
grpcEndpoint := fmt.Sprintf("%s:%s", host, grpcPort.Port())

t.Log("Given a table with name and age columns")
bqClient, err := bigquery.NewClient(t.Context(), projectID,
option.WithoutAuthentication(),
option.WithEndpoint(httpEndpoint),
)
require.NoError(t, err)
defer bqClient.Close()
t.Cleanup(func() { bqClient.Close() })

return &bqEmulator{
httpEndpoint: httpEndpoint,
grpcEndpoint: grpcEndpoint,
bqClient: bqClient,
}
}

// bqSchemaToMessageDescriptor converts a BigQuery schema into a
// protoreflect.MessageDescriptor via the managedwriter adapt pipeline,
// for use in tests that need proto descriptors.
func bqSchemaToMessageDescriptor(t *testing.T, schema bigquery.Schema) protoreflect.MessageDescriptor {
	t.Helper()

	storageSchema, err := adapt.BQSchemaToStorageTableSchema(schema)
	require.NoError(t, err)

	desc, err := adapt.StorageSchemaToProto2Descriptor(storageSchema, "root")
	require.NoError(t, err)

	msgDesc, ok := desc.(protoreflect.MessageDescriptor)
	require.True(t, ok)
	return msgDesc
}

func TestIntegrationBigQueryWriteAPI(t *testing.T) {
integration.CheckSkip(t)

const (
projectID = "test-project"
datasetID = "test_dataset"
tableID = "test_table"
)

emu := startEmulator(t, projectID, datasetID)

t.Log("Given a table with name and age columns")
schema := bigquery.Schema{
{Name: "name", Type: bigquery.StringFieldType, Required: true},
{Name: "age", Type: bigquery.IntegerFieldType, Required: true},
}
err = bqClient.Dataset(datasetID).Table(tableID).Create(t.Context(), &bigquery.TableMetadata{
err := emu.bqClient.Dataset(datasetID).Table(tableID).Create(t.Context(), &bigquery.TableMetadata{
Schema: schema,
})
require.NoError(t, err)
Expand All @@ -102,7 +140,7 @@ gcp_bigquery_write_api:
endpoint:
http: %s
grpc: %s
`, projectID, datasetID, tableID, httpEndpoint, grpcEndpoint)))
`, projectID, datasetID, tableID, emu.httpEndpoint, emu.grpcEndpoint)))

stream, err := sb.Build()
require.NoError(t, err)
Expand Down Expand Up @@ -131,7 +169,7 @@ gcp_bigquery_write_api:
// Then all 3 rows land in the BigQuery table.
t.Log("Then all 3 rows are present in the BigQuery table")
assert.Eventually(t, func() bool {
it := bqClient.Dataset(datasetID).Table(tableID).Read(t.Context())
it := emu.bqClient.Dataset(datasetID).Table(tableID).Read(t.Context())
var count int
for {
var row map[string]bigquery.Value
Expand All @@ -147,3 +185,123 @@ gcp_bigquery_write_api:
return count >= 3
}, 30*time.Second, 500*time.Millisecond)
}

// TestIntegrationSchemaEvolution verifies that schemaEvolver.Evolve adds
// columns that are present in the proto descriptor but missing from the
// BigQuery table, against a live emulator.
func TestIntegrationSchemaEvolution(t *testing.T) {
	integration.CheckSkip(t)

	const (
		projectID = "test-project"
		datasetID = "test_dataset"
		tableID   = "test_evolution"
	)

	emu := startEmulator(t, projectID, datasetID)
	tbl := emu.bqClient.Dataset(datasetID).Table(tableID)

	t.Log("Given a table with name and age columns")
	require.NoError(t, tbl.Create(t.Context(), &bigquery.TableMetadata{
		Schema: bigquery.Schema{
			{Name: "name", Type: bigquery.StringFieldType},
			{Name: "age", Type: bigquery.IntegerFieldType},
		},
	}))

	// Build a proto descriptor with an extra "email" field.
	descriptor := bqSchemaToMessageDescriptor(t, bigquery.Schema{
		{Name: "name", Type: bigquery.StringFieldType},
		{Name: "age", Type: bigquery.IntegerFieldType},
		{Name: "email", Type: bigquery.StringFieldType},
	})

	t.Log("When we call Evolve with a descriptor that has an extra email column")
	ev := &schemaEvolver{log: service.MockResources().Logger()}
	changed, err := ev.Evolve(t.Context(), emu.bqClient, datasetID, tableID, descriptor)
	require.NoError(t, err)
	assert.True(t, changed, "expected columns to be added")

	t.Log("Then the table schema includes the email column")
	meta, err := tbl.Metadata(t.Context())
	require.NoError(t, err)

	fieldNames := make([]string, 0, len(meta.Schema))
	for _, field := range meta.Schema {
		fieldNames = append(fieldNames, field.Name)
	}
	for _, want := range []string{"email", "name", "age"} {
		assert.Contains(t, fieldNames, want)
	}
}

// TestIntegrationTableNameSanitization verifies that a dot-separated table
// name configured on the output is sanitized (dots become underscores) and
// that rows land in the table with the sanitized name.
func TestIntegrationTableNameSanitization(t *testing.T) {
	integration.CheckSkip(t)

	const (
		projectID = "test-project"
		datasetID = "test_dataset"
	)

	emu := startEmulator(t, projectID, datasetID)

	// Create a table with the sanitized name. The output will receive
	// "events.user.created" but sanitize it to "events_user_created".
	const sanitizedTableID = "events_user_created"
	target := emu.bqClient.Dataset(datasetID).Table(sanitizedTableID)
	require.NoError(t, target.Create(t.Context(), &bigquery.TableMetadata{
		Schema: bigquery.Schema{
			{Name: "name", Type: bigquery.StringFieldType},
		},
	}))

	t.Log("When we send a message with a dot-separated table name")
	builder := service.NewStreamBuilder()
	require.NoError(t, builder.SetLoggerYAML(`level: DEBUG`))

	produce, err := builder.AddProducerFunc()
	require.NoError(t, err)

	// Use a static table name with dots — sanitization should convert to underscores.
	require.NoError(t, builder.AddOutputYAML(fmt.Sprintf(`
gcp_bigquery_write_api:
  project: %s
  dataset: %s
  table: events.user.created
  endpoint:
    http: %s
    grpc: %s
`, projectID, datasetID, emu.httpEndpoint, emu.grpcEndpoint)))

	stream, err := builder.Build()
	require.NoError(t, err)
	license.InjectTestService(stream.Resources())

	// Run the stream in the background; context cancellation at test end is
	// expected and not treated as a failure.
	go func() {
		if err := stream.Run(t.Context()); err != nil && !errors.Is(err, context.Canceled) {
			t.Errorf("stream error: %v", err)
		}
	}()

	t.Cleanup(func() {
		if err := stream.StopWithin(10 * time.Second); err != nil {
			t.Log(err)
		}
	})

	require.NoError(t, produce(t.Context(), service.NewMessage([]byte(`{"name":"alice"}`))))

	t.Log("Then the row lands in the sanitized table name")
	assert.Eventually(t, func() bool {
		rows := target.Read(t.Context())
		seen := 0
		for {
			var row map[string]bigquery.Value
			err := rows.Next(&row)
			if errors.Is(err, iterator.Done) {
				break
			}
			if err != nil {
				return false
			}
			seen++
		}
		return seen >= 1
	}, 30*time.Second, 500*time.Millisecond)
}
Loading