5
5
6
6
from hrqb .base .task import PandasPickleTask , QuickbaseUpsertTask , SQLQueryExtractTask
7
7
from hrqb .utils import md5_hash_from_values , normalize_dataframe_dates
8
- from hrqb .utils .quickbase import QBClient
9
8
10
9
11
10
class ExtractDWEmployeeAppointments (SQLQueryExtractTask ):
@@ -18,77 +17,16 @@ def sql_file(self) -> str:
18
17
return "hrqb/tasks/sql/employee_appointments.sql"
19
18
20
19
21
- class ExtractQBLibHREmployeeAppointments (PandasPickleTask ):
22
- """Query Quickbase for data provided by Library HR about employee appointments."""
23
-
24
- stage = luigi .Parameter ("Extract" )
25
-
26
- def get_dataframe (self ) -> pd .DataFrame : # pragma: nocover
27
- qbclient = QBClient ()
28
- return qbclient .get_table_as_df (
29
- qbclient .get_table_id ("LibHR Employee Appointments" )
30
- )
31
-
32
-
33
- class ExtractQBDepartments (PandasPickleTask ):
34
- """Query Quickbase for Department data to merge with Library HR data."""
35
-
36
- stage = luigi .Parameter ("Extract" )
37
-
38
- def get_dataframe (self ) -> pd .DataFrame : # pragma: nocover
39
- qbclient = QBClient ()
40
- return qbclient .get_table_as_df (qbclient .get_table_id ("Departments" ))
41
-
42
-
43
20
class TransformEmployeeAppointments (PandasPickleTask ):
44
- """Combine Data Warehouse and Library HR data for Employee Appointments QB table."""
21
+ """Transform Data Warehouse data for Employee Appointments QB table."""
45
22
46
23
stage = luigi .Parameter ("Transform" )
47
24
48
25
def requires (self ) -> list [luigi .Task ]: # pragma: nocover
49
- return [
50
- ExtractDWEmployeeAppointments (pipeline = self .pipeline ),
51
- ExtractQBLibHREmployeeAppointments (pipeline = self .pipeline ),
52
- ExtractQBDepartments (pipeline = self .pipeline ),
53
- ]
26
+ return [ExtractDWEmployeeAppointments (pipeline = self .pipeline )]
54
27
55
28
def get_dataframe (self ) -> pd .DataFrame :
56
- dw_emp_appts_df = self .named_inputs ["ExtractDWEmployeeAppointments" ].read ()
57
- libhr_df = self .named_inputs ["ExtractQBLibHREmployeeAppointments" ].read ()
58
- depts_df = self .named_inputs ["ExtractQBDepartments" ].read ()
59
-
60
- # filter libhr data to active appointments, with position IDs
61
- libhr_df = libhr_df [(libhr_df ["Active" ]) & ~ (libhr_df ["Position ID" ].isna ())]
62
-
63
- # normalize position id to string and pad zeros
64
- libhr_df ["Position ID" ] = libhr_df ["Position ID" ].apply (
65
- lambda x : str (int (x )).zfill (8 )
66
- )
67
-
68
- # merge data warehouse data with libhr data to create new employee appointments df
69
- emp_appts_df = dw_emp_appts_df .merge (
70
- libhr_df [
71
- [
72
- "Related Employee MIT ID" ,
73
- "Position ID" ,
74
- "Related Supervisor MIT ID" ,
75
- "HC ID" ,
76
- "Related Department ID" ,
77
- "Cost Object" ,
78
- ]
79
- ],
80
- how = "left" ,
81
- left_on = ["position_id" , "mit_id" ],
82
- right_on = ["Position ID" , "Related Employee MIT ID" ],
83
- )
84
-
85
- # merge on departments to get directorates
86
- emp_appts_df = emp_appts_df .merge (
87
- depts_df [["Record ID#" , "Directorate" ]],
88
- how = "left" ,
89
- left_on = "Related Department ID" ,
90
- right_on = "Record ID#" ,
91
- )
29
+ emp_appts_df = self .single_input_dataframe
92
30
93
31
emp_appts_df = normalize_dataframe_dates (
94
32
emp_appts_df ,
@@ -110,20 +48,15 @@ def get_dataframe(self) -> pd.DataFrame:
110
48
111
49
fields = {
112
50
"mit_id" : "MIT ID" ,
113
- "HC ID" : "HC ID" ,
114
51
"employee_type" : "Related Employee Type" ,
115
52
"appt_begin_date" : "Begin Date" ,
116
53
"appt_end_date" : "End Date" ,
117
- "Directorate" : "Related Directorate" ,
118
- "Related Department ID" : "Related Department ID" ,
119
- "Related Supervisor MIT ID" : "Supervisor" ,
120
54
"job_title_long" : "Related Job Title" ,
121
55
"position_title_long" : "Related Position Title" ,
122
56
"job_family" : "Job Family" ,
123
57
"job_subfamily" : "Job Subfamily" ,
124
58
"job_track" : "Job Track" ,
125
59
"position_id" : "Position ID" ,
126
- "Cost Object" : "Cost Object" ,
127
60
"exempt" : "Exempt / NE" ,
128
61
"union_name" : "Union Name" ,
129
62
"term_or_perm" : "Term or Permanent" ,
0 commit comments