Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unhandled exception in event loop due to possible resource contention #1219

Open
4 of 6 tasks
tabrezm opened this issue Oct 25, 2024 · 4 comments
Open
4 of 6 tasks

Unhandled exception in event loop due to possible resource contention #1219

tabrezm opened this issue Oct 25, 2024 · 4 comments

Comments

@tabrezm
Copy link

tabrezm commented Oct 25, 2024

Describe the bug
Preface: I'm not 100% sure what's going on because the stack trace doesn't include my code. I'll share what I do know.

We have an image processor which fetches millions of images and writes them to cloud storage. We recently migrated a storage bucket from AWS S3 to Backblaze B2 and started seeing the following exception stack randomly in asyncio.run(debug=True).

TypeError: 'NoneType' object is not callable
Exception in callback None()
handle: <Handle created at /home/linuxbrew/.linuxbrew/Cellar/[email protected]/3.13.0_1/lib/python3.13/asyncio/selector_events.py:310>
source_traceback: Object created at (most recent call last):
  File "/home/linuxbrew/.linuxbrew/Cellar/[email protected]/3.13.0_1/lib/python3.13/asyncio/base_events.py", line 708, in run_until_complete
    self.run_forever()
  File "/home/linuxbrew/.linuxbrew/Cellar/[email protected]/3.13.0_1/lib/python3.13/asyncio/base_events.py", line 679, in run_forever
    self._run_once()
  File "/home/linuxbrew/.linuxbrew/Cellar/[email protected]/3.13.0_1/lib/python3.13/asyncio/base_events.py", line 2019, in runonce
    handle._run()
  File "/home/linuxbrew/.linuxbrew/Cellar/[email protected]/3.13.0_1/lib/python3.13/asyncio/events.py", line 89, in _run
    self._context.run(self._callback, *self._args)
  File "/home/linuxbrew/.linuxbrew/Cellar/[email protected]/3.13.0_1/lib/python3.13/asyncio/selector_events.py", line 1107, in writesendmsg
    self._maybe_resume_protocol()  # May append to buffer.
  File "/home/linuxbrew/.linuxbrew/Cellar/[email protected]/3.13.0_1/lib/python3.13/asyncio/transports.py", line 302, in mayberesume_protocol
    self._protocol.resume_writing()
  File "/home/linuxbrew/.linuxbrew/Cellar/[email protected]/3.13.0_1/lib/python3.13/asyncio/sslproto.py", line 911, in resume_writing
    self._process_outgoing()
  File "/home/linuxbrew/.linuxbrew/Cellar/[email protected]/3.13.0_1/lib/python3.13/asyncio/sslproto.py", line 719, in processoutgoing
    self._transport.write(data)
  File "/home/linuxbrew/.linuxbrew/Cellar/[email protected]/3.13.0_1/lib/python3.13/asyncio/selector_events.py", line 1080, in write
    self._loop._add_writer(self._sock_fd, self._write_ready)
  File "/home/linuxbrew/.linuxbrew/Cellar/[email protected]/3.13.0_1/lib/python3.13/asyncio/selector_events.py", line 310, in addwriter
    handle = events.Handle(callback, args, self, None)
Traceback (most recent call last):
  File "/home/linuxbrew/.linuxbrew/Cellar/[email protected]/3.13.0_1/lib/python3.13/asyncio/events.py", line 89, in _run
    self._context.run(self._callback, *self._args)
    ~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The image processor creates a ProcessPoolExecutor, and each process invokes asyncio.run(process_messages(messages)). Within this function, we create aiobotocore clients, and for the S3 client we set the credentials and endpoint URL. We create tasks via asyncio.createtask() and asyncio.wait(). Each task downloads files from the internet to temp storage and then uploads them to B2. This is the upload code:

    async with aiofiles.open(path, "rb") as f:
        content = await f.read()

    mime = magic.Magic(mime=True)
    content_type = mime.from_file(path)

    await s3.put_object(Body=content, Bucket=bucket, Key=key, ContentType=content_type)
    response = await s3.head_object(Bucket=bucket, Key=key)

