Source code for noctis.data_transformation.preprocessing.data_preprocessing

from collections.abc import Iterator
from dataclasses import dataclass, fields, asdict
from dask.distributed import Client, as_completed
from typing import Type, Callable, Optional
import yaml
import pandas as pd
import dask.dataframe as dd
import os
import numpy as np
from linchemin.cgu.syngraph import (
    SynGraph,
    MonopartiteMolSynGraph,
    MonopartiteReacSynGraph,
    BipartiteSynGraph,
)
from noctis.data_architecture.datacontainer import DataContainer
from noctis.data_architecture.datamodel import (
    Relationship,
    Node,
)
from noctis.data_transformation.preprocessing.utilities import (
    create_data_container,
)
from linchemin.cgu.syngraph_operations import extract_reactions_from_syngraph
from abc import ABC, abstractmethod

from noctis.data_transformation.neo4j.stylers import Neo4jImportStyle
from pathlib import Path

from noctis.utilities import console_logger
from typing import Union

from tqdm import tqdm
import textwrap

from noctis.data_transformation.preprocessing.utilities import (
    _save_dataframes_to_partition_csv,
    _save_list_to_partition_csv,
    _update_partition_dict_with_row,
    _merge_partition_files,
    _delete_tmp_folder,
    dict_to_list,
)

from noctis.data_transformation.preprocessing.graph_expander import GraphExpander
from noctis.data_architecture.graph_schema import GraphSchema

# Module-level logger used by the classes and helper functions in this file.
logger = console_logger(__name__)


class EmptyHeaderError(Exception):
    """Raised when the header of the input data contains no column names."""


class NoPreprocessorError(Exception):
    """Raised when no preprocessor is available or its configuration cannot be loaded or saved."""


class NoChemicalStringError(Exception):
    """Raised when the required chemical-equation string column is absent from the header."""


class MissingUIDError(Exception):
    """Signal that a node label has no accompanying ``uid``.

    Attributes:
        message (str): Description naming the label that lacks a uid; the same
            text is passed to ``Exception.__init__``.
    """

    def __init__(self, label):
        text = f"Missing 'uid' for node with label '{label}'"
        self.message = text
        super().__init__(text)


def validate_single_char(value):
    """Return ``value`` unchanged when it is ``None`` or exactly one character long.

    Args:
        value: Candidate line terminator; ``None`` or any object supporting ``len``.

    Returns:
        The same ``value`` that was passed in.

    Raises:
        ValueError: If ``value`` is not ``None`` and its length differs from 1.
    """
    # Guard clause: the two accepted shapes are handled first.
    if value is None or len(value) == 1:
        return value
    raise ValueError(
        "Invalid lineterminator: must be a single character or None. "
        "For Windows-style line endings ('\\r\\n'), keep lineterminator as None. "
        f"Received: {value!r}"
    )


@dataclass
class PreprocessorConfig:
    """Configuration for preprocessing chemical data.

    Any field given as ``None`` is reset to its declared default in
    ``__post_init__``. Afterwards ``tmp_folder`` falls back to
    ``<output_folder>/tmp`` and ``out_chem_format`` falls back to
    ``inp_chem_format`` when they are still ``None``.

    Attributes:
        inp_chem_format: Format of the incoming chemical strings.
        out_chem_format: Format of the produced chemical strings.
        validation: Validation flag forwarded to the graph expander.
        output_folder: Folder receiving the merged output files.
        tmp_folder: Folder receiving the per-partition files.
        delete_tmp: Whether the CSV preprocessor removes ``tmp_folder`` at the end.
        prefix: Prefix forwarded to the partition-merging helper.
        delimiter: CSV delimiter.
        lineterminator: CSV line terminator; a single character or ``None``.
        quotechar: CSV quote character.
        blocksize: Block size handed to dask in the parallel run.
        chunksize: Number of lines per chunk in the serial run.
        nrows: Row limit, only meaningful for the serial run.
    """

    inp_chem_format: Optional[str] = "smiles"
    out_chem_format: Optional[str] = None
    validation: Optional[bool] = True
    output_folder: Optional[str] = "output"
    tmp_folder: Optional[str] = None
    delete_tmp: Optional[bool] = True
    prefix: Optional[str] = None
    delimiter: Optional[str] = ","
    lineterminator: Optional[str] = None
    quotechar: Optional[str] = '"'
    blocksize: Optional[int] = 600000  # Kb; used by dask in run_parallel
    chunksize: Optional[int] = 10000  # number of lines; used in run_serial
    nrows: Optional[int] = None  # only for serial run

    def __post_init__(self):
        """Normalise ``None`` values, derive dependent fields and validate."""
        # Explicit None means "use the default", so callers can pass through
        # unset keyword arguments without special-casing them.
        for f in fields(self):
            if getattr(self, f.name) is None:
                setattr(self, f.name, f.default)

        if self.tmp_folder is None:
            self.tmp_folder = os.path.join(self.output_folder, "tmp")

        if self.out_chem_format is None:
            self.out_chem_format = self.inp_chem_format

        validate_single_char(self.lineterminator)

        # The dataclass declares no ``parallel`` field, so a plain
        # ``self.parallel`` lookup raised AttributeError whenever ``nrows`` was
        # set. ``getattr`` keeps the warning for objects that do carry the flag.
        if self.nrows is not None and getattr(self, "parallel", False):
            logger.warning(
                "The 'nrows' parameter is only applicable for serial processing. "
                "It will be ignored in parallel mode."
            )

    @classmethod
    def build_from_yaml(cls, file_path: str) -> "PreprocessorConfig":
        """Create a configuration from the YAML file at ``file_path``."""
        return _load_config_from_yaml(file_path)

    def save_to_yaml(self, file_path: str) -> None:
        """Write this configuration to ``file_path`` as YAML.

        Raises:
            NoPreprocessorError: If the file cannot be written or serialised.
        """
        try:
            with open(file_path, "w") as file:
                yaml.safe_dump(asdict(self), file, sort_keys=False)
            logger.info(f"Configuration saved to {file_path}")
        except Exception as e:
            logger.error(f"Error saving configuration to YAML: {e}")
            raise NoPreprocessorError(
                f"Error saving configuration to YAML: {e}"
            ) from e
