From fbe785fb6aa4143a2985521d96970c986cc69c24 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 23 Jan 2026 15:55:02 +0800 Subject: [PATCH 1/5] HDDS-14426. OMResponse.leaderOMNodeId should not use RaftClientMessage.serverId --- .../om/ha/OMFailoverProxyProviderBase.java | 5 ++ .../ozone/om/helpers/OMRatisHelper.java | 12 ++--- ...neManagerHAFollowerReadWithAllRunning.java | 54 +++++++++++++++++++ .../om/ratis/OzoneManagerRatisServer.java | 4 +- 4 files changed, 68 insertions(+), 7 deletions(-) diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java index d52b4970e04a..57d6caf823eb 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java @@ -176,6 +176,11 @@ public synchronized String getCurrentProxyOMNodeId() { return omProxies.getNodeId(currentProxyIndex); } + @VisibleForTesting + public synchronized String getNextProxyOMNodeId() { + return omProxies.getNodeId(nextProxyIndex); + } + @VisibleForTesting public RetryPolicy getRetryPolicy(int maxFailovers) { // Client will attempt up to maxFailovers number of failovers between diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java index 3741abbbe879..db58eebe6a2c 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java @@ -64,14 +64,14 @@ public static OMResponse convertByteStringToOMResponse(ByteString bytes) throws } /** Convert the given reply with proto 3 {@link ByteString} to a proto 2 response. 
*/ - public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply) throws IOException { + public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply, String leaderOMNodeId) + throws IOException { final OMResponse response = convertByteStringToOMResponse(reply.getMessage().getContent()); - if (reply.getReplierId().equals(response.getLeaderOMNodeId())) { - return response; + OMResponse.Builder omResponse = OMResponse.newBuilder(response); + if (leaderOMNodeId != null) { + omResponse.setLeaderOMNodeId(leaderOMNodeId); } - return OMResponse.newBuilder(response) - .setLeaderOMNodeId(reply.getReplierId()) - .build(); + return omResponse.build(); } /** Convert the given {@link StateMachineLogEntryProto} to a short {@link String}. */ diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java index 6bafe2a68151..a058f6794c54 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java @@ -57,10 +57,15 @@ import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider; import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider; import org.apache.hadoop.ozone.om.ha.OMProxyInfo; +import org.apache.hadoop.ozone.om.protocolPB.OmTransport; +import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeRequest.Scope; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo; import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB; @@ -461,4 +466,53 @@ public void testAllBucketOperations() throws Exception { OzoneTestUtils.expectOmException(OMException.ResultCodes.BUCKET_NOT_FOUND, () -> retVolume.deleteBucket(bucketName)); } + + @Test + void testOMResponseLeaderOmNodeId() throws Exception { + HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider = + OmTestUtil.getFailoverProxyProvider(getObjectStore()); + HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider = + OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore()); + + // Make sure all OMs are ready + createVolumeTest(true); + + // The OMFailoverProxyProvider will point to the current leader OM node. 
+ String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); + String initialNextProxyOmNodeId = omFailoverProxyProvider.getNextProxyOMNodeId(); + OzoneManager followerOM = null; + for (OzoneManager om: getCluster().getOzoneManagersList()) { + if (!om.isLeaderReady()) { + followerOM = om; + break; + } + } + assertNotNull(followerOM); + assertSame(OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER, + followerOM.getOmRatisServer().getLeaderStatus()); + + + ListVolumeRequest req = + ListVolumeRequest.newBuilder() + .setScope(Scope.VOLUMES_BY_USER) + .build(); + + OzoneManagerProtocolProtos.OMRequest readRequest = + OzoneManagerProtocolProtos.OMRequest.newBuilder() + .setCmdType(Type.ListVolume) + .setListVolumeRequest(req) + .setVersion(ClientVersion.CURRENT_VERSION) + .setClientId(randomUUID().toString()) + .build(); + + OmTransport omTransport = ((OzoneManagerProtocolClientSideTranslatorPB) + getObjectStore().getClientProxy().getOzoneManagerClient()).getTransport(); + followerReadFailoverProxyProvider.changeInitialProxyForTest(followerOM.getOMNodeId()); + OMResponse omResponse = omTransport.submitRequest(readRequest); + + // The leaderOMNodeId in the returned OM response should match the actual leader OM node ID + assertEquals(leaderOMNodeId, omResponse.getLeaderOMNodeId()); + // There should not be any change in the leader proxy's next proxy OM node ID + assertEquals(initialNextProxyOmNodeId, omFailoverProxyProvider.getNextProxyOMNodeId()); + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index aa2e42d72da3..186f1a438013 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -617,7 +617,9 @@ private OMResponse 
createOmResponseImpl(OMRequest omRequest, private OMResponse getOMResponse(RaftClientReply reply) throws ServiceException { try { - return OMRatisHelper.getOMResponseFromRaftClientReply(reply); + RaftPeerId currentLeader = getLeaderId(); + return OMRatisHelper.getOMResponseFromRaftClientReply(reply, + currentLeader != null ? currentLeader.toString() : null); } catch (IOException ex) { if (ex.getMessage() != null) { throw new ServiceException(ex.getMessage(), ex); From 1bdc25c6291d7042e4b8d3d44d55f024ff6e28dc Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Tue, 27 Jan 2026 17:54:24 +0800 Subject: [PATCH 2/5] Remove the failover logic on the Hadoop transport --- .../ozone/om/protocolPB/Hadoop3OmTransport.java | 13 +------------ .../hadoop/fs/ozone/Hadoop27RpcTransport.java | 13 +------------ 2 files changed, 2 insertions(+), 24 deletions(-) diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java index 7de5af31a626..9614e403f100 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java @@ -90,18 +90,7 @@ public Hadoop3OmTransport(ConfigurationSource conf, @Override public OMResponse submitRequest(OMRequest payload) throws IOException { try { - OMResponse omResponse = - rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload); - - if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) { - String leaderOmId = omResponse.getLeaderOMNodeId(); - - // Failover to the OM node returned by OMResponse leaderOMNodeId if - // current proxy is not pointing to that node. 
- omFailoverProxyProvider.setNextOmProxy(leaderOmId); - omFailoverProxyProvider.performFailover(null); - } - return omResponse; + return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload); } catch (ServiceException e) { OMNotLeaderException notLeaderException = HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e); diff --git a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java index f66dc091de6c..919e06dc33fe 100644 --- a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java +++ b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java @@ -85,18 +85,7 @@ public Hadoop27RpcTransport(ConfigurationSource conf, @Override public OMResponse submitRequest(OMRequest payload) throws IOException { try { - OMResponse omResponse = - rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload); - - if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) { - String leaderOmId = omResponse.getLeaderOMNodeId(); - - // Failover to the OM node returned by OMResponse leaderOMNodeId if - // current proxy is not pointing to that node. 
- omFailoverProxyProvider.setNextOmProxy(leaderOmId); - omFailoverProxyProvider.performFailover(null); - } - return omResponse; + return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload); } catch (ServiceException e) { OMNotLeaderException notLeaderException = HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e); From 571b888786fc0d65ecfcacec715188e785d636ac Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 28 Jan 2026 17:02:15 +0800 Subject: [PATCH 3/5] Remove testOMProxyProviderFailoverToCurrentLeader --- ...neManagerHAFollowerReadWithAllRunning.java | 45 ------------------- .../om/TestOzoneManagerHAWithAllRunning.java | 40 ----------------- 2 files changed, 85 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java index a058f6794c54..7ed849721422 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java @@ -26,7 +26,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertSame; @@ -127,50 +126,6 @@ void testFollowerReadTargetsFollower() throws Exception { assertEquals(followerOMNodeId, lastProxy.getNodeId()); } - @Test - public void testOMProxyProviderFailoverToCurrentLeader() throws Exception { - ObjectStore objectStore = getObjectStore(); - HadoopRpcOMFailoverProxyProvider 
omFailoverProxyProvider = - OmTestUtil.getFailoverProxyProvider(objectStore); - HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider = - OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore); - String initialFollowerReadNodeId = followerReadFailoverProxyProvider.getCurrentProxy().getNodeId(); - - // Run couple of createVolume tests to discover the current Leader OM - createVolumeTest(true); - createVolumeTest(true); - - // The oMFailoverProxyProvider will point to the current leader OM node. - String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); - - // Perform a manual failover of the proxy provider to move the - // currentProxyIndex to a node other than the leader OM. - omFailoverProxyProvider.selectNextOmProxy(); - omFailoverProxyProvider.performFailover(null); - - String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); - assertNotEquals(leaderOMNodeId, newProxyNodeId); - - // Once another request is sent to this new proxy node, the leader - // information must be returned via the response and a failover must - // happen to the leader proxy node. - // This will also do some read operations where this might read from the follower. - createVolumeTest(true); - Thread.sleep(2000); - - String newLeaderOMNodeId = - omFailoverProxyProvider.getCurrentProxyOMNodeId(); - - // The old and new Leader OM NodeId must match since there was no new - // election in the Ratis ring. 
- assertEquals(leaderOMNodeId, newLeaderOMNodeId); - - // The follower read proxy should remain unchanged since the follower is not throwing exceptions - // The performFailover on the leader proxy should not affect the follower read proxy provider - String currentFollowerReadNodeId = followerReadFailoverProxyProvider.getCurrentProxy().getNodeId(); - assertEquals(initialFollowerReadNodeId, currentFollowerReadNodeId); - } - /** * Choose a follower to send the request, the returned exception should * include the suggested leader node. diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java index 06bec6481967..1baa2fe9bdc3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java @@ -33,7 +33,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -311,45 +310,6 @@ void testOMProxyProviderInitialization() { } } - /** - * Test HadoopRpcOMFailoverProxyProvider failover when current OM proxy is not - * the current OM Leader. 
- */ - @Test - public void testOMProxyProviderFailoverToCurrentLeader() throws Exception { - ObjectStore objectStore = getObjectStore(); - final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider - = OmTestUtil.getFailoverProxyProvider(objectStore); - - // Run couple of createVolume tests to discover the current Leader OM - createVolumeTest(true); - createVolumeTest(true); - - // The oMFailoverProxyProvider will point to the current leader OM node. - String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); - - // Perform a manual failover of the proxy provider to move the - // currentProxyIndex to a node other than the leader OM. - omFailoverProxyProvider.selectNextOmProxy(); - omFailoverProxyProvider.performFailover(null); - - String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); - assertNotEquals(leaderOMNodeId, newProxyNodeId); - - // Once another request is sent to this new proxy node, the leader - // information must be returned via the response and a failover must - // happen to the leader proxy node. - createVolumeTest(true); - Thread.sleep(2000); - - String newLeaderOMNodeId = - omFailoverProxyProvider.getCurrentProxyOMNodeId(); - - // The old and new Leader OM NodeId must match since there was no new - // election in the Ratis ring. - assertEquals(leaderOMNodeId, newLeaderOMNodeId); - } - /** * Choose a follower to send the request, the returned exception should * include the suggested leader node. 
From 69254bfc1b1a7428852b218ff855dff3ef881e51 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 29 Jan 2026 09:25:40 +0800 Subject: [PATCH 4/5] Address comment --- .../apache/hadoop/ozone/om/helpers/OMRatisHelper.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java index db58eebe6a2c..adb7cb7ba495 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java @@ -67,11 +67,12 @@ public static OMResponse convertByteStringToOMResponse(ByteString bytes) throws public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply, String leaderOMNodeId) throws IOException { final OMResponse response = convertByteStringToOMResponse(reply.getMessage().getContent()); - OMResponse.Builder omResponse = OMResponse.newBuilder(response); - if (leaderOMNodeId != null) { - omResponse.setLeaderOMNodeId(leaderOMNodeId); + if (leaderOMNodeId == null) { + return response; } - return omResponse.build(); + return OMResponse.newBuilder(response) + .setLeaderOMNodeId(leaderOMNodeId) + .build(); } /** Convert the given {@link StateMachineLogEntryProto} to a short {@link String}. 
*/ From 41ec029ab5df8a0b8887b81fde80d0767b682c88 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 29 Jan 2026 09:27:22 +0800 Subject: [PATCH 5/5] Pass RaftPeerId directly --- .../org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java | 5 +++-- .../hadoop/ozone/om/ratis/OzoneManagerRatisServer.java | 4 +--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java index adb7cb7ba495..a483fbc6870f 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java @@ -26,6 +26,7 @@ import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.slf4j.Logger; @@ -64,14 +65,14 @@ public static OMResponse convertByteStringToOMResponse(ByteString bytes) throws } /** Convert the given reply with proto 3 {@link ByteString} to a proto 2 response. 
*/ - public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply, String leaderOMNodeId) + public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply, RaftPeerId leaderOMNodeId) throws IOException { final OMResponse response = convertByteStringToOMResponse(reply.getMessage().getContent()); if (leaderOMNodeId == null) { return response; } return OMResponse.newBuilder(response) - .setLeaderOMNodeId(leaderOMNodeId) + .setLeaderOMNodeId(leaderOMNodeId.toString()) .build(); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index 186f1a438013..99cddaec748e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -617,9 +617,7 @@ private OMResponse createOmResponseImpl(OMRequest omRequest, private OMResponse getOMResponse(RaftClientReply reply) throws ServiceException { try { - RaftPeerId currentLeader = getLeaderId(); - return OMRatisHelper.getOMResponseFromRaftClientReply(reply, - currentLeader != null ? currentLeader.toString() : null); + return OMRatisHelper.getOMResponseFromRaftClientReply(reply, getLeaderId()); } catch (IOException ex) { if (ex.getMessage() != null) { throw new ServiceException(ex.getMessage(), ex);