Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 168 additions & 38 deletions src/wsp_balsa/routines/io/nwp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import re
import zipfile
from io import IOBase, TextIOBase, TextIOWrapper
from os import PathLike
from pathlib import Path
from typing import Hashable, List, Tuple, Union
Expand Down Expand Up @@ -47,26 +48,53 @@ def process_emme_eng_notation_series(s: pd.Series, *, to_dtype=float) -> pd.Seri
return values * units


def read_nwp_base_network(
nwp_fp: Union[str, PathLike],
def _open_nwp(nwp_fp: Union[str, PathLike]) -> zipfile.ZipFile:
"""Open a Network Package file for reading.

Helper which checks for existence of network package before opening.
"""
nwp_fp = Path(nwp_fp)
if not nwp_fp.exists():
raise FileNotFoundError(f"Network Package file `{nwp_fp.as_posix()}` not found.")

return zipfile.ZipFile(nwp_fp)


def _get_file_object(file_obj: Union[str, PathLike, IOBase]) -> TextIOBase:
"""Get a file object which reads strings.

Helper method to gracefully handle reading files from str or Path filepaths,
as well as binary text files (such as the files returned by ``ZipFile.open``).
"""
if isinstance(file_obj, TextIOBase):
return file_obj
if isinstance(file_obj, IOBase):
return TextIOWrapper(file_obj, encoding="utf-8")

fp = Path(file_obj)
if not fp.exists():
raise FileNotFoundError(f"File `{fp.as_posix()}` not found.")
return open(fp, "r", encoding="utf-8")


def read_base_network_transaction(
network_transaction_file: Union[str, PathLike, IOBase]
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""A function to read the base network from a Network Package file (exported from Emme using the TMG Toolbox) into
DataFrames.
"""Read an EMME base network transaction.

Args:
nwp_fp (str | PathLike): File path to the network package.
network_transaction_file: (str | PathLike | IOBase): Path to the network
transaction file, or a file object for a network transaction file.

Returns:
Tuple[pd.DataFrame, pd.DataFrame]: A tuple of DataFrames containing the nodes and links
Tuple[pd.DataFrame, pd.DataFrame]: A tuple of DataFrames containing the nodes and links.
"""
nwp_fp = Path(nwp_fp)
if not nwp_fp.exists():
raise FileNotFoundError(f"File `{nwp_fp.as_posix()}` not found.")

header_nodes, header_links, last_line = None, None, None
with zipfile.ZipFile(nwp_fp) as zf:
for i, line in enumerate(zf.open("base.211"), start=1):
line = line.strip().decode("utf-8")
with _get_file_object(network_transaction_file) as open_file:
# Determine start of nodes and links
header_nodes, header_links, last_line = None, None, None
for i, line in enumerate(open_file, start=1):
line = line.strip()
if line.startswith("c"):
continue # Skip comment lines
if line.startswith("t nodes"):
Expand All @@ -87,8 +115,10 @@ def read_nwp_base_network(
"Data3": float,
"Label": str,
}

open_file.seek(0)
nodes = pd.read_csv(
zf.open("base.211"),
open_file,
index_col="Node",
dtype=data_types,
skiprows=header_nodes,
Expand All @@ -104,8 +134,9 @@ def read_nwp_base_network(

# Read links
n_rows = last_line - header_links - 1
open_file.seek(0)
links = pd.read_csv(
zf.open("base.211"),
open_file,
index_col=["From", "To"],
skiprows=header_links,
nrows=n_rows,
Expand Down Expand Up @@ -136,47 +167,38 @@ def read_nwp_base_network(
return nodes, links


def read_nwp_exatts_list(
nwp_fp: Union[str, PathLike],
def read_exatts_list(
exatts_file: Union[str, PathLike, IOBase],
**kwargs,
) -> pd.DataFrame:
"""A function to read the extra attributes present in a Network Package file (exported from Emme using the TMG
Toolbox).
"""Read extra attributes definitions exported from EMME.

Args:
nwp_fp (str | PathLike): File path to the network package.
exatts_file (str | PathLike | IOBase): Path to the exatts file,
or a file object for an exatts file.
**kwargs: Any valid keyword arguments used by ``pandas.read_csv()``.

Returns:
pd.DataFrame
"""
nwp_fp = Path(nwp_fp)
if not nwp_fp.exists():
raise FileNotFoundError(f"File `{nwp_fp.as_posix()}` not found.")

kwargs["index_col"] = False
if "quotechar" not in kwargs:
kwargs["quotechar"] = "'"

with zipfile.ZipFile(nwp_fp) as zf:
df: pd.DataFrame = pd.read_csv(zf.open("exatts.241"), **kwargs)
with _get_file_object(exatts_file) as open_file:
df: pd.DataFrame = pd.read_csv(open_file, **kwargs)
df.columns = df.columns.str.strip()
df["type"] = df["type"].astype("category")

return df


def _base_read_nwp_att_data(
nwp_fp: Union[str, PathLike],
att_type: str,
def _base_read_extatts(
exatts_file: Union[str, PathLike, IOBase],
index_col: Union[str, List[str]],
attributes: Union[str, List[str]] = None,
**kwargs,
) -> pd.DataFrame:
nwp_fp = Path(nwp_fp)
if not nwp_fp.exists():
raise FileNotFoundError(f"File `{nwp_fp.as_posix()}` not found.")

if attributes is not None:
if isinstance(attributes, Hashable):
attributes = [attributes]
Expand All @@ -188,8 +210,8 @@ def _base_read_nwp_att_data(
if "quotechar" not in kwargs:
kwargs["quotechar"] = "'"

with zipfile.ZipFile(nwp_fp) as zf:
df: pd.DataFrame = pd.read_csv(zf.open(f"exatt_{att_type}.241"), **kwargs)
with _get_file_object(exatts_file) as open_file:
df: pd.DataFrame = pd.read_csv(open_file, **kwargs)
df.columns = df.columns.str.strip()
for col in df.columns:
if is_string_dtype(df[col]):
Expand All @@ -202,6 +224,108 @@ def _base_read_nwp_att_data(
return df


def read_node_extra_attributes(
exatts_file: Union[str, PathLike, IOBase],
*,
attributes: Union[str, List[str]] = None,
**kwargs,
) -> pd.DataFrame:
"""Read node attributes exported from EMME.

Args:
exatts_file (str | PathLike | IOBase): Path or file object corresponding to
an extra attributes file.
attributes (str | List[str], optional): Defaults to ``None``.
Names of node attributes to extract.
Note that ``'inode'`` will be included by default.
**kwargs: Any valid keyword arguments used by ``pandas.read_csv()``.

Returns:
pd.DataFrame
"""
return _base_read_extatts(exatts_file, "inode", attributes, **kwargs)


def read_link_extra_attributes(
exatts_file: Union[str, PathLike, IOBase],
*,
attributes: Union[str, List[str]] = None,
**kwargs,
) -> pd.DataFrame:
"""Read link attributes exported from EMME.

Args:
exatts_file (str | PathLike | IOBase): Path or file object corresponding to
an extra attributes file.
attributes (str | List[str], optional): Defaults to ``None``.
Names of link attributes to extract.
Note that ``'inode'`` and ``'jnode'`` will be included by default.
**kwargs: Any valid keyword arguments used by ``pandas.read_csv()``.

Returns:
pd.DataFrame
"""
return _base_read_extatts(exatts_file, ["inode", "jnode"], attributes, **kwargs)


def read_transit_line_extra_attributes(
exatts_file: Union[str, PathLike, IOBase],
*,
attributes: Union[str, List[str]] = None,
**kwargs,
) -> pd.DataFrame:
"""Read transit line attributes exported from EMME.

Args:
exatts_file (str | PathLike | IOBase): Path or file object corresponding to
an extra attributes file.
attributes (str | List[str], optional): Defaults to ``None``.
Names of transit line attributes to extract.
Note that ``'line'`` will be included by default.
**kwargs: Any valid keyword arguments used by ``pandas.read_csv()``.

Returns:
pd.DataFrame
"""
return _base_read_extatts(exatts_file, "line", attributes, **kwargs)


def read_nwp_base_network(
nwp_fp: Union[str, PathLike],
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""A function to read the base network from a Network Package file (exported from Emme using the TMG Toolbox) into
DataFrames.

Args:
nwp_fp (str | PathLike): File path to the network package.

Returns:
Tuple[pd.DataFrame, pd.DataFrame]: A tuple of DataFrames containing the nodes and links
"""
with _open_nwp(nwp_fp) as zf:
base_transaction_file = zf.open("base.211")
return read_base_network_transaction(base_transaction_file)


def read_nwp_exatts_list(
nwp_fp: Union[str, PathLike],
**kwargs,
) -> pd.DataFrame:
"""A function to read the extra attributes present in a Network Package file (exported from Emme using the TMG
Toolbox).

Args:
nwp_fp (str | PathLike): File path to the network package.
**kwargs: Any valid keyword arguments used by ``pandas.read_csv()``.

Returns:
pd.DataFrame
"""
with _open_nwp(nwp_fp) as zf:
exatts_file = zf.open("exatts.241")
return read_exatts_list(exatts_file, **kwargs)


def read_nwp_node_attributes(
nwp_fp: Union[str, PathLike],
*,
Expand All @@ -219,7 +343,9 @@ def read_nwp_node_attributes(
Returns:
pd.DataFrame
"""
return _base_read_nwp_att_data(nwp_fp, "nodes", "inode", attributes, **kwargs)
with _open_nwp(nwp_fp) as zf:
ea_file = zf.open(f"exatt_nodes.241")
return read_node_extra_attributes(ea_file, attributes=attributes, **kwargs)


def read_nwp_link_attributes(
Expand All @@ -239,7 +365,9 @@ def read_nwp_link_attributes(
Returns:
pd.DataFrame
"""
return _base_read_nwp_att_data(nwp_fp, "links", ["inode", "jnode"], attributes, **kwargs)
with _open_nwp(nwp_fp) as zf:
ea_file = zf.open(f"exatt_links.241")
return read_link_extra_attributes(ea_file, attributes=attributes, **kwargs)


def read_nwp_transit_line_attributes(
Expand All @@ -260,7 +388,9 @@ def read_nwp_transit_line_attributes(
Returns:
pd.DataFrame
"""
return _base_read_nwp_att_data(nwp_fp, "transit_lines", "line", attributes, **kwargs)
with _open_nwp(nwp_fp) as zf:
ea_file = zf.open(f"exatt_transit_lines.241")
return read_transit_line_extra_attributes(ea_file, attributes=attributes, **kwargs)


def read_nwp_traffic_results(
Expand Down