def _load_config_from_yaml(yaml_file_path: str) -> PreprocessorConfig:
    """Load a ``PreprocessorConfig`` from a YAML file.

    Args:
        yaml_file_path: Path of the YAML file to read.

    Returns:
        PreprocessorConfig: Built from the top-level mapping of the file.

    Raises:
        NoPreprocessorError: If the file is missing, cannot be read, or does
            not contain a mapping at the top level.
        yaml.YAMLError: If the file is not valid YAML.
    """
    try:
        with open(yaml_file_path) as file:
            config_data = yaml.safe_load(file)
    except FileNotFoundError as e:
        logger.error(f"Configuration file not found: {yaml_file_path}")
        raise NoPreprocessorError(
            f"Configuration file not found: {yaml_file_path}"
        ) from e
    except yaml.YAMLError as e:
        logger.error(f"Error parsing YAML file: {e}")
        # Previously this branch only logged and fell through, which surfaced
        # as an UnboundLocalError on ``config_data`` below. Propagate the
        # parsing error itself so callers that catch yaml.YAMLError see it.
        raise
    except Exception as e:
        logger.error(f"Unexpected error loading configuration: {e}")
        raise NoPreprocessorError(
            f"Unexpected error loading configuration: {e}"
        ) from e

    if not isinstance(config_data, dict):
        logger.error("Configuration file does not contain a dictionary")
        raise NoPreprocessorError("Configuration file does not contain a dictionary")

    return PreprocessorConfig(**config_data)
class PandasRowPreprocessorBase(ABC):
    """
    Base class for preprocessing rows of a pandas DataFrame according to a
    graph schema and a configuration.

    Column names are expected in the form ``LABEL.property``; the part before
    the first dot selects the node label.

    Attributes:
        schema (GraphSchema): The schema defining the structure of nodes and relationships.
        config (PreprocessorConfig): Configuration settings for preprocessing.
    """

    def __init__(self, schema: GraphSchema, config: PreprocessorConfig):
        """
        Store the graph schema and configuration.

        Args:
            schema (GraphSchema): The schema defining the nodes and relationships.
            config (PreprocessorConfig): Configuration settings for preprocessing.
        """
        self.schema = schema
        self.config = config

    def _process_row(self, row: pd.Series) -> tuple[dict, dict, Union[str, None]]:
        """
        Expand one DataFrame row into nodes and relationships.

        Args:
            row (pd.Series): A row of data from a pandas DataFrame.

        Returns:
            tuple: ``(nodes, relationships, None)`` on success, or
            ``({}, {}, reaction_string)`` when the expansion raised, where
            ``reaction_string`` is the row's input chemical string.
        """
        split_row = self._split_row_by_node_types(row)
        ge = GraphExpander(self.schema)
        try:
            nodes, relationships = ge.expand_reaction_step(
                split_row,
                self.config.inp_chem_format,
                self.config.out_chem_format,
                self.config.validation,
            )
            return nodes, relationships, None
        except Exception as e:
            # Best effort: a single bad row is reported, not fatal.
            reaction_string = split_row[self.schema.base_nodes["chemical_equation"]][
                "properties"
            ][self.config.inp_chem_format]
            logger.error(
                f"Row with reaction string {reaction_string} cannot be expanded into nodes and relationships. Error: {str(e)}"
            )
            return {}, {}, reaction_string

    def _split_row_by_node_types(self, row: pd.Series) -> dict[str, dict]:
        """
        Group the values of a row by node label.

        Args:
            row (pd.Series): A row whose index holds ``LABEL.property`` names.

        Returns:
            dict: ``{label: {"properties": {...}, "uid": ...}}``; the ``"uid"``
            key is present only when a ``LABEL.uid`` column exists. Columns
            without a dot are skipped.
        """
        node_data: dict[str, dict] = {}
        for column, value in row.items():
            label, dot, property_name = column.partition(".")
            if not dot:
                continue
            entry = node_data.setdefault(label, {"properties": {}})
            if property_name == "uid":
                entry["uid"] = value
            else:
                entry["properties"][property_name] = value
        return node_data

    def _validate_the_header(self, header: list[str]) -> None:
        """
        Check that the header carries the fields required for processing.

        Fields that are malformed, refer to the molecule label, or refer to a
        label outside the schema only produce warnings.

        Args:
            header (list): List of column names from the DataFrame.

        Raises:
            EmptyHeaderError: If the header is empty.
            NoChemicalStringError: If the chemical-equation string field is missing.
            MissingUIDError: If a ``uid`` field is missing for an extra schema node.
        """
        chemical_equation_label = self.schema.base_nodes["chemical_equation"]
        chemical_equation_field = (
            f"{chemical_equation_label}.{self.config.inp_chem_format}"
        )

        if not header:
            error_msg = "Header is empty"
            logger.error(f"EmptyHeaderError: {error_msg}")
            raise EmptyHeaderError(error_msg)

        if chemical_equation_field not in header:
            error_msg = f"Missing required field: {chemical_equation_field}. Current header: {header}"
            logger.error(f"NoChemicalStringError: {error_msg}")
            raise NoChemicalStringError(error_msg)

        for header_field in header:
            parts = header_field.split(".", 1)
            if len(parts) != 2:
                logger.warning(
                    f"IncorrectHeaderFieldFormatWarning: Field '{header_field}' will be ignored during processing. Allowed format is LABEL.property"
                )
                continue

            label, property_name = parts

            if label == self.schema.base_nodes["molecule"]:
                logger.warning(
                    f"MoleculeNodeWarning: Field '{header_field}' will be ignored. Molecule nodes are reconstructed from '{chemical_equation_label}' nodes."
                )
                continue

            if (
                label not in self.schema.base_nodes.values()
                and label not in self.schema.extra_nodes.values()
            ):
                logger.warning(
                    f"FieldNotInSchemaWarning: Field '{header_field}' will be ignored during processing. Node '{label}' is not in the schema."
                )
                continue

        for node_label in self.schema.extra_nodes.values():
            uid_field = f"{node_label}.uid"
            if uid_field not in header:
                error_msg = f"Missing uid for node {node_label}. All extra schema nodes require uid."
                logger.error(f"MissingUIDError: {error_msg}")
                # MissingUIDError builds its own message from a label; passing
                # the whole sentence nested one message inside another.
                raise MissingUIDError(node_label)
