Skip to content

Commit 154e4e3

Browse files
authored
Report Generation
* added parameter for report generation * added lint changes * lint changes to import
1 parent 3e99fcb commit 154e4e3

File tree

15 files changed

+273
-27
lines changed

15 files changed

+273
-27
lines changed

DAGify.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
from dagify.converter import Engine
1716
import os
1817
import click
18+
from dagify.converter import Engine
19+
from dagify.converter.report_generator import Report
1920

2021
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
2122

@@ -58,8 +59,14 @@
5859
show_default="{}".format(
5960
os.environ.get("AS_DAG_DIVIDER",
6061
"PARENT_FOLDER")))
62+
@click.option("-r",
63+
"--report-gen",
64+
is_flag=True,
65+
default=False,
66+
help="Generate report in txt and json format which \
67+
gives an overview of job_types converted")
6168

62-
def dagify(source_path, output_path, config_file, templates, dag_divider):
69+
def dagify(source_path, output_path, config_file, templates, dag_divider,report_gen):
6370
"""Run dagify."""
6471
print("Demo dagify Engine")
6572

@@ -70,6 +77,13 @@ def dagify(source_path, output_path, config_file, templates, dag_divider):
7077
templates_path=templates,
7178
dag_divider=dag_divider,
7279
)
80+
if report_gen:
81+
Report(
82+
source_path=source_path,
83+
output_path=output_path,
84+
config_file=config_file,
85+
templates_path=templates,
86+
)
7387

7488

7589
if __name__ == '__main__':

config.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ config:
1616
mappings:
1717
- job_type: "command"
1818
template_name: "control-m-command-to-airflow-bash"
19-
#- job_type: "command"
20-
# template_name: "control-m-command-to-airflow-ssh"
19+
- job_type: "command"
20+
template_name: "control-m-command-to-airflow-ssh"
2121
#- job_type: "command"
2222
# template_name: "control-m-command-to-airflow-python"
23-
#- job_type: "DUMMY"
24-
# template_name: "control-m-dummy-to-airflow-dummy"
23+
- job_type: "sample"
24+
template_name: "control-m-dummy-to-airflow-dummy"

dagify/converter/engine.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -294,8 +294,10 @@ def convert(self):
294294
def get_template(self, template_name):
295295
# Validate template_name is Provided
296296
if template_name is None:
297-
raise ValueError("dagify: template name must be provided")
298-
template = self.templates.get(template_name, None)
297+
#raise ValueError("dagify: template name must be provided")
298+
template = self.templates.get("control-m-dummy-to-airflow-dummy",None)
299+
else:
300+
template = self.templates.get(template_name, None)
299301
if template is None:
300302
raise ValueError(
301303
f"dagify: no template with name: '{template_name}' was not found among loaded templates.")
@@ -391,11 +393,11 @@ def generate_airflow_dags(self):
391393
for dep in dependencies[dag_divider_value][task]['external']:
392394
ext_task_uf = self.uf.get_task_by_attr("JOBNAME_ORIGINAL", dep)
393395
dependencies_in_dag_external.append({
394-
'task_name': task,
395-
'ext_dag': ext_task_uf.get_attribute(self.dag_divider),
396+
'task_name': task,
397+
'ext_dag': ext_task_uf.get_attribute(self.dag_divider),
396398
'ext_dep_task': dep,
397399
"marker_name": dep + "_marker_" + ''.join(random.choices('0123456789abcdef', k=4))
398-
})
400+
})
399401

