Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 175 additions & 98 deletions ntd/annual_ridership_report/annual_ridership_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,109 +89,182 @@ def sum_by_group(df: pd.DataFrame, group_cols: list) -> pd.DataFrame:

return grouped_df

def produce_annual_ntd_ridership_data_by_rtpa(min_year:str):
def ntd_id_to_rtpa_crosswalk(split_scag: bool) -> pd.DataFrame:
    """
    Create an NTD ID -> RTPA crosswalk.

    Reads current organizations from `dim_organizations` (rows that have an
    `ntd_id_2022` and an `rtpa_name`), left-joins the headquarters-county
    name from the organizations x county-geography bridge table, and
    optionally reassigns the five SCAG counties to their individual county
    transportation commissions.

    Parameters
    ----------
    split_scag : bool
        If True, rows whose headquarters county is a SCAG county get that
        county's CTC as `rtpa_name`. If False, those rows keep the RTPA
        already recorded on the organization (SCAG).

    Returns
    -------
    pd.DataFrame
        Columns: name, ntd_id_2022, rtpa_name, mpo_name, key,
        county_geography_name, organization_key.
    """
    # County -> county CTC mapping used to split SCAG into individual RTPAs.
    socal_county_dict = {
        "Ventura": "Ventura County Transportation Commission",
        "Los Angeles": "Los Angeles County Metropolitan Transportation Authority",
        "San Bernardino": "San Bernardino County Transportation Authority",
        "Riverside": "Riverside County Transportation Commission",
        "Orange": "Orange County Transportation Authority",
    }

    # Current organizations with both an NTD ID and an RTPA assignment.
    # NOTE: `== True` is required here — this is a siuba column expression,
    # not a Python boolean.
    ntd_rtpa_orgs = (
        tbls.mart_transit_database.dim_organizations()
        >> filter(
            _._is_current == True,
            _.ntd_id_2022.notna(),
            _.rtpa_name.notna(),
        )
        >> select(
            _.name,
            _.ntd_id_2022,
            _.rtpa_name,
            _.mpo_name,
            _.key,
        )
        >> collect()
    )

    # Headquarters county for each organization (current rows only).
    bridge_counties = (
        tbls.mart_transit_database.bridge_organizations_x_headquarters_county_geography()
        >> filter(_._is_current == True)
        >> select(
            _.county_geography_name,
            _.organization_key,
        )
        >> collect()
    )

    # Left-join so organizations without a bridge row are kept (their
    # county_geography_name will be NaN).
    ntd_to_rtpa_crosswalk = ntd_rtpa_orgs.merge(
        bridge_counties,
        left_on="key",
        right_on="organization_key",
        how="left",
    )

    # Locate SoCal counties and replace the initial RTPA name (SCAG) with
    # the county CTC from the dictionary above.
    if split_scag:
        ntd_to_rtpa_crosswalk.loc[
            ntd_to_rtpa_crosswalk["county_geography_name"].isin(socal_county_dict),
            "rtpa_name",
        ] = ntd_to_rtpa_crosswalk["county_geography_name"].map(socal_county_dict)

    return ntd_to_rtpa_crosswalk

def produce_annual_ntd_ridership_data_by_rtpa(min_year: str, split_scag: bool) -> pd.DataFrame:
"""
Function that ingests time series ridership data from `mart_ntd_funding_and_expenses.fct_service..._by_mode_upt`.
Filters for CA agencies with last report year and year of data greater than min_year.
Merges in the ntd_id_to_rtpa_crosswalk function. Aggregates by agency, mode and TOS. Calculates change in UPT.
"""
# NOTE(review): this whole function body appears to be a *union* of the
# pre- and post-refactor versions from a diff (duplicate queries, duplicate
# groupbys, duplicate merges and calls below). As pasted it is not valid
# Python — confirm the intended version against version control before use.
from annual_ridership_module import add_change_columns


print("ingest annual ridership data from warehouse")

# NOTE(review): first of two competing warehouse queries (this one selects
# `source_agency` / `last_report_year`); the second build below selects
# `agency_name` / `city` / `state`. Only one should remain — TODO confirm.
ntd_service =(
tbls.mart_ntd_funding_and_expenses.fct_service_data_and_operating_expenses_time_series_by_mode_upt()
>> filter(
_.year >= min_year,
_.last_report_year >= min_year,
_.primary_uza_name.str.contains(", CA") |
_.primary_uza_name.str.contains("CA-NV") |
_.primary_uza_name.str.contains("California Non-UZA")
)
>> select(
'source_agency',
'agency_status',
'legacy_ntd_id',
'last_report_year',
'mode',
'ntd_id',
'reporter_type',
'reporting_module',
'service',
'uace_code',
'primary_uza_name',
'uza_population',
'year',
'upt',
)
>> collect())

