Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/pynumaflow/pynumaflow/mapper/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=MAP_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
"""
Create a new grpc Asynchronous Map Server instance.
Expand All @@ -77,6 +78,7 @@ def __init__(
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.max_message_size = max_message_size
self.server_info_file = server_info_file
self.shutdwon_callback = shutdown_callback

self.mapper_instance = mapper_instance

Expand All @@ -92,7 +94,7 @@ def start(self) -> None:
Starter function for the Async server class, need a separate caller
so that all the async coroutines can be started from a single context
"""
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback)

async def aexec(self) -> None:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=MAP_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
"""
Create a new grpc Async Map Stream Server instance.
Expand Down Expand Up @@ -98,6 +99,7 @@ async def map_stream_handler(_: list[str], datum: Datum) -> AsyncIterable[Messag
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.max_message_size = max_message_size
self.server_info_file = server_info_file
self.shutdwon_callback = shutdown_callback

self._server_options = [
("grpc.max_send_message_length", self.max_message_size),
Expand All @@ -111,7 +113,7 @@ def start(self):
Starter function for the Async Map Stream server, we need a separate caller
to the aexec so that all the async coroutines can be started from a single context
"""
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback)

async def aexec(self):
"""
Expand Down
4 changes: 3 additions & 1 deletion packages/pynumaflow/pynumaflow/reducer/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,15 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=REDUCE_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
init_kwargs = init_kwargs or {}
self.reducer_handler = get_handler(reducer_instance, init_args, init_kwargs)
self.sock_path = f"unix://{sock_path}"
self.max_message_size = max_message_size
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.server_info_file = server_info_file
self.shutdwon_callback = shutdown_callback

self._server_options = [
("grpc.max_send_message_length", self.max_message_size),
Expand All @@ -147,7 +149,7 @@ def start(self):
_LOGGER.info(
"Starting Async Reduce Server",
)
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback)

async def aexec(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,15 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=REDUCE_STREAM_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
init_kwargs = init_kwargs or {}
self.reduce_stream_handler = get_handler(reduce_stream_instance, init_args, init_kwargs)
self.sock_path = f"unix://{sock_path}"
self.max_message_size = max_message_size
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.server_info_file = server_info_file
self.shutdwon_callback = shutdown_callback

self._server_options = [
("grpc.max_send_message_length", self.max_message_size),
Expand All @@ -161,7 +163,7 @@ def start(self):
_LOGGER.info(
"Starting Async Reduce Stream Server",
)
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback)

async def aexec(self):
"""
Expand Down
4 changes: 3 additions & 1 deletion packages/pynumaflow/pynumaflow/sinker/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=SINK_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
# If the container type is fallback sink, then use the fallback sink address and path.
if os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_FALLBACK_SINK:
Expand All @@ -103,6 +104,7 @@ def __init__(
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.max_message_size = max_message_size
self.server_info_file = server_info_file
self.shutdwon_callback = shutdown_callback

self.sinker_instance = sinker_instance

Expand All @@ -118,7 +120,7 @@ def start(self):
Starter function for the Async server class, need a separate caller
so that all the async coroutines can be started from a single context
"""
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback)

async def aexec(self):
"""
Expand Down
4 changes: 3 additions & 1 deletion packages/pynumaflow/pynumaflow/sourcer/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=SOURCE_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
"""
Create a new grpc Async Source Server instance.
Expand Down Expand Up @@ -138,6 +139,7 @@ async def partitions_handler(self) -> PartitionsResponse:
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.max_message_size = max_message_size
self.server_info_file = server_info_file
self.shutdown_callback = shutdown_callback

self.sourcer_instance = sourcer_instance

Expand All @@ -153,7 +155,7 @@ def start(self):
Starter function for the Async server class, need a separate caller
so that all the async coroutines can be started from a single context
"""
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback)

async def aexec(self):
"""
Expand Down