class CSVPreprocessor(PandasRowPreprocessorBase):
    """
    Preprocessor for CSV files that processes the data either in parallel with
    Dask or serially with pandas, writes per-partition files into the
    temporary folder and merges them into the output folder.

    Attributes:
        input_file (str): Path to the input CSV file.
        max_partitions (int): Number of partitions expected by the merge step.
    """

    input_file: Optional[str] = None
    max_partitions: Optional[int] = None

    def run(
        self, input_file: str, parallel: bool, dask_client: Optional[Client] = None
    ) -> None:
        """
        Execute the preprocessing of a CSV file.

        Args:
            input_file (str): Path to the input CSV file.
            parallel (bool): Flag indicating whether to run in parallel mode.
            dask_client (Optional[Client]): Optional Dask client for parallel processing.
        """
        self.input_file = input_file
        header = self._read_header()
        self._validate_the_header(header)

        if parallel:
            self._run_parallel(dask_client)
        else:
            self._run_serial()

        self._merge_all_partition_files()
        if self.config.delete_tmp:
            _delete_tmp_folder(self.config.tmp_folder)

    def _merge_all_partition_files(self) -> None:
        """
        Merge the partition files of every node label, relationship type and
        the two failure reports into final output files.
        """
        combined_templates = [
            value.upper()
            for value in (
                self.schema.get_nodes_labels() + self.schema.get_relationships_types()
            )
        ]
        combined_templates.append("failed_strings")
        combined_templates.append("empty_strings")

        for template in combined_templates:
            filename = f"{template}.csv"
            _merge_partition_files(
                filename,
                self.config.tmp_folder,
                self.config.output_folder,
                self.max_partitions,
                self.config.prefix,
            )

    def _read_header(self) -> list[str]:
        """
        Read the header from the input CSV file.

        Returns:
            list[str]: A list of column names from the CSV file.
        """
        df = pd.read_csv(
            self.input_file,
            delimiter=self.config.delimiter,
            quotechar=self.config.quotechar,
            lineterminator=self.config.lineterminator,
            nrows=1,
        )
        return df.columns.tolist()

    def _run_parallel(self, dask_client: Optional[Client]) -> None:
        """
        Run the preprocessing in parallel using Dask.

        A client created here is closed again, also when processing fails;
        a client supplied by the caller is left open.

        Args:
            dask_client (Optional[Client]): Optional Dask client for parallel processing.
        """
        client_provided = dask_client is not None
        client = dask_client if client_provided else Client()

        try:
            ddf = dd.read_csv(
                self.input_file,
                blocksize=self.config.blocksize,
                delimiter=self.config.delimiter,
                lineterminator=self.config.lineterminator,
                quotechar=self.config.quotechar,
            )

            self.max_partitions = ddf.npartitions

            # Row offset of each partition, used to turn a partition-local
            # index into an index over the whole file.
            partition_sizes = ddf.map_partitions(len).compute()
            offsets = np.concatenate(([0], np.cumsum(partition_sizes)[:-1]))

            ddf_proc = ddf.map_partitions(
                lambda df, partition_info: self._process_partition(
                    df, partition_info["number"], offsets
                ),
                meta=pd.DataFrame(),
            )

            delayed_list = ddf_proc.to_delayed()
            futures = client.compute(delayed_list)

            with tqdm(
                total=self.max_partitions, desc="Processing partitions in parallel"
            ) as pbar:
                for fut in as_completed(futures):
                    fut.result()
                    pbar.update(1)
        finally:
            # Only clean up what this method created.
            if not client_provided:
                client.close()

    def _run_serial(self) -> None:
        """
        Run the preprocessing serially using pandas.
        """
        self.max_partitions = self._calculate_max_partitions_for_serial()

        with tqdm(
            total=self.max_partitions, desc="Processing partitions serially"
        ) as pbar:
            for partition_number, df_partition in enumerate(
                self._serial_partition_generator()
            ):
                self._process_partition(df_partition, partition_number)
                pbar.update(1)

    def _serial_partition_generator(self) -> Iterator[pd.DataFrame]:
        """
        Generate partitions of the CSV file for serial processing.

        Yields:
            pd.DataFrame: The whole file when ``chunksize`` is ``None``,
            otherwise one chunk of at most ``chunksize`` rows at a time.
        """
        if self.config.chunksize is None:
            yield pd.read_csv(
                self.input_file,
                delimiter=self.config.delimiter,
                quotechar=self.config.quotechar,
                lineterminator=self.config.lineterminator,
                nrows=self.config.nrows,
            )
        else:
            yield from pd.read_csv(
                self.input_file,
                chunksize=self.config.chunksize,
                delimiter=self.config.delimiter,
                quotechar=self.config.quotechar,
                lineterminator=self.config.lineterminator,
                nrows=self.config.nrows,
            )

    def _process_partition(
        self,
        df: pd.DataFrame,
        partition_number: Optional[int] = None,
        offsets: Optional[list[int]] = None,
    ) -> None:
        """
        Process one partition and write its nodes, relationships and failure
        reports to partition files in the temporary folder.

        Args:
            df (pd.DataFrame): The DataFrame representing a partition.
            partition_number (int): The partition number.
            offsets (Optional[list[int]]): Row offsets per partition; when given,
                ``offsets[partition_number]`` is added to each row index.
        """
        nodes_partition = {}
        relationships_partition = {}
        failed_strings = []
        empty_reaction_strings = []

        for index, row in df.iterrows():
            nodes_row, relationships_row, failed_chem_string = self._process_row(row)

            # NOTE(review): the +2 presumably maps the zero-based data index to
            # a one-based file line number that also counts the header row.
            if offsets is None:
                global_index = index + 2
            else:
                global_index = index + offsets[partition_number] + 2

            if failed_chem_string is None:
                # Successful processing
                _update_partition_dict_with_row(nodes_partition, nodes_row)
                _update_partition_dict_with_row(
                    relationships_partition, relationships_row
                )
            elif pd.isna(failed_chem_string):
                # Empty or invalid reaction
                empty_reaction_strings.append([global_index])
            else:
                # Failed processing
                failed_strings.append([failed_chem_string, global_index])

        df_nodes = Neo4jImportStyle.export_nodes(nodes_partition)
        df_relationships = Neo4jImportStyle.export_relationships(
            relationships_partition
        )

        _save_dataframes_to_partition_csv(
            df_nodes,
            df_relationships,
            graph_schema=self.schema,
            output_dir=self.config.tmp_folder,
            partition_num=partition_number,
        )

        _save_list_to_partition_csv(
            failed_strings,
            header=[
                f'{self.schema.base_nodes["chemical_equation"]}.{self.config.inp_chem_format}',
                "index",
            ],
            output_dir=self.config.tmp_folder,
            name="failed_strings",
            partition_num=partition_number,
        )

        _save_list_to_partition_csv(
            empty_reaction_strings,
            header=[
                "index",
            ],
            output_dir=self.config.tmp_folder,
            name="empty_strings",
            partition_num=partition_number,
        )

    def _calculate_max_partitions_for_serial(self) -> int:
        """
        Calculate the number of chunks the serial run will produce.

        Returns:
            int: ``ceil(rows / chunksize)``, where ``rows`` is the number of
            lines after the header, capped at ``nrows`` when that is set.
        """
        # Close the handle deterministically instead of leaving it to the GC.
        with open(self.input_file) as handle:
            total_rows = max(sum(1 for _ in handle) - 1, 0)

        nrows = self.config.nrows
        # pandas' ``nrows`` already excludes the header, so it is compared
        # directly; subtracting one under-counted the chunks (e.g. nrows=1
        # yielded zero partitions and the processed chunk was never merged).
        effective_rows = min(total_rows, nrows) if nrows else total_rows

        chunksize = self.config.chunksize
        return (effective_rows + chunksize - 1) // chunksize
