Skip to content

Evaluator

Bases: XoptBaseModel

Xopt Evaluator for handling the parallel execution of an evaluate function.

Parameters

function : Callable Function to evaluate. function_kwargs : dict, default={} Any kwargs to pass on to this function. max_workers : int, default=1 Maximum number of workers. executor : NormalExecutor NormalExecutor or any instantiated Executor object vectorized : bool, default=False If true, lists of evaluation points will be sent to the evaluator function to be processed in parallel instead of evaluated separately via mapping.

Source code in xopt/evaluator.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
class Evaluator(XoptBaseModel):
    """
    Xopt Evaluator for handling the parallel execution of an evaluate function.

    Parameters
    ----------
    function : Callable
        Function to evaluate.
    function_kwargs : dict, default={}
        Any kwargs to pass on to this function.
    max_workers : int, default=1
        Maximum number of workers.
    executor : NormalExecutor
        NormalExecutor or any instantiated Executor object
    vectorized : bool, default=False
        If true, lists of evaluation points will be sent to the evaluator
        function to be processed in parallel instead of evaluated separately via
        mapping.
    """

    function: Callable
    max_workers: int = Field(1, ge=1)
    executor: NormalExecutor = Field(exclude=True)  # Do not serialize
    # default_factory gives each instance its own dict (no shared mutable default)
    function_kwargs: dict = Field(default_factory=dict)
    vectorized: bool = Field(False)

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_validator(mode="before")
    def validate_all(cls, values):
        """
        Resolve the callable, merge its signature defaults into
        ``function_kwargs``, and wrap the executor in a NormalExecutor
        (building a ProcessPoolExecutor or DummyExecutor when none is given).
        """
        f = get_function(values["function"])
        kwargs = values.get("function_kwargs", {})
        # user-supplied kwargs take precedence over the function's own defaults
        kwargs = {**get_function_defaults(f), **kwargs}
        values["function"] = f
        values["function_kwargs"] = kwargs

        max_workers = values.pop("max_workers", 1)

        executor = values.pop("executor", None)
        if executor is None:  # explicit None check; executor objects are always truthy
            if max_workers > 1:
                executor = ProcessPoolExecutor(max_workers=max_workers)
            else:
                executor = DummyExecutor()

        # Cast as a NormalExecutor
        values["executor"] = NormalExecutor[type(executor)](executor=executor)
        values["max_workers"] = max_workers

        return values

    @staticmethod
    def _as_dataframe(input_data) -> DataFrame:
        """Coerce ``input_data`` to a DataFrame; a dict of scalars becomes one row."""
        if isinstance(input_data, DataFrame):
            return input_data
        try:
            return DataFrame(input_data)
        except ValueError:
            # a dict of scalars needs an explicit index to form a single row
            return DataFrame(input_data, index=[0])

    def evaluate(self, input: Dict, **kwargs):
        """
        Evaluate a single input dict using Evaluator.function with
        Evaluator.function_kwargs.

        Further kwargs are passed to the function and override
        Evaluator.function_kwargs entries of the same name.

        Inputs:
            input: dict of inputs to be evaluated
            **kwargs: additional kwargs to pass to the function

        Returns:
            function(input, **function_kwargs_updated)

        """
        return self.safe_function(input, **{**self.function_kwargs, **kwargs})

    def evaluate_data(
        self,
        input_data: Union[
            pd.DataFrame,
            List[Dict[str, float]],
            Dict[str, List[float]],
            Dict[str, float],
        ],
    ) -> pd.DataFrame:
        """Evaluate a batch of inputs; returns the inputs joined with the outputs."""
        if self.vectorized:
            # Hand the whole batch to the function in a single call.
            output_data = self.safe_function(input_data, **self.function_kwargs)
            # pd.concat below needs a DataFrame with an index; coerce only
            # after the call so the function still receives the caller's
            # original data structure
            input_data = self._as_dataframe(input_data)
        else:
            # This construction is needed to avoid a pickle error:
            # translate input data into records and map a module-level function
            input_data = self._as_dataframe(input_data)
            inputs = input_data.to_dict("records")

            funcs = [self.function] * len(inputs)
            kwargs = [self.function_kwargs] * len(inputs)

            output_data = self.executor.map(
                safe_function1_for_map,
                funcs,
                inputs,
                kwargs,
            )

        return pd.concat(
            [input_data, DataFrame(output_data, index=input_data.index)], axis=1
        )

    def safe_function(self, *args, **kwargs):
        """
        Safely call the function, handling exceptions.

        Note that this bound method itself should not be submitted to an
        executor (it may not pickle); ``submit`` passes the module-level
        ``safe_function`` helper instead.
        """
        return safe_function(self.function, *args, **kwargs)

    def submit(self, input: Dict):
        """submit a single input to the executor

        Parameters
        ----------
        input : dict

        Returns
        -------
        Future  : Future object
        """
        if not isinstance(input, dict):
            raise ValueError("input must be a dictionary")
        # Must submit a function defined outside of the class so the executor
        # can pickle the call.
        # See: https://stackoverflow.com/questions/44144584/typeerror-cant-pickle-thread-lock-objects
        return self.executor.submit(
            safe_function, self.function, input, **self.function_kwargs
        )

    def submit_data(self, input_data: pd.DataFrame):
        """submit dataframe of inputs to executor"""
        input_data = pd.DataFrame(input_data)  # cast to dataframe for consistency

        if self.vectorized:
            # Single submission, cast each column to a numpy array
            inputs = input_data.to_dict(orient="list")
            for key, value in inputs.items():
                inputs[key] = np.array(value)
            futures = [self.submit(inputs)]  # Single item
        else:
            # Do not use iterrows or itertuples.
            futures = [self.submit(inputs) for inputs in input_data.to_dict("records")]

        return futures

