AzureExporter not working with multiprocessing #928

Open
dpgrodriguez opened this issue Jul 14, 2020 · 22 comments
Labels: azure (Microsoft Azure), bug

@dpgrodriguez

Describe your environment.
MacOS 10.14.6
Python 3.7.5
opencensus-ext-azure==1.0.4
opencensus-ext-requests==0.7.3

Steps to reproduce.
I have code for which I want to monitor the dependency calls to Azure DevOps APIs. Our code uses multiprocessing via the Process class. When the exporter is run outside of multiprocessing, it sends telemetry to App Insights. When run inside a multiprocessing Process, it doesn't. I added a callback to print the span data, and it doesn't get called when using Process.

from azure.devops.connection import Connection
from msrest.authentication import BasicAuthentication

from multiprocessing import Process, Pool, Queue
from base_insights import BaseInsights
from opencensus.common.runtime_context import RuntimeContext


class TestInsights:
    def __init__(self):
        self.tracer = BaseInsights.tracer

    def process(self):
       
        procs = []
        organization_url = 'https://dev.azure.com/org'
        credentials = BasicAuthentication('', '')
        
        p1 = Process(target=self.my_loop, args=[organization_url, credentials])
        p1.start()
        p1.join()

    def my_loop(self, organization_url, credentials, parent_span=None):
      
        with self.tracer.span(name='TestLoopProcessThreadingInside'):

            connection = Connection(base_url=organization_url, creds=credentials)
            core_client = connection.clients.get_core_client()

            org = core_client.get_project_collection("test")


TestInsights().process()

base_insights.py (BaseInsights):

import os

from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.trace import file_exporter

from opencensus.trace import config_integration
from opencensus.trace.samplers import ProbabilitySampler, AlwaysOnSampler
from opencensus.trace.tracer import Tracer

config_integration.trace_integrations(['requests'])


def singleton(cls):
    return cls()


@singleton
class BaseInsights:
    def __init__(self):
        exporter = AzureExporter()
        exporter.add_telemetry_processor(self.callback_function)

        self.tracer = Tracer(exporter=exporter, sampler=AlwaysOnSampler())


    def callback_function(self, envelope):
        print(envelope)

What is the expected behavior?
Span data gets sent to Application Insights

What is the actual behavior?
Span data is not sent to Application Insights

@aabmass added the azure (Microsoft Azure) label on Jul 14, 2020
@lzchen
Contributor

lzchen commented Jul 23, 2020

@dpgrodriguez
I created a simplified version of your first code snippet by taking out the calls to the DevOps API. The span data is printed fine and I can see the dependencies in App Insights. I think the process might be exiting too quickly for the exporter to actually export the data. The default export interval is 15 seconds. You can try lowering that, or adding a delay within your new process.

exporter = AzureExporter(export_interval=5.0)

import time
from multiprocessing import Process
from base_insights import BaseInsights


class TestInsights:
    def __init__(self):
        self.tracer = BaseInsights.tracer

def my_loop():
    
    with BaseInsights.tracer.span(name='TestLoopProcessThreadingInside'):
        print("inside")
        time.sleep(10)

if __name__ == '__main__':
    with BaseInsights.tracer.span(name='TestOutside'):
        print("outside")
        time.sleep(10)
    p1 = Process(target=my_loop)
    p1.start()
    p1.join()

@dpgrodriguez
Author

dpgrodriguez commented Jul 24, 2020

Hi Leighton,

I tried your suggestion but still cannot send span data from inside the multiprocessing Process. I used your exact code and this is what was displayed:

outside
{...redacted envelope data from outside}
inside

The only span data generated was from "outside".

I also tried removing the parent span from __main__:

if __name__ == '__main__':
#    with BaseInsights.tracer.span(name='TestOutside'):
    print("outside")
    time.sleep(10)
    p1 = Process(target=my_loop)
    p1.start()
    p1.join()

with these results:

outside
inside

The data from inside multiprocessing also wasn't sent to Application Insights.

These are the versions that I'm using:
opencensus==0.7.10
opencensus-context==0.1.1
opencensus-ext-azure==1.0.4
opencensus-ext-requests==0.7.3
opencensus-ext-threading==0.1.2

@lzchen
Contributor

lzchen commented Jul 24, 2020

Did you set the export_interval of the exporter? The whole execution from start to finish should take around 20 seconds.

@dpgrodriguez
Author

Yes, I tried setting it to 5 and to 1.

@dpgrodriguez
Author

Any other recommendations?

@lzchen
Contributor

lzchen commented Aug 4, 2020

Perhaps it's the way you are setting up BaseInsights? I simplified it even further. Try it and see if it works for you.

import time
from multiprocessing import Process
from opencensus.trace.samplers import AlwaysOnSampler
from opencensus.trace.tracer import Tracer
from opencensus.ext.azure.trace_exporter import AzureExporter

exporter = AzureExporter(export_interval=5)
tracer = Tracer(exporter=exporter, sampler=AlwaysOnSampler())

def callback_function(envelope):
    print(envelope)

exporter.add_telemetry_processor(callback_function)

def my_loop():
    
    with tracer.span(name='TestLoopProcessThreadingInside'):
        print("inside")
        time.sleep(10)

def main():
    with tracer.span(name='TestOutside'):
        print("outside")
        time.sleep(10)
        p1 = Process(target=my_loop)
        p1.start()
        p1.join()

if __name__ == '__main__':
    main()

@dpgrodriguez
Author

I tried running this code in 4 different environments:

CentOS 7.8 - Python 2.7.5
CentOS 7.8 - Python 3.6.8
macOS 10.14.6 - Python 2.7.16
macOS 10.14.6 - Python 3.7.5

I got the same results. Only INPROC: TestOutside is being logged in Application Insights.

[Screenshot: Application Insights logs showing only the INPROC: TestOutside entry]

@dpgrodriguez
Author

Is there a way to show lower-level debug messages?

@lzchen
Contributor

lzchen commented Aug 10, 2020

@dpgrodriguez
I did some debugging and it looks like the multiprocessing module behaves a little differently on Windows than on macOS/Linux. On Windows, when a new process is created, all threads are copied over into the new address space (including the worker that sends telemetry to Azure Monitor). However, when I ran the same code on Linux, only the calling thread was copied. The multiprocessing module uses os.fork() underneath on POSIX systems (which include Linux), and from the documentation of fork: "A process shall be created with a single thread. If a multi-threaded process calls fork(), the new process shall contain a replica of the calling thread and its entire address space, possibly including the states of mutexes and other resources." Windows, by contrast, spins up an entirely new process and tells it to load all the modules again (which in turn creates another AzureExporter and worker).

So the only way to get this working on your OS is to duplicate the initialization logic of your AzureExporter and Tracer in the function that runs in the spawned Process.
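
For illustration, here is a minimal sketch of that approach (not an official pattern; it reuses the names from the earlier snippets and assumes the connection string is picked up from the APPLICATIONINSIGHTS_CONNECTION_STRING environment variable):

import time
from multiprocessing import Process

from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.trace.samplers import AlwaysOnSampler
from opencensus.trace.tracer import Tracer


def my_loop():
    # Build the exporter (and therefore its worker thread) inside the
    # child process, since only the calling thread survives os.fork().
    exporter = AzureExporter(export_interval=5.0)
    tracer = Tracer(exporter=exporter, sampler=AlwaysOnSampler())
    with tracer.span(name='TestLoopProcessThreadingInside'):
        print("inside")
        time.sleep(10)  # leave time for at least one export interval


if __name__ == '__main__':
    p1 = Process(target=my_loop)
    p1.start()
    p1.join()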

@dpgrodriguez
Author

dpgrodriguez commented Aug 20, 2020

Those are some interesting findings. I did some digging as well.

In BaseExporter (opencensus/ext/azure/common/exporter.py), it looks like the items are actually being passed into the export method and onto the queue, even inside the multiprocessing Process.

    def export(self, items):
        self._queue.puts(items, block=False)  # pragma: NO COVER

However, in the Worker's run method, as it tries to get the batch from the queue, it cannot find the items anymore:

class Worker(threading.Thread):
    daemon = True
...
...

        while True:
            batch = src.gets(dst.max_batch_size, dst.export_interval)

Any thoughts on this? Could it be that the threading Queue is not playing well with multiprocessing?
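
A quick standalone check (not SDK code) illustrates the suspicion on a fork-based platform: the child process writes into its own copy of a threading queue, so the parent, where the Worker lives, never sees the item.

import queue
from multiprocessing import Process

q = queue.Queue()


def child():
    # On POSIX, fork gives the child a copy of q; this put lands in
    # that copy, not in the parent's queue.
    q.put("from child")


if __name__ == '__main__':
    p = Process(target=child)
    p.start()
    p.join()
    print(q.empty())  # True: the parent's queue never received the item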

@lzchen
Contributor

lzchen commented Aug 20, 2020

@dpgrodriguez
Yes, nothing gets sent because the Worker thread that drains the queue (and pushes the telemetry to Azure Monitor) doesn't exist in the child process.

@dpgrodriguez
Author

The worker is actually present (in the parent process) but doesn't see the contents of the threading queue, because the child puts the items into its own copy of the queue, which the parent can't access. I did a quick PoC using a multiprocessing Queue and was able to send the data to Application Insights.

from multiprocessing import Queue as MP_Queue
import jsonpickle

...redacted...

    # queue, or shared workers among queues (e.g. queue for traces, queue
    # for logs).
    def export(self, items):
        json_items = jsonpickle.encode(items)

        # Put items on both multiprocessing and threading queues
        self._mp_queue.put(json_items)
        self._queue.puts(items, block=False)  # pragma: NO COVER


... redacted ...

    def run(self):  # pragma: NO COVER
        # Indicate that this thread is an exporter thread.
        # Used to suppress tracking of requests in this thread.
        execution_context.set_is_exporter(True)
        src = self.src
        dst = self.dst
        while True:
            batch = src.gets(dst.max_batch_size, dst.export_interval)

            # Check if batch results in empty tuple and check multiprocessing queue for contents
            if batch == ():
                try:
                    json_items = dst._mp_queue.get()
                    batch = tuple(jsonpickle.decode(json_items))
                except Exception as e:
                    pass

            if batch and isinstance(batch[-1], QueueEvent):

... redacted ...

This works and sends the multiprocessed SpanData to Application Insights.

@arnabbiswas1

arnabbiswas1 commented Nov 24, 2021

I am facing the same issue while using multiprocessing through joblib. Mine is a pandas-based ML pipeline where work is distributed across multiple processes using joblib (with the "multiprocessing" backend).

        with Parallel(n_jobs=4, backend="multiprocessing") as parallel:
            parallel(delayed(_handle_blob)(name) for name in blob_paths)

joblib with the "multiprocessing" backend successfully logs to a file on disk across multiple processes, but when I add AzureLogHandler, I don't see any logs sent to Azure Application Insights.

I have used the workaround proposed in this thread, and it works. But in the end it's still a workaround.

It would be great if this issue could be prioritized and addressed in OpenCensus.

@lzchen
Contributor

lzchen commented Nov 30, 2021

@arnabbiswas1
I am not too familiar with joblib, but it seems like the worker fork behavior is similar to the other multiprocessing libraries. As of today there is no clean solution to address this issue, at least from the SDK side. I believe some multiprocessing/web-server libraries provide hooks in which you can run logic right after a child process is created (like gunicorn). Looking at the joblib docs, maybe you can check whether the Parallel class allows for some pre-processing of the workers that are spawned (the backend parameter).
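
For example, one option (a sketch only; the body of _handle_blob is a placeholder, and the connection string is assumed to come from the environment) is to attach the handler inside the function that joblib executes in each worker:

import logging

from opencensus.ext.azure.log_exporter import AzureLogHandler


def _handle_blob(name):
    # Create the handler inside the worker so its export thread lives
    # in this process; guard against attaching it twice if workers are
    # reused for multiple tasks.
    logger = logging.getLogger("pipeline")
    if not any(isinstance(h, AzureLogHandler) for h in logger.handlers):
        logger.addHandler(AzureLogHandler())
    logger.warning("processing %s", name)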

If you are interested in using other technologies (such as Gunicorn), take a look at the examples in OpenTelemetry.
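
For reference, gunicorn's post_fork hook looks roughly like this (a sketch of a gunicorn.conf.py; wiring the handler up here is an assumption, not a pattern documented by this repo):

# gunicorn.conf.py
import logging


def post_fork(server, worker):
    # Runs in every worker process right after it is forked, so the
    # handler's export thread is created in the worker itself.
    from opencensus.ext.azure.log_exporter import AzureLogHandler
    logging.getLogger().addHandler(AzureLogHandler())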

@arnabbiswas1

@lzchen Thanks for the pointers.

Right now we are using "multiprocessing" as the backend. The other backend we could use is "loky", but that has the same logging issue, even with log files on disk.

        with Parallel(n_jobs=4, backend="multiprocessing") as parallel:
            parallel(delayed(_handle_blob)(name) for name in blob_paths)

Given other priorities, I can't spend much time on this issue right now. Considering that some flavor of multiprocessing is very common in Python applications, and that Microsoft uses this library for Azure Monitor/Application Insights, I thought there might already be workarounds or solutions.

Thanks for your support. Please keep me posted in case you come across other solutions in the future.

@ManashJKonwar

Is there any resolution for integrating the AzureLogHandler and AzureEventHandler objects with a multithreaded process? I have been trying to send intermediate logs to App Insights from a threaded process every 5 seconds. I have used a workaround to create the AzureeventHandler at the process-ID level; however, when the main thread closes, the opencensus BaseLogHandler class raises an error as shown below:

[Screenshot of the error raised by BaseLogHandler when the main thread closes]

The workaround has resulted in successful uploading of intermediate logs to App Insights; the only issue occurs when the main thread closes.
Please let me know if any resolution to this is possible.

@ManashJKonwar

#928 (comment)

Can you please share a working sample of how you managed to get the AzureExporter / AzureEventHandler / AzureLogHandler to work in a multiprocessing setup? That would help me a lot in resolving the issue I'm facing when using the AzureEventHandler under multiple processes. FYI @dpgrodriguez

@JeremyVriens
Contributor

I was experiencing the same issue (with Celery) and figured out that it's not specific to the AzureExporter, but to opencensus.common.schedule.Queue.
@dpgrodriguez helped me on the way.
Simply changing self._queue = queue.Queue(maxsize=capacity) to self._queue = multiprocessing.Queue(maxsize=capacity) in https://github.com/census-instrumentation/opencensus-python/blob/master/opencensus/common/schedule/__init__.py#L85 does the trick for me.
Can someone confirm this / help me out with a pull request?
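
Until there is an official fix, a rough monkeypatch sketch of that idea (assuming Queue.__init__ stores the backing queue in self._queue, as in the linked line; note that multiprocessing.Queue is not a drop-in replacement in every respect, e.g. it uses a feeder thread and its qsize() is unreliable on macOS):

import multiprocessing

from opencensus.common import schedule

_orig_init = schedule.Queue.__init__


def _patched_init(self, capacity):
    _orig_init(self, capacity)
    # Swap the thread-local queue for a process-shared one.
    self._queue = multiprocessing.Queue(maxsize=capacity)


schedule.Queue.__init__ = _patched_init

This has to run before the first AzureExporter (and therefore the first Queue) is constructed.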

@giulianabaratto

Any news on this? 👀 We're having the same problem with Celery and would really appreciate an official solution.

@lzchen
Contributor

lzchen commented Jan 3, 2023

@giulianabaratto

There is a draft PR open, and the issue should be addressed once it is merged and released.

@wojciech-jakubowski

@giulianabaratto

There is a draft PR open, and the issue should be addressed once it is merged and released.

Would you be able to share a link to this PR? It would be nice to be able to check whether it has been merged or not...

@lzchen
Contributor

lzchen commented Sep 15, 2023

#1158