# NOTE(review): second (apparently older) query variant.
ntd_service = (
tbls.mart_ntd_funding_and_expenses.fct_service_data_and_operating_expenses_time_series_by_mode_upt()
>> filter(_.state.str.contains("CA") |
_.state.str.contains("NV"), # to get lake Tahoe Transportation back
_.year >= min_year,
_.city != None,
_.primary_uza_name.str.contains(", CA") |
_.primary_uza_name.str.contains("CA-NV") |
_.primary_uza_name.str.contains("California Non-UZA") |
_.primary_uza_name.str.contains("El Paso, TX--NM") # something about Paso
)
>> select(
'agency_name',
'agency_status',
'city',
'legacy_ntd_id',
'mode',
'ntd_id',
'reporter_type',
'reporting_module',
'service',
'state',
'uace_code',
'primary_uza_name',
'uza_population',
'year',
'upt',
# NOTE(review): diff artifact — a `ntd_service.groupby(...)` chain (keyed on
# `source_agency` / `last_report_year`) is spliced into the middle of the
# `select(...)` call above; it belongs after the collect as its own statement.
ntd_service.groupby(
[
"source_agency",
"agency_status",
#"city",
#"state",
"ntd_id",
"primary_uza_name",
"reporter_type",
"mode",
"service",
"last_report_year",
"year",
]
)
.agg({"upt": "sum"})
.sort_values(by="ntd_id")
.reset_index()
)
>> collect())

# NOTE(review): second (apparently older) groupby variant, keyed on
# `agency_name` / `city` / `state` instead of `source_agency`.
ntd_service = ntd_service.groupby(
[
"agency_name",
'agency_status',
"city",
"state",
"ntd_id",
'primary_uza_name',
"reporter_type",
"mode",
"service",
"year"
]
).agg({
"upt":"sum"
}).sort_values(by="ntd_id").reset_index()

print("create crosswalk from ntd_id_to_rtpa_crosswalk function")

print("read in new `ntd_id_to_rtpa_all_reporter_types` crosswalk")
# Creating crosswalk using function, enable splitting SCAG to individual CTC
ntd_to_rtpa_crosswalk = ntd_id_to_rtpa_crosswalk(split_scag=split_scag)

# NOTE(review): this parquet read overwrites the crosswalk built by the
# function call above — one of the two assignments is presumably the removed
# side of the diff. TODO confirm which is current.
ntd_to_rtpa_crosswalk = pd.read_parquet(f"{GCS_FILE_PATH}ntd_id_rtpa_crosswalk_all_reporter_types.parquet")

print("merge ntd data to crosswalk")

print("merge ntd data to crosswalk")
# merge service data to crosswalk
# NOTE(review): diff artifact — this merge call fuses two variants: an
# `on="ntd_id"` merge followed by a `.rename(...)` chain, AND a
# `left_on`/`right_on="ntd_id_2022"` merge. As written it is not valid syntax.
ntd_data_by_rtpa = ntd_service.merge(
ntd_to_rtpa_crosswalk,
how="left",
on=[
"ntd_id",
#"agency", "reporter_type", "city" # sometime agency name, reporter type and city name change or are inconsistent, causing possible fanout
],
indicator=True
).rename(
columns={
"actual_vehicles_passenger_car_revenue_hours":"vrh",
"actual_vehicles_passenger_car_revenue_miles":"vrm",
"unlinked_passenger_trips_upt":"upt",
'agency_name_x':"agency_name",
'agency_status_x':"agency_status",
'city_x':"city",
'state_x':"state",
'reporter_type_x':"reporter_type",
"agency_name_y":"xwalk_agency_name",
'reporter_type_y':"xwalk_reporter_type",
'agency_status_y':"xwalk_agency_status",
'city_y':"xwalk_city",
'state_y':"xwalk_state",
}
ntd_to_rtpa_crosswalk,
how="left",
left_on=[
"ntd_id",
# "agency", "reporter_type", "city" # sometime agency name, reporter type and city name change or are inconsistent, causing possible fanout
],
right_on="ntd_id_2022",
indicator=True,
)

# list of ntd_id with LA County Dept of Public Works name
lacdpw_list = [
"90269",
"90270",
"90272",
"90273",
"90274",
"90275",
"90276",
"90277",
"90278",
"90279",
]

# replace LA County Public Works agencies with their own RTPA
# (also forces `_merge` to "both" so the unmerged-row check below passes)
ntd_data_by_rtpa.loc[
ntd_data_by_rtpa["ntd_id"].isin(lacdpw_list), ["rtpa_name", "_merge"]
] = ["Los Angeles County Department of Public Works", "both"]

