Eureka Client 源码

本文采用知识共享 署名-相同方式共享 4.0 国际 许可协议进行许可。
访问 https://creativecommons.org/licenses/by-sa/4.0/ 查看该许可协议。

本文基于上游最新 RELEASE 版, SpringBoot 2.2.5, SpringCloud Hoxton.SR8 撰写, 发现 EurekaClient 的源码变动幅度还是有点大的.

1) Eureka Client 启动流程

通常我们的 Eureka Client 是以 SpringBootStarter 引入的, 如: spring-cloud-starter-netflix-eureka-client.
那么一个正常的 Starter 依赖, 会遵循 SpringBoot 自动装配的原则, 在 spring.factories 中定义对应的 AutoConfiguration 自动装配类, 那我们就从这里开始找吧~

spring-cloud-netflix-eureka-client 依赖中找到 spring.factories 文件, 打开康康:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapConfiguration

我几乎把他们全看了一遍, 核心自动装配类为: EurekaClientAutoConfiguration, 我们只关注他就好了.

1.1) EurekaClientAutoConfiguration 注解分析

@Configuration(proxyBeanMethods = false) // 1. 配置类, 关闭 Bean 代理
@EnableConfigurationProperties // 2. 注入 SpringBoot 配置
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) // 3. 默认开启自动装配
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
        CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {
        "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration", // 额外的可选参数自动配置
        "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration", // 动态刷新 Config 相关
        // 内部定义了个刷新监听器有兴趣可以看看 -> EurekaClientConfigurationRefresher
        "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
        // 自动服务注册配置(以前用来更新上报服务状态, 好似已弃用)
        "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {

我们接着往下看, 看一下重点的方法:

1.2) EurekaClientAutoConfiguration 源码分析

  1. eurekaClient: 初始化 Eureka Client 实例
    org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.EurekaClientConfiguration#eurekaClient

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class,
            search = SearchStrategy.CURRENT)
    public EurekaClient eurekaClient(ApplicationInfoManager manager,
            EurekaClientConfig config) {
        return new CloudEurekaClient(manager, config, this.optionalArgs,
                this.context);
    }

    往构造方法中走走看:
    org.springframework.cloud.netflix.eureka.CloudEurekaClient#CloudEurekaClient

    public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
            EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
            ApplicationEventPublisher publisher) {
        super(applicationInfoManager, config, args);
        this.applicationInfoManager = applicationInfoManager; // 注册所需信息类注入
        this.publisher = publisher;
        this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
                "eurekaTransport");
        ReflectionUtils.makeAccessible(this.eurekaTransportField);
    }    

    注入了一下注册所需的信息, 进入父类构造康康:
    com.netflix.discovery.DiscoveryClient#DiscoveryClient

    public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
        this(applicationInfoManager, config, args, ResolverUtils::randomize);
    }
    
    public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
        // 1. 这边继续掉了一个重载方法
        this(applicationInfoManager, config, args, new Provider() {
            // 这里实际上实现了一个回调, 当访问不到默认 server 的时候, 回调到这个备份节点
            // **需要配置**
            private volatile BackupRegistry backupRegistryInstance;
    
            @Override
            public synchronized BackupRegistry get() {
                if (backupRegistryInstance == null) {
                    String backupRegistryClassName = config.getBackupRegistryImpl();
                    if (null != backupRegistryClassName) {
                        try {
                            backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                            logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                        } catch (InstantiationException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        } catch (IllegalAccessException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        } catch (ClassNotFoundException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        }
                    }
    
                    if (backupRegistryInstance == null) {
                        logger.warn("Using default backup registry implementation which does not do anything.");
                        backupRegistryInstance = new NotImplementedRegistryImpl();
                    }
                }
    
                return backupRegistryInstance;
            }
        }, randomizer);
    }
    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
        // 2. 校验和引用, 略过
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }
    
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();
    
        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }
    
        this.backupRegistryProvider = backupRegistryProvider;
        this.endpointRandomizer = endpointRandomizer;
        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());
    
        fetchRegistryGeneration = new AtomicLong(0);
    
        remoteRegionsToFetch = new AtomicReference(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
    
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
    
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
    
        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
        // 3. 如果同时关闭了 register-with-eureka 和 fetch-registry, return 掉
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    
            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
            initTimestampMs = System.currentTimeMillis();
            initRegistrySize = this.getApplications().size();
            registrySize = initRegistrySize;
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, initRegistrySize);
    
            return;  // no need to setup up an network tasks and we are done
        }
        // 4. 一些线程池创建
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
    
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
    
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
    
            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);
    
            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }
        // 5. 若开启了 fetch-registry 执行
        if (clientConfig.shouldFetchRegistry()) {
            try {
                // 6. **重点方法 fetchRegistry**, 拉取 server 注册表信息
                boolean primaryFetchRegistryResult = fetchRegistry(false);
                if (!primaryFetchRegistryResult) {
                    logger.info("Initial registry fetch from primary servers failed");
                }
                boolean backupFetchRegistryResult = true;
                if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                    backupFetchRegistryResult = false;
                    logger.info("Initial registry fetch from backup servers failed");
                }
                if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
                    throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
                }
            } catch (Throwable th) {
                logger.error("Fetch registry error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }
    
        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }
        // 6. 若开启了 register-with-eureka 执行
        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                if (!register() ) { // 7. **重点方法 register**, 向 server 发起注册请
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }
        // 8. **重点方法 initScheduledTasks** 初始化各调度任务
        initScheduledTasks();
    
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }
    
        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
        initTimestampMs = System.currentTimeMillis();
        initRegistrySize = this.getApplications().size();
        registrySize = initRegistrySize;
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, initRegistrySize);
    }

    上面提到几个重点方法, fetchRegistryregister, 还有 initScheduledTasks, 我们来康康他们:

    1. fetchRegistry 拉取注册表

      
      private boolean fetchRegistry(boolean forceFullRegistryFetch) {
      Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
      
      try {
          // 1. 获取缓存中数据, 第一次进来为空
          Applications applications = getApplications();
          // 2. 判断是否需要拉取全量数据, 覆盖缓存
          if (clientConfig.shouldDisableDelta() // 开启 disable-delta
                  // 开启 registry-refresh-single-vip-address
                  || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                  || forceFullRegistryFetch // false
                  || (applications == null) // 第一次进方法时, 需要初始化
                  || (applications.getRegisteredApplications().size() == 0) // 缓存为空
                  || (applications.getVersion() == -1)) // 好似已经被弃用
          {
              logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
              logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
              logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
              logger.info("Application is null : {}", (applications == null));
              logger.info("Registered Applications size is zero : {}",
                      (applications.getRegisteredApplications().size() == 0));
              logger.info("Application version is -1: {}", (applications.getVersion() == -1));
              getAndStoreFullRegistry(); // 3. 获取全量数据
          } else {
              // 3. 获取增量数据
              getAndUpdateDelta(applications);
          }
          applications.setAppsHashCode(applications.getReconcileHashCode());
          logTotalInstances();
      } catch (Throwable e) {
          logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
          return false;
      } finally {
          if (tracer != null) {
              tracer.stop();
          }
      }
      
      // Notify about cache refresh before updating the instance remote status
      onCacheRefreshed();
      
      // Update remote status based on refreshed data held in the cache
      updateInstanceRemoteStatus();
      
      // registry was fetched successfully, so return true
      return true;
      }
    先来看看获取增量的代码: com.netflix.discovery.DiscoveryClient#getAndUpdateDelta
    ```java
    private void getAndUpdateDelta(Applications applications) throws Throwable {
        // 1. 获取当前注册表版本号
        long currentUpdateGeneration = fetchRegistryGeneration.get();
    
        Applications delta = null;
        // 2. 请求 server 的 apps/delta 获取增量数据(jerseyClient 实现)
        EurekaHttpResponse httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }
        // 3. 下面更新的代码就不讲了, 充分利用了 JUC
        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }

    再来看看获取全量的代码:

    private void getAndStoreFullRegistry() throws Throwable {
        // 1. 获取当前注册表版本号
        long currentUpdateGeneration = fetchRegistryGeneration.get();
    
        logger.info("Getting all instance registry info from the eureka server");
    
        Applications apps = null;
        // 2. 判断是否配置了 registry-refresh-single-vip-address, 只对 VIP 感兴趣
        // 未配置则请求 server 的 apps/ 接口获取全量注册表
        EurekaHttpResponse httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());
    
        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }

    拉取服务的代码看完啦, 接着看看 register 注册服务

    1. register 注册服务
      com.netflix.discovery.DiscoveryClient#register

      boolean register() throws Throwable {
      logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
      EurekaHttpResponse httpResponse;
      try {
          // 1. 请求 server 的 apps/{appname} 注册服务
          httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
      } catch (Exception e) {
          logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
          throw e;
      }
      if (logger.isInfoEnabled()) {
          logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
      }
      return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
      }
    2. initScheduledTasks 初始化调度任务

      private void initScheduledTasks() {
      // 1. 若开启了 fetch-registry 执行此定时任务
      if (clientConfig.shouldFetchRegistry()) {
          // 2. 获取 registry-fetch-intereval-seconds 配置 
          int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
          int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
          cacheRefreshTask = new TimedSupervisorTask(
                  "cacheRefresh",
                  scheduler,
                  cacheRefreshExecutor,
                  registryFetchIntervalSeconds,
                  TimeUnit.SECONDS,
                  expBackOffBound,
                  // 3. 创建一个定时任务, 执行 **CacheRefreshThread** 业务
                  new CacheRefreshThread()
          );
          scheduler.schedule(
                  cacheRefreshTask, // 4. 使用上面取得的配置时间间隔执行
                  registryFetchIntervalSeconds, TimeUnit.SECONDS);
      }
      // 5. 若开启了 register-with-eureka 执行此定时任务
      if (clientConfig.shouldRegisterWithEureka()) {
          // 6. 获取 lease-renewal-interval-in-seconds 配置
          int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
          int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
          logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
      
          heartbeatTask = new TimedSupervisorTask(
                  "heartbeat",
                  scheduler,
                  heartbeatExecutor,
                  renewalIntervalInSecs,
                  TimeUnit.SECONDS,
                  expBackOffBound,
                  // 7. 创建定时任务, 执行 **HeartbeatThread** 任务
                  new HeartbeatThread()
          );
          scheduler.schedule(
                  heartbeatTask, / 8. 使用上面取得的心跳间隔执行
                  renewalIntervalInSecs, TimeUnit.SECONDS);
      
          // InstanceInfo replicator
          instanceInfoReplicator = new InstanceInfoReplicator(
                  this,
                  instanceInfo,
                  clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                  2); // burstSize
      
          statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
              @Override
              public String getId() {
                  return "statusChangeListener";
              }
      
              @Override
              public void notify(StatusChangeEvent statusChangeEvent) {
                  if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                          InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                      // log at warn level if DOWN was involved
                      logger.warn("Saw local status change event {}", statusChangeEvent);
                  } else {
                      logger.info("Saw local status change event {}", statusChangeEvent);
                  }
                  instanceInfoReplicator.onDemandUpdate();
              }
          };
      
          if (clientConfig.shouldOnDemandUpdateStatusChange()) {
              applicationInfoManager.registerStatusChangeListener(statusChangeListener);
          }
      
          instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
      } else {
          logger.info("Not registering with Eureka server per configuration");
      }
      }

      上面提到了两个任务, 分别是 CacheRefreshThreadHeartbeatThread, 我们看看具体的执行代码:

    3. CacheRefreshThread

      class CacheRefreshThread implements Runnable {
      public void run() {
          // 掉了外部类的方法
          refreshRegistry();
      }
      }
      @VisibleForTesting
      void refreshRegistry() {
      try {
          boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
      
          boolean remoteRegionsModified = false;
          // This makes sure that a dynamic change to remote regions to fetch is honored.
          String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
          if (null != latestRemoteRegions) {
              String currentRemoteRegions = remoteRegionsToFetch.get();
              if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                  // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                  synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                      if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                          String[] remoteRegions = latestRemoteRegions.split(",");
                          remoteRegionsRef.set(remoteRegions);
                          instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                          remoteRegionsModified = true;
                      } else {
                          logger.info("Remote regions to fetch modified concurrently," +
                                  " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                      }
                  }
              } else {
                  // Just refresh mapping to reflect any DNS/Property change
                  instanceRegionChecker.getAzToRegionMapper().refreshMapping();
              }
          }
          // 1. **定时执行 fetchRegistry**, 之前讲过这个方法!!
          boolean success = fetchRegistry(remoteRegionsModified);
          if (success) {
              registrySize = localRegionApps.get().size();
              // 2. 成功则更新最后成功更新时间戳
              lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
          }
      
          if (logger.isDebugEnabled()) {
              StringBuilder allAppsHashCodes = new StringBuilder();
              allAppsHashCodes.append("Local region apps hashcode: ");
              allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
              allAppsHashCodes.append(", is fetching remote regions? ");
              allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
              for (Map.Entry entry : remoteRegionVsApps.entrySet()) {
                  allAppsHashCodes.append(", Remote region: ");
                  allAppsHashCodes.append(entry.getKey());
                  allAppsHashCodes.append(" , apps hashcode: ");
                  allAppsHashCodes.append(entry.getValue().getAppsHashCode());
              }
              logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                      allAppsHashCodes);
          }
      } catch (Throwable e) {
          logger.error("Cannot fetch registry from server", e);
      }
      }
    4. HeartbeatThread

      private class HeartbeatThread implements Runnable {
      
      public void run() {
          if (renew()) { // 1. 调用 renew 方法发起续约
              // 2. 更新最后成功续约的时间戳
              lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
          }
      }
      }
      boolean renew() {
      EurekaHttpResponse httpResponse;
      try {
          // 2. 向 server 的 apps/{appName}/{id} 接口发起续约请求
          httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
          logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
          // 3. 如果服务端没有找到本服务
          if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
              REREGISTER_COUNTER.increment();
              logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
              long timestamp = instanceInfo.setIsDirtyWithTime();
              // 4. 注册, 上面也讲过这个方法
              boolean success = register();
              if (success) {
                  instanceInfo.unsetIsDirty(timestamp);
              }
              return success;
          }
          return httpResponse.getStatusCode() == Status.OK.getStatusCode();
      } catch (Throwable e) {
          logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
          return false;
      }
      }

2) EurekaClient 服务下线

我们回顾一下 EurekaClientAUtoConfiguration 中的 eurekaClient 方法:
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.EurekaClientConfiguration#eurekaClient

@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
        search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
        EurekaClientConfig config) {
    return new CloudEurekaClient(manager, config, this.optionalArgs,
            this.context);
}

在 Java 应用正常关闭时, 会触发这个 shutdown 方法进行下线逻辑, 我们其实也可以手动去调用它手动下线服务:
com.netflix.discovery.DiscoveryClient#shutdown

@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");

        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }

        // 1. 将之前提到的两个定时任务取消
        cancelScheduledTasks();

        // 2. 如果开启注册到 server, 则将自己状态改为 DOWN, 发送下线请求 
        if (applicationInfoManager != null
                && clientConfig.shouldRegisterWithEureka()
                && clientConfig.shouldUnregisterOnShutdown()) {
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();
        }
        // 3. 一些监控正常 shutdown
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }

        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        Monitors.unregisterObject(this);

        logger.info("Completed shut down of DiscoveryClient");
    }
}