From f8d115163ff732d14e4d959df6e613203f0c18fe Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Mon, 3 Nov 2025 20:18:23 -0500 Subject: [PATCH 01/13] Add support for on success sink Signed-off-by: vtiwari5 --- .../numaproj/numaflow/sinker/Constants.java | 4 + .../numaproj/numaflow/sinker/GRPCConfig.java | 3 + .../numaflow/sinker/KeyValueGroup.java | 12 +++ .../numaflow/sinker/OnSuccessMessage.java | 16 ++++ .../io/numaproj/numaflow/sinker/Response.java | 88 ++++++++++++++++++- .../io/numaproj/numaflow/sinker/Service.java | 40 +++++++-- src/main/proto/common/metadata.proto | 37 ++++++++ src/main/proto/sink/v1/sink.proto | 11 +++ 8 files changed, 201 insertions(+), 10 deletions(-) create mode 100644 src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java create mode 100644 src/main/java/io/numaproj/numaflow/sinker/OnSuccessMessage.java create mode 100644 src/main/proto/common/metadata.proto diff --git a/src/main/java/io/numaproj/numaflow/sinker/Constants.java b/src/main/java/io/numaproj/numaflow/sinker/Constants.java index c7d8ae7c..cb5a64ac 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Constants.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Constants.java @@ -3,14 +3,18 @@ class Constants { public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/sink.sock"; public static final String DEFAULT_FB_SINK_SOCKET_PATH = "/var/run/numaflow/fb-sink.sock"; + public static final String DEFAULT_ON_SUCCESS_SINK_SOCKET_PATH = "/var/run/numaflow/ons-sink.sock"; public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sinker-server-info"; public static final String DEFAULT_FB_SERVER_INFO_FILE_PATH = "/var/run/numaflow/fb-sinker-server-info"; + public static final String DEFAULT_ON_SUCCESS_SERVER_INFO_FILE_PATH = + "/var/run/numaflow/ons-sinker-server-info"; public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64; public static final int DEFAULT_PORT = 50051; public static final String DEFAULT_HOST = "localhost"; public static final String ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE"; public static final String UD_CONTAINER_FALLBACK_SINK = "fb-udsink"; + public static final String UD_CONTAINER_ON_SUCCESS_SINK = "ons-udsink"; // Private constructor to prevent instantiation private Constants() { diff --git a/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java index 46f6e248..5834ff42 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java @@ -36,6 +36,9 @@ static GRPCConfig defaultGrpcConfig() { if (Constants.UD_CONTAINER_FALLBACK_SINK.equals(containerType)) { socketPath = Constants.DEFAULT_FB_SINK_SOCKET_PATH; infoFilePath = Constants.DEFAULT_FB_SERVER_INFO_FILE_PATH; + } else if (Constants.UD_CONTAINER_ON_SUCCESS_SINK.equals(containerType)) { + socketPath = Constants.DEFAULT_ON_SUCCESS_SINK_SOCKET_PATH; + infoFilePath = Constants.DEFAULT_ON_SUCCESS_SERVER_INFO_FILE_PATH; } return GRPCConfig.newBuilder() .infoFilePath(infoFilePath) diff --git a/src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java b/src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java new file mode 100644 index 00000000..d2d7b104 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java @@ -0,0 +1,12 @@ +package io.numaproj.numaflow.sinker; + +import lombok.Builder; +import lombok.Getter; + +import java.util.HashMap; + +@Getter +@Builder +public class KeyValueGroup { + private final HashMap keyValue; +} diff --git a/src/main/java/io/numaproj/numaflow/sinker/OnSuccessMessage.java b/src/main/java/io/numaproj/numaflow/sinker/OnSuccessMessage.java new file mode 100644 index 00000000..b13f32fa --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/sinker/OnSuccessMessage.java @@ -0,0 +1,16 @@ +package io.numaproj.numaflow.sinker; + +import lombok.Builder; +import lombok.Getter; + +import java.util.HashMap; + +@Getter +@Builder +public class OnSuccessMessage { + private final byte[] value; + private final String key; + private final HashMap userMetadata; +} + + diff --git a/src/main/java/io/numaproj/numaflow/sinker/Response.java b/src/main/java/io/numaproj/numaflow/sinker/Response.java index 5d9537ce..473d6ce0 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Response.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Response.java @@ -1,9 +1,15 @@ package io.numaproj.numaflow.sinker; +import com.google.protobuf.ByteString; +import common.MetadataOuterClass; +import io.numaproj.numaflow.sink.v1.SinkOuterClass.SinkResponse.Result.Message; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; +import java.util.Map; +import java.util.stream.Collectors; + /** * Response is used to send response from the user defined sinker. It contains the id of the * message, success status, an optional error message and a fallback status. Various static factory @@ -16,6 +22,12 @@ public class Response { private final Boolean success; private final String err; private final Boolean fallback; + private final Boolean serve; + private final byte[] serveResponse; + private final Boolean onSuccess; + // FIXME: Should this be OnSuccessMessage object from package? That would allow parity with other SDKs (specially Go) + // Currently done this way to prevent conversion in buildResult method. + private final Message onSuccessMessage; /** * Static method to create response for successful message processing. @@ -24,7 +36,7 @@ public class Response { * @return Response object with success status */ public static Response responseOK(String id) { - return new Response(id, true, null, false); + return new Response(id, true, null, false, false, null, false, null); } /** @@ -35,7 +47,7 @@ public static Response responseOK(String id) { * @return Response object with failure status and error message */ public static Response responseFailure(String id, String errMsg) { - return new Response(id, false, errMsg, false); + return new Response(id, false, errMsg, false, false, null, false, null); } /** @@ -46,6 +58,76 @@ public static Response responseFailure(String id, String errMsg) { * @return Response object with fallback status */ public static Response responseFallback(String id) { - return new Response(id, false, null, true); + return new Response(id, false, null, true, false, null, false, null); + } + + /** + * Static method to create response for serve message which is raw bytes. + * This indicates that the message should be sent to the serving store. + * Allows creation of serve message from raw bytes. + * + * @param id id of the message + * @param serveResponse Response object to be sent to the serving store + * @return Response object with serve status and serve response + */ + public static Response responseServe(String id, byte[] serveResponse) { + return new Response(id, false, null, false, true, serveResponse, false, null); + } + + /** + * Static method to create response for onSuccess message. Allows creation of onSuccess message + * from protobuf Message object. + * + * @param id id of the message + * @param onSuccessMessage OnSuccessMessage object to be sent to the onSuccess sink + * @return Response object with onSuccess status and onSuccess message + */ + public static Response responseOnSuccess(String id, Message onSuccessMessage) { + return new Response(id, false, null, false, false, null, true, onSuccessMessage); + } + + /** + * Overloaded static method to create response for onSuccess message. Allows creation of onSuccess message + * from OnSuccessMessage object. + * + * @param id id of the message + * @param onSuccessMessage OnSuccessMessage object to be sent to the onSuccess sink. Can be null + * if original message needs to be written to onSuccess sink + * @return Response object with onSuccess status and onSuccess message + */ + public static Response responseOnSuccess(String id, OnSuccessMessage onSuccessMessage) { + if (onSuccessMessage == null) { + return new Response(id, false, null, false, false, null, true, null); + } else { + Map pbUserMetadata = onSuccessMessage.getUserMetadata() + .entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> MetadataOuterClass.KeyValueGroup.newBuilder() + .putAllKeyValue(e.getValue() + .getKeyValue() + .entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + kv -> ByteString.copyFrom(kv.getValue()) + )) + ) + .build() + )); + + MetadataOuterClass.Metadata pbMetadata = MetadataOuterClass.Metadata.newBuilder() + .putAllUserMetadata(pbUserMetadata) + .build(); + + Message pbOnSuccessMessage = Message.newBuilder() + .addKeys(onSuccessMessage.getKey()) + .setValue(ByteString.copyFrom(onSuccessMessage.getValue())) + .setMetadata(pbMetadata) + .build(); + + return new Response(id, false, null, false, false, null, true, pbOnSuccessMessage); + } } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index e4cea580..c2aa71eb 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -1,6 +1,7 @@ package io.numaproj.numaflow.sinker; import com.google.protobuf.Any; +import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.google.rpc.Code; import com.google.rpc.DebugInfo; @@ -131,13 +132,38 @@ public void onCompleted() { } private SinkOuterClass.SinkResponse.Result buildResult(Response response) { - SinkOuterClass.Status status = response.getFallback() ? SinkOuterClass.Status.FALLBACK - : response.getSuccess() ? SinkOuterClass.Status.SUCCESS : SinkOuterClass.Status.FAILURE; - return SinkOuterClass.SinkResponse.Result.newBuilder() - .setId(response.getId() == null ? "" : response.getId()) - .setErrMsg(response.getErr() == null ? "" : response.getErr()) - .setStatus(status) - .build(); + if (response.getFallback()) { + return SinkOuterClass.SinkResponse.Result.newBuilder() + .setId(response.getId() == null ? "" : response.getId()) + .setStatus(SinkOuterClass.Status.FALLBACK) + .build(); + } else if (response.getSuccess()) { + return SinkOuterClass.SinkResponse.Result.newBuilder() + .setId(response.getId() == null ? "" : response.getId()) + .setStatus(SinkOuterClass.Status.SUCCESS) + .build(); + } else if (response.getServe()) { + // FIXME: Return error when serve response is not set? + return SinkOuterClass.SinkResponse.Result.newBuilder() + .setId(response.getId() == null ? "" : response.getId()) + .setStatus(SinkOuterClass.Status.SERVE) + .setServeResponse(response.getServeResponse() == null ? null : ByteString.copyFrom( + response.getServeResponse())) + .build(); + } else if (response.getOnSuccess()) { + return SinkOuterClass.SinkResponse.Result.newBuilder() + .setId(response.getId() == null ? "" : response.getId()) + .setStatus(SinkOuterClass.Status.ON_SUCCESS) + .setOnSuccessMsg(response.getOnSuccessMessage() == null ? null : response.getOnSuccessMessage()) + .build(); + } else { + // FIXME: Return error when error message is not set? + return SinkOuterClass.SinkResponse.Result.newBuilder() + .setId(response.getId() == null ? "" : response.getId()) + .setStatus(SinkOuterClass.Status.FAILURE) + .setErrMsg(response.getErr() == null ? "" : response.getErr()) + .build(); + } } /** diff --git a/src/main/proto/common/metadata.proto b/src/main/proto/common/metadata.proto new file mode 100644 index 00000000..bbf4b3ab --- /dev/null +++ b/src/main/proto/common/metadata.proto @@ -0,0 +1,37 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +syntax = "proto3"; +option go_package = "github.com/numaproj/numaflow-go/pkg/apis/proto/common"; + +package common; + +// Metadata is the metadata of the message +message Metadata { + // PreviousVertex is the name of the previous vertex + string previous_vertex = 1; + // SystemMetadata is the system metadata of the message + // Key of the map is the group name + map sys_metadata = 2; + // UserMetadata is the user metadata of the message + // Key of the map is the group name + map user_metadata = 3; +} + +// KeyValueGroup is a group of key-value pairs for a given group. +message KeyValueGroup { + map key_value = 1; +} diff --git a/src/main/proto/sink/v1/sink.proto b/src/main/proto/sink/v1/sink.proto index b9ea5ae7..b1484e4a 100644 --- a/src/main/proto/sink/v1/sink.proto +++ b/src/main/proto/sink/v1/sink.proto @@ -4,6 +4,7 @@ option java_package = "io.numaproj.numaflow.sink.v1"; import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; +import "common/metadata.proto"; package sink.v1; @@ -65,6 +66,8 @@ enum Status { SUCCESS = 0; FAILURE = 1; FALLBACK = 2; + SERVE = 3; + ON_SUCCESS = 4; } /** @@ -72,12 +75,20 @@ enum Status { */ message SinkResponse { message Result { + message Message { + bytes value = 1; + repeated string keys = 2; + common.Metadata metadata = 3; + } // id is the ID of the message, can be used to uniquely identify the message. string id = 1; // status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK. Status status = 2; // err_msg is the error message, set it if success is set to false. string err_msg = 3; + optional bytes serve_response = 4; + // on_success_msg is the message to be sent to on_success sink. + optional Message on_success_msg = 5; } repeated Result results = 1; optional Handshake handshake = 2; From 19b660a7a925c5e8c55d3e143ec839a34afdc749 Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Tue, 4 Nov 2025 11:26:25 -0500 Subject: [PATCH 02/13] Add basic tests to improve coverage Signed-off-by: vtiwari5 --- .../io/numaproj/numaflow/sinker/Response.java | 53 +++++++++------ .../numaflow/mapper/HandlerDatumTest.java | 22 +++++++ .../numaflow/mapper/MessageListTest.java | 25 +++++++ .../numaproj/numaflow/mapper/MessageTest.java | 22 +++++++ .../numaflow/sinker/HandlerDatumTest.java | 56 ++++++++++++++++ .../numaflow/sinker/ResponseTest.java | 66 +++++++++++++++++++ .../numaproj/numaflow/sinker/ServerTest.java | 13 ++++ 7 files changed, 238 insertions(+), 19 deletions(-) create mode 100644 src/test/java/io/numaproj/numaflow/mapper/HandlerDatumTest.java create mode 100644 src/test/java/io/numaproj/numaflow/mapper/MessageListTest.java create mode 100644 src/test/java/io/numaproj/numaflow/mapper/MessageTest.java create mode 100644 src/test/java/io/numaproj/numaflow/sinker/HandlerDatumTest.java create mode 100644 src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java diff --git a/src/main/java/io/numaproj/numaflow/sinker/Response.java b/src/main/java/io/numaproj/numaflow/sinker/Response.java index 473d6ce0..960ca826 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Response.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Response.java @@ -7,6 +7,8 @@ import lombok.AllArgsConstructor; import lombok.Getter; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; @@ -99,31 +101,44 @@ public static Response responseOnSuccess(String id, OnSuccessMessage onSuccessMe if (onSuccessMessage == null) { return new Response(id, false, null, false, false, null, true, null); } else { - Map pbUserMetadata = onSuccessMessage.getUserMetadata() - .entrySet() - .stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> MetadataOuterClass.KeyValueGroup.newBuilder() - .putAllKeyValue(e.getValue() - .getKeyValue() - .entrySet() - .stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - kv -> ByteString.copyFrom(kv.getValue()) - )) - ) - .build() - )); + + Map pbUserMetadata = MetadataOuterClass.Metadata + .getDefaultInstance() + .getUserMetadataMap(); + + if (onSuccessMessage.getUserMetadata() != null) { + pbUserMetadata = + onSuccessMessage.getUserMetadata() + .entrySet() + .stream() + .filter(e -> e.getKey() != null && e.getValue() != null) + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> MetadataOuterClass.KeyValueGroup.newBuilder() + .putAllKeyValue(e.getValue().getKeyValue() == null + ? Collections.emptyMap() + : e.getValue() + .getKeyValue() + .entrySet() + .stream() + .filter(kv -> kv.getKey() != null + && kv.getValue() != null) + .collect(Collectors.toMap( + Map.Entry::getKey, + kv -> ByteString.copyFrom(kv.getValue()) + )) + ) + .build() + )); + } MetadataOuterClass.Metadata pbMetadata = MetadataOuterClass.Metadata.newBuilder() .putAllUserMetadata(pbUserMetadata) .build(); Message pbOnSuccessMessage = Message.newBuilder() - .addKeys(onSuccessMessage.getKey()) - .setValue(ByteString.copyFrom(onSuccessMessage.getValue())) + .addKeys(onSuccessMessage.getKey() == null ? "" : onSuccessMessage.getKey()) + .setValue(onSuccessMessage.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(onSuccessMessage.getValue())) .setMetadata(pbMetadata) .build(); diff --git a/src/test/java/io/numaproj/numaflow/mapper/HandlerDatumTest.java b/src/test/java/io/numaproj/numaflow/mapper/HandlerDatumTest.java new file mode 100644 index 00000000..9b15bcce --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/mapper/HandlerDatumTest.java @@ -0,0 +1,22 @@ +package io.numaproj.numaflow.mapper; + +import org.junit.Test; + +import java.time.Instant; +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; + +public class HandlerDatumTest { + @Test + public void testHandlerDatum() { + Instant watermark = Instant.now(); + Instant eventTime = Instant.now(); + HashMap headers = new HashMap<>(); + headers.put("header1", "value1"); + HandlerDatum datum = new HandlerDatum("asdf".getBytes(), watermark, eventTime, headers); + assertEquals(watermark, datum.getWatermark()); + assertEquals(eventTime, datum.getEventTime()); + assertEquals(headers, datum.getHeaders()); + } +} diff --git a/src/test/java/io/numaproj/numaflow/mapper/MessageListTest.java b/src/test/java/io/numaproj/numaflow/mapper/MessageListTest.java new file mode 100644 index 00000000..9e7521d6 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/mapper/MessageListTest.java @@ -0,0 +1,25 @@ +package io.numaproj.numaflow.mapper; + +import org.junit.Test; + +import java.util.ArrayList; + +import static org.junit.Assert.assertArrayEquals; + +public class MessageListTest { + @Test + public void testMessageList() { + Message defaultMessage = new Message("asdf".getBytes()); + ArrayList messageList = new ArrayList<>(); + messageList.add(defaultMessage); + + MessageList messageList1 = new MessageList.MessageListBuilder() + .addMessages(messageList) + .addMessage(defaultMessage) + .build(); + + messageList.add(defaultMessage); + + assertArrayEquals(messageList1.getMessages().toArray(), messageList.toArray()); + } +} diff --git a/src/test/java/io/numaproj/numaflow/mapper/MessageTest.java b/src/test/java/io/numaproj/numaflow/mapper/MessageTest.java new file mode 100644 index 00000000..343bf394 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/mapper/MessageTest.java @@ -0,0 +1,22 @@ +package io.numaproj.numaflow.mapper; + +import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; + +public class MessageTest { + @Test + public void testMessage() { + Message message1 = new Message("asdf".getBytes()); + assertArrayEquals("asdf".getBytes(), message1.getValue()); + Message message2 = new Message("asdf".getBytes(), new String[]{"key1"}); + assertArrayEquals("asdf".getBytes(), message2.getValue()); + assertArrayEquals(new String[]{"key1"}, message2.getKeys()); + Message message3 = new Message(null, null, null); + assertArrayEquals(null, message3.getValue()); + Message message4 = Message.toDrop(); + assertArrayEquals(new byte[0], message4.getValue()); + assertArrayEquals(null, message4.getKeys()); + String[] drop_tags = {"U+005C__DROP__"}; + assertArrayEquals(drop_tags, message4.getTags()); + } +} diff --git a/src/test/java/io/numaproj/numaflow/sinker/HandlerDatumTest.java b/src/test/java/io/numaproj/numaflow/sinker/HandlerDatumTest.java new file mode 100644 index 00000000..2097b42f --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/sinker/HandlerDatumTest.java @@ -0,0 +1,56 @@ +package io.numaproj.numaflow.sinker; + +import org.junit.Test; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class HandlerDatumTest { + + @Test + public void testGetKeys() { + String[] keys = {"key1", "key2"}; + HandlerDatum datum = new HandlerDatum(keys, null, null, null, null, null); + assertArrayEquals(keys, datum.getKeys()); + } + + @Test + public void testGetValue() { + byte[] value = {1, 2, 3}; + HandlerDatum datum = new HandlerDatum(null, value, null, null, null, null); + assertArrayEquals(value, datum.getValue()); + } + + @Test + public void testGetWatermark() { + Instant watermark = Instant.now(); + HandlerDatum datum = new HandlerDatum(null, null, watermark, null, null, null); + assertEquals(watermark, datum.getWatermark()); + } + + @Test + public void testGetEventTime() { + Instant eventTime = Instant.now(); + HandlerDatum datum = new HandlerDatum(null, null, null, eventTime, null, null); + assertEquals(eventTime, datum.getEventTime()); + } + + @Test + public void testGetId() { + String id = "test-id"; + HandlerDatum datum = new HandlerDatum(null, null, null, null, id, null); + assertEquals(id, datum.getId()); + } + + @Test + public void testGetHeaders() { + Map headers = new HashMap<>(); + headers.put("header1", "value1"); + HandlerDatum datum = new HandlerDatum(null, null, null, null, null, headers); + assertEquals(headers, datum.getHeaders()); + } +} diff --git a/src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java b/src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java new file mode 100644 index 00000000..70762ab7 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java @@ -0,0 +1,66 @@ +package io.numaproj.numaflow.sinker; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; +import common.MetadataOuterClass; +import io.numaproj.numaflow.sink.v1.SinkOuterClass; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static java.util.Map.entry; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class ResponseTest { + @Test + public void test_addResponse() { + String defaultId = "id"; + Response response1 = Response.responseFallback(defaultId); + assertEquals(defaultId, response1.getId()); + Response response2 = Response.responseOK(defaultId); + assertEquals(defaultId, response2.getId()); + Response response3 = Response.responseServe(defaultId, "serve".getBytes()); + assertEquals(defaultId, response3.getId()); + Response response4 = Response.responseFailure(defaultId, "failure"); + assertEquals(defaultId, response4.getId()); + + HashMap userMetadata = new HashMap<>(); + userMetadata.put("group1", KeyValueGroup.builder().build()); + HashMap kvg1 = new HashMap<>(Map.ofEntries( + entry("key1", "val1".getBytes()) + )); + kvg1.put("key2", null); + + userMetadata.put("group2", KeyValueGroup.builder().keyValue(kvg1).build()); + userMetadata.put("group3", null); + OnSuccessMessage onSuccessMessage1 = new OnSuccessMessage("onSuccessValue".getBytes(), null, userMetadata); + + Response response5 = Response.responseOnSuccess(defaultId, onSuccessMessage1); + assertEquals(defaultId, response5.getId()); + assertEquals("", response5.getOnSuccessMessage().getKeys(0)); + + OnSuccessMessage onSuccessMessage2 = new OnSuccessMessage("onSuccessValue".getBytes(), null, null); + Response response6 = Response.responseOnSuccess(defaultId, onSuccessMessage2); + assertEquals(defaultId, response6.getId()); + assertEquals(MetadataOuterClass.Metadata.newBuilder() + .putAllUserMetadata(MetadataOuterClass.Metadata + .getDefaultInstance() + .getUserMetadataMap()).build(), + response6.getOnSuccessMessage().getMetadata()); + + OnSuccessMessage onSuccessMessage3 = new OnSuccessMessage(null, "key", null); + Response response7 = Response.responseOnSuccess(defaultId, onSuccessMessage3); + assertEquals(defaultId, response7.getId()); + assertEquals(ByteString.copyFrom("".getBytes()), response7.getOnSuccessMessage().getValue()); + assertEquals("key", response7.getOnSuccessMessage().getKeys(0)); + + Response response8 = Response.responseOnSuccess(defaultId, (OnSuccessMessage) null); + assertEquals(defaultId, response8.getId()); + assertNull(response8.getOnSuccessMessage()); + + Response response9 = Response.responseOnSuccess(defaultId, ( SinkOuterClass.SinkResponse.Result.Message) null); + assertNull(response9.getOnSuccessMessage()); + } +} diff --git a/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java b/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java index 5666f5f9..d384483c 100644 --- a/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java @@ -1,6 +1,7 @@ package io.numaproj.numaflow.sinker; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import com.google.protobuf.ByteString; @@ -53,6 +54,18 @@ public void tearDown() throws Exception { server.stop(); } + @Test + public void testServer() { + Sinker sinker = new TestSinkFn(); + Server server = new Server(sinker); + try { + server.start(); + server.stop(); + } catch (Exception e) { + assertFalse(e instanceof RuntimeException); + } + } + @Test public void sinkerSuccess() { int batchSize = 6; From fd353050894192dd0de8d609d8ae25fd215b8a43 Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Tue, 4 Nov 2025 14:19:26 -0500 Subject: [PATCH 03/13] Modify tests a bit more to improve coverage Signed-off-by: vtiwari5 --- .../io/numaproj/numaflow/sinker/Service.java | 6 ++- .../numaproj/numaflow/sinker/ServerTest.java | 45 +++++++++++++++---- 2 files changed, 41 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index c2aa71eb..7c1f4bf6 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -151,10 +151,14 @@ private SinkOuterClass.SinkResponse.Result buildResult(Response response) { response.getServeResponse())) .build(); } else if (response.getOnSuccess()) { + // FIXME: Cannot set null for onSuccessMsg, so setting default instance + // Problematic because numaflow-core implementation regards null to be the original message return SinkOuterClass.SinkResponse.Result.newBuilder() .setId(response.getId() == null ? "" : response.getId()) .setStatus(SinkOuterClass.Status.ON_SUCCESS) - .setOnSuccessMsg(response.getOnSuccessMessage() == null ? null : response.getOnSuccessMessage()) + .setOnSuccessMsg(response.getOnSuccessMessage() == null + ? SinkOuterClass.SinkResponse.Result.Message.getDefaultInstance() + : response.getOnSuccessMessage()) .build(); } else { // FIXME: Return error when error message is not set? diff --git a/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java b/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java index d384483c..8512bf34 100644 --- a/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java @@ -56,12 +56,14 @@ public void tearDown() throws Exception { @Test public void testServer() { + // Test for coverage for different constructors for Server Sinker sinker = new TestSinkFn(); Server server = new Server(sinker); try { server.start(); server.stop(); } catch (Exception e) { + // Interrupted exceptions are let go assertFalse(e instanceof RuntimeException); } } @@ -88,10 +90,16 @@ public void sinkerSuccess() { for (int i = 1; i <= batchSize * numBatches; i++) { String[] keys; - if (i < batchSize * numBatches) { + if (i % 2 == 0) { keys = new String[] {"valid-key"}; + } else if (i % 3 == 0) { + keys = new String[] {"fallback-key"}; + } else if (i % 5 == 0) { + keys = new String[] {"onsuccess-key"}; + } else if (i % 7 == 0) { + keys = new String[] {"serve-key"}; } else { - keys = new String[] {"invalid-key"}; + keys = new String[] {"invalid-key"}; } SinkOuterClass.SinkRequest.Request request = @@ -135,10 +143,19 @@ public void sinkerSuccess() { result -> { if (result.getStatus() == SinkOuterClass.Status.FAILURE) { assertEquals("error message", result.getErrMsg()); - return; + } else if (result.getStatus() == SinkOuterClass.Status.FALLBACK) { + assertEquals(result.getId(), expectedId); + assertEquals(SinkOuterClass.Status.FALLBACK, result.getStatus()); + } else if (result.getStatus() == SinkOuterClass.Status.ON_SUCCESS) { + assertEquals(result.getId(), expectedId); + assertEquals(SinkOuterClass.Status.ON_SUCCESS, result.getStatus()); + } else if (result.getStatus() == SinkOuterClass.Status.SERVE) { + assertEquals(result.getId(), expectedId); + assertEquals(SinkOuterClass.Status.SERVE, result.getStatus()); + } else { + assertEquals(result.getId(), expectedId); + assertEquals(SinkOuterClass.Status.SUCCESS, result.getStatus()); } - assertEquals(result.getId(), expectedId); - assertEquals(SinkOuterClass.Status.SUCCESS, result.getStatus()); }); } } @@ -160,12 +177,22 @@ public ResponseList processMessages(DatumIterator datumIterator) { if (datum == null) { break; } + if (Arrays.equals(datum.getKeys(), new String[] {"invalid-key"})) { - builder.addResponse( - Response.responseFailure(datum.getId() + processedIdSuffix, "error message")); - continue; + builder.addResponse( + Response.responseFailure(datum.getId() + processedIdSuffix, "error message")); + } else if (Arrays.equals(datum.getKeys(), new String[] {"fallback-key"})) { + builder.addResponse( + Response.responseFallback(datum.getId() + processedIdSuffix)); + } else if (Arrays.equals(datum.getKeys(), new String[] {"onsuccess-key"})) { + builder.addResponse( + Response.responseOnSuccess(datum.getId() + processedIdSuffix, (OnSuccessMessage) null)); + } else if (Arrays.equals(datum.getKeys(), new String[] {"serve-key"})) { + builder.addResponse( + Response.responseServe(datum.getId() + processedIdSuffix, "serve message".getBytes())); + } else { + builder.addResponse(Response.responseOK(datum.getId() + processedIdSuffix)); } - builder.addResponse(Response.responseOK(datum.getId() + processedIdSuffix)); } return builder.build(); From a237b3cbbcacc4396e3f9959c7f5971bb4e65261 Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Tue, 4 Nov 2025 14:24:41 -0500 Subject: [PATCH 04/13] Exclude generated protobuf files for common.metadata from coverage calculation Signed-off-by: vtiwari5 --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index 2a0b3039..db18e1c1 100644 --- a/pom.xml +++ b/pom.xml @@ -310,6 +310,7 @@ io/numaproj/numaflow/source/v1/* io/numaproj/numaflow/serving/v1/* io/numaproj/numaflow/accumulator/v1/* + common/* **/*TestKit* From 32a6020736212a16011305cd44f618a2d7325578 Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Wed, 5 Nov 2025 10:01:46 -0500 Subject: [PATCH 05/13] Add an example for onSuccess sink Signed-off-by: vtiwari5 --- .../examples/sink/onsuccess/OnSuccess.java | 62 +++++++++++++++++++ .../io/numaproj/numaflow/sinker/Service.java | 1 + 2 files changed, 63 insertions(+) create mode 100644 examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java b/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java new file mode 100644 index 00000000..367b734e --- /dev/null +++ b/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java @@ -0,0 +1,62 @@ +package io.numaproj.numaflow.examples.sink.onsuccess; + +import io.numaproj.numaflow.examples.sink.simple.SimpleSink; +import io.numaproj.numaflow.sinker.Datum; +import io.numaproj.numaflow.sinker.DatumIterator; +import io.numaproj.numaflow.sinker.Response; +import io.numaproj.numaflow.sinker.ResponseList; +import io.numaproj.numaflow.sinker.Server; +import io.numaproj.numaflow.sinker.Sinker; +import lombok.extern.slf4j.Slf4j; + +import java.util.Random; + +@Slf4j +public class OnSuccess extends Sinker { + public static void main(String[] args) throws Exception { + Server server = new Server(new SimpleSink()); + + // Start the server + server.start(); + + // wait for the server to shut down + server.awaitTermination(); + } + + @Override + public ResponseList processMessages(DatumIterator datumIterator) { + ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder(); + while (true) { + Datum datum = null; + try { + datum = datumIterator.next(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + continue; + } + // null means the iterator is closed, so we break the loop + if (datum == null) { + break; + } + try { + String msg = new String(datum.getValue()); + log.info("Received message: {}, headers - {}", msg, datum.getHeaders()); + if (writeToPrimarySink()) { + responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(), null)); + } else { + responseListBuilder.addResponse(Response.responseFallback(datum.getId())); + } + } catch (Exception e) { + responseListBuilder.addResponse(Response.responseFailure( + datum.getId(), + e.getMessage())); + } + } + return responseListBuilder.build(); + } + + public boolean writeToPrimarySink() { + Random random = new Random(); + return random.nextBoolean(); + } +} diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 7c1f4bf6..5458a3f9 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -153,6 +153,7 @@ private SinkOuterClass.SinkResponse.Result buildResult(Response response) { } else if (response.getOnSuccess()) { // FIXME: Cannot set null for onSuccessMsg, so setting default instance // Problematic because numaflow-core implementation regards null to be the original message + // TODO: Change numaflow-core implementation to also accept empty `value` field return SinkOuterClass.SinkResponse.Result.newBuilder() .setId(response.getId() == null ? "" : response.getId()) .setStatus(SinkOuterClass.Status.ON_SUCCESS) From 917e77e4af5fc53ab6e74675dcdebc37d1374335 Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Wed, 5 Nov 2025 10:30:25 -0500 Subject: [PATCH 06/13] Add execution ID for new example Signed-off-by: vtiwari5 --- examples/pom.xml | 22 +++++++++++++++++++ .../examples/sink/onsuccess/OnSuccess.java | 5 +++++ 2 files changed, 27 insertions(+) diff --git a/examples/pom.xml b/examples/pom.xml index 63d37278..31cca1f4 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -381,6 +381,28 @@ + + on-success-sink + package + + dockerBuild + + + + amazoncorretto:11 + + + + io.numaproj.numaflow.examples.sink.onsuccess.OnSuccess + + + + + numaflow-java-examples/on-success-sink:${docker.tag} + + + + diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java b/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java index 367b734e..34efcfd1 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java @@ -55,6 +55,11 @@ public ResponseList processMessages(DatumIterator datumIterator) { return responseListBuilder.build(); } + /** + * Example method to simulate write failures/success to primary sink. + * Based on whether this returns true/false, we write to fallback sink / onSuccess sink + * @return + */ public boolean writeToPrimarySink() { Random random = new Random(); return random.nextBoolean(); From 9745adcd9782daf45421ec6de9871a8a84d4262c Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Wed, 5 Nov 2025 10:32:31 -0500 Subject: [PATCH 07/13] Fix ambiguous method call in example Signed-off-by: vtiwari5 --- .../numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java b/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java index 34efcfd1..54cca116 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java @@ -3,6 +3,7 @@ import io.numaproj.numaflow.examples.sink.simple.SimpleSink; import io.numaproj.numaflow.sinker.Datum; import io.numaproj.numaflow.sinker.DatumIterator; +import io.numaproj.numaflow.sinker.OnSuccessMessage; import io.numaproj.numaflow.sinker.Response; import io.numaproj.numaflow.sinker.ResponseList; import io.numaproj.numaflow.sinker.Server; @@ -42,7 +43,7 @@ public ResponseList processMessages(DatumIterator datumIterator) { String msg = new String(datum.getValue()); log.info("Received message: {}, headers - {}", msg, datum.getHeaders()); if (writeToPrimarySink()) { - responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(), null)); + responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(), (OnSuccessMessage) null)); } else { responseListBuilder.addResponse(Response.responseFallback(datum.getId())); } From 224d7752fee31b52b6dca973f260419896cc875f Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Wed, 5 Nov 2025 15:28:11 -0500 Subject: [PATCH 08/13] Modify onSuccess message in the added example for clarity Signed-off-by: vtiwari5 --- .../examples/sink/onsuccess/OnSuccess.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java b/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java index 54cca116..8ed4d983 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java @@ -15,7 +15,7 @@ @Slf4j public class OnSuccess extends Sinker { public static void main(String[] args) throws Exception { - Server server = new Server(new SimpleSink()); + Server server = new Server(new OnSuccess()); // Start the server server.start(); @@ -41,13 +41,20 @@ public ResponseList processMessages(DatumIterator datumIterator) { } try { String msg = new String(datum.getValue()); - log.info("Received message: {}, headers - {}", msg, datum.getHeaders()); + log.info("Received message: {}, id: {}, headers - {}", msg, datum.getId(), datum.getHeaders()); if (writeToPrimarySink()) { - responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(), (OnSuccessMessage) null)); + log.info("Writing to onSuccess sink: {}", datum.getId()); + responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(), + OnSuccessMessage.builder() + .value(String.format("Successfully wrote message with ID: %s", + datum.getId()).getBytes()) + .build())); } else { + log.info("Writing to fallback sink: {}", datum.getId()); responseListBuilder.addResponse(Response.responseFallback(datum.getId())); } } catch (Exception e) { + log.warn("Error while writing to any sink: ", e); responseListBuilder.addResponse(Response.responseFailure( datum.getId(), e.getMessage())); @@ -59,7 +66,7 @@ public ResponseList processMessages(DatumIterator datumIterator) { /** * Example method to simulate write failures/success to primary sink. * Based on whether this returns true/false, we write to fallback sink / onSuccess sink - * @return + * @return true if simulated write to primary sink is successful, false otherwise */ public boolean writeToPrimarySink() { Random random = new Random(); From 38b925bf00f219cf135a7d71b0742ec58b222fcc Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Wed, 5 Nov 2025 15:31:58 -0500 Subject: [PATCH 09/13] Add test for the example added for OnSuccess sink Signed-off-by: vtiwari5 --- .../sink/onsuccess/OnSuccessTest.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 examples/src/test/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccessTest.java diff --git a/examples/src/test/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccessTest.java b/examples/src/test/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccessTest.java new file mode 100644 index 00000000..3dba7007 --- /dev/null +++ b/examples/src/test/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccessTest.java @@ -0,0 +1,33 @@ +package io.numaproj.numaflow.examples.sink.onsuccess; + +import io.numaproj.numaflow.examples.sink.simple.SimpleSink; +import io.numaproj.numaflow.sinker.Response; +import io.numaproj.numaflow.sinker.ResponseList; +import io.numaproj.numaflow.sinker.SinkerTestKit; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class OnSuccessTest { + @Test + public void testOnSuccessSink() { + int datumCount = 10; + OnSuccess onSuccessSink = new OnSuccess(); + // Create a test datum iterator with 10 messages + SinkerTestKit.TestListIterator testListIterator = new SinkerTestKit.TestListIterator(); + for (int i = 0; i < datumCount; i++) { + testListIterator.addDatum( + SinkerTestKit.TestDatum + .builder() + .id("id-" + i) + .value(("value-" + i).getBytes()) + .build()); + } + ResponseList responseList = onSuccessSink.processMessages(testListIterator); + Assertions.assertEquals(datumCount, responseList.getResponses().size()); + for (Response response : responseList.getResponses()) { + Assertions.assertEquals(false, response.getSuccess()); + } + // we can add the logic to verify if the messages were + // successfully written to the sink(could be a file, database, etc.) + } +} From 7ee84b5ccb684bc61c3bb02d3131ef8460bb0da3 Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Thu, 6 Nov 2025 08:46:05 -0500 Subject: [PATCH 10/13] Rename OnSuccessMessage to Message to maintain parity with other SDK changes Signed-off-by: vtiwari5 --- .../numaflow/sinker/{OnSuccessMessage.java => Message.java} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/main/java/io/numaproj/numaflow/sinker/{OnSuccessMessage.java => Message.java} (100%) diff --git a/src/main/java/io/numaproj/numaflow/sinker/OnSuccessMessage.java b/src/main/java/io/numaproj/numaflow/sinker/Message.java similarity index 100% rename from src/main/java/io/numaproj/numaflow/sinker/OnSuccessMessage.java rename to src/main/java/io/numaproj/numaflow/sinker/Message.java From d57f4d91bd94812abd13be804b4c44e78da4091a Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Thu, 6 Nov 2025 08:48:10 -0500 Subject: [PATCH 11/13] Rename OnSuccessMessage to Message to maintain parity with other SDK changes Signed-off-by: vtiwari5 --- .../java/io/numaproj/numaflow/sinker/Message.java | 2 +- .../io/numaproj/numaflow/sinker/Response.java | 15 +++++++-------- .../io/numaproj/numaflow/sinker/ResponseTest.java | 9 ++++----- .../io/numaproj/numaflow/sinker/ServerTest.java | 2 +- 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/numaproj/numaflow/sinker/Message.java b/src/main/java/io/numaproj/numaflow/sinker/Message.java index b13f32fa..a2003198 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Message.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Message.java @@ -7,7 +7,7 @@ @Getter @Builder -public class OnSuccessMessage { +public class Message { private final byte[] value; private final String key; private final HashMap userMetadata; diff --git a/src/main/java/io/numaproj/numaflow/sinker/Response.java b/src/main/java/io/numaproj/numaflow/sinker/Response.java index 960ca826..b0c2b217 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Response.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Response.java @@ -2,13 +2,12 @@ import com.google.protobuf.ByteString; import common.MetadataOuterClass; -import io.numaproj.numaflow.sink.v1.SinkOuterClass.SinkResponse.Result.Message; +import io.numaproj.numaflow.sink.v1.SinkOuterClass.SinkResponse; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; @@ -27,11 +26,11 @@ public class Response { private final Boolean serve; private final byte[] serveResponse; private final Boolean onSuccess; - // FIXME: Should this be OnSuccessMessage object from package? That would allow parity with other SDKs (specially Go) + // FIXME: Should this be Message object from this package? That would allow parity with other SDKs (specially Go) // Currently done this way to prevent conversion in buildResult method. - private final Message onSuccessMessage; + private final SinkResponse.Result.Message onSuccessMessage; - /** + /** * Static method to create response for successful message processing. * * @param id id of the message @@ -84,7 +83,7 @@ public static Response responseServe(String id, byte[] serveResponse) { * @param onSuccessMessage OnSuccessMessage object to be sent to the onSuccess sink * @return Response object with onSuccess status and onSuccess message */ - public static Response responseOnSuccess(String id, Message onSuccessMessage) { + public static Response responseOnSuccess(String id, SinkResponse.Result.Message onSuccessMessage) { return new Response(id, false, null, false, false, null, true, onSuccessMessage); } @@ -97,7 +96,7 @@ public static Response responseOnSuccess(String id, Message onSuccessMessage) { * if original message needs to be written to onSuccess sink * @return Response object with onSuccess status and onSuccess message */ - public static Response responseOnSuccess(String id, OnSuccessMessage onSuccessMessage) { + public static Response responseOnSuccess(String id, Message onSuccessMessage) { if (onSuccessMessage == null) { return new Response(id, false, null, false, false, null, true, null); } else { @@ -136,7 +135,7 @@ public static Response responseOnSuccess(String id, OnSuccessMessage onSuccessMe .putAllUserMetadata(pbUserMetadata) .build(); - Message pbOnSuccessMessage = Message.newBuilder() + SinkResponse.Result.Message pbOnSuccessMessage = SinkResponse.Result.Message.newBuilder() .addKeys(onSuccessMessage.getKey() == null ? "" : onSuccessMessage.getKey()) .setValue(onSuccessMessage.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(onSuccessMessage.getValue())) .setMetadata(pbMetadata) diff --git a/src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java b/src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java index 70762ab7..841cfb85 100644 --- a/src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java +++ b/src/test/java/io/numaproj/numaflow/sinker/ResponseTest.java @@ -1,7 +1,6 @@ package io.numaproj.numaflow.sinker; import com.google.protobuf.ByteString; -import com.google.protobuf.Message; import common.MetadataOuterClass; import io.numaproj.numaflow.sink.v1.SinkOuterClass; import org.junit.Test; @@ -35,13 +34,13 @@ public void test_addResponse() { userMetadata.put("group2", KeyValueGroup.builder().keyValue(kvg1).build()); userMetadata.put("group3", null); - OnSuccessMessage onSuccessMessage1 = new OnSuccessMessage("onSuccessValue".getBytes(), null, userMetadata); + Message onSuccessMessage1 = new Message("onSuccessValue".getBytes(), null, userMetadata); Response response5 = Response.responseOnSuccess(defaultId, onSuccessMessage1); assertEquals(defaultId, response5.getId()); assertEquals("", response5.getOnSuccessMessage().getKeys(0)); - OnSuccessMessage onSuccessMessage2 = new OnSuccessMessage("onSuccessValue".getBytes(), null, null); + Message onSuccessMessage2 = new Message("onSuccessValue".getBytes(), null, null); Response response6 = Response.responseOnSuccess(defaultId, onSuccessMessage2); assertEquals(defaultId, response6.getId()); assertEquals(MetadataOuterClass.Metadata.newBuilder() @@ -50,13 +49,13 @@ public void test_addResponse() { .getUserMetadataMap()).build(), response6.getOnSuccessMessage().getMetadata()); - OnSuccessMessage onSuccessMessage3 = new OnSuccessMessage(null, "key", null); + Message onSuccessMessage3 = new Message(null, "key", null); Response response7 = Response.responseOnSuccess(defaultId, onSuccessMessage3); assertEquals(defaultId, response7.getId()); assertEquals(ByteString.copyFrom("".getBytes()), response7.getOnSuccessMessage().getValue()); assertEquals("key", response7.getOnSuccessMessage().getKeys(0)); - Response response8 = Response.responseOnSuccess(defaultId, (OnSuccessMessage) null); + Response response8 = Response.responseOnSuccess(defaultId, (Message) null); assertEquals(defaultId, response8.getId()); assertNull(response8.getOnSuccessMessage()); diff --git a/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java b/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java index 8512bf34..32de9fce 100644 --- a/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java @@ -186,7 +186,7 @@ public ResponseList processMessages(DatumIterator datumIterator) { Response.responseFallback(datum.getId() + processedIdSuffix)); } else if (Arrays.equals(datum.getKeys(), new String[] {"onsuccess-key"})) { builder.addResponse( - Response.responseOnSuccess(datum.getId() + processedIdSuffix, (OnSuccessMessage) null)); + Response.responseOnSuccess(datum.getId() + processedIdSuffix, (Message) null)); } else if (Arrays.equals(datum.getKeys(), new String[] {"serve-key"})) { builder.addResponse( Response.responseServe(datum.getId() + processedIdSuffix, "serve message".getBytes())); From 2887ee9a7a79f52ac05439f6280d206a3c9995a5 Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Thu, 6 Nov 2025 08:54:36 -0500 Subject: [PATCH 12/13] fix on success sink example Signed-off-by: vtiwari5 --- .../numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java b/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java index 8ed4d983..b24342d4 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/sink/onsuccess/OnSuccess.java @@ -3,7 +3,7 @@ import io.numaproj.numaflow.examples.sink.simple.SimpleSink; import io.numaproj.numaflow.sinker.Datum; import io.numaproj.numaflow.sinker.DatumIterator; -import io.numaproj.numaflow.sinker.OnSuccessMessage; +import io.numaproj.numaflow.sinker.Message; import io.numaproj.numaflow.sinker.Response; import io.numaproj.numaflow.sinker.ResponseList; import io.numaproj.numaflow.sinker.Server; @@ -45,7 +45,7 @@ public ResponseList processMessages(DatumIterator datumIterator) { if (writeToPrimarySink()) { log.info("Writing to onSuccess sink: {}", datum.getId()); responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(), - OnSuccessMessage.builder() + Message.builder() .value(String.format("Successfully wrote message with ID: %s", datum.getId()).getBytes()) .build())); From fb65bfa86fc1bca948dec7e9314268e7ce9d30af Mon Sep 17 00:00:00 2001 From: vtiwari5 Date: Fri, 14 Nov 2025 14:16:32 -0500 Subject: [PATCH 13/13] Add documentation for newly added public classes Signed-off-by: vtiwari5 --- src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java | 4 ++++ src/main/java/io/numaproj/numaflow/sinker/Message.java | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java b/src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java index d2d7b104..abc0db08 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java +++ b/src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java @@ -5,6 +5,10 @@ import java.util.HashMap; +/** + * KeyValueGroup is a map of key-value pairs for a given group. + * Used as part of {@link io.numaproj.numaflow.sinker.Message}. + */ @Getter @Builder public class KeyValueGroup { diff --git a/src/main/java/io/numaproj/numaflow/sinker/Message.java b/src/main/java/io/numaproj/numaflow/sinker/Message.java index a2003198..d8ede1cf 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Message.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Message.java @@ -5,6 +5,10 @@ import java.util.HashMap; +/** + * Message contains information that needs to be sent to the OnSuccess sink. + * The message can be different from the original message that was sent to primary sink. + */ @Getter @Builder public class Message {