diff --git a/README.md b/README.md index 5a43bab..da35f2a 100644 --- a/README.md +++ b/README.md @@ -91,13 +91,14 @@ An easy to use library to speed up computation (by parallelizing on multi CPUs) sensitive performance loss. Not available for `DataFrameGroupy.parallel_apply`. - With `df` a pandas DataFrame, `series` a pandas Series, `col_name` the name of -a pandas Dataframe column & `func` a function to apply/map: - - | Without parallelisation | With parallelisation | - | ---------------------------------- | ------------------------------------------- | - | `df.apply(func)` | `df.parallel_apply(func)` | - | `df.applymap(func)` | `df.parallel_applymap(func)` | - | `series.map(func)` | `series.parallel_map(func)` | - | `series.apply(func)` | `series.parallel_apply(func)` | - | `df.groupby(col_name).apply(func)` | `df.groupby(col_name).parallel_apply(func)` | + With `df` a pandas DataFrame, `series` a pandas Series & `func` a function to + apply/map: + + | Without parallelisation | With parallelisation | + | ------------------------------------ | --------------------------------------------- | + | `df.apply(func)` | `df.parallel_apply(func)` | + | `df.applymap(func)` | `df.parallel_applymap(func)` | + | `df.groupby().apply(func)` | `df.groupby().parallel_apply(func)` | + | `series.map(func)` | `series.parallel_map(func)` | + | `series.apply(func)` | `series.parallel_apply(func)` | + | `series.rolling().apply(func)` | `series.rolling().parallel_apply(func)` | diff --git a/docs/examples.ipynb b/docs/examples.ipynb index 23b84b2..596d9af 100644 --- a/docs/examples.ipynb +++ b/docs/examples.ipynb @@ -100,6 +100,63 @@ "res.equals(res_parallel)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Series.rolling.apply" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_size = int(1e6)\n", + "df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),\n", + " b=list(range(df_size))))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def func(x):\n", + " return x.iloc[0] + x.iloc[1] ** 2 + x.iloc[2] ** 3 + x.iloc[3] ** 4" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "res = df.b.rolling(4).apply(func, raw=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "res_parallel = df.b.rolling(4).parallel_apply(func, raw=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "res.equals(res_parallel)" + ] + }, { "cell_type": "markdown", "metadata": {}, diff --git a/pandarallel/__init__.py b/pandarallel/__init__.py index 1b17152..6e4801e 100644 --- a/pandarallel/__init__.py +++ b/pandarallel/__init__.py @@ -1,3 +1,3 @@ -__version__ = '0.1.5' +__version__ = '0.1.6' from ._pandarallel import pandarallel diff --git a/pandarallel/_pandarallel.py b/pandarallel/_pandarallel.py index 1036811..e99ee05 100644 --- a/pandarallel/_pandarallel.py +++ b/pandarallel/_pandarallel.py @@ -12,7 +12,7 @@ NB_WORKERS = _multiprocessing.cpu_count() PROGRESS_BAR = False -def _chunk(nb_item, nb_chunks): +def _chunk(nb_item, nb_chunks, start_offset=0): """ Return `nb_chunks` slices of approximatively `nb_item / nb_chunks` each. @@ -24,6 +24,9 @@ def _chunk(nb_item, nb_chunks): nb_chunks : int Number of chunks to return + start_offset : int + Shift start of slice by this amount + Returns ------- A list of slices @@ -37,7 +40,10 @@ def _chunk(nb_item, nb_chunks): slice(78, 103, None)] """ if nb_item <= nb_chunks: - return [slice(idx, idx + 1) for idx in range(nb_item)] + return [ + slice(max(0, idx - start_offset), idx + 1) + for idx in range(nb_item) + ] quotient = nb_item // nb_chunks remainder = nb_item % nb_chunks @@ -49,13 +55,14 @@ def _chunk(nb_item, nb_chunks): quotient + remainder for quotient, remainder in zip(quotients, remainders) ] + accumulated = list(_itertools.accumulate(nb_elems_per_chunk)) shifted_accumulated = accumulated.copy() shifted_accumulated.insert(0, 0) shifted_accumulated.pop() return [ - slice(begin, end) for begin, end + slice(max(0, begin - start_offset), end) for begin, end in zip(shifted_accumulated, accumulated) ] @@ -260,7 +267,7 @@ def closure(data, arg, **kwargs): @staticmethod def worker_apply(plasma_store_name, object_id, chunk, func, - progress_bar, *args, **kwargs): + progress_bar, *args, **kwargs): client = _plasma.connect(plasma_store_name) series = client.get(object_id) @@ -300,6 +307,62 @@ def closure(series, func, *args, **kwargs): return result return closure +class _SeriesRolling: + @staticmethod + def worker(plasma_store_name, num, object_id, window, min_periods, center, + win_type, on, axis, closed, chunk, func, progress_bar, + *args, **kwargs): + client = _plasma.connect(plasma_store_name) + series = client.get(object_id) + + apply_func = "progress_apply" if progress_bar else "apply" + + if progress_bar: + # This following print is a workaround for this issue: + # https://github.com/tqdm/tqdm/issues/485 + print(' ', end='', flush=True) + + series_chunk_rolling = series[chunk].rolling(window, min_periods, + center, win_type, on, + axis, closed) + + res = getattr(series_chunk_rolling, apply_func)(func, *args, **kwargs) + + res = res if num == 0 else res[window:] + + return client.put(res) + + @staticmethod + def apply(plasma_store_name, nb_workers, plasma_client, progress_bar): + @_parallel(nb_workers, plasma_client) + def closure(rolling, func, *args, **kwargs): + series = rolling.obj + window = rolling.window + chunks = _chunk(len(series), nb_workers, window) + object_id = plasma_client.put(series) + + with _ProcessPoolExecutor(max_workers=nb_workers) as executor: + futures = [ + executor.submit(_SeriesRolling.worker, + plasma_store_name, num, object_id, + rolling.window, + rolling.min_periods, + rolling.center, rolling.win_type, + rolling.on, rolling.axis, + rolling.closed, + chunk, func, progress_bar, + *args, **kwargs) + for num, chunk in enumerate(chunks) + ] + + result = _pd.concat([ + plasma_client.get(future.result()) + for future in futures + ], copy=False) + + return result + return closure + class pandarallel: @classmethod @@ -342,4 +405,7 @@ def initialize(cls, shm_size_mo=SHM_SIZE_MO, nb_workers=NB_WORKERS, _pd.Series.parallel_map = _Series.map(*args, progress_bar) _pd.Series.parallel_apply = _Series.apply(*args, progress_bar) + + _pd.core.window.Rolling.parallel_apply = _SeriesRolling.apply(*args, progress_bar) + _pd.core.groupby.DataFrameGroupBy.parallel_apply = _DataFrameGroupBy.apply(*args) diff --git a/setup.py b/setup.py index 8aafab5..be687d7 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setup( name='pandarallel', - version='0.1.5', + version='0.1.6', packages=find_packages(), author='Manu NALEPA', author_email='nalepae@gmail.com', diff --git a/tests/test.py b/tests/test.py index ad05d7f..6578d0b 100644 --- a/tests/test.py +++ b/tests/test.py @@ -21,6 +21,9 @@ def func_for_series_map(x): def func_for_series_apply(x, power, bias=0): return math.log10(math.sqrt(math.exp(x**power))) + bias +def func_for_series_rolling_apply(x): + return x.iloc[0] + x.iloc[1] ** 2 + x.iloc[2] ** 3 + x.iloc[3] ** 4 + def func_for_dataframe_groupby_apply(df): dum = 0 for item in df.b: @@ -76,6 +79,17 @@ def test_series_apply(plasma_client): bias=3) assert res.equals(res_parallel) +def test_series_rolling_apply(plasma_client): + df_size = int(1e2) + df = _pd.DataFrame(dict(a=np.random.randint(1, 8, df_size), + b=list(range(df_size)))) + + res = df.b.rolling(4).apply(func_for_series_rolling_apply, raw=False) + res_parallel = df.b.rolling(4).parallel_apply(func_for_series_rolling_apply, + raw=False) + + assert res.equals(res_parallel) + def test_dataframe_groupby_apply(plasma_client): df_size = int(1e1) df = _pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),