Merged
Changes from 3 commits
@@ -1,35 +1,35 @@
from typing import Dict, List, Optional, Sequence, cast
from typing import Sequence, cast

import numpy as np
import pandas as pd
import polars as pl
import pyarrow as pa
from pydantic import BaseModel


from .utils.arrow_helpers import (
create_float_downcasting_schema,
set_date_column_type_to_timestamp_ms,
)
from .utils.arrow_helpers import create_float_downcasting_schema
from .utils.statistic_function import StatisticFunction
from .service_exceptions import Service, InvalidParameterError


class VectorStatistics(BaseModel):
realizations: List[int]
timestamps_utc_ms: List[int]
values_dict: Dict[StatisticFunction, List[float]]
realizations: list[int]
timestamps_utc_ms: list[int]
values_dict: dict[StatisticFunction, list[float]]


def compute_vector_statistics_table(
summary_vector_table: pa.Table,
vector_name: str,
statistic_functions: Optional[Sequence[StatisticFunction]],
) -> Optional[pa.Table]:
statistic_functions: Sequence[StatisticFunction] | None,
) -> pa.Table | None:
"""
Compute statistics for specified summary vector in the pyarrow table.
If statistics is None, all available statistics are computed.
Returns a pyarrow.Table with a DATE column and then one column for each statistic.
"""
if statistic_functions is not None and len(statistic_functions) == 0:
raise InvalidParameterError("At least one statistic must be requested", Service.GENERAL)

if summary_vector_table.num_rows == 0:
return None

if statistic_functions is None:
statistic_functions = [
@@ -41,77 +41,54 @@ def compute_vector_statistics_table(
StatisticFunction.P50,
]

# Use wrappers for np.nanmin(), np.nanmax() and np.nanmean() to be explicit about the usage of the
# NumPy functions and to be consistent with the other statistics functions in this module.
def nanmin_func(x: List[float]) -> np.floating:
return np.nanmin(x)

def nanmax_func(x: List[float]) -> np.floating:
return np.nanmax(x)

def nanmean_func(x: List[float]) -> np.floating:
return np.nanmean(x)

# Invert p10 and p90 due to oil industry convention.
def p10_func(x: List[float]) -> np.floating:
return np.nanpercentile(x, q=90)

def p90_func(x: List[float]) -> np.floating:
return np.nanpercentile(x, q=10)

def p50_func(x: List[float]) -> np.floating:
return np.nanpercentile(x, q=50)

agg_dict = {}
# Build list of statistic expressions based on requested functions
statistics_expressions: list[pl.Expr] = []
valid_col_expr = pl.col(vector_name).drop_nans().drop_nulls()
for stat_func in statistic_functions:
if stat_func == StatisticFunction.MIN:
agg_dict["MIN"] = pd.NamedAgg(column=vector_name, aggfunc=nanmin_func)
statistics_expressions.append(valid_col_expr.min().alias("MIN"))
elif stat_func == StatisticFunction.MAX:
agg_dict["MAX"] = pd.NamedAgg(column=vector_name, aggfunc=nanmax_func)
statistics_expressions.append(valid_col_expr.max().alias("MAX"))
elif stat_func == StatisticFunction.MEAN:
agg_dict["MEAN"] = pd.NamedAgg(column=vector_name, aggfunc=nanmean_func)
statistics_expressions.append(valid_col_expr.mean().alias("MEAN"))
elif stat_func == StatisticFunction.P10:
agg_dict["P10"] = pd.NamedAgg(column=vector_name, aggfunc=p10_func)
# Inverted due to oil industry convention (P10 = 90th percentile)
statistics_expressions.append(valid_col_expr.quantile(0.9).alias("P10"))
elif stat_func == StatisticFunction.P90:
agg_dict["P90"] = pd.NamedAgg(column=vector_name, aggfunc=p90_func)
# Inverted due to oil industry convention (P90 = 10th percentile)
statistics_expressions.append(valid_col_expr.quantile(0.1).alias("P90"))
elif stat_func == StatisticFunction.P50:
agg_dict["P50"] = pd.NamedAgg(column=vector_name, aggfunc=p50_func)

if not agg_dict:
raise InvalidParameterError("At least one statistic must be requested", Service.GENERAL)

if summary_vector_table.num_rows == 0:
return None

df = summary_vector_table.select(["DATE", vector_name]).to_pandas(timestamp_as_object=True)
statistics_expressions.append(valid_col_expr.quantile(0.5).alias("P50"))

grouped: pd.core.groupby.DataFrameGroupBy = df.groupby("DATE", as_index=False, sort=True)
statistics_df: pd.DataFrame = grouped.agg(**agg_dict)
# Create Polars DataFrame from Arrow table and compute statistics
vector_df = pl.DataFrame(summary_vector_table.select(["DATE", vector_name]))
statistics_df = vector_df.sort("DATE").group_by("DATE", maintain_order=True).agg(statistics_expressions)

default_schema = pa.Schema.from_pandas(statistics_df, preserve_index=False)
schema_to_use = set_date_column_type_to_timestamp_ms(default_schema)
schema_to_use = create_float_downcasting_schema(schema_to_use)
# Convert to PyArrow
statistics_table = statistics_df.to_arrow()

statistics_table = pa.Table.from_pandas(statistics_df, schema=schema_to_use, preserve_index=False)
# Downcast float columns to save memory
schema_to_use = create_float_downcasting_schema(statistics_table.schema)
statistics_table = statistics_table.cast(schema_to_use)

return statistics_table


def compute_vector_statistics(
summary_vector_table: pa.Table,
vector_name: str,
statistic_functions: Optional[Sequence[StatisticFunction]],
) -> Optional[VectorStatistics]:
statistic_functions: Sequence[StatisticFunction] | None,
) -> VectorStatistics | None:
statistics_table = compute_vector_statistics_table(summary_vector_table, vector_name, statistic_functions)
if not statistics_table:
return None

unique_realizations: List[int] = []
unique_realizations: list[int] = []
if "REAL" in summary_vector_table.column_names:
# ! We assume the list never has None-values
unique_realizations = cast(list[int], summary_vector_table.column("REAL").unique().to_pylist())

values_dict: Dict[StatisticFunction, List[float]] = {}
values_dict: dict[StatisticFunction, list[float]] = {}
column_names = statistics_table.column_names
for stat_func in StatisticFunction:
if stat_func.value in column_names:
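A minimal standalone sketch (not part of the PR) of the per-DATE aggregation pattern the new Polars code relies on: build statistic expressions against a NaN/null-stripped column, aggregate per DATE group, then hand the result back as a PyArrow table. The vector name FOPT and the sample values below are illustrative assumptions, not taken from the repository.

import polars as pl

df = pl.DataFrame(
    {
        "DATE": ["2020-01-01", "2020-01-01", "2020-02-01", "2020-02-01"],
        "FOPT": [1.0, 3.0, 2.0, float("nan")],
    }
)

# Expression over the vector column with NaNs and nulls removed
valid = pl.col("FOPT").drop_nans().drop_nulls()

stats_df = (
    df.sort("DATE")
    .group_by("DATE", maintain_order=True)
    .agg(
        [
            valid.min().alias("MIN"),
            valid.mean().alias("MEAN"),
            # Oil industry convention: P10 is the 90th percentile
            valid.quantile(0.9).alias("P10"),
        ]
    )
)

# Same final step as the function above: convert the result to a PyArrow table
print(stats_df.to_arrow())
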
@@ -1,6 +1,6 @@
import logging

import pandas as pd
import polars as pl
import pyarrow as pa
from fmu.sumo.explorer.explorer import SearchContext, SumoClient
from fmu.sumo.explorer.objects import Table
@@ -95,18 +95,19 @@ def create_ensemble_sensitivities(
)
if sens_case_parameter is None or sens_name_parameter is None:
return []
df = pd.DataFrame(
df = pl.DataFrame(
{
"name": sens_name_parameter.values,
"case": sens_case_parameter.values,
"REAL": sens_case_parameter.realizations,
}
)
for name, group in df.groupby("name"):
for grouped_by_columns, group in df.group_by("name"):
name = grouped_by_columns[0]
sensitivities.append(
EnsembleSensitivity(
name=name,
type=find_sensitivity_type(list(group["case"].unique())),
type=find_sensitivity_type(group["case"].unique().to_list()),
cases=create_ensemble_sensitivity_cases(group),
)
)
@@ -121,15 +122,16 @@ def find_sensitivity_type(sens_case_names: list[str]) -> SensitivityType:


def create_ensemble_sensitivity_cases(
df: pd.DataFrame,
per_sensitivity_df: pl.DataFrame,
) -> list[EnsembleSensitivityCase]:
"""Create a list of EnsembleSensitivityCase objects from a dataframe"""
cases = []
for case_name, case_df in df.groupby("case"):
for group_by_columns, case_df in per_sensitivity_df.group_by("case"):
case_name = group_by_columns[0]
cases.append(
EnsembleSensitivityCase(
name=case_name,
realizations=case_df["REAL"].unique().tolist(),
realizations=case_df["REAL"].unique().to_list(),
)
)
return cases
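A minimal sketch (not part of the PR) of why the rewrite unpacks the group key via grouped_by_columns[0] / group_by_columns[0]: Polars group_by() yields the key as a tuple even when grouping on a single column, unlike the scalar key pandas groupby produced. The sample data is illustrative only.

import polars as pl

df = pl.DataFrame({"name": ["low", "low", "high"], "REAL": [0, 1, 2]})

for grouped_by_columns, group in df.group_by("name"):
    # The key is a tuple such as ("low",), not a bare string
    name = grouped_by_columns[0]
    print(name, group["REAL"].unique().to_list())
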
110 changes: 65 additions & 45 deletions backend_py/primary/primary/routers/pvt/converters.py
@@ -1,7 +1,8 @@
from typing import List
from enum import Enum

import pandas as pd
import polars as pl
import pyarrow as pa

from .schemas import PvtData

@@ -22,35 +23,50 @@ class PHASES(str, Enum):
WATER = "Water"


def pvt_dataframe_to_api_data(data_frame: pd.DataFrame) -> List[PvtData]:
def pvt_dataframe_to_api_data(pvt_table_pa: pa.Table) -> List[PvtData]:
"""Converts the PVT table from Sumo/Ecl2Df to a list of PvtData objects"""
# Dataframe manipulation is copied from webviz-subsurface

data_frame = data_frame.rename(str.upper, axis="columns").rename(
columns={
"TYPE": "KEYWORD",
"RS": "GOR",
"RSO": "GOR",
"R": "GOR",
"RV": "OGR",
}
)
data_frame = data_frame.fillna(0)
if "GOR" in data_frame.columns and "OGR" in data_frame.columns:
data_frame["RATIO"] = data_frame["GOR"] + data_frame["OGR"]
elif "GOR" in data_frame.columns:
data_frame["RATIO"] = data_frame["GOR"]
elif "OGR" in data_frame.columns:
data_frame["RATIO"] = data_frame["OGR"]

if not "DENSITY" in data_frame.columns:
data_frame = calculate_densities(data_frame)
if not "RATIO" in data_frame.columns:
dataframe = pl.DataFrame(pvt_table_pa)

# Make upper case columns map
uppercase_columns_map = {col: col.upper() for col in dataframe.columns}

# Rename specific columns
rename_map = {
"TYPE": "KEYWORD",
"RS": "GOR",
"RSO": "GOR",
"R": "GOR",
"RV": "OGR",
}

rename_columns_map = {
**uppercase_columns_map,
**{k: v for k, v in rename_map.items() if k in uppercase_columns_map.values()},
}

# Rename columns to uppercase
dataframe = dataframe.rename(rename_columns_map)

# Fill null values with 0
dataframe = dataframe.fill_null(0.0)

# Create RATIO column
if "GOR" in dataframe.columns and "OGR" in dataframe.columns:
dataframe = dataframe.with_columns((pl.col("GOR") + pl.col("OGR")).alias("RATIO"))
elif "GOR" in dataframe.columns:
dataframe = dataframe.with_columns(pl.col("GOR").alias("RATIO"))
elif "OGR" in dataframe.columns:
dataframe = dataframe.with_columns(pl.col("OGR").alias("RATIO"))

if "DENSITY" not in dataframe.columns:
dataframe = calculate_densities(dataframe)
if "RATIO" not in dataframe.columns:
raise ValueError("The dataframe must contain a column for the ratio (OGR, GOR, R, RV, RS).")

list_of_pvtdata: List[PvtData] = []

for keyword, df_grouped_on_keyword in data_frame.groupby("KEYWORD"):
for group_by_columns, df_grouped_on_keyword in dataframe.group_by("KEYWORD", maintain_order=True):
keyword = group_by_columns[0]
if keyword in OIL_KEYWORDS:
phase = PHASES.OIL.value
name = OIL_KEYWORDS[keyword]
@@ -62,51 +78,51 @@ def pvt_dataframe_to_api_data(data_frame: pd.DataFrame) -> List[PvtData]:
name = WATER_KEYWORDS[keyword]
else:
continue
for pvtnum, df_grouped_on_pvtnum in df_grouped_on_keyword.groupby("PVTNUM"):

for group_by_columns, df_grouped_on_pvtnum in df_grouped_on_keyword.group_by("PVTNUM", maintain_order=True):
pvtnum = group_by_columns[0]
pvt_data = PvtData(
pvtnum=pvtnum,
name=name,
phase=phase,
ratio=df_grouped_on_pvtnum["RATIO"].tolist(),
pressure=df_grouped_on_pvtnum["PRESSURE"].tolist(),
volumefactor=df_grouped_on_pvtnum["VOLUMEFACTOR"].tolist(),
viscosity=df_grouped_on_pvtnum["VISCOSITY"].tolist(),
density=df_grouped_on_pvtnum["DENSITY"].tolist(),
ratio=df_grouped_on_pvtnum["RATIO"].to_list(),
pressure=df_grouped_on_pvtnum["PRESSURE"].to_list(),
volumefactor=df_grouped_on_pvtnum["VOLUMEFACTOR"].to_list(),
viscosity=df_grouped_on_pvtnum["VISCOSITY"].to_list(),
density=df_grouped_on_pvtnum["DENSITY"].to_list(),
pressure_unit=(
df_grouped_on_pvtnum["PRESSURE_UNIT"].iloc[0]
df_grouped_on_pvtnum["PRESSURE_UNIT"][0]
if "PRESSURE_UNIT" in df_grouped_on_pvtnum.columns
else "bar"
),
volumefactor_unit=(
df_grouped_on_pvtnum["VOLUMEFACTOR_UNIT"].iloc[0]
df_grouped_on_pvtnum["VOLUMEFACTOR_UNIT"][0]
if "VOLUMEFACTOR_UNIT" in df_grouped_on_pvtnum.columns
else "Rm³/Sm³"
),
viscosity_unit=(
df_grouped_on_pvtnum["VISCOSITY_UNIT"].iloc[0]
df_grouped_on_pvtnum["VISCOSITY_UNIT"][0]
if "VISCOSITY_UNIT" in df_grouped_on_pvtnum.columns
else "cP"
),
density_unit=(
df_grouped_on_pvtnum["DENSITY_UNIT"].iloc[0]
df_grouped_on_pvtnum["DENSITY_UNIT"][0]
if "DENSITY_UNIT" in df_grouped_on_pvtnum.columns
else "kg/m³"
),
ratio_unit=(
df_grouped_on_pvtnum["RATIO_UNIT"].iloc[0]
if "RATIO_UNIT" in df_grouped_on_pvtnum.columns
else "Sm³/Sm³"
df_grouped_on_pvtnum["RATIO_UNIT"][0] if "RATIO_UNIT" in df_grouped_on_pvtnum.columns else "Sm³/Sm³"
),
)
list_of_pvtdata.append(pvt_data)

return list_of_pvtdata


def calculate_densities(data_frame: pd.DataFrame) -> pd.DataFrame:
oil_density = data_frame.loc[data_frame["KEYWORD"] == "DENSITY", "OILDENSITY"].values[0]
gas_density = data_frame.loc[data_frame["KEYWORD"] == "DENSITY", "GASDENSITY"].values[0]
water_density = data_frame.loc[data_frame["KEYWORD"] == "DENSITY", "WATERDENSITY"].values[0]
def calculate_densities(data_frame: pl.DataFrame) -> pl.DataFrame:
oil_density = data_frame.filter(pl.col("KEYWORD") == "DENSITY")["OILDENSITY"][0]
gas_density = data_frame.filter(pl.col("KEYWORD") == "DENSITY")["GASDENSITY"][0]
water_density = data_frame.filter(pl.col("KEYWORD") == "DENSITY")["WATERDENSITY"][0]

def calculate_density(keyword: str, ratio: float, volume_factor: float) -> float:
density = 0.0
@@ -124,8 +140,12 @@ def calculate_density(keyword: str, ratio: float, volume_factor: float) -> float
density = water_density / volume_factor
return density

data_frame["DENSITY"] = data_frame.apply(
lambda row: calculate_density(row["KEYWORD"], row["RATIO"], row["VOLUMEFACTOR"]),
axis=1,
data_frame = data_frame.with_columns(
pl.struct(["KEYWORD", "RATIO", "VOLUMEFACTOR"])
.map_elements(
lambda row: calculate_density(row["KEYWORD"], row["RATIO"], row["VOLUMEFACTOR"]),
return_dtype=pl.Float64,
)
.alias("DENSITY")
)
return data_frame
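A minimal sketch (not part of the PR) of the row-wise mapping pattern used in calculate_densities above: pack the needed columns into a struct and apply a Python function with map_elements. The dummy_density formula and sample values are placeholders, not the PR's actual density logic.

import polars as pl

df = pl.DataFrame(
    {
        "KEYWORD": ["PVTO", "PVTG"],
        "RATIO": [100.0, 0.01],
        "VOLUMEFACTOR": [1.2, 0.9],
    }
)

def dummy_density(keyword: str, ratio: float, volume_factor: float) -> float:
    # Placeholder formula only; the real logic lives in calculate_density above
    return ratio / volume_factor

df = df.with_columns(
    pl.struct(["KEYWORD", "RATIO", "VOLUMEFACTOR"])
    .map_elements(
        lambda row: dummy_density(row["KEYWORD"], row["RATIO"], row["VOLUMEFACTOR"]),
        return_dtype=pl.Float64,
    )
    .alias("DENSITY")
)
print(df)
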
3 changes: 1 addition & 2 deletions backend_py/primary/primary/routers/pvt/router.py
@@ -41,7 +41,6 @@ async def get_table_data(
raise HTTPException(status_code=404, detail="PVT table not found")

sumo_table_data = await access.get_realization_table_async(table_schema, realization=realization)

pvt_data = pvt_dataframe_to_api_data(sumo_table_data.to_pandas())
pvt_data = pvt_dataframe_to_api_data(sumo_table_data)

return pvt_data