class PythonObjectPreprocessorInterface(ABC):
    """
    Abstract base class for preprocessors that handle Python objects.
    Defines a common interface for running preprocessing tasks.

    Attributes:
        failed_strings (list): Strings that failed during processing. The list
            is created lazily per instance; a single class-level list would be
            shared by every preprocessor object and subclass.
    """

    @property
    def failed_strings(self) -> list[str]:
        """Return this instance's list of failed strings, creating it on first use."""
        try:
            return self._failed_strings
        except AttributeError:
            # Lazy creation keeps this working for subclasses whose __init__
            # does not call into this class.
            self._failed_strings = []
            return self._failed_strings

    @failed_strings.setter
    def failed_strings(self, value: list[str]) -> None:
        """Replace this instance's list of failed strings."""
        self._failed_strings = value

    @abstractmethod
    def run(self, data: object) -> "DataContainer":
        """
        Run the preprocessing task on the given data.

        Args:
            data (object): The data to be processed.

        Returns:
            DataContainer: A container holding processed nodes and relationships.
        """
        pass
class ChemicalStringPreprocessorBase(ABC):
    """
    Shared behaviour for preprocessors whose input is a chemical reaction
    string rather than a tabular row.

    Attributes:
        schema (GraphSchema): The schema defining the nodes and relationships.
        config (PreprocessorConfig): Configuration settings for preprocessing.
    """

    def __init__(self, schema: GraphSchema, config: PreprocessorConfig):
        """
        Keep references to the schema and configuration.

        Args:
            schema (GraphSchema): The schema defining nodes and relationships.
            config (PreprocessorConfig): Configuration settings for preprocessing.
        """
        self.schema, self.config = schema, config

    def _process_reaction_string(
        self, reaction_string: str
    ) -> tuple[list[Node], list[Relationship], Union[None, str]]:
        """
        Expand one reaction string into lists of nodes and relationships.

        Args:
            reaction_string (str): The chemical reaction string to be processed.

        Returns:
            tuple: ``(nodes, relationships, None)`` when the expansion works,
            ``([], [], reaction_string)`` when it raises.
        """
        step_input = self._build_reaction_string_dict(reaction_string)
        expander = GraphExpander(self.schema)
        settings = self.config
        try:
            expanded = expander.expand_reaction_step(
                step_input,
                settings.inp_chem_format,
                settings.out_chem_format,
                settings.validation,
            )
            node_map, relationship_map = expanded
            return dict_to_list(node_map), dict_to_list(relationship_map), None
        except Exception as e:
            logger.error(
                f"Row with reaction string {reaction_string} cannot be expanded into nodes and relationships. Error: {str(e)}"
            )
        return [], [], reaction_string

    def _build_reaction_string_dict(self, reaction_string: str) -> dict:
        """
        Wrap a reaction string in the nested dictionary layout consumed by the
        graph expander.

        Args:
            reaction_string (str): The chemical reaction string.

        Returns:
            dict: ``{chemical_equation_label: {"properties": {inp_chem_format: reaction_string}}}``.
        """
        properties = {self.config.inp_chem_format: reaction_string}
        return {self.schema.base_nodes["chemical_equation"]: {"properties": properties}}
