# Source code for noctis.data_transformation.preprocessing.graph_expander
from noctis.data_transformation.preprocessing.core_graph_builder import (
build_core_graph,
ValidatedStringBuilder,
UnvalidatedStringBuilder,
)
from noctis.data_architecture.graph_schema import GraphSchema
from noctis.data_architecture.datamodel import Node, Relationship
from noctis.utilities import console_logger
logger = console_logger(__name__)
# [docs]
class GraphExpander:
    """
    Expand the data of a reaction step into graph nodes and relationships.

    The expansion is driven by a schema: the core graph is produced by
    ``build_core_graph`` from the chemical-equation entry of the step, then
    the extra nodes and extra relationships declared by the schema are added.

    Attributes:
        schema (GraphSchema): The schema defining the structure of the graph.
        nodes (dict[str, list[Node]]): Expanded nodes, keyed by tag.
        relationships (dict[str, list[Relationship]]): Expanded relationships,
            keyed by tag.

    Note:
        ``nodes`` and ``relationships`` are only initialised in ``__init__``.
        Calling ``expand_reaction_step`` more than once on the same instance
        keeps adding to them, so extra nodes and extra relationships from
        earlier calls remain (and earlier extra nodes are paired again).
    """

    def __init__(self, schema: "GraphSchema"):
        """
        Store the schema and start with empty node/relationship containers.

        Args:
            schema (GraphSchema): The schema defining the structure of the graph.
        """
        self.schema = schema
        self.nodes = {}
        self.relationships = {}

    def expand_reaction_step(
        self,
        step_dict: dict[str, dict],
        input_format: str,
        output_format: str,
        validation: bool,
    ) -> tuple[dict[str, list], dict[str, list]]:
        """
        Expand a reaction step into nodes and relationships.

        Args:
            step_dict (dict[str, dict]): Dictionary containing step data for
                the reaction. It must contain the key named by
                ``schema.base_nodes["chemical_equation"]``.
            input_format (str): Format of the input reaction string.
            output_format (str): Format for the output reaction string. Only
                passed to the builder when ``validation`` is true.
            validation (bool): Flag indicating whether to use validation in
                processing.

        Returns:
            tuple[dict[str, list], dict[str, list]]: ``self.nodes`` and
            ``self.relationships`` (the instance's own dictionaries, not
            copies), mapping each tag to its nodes / relationships.

        Raises:
            KeyError: If the chemical-equation entry is missing from
                ``step_dict`` or from ``schema.base_nodes``.
        """
        # Only the builder differs between the two modes; the core graph is
        # then built the same way for both.
        if validation:
            builder = ValidatedStringBuilder(
                input_format=input_format, output_format=output_format
            )
        else:
            # The unvalidated builder is constructed without an output format.
            builder = UnvalidatedStringBuilder(input_format=input_format)

        # expand core schema
        equation_label = self.schema.base_nodes["chemical_equation"]
        base_nodes, base_relationships = build_core_graph(
            reaction_data=step_dict[equation_label],
            builder=builder,
        )
        # dict.update replaces any tag already present with the new core data.
        self.nodes.update(base_nodes)
        self.relationships.update(base_relationships)

        # expand extra nodes
        self._expand_extra_nodes(step_dict)
        # expand extra relationships (needs the nodes gathered above)
        self._expand_extra_relationships()
        return self.nodes, self.relationships

    def _expand_extra_nodes(self, step_dict: dict[str, dict]) -> None:
        """
        Expand extra nodes based on the schema.

        For every ``tag -> label`` pair in ``schema.extra_nodes`` a ``Node`` is
        built from ``step_dict[label]`` and appended to ``self.nodes[tag]``.
        Labels that are absent from ``step_dict`` are skipped with a warning.

        Args:
            step_dict (dict[str, dict]): Dictionary containing step data for
                the reaction. Each used entry must provide the keys ``"uid"``
                and ``"properties"``.

        Raises:
            KeyError: If a present entry lacks ``"uid"`` or ``"properties"``.
        """
        for tag, label in self.schema.extra_nodes.items():
            if label not in step_dict:
                logger.warning(
                    f"Node with label '{label}' is missing in step_dict. Skipping this node."
                )
                continue
            entry = step_dict[label]
            # The entry is only read here; its "properties" object is handed
            # to Node as-is.
            extra_node = Node(
                uid=entry["uid"],
                node_label=label,
                properties=entry["properties"],
            )
            self.nodes.setdefault(tag, []).append(extra_node)

    def _expand_extra_relationships(self) -> None:
        """
        Expand extra relationships based on the schema.

        For every entry of ``schema.extra_relationships`` one ``Relationship``
        is created for each (start node, end node) pair taken from
        ``self.nodes[start_node]`` and ``self.nodes[end_node]``, and appended
        to ``self.relationships[tag]``. An entry whose start or end tag is not
        present in ``self.nodes`` is skipped with a warning.
        """
        for tag, relationship_schema in self.schema.extra_relationships.items():
            start_node = relationship_schema["start_node"]
            end_node = relationship_schema["end_node"]
            if start_node not in self.nodes:
                logger.warning(
                    f"Start node '{start_node}' is missing for relationship '{tag}'. Skipping this relationship."
                )
                continue
            if end_node not in self.nodes:
                logger.warning(
                    f"End node '{end_node}' is missing for relationship '{tag}'. Skipping this relationship."
                )
                continue
            # Full cross product: every start node is linked to every end node.
            for node in self.nodes[start_node]:
                for another_node in self.nodes[end_node]:
                    relationship = Relationship(
                        relationship_type=relationship_schema["type"],
                        start_node=node,
                        end_node=another_node,
                    )
                    self.relationships.setdefault(tag, []).append(relationship)