Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@
import java.util.List;

import org.apache.servicecomb.service.center.client.model.MicroserviceInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public abstract class DiscoveryEvents {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

public static class InstanceChangedEvent extends DiscoveryEvents {
private final String appName;

Expand Down Expand Up @@ -52,6 +59,45 @@ public List<MicroserviceInstance> getInstances() {
* internal events to ask for a immediate instance pull
*/
public static class PullInstanceEvent extends DiscoveryEvents {
private static final Logger LOGGER = LoggerFactory.getLogger(PullInstanceEvent.class);

private final String appId;

private final String serviceName;

public PullInstanceEvent(String message) {
JsonNode messageNode = parseJsonString(message);
this.appId = getContextFromNode(messageNode, "appId");
this.serviceName = getContextFromNode(messageNode, "serviceName");
}

public String getAppId() {
return appId;
}

public String getServiceName() {
return serviceName;
}

private JsonNode parseJsonString(String message) {
try {
return OBJECT_MAPPER.readTree(message);
} catch (Exception e) {
LOGGER.error("parse message [{}] failed!", message, e);
return null;
}
}

private String getContextFromNode(JsonNode messageNode, String itemKey) {
if (messageNode == null) {
return "";
}
try {
return messageNode.get("key").get(itemKey).asText();
} catch (Exception e) {
LOGGER.error("get [{}] context from node [{}] failed!", itemKey, e);
return "";
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,7 @@ public RegisteredMicroserviceInstanceResponse registerMicroserviceInstance(Micro

@Override
public FindMicroserviceInstancesResponse findMicroserviceInstance(String consumerId, String appId, String serviceName,
String versionRule,
String revision) {
String versionRule, String revision) {
try {
Map<String, String> headers = new HashMap<>();
headers.put("X-ConsumerId", consumerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.http.client.task.AbstractTask;
import org.apache.servicecomb.http.client.task.Task;
import org.apache.servicecomb.service.center.client.DiscoveryEvents.InstanceChangedEvent;
Expand Down Expand Up @@ -102,6 +105,8 @@ public static class SubscriptionValue {

private final Random random = new Random();

private Timer timer;

public ServiceCenterDiscovery(ServiceCenterClient serviceCenterClient, EventBus eventBus) {
super("service-center-discovery-task");
this.serviceCenterClient = serviceCenterClient;
Expand Down Expand Up @@ -151,7 +156,60 @@ public void onPullInstanceEvent(PullInstanceEvent event) {
return;
}
pullInstanceTaskOnceInProgress = true;
startTask(new PullInstanceOnceTask());
if (StringUtils.isEmpty(event.getAppId()) || StringUtils.isEmpty(event.getServiceName())) {
// If the application or service name cannot be resolved, pulled all services.
startTask(new PullInstanceOnceTask());
return;
}
try {
String appId = event.getAppId();
String serviceName = event.getServiceName();
if (!refreshTargetServiceSuccess(appId, serviceName)) {
int positive = random.nextInt(300);
int sign = random.nextBoolean() ? 1 : -1;
long delayTime = 2000L + sign * positive;
if (timer == null) {
timer = new Timer("event-retry-pull-task");
}
timer.schedule(new PullTargetServiceTask(appId, serviceName), delayTime);
}
} finally {
pullInstanceTaskOnceInProgress = false;
}
}

class PullTargetServiceTask extends TimerTask {
private final String appId;

private final String serviceName;

public PullTargetServiceTask(String appId, String serviceName) {
this.appId = appId;
this.serviceName = serviceName;
}

@Override
public void run() {
refreshTargetServiceSuccess(appId, serviceName);
}
}

private boolean refreshTargetServiceSuccess(String appId, String serviceName) {
SubscriptionKey currentKey = new SubscriptionKey(appId, serviceName);
if (instancesCache.get(currentKey) == null) {
// No pull during the service startup phase.
return true;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("pull [{}#{}] instances from service center", appId, serviceName);
}
String originRev = instancesCache.get(currentKey).revision;
pullInstance(currentKey, instancesCache.get(currentKey), true);
String currentRev = instancesCache.get(currentKey).revision;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("current revision: [{}], origin revision: [{}]", currentRev, originRev);
}
return !originRev.equals(currentRev);
}

private void pullInstance(SubscriptionKey k, SubscriptionValue v, boolean sendChangedEvent) {
Expand Down Expand Up @@ -250,7 +308,7 @@ private static String instanceToString(List<MicroserviceInstance> instances) {
sb.append(endpoint.length() > 64 ? endpoint.substring(0, 64) : endpoint);
sb.append("|");
}
sb.append(instance.getServiceName());
sb.append(instance.getStatus());
sb.append("|");
}
sb.append("#");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ public interface ServiceCenterOperation {
* @throws OperationException If some problems happened to contact service center or non http 200 returned.n
*/
FindMicroserviceInstancesResponse findMicroserviceInstance(String consumerId, String appId, String serviceName,
String versionRule,
String revision);
String versionRule, String revision);

/**
* Delete a microservice instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ private void backOff() {
@Override
public void onMessage(String s) {
LOGGER.info("web socket receive message [{}], start query instance", s);
this.eventBus.post(new PullInstanceEvent());
this.eventBus.post(new PullInstanceEvent(s));
}

@Override
Expand Down
Loading