
Comparison fails on dataframes with a single column #253

Open

rupertbarton opened this issue Dec 13, 2023 · 9 comments
Labels: bug (Something isn't working), spark

@rupertbarton

When running a comparison on dataframes with a single column, the following exception is thrown:

/opt/venv/lib/python3.8/site-packages/datacompy/spark.py:356: in rows_both_mismatch
    self._merge_dataframes()
/opt/venv/lib/python3.8/site-packages/datacompy/spark.py:462: in _merge_dataframes
    self._all_rows_mismatched = self.spark.sql(mismatch_query).orderBy(
/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py:1440: in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322: in __call__
    return_value = get_return_value(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
a = ('xro111', <py4j.clientserver.JavaClient object at 0x7f7ae6a7cd90>, 'o32', 'sql')
kw = {}, converted = ParseException()
    def deco(*a: Any, **kw: Any) -> Any:
        try:
            return f(*a, **kw)
        except Py4JJavaError as e:
            converted = convert_exception(e.java_exception)
            if not isinstance(converted, UnknownException):
                # Hide where the exception came from that shows a non-Pythonic
                # JVM exception message.
>               raise converted from None
E               pyspark.errors.exceptions.captured.ParseException: 
E               [PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 36)
E               
E               == SQL ==
E               SELECT * FROM matched_table A WHERE 
E               ------------------------------------^^^
/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:175: ParseException

I believe it's due to this line always appending a WHERE clause even when where_cond is empty (empty because there are no columns to compare apart from the join column).
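For illustration, here is a minimal standalone sketch of how the query ends up malformed; it mirrors the query-building code shown in the full traceback below, with columns_compared empty because "a" is the join column:

columns_compared = []  # "a" is the join column, so nothing is left to compare
where_cond = " OR ".join(
    ["A." + name + "_match= False" for name in columns_compared]
)  # -> "" (empty string)
mismatch_query = """SELECT * FROM matched_table A WHERE {}""".format(where_cond)
print(mismatch_query)  # 'SELECT * FROM matched_table A WHERE ' -- dangling WHERE, hence the ParseException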

@fdosani
Member

fdosani commented Dec 13, 2023

Thanks for the report. Would you be able to provide a minimal example so we can reproduce the issue? That would be really helpful for debugging.

@rupertbarton
Author

Thanks for your reply. Here is a sample of code that throws the exception for us:

import datacompy

# spark is an active SparkSession
df_1 = spark.createDataFrame([{"a": 1}])
df_2 = spark.createDataFrame([{"a": 1}])

compare = datacompy.SparkCompare(
    spark,
    df_1,
    df_2,
    join_columns=["a"],
    cache_intermediates=True,
)

compare.rows_both_mismatch.count()

@rupertbarton
Author

The error message is:

---------------------------------------------------------------------------
ParseException                            Traceback (most recent call last)
Cell In[7], line 12
      2 df_2 = spark.createDataFrame([{"a": 1}])
      4 compare = datacompy.SparkCompare(
      5     spark,
      6     df_1,
   (...)
      9     cache_intermediates=True,
     10 )
---> 12 compare.rows_both_mismatch.count()

File /opt/venv/lib/python3.8/site-packages/datacompy/spark.py:356, in SparkCompare.rows_both_mismatch(self)
    354 """pyspark.sql.DataFrame: Returns all rows in both dataframes that have mismatches"""
    355 if self._all_rows_mismatched is None:
--> 356     self._merge_dataframes()
    358 return self._all_rows_mismatched

File /opt/venv/lib/python3.8/site-packages/datacompy/spark.py:462, in SparkCompare._merge_dataframes(self)
    458 where_cond = " OR ".join(
    459     ["A." + name + "_match= False" for name in self.columns_compared]
    460 )
    461 mismatch_query = """SELECT * FROM matched_table A WHERE {}""".format(where_cond)
--> 462 self._all_rows_mismatched = self.spark.sql(mismatch_query).orderBy(
    463     self._join_column_names
    464 )

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py:1440, in SparkSession.sql(self, sqlQuery, args, **kwargs)
   1438 try:
   1439     litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
-> 1440     return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
   1441 finally:
   1442     if len(kwargs) > 0:

File /opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception.<locals>.deco(*a, **kw)
    171 converted = convert_exception(e.java_exception)
    172 if not isinstance(converted, UnknownException):
    173     # Hide where the exception came from that shows a non-Pythonic
    174     # JVM exception message.
--> 175     raise converted from None
    176 else:
    177     raise

ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 36)

== SQL ==
SELECT * FROM matched_table A WHERE 
------------------------------------^^^

@fdosani
Member

fdosani commented Dec 13, 2023

Thanks for the details. I can reproduce it. I also tried the same comparison with Pandas and Fugue:

Pandas

import pandas as pd
import datacompy

df_1 = pd.DataFrame([{"a": 1}])
df_2 = pd.DataFrame([{"a": 1}])

compare = datacompy.Compare(df_1, df_2, join_columns=["a"])
print(compare.report())

Results in:

DataComPy Comparison
--------------------

DataFrame Summary
-----------------

  DataFrame  Columns  Rows
0       df1        1     1
1       df2        1     1

Column Summary
--------------

Number of columns in common: 1
Number of columns in df1 but not in df2: 0
Number of columns in df2 but not in df1: 0

Row Summary
-----------

Matched on: a
Any duplicates on match values: No
Absolute Tolerance: 0
Relative Tolerance: 0
Number of rows in common: 1
Number of rows in df1 but not in df2: 0
Number of rows in df2 but not in df1: 0

Number of rows with some compared columns unequal: 0
Number of rows with all compared columns equal: 1

Column Comparison
-----------------

Number of columns compared with some values unequal: 0
Number of columns compared with all values equal: 1
Total number of values which compare unequal: 0

Fugue

# spark is an active SparkSession
df_1 = spark.createDataFrame([{"a": 1}])
df_2 = spark.createDataFrame([{"a": 1}])

print(datacompy.report(df_1, df_2, join_columns=["a"]))

Results in:

DataComPy Comparison                                                            
--------------------

DataFrame Summary
-----------------

  DataFrame  Columns  Rows
0       df1        1     1
1       df2        1     1

Column Summary
--------------

Number of columns in common: 1
Number of columns in df1 but not in df2: 0
Number of columns in df2 but not in df1: 0

Row Summary
-----------

Matched on: a
Any duplicates on match values: No
Absolute Tolerance: 0
Relative Tolerance: 0
Number of rows in common: 1
Number of rows in df1 but not in df2: 0
Number of rows in df2 but not in df1: 0

Number of rows with some compared columns unequal: 0
Number of rows with all compared columns equal: 1

Column Comparison
-----------------

Number of columns compared with some values unequal: 0
Number of columns compared with all values equal: 1
Total number of values which compare unequal: 0

We should align the Spark output with the Pandas and Fugue output.

@rupertbarton Would you be open to using Fugue for your Spark compare for now? You should be able to run it successfully. I'll need to debug the native Spark compare; I've been debating whether we should just remove it in favor of Fugue moving forward.
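For reference, a minimal sketch of that workaround for the reproduction above, using datacompy's top-level is_match helper from the Fugue API (treat the exact call as an assumption to verify against your installed version):

import datacompy

# spark is an active SparkSession; same single-column frames as in the reproduction
df_1 = spark.createDataFrame([{"a": 1}])
df_2 = spark.createDataFrame([{"a": 1}])

# is_match returns a boolean instead of a full report -- handy for assertion-style checks
assert datacompy.is_match(df_1, df_2, join_columns=["a"])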

@rupertbarton
Author

I'll create a ticket in our backlog to investigate switching over, thanks!

@fdosani fdosani added the bug Something isn't working label Dec 14, 2023
@fdosani
Member

fdosani commented Dec 14, 2023

@rupertbarton More for my understanding, but could you articulate what sort of use case you have where you're joining on just a single column with nothing else to compare?

@fdosani
Member

fdosani commented Dec 14, 2023

@jdawang @rupertbarton I have a WIP fix here

Getting the following back:

In [2]: print(compare.report())

****** Column Summary ******
Number of columns in common with matching schemas: 1
Number of columns in common with schema differences: 0
Number of columns in base but not compare: 0
Number of columns in compare but not base: 0

****** Row Summary ******
Number of rows in common: 2
Number of rows in base but not compare: 0
Number of rows in compare but not base: 0
Number of duplicate rows found in base: 0
Number of duplicate rows found in compare: 0

****** Row Comparison ******
Number of rows with some columns unequal: 0
Number of rows with all columns equal: 2

****** Column Comparison ******
Number of columns compared with some values unequal: 0
Number of columns compared with all values equal: 0
None

Seems like the Column Comparison is different from the Pandas version: Pandas would say "Number of columns compared with all values equal: 1", while the Spark output above says 0. I think this is mostly due to the difference in the underlying logic.

I can see this both ways. This corner case is just a bit odd because you aren't really comparing anything, just joining on the key (a).

@jdawang Another reason why I'm thinking maybe we just drop the native Spark implementation. The differences are annoying.
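The linked WIP fix isn't reproduced here; as a sketch only, a guard along these lines in _merge_dataframes would avoid the parse error (not necessarily the project's actual change):

where_cond = " OR ".join(
    ["A." + name + "_match= False" for name in self.columns_compared]
)
if where_cond:
    mismatch_query = "SELECT * FROM matched_table A WHERE {}".format(where_cond)
else:
    # No compared columns means no row can mismatch, so select an empty result
    mismatch_query = "SELECT * FROM matched_table A WHERE 1=0"
self._all_rows_mismatched = self.spark.sql(mismatch_query).orderBy(
    self._join_column_names
)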

@rupertbarton
Author

> @rupertbarton More for my understanding, but could you articulate what sort of use case you have where you're joining on just a single column with nothing else to compare?

Hi! Our use case is that we have a large number of tables we're running assertions on, and all of them work fine apart from one particular table. That table has multiple columns, but all of them except one are encrypted, so we exclude the encrypted columns from the comparison (it's awkward to work out what the encrypted values will be), which is why the DF ends up with a single column.

We still want to verify that all the values in the expected DF and the actual DF match up, and we're using the same code for every table.
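To make that concrete, a sketch of the pattern described, with hypothetical column names:

# Hypothetical schema: "id" is the join key and the only unencrypted column.
# Dropping the encrypted columns leaves a single-column DF -- exactly the
# shape that triggers this bug in the native SparkCompare.
expected = expected_df.select("id")
actual = actual_df.select("id")

compare = datacompy.SparkCompare(spark, expected, actual, join_columns=["id"])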

@jdawang jdawang added the spark label Dec 18, 2023
@ramakanth1997

Hi everyone, I guess this will work for you:

print(compare.report(sample_count=1000))
