|
11 | 11 | Optional, |
12 | 12 | Set, |
13 | 13 | Tuple, |
| 14 | + Union, |
14 | 15 | ) |
15 | 16 |
|
16 | | -from corneto._types import Edge |
| 17 | +import numpy as np |
| 18 | + |
| 19 | +from corneto._io import import_cobra_model |
| 20 | +from corneto._types import CobraModel, Edge, NxDiGraph, NxGraph |
17 | 21 |
|
18 | 22 | from ._base import Attr, Attributes, BaseGraph, EdgeType |
19 | 23 |
|
@@ -616,6 +620,27 @@ def from_json(cls, json_str: str) -> "Graph": |
616 | 620 | data = json.loads(json_str) |
617 | 621 | return cls.from_dict(data) |
618 | 622 |
|
| 623 | + @staticmethod |
| 624 | + def from_networkx(G: Union[NxGraph, NxDiGraph]): |
| 625 | + """Create graph from NetworkX graph. |
| 626 | +
|
| 627 | + Args: |
| 628 | + G: NetworkX graph instance |
| 629 | +
|
| 630 | + Returns: |
| 631 | + Graph instance with equivalent structure |
| 632 | + """ |
| 633 | + Gc = Graph() |
| 634 | + is_directed = G.is_directed() |
| 635 | + for edge in G.edges(): |
| 636 | + e_data = G.get_edge_data(edge[0], edge[1], default=dict()) |
| 637 | + if is_directed: |
| 638 | + e_data[Attr.EDGE_TYPE.value] = EdgeType.DIRECTED.value |
| 639 | + else: |
| 640 | + e_data[Attr.EDGE_TYPE.value] = EdgeType.UNDIRECTED.value |
| 641 | + Gc.add_edge(edge[0], edge[1], **e_data) |
| 642 | + return Gc |
| 643 | + |
619 | 644 | def save_json( |
620 | 645 | self, |
621 | 646 | filepath: str, |
@@ -686,3 +711,139 @@ def load_json(cls, filepath: str, compression: Optional[str] = "auto") -> "Graph |
686 | 711 |
|
687 | 712 | # Create graph from JSON string |
688 | 713 | return cls.from_json(json_str) |
| 714 | + |
| 715 | + # DEPRECATED |
| 716 | + |
| 717 | + @staticmethod |
| 718 | + def from_vertex_incidence( |
| 719 | + A: np.ndarray, |
| 720 | + vertex_ids: Union[List[str], np.ndarray], |
| 721 | + edge_ids: Union[List[str], np.ndarray], |
| 722 | + ): |
| 723 | + """Create graph from vertex incidence matrix and labels. |
| 724 | +
|
| 725 | + Args: |
| 726 | + A: Vertex incidence matrix. Rows are vertices, columns are edges. |
| 727 | + Non-zero entries indicate edge-vertex connections. |
| 728 | + vertex_ids: Labels for vertices corresponding to matrix rows |
| 729 | + edge_ids: Labels for edges corresponding to matrix columns |
| 730 | +
|
| 731 | + Returns: |
| 732 | + Graph instance constructed from incidence matrix |
| 733 | +
|
| 734 | + Raises: |
| 735 | + ValueError: If dimensions of inputs don't match |
| 736 | + """ |
| 737 | + g = Graph() |
| 738 | + if len(vertex_ids) != A.shape[0]: |
| 739 | + raise ValueError( |
| 740 | + """The number of rows in A matrix is different from |
| 741 | + the number of vertex ids""" |
| 742 | + ) |
| 743 | + if len(edge_ids) != A.shape[1]: |
| 744 | + raise ValueError( |
| 745 | + """The number of columns in A matrix is different from |
| 746 | + the number of edge ids""" |
| 747 | + ) |
| 748 | + for v in vertex_ids: |
| 749 | + g.add_vertex(v) |
| 750 | + for j, v in enumerate(edge_ids): |
| 751 | + values = A[:, j] |
| 752 | + idx = np.flatnonzero(values) |
| 753 | + coeffs = values[idx] |
| 754 | + v_names = [vertex_ids[i] for i in idx] |
| 755 | + s = {n: val for n, val in zip(v_names, coeffs) if val < 0} |
| 756 | + t = {n: val for n, val in zip(v_names, coeffs) if val > 0} |
| 757 | + g.add_edge(s, t, id=v) |
| 758 | + return g |
| 759 | + |
| 760 | + @staticmethod |
| 761 | + def from_sif( |
| 762 | + sif_file: str, |
| 763 | + delimiter: str = "\t", |
| 764 | + has_header: bool = False, |
| 765 | + discard_self_loops: Optional[bool] = True, |
| 766 | + column_order: List[int] = [0, 1, 2], |
| 767 | + ): |
| 768 | + """Create graph from Simple Interaction Format (SIF) file. |
| 769 | +
|
| 770 | + Args: |
| 771 | + sif_file: Path to SIF file |
| 772 | + delimiter: Column delimiter in file |
| 773 | + has_header: Whether file has a header row |
| 774 | + discard_self_loops: Whether to ignore self-loops |
| 775 | + column_order: Order of source, interaction, target columns |
| 776 | +
|
| 777 | + Returns: |
| 778 | + New Graph loaded from SIF file |
| 779 | + """ |
| 780 | + from corneto._io import _read_sif_iter |
| 781 | + |
| 782 | + it = _read_sif_iter( |
| 783 | + sif_file, |
| 784 | + delimiter=delimiter, |
| 785 | + has_header=has_header, |
| 786 | + discard_self_loops=discard_self_loops, |
| 787 | + column_order=column_order, |
| 788 | + ) |
| 789 | + return Graph.from_sif_tuples(it) |
| 790 | + |
| 791 | + @staticmethod |
| 792 | + def from_sif_tuples(tuples: Iterable[Tuple]): |
| 793 | + """Create graph from iterable of SIF tuples. |
| 794 | +
|
| 795 | + Args: |
| 796 | + tuples: Iterable of (source, interaction, target) tuples |
| 797 | +
|
| 798 | + Returns: |
| 799 | + New Graph created from SIF data |
| 800 | + """ |
| 801 | + g = Graph() |
| 802 | + for s, v, t in tuples: |
| 803 | + g.add_edge(s, t, interaction=v) |
| 804 | + return g |
| 805 | + |
| 806 | + @staticmethod |
| 807 | + def from_cobra_model(model: CobraModel): |
| 808 | + """Create graph from COBRA metabolic model. |
| 809 | +
|
| 810 | + Args: |
| 811 | + model: COBRApy model instance |
| 812 | +
|
| 813 | + Returns: |
| 814 | + New Graph representing the metabolic network |
| 815 | + """ |
| 816 | + S, R, M = import_cobra_model(model) |
| 817 | + G = Graph.from_vertex_incidence(S, M["id"], R["id"]) |
| 818 | + # Add metadata to the graph, such as default lb/ub for reactions |
| 819 | + for i in range(G.num_edges): |
| 820 | + attr = G.get_attr_edge(i) |
| 821 | + attr["default_lb"] = R["lb"][i] |
| 822 | + attr["default_ub"] = R["ub"][i] |
| 823 | + attr["GPR"] = R["gpr"][i] |
| 824 | + return G |
| 825 | + |
| 826 | + @staticmethod |
| 827 | + def from_miom_model(model): |
| 828 | + """Create graph from MIOM metabolic model. |
| 829 | +
|
| 830 | + Args: |
| 831 | + model: MIOM model instance or path to compressed model file |
| 832 | +
|
| 833 | + Returns: |
| 834 | + New Graph representing the metabolic network |
| 835 | + """ |
| 836 | + if isinstance(model, str): |
| 837 | + from corneto._io import _load_compressed_gem |
| 838 | + |
| 839 | + S, R, M = _load_compressed_gem(model) |
| 840 | + else: |
| 841 | + S = model.S, M = model.M, R = model.R |
| 842 | + G = Graph.from_vertex_incidence(S, M["id"], R["id"]) |
| 843 | + # Add metadata to the graph, such as default lb/ub for reactions |
| 844 | + for i in range(G.num_edges): |
| 845 | + attr = G.get_attr_edge(i) |
| 846 | + attr["default_lb"] = R["lb"][i] |
| 847 | + attr["default_ub"] = R["ub"][i] |
| 848 | + attr["GPR"] = R["gpr"][i] |
| 849 | + return G |
0 commit comments