feat(duckdb/pyspark): load data from duckdb into pyspark #8440

Closed

lostmygithubaccount opened this issue Feb 24, 2024 · 9 comments
Labels
feature (Features or general enhancements) · pyspark (The Apache PySpark backend)

Comments

@lostmygithubaccount (Member)

What happened?

Trying to create example data in a PySpark connection and running into errors.

repro:

[ins] In [1]: import ibis

[ins] In [2]: from pyspark.sql import SparkSession
         ...:
         ...: spark = SparkSession \
         ...:     .builder \
         ...:     .appName("PySpark2") \
         ...:     .getOrCreate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/24 13:42:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[ins] In [3]: con = ibis.pyspark.connect(spark)

[ins] In [4]: t = ibis.examples.penguins.fetch()

[ins] In [5]: con.create_table("penguins", t)
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
Cell In[5], line 1
----> 1 con.create_table("penguins", t)

File ~/repos/ibis/ibis/backends/pyspark/__init__.py:395, in Backend.create_table(self, name, obj, schema, database, temp, overwrite, format)
    393     with self._active_database(database):
    394         self._run_pre_execute_hooks(table)
--> 395         df = self._session.sql(query)
    396         df.write.saveAsTable(name, format=format, mode=mode)
    397 elif schema is not None:

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/session.py:1631, in SparkSession.sql(self, sqlQuery, args, **kwargs)
   1627         assert self._jvm is not None
   1628         litArgs = self._jvm.PythonUtils.toArray(
   1629             [_to_java_column(lit(v)) for v in (args or [])]
   1630         )
-> 1631     return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
   1632 finally:
   1633     if len(kwargs) > 0:

File ~/repos/ibis/venv/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:185, in capture_sql_exception.<locals>.deco(*a, **kw)
    181 converted = convert_exception(e.java_exception)
    182 if not isinstance(converted, UnknownException):
    183     # Hide where the exception came from that shows a non-Pythonic
    184     # JVM exception message.
--> 185     raise converted from None
    186 else:
    187     raise

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `penguins` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 3 pos 5;
'Project [*]
+- 'UnresolvedRelation [penguins], [], false

try with to_pyarrow():

[ins] In [6]: con.create_table("penguins", t.to_pyarrow())
---------------------------------------------------------------------------
PySparkTypeError                          Traceback (most recent call last)
Cell In[6], line 1
----> 1 con.create_table("penguins", t.to_pyarrow())

File ~/repos/ibis/ibis/backends/pyspark/__init__.py:394, in Backend.create_table(self, name, obj, schema, database, temp, overwrite, format)
    392 mode = "overwrite" if overwrite else "error"
    393 with self._active_database(database):
--> 394     self._run_pre_execute_hooks(table)
    395     df = self._session.sql(query)
    396     df.write.saveAsTable(name, format=format, mode=mode)

File ~/repos/ibis/ibis/backends/__init__.py:877, in BaseBackend._run_pre_execute_hooks(self, expr)
    875 self._define_udf_translation_rules(expr)
    876 self._register_udfs(expr)
--> 877 self._register_in_memory_tables(expr)

File ~/repos/ibis/ibis/backends/sql/__init__.py:195, in SQLBackend._register_in_memory_tables(self, expr)
    193 def _register_in_memory_tables(self, expr: ir.Expr) -> None:
    194     for memtable in expr.op().find(ops.InMemoryTable):
--> 195         self._register_in_memory_table(memtable)

File ~/repos/ibis/ibis/backends/pyspark/__init__.py:242, in Backend._register_in_memory_table(self, op)
    240 def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
    241     schema = PySparkSchema.from_ibis(op.schema)
--> 242     df = self._session.createDataFrame(data=op.data.to_frame(), schema=schema)
    243     df.createOrReplaceTempView(op.name)

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/session.py:1440, in SparkSession.createDataFrame(self, data, schema, samplingRatio, verifySchema)
   1436     data = pd.DataFrame(data, columns=column_names)
   1438 if has_pandas and isinstance(data, pd.DataFrame):
   1439     # Create a DataFrame from pandas DataFrame.
-> 1440     return super(SparkSession, self).createDataFrame(  # type: ignore[call-overload]
   1441         data, schema, samplingRatio, verifySchema
   1442     )
   1443 return self._create_dataframe(
   1444     data, schema, samplingRatio, verifySchema  # type: ignore[arg-type]
   1445 )

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/pandas/conversion.py:363, in SparkConversionMixin.createDataFrame(self, data, schema, samplingRatio, verifySchema)
    361             raise
    362 converted_data = self._convert_from_pandas(data, schema, timezone)
--> 363 return self._create_dataframe(converted_data, schema, samplingRatio, verifySchema)

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/session.py:1485, in SparkSession._create_dataframe(self, data, schema, samplingRatio, verifySchema)
   1483     rdd, struct = self._createFromRDD(data.map(prepare), schema, samplingRatio)
   1484 else:
-> 1485     rdd, struct = self._createFromLocal(map(prepare, data), schema)
   1486 assert self._jvm is not None
   1487 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/session.py:1090, in SparkSession._createFromLocal(self, data, schema)
   1088 # make sure data could consumed multiple times
   1089 if not isinstance(data, list):
-> 1090     data = list(data)
   1092 if schema is None or isinstance(schema, (list, tuple)):
   1093     struct = self._inferSchemaFromList(data, names=schema)

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/session.py:1459, in SparkSession._create_dataframe.<locals>.prepare(obj)
   1457 @no_type_check
   1458 def prepare(obj):
-> 1459     verify_func(obj)
   1460     return obj

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/types.py:2187, in _make_type_verifier.<locals>.verify(obj)
   2185 def verify(obj: Any) -> None:
   2186     if not verify_nullability(obj):
-> 2187         verify_value(obj)

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/types.py:2160, in _make_type_verifier.<locals>.verify_struct(obj)
   2150         raise PySparkValueError(
   2151             error_class="LENGTH_SHOULD_BE_THE_SAME",
   2152             message_parameters={
   (...)
   2157             },
   2158         )
   2159     for v, (_, verifier) in zip(obj, verifiers):
-> 2160         verifier(v)
   2161 elif hasattr(obj, "__dict__"):
   2162     d = obj.__dict__

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/types.py:2187, in _make_type_verifier.<locals>.verify(obj)
   2185 def verify(obj: Any) -> None:
   2186     if not verify_nullability(obj):
-> 2187         verify_value(obj)

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/types.py:2094, in _make_type_verifier.<locals>.verify_long(obj)
   2092 def verify_long(obj: Any) -> None:
   2093     assert_acceptable_types(obj)
-> 2094     verify_acceptable_types(obj)
   2095     if obj < -9223372036854775808 or obj > 9223372036854775807:
   2096         raise PySparkValueError(
   2097             error_class="VALUE_OUT_OF_BOUND",
   2098             message_parameters={
   (...)
   2103             },
   2104         )

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/types.py:2006, in _make_type_verifier.<locals>.verify_acceptable_types(obj)
   2003 def verify_acceptable_types(obj: Any) -> None:
   2004     # subclass of them can not be fromInternal in JVM
   2005     if type(obj) not in _acceptable_types[_type]:
-> 2006         raise PySparkTypeError(
   2007             error_class="CANNOT_ACCEPT_OBJECT_IN_TYPE",
   2008             message_parameters={
   2009                 "data_type": str(dataType),
   2010                 "obj_name": str(obj),
   2011                 "obj_type": type(obj).__name__,
   2012             },
   2013         )

PySparkTypeError: [CANNOT_ACCEPT_OBJECT_IN_TYPE] `LongType()` can not accept object `181.0` in type `float`.
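The float in that last error is likely a pandas artifact: integer columns that contain nulls (as several penguins columns do) get upcast to float64 on the way through pandas, and Spark's LongType verifier then rejects the resulting floats. A minimal illustration of the pandas behavior, independent of the repro:

```python
import pandas as pd

# An integer column with a missing value is inferred as float64,
# so a value like 181 arrives as 181.0.
df = pd.DataFrame({"flipper_length_mm": [181, None]})
print(df.dtypes)      # flipper_length_mm    float64
print(df.iloc[0, 0])  # 181.0
```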

What version of ibis are you using?

main

What backend(s) are you using, if any?

duckdb + pyspark

Relevant log output

see above

@lostmygithubaccount added the bug (Incorrect behavior inside of ibis) and pyspark (The Apache PySpark backend) labels on Feb 24, 2024
@cpcloud (Member) commented Feb 24, 2024

We don't support this kind of cross-backend data loading, not even with two different instances of the same backend.

Don't we already have an issue for this?

@cpcloud (Member) commented Feb 24, 2024

If you can add a use case that would also be helpful. It's not trivial to make this work, so having some rationale might help justify the effort.

@lostmygithubaccount (Member, Author)

I just ran into this while creating demo data for #8090, and I don't consider that alone worth the effort. The overall issue is #8115, which I do think is worth prioritizing. Similar to #8426, I think it adds a lot of value to be able to easily (and ideally efficiently) move data across all the systems Ibis supports, for a few use cases:

  • data migrations from one system to another
  • data duplication for different purposes (e.g. move Postgres data into Snowflake for analytics; move Snowflake data into local DuckDB for rapid data science prototyping; move DuckDB data back into Snowflake for long-term storage)
  • aiding in benchmarking across backends (load the same data into each backend and test your use case with a single API)

For this issue, feel free to just close in favor of #8115, though I also think it'd be good to have a better error message here (e.g. "Error: cannot transfer data between DuckDB and PySpark backends") -- I'm not sure how hard it is to detect that case and raise such a message (rough sketch below).
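A hypothetical sketch of such a check (the method name, its placement, and the use of the internal _find_backend are illustrative, not the actual Ibis implementation):

```python
import ibis.common.exceptions as com

def _check_cross_backend(self, expr):
    # Hypothetical guard: if the expression is bound to a different backend
    # instance, fail with an explicit message instead of letting a confusing
    # AnalysisException surface downstream.
    try:
        backend = expr._find_backend(use_default=False)
    except com.IbisError:
        return  # expression not bound to any backend; nothing to check
    if backend is not None and backend is not self:
        raise com.IbisError(
            f"cannot transfer data between the {backend.name!r} "
            f"and {self.name!r} backends"
        )
```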

For the purposes of the tutorial I'll just write to CSV/Parquet and read from that.
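For reference, a minimal sketch of that workaround (the file name is illustrative), assuming the same spark session as in the repro above:

```python
import ibis

t = ibis.examples.penguins.fetch()  # DuckDB-backed example table
t.to_parquet("penguins.parquet")    # materialize to a neutral on-disk format

con = ibis.pyspark.connect(spark)
penguins = con.read_parquet("penguins.parquet")  # load it via PySpark instead
```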

@cpcloud (Member) commented Feb 25, 2024

Converting to a feature request given the above.

@cpcloud added the feature (Features or general enhancements) label and removed the bug (Incorrect behavior inside of ibis) label on Feb 25, 2024
@cpcloud changed the title from "bug: cannot create table in PySpark from DuckDB" to "feat: load data from duckdb into pyspark" on Feb 25, 2024
@cpcloud changed the title from "feat: load data from duckdb into pyspark" to "feat(duckdb/pyspark): load data from duckdb into pyspark" on Feb 25, 2024
@gforsyth (Member)

> I think it adds a lot of value to be able to easily (and ideally efficiently) move data across all the systems Ibis supports, for a few use cases:

I agree. And also, this is a monstrous problem for anything that doesn't have native Arrow support. If we plan to try to recreate odo, we should first check in with the ibis devs who used to work on it, and then, second, make a different plan.

@lostmygithubaccount (Member, Author)

Limiting this to backends that have native Arrow support seems fine to me, perhaps with exceptions for Postgres and SQLite given how common they are as sources of data for analytics (I don't know if they support Arrow, but I assume not).

@gforsyth (Member)

> (I don't know if they support Arrow, but I assume not)

They do not. We could accomplish this in the short term by using DuckDB's ATTACH feature, although that would make duckdb a dependency of the sqlite and postgres backends (it could be an optional dependency). Medium-term, I think we should use ADBC for this.
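A rough sketch of the ATTACH approach for a SQLite source (file and table names are illustrative); DuckDB's sqlite extension exposes the attached database's tables and can hand results back as Arrow:

```python
import duckdb

con = duckdb.connect()
con.install_extension("sqlite")
con.load_extension("sqlite")

# Attach an existing SQLite database, then read a table out as Arrow.
con.execute("ATTACH 'app.db' AS src (TYPE sqlite)")
table = con.execute("SELECT * FROM src.events").arrow()
```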

@lostmygithubaccount (Member, Author)

Oh yeah, I like using DuckDB for that -- I'd vote for an optional dependency.

@jcrist (Member) commented Feb 26, 2024

I think this issue is a duplicate of #4800?

Implementation-wise, the common case would be iterating over to_pyarrow_batches() and inserting each batch in turn (possibly inside a transaction so we can roll it back on failure 🤷); a rough sketch follows at the end of this comment. The trick here would be exposing fast paths for backends like duckdb that include native support for reading from/writing to another backend. AFAICT duckdb is unique among our backends in this ability (it includes native readers/writers for sqlite/mysql/postgres).

I'd vote to close this in favor of #4800, but with a focus on designing #4800 so we can handle the fast path support in duckdb.
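For concreteness, a hypothetical sketch of that generic slow path; copy_table and its signature are illustrative, not an existing Ibis API:

```python
def copy_table(src, dest, name: str, chunk_size: int = 1_000_000) -> None:
    """Stream a table from one Ibis backend (src) to another (dest) in Arrow batches."""
    t = src.table(name)
    dest.create_table(name, schema=t.schema())  # create the empty target first
    for batch in t.to_pyarrow_batches(chunk_size=chunk_size):
        dest.insert(name, batch)  # insert one Arrow batch at a time
```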

@gforsyth closed this as not planned (won't fix, can't repro, duplicate, stale) on Feb 26, 2024
github-project-automation bot moved this from backlog to done in Ibis planning and roadmap on Feb 26, 2024