This article is licensed under the Creative Commons Attribution-ShareAlike 4.0 International license.
See https://creativecommons.org/licenses/by-sa/4.0/ for the text of the license.

Eureka Server Startup Process

This article is based on the latest upstream RELEASE versions at the time of writing: Spring Boot 2.2.5 and Spring Cloud Hoxton.SR8.

1) Eureka Server Startup Process

Eureka Server is usually pulled in as a Spring Boot starter, for example spring-cloud-starter-netflix-eureka-server.
A well-behaved starter follows Spring Boot's auto-configuration convention and declares its AutoConfiguration classes in spring.factories, so that is where we start looking.

Find the spring.factories file inside the spring-cloud-netflix-eureka-server dependency and open it:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

The file declares a single auto-configuration class, EurekaServerAutoConfiguration, which is the key class in the Eureka Server startup process.
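
To make the file format concrete: spring.factories uses ordinary Java properties syntax, where a trailing backslash continues the value on the next line. The sketch below parses the same two lines with plain java.util.Properties. The class name SpringFactoriesSketch and the helper autoConfigurations are hypothetical, and this only illustrates the format; it is not Spring's own loader.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

class SpringFactoriesSketch {
    static final String KEY = "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    // Same content as the spring.factories file shown above.
    static final String CONTENT =
            KEY + "=\\\n"
            + "  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration\n";

    /** Parses properties text and returns the comma-separated class names listed under KEY. */
    static List<String> autoConfigurations(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty(KEY, "");
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        autoConfigurations(CONTENT).forEach(System.out::println);
    }
}
```

A starter that declares several auto-configuration classes lists them under the same key, separated by commas, which is why the helper splits on ",".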

1.1) EurekaServerAutoConfiguration annotations

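// 1. Marks this as a configuration class and disables proxying of @Bean methods (faster startup)
@Configuration(proxyBeanMethods = false)
// 2. See section 1.4
@Import(EurekaServerInitializerConfiguration.class)
// 3. This configuration is loaded only if the container holds an EurekaServerMarkerConfiguration.Marker bean; see section 1.2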
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {

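With the annotations in view, we cover the Eureka Server startup process in three sections:

  • 1.2 Conditions for enabling auto-configuration
  • 1.3 The EurekaServerAutoConfiguration class
  • 1.4 What happens after auto-configuration: @Import(EurekaServerInitializerConfiguration.class)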

1.2) Conditions for enabling Eureka Server auto-configuration

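EurekaServerAutoConfiguration declares a @ConditionalOnBean, so auto-configuration only takes effect if a Marker bean exists in the container. Where does that bean come from? It is imported by the @EnableEurekaServer annotation: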

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}

So the @EnableEurekaServer annotation we normally put on the main application class is exactly what satisfies this precondition.
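
The gating pattern can be modeled in a few lines of plain Java. This is only a toy model under stated assumptions: MarkerGateSketch, its map-based "container", and ServerBeans are all hypothetical names, and Spring's real condition evaluation is far more involved. The point it shows is that the marker carries no behavior; its mere presence switches the auto-configuration on.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class MarkerGateSketch {
    /** Stands in for EurekaServerMarkerConfiguration.Marker: a class used only as a flag. */
    static final class Marker { }

    /** Stands in for the beans contributed by the auto-configuration (hypothetical). */
    static final class ServerBeans { }

    /** A toy bean container keyed by type (hypothetical, not Spring's BeanFactory). */
    private final Map<Class<?>, Object> beans = new HashMap<>();

    /** Models @EnableEurekaServer importing the marker configuration. */
    void enableServer() {
        beans.put(Marker.class, new Marker());
    }

    /** Models @ConditionalOnBean(Marker.class): contribute beans only if the marker exists. */
    Optional<ServerBeans> applyAutoConfiguration() {
        if (!beans.containsKey(Marker.class)) {
            return Optional.empty();
        }
        ServerBeans serverBeans = new ServerBeans();
        beans.put(ServerBeans.class, serverBeans);
        return Optional.of(serverBeans);
    }

    public static void main(String[] args) {
        MarkerGateSketch withoutMarker = new MarkerGateSketch();
        System.out.println("without marker: " + withoutMarker.applyAutoConfiguration().isPresent());

        MarkerGateSketch withMarker = new MarkerGateSketch();
        withMarker.enableServer();
        System.out.println("with marker: " + withMarker.applyAutoConfiguration().isPresent());
    }
}
```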

1.3) The EurekaServerAutoConfiguration class

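This section goes through the methods in the auto-configuration class that are important or that serve as extension points:

  1. Eureka Server dashboard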
    org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration#eurekaController

    @Bean
    @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled",
            matchIfMissing = true)
    public EurekaController eurekaController() {
        return new EurekaController(this.applicationInfoManager);
    }

    @RequestMapping("${eureka.dashboard.path:/}")
    public class EurekaController {

    1. EurekaController is the controller behind the server dashboard; it can be turned off with eureka.dashboard.enabled=false
    2. The dashboard's base path can be changed with eureka.dashboard.path
  2. PeerAwareInstanceRegistry: the peer-aware instance registry
    Used to obtain information about the other server instances, for example when replicating the registry
    org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration#peerAwareInstanceRegistry

    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
            ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications(); // force initialization
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                serverCodecs, this.eurekaClient,
                this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
                this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }
  3. PeerEurekaNodes: the peer nodes
    Wraps the peer nodes and the operations on them
    org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration#peerEurekaNodes

    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
            ServerCodecs serverCodecs,
            ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
        return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
                replicationClientAdditionalFilters);
    }

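    Its start method launches a task that keeps the list of peer nodes up to date. The task runs on a single-threaded scheduled executor, and the refresh interval can be changed with the peer-eureka-nodes-update-interval-ms setting under eureka.server, for example peer-eureka-nodes-update-interval-ms=600000.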
    com.netflix.eureka.cluster.PeerEurekaNodes#start

    public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            updatePeerEurekaNodes(resolvePeerUrls());
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }
  4. EurekaServerContext: the Eureka Server context
    Bundles the Eureka Server components, including those described above; DefaultEurekaServerContext is used by default
    org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration#eurekaServerContext

    @Bean
    @ConditionalOnMissingBean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
            PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                registry, peerEurekaNodes, this.applicationInfoManager);
    }

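    It has an initialize() method that runs after the Spring bean has been created and calls the start method from the previous item, which schedules the task that synchronizes peer-node information.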

  5. EurekaServerBootstrap: the Eureka Server bootstrapper
    Also bundles the components; section 1.4 uses it for the final initialization steps
    org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap

    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
            EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager,
                this.eurekaClientConfig, this.eurekaServerConfig, registry,
                serverContext);
    }
  6. jerseyFilterRegistration/jerseyApplication: Jersey integration
    Eureka Server exposes its RESTful endpoints through Jersey rather than Spring MVC

    @Bean
    public FilterRegistrationBean<?> jerseyFilterRegistration(
            javax.ws.rs.core.Application eurekaJerseyApp) {
        FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(Ordered.LOWEST_PRECEDENCE);
        bean.setUrlPatterns(
                Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
    
        return bean;
    }
    
    /**
     * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources
     * required by the Eureka server.
     * @param environment an {@link Environment} instance to retrieve classpath resources
     * @param resourceLoader a {@link ResourceLoader} instance to get classloader from
     * @return created {@link Application} object
     */
    @Bean
    public javax.ws.rs.core.Application jerseyApplication(Environment environment,
            ResourceLoader resourceLoader) {
    
        ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
                false, environment);
    
        // Filter to include only classes that have a particular annotation.
        //
        provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
        provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
    
        // Find classes in Eureka packages (or subpackages)
        //
        Set<Class<?>> classes = new HashSet<>();
        for (String basePackage : EUREKA_PACKAGES) {
            Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
            for (BeanDefinition bd : beans) {
                Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
                        resourceLoader.getClassLoader());
                classes.add(cls);
            }
        }
    
        // Construct the Jersey ResourceConfig
        Map<String, Object> propsAndFeatures = new HashMap<>();
        propsAndFeatures.put(
                // Skip static content used by the webapp
                ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
                EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
    
        DefaultResourceConfig rc = new DefaultResourceConfig(classes);
        rc.setPropertiesAndFeatures(propsAndFeatures);
    
        return rc;
    }
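
The scheduling shape of PeerEurekaNodes#start from item 3 (one immediate refresh, then repeated refreshes with a fixed delay on a single daemon thread) can be tried out in isolation. The sketch below is a stand-alone model: PeerUpdateSchedulerSketch, the refresh callback, and the ranAtLeast helper are hypothetical and only mimic the structure shown above, not the Eureka classes themselves.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PeerUpdateSchedulerSketch {
    private ScheduledExecutorService taskExecutor;

    /** Runs refresh once right away, then repeatedly with a fixed delay on one daemon thread. */
    void start(Runnable refresh, long intervalMs) {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "Sketch-PeerNodesUpdater");
            thread.setDaemon(true);
            return thread;
        });
        refresh.run(); // initial, synchronous update
        Runnable guarded = () -> {
            try {
                refresh.run();
            } catch (Throwable e) {
                // Swallow the error so that one failed run does not cancel later runs.
                System.err.println("Cannot update the replica nodes: " + e);
            }
        };
        taskExecutor.scheduleWithFixedDelay(guarded, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        taskExecutor.shutdownNow();
    }

    /** Starts the scheduler and reports whether refresh ran at least `runs` times within the timeout. */
    static boolean ranAtLeast(int runs, long intervalMs, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(runs);
        PeerUpdateSchedulerSketch sketch = new PeerUpdateSchedulerSketch();
        sketch.start(latch::countDown, intervalMs);
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            sketch.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("refreshed 3 times: " + ranAtLeast(3, 20, 5_000));
    }
}
```

The inner try/catch matters: with scheduleWithFixedDelay, a task that throws is not scheduled again, which is presumably why the original code catches Throwable inside run().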

1.4) @Import(EurekaServerInitializerConfiguration.class)

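The class brought in by @Import on the configuration class does its real work once auto-configuration has finished. Let's look at what happens at that point: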

  1. Annotations and implemented interfaces
    org.springframework.cloud.netflix.eureka.server.EurekaServerInitializerConfiguration

    @Configuration(proxyBeanMethods = false)
    public class EurekaServerInitializerConfiguration
        implements ServletContextAware, SmartLifecycle, Ordered {

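This is a configuration class too, and it implements SmartLifecycle, so its start method is invoked once the Spring context has finished initializing. The other methods matter less, so let's go straight to the code around start: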
org.springframework.cloud.netflix.eureka.server.EurekaServerInitializerConfiguration#start

@Override
public void setServletContext(ServletContext servletContext) {
    // 1. Implements ServletContextAware#setServletContext: the ServletContext is injected here and kept for later use
    this.servletContext = servletContext;
}

public void start() {
    new Thread(() -> {
        try {
            // 2. Call eurekaServerBootstrap.contextInitialized to run the initialization
            eurekaServerBootstrap.contextInitialized(
                    EurekaServerInitializerConfiguration.this.servletContext);
            log.info("Started Eureka Server");
            // 3. Publish the "registry available" event
            publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
            EurekaServerInitializerConfiguration.this.running = true;
            // 4. Publish the "Eureka Server started" event
            publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
        }
        catch (Exception ex) {
            log.error("Could not initialize Eureka servlet context", ex);
        }
    }).start();
}

Steps 3 and 4 publish two events. Eureka Server itself does not listen for either of them, so they are extension points: a Spring event listener can catch them to hook in custom logic.
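
Two properties of start() are easy to miss: it returns immediately because the work happens on a new thread, and the two events are published in a fixed order with the running flag set in between. The plain-Java model below reproduces just that shape. StartupEventsSketch, its string-based events, and the startAndCollect helper are hypothetical; a real extension would instead register a Spring listener for EurekaRegistryAvailableEvent or EurekaServerStartedEvent, which is not shown here because it needs Spring on the classpath.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class StartupEventsSketch {
    private final List<Consumer<String>> listeners = new ArrayList<>();
    private final CountDownLatch finished = new CountDownLatch(1);
    private volatile boolean running;

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    private void publish(String event) {
        for (Consumer<String> listener : listeners) {
            listener.accept(event);
        }
    }

    /** Returns immediately; initialization and event publication happen on a new thread. */
    void start() {
        new Thread(() -> {
            try {
                // (the bootstrap work of step 2 would happen here)
                publish("EurekaRegistryAvailableEvent");
                running = true;
                publish("EurekaServerStartedEvent");
            } finally {
                finished.countDown();
            }
        }).start();
    }

    boolean isRunning() {
        return running;
    }

    /** Starts a sketch instance, waits for the background thread, and returns the events seen. */
    static List<String> startAndCollect(long timeoutMs) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        StartupEventsSketch sketch = new StartupEventsSketch();
        sketch.addListener(seen::add);
        sketch.start();
        try {
            sketch.finished.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(startAndCollect(5_000));
    }
}
```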

  1. Next, the contextInitialized method from step 2, which receives the servletContext:
    org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized

    public void contextInitialized(ServletContext context) {
        try {
            initEurekaEnvironment(); // 1. Initialize environment parameters; not important, so we skip it
            initEurekaServerContext(); // 2. Initialize the server context

            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        }
        catch (Throwable e) {
            log.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }
  2. Now the key method, initEurekaServerContext():
    org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#initEurekaServerContext

protected PeerAwareInstanceRegistry registry;

protected void initEurekaServerContext() throws Exception {
    // 1. Register converters for backward compatibility
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);
    // AWS-specific setup
    if (isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }
    // Publish serverContext through the static holder
    EurekaServerContextHolder.initialize(this.serverContext);

    log.info("Initialized server context");

    // 2. Copy the registry from the peer nodes
    int registryCount = this.registry.syncUp();
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);

    // Register monitoring statistics
    EurekaMonitors.registerAllStats();
}
  1. Synchronizing the registry, syncUp:
    com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp

    @Override
    public int syncUp() {
        int count = 0;
        // 1. Retry loop for fetching the registry; registry-sync-retries=0 in the configuration file controls the number of retries
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) { // retry
                try {
                    // registry-sync-retry-wait-ms in the configuration file controls the wait between retries
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            // 2. Read the registry that the client fetched from the configured peer nodes
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            // 3. Copy the entry by registering it in the local registry
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count; // number of instances copied into the local registry
    }
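
The control flow of this loop is worth isolating, because the loop condition has a consequence that is easy to overlook: when the retry setting is 0 (the value shown in the comment above), the body never runs and syncUp returns 0. The sketch below mirrors only the loop shape; SyncUpRetrySketch and the fetchAndRegister callback are hypothetical stand-ins for the Eureka client call and the register step.

```java
import java.util.function.IntSupplier;

class SyncUpRetrySketch {
    /**
     * Tries up to maxRetries times, sleeping between attempts, and stops as soon as
     * one attempt copies something. fetchAndRegister returns how many instances
     * were copied in that attempt.
     */
    static int syncUp(int maxRetries, long retryWaitMs, IntSupplier fetchAndRegister) {
        int count = 0;
        for (int i = 0; i < maxRetries && count == 0; i++) {
            if (i > 0) { // only wait before a retry, not before the first attempt
                try {
                    Thread.sleep(retryWaitMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            count += fetchAndRegister.getAsInt();
        }
        return count;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Hypothetical peer: empty on the first two attempts, 4 instances on the third.
        int copied = syncUp(5, 10, () -> ++attempts[0] < 3 ? 0 : 4);
        System.out.println("copied=" + copied + " attempts=" + attempts[0]);

        // With a retry limit of 0 the supplier is never called.
        System.out.println("copied with 0 retries=" + syncUp(0, 10, () -> 7));
    }
}
```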

The register method that performs the copy is fairly long, so we won't go into it here; a quick look at the registry's data structure is enough:
com.netflix.eureka.registry.AbstractInstanceRegistry#register

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
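
To get a feel for this two-level map, here is a minimal stand-alone model. It assumes the outer key is the application name and the inner key is the instance id; RegistrySketch and its simplified Lease class are hypothetical, and the real Lease<InstanceInfo> carries considerably more state (timestamps, the full instance info).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RegistrySketch {
    /** Minimal stand-in for Lease<InstanceInfo>: the instance id plus its lease duration. */
    static final class Lease {
        final String instanceId;
        final int durationInSecs;

        Lease(String instanceId, int durationInSecs) {
            this.instanceId = instanceId;
            this.durationInSecs = durationInSecs;
        }
    }

    // application name -> (instance id -> lease)
    private final ConcurrentHashMap<String, Map<String, Lease>> registry = new ConcurrentHashMap<>();

    /** Adds or replaces the lease of one instance under its application. */
    void register(String appName, String instanceId, int durationInSecs) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())
                .put(instanceId, new Lease(instanceId, durationInSecs));
    }

    /** Returns the lease, or null if the application or instance is unknown. */
    Lease lookup(String appName, String instanceId) {
        Map<String, Lease> leases = registry.get(appName);
        return leases == null ? null : leases.get(instanceId);
    }

    int instanceCount(String appName) {
        Map<String, Lease> leases = registry.get(appName);
        return leases == null ? 0 : leases.size();
    }

    public static void main(String[] args) {
        RegistrySketch sketch = new RegistrySketch();
        sketch.register("ORDER-SERVICE", "host-a:8080", 90);
        sketch.register("ORDER-SERVICE", "host-b:8080", 90);
        System.out.println("ORDER-SERVICE instances: " + sketch.instanceCount("ORDER-SERVICE"));
    }
}
```
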
  1. openForTraffic opens the gate for traffic. It has two implementations (single-instance and multi-instance); only the multi-instance one is shown here:
    com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic
// Number of clients expected to send heartbeats (renewals)
protected volatile int expectedNumberOfClientsSendingRenews;

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // 1. Compute the self-preservation threshold
    this.expectedNumberOfClientsSendingRenews = count;
    updateRenewsPerMinThreshold();

    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP); // 2. Change the instance status to UP
    super.postInit(); // 3. Start the scheduled task that evicts expired instances
}
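
The threshold computed in step 1 can be illustrated with a small calculation. The formula below is assumed to match updateRenewsPerMinThreshold in Eureka 1.9.x, and the 30-second renewal interval and 0.85 factor are the commonly cited defaults; treat all three as assumptions and check them against the version you run. RenewThresholdSketch and its parameter names are hypothetical.

```java
class RenewThresholdSketch {
    /**
     * Expected renewals per minute, scaled by the tolerated percentage.
     * Assumed formula: clients * (60 / intervalSeconds) * percentThreshold, truncated to int.
     */
    static int renewsPerMinThreshold(int expectedClients,
                                     int renewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        return (int) (expectedClients
                * (60.0 / renewalIntervalSeconds)
                * renewalPercentThreshold);
    }

    public static void main(String[] args) {
        // 10 instances copied by syncUp, heartbeats every 30 s, 85 % tolerated.
        System.out.println("threshold = " + renewsPerMinThreshold(10, 30, 0.85));
        // Nothing copied at startup (count == 0) gives a threshold of 0.
        System.out.println("threshold = " + renewsPerMinThreshold(0, 30, 0.85));
    }
}
```

Under these assumptions, a server that starts with count == 0 begins with a threshold of 0, and the value only becomes meaningful once clients register.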

This concludes the Eureka Server startup process.