Implement Series.rolling.parallel_apply
nalepae committed Mar 31, 2019
1 parent e49f478 commit 25f4921
Showing 6 changed files with 154 additions and 16 deletions.
21 changes: 11 additions & 10 deletions README.md
@@ -91,13 +91,14 @@ An easy to use library to speed up computation (by parallelizing on multi CPUs)
noticeable performance loss.
Not available for `DataFrameGroupBy.parallel_apply`.

With `df` a pandas DataFrame, `series` a pandas Series, `col_name` the name of
a pandas Dataframe column & `func` a function to apply/map:

| Without parallelisation | With parallelisation |
| ---------------------------------- | ------------------------------------------- |
| `df.apply(func)` | `df.parallel_apply(func)` |
| `df.applymap(func)` | `df.parallel_applymap(func)` |
| `series.map(func)` | `series.parallel_map(func)` |
| `series.apply(func)` | `series.parallel_apply(func)` |
| `df.groupby(col_name).apply(func)` | `df.groupby(col_name).parallel_apply(func)` |
With `df` a pandas DataFrame, `series` a pandas Series & `func` a function to
apply/map:

| Without parallelisation | With parallelisation |
| ------------------------------------ | --------------------------------------------- |
| `df.apply(func)` | `df.parallel_apply(func)` |
| `df.applymap(func)` | `df.parallel_applymap(func)` |
| `df.groupby(<args>).apply(func)` | `df.groupby(<args>).parallel_apply(func)` |
| `series.map(func)` | `series.parallel_map(func)` |
| `series.apply(func)` | `series.parallel_apply(func)` |
| `series.rolling(<args>).apply(func)` | `series.rolling(<args>).parallel_apply(func)` |
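
A minimal usage sketch of the new rolling entry in this table (it mirrors the notebook example added in this commit and assumes `pandarallel.initialize()` has been called first):

```python
import pandas as pd
from pandarallel import pandarallel

pandarallel.initialize()

df = pd.DataFrame(dict(b=range(100)))

def func(x):
    # x is each length-4 window, passed as a Series because raw=False
    return x.iloc[0] + x.iloc[1] ** 2 + x.iloc[2] ** 3 + x.iloc[3] ** 4

res = df.b.rolling(4).apply(func, raw=False)
res_parallel = df.b.rolling(4).parallel_apply(func, raw=False)
assert res.equals(res_parallel)
```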
57 changes: 57 additions & 0 deletions docs/examples.ipynb
@@ -100,6 +100,63 @@
"res.equals(res_parallel)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Series.rolling.apply"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df_size = int(1e6)\n",
"df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),\n",
" b=list(range(df_size))))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def func(x):\n",
" return x.iloc[0] + x.iloc[1] ** 2 + x.iloc[2] ** 3 + x.iloc[3] ** 4"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"res = df.b.rolling(4).apply(func, raw=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"res_parallel = df.b.rolling(4).parallel_apply(func, raw=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"res.equals(res_parallel)"
]
},
{
"cell_type": "markdown",
"metadata": {},
2 changes: 1 addition & 1 deletion pandarallel/__init__.py
@@ -1,3 +1,3 @@
__version__ = '0.1.5'
__version__ = '0.1.6'

from ._pandarallel import pandarallel
74 changes: 70 additions & 4 deletions pandarallel/_pandarallel.py
@@ -12,7 +12,7 @@
NB_WORKERS = _multiprocessing.cpu_count()
PROGRESS_BAR = False

def _chunk(nb_item, nb_chunks):
def _chunk(nb_item, nb_chunks, start_offset=0):
"""
Return `nb_chunks` slices of approximatively `nb_item / nb_chunks` each.
@@ -24,6 +24,9 @@ def _chunk(nb_item, nb_chunks):
nb_chunks : int
Number of chunks to return
start_offset : int
Shift start of slice by this amount
Returns
-------
A list of slices
@@ -37,7 +40,10 @@ def _chunk(nb_item, nb_chunks):
slice(78, 103, None)]
"""
if nb_item <= nb_chunks:
return [slice(idx, idx + 1) for idx in range(nb_item)]
return [
slice(max(0, idx - start_offset), idx + 1)
for idx in range(nb_item)
]

quotient = nb_item // nb_chunks
remainder = nb_item % nb_chunks
@@ -49,13 +55,14 @@ def _chunk(nb_item, nb_chunks):
quotient + remainder for quotient, remainder
in zip(quotients, remainders)
]

accumulated = list(_itertools.accumulate(nb_elems_per_chunk))
shifted_accumulated = accumulated.copy()
shifted_accumulated.insert(0, 0)
shifted_accumulated.pop()

return [
slice(begin, end) for begin, end
slice(max(0, begin - start_offset), end) for begin, end
in zip(shifted_accumulated, accumulated)
]
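
A quick illustration of the new `start_offset` argument (the expected slices follow from the arithmetic above; this snippet is not part of the commit): every slice except the first is extended `start_offset` items backwards, so consecutive chunks overlap.

```python
from pandarallel._pandarallel import _chunk

# Without an offset, the slices partition the items.
assert _chunk(10, 3) == [slice(0, 4), slice(4, 7), slice(7, 10)]

# With start_offset=2, every slice after the first starts 2 items earlier,
# overlapping its predecessor.
assert _chunk(10, 3, start_offset=2) == [slice(0, 4), slice(2, 7), slice(5, 10)]
```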

@@ -260,7 +267,7 @@ def closure(data, arg, **kwargs):

@staticmethod
def worker_apply(plasma_store_name, object_id, chunk, func,
progress_bar, *args, **kwargs):
progress_bar, *args, **kwargs):
client = _plasma.connect(plasma_store_name)
series = client.get(object_id)

@@ -300,6 +307,62 @@ def closure(series, func, *args, **kwargs):
return result
return closure

class _SeriesRolling:
@staticmethod
def worker(plasma_store_name, num, object_id, window, min_periods, center,
win_type, on, axis, closed, chunk, func, progress_bar,
*args, **kwargs):
client = _plasma.connect(plasma_store_name)
series = client.get(object_id)

apply_func = "progress_apply" if progress_bar else "apply"

if progress_bar:
# The following print is a workaround for this issue:
# https://github.com/tqdm/tqdm/issues/485
print(' ', end='', flush=True)

series_chunk_rolling = series[chunk].rolling(window, min_periods,
center, win_type, on,
axis, closed)

res = getattr(series_chunk_rolling, apply_func)(func, *args, **kwargs)

res = res if num == 0 else res[window:]

return client.put(res)

@staticmethod
def apply(plasma_store_name, nb_workers, plasma_client, progress_bar):
@_parallel(nb_workers, plasma_client)
def closure(rolling, func, *args, **kwargs):
series = rolling.obj
window = rolling.window
chunks = _chunk(len(series), nb_workers, window)
object_id = plasma_client.put(series)

with _ProcessPoolExecutor(max_workers=nb_workers) as executor:
futures = [
executor.submit(_SeriesRolling.worker,
plasma_store_name, num, object_id,
rolling.window,
rolling.min_periods,
rolling.center, rolling.win_type,
rolling.on, rolling.axis,
rolling.closed,
chunk, func, progress_bar,
*args, **kwargs)
for num, chunk in enumerate(chunks)
]

result = _pd.concat([
plasma_client.get(future.result())
for future in futures
], copy=False)

return result
return closure
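
The strategy above: `_chunk(len(series), nb_workers, window)` hands each worker a chunk extended backwards by `window` rows, the worker runs a plain rolling apply on its chunk, and every chunk except the first drops its first `window` results (NaNs or values already produced by the previous chunk) before the pieces are concatenated. A single-process sketch of the same idea (illustration only; the real implementation ships chunks to worker processes through the plasma store):

```python
import numpy as np
import pandas as pd

def rolling_apply_chunked(series, window, func, nb_chunks=3):
    # Split [0, len(series)) into nb_chunks contiguous ranges.
    bounds = np.linspace(0, len(series), nb_chunks + 1, dtype=int)
    parts = []
    for num, (begin, end) in enumerate(zip(bounds[:-1], bounds[1:])):
        start = max(0, begin - window)  # extend backwards by `window` rows
        res = series[start:end].rolling(window).apply(func, raw=False)
        # Drop the overlapping head of every chunk but the first
        # (assumes each chunk is longer than the window).
        parts.append(res if num == 0 else res.iloc[window:])
    return pd.concat(parts)

series = pd.Series(range(20))

def func(x):
    return x.sum()

expected = series.rolling(4).apply(func, raw=False)
assert rolling_apply_chunked(series, 4, func).equals(expected)
```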


class pandarallel:
@classmethod
@@ -342,4 +405,7 @@ def initialize(cls, shm_size_mo=SHM_SIZE_MO, nb_workers=NB_WORKERS,

_pd.Series.parallel_map = _Series.map(*args, progress_bar)
_pd.Series.parallel_apply = _Series.apply(*args, progress_bar)

_pd.core.window.Rolling.parallel_apply = _SeriesRolling.apply(*args, progress_bar)

_pd.core.groupby.DataFrameGroupBy.parallel_apply = _DataFrameGroupBy.apply(*args)
2 changes: 1 addition & 1 deletion setup.py
@@ -8,7 +8,7 @@

setup(
name='pandarallel',
version='0.1.5',
version='0.1.6',
packages=find_packages(),
author='Manu NALEPA',
author_email='[email protected]',
14 changes: 14 additions & 0 deletions tests/test.py
@@ -21,6 +21,9 @@ def func_for_series_map(x):
def func_for_series_apply(x, power, bias=0):
return math.log10(math.sqrt(math.exp(x**power))) + bias

def func_for_series_rolling_apply(x):
return x.iloc[0] + x.iloc[1] ** 2 + x.iloc[2] ** 3 + x.iloc[3] ** 4

def func_for_dataframe_groupby_apply(df):
dum = 0
for item in df.b:
@@ -76,6 +79,17 @@ def test_series_apply(plasma_client):
bias=3)
assert res.equals(res_parallel)

def test_series_rolling_apply(plasma_client):
df_size = int(1e2)
df = _pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
b=list(range(df_size))))

res = df.b.rolling(4).apply(func_for_series_rolling_apply, raw=False)
res_parallel = df.b.rolling(4).parallel_apply(func_for_series_rolling_apply,
raw=False)

assert res.equals(res_parallel)

def test_dataframe_groupby_apply(plasma_client):
df_size = int(1e1)
df = _pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