class PythonObjectPreprocessorFactory:
    """
    Registry that maps a data-type name to the preprocessor class handling it
    and instantiates that class on request.

    Attributes:
        preprocessors (dict): A dictionary mapping data types to preprocessor classes.
    """

    preprocessors: dict[str, Type[PythonObjectPreprocessorInterface]] = {}

    @classmethod
    def get_preprocessor(
        cls,
        data_type: str,
        schema: GraphSchema,
        config: PreprocessorConfig,
    ) -> PythonObjectPreprocessorInterface:
        """
        Instantiate the preprocessor registered for ``data_type``.

        Args:
            data_type (str): The type of data to be processed.
            schema (GraphSchema): The schema defining nodes and relationships.
            config (PreprocessorConfig): Configuration settings for preprocessing.

        Returns:
            PythonObjectPreprocessorInterface: An instance of the registered class.

        Raises:
            ValueError: If no class is registered under ``data_type``.
        """
        registered = cls.preprocessors.get(data_type)
        if not registered:
            raise ValueError(f"Unsupported data type: {data_type}")
        return registered(schema, config)

    @classmethod
    def register_preprocessor(
        cls, data_type: str
    ) -> Callable[
        [Type[PythonObjectPreprocessorInterface]],
        Type[PythonObjectPreprocessorInterface],
    ]:
        """
        Build a class decorator that registers a preprocessor under ``data_type``.

        Args:
            data_type (str): The type of data the preprocessor handles.

        Returns:
            Callable: A decorator that stores the class and returns it unchanged.
        """

        def register(
            preprocessor: type[PythonObjectPreprocessorInterface],
        ) -> type[PythonObjectPreprocessorInterface]:
            cls.preprocessors.update({data_type: preprocessor})
            return preprocessor

        return register
@PythonObjectPreprocessorFactory.register_preprocessor("dataframe")
class DataFramePreprocessor(
    PandasRowPreprocessorBase, PythonObjectPreprocessorInterface
):
    """
    Preprocessor for pandas DataFrames, extracting nodes and relationships
    based on a predefined schema and configuration.

    Inherits from:
        PandasRowPreprocessorBase
        PythonObjectPreprocessorInterface
    """

    def run(self, df: pd.DataFrame) -> DataContainer:
        """
        Process the DataFrame to extract nodes and relationships.

        ``failed_strings`` is reset at the start of every run, so it reports
        only the rows that failed in the most recent call.

        Args:
            df (pd.DataFrame): The DataFrame containing the data to be processed.

        Returns:
            DataContainer: A container holding processed nodes and relationships.
        """
        # Bind a fresh list on the instance; appending to the inherited
        # attribute would leak failures between unrelated preprocessors.
        self.failed_strings = []

        nodes: list[Node] = []
        relationships: list[Relationship] = []

        header = df.columns.tolist()
        self._validate_the_header(header)

        for _, row in df.iterrows():
            nodes_row, relationships_row, failed_chem_string = self._process_row(row)
            # ``None`` is the success marker used by _process_row; a truthiness
            # test would silently treat a failed empty string as success.
            if failed_chem_string is not None:
                self.failed_strings.append(failed_chem_string)
            else:
                nodes.extend(
                    node for node_list in nodes_row.values() for node in node_list
                )
                relationships.extend(
                    rel for rel_list in relationships_row.values() for rel in rel_list
                )

        return create_data_container(
            nodes, relationships, self.schema.base_nodes["chemical_equation"]
        )
