From 97af90d0aff04def30cd02ffc4498b201b252bdb Mon Sep 17 00:00:00 2001 From: chengyouling Date: Mon, 26 Jan 2026 16:39:57 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E6=A0=B9=E6=8D=AEwatch=E4=BF=A1=E6=81=AF?= =?UTF-8?q?=E6=8B=89=E5=8F=96=E6=8C=87=E5=AE=9A=E5=BA=94=E7=94=A8=E3=80=81?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=90=8D=E7=9A=84=E5=AE=9E=E4=BE=8B=E4=BF=A1?= =?UTF-8?q?=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../center/client/DiscoveryEvents.java | 45 +++++++++++++++++++ .../center/client/ServiceCenterClient.java | 3 +- .../center/client/ServiceCenterDiscovery.java | 43 +++++++++++++++++- .../center/client/ServiceCenterOperation.java | 3 +- .../center/client/ServiceCenterWatch.java | 2 +- 5 files changed, 89 insertions(+), 7 deletions(-) diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java index e45720f5ebb..914a033851f 100644 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java @@ -20,6 +20,11 @@ import java.util.List; import org.apache.servicecomb.service.center.client.model.MicroserviceInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; public abstract class DiscoveryEvents { public static class InstanceChangedEvent extends DiscoveryEvents { @@ -52,6 +57,46 @@ public List getInstances() { * internal events to ask for a immediate instance pull */ public static class PullInstanceEvent extends DiscoveryEvents { + private static final Logger LOGGER = LoggerFactory.getLogger(PullInstanceEvent.class); + + private final String appId; + + private final String serviceName; + + public PullInstanceEvent(String message) { + JsonNode messageNode = parseJsonString(message); + this.appId = getContextFromNode(messageNode, "appId"); + this.serviceName = getContextFromNode(messageNode, "serviceName"); + } + + public String getAppId() { + return appId; + } + public String getServiceName() { + return serviceName; + } + + private JsonNode parseJsonString(String message) { + try { + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.readTree(message); + } catch (Exception e) { + LOGGER.error("parse message [{}] failed!", message, e); + return null; + } + } + + private String getContextFromNode(JsonNode messageNode, String itemKey) { + if (messageNode == null) { + return ""; + } + try { + return messageNode.get("key").get(itemKey).asText(); + } catch (Exception e) { + LOGGER.error("get [{}] context from node [{}] failed!", itemKey, e); + return ""; + } + } } } diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java index 9f6e2baf88e..1a9bcbc2a27 100755 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java @@ -237,8 +237,7 @@ public RegisteredMicroserviceInstanceResponse registerMicroserviceInstance(Micro @Override public FindMicroserviceInstancesResponse findMicroserviceInstance(String consumerId, String appId, String serviceName, - String versionRule, - String revision) { + String versionRule, String revision) { try { Map headers = new HashMap<>(); headers.put("X-ConsumerId", consumerId); diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java index 2628e9c3e64..c66aae7525b 100644 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java @@ -24,6 +24,7 @@ import java.util.Random; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.lang3.StringUtils; import org.apache.servicecomb.http.client.task.AbstractTask; import org.apache.servicecomb.http.client.task.Task; import org.apache.servicecomb.service.center.client.DiscoveryEvents.InstanceChangedEvent; @@ -151,7 +152,45 @@ public void onPullInstanceEvent(PullInstanceEvent event) { return; } pullInstanceTaskOnceInProgress = true; - startTask(new PullInstanceOnceTask()); + if (StringUtils.isEmpty(event.getAppId()) || StringUtils.isEmpty(event.getServiceName())) { + // If the application or service name cannot be resolved, pulled all services. + startTask(new PullInstanceOnceTask()); + return; + } + try { + pullTargetService(event); + } finally { + pullInstanceTaskOnceInProgress = false; + } + } + + private void pullTargetService(PullInstanceEvent event) { + SubscriptionKey currentKey = new SubscriptionKey(event.getAppId(), event.getServiceName()); + if (instancesCache.get(currentKey) == null) { + // No pull during the service startup phase. + return; + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("pull [{}#{}] instances from service center", event.getAppId(), event.getServiceName()); + } + String originRev = instancesCache.get(currentKey).revision; + for (int i = 1; i <= 3; i++) { + try { + pullInstance(currentKey, instancesCache.get(currentKey), true); + String currentRev = instancesCache.get(currentKey).revision; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("current revision: [{}], origin revision: [{}]", currentRev, originRev); + } + if (!originRev.equals(currentRev)) { + break; + } + int positive = random.nextInt(300); + int sign = random.nextBoolean() ? 1 : -1; + Thread.sleep(i * 1000 + sign * positive); + } catch (Exception e) { + LOGGER.error("pull [{}#{}] instances failed.", event.getAppId(), event.getServiceName(), e); + } + } } private void pullInstance(SubscriptionKey k, SubscriptionValue v, boolean sendChangedEvent) { @@ -250,7 +289,7 @@ private static String instanceToString(List instances) { sb.append(endpoint.length() > 64 ? endpoint.substring(0, 64) : endpoint); sb.append("|"); } - sb.append(instance.getServiceName()); + sb.append(instance.getStatus()); sb.append("|"); } sb.append("#"); diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterOperation.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterOperation.java index 5c9fd21cb3a..15dd1b5de35 100644 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterOperation.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterOperation.java @@ -114,8 +114,7 @@ public interface ServiceCenterOperation { * @throws OperationException If some problems happened to contact service center or non http 200 returned.n */ FindMicroserviceInstancesResponse findMicroserviceInstance(String consumerId, String appId, String serviceName, - String versionRule, - String revision); + String versionRule, String revision); /** * Delete a microservice instance diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java index 46ba02b8b9a..a7d678e30f2 100644 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java @@ -177,7 +177,7 @@ private void backOff() { @Override public void onMessage(String s) { LOGGER.info("web socket receive message [{}], start query instance", s); - this.eventBus.post(new PullInstanceEvent()); + this.eventBus.post(new PullInstanceEvent(s)); } @Override From 3596640f7169e3e992bf5627d90ca279439fb4e5 Mon Sep 17 00:00:00 2001 From: chengyouling Date: Tue, 27 Jan 2026 17:19:25 +0800 Subject: [PATCH 2/4] =?UTF-8?q?ObjectMapper=E4=BD=9C=E4=B8=BA=E5=B8=B8?= =?UTF-8?q?=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../servicecomb/service/center/client/DiscoveryEvents.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java index 914a033851f..916c9024c07 100644 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java @@ -27,6 +27,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; public abstract class DiscoveryEvents { + private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static class InstanceChangedEvent extends DiscoveryEvents { private final String appName; @@ -79,8 +81,7 @@ public String getServiceName() { private JsonNode parseJsonString(String message) { try { - ObjectMapper objectMapper = new ObjectMapper(); - return objectMapper.readTree(message); + return OBJECT_MAPPER.readTree(message); } catch (Exception e) { LOGGER.error("parse message [{}] failed!", message, e); return null; From 951c58f992e74813a8346bfb00b79e0a061c4da9 Mon Sep 17 00:00:00 2001 From: chengyouling Date: Wed, 28 Jan 2026 15:48:41 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E4=BD=BF=E7=94=A8timer=E5=A4=84=E7=90=86?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../center/client/ServiceCenterDiscovery.java | 61 ++++++++++++------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java index c66aae7525b..6a58d1203db 100644 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.Objects; import java.util.Random; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.StringUtils; @@ -103,6 +105,8 @@ public static class SubscriptionValue { private final Random random = new Random(); + private Timer timer; + public ServiceCenterDiscovery(ServiceCenterClient serviceCenterClient, EventBus eventBus) { super("service-center-discovery-task"); this.serviceCenterClient = serviceCenterClient; @@ -158,39 +162,54 @@ public void onPullInstanceEvent(PullInstanceEvent event) { return; } try { - pullTargetService(event); + String appId = event.getAppId(); + String serviceName = event.getServiceName(); + if (!refreshTargetServiceSuccess(appId, serviceName)) { + int positive = random.nextInt(300); + int sign = random.nextBoolean() ? 1 : -1; + long delayTime = 2000L + sign * positive; + if (timer == null) { + timer = new Timer("event-retry-pull-task"); + } + timer.schedule(new PullTargetServiceTask(appId, serviceName), delayTime); + } } finally { pullInstanceTaskOnceInProgress = false; } } - private void pullTargetService(PullInstanceEvent event) { - SubscriptionKey currentKey = new SubscriptionKey(event.getAppId(), event.getServiceName()); + class PullTargetServiceTask extends TimerTask { + private final String appId; + + private final String serviceName; + + public PullTargetServiceTask(String appId, String serviceName) { + this.appId = appId; + this.serviceName = serviceName; + } + + @Override + public void run() { + refreshTargetServiceSuccess(appId, serviceName); + } + } + + private boolean refreshTargetServiceSuccess(String appId, String serviceName) { + SubscriptionKey currentKey = new SubscriptionKey(appId, serviceName); if (instancesCache.get(currentKey) == null) { // No pull during the service startup phase. - return; + return true; } if (LOGGER.isDebugEnabled()) { - LOGGER.debug("pull [{}#{}] instances from service center", event.getAppId(), event.getServiceName()); + LOGGER.debug("pull [{}#{}] instances from service center", appId, serviceName); } String originRev = instancesCache.get(currentKey).revision; - for (int i = 1; i <= 3; i++) { - try { - pullInstance(currentKey, instancesCache.get(currentKey), true); - String currentRev = instancesCache.get(currentKey).revision; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("current revision: [{}], origin revision: [{}]", currentRev, originRev); - } - if (!originRev.equals(currentRev)) { - break; - } - int positive = random.nextInt(300); - int sign = random.nextBoolean() ? 1 : -1; - Thread.sleep(i * 1000 + sign * positive); - } catch (Exception e) { - LOGGER.error("pull [{}#{}] instances failed.", event.getAppId(), event.getServiceName(), e); - } + pullInstance(currentKey, instancesCache.get(currentKey), true); + String currentRev = instancesCache.get(currentKey).revision; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("current revision: [{}], origin revision: [{}]", currentRev, originRev); } + return !originRev.equals(currentRev); } private void pullInstance(SubscriptionKey k, SubscriptionValue v, boolean sendChangedEvent) { From 11de8d66d1355368d339cc76fdd7f5e68d783f80 Mon Sep 17 00:00:00 2001 From: chengyouling Date: Wed, 28 Jan 2026 15:48:46 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E4=BD=BF=E7=94=A8timer=E5=A4=84=E7=90=86?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../servicecomb/service/center/client/DiscoveryEvents.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java index 916c9024c07..db0d12b1ce1 100644 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java @@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; public abstract class DiscoveryEvents { - private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static class InstanceChangedEvent extends DiscoveryEvents { private final String appName;