From 1f5359f0c871d6721a2005a78dd72dd10622e170 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Wed, 28 Jan 2026 08:26:32 -0800 Subject: [PATCH 01/10] fix(typing): reduce Signed-off-by: Vigith Maurice --- .../manifests/reduce/reduce_counter_class.py | 2 +- packages/pynumaflow-lite/pynumaflow_lite/reducer.pyi | 9 +++++++-- .../tests/examples/reduce_counter_func.py | 5 ++++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/packages/pynumaflow-lite/manifests/reduce/reduce_counter_class.py b/packages/pynumaflow-lite/manifests/reduce/reduce_counter_class.py index a10fb346..7639167d 100644 --- a/packages/pynumaflow-lite/manifests/reduce/reduce_counter_class.py +++ b/packages/pynumaflow-lite/manifests/reduce/reduce_counter_class.py @@ -32,7 +32,7 @@ async def handler( pass -async def start(creator: type, init_args: tuple): +async def start(creator: type[reducer.Reducer], init_args: tuple): sock_file = "/var/run/numaflow/reduce.sock" server_info_file = "/var/run/numaflow/reducer-server-info" server = reducer.ReduceAsyncServer(sock_file, server_info_file) diff --git a/packages/pynumaflow-lite/pynumaflow_lite/reducer.pyi b/packages/pynumaflow-lite/pynumaflow_lite/reducer.pyi index 907261e4..66a087fb 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/reducer.pyi +++ b/packages/pynumaflow-lite/pynumaflow_lite/reducer.pyi @@ -1,7 +1,8 @@ from __future__ import annotations import datetime as _dt -from typing import Optional, List, Dict, Awaitable +from typing import Optional, List, Dict, Awaitable, Callable +from collections.abc import AsyncIterable # Re-export the Python ABC for user convenience and typing from ._reduce_dtypes import Reducer as Reducer @@ -64,7 +65,11 @@ class ReduceAsyncServer: info_file: str | None = ..., ) -> None: ... - def start(self, py_creator: type, init_args: tuple | None = ...) -> Awaitable[None]: ... + def start( + self, + py_creator: type[Reducer] | Callable[[list[str], AsyncIterable[Datum], Metadata], Awaitable[Messages]], + init_args: tuple | None = ..., + ) -> Awaitable[None]: ... def stop(self) -> None: ... diff --git a/packages/pynumaflow-lite/tests/examples/reduce_counter_func.py b/packages/pynumaflow-lite/tests/examples/reduce_counter_func.py index f893c965..efb07418 100644 --- a/packages/pynumaflow-lite/tests/examples/reduce_counter_func.py +++ b/packages/pynumaflow-lite/tests/examples/reduce_counter_func.py @@ -1,6 +1,7 @@ import asyncio import signal from collections.abc import AsyncIterable +from typing import Awaitable, Callable from pynumaflow_lite import reducer @@ -29,7 +30,9 @@ async def reduce_handler( pass -async def start(handler: callable): +async def start( + handler: Callable[[list[str], AsyncIterable[reducer.Datum], reducer.Metadata], Awaitable[reducer.Messages]] +): sock_file = "/tmp/var/run/numaflow/reduce.sock" server_info_file = "/tmp/var/run/numaflow/reducer-server-info" server = reducer.ReduceAsyncServer(sock_file, server_info_file) From 1f9a5ea342a415af3a11752f607cc22eff7a6ef1 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Wed, 28 Jan 2026 08:31:46 -0800 Subject: [PATCH 02/10] fix(typing): mapper Signed-off-by: Vigith Maurice --- packages/pynumaflow-lite/manifests/map/map_cat.py | 4 +++- packages/pynumaflow-lite/pynumaflow_lite/mapper.pyi | 2 +- packages/pynumaflow-lite/tests/examples/map_cat.py | 3 ++- packages/pynumaflow-lite/tests/examples/map_cat_class.py | 4 +++- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/packages/pynumaflow-lite/manifests/map/map_cat.py b/packages/pynumaflow-lite/manifests/map/map_cat.py index a45df476..fe981d2f 100644 --- a/packages/pynumaflow-lite/manifests/map/map_cat.py +++ b/packages/pynumaflow-lite/manifests/map/map_cat.py @@ -1,5 +1,7 @@ import asyncio import signal +from typing import Awaitable, Callable + from pynumaflow_lite import mapper @@ -26,7 +28,7 @@ async def handler( pass -async def start(f: callable): +async def start(f: Callable[[list[str], mapper.Datum], Awaitable[mapper.Messages]]): server = mapper.MapAsyncServer() # Register loop-level signal handlers so we control shutdown and avoid asyncio.run diff --git a/packages/pynumaflow-lite/pynumaflow_lite/mapper.pyi b/packages/pynumaflow-lite/pynumaflow_lite/mapper.pyi index aa5635e9..ba127e8f 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/mapper.pyi +++ b/packages/pynumaflow-lite/pynumaflow_lite/mapper.pyi @@ -113,7 +113,7 @@ class MapAsyncServer: info_file: str | None = ..., ) -> None: ... - def start(self, py_func: Callable[..., Any]) -> Awaitable[None]: ... + def start(self, py_func: Callable[[list[str], Datum], Awaitable[Messages]]) -> Awaitable[None]: ... def stop(self) -> None: ... diff --git a/packages/pynumaflow-lite/tests/examples/map_cat.py b/packages/pynumaflow-lite/tests/examples/map_cat.py index 939a9043..f0a38db8 100644 --- a/packages/pynumaflow-lite/tests/examples/map_cat.py +++ b/packages/pynumaflow-lite/tests/examples/map_cat.py @@ -1,5 +1,6 @@ import asyncio import signal +from typing import Awaitable, Callable from pynumaflow_lite import mapper @@ -37,7 +38,7 @@ async def async_handler( return messages -async def start(f: callable): +async def start(f: Callable[[list[str], mapper.Datum], Awaitable[mapper.Messages]]): sock_file = "/tmp/var/run/numaflow/map.sock" server_info_file = "/tmp/var/run/numaflow/mapper-server-info" server = mapper.MapAsyncServer(sock_file, server_info_file) diff --git a/packages/pynumaflow-lite/tests/examples/map_cat_class.py b/packages/pynumaflow-lite/tests/examples/map_cat_class.py index 962a3f79..cf23854e 100644 --- a/packages/pynumaflow-lite/tests/examples/map_cat_class.py +++ b/packages/pynumaflow-lite/tests/examples/map_cat_class.py @@ -1,5 +1,7 @@ import asyncio import signal +from typing import Awaitable, Callable + from pynumaflow_lite import mapper @@ -46,7 +48,7 @@ async def handler( pass -async def start(f: callable): +async def start(f: Callable[[list[str], mapper.Datum], Awaitable[mapper.Messages]]): sock_file = "/tmp/var/run/numaflow/map.sock" server_info_file = "/tmp/var/run/numaflow/mapper-server-info" server = mapper.MapAsyncServer(sock_file, server_info_file) From 4d0aad81003c57b2ff72178962cc752621af3c3a Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Wed, 28 Jan 2026 08:33:30 -0800 Subject: [PATCH 03/10] fix(typing): batchmapper Signed-off-by: Vigith Maurice --- packages/pynumaflow-lite/manifests/batchmap/batchmap_cat.py | 3 ++- packages/pynumaflow-lite/pynumaflow_lite/batchmapper.pyi | 2 +- packages/pynumaflow-lite/tests/examples/batchmap_cat.py | 4 ++-- packages/pynumaflow-lite/tests/examples/batchmap_cat_class.py | 3 ++- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/packages/pynumaflow-lite/manifests/batchmap/batchmap_cat.py b/packages/pynumaflow-lite/manifests/batchmap/batchmap_cat.py index 11b2ee56..cf0714cb 100644 --- a/packages/pynumaflow-lite/manifests/batchmap/batchmap_cat.py +++ b/packages/pynumaflow-lite/manifests/batchmap/batchmap_cat.py @@ -1,6 +1,7 @@ import asyncio import signal from collections.abc import AsyncIterable +from typing import Awaitable, Callable from pynumaflow_lite import batchmapper from pynumaflow_lite.batchmapper import Message @@ -28,7 +29,7 @@ async def handler(self, batch: AsyncIterable[batchmapper.Datum]) -> batchmapper. pass -async def start(f: callable): +async def start(f: Callable[[AsyncIterable[batchmapper.Datum]], Awaitable[batchmapper.BatchResponses]]): server = batchmapper.BatchMapAsyncServer() # Register loop-level signal handlers so we control shutdown and avoid asyncio.run diff --git a/packages/pynumaflow-lite/pynumaflow_lite/batchmapper.pyi b/packages/pynumaflow-lite/pynumaflow_lite/batchmapper.pyi index c63a5328..56f572f2 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/batchmapper.pyi +++ b/packages/pynumaflow-lite/pynumaflow_lite/batchmapper.pyi @@ -57,7 +57,7 @@ class BatchMapAsyncServer: info_file: str | None = ..., ) -> None: ... - def start(self, py_func: Callable[..., Any]) -> Awaitable[None]: ... + def start(self, py_func: Callable[[AsyncIterator[Datum]], Awaitable[BatchResponses]]) -> Awaitable[None]: ... def stop(self) -> None: ... diff --git a/packages/pynumaflow-lite/tests/examples/batchmap_cat.py b/packages/pynumaflow-lite/tests/examples/batchmap_cat.py index 388926d7..78490cab 100644 --- a/packages/pynumaflow-lite/tests/examples/batchmap_cat.py +++ b/packages/pynumaflow-lite/tests/examples/batchmap_cat.py @@ -1,7 +1,7 @@ import asyncio import collections.abc - import signal +from typing import Awaitable, Callable from pynumaflow_lite import batchmapper @@ -19,7 +19,7 @@ async def async_handler(batch: collections.abc.AsyncIterator[batchmapper.Datum]) return responses -async def start(f: callable): +async def start(f: Callable[[collections.abc.AsyncIterator[batchmapper.Datum]], Awaitable[batchmapper.BatchResponses]]): sock_file = "/tmp/var/run/numaflow/batchmap.sock" server_info_file = "/tmp/var/run/numaflow/mapper-server-info" server = batchmapper.BatchMapAsyncServer(sock_file, server_info_file) diff --git a/packages/pynumaflow-lite/tests/examples/batchmap_cat_class.py b/packages/pynumaflow-lite/tests/examples/batchmap_cat_class.py index 245ebe1e..6288925b 100644 --- a/packages/pynumaflow-lite/tests/examples/batchmap_cat_class.py +++ b/packages/pynumaflow-lite/tests/examples/batchmap_cat_class.py @@ -1,6 +1,7 @@ import asyncio import signal from collections.abc import AsyncIterator +from typing import Awaitable, Callable from pynumaflow_lite import batchmapper from pynumaflow_lite.batchmapper import Message @@ -28,7 +29,7 @@ async def handler(self, batch: AsyncIterator[batchmapper.Datum]) -> batchmapper. pass -async def start(f: callable): +async def start(f: Callable[[AsyncIterator[batchmapper.Datum]], Awaitable[batchmapper.BatchResponses]]): sock_file = "/tmp/var/run/numaflow/batchmap.sock" server_info_file = "/tmp/var/run/numaflow/mapper-server-info" server = batchmapper.BatchMapAsyncServer(sock_file, server_info_file) From 316ecbc3c9d85f3f64c7e5e14926d47b10203673 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Wed, 28 Jan 2026 08:35:06 -0800 Subject: [PATCH 04/10] fix(typing): mapstreamer Signed-off-by: Vigith Maurice --- packages/pynumaflow-lite/manifests/mapstream/mapstream_cat.py | 3 ++- packages/pynumaflow-lite/pynumaflow_lite/mapstreamer.pyi | 4 ++-- packages/pynumaflow-lite/tests/examples/mapstream_cat.py | 3 ++- .../pynumaflow-lite/tests/examples/mapstream_cat_class.py | 3 ++- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/packages/pynumaflow-lite/manifests/mapstream/mapstream_cat.py b/packages/pynumaflow-lite/manifests/mapstream/mapstream_cat.py index 888bb74d..093fdfaf 100644 --- a/packages/pynumaflow-lite/manifests/mapstream/mapstream_cat.py +++ b/packages/pynumaflow-lite/manifests/mapstream/mapstream_cat.py @@ -1,6 +1,7 @@ import asyncio import signal from collections.abc import AsyncIterator +from typing import Callable from pynumaflow_lite import mapstreamer from pynumaflow_lite.mapstreamer import Message @@ -24,7 +25,7 @@ async def handler(self, keys: list[str], datum: mapstreamer.Datum) -> AsyncItera pass -async def start(f: callable): +async def start(f: Callable[[list[str], mapstreamer.Datum], AsyncIterator[Message]]): # Use default socket/info file locations; no explicit sock file passed server = mapstreamer.MapStreamAsyncServer() diff --git a/packages/pynumaflow-lite/pynumaflow_lite/mapstreamer.pyi b/packages/pynumaflow-lite/pynumaflow_lite/mapstreamer.pyi index c7601f7f..48244004 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/mapstreamer.pyi +++ b/packages/pynumaflow-lite/pynumaflow_lite/mapstreamer.pyi @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Optional, List, Dict, Callable, Awaitable, Any, AsyncIterator +from typing import Optional, List, Dict, Callable, Awaitable, AsyncIterator import datetime as _dt # Re-export the Python ABC for user convenience and typing @@ -45,7 +45,7 @@ class MapStreamAsyncServer: info_file: str | None = ..., ) -> None: ... - def start(self, py_func: Callable[..., Any]) -> Awaitable[None]: ... + def start(self, py_func: Callable[[list[str], Datum], AsyncIterator[Message]]) -> Awaitable[None]: ... def stop(self) -> None: ... diff --git a/packages/pynumaflow-lite/tests/examples/mapstream_cat.py b/packages/pynumaflow-lite/tests/examples/mapstream_cat.py index ef6ddce6..b8dea5ef 100644 --- a/packages/pynumaflow-lite/tests/examples/mapstream_cat.py +++ b/packages/pynumaflow-lite/tests/examples/mapstream_cat.py @@ -1,6 +1,7 @@ import asyncio import signal from collections.abc import AsyncIterator +from typing import Callable from pynumaflow_lite import mapstreamer from pynumaflow_lite.mapstreamer import Message @@ -19,7 +20,7 @@ async def async_handler(keys: list[str], datum: mapstreamer.Datum) -> AsyncItera yield Message(s.encode(), keys) -async def start(f: callable): +async def start(f: Callable[[list[str], mapstreamer.Datum], AsyncIterator[Message]]): sock_file = "/tmp/var/run/numaflow/mapstream.sock" server_info_file = "/tmp/var/run/numaflow/mapper-server-info" server = mapstreamer.MapStreamAsyncServer(sock_file, server_info_file) diff --git a/packages/pynumaflow-lite/tests/examples/mapstream_cat_class.py b/packages/pynumaflow-lite/tests/examples/mapstream_cat_class.py index 0929b522..d0a09ebe 100644 --- a/packages/pynumaflow-lite/tests/examples/mapstream_cat_class.py +++ b/packages/pynumaflow-lite/tests/examples/mapstream_cat_class.py @@ -1,6 +1,7 @@ import asyncio import signal from collections.abc import AsyncIterator +from typing import Callable from pynumaflow_lite import mapstreamer from pynumaflow_lite.mapstreamer import Message @@ -24,7 +25,7 @@ async def handler(self, keys: list[str], datum: mapstreamer.Datum) -> AsyncItera pass -async def start(f: callable): +async def start(f: Callable[[list[str], mapstreamer.Datum], AsyncIterator[Message]]): sock_file = "/tmp/var/run/numaflow/mapstream.sock" server_info_file = "/tmp/var/run/numaflow/mapper-server-info" server = mapstreamer.MapStreamAsyncServer(sock_file, server_info_file) From f3bdecafb6f18bca9fe6f27cccfb87320815168b Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Wed, 28 Jan 2026 08:37:29 -0800 Subject: [PATCH 05/10] fix(typing): sinker Signed-off-by: Vigith Maurice --- packages/pynumaflow-lite/manifests/sink/sink_log.py | 5 +++-- packages/pynumaflow-lite/pynumaflow_lite/sinker.pyi | 2 +- packages/pynumaflow-lite/tests/examples/sink_log.py | 3 ++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/pynumaflow-lite/manifests/sink/sink_log.py b/packages/pynumaflow-lite/manifests/sink/sink_log.py index 986b270e..87df5bab 100644 --- a/packages/pynumaflow-lite/manifests/sink/sink_log.py +++ b/packages/pynumaflow-lite/manifests/sink/sink_log.py @@ -2,7 +2,8 @@ import collections import logging import signal -from collections.abc import AsyncIterable +from collections.abc import AsyncIterable, AsyncIterator +from typing import Awaitable, Callable from pynumaflow_lite import sinker from pynumaflow_lite._sink_dtypes import Sinker @@ -35,7 +36,7 @@ async def handler(self, datums: AsyncIterable[sinker.Datum]) -> sinker.Responses pass -async def start(f: collections.abc.Callable): +async def start(f: Callable[[AsyncIterator[sinker.Datum]], Awaitable[sinker.Responses]]): server = sinker.SinkAsyncServer() # Register loop-level signal handlers so we control shutdown and avoid asyncio.run diff --git a/packages/pynumaflow-lite/pynumaflow_lite/sinker.pyi b/packages/pynumaflow-lite/pynumaflow_lite/sinker.pyi index 2a866337..753fe963 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/sinker.pyi +++ b/packages/pynumaflow-lite/pynumaflow_lite/sinker.pyi @@ -113,7 +113,7 @@ class SinkAsyncServer: info_file: str | None = ..., ) -> None: ... - def start(self, py_func: Callable[..., Any]) -> Awaitable[None]: ... + def start(self, py_func: Callable[[AsyncIterator[Datum]], Awaitable[Responses]]) -> Awaitable[None]: ... def stop(self) -> None: ... diff --git a/packages/pynumaflow-lite/tests/examples/sink_log.py b/packages/pynumaflow-lite/tests/examples/sink_log.py index b60fb14e..f782c020 100644 --- a/packages/pynumaflow-lite/tests/examples/sink_log.py +++ b/packages/pynumaflow-lite/tests/examples/sink_log.py @@ -2,6 +2,7 @@ import collections.abc import logging import signal +from typing import Awaitable, Callable from pynumaflow_lite import sinker @@ -39,7 +40,7 @@ async def async_handler(datums: collections.abc.AsyncIterator[sinker.Datum]) -> return responses -async def start(f: callable): +async def start(f: Callable[[collections.abc.AsyncIterator[sinker.Datum]], Awaitable[sinker.Responses]]): sock_file = "/tmp/var/run/numaflow/sink.sock" server_info_file = "/tmp/var/run/numaflow/sinker-server-info" server = sinker.SinkAsyncServer(sock_file, server_info_file) From addd56a9d125b6fe9bfaedaab1c6ffc1a1fefbfc Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Wed, 28 Jan 2026 08:39:11 -0800 Subject: [PATCH 06/10] fix(typing): sourcer Signed-off-by: Vigith Maurice --- packages/pynumaflow-lite/pynumaflow_lite/sourcer.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pynumaflow-lite/pynumaflow_lite/sourcer.pyi b/packages/pynumaflow-lite/pynumaflow_lite/sourcer.pyi index f1f89b4e..3bc41b0e 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/sourcer.pyi +++ b/packages/pynumaflow-lite/pynumaflow_lite/sourcer.pyi @@ -154,7 +154,7 @@ class SourceAsyncServer: info_file: str | None = ..., ) -> None: ... - def start(self, py_func: Callable[..., Any]) -> Awaitable[None]: ... + def start(self, py_func: Sourcer) -> Awaitable[None]: ... def stop(self) -> None: ... From 15f20568a3029095bf107cda48ed2d51f7759109 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Wed, 28 Jan 2026 08:40:48 -0800 Subject: [PATCH 07/10] fix(typing): sourcetransformer Signed-off-by: Vigith Maurice --- .../manifests/sourcetransform/sourcetransform_event_filter.py | 4 +++- .../pynumaflow-lite/pynumaflow_lite/sourcetransformer.pyi | 2 +- .../tests/examples/sourcetransform_event_filter.py | 4 +++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/pynumaflow-lite/manifests/sourcetransform/sourcetransform_event_filter.py b/packages/pynumaflow-lite/manifests/sourcetransform/sourcetransform_event_filter.py index ebe2b17d..f43310ec 100644 --- a/packages/pynumaflow-lite/manifests/sourcetransform/sourcetransform_event_filter.py +++ b/packages/pynumaflow-lite/manifests/sourcetransform/sourcetransform_event_filter.py @@ -1,6 +1,8 @@ import asyncio import signal from datetime import datetime, timezone +from typing import Callable + from pynumaflow_lite import sourcetransformer # Define epoch timestamps for filtering @@ -59,7 +61,7 @@ async def handler( pass -async def start(f: callable): +async def start(f: Callable[[list[str], sourcetransformer.Datum], sourcetransformer.Messages]): server = sourcetransformer.SourceTransformAsyncServer() # Register loop-level signal handlers so we control shutdown and avoid asyncio.run diff --git a/packages/pynumaflow-lite/pynumaflow_lite/sourcetransformer.pyi b/packages/pynumaflow-lite/pynumaflow_lite/sourcetransformer.pyi index 70804128..437bc7aa 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/sourcetransformer.pyi +++ b/packages/pynumaflow-lite/pynumaflow_lite/sourcetransformer.pyi @@ -115,7 +115,7 @@ class SourceTransformAsyncServer: info_file: str | None = ..., ) -> None: ... - def start(self, py_func: Callable[..., Any]) -> Awaitable[None]: ... + def start(self, py_func: SourceTransformer) -> Awaitable[None]: ... def stop(self) -> None: ... diff --git a/packages/pynumaflow-lite/tests/examples/sourcetransform_event_filter.py b/packages/pynumaflow-lite/tests/examples/sourcetransform_event_filter.py index 03647b06..3f9e19f6 100644 --- a/packages/pynumaflow-lite/tests/examples/sourcetransform_event_filter.py +++ b/packages/pynumaflow-lite/tests/examples/sourcetransform_event_filter.py @@ -1,6 +1,8 @@ import asyncio import signal from datetime import datetime, timezone +from typing import Callable + from pynumaflow_lite import sourcetransformer # Define epoch timestamps for filtering @@ -91,7 +93,7 @@ async def handler( pass -async def start(f: callable): +async def start(f: Callable[[list[str], sourcetransformer.Datum], sourcetransformer.Messages]): sock_file = "/tmp/var/run/numaflow/sourcetransform.sock" server_info_file = "/tmp/var/run/numaflow/sourcetransformer-server-info" server = sourcetransformer.SourceTransformAsyncServer(sock_file, server_info_file) From 717a8fa9ac9a394bcadb39da86c63d119e86089d Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Wed, 28 Jan 2026 08:42:06 -0800 Subject: [PATCH 08/10] fix(typing): accumulator Signed-off-by: Vigith Maurice --- packages/pynumaflow-lite/pynumaflow_lite/accumulator.pyi | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/pynumaflow-lite/pynumaflow_lite/accumulator.pyi b/packages/pynumaflow-lite/pynumaflow_lite/accumulator.pyi index 13493ace..32db03b6 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/accumulator.pyi +++ b/packages/pynumaflow-lite/pynumaflow_lite/accumulator.pyi @@ -1,5 +1,6 @@ from datetime import datetime -from typing import AsyncIterator, Optional +from typing import AsyncIterator, Optional, Callable, Awaitable +from collections.abc import AsyncIterable class Message: """ @@ -74,13 +75,13 @@ class AccumulatorAsyncServer: info_file: str | None = "/var/run/numaflow/accumulator-server-info", ) -> None: ... async def start( - self, py_creator: object, init_args: object | None = None + self, py_creator: type[Accumulator] | Callable[[AsyncIterable[Datum]], AsyncIterator[Message]], init_args: tuple | None = None ) -> None: """ - Start the server with the given Python class (creator). + Start the server with the given Python class (creator) or function. Args: - py_creator: The Python class to instantiate per key + py_creator: The Python class to instantiate per key or a function init_args: Optional tuple of positional arguments for class instantiation """ ... From 8d445dd4b2855157b5ade74a36625b63e7b7fdaa Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Wed, 28 Jan 2026 08:42:48 -0800 Subject: [PATCH 09/10] fix(typing): reducestreamer Signed-off-by: Vigith Maurice --- packages/pynumaflow-lite/pynumaflow_lite/reducestreamer.pyi | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/pynumaflow-lite/pynumaflow_lite/reducestreamer.pyi b/packages/pynumaflow-lite/pynumaflow_lite/reducestreamer.pyi index 4f134cc0..b6176bdd 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/reducestreamer.pyi +++ b/packages/pynumaflow-lite/pynumaflow_lite/reducestreamer.pyi @@ -1,7 +1,8 @@ from __future__ import annotations import datetime as _dt -from typing import Optional, List, Dict, Awaitable, AsyncIterator +from typing import Optional, List, Dict, Awaitable, AsyncIterator, Callable +from collections.abc import AsyncIterable # Re-export the Python ABC for user convenience and typing from ._reducestreamer_dtypes import ReduceStreamer as ReduceStreamer @@ -61,7 +62,7 @@ class ReduceStreamAsyncServer: info_file: str = ..., ) -> None: ... - def start(self, py_creator: type, init_args: tuple | None = ...) -> Awaitable[None]: ... + def start(self, py_creator: type[ReduceStreamer] | Callable[[list[str], AsyncIterable[Datum], Metadata], AsyncIterator[Message]], init_args: tuple | None = ...) -> Awaitable[None]: ... def stop(self) -> None: ... From 6214fe12cc8427a3b33479cc672dd460870d4971 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Wed, 28 Jan 2026 08:43:37 -0800 Subject: [PATCH 10/10] fix(typing): session_reducer Signed-off-by: Vigith Maurice --- packages/pynumaflow-lite/pynumaflow_lite/session_reducer.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pynumaflow-lite/pynumaflow_lite/session_reducer.pyi b/packages/pynumaflow-lite/pynumaflow_lite/session_reducer.pyi index e63d52a2..7cf9df5b 100644 --- a/packages/pynumaflow-lite/pynumaflow_lite/session_reducer.pyi +++ b/packages/pynumaflow-lite/pynumaflow_lite/session_reducer.pyi @@ -42,7 +42,7 @@ class SessionReduceAsyncServer: info_file: str | None = ..., ) -> None: ... - def start(self, py_creator: type, init_args: tuple | None = ...) -> Awaitable[None]: ... + def start(self, py_creator: type[SessionReducer], init_args: tuple | None = ...) -> Awaitable[None]: ... def stop(self) -> None: ...