#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import os
import sys
import threading
import time
import uuid
from multiprocessing.pool import ThreadPool

import boto3
from pyspark.sql import SparkSession

| 28 | +""" |
| 29 | +
|
| 30 | +Run this script in root dir of repository: |
| 31 | +
|
| 32 | +# ===== Mandatory input from user ===== |
| 33 | +export RUN_ID=run001 |
| 34 | +export S3_BUCKET=delta-lake-dynamodb-test-00 |
| 35 | +export AWS_DEFAULT_REGION=us-west-2 |
| 36 | +
|
| 37 | +# ===== Optional input from user ===== |
| 38 | +export DELTA_CONCURRENT_WRITERS=20 |
| 39 | +export DELTA_CONCURRENT_READERS=2 |
| 40 | +export DELTA_NUM_ROWS=200 |
| 41 | +export DELTA_DYNAMO_ENDPOINT=https://dynamodb.us-west-2.amazonaws.com |
| 42 | +
|
| 43 | +# ===== Optional input from user (we calculate defaults using S3_BUCKET and RUN_ID) ===== |
| 44 | +export RELATIVE_DELTA_TABLE_PATH=___ |
| 45 | +export DELTA_DYNAMO_TABLE_NAME=___ |
| 46 | +
|
| 47 | +./run-integration-tests.py --use-local --run-dynamodb-commit-coordinator-integration-tests \ |
| 48 | + --dbb-packages org.apache.hadoop:hadoop-aws:3.4.0,com.amazonaws:aws-java-sdk-bundle:1.12.262 \ |
| 49 | + --dbb-conf io.delta.storage.credentials.provider=com.amazonaws.auth.profile.ProfileCredentialsProvider \ |
| 50 | + spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.profile.ProfileCredentialsProvider |
| 51 | +""" |

# ===== Mandatory input from user =====
run_id = os.environ.get("RUN_ID")
s3_bucket = os.environ.get("S3_BUCKET")

if run_id is None or s3_bucket is None:
    print(f"\nSkipping Python test {os.path.basename(__file__)} due to the missing env variables "
          f"`RUN_ID` and/or `S3_BUCKET`\n=====================")
    sys.exit(0)

# ===== Optional input from user =====
concurrent_writers = int(os.environ.get("DELTA_CONCURRENT_WRITERS", 2))
concurrent_readers = int(os.environ.get("DELTA_CONCURRENT_READERS", 2))
num_rows = int(os.environ.get("DELTA_NUM_ROWS", 16))
dynamo_endpoint = os.environ.get("DELTA_DYNAMO_ENDPOINT", "https://dynamodb.us-west-2.amazonaws.com")

# ===== Optional input from user (defaults are derived from RUN_ID) =====
relative_delta_table_path = os.environ.get(
    "RELATIVE_DELTA_TABLE_PATH", f"tables/table_ddb_cs_{run_id}_{uuid.uuid4()}").rstrip("/")
dynamo_table_name = os.environ.get("DELTA_DYNAMO_TABLE_NAME", "test_ddb_cs_table_" + run_id)

relative_delta_table1_path = relative_delta_table_path + "_tab1"
relative_delta_table2_path = relative_delta_table_path + "_tab2"
bucket_prefix = "s3a://" + s3_bucket + "/"
delta_table1_path = bucket_prefix + relative_delta_table1_path
delta_table2_path = bucket_prefix + relative_delta_table2_path

dynamodb_commit_coordinator_conf = json.dumps({
    "dynamoDBTableName": dynamo_table_name,
    "dynamoDBEndpoint": dynamo_endpoint
})

test_log = f"""
==========================================
run id: {run_id}
delta table1 path: {delta_table1_path}
delta table2 path: {delta_table2_path}
dynamo table name: {dynamo_table_name}

concurrent writers: {concurrent_writers}
concurrent readers: {concurrent_readers}
number of rows: {num_rows}

relative_delta_table_path: {relative_delta_table_path}
==========================================
"""
print(test_log)

commit_coordinator_property_key = "coordinatedCommits.commitCoordinator"
property_key_suffix = "-preview"

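# The session-level defaults below make tables created in this session use the DynamoDB
# commit coordinator, with the coordinator configuration (table name and endpoint) built
# above. The property keys carry the "-preview" suffix defined in `property_key_suffix`,
# and the last config points the DynamoDB coordinator at the AWS profile credentials provider.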
spark = SparkSession \
    .builder \
    .appName("utilities") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config(f"spark.databricks.delta.properties.defaults.{commit_coordinator_property_key}{property_key_suffix}", "dynamodb") \
    .config(f"spark.databricks.delta.properties.defaults.coordinatedCommits.commitCoordinatorConf{property_key_suffix}", dynamodb_commit_coordinator_conf) \
    .config("spark.databricks.delta.coordinatedCommits.commitCoordinator.dynamodb.awsCredentialsProviderName", "com.amazonaws.auth.profile.ProfileCredentialsProvider") \
    .getOrCreate()

print("Creating table at path", delta_table1_path)
spark.sql(f"CREATE TABLE delta.`{delta_table1_path}` (id int, a int) USING DELTA")  # commit 0


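# Each writer task commits a single-row INSERT, so every row lands in its own Delta commit
# and the writers exercise concurrent commits through the DynamoDB commit coordinator.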
def write_tx(n):
    print("writing:", [n, n])
    spark.sql(f"INSERT INTO delta.`{delta_table1_path}` VALUES ({n}, {n})")


stop_reading = threading.Event()


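# Reader threads repeatedly count the distinct rows in the table until the writers are done
# and `stop_reading` is set, verifying that reads keep working while commits go through the
# coordinator.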
def read_data():
    while not stop_reading.is_set():
        print("Reading {:d} rows ...".format(
            spark.read.format("delta").load(delta_table1_path).distinct().count())
        )
        time.sleep(1)


def start_read_thread():
    thread = threading.Thread(target=read_data)
    thread.start()
    return thread


print("===================== Starting reads and writes =====================")
read_threads = [start_read_thread() for _ in range(concurrent_readers)]
pool = ThreadPool(concurrent_writers)
start_t = time.time()
pool.map(write_tx, range(num_rows))
stop_reading.set()

for thread in read_threads:
    thread.join()

print("===================== Evaluating number of written rows =====================")
actual = spark.read.format("delta").load(delta_table1_path).distinct().count()
print("Actual number of written rows:", actual)
print("Expected number of written rows:", num_rows)
assert actual == num_rows

t = time.time() - start_t
print(f"{num_rows / t:.02f} tx / sec")

dynamodb = boto3.resource('dynamodb', endpoint_url=dynamo_endpoint)
ddb_table = dynamodb.Table(dynamo_table_name)


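# The commit coordinator keeps one DynamoDB item per table, keyed by a table id that the
# table records in its `coordinatedCommits.tableConf` property; this helper reads that id
# from the table's properties via DESCRIBE DETAIL.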
def get_dynamo_db_table_entry_id(table_path):
    table_properties = spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`").select("properties").collect()[0][0]
    table_conf = table_properties.get(f"delta.coordinatedCommits.tableConf{property_key_suffix}", None)
    if table_conf is None:
        return None
    return json.loads(table_conf).get("tableId", None)


def validate_table_version_as_per_dynamodb(table_path, expected_version):
    table_id = get_dynamo_db_table_entry_id(table_path)
    assert table_id is not None
    print(f"Validating table version for tableId: {table_id}")
    item = ddb_table.get_item(
        Key={
            'tableId': table_id
        },
        AttributesToGet=['tableVersion']
    )['Item']
    dynamodb_table_version = int(item['tableVersion'])
    assert dynamodb_table_version == expected_version


delta_table_version = num_rows
validate_table_version_as_per_dynamodb(delta_table1_path, delta_table_version)


def perform_insert_and_validate(table_path, insert_value):
    spark.sql(f"INSERT INTO delta.`{table_path}` VALUES ({insert_value}, {insert_value})")
    res = spark.sql(f"SELECT 1 FROM delta.`{table_path}` WHERE id = {insert_value} AND a = {insert_value}").collect()
    assert len(res) == 1


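# Backfilled commits live directly under `_delta_log/` as `<version>.json`, while
# unbackfilled coordinated commits are written under `_delta_log/_commits/` with a UUID in
# the file name. This helper lists S3 for the given version and asserts whether the
# corresponding file does or does not exist.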
def check_for_delta_file_in_filesystem(delta_table_path, version, is_backfilled, should_exist):
    s3_client = boto3.client("s3")
    relative_table_path = delta_table_path.replace(bucket_prefix, "")
    relative_delta_log_path = relative_table_path + "/_delta_log/"
    relative_commit_folder_path = relative_delta_log_path if is_backfilled else os.path.join(relative_delta_log_path, "_commits")
    listing_prefix = os.path.join(relative_commit_folder_path, f"{version:020}.").lstrip("/")
    print(f"querying {listing_prefix} from bucket {s3_bucket} for version {version}")
    response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=listing_prefix)
    if 'Contents' not in response:
        assert not should_exist, f"Listing for prefix {listing_prefix} did not return any files even though it should have."
        return
    items = response['Contents']
    commits = filter(lambda key: ".json" in key and ".tmp" not in key, map(lambda x: os.path.basename(x['Key']), items))
    expected_count = 1 if should_exist else 0
    matching_files = list(filter(lambda key: key.split('.')[0].endswith(f"{version:020}"), commits))
    assert len(matching_files) == expected_count


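# End-to-end check of downgrading a coordinated-commits table back to filesystem-based
# commits (UNSET the coordinator property) and then upgrading it again (SET it back to
# 'dynamodb'), verifying after each step which commit files exist and, at the end, the
# table version recorded in DynamoDB.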
def test_downgrades_and_upgrades(delta_table_path, delta_table_version):
    # Downgrade to filesystem-based commits should work
    print("===================== Evaluating downgrade to filesystem based commits =====================")
    spark.sql(f"ALTER TABLE delta.`{delta_table_path}` UNSET TBLPROPERTIES ('delta.{commit_coordinator_property_key}{property_key_suffix}')")
    delta_table_version += 1

    perform_insert_and_validate(delta_table_path, 9990)
    delta_table_version += 1

    check_for_delta_file_in_filesystem(delta_table_path, delta_table_version, is_backfilled=True, should_exist=True)
    # No UUID delta file should have been created for this version
    check_for_delta_file_in_filesystem(delta_table_path, delta_table_version, is_backfilled=False, should_exist=False)
    print("[SUCCESS] Downgrade to filesystem based commits worked")

    # Upgrade to coordinated commits should work
    print("===================== Evaluating upgrade to coordinated commits =====================")
    spark.sql(f"ALTER TABLE delta.`{delta_table_path}` SET TBLPROPERTIES ('delta.{commit_coordinator_property_key}{property_key_suffix}' = 'dynamodb')")
    delta_table_version += 1
    check_for_delta_file_in_filesystem(delta_table_path, delta_table_version, is_backfilled=True, should_exist=True)
    # No UUID delta file should have been created for the enablement commit
    check_for_delta_file_in_filesystem(delta_table_path, delta_table_version, is_backfilled=False, should_exist=False)

    perform_insert_and_validate(delta_table_path, 9991)
    delta_table_version += 1
    check_for_delta_file_in_filesystem(delta_table_path, delta_table_version, is_backfilled=True, should_exist=True)
    check_for_delta_file_in_filesystem(delta_table_path, delta_table_version, is_backfilled=False, should_exist=True)

    perform_insert_and_validate(delta_table_path, 9992)
    delta_table_version += 1
    check_for_delta_file_in_filesystem(delta_table_path, delta_table_version, is_backfilled=True, should_exist=True)
    check_for_delta_file_in_filesystem(delta_table_path, delta_table_version, is_backfilled=False, should_exist=True)
    validate_table_version_as_per_dynamodb(delta_table_path, delta_table_version)

    print("[SUCCESS] Upgrade to coordinated commits worked")


test_downgrades_and_upgrades(delta_table1_path, delta_table_version)

print("[SUCCESS] All tests passed for Table 1")

print("===================== Evaluating Table 2 =====================")

# Table 2 is created with coordinated commits disabled, so remove the session default that
# would otherwise enable the DynamoDB commit coordinator for newly created tables.
spark.conf.unset(f"spark.databricks.delta.properties.defaults.{commit_coordinator_property_key}{property_key_suffix}")

spark.sql(f"CREATE TABLE delta.`{delta_table2_path}` (id int, a int) USING DELTA")  # commit 0
table_2_version = 0

perform_insert_and_validate(delta_table2_path, 8000)
table_2_version += 1

check_for_delta_file_in_filesystem(delta_table2_path, table_2_version, is_backfilled=True, should_exist=True)
# No UUID delta file should have been created for this version
check_for_delta_file_in_filesystem(delta_table2_path, table_2_version, is_backfilled=False, should_exist=False)

print("===================== Evaluating Upgrade of Table 2 =====================")

spark.sql(f"ALTER TABLE delta.`{delta_table2_path}` SET TBLPROPERTIES ('delta.{commit_coordinator_property_key}{property_key_suffix}' = 'dynamodb')")
table_2_version += 1

perform_insert_and_validate(delta_table2_path, 8001)
table_2_version += 1

check_for_delta_file_in_filesystem(delta_table2_path, table_2_version, is_backfilled=True, should_exist=True)
# This version should also have a UUID delta file
check_for_delta_file_in_filesystem(delta_table2_path, table_2_version, is_backfilled=False, should_exist=True)

test_downgrades_and_upgrades(delta_table2_path, table_2_version)

print("[SUCCESS] All tests passed for Table 2")