
Commit e4f2b85

Merge pull request #308 from dclong/dev
Merge dev into main
2 parents cb92d5e + a2e8ab6 commit e4f2b85

8 files changed: +200 additions, -34 deletions

dsutil/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -3,4 +3,4 @@
 from . import git
 from . import poetry
 
-__version__ = "0.65.4"
+__version__ = "0.66.0"

dsutil/hadoop/log.py

Lines changed: 25 additions & 17 deletions
@@ -232,6 +232,31 @@ def _error_priority(line: str) -> tuple[int, str, str]:
     :return: The priority of the error line.
     """
     patterns = [
+        (
+            "(?i)libc.*not found", 1,
+            "http://www.legendu.net/misc/blog/spark-issue-libc-not-found/"
+        ),
+        (
+            r"(?i)ArrowInvalid", 1,
+            "http://www.legendu.net/misc/blog/Spark-issue:-Pure-Python-code-errors"
+        ),
+        (
+            r"(?i)TypeError: .*has no len()", 1,
+            "http://www.legendu.net/misc/blog/Spark-issue:-Pure-Python-code-errors"
+        ),
+        (
+            r"(?i)CalledProcessError: Command .* returned non-zero exit status", 1,
+            "http://www.legendu.net/misc/blog/Spark-issue:-Pure-Python-code-errors"
+        ),
+        (
+            r"(?i)error: Found argument .* which wasn't expected", 1,
+            "http://www.legendu.net/misc/blog/Spark-issue:-Pure-Python-code-errors"
+        ),
+        (
+            r"(?i)RuntimeError: Result vector from pandas_udf was not the required length",
+            1,
+            "http://www.legendu.net/misc/blog/Spark-issue:-Pure-Python-code-errors"
+        ),
         (
             "(?i)object has no attribute", 1,
             "http://www.legendu.net/misc/blog/Spark-issue:-Pure-Python-code-errors"
@@ -268,10 +293,6 @@ def _error_priority(line: str) -> tuple[int, str, str]:
             "(?i)table not found", 1,
             "http://www.legendu.net/misc/blog/spark-issue-table-not-found/"
         ),
-        (
-            "(?i)libc.*not found", 1,
-            "http://www.legendu.net/misc/blog/spark-issue-libc-not-found/"
-        ),
         (
             "(?i)SparkContext: A master URL must be set", 1,
             "http://www.legendu.net/misc/blog/spark-issue-a-master-url-must-be-set-in-your-configuration/"
@@ -303,19 +324,6 @@ def _error_priority(line: str) -> tuple[int, str, str]:
             r"(?i)IllegalArgumentException: System memory \d* must be at least", 1,
             "http://www.legendu.net/misc/blog/spark-issue:-IllegalArgumentException:-System-memory-must-be-at-least"
         ),
-        (
-            r"(?i)CalledProcessError: Command .* returned non-zero exit status", 1,
-            "http://www.legendu.net/misc/blog/spark-issue:-CalledProcessError:-Command-returned-non-zero-exit-status"
-        ),
-        (
-            r"(?i)error: Found argument .* which wasn't expected", 1,
-            "http://www.legendu.net/misc/blog/spark-issue:-error:-Found-argument-which-was-not-expected"
-        ),
-        (
-            r"(?i)RuntimeError: Result vector from pandas_udf was not the required length",
-            1,
-            "http://www.legendu.net/misc/blog/spark-issue:-RuntimeError:-Result-vector-of-pandas_udf-was-not-the-required-length"
-        ),
         (
             r"(?i)InvalidResourceRequestException", 1,
             "http://www.legendu.net/misc/blog/spark-issue:-InvalidResourceRequestException"

dsutil/hadoop/logf.py

Lines changed: 68 additions & 0 deletions
@@ -1,6 +1,7 @@
 """Script for fetch and filtering Spark application logs.
 """
 from typing import Optional
+from pathlib import Path
 import re
 from argparse import ArgumentParser, Namespace
 import subprocess as sp
@@ -54,6 +55,58 @@ def fetch(args):
     filter_(args)
 
 
+def status(args):
+    """Get status of a Spark application.
+
+    :param args: A Namespace object containing command-line options.
+    """
+    if "app_id" in args:
+        cmd = ["yarn", "application", "-status", args.app_id]
+        sp.run(cmd, check=True)
+        return
+    report = """Application Report :
+    Application-Id : {APP_ID}
+    Application-Name : {APP_NAME}
+    Application-Type : {APP_TYPE}
+    User : {USER}
+    Queue : {QUEUE}
+    Application Priority : {PRIORITY}
+    Start-Time : {START_TIME}
+    Finish-Time : {FINISH_TIME}
+    Progress : {PROGRESS}
+    State : {STATE}
+    Final-State : {STATUS}
+    Tracking-URL : {URL}
+    RPC Port : {PORT}
+    AM Host : {HOST}
+    Aggregate Resource Allocation : {RESOURCE}
+    Log Aggregation Status : {LOG_STATUS}
+    Diagnostics :
+    Unmanaged Application : {UNMANAGED}
+    Application Node Label Expression : {APP_NODE_LABEL}
+    AM container Node Label Expression : {CON_NODE_LABEL}
+    """
+    with args.log_file.open() as fin:
+        for line in fin:
+            if "{APP_ID}" in report:
+                match = re.search(r"(application_\d+_\d+)", line)
+                if match:
+                    report = report.replace("{APP_ID}", match.group())
+            if "{APP_NAME}" in report:
+                match = re.search(r"--primary-py-file (.*) ", line)
+                if match:
+                    report = report.replace("{APP_NAME}", match.group())
+            if "{USER}" in report:
+                match = re.search(r"local/usercache/(.*)/", line)
+                if match:
+                    report = report.replace("{USER}", match.group())
+            if "{STATUS}" in report:
+                match = re.search(r"Final app status: (.*),", line)
+                if match:
+                    report = report.replace("{STATUS}", match.group())
+    print(report)
+
+
 def parse_args(args=None, namespace=None) -> Namespace:
     """Parse command-line arguments.
 
@@ -66,9 +119,24 @@ def parse_args(args=None, namespace=None) -> Namespace:
     subparsers = parser.add_subparsers(help="Sub commands.")
     _subparser_fetch(subparsers)
     _subparser_filter(subparsers)
+    _subparser_status(subparsers)
     return parser.parse_args(args=args, namespace=namespace)
 
 
+def _subparser_status(subparsers):
+    subparser_status = subparsers.add_parser(
+        "status", help="filter key information from a Spark/Hive application log."
+    )
+    mutex_group = subparser_status.add_mutually_exclusive_group(required=True)
+    mutex_group.add_argument(
+        "-i", "--id", "--app-id", dest="app_id", type=str, help="An application ID."
+    )
+    mutex_group.add_argument(
+        "-l", "-f", "--log-file", dest="log_file", type=Path, help="An application ID."
+    )
+    subparser_status.set_defaults(func=status)
+
+
 def _option_filter(subparser) -> None:
     subparser.add_argument(
         "-k",

dsutil/hadoop/pyspark_submit.py

Lines changed: 20 additions & 10 deletions
@@ -319,14 +319,20 @@ def _submit_cluster(args, config: dict[str, Any]) -> bool:
     if not os.path.isfile(spark_submit):
         raise ValueError(f"{spark_submit} does not exist!")
     opts = (
-        "files", "master", "deploy-mode", "queue", "num-executors", "executor-memory",
-        "driver-memory", "executor-cores", "archives"
+        "files",
+        "master",
+        "deploy-mode",
+        "queue",
+        "num-executors",
+        "executor-memory",
+        "driver-memory",
+        "executor-cores",
+        "archives",
+        "jars",
     )
     lines = [config["spark-submit"]] + [
-        f"--{opt} {config[opt]}" for opt in opts if opt in config
+        f"--{opt} {config[opt]}" for opt in opts if opt in config and config[opt]
     ] + [f"--conf {k}={v}" for k, v in config["conf"].items()]
-    if config["jars"]:
-        lines.append(f"--jars {config['jars']}")
     lines.extend(args.cmd)
     for idx in range(1, len(lines)):
         lines[idx] = " " * 4 + lines[idx]
@@ -358,11 +364,14 @@ def submit(args: Namespace) -> None:
     config["python-local"] = args.python_local
     if "files" not in config:
         config["files"] = []
-    config["files"] = _files(config) + args.files
-    if "jars" not in config:
-        config["jars"] = ""
-    if isinstance(config["jars"], (list, tuple)):
-        config["jars"] = ",".join(config["jars"])
+    config["files"].extend(args.files)
+    config["files"] = _files(config)
+    if "archives" in config:
+        if isinstance(config["archives"], (list, tuple)):
+            config["archives"] = ",".join(config["archives"])
+    if "jars" in config:
+        if isinstance(config["jars"], (list, tuple)):
+            config["jars"] = ",".join(config["jars"])
     # submit Spark applications
     if _submit_local(args, config):
         _submit_cluster(args, config)
@@ -407,6 +416,7 @@ def parse_args(args=None, namespace=None) -> Namespace:
         help="Specify a path for generating a configration example."
     )
     mutex_group.add_argument(
+        "--cmd",
        dest="cmd",
        nargs="+",
        help="The command (of PySpark script) to submit to Spark to run."

dsutil/hash.py

Lines changed: 11 additions & 3 deletions
@@ -35,10 +35,18 @@ def _rmd5(path: Path, res: list[tuple[str]]) -> None:
     :param res: A list to record the result.
     """
     if path.is_file():
-        md5sum = hashlib.md5(path.read_bytes()).hexdigest()
+        try:
+            md5sum = hashlib.md5(path.read_bytes()).hexdigest()
+        except:
+            md5sum = "FAILED!"
         line = f"{str(path)}: {md5sum}"
         res.append(line)
         logger.info(line)
         return
-    for p in path.iterdir():
-        _rmd5(p, res)
+    try:
+        for p in path.iterdir():
+            _rmd5(p, res)
+    except:
+        line = f"{str(path)}: FAILED!"
+        res.append(line)
+        logger.info(line)
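With this change `_rmd5` no longer aborts the whole walk when a file or directory cannot be read; it records a `FAILED!` entry and continues. A small illustrative call follows; the public wrapper around `_rmd5` is not part of this diff, and the path is hypothetical.

```python
from pathlib import Path
from dsutil.hash import _rmd5

res = []  # collects "path: md5" lines, with "FAILED!" for unreadable entries
_rmd5(Path("/tmp"), res)
print("\n".join(res))
```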

dsutil/text.py

Lines changed: 72 additions & 0 deletions
@@ -6,6 +6,7 @@
 from typing import Union
 import sys
 from pathlib import Path
+import re
 from loguru import logger
 
 
@@ -182,3 +183,74 @@ def prune_json(input: Union[str, Path], output: Union[str, Path] = ""):
             else:
                 fout.write(line)
     logger.info("The pruned JSON file is written to {}.", output)
+
+
+def _filter_num(path: Union[str, Path], pattern: str, num_lines: int):
+    if isinstance(path, str):
+        path = Path(path)
+    results = []
+    res = []
+    count = 0
+    for line in path.open():
+        if count > 0:
+            res.append(line)
+            count -= 1
+            continue
+        if re.search(pattern, line):
+            if res:
+                results.append(res)
+                res = []
+            res.append(line)
+            count = num_lines
+    if res:
+        results.append(res)
+    return results
+
+
+def _filter_sp(path: Union[str, Path], pattern: str, sub_pattern: str):
+    if isinstance(path, str):
+        path = Path(path)
+    results = []
+    res = []
+    sub = False
+    for line in path.open():
+        if sub:
+            if re.search(sub_pattern, line):
+                res.append(line)
+            else:
+                sub = False
+        if re.search(pattern, line):
+            if res:
+                results.append(res)
+                res = []
+            res.append(line)
+            sub = True
+    if res:
+        results.append(res)
+    return results
+
+
+def filter(
+    path: Union[str, Path],
+    pattern: str,
+    sub_pattern: str = "",
+    num_lines: int = 0
+) -> list[list[str]]:
+    """Filter lines from a file.
+    A main regex pattern is used to identify main rows.
+    For each matched main row,
+    a sub regex pattern or a fixed number of lines can be provided.
+    If a sub regex pattern is provided,
+    then lines matching the sub regex pattern following a main line are kept together with the main line.
+    If a fixed number of lines is provided, e.g., ``num_lines=k``,
+    then ``k`` additional lines after a main line are kept together with the main line.
+
+    :param path: Path to a text file from which to filter lines.
+    :param pattern: The main regex pattern.
+    :param sub_pattern: The sub regex pattern (defaults to "", i.e., no sub pattern by default).
+    :param num_lines: The num of additional lines (0 by default) to keep after a main line.
+    :return: A list of list of lines.
+    """
+    if sub_pattern:
+        return _filter_sp(path, pattern=pattern, sub_pattern=sub_pattern)
+    return _filter_num(path, pattern=pattern, num_lines=num_lines)
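A short usage sketch for the new `dsutil.text.filter` function, following its docstring; the log file and regex patterns below are hypothetical.

```python
from dsutil import text

# keep every line containing "error" together with the 2 lines that follow it
groups = text.filter("app.log", pattern=r"(?i)error", num_lines=2)

# or keep each "error" line together with the stack-trace lines right after it
groups = text.filter("app.log", pattern=r"(?i)error", sub_pattern=r"^\s+(at |Caused by)")

for group in groups:
    print("".join(group), end="")
```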

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "dsutil"
-version = "0.65.4"
+version = "0.66.0"
 description = "A utils Python package for data scientists."
 authors = ["Benjamin Du <[email protected]>"]
 

readme.md

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ Currently, Python 3.7 and 3.8 are supported.
 
 You can download a copy of the latest release and install it using pip.
 ```bash
-pip3 install --user -U https://github.com/dclong/dsutil/releases/download/v0.65.4/dsutil-0.65.4-py3-none-any.whl
+pip3 install --user -U https://github.com/dclong/dsutil/releases/download/v0.66.0/dsutil-0.66.0-py3-none-any.whl
 ```
 Or you can use the following command to install the latest master branch
 if you have pip 20.0+.
@@ -35,7 +35,7 @@ pip3 install --user -U git+https://github.com/dclong/dsutil@main
 Use one of the following commands if you want to install all components of dsutil.
 Available additional components are `cv`, `docker`, `pdf`, `jupyter`, `admin` and `all`.
 ```bash
-pip3 install "dsutil[cv] @ https://github.com/dclong/dsutil/releases/download/v0.65.4/dsutil-0.65.4-py3-none-any.whl"
+pip3 install "dsutil[cv] @ https://github.com/dclong/dsutil/releases/download/v0.66.0/dsutil-0.66.0-py3-none-any.whl"
 # or
 pip3 install --user -U "dsutil[all] @ git+https://github.com/dclong/dsutil@main"
 ```
