diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java index e45720f5eb..db0d12b1ce 100644 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java @@ -20,8 +20,15 @@ import java.util.List; import org.apache.servicecomb.service.center.client.model.MicroserviceInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; public abstract class DiscoveryEvents { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static class InstanceChangedEvent extends DiscoveryEvents { private final String appName; @@ -52,6 +59,45 @@ public List getInstances() { * internal events to ask for a immediate instance pull */ public static class PullInstanceEvent extends DiscoveryEvents { + private static final Logger LOGGER = LoggerFactory.getLogger(PullInstanceEvent.class); + + private final String appId; + + private final String serviceName; + + public PullInstanceEvent(String message) { + JsonNode messageNode = parseJsonString(message); + this.appId = getContextFromNode(messageNode, "appId"); + this.serviceName = getContextFromNode(messageNode, "serviceName"); + } + + public String getAppId() { + return appId; + } + + public String getServiceName() { + return serviceName; + } + + private JsonNode parseJsonString(String message) { + try { + return OBJECT_MAPPER.readTree(message); + } catch (Exception e) { + LOGGER.error("parse message [{}] failed!", message, e); + return null; + } + } + private String getContextFromNode(JsonNode messageNode, String itemKey) { + if (messageNode == null) { + return ""; + } + try { + return messageNode.get("key").get(itemKey).asText(); + } catch (Exception e) { + LOGGER.error("get [{}] context from node [{}] failed!", itemKey, e); + return ""; + } + } } } diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java index 9f6e2baf88..1a9bcbc2a2 100755 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java @@ -237,8 +237,7 @@ public RegisteredMicroserviceInstanceResponse registerMicroserviceInstance(Micro @Override public FindMicroserviceInstancesResponse findMicroserviceInstance(String consumerId, String appId, String serviceName, - String versionRule, - String revision) { + String versionRule, String revision) { try { Map headers = new HashMap<>(); headers.put("X-ConsumerId", consumerId); diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java index 2628e9c3e6..6a58d1203d 100644 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java @@ -22,8 +22,11 @@ import java.util.Map; import java.util.Objects; import java.util.Random; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.lang3.StringUtils; import org.apache.servicecomb.http.client.task.AbstractTask; import org.apache.servicecomb.http.client.task.Task; import org.apache.servicecomb.service.center.client.DiscoveryEvents.InstanceChangedEvent; @@ -102,6 +105,8 @@ public static class SubscriptionValue { private final Random random = new Random(); + private Timer timer; + public ServiceCenterDiscovery(ServiceCenterClient serviceCenterClient, EventBus eventBus) { super("service-center-discovery-task"); this.serviceCenterClient = serviceCenterClient; @@ -151,7 +156,60 @@ public void onPullInstanceEvent(PullInstanceEvent event) { return; } pullInstanceTaskOnceInProgress = true; - startTask(new PullInstanceOnceTask()); + if (StringUtils.isEmpty(event.getAppId()) || StringUtils.isEmpty(event.getServiceName())) { + // If the application or service name cannot be resolved, pulled all services. + startTask(new PullInstanceOnceTask()); + return; + } + try { + String appId = event.getAppId(); + String serviceName = event.getServiceName(); + if (!refreshTargetServiceSuccess(appId, serviceName)) { + int positive = random.nextInt(300); + int sign = random.nextBoolean() ? 1 : -1; + long delayTime = 2000L + sign * positive; + if (timer == null) { + timer = new Timer("event-retry-pull-task"); + } + timer.schedule(new PullTargetServiceTask(appId, serviceName), delayTime); + } + } finally { + pullInstanceTaskOnceInProgress = false; + } + } + + class PullTargetServiceTask extends TimerTask { + private final String appId; + + private final String serviceName; + + public PullTargetServiceTask(String appId, String serviceName) { + this.appId = appId; + this.serviceName = serviceName; + } + + @Override + public void run() { + refreshTargetServiceSuccess(appId, serviceName); + } + } + + private boolean refreshTargetServiceSuccess(String appId, String serviceName) { + SubscriptionKey currentKey = new SubscriptionKey(appId, serviceName); + if (instancesCache.get(currentKey) == null) { + // No pull during the service startup phase. + return true; + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("pull [{}#{}] instances from service center", appId, serviceName); + } + String originRev = instancesCache.get(currentKey).revision; + pullInstance(currentKey, instancesCache.get(currentKey), true); + String currentRev = instancesCache.get(currentKey).revision; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("current revision: [{}], origin revision: [{}]", currentRev, originRev); + } + return !originRev.equals(currentRev); } private void pullInstance(SubscriptionKey k, SubscriptionValue v, boolean sendChangedEvent) { @@ -250,7 +308,7 @@ private static String instanceToString(List instances) { sb.append(endpoint.length() > 64 ? endpoint.substring(0, 64) : endpoint); sb.append("|"); } - sb.append(instance.getServiceName()); + sb.append(instance.getStatus()); sb.append("|"); } sb.append("#"); diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterOperation.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterOperation.java index 5c9fd21cb3..15dd1b5de3 100644 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterOperation.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterOperation.java @@ -114,8 +114,7 @@ public interface ServiceCenterOperation { * @throws OperationException If some problems happened to contact service center or non http 200 returned.n */ FindMicroserviceInstancesResponse findMicroserviceInstance(String consumerId, String appId, String serviceName, - String versionRule, - String revision); + String versionRule, String revision); /** * Delete a microservice instance diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java index 46ba02b8b9..a7d678e30f 100644 --- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java +++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java @@ -177,7 +177,7 @@ private void backOff() { @Override public void onMessage(String s) { LOGGER.info("web socket receive message [{}], start query instance", s); - this.eventBus.post(new PullInstanceEvent()); + this.eventBus.post(new PullInstanceEvent(s)); } @Override