Skip to content

Commit

Permalink
Remove tqdm dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
nalepae committed Aug 2, 2019
1 parent d251e48 commit 7446608
Show file tree
Hide file tree
Showing 9 changed files with 437 additions and 109 deletions.
2 changes: 1 addition & 1 deletion pandarallel/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = '1.3.0'
__version__ = '1.3.1'

from .pandarallel import pandarallel
150 changes: 121 additions & 29 deletions pandarallel/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,68 @@
from time import time_ns
from ctypes import c_int64
from multiprocessing import Manager
import pyarrow.plasma as plasma
import pandas as pd
from pathos.multiprocessing import ProcessingPool
from .utils import parallel, chunk
from .utils import (parallel, chunk, ProgressBarsConsole,
ProgressBarsNotebookLab)

REFRESH_PROGRESS_TIME = int(2.5e8) # 250 ms


class DataFrame:
# Worker entry point for the parallel DataFrame apply: connects to the plasma
# store, fetches the shared DataFrame, applies `func` to this worker's chunk
# and returns the result of client.put(res).
# NOTE(review): this text is a rendered diff without +/- markers, so several
# statements below appear twice (presumably the removed line followed by its
# replacement) - confirm against the repository before reusing.
@staticmethod
def worker_apply(worker_args):
# Two unpackings of worker_args follow: a 7-field form and a 9-field form
# that additionally carries `queue` and `index`, both used by the progress
# reporting below.
(plasma_store_name, object_id, axis_chunk, func, progress_bar, args,
kwargs) = worker_args
(plasma_store_name, object_id, axis_chunk, func, progress_bar, queue,
index, args, kwargs) = worker_args

axis = kwargs.get("axis", 0)
client = plasma.connect(plasma_store_name)
df = client.get(object_id)
# Only read by the getattr(...) calls further down.
apply_func = "progress_apply" if progress_bar else "apply"

# Progress state for this worker: number of calls to `func` so far and the
# time (ns) of the last push to the queue. Both are updated through `.value`
# from inside the nested function.
counter = c_int64(0)
last_push_time_ns = c_int64(time_ns())

# Wraps `func` so that each call increments the counter and, once at least
# REFRESH_PROGRESS_TIME ns have passed since the last push, sends
# (index, counter, False) on the queue before delegating to `func`.
def with_progress(func):
def decorator(*args, **kwargs):
counter.value += 1

current_time_ns = time_ns()
delta = current_time_ns - last_push_time_ns.value

# Throttle: do not push more often than REFRESH_PROGRESS_TIME.
if delta >= REFRESH_PROGRESS_TIME:
# put_nowait: non-blocking push of (worker index, calls so far, finished).
queue.put_nowait((index, counter.value, False))
last_push_time_ns.value = current_time_ns

return func(*args, **kwargs)

return decorator

axis = kwargs.get("axis", 0)
# The counting wrapper is only used when a progress bar was requested.
func_to_apply = with_progress(func) if progress_bar else func

# axis == 1: apply on df[axis_chunk]; otherwise apply on all rows restricted
# to the columns df.columns[axis_chunk].
if axis == 1:
if progress_bar:
# This following print is a workaround for this issue:
# https://github.com/tqdm/tqdm/issues/485
print(' ', end='', flush=True)
res = getattr(df[axis_chunk], apply_func)(func, *args, **kwargs)
res = df[axis_chunk].apply(func_to_apply, *args, **kwargs)
else:
chunk = slice(0, df.shape[0]), df.columns[axis_chunk]
res = getattr(df.loc[chunk], apply_func)(func, *args, **kwargs)
res = df.loc[chunk].apply(func_to_apply, *args, **kwargs)

# Final message: the True flag is what the parent stores in finished[index].
if progress_bar:
queue.put((index, counter.value, True))

# The parent passes this return value to plasma_client.get() to read the result.
return client.put(res)