Note that this code could run for 24h+ without errors when uploading to S3. But upon switching to B2, processes within the pool will get stuck in an infinite exception loop. We've noticed that B2's API is less stable than S3 (throttling, latency, etc.), so that could be triggering some race condition in aiobotocore, aiohttp, or even asyncio. We have several different Python envs, and it repros on all of them (i.e., 3.10, 3.12, and 3.13).

Furthermore, we noticed that the exception loop gets triggered more frequently when increasing resource utilization. We could go 1h+ without error when running with fewer workers, i.e., ProcessPoolExecutor(max_workers=4). But when we increase it to 32-64 workers (on a 64 core machine), we'll see at least one of the processes get stuck and time out every ~5 minutes. If we revert the S3 client config and use AWS S3, the problem goes away. We've mitigated the issue by setting a timeout in asyncio.wait() and then canceling any pending tasks (pending due to being stuck in the loop).

Checklist

  • I have reproduced in environment where pip check passes without errors
  • I have provided pip freeze results
  • I have provided sample code or detailed way to reproduce
  • I have tried the same code in botocore to ensure this is an aiobotocore specific issue
  • I have tried similar code in aiohttp to ensure this is is an aiobotocore specific issue
  • I have checked the latest and older versions of aiobotocore/aiohttp/python to see if this is a regression / injection

pip freeze results

aiobotocore==2.15.2
aiofiles==24.1.0
aiohappyeyeballs==2.4.3
aiohttp==3.10.10
aioitertools==0.12.0
aiosignal==1.3.1
attrs==24.2.0
boto3==1.35.36
botocore==1.35.36
frozenlist==1.4.1
idna==3.10
jmespath==1.0.1
multidict==6.1.0
pillow==11.0.0
pillow-avif-plugin==1.4.6
primp==0.6.4
propcache==0.2.0
python-dateutil==2.9.0.post0
python-magic==0.4.27
s3transfer==0.10.3
six==1.16.0
tenacity==9.0.0
tqdm==4.66.5
urllib3==2.2.3
wrapt==1.16.0
yarl==1.16.0

Environment:

  • Python Version: 3.10-3.13
  • OS name and version: macOS, Ubuntu 24.04
@thehesiod
Copy link
Collaborator

I'm 90 percent sure this is an issue from aiohttp down. debugging this will require adding a ton of logging to aiohttp. if you can create a repeatable test case that would help a lot, perhaps with moto

@thehesiod
Copy link
Collaborator

are you using spawned vs forked processes? forking is dangerous with aiohttp due to the event loops

@tabrezm
Copy link
Author

tabrezm commented Oct 25, 2024

if you can create a repeatable test case that would help a lot

I need to tear apart several hundred lines of code in this script to see what I can share. The hierarchy of ProcessPoolExecutor and asyncio is fairly simple:
executor.submit(handler) > asyncio.run(process_messages(messages)) > asyncio.create_task(process_message(clients, message)) > await asyncio.wait(tasks.keys(), return_when=asyncio.ALL_COMPLETED, timeout=timeout) > client_s3.put_object()

are you using spawned vs forked processes? forking is dangerous with aiohttp due to the event loops

Ubuntu defaults to fork, and macOS defaults to spawn. There's no asyncio code outside the ProcessPoolExecutor, so would this be an issue?

@tabrezm
Copy link
Author

tabrezm commented Nov 18, 2024

Minor update in case this helps someone else ... I implemented a singleton wrapper around AioSession I haven't seen this exception again (128 processes running over 24h).

class AWSSessionManager:
    _sessions = WeakValueDictionary()
    _lock = asyncio.Lock()

    @classmethod
    async def get_session(cls) -> AioSession:
        loop = asyncio.get_running_loop()
        loop_id = id(loop)

        if loop_id not in cls._sessions:
            async with cls._lock:
                if loop_id not in cls._sessions:
                    cls._sessions[loop_id] = get_session()

        return cls._sessions[loop_id]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants