IN 1194- merge employee appointment tables #203

Merged 2 commits on Feb 26, 2025

2 changes: 1 addition & 1 deletion Pipfile
@@ -7,7 +7,7 @@ name = "pypi"
 click = "*"
 sentry-sdk = "*"
 oracledb = "*"
-luigi = "*"
+luigi = "3.5.1"
 pandas = "*"
 pandas-stubs = "*"
 attrs = "*"
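
The pin above replaces a floating luigi requirement with an exact release. To confirm the locked environment actually resolved to that version after `pipenv install`, a quick check like the following works (illustrative snippet, not part of this PR):

from importlib.metadata import version

# should print "3.5.1" once the updated Pipfile.lock is installed
print(version("luigi"))
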
1,742 changes: 913 additions & 829 deletions Pipfile.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions hrqb/base/__init__.py
@@ -9,11 +9,11 @@

 __all__ = [
     "HRQBLocalTarget",
-    "PandasPickleTarget",
-    "QuickbaseTableTarget",
     "HRQBPipelineTask",
     "HRQBTask",
+    "PandasPickleTarget",
     "PandasPickleTask",
-    "SQLQueryExtractTask",
+    "QuickbaseTableTarget",
     "QuickbaseUpsertTask",
+    "SQLQueryExtractTask",
 ]
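
The shuffle above only alphabetizes `__all__`; nothing is added to or removed from the package's public exports. If the team wants to keep that ordering from drifting again, a one-line guard test is enough (hypothetical test, not part of this PR; if memory serves, ruff's RUF022 rule enforces the same check when enabled):

from hrqb import base


def test_base_dunder_all_is_sorted():
    # guards the alphabetical ordering introduced in this PR
    assert list(base.__all__) == sorted(base.__all__)
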
73 changes: 3 additions & 70 deletions hrqb/tasks/employee_appointments.py
@@ -5,7 +5,6 @@

 from hrqb.base.task import PandasPickleTask, QuickbaseUpsertTask, SQLQueryExtractTask
 from hrqb.utils import md5_hash_from_values, normalize_dataframe_dates
-from hrqb.utils.quickbase import QBClient


 class ExtractDWEmployeeAppointments(SQLQueryExtractTask):
@@ -18,77 +17,16 @@ def sql_file(self) -> str:
         return "hrqb/tasks/sql/employee_appointments.sql"


-class ExtractQBLibHREmployeeAppointments(PandasPickleTask):
-    """Query Quickbase for data provided by Library HR about employee appointments."""
-
-    stage = luigi.Parameter("Extract")
-
-    def get_dataframe(self) -> pd.DataFrame:  # pragma: nocover
-        qbclient = QBClient()
-        return qbclient.get_table_as_df(
-            qbclient.get_table_id("LibHR Employee Appointments")
-        )
-
-
-class ExtractQBDepartments(PandasPickleTask):
-    """Query Quickbase for Department data to merge with Library HR data."""
-
-    stage = luigi.Parameter("Extract")
-
-    def get_dataframe(self) -> pd.DataFrame:  # pragma: nocover
-        qbclient = QBClient()
-        return qbclient.get_table_as_df(qbclient.get_table_id("Departments"))
-
-
 class TransformEmployeeAppointments(PandasPickleTask):
-    """Combine Data Warehouse and Library HR data for Employee Appointments QB table."""
+    """Transform Data Warehouse data for Employee Appointments QB table."""

     stage = luigi.Parameter("Transform")

     def requires(self) -> list[luigi.Task]:  # pragma: nocover
-        return [
-            ExtractDWEmployeeAppointments(pipeline=self.pipeline),
-            ExtractQBLibHREmployeeAppointments(pipeline=self.pipeline),
-            ExtractQBDepartments(pipeline=self.pipeline),
-        ]
+        return [ExtractDWEmployeeAppointments(pipeline=self.pipeline)]

     def get_dataframe(self) -> pd.DataFrame:
-        dw_emp_appts_df = self.named_inputs["ExtractDWEmployeeAppointments"].read()
-        libhr_df = self.named_inputs["ExtractQBLibHREmployeeAppointments"].read()
-        depts_df = self.named_inputs["ExtractQBDepartments"].read()
-
-        # filter libhr data to active appointments, with position IDs
-        libhr_df = libhr_df[(libhr_df["Active"]) & ~(libhr_df["Position ID"].isna())]
-
-        # normalize position id to string and pad zeros
-        libhr_df["Position ID"] = libhr_df["Position ID"].apply(
-            lambda x: str(int(x)).zfill(8)
-        )
-
-        # merge data warehouse data with libhr data to create new employee appointments df
-        emp_appts_df = dw_emp_appts_df.merge(
-            libhr_df[
-                [
-                    "Related Employee MIT ID",
-                    "Position ID",
-                    "Related Supervisor MIT ID",
-                    "HC ID",
-                    "Related Department ID",
-                    "Cost Object",
-                ]
-            ],
-            how="left",
-            left_on=["position_id", "mit_id"],
-            right_on=["Position ID", "Related Employee MIT ID"],
-        )
-
-        # merge on departments to get directorates
-        emp_appts_df = emp_appts_df.merge(
-            depts_df[["Record ID#", "Directorate"]],
-            how="left",
-            left_on="Related Department ID",
-            right_on="Record ID#",
-        )
+        emp_appts_df = self.single_input_dataframe

         emp_appts_df = normalize_dataframe_dates(
             emp_appts_df,
@@ -110,20 +48,15 @@ def get_dataframe(self) -> pd.DataFrame:

         fields = {
             "mit_id": "MIT ID",
-            "HC ID": "HC ID",
             "employee_type": "Related Employee Type",
             "appt_begin_date": "Begin Date",
             "appt_end_date": "End Date",
-            "Directorate": "Related Directorate",
-            "Related Department ID": "Related Department ID",
-            "Related Supervisor MIT ID": "Supervisor",
             "job_title_long": "Related Job Title",
             "position_title_long": "Related Position Title",
             "job_family": "Job Family",
             "job_subfamily": "Job Subfamily",
             "job_track": "Job Track",
             "position_id": "Position ID",
-            "Cost Object": "Cost Object",
             "exempt": "Exempt / NE",
             "union_name": "Union Name",
             "term_or_perm": "Term or Permanent",
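
Net effect of this file's changes: TransformEmployeeAppointments now has a single upstream task, reads it through `self.single_input_dataframe`, and the `fields` mapping no longer carries the columns that used to arrive via the LibHR and Departments merges (HC ID, Directorate, Related Department ID, Supervisor, Cost Object). A mapping like `fields` is typically applied by subsetting and renaming the dataframe into Quickbase field names; here is a standalone sketch of that pattern, not the actual downstream hrqb code:

import pandas as pd

# illustrative subset of the mapping shown in the diff above
fields = {
    "mit_id": "MIT ID",
    "employee_type": "Related Employee Type",
    "position_id": "Position ID",
}

# toy dataframe standing in for the transformed warehouse data
emp_appts_df = pd.DataFrame(
    [{"mit_id": "900000001", "employee_type": "Admin Staff", "position_id": "00012345", "extra_col": 1}]
)

# keep only the mapped source columns, then rename them to Quickbase field names
qb_df = emp_appts_df[list(fields)].rename(columns=fields)
print(list(qb_df.columns))  # ['MIT ID', 'Related Employee Type', 'Position ID']
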
177 changes: 0 additions & 177 deletions hrqb/tasks/libhr_employee_appointments.py

This file was deleted.

28 changes: 0 additions & 28 deletions hrqb/tasks/pipelines.py
@@ -24,31 +24,3 @@ def requires(self) -> Iterator[luigi.Task]:  # pragma: no cover
         yield LoadEmployeeLeave(pipeline=self.pipeline_name)
         yield LoadPerformanceReviews(pipeline=self.pipeline_name)
         yield LoadEmployeeLeaveBalances(pipeline=self.pipeline_name)
-
-
-class UpdateLibHRData(HRQBPipelineTask):
-    """Pipeline to load Library HR employee appointment data from static CSV file.
-
-    This pipeline loads the table 'LibHR Employee Appointments', which contains
-    information known only by Library HR, that we cannot get from the data warehouse.
-
-    This Quickbase table is used by the 'Employee Appointments' table to fill in gaps from
-    warehouse data alone. This pipeline is useful for initial loading and bulk changes,
-    but this table is primarily managed directly in Quickbase by HR staff.
-
-    This pipeline requires a 'csv_filepath' parameter is defined when running, e.g.:
-        pipenv run hrqb --verbose \
-            pipeline -p UpdateLibHRData \
-            --pipeline-parameters=csv_filepath=<PATH/TO/CSV> \
-            run
-    """
-
-    csv_filepath = luigi.Parameter()
-
-    def requires(self) -> Iterator[luigi.Task]:  # pragma: no cover
-        from hrqb.tasks.libhr_employee_appointments import LoadLibHREmployeeAppointments
-
-        yield LoadLibHREmployeeAppointments(
-            pipeline=self.pipeline_name,
-            csv_filepath=self.csv_filepath,
-        )
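
With UpdateLibHRData deleted, pipelines.py is left with the pattern visible in the surviving context lines: a pipeline task whose requires() simply yields the terminal load tasks. A stripped-down sketch of that shape, using stand-in names rather than the real hrqb classes:

from collections.abc import Iterator

import luigi


class LoadSomething(luigi.Task):
    """Stand-in for a terminal load task such as LoadEmployeeLeave."""

    pipeline = luigi.Parameter()

    def run(self) -> None:
        ...  # the real load tasks upsert data to Quickbase here


class FullUpdateSketch(luigi.WrapperTask):
    """Stand-in for an HRQBPipelineTask-style pipeline."""

    pipeline_name = luigi.Parameter(default="FullUpdateSketch")

    def requires(self) -> Iterator[luigi.Task]:
        # every terminal load task is yielded here; this PR removes the
        # separate LibHR pipeline rather than adding anything new
        yield LoadSomething(pipeline=self.pipeline_name)
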
4 changes: 1 addition & 3 deletions pyproject.toml
@@ -30,9 +30,7 @@ show-fixes = true
 select = ["ALL", "PT"]
 ignore = [
     # default
-    "ANN101",
-    "ANN102",
-    "COM812",
+    "COM812",
     "D107",
     "N812",
     "PTH",