Description
Summary
What change needs making?
The XXXAsyncServer classes should accept a shutdown_callback argument and pass it to aiorun.run() in their start functions.
Use Cases
When would you use this?
We would like to allow users to insert custom processing when a Pod in the pipeline terminates, whether intentionally or unintentionally.
Specifically, our Sink UDF container buffers the data that it streams to an external server and writes that buffer to a file when the Pod terminates.
This functionality is intended as a debugging aid to verify whether the pipeline processing is working correctly, independently of the external server.
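As a rough illustration of this use case, a handler could keep a copy of each record it forwards and expose a dump() method for the shutdown hook to call. The class name BufferingSinkHandler, the record() method, and the JSON Lines output format are all hypothetical and are not part of the library; this is only a sketch of the idea.

```python
import json
from pathlib import Path


class BufferingSinkHandler:
    """Hypothetical handler that keeps a copy of every record it forwards."""

    def __init__(self, dump_path):
        self.dump_path = Path(dump_path)
        self.buffer = []

    def record(self, payload: dict) -> None:
        # Called alongside the real send to the external server (not shown).
        self.buffer.append(payload)

    def dump(self) -> int:
        # Write the buffered records as JSON Lines and return how many were written.
        with self.dump_path.open("w", encoding="utf-8") as fh:
            for item in self.buffer:
                fh.write(json.dumps(item) + "\n")
        return len(self.buffer)
```

The dump file can then be compared with what the external server received to check the pipeline on its own.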
Proposed Solution
We only need to add shutdown_callback as an argument to __init__ and pass it to aiorun.run() in start().
def __init__(
    ...
    shutdown_callback=None,
):
    ...
    self.shutdown_callback = shutdown_callback
    ...

def start(self) -> None:
    ...
    aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)
On the user side, you can simply create a closure like the following and pass it in.
import logging

logger = logging.getLogger(__name__)


def make_shutdown_cb(handler):
    # _loop is the event loop that aiorun is running.
    async def shutdown_cb(_loop):
        logger.info('Starting shutdown_cb...')
        handler.dump()
        logger.info('shutdown_cb completed')
    return shutdown_cb


def main():
    handler = CompsysgCat()
    shutdown_cb = make_shutdown_cb(handler)
    grpc_server = MapAsyncServer(handler, shutdown_callback=shutdown_cb)
    grpc_server.start()
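Because the callback is just a coroutine function that receives the event loop, the closure can also be exercised on its own, without starting a server. The sketch below uses plain asyncio and a hypothetical RecordingHandler stand-in; it does not involve MapAsyncServer or aiorun, so it only checks that the closure calls dump().

```python
import asyncio


class RecordingHandler:
    """Hypothetical stand-in for a real handler; only tracks whether dump() ran."""

    def __init__(self):
        self.dumped = False

    def dump(self):
        self.dumped = True


def make_shutdown_cb(handler):
    # Same closure shape as in the proposal, with logging left out.
    async def shutdown_cb(_loop):
        handler.dump()
    return shutdown_cb


def run_callback_once(handler):
    # Drive the callback by hand, passing the running loop the way a caller would.
    async def _drive():
        await make_shutdown_cb(handler)(asyncio.get_running_loop())

    asyncio.run(_drive())
    return handler.dumped
```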
I tested this solution with MapAsyncServer and confirmed that it works as expected.
If you agree with this approach, I would like to submit a PR.
Message from the maintainers:
If you wish to see this enhancement implemented please add a 👍 reaction to this issue! We often sort issues this way to know what to prioritize.