[QST] How to omit the `Dataset.shuffle_by_keys` step when exporting data from BigQuery to parquet #1862
Comments
Moreover, I don't understand why "The row group memory size of the parquet files should be smaller than the `part_size` that you set for the NVTabular dataset, such as `nvt.Dataset(TRAIN_DIR, engine="parquet", part_size="1000MB")`." This is mentioned in the Troubleshooting section of the docs.
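For readers hitting the same question, one way to check this advice against your own files (not part of the original thread) is to read the parquet metadata with pyarrow and compare the per-row-group sizes with the `part_size` you intend to use. The file path below is a placeholder.

```python
# Hedged sketch: compare parquet row-group sizes against an intended part_size.
# "my_file.parquet" is a placeholder path.
import pyarrow.parquet as pq

md = pq.ParquetFile("my_file.parquet").metadata
for i in range(md.num_row_groups):
    rg = md.row_group(i)
    # total_byte_size is the uncompressed size of the row group's column data
    print(f"row group {i}: {rg.num_rows} rows, {rg.total_byte_size / 1e6:.1f} MB")
```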
Unfortunately, I cannot help much with BigQuery. There may be a way to control the output layout on the BigQuery side, but I'm not familiar enough with it to say. There is not much you can do in NVTabular itself to change the partitioning after the files are written, but you can always build the dask DataFrame yourself and hand it to `Dataset`, for example:

```python
import dask
import dask.dataframe as dd

from merlin.io import Dataset

# `path` points at the exported parquet files
with dask.config.set({"dataframe.backend": "cudf"}):
    ddf = dd.read_parquet(
        path, blocksize="512MB", aggregate_files=True
    ).sort_values("session_id")

dataset = Dataset(ddf)
```
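As a usage illustration (not from the original comments), the resulting `dataset` could then feed the grouping step directly; the column names and aggregations below are assumptions based on the question in this thread.

```python
# Hedged sketch: feeding the pre-sorted dataset into a session-grouping workflow.
# Column names ("session_id", "timestamp", "item_id") are assumptions.
import nvtabular as nvt

groupby_features = ["session_id", "timestamp", "item_id"] >> nvt.ops.Groupby(
    groupby_cols=["session_id"],
    sort_cols=["timestamp"],
    aggs={"item_id": ["list"], "timestamp": ["first"]},
)

workflow = nvt.Workflow(groupby_features)
workflow.fit_transform(dataset).to_parquet("./processed")
```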
Thank you! Let me repeat in my own words (bullet points 😃) and ask some clarification questions to make sure I understand correctly:

Thanks a lot!
These are good questions @piojanu. They are actually a bit tricky to answer completely, but I'll do my best :) Some general background (I'm sorry if I am mostly repeating things that you already know):
A `Dataset` is essentially a lazy wrapper around a Dask DataFrame: creating one defines a collection of partitions, but does not eagerly load any of them. When you (or NVTabular) need the real data for a specific partition, the underlying task graph needed to generate that partition will be executed by scheduling the necessary compute tasks on worker processes (or threads). It is only when one of these tasks needs to move remote parquet data into local memory that dask will actually do so. In this sense, dask does not copy GCS data into a local dask space for later processing, but will attempt to stream the data to the worker processes as it is needed.

How is `part_size` used? In order to support the `part_size` argument, the parquet engine maps one or more row-groups from the files on disk onto each partition, using the row-group memory size to estimate how many row-groups will fit within `part_size`. For datasets containing large parquet files with many uniformly-sized row-groups per file, this approach tends to work just fine. However, there are certainly cases where the partition sizes will end up being much larger or much smaller than `part_size` (for example, when a single row group is already larger than `part_size`). You can always inspect the resulting partitioning yourself (see the snippet after this comment).

So, now that we have some background out of the way:
Neither
They are loaded lazily. The data needed for a specific partition is moved into memory when the corresponding compute task is executing on a worker.
Correct. Shuffling is well-known to be slow and memory intensive, and so it is something you want to avoid at all costs. The crazy hive-partitioning logic is just Merlin jumping through hoops to avoid a conventional shuffle.
Before shuffling your data, you can always call
It is mostly a wrapper around the corresponding Dask-DataFrame shuffle functionality.
Hopefully I’ve provided some background here, but feel free to ask further questions. Since this is mostly a high-level interface to Dask-DataFrame, it may be useful to look through the dask documentation a bit.
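To make the background above concrete, here is a small sketch (not part of the original comment) of how the lazy partitioning can be inspected before deciding whether a shuffle is needed. `TRAIN_DIR`, the key column name, and the exact `shuffle_by_keys` call are assumptions mirroring the usage discussed in this thread.

```python
# Hedged sketch: inspect how part_size translated into partitions, then shuffle if needed.
# TRAIN_DIR and "session_id" are placeholders/assumptions.
from merlin.io import Dataset

ds = Dataset(TRAIN_DIR, engine="parquet", part_size="1000MB")
ddf = ds.to_ddf()                      # the underlying (lazy) dask DataFrame

print(ds.npartitions)                  # how many partitions the engine created
# Per-partition row counts; computing this triggers the actual (streamed) parquet reads.
print(ddf.map_partitions(len).compute())

# If rows for the same session_id span multiple partitions, a shuffle is still required:
shuffled = ds.shuffle_by_keys(keys=["session_id"])
```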
This is an amazing explanation, just what I needed to understand how the `Dataset` partitioning works. I think this explanation should be a part of the troubleshooting section of the docs. To fix the problems described there, the user needs to know how this stuff works. Do you want me to prepare a PR that adds your explanation there?
I always welcome contributions to the documentation :) Thanks for engaging @piojanu!
Hi! After some time of working around my problems described in this and other issues (#1863 and #1864), I've come to the conclusion that I still don't understand: what is the proper way to save the dataset before feature engineering so that I don't have to call `Dataset.shuffle_by_keys`?
@piojanu - Thank you for sharing your experiences and raising issues!

First the bad news: You are absolutely correct that Merlin/NVTabular does not currently give you a clean, well-documented way to avoid this shuffle.

Historical context: Early NVT/Merlin use cases did not require this kind of shuffling support, and the general assumption was that some other ETL engine or pure Dask/Dask-cuDF code would be a better fit for those kinds of workflows anyway. The early motivation for NVTabular was to provide a high-level API for common pre-processing and feature-engineering tasks that are "embarrassingly parallel" at the partition level.

The good (ish?) news: It is possible to pay the shuffling cost only once. Your best bet is to write your data in such a way that each Parquet file already contains unique values for the columns you need. If you used `shuffle_by_keys` to shuffle your `Dataset`, you can write the shuffled partitions straight back to disk:

```python
# If `output_files` is set to the original partition count, Merlin
# will map each dask partition to a distinct Parquet file.
ds.to_parquet("/shuffled/path", output_files=ds.npartitions)
```

I realize that the documentation doesn't make it clear that you can use `output_files` this way. Once your data is written out so that each Parquet file already contains unique values for the grouping columns of interest, you can specify an unrealistically large `part_size` when you read it back:

```python
# Using an oversized `part_size` value will give you a 1:1 file-to-partition mapping
shuffled_ds = Dataset("/shuffled/path", engine="parquet", part_size="100GB")
```

Since your "shuffled" files should each be much smaller than "100GB", each file will simply be mapped to its own partition. Again, I realize that the documentation does not establish a clear mechanism to achieve a 1:1 file-to-partition mapping. In hindsight, this is very unfortunate!

What about hive partitioning?: You may be wondering if you can use hive partitioning for this instead (e.g. writing with `partition_on` and relying on the directory structure). In order to avoid the small-partition problem that hive-partitioned output tends to create, I suggest that you pass in an option like `aggregate_files` (as in the `dd.read_parquet` snippet earlier in this thread) so that many small files can be combined into larger partitions.

Why do I need to write my shuffled data back to disk before applying my workflow? Although you don't *need* to write your shuffled data back to disk, you are much more likely to see OOM errors if you don't.
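Putting the pieces of this comment together, a condensed round trip might look like the sketch below (my own illustration, not from the thread); the paths, the "session_id" key, and the assumption that `shuffle_by_keys` returns a new `Dataset` are all mine.

```python
# Hedged sketch: shuffle once, persist, and reuse the shuffled files afterwards.
# Paths and the "session_id" key are placeholders/assumptions.
from merlin.io import Dataset

# 1) Pay the shuffle cost once.
ds = Dataset("/raw/parquet/path", engine="parquet", part_size="1000MB")
shuffled = ds.shuffle_by_keys(keys=["session_id"])

# 2) Persist one parquet file per shuffled partition.
shuffled.to_parquet("/shuffled/path", output_files=shuffled.npartitions)

# 3) Later runs read the shuffled files back with an oversized part_size,
#    giving a 1:1 file-to-partition mapping, so no further shuffle is needed.
train_ds = Dataset("/shuffled/path", engine="parquet", part_size="100GB")
```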
Thank you for your detailed answer! One thing I don't understand: if we set "an unrealistically large part_size when you read back", won't we get OOM because Dask will try to load possibly bigger-than-memory data into one DataFrame? AFAIK Dask under the hood works in such a way that one partition is one Pandas/cuDF DataFrame. What am I missing?
Hi!

Level of expertise

Note that I'm not a Parquet/Dask expert. I DO know what the hybrid layout is (row groups etc.) but I DON'T really know the internals of CUDA Dask (how data is loaded from parquet into partitions, how scheduling is done, etc.).

Context

I do some data transformations in BigQuery before exporting the data to parquet using this query:

Then, in NVTabular, I run the `GroupBy` op, which groups by the `session_id` column and sorts by the `timestamp` column. However, I need to do `Dataset.shuffle_by_keys` first, because I ran into the problem described in your docs (data not moved between partitions etc.).

Question

This shuffling by keys takes time. What can I do so it isn't required? My ideas:
- Maybe there are export parameters on the BigQuery side that would produce suitably laid-out files? I know you're not Google, but maybe you can direct me further; I wasn't able to find appropriate parameters.
- Maybe I can pass some parameters during `Dataset` creation? Again, I even dug into the source code, but I really got lost in e.g. what the `ParquetDatasetEngine` params are doing.

Thanks!
Piotr