Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncTransport: eagerly process queue on stop() #888

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions opencensus/common/transports/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,17 @@ def start(self):
# auto-collection.
execution_context.set_is_exporter(True)
self._thread.start()
atexit.register(self._export_pending_data)
atexit.register(self.stop)

def stop(self):
"""Signals the background thread to stop.

This does not terminate the background thread. It simply queues the
stop signal. If the main process exits before the background thread
stop signal, and tells the background thread to immediately consume any
remaining items. If the main process exits before the background thread
processes the stop signal, it will be terminated without finishing
work. The ``grace_period`` parameter will give the background
thread some time to finish processing before this function returns.
work. The ``grace_period`` parameter will give the background thread
some time to finish processing before this function returns.

:rtype: bool
:returns: True if the thread terminated. False if the thread is still
Expand All @@ -165,21 +166,15 @@ def stop(self):

with self._lock:
self._queue.put_nowait(_WORKER_TERMINATOR)
# Stop blocking between export batches
self._event.set()
self._thread.join(timeout=self._grace_period)

success = not self.is_alive
self._thread = None

return success

def _export_pending_data(self):
"""Callback that attempts to send pending data before termination."""
if not self.is_alive:
return
# Stop blocking between export batches
self._event.set()
self.stop()

def enqueue(self, data):
"""Queues data to be written by the background thread."""
self._queue.put_nowait(data)
Expand All @@ -198,15 +193,15 @@ class AsyncTransport(base.Transport):

:type grace_period: float
:param grace_period: The amount of time to wait for pending data to
be submitted when the process is shutting down.
be submitted when the process is shutting down (sec).

:type max_batch_size: int
:param max_batch_size: The maximum number of items to send at a time
in the background thread.

:type wait_period: int
:param wait_period: The amount of time to wait before sending the next
batch of data.
batch of data (sec).
"""

def __init__(self, exporter,
Expand All @@ -227,5 +222,20 @@ def export(self, data):
self.worker.enqueue(data)

def flush(self):
"""Submit any pending traces/stats."""
"Submit any pending traces/stats, blocking up to `wait_period` secs."
self.worker.flush()

def stop(self):
"""
Submit any pending traces/stats and shut down the transport.

Blocks for up to `grace_period` secs. Unlike :meth:`~.flush`, any
pending traces are immediately processed, instead of waiting for the
wait period to end. Once called, any subsequent calls to
:meth:`~.export` will be silently dropped.

:rtype: bool
:returns: True if the thread terminated. False if the thread is still
running.
"""
self.worker.stop()
22 changes: 6 additions & 16 deletions tests/unit/common/transports/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_start(self):
self.assertEqual(worker._thread._target, worker._thread_main)
self.assertEqual(
worker._thread._name, async_._WORKER_THREAD_NAME)
mock_atexit.assert_called_once_with(worker._export_pending_data)
mock_atexit.assert_called_once_with(worker.stop)

cur_thread = worker._thread
self._start_worker(worker)
Expand All @@ -70,6 +70,7 @@ def test_stop(self):

worker.stop()

self.assertTrue(worker._event.is_set())
self.assertEqual(worker._queue.qsize(), 1)
self.assertEqual(
worker._queue.get(), async_._WORKER_TERMINATOR)
Expand All @@ -79,35 +80,24 @@ def test_stop(self):
# If thread not alive, do not stop twice.
worker.stop()

def test__export_pending_data(self):
exporter = mock.Mock()
worker = async_._Worker(exporter)

self._start_worker(worker)
worker._export_pending_data()

self.assertFalse(worker.is_alive)

worker._export_pending_data()

def test__export_pending_data_non_empty_queue(self):
def test_stop_non_empty_queue(self):
exporter = mock.Mock()
worker = async_._Worker(exporter)

self._start_worker(worker)
worker.enqueue(mock.Mock())
worker._export_pending_data()
worker.stop()

self.assertFalse(worker.is_alive)

def test__export_pending_data_did_not_join(self):
def test_stop_did_not_join(self):
exporter = mock.Mock()
worker = async_._Worker(exporter)

self._start_worker(worker)
worker._thread._terminate_on_join = False
worker.enqueue(mock.Mock())
worker._export_pending_data()
worker.stop()

self.assertFalse(worker.is_alive)

Expand Down