Skip to content

Commit

Permalink
fix: Spark Imputer conversion with multiple input cols (#608)
Browse files Browse the repository at this point in the history
* fix: Spark Imputer conversion with multiple input cols

Signed-off-by: Jason Wang <[email protected]>

* remove whitespace

Signed-off-by: Jason Wang <[email protected]>

---------

Signed-off-by: Jason Wang <[email protected]>
  • Loading branch information
memoryz authored Feb 27, 2023
1 parent db71727 commit 175aee0
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 12 deletions.
13 changes: 7 additions & 6 deletions onnxmltools/convert/sparkml/operator_converters/imputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
from ...common.data_types import Int64TensorType, FloatTensorType
from ...common.utils import check_input_and_output_numbers, check_input_and_output_types
from ...common._registration import register_converter, register_shape_calculator
from ...common._topology import Operator, Scope
from pyspark.ml.feature import ImputerModel
from typing import List


def convert_imputer(scope, operator, container):
op = operator.raw_operator

def convert_imputer(scope: Scope, operator: Operator, container):
op: ImputerModel = operator.raw_operator
op_type = 'Imputer'
name = scope.get_unique_operator_name(op_type)
attrs = {'name': name}
input_type = operator.inputs[0].type
surrogates = op.surrogateDF.toPandas().values[0].tolist()
value = op.getOrDefault('missingValue')

if isinstance(input_type, FloatTensorType):
attrs['imputed_value_floats'] = surrogates
attrs['replaced_value_float'] = value
Expand All @@ -37,13 +39,12 @@ def convert_imputer(scope, operator, container):
name=scope.get_unique_operator_name('Split'),
op_version=2,
axis=1,
split=range(1, len(operator.output_full_names)))
split=[1] * len(operator.output_full_names))
else:
container.add_node(op_type, operator.inputs[0].full_name, operator.output_full_names[0],
op_domain='ai.onnx.ml',
**attrs)


register_converter('pyspark.ml.feature.ImputerModel', convert_imputer)

def calculate_imputer_output_shapes(operator):
Expand Down
8 changes: 6 additions & 2 deletions onnxmltools/convert/sparkml/ops_input_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,12 @@ def build_io_name_map():
lambda model: [model.getOrDefault("predictionCol")],
),
"pyspark.ml.feature.ImputerModel": (
lambda model: model.getOrDefault("inputCols"),
lambda model: model.getOrDefault("outputCols"),
lambda model: model.getOrDefault("inputCols")
if model.isSet("inputCols")
else [model.getOrDefault("inputCol")],
lambda model: model.getOrDefault("outputCols")
if model.isSet("outputCols")
else [model.getOrDefault("outputCol")],
),
"pyspark.ml.feature.MaxAbsScalerModel": (
lambda model: [model.getOrDefault("inputCol")],
Expand Down
14 changes: 10 additions & 4 deletions tests/sparkml/test_imputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class TestSparkmlImputer(SparkMlTestCase):
def test_imputer_single(self):
self._imputer_test_single()

@unittest.skipIf(True, reason="Name:'Split' Status Message: Cannot split using values in 'split")
@unittest.skipIf(sys.version_info < (3, 8),
reason="pickle fails on python 3.7")
def test_imputer_multi(self):
Expand All @@ -52,13 +51,20 @@ def _imputer_test_multi(self):

# run the model
predicted = model.transform(data)
expected = predicted.select("out_a", "out_b").toPandas().values.astype(numpy.float32)

expected = {
"out_a": predicted.select("out_a").toPandas().values.astype(numpy.int64),
"out_b": predicted.select("out_b").toPandas().values.astype(numpy.int64),
}

data_np = data.toPandas().values.astype(numpy.float32)
data_np = {'a': data_np[:, :1], 'b': data_np[:, 1:]}
paths = save_data_models(data_np, expected, model, model_onnx, basename="SparkmlImputerMulti")
onnx_model_path = paths[-1]
output, output_shapes = run_onnx_model(['out_a', 'out_b'], data_np, onnx_model_path)
compare_results(expected, output, decimal=5)
output_names = ['out_a', 'out_b']
output, output_shapes = run_onnx_model(output_names, data_np, onnx_model_path)
actual_output = dict(zip(output_names, output))
compare_results(expected, actual_output, decimal=5)

def _imputer_test_single(self):
data = self.spark.createDataFrame([
Expand Down

0 comments on commit 175aee0

Please sign in to comment.