
Commit 6880213

Mark distributed tests as integration to skip GitHub Actions workflow
1 parent 98104b4 commit 6880213

File tree

10 files changed (+59 −51 lines)

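How the skip works: a GitHub Actions job can deselect marked tests with a pytest marker expression such as `pytest -m "not integration"`, so every test this commit tags with `@pytest.mark.integration` stops running in the default workflow. Below is a minimal sketch of the marker registration that usually accompanies this pattern — a hypothetical root conftest.py, not part of this commit; `pytest_configure` and `config.addinivalue_line` are standard pytest hooks.

# Hypothetical root conftest.py (sketch) - registers the marker so pytest
# does not warn about @pytest.mark.integration being unknown.
def pytest_configure(config):
    config.addinivalue_line(
        "markers",
        "integration: needs a real Ray/Daft environment; deselected in the default CI job",
    )

A dedicated integration job (or a local run) can still opt in with `pytest -m integration`.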

deltacat/compute/converter/converter_session.py

Lines changed: 11 additions & 3 deletions
@@ -116,9 +116,13 @@ def converter_session(
         params.position_delete_for_multiple_data_files
     )

+    logger.info(f"Fetching all bucket files for table {table_identifier}...")
     data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(
         table=iceberg_table
     )
+    logger.info(
+        f"Fetched files - data: {len(data_file_dict)}, equality_delete: {len(equality_delete_dict)}, pos_delete: {len(pos_delete_dict)}"
+    )

     convert_input_files_for_all_buckets = group_all_files_to_each_bucket(
         data_file_dict=data_file_dict,
@@ -181,7 +185,9 @@ def convert_input_provider(index: int, item: Any) -> Dict[str, ConvertInput]:
             )
         }

-    logger.info(f"Getting remote convert tasks...")
+    logger.info(f"Creating {len(convert_input_files_for_all_buckets)} convert tasks...")
+    logger.info(f"Task max parallelism: {task_max_parallelism}")
+
     # Ray remote task: convert
     # TODO: Add split mechanism to split large buckets
     convert_tasks_pending = invoke_parallel(
@@ -193,10 +199,12 @@ def convert_input_provider(index: int, item: Any) -> Dict[str, ConvertInput]:
     )

     to_be_deleted_files_list: List[DataFile] = []
-    logger.info(f"Finished invoking {len(convert_tasks_pending)} convert tasks.")
+    logger.info(
+        f"Finished invoking {len(convert_tasks_pending)} convert tasks, waiting for results..."
+    )

     convert_results: List[ConvertResult] = ray.get(convert_tasks_pending)
-    logger.info(f"Got {len(convert_tasks_pending)} convert tasks.")
+    logger.info(f"Got {len(convert_tasks_pending)} convert task results.")

     total_position_delete_record_count = sum(
         convert_result.position_delete_record_count
deltacat/compute/converter/pyiceberg/overrides.py

Lines changed: 1 addition & 1 deletion
@@ -259,7 +259,7 @@ def fetch_all_bucket_files(
     # and collect their partition values
     target_partition_values = set()
     all_partition_values = []
-    all_table_partition_values = []
+
     for manifest_entry in chain(
         *executor.map(
             lambda args: _open_manifest(*args),

deltacat/experimental/converter_agent/table_monitor.py

Lines changed: 1 addition & 0 deletions
@@ -421,6 +421,7 @@ def monitor_table(
             params=converter_params
         )
         conversion_end_time = time.time_ns()  # Nanosecond precision
+        logger.info(f"Converter session completed successfully")

         logger.info(f"Converter session completed successfully")
         current_snapshot_id = snapshot_id

deltacat/tests/compute/converter/integration/conftest.py

Lines changed: 7 additions & 1 deletion
@@ -75,6 +75,12 @@ def session_catalog() -> Catalog:

 @pytest.fixture(autouse=True, scope="module")
 def setup_ray_cluster():
-    ray.init(local_mode=True, ignore_reinit_error=True)
+    ray.init(
+        local_mode=True,
+        ignore_reinit_error=True,
+        resources={
+            "convert_task": 10
+        },  # Provide convert_task resource for converter session
+    )
     yield
     ray.shutdown()
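Why the fixture must advertise `convert_task`: Ray schedules a task only onto a node that declares every custom resource the task requests, so a converter task that requires `convert_task` would queue forever on a cluster initialized without it. A self-contained sketch of the mechanism follows — the `convert` function and resource amounts are illustrative, not deltacat's actual task definition.

import ray

# Advertise 10 units of the custom "convert_task" resource, mirroring the fixture.
ray.init(resources={"convert_task": 10})

# Each invocation requests 1 unit; without the resource declared above,
# these tasks would wait indefinitely instead of running.
@ray.remote(resources={"convert_task": 1})
def convert(bucket_index: int) -> int:
    return bucket_index * 2  # placeholder for real conversion work

print(ray.get([convert.remote(i) for i in range(3)]))  # prints [0, 2, 4]
ray.shutdown()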

deltacat/tests/compute/converter/integration/test_convert_session.py

Lines changed: 2 additions & 13 deletions
@@ -57,11 +57,11 @@


 @pytest.fixture(scope="session")
-def daft_native_runner_session():
+def daft_native_runner():
     """
     Session-scoped fixture to set Daft to use native runner for converter integration tests.
     This is set once per test session and cannot be changed (Daft limitation).
-    Only applied to tests that explicitly request daft_native_runner.
+    Tests that need the native runner should explicitly request this fixture.
     """
     # Set to native runner only when explicitly requested
     # Note: Daft only allows setting runner once per session
@@ -76,17 +76,6 @@ def daft_native_runner_session():
     # No teardown needed - Daft doesn't allow changing runner after it's set


-@pytest.fixture
-def daft_native_runner(daft_native_runner_session):
-    """
-    Per-test fixture that depends on the session-scoped runner setup.
-    This ensures tests get the native runner without trying to change it.
-    Tests must explicitly request this fixture to use the native runner.
-    """
-    # Just yield - the actual setup is done by the session fixture
-    yield
-
-
 # Test data fixtures
 @pytest.fixture
 def base_schema():

deltacat/tests/compute/converter/integration/test_converter_commit_conflict_resolution.py

Lines changed: 1 addition & 0 deletions
@@ -40,6 +40,7 @@
 # Task memory in bytes for testing
 TASK_MEMORY_BYTES = BASE_MEMORY_BUFFER

+
 # Test data fixtures
 @pytest.fixture
 def base_schema():
(File name missing from the page. Judging by its contents — the Ray fixture's docstring references "table monitor tests" — this is most likely deltacat/tests/experimental/converter_agent/conftest.py, but that path is an inference, not shown in the source.)

Lines changed: 28 additions & 1 deletion

@@ -1,10 +1,37 @@
 import pytest
 import ray
+import daft


 @pytest.fixture(autouse=True, scope="module")
 def setup_ray_cluster():
     """Set up Ray cluster for table monitor tests."""
-    ray.init(local_mode=True, ignore_reinit_error=True)
+    ray.init(
+        local_mode=True,
+        ignore_reinit_error=True,
+        resources={
+            "convert_task": 10
+        },  # Provide convert_task resource for converter session
+    )
     yield
     ray.shutdown()
+
+
+@pytest.fixture(scope="session")
+def daft_native_runner():
+    """
+    Session-scoped fixture to set Daft to use native runner for table monitor tests.
+    This is set once per test session and cannot be changed (Daft limitation).
+    Tests that need the native runner should explicitly request this fixture.
+    """
+    # Set to native runner only when explicitly requested
+    # Note: Daft only allows setting runner once per session
+    try:
+        daft.context.set_runner_native()
+    except Exception as e:
+        # If runner is already set, that's okay - just log it
+        print(f"Note: Daft runner already set, continuing with existing runner: {e}")
+
+    yield
+
+    # No teardown needed - Daft doesn't allow changing runner after it's set
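The try/except above exists because, per the fixture's own comments, Daft pins the runner for the lifetime of the process. A sketch of that constraint — the failure on a second call is assumed from those comments, not verified against Daft documentation:

import daft

daft.context.set_runner_native()       # first call wins for the whole session
try:
    daft.context.set_runner_ray()      # attempting to switch afterwards fails
except Exception as e:
    print(f"Runner already set: {e}")  # same fallback the fixture uses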

deltacat/tests/experimental/converter_agent/test_table_monitor.py

Lines changed: 3 additions & 1 deletion
@@ -322,7 +322,9 @@ def test_stage_in_context(self):
 class TestTableMonitorEndToEnd:
     """End-to-end integration test for table monitor."""

-    def test_table_monitor_with_shared_catalog(self, setup_ray_cluster):
+    def test_table_monitor_with_shared_catalog(
+        self, setup_ray_cluster, daft_native_runner
+    ):
         """
         Test that table monitor automatically detects and converts data using a shared catalog.
deltacat/tests/storage/main/test_main_storage.py

Lines changed: 3 additions & 12 deletions
@@ -9,7 +9,6 @@
 import polars as pl
 import numpy as np
 import ray
-import daft
 import ray.data

 from deltacat import PartitionKey, PartitionScheme
@@ -7629,12 +7628,10 @@ def test_download_delta_distributed_error_handling(self):
         )

     # ========== DAFT DISTRIBUTED TESTS ==========
-
+    @pytest.mark.integration
     def test_download_delta_distributed_daft_basic(self):
         """Test basic distributed download with DAFT dataset type."""

-        daft.context.set_runner_ray()
-
         # Create test data
         test_data = pd.DataFrame(
             {
@@ -7688,13 +7685,10 @@ def test_download_delta_distributed_daft_basic(self):
         ), "Column names mismatch"
         pd.testing.assert_frame_equal(downloaded_df, expected_df)

+    @pytest.mark.integration
     def test_download_delta_distributed_daft_with_delta_locator(self):
         """Test DAFT distributed download using DeltaLocator instead of Delta object."""

-        if ray.is_initialized():
-            ray.shutdown()
-        ray.init()
-
         test_data = pd.DataFrame(
             {
                 "id": [12101, 12102, 12103],
@@ -7731,13 +7725,10 @@ def test_download_delta_distributed_daft_with_delta_locator(self):
         expected_df = test_data.sort_values("id").reset_index(drop=True)
         pd.testing.assert_frame_equal(downloaded_df, expected_df)

+    @pytest.mark.integration
     def test_download_delta_distributed_daft_vs_ray_consistency(self):
         """Test that DAFT and Ray distributed downloads return the same data."""

-        if ray.is_initialized():
-            ray.shutdown()
-        ray.init()
-
         test_data = pd.DataFrame(
             {
                 "id": [12501, 12502, 12503, 12504],

deltacat/tests/utils/test_daft.py

Lines changed: 2 additions & 19 deletions
@@ -1,5 +1,5 @@
 import unittest
-import ray
+import pytest
 from deltacat.types.media import ContentEncoding, ContentType
 from deltacat.utils.daft import (
     daft_file_to_pyarrow_table,
@@ -162,12 +162,11 @@ def test_read_from_local_single_column_with_row_groups(self):
         self.assertEqual(table.num_rows, 10)


+@pytest.mark.integration
 class TestFilesToDataFrame(unittest.TestCase):
     MVP_PATH = "deltacat/tests/utils/data/mvp.parquet"

     def test_read_local_files_all_columns(self):
-        if not ray.is_initialized():
-            ray.init()
         df = files_to_dataframe(
             uris=[self.MVP_PATH],
             content_encoding=ContentEncoding.IDENTITY.value,
@@ -180,8 +179,6 @@ def test_read_local_files_all_columns(self):
         self.assertEqual(table.num_rows, 100)

     def test_read_local_files_with_column_selection(self):
-        if not ray.is_initialized():
-            ray.init()
         df = files_to_dataframe(
             uris=[self.MVP_PATH],
             content_encoding=ContentEncoding.IDENTITY.value,
@@ -195,8 +192,6 @@ def test_read_local_files_with_column_selection(self):
         self.assertEqual(table.num_rows, 100)

     def test_read_local_files_does_not_materialize_by_default(self):
-        if not ray.is_initialized():
-            ray.init()
         df = files_to_dataframe(
             uris=[self.MVP_PATH],
             content_encoding=ContentEncoding.IDENTITY.value,
@@ -212,8 +207,6 @@ def test_read_local_files_does_not_materialize_by_default(self):
         self.assertEqual(len(df), 100)

     def test_supports_unescaped_tsv_content_type(self):
-        if not ray.is_initialized():
-            ray.init()
         # Test that UNESCAPED_TSV is now supported (was previously unsupported)
         # Use a CSV file since we're testing TSV reader functionality
         csv_path = "deltacat/tests/utils/data/non_empty_valid.csv"
@@ -230,8 +223,6 @@ def test_supports_unescaped_tsv_content_type(self):
         self.assertGreater(len(table.schema.names), 0)

     def test_supports_gzip_content_encoding(self):
-        if not ray.is_initialized():
-            ray.init()
         # Test that GZIP encoding is now supported (was previously unsupported)
         df = files_to_dataframe(
             uris=[self.MVP_PATH],
@@ -245,8 +236,6 @@ def test_supports_gzip_content_encoding(self):
         self.assertEqual(table.num_rows, 100)

     def test_raises_error_if_not_supported_content_type(self):
-        if not ray.is_initialized():
-            ray.init()
         # Test that truly unsupported content types raise NotImplementedError
         self.assertRaises(
             NotImplementedError,
@@ -259,8 +248,6 @@ def test_raises_error_if_not_supported_content_type(self):
         )

     def test_raises_error_if_not_supported_content_encoding(self):
-        if not ray.is_initialized():
-            ray.init()
         # Test that truly unsupported content encodings raise NotImplementedError
         self.assertRaises(
             NotImplementedError,
@@ -273,8 +260,6 @@ def test_raises_error_if_not_supported_content_encoding(self):
         )

     def test_accepts_custom_kwargs(self):
-        if not ray.is_initialized():
-            ray.init()
         # Test that custom kwargs are passed through to daft.read_parquet
         df = files_to_dataframe(
             uris=[self.MVP_PATH],
@@ -290,8 +275,6 @@ def test_accepts_custom_kwargs(self):
         self.assertEqual(table.num_rows, 100)

     def test_accepts_io_config(self):
-        if not ray.is_initialized():
-            ray.init()
         # Test that io_config parameter is accepted and passed correctly
         df = files_to_dataframe(
             uris=[self.MVP_PATH],
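Note that `@pytest.mark.integration` works on a `unittest.TestCase` subclass: pytest applies class-level marks to every test method it collects, so the single decorator covers all of `TestFilesToDataFrame`. A hedged sketch of the pattern, with illustrative class and method names:

import unittest
import pytest

@pytest.mark.integration  # applies to every collected test method below
class TestNeedsCluster(unittest.TestCase):
    def test_one(self):
        self.assertTrue(True)

    def test_two(self):
        self.assertTrue(True)

Running `pytest -m "not integration"` deselects both methods; `pytest -m integration` runs only them.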
