Commit 459e3d2

Merge pull request #312 from dclong/dev
Merge dev into main
2 parents 56163c4 + 22488ea commit 459e3d2

File tree

7 files changed: +254 -292 lines changed


dsutil/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -3,4 +3,4 @@
 from . import git
 from . import poetry
 
-__version__ = "0.66.2"
+__version__ = "0.67.0"

dsutil/filesystem.py

Lines changed: 247 additions & 0 deletions
@@ -4,6 +4,7 @@
 from __future__ import annotations
 from typing import Union, Iterable, Callable
 import os
+import sys
 import re
 import shutil
 import math
@@ -417,3 +418,249 @@ def _get_files(dir_: Path, exts: list[str]) -> Iterable[Path]:
             yield path
         else:
             yield from _get_files(path, exts)
+
+
+def has_header(
+    files: Union[str, Path, list[Union[str, Path]]],
+    num_files_checking: int = 5
+) -> bool:
+    """Check whether the files have headers.
+
+    :param files: the list of files to check.
+    :param num_files_checking: the number of non-empty files to use to decide whether there are header lines.
+    :return: True if the files have headers and False otherwise.
+    """
+    # i: file index
+    for i in range(len(files)):
+        with open(files[i], "r", encoding="utf-8") as fin:
+            first_line = fin.readline()
+            if first_line:
+                possible_header = first_line
+                break
+    # k: current number of non-empty files
+    k = 1
+    for j in range(i, len(files)):
+        if k >= num_files_checking:
+            break
+        with open(files[j], "r", encoding="utf-8") as fin:
+            first_line = fin.readline()
+            if first_line:
+                k += 1
+                if first_line != possible_header:
+                    return False
+    return True
+
+
+def _merge_with_headers(
+    files: Union[str, Path, list[Union[str, Path]]],
+    output: Union[str, Path] = ""
+) -> None:
+    """Merge files with headers. Keep only one header.
+
+    :param files: A list of files
+        or the path to a directory containing a list of files to merge.
+    :param output: The output file to merge the files into.
+    """
+    with open(output, "wb") if output else sys.stdout.buffer as out:
+        with open(files[0], "rb") as fin0:
+            for line in fin0:
+                out.write(line)
+        for file in files[1:]:
+            with open(file, "rb") as fin:
+                fin.readline()
+                for line in fin:
+                    out.write(line)
+
+
+def _merge_without_header(
+    files: Union[str, Path, list[Union[str, Path]]],
+    output: Union[str, Path] = ""
+) -> None:
+    """Merge files without header.
+
+    :param files: A list of files
+        or the path to a directory containing a list of files to merge.
+    :param output: The output file to merge the files into.
+    """
+    with open(output, "wb") if output else sys.stdout.buffer as fout:
+        for file in files:
+            with open(file, "rb") as fin:
+                for line in fin:
+                    fout.write(line)
+            fout.write(b"\n")
+
+
+def merge(
+    files: Union[str, Path, list[Union[str, Path]]],
+    output: str = "",
+    num_files_checking: int = 5
+) -> None:
+    """Merge files. If there are headers in files, keep only one header in the single merged file.
+
+    :param files: A list of files
+        or the path to a directory containing a list of files to merge.
+    :param output: The output file to merge the files into.
+    :param num_files_checking: The number of files to check for deciding whether the files have headers.
+    """
+    if isinstance(files, str):
+        files = Path(files)
+    if isinstance(files, Path):
+        files = list(files.iterdir())
+    if num_files_checking <= 0:
+        num_files_checking = 5
+    num_files_checking = min(num_files_checking, len(files))
+    if has_header(files, num_files_checking):
+        _merge_with_headers(files, output)
+        return
+    _merge_without_header(files, output)
+
+
+def dedup_header(file: Union[str, Path], output: Union[str, Path] = "") -> None:
+    """Dedup headers in a file (due to the hadoop getmerge command).
+    Only the header on the first line is kept and headers (identical line to the first line)
+    on other lines are removed.
+
+    :param file: The path to the file to be deduplicated.
+    :param output: The path of the output file.
+        If empty, then output to the standard output.
+    """
+    with open(file, "rb"
+              ) as fin, open(output, "wb") if output else sys.stdout.buffer as fout:
+        header = fin.readline()
+        fout.write(header)
+        for line in fin:
+            if line != header:
+                fout.write(line)
+
+
+def select(
+    path: Union[str, Path],
+    columns: Union[str, list[str]],
+    delimiter: str,
+    output: str = ""
+):
+    """Select fields by name from a delimited file (not necessarily well structured).
+
+    :param path: The path to a file (containing delimited values in each row).
+    :param columns: A list of columns to extract from the file.
+    :param delimiter: The delimiter of fields.
+    :param output: The path of the output file.
+        If empty, then output to the standard output.
+    """
+    if isinstance(path, str):
+        path = Path(path)
+    if isinstance(columns, str):
+        columns = [columns]
+    with path.open("r", encoding="utf-8") as fin:
+        header = fin.readline().split(delimiter)
+        index = []
+        columns_full = []
+        for idx, field in enumerate(header):
+            if field in columns:
+                index.append(idx)
+                columns_full.append(field)
+        with (open(output, "w", encoding="utf-8") if output else sys.stdout) as fout:
+            fout.write(delimiter.join(columns_full) + "\n")
+            for line in fin:
+                fields = line.split(delimiter)
+                fout.write(delimiter.join([fields[idx] for idx in index]) + "\n")
+
+
+def prune_json(input: Union[str, Path], output: Union[str, Path] = ""):
+    """Prune fields (value_counts) from a JSON file.
+
+    :param input: The path to a JSON file to be pruned.
+    :param output: The path to output the pruned JSON file.
+    """
+    logger.info("Pruning the JSON file at {}...", input)
+    if isinstance(input, str):
+        input = Path(input)
+    if isinstance(output, str):
+        if output:
+            output = Path(output)
+        else:
+            output = input.with_name(input.stem + "_prune.json")
+    skip = False
+    with input.open("r") as fin, output.open("w") as fout:
+        for line in fin:
+            line = line.strip()
+            if line == '"value_counts": {':
+                skip = True
+                continue
+            if skip:
+                if line in ("}", "},"):
+                    skip = False
+            else:
+                fout.write(line)
+    logger.info("The pruned JSON file is written to {}.", output)
+
+
+def _filter_num(path: Union[str, Path], pattern: str, num_lines: int):
+    if isinstance(path, str):
+        path = Path(path)
+    results = []
+    res = []
+    count = 0
+    for line in path.open():
+        if count > 0:
+            res.append(line)
+            count -= 1
+            continue
+        if re.search(pattern, line):
+            if res:
+                results.append(res)
+                res = []
+            res.append(line)
+            count = num_lines
+    if res:
+        results.append(res)
+    return results
+
+
+def _filter_sp(path: Union[str, Path], pattern: str, sub_pattern: str):
+    if isinstance(path, str):
+        path = Path(path)
+    results = []
+    res = []
+    sub = False
+    for line in path.open():
+        if sub:
+            if re.search(sub_pattern, line):
+                res.append(line)
+            else:
+                sub = False
+        if re.search(pattern, line):
+            if res:
+                results.append(res)
+                res = []
+            res.append(line)
+            sub = True
+    if res:
+        results.append(res)
+    return results
+
+
+def filter(
+    path: Union[str, Path],
+    pattern: str,
+    sub_pattern: str = "",
+    num_lines: int = 0
+) -> list[list[str]]:
+    """Filter lines from a file.
+    A main regex pattern is used to identify main lines.
+    For each matched main line,
+    either a sub regex pattern or a fixed number of lines can be provided.
+    If a sub regex pattern is provided,
+    then lines matching the sub regex pattern following a main line are kept together with the main line.
+    If a fixed number of lines is provided, e.g., ``num_lines=k``,
+    then ``k`` additional lines after a main line are kept together with the main line.
+
+    :param path: Path to a text file from which to filter lines.
+    :param pattern: The main regex pattern.
+    :param sub_pattern: The sub regex pattern (defaults to "", i.e., no sub pattern by default).
+    :param num_lines: The number of additional lines (0 by default) to keep after a main line.
+    :return: A list of lists of lines.
+    """
+    if sub_pattern:
+        return _filter_sp(path, pattern=pattern, sub_pattern=sub_pattern)
+    return _filter_num(path, pattern=pattern, num_lines=num_lines)

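For context, a minimal usage sketch of the new filesystem helpers added above. It assumes dsutil is installed and imported as shown; the example paths, column names, and the "ERROR" pattern are purely illustrative.

    from dsutil import filesystem

    # Merge a directory of CSV part files; if the parts share a header line,
    # only one copy of the header is kept in the merged output.
    filesystem.merge("data/parts", output="data/merged.csv")

    # Remove header lines duplicated by `hadoop fs -getmerge`.
    filesystem.dedup_header("data/getmerge.csv", output="data/deduped.csv")

    # Keep only the columns "id" and "price" from a comma-delimited file.
    filesystem.select("data/deduped.csv", columns=["id", "price"], delimiter=",", output="data/subset.csv")

    # Group every line matching "ERROR" with the 2 lines that follow it.
    groups = filesystem.filter("app.log", pattern="ERROR", num_lines=2)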
dsutil/hadoop/hdfs.py

Lines changed: 0 additions & 30 deletions
@@ -3,7 +3,6 @@
 from __future__ import annotations
 from typing import Union
 from pathlib import Path
-import os
 import subprocess as sp
 import pandas as pd
 from loguru import logger
@@ -144,20 +143,6 @@ def get(
             f"Content of the HDFS path {hdfs_path} has been fetch into the local directory {local_dir}"
         )
 
-    @staticmethod
-    def _file_size_1(path: str, size: int, dir_size: dict[str, int]):
-        while path != "/":
-            path = os.path.dirname(path)
-            dir_size.setdefault(path, 0)
-            dir_size[path] += size
-
-    def _file_size(self, files):
-        dir_size = {}
-        for path, bytes_ in files.bytes[~files.permissions.str.
-                                        startswith("d")].iteritems():
-            self._file_size_1(path, bytes_, dir_size)
-        return dir_size
-
     def count_path(self, path: str) -> pd.Series:
         """Count frequence of paths and their parent paths.
 
@@ -167,21 +152,6 @@ def count_path(self, path: str) -> pd.Series:
         frame = self.ls(path, recursive=True)
         return count_path(frame.path)
 
-    def size(self, path: str) -> pd.DataFrame:
-        """Calculate sizes of subdirs and subfiles under a path.
-
-        :param path: A HDFS path.
-        :return: Size information of the HDFS path as a pandas DataFrame.
-        """
-        files = self.ls(path, recursive=True)
-        files.set_index("path", inplace=True)
-        dir_size = self._file_size(files)
-        bytes_ = pd.Series(dir_size, name="bytes")
-        files.update(bytes_)
-        files.reset_index(inplace=True)
-        files.insert(6, "metabytes", round(files.bytes / 1E6, 2))
-        return files.sort_values("bytes", ascending=False)
-
     def mkdir(self, path: str) -> None:
         """Create a HDFS path.

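The removed size() method computed per-directory totals by walking each file's path up to the root and adding the file's byte count to every ancestor directory. A standalone sketch of that accumulation idea (the input dictionary below is made up; this is not the HDFS wrapper API):

    import os

    def dir_sizes(file_sizes: dict[str, int]) -> dict[str, int]:
        """Accumulate each file's size into every ancestor directory."""
        totals: dict[str, int] = {}
        for path, size in file_sizes.items():
            while path != "/":
                path = os.path.dirname(path)
                totals[path] = totals.get(path, 0) + size
        return totals

    print(dir_sizes({"/data/a.csv": 100, "/data/sub/b.csv": 50}))
    # {'/data': 150, '/': 150, '/data/sub': 50}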
dsutil/hadoop/pyspark_submit.py

Lines changed: 3 additions & 2 deletions
@@ -225,8 +225,9 @@ def _final_status(stdout: list[str]) -> str:
     :return: The final status (SUCCEED or FAILED) of the Spark application.
     """
     for line in reversed(stdout):
-        if "final status: " in line:
-            return line.split(": ")[-1]
+        match = re.search("final status: ([A-Z]+)", line)
+        if match:
+            return match.group(1)
     return ""

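A quick illustration of why the regex is more robust than splitting on ": " (the sample log line below is made up):

    import re

    line = "\t final status: SUCCEEDED\n"
    # Old approach: the trailing newline (and anything after the last ": ") leaks through.
    print(repr(line.split(": ")[-1]))                   # 'SUCCEEDED\n'
    # New approach: only the uppercase status token is captured.
    match = re.search("final status: ([A-Z]+)", line)
    print(match.group(1) if match else "")              # SUCCEEDED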