Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs_src/example.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ conn.show()

# Save diagram to disk
conn.save(save_name='example.jpg')

# To provide custom names for units on the block create a dictionary that maps your block name to desired display name
# for example of a model has a block on m.fs.test_block and you want it to be shown as Test Block, and an arc that connects m.fs.test_block to some outlet via m.fs.arc_test_to_outlet and you want to be displayed "Outlet arc" do the following:

block_to_display_name_map={m.fs.test_block.name:"Test Block", m.fs.arc_test_to_outlet:'Outlet Arc'}

# Note: The dictionary can contain reference to blocks that don't even exist in the model.

# pass it into connectivity module
conn = Connectivity(input_model=model, unit_model_display_names=block_to_display_name_map)



```

## Command-line interface
Expand Down
62 changes: 33 additions & 29 deletions idaes_connectivity/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def __init__(
input_model=None,
model_flowsheet_attr: str = "",
model_build_func: str = "build",
unit_model_display_names=None,
):
"""Create from existing data or one of the valid input types.

Expand All @@ -131,12 +132,14 @@ def __init__(
use the model object as the flowsheet.
model_build_func: Name of function in `input_module` to invoke to build
and return the model object.
unit_model_display_names: Dict of user specified names for unit blocks, specify a dict {'unit_name': 'display_name'}

Raises:
ModelLoadError: Couldn't load the model/module
ValueError: Invalid inputs
"""
self._unit_classes = {}
self._user_name_map = unit_model_display_names
self._arc_descend = True # XXX: Maybe make this an option later?
if units is not None and streams is not None and connections is not None:
self.units = units
Expand Down Expand Up @@ -447,7 +450,6 @@ def _load_model(self, fs):
_log.debug(f"Arc short names: {[a.getname() for a in sorted_arcs]}")
_log.debug(f"Arc full names : {[a.name for a in sorted_arcs]}")
self._build_name_map(sorted_arcs)

for comp in sorted_arcs:
stream_name = comp.name # .getname()
src, dst = comp.source.parent_block(), comp.dest.parent_block()
Expand Down Expand Up @@ -492,53 +494,50 @@ def _load_model(self, fs):
self._rows = [[streams[i]] + r for i, r in enumerate(rows)]
_log.info("_end_ load model")

def _build_name_map(self, arcs):
"""Mapping to strip off any prefixes common to all unit names.
This mapping is used by :func:`_model_unit_name`.
def set_display_names(self, name_map):
"""Set the user-defined name map.
Args:
name_map: Dictionary mapping original names to new names
"""
self._name_map = None
if len(arcs) < 2:
return
# split names by "." into tuples
name_tuples = []
for comp in arcs:
for p in comp.source, comp.dest:
nm = p.parent_block().name.split(".")
name_tuples.append(nm)
# iteratively look if all prefixes of length n are the same
n = 1
while True:
prefixes = {tuple(nm[:n]) for nm in name_tuples}
if len(prefixes) > 1: # not common to all = stop
n -= 1
break
n += 1
if n > 0:
self._name_map = {".".join(k): ".".join(k[n:]) for k in name_tuples}
assert isinstance(name_map, dict), "Name map must be a dictionary"
self._user_name_map = name_map

def _build_name_map(self, arcs):
"""Mapping to strip off any prefixes common to all unit names.
This mapping is used by :func:`_model_unit_name`.
"""
_auto_name_map = None
self._name_map = None
if len(arcs) < 2:
return
# split names by "." into tuples
name_tuples = []
user_tuples = []
for comp in arcs:
for p in comp.source, comp.dest:
nm = p.parent_block().name.split(".")
name_tuples.append(nm)
pm = p.parent_block().name
if self._user_name_map is not None and pm in self._user_name_map:
user_tuples.append((pm, self._user_name_map[pm]))
else:
nm = pm.split(".")
name_tuples.append(nm)
# iteratively look if all prefixes of length n are the same
n = 1
while True:
while True and len(name_tuples) > 1:
prefixes = {tuple(nm[:n]) for nm in name_tuples}
if len(prefixes) > 1: # not common to all = stop
n -= 1
break
n += 1
if n > 0:
self._name_map = {".".join(k): ".".join(k[n:]) for k in name_tuples}
self._name_map = {}
_auto_name_map = {".".join(k): ".".join(k[n:]) for k in name_tuples}
self._name_map.update(_auto_name_map)
if len(user_tuples) > 0:
if self._name_map is None:
self._name_map = {}
_user_name_map = {k[0]: k[1] for k in user_tuples}
self._name_map.update(_user_name_map)

def _model_unit_name(self, block):
"""Get the unit name for a Pyomo/IDAES block."""
Expand Down Expand Up @@ -763,7 +762,9 @@ def _get_connections(self):
else:
connections.append(f"{src} --> {tgt}")
elif self._stream_labels:
label = self._clean_stream_label(stream_name)
label = self._clean_stream_label(
stream_name, self._conn._user_name_map
)
connections.append(f"{src} -- {label} -->{tgt}")
else:
connections.append(f"{src} --> {tgt}")
Expand All @@ -776,11 +777,14 @@ def _get_connections(self):
return connections, show_streams

@staticmethod
def _clean_stream_label(label):
def _clean_stream_label(label, user_label_names):
if user_label_names is not None and label in user_label_names:
return user_label_names[label]
if label.endswith("_outlet"):
label = label[:-7]
elif label.endswith("_feed"):
label = label[:-5]

label = label.replace("_", " ")
return label

Expand Down
38 changes: 38 additions & 0 deletions idaes_connectivity/tests/example_flowsheet_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ def example_csv():
]


@pytest.fixture
def example_csv_with_user_names():
return [
"Arcs,user_M01,user_H02,user_F03",
"fs.s01,-1,1,0",
"fs.s02,0,-1,1",
# "f01,1,0,0",
# "k01,0,0,-1",
]


@pytest.fixture
def example_mermaid():
return [
Expand All @@ -24,6 +35,18 @@ def example_mermaid():
]


@pytest.fixture
def example_mermaid_user_names():
return [
"flowchart LR",
' Unit_B["user_M01"]',
' Unit_C["user_H02"]',
' Unit_D["user_F03"]',
" Unit_B --> Unit_C",
" Unit_C --> Unit_D",
]


@pytest.fixture
def example_d2():
return [
Expand All @@ -37,3 +60,18 @@ def example_d2():
"Unit_B -> Unit_C",
"Unit_C -> Unit_D",
]


@pytest.fixture
def example_d2_user_names():
return [
"direction: right",
"Unit_B: user_M01 {",
" shape: image",
" icon: /home/<user>/.idaes/icon_shapes/mixer.svg",
"}",
"Unit_C: user_H02",
"Unit_D: user_F03",
"Unit_B -> Unit_C",
"Unit_C -> Unit_D",
]
107 changes: 92 additions & 15 deletions idaes_connectivity/tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Tests for `base` module.
"""
# stdlib
from pyexpat import model
from typing import List

# third-party
Expand All @@ -25,24 +26,53 @@
from idaes_connectivity.tests import example_flowsheet
from idaes_connectivity.tests.example_flowsheet_data import (
example_csv,
example_mermaid_user_names,
example_csv_with_user_names,
example_d2_user_names,
example_mermaid,
example_d2,
)

# avoid warnings about unused imports
_1, _2, _3 = example_csv, example_d2, example_mermaid

_4, _5, _6 = (
example_mermaid_user_names,
example_csv_with_user_names,
example_d2_user_names,
)
# Constants
STREAM_1 = "fs.s01"
UNIT_1 = "M01"


def setup():
def _setup(user_name=None):
model = example_flowsheet.build()
conn = Connectivity(input_model=model)
conn = Connectivity(input_model=model, unit_model_display_names=user_name)

return model, conn


@pytest.fixture
def setup():
model = example_flowsheet.build()
conn = Connectivity(
input_model=model,
)
return _setup()


@pytest.fixture
def setup_with_user_names():
test_dict = {
"fs.M01": "user_M01",
"fs.H02": "user_H02",
"fs.F03": "user_F03",
"fs.s01": "connector",
"fs.s02": "connector",
}
return _setup(test_dict)


def list_rstrip(x: List) -> List:
"""Return list (copy) with empty items at end removed"""
i = len(x) - 1
Expand All @@ -52,8 +82,8 @@ def list_rstrip(x: List) -> List:


@pytest.mark.unit
def test_example_data(example_csv, example_mermaid, example_d2):
model, conn = setup()
def test_example_data(setup, example_csv, example_mermaid, example_d2):
model, conn = setup
# loop over each output format
for name, text, ref in (
("CSV", CSV(conn).write(None), example_csv),
Expand All @@ -75,10 +105,40 @@ def test_example_data(example_csv, example_mermaid, example_d2):
print(f"@ End {name}")


@pytest.mark.unit
def test_example_with_user_data(
setup_with_user_names,
example_csv_with_user_names,
example_mermaid_user_names,
example_d2_user_names,
):
model, conn = setup_with_user_names
# loop over each output format
print(conn)
for name, text, ref in (
("CSV", CSV(conn).write(None), example_csv_with_user_names),
("Mermaid", Mermaid(conn).write(None), example_mermaid_user_names),
("D2", D2(conn).write(None), example_d2_user_names),
):
print(f"@ Start {name} {text},{ref}")
# normalize ws and remove blank lines at end (if any)

items = list_rstrip([t.rstrip() for t in text.split("\n")])
assert len(items) == len(ref)
# compare line by line
for i, item in enumerate(items):
# special processing for icon paths (which will differ)
if "icon:" in item:
assert "icon:" in ref[i]
else:
assert item == ref[i]
print(f"@ End {name}")


@pytest.mark.unit
@pytest.mark.parametrize("klass", (Mermaid, D2))
def test_defaults_formatters(klass):
_, conn = setup()
def test_defaults_formatters(setup, klass):
_, conn = setup

klass.defaults["stream_labels"] = True

Expand All @@ -96,11 +156,12 @@ def test_defaults_formatters(klass):


@pytest.mark.unit
def test_show(tmpdir_factory):
_, conn = setup()
def test_show(setup, tmpdir_factory):
_, conn = setup
fn = tmpdir_factory.mktemp("data").join("img.png")

conn.show()
# conn.save(save_file="test_image.png")
conn.save(save_file=fn)
test_data_dir = Path(__file__).parent.absolute() / "test_image.png"

Expand All @@ -110,10 +171,26 @@ def test_show(tmpdir_factory):
assert np.sum(np.array(ImageChops.difference(img, saved_img).getdata())) == 0


@pytest.mark.unit
def test_show_with_user_names(setup_with_user_names, tmpdir_factory):
_, conn = setup_with_user_names
fn = tmpdir_factory.mktemp("data").join("img.png")

conn.show()
# conn.save(save_file="user_test_image.png")
conn.save(save_file=fn)
test_data_dir = Path(__file__).parent.absolute() / "user_test_image.png"

img = Image.open(test_data_dir).convert("RGB")
saved_img = Image.open(fn).convert("RGB")

assert np.sum(np.array(ImageChops.difference(img, saved_img).getdata())) == 0


@pytest.mark.unit
@pytest.mark.parametrize("klass", (Mermaid, D2))
def test_stream_values_formatter(klass):
_, conn = setup()
def test_stream_values_formatter(setup, klass):
_, conn = setup
test_key, test_val = "test_value", 123
conn.set_stream_value(STREAM_1, test_key, test_val)
for stream_labels in (True, False):
Expand All @@ -130,8 +207,8 @@ def test_stream_values_formatter(klass):

@pytest.mark.unit
@pytest.mark.parametrize("klass", (Mermaid, D2))
def test_unit_values_formatter(klass):
_, conn = setup()
def test_unit_values_formatter(setup, klass):
_, conn = setup
test_key, test_val, test_unit_class = "test_value", 123, "foobar"
conn.set_unit_value(UNIT_1, test_key, test_val)
conn.set_unit_class(UNIT_1, test_unit_class)
Expand All @@ -151,8 +228,8 @@ def test_unit_values_formatter(klass):

@pytest.mark.unit
@pytest.mark.parametrize("klass", (Mermaid, D2))
def test_unit_and_stream_values_formatter(klass):
_, conn = setup()
def test_unit_and_stream_values_formatter(setup, klass):
_, conn = setup
s_test_key, s_test_val = "s_test_value", 123
conn.set_stream_value(STREAM_1, s_test_key, s_test_val)
u_test_key, u_test_val, u_test_unit_class = "u_test_value", 123, "foobar"
Expand Down
Binary file added idaes_connectivity/tests/user_test_image.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading