@@ -2,6 +2,8 @@
 """
 from __future__ import annotations
 from typing import Union
+import sys
+import datetime
 from pyspark.sql import DataFrame, Window
 from pyspark.sql.functions import col, spark_partition_id, rank, coalesce, lit, max, sum
 
@@ -71,3 +73,27 @@ def calc_global_rank(frame: DataFrame, order_by: Union[str, list[str]]) -> DataFrame:
         ["part_id"],
     ).withColumn("rank",
                  col("local_rank") + col("sum_factor"))
+
+
+def repart_hdfs(spark, path: str, num_parts: int) -> None:
+    """Repartition an HDFS path containing Parquet data.
+
+    :param spark: A SparkSession object.
+    :param path: The HDFS path to repartition.
+    :param num_parts: The new number of partitions.
+    """
+    path = path.rstrip("/")
+    ts = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")
+    path_tmp = path + f"_repart_tmp_{ts}"
+    spark.read.parquet(path).repartition(num_parts) \
+        .write.mode("overwrite").parquet(path_tmp)
+    sc = spark.sparkContext
+    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())  # pylint: disable=W0212
+    if fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True):  # pylint: disable=W0212
+        if not fs.rename(
+                sc._jvm.org.apache.hadoop.fs.Path(path_tmp),  # pylint: disable=W0212
+                sc._jvm.org.apache.hadoop.fs.Path(path),  # pylint: disable=W0212
+        ):
+            sys.exit(f"Failed to rename the HDFS path {path_tmp} to {path}!")
+    else:
+        sys.exit(f"Failed to remove the (old) HDFS path: {path}!")
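A minimal usage sketch (not part of the diff above; the SparkSession setup, HDFS path, and partition count are hypothetical, and repart_hdfs is assumed importable from this module):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("repart_hdfs_example").getOrCreate()
# Compact a (hypothetical) Parquet directory that has accumulated many small
# files down to 16 partitions, replacing the directory in place.
repart_hdfs(spark, "hdfs:///user/example/warehouse/events", 16)

Note that the delete-then-rename step is not atomic: if the job dies after the delete but before the rename, the original path is gone and only the timestamped temporary copy remains.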