Support results_queue_size parameter in make_batch_reader API (#783)
* Support results_queue_size parameter in make_batch_reader API

* Test results_queue_size is propagated properly to the workers pool

* Update release notes

* Reformat unit test
s-udhaya authored Dec 13, 2022
1 parent 170b22a commit 1319771
Showing 3 changed files with 13 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/release-notes.rst
@@ -8,6 +8,7 @@ Release notes
Release 0.12.1 (unreleased)
===========================
- `PR 777 <https://github.com/uber/petastorm/pull/777>`_: Remove ``LocalDiskArrowTableCache`` class as it was using deprecated pyarrow serialization API. Speed up ``LocalDiskCache`` by using the highest pickle protocol in cache serialization.
- `PR 783 <https://github.com/uber/petastorm/pull/783>`_: Support ``results_queue_size`` parameter in ``make_batch_reader`` API

Release 0.12.0
===========================
5 changes: 4 additions & 1 deletion petastorm/reader.py
@@ -205,6 +205,7 @@ def make_reader(dataset_url,
def make_batch_reader(dataset_url_or_urls,
schema_fields=None,
reader_pool_type='thread', workers_count=10,
results_queue_size=50,
seed=None, shuffle_rows=False,
shuffle_row_groups=True, shuffle_row_drop_partitions=1,
predicate=None,
@@ -243,6 +244,8 @@ def make_batch_reader(dataset_url_or_urls,
denoting a thread pool, process pool, or running everything in the master thread. Defaults to 'thread'
:param workers_count: An int for the number of workers to use in the reader pool. This is only used for the
thread or process pool. Defaults to 10
:param results_queue_size: Size of the results queue to store prefetched row-groups. Currently only applicable to
the thread reader pool type.
:param seed: Random seed specified for shuffle and sharding with reproducible outputs. Defaults to None
:param shuffle_rows: Whether to shuffle inside a single row group. Defaults to False.
:param shuffle_row_groups: Whether to shuffle row groups (the order in which full row groups are read)
@@ -312,7 +315,7 @@ def make_batch_reader(dataset_url_or_urls,
raise ValueError('Unknown cache_type: {}'.format(cache_type))

if reader_pool_type == 'thread':
reader_pool = ThreadPool(workers_count)
reader_pool = ThreadPool(workers_count, results_queue_size)
elif reader_pool_type == 'process':
serializer = ArrowTableSerializer()
reader_pool = ProcessPool(workers_count, serializer, zmq_copy_buffers=zmq_copy_buffers)
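For orientation, a minimal usage sketch of the new parameter follows. The dataset URL is a placeholder, and per the docstring above results_queue_size currently only takes effect with the 'thread' reader pool:

from petastorm import make_batch_reader

# Placeholder dataset URL; results_queue_size caps how many prefetched
# row-group batches the thread pool buffers before the reader consumes them.
with make_batch_reader('file:///tmp/my_dataset_parquet',
                       reader_pool_type='thread',
                       workers_count=10,
                       results_queue_size=100) as reader:
    for batch in reader:
        pass  # each batch is a namedtuple of column value arrays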
8 changes: 8 additions & 0 deletions petastorm/tests/test_parquet_reader.py
@@ -258,3 +258,11 @@ def test_random_seed(scalar_dataset):
results.append(actual_row_ids)
# Shuffled results are expected to be the same
np.testing.assert_equal(results[0], results[1])


def test_results_queue_size_propagation_in_make_batch_reader(scalar_dataset):
expected_results_queue_size = 42
with make_batch_reader(scalar_dataset.url, reader_pool_type='thread',
results_queue_size=expected_results_queue_size) as batch_reader:
actual_results_queue_size = batch_reader._workers_pool._results_queue_size
assert actual_results_queue_size == expected_results_queue_size
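As a design note, the test above reaches into reader internals: make_batch_reader passes results_queue_size through to ThreadPool, and the assertion relies on the pool retaining the value as _results_queue_size. An illustrative sketch of the minimal shape ThreadPool must have for this test to pass (the real ThreadPool in petastorm's workers_pool package does considerably more):

class ThreadPool:
    # Illustrative only, not the actual petastorm implementation.
    def __init__(self, workers_count, results_queue_size=50):
        self._workers_count = workers_count
        # The committed test asserts this attribute equals the value handed
        # to make_batch_reader, so the pool must store it verbatim.
        self._results_queue_size = results_queue_size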
