Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ class AccumulatorAsyncServer(NumaflowServer):
max_threads: The max number of threads to be spawned;
defaults to 4 and max capped at 16
server_info_file: The path to the server info file
shutdown_callback: Callable, executed after loop is stopped, before
cancelling any tasks.
Useful for graceful shutdown.

Example invocation:
```py
Expand Down Expand Up @@ -139,13 +142,15 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=ACCUMULATOR_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
init_kwargs = init_kwargs or {}
self.accumulator_handler = get_handler(accumulator_instance, init_args, init_kwargs)
self.sock_path = f"unix://{sock_path}"
self.max_message_size = max_message_size
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.server_info_file = server_info_file
self.shutdown_callback = shutdown_callback

self._server_options = [
("grpc.max_send_message_length", self.max_message_size),
Expand All @@ -162,7 +167,7 @@ def start(self):
_LOGGER.info(
"Starting Async Accumulator Server",
)
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)

async def aexec(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=MAP_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
"""
Create a new grpc Async Batch Map Server instance.
Expand All @@ -46,6 +47,10 @@ def __init__(
max_message_size: The max message size in bytes the server can receive and send
max_threads: The max number of threads to be spawned;
defaults to 4 and max capped at 16
server_info_file: The path to the server info file
shutdown_callback: Callable, executed after loop is stopped, before
cancelling any tasks.
Useful for graceful shutdown.

Example invocation:
```py
Expand Down Expand Up @@ -79,6 +84,7 @@ async def handler(
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.max_message_size = max_message_size
self.server_info_file = server_info_file
self.shutdown_callback = shutdown_callback

self._server_options = [
("grpc.max_send_message_length", self.max_message_size),
Expand All @@ -92,7 +98,7 @@ def start(self):
Starter function for the Async Batch Map server, we need a separate caller
to the aexec so that all the async coroutines can be started from a single context
"""
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)

async def aexec(self):
"""
Expand Down
8 changes: 7 additions & 1 deletion packages/pynumaflow/pynumaflow/mapper/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=MAP_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
"""
Create a new grpc Asynchronous Map Server instance.
Expand All @@ -72,11 +73,16 @@ def __init__(
max_message_size: The max message size in bytes the server can receive and send
max_threads: The max number of threads to be spawned;
defaults to 4 and max capped at 16
server_info_file: The path to the server info file
shutdown_callback: Callable, executed after loop is stopped, before
cancelling any tasks.
Useful for graceful shutdown.
"""
self.sock_path = f"unix://{sock_path}"
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.max_message_size = max_message_size
self.server_info_file = server_info_file
self.shutdown_callback = shutdown_callback

self.mapper_instance = mapper_instance

Expand All @@ -92,7 +98,7 @@ def start(self) -> None:
Starter function for the Async server class, need a separate caller
so that all the async coroutines can be started from a single context
"""
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)

async def aexec(self) -> None:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=MAP_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
"""
Create a new grpc Async Map Stream Server instance.
Expand All @@ -50,6 +51,10 @@ def __init__(
max_threads: The max number of threads to be spawned;
defaults to 4 and max capped at 16
server_type: The type of server to be used
server_info_file: The path to the server info file
shutdown_callback: Callable, executed after loop is stopped, before
cancelling any tasks.
Useful for graceful shutdown.

Example invocation:
```py
Expand Down Expand Up @@ -98,6 +103,7 @@ async def map_stream_handler(_: list[str], datum: Datum) -> AsyncIterable[Messag
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.max_message_size = max_message_size
self.server_info_file = server_info_file
self.shutdown_callback = shutdown_callback

self._server_options = [
("grpc.max_send_message_length", self.max_message_size),
Expand All @@ -111,7 +117,7 @@ def start(self):
Starter function for the Async Map Stream server, we need a separate caller
to the aexec so that all the async coroutines can be started from a single context
"""
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)

async def aexec(self):
"""
Expand Down
8 changes: 7 additions & 1 deletion packages/pynumaflow/pynumaflow/reducer/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class ReduceAsyncServer(NumaflowServer):
max_message_size: The max message size in bytes the server can receive and send
max_threads: The max number of threads to be spawned;
defaults to 4 and max capped at 16
server_info_file: The path to the server info file
shutdown_callback: Callable, executed after loop is stopped, before
cancelling any tasks.
Useful for graceful shutdown.
Example invocation:
```py
import os
Expand Down Expand Up @@ -124,13 +128,15 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=REDUCE_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
init_kwargs = init_kwargs or {}
self.reducer_handler = get_handler(reducer_instance, init_args, init_kwargs)
self.sock_path = f"unix://{sock_path}"
self.max_message_size = max_message_size
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.server_info_file = server_info_file
self.shutdown_callback = shutdown_callback

self._server_options = [
("grpc.max_send_message_length", self.max_message_size),
Expand All @@ -147,7 +153,7 @@ def start(self):
_LOGGER.info(
"Starting Async Reduce Server",
)
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)

async def aexec(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ class ReduceStreamAsyncServer(NumaflowServer):
sock_path: The UNIX socket path to be used for the server
max_message_size: The max message size in bytes the server can receive and send
max_threads: The max number of threads to be spawned;
defaults to 4 and max capped at 16
defaults to 4 and max capped at 16
server_info_file: The path to the server info file
shutdown_callback: Callable, executed after loop is stopped, before
cancelling any tasks.
Useful for graceful shutdown.

Example invocation:
```py
Expand Down Expand Up @@ -138,13 +141,15 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=REDUCE_STREAM_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
init_kwargs = init_kwargs or {}
self.reduce_stream_handler = get_handler(reduce_stream_instance, init_args, init_kwargs)
self.sock_path = f"unix://{sock_path}"
self.max_message_size = max_message_size
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.server_info_file = server_info_file
self.shutdown_callback = shutdown_callback

self._server_options = [
("grpc.max_send_message_length", self.max_message_size),
Expand All @@ -161,7 +166,7 @@ def start(self):
_LOGGER.info(
"Starting Async Reduce Stream Server",
)
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)

async def aexec(self):
"""
Expand Down
8 changes: 7 additions & 1 deletion packages/pynumaflow/pynumaflow/sinker/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class SinkAsyncServer(NumaflowServer):
max_message_size: The max message size in bytes the server can receive and send
max_threads: The max number of threads to be spawned;
defaults to 4 and max capped at 16
server_info_file: The path to the server info file
shutdown_callback: Callable, executed after loop is stopped, before
cancelling any tasks.
Useful for graceful shutdown.

Example invocation:
```py
Expand Down Expand Up @@ -88,6 +92,7 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=SINK_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
# If the container type is fallback sink, then use the fallback sink address and path.
if os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_FALLBACK_SINK:
Expand All @@ -103,6 +108,7 @@ def __init__(
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.max_message_size = max_message_size
self.server_info_file = server_info_file
self.shutdown_callback = shutdown_callback

self.sinker_instance = sinker_instance

Expand All @@ -118,7 +124,7 @@ def start(self):
Starter function for the Async server class, need a separate caller
so that all the async coroutines can be started from a single context
"""
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)

async def aexec(self):
"""
Expand Down
8 changes: 7 additions & 1 deletion packages/pynumaflow/pynumaflow/sourcer/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=SOURCE_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
"""
Create a new grpc Async Source Server instance.
Expand All @@ -41,6 +42,10 @@ def __init__(
max_message_size: The max message size in bytes the server can receive and send
max_threads: The max number of threads to be spawned;
defaults to 4 and max capped at 16
server_info_file: The path to the server info file
shutdown_callback: Callable, executed after loop is stopped, before
cancelling any tasks.
Useful for graceful shutdown.

Example invocation:
```py
Expand Down Expand Up @@ -138,6 +143,7 @@ async def partitions_handler(self) -> PartitionsResponse:
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.max_message_size = max_message_size
self.server_info_file = server_info_file
self.shutdown_callback = shutdown_callback

self.sourcer_instance = sourcer_instance

Expand All @@ -153,7 +159,7 @@ def start(self):
Starter function for the Async server class, need a separate caller
so that all the async coroutines can be started from a single context
"""
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)

async def aexec(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ class SourceTransformAsyncServer(NumaflowServer):
max_message_size: The max message size in bytes the server can receive and send
max_threads: The max number of threads to be spawned;
defaults to 4 and max capped at 16
server_info_file: The path to the server info file
shutdown_callback: Callable, executed after loop is stopped, before
cancelling any tasks.
Useful for graceful shutdown.


Below is a simple User Defined Function example which receives a message, applies the
Expand Down Expand Up @@ -96,11 +100,13 @@ def __init__(
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
shutdown_callback=None,
):
self.sock_path = f"unix://{sock_path}"
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.max_message_size = max_message_size
self.server_info_file = server_info_file
self.shutdown_callback = shutdown_callback

self.source_transform_instance = source_transform_instance

Expand All @@ -115,7 +121,7 @@ def start(self) -> None:
Starter function for the Async server class, need a separate caller
so that all the async coroutines can be started from a single context
"""
aiorun.run(self.aexec(), use_uvloop=True)
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)

async def aexec(self) -> None:
"""
Expand Down