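Eureka Client Source Code
This article is licensed under the Creative Commons Attribution-ShareAlike 4.0 International license.
Visit https://creativecommons.org/licenses/by-sa/4.0/ to view the license.
This article is based on the latest upstream RELEASE versions, Spring Boot 2.2.5 and Spring Cloud Hoxton.SR8. It turns out the EurekaClient source has changed quite a lot.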
1) Eureka Client startup flow
We usually pull in Eureka Client as a Spring Boot starter, e.g. spring-cloud-starter-netflix-eureka-client.
A well-behaved starter follows Spring Boot's auto-configuration convention and declares its AutoConfiguration classes in spring.factories, so let's start looking there.
Find the spring.factories file in the spring-cloud-netflix-eureka-client dependency and open it up:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapConfiguration
I read through nearly all of them. The core auto-configuration class is EurekaClientAutoConfiguration, so that is the only one we need to focus on.
1.1) EurekaClientAutoConfiguration annotation analysis
@Configuration(proxyBeanMethods = false) // 1. Configuration class, with bean-method proxying disabled
@EnableConfigurationProperties // 2. Binds Spring Boot configuration properties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) // 3. Auto-configuration is on by default
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
        CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {
        // Auto-configuration for the extra optional arguments
        "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
        // Related to dynamic config refresh
        "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
        // Defines a refresh listener internally; see EurekaClientConfigurationRefresher if you are curious
        "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
        // Auto service-registration configuration (previously used to update and report service status; appears to be deprecated)
        "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {
Let's keep reading and look at the key methods:
1.2) EurekaClientAutoConfiguration source analysis
- eurekaClient: initializes the Eureka Client instance
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.EurekaClientConfiguration#eurekaClient
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
        search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
        EurekaClientConfig config) {
    return new CloudEurekaClient(manager, config, this.optionalArgs,
            this.context);
}
Let's step into the constructor:
org.springframework.cloud.netflix.eureka.CloudEurekaClient#CloudEurekaClient
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
        EurekaClientConfig config,
        AbstractDiscoveryClientOptionalArgs<?> args,
        ApplicationEventPublisher publisher) {
    super(applicationInfoManager, config, args);
    // Keep a reference to the holder of the registration info
    this.applicationInfoManager = applicationInfoManager;
    this.publisher = publisher;
    this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
            "eurekaTransport");
    ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
This only stores the information needed for registration. Now into the parent class constructor:
com.netflix.discovery.DiscoveryClient#DiscoveryClient
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config,
                       AbstractDiscoveryClientOptionalArgs args) {
    this(applicationInfoManager, config, args, ResolverUtils::randomize);
}

public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config,
                       AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
    // 1. Delegates to yet another overloaded constructor
    this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
        // This is effectively a callback: when the default server cannot be reached,
        // the client falls back to this backup registry.
        // It has to be configured explicitly.
        private volatile BackupRegistry backupRegistryInstance;

        @Override
        public synchronized BackupRegistry get() {
            if (backupRegistryInstance == null) {
                String backupRegistryClassName = config.getBackupRegistryImpl();
                if (null != backupRegistryClassName) {
                    try {
                        backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                        logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                    } catch (InstantiationException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    } catch (IllegalAccessException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    } catch (ClassNotFoundException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    }
                }
                if (backupRegistryInstance == null) {
                    logger.warn("Using default backup registry implementation which does not do anything.");
                    backupRegistryInstance = new NotImplementedRegistryImpl();
                }
            }
            return backupRegistryInstance;
        }
    }, randomizer);
}

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
                AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    // 2. Validation and field assignments, skipped here
    if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
        this.preRegistrationHandler = args.preRegistrationHandler;
    } else {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
        this.preRegistrationHandler = null;
    }

    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();

    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }

    this.backupRegistryProvider = backupRegistryProvider;
    this.endpointRandomizer = endpointRandomizer;
    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    localRegionApps.set(new Applications());

    fetchRegistryGeneration = new AtomicLong(0);

    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    // 3. If both register-with-eureka and fetch-registry are disabled, return early
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        initRegistrySize = this.getApplications().size();
        registrySize = initRegistrySize;
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, initRegistrySize);

        return;  // no need to setup up an network tasks and we are done
    }

    // 4. Create the thread pools
    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);

        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    // 5. Runs only if fetch-registry is enabled
    if (clientConfig.shouldFetchRegistry()) {
        try {
            // 6. Key method fetchRegistry: pulls the registry from the server
            boolean primaryFetchRegistryResult = fetchRegistry(false);
            if (!primaryFetchRegistryResult) {
                logger.info("Initial registry fetch from primary servers failed");
            }
            boolean backupFetchRegistryResult = true;
            if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                backupFetchRegistryResult = false;
                logger.info("Initial registry fetch from backup servers failed");
            }
            if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
                throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
            }
        } catch (Throwable th) {
            logger.error("Fetch registry error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // call and execute the pre registration handler before all background tasks (inc registration) is started
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }

    // 7. Runs only if register-with-eureka is enabled and registration is enforced at init
    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            // 8. Key method register: sends the registration request to the server
            if (!register()) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // 9. Key method initScheduledTasks: sets up the scheduled tasks
    initScheduledTasks();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    initRegistrySize = this.getApplications().size();
    registrySize = initRegistrySize;
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, initRegistrySize);
}
The constructor above calls three key methods: fetchRegistry, register, and initScheduledTasks. Let's look at each of them:
- fetchRegistry: fetching the registry
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // 1. Read the cached data; it is empty on the first call
        Applications applications = getApplications();

        // 2. Decide whether a full fetch is needed to overwrite the cache
        if (clientConfig.shouldDisableDelta() // disable-delta is turned on
                // registry-refresh-single-vip-address is set
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch // false on the startup path
                || (applications == null) // first call, needs initializing
                || (applications.getRegisteredApplications().size() == 0) // cache is empty
                || (applications.getVersion() == -1)) // appears to be deprecated
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                    (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            getAndStoreFullRegistry(); // 3. Fetch the full registry
        } else {
            // 3. Fetch only the delta
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}
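The full-versus-delta decision at the top of fetchRegistry can be isolated as a small pure function. The following is only a sketch under simplifying assumptions: FetchModeSketch, FetchMode and chooseMode are hypothetical names that do not exist in Eureka, the cached registry is reduced to an application count (null meaning no cache yet), and the version == -1 check is left out.

```java
// Hypothetical, simplified stand-in for the condition at the top of fetchRegistry.
// None of these names exist in Eureka itself.
final class FetchModeSketch {

    enum FetchMode { FULL, DELTA }

    // cachedAppCount == null models "applications == null" (nothing cached yet).
    static FetchMode chooseMode(boolean disableDelta,
                                String singleVipAddress,
                                boolean forceFull,
                                Integer cachedAppCount) {
        boolean hasSingleVip = singleVipAddress != null && !singleVipAddress.isEmpty();
        boolean cacheEmpty = cachedAppCount == null || cachedAppCount == 0;
        // Any one of these conditions forces a full fetch that overwrites the cache.
        if (disableDelta || hasSingleVip || forceFull || cacheEmpty) {
            return FetchMode.FULL;
        }
        // Otherwise only the changes since the last fetch are requested.
        return FetchMode.DELTA;
    }

    public static void main(String[] args) {
        // First call at startup: the cache is empty.
        System.out.println(chooseMode(false, null, false, 0));
        // Later periodic refresh with a populated cache.
        System.out.println(chooseMode(false, null, false, 12));
    }
}
```

With the default settings this means the very first fetch is always a full one, and subsequent refreshes switch to deltas as long as the cache stays non-empty.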
First, the code that fetches the delta:
com.netflix.discovery.DiscoveryClient#getAndUpdateDelta
```java
private void getAndUpdateDelta(Applications applications) throws Throwable {
    // 1. Read the current registry generation
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    // 2. Call the server's apps/delta endpoint to fetch the delta (implemented with jerseyClient)
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }

    // 3. The update logic below is not covered in detail; it leans heavily on JUC (java.util.concurrent)
    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                updateDelta(delta);
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}
```
Next, the code that fetches the full registry:
private void getAndStoreFullRegistry() throws Throwable {
    // 1. Read the current registry generation
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    Applications apps = null;
    // 2. Check whether registry-refresh-single-vip-address is configured (only that VIP is of interest);
    //    if not, call the server's apps/ endpoint to fetch the full registry
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}
That covers the registry-fetching code. The next method to look at is register, which registers the service.
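Both getAndStoreFullRegistry and getAndUpdateDelta read fetchRegistryGeneration before the remote call and apply the result only if compareAndSet still succeeds afterwards. A minimal sketch of that guard, using only the JDK, might look like the following. GenerationGuardSketch and its methods are hypothetical names, and the registry is reduced to a plain String for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of the "generation" check; not Eureka code.
final class GenerationGuardSketch {
    private final AtomicLong generation = new AtomicLong(0);
    private final AtomicReference<String> registry = new AtomicReference<>("empty");

    // Step 1: remember the generation before the (slow) remote call starts.
    long beginFetch() {
        return generation.get();
    }

    // Step 2: publish the result only if no other fetch advanced the generation meanwhile.
    boolean tryStore(long observedGeneration, String fetched) {
        if (generation.compareAndSet(observedGeneration, observedGeneration + 1)) {
            registry.set(fetched);
            return true;
        }
        return false; // stale result: another thread already updated the cache
    }

    String current() {
        return registry.get();
    }

    public static void main(String[] args) {
        GenerationGuardSketch guard = new GenerationGuardSketch();
        long a = guard.beginFetch(); // two overlapping fetches observe the same generation
        long b = guard.beginFetch();
        System.out.println(guard.tryStore(a, "from-fetch-A")); // first one wins
        System.out.println(guard.tryStore(b, "from-fetch-B")); // second one is discarded
        System.out.println(guard.current());
    }
}
```

The point of the design is that a slow response from an older request cannot overwrite data written by a newer one; the losing thread just logs a warning and drops its result.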
- register: registering the service
com.netflix.discovery.DiscoveryClient#register
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        // 1. Call the server's apps/{appName} endpoint to register the service
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    // Registration counts as successful only on 204 No Content
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
- initScheduledTasks: initializing the scheduled tasks
private void initScheduledTasks() {
    // 1. If fetch-registry is enabled, set up the cache-refresh task
    if (clientConfig.shouldFetchRegistry()) {
        // 2. Read the registry-fetch-interval-seconds setting
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                // 3. Create a timed task that runs the CacheRefreshThread logic
                new CacheRefreshThread()
        );
        scheduler.schedule(
                cacheRefreshTask,
                // 4. Schedule it with the interval read above
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    // 5. If register-with-eureka is enabled, set up the heartbeat task
    if (clientConfig.shouldRegisterWithEureka()) {
        // 6. Read the lease-renewal-interval-in-seconds setting
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        heartbeatTask = new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                // 7. Create a timed task that runs the HeartbeatThread logic
                new HeartbeatThread()
        );
        scheduler.schedule(
                heartbeatTask,
                // 8. Schedule it with the heartbeat interval read above
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
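Both tasks are wrapped in a TimedSupervisorTask and receive an expBackOffBound. The class itself is not shown in this article, so the following is only a sketch of the back-off idea under stated assumptions: the delay doubles after a timed-out run, is capped at the base interval multiplied by the bound, and goes back to the base interval after a successful run. BackoffSketch and nextDelayMillis are hypothetical names.

```java
// Hypothetical sketch of a bounded exponential back-off for a periodic task.
// Assumption: delay doubles on timeout, capped at baseDelay * expBackOffBound,
// and resets to baseDelay after a successful run.
final class BackoffSketch {

    static long nextDelayMillis(long currentDelayMillis,
                                long baseDelayMillis,
                                int expBackOffBound,
                                boolean timedOut) {
        if (!timedOut) {
            return baseDelayMillis; // healthy run: go back to the configured interval
        }
        long maxDelayMillis = baseDelayMillis * expBackOffBound;
        return Math.min(maxDelayMillis, currentDelayMillis * 2);
    }

    public static void main(String[] args) {
        long base = 30_000L; // e.g. a 30 second interval
        int bound = 10;      // e.g. a back-off bound of 10
        long delay = base;
        // Simulate five consecutive timeouts followed by one success.
        for (int i = 0; i < 5; i++) {
            delay = nextDelayMillis(delay, base, bound, true);
            System.out.println("after timeout " + (i + 1) + ": " + delay + " ms");
        }
        delay = nextDelayMillis(delay, base, bound, false);
        System.out.println("after success: " + delay + " ms");
    }
}
```

Note that in the listing above the tasks are submitted with scheduler.schedule, which is a one-shot call, so a design like this requires the task to reschedule itself with the newly computed delay after each run.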
The code above mentions two tasks, CacheRefreshThread and HeartbeatThread. Let's look at what they actually run:
- CacheRefreshThread
class CacheRefreshThread implements Runnable {
    public void run() {
        // Calls the outer class's method
        refreshRegistry();
    }
}

@VisibleForTesting
void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }

        // 1. Periodically runs fetchRegistry, the method covered earlier
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            // 2. On success, record the timestamp of the last successful fetch
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        if (logger.isDebugEnabled()) {
            StringBuilder allAppsHashCodes = new StringBuilder();
            allAppsHashCodes.append("Local region apps hashcode: ");
            allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
            allAppsHashCodes.append(", is fetching remote regions? ");
            allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
            for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                allAppsHashCodes.append(", Remote region: ");
                allAppsHashCodes.append(entry.getKey());
                allAppsHashCodes.append(" , apps hashcode: ");
                allAppsHashCodes.append(entry.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                    allAppsHashCodes);
        }
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}
- HeartbeatThread
private class HeartbeatThread implements Runnable {
    public void run() {
        // 1. Call renew to send the lease renewal
        if (renew()) {
            // 2. Record the timestamp of the last successful renewal
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        // 3. Send the renewal request to the server's apps/{appName}/{id} endpoint
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        // 4. The server does not know this instance
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            // 5. Register again, using the register method covered above
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}
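The control flow of renew (a heartbeat, and a fallback to registration when the server answers 404) can be sketched without any Eureka types. In the sketch below the two HTTP calls are replaced by IntSupplier callbacks that return a status code. RenewSketch and its parameters are hypothetical names, and the dirty-flag bookkeeping on instanceInfo is omitted.

```java
import java.util.function.IntSupplier;

// Hypothetical sketch of the renew flow; the HTTP calls are replaced by callbacks
// that return a status code.
final class RenewSketch {
    static final int OK = 200;
    static final int NO_CONTENT = 204;
    static final int NOT_FOUND = 404;

    static boolean renew(IntSupplier sendHeartbeat, IntSupplier register) {
        try {
            int status = sendHeartbeat.getAsInt();
            if (status == NOT_FOUND) {
                // The server has no lease for this instance: register again.
                return register.getAsInt() == NO_CONTENT;
            }
            return status == OK;
        } catch (RuntimeException e) {
            // Any failure counts as an unsuccessful heartbeat; the next scheduled run retries.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(renew(() -> OK, () -> NO_CONTENT));        // normal renewal
        System.out.println(renew(() -> NOT_FOUND, () -> NO_CONTENT)); // lease lost, re-registered
        System.out.println(renew(() -> NOT_FOUND, () -> 500));        // re-registration failed
    }
}
```

This self-healing path is what lets a client recover after the server has evicted or forgotten it, for example after a server restart, without any extra logic elsewhere.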
2) EurekaClient service deregistration
Let's revisit the eurekaClient method in EurekaClientAutoConfiguration:
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.EurekaClientConfiguration#eurekaClient
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
        search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
        EurekaClientConfig config) {
    return new CloudEurekaClient(manager, config, this.optionalArgs,
            this.context);
}
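When the Java application shuts down normally, this shutdown method (declared as the bean's destroyMethod) is triggered to run the deregistration logic. We can also call it ourselves to take the service offline manually: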
com.netflix.discovery.DiscoveryClient#shutdown
@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");
        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }
        // 1. Cancel the two scheduled tasks described earlier
        cancelScheduledTasks();
        // 2. If registration with the server is enabled (and unregister-on-shutdown is on),
        //    set our own status to DOWN and send the deregistration request
        if (applicationInfoManager != null
                && clientConfig.shouldRegisterWithEureka()
                && clientConfig.shouldUnregisterOnShutdown()) {
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();
        }
        // 3. Shut down the transport and the monitors cleanly
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();
        Monitors.unregisterObject(this);
        logger.info("Completed shut down of DiscoveryClient");
    }
}
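Because shutdown can be reached through several paths (the bean's destroyMethod, @PreDestroy, or a manual call), the isShutdown.compareAndSet(false, true) guard is what keeps the cleanup from running twice. A minimal sketch of that idempotent pattern follows. ShutdownOnceSketch is a hypothetical class, and a counter stands in for the real cancel and unregister work.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of an idempotent shutdown guarded by compareAndSet.
final class ShutdownOnceSketch {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final AtomicInteger cleanupRuns = new AtomicInteger();

    void shutdown() {
        // Only the first caller flips false -> true and performs the cleanup.
        if (isShutdown.compareAndSet(false, true)) {
            cleanupRuns.incrementAndGet(); // stands in for cancelling tasks and unregistering
        }
    }

    int cleanupRuns() {
        return cleanupRuns.get();
    }

    public static void main(String[] args) {
        ShutdownOnceSketch client = new ShutdownOnceSketch();
        client.shutdown(); // e.g. a manual call
        client.shutdown(); // e.g. the container calling the destroy method afterwards
        System.out.println(client.cleanupRuns());
    }
}
```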