Is it possible to support PyArrow backed UDFs in Comet natively? #957

SemyonSinchenko opened this issue Sep 21, 2024 · 4 comments

@SemyonSinchenko
Member

SemyonSinchenko commented Sep 21, 2024

What is the problem the feature request solves?

Spark provides multiple ways to run Arrow-backed UDFs. The current 3.5 release supports mapInArrow, and the upcoming 4.0 release will also add applyInArrow.

My understanding of how this works under the hood in Spark is quite limited, so correct me if I'm wrong. At the moment, if Spark sees PythonMapInArrow in the plan, it internally converts rows to Arrow batches, which are a columnar representation of the data:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala#L36
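
To make the row-to-columnar step concrete, here is a minimal pure-Python sketch of what such a conversion does conceptually. It is not Spark's implementation: the helper name `rows_to_batches`, the dict-of-lists batch layout, and the `max_records` parameter are all assumptions made for illustration, whereas Spark builds real Arrow batches on the JVM side.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List, Sequence, Tuple


def rows_to_batches(
    rows: Iterable[Tuple],
    columns: Sequence[str],
    max_records: int = 2,
) -> Iterator[Dict[str, List]]:
    """Group row tuples into columnar batches (hypothetical helper)."""
    it = iter(rows)
    while True:
        # Take at most max_records rows for the next batch.
        chunk = list(islice(it, max_records))
        if not chunk:
            return
        # Transpose: one list per column instead of one tuple per row.
        yield {name: [row[i] for row in chunk] for i, name in enumerate(columns)}


rows = [(5.1, "setosa"), (7.0, "versicolor"), (6.3, "virginica")]
batches = list(rows_to_batches(rows, ["sepal_length", "species"]))
```

Every value is touched once during the transpose, which is the cost that a direct columnar hand-off would avoid.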

Here is a minimal example of running mapInArrow in Spark 3.4:

from typing import Iterator

import pandas as pd
import pyarrow as pa
from pyspark.sql import SparkSession, types as T


if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    iris = pd.read_csv("https://raw.githubusercontent.com/mwaskom/seaborn-data/refs/heads/master/iris.csv")
    iris.to_parquet("iris.parquet", index=False)

    # mapInArrow passes an iterator of record batches and expects an iterator back.
    def arrow_fun(batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
        for arrow_batch in batches:
            pdf = arrow_batch.to_pandas()
            pdf["avg_length"] = (pdf["sepal_length"] + pdf["petal_length"]) / 2

            yield pa.RecordBatch.from_pandas(pdf)

    schema = T.StructType(
        [
            T.StructField("sepal_length", T.DoubleType()),
            T.StructField("sepal_width", T.DoubleType()),
            T.StructField("petal_length", T.DoubleType()),
            T.StructField("petal_width", T.DoubleType()),
            T.StructField("species", T.StringType()),
            T.StructField("avg_length", T.DoubleType()),
        ]
    )

    
    test_data = spark.read.parquet("iris.parquet")
    new_data = test_data.mapInArrow(arrow_fun, schema)
    new_data.explain(mode="extended")

When I run it with Comet enabled, it generates the following physical plan:

PythonMapInArrow arrow_fun(sepal_length#0, sepal_width#1, petal_length#2, petal_width#3, species#4)#10, [sepal_length#11, sepal_width#12, petal_length#13, petal_width#14,species#15, avg_length#16]
+- *(1) ColumnarToRow
   +- CometScan parquet [sepal_length#0,sepal_width#1,petal_length#2,petal_width#3,species#4] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/var/home/sem/github/tmp/iris.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<sepal_length:double,sepal_width:double,petal_length:double,petal_width:double,species:string>

If I understand correctly, the following happens:

  1. Comet reads the parquet file in columnar form
  2. Comet converts the Arrow-backed columnar data to row-oriented JVM data
  3. Spark internally converts the row-oriented JVM data back to Arrow-backed columnar data
  4. This columnar data is passed to PythonMapInArrow

It seems to me that steps 2 and 3 are redundant: the Arrow batches that mapInArrow requires could be created directly from Comet's Arrow-backed columns, and this should be a kind of zero-copy operation. The reverse conversion, from a Spark columnar batch to a Comet columnar batch, may be zero-copy too, so in theory Comet would not need to fall back to Spark in this case, right?
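
The redundancy can be illustrated with a small pure-Python model. This is only a sketch under stated assumptions: a batch is modeled as a dict of lists, and `columnar_to_rows` and `rows_to_columnar` are hypothetical stand-ins for steps 2 and 3, not Spark or Comet APIs.

```python
from typing import Dict, List, Tuple

Batch = Dict[str, List]


def columnar_to_rows(batch: Batch) -> List[Tuple]:
    """Step 2 (modeled): materialize one tuple per row."""
    return list(zip(*batch.values()))


def rows_to_columnar(rows: List[Tuple], names: List[str]) -> Batch:
    """Step 3 (modeled): rebuild one list per column from the rows."""
    return {name: [row[i] for row in rows] for i, name in enumerate(names)}


scan_output: Batch = {"sepal_length": [5.1, 4.9], "petal_length": [1.4, 1.4]}

# Current plan: columnar -> rows -> columnar before the UDF sees the data.
round_tripped = rows_to_columnar(columnar_to_rows(scan_output), list(scan_output))

# Proposed plan: hand the scan's columns to the UDF as they are.
passed_through = scan_output

same_content = round_tripped == scan_output
copied = round_tripped["sepal_length"] is not scan_output["sepal_length"]
shared = passed_through["sepal_length"] is scan_output["sepal_length"]
```

In this model the round trip yields the same content in freshly allocated columns, while the pass-through reuses the original column objects, which is the zero-copy behavior the question is about.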

Describe the potential solution

I do not know the exact solution. This is mostly a question.

Additional context

I'm willing to implement this myself and am ready to work on it, but I need guidance and help with the overall design (if it is feasible).

Native support for Arrow-backed UDFs would open up many new ways of using Comet. I expect it could give a significant boost to most ML/MLOps tasks that are typically done in Spark via Arrow-backed UDFs (pandas, polars, pyarrow, or even Rust code built into a Python module with maturin).

@SemyonSinchenko SemyonSinchenko added the enhancement New feature or request label Sep 21, 2024
@andygrove
Member

Thanks for writing this up @SemyonSinchenko. Your reasoning seems sound to me, and I agree that this would be quite a unique and powerful feature for Comet. I am not sure how much work it will be, but I am happy to offer guidance (and will probably need some help from @viirya as well). I will start getting up to speed on Spark's PythonMapInArrowExec.

@SemyonSinchenko
Member Author

I took a look at the Spark code, and this is what I found:

  1. Conversion from rows to Arrow batches is done in org.apache.spark.sql.execution.python.BasicPythonArrowInput
  2. Conversion back from Arrow batches to rows is done in org.apache.spark.sql.execution.python.BasicPythonArrowOutput

Both are private[python] in Spark. As far as I can tell, BaseArrowPythonRunner expects InternalRow as input, converts it to Arrow, and returns org.apache.spark.sql.vectorized.ColumnarBatch.

What I am thinking about now is implementing CometArrowPythonRunner extends BasePythonRunner[CometColumnarBatch, CometColumnarBatch], where CometColumnarBatch is just a Scala wrapper over Comet memory. Spark handles a lot of configs, such as spark.pyspark.driver.python and spark.pyspark.python. These configs define which Python interpreter is used and what is added to PYTHONPATH, which lets users add their own dependencies to Python vectorized UDFs. If execution happened in Rust instead, Comet would have to handle all of these configs itself, including managing Python virtual environments, collecting Python metrics, etc.

So, I see the following as a possible solution:

  • Implement CometColumnarBatch extends InternalRow, which wraps Comet data without copying
  • Add the ability for a child in the plan to request a CometColumnarBatch from its parent, and the ability for Comet nodes to work with CometColumnarBatch
  • Implement CometArrowPythonRunner extends BasePythonRunner[CometColumnarBatch, CometColumnarBatch], which differs only in how input and output are processed
  • Implement MapInBatchExecComet, which implements doExecute and internally creates a CometArrowPythonRunner instead of an ArrowPythonRunner
  • Add a rule to the Comet plugin that replaces MapInBatchExec with MapInBatchExecComet where possible, without falling back to Spark
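
The last step, the replacement rule, can be sketched as a toy plan rewrite. This is a pure-Python model for illustration only: `PlanNode`, `replace_map_in_batch`, and `render` are hypothetical names, plan nodes are identified by plain strings, and a real rule would be written in Scala against Spark's physical plan classes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PlanNode:
    """A single-child physical plan node, identified only by its name."""
    name: str
    child: Optional["PlanNode"] = None


def replace_map_in_batch(plan: PlanNode) -> PlanNode:
    """Swap MapInBatchExec for a Comet variant when its input comes from Comet."""
    if plan.child is None:
        return plan
    child = replace_map_in_batch(plan.child)
    if plan.name == "MapInBatchExec" and child.name == "ColumnarToRow":
        grandchild = child.child
        if grandchild is not None and grandchild.name.startswith("Comet"):
            # Drop the row conversion and read Comet batches directly.
            return PlanNode("MapInBatchExecComet", grandchild)
    # Otherwise keep the node as it is (fallback to Spark).
    return PlanNode(plan.name, child)


def render(plan: Optional[PlanNode]) -> str:
    """Render the plan as 'parent -> child -> ...'."""
    names = []
    while plan is not None:
        names.append(plan.name)
        plan = plan.child
    return " -> ".join(names)


comet_plan = PlanNode("MapInBatchExec", PlanNode("ColumnarToRow", PlanNode("CometScan")))
spark_plan = PlanNode("MapInBatchExec", PlanNode("FileScan"))

rewritten = render(replace_map_in_batch(comet_plan))
fallback = render(replace_map_in_batch(spark_plan))
```

Here `rewritten` loses the ColumnarToRow node, while `fallback` is left unchanged because its input does not come from a Comet operator.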

It seems to me that reusing the way Spark handles Python UDFs would be easier than implementing it from scratch using DataFusion. But I'm not 100% sure.

@andygrove
Member

It seems to me that reusing the way Spark handles Python UDFs would be easier than implementing it from scratch using DataFusion. But I'm not 100% sure.

Yes, that is the approach I would take.

Your high level plan sounds good to me.

@SemyonSinchenko
Member Author

Your high level plan sounds good to me.

Cool, thank you! I will start working on it.
