Articles in the JavaWeb category

JVM (work in progress)

This article is based on the JDK 8 HotSpot VM.

1) Java overview

1.1) Java version history

Image taken from Wikipedia (https://zh.wikipedia.org/wiki/Java版本歷史)
2020-11-29-17.43.00.png

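1.2) JVM LTS versions / GC selection

  • JDK 8: uses Parallel Scavenge + Parallel Old by default; G1 is recommended when the machine has resources to spare
  • JDK 11: uses G1 by default; ZGC is recommended when the machine has resources to spare (note that ZGC is still an experimental feature in JDK 11)
  • JDK 17: not yet released at the time of writing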
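To check which collectors a particular JVM actually picked, the standard management API can list them. The sketch below is only an illustration (the class name GcInfo is made up here); with the JDK 8 defaults one would typically expect names like "PS Scavenge" and "PS MarkSweep", and with G1 names like "G1 Young Generation" and "G1 Old Generation".

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

// GcInfo is a hypothetical helper name used only for this example.
class GcInfo {
    /** Returns the names of the collectors registered by the running JVM. */
    static List<String> collectorNames() {
        List<String> names = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            names.add(gc.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(collectorNames());
    }
}
```

Running it once with -XX:+UseParallelGC and once with -XX:+UseG1GC makes the difference between the two selections visible.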

1.3) Choosing among mainstream open-source JVMs

-JVM-94c1cea74f36f661.png

2) JVM structure

Let's first look at the overall structure:
JVM-2.jpg

2.1) Virtual machine stack

JVM_Stack-2.jpg

2.2) Method area

The storage layout of the method area is fairly complex: part of it lives in the heap and part of it lives in Metaspace.
MethodArea-2.jpg

2.3) Heap

JVM_Heap-2.jpg

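The Young Gen and Old Gen of the heap live in the virtual memory managed by the JVM, while Metaspace uses native memory directly (by default it can grow without limit).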
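One way to see this split on a running JVM is to list its memory pools. This is a minimal sketch (MemoryPools is a name invented for the example); on a JDK 8 HotSpot VM the Eden, Survivor and Old pools are normally reported as HEAP and Metaspace as NON_HEAP, though the exact pool names depend on the collector and the VM.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.util.LinkedHashMap;
import java.util.Map;

// MemoryPools is a hypothetical helper name used only for this example.
class MemoryPools {
    /** Maps each memory pool name to HEAP or NON_HEAP, as reported by the JVM. */
    static Map<String, MemoryType> pools() {
        Map<String, MemoryType> result = new LinkedHashMap<>();
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            result.put(pool.getName(), pool.getType());
        }
        return result;
    }

    public static void main(String[] args) {
        pools().forEach((name, type) -> System.out.println(type + "  " + name));
    }
}
```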

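2.3.1) Default heap size

  • Initial heap size: 1/64 of physical memory
  • Maximum heap size: 1/4 of physical memory; the 1 GB ceiling applies to client-class and 32-bit JVMs, and a 64-bit server JVM can default to more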
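The defaults that a given machine ends up with can be read from java.lang.Runtime. A small sketch, assuming nothing beyond the standard library (HeapDefaults and toMiB are names made up for the example):

```java
// HeapDefaults is a hypothetical helper name used only for this example.
class HeapDefaults {
    static long toMiB(long bytes) {
        return bytes / (1024L * 1024L);
    }

    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        // totalMemory(): heap currently reserved by the JVM (starts near the initial size).
        System.out.println("current heap (MiB): " + toMiB(rt.totalMemory()));
        // maxMemory(): the most heap the JVM will attempt to use (roughly the maximum size).
        System.out.println("max heap (MiB):     " + toMiB(rt.maxMemory()));
    }
}
```

Running it without any -Xms / -Xmx flags shows the values the JVM chose on its own; adding the flags shows how they override the defaults.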

2.3.2) Garbage collection process

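Eureka Client source code

This article is licensed under the Creative Commons Attribution-ShareAlike 4.0 International License.
Visit https://creativecommons.org/licenses/by-sa/4.0/ to view the license.

This article is based on the latest upstream RELEASE versions, Spring Boot 2.2.5 and Spring Cloud Hoxton.SR8. It turns out the EurekaClient source has changed quite a lot.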

1) Eureka Client startup flow

Usually we pull in the Eureka Client as a Spring Boot starter, e.g. spring-cloud-starter-netflix-eureka-client.
A well-behaved starter dependency follows Spring Boot's auto-configuration convention and declares its AutoConfiguration classes in spring.factories, so let's start looking there.

Find the spring.factories file in the spring-cloud-netflix-eureka-client dependency and open it:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapConfiguration
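The file is plain java.util.Properties syntax: one key, and a comma-separated list of class names continued over several lines with a trailing backslash. The sketch below only illustrates that format; it is not Spring's own loading code (Spring does this in SpringFactoriesLoader), and FactoriesSketch and classNames are names invented for the example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// FactoriesSketch is a hypothetical helper, not part of Spring.
class FactoriesSketch {
    /** Reads one key from spring.factories-style text and splits its comma-separated class names. */
    static List<String> classNames(String factoriesText, String key) {
        Properties props = new Properties();
        try {
            // Properties.load joins lines that end with a backslash.
            props.load(new StringReader(factoriesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(key, "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String text = "org.springframework.boot.autoconfigure.EnableAutoConfiguration=\\\n"
                + "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\\\n"
                + "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration\n";
        System.out.println(classNames(text, "org.springframework.boot.autoconfigure.EnableAutoConfiguration"));
    }
}
```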

I read through almost all of them; the core auto-configuration class is EurekaClientAutoConfiguration, so it is the only one we need to focus on.

1.1) EurekaClientAutoConfiguration annotation analysis

@Configuration(proxyBeanMethods = false) // 1. Configuration class, with bean method proxying disabled
@EnableConfigurationProperties // 2. Enable binding of Spring Boot configuration properties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) // 3. Auto-configuration is on by default
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
        CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {
        "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration", // auto-configuration of extra optional arguments
        "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration", // dynamic config refresh
        // a refresh listener is defined internally; see EurekaClientConfigurationRefresher if interested
        "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
        // auto service registration configuration (formerly used to report service status; appears to be deprecated)
        "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {
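The effect of matchIfMissing = true in step 3 can be mimicked in a few lines. This is a simplified illustration of the rule for a @ConditionalOnProperty without havingValue, not Spring Boot's implementation; ConditionalSketch and matches are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// ConditionalSketch is a hypothetical helper, not part of Spring Boot.
class ConditionalSketch {
    /**
     * A missing property matches only when matchIfMissing is true;
     * a present property matches unless its value is "false".
     */
    static boolean matches(Map<String, String> env, String key, boolean matchIfMissing) {
        String value = env.get(key);
        if (value == null) {
            return matchIfMissing;
        }
        return !"false".equalsIgnoreCase(value);
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        System.out.println(matches(env, "eureka.client.enabled", true)); // property absent
        env.put("eureka.client.enabled", "false");
        System.out.println(matches(env, "eureka.client.enabled", true)); // explicitly disabled
    }
}
```

So the client auto-configuration only backs off when eureka.client.enabled is explicitly set to false.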

Let's read on and look at the key methods:

1.2) EurekaClientAutoConfiguration source analysis

  1. eurekaClient: initializes the Eureka Client instance
    org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.EurekaClientConfiguration#eurekaClient

     @Bean(destroyMethod = "shutdown")
     @ConditionalOnMissingBean(value = EurekaClient.class,
             search = SearchStrategy.CURRENT)
     public EurekaClient eurekaClient(ApplicationInfoManager manager,
             EurekaClientConfig config) {
         return new CloudEurekaClient(manager, config, this.optionalArgs,
                 this.context);
     }

    Let's step into the constructor:
    org.springframework.cloud.netflix.eureka.CloudEurekaClient#CloudEurekaClient

     public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
             EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
             ApplicationEventPublisher publisher) {
         super(applicationInfoManager, config, args);
         this.applicationInfoManager = applicationInfoManager; // holds the information needed for registration
         this.publisher = publisher;
         this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
                 "eurekaTransport");
         ReflectionUtils.makeAccessible(this.eurekaTransportField);
     }    

    It stores the information needed for registration; now let's look at the parent class constructor:
    com.netflix.discovery.DiscoveryClient#DiscoveryClient

     public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
         this(applicationInfoManager, config, args, ResolverUtils::randomize);
     }
    
     public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
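         // 1. This delegates to yet another overloaded constructor
         this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
             // This is effectively a callback: when the default server cannot be reached, the client falls back to this backup registry
             // **must be configured explicitly**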
             private volatile BackupRegistry backupRegistryInstance;
    
             @Override
             public synchronized BackupRegistry get() {
                 if (backupRegistryInstance == null) {
                     String backupRegistryClassName = config.getBackupRegistryImpl();
                     if (null != backupRegistryClassName) {
                         try {
                             backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                             logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                         } catch (InstantiationException e) {
                             logger.error("Error instantiating BackupRegistry.", e);
                         } catch (IllegalAccessException e) {
                             logger.error("Error instantiating BackupRegistry.", e);
                         } catch (ClassNotFoundException e) {
                             logger.error("Error instantiating BackupRegistry.", e);
                         }
                     }
    
                     if (backupRegistryInstance == null) {
                         logger.warn("Using default backup registry implementation which does not do anything.");
                         backupRegistryInstance = new NotImplementedRegistryImpl();
                     }
                 }
    
                 return backupRegistryInstance;
             }
         }, randomizer);
     }
     @Inject
     DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                     Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
         // 2. Validation and field assignments, skipped here
         if (args != null) {
             this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
             this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
             this.eventListeners.addAll(args.getEventListeners());
             this.preRegistrationHandler = args.preRegistrationHandler;
         } else {
             this.healthCheckCallbackProvider = null;
             this.healthCheckHandlerProvider = null;
             this.preRegistrationHandler = null;
         }
    
         this.applicationInfoManager = applicationInfoManager;
         InstanceInfo myInfo = applicationInfoManager.getInfo();
    
         clientConfig = config;
         staticClientConfig = clientConfig;
         transportConfig = config.getTransportConfig();
         instanceInfo = myInfo;
         if (myInfo != null) {
             appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
         } else {
             logger.warn("Setting instanceInfo to a passed in null value");
         }
    
         this.backupRegistryProvider = backupRegistryProvider;
         this.endpointRandomizer = endpointRandomizer;
         this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
         localRegionApps.set(new Applications());
    
         fetchRegistryGeneration = new AtomicLong(0);
    
         remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
         remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
    
         if (config.shouldFetchRegistry()) {
             this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
         } else {
             this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
         }
    
         if (config.shouldRegisterWithEureka()) {
             this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
         } else {
             this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
         }
    
         logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
         // 3. If both register-with-eureka and fetch-registry are disabled, return early
         if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
             logger.info("Client configured to neither register nor query for data.");
             scheduler = null;
             heartbeatExecutor = null;
             cacheRefreshExecutor = null;
             eurekaTransport = null;
             instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    
             // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
             // to work with DI'd DiscoveryClient
             DiscoveryManager.getInstance().setDiscoveryClient(this);
             DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
             initTimestampMs = System.currentTimeMillis();
             initRegistrySize = this.getApplications().size();
             registrySize = initRegistrySize;
             logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                     initTimestampMs, initRegistrySize);
    
             return;  // no need to setup up an network tasks and we are done
         }
         // 4. Create the thread pools
         try {
             // default size of 2 - 1 each for heartbeat and cacheRefresh
             scheduler = Executors.newScheduledThreadPool(2,
                     new ThreadFactoryBuilder()
                             .setNameFormat("DiscoveryClient-%d")
                             .setDaemon(true)
                             .build());
    
             heartbeatExecutor = new ThreadPoolExecutor(
                     1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                     new SynchronousQueue<Runnable>(),
                     new ThreadFactoryBuilder()
                             .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                             .setDaemon(true)
                             .build()
             );  // use direct handoff
    
             cacheRefreshExecutor = new ThreadPoolExecutor(
                     1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                     new SynchronousQueue<Runnable>(),
                     new ThreadFactoryBuilder()
                             .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                             .setDaemon(true)
                             .build()
             );  // use direct handoff
    
             eurekaTransport = new EurekaTransport();
             scheduleServerEndpointTask(eurekaTransport, args);
    
             AzToRegionMapper azToRegionMapper;
             if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                 azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
             } else {
                 azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
             }
             if (null != remoteRegionsToFetch.get()) {
                 azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
             }
             instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
         } catch (Throwable e) {
             throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
         }
         // 5. Runs only if fetch-registry is enabled
         if (clientConfig.shouldFetchRegistry()) {
             try {
                 // 6. **Key method fetchRegistry**: pulls the registry from the server
                 boolean primaryFetchRegistryResult = fetchRegistry(false);
                 if (!primaryFetchRegistryResult) {
                     logger.info("Initial registry fetch from primary servers failed");
                 }
                 boolean backupFetchRegistryResult = true;
                 if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                     backupFetchRegistryResult = false;
                     logger.info("Initial registry fetch from backup servers failed");
                 }
                 if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
                     throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
                 }
             } catch (Throwable th) {
                 logger.error("Fetch registry error at startup: {}", th.getMessage());
                 throw new IllegalStateException(th);
             }
         }
    
         // call and execute the pre registration handler before all background tasks (inc registration) is started
         if (this.preRegistrationHandler != null) {
             this.preRegistrationHandler.beforeRegistration();
         }
         // 7. Runs only if register-with-eureka and should-enforce-registration-at-init are both enabled
         if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
             try {
                 if (!register() ) { // 8. **Key method register**: sends the registration request to the server
                     throw new IllegalStateException("Registration error at startup. Invalid server response.");
                 }
             } catch (Throwable th) {
                 logger.error("Registration error at startup: {}", th.getMessage());
                 throw new IllegalStateException(th);
             }
         }
         // 9. **Key method initScheduledTasks**: initializes the scheduled tasks
         initScheduledTasks();
    
         try {
             Monitors.registerObject(this);
         } catch (Throwable e) {
             logger.warn("Cannot register timers", e);
         }
    
         // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
         // to work with DI'd DiscoveryClient
         DiscoveryManager.getInstance().setDiscoveryClient(this);
         DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
         initTimestampMs = System.currentTimeMillis();
         initRegistrySize = this.getApplications().size();
         registrySize = initRegistrySize;
         logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                 initTimestampMs, initRegistrySize);
     }
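The primary/backup fetch logic in step 5 of the constructor boils down to a small decision rule. The sketch below restates it with plain functional interfaces so it can be read in isolation; InitialFetchSketch and initialFetch are hypothetical names, and the two suppliers stand in for fetchRegistry(false) and fetchRegistryFromBackup().

```java
import java.util.function.BooleanSupplier;

// InitialFetchSketch is a hypothetical helper that mirrors the startup fetch rule.
class InitialFetchSketch {
    /**
     * Tries the primary source first and the backup only if the primary failed.
     * Throws when both fail and strict startup (enforceAtInit) is requested.
     * Returns true if either source succeeded.
     */
    static boolean initialFetch(BooleanSupplier primary, BooleanSupplier backup, boolean enforceAtInit) {
        // Short-circuit: the backup is never called when the primary succeeds.
        boolean fetched = primary.getAsBoolean() || backup.getAsBoolean();
        if (!fetched && enforceAtInit) {
            throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
        }
        return fetched;
    }

    public static void main(String[] args) {
        // Primary fails, backup succeeds: startup continues.
        System.out.println(initialFetch(() -> false, () -> true, true));
    }
}
```

Note that with the strict flag off, a client whose first fetch fails still starts and relies on the scheduled refresh to fill the cache later.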

    Several key methods came up above: fetchRegistry, register, and initScheduledTasks. Let's go through them:

    1. fetchRegistry: fetch the registry

      
      private boolean fetchRegistry(boolean forceFullRegistryFetch) {
       Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
      
       try {
           // 1. Read the cached data; it is empty on the first call
           Applications applications = getApplications();
           // 2. Decide whether a full fetch is needed to overwrite the cache
           if (clientConfig.shouldDisableDelta() // disable-delta is turned on
                   // registry-refresh-single-vip-address is set
                   || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                   || forceFullRegistryFetch // false on the initial call
                   || (applications == null) // needs initialization on the first call
                   || (applications.getRegisteredApplications().size() == 0) // cache is empty
                   || (applications.getVersion() == -1)) // appears to be deprecated
           {
               logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
               logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
               logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
               logger.info("Application is null : {}", (applications == null));
               logger.info("Registered Applications size is zero : {}",
                       (applications.getRegisteredApplications().size() == 0));
               logger.info("Application version is -1: {}", (applications.getVersion() == -1));
               getAndStoreFullRegistry(); // 3. Fetch the full registry
           } else {
               // 3. Fetch only the delta
               getAndUpdateDelta(applications);
           }
           applications.setAppsHashCode(applications.getReconcileHashCode());
           logTotalInstances();
       } catch (Throwable e) {
           logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
           return false;
       } finally {
           if (tracer != null) {
               tracer.stop();
           }
       }
      
       // Notify about cache refresh before updating the instance remote status
       onCacheRefreshed();
      
       // Update remote status based on refreshed data held in the cache
       updateInstanceRemoteStatus();
      
       // registry was fetched successfully, so return true
       return true;
      }
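The condition in step 2 of fetchRegistry is easier to reason about as a standalone predicate. The following is only a restatement under simplified assumptions: FetchModeSketch and needsFullFetch are hypothetical names, and the cached Applications object is reduced to a nullable app count plus a version number.

```java
// FetchModeSketch is a hypothetical helper that restates the full-vs-delta decision.
class FetchModeSketch {
    /**
     * Returns true when a full registry fetch is required.
     * cachedAppCount == null stands for "no cached Applications object yet".
     */
    static boolean needsFullFetch(boolean disableDelta, String singleVipAddress, boolean forceFull,
                                  Integer cachedAppCount, long cachedVersion) {
        return disableDelta
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFull
                || cachedAppCount == null
                || cachedAppCount == 0
                || cachedVersion == -1;
    }

    public static void main(String[] args) {
        // First call at startup: the cache is empty, so a full fetch is needed.
        System.out.println(needsFullFetch(false, null, false, 0, 1));
        // Later refresh with a populated cache: a delta fetch is enough.
        System.out.println(needsFullFetch(false, null, false, 3, 1));
    }
}
```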
     First, the code that fetches the delta: com.netflix.discovery.DiscoveryClient#getAndUpdateDelta
     ```java
     private void getAndUpdateDelta(Applications applications) throws Throwable {
         // 1. Read the current registry fetch generation
         long currentUpdateGeneration = fetchRegistryGeneration.get();
    
         Applications delta = null;
         // 2. Call the server's apps/delta endpoint to get the delta (implemented with the Jersey client)
         EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
         if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
             delta = httpResponse.getEntity();
         }
         // 3. The update logic below is not covered in detail; it leans heavily on java.util.concurrent (CAS and locks)
         if (delta == null) {
             logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                     + "Hence got the full registry.");
             getAndStoreFullRegistry();
         } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
             logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
             String reconcileHashCode = "";
             if (fetchRegistryUpdateLock.tryLock()) {
                 try {
                     updateDelta(delta);
                     reconcileHashCode = getReconcileHashCode(applications);
                 } finally {
                     fetchRegistryUpdateLock.unlock();
                 }
             } else {
                 logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
             }
             // There is a diff in number of instances for some reason
             if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                 reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
             }
         } else {
             logger.warn("Not updating application delta as another thread is updating it already");
             logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
         }
     }
     ```
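After applying a delta, the client compares its own reconcile hash code with the one the server sent and falls back to a remote reconciliation if they differ. The idea of such a hash can be illustrated as a compact summary of instance counts per status. The format below ("STATUS_count_" in sorted order) is an assumption made for this example, since the listing above does not show how Eureka builds the string; ReconcileHashSketch and hashOf are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// ReconcileHashSketch is a hypothetical illustration, not Eureka's implementation.
class ReconcileHashSketch {
    /** Summarizes instance statuses as a string of "STATUS_count_" segments. */
    static String hashOf(List<String> statuses) {
        // TreeMap keeps the keys sorted, so client and server would build the same string.
        Map<String, Integer> counts = new TreeMap<>();
        for (String status : statuses) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            sb.append(entry.getKey()).append('_').append(entry.getValue()).append('_');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String local = hashOf(Arrays.asList("UP", "UP", "DOWN"));
        String remote = hashOf(Arrays.asList("UP", "UP", "UP"));
        // A mismatch signals that the local cache drifted from the server's view.
        System.out.println(local.equals(remote) ? "in sync" : "needs reconciliation");
    }
}
```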

    Now the code that fetches the full registry:

     private void getAndStoreFullRegistry() throws Throwable {
         // 1. Read the current registry fetch generation
         long currentUpdateGeneration = fetchRegistryGeneration.get();
    
         logger.info("Getting all instance registry info from the eureka server");
    
         Applications apps = null;
         // 2. Check whether registry-refresh-single-vip-address is configured (the client only cares about one VIP);
         // if not, call the server's apps/ endpoint to get the full registry
         EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                 ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                 : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
         if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
             apps = httpResponse.getEntity();
         }
         logger.info("The response status is {}", httpResponse.getStatusCode());
    
         if (apps == null) {
             logger.error("The application is null for some reason. Not storing this information");
         } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
             localRegionApps.set(this.filterAndShuffle(apps));
             logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
         } else {
             logger.warn("Not updating applications as another thread is updating it already");
         }
     }
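Both fetch methods use the same guard: read the generation counter before the slow HTTP call, and only store the result if compareAndSet still sees that value afterwards. A minimal sketch of that pattern follows; GenerationGuardSketch, begin, commit and current are names invented for the example, and a String stands in for the registry.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// GenerationGuardSketch is a hypothetical class illustrating the CAS generation guard.
class GenerationGuardSketch {
    private final AtomicLong generation = new AtomicLong(0);
    private final AtomicReference<String> cache = new AtomicReference<>("empty");

    /** Snapshot the generation before starting a slow fetch. */
    long begin() {
        return generation.get();
    }

    /** Stores the fetched value only if no other update completed since begin(). */
    boolean commit(long seenGeneration, String fetched) {
        if (generation.compareAndSet(seenGeneration, seenGeneration + 1)) {
            cache.set(fetched);
            return true;
        }
        return false; // stale result: another thread already advanced the generation
    }

    String current() {
        return cache.get();
    }

    public static void main(String[] args) {
        GenerationGuardSketch guard = new GenerationGuardSketch();
        long first = guard.begin();
        long second = guard.begin();          // a concurrent fetch started from the same generation
        System.out.println(guard.commit(first, "registry-A"));
        System.out.println(guard.commit(second, "registry-B")); // rejected as stale
        System.out.println(guard.current());
    }
}
```

The design choice is that a late response is simply dropped rather than merged, which keeps an older snapshot from overwriting a newer one.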

    That covers fetching the registry; next comes register.
    2. register: register the service
    com.netflix.discovery.DiscoveryClient#register

     boolean register() throws Throwable {
         logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
         EurekaHttpResponse<Void> httpResponse;
         try {
             // 1. Call the server's apps/{appname} endpoint to register the service
             httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
         } catch (Exception e) {
             logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
             throw e;
         }
         if (logger.isInfoEnabled()) {
             logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
         }
         return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
     }
    1. initScheduledTasks: initialize the scheduled tasks

      private void initScheduledTasks() {
       // 1. This scheduled task is set up only if fetch-registry is enabled
       if (clientConfig.shouldFetchRegistry()) {
           // 2. Read the registry-fetch-interval-seconds setting
           int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
           int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
           cacheRefreshTask = new TimedSupervisorTask(
                   "cacheRefresh",
                   scheduler,
                   cacheRefreshExecutor,
                   registryFetchIntervalSeconds,
                   TimeUnit.SECONDS,
                   expBackOffBound,
                   // 3. Create a timed task that runs the **CacheRefreshThread** logic
                   new CacheRefreshThread()
           );
           scheduler.schedule(
                   cacheRefreshTask, // 4. The first run is delayed by the interval read above
                   registryFetchIntervalSeconds, TimeUnit.SECONDS);
       }
       // 5. This scheduled task is set up only if register-with-eureka is enabled
       if (clientConfig.shouldRegisterWithEureka()) {
           // 6. Read the lease-renewal-interval-in-seconds setting
           int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
           int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
           logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
      
           heartbeatTask = new TimedSupervisorTask(
                   "heartbeat",
                   scheduler,
                   heartbeatExecutor,
                   renewalIntervalInSecs,
                   TimeUnit.SECONDS,
                   expBackOffBound,
                   // 7. Create a timed task that runs the **HeartbeatThread** logic
                   new HeartbeatThread()
           );
           scheduler.schedule(
                   heartbeatTask, // 8. The first run is delayed by the heartbeat interval read above
                   renewalIntervalInSecs, TimeUnit.SECONDS);
      
           // InstanceInfo replicator
           instanceInfoReplicator = new InstanceInfoReplicator(
                   this,
                   instanceInfo,
                   clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                   2); // burstSize
      
           statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
               @Override
               public String getId() {
                   return "statusChangeListener";
               }
      
               @Override
               public void notify(StatusChangeEvent statusChangeEvent) {
                   if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                           InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                       // log at warn level if DOWN was involved
                       logger.warn("Saw local status change event {}", statusChangeEvent);
                   } else {
                       logger.info("Saw local status change event {}", statusChangeEvent);
                   }
                   instanceInfoReplicator.onDemandUpdate();
               }
           };
      
           if (clientConfig.shouldOnDemandUpdateStatusChange()) {
               applicationInfoManager.registerStatusChangeListener(statusChangeListener);
           }
      
           instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
       } else {
           logger.info("Not registering with Eureka server per configuration");
       }
      }
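Both tasks are wrapped in a TimedSupervisorTask that receives the base interval and an exponential back-off bound. The listing does not show that class, so the rule sketched here is an assumption about how such a supervisor typically behaves: double the delay after a timeout, cap it at base × bound, and reset to the base interval after a success. BackoffSketch and nextDelayMillis are hypothetical names.

```java
// BackoffSketch is a hypothetical illustration of a capped exponential back-off rule.
class BackoffSketch {
    /**
     * Computes the delay before the next run.
     * Assumption: a timeout doubles the delay up to baseDelay * expBackOffBound,
     * and a successful run resets it to baseDelay.
     */
    static long nextDelayMillis(long currentDelay, long baseDelay, int expBackOffBound, boolean timedOut) {
        if (!timedOut) {
            return baseDelay;
        }
        long maxDelay = baseDelay * expBackOffBound;
        return Math.min(maxDelay, currentDelay * 2);
    }

    public static void main(String[] args) {
        long base = 30_000L; // e.g. a 30 second interval, expressed in milliseconds
        long delay = base;
        for (int attempt = 1; attempt <= 5; attempt++) {
            delay = nextDelayMillis(delay, base, 10, true);
            System.out.println("after timeout " + attempt + ": " + delay + " ms");
        }
    }
}
```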

      Two tasks were mentioned above, CacheRefreshThread and HeartbeatThread. Let's look at the code they actually run:

    2. CacheRefreshThread

      class CacheRefreshThread implements Runnable {
       public void run() {
           // calls a method on the outer class
           refreshRegistry();
       }
      }
      @VisibleForTesting
      void refreshRegistry() {
       try {
           boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
      
           boolean remoteRegionsModified = false;
           // This makes sure that a dynamic change to remote regions to fetch is honored.
           String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
           if (null != latestRemoteRegions) {
               String currentRemoteRegions = remoteRegionsToFetch.get();
               if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                   // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                   synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                       if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                           String[] remoteRegions = latestRemoteRegions.split(",");
                           remoteRegionsRef.set(remoteRegions);
                           instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                           remoteRegionsModified = true;
                       } else {
                           logger.info("Remote regions to fetch modified concurrently," +
                                   " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                       }
                   }
               } else {
                   // Just refresh mapping to reflect any DNS/Property change
                   instanceRegionChecker.getAzToRegionMapper().refreshMapping();
               }
           }
           // 1. **fetchRegistry runs on a schedule**; this is the method covered earlier
           boolean success = fetchRegistry(remoteRegionsModified);
           if (success) {
               registrySize = localRegionApps.get().size();
               // 2. On success, update the timestamp of the last successful fetch
               lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
           }
      
           if (logger.isDebugEnabled()) {
               StringBuilder allAppsHashCodes = new StringBuilder();
               allAppsHashCodes.append("Local region apps hashcode: ");
               allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
               allAppsHashCodes.append(", is fetching remote regions? ");
               allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
               for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                   allAppsHashCodes.append(", Remote region: ");
                   allAppsHashCodes.append(entry.getKey());
                   allAppsHashCodes.append(" , apps hashcode: ");
                   allAppsHashCodes.append(entry.getValue().getAppsHashCode());
               }
               logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                       allAppsHashCodes);
           }
       } catch (Throwable e) {
           logger.error("Cannot fetch registry from server", e);
       }
      }
    3. HeartbeatThread

      private class HeartbeatThread implements Runnable {
      
       public void run() {
           if (renew()) { // 1. Call renew to send a lease renewal
               // 2. Update the timestamp of the last successful renewal
               lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
           }
       }
      }
      boolean renew() {
       EurekaHttpResponse<InstanceInfo> httpResponse;
       try {
           // 3. Send the renewal request to the server's apps/{appName}/{id} endpoint
           httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
           logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
           // 4. If the server does not know this instance
           if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
               REREGISTER_COUNTER.increment();
               logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
               long timestamp = instanceInfo.setIsDirtyWithTime();
               // 5. Register again; this is the register method covered earlier
               boolean success = register();
               if (success) {
                   instanceInfo.unsetIsDirty(timestamp);
               }
               return success;
           }
           return httpResponse.getStatusCode() == Status.OK.getStatusCode();
       } catch (Throwable e) {
           logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
           return false;
       }
      }
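Stripped of logging and bookkeeping, renew is a three-way decision on the heartbeat's HTTP status. The sketch below restates it with plain functional interfaces; RenewSketch is a hypothetical name, and the two suppliers stand in for the sendHeartBeat call and for register().

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

// RenewSketch is a hypothetical helper restating the heartbeat decision.
class RenewSketch {
    /**
     * 404 means the server no longer knows this instance, so register again;
     * 200 means the lease was renewed; anything else counts as a failed heartbeat.
     */
    static boolean renew(IntSupplier sendHeartbeat, BooleanSupplier register) {
        int status = sendHeartbeat.getAsInt();
        if (status == 404) {
            return register.getAsBoolean();
        }
        return status == 200;
    }

    public static void main(String[] args) {
        // The server lost the lease (for example after a restart); the client re-registers.
        System.out.println(renew(() -> 404, () -> true));
    }
}
```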

2) EurekaClient service deregistration

Let's revisit the eurekaClient method in EurekaClientAutoConfiguration:
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.EurekaClientConfiguration#eurekaClient

@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
        search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
        EurekaClientConfig config) {
    return new CloudEurekaClient(manager, config, this.optionalArgs,
            this.context);
}

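When the Java application shuts down normally, this shutdown method is triggered to run the deregistration logic; we can also call it ourselves to take the service offline manually: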
com.netflix.discovery.DiscoveryClient#shutdown

@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");

        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }

        // 1. 将之前提到的两个定时任务取消
        cancelScheduledTasks();

        // 2. 如果开启注册到 server, 则将自己状态改为 DOWN, 发送下线请求 
        if (applicationInfoManager != null
                && clientConfig.shouldRegisterWithEureka()
                && clientConfig.shouldUnregisterOnShutdown()) {
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();
        }
        // 3. Shut down the transport and the monitors cleanly
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }

        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        Monitors.unregisterObject(this);

        logger.info("Completed shut down of DiscoveryClient");
    }
}
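
The compareAndSet guard at the top of shutdown is what makes a manual call safe: whichever caller flips the flag first does the work, and every later call is a no-op. Below is a minimal sketch of that pattern only. OneShotShutdown and its counter are hypothetical stand-ins, not the real DiscoveryClient.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a client with a one-shot shutdown; not the real DiscoveryClient.
class OneShotShutdown {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final AtomicInteger unregisterCalls = new AtomicInteger();

    // Returns true only for the call that actually performed the shutdown.
    boolean shutdown() {
        // compareAndSet flips false -> true atomically, so only one caller gets in.
        if (isShutdown.compareAndSet(false, true)) {
            unregisterCalls.incrementAndGet(); // stands in for "cancel tasks + unregister"
            return true;
        }
        return false;
    }

    int unregisterCalls() {
        return unregisterCalls.get();
    }

    public static void main(String[] args) {
        OneShotShutdown client = new OneShotShutdown();
        System.out.println("first call ran shutdown:  " + client.shutdown());
        System.out.println("second call ran shutdown: " + client.shutdown());
        System.out.println("unregister invocations:   " + client.unregisterCalls());
    }
}
```

This is why calling shutdown by hand and then letting the container call it again on close should not send a second cancel request.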

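Eureka Server Source Code (Service Endpoints)

This article is based on the latest upstream RELEASE versions at the time of writing: SpringBoot 2.2.5 and SpringCloud Hoxton.SR8.

Looking at the EurekaServer startup flow, the dashboard's web pages are served by SpringMVC. Besides SpringMVC, EurekaServer also loads Jersey to publish the RESTful services that handle client requests. Let's start with the two core methods involved when the beans are loaded: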

private static final String[] EUREKA_PACKAGES = new String[] {
        "com.netflix.discovery", "com.netflix.eureka" };

@Bean
public javax.ws.rs.core.Application jerseyApplication(Environment environment,
        ResourceLoader resourceLoader) {
    // 1. Create the classpath scanner
    ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
            false, environment);
    // 2. Declare the annotations to scan for
    provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
    provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
    // 3. Scan the given packages and add each resolved class to the set
    Set<Class<?>> classes = new HashSet<>();
    for (String basePackage : EUREKA_PACKAGES) {
        Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
        for (BeanDefinition bd : beans) {
            Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
                    resourceLoader.getClassLoader());
            classes.add(cls);
        }
    }

    // 4. Static-resource configuration
    Map<String, Object> propsAndFeatures = new HashMap<>();
    propsAndFeatures.put(
            // Skip static content used by the webapp
            ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
            EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");

    DefaultResourceConfig rc = new DefaultResourceConfig(classes);
    rc.setPropertiesAndFeatures(propsAndFeatures);

    return rc;
}

@Bean
public FilterRegistrationBean<?> jerseyFilterRegistration(
        javax.ws.rs.core.Application eurekaJerseyApp) { // 1. Injects the instance built by the previous method
    FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
    bean.setFilter(new ServletContainer(eurekaJerseyApp));
    bean.setOrder(Ordered.LOWEST_PRECEDENCE);
    bean.setUrlPatterns(
            Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

    return bean;
}

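From the source above we can see that, when configuring Jersey, EurekaServer scans the com.netflix.discovery and com.netflix.eureka packages and registers every class annotated with @Path or @Provider as a web service. Let's look at the main endpoints:

1) ApplicationResource.addInstance: registering a service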

com.netflix.eureka.resources.ApplicationResource#addInstance

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // 1. Validate the instance fields
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getIPAddr())) {
        return Response.status(400).entity("Missing ip address").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }
    // 2. Fix up a missing data-center id
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }
    // 3. Register
    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}

Let's follow the register call in step 3 (looking only at the cluster-aware implementation):
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // 1. Lease duration; falls back to the default when the client did not configure one
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // 2. Register the instance
    super.register(info, leaseDuration, isReplication);
    // 3. Replicate the registration to the other peer nodes
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

We will skip the registration method itself and move on to the replication code in step 3:

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    // 1. Metrics timer
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // Stop here if there are no peers or this request is itself a replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // 2. Iterate over all peer nodes
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // 3. Skip the entry that points at this node itself
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // 4. Replicate the action to the peer node
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

private void replicateInstanceActionsToPeers(Action action, String appName,
                                              String id, InstanceInfo info, InstanceStatus newStatus,
                                              PeerEurekaNode node) {
     try {
         InstanceInfo infoFromRegistry;
         CurrentRequestVersion.set(Version.V2);
         switch (action) {
             case Cancel: // cancel the instance
                 node.cancel(appName, id);
                 break;
             case Heartbeat: // heartbeat / lease renewal
                 InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                 node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                 break;
             case Register: // register the instance
                 node.register(info);
                 break;
             case StatusUpdate: // status update
                 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                 node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                 break;
             case DeleteStatusOverride: // delete the overridden status
                 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                 node.deleteStatusOverride(appName, id, infoFromRegistry);
                 break;
         }
     } catch (Throwable t) {
         logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
     } finally {
         CurrentRequestVersion.remove();
     }
 }
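
The isReplication flag is what keeps two peers from bouncing the same registration back and forth: a node forwards only requests that came from a client, never ones that came from another node. Here is a minimal in-memory sketch of that rule. PeerNodeSketch is hypothetical and uses direct method calls where the real PeerEurekaNode sends HTTP requests.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory peer node; the real PeerEurekaNode talks HTTP.
class PeerNodeSketch {
    final String name;
    final Set<String> instances = new HashSet<>();
    final List<PeerNodeSketch> peers = new ArrayList<>();
    int replicationsReceived = 0;

    PeerNodeSketch(String name) {
        this.name = name;
    }

    void register(String instanceId, boolean isReplication) {
        instances.add(instanceId);
        if (isReplication) {
            replicationsReceived++;
            return; // already a replicated request: do not forward it again
        }
        for (PeerNodeSketch peer : peers) {
            if (peer == this) {
                continue; // skip our own entry in the peer list
            }
            peer.register(instanceId, true);
        }
    }

    public static void main(String[] args) {
        PeerNodeSketch a = new PeerNodeSketch("a");
        PeerNodeSketch b = new PeerNodeSketch("b");
        a.peers.add(a); a.peers.add(b);
        b.peers.add(a); b.peers.add(b);

        a.register("order-service:8080", false); // request coming from a client
        System.out.println("b knows the instance: " + b.instances.contains("order-service:8080"));
        System.out.println("replications received by a: " + a.replicationsReceived);
    }
}
```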

2) InstanceResource.renewLease: renewing an instance lease

com.netflix.eureka.resources.InstanceResource#renewLease

@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    // **Core method**
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    if (!isSuccess) {
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();
    }
    Response response;
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        // Store the overridden status since the validation found out the node that replicates wins
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else {
        response = Response.ok().build();
    }
    logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
    return response;
}

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew

public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) { // 1. Renew the lease
        // 2. Replicate the heartbeat to the peer nodes
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

com.netflix.eureka.registry.AbstractInstanceRegistry#renew

public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    // 1. Look up the instance map of this application
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id); // 2. Look up the instance's lease
    }
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // 3. Resolve the overridden status and handle the abnormal cases
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                instanceInfo.getOverriddenStatus().name(),
                                instanceInfo.getId());
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

            }
        }
        renewsLastMin.increment();
        leaseToRenew.renew(); // 4. **Core method**
        return true;
    }
}

com.netflix.eureka.lease.Lease#renew

public void renew() {
    lastUpdateTimestamp = System.currentTimeMillis() + duration;

}

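In essence, renewing a lease just refreshes the lastUpdateTimestamp of the instance's lease in the registry (to the current time plus the lease duration, as the code above shows), and nothing more.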
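
To see what that timestamp means for expiry, here is a small sketch with an injected clock. The renew line follows the snippet above. The isExpired rule (now > lastUpdateTimestamp + duration) is an assumption about how the lease is checked, and LeaseSketch is a hypothetical class, not the real Lease.

```java
// Simplified lease with an injected clock. renew mirrors the snippet above;
// isExpired is an ASSUMED expiry rule used only for illustration.
class LeaseSketch {
    private final long durationMs;
    private long lastUpdateTimestamp;

    LeaseSketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs;
    }

    // Same shape as Lease#renew: the stored timestamp already includes the duration.
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    // Assumption: the check adds the duration once more.
    boolean isExpired(long nowMs) {
        return nowMs > lastUpdateTimestamp + durationMs;
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(90_000, 0);
        lease.renew(0);
        System.out.println("expired at 100s: " + lease.isExpired(100_000));
        System.out.println("expired at 181s: " + lease.isExpired(181_000));
    }
}
```

Under that assumption a lease would survive for up to twice its duration after the last heartbeat, because the duration is added both when renewing and when checking.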

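Eureka Server Startup Flow

This article is based on the latest upstream RELEASE versions at the time of writing: SpringBoot 2.2.5 and SpringCloud Hoxton.SR8.

1) Eureka Server startup flow

Eureka Server is usually pulled in as a SpringBoot starter, e.g. spring-cloud-starter-netflix-eureka-server.
A well-behaved starter follows the SpringBoot auto-configuration convention and declares its AutoConfiguration classes in spring.factories, so that is where we start looking.

Find the spring.factories file in the spring-cloud-netflix-eureka-server dependency and open it: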

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

The file declares a single auto-configuration class, EurekaServerAutoConfiguration, which is the key class of the EurekaServer startup flow.

1.1) EurekaServerAutoConfiguration annotations

// 1. Marks a configuration class and disables @Bean method proxying (faster startup)
@Configuration(proxyBeanMethods = false)
// 2. See **1.4**
@Import(EurekaServerInitializerConfiguration.class)
// 3. Load this configuration only when an EurekaServerMarkerConfiguration.Marker bean exists, see **1.2**
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {

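With the annotations covered, the EurekaServer startup flow is split into three parts:

  • 1.2 Conditions for enabling auto-configuration
  • 1.3 The EurekaServerAutoConfiguration class
  • 1.4 Work done after auto-configuration: @Import(EurekaServerInitializerConfiguration.class)

1.2) Conditions for enabling Eureka Server auto-configuration

EurekaServerAutoConfiguration declares a @ConditionalOnBean, so auto-configuration only proceeds if a Marker bean exists in the container. Where does that bean come from? It is imported by the @EnableEurekaServer annotation: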

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}

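So the @EnableEurekaServer annotation we normally put on the application class is what satisfies the precondition for auto-configuration.

1.3) The EurekaServerAutoConfiguration class

This subsection covers the important or extensible methods of the auto-configuration class:

  1. EurekaServer dashboard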
    org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration#eurekaController

    @Bean
    @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled",
         matchIfMissing = true)
    public EurekaController eurekaController() {
     return new EurekaController(this.applicationInfoManager);
    }
    @RequestMapping("${eureka.dashboard.path:/}")
    public class EurekaController {
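  2. EurekaController is the controller behind the Server dashboard; it can be switched off with eureka.dashboard.enabled=false

  3. The dashboard's base path can be changed with eureka.dashboard.path

  4. PeerAwareInstanceRegistry: the peer-aware instance registry
    Used to obtain information about other Server instances, e.g. for registry replication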
    org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration#peerAwareInstanceRegistry

    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
         ServerCodecs serverCodecs) {
     this.eurekaClient.getApplications(); // force initialization
     return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
             serverCodecs, this.eurekaClient,
             this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
             this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }
  5. PeerEurekaNodes: the peer nodes
    Wraps and manages the information about the peer nodes
    org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration#peerEurekaNodes

    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
         ServerCodecs serverCodecs,
         ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
     return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
             this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
             replicationClientAdditionalFilters);
    }

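    Its start method launches a task that keeps the list of peer nodes in sync. The task runs on a single-threaded scheduled executor, and the refresh interval can be changed with peer-eureka-nodes-update-interval-ms=600000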
    com.netflix.eureka.cluster.PeerEurekaNodes#start

    public void start() {
     taskExecutor = Executors.newSingleThreadScheduledExecutor(
             new ThreadFactory() {
                 @Override
                 public Thread newThread(Runnable r) {
                     Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                     thread.setDaemon(true);
                     return thread;
                 }
             }
     );
     try {
         updatePeerEurekaNodes(resolvePeerUrls());
         Runnable peersUpdateTask = new Runnable() {
             @Override
             public void run() {
                 try {
                     updatePeerEurekaNodes(resolvePeerUrls());
                 } catch (Throwable e) {
                     logger.error("Cannot update the replica Nodes", e);
                 }
    
             }
         };
         taskExecutor.scheduleWithFixedDelay(
                 peersUpdateTask,
                 serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                 serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                 TimeUnit.MILLISECONDS
         );
     } catch (Exception e) {
         throw new IllegalStateException(e);
     }
     for (PeerEurekaNode node : peerEurekaNodes) {
         logger.info("Replica node URL:  {}", node.getServiceUrl());
     }
    }
  6. EurekaServerContext: the EurekaServer context
    Bundles the EurekaServer components, including those mentioned above; DefaultEurekaServerContext is used by default
    org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration#eurekaServerContext

    @Bean
    @ConditionalOnMissingBean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
         PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
     return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
             registry, peerEurekaNodes, this.applicationInfoManager);
    }

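    It has an initialize() method that, once the Spring bean has been created, calls the start method from the previous item to begin syncing peer-node information.

  7. EurekaServerBootstrap: the EurekaServer bootstrap
    Also bundles the components; section 1.4 uses it for the final initialization steps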
    org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap

    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
         EurekaServerContext serverContext) {
     return new EurekaServerBootstrap(this.applicationInfoManager,
             this.eurekaClientConfig, this.eurekaServerConfig, registry,
             serverContext);
    }
  8. jerseyFilterRegistration/jerseyApplication: Jersey setup
    EurekaServer publishes its RESTful services with Jersey instead of SpringMVC

    @Bean
     public FilterRegistrationBean<?> jerseyFilterRegistration(
             javax.ws.rs.core.Application eurekaJerseyApp) {
         FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
         bean.setFilter(new ServletContainer(eurekaJerseyApp));
         bean.setOrder(Ordered.LOWEST_PRECEDENCE);
         bean.setUrlPatterns(
                 Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
    
         return bean;
     }
    
     /**
      * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources
      * required by the Eureka server.
      * @param environment an {@link Environment} instance to retrieve classpath resources
      * @param resourceLoader a {@link ResourceLoader} instance to get classloader from
      * @return created {@link Application} object
      */
     @Bean
     public javax.ws.rs.core.Application jerseyApplication(Environment environment,
             ResourceLoader resourceLoader) {
    
         ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
                 false, environment);
    
         // Filter to include only classes that have a particular annotation.
         //
         provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
         provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
    
         // Find classes in Eureka packages (or subpackages)
         //
         Set<Class<?>> classes = new HashSet<>();
         for (String basePackage : EUREKA_PACKAGES) {
             Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
             for (BeanDefinition bd : beans) {
                 Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
                         resourceLoader.getClassLoader());
                 classes.add(cls);
             }
         }
    
         // Construct the Jersey ResourceConfig
         Map<String, Object> propsAndFeatures = new HashMap<>();
         propsAndFeatures.put(
                 // Skip static content used by the webapp
                 ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
                 EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
    
         DefaultResourceConfig rc = new DefaultResourceConfig(classes);
         rc.setPropertiesAndFeatures(propsAndFeatures);
    
         return rc;
     }
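
The scheduling pattern used by PeerEurekaNodes#start in item 5 (one synchronous run, then a fixed-delay repeat on a single daemon thread) can be tried in isolation with the JDK alone. The sketch below is a hypothetical reduction: PeerRefreshSketch and the thread name are made up, and the Runnable stands in for updatePeerEurekaNodes(resolvePeerUrls()).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PeerRefreshSketch {
    // Runs the task once right away, then repeatedly with a fixed delay on one daemon thread.
    static ScheduledExecutorService start(Runnable task, long intervalMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "peer-refresh-sketch");
            thread.setDaemon(true); // a daemon thread does not keep the JVM alive
            return thread;
        });
        task.run(); // initial synchronous refresh on the caller's thread
        executor.scheduleWithFixedDelay(() -> {
            try {
                task.run();
            } catch (Throwable e) {
                // swallow and log: an uncaught exception would cancel all future runs
                System.err.println("refresh failed: " + e);
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        return executor;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch runs = new CountDownLatch(3);
        ScheduledExecutorService executor = start(runs::countDown, 20);
        boolean done = runs.await(2, TimeUnit.SECONDS);
        executor.shutdownNow();
        System.out.println("three refreshes completed: " + done);
    }
}
```

The try/catch inside the scheduled task matters: with scheduleWithFixedDelay, a task that throws is not rescheduled, which is presumably why the original wraps the update call the same way.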

1.4) @Import(EurekaServerInitializerConfiguration.class)

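The class brought in by @Import on the configuration class takes over once the auto-configuration above is in place. Let's see what it does after auto-configuration has loaded:

  1. Annotations and implemented interfaces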
    org.springframework.cloud.netflix.eureka.server.EurekaServerInitializerConfiguration
    @Configuration(proxyBeanMethods = false)
    public class EurekaServerInitializerConfiguration
         implements ServletContextAware, SmartLifecycle, Ordered {

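This is also a configuration class, and it implements the SmartLifecycle interface, so its start method runs once Spring has finished initializing. The other methods matter less, so let's go straight to the code of start: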
org.springframework.cloud.netflix.eureka.server.EurekaServerInitializerConfiguration#start

@Override
public void setServletContext(ServletContext servletContext) {
    // 1. Implements ServletContextAware#setServletContext, **which injects the ServletContext for later use**
    this.servletContext = servletContext;
}

public void start() {
    new Thread(() -> {
        try {
            // 2. Call eurekaServerBootstrap.contextInitialized to finish initialization
            eurekaServerBootstrap.contextInitialized(
                    EurekaServerInitializerConfiguration.this.servletContext);
            log.info("Started Eureka Server");
            // 3. Publish the "registry available" event
            publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
            EurekaServerInitializerConfiguration.this.running = true;
            // 4. Publish the "EurekaServer started" event
            publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
        }
        catch (Exception ex) {
            log.error("Could not initialize Eureka servlet context", ex);
        }
    }).start();
}

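Steps 3 and 4 publish two events. EurekaServer itself does not listen for either of them, so they serve as extension points: a Spring event listener can capture them to hook in custom logic.

  1. Now the contextInitialized method from step 2, which receives the servletContext. Source: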
    org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized

    public void contextInitialized(ServletContext context) {
     try {
         initEurekaEnvironment(); // 1. Initialize environment parameters; not important, so we skip it
         initEurekaServerContext(); // 2. Initialize the server context
    
         context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
     }
     catch (Throwable e) {
         log.error("Cannot bootstrap eureka server :", e);
         throw new RuntimeException("Cannot bootstrap eureka server :", e);
     }
    }
  2. Next, the key method initEurekaServerContext():
    org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#initEurekaServerContext

protected PeerAwareInstanceRegistry registry;

protected void initEurekaServerContext() throws Exception {
    // 1. Register converters for backward compatibility
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);
    // AWS-specific setup
    if (isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }
    // Publish the serverContext through the holder
    EurekaServerContextHolder.initialize(this.serverContext);

    log.info("Initialized server context");

    // 2. Copy the registry from the other nodes
    int registryCount = this.registry.syncUp();
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);

    // Register all monitoring statistics
    EurekaMonitors.registerAllStats();
}
  1. Registry sync, syncUp:
    com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp
    @Override
    public int syncUp() {
     int count = 0;
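     // 1. Retry fetching the registry in a for loop; the registry-sync-retries setting (registry-sync-retries=0 here) controls the number of attempts
     for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
         if (i > 0) { // this is a retry
             try {
                 // registry-sync-retry-wait-ms controls the wait between retries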
                 Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
             } catch (InterruptedException e) {
                 logger.warn("Interrupted during registry transfer..");
                 break;
             }
         }
         // 2. Use the client to fetch the registry from the configured nodes
         Applications apps = eurekaClient.getApplications();
         for (Application app : apps.getRegisteredApplications()) {
             for (InstanceInfo instance : app.getInstances()) {
                 try {
                     if (isRegisterable(instance)) {
                         // 3. Copy the entry by registering it in the local registry
                         register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                         count++;
                     }
                 } catch (Throwable t) {
                     logger.error("During DS init copy", t);
                 }
             }
         }
     }
     return count; // number of instances copied into the local registry
    }
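
The loop condition deserves a second look: the body runs only while i is below the configured retry count and nothing has been copied yet, so a retry count of 0 means the loop body never executes at all. The sketch below reproduces just that loop shape. SyncRetrySketch is hypothetical, and the IntSupplier stands in for "fetch the applications and register them, returning how many were copied".

```java
import java.util.function.IntSupplier;

class SyncRetrySketch {
    // Same loop shape as syncUp: keep trying while attempts remain and nothing was copied.
    static int syncUp(int retries, long waitMs, IntSupplier fetchAndRegister) {
        int count = 0;
        for (int i = 0; i < retries && count == 0; i++) {
            if (i > 0) { // only sleep before a retry, not before the first attempt
                try {
                    Thread.sleep(waitMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            count += fetchAndRegister.getAsInt();
        }
        return count;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // The peer returns nothing twice, then five instances on the third attempt.
        int copied = syncUp(5, 10, () -> ++attempts[0] < 3 ? 0 : 5);
        System.out.println("attempts=" + attempts[0] + ", copied=" + copied);
        System.out.println("with retries=0: " + syncUp(0, 10, () -> 5));
    }
}
```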

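The register method that copies the registry entries is fairly long, so we won't go through it here. It is enough to know the data structure of the registry: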
com.netflix.eureka.registry.AbstractInstanceRegistry#register

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
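
Read from the outside in, the structure is: application name, then instance id, then the lease for that instance. Here is a minimal sketch of registering and looking up entries in such a two-level map. RegistrySketch is hypothetical, a plain String stands in for Lease<InstanceInfo>, and computeIfAbsent is this sketch's own choice rather than what the real register method does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RegistrySketch {
    // appName -> (instanceId -> lease); String stands in for Lease<InstanceInfo>.
    private final ConcurrentHashMap<String, Map<String, String>> registry = new ConcurrentHashMap<>();

    void register(String appName, String instanceId, String lease) {
        // Create the inner map on first use, then store or overwrite the lease.
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()).put(instanceId, lease);
    }

    // Same two-step lookup as AbstractInstanceRegistry#renew: app first, then instance.
    String find(String appName, String instanceId) {
        Map<String, String> gMap = registry.get(appName);
        return gMap == null ? null : gMap.get(instanceId);
    }

    int instanceCount(String appName) {
        Map<String, String> gMap = registry.get(appName);
        return gMap == null ? 0 : gMap.size();
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.register("ORDER-SERVICE", "host-a:8080", "lease-1");
        registry.register("ORDER-SERVICE", "host-b:8080", "lease-2");
        System.out.println(registry.find("ORDER-SERVICE", "host-a:8080"));
        System.out.println(registry.instanceCount("ORDER-SERVICE"));
    }
}
```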
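  1. openForTraffic opens the node for traffic. It has two implementations (single-instance and multi-instance); only the multi-instance one is shown here:
    com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic
// Number of clients expected to send heartbeats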
protected volatile int expectedNumberOfClientsSendingRenews;

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // 1. Compute the self-preservation threshold
    this.expectedNumberOfClientsSendingRenews = count;
    updateRenewsPerMinThreshold();

    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP); // 2. Change the instance status to UP
    super.postInit(); // 3. Start the scheduled task that evicts expired instances
}
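
The threshold computed in step 1 is not shown above. As a rough illustration, assume it is the expected number of clients, times the renewals each client sends per minute, times a percentage threshold. The formula itself and the sample values of 30 seconds and 0.85 are assumptions of this sketch, and RenewThresholdSketch is a hypothetical class.

```java
class RenewThresholdSketch {
    // ASSUMED formula: clients * (renewals per minute per client) * percent threshold,
    // truncated to an int.
    static int renewsPerMinThreshold(int expectedClients,
                                     int renewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        return (int) (expectedClients * (60.0 / renewalIntervalSeconds) * renewalPercentThreshold);
    }

    public static void main(String[] args) {
        // 4 clients, one heartbeat every 30 s, 85% threshold (sample values, not confirmed defaults)
        System.out.println(renewsPerMinThreshold(4, 30, 0.85));
    }
}
```

With these sample values, 4 clients give 8 expected renewals per minute, and 85% of that truncates to 6. Under this assumption the node would enter self-preservation when it sees fewer renewals per minute than that.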

That concludes the Eureka Server startup flow.

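Since Spring Boot 2.2, the JUnit version bundled with spring-boot-starter-test has been upgraded to JUnit 5, so the traditional way of writing test classes needs to change.

  1. The @RunWith annotation, previously the most common one, is replaced by @ExtendWith.
  2. The second most common annotation, @SpringBootTest, has changed a little internally: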
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @BootstrapWith(SpringBootTestContextBootstrapper.class)
    @ExtendWith({SpringExtension.class})
    public @interface SpringBootTest {

    Notice anything? @ExtendWith is now built in, so a single @SpringBootTest is all you need.
