Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,28 @@
</to>
</configuration>
</execution>
<execution>
<id>on-success-sink</id>
<phase>package</phase>
<goals>
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.sink.onsuccess.OnSuccess
</mainClass>
</container>
<to>
<image>
numaflow-java-examples/on-success-sink:${docker.tag}
</image>
</to>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.numaproj.numaflow.examples.sink.onsuccess;

import io.numaproj.numaflow.examples.sink.simple.SimpleSink;
import io.numaproj.numaflow.sinker.Datum;
import io.numaproj.numaflow.sinker.DatumIterator;
import io.numaproj.numaflow.sinker.Message;
import io.numaproj.numaflow.sinker.Response;
import io.numaproj.numaflow.sinker.ResponseList;
import io.numaproj.numaflow.sinker.Server;
import io.numaproj.numaflow.sinker.Sinker;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;

@Slf4j
public class OnSuccess extends Sinker {
public static void main(String[] args) throws Exception {
Server server = new Server(new OnSuccess());

// Start the server
server.start();

// wait for the server to shut down
server.awaitTermination();
}

@Override
public ResponseList processMessages(DatumIterator datumIterator) {
ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder();
while (true) {
Datum datum = null;
try {
datum = datumIterator.next();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
continue;
}
// null means the iterator is closed, so we break the loop
if (datum == null) {
break;
}
try {
String msg = new String(datum.getValue());
log.info("Received message: {}, id: {}, headers - {}", msg, datum.getId(), datum.getHeaders());
if (writeToPrimarySink()) {
log.info("Writing to onSuccess sink: {}", datum.getId());
responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(),
Message.builder()
.value(String.format("Successfully wrote message with ID: %s",
datum.getId()).getBytes())
.build()));
} else {
log.info("Writing to fallback sink: {}", datum.getId());
responseListBuilder.addResponse(Response.responseFallback(datum.getId()));
}
} catch (Exception e) {
log.warn("Error while writing to any sink: ", e);
responseListBuilder.addResponse(Response.responseFailure(
datum.getId(),
e.getMessage()));
}
}
return responseListBuilder.build();
}

