diff --git a/etls/loadHalloween/mapping.ipynb b/etls/loadHalloween/mapping.ipynb new file mode 100644 index 00000000..1c54b6c4 --- /dev/null +++ b/etls/loadHalloween/mapping.ipynb @@ -0,0 +1,612 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "a126be52", + "metadata": {}, + "source": [ + "# Halloween CSV to API db Initial Load POC" + ] + }, + { + "cell_type": "markdown", + "id": "bfebd40d", + "metadata": {}, + "source": [ + "## Initial Set-up" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0a26a1bc", + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import datetime\n", + "import os\n", + "from utils import createEngine\n", + "import uuid\n", + "import numpy as np\n", + "import warnings\n", + "from pangres import upsert\n", + "from urllib.parse import urljoin\n", + "import requests\n", + "import zipfile\n", + "import io\n", + "\n", + "warnings.simplefilter(action='ignore', category=pd.errors.DtypeWarning)\n", + "warnings.simplefilter(\n", + " action='ignore', category=pd.errors.SettingWithCopyWarning)\n" + ] + }, + { + "cell_type": "markdown", + "id": "9b17ed3b", + "metadata": {}, + "source": [ + "### Define helper functions" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "357ba103", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "def convertBool(val):\n", + " \"\"\"\n", + " The Halloween CSVs use 1 and 0 to represent True and False. 
This function maps the 1s to True and the 0s to False.\n", + " \"\"\"\n", + " primary_to_bool = {1: True, 0: False}\n", + " if val in primary_to_bool.keys():\n", + " return primary_to_bool[val]\n", + " else:\n", + " return False\n", + "\n", + "def show_or_load(df, table_name, schema_name, engine, load=True):\n", + " \"\"\"\n", + " This function allows you to decide whether you do or don't proceed with loading data (so you can focus on preparing it/ debugging).\n", + " It also prints out the name of the table that is being loaded, so you can tell what has been loaded and what is in progress.\n", + " \"\"\"\n", + " if load:\n", + " print(f'Loading {table_name}')\n", + " df.to_sql(table_name, schema=schema_name, con=engine,\n", + " if_exists='append', index=False)\n", + " else:\n", + " print(f'Showing {table_name}')\n", + " print(df.head())\n", + "\n", + "def gen_uuid_id(base_url, id):\n", + " url = urljoin(base_url, id)\n", + " return uuid.uuid5(uuid.NAMESPACE_URL, url)" + ] + }, + { + "cell_type": "markdown", + "id": "6cc95d55", + "metadata": {}, + "source": [ + "### Create a database engine from the settings specified in your .env file" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "09f9d8b4", + "metadata": {}, + "outputs": [], + "source": [ + "engine = createEngine()" + ] + }, + { + "cell_type": "markdown", + "id": "eb1d8d3e", + "metadata": {}, + "source": [ + "### Define the scratch workspace where the Halloween CSVs are located and where the NPPES Main File will be downloaded" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3e78049b", + "metadata": {}, + "outputs": [], + "source": [ + "scratch_dir = os.path.join('..','scratch')" + ] + }, + { + "cell_type": "markdown", + "id": "08e6c3c0", + "metadata": {}, + "source": [ + "## Get data to fill gaps in Halloween CSVs" + ] + }, + { + "cell_type": "markdown", + "id": "ce42c70e", + "metadata": {}, + "source": [ + "### Get FIPS State Reference Data\n", + "We need to load 
the FIPS state reference data from the target db, to serve as a lookup table between state abbreviations and state codes, because the Halloween CSVs only contain state abbreviations but the db utilizes state codes" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6d13c3ce", + "metadata": {}, + "outputs": [], + "source": [ + "fips_state_df = pd.read_sql('select * from npd.fips_state', con = engine)\n", + "fips_state_df.set_index('abbreviation', inplace=True)" + ] + }, + { + "cell_type": "markdown", + "id": "c3d737e6", + "metadata": {}, + "source": [ + "### Get NPI Data\n", + "Since the Halloween CSV files do not contain sufficient attributes for NPIs, we need to download the latest NPPES main file and get the additional NPI fields that the target db is expecting" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18201595", + "metadata": {}, + "outputs": [], + "source": [ + "def generateNPPESVersion(version = 'Monthly', days_ago = 0):\n", + " current_date = datetime.datetime.now() - datetime.timedelta(days = days_ago)\n", + " if version == 'Monthly':\n", + " current_month = current_date.strftime(\"%B\")\n", + " current_year = current_date.year\n", + " csv_version = f'{current_month}_{current_year}_V2'\n", + " else:\n", + " current_week_start = current_date - datetime.timedelta(days=current_date.weekday()-7).strftime(\"%\")\n", + " current_week_end = current_date + datetime.timedelta(days = 6)\n", + "\n", + " return csv_version" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "eec38e7c", + "metadata": {}, + "outputs": [], + "source": [ + "# NOTE: We are just using the previously downloaded October NPPES data\n", + "#current_date = datetime.datetime.now()\n", + "#current_month = current_date.strftime(\"%B\")\n", + "#current_year = current_date.year\n", + "#csv_version = f'{current_month}_{current_year}_V2'\n", + "#nppes_dir = os.path.join(scratch_dir,'nppes')\n", + "\n", + "# Download and unzip the 
NPPES CSV files\n", + "#zipData = requests.get(f'https://download.cms.gov/nppes/NPPES_Data_Dissemination_{csv_version}.zip').content\n", + "#with zipfile.ZipFile(io.BytesIO(zipData), 'r') as zip_file:\n", + "# zip_file.extractall(nppes_dir)\n", + "main_files = [f for f in os.listdir(nppes_dir) if 'npidata_pfile' in f and '_fileheader' not in f]\n", + "main_files.sort()\n", + "latest_main_file = main_files[-1]\n", + "\n", + "npi_df = pd.read_csv(os.path.join(nppes_dir, latest_main_file), usecols = ['Provider Last Name (Legal Name)', 'NPI', 'Entity Type Code', 'Replacement NPI', 'Provider Enumeration Date', 'Last Update Date',\n", + " 'NPI Deactivation Reason Code', 'NPI Deactivation Date', 'NPI Reactivation Date', 'Certification Date'])\n", + "npi_df_renamed = npi_df.rename(columns={\n", + " 'NPI': 'npi',\n", + " 'Entity Type Code': 'entity_type_code',\n", + " 'Replacement NPI': 'replacement_npi',\n", + " 'Provider Enumeration Date': 'enumeration_date',\n", + " 'Last Update Date': 'last_update_date',\n", + " 'NPI Deactivation Reason Code': 'deactivation_reason_code',\n", + " 'NPI Deactivation Date': 'deactivation_date',\n", + " 'NPI Reactivation Date': 'reactivation_date',\n", + " 'Certification Date': 'certification_date'\n", + " })" + ] + }, + { + "cell_type": "markdown", + "id": "78d95fd5", + "metadata": {}, + "source": [ + "### Populate NPI Fields\n", + "The NPPES main file strips certain field values for records with deactivated NPIs, so we populate those as needed for the target db.\n", + "1. Deactivated NPIs show up without entity_type_code values, but those are required in the db. We use the Provider Last Name (Legal Name) field to intuit whether a provider is an invidual (if there is a last name listed, the provider has entity_type_code 1) or an organization (if there is not a last name listed, the provider has entity_type_code 2)\n", + "2. Deactivated NPIs show up without enumeration_date and last_update_date values. We populate bogus dates of 1/1/1900." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ae525963", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "deactivated_npi1_condition = (npi_df_renamed['entity_type_code'].isnull())&~(npi_df_renamed['Provider Last Name (Legal Name)'].isnull())\n", + "deactivated_npi2_condition = (npi_df_renamed['entity_type_code'].isnull())&(npi_df_renamed['Provider Last Name (Legal Name)'].isnull())\n", + "npi_df_renamed.loc[deactivated_npi1_condition, ['entity_type_code', 'enumeration_date', 'last_update_date']] = [1, '1900-01-01', '1900-01-01']\n", + "npi_df_renamed.loc[deactivated_npi2_condition, ['entity_type_code', 'enumeration_date', 'last_update_date']] = [2, '1900-01-01', '1900-01-01']\n", + "del npi_df_renamed['Provider Last Name (Legal Name)']" + ] + }, + { + "cell_type": "markdown", + "id": "9ff46de7", + "metadata": {}, + "source": [ + "## Process Halloween Data" + ] + }, + { + "cell_type": "markdown", + "id": "501c6898", + "metadata": {}, + "source": [ + "### Read in the Halloween CSVs \n", + "We loop through the halloween_data folder (downloaded from the [Halloween Release Google Drive Folder](https://drive.google.com/drive/folders/1zvneyQi7xNReIfeKkdpTgxX1BziaeKkT)) and store each in a dictionary. Optionally, we can load them to the raw_csv schema within the database, to facilitate querying and inspection." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ed4a504c", + "metadata": {}, + "outputs": [], + "source": [ + "load_raw_csvs = False\n", + "df_dict={}\n", + "for f in os.listdir(os.path.join(scratch_dir, 'halloween_data')):\n", + "    if '.csv' in f:\n", + "        tablename = f.split('.csv')[0]\n", + "        df = pd.read_csv(os.path.join(scratch_dir,'halloween_data',f), na_values=[''], keep_default_na=False)\n", + "        df_dict[f]=df\n", + "        if load_raw_csvs:\n", + "            df.to_sql(tablename, index=False, schema = 'raw_csv', con = engine, if_exists='replace')" + ] + }, + { + "cell_type": "markdown", + "id": "22276745", + "metadata": {}, + "source": [ + "### Structure Practitioner Data\n", + "Using the practitioner csv and the practitionerrole csv, we transform the data into the formats necessary for the individual provider tables:\n", + "* Assign a uuid id to each practitioner record\n", + "* Map the following csv fields to db fields:\n", + "    * gender_code to sex, because that is how the field is captured in NPPES (in the ETL process it is renamed to gender_code, but here it is renamed back)\n", + "    * name_prefix to prefix\n", + "    * name_suffix to suffix\n", + "* Assign a name_use_id of 1 for \"usual,\" which is the FHIR code assigned to the primary name. 
Since we don't have any other provider names listed, we assume that the name provided here is the primary name\n", + "* Join practitioner data with taxonomy data, so we can associate our new uuids with the taxonomy records\n", + "* Filter out the practitioner with the NPI 1770923773, because that NPI does not exist\n", + "* Filter out the taxonomy records with a state code of 'ZZ,' because that state code does not exist\n", + "* Separate out taxonomy information from license information" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7e520af4", + "metadata": {}, + "outputs": [], + "source": [ + "practitioner_df = df_dict['practitioner.csv']\n", + "#note: we can do this because each practitioner only appears once in this table\n", + "practitioner_df['id'] = [gen_uuid_id('https://api/individual/', str(i)) for i in practitioner_df['id']]\n", + "practitioner_df_renamed = practitioner_df.rename(columns = {'gender_code': 'sex', 'name_prefix': 'prefix', 'name_suffix': 'suffix'})\n", + "practitioner_df_renamed['name_use_id'] = 1\n", + "practitioner_taxonomy_df = df_dict['practitionerrole.csv']\n", + "filtered_practitioner_taxonomy_df = practitioner_taxonomy_df.loc[practitioner_taxonomy_df['practitioner_id']!=1770923773]\n", + "merged_taxonomy_df = filtered_practitioner_taxonomy_df.merge(practitioner_df_renamed, left_on = 'practitioner_id', right_on = 'npi', suffixes = ('tax', 'individual')) \n", + "merged_taxonomy_df = merged_taxonomy_df.loc[merged_taxonomy_df['state_code']!='ZZ']\n", + "merged_taxonomy_df['state_code'] = [fips_state_df.loc[i]['id'] if i in fips_state_df.index else np.nan for i in merged_taxonomy_df['state_code']]\n", + "merged_taxonomy_df_renamed = merged_taxonomy_df.rename(columns={'idindividual': 'individual_id', 'taxonomy_code':'nucc_code'})\n", + "provider_to_taxonomy_df = merged_taxonomy_df_renamed[['npi', 'nucc_code', 'is_primary']]\n", + "provider_to_taxonomy_df['is_primary'] = 
provider_to_taxonomy_df['is_primary'].apply(lambda x: convertBool(x))\n", + "dedup_taxonomy_df = provider_to_taxonomy_df.sort_values(by='is_primary', ascending=False)[\n", + " ['npi', 'nucc_code', 'is_primary']].drop_duplicates(subset=['nucc_code', 'npi'])\n", + "dedup_taxonomy_df['id'] = [uuid.uuid4() for i in dedup_taxonomy_df.index]\n", + "license_df = dedup_taxonomy_df.merge(merged_taxonomy_df_renamed, on = ['npi', 'nucc_code'], suffixes = ('tax', 'cred'))\n", + "license_df_renamed = license_df.rename(columns={'id': 'provider_to_taxonomy_id'})" + ] + }, + { + "cell_type": "markdown", + "id": "2762d9ce", + "metadata": {}, + "source": [ + "### Structure Organization Data\n", + "Using the organization csv and the organization_npi csv, we attempt to discern a hierarchical organization structure and transform the data into the formats necessary for the organization tables:\n", + "* Since we only have one name per organization, we assume this is the primary name and set the is_primary field (which will later be loaded into the organization_to_name table) to True\n", + "* We associate a uuid id with each organization \n", + "* We back calculate the organization hierarchy by backpopulating the uuids into the old id and parent id fields\n", + "* We also ensure that each NPI is associated with its own organization and that the hierarchy is maintained when doing so\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "74c78543", + "metadata": {}, + "outputs": [], + "source": [ + "organization_df = df_dict['organization.csv']\n", + "organization_df['is_primary'] = True\n", + "organization_df_renamed = organization_df.rename(columns={'id':'old_org_id', 'parent_id':'old_parent_id', 'organization_name':'name'})\n", + "organization_df_renamed.set_index(['old_org_id'], inplace=True)\n", + "organization_df_renamed['org_id'] = [uuid.uuid4() for i in organization_df_renamed.index]\n", + "organization_df_renamed['org_parent_id'] = 
[organization_df_renamed.loc[i]['org_id'] if i in organization_df_renamed.index else np.nan for i in organization_df_renamed['old_parent_id']]  # resolve old parent ids against the org index (old_org_id), NOT the FIPS index, to fetch each parent's new uuid\n", + "organization_npi_df = df_dict['organization_npi.csv']\n", + "organization_npi_df_renamed = organization_npi_df.rename(columns={'organization_id':'old_org_id'})\n", + "organization_npi_df_renamed['id'] = [gen_uuid_id('https://api/organization/', str(i)) for i in organization_npi_df_renamed['npi']]\n", + "clinical_organization_df = organization_npi_df_renamed.merge(organization_df_renamed, on='old_org_id', how='outer')\n", + "clinical_organization_df_renamed = clinical_organization_df.rename(columns={'org_id':'parent_id'})\n", + "other_organization_df = organization_df_renamed.rename(columns = {'org_id':'id', 'org_parent_id': 'parent_id'})\n" + ] + }, + { + "cell_type": "markdown", + "id": "7cb2b701", + "metadata": {}, + "source": [ + "### Structure Endpoint Data\n", + "Using the endpoint csv, we transform the data into the necessary structure for the endpoint tables:\n", + "* Rename `fhir_url` to `address`\n", + "* Create a table of unique ehr vendors and assign a uuid to each\n", + "* Join the vendor uuids back to the endpoint data\n", + "* Populate fields that are not present in the dataset (environment_type_id and endpoint_connection_type_id) with hardcoded values\n", + "* Assign a uuid to each endpoint record" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "39a319ec", + "metadata": {}, + "outputs": [], + "source": [ + "endpoint_df = df_dict['endpoint.csv']\n", + "endpoint_df_renamed = endpoint_df.rename(columns={'id':'endpoint_id','fhir_url':'address'})\n", + "ehr_vendor_df = endpoint_df.drop_duplicates(subset='vendor_name')\n", + "ehr_vendor_df['id'] = [gen_uuid_id('https://api/ehr_vendor/', str(i)) for i in ehr_vendor_df['vendor_name']]\n", + "ehr_vendor_df_renamed = ehr_vendor_df.rename(columns={'vendor_name':'name'})\n", + "ehr_vendor_df_renamed.set_index('name', inplace=True, drop=False)\n", +
"endpoint_df_renamed['ehr_vendor_id'] = endpoint_df_renamed['vendor_name'].apply(lambda x: ehr_vendor_df_renamed.loc[x]['id'])\n", + "endpoint_df_renamed['environment_type_id'] = 'prod'\n", + "endpoint_df_renamed['endpoint_connection_type_id'] = 'hl7-fhir-rest'\n", + "endpoint_df_renamed['id'] = [gen_uuid_id('https://api/endpoint_instance/', str(i)) for i in endpoint_df_renamed['endpoint_id']]" + ] + }, + { + "cell_type": "markdown", + "id": "885e032a", + "metadata": {}, + "source": [ + "### Structure Organization to Endpoint Data\n", + "Using the organization_endpoint csv, we transform the data into the necessary structure for the organization to endpoint relationship:\n", + "* Join the endpoint data to the organization_to_endpoint data so we can associate the endpoint uuids with the org_to_endpoint records and also join the organization data to the organization_to_endpoint data, so we can associate the organization uuids with the org_to_endpoint records" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "55f6e7f2", + "metadata": {}, + "outputs": [], + "source": [ + "org_to_endpoint_df = df_dict['organization_endpoint.csv']\n", + "merged_org_to_endpoint_df = org_to_endpoint_df.merge(endpoint_df_renamed, on = 'endpoint_id', how='outer').merge(clinical_organization_df_renamed, left_on = 'organization_npi', right_on = 'npi', suffixes = ('endpoint', 'organization'), how='outer')\n", + "merged_org_to_endpoint_df= merged_org_to_endpoint_df[['idendpoint', 'idorganization']].rename(columns = {'idendpoint': 'endpoint_instance_id', 'idorganization':'organization_id'}).dropna()" + ] + }, + { + "cell_type": "markdown", + "id": "98b6e7f0", + "metadata": {}, + "source": [ + "### Structure Address Data\n", + "Using the location csv and the npi_location csv, transform the data into the necessary structure for the address and location tables:\n", + "* Rename the columns to align with the naming in the database:\n", + " * `line` to `delivery_line_1` (note: 
ideally we would have multiple fields, one for each line)\n", + " * `postalcode` to `zipcode`\n", + " * `city` to `city_name`\n", + "* Assign a uuid to each address record\n", + "* Filter out the states that do not exist (FM, ~, UK, MH) (note: FM and MH are US territories, but the addresses with those values listed as states do not correspond to those territories)\n", + "* Populate the fips state codes based on state abbreviation\n", + "* Join the address data to npi_location in order to populate the address uuids\n", + "* Join the practitioner and organization data to npi_location in order to populate practitioner uuid and organization uuid\n", + "* Populate the address_use_id with a hard coded value of 2 (for work address), since there is no reference to the address type\n", + "* Assign a location uuid to each location (address associated with an organization)\n", + "* Associate the endpoints with the appropriate locations, based on their organization affiliations (we assume that each organization uses each endpoint at all their locations)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "56004b90", + "metadata": {}, + "outputs": [], + "source": [ + "address_df = df_dict['location.csv']\n", + "address_df_renamed = address_df.rename(columns={'id':'address_us_id', 'line':'delivery_line_1', 'postalcode':'zipcode', 'city':'city_name'})\n", + "address_df_renamed['id']= [uuid.uuid4() for i in address_df_renamed.index]\n", + "address_df_renamed = address_df_renamed.loc[(address_df_renamed['state'] != 'FM') & (address_df_renamed['state'] != '~') & (address_df_renamed['state'] != 'UK') & (address_df['state'] != 'MH')]\n", + "address_df_renamed['state_code'] = address_df_renamed['state'].apply(lambda x: fips_state_df.loc[x]['id'])\n", + "location_npi_df = df_dict['npi_location.csv']\n", + "merged_df_1 = location_npi_df.merge(address_df_renamed, left_on='location_id', right_on = 'address_us_id', how='outer')\n", + "merged_df_2 = 
merged_df_1.merge(npi_df_renamed, on = 'npi', suffixes=('address','npi'), how='outer')\n", + "merged_df_3 = merged_df_2.merge(practitioner_df_renamed, on = 'npi', suffixes = ('address', 'individual'), how='outer')\n", + "merged_location_df = merged_df_3.merge(clinical_organization_df_renamed, on = 'npi', suffixes = ('address', 'organization'), how='outer')\n", + "merged_location_df_renamed = merged_location_df.rename(columns={'idaddress':'address_id', 'idindividual':'individual_id', 'id':'organization_id', 'nameaddress':'name'})\n", + "merged_location_df_renamed['address_use_id'] = 2\n", + "individual_to_address_df = merged_location_df_renamed[['address_id','individual_id', 'address_use_id']].dropna(how='any')\n", + "location_df = merged_location_df_renamed[['address_id','organization_id','name', 'address_use_id']].dropna(how='any')\n", + "location_df['id'] = [uuid.uuid4() for i in location_df.index]\n", + "location_to_endpoint_df = location_df.merge(merged_org_to_endpoint_df, on = 'organization_id', how='outer')[['id', 'endpoint_instance_id']].dropna(how = 'any').rename(columns = {'id':'location_id'})\n" + ] + }, + { + "cell_type": "markdown", + "id": "7ece7d77", + "metadata": {}, + "source": [ + "### Structure Provider to Organization Data\n", + "Using the personal_npi_to_organizational_npi csv, transform the data into the necessary structure for the provider to organization and provider to location relationships:\n", + "* Join provider and organization data to associate the provider and organization uuids with the NPIs listed\n", + "* Assign a uuid to each provider to organization relationship\n", + "* Assign a relationship_type_id value of 2 for each relationship where the affiliation_source is 'PECOS Assignment Relationships'\n", + "* Join location information based on organization NPI (we assume each provider works at every location owned by the organization that they have a relationship with)\n", + "* Assign a uuid to each provider to location relationship" 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2595c9c5", + "metadata": {}, + "outputs": [], + "source": [ + "provider_to_organization_df = df_dict['personal_npi_to_organizational_npi.csv']\n", + "merged_provider_to_org_df = provider_to_organization_df.merge(practitioner_df_renamed, left_on = 'personal_npi', right_on = 'npi', how='inner').merge(clinical_organization_df_renamed, left_on = 'organizational_npi', right_on = 'npi', suffixes = ('individual', 'organization'), how='inner')\n", + "provider_to_org_df_renamed = merged_provider_to_org_df.rename(columns = {'idindividual':'individual_id', 'idorganization':'organization_id'})\n", + "provider_to_org_df_renamed['id'] = [uuid.uuid4() for i in provider_to_org_df_renamed.index]\n", + "provider_to_org_df_renamed['relationship_type_id'] = [2 if val=='PECOS Assignment Relationships' else val for val in provider_to_org_df_renamed['affiliation_source']]\n", + "provider_to_location_df = provider_to_org_df_renamed.merge(location_df, on='organization_id', how='inner', suffixes=('porg','location'))\n", + "provider_to_location_df['id'] = [uuid.uuid4() for i in provider_to_location_df.index]\n", + "provider_to_location_df_renamed = provider_to_location_df.rename(columns={'idlocation':'location_id', 'idporg':'provider_to_organization_id'})" + ] + }, + { + "cell_type": "markdown", + "id": "c13ccbcd", + "metadata": {}, + "source": [ + "## Load the Data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "be8bb0b8", + "metadata": {}, + "outputs": [], + "source": [ + "schema_name = 'npd'\n", + "load = True\n", + "\n", + "# load npi\n", + "show_or_load(npi_df_renamed, 'npi', schema_name, engine, load)\n", + "\n", + "# load individual\n", + "show_or_load(practitioner_df_renamed[['id', 'sex']], 'individual', schema_name, engine, load)\n", + "practitioner_df_renamed_renamed = practitioner_df_renamed.rename(columns={'id':'individual_id'})\n", + "\n", + "# load individual_to_name\n", + 
"show_or_load(practitioner_df_renamed_renamed[['individual_id', 'first_name', 'middle_name', 'last_name', 'prefix', 'suffix', 'name_use_id']], 'individual_to_name', schema_name, engine, load)\n", + "\n", + "# load provider\n", + "show_or_load(practitioner_df_renamed_renamed.merge(npi_df_renamed, on = 'npi', how='inner')[['npi', 'individual_id']], 'provider', schema_name, engine, load)\n", + "\n", + "# load organization\n", + "show_or_load(other_organization_df[['id']], 'organization', schema_name, engine, load)\n", + "other_organization_df.set_index('id', drop=False, inplace=True)\n", + "if load:\n", + " print('adding parent_id to organization')\n", + " upsert(df = other_organization_df[['parent_id']], con = engine, schema = schema_name, if_row_exists='update', table_name = 'organization')\n", + "show_or_load(clinical_organization_df_renamed[['id']], 'organization', schema_name, engine, load)\n", + "clinical_organization_df_renamed.set_index('id', drop=False, inplace=True)\n", + "if load:\n", + " print('adding parent_id to clinical_organization organizations')\n", + " upsert(df = clinical_organization_df_renamed[['parent_id']], con = engine, schema = schema_name, if_row_exists='update', table_name = 'organization')\n", + "\n", + "other_organization_df_renamed = other_organization_df.rename(columns={'id':'organization_id', 'organization_name':'name'})\n", + "clinical_organization_df_renamed_renamed = clinical_organization_df_renamed.rename(columns={'id':'organization_id'})\n", + "\n", + "# load organization_to_name\n", + "\n", + "show_or_load(other_organization_df_renamed[['organization_id', 'name', 'is_primary']], 'organization_to_name', schema_name, engine, load)\n", + "show_or_load(clinical_organization_df_renamed_renamed[['organization_id', 'name', 'is_primary']], 'organization_to_name', schema_name, engine, load)\n", + "\n", + "# load clinical_organization\n", + "show_or_load(clinical_organization_df_renamed_renamed[['organization_id', 'npi']], 
'clinical_organization', schema_name, engine, load)\n", + "\n", + "# load ehr_vendor\n", + "show_or_load(ehr_vendor_df_renamed[['id', 'name']], 'ehr_vendor', schema_name, engine, load)\n", + "\n", + "# load endpoint_instance\n", + "show_or_load(endpoint_df_renamed[['id', 'ehr_vendor_id', 'address', 'endpoint_connection_type_id', 'environment_type_id']], 'endpoint_instance', schema_name, engine, load)\n", + "\n", + "# load address_us\n", + "show_or_load(address_df_renamed[['address_us_id', 'delivery_line_1','city_name','state_code','zipcode']].rename(columns={'address_us_id':'id'}), 'address_us', schema_name, engine, load)\n", + "\n", + "# load address\n", + "show_or_load(address_df_renamed[['id', 'address_us_id']], 'address', schema_name, engine, load)\n", + "\n", + "# load individual_to_address\n", + "show_or_load(individual_to_address_df, 'individual_to_address', schema_name, engine, load)\n", + "\n", + "# load organization_to_address\n", + "show_or_load(location_df[['address_id','organization_id', 'address_use_id']], 'organization_to_address', schema_name, engine, load)\n", + "\n", + "# load location\n", + "show_or_load(location_df[['id','address_id','organization_id']], 'location', schema_name, engine, load)\n", + "\n", + "# load location_to_endpoint_instance\n", + "show_or_load(location_to_endpoint_df, 'location_to_endpoint_instance', schema_name, engine, load)\n", + "\n", + "# load provider_to_organization\n", + "show_or_load(provider_to_org_df_renamed[['individual_id', 'organization_id', 'relationship_type_id','id']], 'provider_to_organization', schema_name, engine, load)\n", + "\n", + "# load provider_to_location\n", + "show_or_load(provider_to_location_df_renamed[['location_id', 'provider_to_organization_id', 'id']], 'provider_to_location', schema_name, engine, load)\n", + "\n", + "# load provider_to_taxonomy\n", + "show_or_load(dedup_taxonomy_df, 'provider_to_taxonomy', schema_name, engine, load)\n", + "\n", + "# load provider_to_credential\n", + 
"###show_or_load(credential_df_renamed[['license_number', 'state_code', 'provider_to_taxonomy_id']], 'provider_to_credential', schema_name, engine, load)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "09bb20d6", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.0" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/etls/loadHalloween/utils.py b/etls/loadHalloween/utils.py new file mode 100644 index 00000000..e51848cf --- /dev/null +++ b/etls/loadHalloween/utils.py @@ -0,0 +1,17 @@ +import os +from dotenv import load_dotenv +from sqlalchemy import create_engine + + +def createEngine(): + # Get database details and create engine + load_dotenv() + username = os.getenv("SOURCE_DB_USER") + password = os.getenv("SOURCE_DB_PASSWORD") + instance = os.getenv("SOURCE_DB_HOST") + db = os.getenv("SOURCE_DB_NAME") + port = os.getenv("SOURCE_DB_PORT") + engine = create_engine( + f"postgresql+psycopg2://{username}:{password}@{instance}:{port}/{db}" + ) + return engine