print(ntd_data_by_rtpa._merge.value_counts())

# Fail loudly if any service rows did not match a crosswalk entry.
if len(ntd_data_by_rtpa[ntd_data_by_rtpa._merge=="left_only"]) > 0:
raise ValueError("There are unmerged rows to crosswalk")

print("add `change_column` to data")
# NOTE(review): duplicate calls — one uses the function-local import, the
# other the module-qualified name; one side is presumably the removed diff line.
ntd_data_by_rtpa = add_change_columns(ntd_data_by_rtpa)
ntd_data_by_rtpa = annual_ridership_module.add_change_columns(ntd_data_by_rtpa)

print("map mode and tos desc.")
# Map NTD mode / type-of-service codes to their human-readable descriptions.
ntd_data_by_rtpa = ntd_data_by_rtpa.assign(
mode_full = ntd_data_by_rtpa["mode"].map(NTD_MODES),
service_full = ntd_data_by_rtpa["service"].map(NTD_TOS)
)

print("complete")
return ntd_data_by_rtpa

def save_rtpa_outputs(
Expand Down Expand Up @@ -227,7 +300,7 @@ def save_rtpa_outputs(
#}
print("creating individual RTPA excel files")

for i in df["RTPA"].unique():
for i in df["rtpa_name"].unique():

print(f"creating excel file for: {i}")

Expand All @@ -244,31 +317,31 @@ def save_rtpa_outputs(

#filter data by single RTPA
rtpa_data = (
df[df["RTPA"] == i].sort_values("ntd_id")
.drop(columns=[
"_merge",
"xwalk_agency_name",
"xwalk_reporter_type",
"xwalk_agency_status",
"xwalk_city",
"xwalk_state",
])
df[df["rtpa_name"] == i].sort_values("ntd_id")
# .drop(columns=[
# "_merge",
# "xwalk_agency_name",
# "xwalk_reporter_type",
# "xwalk_agency_status",
# "xwalk_city",
# "xwalk_state",
# ])
# cleaning column names
.rename(columns=lambda x: x.replace("_", " ").title().strip())
# rename columns
#.rename(columns=col_dict)
)
# column lists for aggregations
agency_cols = ["ntd_id", "agency_name", "RTPA"]
mode_cols = ["mode", "RTPA"]
tos_cols = ["service", "RTPA"]
reporter_type = ["reporter_type", "RTPA"]
agency_cols = ["ntd_id", "source_agency", "rtpa_name"]
mode_cols = ["mode", "rtpa_name"]
tos_cols = ["service", "rtpa_name"]
reporter_type = ["reporter_type", "rtpa_name"]

# Creating aggregations
by_agency_long = annual_ridership_module.sum_by_group((df[df["RTPA"] == i]), agency_cols)
by_mode_long = annual_ridership_module.sum_by_group((df[df["RTPA"] == i]), mode_cols)
by_tos_long = annual_ridership_module.sum_by_group((df[df["RTPA"] == i]), tos_cols)
by_reporter_type_long = annual_ridership_module.sum_by_group((df[df["RTPA"] == i]), reporter_type)
by_agency_long = annual_ridership_module.sum_by_group((df[df["rtpa_name"] == i]), agency_cols)
by_mode_long = annual_ridership_module.sum_by_group((df[df["rtpa_name"] == i]), mode_cols)
by_tos_long = annual_ridership_module.sum_by_group((df[df["rtpa_name"] == i]), tos_cols)
by_reporter_type_long = annual_ridership_module.sum_by_group((df[df["rtpa_name"] == i]), reporter_type)

# writing pages to excel file
with pd.ExcelWriter(
Expand Down Expand Up @@ -319,14 +392,18 @@ def remove_local_outputs(


if __name__ == "__main__":
# NOTE(review): this block is a diff union — duplicated assignments/calls
# below are the pre- and post-change lines pasted together. Keep one of each.
min_year="2018"
min_year=2018

df = produce_annual_ntd_ridership_data_by_rtpa(min_year)
df = produce_annual_ntd_ridership_data_by_rtpa(min_year=min_year, split_scag=True)
print("saving parqut to private GCS")

df.to_parquet(f"{GCS_FILE_PATH}annual_ridership_report_data.parquet")

# NOTE(review): os.makedirs without exist_ok=True raises if the folder
# already exists from a prior run — confirm intended.
os.makedirs(f"./{YEAR}_{MONTH}/")


print("saving RTPA outputs")
save_rtpa_outputs(df, YEAR, MONTH, upload_to_public = True)
remove_local_outputs(YEAR, MONTH)

print("removing local folder")
remove_local_outputs(YEAR, MONTH)
print("complete")
Loading
Loading