@PythonObjectPreprocessorFactory.register_preprocessor("reaction_string")
class ReactionStringsPreprocessor(
    ChemicalStringPreprocessorBase, PythonObjectPreprocessorInterface
):
    """
    Preprocessor for lists of chemical reaction strings, extracting nodes and
    relationships based on a predefined schema and configuration.

    Inherits from:
        ChemicalStringPreprocessorBase
        PythonObjectPreprocessorInterface
    """

    def run(self, data: list[str]) -> DataContainer:
        """
        Process a list of chemical reaction strings to extract nodes and relationships.

        ``failed_strings`` is reset at the start of every run, so it reports
        only the strings that failed in the most recent call.

        Args:
            data (list[str]): A list of chemical reaction strings.

        Returns:
            DataContainer: A container holding processed nodes and relationships.
        """
        # Bind a fresh list on the instance; appending to the inherited
        # attribute would leak failures between unrelated preprocessors.
        self.failed_strings = []

        nodes: list[Node] = []
        relationships: list[Relationship] = []

        for reaction_string in data:
            (
                nodes_reaction,
                relationships_reaction,
                failed_string,
            ) = self._process_reaction_string(reaction_string)
            # ``None`` is the success marker; a truthiness test would hide a
            # failed empty string.
            if failed_string is not None:
                self.failed_strings.append(failed_string)
            else:
                nodes.extend(nodes_reaction)
                relationships.extend(relationships_reaction)

        return create_data_container(
            nodes, relationships, self.schema.base_nodes["chemical_equation"]
        )
@PythonObjectPreprocessorFactory.register_preprocessor("syngraph")
class SynGraphPreprocessor(
    ChemicalStringPreprocessorBase, PythonObjectPreprocessorInterface
):
    """
    Preprocessor for SynGraph objects, extracting nodes and relationships
    based on a predefined schema and configuration.

    Inherits from:
        ChemicalStringPreprocessorBase
        PythonObjectPreprocessorInterface
    """

    def __init__(self, schema: GraphSchema, config: PreprocessorConfig):
        """
        Initialize the preprocessor and switch validation off.

        Args:
            schema (GraphSchema): The schema defining nodes and relationships.
            config (PreprocessorConfig): Configuration settings for preprocessing.
        """
        super().__init__(schema, config)
        # NOTE: this writes to the config object handed in by the caller.
        self.config.validation = False

    def run(
        self,
        data: list[
            Union[MonopartiteReacSynGraph, BipartiteSynGraph, MonopartiteMolSynGraph]
        ],
    ) -> DataContainer:
        """
        Process a list of SynGraph objects to extract nodes and relationships.

        Reaction strings that cannot be expanded are collected in
        ``failed_strings``, which is reset at the start of every run.

        Args:
            data (list[Union[MonopartiteReacSynGraph, BipartiteSynGraph, MonopartiteMolSynGraph]]):
                A list of SynGraph objects.

        Returns:
            DataContainer: A container holding processed nodes and relationships.
        """
        # Bind a fresh list on the instance; the inherited attribute would
        # otherwise expose failures recorded by other preprocessors.
        self.failed_strings = []

        nodes: list[Node] = []
        relationships: list[Relationship] = []

        for one_syngraph in data:
            reactions = extract_reactions_from_syngraph(one_syngraph)
            for reaction in reactions:
                reaction_string = reaction.get("input_string")
                (
                    nodes_reaction,
                    relationships_reaction,
                    failed_string,
                ) = self._process_reaction_string(reaction_string)
                # Record failures instead of dropping them, in line with the
                # other object preprocessors.
                if failed_string is not None:
                    self.failed_strings.append(failed_string)
                else:
                    nodes.extend(nodes_reaction)
                    relationships.extend(relationships_reaction)

        return create_data_container(
            nodes, relationships, self.schema.base_nodes["chemical_equation"]
        )
