Add support for lambda functions
nalepae committed Jul 23, 2019
1 parent 8c9ed60 commit d251e48
Showing 8 changed files with 110 additions and 120 deletions.
19 changes: 1 addition & 18 deletions README.md
@@ -31,26 +31,9 @@

- Linux or macOS (Windows is not supported at the moment)

## Warnings
## Warning

- Parallelization has a cost (instantiating new processes, sending data via shared memory, etc.), so parallelization is efficient only if the amount of computation to parallelize is high enough. For very small amounts of data, parallelization is not always worth it.
- Functions applied should NOT be lambda functions.

```python
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
return sin(x**2)

df.parallel_apply(func, axis=1)
```

## Examples

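This commit removes the lambda warning from the README because the restriction no longer applies. A minimal usage sketch, assuming pandarallel >= 1.3.0 (the version set by this commit); the DataFrame below is illustrative, not from the repository:

```python
# Minimal sketch: as of v1.3.0, lambdas are accepted by parallel_apply.
# The DataFrame here is a hypothetical example.
import pandas as pd
from math import sin
from pandarallel import pandarallel

pandarallel.initialize()

df = pd.DataFrame({"a": range(1000)})

# Previously forbidden, now supported:
result = df.parallel_apply(lambda row: sin(row.a ** 2), axis=1)
```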
2 changes: 1 addition & 1 deletion pandarallel/__init__.py
@@ -1,3 +1,3 @@
__version__ = '1.2.0'
__version__ = '1.3.0'

from .pandarallel import pandarallel
54 changes: 28 additions & 26 deletions pandarallel/dataframe.py
@@ -1,12 +1,15 @@
import pyarrow.plasma as plasma
import pandas as pd
from concurrent.futures import ProcessPoolExecutor
from pathos.multiprocessing import ProcessingPool
from .utils import parallel, chunk


class DataFrame:
@staticmethod
def worker_apply(plasma_store_name, object_id, axis_chunk, func,
progress_bar, *args, **kwargs):
def worker_apply(worker_args):
(plasma_store_name, object_id, axis_chunk, func, progress_bar, args,
kwargs) = worker_args

axis = kwargs.get("axis", 0)
client = plasma.connect(plasma_store_name)
df = client.get(object_id)
@@ -40,26 +43,25 @@ def closure(df, func, *args, **kwargs):

object_id = plasma_client.put(df)

with ProcessPoolExecutor(max_workers=nb_workers) as executor:
futures = [
executor.submit(DataFrame.worker_apply,
plasma_store_name, object_id,
chunk, func, progress_bar,
*args, **kwargs)
for chunk in chunks
]
workers_args = [(plasma_store_name, object_id, chunk, func,
progress_bar, args, kwargs) for chunk in chunks]

with ProcessingPool(nb_workers) as pool:
result_workers = pool.map(DataFrame.worker_apply, workers_args)

result = pd.concat([
plasma_client.get(future.result())
for future in futures
], copy=False)
plasma_client.get(result_worker)
for result_worker in result_workers
], copy=False)

return result
return closure

@staticmethod
def worker_applymap(plasma_store_name, object_id, axis_chunk, func,
progress_bar):
def worker_applymap(worker_args):
(plasma_store_name, object_id, axis_chunk, func,
progress_bar) = worker_args

client = plasma.connect(plasma_store_name)
df = client.get(object_id)
applymap_func = "progress_applymap" if progress_bar else "applymap"
@@ -80,18 +82,18 @@ def closure(df, func):
chunks = chunk(df.shape[0], nb_workers)
object_id = plasma_client.put(df)

with ProcessPoolExecutor(max_workers=nb_workers) as executor:
futures = [
executor.submit(DataFrame.worker_applymap,
plasma_store_name, object_id,
chunk, func, progress_bar)
for index, chunk in enumerate(chunks)
]
worker_args = [(plasma_store_name, object_id, chunk, func,
progress_bar)
for chunk in chunks]

with ProcessingPool(nb_workers) as pool:
result_workers = pool.map(DataFrame.worker_applymap,
worker_args)

result = pd.concat([
plasma_client.get(future.result())
for future in futures
], copy=False)
plasma_client.get(result_worker)
for result_worker in result_workers
], copy=False)

return result
return closure
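Why swapping `concurrent.futures.ProcessPoolExecutor` for `pathos.multiprocessing.ProcessingPool` lifts the lambda restriction: pathos serializes tasks with dill rather than the standard library's pickle, and dill can serialize lambdas. A standalone sketch of the difference:

```python
# Sketch: stdlib pickle cannot serialize a lambda, while dill
# (the serializer behind pathos) can.
import pickle
import dill

square = lambda x: x ** 2

try:
    pickle.dumps(square)
except Exception as exc:
    print("pickle refuses the lambda:", exc)

payload = dill.dumps(square)          # dill serializes the function body
assert dill.loads(payload)(3) == 9    # round-trips and still computes
```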
36 changes: 19 additions & 17 deletions pandarallel/dataframe_groupby.py
@@ -1,19 +1,22 @@
import pyarrow.plasma as plasma
import pandas as pd
import itertools
from concurrent.futures import ProcessPoolExecutor
from pathos.multiprocessing import ProcessingPool
from .utils import parallel, chunk


class DataFrameGroupBy:
@staticmethod
def worker(plasma_store_name, object_id, groups_id, chunk,
func, *args, **kwargs):
def worker(worker_args):
(plasma_store_name, object_id, groups_id, chunk, func, args,
kwargs) = worker_args

client = plasma.connect(plasma_store_name)
df = client.get(object_id)
groups = client.get(groups_id)[chunk]
result = [
func(df.iloc[indexes], *args, **kwargs)
for _, indexes in groups
func(df.iloc[indexes], *args, **kwargs)
for _, indexes in groups
]

return client.put(result)
@@ -27,13 +30,12 @@ def closure(df_grouped, func, *args, **kwargs):
object_id = plasma_client.put(df_grouped.obj)
groups_id = plasma_client.put(groups)

with ProcessPoolExecutor(max_workers=nb_workers) as executor:
futures = [
executor.submit(DataFrameGroupBy.worker,
plasma_store_name, object_id,
groups_id, chunk, func, *args, **kwargs)
for chunk in chunks
]
workers_args = [(plasma_store_name, object_id, groups_id, chunk,
func, args, kwargs) for chunk in chunks]

with ProcessingPool(nb_workers) as pool:
result_workers = pool.map(
DataFrameGroupBy.worker, workers_args)

if len(df_grouped.grouper.shape) == 1:
# One element in "by" argument
@@ -52,10 +54,10 @@ def closure(df_grouped, func, *args, **kwargs):
names=df_grouped.keys)

result = pd.DataFrame(list(itertools.chain.from_iterable([
plasma_client.get(future.result())
for future in futures
])),
index=index
).squeeze()
plasma_client.get(result_worker)
for result_worker in result_workers
])),
index=index
).squeeze()
return result
return closure
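The same mechanical change recurs in every file of this commit: `ProcessingPool.map` applies a one-argument function over an iterable, so the former positional and keyword parameters are packed into one tuple per chunk and unpacked at the top of the worker. A self-contained sketch of the pattern (`worker`, `chunks`, and the `offset` keyword are hypothetical names, not from the repository):

```python
# Self-contained sketch of the packed-argument pattern used throughout
# this commit; `worker`, `chunks`, and `offset` are hypothetical.
from pathos.multiprocessing import ProcessingPool

def worker(worker_args):
    # pool.map passes a single object, so unpack the bundled parameters
    chunk, func, args, kwargs = worker_args
    return [func(item, *args, **kwargs) for item in chunk]

if __name__ == "__main__":
    chunks = [[1, 2], [3, 4]]
    workers_args = [(c, lambda x, offset: x * x + offset, (), {"offset": 1})
                    for c in chunks]

    with ProcessingPool(2) as pool:
        results = pool.map(worker, workers_args)

    print(results)  # [[2, 5], [10, 17]]
```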
28 changes: 15 additions & 13 deletions pandarallel/rolling_groupby.py
@@ -1,12 +1,15 @@
import pyarrow.plasma as plasma
import pandas as pd
from concurrent.futures import ProcessPoolExecutor
from pathos.multiprocessing import ProcessingPool
from .utils import parallel, chunk


class RollingGroupby:
@staticmethod
def worker(plasma_store_name, object_id, groups_id, attribute2value, chunk,
func, *args, **kwargs):
def worker(worker_args):
(plasma_store_name, object_id, groups_id, attribute2value, chunk,
func, args, kwargs) = worker_args

client = plasma.connect(plasma_store_name)
df = client.get(object_id)
groups = client.get(groups_id)[chunk]
@@ -34,18 +37,17 @@ def closure(rolling_groupby, func, *args, **kwargs):
attribute2value = {attribute: getattr(rolling_groupby, attribute)
for attribute in rolling_groupby._attributes}

with ProcessPoolExecutor(max_workers=nb_workers) as executor:
futures = [
executor.submit(RollingGroupby.worker, plasma_store_name,
object_id, groups_id, attribute2value,
chunk, func, *args, **kwargs)
for chunk in chunks
]
worker_args = [(plasma_store_name, object_id, groups_id,
attribute2value, chunk, func, args, kwargs)
for chunk in chunks]

with ProcessingPool(nb_workers) as pool:
result_workers = pool.map(RollingGroupby.worker, worker_args)

result = pd.concat([
plasma_client.get(future.result())
for future in futures
], copy=False)
plasma_client.get(result_worker)
for result_worker in result_workers
], copy=False)

return result
return closure
55 changes: 27 additions & 28 deletions pandarallel/series.py
@@ -1,12 +1,15 @@
import pyarrow.plasma as plasma
import pandas as pd
from concurrent.futures import ProcessPoolExecutor
from pathos.multiprocessing import ProcessingPool
from .utils import parallel, chunk


class Series:
@staticmethod
def worker_map(plasma_store_name, object_id, chunk, arg, progress_bar,
**kwargs):
def worker_map(worker_args):
(plasma_store_name, object_id, chunk, arg, progress_bar,
kwargs) = worker_args

client = plasma.connect(plasma_store_name)
series = client.get(object_id)

@@ -28,26 +31,25 @@ def closure(data, arg, **kwargs):
chunks = chunk(data.size, nb_workers)
object_id = plasma_client.put(data)

with ProcessPoolExecutor(max_workers=nb_workers) as executor:
futures = [
executor.submit(Series.worker_map,
plasma_store_name, object_id,
_chunk, arg, progress_bar,
**kwargs)
for _chunk in chunks
]
workers_args = [(plasma_store_name, object_id, chunk, arg,
progress_bar, kwargs) for chunk in chunks]

with ProcessingPool(nb_workers) as pool:
result_workers = pool.map(Series.worker_map, workers_args)

result = pd.concat([
plasma_client.get(future.result())
for future in futures
], copy=False)
plasma_client.get(result_worker)
for result_worker in result_workers
], copy=False)

return result
return closure

@staticmethod
def worker_apply(plasma_store_name, object_id, chunk, func,
progress_bar, *args, **kwargs):
def worker_apply(worker_args):
(plasma_store_name, object_id, chunk, func,
progress_bar, args, kwargs) = worker_args

client = plasma.connect(plasma_store_name)
series = client.get(object_id)

@@ -62,27 +64,24 @@ def worker_apply(plasma_store_name, object_id, chunk, func,

return client.put(res)


@staticmethod
def apply(plasma_store_name, nb_workers, plasma_client, progress_bar):
@parallel(plasma_client)
def closure(series, func, *args, **kwargs):
chunks = chunk(series.size, nb_workers)
object_id = plasma_client.put(series)

with ProcessPoolExecutor(max_workers=nb_workers) as executor:
futures = [
executor.submit(Series.worker_apply,
plasma_store_name, object_id,
chunk, func, progress_bar,
*args, **kwargs)
for chunk in chunks
]
workers_args = [(plasma_store_name, object_id,
chunk, func, progress_bar,
args, kwargs) for chunk in chunks]

with ProcessingPool(nb_workers) as pool:
results_workers = pool.map(Series.worker_apply, workers_args)

result = pd.concat([
plasma_client.get(future.result())
for future in futures
], copy=False)
plasma_client.get(result_worker)
for result_worker in results_workers
], copy=False)

return result
return closure
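For the Series entry points rewritten above, a hypothetical usage sketch (the Series `s` is illustrative; both methods should now accept lambdas):

```python
# Hypothetical usage of the Series entry points, assuming
# pandarallel >= 1.3.0; the Series `s` is illustrative.
import pandas as pd
from pandarallel import pandarallel

pandarallel.initialize()

s = pd.Series(range(10000))
squares = s.parallel_map(lambda x: x ** 2)    # Series.map counterpart
shifted = s.parallel_apply(lambda x: x + 10)  # Series.apply counterpart
```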
31 changes: 16 additions & 15 deletions pandarallel/series_rolling.py
@@ -1,12 +1,15 @@
import pyarrow.plasma as plasma
import pandas as pd
from concurrent.futures import ProcessPoolExecutor
from pathos.multiprocessing import ProcessingPool
from .utils import parallel, chunk


class SeriesRolling:
@staticmethod
def worker(plasma_store_name, num, object_id, attribute2value, chunk, func,
progress_bar, *args, **kwargs):
def worker(worker_args):
(plasma_store_name, num, object_id, attribute2value, chunk, func,
progress_bar, args, kwargs) = worker_args

client = plasma.connect(plasma_store_name)
series = client.get(object_id)

@@ -37,20 +40,18 @@ def closure(rolling, func, *args, **kwargs):
attribute2value = {attribute: getattr(rolling, attribute)
for attribute in rolling._attributes}

with ProcessPoolExecutor(max_workers=nb_workers) as executor:
futures = [
executor.submit(SeriesRolling.worker,
plasma_store_name, num, object_id,
attribute2value,
chunk, func, progress_bar,
*args, **kwargs)
for num, chunk in enumerate(chunks)
]
workers_args = [(plasma_store_name, num, object_id,
attribute2value, chunk, func, progress_bar, args,
kwargs)
for num, chunk in enumerate(chunks)]

with ProcessingPool(nb_workers) as pool:
result_workers = pool.map(SeriesRolling.worker, workers_args)

result = pd.concat([
plasma_client.get(future.result())
for future in futures
], copy=False)
plasma_client.get(result_worker)
for result_worker in result_workers
], copy=False)

return result
return closure
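And for the rolling path above, a hypothetical one-liner in the same spirit (the window size is arbitrary):

```python
# Hypothetical rolling sketch; the window size is arbitrary and the
# Series is illustrative.
import pandas as pd
from pandarallel import pandarallel

pandarallel.initialize()

s = pd.Series(range(100))
rolling_sum = s.rolling(4).parallel_apply(lambda window: window.sum())
```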
5 changes: 3 additions & 2 deletions setup.py
@@ -4,17 +4,18 @@
'pandas',
'pyarrow >= 0.12.1',
'tqdm >= 4.31.1',
'pathos >= 0.2.4'
]

setup(
name='pandarallel',
version='1.2.0',
version='1.3.0',
python_requires='>=3.5',
packages=find_packages(),
author='Manu NALEPA',
author_email='[email protected]',
description='An easy to use library to speed up computation (by parallelizing on multi CPUs) with pandas.',
long_description='See https://github.com/nalepae/pandarallel/tree/v1.2.0 for complete user guide.',
long_description='See https://github.com/nalepae/pandarallel/tree/v1.3.0 for complete user guide.',
url='https://github.com/nalepae/pandarallel',
install_requires=install_requires,
license='BSD',