本文采用知识共享 署名-相同方式共享 4.0 国际 许可协议进行许可。
访问 https://creativecommons.org/licenses/by-sa/4.0/ 查看该许可协议。
Eureka Server 启动流程
本文基于上游最新 RELEASE 版, SpringBoot 2.2.5, SpringCloud Hoxton.SR8 撰写
1) Eureka Server 启动流程
通常我们的 Eureka Server 是以 SpringBoot 的 Starter 引入的, 如: spring-cloud-starter-netflix-eureka-server.
那么一个正常的 Starter 依赖, 会遵循 SpringBoot 自动装配的原则, 在 spring.factories 中定义对应的 AutoConfiguration 自动装配类, 那我们就从这里开始找吧~
在 spring-cloud-netflix-eureka-server 依赖中找到 spring.factories 文件, 打开康康:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
文件中只定义了一个自动装配类: EurekaServerAutoConfiguration, 即 EurekaServer 的启动流程关键类
1.1) EurekaServerAutoConfiguration 注解分析
// 1. 标注为配置类, 且关闭 Bean 的代理(提高启动速度)
@Configuration(proxyBeanMethods = false)
// 2. **1.1.2**
@Import(EurekaServerInitializerConfiguration.class)
// 3. 当容器中存在 EurekaServerMarkerConfiguration.Marker 时才加载此配置类, **1.1.1**
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
看完注解信息, 我们将会分三节来讲 EurekaServer 的启动流程:
- 1.2 开启自动装配条件
- 1.3 EurekaServerAutoConfiguration 类分析
- 1.4 自动装配结束后续操作 @Import(EurekaServerInitializerConfiguration.class)
1.2) Eureka Server 开启自动装配条件
EurekaServerAutoConfiguration 中定义了一个 @ConditionalOnBean
, 自动装配的前提是容器中存在一个 Marker
对象, 哪儿来的呢? @EnableEurekaServer
注解中 Import
的:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
所以我们正常在启动类上加的 @EnableEurekaServer
即开启自动装配的前提.
1.3) EurekaServerAutoConfiguration 自动装配类分析
本小节会讲一些自动装配类中重要或者可扩展的方法点:
-
EurekaServer 仪表盘
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration#eurekaController
@Bean @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true) public EurekaController eurekaController() { return new EurekaController(this.applicationInfoManager); }
@RequestMapping("${eureka.dashboard.path:/}") public class EurekaController {
- EurekaController 是 Server 仪表盘的实现 Controller, 可以通过配置
eureka.dashboard=false
关闭 - 可以通过配置
eureka.dashboard.path
修改仪表盘的基路由
- EurekaController 是 Server 仪表盘的实现 Controller, 可以通过配置
-
PeerAwareInstanceRegistry 对等服务实例注册表
用于获取其他 Server 实例的信息, 如用于注册表复制
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration#peerAwareInstanceRegistry
@Bean public PeerAwareInstanceRegistry peerAwareInstanceRegistry( ServerCodecs serverCodecs) { this.eurekaClient.getApplications(); // force initialization return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount()); }
-
PeerEurekaNodes 对等节点
用于封装, 操作对等节点相关信息
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration#peerEurekaNodes
@Bean @ConditionalOnMissingBean public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs, ReplicationClientAdditionalFilters replicationClientAdditionalFilters) { return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.applicationInfoManager, replicationClientAdditionalFilters); }
其中的
start
方法可用于启动一个, 用于同步对等节点的 Task, 这里用到了一个单例线程池, 可以配置peer-eureka-nodes-update-interval-ms=600000
来修改同步对等节点间隔时间
com.netflix.eureka.cluster.PeerEurekaNodes#start
public void start() { taskExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "Eureka-PeerNodesUpdater"); thread.setDaemon(true); return thread; } } ); try { updatePeerEurekaNodes(resolvePeerUrls()); Runnable peersUpdateTask = new Runnable() { @Override public void run() { try { updatePeerEurekaNodes(resolvePeerUrls()); } catch (Throwable e) { logger.error("Cannot update the replica Nodes", e); } } }; taskExecutor.scheduleWithFixedDelay( peersUpdateTask, serverConfig.getPeerEurekaNodesUpdateIntervalMs(), serverConfig.getPeerEurekaNodesUpdateIntervalMs(), TimeUnit.MILLISECONDS ); } catch (Exception e) { throw new IllegalStateException(e); } for (PeerEurekaNode node : peerEurekaNodes) { logger.info("Replica node URL: {}", node.getServiceUrl()); } }
-
EurekaServerContext EurekaServer 上下文
封装了 EurekaServer 的包括前面提到的各个组件, 默认使用DefaultEurekaServerContext
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration#eurekaServerContext
@Bean @ConditionalOnMissingBean public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) { return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager); }
其内有一个
initialize()
方法会在 Spring Bean 创建完成后, 调用上小节中的定时任务start
方法同步对等节点的信息. -
EurekaServerBootstrap EurekaServer 引导程序
也封装了各个组件, 在 1.4 节中会使用到它进行一些结尾初始化配置
org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap
@Bean public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) { return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig, registry, serverContext); }
-
jerseyFilterRegistration/jerseyApplication Jersey 相关
EurekaServer 使用 Jersey 代替 SpringMVC 发布 Restful 服务@Bean public FilterRegistrationBean<?> jerseyFilterRegistration( javax.ws.rs.core.Application eurekaJerseyApp) { FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>(); bean.setFilter(new ServletContainer(eurekaJerseyApp)); bean.setOrder(Ordered.LOWEST_PRECEDENCE); bean.setUrlPatterns( Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*")); return bean; } /** * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources * required by the Eureka server. * @param environment an {@link Environment} instance to retrieve classpath resources * @param resourceLoader a {@link ResourceLoader} instance to get classloader from * @return created {@link Application} object */ @Bean public javax.ws.rs.core.Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) { ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider( false, environment); // Filter to include only classes that have a particular annotation. // provider.addIncludeFilter(new AnnotationTypeFilter(Path.class)); provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class)); // Find classes in Eureka packages (or subpackages) // Set<Class<?>> classes = new HashSet<>(); for (String basePackage : EUREKA_PACKAGES) { Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage); for (BeanDefinition bd : beans) { Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader()); classes.add(cls); } } // Construct the Jersey ResourceConfig Map<String, Object> propsAndFeatures = new HashMap<>(); propsAndFeatures.put( // Skip static content used by the webapp ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX, EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*"); DefaultResourceConfig rc = new DefaultResourceConfig(classes); rc.setPropertiesAndFeatures(propsAndFeatures); return rc; }
1.4) @Import(EurekaServerInitializerConfiguration.class)
配置类上的 @Import 引入的类, 会在当前配置类加载结束后加载, 我们来康康自动装配加载后做的事儿:
- 注解以及接口实现
org.springframework.cloud.netflix.eureka.server.EurekaServerInitializerConfiguration
@Configuration(proxyBeanMethods = false) public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {
发现这也是一个配置类, 且实现了 SmartLifecycle
接口, 会在 Spring 初始化完成后执行 start
方法, 其他方法不太重要, 直接来看 start
的代码:
org.springframework.cloud.netflix.eureka.server.EurekaServerInitializerConfiguration#start
@Override
public void setServletContext(ServletContext servletContext) {
1. 实现 ServletContextAware 的 setServletContext 方法, **注入了 ServletContext 供于使用**
this.servletContext = servletContext;
}
public void start() {
new Thread(() -> {
try {
// 2. 调用 eurekaServerBootstrap.contextInitialized 初始化完成方法
eurekaServerBootstrap.contextInitialized(
EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
// 3. 发布注册表可用事件
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
// 4. 广播 EurekaServer 启动完成事件
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}
可以看到 3 和 4 步, 发布了两个事件, 在 EurekaServer 中其实并没有针对这两个事件做捕获, 所以是两个扩展点, 可以用 Spring 的监听器捕获这两个事件扩展业务.
-
我们看看第 2 步中的
contextInitialized
方法, 将servletContext
传入, 源码:
org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized
public void contextInitialized(ServletContext context) { try { initEurekaEnvironment(); // 1. 初始化环境参数, 不重要不深入了 initEurekaServerContext(); // 2. 初始化上下文 context.setAttribute(EurekaServerContext.class.getName(), this.serverContext); } catch (Throwable e) { log.error("Cannot bootstrap eureka server :", e); throw new RuntimeException("Cannot bootstrap eureka server :", e); } }
-
我们接着来看看重点方法
initEurekaServerContext()
初始化:
org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#initEurekaServerContext
protected PeerAwareInstanceRegistry registry;
protected void initEurekaServerContext() throws Exception {
// 1. 转换器兼容性参数配置
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
// AWS 云主机相关配置
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
// 注入 serverContext
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// 2. 从其他节点复制注册表
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// 监控统计登记
EurekaMonitors.registerAllStats();
}
- 同步注册表
syncUp
:
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp
@Override public int syncUp() { int count = 0; // 1. 通过 for 循环重试获取注册表, 配置文件中 registry-sync-retries=0 控制重试次数 for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) { if (i > 0) { // 重试 try { // 配置文件中 registry-sync-retry-wait-ms 控制重试间隔 Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException e) { logger.warn("Interrupted during registry transfer.."); break; } } // 2. 调用 Client 获取配置节点的注册表, 注册到本节点中 Applications apps = eurekaClient.getApplications(); for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { try { if (isRegisterable(instance)) { // 3. 尝试复制注册表, 注册到本节点中 register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwable t) { logger.error("During DS init copy", t); } } } } return count; # 返回 Server 节点数量 }
复制注册表的方法 register
代码量有点多, 这里不深入了, 简单了解以下注册表的数据结构:
com.netflix.eureka.registry.AbstractInstanceRegistry#register
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
openForTraffic
开启同步阀门方法, 其有两个实现(单/多实例), 这里只看多示例的实现:
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic
// 需要发送心跳的客户端数量
protected volatile int expectedNumberOfClientsSendingRenews;
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// 1. 自我保护阈值计算
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP); // 2. 实例状态更改为 UP
super.postInit(); // 3. 定时任务剔除服务
}
Eureka Server 的启动流程结束.