Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ public synchronized String getCurrentProxyOMNodeId() {
return omProxies.getNodeId(currentProxyIndex);
}

@VisibleForTesting
public synchronized String getNextProxyOMNodeId() {
return omProxies.getNodeId(nextProxyIndex);
}

@VisibleForTesting
public RetryPolicy getRetryPolicy(int maxFailovers) {
// Client will attempt up to maxFailovers number of failovers between
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ public static OMResponse convertByteStringToOMResponse(ByteString bytes) throws
}

/** Convert the given reply with proto 3 {@link ByteString} to a proto 2 response. */
public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply) throws IOException {
public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply, String leaderOMNodeId)
throws IOException {
final OMResponse response = convertByteStringToOMResponse(reply.getMessage().getContent());
if (reply.getReplierId().equals(response.getLeaderOMNodeId())) {
return response;
OMResponse.Builder omResponse = OMResponse.newBuilder(response);
if (leaderOMNodeId != null) {
omResponse.setLeaderOMNodeId(leaderOMNodeId);
}
return OMResponse.newBuilder(response)
.setLeaderOMNodeId(reply.getReplierId())
.build();
return omResponse.build();
Comment on lines -69 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass RaftPeerId and rebuild response only if leaderOMNodeId != null.

  public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply, RaftPeerId leaderOMNodeId)
      throws IOException {
    final OMResponse response = convertByteStringToOMResponse(reply.getMessage().getContent());
    if (leaderOMNodeId == null) {
      return response;
    }
    return OMResponse.newBuilder(response)
        .setLeaderOMNodeId(leaderOMNodeId.toString())
        .build();
  }

}

/** Convert the given {@link StateMachineLogEntryProto} to a short {@link String}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,7 @@ public Hadoop3OmTransport(ConfigurationSource conf,
@Override
public OMResponse submitRequest(OMRequest payload) throws IOException {
try {
OMResponse omResponse =
rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);

if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
String leaderOmId = omResponse.getLeaderOMNodeId();

// Failover to the OM node returned by OMResponse leaderOMNodeId if
// current proxy is not pointing to that node.
omFailoverProxyProvider.setNextOmProxy(leaderOmId);
omFailoverProxyProvider.performFailover(null);
}
return omResponse;
return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
} catch (ServiceException e) {
OMNotLeaderException notLeaderException =
HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
Expand Down Expand Up @@ -57,10 +56,15 @@
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeRequest.Scope;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
Expand Down Expand Up @@ -122,50 +126,6 @@ void testFollowerReadTargetsFollower() throws Exception {
assertEquals(followerOMNodeId, lastProxy.getNodeId());
}

@Test
public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
ObjectStore objectStore = getObjectStore();
HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> omFailoverProxyProvider =
OmTestUtil.getFailoverProxyProvider(objectStore);
HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> followerReadFailoverProxyProvider =
OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore);
String initialFollowerReadNodeId = followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();

// Run couple of createVolume tests to discover the current Leader OM
createVolumeTest(true);
createVolumeTest(true);

// The oMFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();

// Perform a manual failover of the proxy provider to move the
// currentProxyIndex to a node other than the leader OM.
omFailoverProxyProvider.selectNextOmProxy();
omFailoverProxyProvider.performFailover(null);

String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
assertNotEquals(leaderOMNodeId, newProxyNodeId);

// Once another request is sent to this new proxy node, the leader
// information must be returned via the response and a failover must
// happen to the leader proxy node.
// This will also do some read operations where this might read from the follower.
createVolumeTest(true);
Thread.sleep(2000);

String newLeaderOMNodeId =
omFailoverProxyProvider.getCurrentProxyOMNodeId();

// The old and new Leader OM NodeId must match since there was no new
// election in the Ratis ring.
assertEquals(leaderOMNodeId, newLeaderOMNodeId);

// The follower read proxy should remain unchanged since the follower is not throwing exceptions
// The performFailover on the leader proxy should not affect the follower read proxy provider
String currentFollowerReadNodeId = followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
assertEquals(initialFollowerReadNodeId, currentFollowerReadNodeId);
}

/**
* Choose a follower to send the request, the returned exception should
* include the suggested leader node.
Expand Down Expand Up @@ -461,4 +421,53 @@ public void testAllBucketOperations() throws Exception {
OzoneTestUtils.expectOmException(OMException.ResultCodes.BUCKET_NOT_FOUND,
() -> retVolume.deleteBucket(bucketName));
}

@Test
void testOMResponseLeaderOmNodeId() throws Exception {
HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> omFailoverProxyProvider =
OmTestUtil.getFailoverProxyProvider(getObjectStore());
HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> followerReadFailoverProxyProvider =
OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore());

// Make sure All OMs are ready
createVolumeTest(true);

// The OMFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
String initialNextProxyOmNodeId = omFailoverProxyProvider.getNextProxyOMNodeId();
OzoneManager followerOM = null;
for (OzoneManager om: getCluster().getOzoneManagersList()) {
if (!om.isLeaderReady()) {
followerOM = om;
break;
}
}
assertNotNull(followerOM);
assertSame(OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER,
followerOM.getOmRatisServer().getLeaderStatus());


ListVolumeRequest req =
ListVolumeRequest.newBuilder()
.setScope(Scope.VOLUMES_BY_USER)
.build();

OzoneManagerProtocolProtos.OMRequest readRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
.setCmdType(Type.ListVolume)
.setListVolumeRequest(req)
.setVersion(ClientVersion.CURRENT_VERSION)
.setClientId(randomUUID().toString())
.build();

OmTransport omTransport = ((OzoneManagerProtocolClientSideTranslatorPB)
getObjectStore().getClientProxy().getOzoneManagerClient()).getTransport();
followerReadFailoverProxyProvider.changeInitialProxyForTest(followerOM.getOMNodeId());
OMResponse omResponse = omTransport.submitRequest(readRequest);

// The returned OM response should be the same as the actual leader OM node ID
assertEquals(leaderOMNodeId, omResponse.getLeaderOMNodeId());
// There should not be any change in the leader proxy's next proxy OM node ID
assertEquals(initialNextProxyOmNodeId, omFailoverProxyProvider.getNextProxyOMNodeId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -311,45 +310,6 @@ void testOMProxyProviderInitialization() {
}
}

/**
* Test HadoopRpcOMFailoverProxyProvider failover when current OM proxy is not
* the current OM Leader.
*/
@Test
public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
ObjectStore objectStore = getObjectStore();
final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> omFailoverProxyProvider
= OmTestUtil.getFailoverProxyProvider(objectStore);