evaluate(input, **kwargs)

Evaluate a single input dict using Evaluator.function with Evaluator.function_kwargs.

Further kwargs are passed to the function.

Inputs

input: dict of inputs to be evaluated **kwargs: additional kwargs to pass to the function

Returns:

Type Description

function(input, **function_kwargs_updated)

Source code in xopt/evaluator.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def evaluate(self, input: Dict, **kwargs):
    """
    Evaluate one input dict via Evaluator.function, applying
    Evaluator.function_kwargs.

    Extra keyword arguments override entries in Evaluator.function_kwargs
    of the same name and are forwarded to the function.

    Inputs:
        input: dict of inputs to be evaluated
        **kwargs: additional kwargs to pass to the function

    Returns:
        function(input, **function_kwargs_updated)

    """
    merged_kwargs = dict(self.function_kwargs)
    merged_kwargs.update(kwargs)
    return self.safe_function(input, **merged_kwargs)

evaluate_data(input_data)

evaluate dataframe of inputs

Source code in xopt/evaluator.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def evaluate_data(
    self,
    input_data: Union[
        pd.DataFrame,
        List[Dict[str, float]],
        Dict[str, List[float]],
        Dict[str, float],
    ],
) -> pd.DataFrame:
    """Evaluate a batch of inputs; returns the inputs joined with the outputs."""

    def _as_dataframe(data):
        # Coerce to DataFrame; a dict of scalars needs an explicit index
        # to form a single row.
        if isinstance(data, DataFrame):
            return data
        try:
            return DataFrame(data)
        except ValueError:
            return DataFrame(data, index=[0])

    if self.vectorized:
        # Hand the whole batch to the function in a single call.
        output_data = self.safe_function(input_data, **self.function_kwargs)
        # pd.concat below needs a DataFrame with an index; coerce only after
        # the call so the function still receives the caller's original
        # data structure
        input_data = _as_dataframe(input_data)
    else:
        # This construction is needed to avoid a pickle error:
        # translate input data into records and map a module-level function
        input_data = _as_dataframe(input_data)
        inputs = input_data.to_dict("records")

        funcs = [self.function] * len(inputs)
        kwargs = [self.function_kwargs] * len(inputs)

        output_data = self.executor.map(
            safe_function1_for_map,
            funcs,
            inputs,
            kwargs,
        )

    return pd.concat(
        [input_data, DataFrame(output_data, index=input_data.index)], axis=1
    )

safe_function(*args, **kwargs)

Safely call the function, handling exceptions.

Note that this bound method itself should not be submitted to an executor (it may not pickle); `submit` passes the module-level `safe_function` helper instead.

Source code in xopt/evaluator.py
130
131
132
133
134
135
136
def safe_function(self, *args, **kwargs):
    """
    Safely call ``self.function``, handling exceptions.

    The bare name ``safe_function`` in the body resolves to the
    module-level helper, not this method. Note that this bound method
    itself should not be submitted to an executor (it may not pickle);
    ``submit`` passes the module-level helper instead.
    """
    return safe_function(self.function, *args, **kwargs)

submit(input)

submit a single input to the executor

Parameters

input : dict

Returns

Future : Future object

Source code in xopt/evaluator.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def submit(self, input: Dict):
    """submit one input dict to the executor

    Parameters
    ----------
    input : dict

    Returns
    -------
    Future  : Future object
    """
    if not isinstance(input, dict):
        raise ValueError("input must be a dictionary")
    # A module-level function (not a bound method) is submitted so the
    # executor can pickle the call.
    # See: https://stackoverflow.com/questions/44144584/typeerror-cant-pickle-thread-lock-objects
    future = self.executor.submit(
        safe_function,
        self.function,
        input,
        **self.function_kwargs,
    )
    return future

submit_data(input_data)

submit dataframe of inputs to executor

Source code in xopt/evaluator.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def submit_data(self, input_data: pd.DataFrame):
    """submit dataframe of inputs to executor, returning a list of futures"""
    frame = pd.DataFrame(input_data)  # cast to dataframe for consistency

    if self.vectorized:
        # One submission carrying the whole batch, columns as numpy arrays
        batch = {
            name: np.array(column)
            for name, column in frame.to_dict(orient="list").items()
        }
        return [self.submit(batch)]  # Single item

    # One submission per row; records avoids iterrows/itertuples.
    return [self.submit(record) for record in frame.to_dict("records")]