Skip to content

Commit 49a417b

Browse files
committed
add an auto sizing of target data file and a simulation diagnostic
1 parent 2e076fc commit 49a417b

6 files changed

Lines changed: 161 additions & 31 deletions

File tree

ice_keeper/ice_keeper.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,11 +267,21 @@ def reset(force: bool) -> None: # noqa: FBT001
267267
@click.option("--min_age_to_diagnose", default=1, help="Minimum snapshot age (in partition rank) to diagnose (default: 1).")
268268
@click.option("--max_age_to_diagnose", default=72, help="Maximum snapshot age (in partition rank) to diagnose (default: 72).")
269269
@click.option("--optimization_strategy", help="Optional optimization strategy to use during diagnosis.")
270+
@click.option("--target_file_size_bytes", help="Optional target data file size.")
271+
@click.option(
272+
"--mode",
273+
type=click.Choice(["simulate", "dry_run"], case_sensitive=False),
274+
default="simulate",
275+
show_default=True,
276+
help="Mandatory mode choosing either dry_run or simulate.",
277+
)
270278
def diagnose(
271279
full_name: str,
272280
min_age_to_diagnose: int,
273281
max_age_to_diagnose: int,
282+
mode: str,
274283
optimization_strategy: str | None,
284+
target_file_size_bytes: int | None,
275285
) -> int:
276286
"""Diagnose table health by analyzing its partitions.
277287
@@ -287,12 +297,25 @@ def diagnose(
287297
record_copy = record.model_copy()
288298
if optimization_strategy:
289299
record_copy.optimization_strategy = optimization_strategy
300+
if target_file_size_bytes:
301+
record_copy.target_file_size_bytes = target_file_size_bytes
290302
record_copy.min_age_to_optimize = min_age_to_diagnose
291303
record_copy.max_age_to_optimize = max_age_to_diagnose
292304
row = Row(**record_copy.model_dump(by_alias=True))
293305
entry = MaintenanceScheduleRecord.from_row(row).to_entry()
294306
strategy = OptimizationStrategy(entry)
295-
strategy.find_and_optimize_specs(None)
307+
try:
308+
if mode == "dry_run":
309+
strategy.diagnose_partition_specs()
310+
elif mode == "simulate":
311+
strategy.estimate_optimization_results_partition_specs()
312+
else:
313+
msg = f"Invalid option for mode: {mode}"
314+
raise Exception(msg)
315+
except Exception as e: # noqa: BLE001
316+
msg = f"An error occurred while diagnosing table '{full_name}': {e}"
317+
raise click.ClickException(msg) # noqa: B904
318+
296319
else:
297320
# Preserve expected CLI behavior: emit an error and non-zero exit
298321
# when the table is not present in the maintenance schedule.

ice_keeper/task/action/optimization/datafile_summary.py

Lines changed: 95 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,14 @@ def make_age_filter_stmt(self, min_age_to_optimize: int, max_age_to_optimize: in
277277

278278
return filter_stmt
279279

280-
def create_summary_stmt(self) -> str:
280+
def _format_bytes_stmt(self, bytes_column: str) -> str:
281+
return f"""CONCAT(
282+
ROUND({bytes_column} / POWER(1024, FLOOR(LOG(1024, GREATEST({bytes_column}, 1)))), 2),
283+
' ',
284+
ELEMENT_AT(ARRAY('B', 'KB', 'MB', 'GB', 'TB', 'PB'), CAST(FLOOR(LOG(1024, GREATEST({bytes_column}, 1))) AS INT) + 1)
285+
)"""
286+
287+
def create_summary_stmt(self, *, estimate_optimization_results: bool = False) -> str:
281288
"""Generate an SQL query for creating a partition diagnostics summary.
282289
283290
This summary includes metrics such as correlation factors, number of files
@@ -286,8 +293,18 @@ def create_summary_stmt(self) -> str:
286293
Returns:
287294
str: SQL query for analyzing partition health and optimization readiness.
288295
"""
289-
min_file_size_bytes = int(self.mnt_props.target_file_size_bytes * 0.75)
290-
max_file_size_bytes = int(self.mnt_props.target_file_size_bytes * 1.8)
296+
target_file_size_stmt = str(self.mnt_props.target_file_size_bytes)
297+
if self.mnt_props.target_file_size_bytes <= 0:
298+
target_file_size_stmt = """
299+
case
300+
when sum_file_size < 16L * 16 * 1048576 then 16L * 1048576
301+
when sum_file_size < 32L * 32 * 1048576 then 32L * 1048576
302+
when sum_file_size < 64L * 64 * 1048576 then 64L * 1048576
303+
when sum_file_size < 128L * 128 * 1048576 then 128L * 1048576
304+
when sum_file_size < 256L * 256 * 1048576 then 256L * 1048576
305+
when sum_file_size < 512L * 512 * 1048576 then 512L * 1048576
306+
else 1024L end
307+
"""
291308

292309
num_files_targetted_for_rewrite_threshold = 5
293310

@@ -302,6 +319,8 @@ def create_summary_stmt(self) -> str:
302319
order_by = self.spec.make_order_stmt()
303320
age_filter_stmt = self.make_age_filter_stmt(min_age_to_optimize, max_age_to_optimize)
304321

322+
cte_to_use = "final_decision" if not estimate_optimization_results else "final_estimate"
323+
305324
return f"""
306325
-- Diagnosing partitioned table '{self.mnt_props.full_name}' for optimization
307326
-- All data files to consider for optimization.
@@ -344,6 +363,44 @@ def create_summary_stmt(self) -> str:
344363
where
345364
spec_id = {self.spec_id}
346365
),
366+
file_stats_per_partition as (
367+
select
368+
{grouping_stmt},
369+
content,
370+
record_count,
371+
file_size_in_bytes,
372+
rn1,
373+
rn2,
374+
is_data_file_from_widening_src_partition,
375+
-- Aggregations for content = 0 (data files)
376+
count_if(content = 0) over (partition by {grouping_stmt}) as n_files,
377+
sum(case when content = 0 then record_count end) over (partition by {grouping_stmt}) as n_records,
378+
avg(case when content = 0 then file_size_in_bytes end) over (partition by {grouping_stmt}) as avg_file_size,
379+
min(case when content = 0 then file_size_in_bytes end) over (partition by {grouping_stmt}) as min_file_size,
380+
max(case when content = 0 then file_size_in_bytes end) over (partition by {grouping_stmt}) as max_file_size,
381+
sum(case when content = 0 then file_size_in_bytes end) over (partition by {grouping_stmt}) as sum_file_size
382+
from
383+
ranked_data_files
384+
),
385+
target_file_size_per_partition as (
386+
select
387+
{grouping_stmt},
388+
content,
389+
record_count,
390+
file_size_in_bytes,
391+
rn1,
392+
rn2,
393+
is_data_file_from_widening_src_partition,
394+
n_files,
395+
n_records,
396+
avg_file_size,
397+
min_file_size,
398+
max_file_size,
399+
sum_file_size,
400+
{target_file_size_stmt} as target_file_size
401+
from
402+
file_stats_per_partition
403+
),
347404
-- Aggregate the metrics per partition.
348405
agg_data_files as (
349406
select
@@ -354,20 +411,22 @@ def create_summary_stmt(self) -> str:
354411
{self.spec.make_to_json_stmt()} as partition_desc,
355412
356413
-- Aggregations for content = 0 (data files)
357-
count_if(content = 0) as n_files,
414+
first(n_files) as n_files,
415+
416+
first(n_records) as n_records,
417+
first(avg_file_size) as avg_file_size,
418+
first(min_file_size) as min_file_size,
419+
first(max_file_size) as max_file_size,
420+
first(sum_file_size) as sum_file_size,
421+
422+
first(target_file_size) as target_file_size,
358423
359424
count_if(
360425
content = 0 and
361-
(file_size_in_bytes < {min_file_size_bytes}
362-
or file_size_in_bytes > {max_file_size_bytes})
426+
(file_size_in_bytes < int(target_file_size * 0.75)
427+
or file_size_in_bytes > int(target_file_size * 1.8))
363428
) as num_files_targetted_for_rewrite,
364429
365-
sum(case when content = 0 then record_count end) as n_records,
366-
avg(case when content = 0 then file_size_in_bytes end) as avg_file_size,
367-
min(case when content = 0 then file_size_in_bytes end) as min_file_size,
368-
max(case when content = 0 then file_size_in_bytes end) as max_file_size,
369-
sum(case when content = 0 then file_size_in_bytes end) as sum_file_size,
370-
371430
count_if(
372431
content = 0 and
373432
is_data_file_from_widening_src_partition = true
@@ -387,19 +446,20 @@ def create_summary_stmt(self) -> str:
387446
388447
sum(case when content > 0 then record_count else 0 end) as n_delete_records
389448
from
390-
ranked_data_files
449+
target_file_size_per_partition
391450
group by
392451
{grouping_stmt}
393452
),
394453
-- Add should optimize flags to the aggregate.
395-
final as (
454+
final_decision as (
396455
select
397456
{grouping_stmt},
398457
partition_age,
399458
partition_desc,
400459
n_files,
401460
num_files_targetted_for_rewrite,
402461
n_records,
462+
target_file_size,
403463
avg_file_size,
404464
min_file_size,
405465
max_file_size,
@@ -419,7 +479,27 @@ def create_summary_stmt(self) -> str:
419479
{age_filter_stmt}
420480
order by
421481
{order_by}
482+
),
483+
final_estimate as (
484+
select
485+
{grouping_stmt},
486+
partition_age,
487+
{self._format_bytes_stmt("sum_file_size")} as partition_size,
488+
{self._format_bytes_stmt("avg_file_size")} as avg_file_size,
489+
{self._format_bytes_stmt("target_file_size")} as target_file_size,
490+
n_files as partition_num_files,
491+
int(sum_file_size / target_file_size) as partition_target_num_files,
492+
sum(n_files) over(partition by partition_age) as num_files_per_age,
493+
sum(int(sum_file_size / target_file_size)) over(partition by partition_age) as target_num_files_per_age
494+
from
495+
agg_data_files
496+
where
497+
{age_filter_stmt}
498+
order by
499+
partition_age asc,
500+
n_files desc,
501+
avg_file_size desc
422502
)
423503
424-
select * from final
504+
select * from {cte_to_use}
425505
"""

ice_keeper/task/action/optimization/optimization.py

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from ice_keeper.table import PartitionHealth
1313
from ice_keeper.task import SparkTask
1414
from ice_keeper.task.action.action import ActionStrategy, ActionTask
15+
from ice_keeper.task.action.optimization.partition_summary import DataFilesSummary
1516
from ice_keeper.task.task import SubTaskExecutor
1617
from ice_keeper.zorder_udf import zorder2Tuple
1718

@@ -120,7 +121,7 @@ def execute_statement(self, sub_executor: SubTaskExecutor, sql_stm: str) -> dict
120121
self.disable_journaling()
121122
return {}
122123

123-
def find_and_optimize_specs(self, sub_executor: SubTaskExecutor | None) -> None:
124+
def find_and_optimize_specs(self, sub_executor: SubTaskExecutor) -> None:
124125
# Register UDF in this new Spark session. We might use it to diagnose the table.
125126

126127
udf = pandas_udf(zorder2Tuple, returnType=BinaryType()) # type: ignore[call-overload]
@@ -133,11 +134,7 @@ def find_and_optimize_specs(self, sub_executor: SubTaskExecutor | None) -> None:
133134
did_some_optimizations = False
134135
# Collect partition summary for the spec_id
135136
summary = PartitionSummary(self.mnt_props, spec_id, self.get_widening_rule(spec_id))
136-
if sub_executor:
137-
summary.show(100)
138-
else:
139-
# In diagnostic mode, we want to show the full summary in logs for debugging purposes
140-
summary.show(10000)
137+
summary.show(100)
141138

142139
try:
143140
# Diagnose the partitions for optimization opportunities
@@ -147,19 +144,47 @@ def find_and_optimize_specs(self, sub_executor: SubTaskExecutor | None) -> None:
147144
if len(rows) > 0:
148145
rows_log_debug(rows, f"Partitions to optimize in {self.mnt_props.full_name}")
149146
did_some_optimizations = True
150-
if sub_executor:
151-
self._execute_sub_tasks(sub_executor, rows, spec_id)
147+
self._execute_sub_tasks(sub_executor, rows, spec_id)
152148
else:
153149
logger.debug("All partitions in spec_id: %s are healthy", spec_id)
154150

155-
if sub_executor:
156-
# In the context of executing optimization, we want to save the results back to the partition health table
157-
partition_health = PartitionHealth()
158-
summary.save_diff(partition_health, did_some_optimizations=did_some_optimizations)
151+
# In the context of executing optimization, we want to save the results back to the partition health table
152+
partition_health = PartitionHealth()
153+
summary.save_diff(partition_health, did_some_optimizations=did_some_optimizations)
159154
finally:
160155
logger.debug("END Optimizing spec_id: %s", spec_id)
161156
summary.uncache_views(did_some_optimizations=did_some_optimizations)
162157

158+
def diagnose_partition_specs(self) -> None:
159+
# Register UDF in this new Spark session. We might use it to diagnose the table.
160+
udf = pandas_udf(zorder2Tuple, returnType=BinaryType()) # type: ignore[call-overload]
161+
STL.get().udf.register("zorder2Tuple", udf)
162+
163+
unique_spec_ids = self._find_specs_to_optimize()
164+
for spec_id in unique_spec_ids:
165+
logger.debug("START Diagnosing spec_id: %s -> %s", spec_id, self.mnt_props.partition_specs[spec_id])
166+
spec = self.mnt_props.partition_specs[spec_id]
167+
widening_rule = self.get_widening_rule(spec_id)
168+
datafiles_summary = DataFilesSummary(self.mnt_props, spec, spec_id, widening_rule)
169+
sql = datafiles_summary.create_summary_stmt()
170+
rows = STL.sql_and_log(sql, "Retrieve rows from partition summary").take(10000)
171+
rows_log_debug(rows, f"Diagnostic Partition Summary of {self.mnt_props.full_name}, spec: {spec}")
172+
173+
def estimate_optimization_results_partition_specs(self) -> None:
174+
# Register UDF in this new Spark session. We might use it to diagnose the table.
175+
udf = pandas_udf(zorder2Tuple, returnType=BinaryType()) # type: ignore[call-overload]
176+
STL.get().udf.register("zorder2Tuple", udf)
177+
178+
unique_spec_ids = self._find_specs_to_optimize()
179+
for spec_id in unique_spec_ids:
180+
logger.debug("START Diagnosing spec_id: %s -> %s", spec_id, self.mnt_props.partition_specs[spec_id])
181+
spec = self.mnt_props.partition_specs[spec_id]
182+
widening_rule = self.get_widening_rule(spec_id)
183+
datafiles_summary = DataFilesSummary(self.mnt_props, spec, spec_id, widening_rule)
184+
sql = datafiles_summary.create_summary_stmt(estimate_optimization_results=True)
185+
rows = STL.sql_and_log(sql, "Retrieve rows from partition summary").take(10000)
186+
rows_log_debug(rows, f"Diagnostic Partition Summary of {self.mnt_props.full_name}, spec: {spec}")
187+
163188
def create_widening_rule_if_any(self) -> None | WideningRule:
164189
"""Attach a widening rule to the partition specs, if defined in the table configuration.
165190

tests/config/ice-keeper.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ partition_health_table_name: local.admin_test.partition_health
44
journal_table_name: local.admin_test.journal
55
storage_inventory_report_table_name: local.admin_test.storage_inventory_report
66
notification_email_fallback: admin@hostname.com
7-
logging_config_file: ./config/logging_config.yaml
7+
logging_config_file: ./tests/config/logging_config.yaml

tests/config/logging_config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ formatters:
99
handlers:
1010
console:
1111
class: logging.StreamHandler
12-
level: INFO
12+
level: DEBUG
1313
formatter: simple
1414
stream: ext://sys.stdout
1515

1616
loggers:
1717
ice-keeper:
18-
level: INFO
18+
level: DEBUG
1919
handlers: [console]
2020
propagate: no
2121

tests/integration/test_optimize.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -964,6 +964,8 @@ def test_diagnose(executor: TaskExecutor) -> None:
964964
"14",
965965
"--optimization_strategy",
966966
"id ASC",
967+
"--target_file_size_bytes",
968+
"-1",
967969
],
968970
)
969971

0 commit comments

Comments
 (0)