# Defines `closure`, the parallel counterpart of DataFrame.apply: it splits the
# frame into chunks, runs DataFrame.worker_apply on each chunk through a
# ProcessingPool, optionally drives progress bars from messages the workers
# push on a shared queue, and concatenates the per-worker results.
# NOTE(review): this text is a rendered diff without +/- markers; the signature
# below has two endings (`progress_bar=False):` and
# `display_progress_bar, in_notebook_lab):`), presumably the removed and the
# added version. Lines are also elided at the "Expand All" marker.
@staticmethod
def apply(plasma_store_name, nb_workers, plasma_client,
progress_bar=False):
display_progress_bar, in_notebook_lab):
@parallel(plasma_client)
def closure(df, func, *args, **kwargs):
pool = ProcessingPool(nb_workers)
# Manager-backed queue handed to every worker; workers push
# (index, value, finished) tuples on it.
manager = Manager()
queue = manager.Queue()

# Pick the progress bar implementation matching the front end.
ProgressBars = (ProgressBarsNotebookLab if in_notebook_lab
else ProgressBarsConsole)

axis = kwargs.get("axis", 0)
if axis == 'index':
axis = 0
Expand All @@ -41,17 +72,33 @@ def closure(df, func, *args, **kwargs):
# The frame is split along the axis opposite to the one `func` is applied on.
opposite_axis = 1 - axis
chunks = chunk(df.shape[opposite_axis], nb_workers)

# maxs: length of each chunk, passed to ProgressBars as the per-bar totals.
# values / finished: latest reported count and completion flag per worker.
maxs = [chunk.stop - chunk.start for chunk in chunks]
values = [0] * nb_workers
finished = [False] * nb_workers

if display_progress_bar:
progress_bar = ProgressBars(maxs)

# Share the whole DataFrame through the plasma store; workers receive only
# the object id and their chunk.
object_id = plasma_client.put(df)

# Two versions of the argument tuples are interleaved here; the second one
# adds the queue and the worker index (from enumerate).
workers_args = [(plasma_store_name, object_id, chunk, func,
progress_bar, args, kwargs) for chunk in chunks]
display_progress_bar, queue, index, args, kwargs)
for index, chunk in enumerate(chunks)]

# `.get()` is called on this result further down to obtain the worker outputs.
# NOTE(review): presumably amap does not block, so the progress loop below
# can run while the workers execute - confirm against the pathos docs.
result_workers = pool.amap(DataFrame.worker_apply, workers_args)

with ProcessingPool(nb_workers) as pool:
result_workers = pool.map(DataFrame.worker_apply, workers_args)
# Consume progress messages until every worker has reported finished=True.
# NOTE(review): queue.get() is called without a timeout; if a worker dies
# before sending its final message this loop presumably never ends.
if display_progress_bar:
while not all(finished):
for _ in range(finished.count(False)):
index, value, status = queue.get()
values[index] = value
finished[index] = status

progress_bar.update(values)

# Read every worker result back from the plasma store and concatenate them.
result = pd.concat([
plasma_client.get(result_worker)
for result_worker in result_workers
for result_worker in result_workers.get()
], copy=False)

return result
Expand All @@ -60,39 +107,84 @@ def closure(df, func, *args, **kwargs):
# Worker entry point for the parallel DataFrame applymap: connects to the
# plasma store, fetches the shared DataFrame, runs applymap on df[axis_chunk]
# and returns the result of client.put(res).
# NOTE(review): this text is a rendered diff without +/- markers, so some
# statements appear twice (presumably removed line, then replacement).
@staticmethod
def worker_applymap(worker_args):
# The unpacking below has two endings: a 5-field form and a 7-field form
# that additionally carries `queue` and `index`.
(plasma_store_name, object_id, axis_chunk, func,
progress_bar) = worker_args
progress_bar, queue, index) = worker_args

client = plasma.connect(plasma_store_name)
df = client.get(object_id)
# Only read by the getattr(...) call further down.
applymap_func = "progress_applymap" if progress_bar else "applymap"
# Number of columns plus one; used to convert the per-call counter into the
# value reported on the queue (counter // nb_columns_1).
# NOTE(review): the reason for the +1 is not visible here - confirm.
nb_columns_1 = df.shape[1] + 1

# Progress state for this worker, updated through `.value` from the nested
# function: calls to `func` so far and time (ns) of the last push.
counter = c_int64(0)
last_push_time_ns = c_int64(time_ns())

