Yaml IT - Phase 3a #34782

Merged
merged 15 commits into from
May 27, 2025
@@ -58,6 +58,7 @@ dependencies {
  permitUnusedDeclared 'com.google.cloud:alloydb-jdbc-connector:1.2.0'
  testImplementation library.java.junit
  testImplementation library.java.mockito_core
  runtimeOnly ("org.xerial:sqlite-jdbc:3.49.1.0")
Contributor

Should this be a testImplementation? Why do we need this?

Collaborator Author

@derrickaw derrickaw May 21, 2025

  1. My understanding is that testImplementation is for tests inside the schemaio-expansion-service module. We need runtimeOnly so that it's packaged in the jar for use later.
  2. It contains the driver for connecting to the DB.

Contributor

Ok, that makes sense. I think the thing we're doing here is not really a test change FWIW, it is enabling JDBCIO to connect to a new kind of DB (this is exposing a new feature in x-lang mode).

I think this is ok since we are already packaging other drivers (e.g. com.microsoft.sqlserver:mssql-jdbc:12.2.0.jre11). The only real concern I would have is the size of the jar, which seems fine.

}

task runExpansionService (type: JavaExec) {
275 changes: 275 additions & 0 deletions sdks/python/apache_beam/yaml/integration_tests.py
@@ -23,11 +23,19 @@
import itertools
import logging
import os
import sqlite3
import unittest
import uuid

import mock
import mysql.connector
import psycopg2
import pytds
import sqlalchemy
import yaml
from testcontainers.mssql import SqlServerContainer
from testcontainers.mysql import MySqlContainer
from testcontainers.postgres import PostgresContainer

import apache_beam as beam
from apache_beam.io import filesystems
@@ -42,13 +50,45 @@

@contextlib.contextmanager
def gcs_temp_dir(bucket):
  """Context manager to create and clean up a temporary GCS directory.

  Creates a unique temporary directory within the specified GCS bucket
  and yields the path. Upon exiting the context, the directory and its
  contents are deleted.

  Args:
    bucket (str): The GCS bucket name (e.g., 'gs://my-bucket').

  Yields:
    str: The full path to the created temporary GCS directory.
      Example: 'gs://my-bucket/yaml-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
  """
  gcs_tempdir = bucket + '/yaml-' + str(uuid.uuid4())
  yield gcs_tempdir
  filesystems.FileSystems.delete([gcs_tempdir])
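
A note on the generator-based pattern these fixtures share: with `contextlib.contextmanager`, statements placed after `yield` run only when the `with` body exits normally, so cleanup that must always happen is usually wrapped in `try`/`finally`. The sketch below illustrates this with a local temporary directory rather than GCS. `local_temp_dir` and its `prefix` default are hypothetical names used only for illustration and are not part of this PR.

```python
import contextlib
import os
import shutil
import tempfile


@contextlib.contextmanager
def local_temp_dir(prefix='yaml-'):
  """Yields a fresh local directory and removes it on exit, even on error."""
  path = tempfile.mkdtemp(prefix=prefix)
  try:
    yield path
  finally:
    # Runs whether the with-body finished normally or raised.
    shutil.rmtree(path, ignore_errors=True)


# Usage: the directory is gone even though the body raised.
try:
  with local_temp_dir() as tmp:
    created = tmp
    raise RuntimeError('simulated test failure')
except RuntimeError:
  pass
print(os.path.exists(created))
```

The same shape would apply to a remote directory; only the create and delete calls would change.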


@contextlib.contextmanager
def temp_spanner_table(project, prefix='temp_spanner_db_'):
  """Context manager to create and clean up a temporary Spanner database and
  table.

  Creates a unique temporary Spanner database within the specified project
  and a predefined table named 'tmp_table' with columns ['UserId', 'Key'].
  It yields connection details for the created resources. Upon exiting the
  context, the temporary database (and its table) is deleted.

  Args:
    project (str): The Google Cloud project ID.
    prefix (str): A prefix to use for the temporary database name.
      Defaults to 'temp_spanner_db_'.

  Yields:
    list[str]: A list containing connection details:
      [project_id, instance_id, database_id, table_name, list_of_columns].
      Example: ['my-project', 'beam-test', 'temp_spanner_db_...', 'tmp_table',
        ['UserId', 'Key']]
  """
  spanner_client = SpannerWrapper(project)
  spanner_client._create_database()
  instance = "beam-test"
@@ -65,6 +105,26 @@ def temp_spanner_table(project, prefix='temp_spanner_db_'):

@contextlib.contextmanager
def temp_bigquery_table(project, prefix='yaml_bq_it_'):
  """Context manager to create and clean up a temporary BigQuery dataset.

  Creates a unique temporary BigQuery dataset within the specified project.
  It yields a placeholder table name string within that dataset (e.g.,
  'project.dataset_id.tmp_table'). The actual table is expected to be
  created by the test using this context.

  Upon exiting the context, the temporary dataset and all its contents
  (including any tables created within it) are deleted.

  Args:
    project (str): The Google Cloud project ID.
    prefix (str): A prefix to use for the temporary dataset name.
      Defaults to 'yaml_bq_it_'.

  Yields:
    str: The full path for a temporary BigQuery table within the created
      dataset.
      Example: 'my-project.yaml_bq_it_a1b2c3d4e5f6...tmp_table'
  """
  bigquery_client = BigQueryWrapper()
  dataset_id = '%s_%s' % (prefix, uuid.uuid4().hex)
  bigquery_client.get_or_create_dataset(project, dataset_id)
@@ -76,6 +136,221 @@ def temp_bigquery_table(project, prefix='yaml_bq_it_'):
bigquery_client.client.datasets.Delete(request)


@contextlib.contextmanager
def temp_sqlite_database(prefix='yaml_jdbc_it_'):
  """Context manager to provide a temporary SQLite database via JDBC for
  testing.

  This function creates a temporary SQLite database file on the local
  filesystem. It establishes a connection using 'sqlite3', creates a predefined
  'tmp_table', and then yields a JDBC connection string suitable for use in
  tests that require a generic JDBC connection (specifically configured for
  SQLite in this case).

  The SQLite database file is automatically cleaned up (closed and deleted)
  when the context manager exits.

  Args:
    prefix (str): A prefix to use for the temporary database file name.

  Yields:
    str: A JDBC connection string for the temporary SQLite database.
      Example format: "jdbc:sqlite:<path_to_db_file>"

  Raises:
    sqlite3.Error: If there's an error connecting to or interacting with
      the SQLite database during setup.
    Exception: Any other exception encountered during the setup or cleanup
      process.
  """
  conn = cursor = None
  try:
    # Establish connection to the temp file
    db_name = f'{prefix}{uuid.uuid4().hex}.db'
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()

    # Create a temp table for tests
    cursor.execute(
        '''
      CREATE TABLE tmp_table (
        value INTEGER PRIMARY KEY,
        rank INTEGER
      )
    ''')
    conn.commit()
    yield f'jdbc:sqlite:{db_name}'
  except (sqlite3.Error, Exception) as err:
    logging.error("Error interacting with temporary SQLite DB: %s", err)
    raise err
  finally:
    # Close connections
    if cursor:
      cursor.close()
    if conn:
      conn.close()
    try:
      if os.path.exists(db_name):
        os.remove(db_name)
    except Exception as err:
      logging.error("Error deleting temporary SQLite DB: %s", err)
      raise err
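
To make the fixture's contract concrete, the sketch below creates the same `tmp_table` schema and performs the same insert and select that jdbc.yaml runs, but through Python's `sqlite3` module directly rather than through JdbcIO. The helper name `round_trip` and the `sketch_` file prefix are hypothetical and used only for illustration.

```python
import os
import sqlite3
import tempfile
import uuid


def round_trip(rows):
  """Writes rows to tmp_table in a throwaway SQLite file and reads them back."""
  db_path = os.path.join(
      tempfile.gettempdir(), f'sketch_{uuid.uuid4().hex}.db')
  conn = sqlite3.connect(db_path)
  try:
    # Same schema as the fixture's tmp_table.
    conn.execute(
        'CREATE TABLE tmp_table (value INTEGER PRIMARY KEY, rank INTEGER)')
    # Same parameterized statement as the WriteToJdbc step.
    conn.executemany('INSERT INTO tmp_table (value, rank) VALUES(?,?)', rows)
    conn.commit()
    return conn.execute(
        'SELECT value, rank FROM tmp_table ORDER BY value').fetchall()
  finally:
    # Always close the connection and delete the file.
    conn.close()
    if os.path.exists(db_path):
      os.remove(db_path)


print(round_trip([(123, 0), (456, 1), (789, 2)]))
```

Because `value` is the primary key, inserting the same row twice in one run would raise `sqlite3.IntegrityError`.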


@contextlib.contextmanager
def temp_mysql_database():
  """Context manager to provide a temporary MySQL database for testing.

  This function utilizes the 'testcontainers' library to spin up a
  MySQL instance within a Docker container. It then connects
  to this temporary database through SQLAlchemy, creates a predefined
  'tmp_table', and yields a JDBC connection string suitable for use in tests.

  The Docker container and the database instance are automatically managed
  and torn down when the context manager exits.

  Yields:
    str: A JDBC connection string for the temporary MySQL database.
      Example format:
      "jdbc:mysql://<host>:<port>/<db_name>?
Collaborator

There's quite a lot of repetition in setting up all the JDBC-compatible databases. Can we simplify it by using SQLAlchemy, as in the jdbcio tests

cls.engines[db_type] = sqlalchemy.create_engine(
?

Collaborator Author

Updated, thanks

        user=<user>&password=<password>"

  Raises:
    mysql.connector.Error: If there's an error connecting to or interacting
      with the MySQL database during setup.
    Exception: Any other exception encountered during the setup process.
  """
  with MySqlContainer() as mysql_container:
    try:
      # Make connection to temp database and create tmp table
      engine = sqlalchemy.create_engine(mysql_container.get_connection_url())
      with engine.begin() as connection:
        connection.execute(
            sqlalchemy.text(
                "CREATE TABLE tmp_table (value INTEGER, `rank` INTEGER);"))

      # Construct the JDBC url for connections later on by tests
      jdbc_url = (
          f"jdbc:mysql://{mysql_container.get_container_host_ip()}:"
          f"{mysql_container.get_exposed_port(mysql_container.port_to_expose)}/"
          f"{mysql_container.MYSQL_DATABASE}?"
          f"user={mysql_container.MYSQL_USER}&"
          f"password={mysql_container.MYSQL_PASSWORD}")

      yield jdbc_url
    except mysql.connector.Error as err:
      logging.error("Error interacting with temporary MySQL DB: %s", err)
      raise err


@contextlib.contextmanager
def temp_postgres_database():
  """Context manager to provide a temporary PostgreSQL database for testing.

  This function utilizes the 'testcontainers' library to spin up a
  PostgreSQL instance within a Docker container. It then connects
  to this temporary database through SQLAlchemy, creates a predefined
  'tmp_table', and yields a JDBC connection string suitable for use in tests.

  The Docker container and the database instance are automatically managed
  and torn down when the context manager exits.

  Yields:
    str: A JDBC connection string for the temporary PostgreSQL database.
      Example format:
      "jdbc:postgresql://<host>:<port>/<db_name>?
        user=<user>&password=<password>"

  Raises:
    psycopg2.Error: If there's an error connecting to or interacting with
      the PostgreSQL database during setup.
    Exception: Any other exception encountered during the setup process.
  """
  default_port = 5432

  # Start the PostgreSQL container using testcontainers
  with PostgresContainer(port=default_port) as postgres_container:
    try:
      # Make connection to temp database and create tmp table
      engine = sqlalchemy.create_engine(postgres_container.get_connection_url())
      with engine.begin() as connection:
        connection.execute(
            sqlalchemy.text(
                "CREATE TABLE tmp_table (value INTEGER, rank INTEGER);"))

      # Construct the JDBC url for connections later on by tests
      jdbc_url = (
          f"jdbc:postgresql://{postgres_container.get_container_host_ip()}:"
          f"{postgres_container.get_exposed_port(default_port)}/"
          f"{postgres_container.POSTGRES_DB}?"
          f"user={postgres_container.POSTGRES_USER}&"
          f"password={postgres_container.POSTGRES_PASSWORD}")

      yield jdbc_url
    except (psycopg2.Error, Exception) as err:
      logging.error("Error interacting with temporary Postgres DB: %s", err)
      raise err


@contextlib.contextmanager
def temp_sqlserver_database():
  """Context manager to provide a temporary SQL Server database for testing.

  This function utilizes the 'testcontainers' library to spin up a
  Microsoft SQL Server instance within a Docker container. It then connects
  to this temporary database through SQLAlchemy with the 'pytds' driver,
  creates a predefined 'tmp_table', and yields a JDBC connection string
  suitable for use in tests.

  The Docker container and the database instance are automatically managed
  and torn down when the context manager exits.

  Yields:
    str: A JDBC connection string for the temporary SQL Server database.
      Example format:
      "jdbc:sqlserver://<host>:<port>;
        databaseName=<db_name>;
        user=<user>;
        password=<password>;
        encrypt=true;
        trustServerCertificate=true"

  Raises:
    pytds.Error: If there's an error connecting to or interacting with
      the SQL Server database during setup.
    Exception: Any other exception encountered during the setup process.
  """
  default_port = 1433

  # Start the SQL Server container using testcontainers
  with SqlServerContainer(port=default_port,
                          dialect='mssql+pytds') as sqlserver_container:
    try:
      # Make connection to temp database and create tmp table
      engine = sqlalchemy.create_engine(
          sqlserver_container.get_connection_url())
      with engine.begin() as connection:
        connection.execute(
            sqlalchemy.text(
                "CREATE TABLE tmp_table (value INTEGER, rank INTEGER);"))

      # Construct the JDBC url for connections later on by tests
      # NOTE: encrypt=true combined with trustServerCertificate=true is
      # generally needed for test container connections without a proper
      # certificate setup
      jdbc_url = (
          f"jdbc:sqlserver://{sqlserver_container.get_container_host_ip()}:"
          f"{int(sqlserver_container.get_exposed_port(default_port))};"
          f"databaseName={sqlserver_container.SQLSERVER_DBNAME};"
          f"user={sqlserver_container.SQLSERVER_USER};"
          f"password={sqlserver_container.SQLSERVER_PASSWORD};"
          f"encrypt=true;"
          f"trustServerCertificate=true")

      yield jdbc_url
    except (pytds.Error, Exception) as err:
      logging.error("Error interacting with temporary SQL Server DB: %s", err)
      raise err
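
The three container fixtures differ mainly in the shape of the JDBC URL they yield. As an illustration of how that formatting could be centralized, here is a minimal sketch of a hypothetical helper, `build_jdbc_url`, which is not part of this PR. It takes plain strings and numbers, so it assumes nothing about the testcontainers API; the URL shapes are copied from the fixtures in this file.

```python
def build_jdbc_url(db_type, host, port, database, user, password):
  """Formats a JDBC URL in the shape used by the fixtures in this file."""
  if db_type in ('mysql', 'postgresql'):
    # MySQL and PostgreSQL pass credentials as query parameters.
    return (
        f'jdbc:{db_type}://{host}:{port}/{database}?'
        f'user={user}&password={password}')
  if db_type == 'sqlserver':
    # SQL Server uses semicolon-separated properties.
    return (
        f'jdbc:sqlserver://{host}:{port};'
        f'databaseName={database};'
        f'user={user};'
        f'password={password};'
        'encrypt=true;'
        'trustServerCertificate=true')
  raise ValueError(f'Unsupported database type: {db_type}')


print(build_jdbc_url('postgresql', 'localhost', 5432, 'test', 'user', 'pw'))
```

Credentials containing `&` or `;` would need escaping, which this sketch does not attempt.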


def replace_recursive(spec, vars):
  if isinstance(spec, dict):
    return {
55 changes: 55 additions & 0 deletions sdks/python/apache_beam/yaml/tests/jdbc.yaml
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


fixtures:
  - name: TEMP_DB
    type: "apache_beam.yaml.integration_tests.temp_sqlite_database"

pipelines:
  # Jdbc write pipeline
  - pipeline:
      type: chain
      transforms:
        - type: Create
          config:
            elements:
              - {value: 123, rank: 0}
              - {value: 456, rank: 1}
              - {value: 789, rank: 2}
        - type: WriteToJdbc
          config:
            url: "{TEMP_DB}"
            driver_class_name: "org.sqlite.JDBC"
            query: "INSERT INTO tmp_table (value, rank) VALUES(?,?)"

  # Jdbc read pipeline
  - pipeline:
      type: chain
      transforms:
        - type: ReadFromJdbc
          config:
            url: "{TEMP_DB}"
            driver_class_name: "org.sqlite.JDBC"
            query: "SELECT * FROM tmp_table"
        - type: AssertEqual
          config:
            elements:
              - {value: 123, rank: 0}
              - {value: 456, rank: 1}
              - {value: 789, rank: 2}
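
The `{TEMP_DB}` placeholders in this file are filled with the value yielded by the `temp_sqlite_database` fixture. The body of `replace_recursive` is elided in this diff, so the following is only a sketch of how such a substitution could work. `substitute_placeholders` is a hypothetical name, and the use of `str.format` is an assumption rather than the confirmed implementation.

```python
def substitute_placeholders(spec, variables):
  """Recursively fills '{NAME}' placeholders in a parsed YAML structure."""
  if isinstance(spec, dict):
    return {
        key: substitute_placeholders(value, variables)
        for key, value in spec.items()
    }
  if isinstance(spec, list):
    return [substitute_placeholders(value, variables) for value in spec]
  if isinstance(spec, str) and '{' in spec:
    # Assumption: every brace in a string marks a fixture placeholder.
    return spec.format(**variables)
  # Numbers, booleans, None and brace-free strings pass through unchanged.
  return spec


read_step = {
    'type': 'ReadFromJdbc',
    'config': {
        'url': '{TEMP_DB}',
        'driver_class_name': 'org.sqlite.JDBC',
        'query': 'SELECT * FROM tmp_table',
    },
}
resolved = substitute_placeholders(
    read_step, {'TEMP_DB': 'jdbc:sqlite:example.db'})
print(resolved['config']['url'])
```

A string containing literal braces that are not placeholders would make `str.format` raise, so a real implementation would need to handle that case.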
