From 75e3b8697aacf1b2226cdf2180006159db41a09b Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Mon, 22 Dec 2025 16:33:03 -0500 Subject: [PATCH 1/5] Add support for onSuccess sink in pynumaflow Signed-off-by: vtiwari5 --- .../examples/sink/all_sinks/Dockerfile | 55 +++++++++++++ .../examples/sink/all_sinks/Makefile | 22 +++++ .../examples/sink/all_sinks/entry.sh | 4 + .../examples/sink/all_sinks/example.py | 81 +++++++++++++++++++ .../examples/sink/all_sinks/pipeline.yaml | 50 ++++++++++++ .../examples/sink/all_sinks/pyproject.toml | 15 ++++ packages/pynumaflow/pynumaflow/_constants.py | 3 + .../pynumaflow/proto/sinker/sink.proto | 8 ++ .../pynumaflow/proto/sinker/sink_pb2.py | 18 +++-- .../pynumaflow/proto/sinker/sink_pb2.pyi | 17 +++- .../pynumaflow/pynumaflow/sinker/__init__.py | 3 +- .../pynumaflow/pynumaflow/sinker/_dtypes.py | 51 +++++++++++- .../pynumaflow/sinker/async_server.py | 7 ++ .../pynumaflow/pynumaflow/sinker/server.py | 7 ++ .../pynumaflow/tests/sink/test_responses.py | 9 ++- 15 files changed, 331 insertions(+), 19 deletions(-) create mode 100644 packages/pynumaflow/examples/sink/all_sinks/Dockerfile create mode 100644 packages/pynumaflow/examples/sink/all_sinks/Makefile create mode 100644 packages/pynumaflow/examples/sink/all_sinks/entry.sh create mode 100644 packages/pynumaflow/examples/sink/all_sinks/example.py create mode 100644 packages/pynumaflow/examples/sink/all_sinks/pipeline.yaml create mode 100644 packages/pynumaflow/examples/sink/all_sinks/pyproject.toml diff --git a/packages/pynumaflow/examples/sink/all_sinks/Dockerfile b/packages/pynumaflow/examples/sink/all_sinks/Dockerfile new file mode 100644 index 00000000..af3ebe18 --- /dev/null +++ b/packages/pynumaflow/examples/sink/all_sinks/Dockerfile @@ -0,0 +1,55 @@ +#################################################################################################### +# Stage 1: Base Builder - installs core dependencies using poetry 
+#################################################################################################### +FROM python:3.10-slim-bullseye AS base-builder + +ENV PYSETUP_PATH="/opt/pysetup" +WORKDIR $PYSETUP_PATH + +# Copy only core dependency files first for better caching +COPY pyproject.toml poetry.lock README.md ./ +COPY pynumaflow/ ./pynumaflow/ +RUN apt-get update && apt-get install --no-install-recommends -y \ + curl wget build-essential git \ + && apt-get clean && rm -rf /var/lib/apt/lists/* \ + && pip install poetry \ + && poetry install --no-root --no-interaction + +#################################################################################################### +# Stage 2: UDF Builder - adds UDF code and installs UDF-specific deps +#################################################################################################### +FROM base-builder AS udf-builder + +ENV EXAMPLE_PATH="/opt/pysetup/examples/sink/all_sinks" +ENV POETRY_VIRTUALENVS_IN_PROJECT=true + +WORKDIR $EXAMPLE_PATH +COPY examples/sink/all_sinks/ ./ +RUN poetry install --no-root --no-interaction + +#################################################################################################### +# Stage 3: UDF Runtime - clean container with only needed stuff +#################################################################################################### +FROM python:3.10-slim-bullseye AS udf + +ENV PYSETUP_PATH="/opt/pysetup" +ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/sink/all_sinks" +ENV VENV_PATH="$EXAMPLE_PATH/.venv" +ENV PATH="$VENV_PATH/bin:$PATH" + +RUN apt-get update && apt-get install --no-install-recommends -y wget \ + && apt-get clean && rm -rf /var/lib/apt/lists/* \ + && wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \ + && chmod +x /dumb-init + +WORKDIR $PYSETUP_PATH +COPY --from=udf-builder $VENV_PATH $VENV_PATH +COPY --from=udf-builder $EXAMPLE_PATH $EXAMPLE_PATH + +WORKDIR $EXAMPLE_PATH +RUN chmod +x entry.sh + 
+ENTRYPOINT ["/dumb-init", "--"] +CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"] + +EXPOSE 5000 diff --git a/packages/pynumaflow/examples/sink/all_sinks/Makefile b/packages/pynumaflow/examples/sink/all_sinks/Makefile new file mode 100644 index 00000000..fdd8e89d --- /dev/null +++ b/packages/pynumaflow/examples/sink/all_sinks/Makefile @@ -0,0 +1,22 @@ +TAG ?= stable +PUSH ?= false +IMAGE_REGISTRY = quay.io/numaio/numaflow-python/all-sinks:${TAG} +DOCKER_FILE_PATH = examples/sink/all_sinks/Dockerfile + +.PHONY: update +update: + poetry update -vv + +.PHONY: image-push +image-push: update + cd ../../../ && docker buildx build \ + -f ${DOCKER_FILE_PATH} \ + -t ${IMAGE_REGISTRY} \ + --platform linux/amd64,linux/arm64 . --push + +.PHONY: image +image: update + cd ../../../ && docker build \ + -f ${DOCKER_FILE_PATH} \ + -t ${IMAGE_REGISTRY} . + @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi diff --git a/packages/pynumaflow/examples/sink/all_sinks/entry.sh b/packages/pynumaflow/examples/sink/all_sinks/entry.sh new file mode 100644 index 00000000..073b05e3 --- /dev/null +++ b/packages/pynumaflow/examples/sink/all_sinks/entry.sh @@ -0,0 +1,4 @@ +#!/bin/sh +set -eux + +python example.py diff --git a/packages/pynumaflow/examples/sink/all_sinks/example.py b/packages/pynumaflow/examples/sink/all_sinks/example.py new file mode 100644 index 00000000..0fe7077e --- /dev/null +++ b/packages/pynumaflow/examples/sink/all_sinks/example.py @@ -0,0 +1,81 @@ +import os +from collections.abc import AsyncIterable +from pynumaflow.sinker import Datum, Responses, Response, Sinker, Message +from pynumaflow.sinker import SinkAsyncServer +import logging +import random + +logging.basicConfig(level=logging.DEBUG) +_LOGGER = logging.getLogger(__name__) + + +class UserDefinedSink(Sinker): + async def handler(self, datums: AsyncIterable[Datum]) -> Responses: + responses = Responses() + async for msg in datums: + if primary_sink_write_status(): + _LOGGER.info( + "Write to User Defined 
Sink succeeded, writing %s to onSuccess sink", + msg.value.decode("utf-8"), + ) + # create a message to be sent to onSuccess sink + on_success_message = Response.as_on_success( + msg.id, + Message(msg.value) + .with_keys(["on_success"]) + .with_user_metadata(msg.user_metadata), + ) + responses.append(on_success_message) + # Sending `None`, on the other hand, specifies that simply send + # the original message to the onSuccess sink + # `responses.append(Response.as_on_success(msg.id, None))` + else: + _LOGGER.info( + "Write to User Defined Sink failed, writing %s to fallback sink", + msg.value.decode("utf-8"), + ) + responses.append(Response.as_fallback(msg.id)) + return responses + + +async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses: + responses = Responses() + async for msg in datums: + if primary_sink_write_status(): + _LOGGER.info( + "Write to User Defined Sink succeeded, writing %s to onSuccess sink", + msg.value.decode("utf-8"), + ) + # create a message to be sent to onSuccess sink + on_success_message = Response.as_on_success( + msg.id, + Message(msg.value).with_keys(["on_success"]).with_user_metadata(msg.user_metadata), + ) + responses.append(on_success_message) + # Sending `None`, on the other hand, specifies that simply send + # the original message to the onSuccess sink + # `responses.append(Response.as_on_success(msg.id, None))` + else: + _LOGGER.info( + "Write to User Defined Sink failed, writing %s to fallback sink", + msg.value.decode("utf-8"), + ) + responses.append(Response.as_fallback(msg.id)) + return responses + + +def primary_sink_write_status(): + # simulate writing to primary sink and return status of the same + # return True if writing to primary sink succeeded + # return False if writing to primary sink failed + return random.randint(0, 1) == 1 + + +if __name__ == "__main__": + invoke = os.getenv("INVOKE", "func_handler") + if invoke == "class": + sink_handler = UserDefinedSink() + else: + sink_handler = udsink_handler + 
grpc_server = SinkAsyncServer(sink_handler) + grpc_server.start() diff --git a/packages/pynumaflow/examples/sink/all_sinks/pipeline.yaml b/packages/pynumaflow/examples/sink/all_sinks/pipeline.yaml new file mode 100644 index 00000000..e156c5ed --- /dev/null +++ b/packages/pynumaflow/examples/sink/all_sinks/pipeline.yaml @@ -0,0 +1,50 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: all-sinks-pipeline +spec: + vertices: + - name: in + source: + generator: + rpu: 1 + duration: 1s + msgSize: 10 + - name: p1 + udf: + builtin: + name: cat + - name: out + sink: + udsink: + container: + args: + - python + - example.py + image: quay.io/numaio/numaflow-python/all-sinks:stable + imagePullPolicy: IfNotPresent + env: + - name: PYTHONDEBUG + value: "true" + - name: INVOKE + value: "func_handler" + fallback: + udsink: + container: + image: quay.io/numaio/numaflow-python/sink-log:stable + imagePullPolicy: IfNotPresent + onSuccess: + udsink: + container: + image: quay.io/numaio/numaflow-python/sink-log:stable + imagePullPolicy: IfNotPresent + - name: log-output + sink: + log: {} + edges: + - from: in + to: p1 + - from: p1 + to: out + - from: p1 + to: log-output diff --git a/packages/pynumaflow/examples/sink/all_sinks/pyproject.toml b/packages/pynumaflow/examples/sink/all_sinks/pyproject.toml new file mode 100644 index 00000000..2ff3e97a --- /dev/null +++ b/packages/pynumaflow/examples/sink/all_sinks/pyproject.toml @@ -0,0 +1,15 @@ +[tool.poetry] +name = "example-sink" +version = "0.2.4" +description = "" +authors = ["Numaflow developers"] + +[tool.poetry.dependencies] +python = ">=3.10,<3.13" +pynumaflow = { path = "../../../"} + +[tool.poetry.dev-dependencies] + +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" diff --git a/packages/pynumaflow/pynumaflow/_constants.py b/packages/pynumaflow/pynumaflow/_constants.py index 01ae44d5..16c74134 100644 --- a/packages/pynumaflow/pynumaflow/_constants.py +++ 
b/packages/pynumaflow/pynumaflow/_constants.py @@ -25,6 +25,7 @@ SOURCE_SOCK_PATH = "/var/run/numaflow/source.sock" MULTIPROC_MAP_SOCK_ADDR = "/var/run/numaflow/multiproc" FALLBACK_SINK_SOCK_PATH = "/var/run/numaflow/fb-sink.sock" +ON_SUCCESS_SINK_SOCK_PATH = "/var/run/numaflow/ons-sink.sock" BATCH_MAP_SOCK_PATH = "/var/run/numaflow/batchmap.sock" ACCUMULATOR_SOCK_PATH = "/var/run/numaflow/accumulator.sock" @@ -37,10 +38,12 @@ SIDE_INPUT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sideinput-server-info" SOURCE_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sourcer-server-info" FALLBACK_SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/fb-sinker-server-info" +ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/ons-sinker-server-info" ACCUMULATOR_SERVER_INFO_FILE_PATH = "/var/run/numaflow/accumulator-server-info" ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE" UD_CONTAINER_FALLBACK_SINK = "fb-udsink" +UD_CONTAINER_ON_SUCCESS_SINK = "ons-udsink" # TODO: need to make sure the DATUM_KEY value is the same as # https://github.com/numaproj/numaflow-go/blob/main/pkg/function/configs.go#L6 diff --git a/packages/pynumaflow/pynumaflow/proto/sinker/sink.proto b/packages/pynumaflow/pynumaflow/proto/sinker/sink.proto index a94afd31..fdba28d2 100644 --- a/packages/pynumaflow/pynumaflow/proto/sinker/sink.proto +++ b/packages/pynumaflow/pynumaflow/proto/sinker/sink.proto @@ -67,6 +67,7 @@ enum Status { FAILURE = 1; FALLBACK = 2; SERVE = 3; + ON_SUCCESS = 4; } /** @@ -74,6 +75,11 @@ enum Status { */ message SinkResponse { message Result { + message Message { + bytes value = 1; + repeated string keys = 2; + common.Metadata metadata = 3; + } // id is the ID of the message, can be used to uniquely identify the message. string id = 1; // status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK. @@ -81,6 +87,8 @@ message SinkResponse { // err_msg is the error message, set it if success is set to false. 
string err_msg = 3; optional bytes serve_response = 4; + // on_success_msg is the message to be sent to on_success sink. + optional Message on_success_msg = 5; } repeated Result results = 1; optional Handshake handshake = 2; diff --git a/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.py b/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.py index 92b0e505..651b2784 100644 --- a/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.py +++ b/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.py @@ -27,7 +27,7 @@ from pynumaflow.proto.common import metadata_pb2 as pynumaflow_dot_proto_dot_common_dot_metadata__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\"pynumaflow/proto/sinker/sink.proto\x12\x07sink.v1\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&pynumaflow/proto/common/metadata.proto\"\xc7\x03\n\x0bSinkRequest\x12-\n\x07request\x18\x01 \x01(\x0b\x32\x1c.sink.v1.SinkRequest.Request\x12+\n\x06status\x18\x02 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatus\x12*\n\thandshake\x18\x03 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x1a\xa1\x02\n\x07Request\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\twatermark\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\n\n\x02id\x18\x05 \x01(\t\x12:\n\x07headers\x18\x06 \x03(\x0b\x32).sink.v1.SinkRequest.Request.HeadersEntry\x12\"\n\x08metadata\x18\x07 \x01(\x0b\x32\x10.common.Metadata\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x0c\n\n_handshake\"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08\"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08\"!\n\x12TransmissionStatus\x12\x0b\n\x03\x65ot\x18\x01 \x01(\x08\"\xac\x02\n\x0cSinkResponse\x12-\n\x07results\x18\x01 \x03(\x0b\x32\x1c.sink.v1.SinkResponse.Result\x12*\n\thandshake\x18\x02 
\x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x12\x30\n\x06status\x18\x03 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatusH\x01\x88\x01\x01\x1av\n\x06Result\x12\n\n\x02id\x18\x01 \x01(\t\x12\x1f\n\x06status\x18\x02 \x01(\x0e\x32\x0f.sink.v1.Status\x12\x0f\n\x07\x65rr_msg\x18\x03 \x01(\t\x12\x1b\n\x0eserve_response\x18\x04 \x01(\x0cH\x00\x88\x01\x01\x42\x11\n\x0f_serve_responseB\x0c\n\n_handshakeB\t\n\x07_status*;\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01\x12\x0c\n\x08\x46\x41LLBACK\x10\x02\x12\t\n\x05SERVE\x10\x03\x32|\n\x04Sink\x12\x39\n\x06SinkFn\x12\x14.sink.v1.SinkRequest\x1a\x15.sink.v1.SinkResponse(\x01\x30\x01\x12\x39\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x16.sink.v1.ReadyResponseb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\"pynumaflow/proto/sinker/sink.proto\x12\x07sink.v1\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&pynumaflow/proto/common/metadata.proto\"\xc7\x03\n\x0bSinkRequest\x12-\n\x07request\x18\x01 \x01(\x0b\x32\x1c.sink.v1.SinkRequest.Request\x12+\n\x06status\x18\x02 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatus\x12*\n\thandshake\x18\x03 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x1a\xa1\x02\n\x07Request\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\x12.\n\nevent_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\twatermark\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\n\n\x02id\x18\x05 \x01(\t\x12:\n\x07headers\x18\x06 \x03(\x0b\x32).sink.v1.SinkRequest.Request.HeadersEntry\x12\"\n\x08metadata\x18\x07 \x01(\x0b\x32\x10.common.Metadata\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x0c\n\n_handshake\"\x18\n\tHandshake\x12\x0b\n\x03sot\x18\x01 \x01(\x08\"\x1e\n\rReadyResponse\x12\r\n\x05ready\x18\x01 \x01(\x08\"!\n\x12TransmissionStatus\x12\x0b\n\x03\x65ot\x18\x01 \x01(\x08\"\xcf\x03\n\x0cSinkResponse\x12-\n\x07results\x18\x01 
\x03(\x0b\x32\x1c.sink.v1.SinkResponse.Result\x12*\n\thandshake\x18\x02 \x01(\x0b\x32\x12.sink.v1.HandshakeH\x00\x88\x01\x01\x12\x30\n\x06status\x18\x03 \x01(\x0b\x32\x1b.sink.v1.TransmissionStatusH\x01\x88\x01\x01\x1a\x98\x02\n\x06Result\x12\n\n\x02id\x18\x01 \x01(\t\x12\x1f\n\x06status\x18\x02 \x01(\x0e\x32\x0f.sink.v1.Status\x12\x0f\n\x07\x65rr_msg\x18\x03 \x01(\t\x12\x1b\n\x0eserve_response\x18\x04 \x01(\x0cH\x00\x88\x01\x01\x12\x41\n\x0eon_success_msg\x18\x05 \x01(\x0b\x32$.sink.v1.SinkResponse.Result.MessageH\x01\x88\x01\x01\x1aJ\n\x07Message\x12\r\n\x05value\x18\x01 \x01(\x0c\x12\x0c\n\x04keys\x18\x02 \x03(\t\x12\"\n\x08metadata\x18\x03 \x01(\x0b\x32\x10.common.MetadataB\x11\n\x0f_serve_responseB\x11\n\x0f_on_success_msgB\x0c\n\n_handshakeB\t\n\x07_status*K\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01\x12\x0c\n\x08\x46\x41LLBACK\x10\x02\x12\t\n\x05SERVE\x10\x03\x12\x0e\n\nON_SUCCESS\x10\x04\x32|\n\x04Sink\x12\x39\n\x06SinkFn\x12\x14.sink.v1.SinkRequest\x1a\x15.sink.v1.SinkResponse(\x01\x30\x01\x12\x39\n\x07IsReady\x12\x16.google.protobuf.Empty\x1a\x16.sink.v1.ReadyResponseb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -36,8 +36,8 @@ DESCRIPTOR._loaded_options = None _globals['_SINKREQUEST_REQUEST_HEADERSENTRY']._loaded_options = None _globals['_SINKREQUEST_REQUEST_HEADERSENTRY']._serialized_options = b'8\001' - _globals['_STATUS']._serialized_start=1003 - _globals['_STATUS']._serialized_end=1062 + _globals['_STATUS']._serialized_start=1166 + _globals['_STATUS']._serialized_end=1241 _globals['_SINKREQUEST']._serialized_start=150 _globals['_SINKREQUEST']._serialized_end=605 _globals['_SINKREQUEST_REQUEST']._serialized_start=302 @@ -51,9 +51,11 @@ _globals['_TRANSMISSIONSTATUS']._serialized_start=665 _globals['_TRANSMISSIONSTATUS']._serialized_end=698 _globals['_SINKRESPONSE']._serialized_start=701 - _globals['_SINKRESPONSE']._serialized_end=1001 - 
_globals['_SINKRESPONSE_RESULT']._serialized_start=858 - _globals['_SINKRESPONSE_RESULT']._serialized_end=976 - _globals['_SINK']._serialized_start=1064 - _globals['_SINK']._serialized_end=1188 + _globals['_SINKRESPONSE']._serialized_end=1164 + _globals['_SINKRESPONSE_RESULT']._serialized_start=859 + _globals['_SINKRESPONSE_RESULT']._serialized_end=1139 + _globals['_SINKRESPONSE_RESULT_MESSAGE']._serialized_start=1027 + _globals['_SINKRESPONSE_RESULT_MESSAGE']._serialized_end=1101 + _globals['_SINK']._serialized_start=1243 + _globals['_SINK']._serialized_end=1367 # @@protoc_insertion_point(module_scope) diff --git a/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.pyi b/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.pyi index 57a3728b..2c696e72 100644 --- a/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.pyi +++ b/packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.pyi @@ -18,10 +18,12 @@ class Status(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): FAILURE: _ClassVar[Status] FALLBACK: _ClassVar[Status] SERVE: _ClassVar[Status] + ON_SUCCESS: _ClassVar[Status] SUCCESS: Status FAILURE: Status FALLBACK: Status SERVE: Status +ON_SUCCESS: Status class SinkRequest(_message.Message): __slots__ = ("request", "status", "handshake") @@ -78,16 +80,27 @@ class TransmissionStatus(_message.Message): class SinkResponse(_message.Message): __slots__ = ("results", "handshake", "status") class Result(_message.Message): - __slots__ = ("id", "status", "err_msg", "serve_response") + __slots__ = ("id", "status", "err_msg", "serve_response", "on_success_msg") + class Message(_message.Message): + __slots__ = ("value", "keys", "metadata") + VALUE_FIELD_NUMBER: _ClassVar[int] + KEYS_FIELD_NUMBER: _ClassVar[int] + METADATA_FIELD_NUMBER: _ClassVar[int] + value: bytes + keys: _containers.RepeatedScalarFieldContainer[str] + metadata: _metadata_pb2.Metadata + def __init__(self, value: _Optional[bytes] = ..., keys: _Optional[_Iterable[str]] = ..., metadata: 
_Optional[_Union[_metadata_pb2.Metadata, _Mapping]] = ...) -> None: ... ID_FIELD_NUMBER: _ClassVar[int] STATUS_FIELD_NUMBER: _ClassVar[int] ERR_MSG_FIELD_NUMBER: _ClassVar[int] SERVE_RESPONSE_FIELD_NUMBER: _ClassVar[int] + ON_SUCCESS_MSG_FIELD_NUMBER: _ClassVar[int] id: str status: Status err_msg: str serve_response: bytes - def __init__(self, id: _Optional[str] = ..., status: _Optional[_Union[Status, str]] = ..., err_msg: _Optional[str] = ..., serve_response: _Optional[bytes] = ...) -> None: ... + on_success_msg: SinkResponse.Result.Message + def __init__(self, id: _Optional[str] = ..., status: _Optional[_Union[Status, str]] = ..., err_msg: _Optional[str] = ..., serve_response: _Optional[bytes] = ..., on_success_msg: _Optional[_Union[SinkResponse.Result.Message, _Mapping]] = ...) -> None: ... RESULTS_FIELD_NUMBER: _ClassVar[int] HANDSHAKE_FIELD_NUMBER: _ClassVar[int] STATUS_FIELD_NUMBER: _ClassVar[int] diff --git a/packages/pynumaflow/pynumaflow/sinker/__init__.py b/packages/pynumaflow/pynumaflow/sinker/__init__.py index 1064d96a..5a67f752 100644 --- a/packages/pynumaflow/pynumaflow/sinker/__init__.py +++ b/packages/pynumaflow/pynumaflow/sinker/__init__.py @@ -3,9 +3,10 @@ from pynumaflow.sinker.server import SinkServer from pynumaflow._metadata import UserMetadata, SystemMetadata -from pynumaflow.sinker._dtypes import Response, Responses, Datum, Sinker +from pynumaflow.sinker._dtypes import Response, Responses, Datum, Sinker, Message __all__ = [ + "Message", "Response", "Responses", "Datum", diff --git a/packages/pynumaflow/pynumaflow/sinker/_dtypes.py b/packages/pynumaflow/pynumaflow/sinker/_dtypes.py index 43ce3386..daf3882a 100644 --- a/packages/pynumaflow/pynumaflow/sinker/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/sinker/_dtypes.py @@ -12,6 +12,42 @@ Rs = TypeVar("Rs", bound="Responses") +@dataclass +class Message: + """ + Basic datatype for OnSuccess UDSink message. + + Args: + keys: the keys of the on_success message. 
+ value: the payload of the on_success message. + user_metadata: the user metadata of the on_success message. + """ + + keys: Optional[list[str]] + value: bytes + user_metadata: Optional[UserMetadata] + + __slots__ = ("keys", "value", "user_metadata") + + def __init__( + self, + value: bytes, + keys: Optional[list[str]] = None, + user_metadata: Optional[UserMetadata] = None, + ): + self.value = value + self.keys = keys + self.user_metadata = user_metadata + + def with_keys(self, keys: Optional[list[str]]): + self.keys = keys + return self + + def with_user_metadata(self, user_metadata: Optional[UserMetadata]): + self.user_metadata = user_metadata + return self + + @dataclass class Response: """ @@ -28,26 +64,33 @@ class Response: success: bool err: Optional[str] fallback: bool + on_success: Optional[Message] - __slots__ = ("id", "success", "err", "fallback") + __slots__ = ("id", "success", "err", "fallback", "on_success") # as_success creates a successful Response with the given id. # The Success field is set to true. @classmethod def as_success(cls: type[R], id_: str) -> R: - return Response(id=id_, success=True, err=None, fallback=False) + return Response(id=id_, success=True, err=None, fallback=False, on_success=None) # as_failure creates a failed Response with the given id and error message. # The success field is set to false and the err field is set to the provided error message. @classmethod def as_failure(cls: type[R], id_: str, err_msg: str) -> R: - return Response(id=id_, success=False, err=err_msg, fallback=False) + return Response(id=id_, success=False, err=err_msg, fallback=False, on_success=None) # as_fallback creates a Response with the fallback field set to true. # This indicates that the message should be sent to the fallback sink. 
@classmethod def as_fallback(cls: type[R], id_: str) -> R: - return Response(id=id_, fallback=True, err=None, success=False) + return Response(id=id_, fallback=True, err=None, success=False, on_success=None) + + # as_on_success creates a Response with the on_success field set to true. + # This indicates that the message should be sent to the on_success sink. + @classmethod + def as_on_success(cls: type[R], id_: str, on_success: Optional[Message] = None) -> R: + return Response(id=id_, fallback=False, err=None, success=False, on_success=on_success) class Responses(Sequence[R]): diff --git a/packages/pynumaflow/pynumaflow/sinker/async_server.py b/packages/pynumaflow/pynumaflow/sinker/async_server.py index a329e47e..a9331aca 100644 --- a/packages/pynumaflow/pynumaflow/sinker/async_server.py +++ b/packages/pynumaflow/pynumaflow/sinker/async_server.py @@ -18,6 +18,9 @@ _LOGGER, FALLBACK_SINK_SOCK_PATH, FALLBACK_SINK_SERVER_INFO_FILE_PATH, + UD_CONTAINER_ON_SUCCESS_SINK, + ON_SUCCESS_SINK_SOCK_PATH, + ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH, MAX_NUM_THREADS, ) @@ -86,6 +89,10 @@ def __init__( _LOGGER.info("Using Fallback Sink") sock_path = FALLBACK_SINK_SOCK_PATH server_info_file = FALLBACK_SINK_SERVER_INFO_FILE_PATH + elif os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_ON_SUCCESS_SINK: + _LOGGER.info("Using On Success Sink") + sock_path = ON_SUCCESS_SINK_SOCK_PATH + server_info_file = ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH self.sock_path = f"unix://{sock_path}" self.max_threads = min(max_threads, MAX_NUM_THREADS) diff --git a/packages/pynumaflow/pynumaflow/sinker/server.py b/packages/pynumaflow/pynumaflow/sinker/server.py index dc3a4788..842c1725 100644 --- a/packages/pynumaflow/pynumaflow/sinker/server.py +++ b/packages/pynumaflow/pynumaflow/sinker/server.py @@ -14,6 +14,9 @@ UD_CONTAINER_FALLBACK_SINK, FALLBACK_SINK_SOCK_PATH, FALLBACK_SINK_SERVER_INFO_FILE_PATH, + UD_CONTAINER_ON_SUCCESS_SINK, + ON_SUCCESS_SINK_SOCK_PATH, + ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH, 
MAX_NUM_THREADS, ) @@ -82,6 +85,10 @@ def udsink_handler(datums: Iterator[Datum]) -> Responses: _LOGGER.info("Using Fallback Sink") sock_path = FALLBACK_SINK_SOCK_PATH server_info_file = FALLBACK_SINK_SERVER_INFO_FILE_PATH + elif os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_ON_SUCCESS_SINK: + _LOGGER.info("Using On Success Sink") + sock_path = ON_SUCCESS_SINK_SOCK_PATH + server_info_file = ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH self.sock_path = f"unix://{sock_path}" self.max_threads = min(max_threads, MAX_NUM_THREADS) diff --git a/packages/pynumaflow/tests/sink/test_responses.py b/packages/pynumaflow/tests/sink/test_responses.py index 118570d5..7f5a67bf 100644 --- a/packages/pynumaflow/tests/sink/test_responses.py +++ b/packages/pynumaflow/tests/sink/test_responses.py @@ -40,10 +40,11 @@ def test_responses(self): self.assertEqual(self.resps[3].id, "4") self.assertEqual( - "[Response(id='2', success=True, err=None, fallback=False), " - "Response(id='3', success=False, err='RuntimeError encountered!', fallback=False), " - "Response(id='5', success=False, err=None, fallback=True), " - "Response(id='4', success=True, err=None, fallback=False)]", + "[Response(id='2', success=True, err=None, fallback=False, on_success=None), " + "Response(id='3', success=False, err='RuntimeError encountered!', " + "fallback=False, on_success=None), " + "Response(id='5', success=False, err=None, fallback=True, on_success=None), " + "Response(id='4', success=True, err=None, fallback=False, on_success=None)]", repr(self.resps), ) From 1473d5730de62413cbaa72b3e8ace72f404ca8d6 Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Mon, 22 Dec 2025 23:14:45 -0500 Subject: [PATCH 2/5] Add tests. Convert on_success message to pb equivalent before sending. 
Update example added Signed-off-by: vtiwari5 --- .../examples/sink/all_sinks/example.py | 4 +- .../examples/sink/all_sinks/pipeline.yaml | 28 +++---- .../pynumaflow/pynumaflow/sinker/_dtypes.py | 68 ++++++++++++---- .../pynumaflow/sinker/servicer/utils.py | 29 ++++++- .../pynumaflow/tests/sink/test_async_sink.py | 79 ++++++++++++++++++- .../pynumaflow/tests/sink/test_responses.py | 29 +++++-- packages/pynumaflow/tests/sink/test_server.py | 37 ++++++++- 7 files changed, 228 insertions(+), 46 deletions(-) diff --git a/packages/pynumaflow/examples/sink/all_sinks/example.py b/packages/pynumaflow/examples/sink/all_sinks/example.py index 0fe7077e..55122c54 100644 --- a/packages/pynumaflow/examples/sink/all_sinks/example.py +++ b/packages/pynumaflow/examples/sink/all_sinks/example.py @@ -21,9 +21,7 @@ async def handler(self, datums: AsyncIterable[Datum]) -> Responses: # create a message to be sent to onSuccess sink on_success_message = Response.as_on_success( msg.id, - Message(msg.value) - .with_keys(["on_success"]) - .with_user_metadata(msg.user_metadata), + Message(msg.value, ["on_success"], msg.user_metadata), ) responses.append(on_success_message) # Sending `None`, on the other hand, specifies that simply send diff --git a/packages/pynumaflow/examples/sink/all_sinks/pipeline.yaml b/packages/pynumaflow/examples/sink/all_sinks/pipeline.yaml index e156c5ed..53d256c7 100644 --- a/packages/pynumaflow/examples/sink/all_sinks/pipeline.yaml +++ b/packages/pynumaflow/examples/sink/all_sinks/pipeline.yaml @@ -10,10 +10,6 @@ spec: rpu: 1 duration: 1s msgSize: 10 - - name: p1 - udf: - builtin: - name: cat - name: out sink: udsink: @@ -28,23 +24,21 @@ spec: value: "true" - name: INVOKE value: "func_handler" - fallback: - udsink: - container: - image: quay.io/numaio/numaflow-python/sink-log:stable - imagePullPolicy: IfNotPresent - onSuccess: - udsink: - container: - image: quay.io/numaio/numaflow-python/sink-log:stable - imagePullPolicy: IfNotPresent + fallback: + udsink: + 
container: + image: quay.io/numaio/numaflow-python/sink-log:stable + imagePullPolicy: IfNotPresent + onSuccess: + udsink: + container: + image: quay.io/numaio/numaflow-rs/sink-log:stable + imagePullPolicy: IfNotPresent - name: log-output sink: log: {} edges: - from: in - to: p1 - - from: p1 to: out - - from: p1 + - from: in to: log-output diff --git a/packages/pynumaflow/pynumaflow/sinker/_dtypes.py b/packages/pynumaflow/pynumaflow/sinker/_dtypes.py index daf3882a..8cc1a7f0 100644 --- a/packages/pynumaflow/pynumaflow/sinker/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/sinker/_dtypes.py @@ -23,30 +23,45 @@ class Message: user_metadata: the user metadata of the on_success message. """ - keys: Optional[list[str]] - value: bytes - user_metadata: Optional[UserMetadata] + _keys: Optional[list[str]] + _value: bytes + _user_metadata: Optional[UserMetadata] - __slots__ = ("keys", "value", "user_metadata") + __slots__ = ("_keys", "_value", "_user_metadata") def __init__( self, value: bytes, - keys: Optional[list[str]] = None, - user_metadata: Optional[UserMetadata] = None, + keys: list[str] = None, + user_metadata: UserMetadata = None, ): - self.value = value - self.keys = keys - self.user_metadata = user_metadata + self._value = value + self._keys = keys + self._user_metadata = user_metadata def with_keys(self, keys: Optional[list[str]]): - self.keys = keys + self._keys = keys return self def with_user_metadata(self, user_metadata: Optional[UserMetadata]): - self.user_metadata = user_metadata + self._user_metadata = user_metadata return self + @property + def keys(self) -> Optional[list[str]]: + """Returns the keys of the on_success message.""" + return self._keys + + @property + def value(self) -> bytes: + """Returns the payload of the on_success message.""" + return self._value + + @property + def user_metadata(self) -> Optional[UserMetadata]: + """Returns the user metadata of the on_success message.""" + return self._user_metadata + @dataclass class Response: @@ -64,33 +79,52 @@ class Response: success: bool err: 
Optional[str] fallback: bool - on_success: Optional[Message] + on_success: bool + on_success_msg: Optional[Message] - __slots__ = ("id", "success", "err", "fallback", "on_success") + __slots__ = ("id", "success", "err", "fallback", "on_success", "on_success_msg") # as_success creates a successful Response with the given id. # The Success field is set to true. @classmethod def as_success(cls: type[R], id_: str) -> R: - return Response(id=id_, success=True, err=None, fallback=False, on_success=None) + return Response( + id=id_, success=True, err=None, fallback=False, on_success=False, on_success_msg=None + ) # as_failure creates a failed Response with the given id and error message. # The success field is set to false and the err field is set to the provided error message. @classmethod def as_failure(cls: type[R], id_: str, err_msg: str) -> R: - return Response(id=id_, success=False, err=err_msg, fallback=False, on_success=None) + return Response( + id=id_, + success=False, + err=err_msg, + fallback=False, + on_success=False, + on_success_msg=None, + ) # as_fallback creates a Response with the fallback field set to true. # This indicates that the message should be sent to the fallback sink. @classmethod def as_fallback(cls: type[R], id_: str) -> R: - return Response(id=id_, fallback=True, err=None, success=False, on_success=None) + return Response( + id=id_, fallback=True, err=None, success=False, on_success=False, on_success_msg=None + ) # as_on_success creates a Response with the on_success field set to true. # This indicates that the message should be sent to the on_success sink. 
@classmethod def as_on_success(cls: type[R], id_: str, on_success: Optional[Message] = None) -> R: - return Response(id=id_, fallback=False, err=None, success=False, on_success=on_success) + return Response( + id=id_, + fallback=False, + err=None, + success=False, + on_success=True, + on_success_msg=on_success, + ) class Responses(Sequence[R]): diff --git a/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py b/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py index 465240b4..147bb1ad 100644 --- a/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py +++ b/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py @@ -1,6 +1,9 @@ +from typing import Optional + from pynumaflow._metadata import _user_and_system_metadata_from_proto +from pynumaflow.proto.common import metadata_pb2 from pynumaflow.proto.sinker import sink_pb2 -from pynumaflow.sinker._dtypes import Response, Datum, Responses +from pynumaflow.sinker._dtypes import Response, Datum, Responses, Message def build_sink_resp_results(responses: Responses) -> list[sink_pb2.SinkResponse.Result]: @@ -32,12 +35,36 @@ def build_sink_response(rspn: Response) -> sink_pb2.SinkResponse.Result: return sink_pb2.SinkResponse.Result(id=rid, status=sink_pb2.Status.SUCCESS) elif rspn.fallback: return sink_pb2.SinkResponse.Result(id=rid, status=sink_pb2.Status.FALLBACK) + elif rspn.on_success: + return sink_pb2.SinkResponse.Result( + id=rid, + status=sink_pb2.Status.ON_SUCCESS, + on_success_msg=build_on_success_message(rspn.on_success_msg), + ) else: return sink_pb2.SinkResponse.Result( id=rid, status=sink_pb2.Status.FAILURE, err_msg=rspn.err ) +def build_on_success_message( + msg: Optional[Message], +) -> Optional[sink_pb2.SinkResponse.Result.Message]: + if not msg: + return None + + if msg.user_metadata is not None: + metadata = msg.user_metadata._to_proto() + else: + metadata = None + + return sink_pb2.SinkResponse.Result.Message( + keys=msg.keys, + value=msg.value, + metadata=metadata, + ) + + def 
datum_from_sink_req(d: sink_pb2.SinkRequest) -> Datum: """ Convert a SinkRequest object to a Datum object. diff --git a/packages/pynumaflow/tests/sink/test_async_sink.py b/packages/pynumaflow/tests/sink/test_async_sink.py index c3d91fe5..ef21e458 100644 --- a/packages/pynumaflow/tests/sink/test_async_sink.py +++ b/packages/pynumaflow/tests/sink/test_async_sink.py @@ -15,12 +15,15 @@ UD_CONTAINER_FALLBACK_SINK, FALLBACK_SINK_SOCK_PATH, FALLBACK_SINK_SERVER_INFO_FILE_PATH, + UD_CONTAINER_ON_SUCCESS_SINK, + ON_SUCCESS_SINK_SOCK_PATH, + ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH, ) from pynumaflow.proto.common import metadata_pb2 from pynumaflow.sinker import ( Datum, ) -from pynumaflow.sinker import Responses, Response +from pynumaflow.sinker import Responses, Response, Message, UserMetadata from pynumaflow.proto.sinker import sink_pb2_grpc, sink_pb2 from pynumaflow.sinker.async_server import SinkAsyncServer from tests.sink.test_server import ( @@ -41,6 +44,10 @@ async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses: raise ValueError("test_mock_err_message") elif msg.value.decode("utf-8") == "test_mock_fallback_message": responses.append(Response.as_fallback(msg.id)) + elif msg.value.decode("utf-8") == "test_mock_on_success1_message": + responses.append(Response.as_on_success(msg.id, None)) + elif msg.value.decode("utf-8") == "test_mock_on_success2_message": + responses.append(Response.as_on_success(msg.id, Message(b"value", ["key"], UserMetadata()))) else: if msg.user_metadata.groups() != ["custom_info"]: raise ValueError("user metadata groups do not match") @@ -59,6 +66,12 @@ def start_sink_streaming_request(_id: str, req_type) -> (Datum, tuple): if req_type == "fallback": value = mock_fallback_message() + if req_type == "on_success1": + value = bytes("test_mock_on_success1_message", encoding="utf-8") + + if req_type == "on_success2": + value = bytes("test_mock_on_success2_message", encoding="utf-8") + request = sink_pb2.SinkRequest.Request( value=value, 
event_time=event_time_timestamp, @@ -259,6 +272,64 @@ def test_sink_fallback(self) -> None: except grpc.RpcError as e: logging.error(e) + def test_sink_on_success1(self) -> None: + stub = self.__stub() + try: + generator_response = stub.SinkFn( + request_iterator=request_generator(count=10, req_type="on_success1", session=1) + ) + handshake = next(generator_response) + # assert that handshake response is received. + self.assertTrue(handshake.handshake.sot) + + data_resp = [] + for r in generator_response: + data_resp.append(r) + + # 1 sink data response + 1 EOT response + self.assertEqual(2, len(data_resp)) + + idx = 0 + # capture the output from the SinkFn generator and assert. + for resp in data_resp[0].results: + self.assertEqual(resp.id, str(idx)) + self.assertEqual(resp.status, sink_pb2.Status.ON_SUCCESS) + idx += 1 + # EOT Response + self.assertEqual(data_resp[1].status.eot, True) + + except grpc.RpcError as e: + logging.error(e) + + def test_sink_on_success2(self) -> None: + stub = self.__stub() + try: + generator_response = stub.SinkFn( + request_iterator=request_generator(count=10, req_type="on_success2", session=1) + ) + handshake = next(generator_response) + # assert that handshake response is received. + self.assertTrue(handshake.handshake.sot) + + data_resp = [] + for r in generator_response: + data_resp.append(r) + + # 1 sink data response + 1 EOT response + self.assertEqual(2, len(data_resp)) + + idx = 0 + # capture the output from the SinkFn generator and assert. 
+ for resp in data_resp[0].results: + self.assertEqual(resp.id, str(idx)) + self.assertEqual(resp.status, sink_pb2.Status.ON_SUCCESS) + idx += 1 + # EOT Response + self.assertEqual(data_resp[1].status.eot, True) + + except grpc.RpcError as e: + logging.error(e) + def __stub(self): return sink_pb2_grpc.SinkStub(_channel) @@ -272,6 +343,12 @@ def test_start_fallback_sink(self): self.assertEqual(server.sock_path, f"unix://{FALLBACK_SINK_SOCK_PATH}") self.assertEqual(server.server_info_file, FALLBACK_SINK_SERVER_INFO_FILE_PATH) + @mockenv(NUMAFLOW_UD_CONTAINER_TYPE=UD_CONTAINER_ON_SUCCESS_SINK) + def test_start_on_success_sink(self): + server = SinkAsyncServer(sinker_instance=udsink_handler) + self.assertEqual(server.sock_path, f"unix://{ON_SUCCESS_SINK_SOCK_PATH}") + self.assertEqual(server.server_info_file, ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH) + def test_max_threads(self): # max cap at 16 server = SinkAsyncServer(sinker_instance=udsink_handler, max_threads=32) diff --git a/packages/pynumaflow/tests/sink/test_responses.py b/packages/pynumaflow/tests/sink/test_responses.py index 7f5a67bf..6a75a0ff 100644 --- a/packages/pynumaflow/tests/sink/test_responses.py +++ b/packages/pynumaflow/tests/sink/test_responses.py @@ -1,7 +1,7 @@ import unittest from collections.abc import Iterator -from pynumaflow.sinker import Response, Responses, Sinker, Datum +from pynumaflow.sinker import Response, Responses, Sinker, Datum, Message, UserMetadata class TestResponse(unittest.TestCase): @@ -18,6 +18,12 @@ def test_as_fallback(self): self.assertFalse(_response.success) self.assertTrue(_response.fallback) + def test_as_on_success(self): + _response = Response.as_on_success("5", Message(b"value", ["key"], UserMetadata())) + self.assertFalse(_response.success) + self.assertFalse(_response.fallback) + self.assertTrue(_response.on_success) + class TestResponses(unittest.TestCase): def setUp(self) -> None: @@ -29,7 +35,9 @@ def setUp(self) -> None: def test_responses(self): 
self.resps.append(Response.as_success("4")) - self.assertEqual(4, len(self.resps)) + self.resps.append(Response.as_on_success("6", Message(b"value", ["key"], UserMetadata()))) + self.resps.append(Response.as_on_success("7", None)) + self.assertEqual(6, len(self.resps)) for resp in self.resps: self.assertIsInstance(resp, Response) @@ -38,13 +46,22 @@ def test_responses(self): self.assertEqual(self.resps[1].id, "3") self.assertEqual(self.resps[2].id, "5") self.assertEqual(self.resps[3].id, "4") + self.assertEqual(self.resps[4].id, "6") + self.assertEqual(self.resps[5].id, "7") self.assertEqual( - "[Response(id='2', success=True, err=None, fallback=False, on_success=None), " + "[Response(id='2', success=True, err=None, fallback=False, " + "on_success=False, on_success_msg=None), " "Response(id='3', success=False, err='RuntimeError encountered!', " - "fallback=False, on_success=None), " - "Response(id='5', success=False, err=None, fallback=True, on_success=None), " - "Response(id='4', success=True, err=None, fallback=False, on_success=None)]", + "fallback=False, on_success=False, on_success_msg=None), " + "Response(id='5', success=False, err=None, fallback=True, " + "on_success=False, on_success_msg=None), " + "Response(id='4', success=True, err=None, fallback=False, " + "on_success=False, on_success_msg=None), " + "Response(id='6', success=False, err=None, fallback=False, " + "on_success=True, on_success_msg=Message(_keys=['key'], _value=b'value', _user_metadata=UserMetadata(_data={}))), " + "Response(id='7', success=False, err=None, fallback=False, " + "on_success=True, on_success_msg=None)]", repr(self.resps), ) diff --git a/packages/pynumaflow/tests/sink/test_server.py b/packages/pynumaflow/tests/sink/test_server.py index 8baf9c96..0f72a3f9 100644 --- a/packages/pynumaflow/tests/sink/test_server.py +++ b/packages/pynumaflow/tests/sink/test_server.py @@ -14,10 +14,13 @@ UD_CONTAINER_FALLBACK_SINK, FALLBACK_SINK_SOCK_PATH, FALLBACK_SINK_SERVER_INFO_FILE_PATH, + 
UD_CONTAINER_ON_SUCCESS_SINK, + ON_SUCCESS_SINK_SOCK_PATH, + ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH, ) from pynumaflow.proto.common import metadata_pb2 from pynumaflow.proto.sinker import sink_pb2 -from pynumaflow.sinker import Responses, Datum, Response, SinkServer +from pynumaflow.sinker import Responses, Datum, Response, SinkServer, Message, UserMetadata from tests.testing_utils import mock_terminate_on_stop @@ -32,6 +35,10 @@ def udsink_handler(datums: Iterator[Datum]) -> Responses: results.append(Response.as_failure(msg.id, "mock sink message error")) elif "fallback" in msg.value.decode("utf-8"): results.append(Response.as_fallback(msg.id)) + elif "on_success1" in msg.value.decode("utf-8"): + results.append(Response.as_on_success(msg.id, None)) + elif "on_success2" in msg.value.decode("utf-8"): + results.append(Response.as_on_success(msg.id, Message(b"value", ["key"], UserMetadata()))) else: if msg.user_metadata.groups() != ["custom_info"]: raise ValueError("user metadata groups do not match") @@ -230,6 +237,24 @@ def test_forward_message(self): metadata=self.metadata, ) ), + sink_pb2.SinkRequest( + request=sink_pb2.SinkRequest.Request( + id="test_id_2", + value=bytes("test_mock_on_success1_message", encoding="utf-8"), + event_time=event_time_timestamp, + watermark=watermark_timestamp, + metadata=self.metadata, + ) + ), + sink_pb2.SinkRequest( + request=sink_pb2.SinkRequest.Request( + id="test_id_3", + value=bytes("test_mock_on_success2_message", encoding="utf-8"), + event_time=event_time_timestamp, + watermark=watermark_timestamp, + metadata=self.metadata, + ) + ), sink_pb2.SinkRequest(status=sink_pb2.TransmissionStatus(eot=True)), ] @@ -258,9 +283,13 @@ def test_forward_message(self): # first message should be handshake response self.assertTrue(responses[0].handshake.sot) + self.assertEqual(4, len(responses[1].results)) + # assert the values for the corresponding messages self.assertEqual("test_id_0", responses[1].results[0].id) self.assertEqual("test_id_1", 
responses[1].results[1].id) + self.assertEqual("test_id_2", responses[1].results[2].id) + self.assertEqual("test_id_3", responses[1].results[3].id) self.assertEqual(responses[1].results[0].status, sink_pb2.Status.SUCCESS) self.assertEqual(responses[1].results[1].status, sink_pb2.Status.FAILURE) self.assertEqual("", responses[1].results[0].err_msg) @@ -282,6 +311,12 @@ def test_start_fallback_sink(self): self.assertEqual(server.sock_path, f"unix://{FALLBACK_SINK_SOCK_PATH}") self.assertEqual(server.server_info_file, FALLBACK_SINK_SERVER_INFO_FILE_PATH) + @mockenv(NUMAFLOW_UD_CONTAINER_TYPE=UD_CONTAINER_ON_SUCCESS_SINK) + def test_start_on_success_sink(self): + server = SinkServer(sinker_instance=udsink_handler) + self.assertEqual(server.sock_path, f"unix://{ON_SUCCESS_SINK_SOCK_PATH}") + self.assertEqual(server.server_info_file, ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH) + def test_max_threads(self): # max cap at 16 server = SinkServer(sinker_instance=udsink_handler, max_threads=32) From 114580b97b4211efada7c30548bb7ba44678446e Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Mon, 22 Dec 2025 23:21:27 -0500 Subject: [PATCH 3/5] fix linting Signed-off-by: Vaibhav Tiwari --- packages/pynumaflow/pynumaflow/sinker/servicer/utils.py | 1 - packages/pynumaflow/tests/sink/test_async_sink.py | 4 +++- packages/pynumaflow/tests/sink/test_responses.py | 3 ++- packages/pynumaflow/tests/sink/test_server.py | 4 +++- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py b/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py index 147bb1ad..a7e47929 100644 --- a/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py +++ b/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py @@ -1,7 +1,6 @@ from typing import Optional from pynumaflow._metadata import _user_and_system_metadata_from_proto -from pynumaflow.proto.common import metadata_pb2 from pynumaflow.proto.sinker import sink_pb2 from pynumaflow.sinker._dtypes 
import Response, Datum, Responses, Message diff --git a/packages/pynumaflow/tests/sink/test_async_sink.py b/packages/pynumaflow/tests/sink/test_async_sink.py index ef21e458..2dee50ec 100644 --- a/packages/pynumaflow/tests/sink/test_async_sink.py +++ b/packages/pynumaflow/tests/sink/test_async_sink.py @@ -47,7 +47,9 @@ async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses: elif msg.value.decode("utf-8") == "test_mock_on_success1_message": responses.append(Response.as_on_success(msg.id, None)) elif msg.value.decode("utf-8") == "test_mock_on_success2_message": - responses.append(Response.as_on_success(msg.id, Message(b"value", ["key"], UserMetadata()))) + responses.append( + Response.as_on_success(msg.id, Message(b"value", ["key"], UserMetadata())) + ) else: if msg.user_metadata.groups() != ["custom_info"]: raise ValueError("user metadata groups do not match") diff --git a/packages/pynumaflow/tests/sink/test_responses.py b/packages/pynumaflow/tests/sink/test_responses.py index 6a75a0ff..73a11cf7 100644 --- a/packages/pynumaflow/tests/sink/test_responses.py +++ b/packages/pynumaflow/tests/sink/test_responses.py @@ -59,7 +59,8 @@ def test_responses(self): "Response(id='4', success=True, err=None, fallback=False, " "on_success=False, on_success_msg=None), " "Response(id='6', success=False, err=None, fallback=False, " - "on_success=True, on_success_msg=Message(_keys=['key'], _value=b'value', _user_metadata=UserMetadata(_data={}))), " + "on_success=True, on_success_msg=Message(_keys=['key'], _value=b'value', " + "_user_metadata=UserMetadata(_data={}))), " "Response(id='7', success=False, err=None, fallback=False, " "on_success=True, on_success_msg=None)]", repr(self.resps), diff --git a/packages/pynumaflow/tests/sink/test_server.py b/packages/pynumaflow/tests/sink/test_server.py index 0f72a3f9..2845519f 100644 --- a/packages/pynumaflow/tests/sink/test_server.py +++ b/packages/pynumaflow/tests/sink/test_server.py @@ -38,7 +38,9 @@ def udsink_handler(datums: 
Iterator[Datum]) -> Responses: elif "on_success1" in msg.value.decode("utf-8"): results.append(Response.as_on_success(msg.id, None)) elif "on_success2" in msg.value.decode("utf-8"): - results.append(Response.as_on_success(msg.id, Message(b"value", ["key"], UserMetadata()))) + results.append( + Response.as_on_success(msg.id, Message(b"value", ["key"], UserMetadata())) + ) else: if msg.user_metadata.groups() != ["custom_info"]: raise ValueError("user metadata groups do not match") From f179c26244a92ced1d338bb98d59b8ff037f3b8b Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Fri, 2 Jan 2026 14:07:21 -0500 Subject: [PATCH 4/5] Address review comments Signed-off-by: Vaibhav Tiwari --- .../examples/sink/all_sinks/Dockerfile | 3 +-- .../examples/sink/all_sinks/example.py | 17 ++++++----------- .../pynumaflow/pynumaflow/sinker/_dtypes.py | 12 ++++++------ .../pynumaflow/sinker/servicer/utils.py | 7 ++----- .../pynumaflow/tests/sink/test_async_sink.py | 12 ++++++++++-- packages/pynumaflow/tests/sink/test_server.py | 4 ++-- 6 files changed, 27 insertions(+), 28 deletions(-) diff --git a/packages/pynumaflow/examples/sink/all_sinks/Dockerfile b/packages/pynumaflow/examples/sink/all_sinks/Dockerfile index af3ebe18..c0126132 100644 --- a/packages/pynumaflow/examples/sink/all_sinks/Dockerfile +++ b/packages/pynumaflow/examples/sink/all_sinks/Dockerfile @@ -47,9 +47,8 @@ COPY --from=udf-builder $VENV_PATH $VENV_PATH COPY --from=udf-builder $EXAMPLE_PATH $EXAMPLE_PATH WORKDIR $EXAMPLE_PATH -RUN chmod +x entry.sh ENTRYPOINT ["/dumb-init", "--"] -CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"] +CMD ["python", "example.py"] EXPOSE 5000 diff --git a/packages/pynumaflow/examples/sink/all_sinks/example.py b/packages/pynumaflow/examples/sink/all_sinks/example.py index 55122c54..58a18601 100644 --- a/packages/pynumaflow/examples/sink/all_sinks/example.py +++ b/packages/pynumaflow/examples/sink/all_sinks/example.py @@ -1,4 +1,3 @@ -import os from collections.abc import
AsyncIterable from pynumaflow.sinker import Datum, Responses, Response, Sinker, Message from pynumaflow.sinker import SinkAsyncServer @@ -6,7 +5,7 @@ import random logging.basicConfig(level=logging.DEBUG) -_LOGGER = logging.getLogger(__name__) +logger = logging.getLogger(__name__) class UserDefinedSink(Sinker): @@ -14,7 +13,7 @@ async def handler(self, datums: AsyncIterable[Datum]) -> Responses: responses = Responses() async for msg in datums: if primary_sink_write_status(): - _LOGGER.info( + logger.info( "Write to User Defined Sink succeeded, writing %s to onSuccess sink", msg.value.decode("utf-8"), ) @@ -28,7 +27,7 @@ async def handler(self, datums: AsyncIterable[Datum]) -> Responses: # the original message to the onSuccess sink # `responses.append(Response.as_on_success(msg.id, None))` else: - _LOGGER.info( + logger.info( "Write to User Defined Sink failed, writing %s to fallback sink", msg.value.decode("utf-8"), ) @@ -40,7 +39,7 @@ async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses: responses = Responses() async for msg in datums: if primary_sink_write_status(): - _LOGGER.info( + logger.info( "Write to User Defined Sink succeeded, writing %s to onSuccess sink", msg.value.decode("utf-8"), ) @@ -54,7 +53,7 @@ async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses: # the original message to the onSuccess sink # `responses.append(Response.as_on_success(msg.id, None))` else: - _LOGGER.info( + logger.info( "Write to User Defined Sink failed, writing %s to fallback sink", msg.value.decode("utf-8"), ) @@ -70,10 +69,6 @@ def primary_sink_write_status(): if __name__ == "__main__": - invoke = os.getenv("INVOKE", "func_handler") - if invoke == "class": - sink_handler = UserDefinedSink() - else: - sink_handler = udsink_handler + sink_handler = UserDefinedSink() grpc_server = SinkAsyncServer(sink_handler) grpc_server.start() diff --git a/packages/pynumaflow/pynumaflow/sinker/_dtypes.py b/packages/pynumaflow/pynumaflow/sinker/_dtypes.py index 
8cc1a7f0..c2582ca2 100644 --- a/packages/pynumaflow/pynumaflow/sinker/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/sinker/_dtypes.py @@ -32,8 +32,8 @@ class Message: def __init__( self, value: bytes, - keys: list[str] = None, - user_metadata: UserMetadata = None, + keys: Optional[list[str]] = None, + user_metadata: Optional[UserMetadata] = None, ): self._value = value self._keys = keys @@ -87,7 +87,7 @@ class Response: # as_success creates a successful Response with the given id. # The Success field is set to true. @classmethod - def as_success(cls: type[R], id_: str) -> R: + def as_success(cls, id_: str) -> "Response": return Response( id=id_, success=True, err=None, fallback=False, on_success=False, on_success_msg=None ) @@ -95,7 +95,7 @@ def as_success(cls: type[R], id_: str) -> R: # as_failure creates a failed Response with the given id and error message. # The success field is set to false and the err field is set to the provided error message. @classmethod - def as_failure(cls: type[R], id_: str, err_msg: str) -> R: + def as_failure(cls, id_: str, err_msg: str) -> "Response": return Response( id=id_, success=False, @@ -108,7 +108,7 @@ def as_failure(cls: type[R], id_: str, err_msg: str) -> R: # as_fallback creates a Response with the fallback field set to true. # This indicates that the message should be sent to the fallback sink. @classmethod - def as_fallback(cls: type[R], id_: str) -> R: + def as_fallback(cls, id_: str) -> "Response": return Response( id=id_, fallback=True, err=None, success=False, on_success=False, on_success_msg=None ) @@ -116,7 +116,7 @@ def as_fallback(cls: type[R], id_: str) -> R: # as_on_success creates a Response with the on_success field set to true. # This indicates that the message should be sent to the on_success sink. 
@classmethod - def as_on_success(cls: type[R], id_: str, on_success: Optional[Message] = None) -> R: + def as_on_success(cls, id_: str, on_success: Optional[Message] = None) -> "Response": return Response( id=id_, fallback=False, diff --git a/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py b/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py index a7e47929..feee3d0f 100644 --- a/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py +++ b/packages/pynumaflow/pynumaflow/sinker/servicer/utils.py @@ -49,13 +49,10 @@ def build_sink_response(rspn: Response) -> sink_pb2.SinkResponse.Result: def build_on_success_message( msg: Optional[Message], ) -> Optional[sink_pb2.SinkResponse.Result.Message]: - if not msg: + if msg is None: return None - if msg.user_metadata is not None: - metadata = msg.user_metadata._to_proto() - else: - metadata = None + metadata = msg.user_metadata._to_proto() if msg.user_metadata is not None else None return sink_pb2.SinkResponse.Result.Message( keys=msg.keys, diff --git a/packages/pynumaflow/tests/sink/test_async_sink.py b/packages/pynumaflow/tests/sink/test_async_sink.py index 2dee50ec..cedc6241 100644 --- a/packages/pynumaflow/tests/sink/test_async_sink.py +++ b/packages/pynumaflow/tests/sink/test_async_sink.py @@ -69,10 +69,10 @@ def start_sink_streaming_request(_id: str, req_type) -> (Datum, tuple): value = mock_fallback_message() if req_type == "on_success1": - value = bytes("test_mock_on_success1_message", encoding="utf-8") + value = b"test_mock_on_success1_message" if req_type == "on_success2": - value = bytes("test_mock_on_success2_message", encoding="utf-8") + value = b"test_mock_on_success2_message" request = sink_pb2.SinkRequest.Request( value=value, @@ -276,6 +276,7 @@ def test_sink_fallback(self) -> None: def test_sink_on_success1(self) -> None: stub = self.__stub() + grpc_exception = None try: generator_response = stub.SinkFn( request_iterator=request_generator(count=10, req_type="on_success1", session=1) @@ -302,9 
+303,13 @@ def test_sink_on_success1(self) -> None: except grpc.RpcError as e: logging.error(e) + grpc_exception = e + + self.assertIsNone(grpc_exception) def test_sink_on_success2(self) -> None: stub = self.__stub() + grpc_exception = None try: generator_response = stub.SinkFn( request_iterator=request_generator(count=10, req_type="on_success2", session=1) @@ -331,6 +336,9 @@ def test_sink_on_success2(self) -> None: except grpc.RpcError as e: logging.error(e) + grpc_exception = e + + self.assertIsNone(grpc_exception) def __stub(self): return sink_pb2_grpc.SinkStub(_channel) diff --git a/packages/pynumaflow/tests/sink/test_server.py b/packages/pynumaflow/tests/sink/test_server.py index 2845519f..9e517b16 100644 --- a/packages/pynumaflow/tests/sink/test_server.py +++ b/packages/pynumaflow/tests/sink/test_server.py @@ -242,7 +242,7 @@ def test_forward_message(self): sink_pb2.SinkRequest( request=sink_pb2.SinkRequest.Request( id="test_id_2", - value=bytes("test_mock_on_success1_message", encoding="utf-8"), + value=b"test_mock_on_success1_message", event_time=event_time_timestamp, watermark=watermark_timestamp, metadata=self.metadata, @@ -251,7 +251,7 @@ def test_forward_message(self): sink_pb2.SinkRequest( request=sink_pb2.SinkRequest.Request( id="test_id_3", - value=bytes("test_mock_on_success2_message", encoding="utf-8"), + value=b"test_mock_on_success2_message", event_time=event_time_timestamp, watermark=watermark_timestamp, metadata=self.metadata, From 68c7f270ea22483147ba6293af51cc707c59e8de Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Fri, 2 Jan 2026 14:35:50 -0500 Subject: [PATCH 5/5] Address review comments. 
Remove unused entry.sh from all sinks example Signed-off-by: Vaibhav Tiwari --- packages/pynumaflow/examples/sink/all_sinks/entry.sh | 4 ---- 1 file changed, 4 deletions(-) delete mode 100644 packages/pynumaflow/examples/sink/all_sinks/entry.sh diff --git a/packages/pynumaflow/examples/sink/all_sinks/entry.sh b/packages/pynumaflow/examples/sink/all_sinks/entry.sh deleted file mode 100644 index 073b05e3..00000000 --- a/packages/pynumaflow/examples/sink/all_sinks/entry.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh -set -eux - -python example.py