-
Notifications
You must be signed in to change notification settings - Fork 27
Open
Labels
enhancementNew feature or requestNew feature or request
Description
An SDRF-related function I use a lot in my software is reordering SDRF columns based on an array of selected schemas. It would be beneficial to have this ability in sdrf-pipelines, to make it easier to automatically update old SDRF files when the column order in a schema changes or when newly added schemas become relevant to an old SDRF file.
I propose adding two methods:
- One under the schema registry that compiles the list of columns from an array of schemas, ordered by priority (example: ['minimum', 'human']), for each section: source name, characteristics, special, comment, and factor value.
- The other under SDRFDataframe, which would read the file, compile the list of column names from the file, compare it with the expected column order obtained from the function above, and reorder the file's columns within each section: first the columns that appear in the compiled list, in list order, then any custom fields that are not in the list. It would then rewrite the file with the new column order and return an SDRFDataframe object.
An example implementation, loosely derived from my current usage, is below:
from pathlib import Path
from typing import Optional
import pandas as pd
from sdrf_pipelines.sdrf.schemas import SchemaRegistry
def compile_sdrf_columns_from_schemas(
    schema_names: list[str], schema_dir: Optional[str] = None
) -> dict[str, list[str]]:
    """Compile the column names defined by several schemas, grouped by section.

    Schemas are visited in the order given, so earlier schemas take priority:
    a column name is recorded only the first time it is encountered, and later
    schemas can only append names that have not been seen yet.

    Args:
        schema_names: Schema names to look up, highest priority first
            (e.g. ``["minimum", "human"]``).
        schema_dir: Passed unchanged to ``SchemaRegistry``.

    Returns:
        A dict with the keys ``"source_name"``, ``"characteristics"``,
        ``"special"``, ``"comment"`` and ``"factor_value"``, each mapping to
        the list of column names assigned to that section in first-seen order.
        Names that are neither ``"source name"`` nor start with one of the
        bracketed prefixes are placed in ``"special"``.
    """
    # Prefix -> section for the bracketed column families.
    prefix_to_section = (
        ("characteristics[", "characteristics"),
        ("comment[", "comment"),
        ("factor value[", "factor_value"),
    )
    sections: dict[str, list[str]] = {
        "source_name": [],
        "characteristics": [],
        "special": [],
        "comment": [],
        "factor_value": [],
    }
    seen_columns: set[str] = set()
    registry = SchemaRegistry(schema_dir)

    for schema_name in schema_names:
        schema = registry.get_schema(schema_name)
        if not schema:
            # Best effort: a schema the registry does not return is skipped.
            continue
        for column_def in schema.columns:
            column_name = column_def.name
            if column_name in seen_columns:
                continue
            seen_columns.add(column_name)

            if column_name == "source name":
                section = "source_name"
            else:
                section = next(
                    (
                        name
                        for prefix, name in prefix_to_section
                        if column_name.startswith(prefix)
                    ),
                    "special",
                )
            sections[section].append(column_name)

    return sections
def reorder_columns_by_schema(
    input_file: str | Path,
    schema_names: list[str],
    output_file: Optional[str | Path] = None,
    schema_dir: Optional[str] = None,
) -> pd.DataFrame:
    """Reorder the columns of a tab-separated SDRF file to follow schema order.

    Columns are grouped into five sections, emitted in this order: source
    name, characteristics, special, comment, factor value. Within a section,
    columns listed by ``compile_sdrf_columns_from_schemas`` come first (in
    that order), followed by the remaining columns of the file in their
    original relative order. When a header name occurs several times, the
    schema entry claims its first occurrence; the other occurrences are
    treated as remaining columns. Header names are matched case-insensitively
    and ignoring surrounding whitespace.

    Args:
        input_file: Path of the tab-separated file to read.
        schema_names: Schema names, highest priority first.
        output_file: If given, the reordered table is written there as a
            tab-separated file using the header names exactly as they appear
            in ``input_file``.
        schema_dir: Forwarded to ``compile_sdrf_columns_from_schemas``.

    Returns:
        The reordered DataFrame (all values as strings, missing cells as
        ``""``). Its column labels are the ones pandas assigned on reading,
        so repeated header names carry pandas' ``.1``, ``.2`` suffixes.
    """
    section_order = ("source_name", "characteristics", "special", "comment", "factor_value")

    # Read the raw header row on its own: read_csv renames repeated column
    # names ("x", "x.1", ...), and those renamed labels must not leak into the
    # rewritten file. Going through pandas also keeps encoding and quoting
    # handling identical to the main read below.
    header_row = pd.read_csv(
        input_file, sep="\t", header=None, nrows=1, dtype=str, keep_default_na=False
    )
    original_header = [str(name) for name in header_row.iloc[0]]
    header = [name.strip().lower() for name in original_header]

    # keep_default_na=False keeps literal cell values such as "NA" or "None"
    # instead of parsing them as missing; fillna only pads short rows.
    df = pd.read_csv(input_file, sep="\t", dtype=str, keep_default_na=False).fillna("")

    sections = compile_sdrf_columns_from_schemas(schema_names, schema_dir)

    # Positions of every header name, left to right, so that a schema column
    # claims the first occurrence that has not been placed yet.
    pending_positions: dict[str, list[int]] = {}
    for position, name in enumerate(header):
        pending_positions.setdefault(name, []).append(position)

    placed: set[int] = set()
    positions_by_section: dict[str, list[int]] = {name: [] for name in section_order}

    # Pass 1: columns known to the schemas, in schema order.
    for section, columns in sections.items():
        for col in columns:
            candidates = pending_positions.get(col.strip().lower())
            if not candidates:
                continue  # schema column absent from this file
            position = candidates.pop(0)
            positions_by_section[section].append(position)
            placed.add(position)

    # Pass 2: everything else keeps its original relative order and is
    # appended to the section implied by its name.
    for position, name in enumerate(header):
        if position in placed:
            continue
        if name == "source name":
            section = "source_name"
        elif name.startswith("characteristics["):
            section = "characteristics"
        elif name.startswith("comment["):
            section = "comment"
        elif name.startswith("factor value["):
            section = "factor_value"
        else:
            section = "special"
        positions_by_section[section].append(position)

    final_col_positions = [
        position for section in section_order for position in positions_by_section[section]
    ]
    reordered_df = df.iloc[:, final_col_positions]

    if output_file:
        # Write the header names as they appeared in the input file rather
        # than pandas' de-duplicated labels.
        reordered_df.to_csv(
            output_file,
            sep="\t",
            index=False,
            header=[original_header[position] for position in final_col_positions],
        )
    return reordered_df
Metadata
Metadata
Assignees
Labels
enhancementNew feature or requestNew feature or request