Change XXXAsyncServer to use shutdown_callback #310

@sesame0224

Summary

What change needs making?

We need to pass the shutdown_callback argument to aiorun.run in the start functions of XXXAsyncServer.

Use Cases

When would you use this?

We would like to allow users to insert custom processing when a Pod in the pipeline terminates, whether intentionally or unintentionally.
Specifically, the Sink UDF container buffers the data that it streams to an external server and writes it to a file if the Pod terminates.
This functionality is intended as a debugging aid to verify whether the pipeline processing is working correctly, independently of the external server.
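
To make the use case concrete, here is a minimal sketch of a handler that buffers what it sends and writes the buffer to a file on request. The class name `BufferingSinkHandler`, its `record` method, and the JSON Lines output format are assumptions made for illustration; only the idea of a `dump()` method comes from this issue.

```python
import json
import tempfile
from pathlib import Path


class BufferingSinkHandler:
    """Hypothetical helper: keeps a copy of every payload sent downstream."""

    def __init__(self, dump_path):
        self.dump_path = Path(dump_path)
        self._buffer = []

    def record(self, payload: bytes) -> None:
        # Store a text copy of the payload alongside the real send.
        self._buffer.append(payload.decode("utf-8", errors="replace"))

    def dump(self) -> int:
        # Write one JSON-encoded string per line and return the item count.
        with self.dump_path.open("w", encoding="utf-8") as f:
            for item in self._buffer:
                f.write(json.dumps(item) + "\n")
        return len(self._buffer)


def demo():
    # Exercise the handler against a temporary file.
    with tempfile.TemporaryDirectory() as tmp:
        handler = BufferingSinkHandler(Path(tmp) / "sink_dump.jsonl")
        handler.record(b"first")
        handler.record(b"second")
        count = handler.dump()
        lines = handler.dump_path.read_text(encoding="utf-8").splitlines()
    return count, lines
```

Calling `dump()` from a shutdown callback would then leave a file that can be compared with what the external server received.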

Proposed Solution

We only need to add shutdown_callback as an argument to __init__ and pass it on to aiorun.run in start.

def __init__(
    ...
    shutdown_callback=None,
):
    ...
    # Keep the optional callback so start() can hand it to aiorun.
    self.shutdown_callback = shutdown_callback
    ...

def start(self) -> None:
    ...
    aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)
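
The store-and-forward pattern above can be tried without pynumaflow or aiorun installed. The sketch below is not the library code: `SketchAsyncServer`, its `runner` parameter, and `_stdlib_runner` are all hypothetical names, and the runner is a simplified stand-in that runs the coroutine once and then awaits the callback, whereas the real aiorun.run keeps the loop alive until a signal arrives.

```python
import asyncio


def _stdlib_runner(coro, shutdown_callback=None):
    """Simplified stand-in for aiorun.run (assumption, not the real API)."""

    async def main():
        await coro
        if shutdown_callback is not None:
            # Like aiorun, hand the running loop to the callback.
            await shutdown_callback(asyncio.get_running_loop())

    asyncio.run(main())


class SketchAsyncServer:
    """Hypothetical stand-in for an XXXAsyncServer."""

    def __init__(self, handler, shutdown_callback=None, runner=None):
        self.handler = handler
        # Stored as-is; None means no custom shutdown work.
        self.shutdown_callback = shutdown_callback
        # `runner` exists only so this sketch runs without aiorun.
        self._runner = runner or _stdlib_runner

    async def aexec(self):
        # Placeholder for the real gRPC serving coroutine.
        await asyncio.sleep(0)

    def start(self) -> None:
        self._runner(self.aexec(), shutdown_callback=self.shutdown_callback)
```

Because the argument defaults to None, existing callers that do not pass a callback would be unaffected.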

On the user side, you can simply create a closure like the following and pass it in.

import logging

logger = logging.getLogger(__name__)

def make_shutdown_cb(handler):
    # aiorun passes its event loop to the callback; it is unused here.
    async def shutdown_cb(_loop):
        logger.info('Starting shutdown_cb...')
        handler.dump()
        logger.info('shutdown_cb completed')

    return shutdown_cb

def main():
    handler = CompsysgCat()
    shutdown_cb = make_shutdown_cb(handler)
    grpc_server = MapAsyncServer(handler, shutdown_callback=shutdown_cb)
    grpc_server.start()
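
The closure can also be checked in isolation by awaiting it directly, without starting a gRPC server. This is only a sketch: `RecordingHandler` is a hypothetical stand-in for the real handler, and the callback is awaited by hand rather than by aiorun.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class RecordingHandler:
    """Hypothetical handler that only remembers whether dump() was called."""

    def __init__(self):
        self.dumped = False

    def dump(self):
        self.dumped = True


def make_shutdown_cb(handler):
    # Same closure shape as in the issue: the loop argument is unused.
    async def shutdown_cb(_loop):
        logger.info("Starting shutdown_cb...")
        handler.dump()
        logger.info("shutdown_cb completed")

    return shutdown_cb


async def exercise_callback(handler):
    # Await the callback by hand, passing the running loop as aiorun would.
    shutdown_cb = make_shutdown_cb(handler)
    await shutdown_cb(asyncio.get_running_loop())
    return handler.dumped


def run_demo():
    return asyncio.run(exercise_callback(RecordingHandler()))
```

Note that `dump()` is synchronous here, so a slow dump would block the event loop while it runs.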

I tested this solution with MapAsyncServer and confirmed that it works as expected.
If you agree with this approach, I would like to submit a PR.


Message from the maintainers:

If you wish to see this enhancement implemented please add a 👍 reaction to this issue! We often sort issues this way to know what to prioritize.

Labels

enhancement (New feature or request)
