Skip to content

Commit

Permalink
Fix #38: use `time()` instead of `time_ns()` (which requires Python 3.7+) for progress-bar timing
Browse files: browse the repository at this point in the history
  • Loading branch information
nalepae committed Aug 3, 2019
1 parent 7446608 commit 48e14a3
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 42 deletions.
2 changes: 1 addition & 1 deletion pandarallel/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = '1.3.1'
__version__ = '1.3.2'

from .pandarallel import pandarallel
28 changes: 13 additions & 15 deletions pandarallel/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from time import time_ns
from ctypes import c_int64
from time import time
from ctypes import c_double, c_uint64
from multiprocessing import Manager
import pyarrow.plasma as plasma
import pandas as pd
from pathos.multiprocessing import ProcessingPool
from .utils import (parallel, chunk, ProgressBarsConsole,
ProgressBarsNotebookLab)

REFRESH_PROGRESS_TIME = int(2.5e8) # 250 ms
REFRESH_PROGRESS_TIME = 0.25 # s


class DataFrame:
Expand All @@ -19,19 +19,18 @@ def worker_apply(worker_args):
client = plasma.connect(plasma_store_name)
df = client.get(object_id)

counter = c_int64(0)
last_push_time_ns = c_int64(time_ns())
counter = c_uint64(0)
last_push_time = c_double(time())

def with_progress(func):
def decorator(*args, **kwargs):
counter.value += 1

current_time_ns = time_ns()
delta = current_time_ns - last_push_time_ns.value
cur_time = time()

if delta >= REFRESH_PROGRESS_TIME:
if cur_time - last_push_time.value >= REFRESH_PROGRESS_TIME:
queue.put_nowait((index, counter.value, False))
last_push_time_ns.value = current_time_ns
last_push_time.value = cur_time

return func(*args, **kwargs)

Expand Down Expand Up @@ -113,22 +112,21 @@ def worker_applymap(worker_args):
df = client.get(object_id)
nb_columns_1 = df.shape[1] + 1

counter = c_int64(0)
last_push_time_ns = c_int64(time_ns())
counter = c_uint64(0)
last_push_time = c_double(time())

def with_progress(func):
def decorator(arg):
counter.value += 1

current_time_ns = time_ns()
delta = current_time_ns - last_push_time_ns.value
cur_time = time()

if(delta >= REFRESH_PROGRESS_TIME):
if(cur_time - last_push_time.value >= REFRESH_PROGRESS_TIME):
if(counter.value % nb_columns_1 == 0):
queue.put_nowait((index,
counter.value // nb_columns_1,
False))
last_push_time_ns.value = current_time_ns
last_push_time.value = cur_time

return func(arg)

Expand Down
28 changes: 13 additions & 15 deletions pandarallel/series.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from time import time_ns
from ctypes import c_int64
from time import time
from ctypes import c_uint64, c_double
from multiprocessing import Manager
import pyarrow.plasma as plasma
import pandas as pd
from pathos.multiprocessing import ProcessingPool
from .utils import (parallel, chunk, ProgressBarsConsole,
ProgressBarsNotebookLab)

REFRESH_PROGRESS_TIME = int(2.5e8) # 250 ms
REFRESH_PROGRESS_TIME = 0.25 # s


class Series:
Expand All @@ -19,19 +19,18 @@ def worker_map(worker_args):
client = plasma.connect(plasma_store_name)
series = client.get(object_id)

counter = c_int64(0)
last_push_time_ns = c_int64(time_ns())
counter = c_uint64(0)
last_push_time = c_double(time())

def with_progress(func):
def decorator(*args, **kwargs):
counter.value += 1

current_time_ns = time_ns()
delta = current_time_ns - last_push_time_ns.value
cur_time = time()

if delta >= REFRESH_PROGRESS_TIME:
if cur_time - last_push_time.value >= REFRESH_PROGRESS_TIME:
queue.put_nowait((index, counter.value, False))
last_push_time_ns.value = current_time_ns
last_push_time.value = cur_time

return func(*args, **kwargs)

Expand Down Expand Up @@ -100,19 +99,18 @@ def worker_apply(worker_args):
client = plasma.connect(plasma_store_name)
series = client.get(object_id)

counter = c_int64(0)
last_push_time_ns = c_int64(time_ns())
counter = c_uint64(0)
last_push_time = c_double(time())

def with_progress(func):
def decorator(*args, **kwargs):
counter.value += 1

current_time_ns = time_ns()
delta = current_time_ns - last_push_time_ns.value
cur_time = time()

if delta >= REFRESH_PROGRESS_TIME:
if cur_time - last_push_time.value >= REFRESH_PROGRESS_TIME:
queue.put_nowait((index, counter.value, False))
last_push_time_ns.value = current_time_ns
last_push_time.value = cur_time

return func(*args, **kwargs)

Expand Down
17 changes: 8 additions & 9 deletions pandarallel/series_rolling.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from time import time_ns
from ctypes import c_int64
from time import time
from ctypes import c_uint64, c_double
from multiprocessing import Manager
import pyarrow.plasma as plasma
import pandas as pd
from pathos.multiprocessing import ProcessingPool
from .utils import (parallel, chunk, ProgressBarsConsole,
ProgressBarsNotebookLab)

REFRESH_PROGRESS_TIME = int(2.5e8) # 250 ms
REFRESH_PROGRESS_TIME = 0.25 # s


class SeriesRolling:
Expand All @@ -19,19 +19,18 @@ def worker(worker_args):
client = plasma.connect(plasma_store_name)
series = client.get(object_id)

counter = c_int64(0)
last_push_time_ns = c_int64(time_ns())
counter = c_uint64(0)
last_push_time = c_double(time())

def with_progress(func):
def decorator(*args, **kwargs):
counter.value += 1

current_time_ns = time_ns()
delta = current_time_ns - last_push_time_ns.value
cur_time = time()

if delta >= REFRESH_PROGRESS_TIME:
if cur_time - last_push_time.value >= REFRESH_PROGRESS_TIME:
queue.put_nowait((index, counter.value, False))
last_push_time_ns.value = current_time_ns
last_push_time.value = cur_time

return func(*args, **kwargs)

Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

setup(
name='pandarallel',
version='1.3.1',
version='1.3.2',
python_requires='>=3.5',
packages=find_packages(),
author='Manu NALEPA',
author_email='[email protected]',
description='An easy to use library to speed up computation (by parallelizing on multi CPUs) with pandas.',
long_description='See https://github.com/nalepae/pandarallel/tree/v1.3.1 for complete user guide.',
long_description='See https://github.com/nalepae/pandarallel/tree/v1.3.2 for complete user guide.',
url='https://github.com/nalepae/pandarallel',
install_requires=install_requires,
license='BSD',
Expand Down

0 comments on commit 48e14a3

Please sign in to comment.