diff --git a/packages/pynumaflow/pynumaflow/accumulator/async_server.py b/packages/pynumaflow/pynumaflow/accumulator/async_server.py index 50e08468..5be3b16c 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/async_server.py +++ b/packages/pynumaflow/pynumaflow/accumulator/async_server.py @@ -73,6 +73,9 @@ class AccumulatorAsyncServer(NumaflowServer): max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Example invocation: ```py @@ -139,6 +142,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=ACCUMULATOR_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): init_kwargs = init_kwargs or {} self.accumulator_handler = get_handler(accumulator_instance, init_args, init_kwargs) @@ -146,6 +150,7 @@ def __init__( self.max_message_size = max_message_size self.max_threads = min(max_threads, MAX_NUM_THREADS) self.server_info_file = server_info_file + self.shutdown_callback = shutdown_callback self._server_options = [ ("grpc.max_send_message_length", self.max_message_size), @@ -162,7 +167,7 @@ def start(self): _LOGGER.info( "Starting Async Accumulator Server", ) - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/batchmapper/async_server.py b/packages/pynumaflow/pynumaflow/batchmapper/async_server.py index 5f1f6c91..1078e012 100644 --- a/packages/pynumaflow/pynumaflow/batchmapper/async_server.py +++ b/packages/pynumaflow/pynumaflow/batchmapper/async_server.py @@ -34,6 +34,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=MAP_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): """ Create a new grpc Async Batch Map Server instance. @@ -46,6 +47,10 @@ def __init__( max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 + server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Example invocation: ```py @@ -79,6 +84,7 @@ async def handler( self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file + self.shutdown_callback = shutdown_callback self._server_options = [ ("grpc.max_send_message_length", self.max_message_size), @@ -92,7 +98,7 @@ def start(self): Starter function for the Async Batch Map server, we need a separate caller to the aexec so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/mapper/async_server.py b/packages/pynumaflow/pynumaflow/mapper/async_server.py index 3ef1ed46..5bba75d7 100644 --- a/packages/pynumaflow/pynumaflow/mapper/async_server.py +++ b/packages/pynumaflow/pynumaflow/mapper/async_server.py @@ -60,6 +60,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=MAP_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): """ Create a new grpc Asynchronous Map Server instance. @@ -72,11 +73,16 @@ def __init__( max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 + server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. """ self.sock_path = f"unix://{sock_path}" self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file + self.shutdown_callback = shutdown_callback self.mapper_instance = mapper_instance @@ -92,7 +98,7 @@ def start(self) -> None: Starter function for the Async server class, need a separate caller so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self) -> None: """ diff --git a/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py b/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py index d718a6a5..187c720d 100644 --- a/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py +++ b/packages/pynumaflow/pynumaflow/mapstreamer/async_server.py @@ -37,6 +37,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=MAP_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): """ Create a new grpc Async Map Stream Server instance. @@ -50,6 +51,10 @@ def __init__( max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 server_type: The type of server to be used + server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Example invocation: ```py @@ -98,6 +103,7 @@ async def map_stream_handler(_: list[str], datum: Datum) -> AsyncIterable[Messag self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file + self.shutdown_callback = shutdown_callback self._server_options = [ ("grpc.max_send_message_length", self.max_message_size), @@ -111,7 +117,7 @@ def start(self): Starter function for the Async Map Stream server, we need a separate caller to the aexec so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/reducer/async_server.py b/packages/pynumaflow/pynumaflow/reducer/async_server.py index 4103fe98..77ceefe4 100644 --- a/packages/pynumaflow/pynumaflow/reducer/async_server.py +++ b/packages/pynumaflow/pynumaflow/reducer/async_server.py @@ -64,6 +64,10 @@ class ReduceAsyncServer(NumaflowServer): max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 + server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Example invocation: ```py import os @@ -124,6 +128,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=REDUCE_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): init_kwargs = init_kwargs or {} self.reducer_handler = get_handler(reducer_instance, init_args, init_kwargs) @@ -131,6 +136,7 @@ def __init__( self.max_message_size = max_message_size self.max_threads = min(max_threads, MAX_NUM_THREADS) self.server_info_file = server_info_file + self.shutdown_callback = shutdown_callback self._server_options = [ ("grpc.max_send_message_length", self.max_message_size), @@ -147,7 +153,7 @@ def start(self): _LOGGER.info( "Starting Async Reduce Server", ) - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py b/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py index f974c1a0..a69b5382 100644 --- a/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py +++ b/packages/pynumaflow/pynumaflow/reducestreamer/async_server.py @@ -69,8 +69,11 @@ class ReduceStreamAsyncServer(NumaflowServer): sock_path: The UNIX socket path to be used for the server max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; - defaults to 4 and max capped at 16 + defaults to 4 and max capped at 16 server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Example invocation: ```py @@ -138,6 +141,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=REDUCE_STREAM_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): init_kwargs = init_kwargs or {} self.reduce_stream_handler = get_handler(reduce_stream_instance, init_args, init_kwargs) @@ -145,6 +149,7 @@ def __init__( self.max_message_size = max_message_size self.max_threads = min(max_threads, MAX_NUM_THREADS) self.server_info_file = server_info_file + self.shutdown_callback = shutdown_callback self._server_options = [ ("grpc.max_send_message_length", self.max_message_size), @@ -161,7 +166,7 @@ def start(self): _LOGGER.info( "Starting Async Reduce Stream Server", ) - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/sinker/async_server.py b/packages/pynumaflow/pynumaflow/sinker/async_server.py index 40020ced..63a898e1 100644 --- a/packages/pynumaflow/pynumaflow/sinker/async_server.py +++ b/packages/pynumaflow/pynumaflow/sinker/async_server.py @@ -41,6 +41,10 @@ class SinkAsyncServer(NumaflowServer): max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 + server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Example invocation: ```py @@ -88,6 +92,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=SINK_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): # If the container type is fallback sink, then use the fallback sink address and path. if os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_FALLBACK_SINK: @@ -103,6 +108,7 @@ def __init__( self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file + self.shutdown_callback = shutdown_callback self.sinker_instance = sinker_instance @@ -118,7 +124,7 @@ def start(self): Starter function for the Async server class, need a separate caller so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/sourcer/async_server.py b/packages/pynumaflow/pynumaflow/sourcer/async_server.py index 7e312213..3bca9dfb 100644 --- a/packages/pynumaflow/pynumaflow/sourcer/async_server.py +++ b/packages/pynumaflow/pynumaflow/sourcer/async_server.py @@ -29,6 +29,7 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=SOURCE_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): """ Create a new grpc Async Source Server instance. @@ -41,6 +42,10 @@ def __init__( max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 + server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Example invocation: ```py @@ -138,6 +143,7 @@ async def partitions_handler(self) -> PartitionsResponse: self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file + self.shutdown_callback = shutdown_callback self.sourcer_instance = sourcer_instance @@ -153,7 +159,7 @@ def start(self): Starter function for the Async server class, need a separate caller so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self): """ diff --git a/packages/pynumaflow/pynumaflow/sourcetransformer/async_server.py b/packages/pynumaflow/pynumaflow/sourcetransformer/async_server.py index 520fe281..990e4587 100644 --- a/packages/pynumaflow/pynumaflow/sourcetransformer/async_server.py +++ b/packages/pynumaflow/pynumaflow/sourcetransformer/async_server.py @@ -35,6 +35,10 @@ class SourceTransformAsyncServer(NumaflowServer): max_message_size: The max message size in bytes the server can receive and send max_threads: The max number of threads to be spawned; defaults to 4 and max capped at 16 + server_info_file: The path to the server info file + shutdown_callback: Callable, executed after loop is stopped, before + cancelling any tasks. + Useful for graceful shutdown. Below is a simple User Defined Function example which receives a message, applies the @@ -96,11 +100,13 @@ def __init__( max_message_size=MAX_MESSAGE_SIZE, max_threads=NUM_THREADS_DEFAULT, server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH, + shutdown_callback=None, ): self.sock_path = f"unix://{sock_path}" self.max_threads = min(max_threads, MAX_NUM_THREADS) self.max_message_size = max_message_size self.server_info_file = server_info_file + self.shutdown_callback = shutdown_callback self.source_transform_instance = source_transform_instance @@ -115,7 +121,7 @@ def start(self) -> None: Starter function for the Async server class, need a separate caller so that all the async coroutines can be started from a single context """ - aiorun.run(self.aexec(), use_uvloop=True) + aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback) async def aexec(self) -> None: """