diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java index d52b4970e04a..57d6caf823eb 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java @@ -176,6 +176,11 @@ public synchronized String getCurrentProxyOMNodeId() { return omProxies.getNodeId(currentProxyIndex); } + @VisibleForTesting + public synchronized String getNextProxyOMNodeId() { + return omProxies.getNodeId(nextProxyIndex); + } + @VisibleForTesting public RetryPolicy getRetryPolicy(int maxFailovers) { // Client will attempt up to maxFailovers number of failovers between diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java index 3741abbbe879..a483fbc6870f 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java @@ -26,6 +26,7 @@ import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.slf4j.Logger; @@ -64,13 +65,14 @@ public static OMResponse convertByteStringToOMResponse(ByteString bytes) throws } /** Convert the given reply with proto 3 {@link ByteString} to a proto 2 response. 
*/ - public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply) throws IOException { + public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply, RaftPeerId leaderOMNodeId) + throws IOException { final OMResponse response = convertByteStringToOMResponse(reply.getMessage().getContent()); - if (reply.getReplierId().equals(response.getLeaderOMNodeId())) { + if (leaderOMNodeId == null) { return response; } return OMResponse.newBuilder(response) - .setLeaderOMNodeId(reply.getReplierId()) + .setLeaderOMNodeId(leaderOMNodeId.toString()) .build(); } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java index 7de5af31a626..9614e403f100 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java @@ -90,18 +90,7 @@ public Hadoop3OmTransport(ConfigurationSource conf, @Override public OMResponse submitRequest(OMRequest payload) throws IOException { try { - OMResponse omResponse = - rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload); - - if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) { - String leaderOmId = omResponse.getLeaderOMNodeId(); - - // Failover to the OM node returned by OMResponse leaderOMNodeId if - // current proxy is not pointing to that node. 
- omFailoverProxyProvider.setNextOmProxy(leaderOmId); - omFailoverProxyProvider.performFailover(null); - } - return omResponse; + return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload); } catch (ServiceException e) { OMNotLeaderException notLeaderException = HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java index 6bafe2a68151..7ed849721422 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java @@ -26,7 +26,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertSame; @@ -57,10 +56,15 @@ import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider; import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider; import org.apache.hadoop.ozone.om.ha.OMProxyInfo; +import org.apache.hadoop.ozone.om.protocolPB.OmTransport; +import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeRequest; 
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeRequest.Scope; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo; import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB; @@ -122,50 +126,6 @@ void testFollowerReadTargetsFollower() throws Exception { assertEquals(followerOMNodeId, lastProxy.getNodeId()); } - @Test - public void testOMProxyProviderFailoverToCurrentLeader() throws Exception { - ObjectStore objectStore = getObjectStore(); - HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider = - OmTestUtil.getFailoverProxyProvider(objectStore); - HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider = - OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore); - String initialFollowerReadNodeId = followerReadFailoverProxyProvider.getCurrentProxy().getNodeId(); - - // Run couple of createVolume tests to discover the current Leader OM - createVolumeTest(true); - createVolumeTest(true); - - // The oMFailoverProxyProvider will point to the current leader OM node. - String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); - - // Perform a manual failover of the proxy provider to move the - // currentProxyIndex to a node other than the leader OM. - omFailoverProxyProvider.selectNextOmProxy(); - omFailoverProxyProvider.performFailover(null); - - String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); - assertNotEquals(leaderOMNodeId, newProxyNodeId); - - // Once another request is sent to this new proxy node, the leader - // information must be returned via the response and a failover must - // happen to the leader proxy node. 
- // This will also do some read operations where this might read from the follower. - createVolumeTest(true); - Thread.sleep(2000); - - String newLeaderOMNodeId = - omFailoverProxyProvider.getCurrentProxyOMNodeId(); - - // The old and new Leader OM NodeId must match since there was no new - // election in the Ratis ring. - assertEquals(leaderOMNodeId, newLeaderOMNodeId); - - // The follower read proxy should remain unchanged since the follower is not throwing exceptions - // The performFailover on the leader proxy should not affect the follower read proxy provider - String currentFollowerReadNodeId = followerReadFailoverProxyProvider.getCurrentProxy().getNodeId(); - assertEquals(initialFollowerReadNodeId, currentFollowerReadNodeId); - } - /** * Choose a follower to send the request, the returned exception should * include the suggested leader node. @@ -461,4 +421,53 @@ public void testAllBucketOperations() throws Exception { OzoneTestUtils.expectOmException(OMException.ResultCodes.BUCKET_NOT_FOUND, () -> retVolume.deleteBucket(bucketName)); } + + @Test + void testOMResponseLeaderOmNodeId() throws Exception { + HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider = + OmTestUtil.getFailoverProxyProvider(getObjectStore()); + HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider = + OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore()); + + // Make sure All OMs are ready + createVolumeTest(true); + + // The OMFailoverProxyProvider will point to the current leader OM node. 
+ String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); + String initialNextProxyOmNodeId = omFailoverProxyProvider.getNextProxyOMNodeId(); + OzoneManager followerOM = null; + for (OzoneManager om: getCluster().getOzoneManagersList()) { + if (!om.isLeaderReady()) { + followerOM = om; + break; + } + } + assertNotNull(followerOM); + assertSame(OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER, + followerOM.getOmRatisServer().getLeaderStatus()); + + + ListVolumeRequest req = + ListVolumeRequest.newBuilder() + .setScope(Scope.VOLUMES_BY_USER) + .build(); + + OzoneManagerProtocolProtos.OMRequest readRequest = + OzoneManagerProtocolProtos.OMRequest.newBuilder() + .setCmdType(Type.ListVolume) + .setListVolumeRequest(req) + .setVersion(ClientVersion.CURRENT_VERSION) + .setClientId(randomUUID().toString()) + .build(); + + OmTransport omTransport = ((OzoneManagerProtocolClientSideTranslatorPB) + getObjectStore().getClientProxy().getOzoneManagerClient()).getTransport(); + followerReadFailoverProxyProvider.changeInitialProxyForTest(followerOM.getOMNodeId()); + OMResponse omResponse = omTransport.submitRequest(readRequest); + + // The leaderOMNodeId in the OM response should match the actual leader OM node ID + assertEquals(leaderOMNodeId, omResponse.getLeaderOMNodeId()); + // There should not be any change in the leader proxy's next proxy OM node ID + assertEquals(initialNextProxyOmNodeId, omFailoverProxyProvider.getNextProxyOMNodeId()); + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java index 06bec6481967..1baa2fe9bdc3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java @@ -33,7 +33,6 @@ 
import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -311,45 +310,6 @@ void testOMProxyProviderInitialization() { } } - /** - * Test HadoopRpcOMFailoverProxyProvider failover when current OM proxy is not - * the current OM Leader. - */ - @Test - public void testOMProxyProviderFailoverToCurrentLeader() throws Exception { - ObjectStore objectStore = getObjectStore(); - final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider - = OmTestUtil.getFailoverProxyProvider(objectStore); - - // Run couple of createVolume tests to discover the current Leader OM - createVolumeTest(true); - createVolumeTest(true); - - // The oMFailoverProxyProvider will point to the current leader OM node. - String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); - - // Perform a manual failover of the proxy provider to move the - // currentProxyIndex to a node other than the leader OM. - omFailoverProxyProvider.selectNextOmProxy(); - omFailoverProxyProvider.performFailover(null); - - String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); - assertNotEquals(leaderOMNodeId, newProxyNodeId); - - // Once another request is sent to this new proxy node, the leader - // information must be returned via the response and a failover must - // happen to the leader proxy node. - createVolumeTest(true); - Thread.sleep(2000); - - String newLeaderOMNodeId = - omFailoverProxyProvider.getCurrentProxyOMNodeId(); - - // The old and new Leader OM NodeId must match since there was no new - // election in the Ratis ring. 
- assertEquals(leaderOMNodeId, newLeaderOMNodeId); - } - /** * Choose a follower to send the request, the returned exception should * include the suggested leader node. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index aa2e42d72da3..99cddaec748e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -617,7 +617,7 @@ private OMResponse createOmResponseImpl(OMRequest omRequest, private OMResponse getOMResponse(RaftClientReply reply) throws ServiceException { try { - return OMRatisHelper.getOMResponseFromRaftClientReply(reply); + return OMRatisHelper.getOMResponseFromRaftClientReply(reply, getLeaderId()); } catch (IOException ex) { if (ex.getMessage() != null) { throw new ServiceException(ex.getMessage(), ex); diff --git a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java index f66dc091de6c..919e06dc33fe 100644 --- a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java +++ b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java @@ -85,18 +85,7 @@ public Hadoop27RpcTransport(ConfigurationSource conf, @Override public OMResponse submitRequest(OMRequest payload) throws IOException { try { - OMResponse omResponse = - rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload); - - if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) { - String leaderOmId = omResponse.getLeaderOMNodeId(); - - // Failover to the OM node returned by OMResponse leaderOMNodeId if - // current proxy is not pointing 
to that node. - omFailoverProxyProvider.setNextOmProxy(leaderOmId); - omFailoverProxyProvider.performFailover(null); - } - return omResponse; + return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload); } catch (ServiceException e) { OMNotLeaderException notLeaderException = HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);