
Merging changes from staging to release-v5.0.5 #4

Open · wants to merge 2 commits into release-v5.0.5 from staging
src/main.py (3 changes: 2 additions & 1 deletion)

@@ -14,6 +14,7 @@
origins = [
    "http://localhost",
    "http://localhost:3000",
+    "http://localhost:3001",
]

app.add_middleware(
@@ -49,7 +50,7 @@ async def create_upload_file(file: UploadFile, token: str):
@app.post("/api/generate-ingest-files/")
async def generate_ingest_files(token: str, data: RequestData):
    return csv_parser_utils.generate_ingest_files(
-        token, data.column_metadata.model_dump(), data.program_name, data.program_desc
+        token, data.column_metadata.model_dump(), data.program_name, data.program_desc, data.dimensions
    )


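For reviewers, a minimal sketch of a call against the updated endpoint. The host, port, token, and payload values are placeholders, not values from this PR; the payload shape follows the new models in src/models.py below.

import requests

payload = {
    "program_name": "school_attendance",           # illustrative values throughout
    "program_desc": "Daily attendance by school",
    "column_metadata": {
        "school": {"updated_col_name": "school", "type": "object",
                   "metric": False, "dimension": True},
        "attendance": {"updated_col_name": "attendance", "type": "int64",
                       "metric": True, "dimension": False},
    },
    "dimensions": [{
        "name": "school",
        "fields": [{
            "value": "school",
            "isPrimary": True,
            "isIndex": False,
            "data": {
                "key": "school_id",
                "values": {"updated_col_name": "school", "type": "object",
                           "metric": False},
            },
        }],
    }],
}

# token is declared as a bare `token: str` parameter, so FastAPI reads it
# from the query string rather than the request body.
resp = requests.post(
    "http://localhost:8000/api/generate-ingest-files/",
    params={"token": "abc123"},
    json=payload,
)
print(resp.status_code, resp.json())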
src/models.py (22 changes: 21 additions & 1 deletion)

@@ -1,16 +1,35 @@
from typing import Dict
from pydantic import BaseModel, RootModel, field_validator, constr

+from typing import List
+
class ColumnMetadataItem(BaseModel):
    updated_col_name: str
+    type: str
    metric: bool
    dimension: bool


class ColumnMetadata(RootModel):
    root: Dict[str, ColumnMetadataItem]

+class DimensionFieldDataValueModel(BaseModel):
+    updated_col_name: str
+    type: str
+    metric: bool
+
+class DimensionFieldDataModel(BaseModel):
+    key: str
+    values: DimensionFieldDataValueModel
+
+class DimensionFieldModel(BaseModel):
+    value: str
+    isPrimary: bool
+    isIndex: bool
+    data: DimensionFieldDataModel
+
+class DimensionModel(BaseModel):
+    name: str
+    fields: List[DimensionFieldModel]
+
class RequestData(BaseModel):
    program_name: constr(
@@ -21,6 +40,7 @@ class RequestData(BaseModel):
    )
    program_desc: str
    column_metadata: ColumnMetadata
+    dimensions: List[DimensionModel]

    @field_validator("program_name")
    @classmethod
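A quick sketch of how the nested dimension models compose, which helps when reading the parser changes below. The import path is assumed, and the field values are illustrative:

from src.models import DimensionModel  # assumed import path

dim = DimensionModel.model_validate({
    "name": "school",
    "fields": [{
        "value": "school",            # key into column_metadata
        "isPrimary": True,
        "isIndex": False,
        "data": {
            "key": "school_id",
            "values": {"updated_col_name": "school", "type": "object",
                       "metric": False},
        },
    }],
})
print(dim.name, dim.fields[0].data.key)  # -> school school_id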
src/utils/csv_parser_utils.py (166 changes: 118 additions & 48 deletions)

@@ -40,23 +40,18 @@ def guess_metrics_and_columns(token: str, filename: str):
    column_type_dict = dict()

    for column in df.columns:
-        if df[column].dtype == "int":
-            column_type_dict[df[column].name] = {
-                "updated_col_name": df[column].name,
-                "metric": True,
-                "dimension": False,
-            }
-        else:
-            column_type_dict[df[column].name] = {
-                "updated_col_name": df[column].name,
-                "metric": False,
-                "dimension": True,
-            }
+        column_type_dict[df[column].name] = {
+            "updated_col_name": df[column].name,
+            "type": str(df[column].dtype),
+            "metric": False,
+            "timeDimension": False,
+            "dimension": False
+        }
    return column_type_dict
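With this change the server stops guessing metric/dimension from the dtype: it records the pandas dtype string and leaves every flag False for the client to fill in. A sketch of the resulting dict for a toy frame, with illustrative column names:

import pandas as pd

df = pd.DataFrame({"school": ["A", "B"], "attendance": [10, 12]})
# Mirrors the new loop body above:
column_type_dict = {
    df[col].name: {
        "updated_col_name": df[col].name,
        "type": str(df[col].dtype),   # e.g. "object", "int64"
        "metric": False,
        "timeDimension": False,
        "dimension": False,
    }
    for col in df.columns
}
print(column_type_dict["attendance"]["type"])  # -> int64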


def generate_ingest_files(
-    token: str, column_metadata: typing.Dict, program_name: str, program_desc: str
+    token: str, column_metadata: typing.Dict, program_name: str, program_desc: str, dimensions: typing.Iterable
):
    folder_path = os.path.join(TMP_BASE_PATH, token)
    file_path = os.path.join(
@@ -66,77 +61,152 @@
    df = pd.read_csv(file_path)

    df, column_mapping = format_df_columns(df, column_metadata)

-    metrics, dimensions = [], []
+    metrics, otherCols, dimensionFkCols = [], [], []

    for cols in column_metadata:
        updated_col_name = column_metadata[cols]["updated_col_name"]
        if column_metadata[cols]["metric"]:
            metrics.append(column_mapping[updated_col_name])
+        elif column_metadata[cols]["dimension"]:
+            print('Do Nothing')
        else:
-            dimensions.append(column_mapping[updated_col_name])
+            otherCols.append(cols)

+    for dimension in dimensions:
+        if (len(dimension.fields) > 1):
+            for field in dimension.fields:
+                if (field.isIndex):
+                    data = { "key": field.data.key, "name": dimension.name }
+                    dimensionFkCols.append(data)
+                    break
+        else:
+            data = { "key": f"{dimension.name}_id", "name": dimension.name }
+            dimensionFkCols.append(data)

    ingest_folder_path = os.path.join(folder_path, "ingest")

-    write_dimensions_to_ingest_folder(df, dimensions, ingest_folder_path)
-    write_events_to_ingest_folder(
-        df, dimensions, metrics, program_name, ingest_folder_path
-    )
+    write_dimensions_to_ingest_folder(df, dimensions, column_metadata, ingest_folder_path)
+    write_config_to_ingest_folder(program_name, program_desc, ingest_folder_path)
+    write_events_to_ingest_folder(program_name, df, metrics, dimensions, ingest_folder_path, dimensionFkCols, otherCols, column_metadata)

    return {"dimension": dimensions, "metrics": metrics}

+def getType(dataType: str):
+    if (dataType == 'int64' or dataType == 'float64'):
+        return 'number'
+    elif (dataType == 'datetime64'):
+        return 'date'
+    elif (dataType == 'object'):
+        return 'string'
+    else:
+        return 'string'
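One review note on getType: str(df[col].dtype) for a pandas datetime column is normally 'datetime64[ns]', not 'datetime64', so the 'date' branch (and the timeDimension check further down) may never match as written. A hedged sketch of a dtype-object check that sidesteps the exact string comparison, not code from this PR:

import pandas as pd
from pandas.api.types import is_datetime64_any_dtype, is_numeric_dtype

def get_type_from_dtype(dtype) -> str:
    # Matches datetime64[ns] and tz-aware variants.
    if is_datetime64_any_dtype(dtype):
        return 'date'
    if is_numeric_dtype(dtype):
        return 'number'
    return 'string'

print(get_type_from_dtype(pd.to_datetime(pd.Series(['2024-01-01'])).dtype))  # -> date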

def write_dimensions_to_ingest_folder(
-    df: pd.DataFrame, dimensions: typing.Iterable, ingest_folder_path: str
+    df: pd.DataFrame, dimensions: typing.Iterable, column_metadata: typing.Dict, ingest_folder_path: str
):
    dimensions_base_path = os.path.join(ingest_folder_path, "dimensions")
    create_folder_if_not(dimensions_base_path)

    for dimension in dimensions:
-        dimension_grammar_data = f"""PK,Index
-string,string
-{dimension}_id,{dimension}"""
+        index_row, type_row, column_row, indexed_col, req_cols = [], [], [], [], []
+        if (len(dimension.fields) > 1):
+            for field in dimension.fields:
+                if (field.isPrimary):
+                    index_row.append('PK')
+                    indexed_col.append(column_metadata[field.value]['updated_col_name'])
+                elif (field.isIndex):
+                    index_row.append('Index')
+                    indexed_col.append(column_metadata[field.value]['updated_col_name'])
+                else:
+                    index_row.append('')
+
+                type_row.append(getType(column_metadata[field.value]['type']))
+                column_row.append(column_metadata[field.value]['updated_col_name'])
+                req_cols.append(column_metadata[field.value]['updated_col_name'])
+        else:
+            index_row.append('PK')
+            type_row.append('number')
+            column_row.append(f"{dimension.name}_id")
+
+            index_row.append('Index')
+            type_row.append(getType(column_metadata[dimension.fields[0].value]['type']))
+            column_row.append(column_metadata[dimension.fields[0].value]['updated_col_name'])
+
+            indexed_col.append(column_metadata[dimension.fields[0].value]['updated_col_name'])
+            req_cols.append(column_metadata[dimension.fields[0].value]['updated_col_name'])
+        dimension_grammar_data = f"""{','.join(index_row)}
+{','.join(type_row)}
+{','.join(column_row)}
+"""

        with open(
-            os.path.join(dimensions_base_path, f"{dimension}-dimension.grammar.csv"),
+            os.path.join(dimensions_base_path, f"{dimension.name}-dimension.grammar.csv"),
            "w",
        ) as f:
            f.write(dimension_grammar_data)
-        column_df = pd.DataFrame(df[dimension].drop_duplicates(keep="first"))
-        column_df.insert(
-            loc=0, column=f"{dimension}_id", value=range(1, len(column_df) + 1)
-        )
-        column_df.to_csv(
-            os.path.join(dimensions_base_path, f"{dimension}-dimension.data.csv"),
+        df2 = df[req_cols]
+        df2 = df2.drop_duplicates(subset=indexed_col)
+        if (len(dimension.fields) == 1):
+            df2.insert(
+                loc=0, column=f"{dimension.name}_id", value=range(1, len(df2) + 1)
+            )
+
+        df2.to_csv(
+            os.path.join(dimensions_base_path, f"{dimension.name}-dimension.data.csv"),
            index=False,
        )
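To make the generated layout concrete: for a single-field "school" dimension backed by a string column, the loop above would emit a grammar file and a data file along these lines (names and values illustrative):

school-dimension.grammar.csv:
PK,Index
number,string
school_id,school

school-dimension.data.csv:
school_id,school
1,School A
2,School B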


def write_events_to_ingest_folder(
-    df: pd.DataFrame, dimensions, metrics, program_name, ingest_folder_path
+    program_name: str, df: pd.DataFrame, metrics: typing.Iterable, dimensions: typing.Iterable, ingest_folder_path: str, dimensionFkCols: typing.Iterable, otherCols: typing.Iterable, column_metadata: typing.Dict
):
    events_base_path = os.path.join(ingest_folder_path, "programs", program_name)
    create_folder_if_not(events_base_path)

    for metric in metrics:
+        dimension_name_row, dimension_col_row, type_row, column_row, last_row = [], [], [], [], []
+        for dimensionFkCol in dimensionFkCols:
+            if column_metadata.get(dimensionFkCol['key']) is not None:
+                columnName = column_metadata[dimensionFkCol['key']]['updated_col_name']
+                typeOfDimension = getType(column_metadata[dimensionFkCol['key']]['type'])
+            else:
+                columnName = dimensionFkCol['key']
+                typeOfDimension = 'number'
+
+            dimension_name_row.append(dimensionFkCol['name'])
+            dimension_col_row.append(columnName)
+            type_row.append(typeOfDimension)
+            column_row.append(columnName)
+            last_row.append("dimension")
+
+        for otherCol in otherCols:
+            dimension_name_row.append("")
+            dimension_col_row.append("")
+            type_row.append(getType(column_metadata[otherCol]['type']))
+            column_row.append(column_metadata[otherCol]['updated_col_name'])
+            if (column_metadata[otherCol]['type'] == "datetime64"):
+                last_row.append("timeDimension")
+            else:
+                last_row.append("")
+
+        dimension_name_row.append("")
+        dimension_col_row.append("")
+        type_row.append(getType(column_metadata[metric]['type']))
+        column_row.append(column_metadata[metric]['updated_col_name'])
+        last_row.append("metric")
+
+        event_grammar = f"""{','.join(dimension_name_row)}
+{','.join(dimension_col_row)}
+{','.join(type_row)}
+{','.join(column_row)}
+{','.join(last_row)}
+"""

        with open(
            os.path.join(events_base_path, f"{metric}-event.grammar.csv"), "w"
        ) as f:
-            f.write("," + ",".join(dimensions) + "," + "\n")
-            f.write("," + ",".join(dimensions) + "," + "\n")
-            f.write("date," + "string," * (len(dimensions)) + "integer" "\n")
-            f.write("date," + ",".join(dimensions) + f",{metric}" + "\n")
-            f.write(
-                "timeDimension," + "dimension," * (len(dimensions)) + "metric" + "\n"
-            )
-        headers = dimensions + [metric]
-        event_df = pd.DataFrame(df[headers])
-        event_df.insert(
-            loc=0, column=f"date", value=datetime.today().strftime("%d/%m/%y")
-        )
-        event_df.to_csv(
-            os.path.join(events_base_path, f"{metric}-event.data.csv"), index=False
-        )
+            f.write(event_grammar)
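Each metric now gets a five-row grammar file: dimension names, FK columns, types, column names, and field roles. Assuming one dimension FK (school_id, not itself present in column_metadata), one column whose stored type string is 'datetime64', and an int64 metric named attendance, the output would look roughly like:

attendance-event.grammar.csv:
school,,
school_id,,
number,date,number
school_id,date,attendance
dimension,timeDimension,metric

Note that the removed block also wrote a {metric}-event.data.csv alongside the grammar; the new loop writes only the grammar file.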


def write_config_to_ingest_folder(
@@ -204,5 +274,5 @@ def fetch_file_content(token: str, filename: str):
    if not os.path.exists(file_path):
        return None
    df = pd.read_csv(file_path)
-    json_response = df.head().to_json(orient="records")
+    json_response = df.to_json(orient="records")
    return json_response
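Review note: dropping .head() means the entire CSV is serialized into the JSON response on every call, which may matter for large uploads. A toy sketch of the behavioural difference, not code from this PR:

import json
import pandas as pd

df = pd.DataFrame({"n": range(10)})
print(len(json.loads(df.head().to_json(orient="records"))))  # 5 records (before)
print(len(json.loads(df.to_json(orient="records"))))         # 10 records (after)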