// Run couple of createVolume tests to discover the current Leader OM
createVolumeTest(true);
createVolumeTest(true);

// The oMFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();

// Perform a manual failover of the proxy provider to move the
// currentProxyIndex to a node other than the leader OM.
omFailoverProxyProvider.selectNextOmProxy();
omFailoverProxyProvider.performFailover(null);

String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
assertNotEquals(leaderOMNodeId, newProxyNodeId);

// Once another request is sent to this new proxy node, the leader
// information must be returned via the response and a failover must
// happen to the leader proxy node.
createVolumeTest(true);
Thread.sleep(2000);

String newLeaderOMNodeId =
omFailoverProxyProvider.getCurrentProxyOMNodeId();

// The old and new Leader OM NodeId must match since there was no new
// election in the Ratis ring.
assertEquals(leaderOMNodeId, newLeaderOMNodeId);
}

/**
* Choose a follower to send the request, the returned exception should
* include the suggested leader node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,9 @@ private OMResponse createOmResponseImpl(OMRequest omRequest,

private OMResponse getOMResponse(RaftClientReply reply) throws ServiceException {
try {
return OMRatisHelper.getOMResponseFromRaftClientReply(reply);
RaftPeerId currentLeader = getLeaderId();
return OMRatisHelper.getOMResponseFromRaftClientReply(reply,
currentLeader != null ? currentLeader.toString() : null);
} catch (IOException ex) {
if (ex.getMessage() != null) {
throw new ServiceException(ex.getMessage(), ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,7 @@ public Hadoop27RpcTransport(ConfigurationSource conf,
@Override
public OMResponse submitRequest(OMRequest payload) throws IOException {
try {
OMResponse omResponse =
rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);

if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
String leaderOmId = omResponse.getLeaderOMNodeId();

// Failover to the OM node returned by OMResponse leaderOMNodeId if
// current proxy is not pointing to that node.
omFailoverProxyProvider.setNextOmProxy(leaderOmId);
omFailoverProxyProvider.performFailover(null);
}
return omResponse;
return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
} catch (ServiceException e) {
OMNotLeaderException notLeaderException =
HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
Expand Down