From 4308a9ae71ad1b4705dfb9f2d429bf56af24b1d5 Mon Sep 17 00:00:00 2001 From: Kazuki Yamamoto Date: Mon, 2 Feb 2026 23:32:17 +0900 Subject: [PATCH 1/3] feat: support shutdown_callback argument in XXXAsyncServer Signed-off-by: Kazuki Yamamoto --- packages/pynumaflow/pynumaflow/mapper/async_server.py | 4 +++- packages/pynumaflow/pynumaflow/mapstreamer/async_server.py | 4 +++- packages/pynumaflow/pynumaflow/reducer/async_server.py | 4 +++- packages/pynumaflow/pynumaflow/reducestreamer/async_server.py | 4 +++- packages/pynumaflow/pynumaflow/sinker/async_server.py | 4 +++- packages/pynumaflow/pynumaflow/sourcer/async_server.py | 4 +++- 6 files changed, 18 insertions(+), 6 deletions(-) diff --git a/packages/pynumaflow/pynumaflow/mapper/async_server.py b/packages/pynumaflow/pynumaflow/mapper/async_server.py index 3ef1ed46..34c9b92a 100644 --- a/packages/pynumaflow/pynumaflow/mapper/async_server.py +++ b/packages/pynumaflow/pynumaflow/mapper/async_server.py @@ -60,6 +60,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=MAP_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): """ Create a new grpc Asynchronous Map Server instance. @@ -77,6 +78,7 @@ def __init__( self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file + self.shutdwon_callback = shutdown_callback self.mapper_instance = mapper_instance @@ -92,7 +94,7 @@ def start(self) -> None: Starter function for the Async server class, need a separate caller so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback) async def aexec(self) -> None: """ diff --git a/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py b/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py index d718a6a5..90fdd3c7 100644 --- a/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py +++ b/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py @@ -37,6 +37,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=MAP_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): """ Create a new grpc Async Map Stream Server instance. @@ -98,6 +99,7 @@ async def map_stream_handler(_: list[str], datum: Datum) -> AsyncIterable[Messag self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file + self.shutdwon_callback = shutdown_callback self._server_options = [ ("grpc.max_send_message_length", self.max_message_size), @@ -111,7 +113,7 @@ def start(self): Starter function for the Async Map Stream server, we need a separate caller to the aexec so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/reducer/async_server.py b/packages/pynumaflow/pynumaflow/reducer/async_server.py index 4103fe98..8f6d06e7 100644 --- a/packages/pynumaflow/pynumaflow/reducer/async_server.py +++ b/packages/pynumaflow/pynumaflow/reducer/async_server.py @@ -124,6 +124,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=REDUCE_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): init_kwargs = init_kwargs or {} self.reducer_handler = get_handler(reducer_instance, init_args, init_kwargs) @@ -131,6 +132,7 @@ def __init__( self.max_message_size = max_message_size self.max_threads = min(max_threads, MAX_NUM_THREADS) self.server_info_file = server_info_file + self.shutdwon_callback = shutdown_callback self._server_options = [ ("grpc.max_send_message_length", self.max_message_size), @@ -147,7 +149,7 @@ def start(self): _LOGGER.info( "Starting Async Reduce Server", ) - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py b/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py index f974c1a0..771b5f94 100644 --- a/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py +++ b/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py @@ -138,6 +138,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=REDUCE_STREAM_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): init_kwargs = init_kwargs or {} self.reduce_stream_handler = get_handler(reduce_stream_instance, init_args, init_kwargs) @@ -145,6 +146,7 @@ def __init__( self.max_message_size = max_message_size self.max_threads = min(max_threads, MAX_NUM_THREADS) self.server_info_file = server_info_file + self.shutdwon_callback = shutdown_callback self._server_options = [ ("grpc.max_send_message_length", self.max_message_size), @@ -161,7 +163,7 @@ def start(self): _LOGGER.info( "Starting Async Reduce Stream Server", ) - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/sinker/async_server.py b/packages/pynumaflow/pynumaflow/sinker/async_server.py index 40020ced..3628f01c 100644 --- a/packages/pynumaflow/pynumaflow/sinker/async_server.py +++ b/packages/pynumaflow/pynumaflow/sinker/async_server.py @@ -88,6 +88,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=SINK_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): # If the container type is fallback sink, then use the fallback sink address and path. if os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_FALLBACK_SINK: @@ -103,6 +104,7 @@ def __init__( self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file + self.shutdwon_callback = shutdown_callback self.sinker_instance = sinker_instance @@ -118,7 +120,7 @@ def start(self): Starter function for the Async server class, need a separate caller so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/sourcer/async_server.py b/packages/pynumaflow/pynumaflow/sourcer/async_server.py index 7e312213..3da75cf8 100644 --- a/packages/pynumaflow/pynumaflow/sourcer/async_server.py +++ b/packages/pynumaflow/pynumaflow/sourcer/async_server.py @@ -29,6 +29,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=SOURCE_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): """ Create a new grpc Async Source Server instance. @@ -138,6 +139,7 @@ async def partitions_handler(self) -> PartitionsResponse: self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file + self.shutdown_callback = shutdown_callback self.sourcer_instance = sourcer_instance @@ -153,7 +155,7 @@ def start(self): Starter function for the Async server class, need a separate caller so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback) async def aexec(self): """ From 573028b50b63752fe84ec7226c9c6ea94e36ef95 Mon Sep 17 00:00:00 2001 From: Kazuki Yamamoto Date: Wed, 4 Feb 2026 13:04:41 +0900 Subject: [PATCH 2/3] fix(typing): shutd`wo`n -> shutdown Signed-off-by: Kazuki Yamamoto --- packages/pynumaflow/pynumaflow/mapper/async_server.py | 4 ++-- packages/pynumaflow/pynumaflow/mapstreamer/async_server.py | 4 ++-- packages/pynumaflow/pynumaflow/reducer/async_server.py | 4 ++-- packages/pynumaflow/pynumaflow/reducestreamer/async_server.py | 4 ++-- packages/pynumaflow/pynumaflow/sinker/async_server.py | 4 ++-- packages/pynumaflow/pynumaflow/sourcer/async_server.py | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/pynumaflow/pynumaflow/mapper/async_server.py b/packages/pynumaflow/pynumaflow/mapper/async_server.py index 34c9b92a..1992c016 100644 --- a/packages/pynumaflow/pynumaflow/mapper/async_server.py +++ b/packages/pynumaflow/pynumaflow/mapper/async_server.py @@ -78,7 +78,7 @@ def __init__( self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file - self.shutdwon_callback = shutdown_callback + self.shutdown_callback = shutdown_callback self.mapper_instance = mapper_instance @@ -94,7 +94,7 @@ def start(self) -> None: Starter function for the Async server class, need a separate caller so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self) -> None: """ diff --git a/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py b/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py index 90fdd3c7..243a3578 100644 --- a/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py +++ b/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py @@ -99,7 +99,7 @@ async def map_stream_handler(_: list[str], datum: Datum) -> AsyncIterable[Messag self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file - self.shutdwon_callback = shutdown_callback + self.shutdown_callback = shutdown_callback self._server_options = [ ("grpc.max_send_message_length", self.max_message_size), @@ -113,7 +113,7 @@ def start(self): Starter function for the Async Map Stream server, we need a separate caller to the aexec so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/reducer/async_server.py b/packages/pynumaflow/pynumaflow/reducer/async_server.py index 8f6d06e7..581e7cc5 100644 --- a/packages/pynumaflow/pynumaflow/reducer/async_server.py +++ b/packages/pynumaflow/pynumaflow/reducer/async_server.py @@ -132,7 +132,7 @@ def __init__( self.max_message_size = max_message_size self.max_threads = min(max_threads, MAX_NUM_THREADS) self.server_info_file = server_info_file - self.shutdwon_callback = shutdown_callback + self.shutdown_callback = shutdown_callback self._server_options = [ ("grpc.max_send_message_length", self.max_message_size), @@ -149,7 +149,7 @@ def start(self): _LOGGER.info( "Starting Async Reduce Server", ) - aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py b/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py index 771b5f94..68d293c3 100644 --- a/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py +++ b/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py @@ -146,7 +146,7 @@ def __init__( self.max_message_size = max_message_size self.max_threads = min(max_threads, MAX_NUM_THREADS) self.server_info_file = server_info_file - self.shutdwon_callback = shutdown_callback + self.shutdown_callback = shutdown_callback self._server_options = [ ("grpc.max_send_message_length", self.max_message_size), @@ -163,7 +163,7 @@ def start(self): _LOGGER.info( "Starting Async Reduce Stream Server", ) - aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/sinker/async_server.py b/packages/pynumaflow/pynumaflow/sinker/async_server.py index 3628f01c..af76ae37 100644 --- a/packages/pynumaflow/pynumaflow/sinker/async_server.py +++ b/packages/pynumaflow/pynumaflow/sinker/async_server.py @@ -104,7 +104,7 @@ def __init__( self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file - self.shutdwon_callback = shutdown_callback + self.shutdown_callback = shutdown_callback self.sinker_instance = sinker_instance @@ -120,7 +120,7 @@ def start(self): Starter function for the Async server class, need a separate caller so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/sourcer/async_server.py b/packages/pynumaflow/pynumaflow/sourcer/async_server.py index 3da75cf8..8229de5f 100644 --- a/packages/pynumaflow/pynumaflow/sourcer/async_server.py +++ b/packages/pynumaflow/pynumaflow/sourcer/async_server.py @@ -155,7 +155,7 @@ def start(self): Starter function for the Async server class, need a separate caller so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdwon_callback) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self): """ From 1cf67297ab548036e94d423b6ca901a8ede4764d Mon Sep 17 00:00:00 2001 From: Kazuki Yamamoto Date: Wed, 4 Feb 2026 17:16:40 +0900 Subject: [PATCH 3/3] feat: add shutdown_callback to the remaining async_servers - Also, update the documentation comment for this parameter Signed-off-by: Kazuki Yamamoto --- .../pynumaflow/pynumaflow/accumulator/async_server.py | 7 ++++++- .../pynumaflow/pynumaflow/batchmapper/async_server.py | 8 +++++++- packages/pynumaflow/pynumaflow/mapper/async_server.py | 4 ++++ .../pynumaflow/pynumaflow/mapstreamer/async_server.py | 4 ++++ packages/pynumaflow/pynumaflow/reducer/async_server.py | 4 ++++ .../pynumaflow/pynumaflow/reducestreamer/async_server.py | 5 ++++- packages/pynumaflow/pynumaflow/sinker/async_server.py | 4 ++++ packages/pynumaflow/pynumaflow/sourcer/async_server.py | 4 ++++ .../pynumaflow/sourcetransformer/async_server.py | 8 +++++++- 9 files changed, 44 insertions(+), 4 deletions(-) diff --git a/packages/pynumaflow/pynumaflow/accumulator/async_server.py b/packages/pynumaflow/pynumaflow/accumulator/async_server.py index 50e08468..5be3b16c 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/async_server.py +++ b/packages/pynumaflow/pynumaflow/accumulator/async_server.py @@ -73,6 +73,9 @@ class AccumulatorAsyncServer(NumaflowServer): max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Example invocation: ```py @@ -139,6 +142,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=ACCUMULATOR_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): init_kwargs = init_kwargs or {} self.accumulator_handler = get_handler(accumulator_instance, init_args, init_kwargs) @@ -146,6 +150,7 @@ def __init__( self.max_message_size = max_message_size self.max_threads = min(max_threads, MAX_NUM_THREADS) self.server_info_file = server_info_file + self.shutdown_callback = shutdown_callback self._server_options = [ ("grpc.max_send_message_length", self.max_message_size), @@ -162,7 +167,7 @@ def start(self): _LOGGER.info( "Starting Async Accumulator Server", ) - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/batchmapper/async_server.py b/packages/pynumaflow/pynumaflow/batchmapper/async_server.py index 5f1f6c91..1078e012 100644 --- a/packages/pynumaflow/pynumaflow/batchmapper/async_server.py +++ b/packages/pynumaflow/pynumaflow/batchmapper/async_server.py @@ -34,6 +34,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=MAP_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): """ Create a new grpc Async Batch Map Server instance. @@ -46,6 +47,10 @@ def __init__( max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 + server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Example invocation: ```py @@ -79,6 +84,7 @@ async def handler( self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file + self.shutdown_callback = shutdown_callback self._server_options = [ ("grpc.max_send_message_length", self.max_message_size), @@ -92,7 +98,7 @@ def start(self): Starter function for the Async Batch Map server, we need a separate caller to the aexec so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/mapper/async_server.py b/packages/pynumaflow/pynumaflow/mapper/async_server.py index 1992c016..5bba75d7 100644 --- a/packages/pynumaflow/pynumaflow/mapper/async_server.py +++ b/packages/pynumaflow/pynumaflow/mapper/async_server.py @@ -73,6 +73,10 @@ def __init__( max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 + server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. """ self.sock_path = f"unix://{sock_path}" self.max_threads = min(max_threads, MAX_NUM_THREADS) diff --git a/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py b/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py index 243a3578..187c720d 100644 --- a/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py +++ b/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py @@ -51,6 +51,10 @@ def __init__( max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 server_type: The type of server to be used + server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Example invocation: ```py diff --git a/packages/pynumaflow/pynumaflow/reducer/async_server.py b/packages/pynumaflow/pynumaflow/reducer/async_server.py index 581e7cc5..77ceefe4 100644 --- a/packages/pynumaflow/pynumaflow/reducer/async_server.py +++ b/packages/pynumaflow/pynumaflow/reducer/async_server.py @@ -64,6 +64,10 @@ class ReduceAsyncServer(NumaflowServer): max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 + server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Example invocation: ```py import os diff --git a/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py b/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py index 68d293c3..a69b5382 100644 --- a/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py +++ b/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py @@ -69,8 +69,11 @@ class ReduceStreamAsyncServer(NumaflowServer): sock_path: The UNIX socket path to be used for the server max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; - defaults to 4 and max capped at 16 + defaults to 4 and max capped at 16 server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Example invocation: ```py diff --git a/packages/pynumaflow/pynumaflow/sinker/async_server.py b/packages/pynumaflow/pynumaflow/sinker/async_server.py index af76ae37..63a898e1 100644 --- a/packages/pynumaflow/pynumaflow/sinker/async_server.py +++ b/packages/pynumaflow/pynumaflow/sinker/async_server.py @@ -41,6 +41,10 @@ class SinkAsyncServer(NumaflowServer): max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 + server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Example invocation: ```py diff --git a/packages/pynumaflow/pynumaflow/sourcer/async_server.py b/packages/pynumaflow/pynumaflow/sourcer/async_server.py index 8229de5f..3bca9dfb 100644 --- a/packages/pynumaflow/pynumaflow/sourcer/async_server.py +++ b/packages/pynumaflow/pynumaflow/sourcer/async_server.py @@ -42,6 +42,10 @@ def __init__( max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 + server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Example invocation: ```py diff --git a/packages/pynumaflow/pynumaflow/sourcetransformer/async_server.py b/packages/pynumaflow/pynumaflow/sourcetransformer/async_server.py index 520fe281..990e4587 100644 --- a/packages/pynumaflow/pynumaflow/sourcetransformer/async_server.py +++ b/packages/pynumaflow/pynumaflow/sourcetransformer/async_server.py @@ -35,6 +35,10 @@ class SourceTransformAsyncServer(NumaflowServer): max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 + server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Below is a simple User Defined Function example which receives a message, applies the @@ -96,11 +100,13 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): self.sock_path = f"unix://{sock_path}" self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file + self.shutdown_callback = shutdown_callback self.source_transform_instance = source_transform_instance @@ -115,7 +121,7 @@ def start(self) -> None: Starter function for the Async server class, need a separate caller so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self) -> None: """