Cannot cleanly shutdown async client #25

Closed
ihorh opened this issue Jun 4, 2024 · 11 comments

Comments

@ihorh

ihorh commented Jun 4, 2024

Note: I'm not entirely sure the problem is in esdbclient.

I'm testing this locally for now with FastAPI + uvicorn (in reload mode). I start/stop the subscription in FastAPI's lifespan.

I run a persistent subscription in an infinite loop like this:

        while not self._stop_requested:
            try:
                self._subscription = await self._client.read_subscription_to_stream(
                    group_name=self._subscription_group_name,
                    stream_name=self._subscription_stream_name,
                )
                async for event in self._subscription:
                    if not self._stop_requested:
                        await self._handle_event(event)
            except ConsumerTooSlow:
                print("subscription was dropped, restarting subscription")
                continue
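The loop above restarts immediately whenever the subscription is dropped. If drops repeat, that can spin, and a plain asyncio.sleep() backoff would in turn delay shutdown. A minimal sketch of a restart loop whose backoff wait also ends as soon as a stop is requested, assuming the stop request is signalled through an asyncio.Event rather than a boolean flag. `Dropped` is a hypothetical stand-in for esdbclient's ConsumerTooSlow, and `consume_forever`, `run_once` and `backoff` are hypothetical names, not part of esdbclient:

```python
import asyncio
from typing import Awaitable, Callable


class Dropped(Exception):
    """Hypothetical stand-in for esdbclient's ConsumerTooSlow."""


async def consume_forever(
    run_once: Callable[[], Awaitable[None]],
    stop: asyncio.Event,
    backoff: float = 1.0,
) -> int:
    """Call run_once() until `stop` is set, restarting after drops; return the restart count."""
    restarts = 0
    while not stop.is_set():
        try:
            await run_once()  # e.g. open the subscription and iterate over it
        except Dropped:
            restarts += 1
            try:
                # Wait out the backoff, but wake up at once if a stop is requested,
                # so shutdown is never held up by the delay.
                await asyncio.wait_for(stop.wait(), timeout=backoff)
            except asyncio.TimeoutError:
                pass  # backoff elapsed without a stop request; restart
    return restarts
```

Setting the event from a stop() method then ends the loop either at the next check of `stop.is_set()` or immediately if it is currently backing off.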

No matter how I handle shutdown (Ctrl+C) in code, I can reliably pick a moment when the client or subscription is probably half-initialised (or something else is going on) and does not shut down properly, causing the entire application to hang forever.

With this shutdown code I get the best results so far, but it does not completely solve the issue:

    async def _stop_with_no_lock(self):
        self._stop_requested = True
        # if self._active_task:
        #     self._active_task.cancel()
        if self._subscription:
            await self._subscription.stop()

    async def stop(self):
        async with self._start_stop_lock:
            await self._stop_with_no_lock()

    async def stop_and_close(self):
        async with self._start_stop_lock:
            await self._stop_with_no_lock()
            if self._client:
                await self._client.close()

Async client initialization is also protected by the same self._start_stop_lock.
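Since a stop() that never returns is what turns into a hang, one option is to bound the graceful stop with a timeout and fall back to cancelling the task that runs the subscription loop. A minimal sketch, assuming the loop runs in its own asyncio.Task and the subscription object has an async stop() method, as in the code above; `stop_consumer` and the default timeout are hypothetical:

```python
import asyncio


async def stop_consumer(task: asyncio.Task, subscription, timeout: float = 5.0) -> None:
    """Stop a subscription loop gracefully, cancelling it if that takes too long.

    `task` runs the subscription loop; `subscription` is any object with an async
    stop() method (standing in for the esdbclient subscription object).
    """
    try:
        # Graceful path: ask the subscription to stop, then give the loop a
        # bounded time to notice and return.
        await asyncio.wait_for(subscription.stop(), timeout)
        # shield() stops wait_for() from cancelling the task itself on timeout.
        await asyncio.wait_for(asyncio.shield(task), timeout)
    except asyncio.TimeoutError:
        # Fallback: cancel the loop and wait until the cancellation has landed.
        # asyncio.wait() does not re-raise the task's CancelledError.
        task.cancel()
        await asyncio.wait({task})
```

This only helps if the code calling stop_consumer() actually runs; it cannot help when the server never reaches the application's shutdown code at all.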

Whenever I see the following entry in the logs:

DEBUG 2024-06-04 13:23:31,327 grpc._cython.cygrpc/_channel:__init__
  --> Using AsyncIOEngine.POLLER as I/O engine

If I hit Ctrl+C immediately, the application almost always hangs, producing the following output once:

^CINFO:     Application startup complete.
DEBUG 2024-06-04 13:23:34,326 grpc.aio._call/_call:_consume_request_iterator
  --> Client request_iterator raised exception:
Traceback (most recent call last):
  File "<I removed path here>/.venv/lib/python3.11/site-packages/grpc/aio/_call.py", line 443, in _consume_request_iterator
    await self._write(request)
  File "<I removed path here>/.venv/lib/python3.11/site-packages/grpc/aio/_call.py", line 484, in _write
    await self._metadata_sent.wait()
  File "/opt/homebrew/Cellar/python@3.11/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/locks.py", line 213, in wait
    await fut
asyncio.exceptions.CancelledError

ERROR:    Traceback (most recent call last):
  File "<I removed path here>/.venv/lib/python3.11/site-packages/starlette/routing.py", line 741, in lifespan
    await receive()
  File "<I removed path here>/.venv/lib/python3.11/site-packages/uvicorn/lifespan/on.py", line 137, in receive
    return await self.receive_queue.get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/queues.py", line 158, in get
    await getter
asyncio.exceptions.CancelledError

DEBUG 2024-06-04 13:23:34,328 grpc._cython.cygrpc/_call:_fetch_stream_responses
  --> Failed to receive any message from Core
^C

Subsequent Ctrl+C presses don't seem to have any effect.

Environment: Python 3.11 on macOS (Apple M2).

@johnbywater
Collaborator

Hello @ihorh! Thanks for raising this issue.

I'm looking into this now...

@johnbywater
Collaborator

johnbywater commented Jun 4, 2024

I tried various things without involving FastAPI. Whatever I tried, the program always exits on Ctrl-C.

Could you share a complete example program using FastAPI that I can run, so I can see whether I can replicate the issue here? (I'm also running a Mac, but with an Intel CPU.)

This is the code I wrote. It exits on its own if the await self.stop() line is uncommented, and always exits when I hit Ctrl-C with that line commented out:

import asyncio
import uuid

from esdbclient import RecordedEvent, AsyncioEventStoreDBClient, NewEvent, StreamState
from esdbclient.exceptions import ConsumerTooSlow


class Consumer:
    def __init__(self, client, group_name, stream_name):
        self._client = client
        self._stop_requested = False
        self._subscription = None  # set by run(); stop() may be called before that
        self._subscription_group_name = group_name
        self._subscription_stream_name = stream_name

    async def run(self):
        while not self._stop_requested:
            try:
                self._subscription = await self._client.read_subscription_to_stream(
                    group_name=self._subscription_group_name,
                    stream_name=self._subscription_stream_name,
                )
                async for event in self._subscription:
                    if not self._stop_requested:
                        await self._handle_event(event)
            except ConsumerTooSlow:
                print("subscription was dropped, restarting subscription")
                continue

    async def _handle_event(self, event: RecordedEvent) -> None:
        print("Received event:", event)
        await asyncio.sleep(1)
        # await self.stop()

    async def stop(self):
        self._stop_requested = True
        if self._subscription is not None:
            await self._subscription.stop()


async def main() -> None:
    client = await AsyncioEventStoreDBClient(uri="esdb://localhost:2113?Tls=false")
    try:
        group_name = str(uuid.uuid4())
        stream_name = str(uuid.uuid4())

        await client.append_events(
            stream_name=stream_name,
            events=[NewEvent(type="SomethingHappened", data=b"{}") for _ in range(100)],
            current_version=StreamState.NO_STREAM,
        )

        await client.create_subscription_to_stream(group_name=group_name, stream_name=stream_name)
        consumer = Consumer(client, group_name, stream_name)
        await consumer.run()
        print("Consumer has shutdown")
    finally:
        # Close the client even if run() is interrupted (e.g. by Ctrl-C).
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
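For a standalone script like this one (not running under uvicorn, which installs its own signal handlers), Ctrl+C can be turned into a graceful consumer.stop() by registering the handler with the event loop instead of with signal.signal(). A minimal sketch, Unix-only and assuming the event loop runs in the main thread; `run_until_signal`, `work` and `on_stop` are hypothetical names:

```python
import asyncio
import signal
from typing import Awaitable, Callable


async def run_until_signal(
    work: Callable[[], Awaitable[None]],
    on_stop: Callable[[], Awaitable[None]],
    sig: int = signal.SIGINT,
) -> None:
    """Run work(); if `sig` arrives first, await on_stop() and let work() finish.

    Unix-only: loop.add_signal_handler() is not available on Windows, and it
    must be called from the main thread.
    """
    loop = asyncio.get_running_loop()
    stop_requested = asyncio.Event()
    # The callback is invoked by the event loop itself, not inside a raw
    # signal-handler frame, so touching asyncio objects here is safe.
    loop.add_signal_handler(sig, stop_requested.set)
    worker = asyncio.create_task(work())
    waiter = asyncio.create_task(stop_requested.wait())
    try:
        await asyncio.wait({worker, waiter}, return_when=asyncio.FIRST_COMPLETED)
        if not worker.done():
            await on_stop()  # e.g. consumer.stop()
        await worker  # re-raises any exception from work()
    finally:
        waiter.cancel()
        loop.remove_signal_handler(sig)
```

With the example above, main() could call `await run_until_signal(consumer.run, consumer.stop)` instead of `await consumer.run()`, provided stop() tolerates being called before run() has created the subscription.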

@johnbywater
Collaborator

Also, I'm using https://pypi.org/project/esdbclient/1.1b1/

@ihorh
Author

ihorh commented Jun 4, 2024

@johnbywater, thank you for looking into this so quickly. Before I create a self-contained example program, I want to make sure the error is not in my code; it may well be. Could you give me a hint: is it OK to use multiple instances of the async client in a Python program, or is it better to reuse the same instance as long as it connects to the same EventStoreDB instance?

@ihorh
Author

ihorh commented Jun 4, 2024

@johnbywater, it seems to me that the async client in 1.0.25 is not quite compatible with async FastAPI endpoints: I have problems sending simple events from an endpoint in a fairly clean environment. I should try with 1.1b1 first. Or maybe it is a (my) skills issue :-)

Sorry for the trouble. I'll post updates as soon as I make progress on this.

@johnbywater
Collaborator

Please, could you give me a hint if it is ok to use multiple instances of async client in python program? or is it better to reuse the same instance of async client as long as it connects to the same Event Store instance?

@ihorh It's probably best to just have one client.
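If several parts of an application need the client, one way to make sure they all share a single instance is to construct it lazily behind a lock. A minimal sketch; `SharedClient` and the `factory` coroutine are hypothetical, and the only assumption about the client is that it has an async close() method, as the async client in this thread does:

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class SharedClient:
    """Build one client on first use and hand the same instance to every caller.

    `factory` is any zero-argument coroutine function that constructs the client
    (hypothetical; with esdbclient it might await AsyncioEventStoreDBClient(uri=...)).
    """

    def __init__(self, factory: Callable[[], Awaitable[Any]]) -> None:
        self._factory = factory
        self._client: Optional[Any] = None
        self._lock = asyncio.Lock()

    async def get(self) -> Any:
        if self._client is not None:
            return self._client  # fast path: already built
        async with self._lock:
            # Re-check: another coroutine may have built it while we waited.
            if self._client is None:
                self._client = await self._factory()
            return self._client

    async def close(self) -> None:
        async with self._lock:
            if self._client is not None:
                await self._client.close()  # assumes an async close() method
                self._client = None
```

The lock matters because the `await` inside the factory lets other coroutines run; without it, two concurrent first callers could each build a client.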

@ihorh
Author

ihorh commented Jun 4, 2024

Quick update: it seems that something in the original project is messing with asyncio and creating side effects that affect esdbclient.

I'm building a new project from scratch and adding components one by one until I find the cause. So far, async FastAPI + async SQLAlchemy (with an async driver) works fine together with esdbclient. I'll keep going.

So it is definitely not a bug in esdbclient. Sorry for the trouble! And thank you for being so responsive!

@johnbywater
Collaborator

johnbywater commented Jun 4, 2024

How are you constructing the client?

I tried using the lifespan argument when constructing FastAPI and it works okay.

from contextlib import asynccontextmanager

from fastapi import FastAPI

from esdbclient import AsyncioEventStoreDBClient
from esdbclient.asyncio_client import _AsyncioEventStoreDBClient

esdbclient: _AsyncioEventStoreDBClient


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Construct the client.
    global esdbclient
    esdbclient = await AsyncioEventStoreDBClient(
        uri="esdb+discover://localhost:2113?Tls=false",
    )

    yield

    # Close the client.
    await esdbclient.close()


app = FastAPI(lifespan=lifespan)


@app.get("/commit_position")
async def commit_position():
    commit_position = await esdbclient.get_commit_position()
    return {"commit_position": commit_position}

Putting this code in a file called fastapi_example.py and then running uvicorn fastapi_example:app --host 0.0.0.0 --port 80 returns {"commit_position":628917} when I point a browser to http://localhost/commit_position. And when I press Ctrl-C, the process exits.

% uvicorn fastapi_example:app --host 0.0.0.0 --port 80
INFO:     Started server process [80523]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:80 (Press CTRL+C to quit)
INFO:     127.0.0.1:58586 - "GET /commit_position HTTP/1.1" 200 OK
^CINFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [80523]

Aborted!

I know this doesn't involve any persistent subscriptions, but can you replicate this at least?

@ihorh
Author

ihorh commented Jun 4, 2024

@johnbywater, thanks again for being so helpful. In the original project I fixed most of the issues (user error), and the async client (1.0.25) works fine. However, I can still reproduce the infinite freeze during shutdown there.

I reused your Consumer and lifespan code in a clean environment built from scratch. There, the hang during shutdown is not reproducible. I won't post the code yet, since it is messy and doesn't reproduce anything.

@ihorh
Author

ihorh commented Jun 5, 2024

If you don't mind, I'll post my research here.

There are a number of bug reports in the uvicorn repo describing a similar problem. Even though some of them are marked as closed, people still report in the comments that the bugs are reproducible under some circumstances.

Some responses claim that Python changed the way it handles idle connections between versions 3.11 and 3.12 (I'm not sure I got that part right). As a result, on shutdown uvicorn waits until the outgoing connection closes on its own, and under certain circumstances this can make it unresponsive to Ctrl+C.

In my project, whenever the server hangs, I can see that the process has an open connection to my (remote) EventStoreDB server on port 2113.

So for now I will test the theory that, in my environment, esdbclient somehow does not close the connection on shutdown, or that my code never reaches the point where it closes the client.

Quick edit: whenever it hangs, it does not reach the shutdown code in lifespan:

@asynccontextmanager
async def lifespan(_: FastAPI):
    ...
    yield
    # it is not reaching this point

@ihorh
Author

ihorh commented Jun 5, 2024

Okay, so now I think I have proof that the problem is in neither my code, esdbclient, nor grpc.

It looks like uvicorn's shutdown process, under some hard-to-reproduce conditions, freezes while waiting for an outgoing network connection to close. Because the shutdown process itself is frozen, it never executes our shutdown 'hook' defined after the yield keyword in FastAPI's lifespan, so the application never gets a chance to clean up its resources.

Here is the code I added to lifespan to verify this theory:

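import asyncio
import signal
from contextlib import asynccontextmanager
from types import FrameType

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(_: FastAPI):
    ...
    default_sigint_handler = signal.getsignal(signal.SIGINT)
    # Keep strong references so the cleanup tasks are not garbage-collected early.
    cleanup_tasks: set[asyncio.Task] = set()

    def terminate_now(signum: int, frame: FrameType | None = None):
        # do whatever you need to unblock your own tasks
        print("===> shutdown signal override")
        cleanup_tasks.add(asyncio.create_task(event_handler.stop()))
        cleanup_tasks.add(asyncio.create_task(esdb_client.close()))

        # Chain to the previously installed handler (e.g. uvicorn's), if callable.
        if callable(default_sigint_handler):
            default_sigint_handler(signum, frame)

    signal.signal(signal.SIGINT, terminate_now)

    yield

    print("===> normal shutdown hook execution")

    await event_handler.stop()
    await esdb_client.close()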

This code makes the uvicorn process respond to subsequent Ctrl+C presses even when its shutdown process is frozen. In my particular "development" scenario, uvicorn reloads when I edit files (not because of SIGINT) and freezes, but I still get the chance to stop it with Ctrl+C. So this 'hack' shows that whenever the application is given a chance to clean up its resources, it does so correctly.

I've found hints for a workaround here: encode/uvicorn#1579

So it seems there is nothing we can do better in our code.

@johnbywater, sorry for the trouble again. At first it looked like this problem was caused by grpc aio or esdbclient, since it appeared only when they were part of the equation. But apparently adding them to the equation just triggers uvicorn's strange behaviour.

I'm closing the issue, if you don't mind.

ihorh closed this as completed Jun 5, 2024