# Wraps `func` so that each call increments the counter; a progress message
# (index, counter // nb_columns_1, False) is pushed only when BOTH at least
# REFRESH_PROGRESS_TIME ns have elapsed AND the counter is a multiple of
# nb_columns_1.
def with_progress(func):
def decorator(arg):
counter.value += 1

current_time_ns = time_ns()
delta = current_time_ns - last_push_time_ns.value

if(delta >= REFRESH_PROGRESS_TIME):
if(counter.value % nb_columns_1 == 0):
queue.put_nowait((index,
counter.value // nb_columns_1,
False))
last_push_time_ns.value = current_time_ns

return func(arg)

return decorator

# The counting wrapper is only used when a progress bar was requested.
func_to_apply = with_progress(func) if progress_bar else func

res = df[axis_chunk].applymap(func_to_apply)

if progress_bar:
# This following print is a workaround for this issue:
# https://github.com/tqdm/tqdm/issues/485
print(' ', end='', flush=True)
res = getattr(df[axis_chunk], applymap_func)(func)
# Final message: the True flag is what the parent stores in finished[index].
row_counter = counter.value // nb_columns_1
queue.put((index, row_counter, True))

# The parent passes this return value to plasma_client.get() to read the result.
return client.put(res)

# Defines `closure`, the parallel counterpart of DataFrame.applymap: it splits
# the frame into chunks sized from df.shape[0], runs DataFrame.worker_applymap
# on each chunk through a ProcessingPool, optionally drives progress bars from
# messages pushed by the workers, and concatenates the per-worker results.
# NOTE(review): this text is a rendered diff without +/- markers; the signature
# below has two endings (`progress_bar=False):` and
# `display_progress_bar, in_notebook_lab):`), presumably the removed and the
# added version.
@staticmethod
def applymap(plasma_store_name, nb_workers, plasma_client,
progress_bar=False):
display_progress_bar, in_notebook_lab):
@parallel(plasma_client)
def closure(df, func):
pool = ProcessingPool(nb_workers)
# Manager-backed queue handed to every worker; workers push
# (index, value, finished) tuples on it.
manager = Manager()
queue = manager.Queue()

# Pick the progress bar implementation matching the front end.
ProgressBars = (ProgressBarsNotebookLab if in_notebook_lab
else ProgressBarsConsole)

chunks = chunk(df.shape[0], nb_workers)

# maxs: length of each chunk, passed to ProgressBars as the per-bar totals.
# values / finished: latest reported count and completion flag per worker.
maxs = [chunk.stop - chunk.start for chunk in chunks]
values = [0] * nb_workers
finished = [False] * nb_workers

if display_progress_bar:
progress_bar = ProgressBars(maxs)

# Share the whole DataFrame through the plasma store; workers receive only
# the object id and their chunk.
object_id = plasma_client.put(df)

# Two versions of the argument tuples are interleaved here; the second one
# adds the queue and the worker index (from enumerate).
worker_args = [(plasma_store_name, object_id, chunk, func,
progress_bar)
for chunk in chunks]
display_progress_bar, queue, index)
for index, chunk in enumerate(chunks)]

# `.get()` is called on this result further down to obtain the worker outputs.
# NOTE(review): presumably amap does not block, so the progress loop below
# can run while the workers execute - confirm against the pathos docs.
result_workers = pool.amap(DataFrame.worker_applymap, worker_args)

# Consume progress messages until every worker has reported finished=True.
# NOTE(review): queue.get() is called without a timeout; if a worker dies
# before sending its final message this loop presumably never ends.
if display_progress_bar:
while not all(finished):
for _ in range(finished.count(False)):
index, value, status = queue.get()
values[index] = value
finished[index] = status

with ProcessingPool(nb_workers) as pool:
result_workers = pool.map(DataFrame.worker_applymap,
worker_args)
progress_bar.update(values)

# Read every worker result back from the plasma store and concatenate them.
result = pd.concat([
plasma_client.get(result_worker)
for result_worker in result_workers
for result_worker in result_workers.get()
], copy=False)

return result
Expand Down
2 changes: 1 addition & 1 deletion pandarallel/dataframe_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def worker(worker_args):
return client.put(result)

@staticmethod
def apply(plasma_store_name, nb_workers, plasma_client):
def apply(plasma_store_name, nb_workers, plasma_client, _1, _2):
@parallel(plasma_client)
def closure(df_grouped, func, *args, **kwargs):
groups = list(df_grouped.groups.items())
Expand Down
35 changes: 15 additions & 20 deletions pandarallel/pandarallel.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import pandas as pd
import pyarrow.plasma as plasma
import multiprocessing as multiprocessing
from tqdm._tqdm_notebook import tqdm_notebook as tqdm_notebook
from tqdm import tqdm

from .dataframe import DataFrame
from .series import Series
Expand All @@ -16,7 +14,7 @@
PROGRESS_BAR = False


def is_jupyter_notebook_or_lab():
def is_notebook_lab():
try:
shell = get_ipython().__class__.__name__
if shell == 'ZMQInteractiveShell':
Expand Down Expand Up @@ -51,7 +49,7 @@ def initialize(cls, shm_size_mb=SHM_SIZE_MB, nb_workers=NB_WORKERS,
progress_bar: bool, optional
Display a progress bar
WARNING: Progress bar is an experimental feature.
This can lead to a considerable performance loss.
This can lead to a considerable performance loss.
verbose: int, optional
If verbose >= 2, display all logs
Expand All @@ -61,13 +59,11 @@ def initialize(cls, shm_size_mb=SHM_SIZE_MB, nb_workers=NB_WORKERS,
if progress_bar:
print("WARNING: Progress bar is an experimental feature. This \
can lead to a considerable performance loss.")
if is_jupyter_notebook_or_lab():
tqdm_notebook().pandas()
else:
tqdm.pandas()

verbose_store = verbose >= 2

i_am_in_notebook_lab = is_notebook_lab()

if hasattr(cls, "proc"):
cls.proc.kill()

Expand All @@ -80,20 +76,19 @@ def initialize(cls, shm_size_mb=SHM_SIZE_MB, nb_workers=NB_WORKERS,

plasma_client = plasma.connect(plasma_store_name)

args = plasma_store_name, nb_workers, plasma_client
args = (plasma_store_name, nb_workers, plasma_client, progress_bar,
i_am_in_notebook_lab)

pd.DataFrame.parallel_apply = DataFrame.apply(*args, progress_bar)
pd.DataFrame.parallel_applymap = DataFrame.applymap(
*args, progress_bar)
pd.DataFrame.parallel_apply = DataFrame.apply(*args)
pd.DataFrame.parallel_applymap = DataFrame.applymap(*args)

pd.Series.parallel_map = Series.map(*args, progress_bar)
pd.Series.parallel_apply = Series.apply(*args, progress_bar)
pd.Series.parallel_map = Series.map(*args)
pd.Series.parallel_apply = Series.apply(*args)

pd.core.window.Rolling.parallel_apply = SeriesRolling.apply(
*args, progress_bar)
pd.core.window.Rolling.parallel_apply = SeriesRolling.apply(*args)

pd.core.groupby.DataFrameGroupBy.parallel_apply = DataFrameGroupBy.apply(
*args)
dfgb_a = DataFrameGroupBy.apply(*args)
pd.core.groupby.DataFrameGroupBy.parallel_apply = dfgb_a

pd.core.window.RollingGroupby.parallel_apply = RollingGroupby.apply(
*args)
rgb_a = RollingGroupby.apply(*args)
pd.core.window.RollingGroupby.parallel_apply = rgb_a
2 changes: 1 addition & 1 deletion pandarallel/rolling_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def worker(worker_args):
return client.put(pd.concat(results))

@staticmethod
def apply(plasma_store_name, nb_workers, plasma_client):
def apply(plasma_store_name, nb_workers, plasma_client, _1, _2):
@parallel(plasma_client)
def closure(rolling_groupby, func, *args, **kwargs):
groups = list(rolling_groupby._groupby.groups.items())
Expand Down
Loading

0 comments on commit 7446608

Please sign in to comment.