Fixed AzureLogHandler with multiple processes. #1158

Draft · wants to merge 5 commits into base: master
@@ -75,8 +75,7 @@ def __init__(self, **options):
def _export(self, batch, event=None): # pragma: NO COVER
Contributor:

Please add an entry to the CHANGELOG

try:
if batch:
-           envelopes = [self.log_record_to_envelope(x) for x in batch]
-           envelopes = self.apply_telemetry_processors(envelopes)
+           envelopes = self.apply_telemetry_processors(batch)
Contributor:

This doesn't work because apply_telemetry_processors is expecting an Envelope data type.

Contributor (Author):

It does work; the batch is a list of Envelope objects, since I already convert them before putting them on the queue.

result = self._transmit(envelopes)
# Only store files if local storage enabled
if self.storage:
@@ -110,7 +109,11 @@ def createLock(self):
self.lock = None

def emit(self, record):
-       self._queue.put(record, block=False)
+       # Convert the raw LogRecord to an envelope before putting it on the
+       # queue, as a LogRecord object is not serializable, while an
+       # Envelope object is.
+       envelope = self.log_record_to_envelope(record)
Contributor:

I think there are performance implications of doing this. We don't want to be mapping every time logger.X() is called.

Contributor (Author):

There is indeed a performance cost here. However, it is not possible to put a raw LogRecord on the multiprocessing.Queue, as the queue pickles the object.
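The pickling constraint the author describes can be illustrated with a small sketch. `RecordWithLock` below is a hypothetical stand-in, not an opencensus type; real `logging.LogRecord` instances can fail to pickle for analogous reasons (e.g. unpicklable `args` or `exc_info`):

```python
import pickle
import threading

# Hypothetical stand-in for a log record that holds an unpicklable resource.
class RecordWithLock:
    def __init__(self):
        self.lock = threading.Lock()  # thread locks cannot be pickled

try:
    pickle.dumps(RecordWithLock())
except TypeError as exc:
    print("cannot enqueue raw record:", exc)

# A plain-data envelope (here just a dict) pickles without trouble,
# which is why converting before q.put() sidesteps the problem.
payload = pickle.dumps({"name": "Envelope", "data": {"message": "hi"}})
print(pickle.loads(payload))
```

This is why the conversion has to happen on the producing side, before the object crosses the process boundary.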

+       self._queue.put(envelope, block=False)
Contributor:

Why does the object need to be serializable?

Contributor (Author):

In order to share messages/resources between different processes, the multiprocessing.Queue pickles the object so that it can be piped: https://github.com/python/cpython/blob/3.10/Lib/multiprocessing/queues.py#L244
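A minimal sketch of what the linked CPython code implies: an item crosses the process boundary only because the queue's feeder thread pickles it and writes the bytes to a pipe. The function and payload names here are illustrative:

```python
import multiprocessing

def produce(q):
    # multiprocessing.Queue pickles this item on a feeder thread and
    # writes the bytes to a pipe shared with the parent process.
    q.put({"event": "from child"})

def demo():
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=produce, args=(q,))
    p.start()
    item = q.get()  # unpickled on the receiving side
    p.join()
    return item

if __name__ == "__main__":
    print(demo())
```

Any object put on the queue must therefore survive a pickle round trip, which raw LogRecord objects cannot guarantee.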


def log_record_to_envelope(self, record):
raise NotImplementedError # pragma: NO COVER
3 changes: 2 additions & 1 deletion opencensus/common/schedule/__init__.py
@@ -15,6 +15,7 @@
from six.moves import queue

import logging
+import multiprocessing
import threading
import time

@@ -82,7 +83,7 @@ class QueueExitEvent(QueueEvent):
class Queue(object):
def __init__(self, capacity):
self.EXIT_EVENT = QueueExitEvent('EXIT')
-       self._queue = queue.Queue(maxsize=capacity)
+       self._queue = multiprocessing.Queue(maxsize=capacity)
Contributor:

queue.Queue was created to work in concurrent environments spawned with the threading module, which is the behavior of the Azure exporters. With that being said, we probably shouldn't replace the default queue that is being used, since the original use case is supposed to be for the concurrent environments. I suggest adding the ability for the user to configure which type of queue they want by adding an option here. Then you can create a different type queue accordingly.
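The reviewer's suggestion could be sketched roughly as follows. The `use_multiprocessing` flag is a hypothetical option name, and stdlib `queue` stands in for the module's `six.moves` import; this is not the PR's actual code:

```python
import multiprocessing
import queue

class Queue(object):
    """Sketch: keep queue.Queue as the default for threaded use and
    let callers opt in to a process-safe queue via a constructor flag."""

    def __init__(self, capacity, use_multiprocessing=False):
        if use_multiprocessing:
            # Process-safe: items are pickled and piped between processes.
            self._queue = multiprocessing.Queue(maxsize=capacity)
        else:
            # Thread-safe only, but cheaper: no pickling involved.
            self._queue = queue.Queue(maxsize=capacity)

    def put(self, item, block=True, timeout=None):
        self._queue.put(item, block, timeout)

    def get(self, block=True, timeout=None):
        return self._queue.get(block, timeout)
```

This keeps the existing threaded behavior (and its performance) as the default while still allowing multi-process deployments to opt in.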

Contributor (Author):

It does work with threading; however, in an environment where multiple processes are used (see the linked bug reports), the queue is not shared between the processes, so a multiprocessing.Queue is required to make this work.


def _gets(self, count, timeout):
start_time = time.time()