400402
# Calculate external upstream dependencies where a task in the current dag depends on another dag's task
401403
# Such a dependency will require a DAG Sensor
@@ -437,7 +439,7 @@ def generate_airflow_dags(self):
437439
dependencies_int=dependencies_in_dag_internal,
438440
dependencies_ext=dependencies_in_dag_external,
439441
upstream_dependencies=upstream_dependencies
440-
)
442+
)
441443
with open(filename, mode="w", encoding="utf-8") as dag_file:
442444
dag_file.write(content)
443445

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
"""Module providing function to manipulate yaml files"""
2+
import yaml
3+
from .utils import (
4+
is_directory,
5+
generate_report,
6+
get_jobtypes_andcount,
7+
generate_json,
8+
format_table_data
9+
)
10+
11+
class Report():
    """Builds a conversion report (text and JSON) summarising which source
    job types are covered by the template mappings declared in the config.

    Instantiating the class immediately generates both report files in
    ``output_path`` (``Detailed-Report.txt`` and ``report.json``).
    """

    def __init__(
        self,
        source_path=None,
        output_path=None,
        templates_path="./templates",
        config_file="./config.yaml",
    ):
        """Store the paths and immediately run report generation.

        Args:
            source_path (str): Path to the source XML file.
            output_path (str): Directory the report files are written into.
            templates_path (str): Directory containing the templates.
            config_file (str): Path to the config.yaml mapping file.
        """
        self.config_file = config_file
        self.config = {}
        self.templates = {}
        self.source_path = source_path
        self.output_path = output_path
        self.templates_path = templates_path

        # Run the process: constructing a Report writes the report files.
        self.generate_report()

    def generate_report(self):
        """Generate the JSON and txt reports in ``self.output_path``."""
        templates_to_validate = []

        # Source_File_Info parameters. Only a single source file is counted;
        # directory sources are skipped below -- TODO confirm that is intended.
        source_files_count = 1
        source_file_info = []
        job_types_source = []
        job_types_source_count = 0

        # Job types declared in the config file.
        config_job_types, config_job_types_count = \
            get_jobtypes_andcount(self.config_file)

        # Job types present in the source XML (single-file sources only).
        if is_directory(self.source_path) is False:
            source_file_info.append(self.source_path.split("/")[-1])
            job_types_source, job_types_source_count = \
                get_jobtypes_andcount(self.source_path)

        # Templates info: load the config and collect each mapping's template.
        with open(self.config_file, encoding="utf-8") as stream:
            try:
                self.config = yaml.safe_load(stream)
            except yaml.YAMLError as exc:
                raise exc
        for mapping in self.config["config"]["mappings"]:
            # Job types are uppercased in place, mirroring the converter.
            mapping["job_type"] = mapping["job_type"].upper()
            templates_to_validate.append(mapping["template_name"])

        # Statistics info parameters.
        job_types_converted, job_types_not_converted, converted_percentage, \
            non_converted_percentage = \
            self.get_statistics(job_types_source, config_job_types)

        # Table info.
        statistics = [
            f"Percentage of Jobtypes converted: {converted_percentage}%",
            f"Percentage of Jobtypes not converted: {non_converted_percentage}%"
        ]
        title = "DAGIFY REPORT"
        columns = ["TASK", "INFO", "COUNT"]
        rows = [
            ["Source_files", source_file_info, source_files_count],
            ["Source_File_Job_Types", job_types_source, job_types_source_count],
            ["Config_File_Job_Types", config_job_types, config_job_types_count],
            ["Job_Types_Converted", job_types_converted, len(job_types_converted)],
            ["Job_types_Not_Converted", job_types_not_converted, len(job_types_not_converted)],
            ["Templates_validated", templates_to_validate, len(templates_to_validate)]
        ]
        formatted_table_data = format_table_data(title, columns, rows)

        # Implicit concatenation instead of a backslash-continued literal:
        # the old form embedded the continuation lines' indentation spaces
        # in the user-visible message.
        warning_line = ("NOTE: If the job_type is not defined in the "
                        "config.yaml or if the job_type does not have a "
                        "matching template defined, it would be by default "
                        "converted into a DUMMYOPERATOR")

        generate_json(statistics, formatted_table_data, self.output_path)
        generate_report(statistics, title, columns, rows, warning_line, self.output_path)

    def get_statistics(self, source_jt, config_jt):
        """Calculate which job types were converted and the percentages.

        Args:
            source_jt (list): Job types found in the source file(s).
            config_jt (list): Job types declared in the config file.

        Returns:
            tuple: (converted, not_converted, converted_pct, not_converted_pct).
        """
        # Conversion info.
        job_types_converted = list(set(config_jt) & set(source_jt))
        job_types_not_converted = list(set(source_jt) - set(config_jt))

        # Percentages; guard an empty source list to avoid ZeroDivisionError.
        if source_jt:
            non_converted_percent = (len(job_types_not_converted) / len(source_jt)) * 100
            converted_percent = 100 - non_converted_percent
        else:
            converted_percent = 0
            non_converted_percent = 0

        return job_types_converted, job_types_not_converted, \
            converted_percent, non_converted_percent

