This article is licensed under the Creative Commons Attribution-ShareAlike 4.0 International license.
Visit https://creativecommons.org/licenses/by-sa/4.0/ to view the license.

Eureka Server Source Code (Service Endpoints)

This article is based on the latest upstream RELEASE versions at the time of writing: SpringBoot 2.2.5 and SpringCloud Hoxton.SR8.

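Looking at the EurekaServer startup flow, we can see that the dashboard's web service is provided by SpringMVC. Besides SpringMVC, however, EurekaServer also loads Jersey to publish RESTful services that handle client requests. Let's first look at the two core methods involved when the beans are loaded: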

private static final String[] EUREKA_PACKAGES = new String[] {
        "com.netflix.discovery", "com.netflix.eureka" };

@Bean
public javax.ws.rs.core.Application jerseyApplication(Environment environment,
        ResourceLoader resourceLoader) {
    // 1. Create the classpath scanner
    ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
            false, environment);
    // 2. Define the annotations to scan for
    provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
    provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
    // 3. Scan the given packages and add every resolved class to classes
    Set<Class<?>> classes = new HashSet<>();
    for (String basePackage : EUREKA_PACKAGES) {
        Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
        for (BeanDefinition bd : beans) {
            Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
                    resourceLoader.getClassLoader());
            classes.add(cls);
        }
    }

    // 4. Static resource configuration (paths that Jersey should skip)
    Map<String, Object> propsAndFeatures = new HashMap<>();
    propsAndFeatures.put(
            // Skip static content used by the webapp
            ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
            EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");

    DefaultResourceConfig rc = new DefaultResourceConfig(classes);
    rc.setPropertiesAndFeatures(propsAndFeatures);

    return rc;
}

@Bean
public FilterRegistrationBean<?> jerseyFilterRegistration(
        javax.ws.rs.core.Application eurekaJerseyApp) { // 1. Inject the instance created by the previous method
    FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
    bean.setFilter(new ServletContainer(eurekaJerseyApp));
    bean.setOrder(Ordered.LOWEST_PRECEDENCE);
    bean.setUrlPatterns(
            Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

    return bean;
}

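From the source above we can see that the EurekaServer Jersey configuration scans the com.netflix.discovery and com.netflix.eureka packages and registers every class annotated with @Path or @Provider as a web service. Let's look at the main endpoints: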
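
The include-filter idea can be sketched without Spring or Jersey on the classpath. In the minimal sketch below, the Path and Provider annotations and the three candidate classes are hypothetical stand-ins (they are not the real javax.ws.rs types), and the fixed candidate list replaces the real package scan; it only illustrates that a class is kept when it carries at least one of the include annotations.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class AnnotationFilterSketch {

    // Hypothetical stand-ins for javax.ws.rs.Path and javax.ws.rs.ext.Provider.
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE)
    @interface Path { String value(); }

    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE)
    @interface Provider { }

    // Hypothetical candidates, standing in for what a package scan would find.
    @Path("/apps") static class AppsResource { }
    @Provider static class JsonProvider { }
    static class PlainHelper { }

    /** Keeps a candidate if it carries at least one of the include annotations. */
    static Set<Class<?>> select(List<Class<?>> candidates,
                                List<Class<? extends Annotation>> includes) {
        Set<Class<?>> selected = new LinkedHashSet<>();
        for (Class<?> candidate : candidates) {
            for (Class<? extends Annotation> include : includes) {
                if (candidate.isAnnotationPresent(include)) {
                    selected.add(candidate);
                    break; // one matching annotation is enough
                }
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Set<Class<?>> classes = select(
                List.of(AppsResource.class, JsonProvider.class, PlainHelper.class),
                List.of(Path.class, Provider.class));
        for (Class<?> c : classes) {
            System.out.println(c.getSimpleName());
        }
    }
}
```

Only the two annotated classes end up in the resulting set; PlainHelper is dropped because neither filter matches it.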

1) ApplicationResource.addInstance (register a service instance)

com.netflix.eureka.resources.ApplicationResource#addInstance

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // 1. Validate the instance parameters
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getIPAddr())) {
        return Response.status(400).entity("Missing ip address").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }
    // 2. Handle and correct invalid data (a blank dataCenterInfo id)
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }
    // 3. Register
    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}

Let's follow the register call from step 3 (looking only at the cluster-aware implementation):
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // 1. Set the lease duration (time until the service expires); use the default if the client did not configure one
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // 2. Register the service
    super.register(info, leaseDuration, isReplication);
    // 3. Replicate the registration to the other peer nodes
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
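
The fallback in step 1 can be isolated into a small function to make its edge cases visible. This is a minimal sketch: resolveLeaseDuration is a hypothetical helper, a null argument stands for a client that sent no lease info, and the value 90 is an assumption about Lease.DEFAULT_DURATION_IN_SECS rather than something shown in the source above.

```java
class LeaseDurationSketch {

    // Assumed default, standing in for Lease.DEFAULT_DURATION_IN_SECS.
    static final int DEFAULT_DURATION_IN_SECS = 90;

    /**
     * Returns the client's lease duration when it is present and positive,
     * otherwise the server-side default.
     */
    static int resolveLeaseDuration(Integer clientDurationSecs) {
        if (clientDurationSecs != null && clientDurationSecs > 0) {
            return clientDurationSecs;
        }
        return DEFAULT_DURATION_IN_SECS;
    }

    public static void main(String[] args) {
        System.out.println(resolveLeaseDuration(30));   // client value is used
        System.out.println(resolveLeaseDuration(0));    // falls back to the default
        System.out.println(resolveLeaseDuration(null)); // falls back to the default
    }
}
```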

We will skip the registration method itself and move on to the replication code from step 3:

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    // 1. Monitoring (timer)
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If there are no peers, or this request is itself a replication, do not replicate again
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // 2. Iterate over all peer nodes
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // 3. Skip the current node itself
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // 4. Replicate to the peer node
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

private void replicateInstanceActionsToPeers(Action action, String appName,
                                              String id, InstanceInfo info, InstanceStatus newStatus,
                                              PeerEurekaNode node) {
     try {
         InstanceInfo infoFromRegistry;
         CurrentRequestVersion.set(Version.V2);
         switch (action) {
             case Cancel: // cancel the instance
                 node.cancel(appName, id);
                 break;
             case Heartbeat: // heartbeat (lease renewal)
                 InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                 node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                 break;
             case Register: // register the instance
                 node.register(info);
                 break;
             case StatusUpdate: // status update
                 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                 node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                 break;
             case DeleteStatusOverride: // delete the overridden status
                 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                 node.deleteStatusOverride(appName, id, infoFromRegistry);
                 break;
         }
     } catch (Throwable t) {
         logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
     } finally {
         CurrentRequestVersion.remove();
     }
 }
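
The isReplication guard is what keeps the fan-out one level deep: the node that receives the client request forwards it to its peers, and the peers, seeing a replicated request, do not forward it again. The sketch below simulates this in memory. The Node class, its fields, and the cluster helper are hypothetical simplifications; there is no HTTP, batching, or error handling as in the real PeerEurekaNode.

```java
import java.util.ArrayList;
import java.util.List;

class ReplicationSketch {

    /** Hypothetical in-memory stand-in for one Eureka server node. */
    static class Node {
        final String name;
        final List<Node> peers = new ArrayList<>();
        final List<String> registry = new ArrayList<>();
        int replicationsReceived = 0;

        Node(String name) {
            this.name = name;
        }

        void register(String instanceId, boolean isReplication) {
            registry.add(instanceId);
            if (isReplication) {
                replicationsReceived++;
                return; // a replicated request is never forwarded again
            }
            for (Node peer : peers) {
                if (peer == this) {
                    continue; // skip ourselves, like the isThisMyUrl check
                }
                peer.register(instanceId, true);
            }
        }
    }

    /** Builds a cluster in which every node lists all nodes, itself included. */
    static List<Node> cluster(String... names) {
        List<Node> nodes = new ArrayList<>();
        for (String n : names) {
            nodes.add(new Node(n));
        }
        for (Node n : nodes) {
            n.peers.addAll(nodes);
        }
        return nodes;
    }

    public static void main(String[] args) {
        List<Node> nodes = cluster("a", "b", "c");
        nodes.get(0).register("order-service:8080", false); // client request hits node a
        for (Node n : nodes) {
            System.out.println(n.name + " holds " + n.registry
                    + ", replications received: " + n.replicationsReceived);
        }
    }
}
```

Without the early return for replicated requests, node b would forward to a and c, which would forward again, and the registration would bounce around the cluster indefinitely.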

2) InstanceResource.renewLease (renew a service instance's lease)

com.netflix.eureka.resources.InstanceResource#renewLease

@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    // **Core method**
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    if (!isSuccess) {
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();
    }
    Response response;
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        // Store the overridden status since the validation found out the node that replicates wins
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else {
        response = Response.ok().build();
    }
    logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
    return response;
}

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew

public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) { // 1. Renew the lease
        // 2. Replicate to the peer nodes
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

com.netflix.eureka.registry.AbstractInstanceRegistry#renew

public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    // 1. Get the map of instances registered for this service
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id); // 2. Get the lease of this instance
    }
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // 3. Get the instance status and handle abnormal cases
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                instanceInfo.getOverriddenStatus().name(),
                                instanceInfo.getId());
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

            }
        }
        renewsLastMin.increment();
        leaseToRenew.renew(); // 4. **Core method**
        return true;
    }
}
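
The lookup in steps 1 and 2 relies on a two-level map: application name first, then instance id. The sketch below reduces that structure to plain Java. RegistryLookupSketch and its methods are hypothetical, and a Long timestamp stands in for the Lease<InstanceInfo> value; status handling and metrics are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RegistryLookupSketch {

    // appName -> (instanceId -> last renewal time in ms); a simplified stand-in
    // for the map of leases held by the registry.
    private final Map<String, Map<String, Long>> registry = new ConcurrentHashMap<>();

    void register(String appName, String id, long nowMs) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()).put(id, nowMs);
    }

    /** Returns false when the app or the instance is unknown, true after renewing. */
    boolean renew(String appName, String id, long nowMs) {
        Map<String, Long> leases = registry.get(appName);
        if (leases == null || !leases.containsKey(id)) {
            return false; // renewLease turns this into a 404 response
        }
        leases.put(id, nowMs);
        return true;
    }

    public static void main(String[] args) {
        RegistryLookupSketch sketch = new RegistryLookupSketch();
        sketch.register("ORDER-SERVICE", "host-1:8080", 1_000L);
        System.out.println(sketch.renew("ORDER-SERVICE", "host-1:8080", 2_000L));
        System.out.println(sketch.renew("ORDER-SERVICE", "host-2:8080", 2_000L));
        System.out.println(sketch.renew("UNKNOWN-APP", "host-1:8080", 2_000L));
    }
}
```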

com.netflix.eureka.lease.Lease#renew

public void renew() {
    lastUpdateTimestamp = System.currentTimeMillis() + duration;

}

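In essence, renewing a service's lease does nothing more than refresh the lastUpdateTimestamp of the instance's lease in the registry, setting it to the current time plus the lease duration.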
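
To see what that timestamp arithmetic implies, here is a minimal sketch with an explicit clock value passed in instead of System.currentTimeMillis(). LeaseSketch is a hypothetical class; renew follows the Lease#renew code quoted above, while isExpired is an assumption (expired once the current time is past lastUpdateTimestamp plus the duration) and is not taken from the source shown in this article.

```java
class LeaseSketch {

    private final long durationMs;
    private long lastUpdateTimestamp;

    LeaseSketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs;
    }

    /** Follows the quoted Lease#renew: current time plus the lease duration. */
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    /** Assumed expiry rule: expired once now is past lastUpdateTimestamp + duration. */
    boolean isExpired(long nowMs) {
        return nowMs > lastUpdateTimestamp + durationMs;
    }

    long lastUpdateTimestamp() {
        return lastUpdateTimestamp;
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(90_000L, 0L); // 90 s lease created at t = 0
        lease.renew(10_000L);                             // heartbeat at t = 10 s
        System.out.println("lastUpdateTimestamp = " + lease.lastUpdateTimestamp());
        System.out.println("expired at 190000 ms: " + lease.isExpired(190_000L));
        System.out.println("expired at 190001 ms: " + lease.isExpired(190_001L));
    }
}
```

Under the assumed expiry rule, a lease renewed at 10 s with a 90 s duration is only considered expired after 190 s, that is, roughly twice the duration after its last heartbeat, because the duration is added once in renew and once more in the expiry check.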