Improve DataFrameGroupBy apply
nalepae committed Mar 24, 2019
1 parent 715bbbd commit 5857664
Showing 3 changed files with 24 additions and 20 deletions.
docs/examples.ipynb: 6 changes (3 additions, 3 deletions)
@@ -181,8 +181,8 @@
    },
    "outputs": [],
    "source": [
-    "df_size = int(6e7)\n",
-    "df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),\n",
+    "df_size = int(3e7)\n",
+    "df = pd.DataFrame(dict(a=np.random.randint(1, 1000, df_size),\n",
     "                       b=np.random.rand(df_size)))"
    ]
   },
@@ -234,7 +234,7 @@
    },
    "outputs": [],
    "source": [
-    "res.equals(res_parallel.squeeze())"
+    "res.equals(res_parallel)"
    ]
   }
  ],
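The notebook change above does two things: the benchmark frame shrinks from 6×10⁷ to 3×10⁷ rows while the grouping column now takes about 1000 distinct values instead of 8, which gives the chunk-per-worker dispatch introduced below enough groups to balance, and the result comparison drops the .squeeze() that the old per-group result layout required. A minimal sketch of the updated benchmark, assuming the notebook's earlier cells handle imports and pandarallel setup, and using a hypothetical grouped function in place of the notebook's real one:

import numpy as np
import pandas as pd
# pandarallel import and initialization are assumed to happen in earlier
# notebook cells; they are not part of this diff.

df_size = int(3e7)
df = pd.DataFrame(dict(a=np.random.randint(1, 1000, df_size),
                       b=np.random.rand(df_size)))

def func(group):           # hypothetical stand-in for the notebook's function
    return group.b.mean()  # any per-group reduction works for the comparison

res = df.groupby("a").apply(func)
res_parallel = df.groupby("a").parallel_apply(func)
res.equals(res_parallel)   # expected True with the concat-based result below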
pandarallel/_pandarallel.py: 36 changes (20 additions, 16 deletions)
@@ -1,4 +1,5 @@
 import pandas as _pd
+import numpy as np
 import pyarrow.plasma as _plasma
 from pyarrow.lib import PlasmaStoreFull as _PlasmaStoreFull
 import multiprocessing as _multiprocessing
@@ -134,31 +134,34 @@ def closure(df, func, *args, **kwargs):
 
 class _DataFrameGroupBy:
     @staticmethod
-    def worker(plasma_store_name, object_id, func):
+    def worker(plasma_store_name, object_id, keys, func, *args, **kwargs):
         client = _plasma.connect(plasma_store_name)
         df = client.get(object_id)
-        return client.put(func(df))
+        return client.put(df.groupby(keys).apply(func, *args, **kwargs))
 
     @staticmethod
     def apply(plasma_store_name, nb_workers, plasma_client):
         @_parallel(nb_workers, plasma_client)
-        def closure(data, func, **kwargs):
-            keys = data.groups.keys()
+        def closure(df_grouped, func, *args, **kwargs):
+            groups = list(df_grouped.groups.values())
+            keys = df_grouped.keys
+            slices = _chunk(len(groups), nb_workers)
+            futures = []
 
             with _ProcessPoolExecutor(max_workers=nb_workers) as executor:
-                futures = [
-                    executor.submit(_DataFrameGroupBy.worker,
-                                    plasma_store_name,
-                                    plasma_client.put(data.get_group(key)),
-                                    func)
-                    for key in keys
-                ]
+                for slice_ in slices:
+                    indexes = [index.to_numpy() for index in groups[slice_]]
+                    sub_df = df_grouped.obj.iloc[np.concatenate(indexes)]
+                    object_id = plasma_client.put(sub_df)
+                    future = executor.submit(_DataFrameGroupBy.worker,
+                                             plasma_store_name, object_id,
+                                             keys, func, *args, **kwargs)
+                    futures.append(future)
 
-            result = _pd.DataFrame([
-                plasma_client.get(future.result())
-                for future in futures
-            ], index=_pd.Series(list(data.grouper),
-                                name=data.keys))
+            result = _pd.concat([
+                plasma_client.get(future.result())
+                for future in futures
+            ], copy=False)
 
             return result
         return closure
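The rewritten closure above no longer submits one task per group. It splits the list of per-group row indexes into one contiguous slice per worker, builds a sub-DataFrame containing whole groups only, puts it in the plasma store, and lets each worker re-run groupby(keys).apply(func, ...) locally; the partial results are then glued back together with _pd.concat. The _chunk helper it relies on is defined elsewhere in _pandarallel.py and is not shown in this diff; the following is only a hypothetical sketch of such a slicing helper, assuming it returns slice objects covering range(nb_items):

# Hypothetical sketch -- the real _chunk lives elsewhere in
# pandarallel/_pandarallel.py and may differ in detail.
def _chunk(nb_items, nb_chunks):
    """Split range(nb_items) into at most nb_chunks contiguous slices of
    roughly equal size, skipping empty ones."""
    quotient, remainder = divmod(nb_items, nb_chunks)
    slices, start = [], 0
    for chunk_index in range(nb_chunks):
        size = quotient + (1 if chunk_index < remainder else 0)
        if size == 0:
            continue
        slices.append(slice(start, start + size))
        start += size
    return slices

print(_chunk(10, 4))  # [slice(0, 3, None), slice(3, 6, None), slice(6, 8, None), slice(8, 10, None)]

With 1000 groups and, say, 4 workers, each worker now receives one sub-DataFrame covering roughly 250 whole groups instead of 250 separate submissions, which cuts scheduling overhead and plasma round-trips. Because every worker runs a real groupby(keys).apply on its slice, the group index is preserved, so a plain _pd.concat reproduces the shape of pandas' own output and the old index=_pd.Series(...) reconstruction is no longer needed.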
tests/test.py: 2 changes (1 addition, 1 deletion)
@@ -60,4 +60,4 @@ def test_dataframe_groupby_apply(plasma_client):
     res = df.groupby("a").apply(func_for_dataframe_groupby_apply)
     res_parallel = (df.groupby("a")
                     .parallel_apply(func_for_dataframe_groupby_apply))
-    res.equals(res_parallel.squeeze())
+    res.equals(res_parallel)
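func_for_dataframe_groupby_apply is defined earlier in tests/test.py and is untouched by this commit; for readers following along, a hypothetical stand-in with the same shape (a function receiving each group's sub-DataFrame), using column names borrowed from the docs/examples.ipynb frame:

import pandas as pd

# Hypothetical stand-in; the real function and the test's fixture DataFrame
# are defined earlier in tests/test.py and may differ.
def func_for_dataframe_groupby_apply(df: pd.DataFrame) -> float:
    return (df.b ** 2).sum() / len(df)

As written, the last line of the test compares the two results with .equals() but does not assert on the outcome, so the check exercises the parallel code path without failing on a mismatch.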