dagify/converter/utils.py

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@
1414

1515
import re
1616
import os
17-
import yaml
1817
import pprint
18+
import xml.etree.ElementTree as ET
19+
import json
20+
import yaml
21+
from prettytable import PrettyTable
22+
1923

2024

2125
def clean_converter_type(converter_type):
@@ -102,3 +106,105 @@ def display_dict(dict):
102106
dict (dict): The dictionary to print.
103107
"""
104108
pprint.pprint(dict)
109+
110+
def count_yaml_files(directory, case_sensitive=True, recursive=False):
    """
    Count the YAML files (.yaml or .yml) under a directory.

    Args:
        directory (str): Path of the directory to scan.
        case_sensitive (bool): When True only lowercase extensions match;
            when False the comparison ignores case (default: True).
        recursive (bool): Whether to scan subdirectories too (default: False).

    Returns:
        int: How many YAML files were found.
    """
    yaml_exts = ('.yaml', '.yml')
    total = 0
    for _root, _dirs, filenames in os.walk(directory):
        for name in filenames:
            candidate = name if case_sensitive else name.lower()
            if candidate.endswith(yaml_exts):
                total += 1
        if not recursive:
            # Only the top level was requested; stop after the first step.
            break
    return total
133+
134+
def generate_report(lines, title, columns, rows, warning_line, output_dir):
    """Write the plain-text report to ``<output_dir>/Detailed-Report.txt``.

    Args:
        lines (list[str]): High-level summary lines written before the table.
        title (str): Title shown in the table header.
        columns (list[str]): Column headings.
        rows (list[list]): Table rows, one list per row, aligned with columns.
        warning_line (str): Trailing note appended after the table.
        output_dir (str): Directory in which the report file is created.
    """
    report = PrettyTable()
    report.title = title

    # Column config: left-align every column.
    report.field_names = columns
    for col in columns:
        report.align[col] = "l"

    # Row config
    report.add_rows(rows)

    report_file = f"{output_dir}/Detailed-Report.txt"
    # Explicit encoding so output does not depend on the platform default.
    with open(report_file, "w", encoding="utf-8") as final_report:
        for line in lines:
            final_report.write(line + '\n')
        final_report.write(str(report) + '\n')
        final_report.write(warning_line)
155+
def get_jobtypes_andcount(source_path):
    """Return the unique job types and their count for a source file.

    Supports two inputs: a Control-M XML export (``*.xml``), where job types
    come from the TASKTYPE attribute of each JOB element, and the
    ``config.yaml`` mapping file, where they come from each mapping's
    ``job_type`` key. Types are lowercased so the two sources compare equal.

    Args:
        source_path (str): Path to the ``.xml`` source or ``config.yaml``.

    Returns:
        tuple[list, int]: (unique lowercase job types, their count). Both are
        empty/zero when the path matches neither recognised input kind.
    """
    unique_job_types = []
    job_types_source = []
    job_types_count = 0
    if source_path.endswith('.xml'):
        tree = ET.parse(source_path)
        root = tree.getroot()
        # Find all JOB elements anywhere in the document.
        job_elements = root.findall('.//JOB')
        # A JOB without a TASKTYPE attribute is skipped rather than
        # crashing on None.lower().
        job_types_source = [job.get('TASKTYPE')
                            for job in job_elements
                            if job.get('TASKTYPE') is not None]
        # Convert all to lowercase for comparison.
        job_types_source = [item.lower() for item in job_types_source]
        unique_job_types = list(set(job_types_source))
        job_types_count = len(unique_job_types)
    elif source_path.endswith('config.yaml'):
        with open(source_path, 'r', encoding="utf-8") as file:
            data = yaml.safe_load(file)
        for mapping in data['config']['mappings']:
            job_types_source.append(mapping['job_type'])

        # Convert all to lowercase for comparison.
        job_types_source = [item.lower() for item in job_types_source]
        unique_job_types = list(set(job_types_source))
        job_types_count = len(unique_job_types)
    return unique_job_types, job_types_count
182+
183+
def format_table_data(title, columns, rows):
    """Shape table data into a JSON-serialisable dictionary.

    Args:
        title (str): Table title.
        columns (list[str]): Column headings.
        rows (list[list]): Row values positionally aligned with ``columns``.

    Returns:
        dict: ``{"title": ..., "columns": ..., "rows": [{col: value, ...}]}``.
    """
    # Pair each row's values with the column heading at the same position.
    formatted_rows = [
        {columns[position]: value for position, value in enumerate(row)}
        for row in rows
    ]
    return {
        "title": title,
        "columns": columns,
        "rows": formatted_rows,
    }
199+
200+
def generate_json(statistics, table_data, output_file_path):
    """Write the JSON report to ``<output_file_path>/report.json``.

    Args:
        statistics (list[str]): High-level summary lines.
        table_data (dict): Table structure produced by ``format_table_data``.
        output_file_path (str): Directory the report file is written into.
    """
    data = {
        "High_Level_Info": statistics,
        "table_data": table_data,
    }
    json_file_path = f"{output_file_path}/report.json"
    # Explicit encoding so output does not depend on the platform default.
    with open(json_file_path, "w", encoding="utf-8") as json_file:
        json.dump(data, json_file, indent=2)  # indent for better readability
210+

dagify/test/integration/test_references/009-fast-x/fast_x_reports.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313

1414
with DAG(
1515
dag_id="fast_x_reports",
16-
schedule_interval="@daily", # TIMEFROM not found, default schedule set to @daily,
16+
start_date=datetime.datetime(2024, 1, 1),
17+
#schedule="@daily",
18+
schedule_interval='*/5 * * * *',
1719
catchup=False,
1820
) as dag:
1921

dagify/test/integration/test_references/010-fast-x/fx_fld_001_app_001_subapp_001.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313

1414
with DAG(
1515
dag_id="fx_fld_001_app_001_subapp_001",
16-
schedule_interval="@daily", # TIMEFROM not found, default schedule set to @daily,
16+
start_date=datetime.datetime(2024, 1, 1),
17+
#schedule="@daily",
18+
schedule_interval='*/5 * * * *',
1719
catchup=False,
1820
) as dag:
1921

dagify/test/integration/test_references/011-fast-x/fx_fld_001_app_001_subapp_001.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313

1414
with DAG(
1515
dag_id="fx_fld_001_app_001_subapp_001",
16-
schedule_interval="@daily", # TIMEFROM not found, default schedule set to @daily,
16+
start_date=datetime.datetime(2024, 1, 1),
17+
#schedule="@daily",
18+
schedule_interval='*/5 * * * *',
1719
catchup=False,
1820
) as dag:
1921

@@ -46,8 +48,8 @@
4648

4749
# Airflow Downstream Task Dependencies (external dags)
4850

49-
fx_fld_001_app_002_subapp_002_job_003_marker_a61c = ExternalTaskMarker(
50-
task_id="fx_fld_001_app_002_subapp_002_job_003_marker_a61c",
51+
fx_fld_001_app_002_subapp_002_job_003_marker
52+
task_id="fx_fld_001_app_002_subapp_002_job_003_marker
5153
external_dag_id='fx_fld_001_app_002_subapp_002',
5254
external_task_id='fx_fld_001_app_002_subapp_002_job_003'
5355
)

dagify/test/integration/test_references/011-fast-x/fx_fld_001_app_002_subapp_001.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313

1414
with DAG(
1515
dag_id="fx_fld_001_app_002_subapp_001",
16-
schedule_interval="@daily", # TIMEFROM not found, default schedule set to @daily,
16+
start_date=datetime.datetime(2024, 1, 1),
17+
#schedule="@daily",
18+
schedule_interval='*/5 * * * *',
1719
catchup=False,
1820
) as dag:
1921

0 commit comments

Comments
 (0)