/**
* Example method to simulate write failures/success to primary sink.
* Based on whether this returns true/false, we write to fallback sink / onSuccess sink
* @return true if simulated write to primary sink is successful, false otherwise
*/
public boolean writeToPrimarySink() {
Random random = new Random();
return random.nextBoolean();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.numaproj.numaflow.examples.sink.onsuccess;

import io.numaproj.numaflow.examples.sink.simple.SimpleSink;
import io.numaproj.numaflow.sinker.Response;
import io.numaproj.numaflow.sinker.ResponseList;
import io.numaproj.numaflow.sinker.SinkerTestKit;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class OnSuccessTest {
@Test
public void testOnSuccessSink() {
int datumCount = 10;
OnSuccess onSuccessSink = new OnSuccess();
// Create a test datum iterator with 10 messages
SinkerTestKit.TestListIterator testListIterator = new SinkerTestKit.TestListIterator();
for (int i = 0; i < datumCount; i++) {
testListIterator.addDatum(
SinkerTestKit.TestDatum
.builder()
.id("id-" + i)
.value(("value-" + i).getBytes())
.build());
}
ResponseList responseList = onSuccessSink.processMessages(testListIterator);
Assertions.assertEquals(datumCount, responseList.getResponses().size());
for (Response response : responseList.getResponses()) {
Assertions.assertEquals(false, response.getSuccess());
}
// we can add the logic to verify if the messages were
// successfully written to the sink(could be a file, database, etc.)
}
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@
<exclude>io/numaproj/numaflow/source/v1/*</exclude>
<exclude>io/numaproj/numaflow/serving/v1/*</exclude>
<exclude>io/numaproj/numaflow/accumulator/v1/*</exclude>
<exclude>common/*</exclude>
<exclude>**/*TestKit*</exclude>
</excludes>
</configuration>
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/numaproj/numaflow/sinker/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@
class Constants {
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/sink.sock";
public static final String DEFAULT_FB_SINK_SOCKET_PATH = "/var/run/numaflow/fb-sink.sock";
public static final String DEFAULT_ON_SUCCESS_SINK_SOCKET_PATH = "/var/run/numaflow/ons-sink.sock";
public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sinker-server-info";
public static final String DEFAULT_FB_SERVER_INFO_FILE_PATH =
"/var/run/numaflow/fb-sinker-server-info";
public static final String DEFAULT_ON_SUCCESS_SERVER_INFO_FILE_PATH =
"/var/run/numaflow/ons-sinker-server-info";
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;
public static final int DEFAULT_PORT = 50051;
public static final String DEFAULT_HOST = "localhost";
public static final String ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE";
public static final String UD_CONTAINER_FALLBACK_SINK = "fb-udsink";
public static final String UD_CONTAINER_ON_SUCCESS_SINK = "ons-udsink";

// Private constructor to prevent instantiation
private Constants() {
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ static GRPCConfig defaultGrpcConfig() {
if (Constants.UD_CONTAINER_FALLBACK_SINK.equals(containerType)) {
socketPath = Constants.DEFAULT_FB_SINK_SOCKET_PATH;
infoFilePath = Constants.DEFAULT_FB_SERVER_INFO_FILE_PATH;
} else if (Constants.UD_CONTAINER_ON_SUCCESS_SINK.equals(containerType)) {
socketPath = Constants.DEFAULT_ON_SUCCESS_SINK_SOCKET_PATH;
infoFilePath = Constants.DEFAULT_ON_SUCCESS_SERVER_INFO_FILE_PATH;
}
return GRPCConfig.newBuilder()
.infoFilePath(infoFilePath)
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/io/numaproj/numaflow/sinker/KeyValueGroup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.numaproj.numaflow.sinker;

import lombok.Builder;
import lombok.Getter;

import java.util.HashMap;

/**
* KeyValueGroup is a map of key-value pairs for a given group.
* Used as part of {@link io.numaproj.numaflow.sinker.Message}.
*/
@Getter
@Builder
public class KeyValueGroup {
private final HashMap<String, byte[]> keyValue;
}
20 changes: 20 additions & 0 deletions src/main/java/io/numaproj/numaflow/sinker/Message.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.numaproj.numaflow.sinker;

import lombok.Builder;
import lombok.Getter;

import java.util.HashMap;

/**
* Message contains information that needs to be sent to the OnSuccess sink.
* The message can be different from the original message that was sent to primary sink.
*/
@Getter
@Builder
public class Message {
private final byte[] value;
private final String key;
private final HashMap<String, KeyValueGroup> userMetadata;
}


104 changes: 100 additions & 4 deletions src/main/java/io/numaproj/numaflow/sinker/Response.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package io.numaproj.numaflow.sinker;

import com.google.protobuf.ByteString;
import common.MetadataOuterClass;
import io.numaproj.numaflow.sink.v1.SinkOuterClass.SinkResponse;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Response is used to send response from the user defined sinker. It contains the id of the
* message, success status, an optional error message and a fallback status. Various static factory
Expand All @@ -16,15 +23,21 @@ public class Response {
private final Boolean success;
private final String err;
private final Boolean fallback;
private final Boolean serve;
private final byte[] serveResponse;
private final Boolean onSuccess;
// FIXME: Should this be Message object from this package? That would allow parity with other SDKs (specially Go)
// Currently done this way to prevent conversion in buildResult method.
private final SinkResponse.Result.Message onSuccessMessage;

/**
/**
* Static method to create response for successful message processing.
*
* @param id id of the message
* @return Response object with success status
*/
public static Response responseOK(String id) {
return new Response(id, true, null, false);
return new Response(id, true, null, false, false, null, false, null);
}

/**
Expand All @@ -35,7 +48,7 @@ public static Response responseOK(String id) {
* @return Response object with failure status and error message
*/
public static Response responseFailure(String id, String errMsg) {
return new Response(id, false, errMsg, false);
return new Response(id, false, errMsg, false, false, null, false, null);
}

/**
Expand All @@ -46,6 +59,89 @@ public static Response responseFailure(String id, String errMsg) {
* @return Response object with fallback status
*/
public static Response responseFallback(String id) {
return new Response(id, false, null, true);
return new Response(id, false, null, true, false, null, false, null);
}

/**
* Static method to create response for serve message which is raw bytes.
* This indicates that the message should be sent to the serving store.
* Allows creation of serve message from raw bytes.
*
* @param id id of the message
* @param serveResponse Response object to be sent to the serving store
* @return Response object with serve status and serve response
*/
public static Response responseServe(String id, byte[] serveResponse) {
return new Response(id, false, null, false, true, serveResponse, false, null);
}

/**
* Static method to create response for onSuccess message. Allows creation of onSuccess message
* from protobuf Message object.
*
* @param id id of the message
* @param onSuccessMessage OnSuccessMessage object to be sent to the onSuccess sink
* @return Response object with onSuccess status and onSuccess message
*/
public static Response responseOnSuccess(String id, SinkResponse.Result.Message onSuccessMessage) {
return new Response(id, false, null, false, false, null, true, onSuccessMessage);
}

/**
* Overloaded static method to create response for onSuccess message. Allows creation of onSuccess message
* from OnSuccessMessage object.
*
* @param id id of the message
* @param onSuccessMessage OnSuccessMessage object to be sent to the onSuccess sink. Can be null
* if original message needs to be written to onSuccess sink
* @return Response object with onSuccess status and onSuccess message
*/
public static Response responseOnSuccess(String id, Message onSuccessMessage) {
if (onSuccessMessage == null) {
return new Response(id, false, null, false, false, null, true, null);
} else {

Map<String, MetadataOuterClass.KeyValueGroup> pbUserMetadata = MetadataOuterClass.Metadata
.getDefaultInstance()
.getUserMetadataMap();

if (onSuccessMessage.getUserMetadata() != null) {
pbUserMetadata =
onSuccessMessage.getUserMetadata()
.entrySet()
.stream()
.filter(e -> e.getKey() != null && e.getValue() != null)
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> MetadataOuterClass.KeyValueGroup.newBuilder()
.putAllKeyValue(e.getValue().getKeyValue() == null
? Collections.emptyMap()
: e.getValue()
.getKeyValue()
.entrySet()
.stream()
.filter(kv -> kv.getKey() != null
&& kv.getValue() != null)
.collect(Collectors.toMap(
Map.Entry::getKey,
kv -> ByteString.copyFrom(kv.getValue())
))
)
.build()
));
}

MetadataOuterClass.Metadata pbMetadata = MetadataOuterClass.Metadata.newBuilder()
.putAllUserMetadata(pbUserMetadata)
.build();

SinkResponse.Result.Message pbOnSuccessMessage = SinkResponse.Result.Message.newBuilder()
.addKeys(onSuccessMessage.getKey() == null ? "" : onSuccessMessage.getKey())
.setValue(onSuccessMessage.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(onSuccessMessage.getValue()))
.setMetadata(pbMetadata)
.build();

return new Response(id, false, null, false, false, null, true, pbOnSuccessMessage);
}
}
}
Loading
Loading