class Preprocessor:
    """
    Entry point for preprocessing CSV files and Python objects for Neo4j
    integration.

    Attributes:
        preprocessor: The preprocessor instance used by the most recent call,
            or ``None`` before the first call.
        schema (GraphSchema): The graph schema for processing.
        config (PreprocessorConfig): Base configuration; per-call keyword
            arguments override it without modifying it.
    """

    def __init__(self, schema: Optional[GraphSchema] = None):
        """
        Initialize the Preprocessor with a graph schema and default configuration.

        Args:
            schema (Optional[GraphSchema]): The graph schema for processing.
                A new ``GraphSchema()`` is created when omitted or ``None``.
        """
        self.preprocessor: Optional[
            Union[CSVPreprocessor, PythonObjectPreprocessorInterface]
        ] = None
        # A ``GraphSchema()`` default in the signature is evaluated once at
        # definition time and then shared by every Preprocessor instance.
        self.schema = GraphSchema() if schema is None else schema
        self.config = PreprocessorConfig()

    def _build_config(self, **overrides) -> PreprocessorConfig:
        """Return a copy of ``self.config`` with every non-None override applied."""
        config_dict = asdict(self.config)
        config_dict.update({k: v for k, v in overrides.items() if v is not None})
        return PreprocessorConfig(**config_dict)

    def set_config_from_yaml(self, file_path: Optional[str] = None) -> None:
        """
        Set the preprocessor configuration from a YAML file.

        Args:
            file_path (Optional[str]): Path to the YAML configuration file.

        Raises:
            ValueError: If no file path is provided.
            FileNotFoundError: If the configuration file does not exist.
            YAMLError: If there's an error parsing the YAML file.
        """
        if not file_path:
            logger.error("No file path provided for YAML configuration.")
            raise ValueError("A file path must be provided to set the configuration.")

        yaml_path = Path(file_path)
        if not yaml_path.exists():
            logger.error(f"Configuration file not found: {file_path}")
            raise FileNotFoundError(
                f"The configuration file does not exist: {file_path}"
            )

        try:
            self.config = PreprocessorConfig.build_from_yaml(yaml_path)
            logger.info(f"Successfully loaded configuration from {file_path}")
        except yaml.YAMLError as e:
            logger.error(f"Error parsing YAML file {file_path}: {str(e)}")
            raise
        except Exception as e:
            logger.error(
                f"Unexpected error while loading configuration from {file_path}: {str(e)}"
            )
            raise

    def preprocess_csv_for_neo4j_parallel(
        self,
        input_file: str,
        dask_client: Optional[Client] = None,
        output_folder: Optional[str] = None,
        tmp_folder: Optional[str] = None,
        inp_chem_format: Optional[str] = None,
        out_chem_format: Optional[str] = None,
        validation: Optional[bool] = None,
        prefix: Optional[str] = None,
        blocksize: Optional[int] = None,
        delimiter: Optional[str] = None,
        lineterminator: Optional[str] = None,
        delete_tmp: Optional[bool] = None,
        quotechar: Optional[str] = None,
    ) -> None:
        """
        Preprocess a CSV file for Neo4j integration using parallel processing.

        Every optional argument left as ``None`` keeps the value from ``self.config``.

        Args:
            input_file (str): Path to the input CSV file.
            dask_client (Optional[Client]): Optional Dask client for parallel processing.
            output_folder (Optional[str]): Folder to store output files.
            tmp_folder (Optional[str]): Temporary folder for intermediate files.
            inp_chem_format (Optional[str]): Input chemical format.
            out_chem_format (Optional[str]): Output chemical format.
            validation (Optional[bool]): Whether to validate the data.
            prefix (Optional[str]): Prefix for output files.
            blocksize (Optional[int]): Block size for Dask processing.
            delimiter (Optional[str]): Delimiter used in the CSV file.
            lineterminator (Optional[str]): Line terminator used in the CSV file.
            delete_tmp (Optional[bool]): Whether to delete temporary files.
            quotechar (Optional[str]): Quote character used in the CSV file.
        """
        config = self._build_config(
            output_folder=output_folder,
            tmp_folder=tmp_folder,
            inp_chem_format=inp_chem_format,
            out_chem_format=out_chem_format,
            validation=validation,
            prefix=prefix,
            blocksize=blocksize,
            delimiter=delimiter,
            lineterminator=lineterminator,
            delete_tmp=delete_tmp,
            quotechar=quotechar,
        )
        self.preprocessor = CSVPreprocessor(self.schema, config)
        self.preprocessor.run(input_file, parallel=True, dask_client=dask_client)

    def preprocess_csv_for_neo4j_serial(
        self,
        input_file: str,
        output_folder: Optional[str] = None,
        tmp_folder: Optional[str] = None,
        inp_chem_format: Optional[str] = None,
        out_chem_format: Optional[str] = None,
        validation: Optional[bool] = None,
        prefix: Optional[str] = None,
        delimiter: Optional[str] = None,
        lineterminator: Optional[str] = None,
        delete_tmp: Optional[bool] = None,
        quotechar: Optional[str] = None,
        chunksize: Optional[int] = None,
        nrows: Optional[int] = None,
    ) -> None:
        """
        Preprocess a CSV file for Neo4j integration using serial processing.

        Every optional argument left as ``None`` keeps the value from ``self.config``.

        Args:
            input_file (str): Path to the input CSV file.
            output_folder (Optional[str]): Folder to store output files.
            tmp_folder (Optional[str]): Temporary folder for intermediate files.
            inp_chem_format (Optional[str]): Input chemical format.
            out_chem_format (Optional[str]): Output chemical format.
            validation (Optional[bool]): Whether to validate the data.
            prefix (Optional[str]): Prefix for output files.
            delimiter (Optional[str]): Delimiter used in the CSV file.
            lineterminator (Optional[str]): Line terminator used in the CSV file.
            delete_tmp (Optional[bool]): Whether to delete temporary files.
            quotechar (Optional[str]): Quote character used in the CSV file.
            chunksize (Optional[int]): Chunk size for reading the CSV file.
            nrows (Optional[int]): Number of rows to read from the CSV file.
        """
        config = self._build_config(
            output_folder=output_folder,
            tmp_folder=tmp_folder,
            inp_chem_format=inp_chem_format,
            out_chem_format=out_chem_format,
            validation=validation,
            prefix=prefix,
            chunksize=chunksize,
            nrows=nrows,
            delimiter=delimiter,
            lineterminator=lineterminator,
            delete_tmp=delete_tmp,
            quotechar=quotechar,
        )
        self.preprocessor = CSVPreprocessor(self.schema, config)
        self.preprocessor.run(input_file, parallel=False)

    def preprocess_object_for_neo4j(
        self,
        data: Union[pd.DataFrame, list[Union[str, "SynGraph"]]],
        data_type: str,
        inp_chem_format: Optional[str] = None,
        out_chem_format: Optional[str] = None,
        validation: Optional[bool] = None,
    ) -> DataContainer:
        """
        Preprocess Python objects for Neo4j integration.

        Args:
            data (Union[pd.DataFrame, list[Union[str, "SynGraph"]]]): The data to be processed.
            data_type (str): Key of a preprocessor registered in
                ``PythonObjectPreprocessorFactory``.
            inp_chem_format (Optional[str]): Input chemical format.
            out_chem_format (Optional[str]): Output chemical format.
            validation (Optional[bool]): Whether to validate the data.

        Returns:
            DataContainer: A container holding processed nodes and relationships.
        """
        config = self._build_config(
            inp_chem_format=inp_chem_format,
            out_chem_format=out_chem_format,
            validation=validation,
        )
        self.preprocessor = PythonObjectPreprocessorFactory.get_preprocessor(
            data_type, self.schema, config
        )
        return self.preprocessor.run(data)

    def get_failed_strings(self):
        """
        Retrieve strings that failed during preprocessing.

        Returns:
            Optional[list[str]]: The list kept by an object preprocessor, or
            ``None`` after a CSV run, in which case the file location is logged.

        Raises:
            NoPreprocessorError: If no preprocessor has been defined.
        """
        if self.preprocessor is None:
            logger.error("No preprocessor has been defined")
            raise NoPreprocessorError("No preprocessor has been defined")

        if isinstance(self.preprocessor, PythonObjectPreprocessorInterface):
            return self.preprocessor.failed_strings

        file_name = os.path.join(
            self.preprocessor.config.output_folder, "failed_strings.csv"
        )
        logger.info(f"Failed strings are in a file {file_name}")
        return None

    @classmethod
    def info(cls) -> None:
        """
        Print the preprocessing entry points with their arguments, the
        registered data types, the supported reaction string formats and a
        short usage example.
        """
        # A tuple keeps the printed order stable; iterating a set of strings
        # changes order between interpreter runs.
        supported_formats = ("smiles", "smarts", "rxn_blockV3K", "rxn_blockV2K")

        supported_data_types = list(
            PythonObjectPreprocessorFactory.preprocessors.keys()
        )

        rule = "------------------------------------------------------------------------------------------------------------------------"

        # NOTE(review): the table literals below are reproduced exactly as they
        # appear in the reviewed text, where column padding looks collapsed;
        # confirm the intended alignment against the upstream file.
        print("Available Preprocessing Capabilities:")
        print("=====================================")
        print("Name Required Args Optional Args ")
        print(rule)
        print("preprocess_csv_for_neo4j_serial input_file output_folder, tmp_folder, ")
        print(" inp_chem_format, out_chem_format, ")
        print(" validation, prefix, delimiter, ")
        print(" lineterminator, delete_tmp, ")
        print(" quotechar, chunksize, nrows ")
        print(rule)
        print("preprocess_csv_for_neo4j_parallel input_file output_folder, tmp_folder, ")
        print(" inp_chem_format, out_chem_format, ")
        print(" validation, prefix, blocksize, ")
        print(" delimiter, lineterminator, ")
        print(" delete_tmp, quotechar, ")
        print(" dask_client ")
        print(rule)
        print(
            "preprocess_object_for_neo4j data, data_type inp_chem_format, out_chem_format, "
        )
        print(" validation ")
        print(rule)

        print("\nSupported Data Types:")
        print("=====================")
        for data_type in supported_data_types:
            print(f"- {data_type}")

        print("\nSupported Reaction String Formats:")
        print("===================================")
        for format_name in supported_formats:
            print(f"- {format_name}")

        print("\nUsage Example:")
        print("--------------")
        print(
            textwrap.dedent(
                """
                preprocessor = Preprocessor(schema = GraphSchema(...))

                # Optional set configuration from a YAML file
                preprocessor.set_config_from_yaml('path/to/config.yaml')

                # Preprocess a CSV file for Neo4j integration
                preprocessor.preprocess_csv_for_neo4j_serial(
                    input_file='path/to/input.csv',
                    output_folder='path/to/output',
                    inp_chem_format='smiles',
                    out_chem_format='rxn_blockV3K',
                    validation=True
                )

                # Preprocess Python objects
                data = ['N.O>>C', 'T.I>>S']
                data_container = preprocessor.preprocess_object_for_neo4j(
                    data=data,
                    data_type='reaction_string',
                    inp_chem_format='smiles',
                    out_chem_format='smarts'
                )
                """
            )
        )